Stream services using mqtt and clearblade_async libraries
Introduction
A stream service is a software service that can be configured to handle requests continuously. Stream services within the ClearBlade Platform or Edge are most commonly used to listen to MQTT topics and handle MQTT message requests from devices/users.
When to use stream services?
When a code service would otherwise be invoked more than once a minute through a timer or trigger event
When users expect a real-time response, since a stream service is already running and avoids the time needed to invoke a code service for each request
When users want to process incoming data in parallel, i.e., distribute the incoming load across multiple instances of the same stream service
Configuration
Set execution timeout to never.
Template for stream services
Parallel execution using shared topic
Stream services are efficient because they keep running between messages, and they can be designed to process messages in parallel. With this template, users can run ten instances of the same stream service at once.
The template code below is written so that every stream service instance acts as a unique client to the shared MQTT group topic. Shared subscriptions allow for uniform load distribution across all the stream service instances. Refer to Shared subscriptions to understand how shared topics work.
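Separately from the template, the load distribution described above can be pictured with a small simulation. This is only a sketch under stated assumptions: the helpers parseSharedTopic, topicMatches, and distribute are hypothetical and are not part of the mqtt or clearblade_async libraries, and round-robin delivery is assumed purely for illustration (a real broker may choose the receiving instance differently).

```javascript
// Illustrative simulation only: this is NOT the ClearBlade broker's code.
// parseSharedTopic, topicMatches, and distribute are hypothetical helpers.

// Split "$share/<group>/<filter>" into its group name and topic filter.
function parseSharedTopic(sharedTopic) {
  const parts = sharedTopic.split("/");
  if (parts[0] !== "$share" || parts.length < 3) {
    throw new Error("Not a shared subscription topic: " + sharedTopic);
  }
  return { group: parts[1], filter: parts.slice(2).join("/") };
}

// Check a concrete topic against a filter that may use the MQTT "+" and "#" wildcards.
function topicMatches(filter, topic) {
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");
  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === "#") return true; // matches all remaining levels
    if (i >= topicLevels.length) return false;
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) return false;
  }
  return filterLevels.length === topicLevels.length;
}

// Deliver each matching message to exactly one instance (assumed round-robin).
function distribute(sharedTopic, instanceCount, topics) {
  const filter = parseSharedTopic(sharedTopic).filter;
  const received = [];
  for (let i = 0; i < instanceCount; i++) received.push([]);
  let next = 0;
  topics.forEach(function (topic) {
    if (!topicMatches(filter, topic)) return; // non-matching topics are not delivered
    received[next].push(topic);
    next = (next + 1) % instanceCount;
  });
  return received;
}

// Three simulated instances sharing the subscription used in the template.
const deliveries = distribute("$share/shareGroup1/widgets/+", 3,
  ["widgets/A1", "widgets/A2", "other/X", "widgets/A3", "widgets/A4"]);
console.log(JSON.stringify(deliveries));
// prints [["widgets/A1","widgets/A4"],["widgets/A2"],["widgets/A3"]]
```

Each matching message reaches only one instance in the group, which is what lets several copies of the same stream service split the work instead of all handling every message.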
Add the native mqtt and clearblade_async libraries as service dependencies.
function SharedTopicService(req, resp) {
  // This stream service receives messages on topics that match the shared topic pattern.
  // It retrieves the serial number of the widget from the last part of the topic and gets the widget data from the message.
  // It then updates the row in 'widgets_collection' that corresponds to the serial number if one is present. Otherwise, it adds a new row.
  const messaging = new MQTT.Client();
  const sharedTopic = "$share/shareGroup1/widgets/+";
  const errorsTopic = "errors";
  const widgetsCollection = ClearBladeAsync.Collection("widgets_collection");
  const widgetSerialNumberColumnName = "serial_number";

  // Error handling: publish the error to the errors topic, then end the service with that error.
  function publishErrorMessage(originalErrorMessage) {
    messaging.publish(errorsTopic, originalErrorMessage).then(function () {
      resp.error(originalErrorMessage);
    }).catch(function (publishError) {
      const overallErrorMessage = originalErrorMessage + "; also failed to publish to 'errors' topic: " + JSON.stringify(publishError);
      resp.error(overallErrorMessage);
    });
  }

  // Subscribe to the shared topic; onMessage is called for each message delivered to this instance.
  messaging.subscribe(sharedTopic, onMessage).catch(function (subscribeError) {
    const subscribeErrorMessage = "Failed to subscribe to sharedTopic: " + JSON.stringify(subscribeError);
    publishErrorMessage(subscribeErrorMessage);
  });

  // Message handler
  function onMessage(incomingTopic, incomingMessage) {
    let widgetData;
    try {
      // Get widget serial number from topic and widget data from message.
      // Combine both into one object used to upsert widgets_collection
      const incomingTopicArray = incomingTopic.split("/");
      const incomingSerialNumber = incomingTopicArray[incomingTopicArray.length - 1];
      widgetData = JSON.parse(incomingMessage.payload);
      widgetData[widgetSerialNumberColumnName] = incomingSerialNumber;
    } catch (parseError) {
      // JSON.stringify returns "{}" for Error objects, so String() is used to keep the error text.
      const parseErrorMessage = "Failed to parse incoming message: " + String(parseError);
      publishErrorMessage(parseErrorMessage);
      return; // Do not attempt the upsert without valid widget data.
    }

    // Upsert into widgets_collection: if a row already exists for the incoming serial number, then update that row;
    // otherwise create a new row for that serial number.
    widgetsCollection.upsert(widgetData, widgetSerialNumberColumnName).catch(function (widgetUpsertError) {
      const widgetUpsertErrorMessage = "Failed to upsert widget: " + JSON.stringify(widgetUpsertError);
      // publishErrorMessage also calls resp.error, so it is not called again here.
      publishErrorMessage(widgetUpsertErrorMessage);
    });
  }
}
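The parsing step of the message handler can be tried outside the platform. The following is a minimal sketch, assuming the payload is a JSON object sent as a string. buildWidgetRow is a hypothetical helper that is not part of the template or the ClearBlade libraries, and the sample topic and payload fields are invented for illustration.

```javascript
// Hypothetical helper that mirrors the parsing step of the message handler.
// It is not part of the mqtt or clearblade_async libraries.
function buildWidgetRow(topic, payload, serialNumberColumnName) {
  // The serial number is the last level of the topic, e.g. "widgets/SN-1001".
  const topicLevels = topic.split("/");
  const serialNumber = topicLevels[topicLevels.length - 1];

  // JSON.parse throws a SyntaxError if the payload is not valid JSON.
  const row = JSON.parse(payload);
  if (row === null || typeof row !== "object" || Array.isArray(row)) {
    throw new Error("Widget payload must be a JSON object");
  }

  // Add the serial number so the row can be matched on that column during the upsert.
  row[serialNumberColumnName] = serialNumber;
  return row;
}

// Sample topic and payload (made up for this example).
const sampleRow = buildWidgetRow(
  "widgets/SN-1001",
  '{"temperature": 21.5, "status": "ok"}',
  "serial_number"
);
console.log(JSON.stringify(sampleRow));
// prints {"temperature":21.5,"status":"ok","serial_number":"SN-1001"}
```

Keeping this step in a pure function makes it straightforward to check how malformed payloads are rejected before they reach the collection.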