Stream services
Introduction
A stream service is a software service that can be configured to handle requests continuously. The most common use of stream services within the ClearBlade Platform or Edge is to listen to MQTT topics and handle MQTT message requests from devices/users.
When to use stream services?
When users invoke a code service more than once a minute through a timer or trigger event
When users expect a real-time response, since an already-running stream service avoids the time needed to invoke a code service for each request
When users choose to process incoming data in parallel, that is, to distribute the incoming load across multiple instances of the same stream service
Configuration
Set execution timeout to never.
Template for stream services
Parallel execution using shared topic
Stream services can be designed to process incoming messages in parallel. In this template, users can run ten instances of the same stream service.
The template code below is written so that every stream service instance acts as a unique client to the shared MQTT group topic. Shared subscriptions allow for uniform load distribution across all the stream service instances. Refer to Shared subscriptions to understand how shared topics work.
function SharedTopicService(req, resp) {
    ClearBlade.init({ request: req });
    var messaging = ClearBlade.Messaging();
    // Shared-subscription topic: group "Group1", topic filter "device/+"
    var sharedTopic = "$share/Group1/device/+";
    var deviceCollection = ClearBlade.Collection({
        collectionName: "some_collection"
    });

    // DEBUG MESSAGE
    messaging.publish("success", "Service Started");

    messaging.subscribe(sharedTopic, function(err, errMsg) {
        if (err) {
            // DEBUG MESSAGE
            messaging.publish("error", "Subscribe failed: " + errMsg);
            resp.error("Subscribe failed: " + errMsg);
            return; // do not start the loop without a subscription
        }
        // DEBUG MESSAGE
        messaging.publish("success", "Subscribed to Shared Topic");
        // Once successfully subscribed
        WaitLoop();
    });

    function WaitLoop() {
        // DEBUG MESSAGE
        messaging.publish("success", "Starting the Loop");
        while (true) {
            messaging.waitForMessage([sharedTopic], function(err, msg, topic) {
                if (err) {
                    // DEBUG MESSAGE
                    messaging.publish("error", "Failed to wait for message: " + err + " " + msg + " " + topic);
                    resp.error("Failed to wait for message: " + err + " " + msg + " " + topic);
                } else {
                    // any action
                    addCollectionRow(msg);
                }
            });
        }
    }

    function addCollectionRow(msg) {
        var parsedMsg;
        try {
            parsedMsg = JSON.parse(msg);
        } catch (e) {
            // DEBUG MESSAGE
            messaging.publish("error", "Problem with parsing: " + e);
            resp.error("Problem with parsing: " + e);
            return; // nothing to store if the payload is not valid JSON
        }
        // Placeholder row: replace with the columns of your collection,
        // typically filled from fields of parsedMsg
        var data = {
            column_1_name: "column_1_data",
            column_2_name: "column_2_data"
        };
        deviceCollection.create(data, function(err, result) {
            if (err) {
                // DEBUG MESSAGE
                messaging.publish("error", "Create failed: " + result);
                resp.error("Create failed: " + result);
            }
        });
    }
}
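
To make the idea of load distribution over a shared topic more concrete, the following standalone sketch simulates it in plain JavaScript. It does not use the ClearBlade library and is not how the broker is implemented. The helper names (parseSharedTopic, matchesFilter, distribute) are hypothetical, and round-robin delivery is an assumption made for illustration; a real broker may balance the load differently. It shows how the topic "$share/Group1/device/+" splits into a group name and a topic filter, and how matching messages could be spread across ten instances.

```javascript
// Standalone sketch: no ClearBlade library calls are used here.

// Split a shared-subscription topic into its group name and topic filter.
function parseSharedTopic(sharedTopic) {
  var parts = sharedTopic.split("/");
  if (parts.length < 3 || parts[0] !== "$share" || parts[1] === "") {
    return null; // not a shared-subscription topic
  }
  return { group: parts[1], filter: parts.slice(2).join("/") };
}

// Check a concrete topic against a filter; "+" matches exactly one level.
// (The multi-level "#" wildcard is left out to keep the sketch short.)
function matchesFilter(filter, topic) {
  var f = filter.split("/");
  var t = topic.split("/");
  if (f.length !== t.length) {
    return false;
  }
  for (var i = 0; i < f.length; i++) {
    if (f[i] !== "+" && f[i] !== t[i]) {
      return false;
    }
  }
  return true;
}

// ASSUMPTION: round-robin delivery within the group, for illustration only.
// Returns how many messages each simulated instance would receive.
function distribute(sharedTopic, instanceCount, topics) {
  var counts = [];
  for (var i = 0; i < instanceCount; i++) {
    counts.push(0);
  }
  var parsed = parseSharedTopic(sharedTopic);
  if (parsed === null) {
    return counts;
  }
  var next = 0;
  topics.forEach(function (topic) {
    if (matchesFilter(parsed.filter, topic)) {
      counts[next] += 1; // each message goes to exactly one instance
      next = (next + 1) % instanceCount;
    }
  });
  return counts;
}

// 20 device messages plus one topic that does not match "device/+".
var incoming = [];
for (var n = 0; n < 20; n++) {
  incoming.push("device/sensor" + n);
}
incoming.push("device/sensor0/status");

var perInstance = distribute("$share/Group1/device/+", 10, incoming);
console.log(perInstance.join(","));
```

Under the round-robin assumption, each of the ten simulated instances receives two of the twenty matching messages, and the three-level topic is ignored because "+" matches a single level only.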