This page explains how gateways can use the MQTT bridge to communicate with ClearBlade IoT Core and publish telemetry events on behalf of bound devices. Before you begin, read "Using the MQTT bridge" for general information on using the MQTT bridge with ClearBlade IoT Core.
Using gateways with the MQTT bridge
After you've created and configured the gateway, connect it to ClearBlade IoT Core over the MQTT bridge.
Create devices if you haven't already.
Optional: Bind the devices to the gateway.
When using the MQTT bridge, you only need to bind the devices if they can't generate their own JWTs.
Optional: Subscribe to the system error topic to get feedback on whether device operations are successful.
Use the gateway to relay telemetry, device state, and configuration messages on behalf of its devices. Try the end-to-end demo to learn how.
Gateway messages
After the gateway connects to ClearBlade IoT Core over the MQTT bridge, it can send or receive three message types:
Control messages: Attach a device to the gateway or detach a device from the gateway. These messages are sent between the gateway and ClearBlade IoT Core. ClearBlade IoT Core accepts control messages only from gateways; if another type of device attempts to send a control message, ClearBlade IoT Core closes the connection.
Messages from gateways and devices: Can be relayed by the gateway on a device’s behalf or sent directly from the gateway.
System error messages: When the gateway is subscribed to the MQTT system error topic on the device’s behalf, ClearBlade IoT Core sends error messages whenever the device encounters an error.
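As a rough illustration of the three message types, the sketch below sorts a topic into one of them by its suffix. The function name `classify_topic` and the category labels are hypothetical; only the `/attach`, `/detach`, and `/errors` topic suffixes come from this page.

```python
def classify_topic(topic: str) -> str:
    """Return a hypothetical category label for a gateway MQTT topic."""
    parts = topic.strip("/").split("/")
    # Topics on this page follow the shape /devices/{id}/{suffix}[/...].
    if len(parts) < 3 or parts[0] != "devices":
        return "unknown"
    suffix = parts[2]
    if suffix in ("attach", "detach"):
        return "control"
    if suffix == "errors":
        return "system_error"
    # Everything else (config, commands, and so on) is treated as a
    # regular gateway or device message in this sketch.
    return "device_or_gateway"
```

A gateway could use a helper like this to route incoming messages to different handlers, though the routing itself is left out here.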
Attaching devices to a gateway
To enable the gateway to proxy device communications with ClearBlade IoT Core, have the gateway publish a QoS 1 /devices/{device_ID_to_attach}/attach control message over the MQTT bridge.
If you configured the gateway to authenticate devices using the devices' JWTs, the attach message’s payload must include the token in JSON format: { "authorization" : "{JWT_token}" }. Otherwise, ClearBlade IoT Core authenticates the device by checking its gateway’s association.
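The attach topic and payload described above can be sketched as a small helper. `build_attach_message` is a hypothetical name, and sending an empty `authorization` value when no JWT is supplied mirrors the Python sample on this page rather than a documented requirement.

```python
import json
from typing import Optional, Tuple


def build_attach_message(device_id: str,
                         jwt_token: Optional[str] = None) -> Tuple[str, str]:
    """Return the (topic, payload) pair for an attach control message."""
    topic = f"/devices/{device_id}/attach"
    # Assumption: with no device JWT, send an empty authorization string,
    # as the Python sample on this page does.
    payload = json.dumps({"authorization": jwt_token or ""})
    return topic, payload
```

With a Paho-style client, the result would be published at QoS 1, for example `client.publish(topic, payload, qos=1)`.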
Success response
After the device is authorized, ClearBlade IoT Core sends a PUBACK message to the gateway in response to the attach message. After the gateway receives the PUBACK message, it can publish and subscribe to ClearBlade IoT Core topics on the device’s behalf, such as telemetry or configuration messages.
If a device is already attached when the gateway sends the attach message, ClearBlade IoT Core responds with a PUBACK message.
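One way to act on the PUBACK is to remember which attach requests are still outstanding. The sketch below assumes an MQTT client that hands back a message ID when publishing and reports the same ID once the PUBACK arrives (Paho's `publish()` result exposes `mid`, and its `on_publish` callback receives it for QoS 1 messages). The class `AttachTracker` and its method names are hypothetical.

```python
class AttachTracker:
    """Tracks attach requests until their PUBACK is seen."""

    def __init__(self):
        self._pending = {}      # message ID -> device ID awaiting PUBACK
        self._attached = set()  # device IDs whose attach was acknowledged

    def attach_sent(self, mid, device_id):
        """Record that an attach message with this message ID was published."""
        self._pending[mid] = device_id

    def puback_received(self, mid):
        """Mark the matching device as attached; return its ID or None."""
        device_id = self._pending.pop(mid, None)
        if device_id is not None:
            self._attached.add(device_id)
        return device_id

    def is_attached(self, device_id):
        return device_id in self._attached
```

The gateway would call `attach_sent` right after publishing and `puback_received` from its publish-acknowledged callback, and only relay messages for devices where `is_attached` returns `True`.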
Detaching devices from the gateway
To detach a device from the gateway, have the gateway publish a QoS 1 /devices/{device_ID}/detach control message over the MQTT bridge. If the device isn't attached when the message is sent, ClearBlade IoT Core ignores the detach control message and sends a PUBACK message.
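Because detaching an unattached device is harmless, a gateway can detach every device it knows about before shutting down. The following is a minimal sketch under that assumption. `detach_all` and `RecordingClient` are hypothetical names, the client only needs a Paho-style `publish(topic, payload, qos=...)` method, and the `"{}"` payload mirrors the Python sample on this page.

```python
def detach_all(client, device_ids):
    """Publish a QoS 1 detach control message for each device ID."""
    topics = []
    for device_id in device_ids:
        topic = f"/devices/{device_id}/detach"
        client.publish(topic, "{}", qos=1)
        topics.append(topic)
    return topics


class RecordingClient:
    """Stand-in for a real MQTT client; it only records publish calls."""

    def __init__(self):
        self.published = []

    def publish(self, topic, payload, qos=0):
        self.published.append((topic, payload, qos))
```

`RecordingClient` exists only so the sketch can be exercised without a broker; a real gateway would pass its connected MQTT client instead.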
Troubleshooting
To be notified when a device encounters an error, subscribe the gateway to the MQTT /devices/{gateway_ID}/errors topic using QoS level 0.
These code samples illustrate how to subscribe the gateway to the MQTT /devices/{gateway_ID}/errors topic using QoS level 0:
Node.js
const { readFileSync } = require("fs");
const mqtt = require("mqtt");

// createJwt and attachDevice are helper functions assumed to be defined
// elsewhere in the sample.

// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
/**
 * @see https://clearblade.atlassian.net/wiki/spaces/IC/pages/2210299905/Retargeting+Devices#Production-URL-%2F-URLs for full list of URLs
 */
// const mqttBridgeHostname = `us-central1-mqtt.clearblade.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

// Set when the connection closes so that callers can back off before retrying.
let shouldBackoff = false;

const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`;
console.log(mqttClientId);

const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  clientId: mqttClientId,
  username: "unused",
  password: createJwt(projectId, privateKeyFile, algorithm),
  protocol: "mqtts",
  qos: 1,
  secureProtocol: "TLSv1_2_method",
  ca: [readFileSync(serverCertFile)],
};

// Create a client, and connect to the ClearBlade MQTT bridge.
const client = mqtt.connect(connectionArgs);

client.on("connect", (success) => {
  if (!success) {
    console.log("Client not connected...");
  } else {
    setTimeout(() => {
      // Subscribe to gateway error topic.
      client.subscribe(`/devices/${gatewayId}/errors`, { qos: 0 });

      attachDevice(deviceId, client);

      setTimeout(() => {
        console.log("Closing connection to MQTT. Goodbye!");
        client.end(true);
      }, clientDuration); // Safely detach device and close connection.
    }, 5000);
  }
});

client.on("close", () => {
  console.log("Connection closed");
  shouldBackoff = true;
});

client.on("error", (err) => {
  console.log("error", err);
});

client.on("message", (topic, message) => {
  const decodedMessage = Buffer.from(message, "base64").toString("ascii");
  console.log(`message received on error topic ${topic}: ${decodedMessage}`);
});

client.on("packetsend", () => {
  // Logging packet send is very verbose
});
Python
import time
import datetime
import ssl
import jwt
import random
import paho.mqtt.client as mqtt

# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32

# Whether to wait with exponential backoff before publishing.
should_backoff = False


def create_jwt(project_id, private_key_file, algorithm):
    """Creates a JWT (https://jwt.io) to establish an MQTT connection.

    Args:
        project_id: The cloud project ID this device belongs to
        private_key_file: A path to a file containing an RSA256 or ES256
            private key.
        algorithm: The encryption algorithm to use (RS256 or ES256)

    Returns:
        A JWT generated from the given project_id and private key, which
        expires in 20 minutes. After 20 minutes, your client will be
        disconnected, and a new JWT will have to be generated.

    Raises:
        ValueError: If the private_key_file does not contain a known key.
    """
    token = {
        # The time that the token was issued at
        "iat": datetime.datetime.now(tz=datetime.timezone.utc),
        # The time the token expires.
        "exp": datetime.datetime.now(tz=datetime.timezone.utc)
        + datetime.timedelta(minutes=20),
        # The audience field should always be set to the project ID.
        "aud": project_id,
    }

    # Read the private key file.
    with open(private_key_file, "r") as f:
        private_key = f.read()

    print(
        "Creating JWT using {} from private key file {}".format(
            algorithm, private_key_file
        )
    )

    return jwt.encode(token, private_key, algorithm=algorithm)


def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return "{}: {}".format(rc, mqtt.error_string(rc))


def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print("on_connect", mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print("on_disconnect", error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print("on_publish")


def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode("utf-8"))
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )


def attach_device(client, device_id, auth):
    """Attach the device to the gateway."""
    # [START iot_attach_device]
    attach_topic = "/devices/{}/attach".format(device_id)
    attach_payload = '{{"authorization" : "{}"}}'.format(auth)
    client.publish(attach_topic, attach_payload, qos=1)
    # [END iot_attach_device]


def detach_device(client, device_id):
    """Detach the device from the gateway."""
    # [START iot_detach_device]
    detach_topic = "/devices/{}/detach".format(device_id)
    print("Detaching: {}".format(detach_topic))
    client.publish(detach_topic, "{}", qos=1)
    # [END iot_detach_device]


def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create our MQTT client. The client_id is a unique string that
    identifies this device. For ClearBlade IoT Core, it must be in the
    format below."""
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With ClearBlade IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username="unused", password=create_jwt(project_id, private_key_file, algorithm)
    )

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the ClearBlade MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = "/devices/{}/config".format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = "/devices/{}/commands/#".format(device_id)

    # Subscribe to the commands topic. This sample uses QoS 0; QoS 1 would
    # enable message acknowledgement.
    print("Subscribing to {}".format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client


def listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_expires_minutes,
    duration,
    cb=None,
):
    """Listens for messages sent to the gateway and bound devices."""
    # [START iot_listen_for_messages]
    global minimum_backoff_time

    jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
    jwt_exp_mins = jwt_expires_minutes

    # Use gateway to connect to server
    client = get_client(
        project_id,
        cloud_region,
        registry_id,
        gateway_id,
        private_key_file,
        algorithm,
        ca_certs,
        mqtt_bridge_hostname,
        mqtt_bridge_port,
    )

    attach_device(client, device_id, "")
    print("Waiting for device to attach.")
    time.sleep(5)

    # The topic devices receive configuration updates on.
    device_config_topic = "/devices/{}/config".format(device_id)
    client.subscribe(device_config_topic, qos=1)

    # The topic gateways receive configuration updates on.
    gateway_config_topic = "/devices/{}/config".format(gateway_id)
    client.subscribe(gateway_config_topic, qos=1)

    # The topic gateways receive error updates on. QoS must be 0.
    error_topic = "/devices/{}/errors".format(gateway_id)
    client.subscribe(error_topic, qos=0)

    # Wait roughly `duration` seconds for config messages.
    for i in range(1, duration):
        client.loop()
        if cb is not None:
            cb(client)

        if should_backoff:
            # If backoff time is too large, give up.
            if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
                print("Exceeded maximum backoff time. Giving up.")
                break

            delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
            time.sleep(delay)
            minimum_backoff_time *= 2
            client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

        seconds_since_issue = (
            datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat
        ).seconds
        if seconds_since_issue > 60 * jwt_exp_mins:
            print("Refreshing token after {}s".format(seconds_since_issue))
            jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
            client = get_client(
                project_id,
                cloud_region,
                registry_id,
                gateway_id,
                private_key_file,
                algorithm,
                ca_certs,
                mqtt_bridge_hostname,
                mqtt_bridge_port,
            )

        time.sleep(1)

    detach_device(client, device_id)

    print("Finished.")
    # [END iot_listen_for_messages]


service_account_json = "/path/to/your/credentials.json"
project_id = "YOUR_PROJECT_ID"
cloud_region = "us-central1"
registry_id = "your-registry-id"
gateway_id = "your-gateway-id"
device_id = "your-test-device"
gateway_private_key_file = "/path/to/your/gateway/rsa_private.pem"
algorithm = "RS256"
ca_certs = "DigiCertGlobalRootCA.crt.pem"
mqtt_bridge_hostname = "us-central1-mqtt.clearblade.com"
mqtt_bridge_port = 8883
num_messages = 10
jwt_expires_minutes = 60
duration = 3

listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    gateway_private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_expires_minutes,
    duration,
    cb=None,
)
ClearBlade IoT Core sends gateway errors on a best-effort basis, delivered over QoS 0. If the gateway isn't subscribed to /devices/{gateway_ID}/errors, ClearBlade IoT Core logs failure events but doesn't send a PUBACK message.
MQTT errors have the following structure:
string error_type;  // An error type's string description.
string device_id;   // The device ID that caused the error.
string description; // An error's description.
If the error was triggered by an MQTT message, the following information is attached as well:
string message_type; // The string MQTT message type.
string topic;        // The MQTT topic, if applicable; otherwise, it is empty.
int packet_id;       // The MQTT message's packet ID, if applicable; otherwise, it is zero.
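To make the error structure concrete, the sketch below turns an error payload into a typed object. It assumes the payload arrives as UTF-8 JSON whose keys match the field names listed above; this page does not state the wire format, so treat that as an assumption. `GatewayError` and `parse_gateway_error` are hypothetical names.

```python
import json
from dataclasses import dataclass


@dataclass
class GatewayError:
    error_type: str
    device_id: str
    description: str
    # The next three fields are present only when an MQTT message
    # triggered the error; the defaults follow the structure above.
    message_type: str = ""
    topic: str = ""
    packet_id: int = 0


def parse_gateway_error(payload: bytes) -> GatewayError:
    """Parse an error-topic payload, assuming it is UTF-8 encoded JSON."""
    data = json.loads(payload.decode("utf-8"))
    return GatewayError(
        error_type=data.get("error_type", "UNKNOWN"),
        device_id=data.get("device_id", ""),
        description=data.get("description", ""),
        message_type=data.get("message_type", ""),
        topic=data.get("topic", ""),
        packet_id=int(data.get("packet_id", 0)),
    )
```

In a Paho `on_message` callback, `message.payload` could be passed straight to `parse_gateway_error`.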
Error codes and error handling
Error code | Description | Recommended action |
---|---|---|
GATEWAY_ATTACHMENT_ERROR | A gateway attachment request failed. | Do not retry without fixing the problem. |
GATEWAY_DEVICE_NOT_FOUND | The gateway could not find an attached device to handle an incoming message. | Do not retry without fixing the problem. |
GATEWAY_INVALID_MQTT_TOPIC | The gateway could not parse the specified MQTT topic, which was ill-formatted or contained an invalid device ID or name. | Do not retry without fixing the problem. |
GATEWAY_UNEXPECTED_PACKET_ID | The gateway could not process the message based on its packet ID. For example, a PUBACK may have contained a packet ID, but nothing awaited the response. | Do not retry without fixing the problem. |
GATEWAY_UNEXPECTED_MESSAGE_TYPE | The gateway received unexpected messages, such as unsupported PUBREL, PUBREC, etc. | Do not retry without fixing the problem. |
GATEWAY_DETACHMENT_DEVICE_ERROR | The gateway detached a device because of a device error. | Do not retry without fixing the problem. |
UNKNOWN | The error is unknown. | Retry using exponential backoff. |
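The recommended actions in the table can be sketched as a small policy: retry only `UNKNOWN` errors, and space the retries with exponential backoff. The function names are hypothetical, treating error codes that are not in the table as non-retryable is an assumption of this sketch, and the 1-second start, 32-second cap, and up to 1 second of jitter mirror the Python sample on this page.

```python
import random

# Per the table, UNKNOWN is the only code for which a retry is recommended.
RETRYABLE_ERROR_TYPES = frozenset({"UNKNOWN"})


def should_retry(error_type: str) -> bool:
    """Return True if the table recommends retrying this error type."""
    return error_type in RETRYABLE_ERROR_TYPES


def backoff_delays(minimum=1.0, maximum=32.0, jitter=random.random):
    """Yield delays in seconds, doubling from minimum up to maximum.

    jitter is a callable returning extra seconds to add to each delay;
    random.random adds between 0 and 1 second.
    """
    delay = minimum
    while delay <= maximum:
        yield delay + jitter()
        delay *= 2
```

A caller would check `should_retry(error.error_type)` and, if it returns `True`, sleep for each value from `backoff_delays()` between attempts, giving up once the generator is exhausted.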