Stream services

Introduction

A stream service is a software service that can be configured to handle requests continuously. The most common use of stream services within the ClearBlade Platform or Edge is to listen to MQTT topics and handle MQTT message requests from devices/users.

When to use stream services?

  • When users invoke a code service more than once a minute through a timer or trigger event

  • When users expect a real-time response since they reduce the time to invoke the code service

  • When users choose to perform parallel processing of incoming data, i.e., distribution of the incoming loads across multiple instances of the same stream service

Configuration

Set execution timeout to never.

Template for stream services

Parallel execution using shared topic

Stream services are very efficient and can be designed to perform parallel processing. In this template, users can have ten stream service instances.

The template code below is written so that every stream service instance acts as a unique client to the shared MQTT group topic. Shared subscriptions allow for uniform load distribution across all the stream service instances. Refer to Shared subscriptions to understand how shared topics work.

function SharedTopicService(req, resp) { ClearBlade.init({ request: req }); var messaging = ClearBlade.Messaging(); var sharedTopic = "$share/Group1/device/+"; var deviceCollection = ClearBlade.Collection({ collectionName: "some_collection" }); // DEBUG MESSAGE messaging.publish("success", "Service Started"); messaging.subscribe(sharedTopic, function(err, errMsg) { if (err) { // DEBUG MESSAGE messaging.publish("error", "Subscribe failed: " + errMsg); resp.error(); } // DEBUG MESSAGE messaging.publish("success", "Subscribed to Shared Topic"); // Once successfully subscribed WaitLoop(); }); function WaitLoop() { // DEBUG MESSAGE messaging.publish("success", "Starting the Loop"); while (true) { messaging.waitForMessage([sharedTopic], function(err, msg, topic) { if (err) { // DEBUG MESSAGE messaging.publish("error", "Failed to wait for message: " + err + " " + msg + " " + topic); resp.error("Failed to wait for message: " + err + " " + msg + " " + topic); } else { // any action addCollectionRow(msg); } }); } } function addCollectionRow(msg) { try { var parseMsg = JSON.parse(msg); } catch (e) { // DEBUG MESSAGE messaging.publish("error","Problem with parsing: " + e); resp.error("Problem with parsing: " + e); } var data = { column_1_name: "column_1_data", column_2_name: "column_2_data" }; //debugging deviceCollection.create(data, function(err, result) { if (err) { // DEBUG MESSAGE messaging.publish("error", "failed to create: " + result); resp.error("create failed: " + result); } else { //no op } }); } }