Streaming MQTT Connectors

ClearBlade’s streaming MQTT Connectors allow for setting up standard bi-directional mappings between data flowing over ClearBlade-brokered MQTT topics and third-party cloud streaming services and middleware.

This is ideal for sending MQTT messages into your cloud services or integrating data from other services directly with your ClearBlade application.

Currently supported third-party streaming offerings are:

  • Google Pub/Sub - supports offline caching of up to 10k messages

  • AWS Kinesis

  • Apache Kafka

  • Apache Pulsar

Technical Overview

To create a streaming MQTT Connector, all that is required is a REST API call to the mqtt-connectors endpoints; the system will immediately begin honoring the configured mapping. ClearBlade recommends making this REST API call from a microservice that is triggered to run when the IoT Enterprise environment starts.

Environment Setup

Generally, MQTT Connectors are designed to be started as the IoT Enterprise environment comes online and begins to allow device connections. When the service runs at startup, it will make an HTTP call to the platform. The recommended way to implement this is:

  1. Create a new Code Service of type micro-service

  2. Include the http library

  3. With the service created, click the trigger tab and create a new trigger that runs the service on startup of the IoT Enterprise environment.

  4. Add the source code from the Sample Code section below into the body of your service

  5. Click save

The microservice will now run any time the IoT Enterprise environment comes up and begins accepting connections.

Next: Follow the connector setup.

Connector Setup

The triggered microservice will need to make an API call to the IoT Enterprise platform with the necessary configuration.

API:

Create

URI: /api/v/1/mqtt-connectors/{systemKey}/{name}

Method: POST

Headers: 'ClearBlade-UserToken': <userToken>

Body: { "type": <string>, "credentials": <object>, "config": <object> }


Update

URI: /api/v/1/mqtt-connectors/{systemKey}/{name}

Method: PUT

Headers: 'ClearBlade-UserToken': <userToken>

Body: { "type": <string>, "credentials": <object>, "config": <object> }

Get

URI: /api/v/1/mqtt-connectors/{systemKey}/{name}

Method: GET

Headers: 'ClearBlade-UserToken': <userToken>

Body: none

Get All

URI: /api/v/1/mqtt-connectors/{systemKey}/

Method: GET

Headers: 'ClearBlade-UserToken': <userToken>

Body: none

Delete

URI: /api/v/1/mqtt-connectors/{systemKey}/{name}

Method: DELETE

Headers: 'ClearBlade-UserToken': <userToken>

Body: none

Sample Code

Example: a sample microservice that creates a new MQTT Connector configuration.

function StartMQTTConnector(req, resp) {
    // Build the endpoint URL from the platform URL, this system's key, and the connector name
    const connectorName = "myConnector"; // the name you want for this connector
    const url = cbmeta.platform_url + "/api/v/1/mqtt-connectors/" + req.systemKey + "/" + connectorName;

    const connectorConfig = {
        "type": "pubsub",  // pubsub, kinesis, kafka, pulsar
        "credentials": {}, // specific to each type of connector
        "config": {}       // populate according to the connector
    };

    var options = {
        "uri": url,
        "body": connectorConfig,
        "headers": {
            "Content-Type": "application/json",
            "ClearBlade-UserToken": req.userToken
        }
    };

    var requestObject = Requests();

    // POST to create the MQTT connector
    requestObject.post(options, function (err, body) {
        if (err) {
            // the error is a JSON value of the error in question, shaped like {"error":"message"}
            resp.error("Failed to create MQTT Connector: " + err);
        } else {
            // body is JSON of the response
            resp.success("Created MQTT Connector: " + body);
        }
    });
}
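
The other endpoints follow the same pattern, changing only the HTTP verb. As a minimal sketch (assuming the http library's Requests object exposes a delete method alongside post and put, and reusing the illustrative connector name myConnector), a companion service that removes the connector could look like this:

function StopMQTTConnector(req, resp) {
    // Same endpoint as Create/Update, but with the DELETE verb and no body
    const url = cbmeta.platform_url + "/api/v/1/mqtt-connectors/" + req.systemKey + "/myConnector";

    var options = {
        "uri": url,
        "headers": {
            "ClearBlade-UserToken": req.userToken
        }
    };

    var requestObject = Requests();

    requestObject.delete(options, function (err, body) {
        if (err) {
            resp.error("Failed to delete MQTT Connector: " + err);
        } else {
            resp.success("Deleted MQTT Connector: " + body);
        }
    });
}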


Connector Configs

Google Pub/Sub

Credentials object:

"credentials": { // This is the service account JSON object "type": "service_account", "project_id": "myprojectid", "private_key_id": "blahblahblah", "private_key": "<priv_key>", "client_email": "a@a.com", "client_id": "9126871624161341", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/dahjkdhajkd%40dajsdgajdadsad.iam.gserviceaccount.com" },

Config object:

{
    "topic_mappings": [
        {
            "pubsub_topic": "my-topic",
            "outgoing": "foo/+/bar",
            "type": 0,
            "sub_folder": ""
        },
        {
            "pubsub_topic": "my-subscription",
            "incoming": "foo/bar/baz",
            "type": 1,
            "sub_folder": "bob"
        }
    ]
}


For non-IoT Core integrations, type is not used.

The following parameters exist for setting up your Google Pub/Sub forwarders to ClearBlade MQTT:

ack_strategy: 0 = ack immediately, 1 = never ack. For now, 0 should always be used.

poll_rate: Time (ms) between sending message batches to the broker. 0 will send messages immediately (deprecated after 9.36.x - ignored if included)

max_messages_per_poll: Max number of messages to send in a single batch to the broker. Implicit minimum value of 1 (deprecated after 9.36.x - ignored if included)

Messages are received by ClearBlade as soon as possible from Google. ClearBlade will buffer them internally and send them in batches as defined above.
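
Putting the two pieces together, a Create request body for a Pub/Sub connector might look like the following sketch (the topic names come from the config example above; the per-mapping type fields are omitted since they only apply to IoT Core integrations):

{
    "type": "pubsub",
    "credentials": {
        // the full service account JSON object shown above
    },
    "config": {
        "topic_mappings": [
            {
                "pubsub_topic": "my-topic",
                "outgoing": "foo/+/bar",
                "sub_folder": ""
            },
            {
                "pubsub_topic": "my-subscription",
                "incoming": "foo/bar/baz",
                "sub_folder": "bob"
            }
        ]
    }
}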

AWS Kinesis

Credentials object:

{
    "access_key": <string>,
    "secret": <string>,
    "region": <string>
}


Config object:

{
    "topic_mappings": {
        <stream_name::string>: {
            "outgoing": <outgoing_mqtt_topic::string>,
            "incoming": <incoming_mqtt_topic::string>
        }
    },
    "partition_key": <string>
}

All fields are required.

stream_name is the Kinesis stream name.

Data published on the outgoing_mqtt_topic will be sent to the respective Kinesis stream. This topic can have wildcards, and it can be a shared topic.

Data received from the Kinesis stream will be published on the incoming_mqtt_topic. This topic cannot contain wildcards or be a shared topic.

partition_key specifies the type of partition key used when putting records on the Kinesis stream. The partition key has three types:

  • clientid: The MQTT client ID of the publisher will be used as the partition key

  • topic: The MQTT topic path will be used as the partition key

  • random: A randomly generated string will be used as the partition key
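
For example, a complete Create request body for a Kinesis connector might look like this sketch (the stream name, region, credentials, and MQTT topics are illustrative):

{
    "type": "kinesis",
    "credentials": {
        "access_key": "AKIA...",
        "secret": "<secret>",
        "region": "us-east-1"
    },
    "config": {
        "topic_mappings": {
            "my-stream": {
                "outgoing": "devices/+/telemetry",
                "incoming": "kinesis/my-stream/in"
            }
        },
        "partition_key": "clientid"
    }
}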

Apache Pulsar

Credentials object:

{
    "url": <string>,
    "tls_certificate": <string>,
    "tls_key": <string>,
    "tls_trusted_certificate": <string>
}

The url field is required; the TLS fields are optional.

Config object:

{
    "outgoing": [{
        "pulsar_publish_topic": <string>,
        "mqtt_subscribe_topic": <string>,
        "key": <string>,
        "ordering_key": <string>,
        "hashing_scheme": <int>,
        "compression_type": <int>,
        "compression_level": <int>,
        "router_type": <int>
    }],
    "incoming": [{
        "pulsar_subscribe_topic_pattern": <string>,
        "mqtt_publish_topic": <string>,
        "subscription_name": <string>,
        "subscription_type": <int>
    }]
}

Outgoing config options:

pulsar_publish_topic and mqtt_subscribe_topic fields are required.

hashing_scheme: 0 (default) = Java string hash, 1 = Murmur3 hash

compression_type: 0 (default) = no compression, 1 = LZ4, 2 = zlib, 3 = zstd

compression_level: 0 (default) = standard, 1 = faster, 2 = better

router_type: 0 (default) = round robin, 1 = single partition

Incoming config options:

pulsar_subscribe_topic_pattern, mqtt_publish_topic, and subscription_name are required.

subscription_type: 0 (default) = exclusive, 1 = shared, 2 = failover, 3 = key shared

! Note ! Exclusive subscriptions will cause errors in clusters, as each cluster node will attempt to establish an exclusive subscription. Either shared or key shared is recommended in clusters.
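
As a sketch, a Create request body for a Pulsar connector with one mapping in each direction might look like the following (the broker URL, topics, and subscription name are illustrative; subscription_type 1, shared, follows the note above, and the optional outgoing fields are left at their defaults):

{
    "type": "pulsar",
    "credentials": {
        "url": "pulsar://broker.example.com:6650"
    },
    "config": {
        "outgoing": [{
            "pulsar_publish_topic": "persistent://public/default/out-topic",
            "mqtt_subscribe_topic": "devices/+/events"
        }],
        "incoming": [{
            "pulsar_subscribe_topic_pattern": "persistent://public/default/in-.*",
            "mqtt_publish_topic": "pulsar/incoming",
            "subscription_name": "clearblade-connector",
            "subscription_type": 1
        }]
    }
}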


Apache Kafka

Kafka credentials and config

Credentials object:

{
    "seed_brokers": [<string>],
    "tls": <bool>,
    "auth_mechanism": <string>,
    "auth_params": <object>
}

At least one seed broker URL is required. auth_mechanism is required. If auth_mechanism != none, auth_params is also required.

auth_mechanism can be one of: none, plain, oauth, aws, scram256, scram512.

auth_params:

plain: {
    Zid: <string>, // optional authorization id
    User: <string>,
    Pass: <string>
}

oauth: {
    Zid: <string>, // optional authorization id
    Token: <string>,
    Extensions: <object>
}

aws: {
    AccessKey: <string>,
    SecretKey: <string>,
    SessionToken: <string>, // optional
    UserAgent: <string> // optional
}

scram256/scram512: {
    Zid: <string>, // optional authorization id
    User: <string>,
    Pass: <string>,
    Nonce: <[byte]>, // optional
    IsToken: <bool> // set if user/pass are from a delegation token
}

Config object:

{
    "topic_mappings": {
        <string>: {
            "incoming": <string>,
            "outgoing": <string>
        },
        ...
    }
}

The topic_mappings object keys are Kafka topic names. The incoming and outgoing fields are both MQTT topics. You must provide either an incoming or outgoing MQTT topic (or both) for every Kafka topic.
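
For example, a Create request body for a Kafka connector using the plain auth mechanism might look like this sketch (the broker address, credentials, and topics are illustrative; the auth_params keys are shown quoted, matching the capitalization above):

{
    "type": "kafka",
    "credentials": {
        "seed_brokers": ["kafka.example.com:9092"],
        "tls": true,
        "auth_mechanism": "plain",
        "auth_params": {
            "User": "myuser",
            "Pass": "mypassword"
        }
    },
    "config": {
        "topic_mappings": {
            "telemetry": {
                "outgoing": "devices/+/telemetry",
                "incoming": "kafka/telemetry/in"
            }
        }
    }
}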