Streaming MQTT Connectors
ClearBlade’s streaming MQTT Connectors allow you to set up standard bi-directional mappings between data flowing over ClearBlade-brokered MQTT topics and third-party cloud streaming services and middleware.
This is ideal for sending MQTT messages into your cloud services or integrating data from other services directly with your ClearBlade application.
Currently supported third-party streaming offerings are:
Google Pub/Sub - supports offline caching up to 10k messages
AWS Kinesis
Apache Kafka
Apache Pulsar
Technical Overview
To create a streaming MQTT Connector, all that is required is a REST API call to the mqtt-connectors endpoints; the system will immediately begin honoring the configured mapping. ClearBlade recommends making this REST API call in a microservice that is triggered to run when the IoT Enterprise environment starts.
Environment Setup
Generally, MQTT Connectors are designed to be started as the IoT Enterprise comes online and begins to allow device connections. When the service runs at startup, it makes an HTTP call to the platform. The recommended way to implement this is:
Create a new Code Service of type micro-service
Include the http library
With the service created, click the trigger tab and create a new trigger that runs the service on startup of the IoT Enterprise environment.
Add the source code from the Sample Code section below into the body of your service
Click save
The micro-service will now run each time the IoT Enterprise environment is up and accepting connections.
Next: Follow the connector setup.
Connector Setup
The triggered microservice will need to make an API call to the IoT Enterprise platform with the necessary configuration.
API:
Create
URI: /api/v/1/mqtt-connectors/{systemKey}/{name}
Method: POST
Headers: 'ClearBlade-UserToken': <userToken>
Body: { "type": <string>, "credentials": <object>, "config": <object> }
Update
URI: /api/v/1/mqtt-connectors/{systemKey}/{name}
Method: PUT
Headers: 'ClearBlade-UserToken': <userToken>
Body: { "type": <string>, "credentials": <object>, "config": <object> }
Get
URI: /api/v/1/mqtt-connectors/{systemKey}/{name}
Method: GET
Headers: 'ClearBlade-UserToken': <userToken>
Body:
Get All
URI: /api/v/1/mqtt-connectors/{systemKey}/
Method: GET
Headers: 'ClearBlade-UserToken': <userToken>
Body:
Delete
URI: /api/v/1/mqtt-connectors/{systemKey}/{name}
Method: DELETE
Headers: 'ClearBlade-UserToken': <userToken>
Body:
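The five operations above differ only in the HTTP method, whether a connector name is appended to the URI, and whether a body is sent. A minimal sketch of a helper that captures this follows. The function buildConnectorRequest, the CONNECTOR_METHODS table, and the operation labels are hypothetical names used for illustration, not part of the ClearBlade API. The sketch also assumes that name in the URIs is a placeholder for the connector name and that Get is issued as an HTTP GET.

```javascript
// Hypothetical lookup table: operation label -> HTTP method.
var CONNECTOR_METHODS = {
  create: "POST",
  update: "PUT",
  get: "GET", // assumption: a read of a single connector uses GET
  getAll: "GET",
  remove: "DELETE"
};

// Builds a plain description of the request for one connector operation.
// It performs no network I/O; pass the result to the HTTP client of your choice.
function buildConnectorRequest(op, platformUrl, systemKey, userToken, name, body) {
  if (!Object.prototype.hasOwnProperty.call(CONNECTOR_METHODS, op)) {
    throw new Error("Unknown operation: " + op);
  }
  var uri = platformUrl + "/api/v/1/mqtt-connectors/" + systemKey + "/";
  if (op !== "getAll") {
    if (!name) {
      throw new Error("Operation " + op + " requires a connector name");
    }
    uri += encodeURIComponent(name);
  }
  var request = {
    method: CONNECTOR_METHODS[op],
    uri: uri,
    headers: { "ClearBlade-UserToken": userToken }
  };
  // Only create and update carry a body of { type, credentials, config }.
  if (op === "create" || op === "update") {
    request.body = body;
  }
  return request;
}
```

For example, buildConnectorRequest("getAll", platformUrl, systemKey, userToken) yields a GET against the collection URI with no body, while "create" and "update" attach the type, credentials, and config object.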
Sample Code
Ex: A sample microservice to create a new MQTT Connector Configuration
function StartMQTTConnector(req, resp) {
  const params = req.params;
  // Connector name; reading it from params.name is an assumption, substitute your own source
  const name = params.name;
  const url = cbmeta.platform_url + "/api/v/1/mqtt-connectors/" + req.systemKey + "/" + name;
  const connectorConfig = {
    "type": "pubsub", // pubsub, kinesis, kafka, pulsar
    "credentials": {}, // specific to each type of connector
    "config": {} // populate according to the connector
  };
  var options = {
    "uri": url, // full connector endpoint, not just the platform URL
    "body": connectorConfig,
    "headers": {
      "Content-Type": "application/json",
      "ClearBlade-UserToken": req.userToken
    }
  };
  var requestObject = Requests();
  // POST to create the MQTT connector
  requestObject.post(options, function (err, body) {
    if (err) {
      // err is a JSON value describing the error, shaped like {"error":"message"}
      resp.error("Failed to create MQTT Connector: " + JSON.stringify(err));
    } else {
      // body is the JSON of the response
      resp.success("Created MQTT Connector: " + body);
    }
  });
}
Connector Configs
Google Pub/Sub
Credentials object:
"credentials": { // This is the service account JSON object
"type": "service_account",
"project_id": "myprojectid",
"private_key_id": "blahblahblah",
"private_key": "<priv_key>",
"client_email": "a@a.com",
"client_id": "9126871624161341",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/dahjkdhajkd%40dajsdgajdadsad.iam.gserviceaccount.com"
},
Config object:
{
"topic_mappings": [
{
"pubsub_topic": "my-topic",
"outgoing": "foo/+/bar",
"type": 0,
"sub_folder": ""
},
{
"pubsub_topic": "my-subscription",
"incoming": "foo/bar/baz",
"type": 1,
"sub_folder": "bob"
}
]
}
For non-IoT Core integrations, type is not used.
The following parameters exist for setting up your Google Pub/Sub forwarders to ClearBlade MQTT:
ack_strategy: 0 = ack immediately, 1 = never ack. For now, 0 should always be used.
poll_rate: Time (ms) between sending message batches to the broker. 0 will send messages immediately (deprecated after 9.36.x - ignored if included)
max_messages_per_poll: Max number of messages to send in a single batch to the broker. Implicit minimum value of 1 (deprecated after 9.36.x - ignored if included)
Messages are received by ClearBlade as soon as possible from Google. ClearBlade will buffer them internally and send them in batches as defined above.
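To show how the Pub/Sub pieces fit together, the sketch below assembles the request body for a Pub/Sub connector from a service account object and a list of topic mappings. The function name buildPubSubConnectorBody is hypothetical. The two checks it performs (every mapping names a pubsub_topic and at least one MQTT direction) are assumptions inferred from the example config, not rules stated by the platform. The type field of each mapping is passed through untouched.

```javascript
// Hypothetical helper: wraps Pub/Sub credentials and mappings in the
// { type, credentials, config } body expected by the create/update calls.
function buildPubSubConnectorBody(serviceAccount, mappings) {
  mappings.forEach(function (mapping, i) {
    // Assumed check: each mapping must point at a Pub/Sub topic or subscription.
    if (!mapping.pubsub_topic) {
      throw new Error("topic_mappings[" + i + "] needs a pubsub_topic");
    }
    // Assumed check: each mapping must name an MQTT topic in some direction.
    if (!mapping.outgoing && !mapping.incoming) {
      throw new Error("topic_mappings[" + i + "] needs an outgoing or incoming MQTT topic");
    }
  });
  return {
    type: "pubsub",
    credentials: serviceAccount,
    config: { topic_mappings: mappings }
  };
}

// Usage with the mappings from the example config above.
var pubsubBody = buildPubSubConnectorBody(
  { type: "service_account", project_id: "myprojectid" }, // truncated for brevity
  [
    { pubsub_topic: "my-topic", outgoing: "foo/+/bar", type: 0, sub_folder: "" },
    { pubsub_topic: "my-subscription", incoming: "foo/bar/baz", type: 1, sub_folder: "bob" }
  ]
);
```

Note that in the example the incoming mapping carries a subscription name in pubsub_topic, while the outgoing mapping carries a topic name.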
AWS Kinesis
Credentials object:
{
"access_key": <string>,
"secret": <string>,
"region": <string>
}
Config Object:
{
"topic_mappings": {
<stream_name::string>: {
"outgoing": <outgoing_mqtt_topic::string>,
"incoming": <incoming_mqtt_topic::string>
}
},
"partition_key": <string>
}
All fields are required.
stream_name is the Kinesis stream name.
Data published on the outgoing_mqtt_topic will be sent to the respective Kinesis stream. This topic can have wildcards and it can be a shared topic.
Data received from the Kinesis stream will be published on the incoming_mqtt_topic. This topic cannot contain wildcards or be a shared topic.
partition_key specifies the type of partition key used when putting records on the Kinesis stream. The partition key has three types:
clientid: The MQTT client ID of the publisher will be used as the partition key
topic: The MQTT topic path will be used as the partition key
random: A randomly generated string will be used as the partition key
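The Kinesis rules above can be checked before the connector is created. The following is a minimal sketch of such a check. The function validateKinesisConfig is a hypothetical name, and detecting a shared topic by the "$share/" prefix is an assumption based on the common MQTT shared-subscription convention.

```javascript
var KINESIS_PARTITION_KEYS = ["clientid", "topic", "random"];

// Hypothetical pre-flight check; returns a list of problems (empty when valid).
function validateKinesisConfig(config) {
  var errors = [];
  if (KINESIS_PARTITION_KEYS.indexOf(config.partition_key) === -1) {
    errors.push("partition_key must be one of: " + KINESIS_PARTITION_KEYS.join(", "));
  }
  var mappings = config.topic_mappings || {};
  var streams = Object.keys(mappings);
  if (streams.length === 0) {
    errors.push("topic_mappings must name at least one Kinesis stream");
  }
  streams.forEach(function (stream) {
    var mapping = mappings[stream];
    // All fields are required, so both directions must be present.
    if (!mapping.outgoing || !mapping.incoming) {
      errors.push(stream + ": outgoing and incoming are both required");
    }
    // The incoming topic may not use wildcards or be a shared topic.
    // Assumption: shared topics are identified by a "$share/" prefix.
    if (mapping.incoming &&
        (/[+#]/.test(mapping.incoming) || mapping.incoming.indexOf("$share/") === 0)) {
      errors.push(stream + ": incoming topic cannot contain wildcards or be a shared topic");
    }
  });
  return errors;
}
```

Returning a list rather than throwing on the first problem lets a startup microservice report every misconfigured stream in a single error message.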
Apache Pulsar
Credentials object:
{
"url": <string>,
"tls_certificate": <string>,
"tls_key": <string>,
"tls_trusted_certificate": <string>
}
The url field is required; the TLS fields are optional.
Config object:
{
"outgoing": [{
"pulsar_publish_topic": <string>,
"mqtt_subscribe_topic": <string>,
"key": <string>,
"ordering_key": <string>,
"hashing_scheme": <int>,
"compression_type": <int>,
"compression_level": <int>,
"router_type": <int>
}],
"incoming": [{
"pulsar_subscribe_topic_pattern": <string>,
"mqtt_publish_topic": <string>,
"subscription_name": <string>,
"subscription_type": <int>
}]
}
Outgoing config options:
pulsar_publish_topic and mqtt_subscribe_topic fields are required.
hashing_scheme: 0 (default) = Java string hash, 1 = Murmur3 hash
compression_type: 0 (default) = no compression, 1 = LZ4, 2 = zlib, 3 = zstd
compression_level: 0 (default) = standard, 1 = faster, 2 = better
router_type: 0 (default) = round robin, 1 = single partition
Incoming config options:
pulsar_subscribe_topic_pattern, mqtt_publish_topic, and subscription_name are required.
subscription_type: 0 (default) = exclusive, 1 = shared, 2 = failover, 3 = key shared
Note: Exclusive subscriptions will cause errors in clusters, as each cluster node will attempt to establish an exclusive subscription. Either shared or key shared is recommended in clusters.
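Because the Pulsar options are plain integers, named constants make a config easier to read and review. The sketch below defines such constants from the values listed above and checks the required fields together with the cluster restriction on exclusive subscriptions. PULSAR and checkPulsarConfig are hypothetical names, and the clustered flag is an assumed input that the caller supplies.

```javascript
// Named values for the integer options documented above.
var PULSAR = {
  hashing: { JAVA_STRING: 0, MURMUR3: 1 },
  compression: { NONE: 0, LZ4: 1, ZLIB: 2, ZSTD: 3 },
  compressionLevel: { STANDARD: 0, FASTER: 1, BETTER: 2 },
  router: { ROUND_ROBIN: 0, SINGLE_PARTITION: 1 },
  subscription: { EXCLUSIVE: 0, SHARED: 1, FAILOVER: 2, KEY_SHARED: 3 }
};

// Hypothetical check; returns a list of problems (empty when none are found).
function checkPulsarConfig(config, clustered) {
  var problems = [];
  (config.outgoing || []).forEach(function (out, i) {
    if (!out.pulsar_publish_topic || !out.mqtt_subscribe_topic) {
      problems.push("outgoing[" + i + "]: pulsar_publish_topic and mqtt_subscribe_topic are required");
    }
  });
  (config.incoming || []).forEach(function (inc, i) {
    if (!inc.pulsar_subscribe_topic_pattern || !inc.mqtt_publish_topic || !inc.subscription_name) {
      problems.push("incoming[" + i + "]: pulsar_subscribe_topic_pattern, mqtt_publish_topic, and subscription_name are required");
    }
    // An omitted subscription_type falls back to the default, exclusive (0).
    var type = inc.subscription_type === undefined
      ? PULSAR.subscription.EXCLUSIVE
      : inc.subscription_type;
    if (clustered && type === PULSAR.subscription.EXCLUSIVE) {
      problems.push("incoming[" + i + "]: exclusive subscription in a cluster; use shared or key shared");
    }
  });
  return problems;
}
```

Leaving subscription_type out of an incoming entry selects the exclusive default, which is why the check flags it for clustered deployments.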
Apache Kafka
Kafka credentials and config
Credentials object:
{
"seed_brokers": [<string>],
"tls": <bool>,
"auth_mechanism": <string>,
"auth_params": <object>
}
At least one seed broker URL is required. auth_mechanism is required. If auth_mechanism is not none, auth_params is required.
auth_mechanism can be one of: none, plain, oauth, aws, scram256, scram512.
auth_params:
plain: {
Zid: <string>, // optional authorization id
User: <string>,
Pass: <string>
}
oauth: {
Zid: <string>, // optional authorization id
Token: <string>,
Extensions: <object>
}
aws: {
AccessKey: <string>,
SecretKey: <string>,
SessionToken: <string>, // optional
UserAgent: <string> // optional
}
scram256/scram512: {
Zid: <string>, // optional authorization id
User: <string>,
Pass: <string>,
Nonce: <[byte]>, // optional
IsToken: <bool> // set if user/pass are from a delegation token
}
Config object:
{
"topic_mappings": {
<string>: {
"incoming": <string>,
"outgoing": <string>
},
...
}
}
The topic_mappings object keys are Kafka topic names. The incoming and outgoing fields are both MQTT topics. You must provide either an incoming or outgoing MQTT topic (or both) for every Kafka topic.
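The Kafka requirements above (at least one seed broker, a known auth_mechanism, auth_params whenever the mechanism is not none, and at least one MQTT direction per Kafka topic) can be checked before the connector is submitted. A minimal sketch follows; validateKafkaConnector is a hypothetical name, and the check does not inspect the individual auth_params fields.

```javascript
var KAFKA_AUTH_MECHANISMS = ["none", "plain", "oauth", "aws", "scram256", "scram512"];

// Hypothetical pre-flight check; returns a list of problems (empty when valid).
function validateKafkaConnector(credentials, config) {
  var errors = [];
  if (!Array.isArray(credentials.seed_brokers) || credentials.seed_brokers.length === 0) {
    errors.push("at least one seed broker is required");
  }
  if (KAFKA_AUTH_MECHANISMS.indexOf(credentials.auth_mechanism) === -1) {
    errors.push("auth_mechanism must be one of: " + KAFKA_AUTH_MECHANISMS.join(", "));
  } else if (credentials.auth_mechanism !== "none" && !credentials.auth_params) {
    errors.push("auth_params is required when auth_mechanism is " + credentials.auth_mechanism);
  }
  var mappings = config.topic_mappings || {};
  Object.keys(mappings).forEach(function (kafkaTopic) {
    var mapping = mappings[kafkaTopic];
    // Every Kafka topic needs an incoming or outgoing MQTT topic (or both).
    if (!mapping.incoming && !mapping.outgoing) {
      errors.push(kafkaTopic + ": provide an incoming or outgoing MQTT topic");
    }
  });
  return errors;
}
```

A one-directional mapping such as { "orders": { "outgoing": "orders/+" } } passes this check, since only one of the two MQTT topics is mandatory per Kafka topic.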