ClearBlade provides a native integration for high-performance interactions with Google Pub/Sub, built on the Pub/Sub Golang library (https://pkg.go.dev/cloud.google.com/go/pubsub). This is ideal for sending MQTT messages into Google Cloud services or integrating data from other Google Cloud services directly with your ClearBlade application.
Publish
ClearBlade uses the Pub/Sub Golang library, which leverages a persistent TCP connection under the covers.
A new table, google_pubsub, stores Pub/Sub connection details and topics:
- name: string column (primary key)
- system_key: string column
- project_id: string column
- credentials: string column (encrypted; contains the credentials JSON file)
- topics: jsonb column (array of Pub/Sub topics)
CRUD endpoints for this table are listed under Endpoints.
The topics entered during Pub/Sub connection creation will be added as subscribers to the topic tree. The Pub/Sub connection will implement the client interface in core_messaging/broker/client_manager.go. Use the Publish function to forward messages to Google.
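The subscribe-and-forward flow described above can be sketched roughly as follows. This is an illustration only, not ClearBlade's code: `Client`, `topicTree`, `recordingClient`, and `matches` are hypothetical names, and the real interface in core_messaging/broker/client_manager.go may differ. A real connection would hand the payload to the Pub/Sub Golang library instead of recording it.

```go
package main

import (
	"fmt"
	"strings"
)

// Client is a hypothetical stand-in for the broker's client interface.
type Client interface {
	Publish(topic string, payload []byte) error
}

// recordingClient plays the role of the Pub/Sub connection.
// It only records what it is asked to forward.
type recordingClient struct {
	forwarded []string
}

func (c *recordingClient) Publish(topic string, payload []byte) error {
	c.forwarded = append(c.forwarded, topic+"="+string(payload))
	return nil
}

// matches reports whether an MQTT topic filter (with + and # wildcards)
// matches a concrete topic.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true // multi-level wildcard matches the rest
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

type subscription struct {
	filter string
	client Client
}

// topicTree is a flat list standing in for the broker's topic tree.
type topicTree struct {
	subs []subscription
}

func (tr *topicTree) Subscribe(filter string, c Client) {
	tr.subs = append(tr.subs, subscription{filter: filter, client: c})
}

// Dispatch calls Publish on every subscribed client whose filter matches.
func (tr *topicTree) Dispatch(topic string, payload []byte) error {
	for _, s := range tr.subs {
		if matches(s.filter, topic) {
			if err := s.client.Publish(topic, payload); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	conn := &recordingClient{}
	tree := &topicTree{}
	// Topics configured on the connection become subscriptions.
	for _, path := range []string{"iotdemo", "/devices/+/events"} {
		tree.Subscribe(path, conn)
	}
	_ = tree.Dispatch("/devices/d1/events", []byte("21.5"))
	_ = tree.Dispatch("unrelated", []byte("x")) // no subscriber, nothing forwarded
	fmt.Println(conn.forwarded)
}
```

Treating the Pub/Sub connection as just another subscriber keeps forwarding on the broker's normal delivery path, which appears to be the intent of implementing the client interface.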
Endpoints:
/admin/{systemKey}/{name}/google-integrations/pub-sub
GET, POST, PUT, and DELETE support
POST:
Body required. The body can contain IoT Core and non-IoT Core topics. type and target are required for IoT Core topics. type can be 1 for events or 2 for state:
```json
{
  "credentials_map": { // This is the service account JSON object
    "type": "service_account",
    "project_id": "myprojectid",
    "private_key_id": "blahblahblah",
    "private_key": "<priv_key>",
    "client_email": "a@a.com",
    "client_id": "9126871624161341",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/dahjkdhajkd%40dajsdgajdadsad.iam.gserviceaccount.com"
  },
  "project_id": "myprojectid",
  "topics": [ // This is to send data to Google Pub/Sub
    {"path": "iotdemo"}, // Non-IoT Core topic
    {"path": "/devices/+/events", "type": 1, "target": "performance-testing"}, // IoT Core topic
    {"path": "/devices/+/events/sub", "type": 1, "target": "performance-testing-sub", "sub_folder": "sub"}, // IoT Core topic
    {"path": "/devices/+/state", "type": 2, "target": "performance-testing-state"} // IoT Core topic
  ],
  "forwarders": { // This is to receive Google Pub/Sub messages
    "ack_strategy": 0,
    "poll_rate": 5000, // Deprecated after 9.36.x (ignored if included)
    "max_messages_per_poll": 2, // Deprecated after 9.36.x (ignored if included)
    "subscriptions": [
      {
        "mqtt_topic": "pubsub",
        "pubsub_subscription": "test-sub"
      }
    ]
  }
}
```
path is the MQTT topic on the ClearBlade broker that the Pub/Sub client will subscribe to internally. For non-IoT Core integrations, path must exactly match the Google Pub/Sub topic being published to, so it cannot contain wildcards (+, #).
type is the topic type: 1 for event and 2 for state. For non-IoT Core integrations, type is not used.
target is the Google Pub/Sub topic the Pub/Sub client will publish to. For non-IoT Core integrations, target is not used.
sub_folder is the subfolder of the event's topic, if it has one.
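The field rules for topic entries can be checked on the client side before sending the POST body. The sketch below is a hypothetical helper, not part of ClearBlade: the `Topic` struct and `validate` method are illustrative names, and it assumes an entry counts as an IoT Core topic whenever it sets type or target.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// Topic mirrors one entry of the "topics" array in the POST body.
type Topic struct {
	Path      string `json:"path"`
	Type      int    `json:"type,omitempty"`
	Target    string `json:"target,omitempty"`
	SubFolder string `json:"sub_folder,omitempty"`
}

// validate applies the documented rules.
// Assumption: an entry is an IoT Core topic if it sets type or target.
func (t Topic) validate() error {
	if t.Path == "" {
		return errors.New("path is required")
	}
	if t.Type != 0 || t.Target != "" {
		// IoT Core topic: type and target are both required.
		if t.Type != 1 && t.Type != 2 {
			return fmt.Errorf("%q: type must be 1 (event) or 2 (state)", t.Path)
		}
		if t.Target == "" {
			return fmt.Errorf("%q: target is required for IoT Core topics", t.Path)
		}
		return nil
	}
	// Non-IoT Core topic: path is the Pub/Sub topic name, so no wildcards.
	if strings.ContainsAny(t.Path, "+#") {
		return fmt.Errorf("%q: non-IoT Core paths cannot contain wildcards", t.Path)
	}
	return nil
}

func main() {
	topics := []Topic{
		{Path: "iotdemo"},
		{Path: "/devices/+/events", Type: 1, Target: "performance-testing"},
		{Path: "/devices/+/state"}, // invalid: wildcard without type and target
	}
	for _, t := range topics {
		if err := t.validate(); err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		b, _ := json.Marshal(t)
		fmt.Println("ok:", string(b))
	}
}
```

The server may apply different or additional checks; this only mirrors the rules stated in this section.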
GET:
No query support. Admin or Dev.
Returns the same object as the POST body.
/admin/{systemKey}/google-integrations/pub-sub
GET and DELETE support only. GET obtains all Pub/Sub connections for that system, and DELETE deletes all.
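As a rough illustration of calling the per-connection endpoint, the sketch below builds (but does not send) an HTTP request. Everything beyond the path template is an assumption: `newPubSubRequest` is a hypothetical helper, the base URL (including any API prefix your platform requires) and token are placeholders, and the `ClearBlade-DevToken` header name should be checked against your platform's authentication documentation.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
)

// newPubSubRequest builds a request for one named Pub/Sub connection.
// baseURL and token are placeholders; the auth header name is an assumption.
func newPubSubRequest(method, baseURL, systemKey, name, token string, body []byte) (*http.Request, error) {
	endpoint := fmt.Sprintf("%s/admin/%s/%s/google-integrations/pub-sub",
		baseURL, url.PathEscape(systemKey), url.PathEscape(name))
	req, err := http.NewRequest(method, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("ClearBlade-DevToken", token)
	if len(body) > 0 {
		req.Header.Set("Content-Type", "application/json")
	}
	return req, nil
}

func main() {
	body := []byte(`{"project_id": "myprojectid", "topics": [{"path": "iotdemo"}]}`)
	req, err := newPubSubRequest(http.MethodPost, "https://platform.example.com",
		"mySystemKey", "myConnection", "<dev_token>", body)
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	// Pass req to an http.Client to actually send it.
	fmt.Println(req.Method, req.URL.String())
}
```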
Receiving data
The following parameters exist for setting up your Google Pub/Sub forwarders to ClearBlade MQTT:
ack_strategy: 0 = ack immediately, 1 = never ack. For now, 0 should always be used.
poll_rate: Time (ms) between sending message batches to the broker. 0 will send messages immediately (deprecated after 9.36.x - ignored if included)
max_messages_per_poll: Max number of messages to send in a single batch to the broker. Implicit minimum value of 1 (deprecated after 9.36.x - ignored if included)
ClearBlade receives messages from Google as soon as possible, buffers them internally, and sends them to the broker in batches as defined by poll_rate and max_messages_per_poll (in versions where those parameters are still honored).
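The effect of max_messages_per_poll on buffered messages can be sketched as below. This is a simplified model, not ClearBlade's implementation: `batches` is a hypothetical function that only shows how a buffer would be split, with the implicit minimum of 1 applied. In a real forwarder, each batch would be sent poll_rate milliseconds after the previous one.

```go
package main

import "fmt"

// batches splits buffered messages into groups of at most maxPerPoll,
// treating any value below 1 as 1 (the implicit minimum).
func batches(buffered []string, maxPerPoll int) [][]string {
	if maxPerPoll < 1 {
		maxPerPoll = 1
	}
	var out [][]string
	for len(buffered) > 0 {
		n := maxPerPoll
		if n > len(buffered) {
			n = len(buffered)
		}
		out = append(out, buffered[:n])
		buffered = buffered[n:]
	}
	return out
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4", "m5"}
	// With max_messages_per_poll = 2, five messages need three polls.
	for i, b := range batches(msgs, 2) {
		fmt.Printf("poll %d: %v\n", i+1, b)
	}
}
```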