Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

If you configured the gateway to authenticate devices using the devices' JWTs, the attach message’s payload must include the token in JSON format: { "authorization" : "{JWT_token}" }. Otherwise, ClearBlade IoT Core authenticates the device by checking its gateway’s association.

...

After the device is authorized, ClearBlade IoT Core sends a PUBACK message to the gateway in response to the attach message. After the gateway receives the PUBACK message, it can publish and subscribe to ClearBlade IoT Core topics on the device’s behalf, such as telemetry or configuration messages.

If a device is already attached when the gateway sends the attach message, ClearBlade IoT Core responds with a PUBACK message.

...

To be notified when a device encounters an error, subscribe the gateway to the MQTT /devices/{gateway_ID}/errors topic using QoS level 0.

These code samples illustrate how to subscribe the gateway to the MQTT /devices/{gateway_ID}/errors topic using QoS level 0:

...

Code Block
// Sample configuration. Uncomment and adjust these values before running.
// const deviceId = `myDevice`;
// const gatewayId = `mygateway`;
// const registryId = `myRegistry`;
// const projectId = `my-project-123`;
// const region = `us-central1`;
// const algorithm = `RS256`;
// const privateKeyFile = `./rsa_private.pem`;
// const serverCertFile = `./roots.pem`;
/**
 * @see https://clearblade.atlassian.net/wiki/spaces/IC/pages/2210299905/Retargeting+Devices#Production-URL-%2F-URLs for a full list of URLs
 */
// const mqttBridgeHostname = `us-central1-mqtt.clearblade.com`;
// const mqttBridgePort = 8883;
// const clientDuration = 60000;

// The gateway connects under its own full device path, which doubles as the
// MQTT client ID.
const registryPath = `projects/${projectId}/locations/${region}/registries/${registryId}`;
const mqttClientId = `${registryPath}/devices/${gatewayId}`;
console.log(mqttClientId);

// Connection options: the JWT built from the private key is sent as the
// password, and the username is a fixed placeholder string.
const connectionArgs = {
  host: mqttBridgeHostname,
  port: mqttBridgePort,
  protocol: "mqtts",
  secureProtocol: "TLSv1_2_method",
  clientId: mqttClientId,
  username: "unused",
  password: createJwt(projectId, privateKeyFile, algorithm),
  qos: 1,
  ca: [readFileSync(serverCertFile)],
};

// Open the connection to the ClearBlade MQTT bridge.
const client = mqtt.connect(connectionArgs);

// On connect: wait five seconds, subscribe the gateway to its error topic,
// attach the device, then force-close the connection after clientDuration ms.
client.on("connect", (success) => {
  if (success) {
    const startupDelayMs = 5000;

    // Logs a farewell and ends the client; no detach message is published.
    const closeConnection = () => {
      console.log("Closing connection to MQTT. Goodbye!");
      client.end(true);
    };

    const subscribeAndAttach = () => {
      // The gateway error topic is subscribed at QoS 0.
      client.subscribe(`/devices/${gatewayId}/errors`, { qos: 0 });

      attachDevice(deviceId, client);

      setTimeout(closeConnection, clientDuration);
    };

    setTimeout(subscribeAndAttach, startupDelayMs);
    return;
  }

  console.log("Client not connected...");
});

// Logs the closed connection and flags that the next attempt should back off.
function handleClose() {
  console.log("Connection closed");
  shouldBackoff = true;
}

// Logs any client error.
function handleError(err) {
  console.log("error", err);
}

// Logs each received message together with the topic it arrived on.
function handleMessage(topic, message) {
  const text = Buffer.from(message, "base64").toString("ascii");

  console.log(`message received on error topic ${topic}: ${text}`);
}

// Intentionally empty: logging every sent packet is very verbose.
function handlePacketSend() {}

client.on("close", handleClose);
client.on("error", handleError);
client.on("message", handleMessage);
client.on("packetsend", handlePacketSend);

...

Code Block
import time
import datetime
import ssl
import jwt
import random
import paho.mqtt.client as mqtt

# Ceiling for the backoff delay, in seconds; once the current delay exceeds
# it, the listen loop gives up.
MAXIMUM_BACKOFF_TIME = 32

# Current backoff delay, in seconds. The listen loop doubles it after each
# wait, and on_connect resets it to 1.
minimum_backoff_time = 1

# True while the listen loop should wait with exponential backoff; set by
# on_disconnect and cleared by on_connect.
should_backoff = False

def create_jwt(project_id, private_key_file, algorithm, jwt_expires_minutes=20):
  """Creates a JWT (https://jwt.io) to establish an MQTT connection.

  Args:
    project_id: The cloud project ID this device belongs to; it is used as
        the token's audience ("aud") claim.
    private_key_file: A path to a file containing an RSA256 or
        ES256 private key.
    algorithm: The signing algorithm to use (RS256 or ES256).
    jwt_expires_minutes: Lifetime of the token in minutes. Defaults to 20,
        the value this function previously hard-coded.

  Returns:
    A JWT generated from the given project_id and private key, which
    expires jwt_expires_minutes after it is issued. Once it has expired,
    your client will be disconnected, and a new JWT will have to be
    generated.

  Raises:
    OSError: If private_key_file cannot be opened.
    ValueError: If the private_key_file does not contain a known key.
  """
  # Read the clock once so that "exp" is exactly "iat" plus the lifetime.
  issued_at = datetime.datetime.now(tz=datetime.timezone.utc)

  token = {
    # The time that the token was issued at.
    "iat": issued_at,
    # The time the token expires.
    "exp": issued_at + datetime.timedelta(minutes=jwt_expires_minutes),
    # The audience field is always set to the project id.
    "aud": project_id,
  }

  # Read the private key file.
  with open(private_key_file, "r") as f:
    private_key = f.read()

  print(
    "Creating JWT using {} from private key file {}".format(
      algorithm, private_key_file
    )
  )

  return jwt.encode(token, private_key, algorithm=algorithm)

def error_str(rc):
  """Return a Paho error code paired with its human-readable description."""
  description = mqtt.error_string(rc)
  return f"{rc}: {description}"


def on_connect(unused_client, unused_userdata, unused_flags, rc):
  """Paho callback for when a device connects.

  Prints the connection result for rc, then clears the module-level backoff
  state so the listen loop stops waiting and starts again from a 1-second
  delay.
  """
  global minimum_backoff_time, should_backoff

  print("on_connect", mqtt.connack_string(rc))

  # Connection is up again: restart the backoff sequence from the beginning.
  minimum_backoff_time = 1
  should_backoff = False


def on_disconnect(unused_client, unused_userdata, rc):
  """Paho callback for when a device disconnects.

  Prints the disconnect reason for rc and flags that the next iteration of
  the listen loop should wait with exponential backoff.
  """
  global should_backoff

  reason = error_str(rc)
  print("on_disconnect", reason)

  should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
  """Paho callback when a message is sent to the broker.

  None of the callback arguments are used; a fixed marker is printed.
  """
  marker = "on_publish"
  print(marker)


def on_message(unused_client, unused_userdata, message):
  """Paho callback when a message arrives on a subscription.

  Decodes the message payload as UTF-8 and prints it together with the
  message's topic and QoS.
  """
  text = message.payload.decode("utf-8")
  print(
    f"Received message '{text}' on topic '{message.topic}' "
    f"with Qos {message.qos!s}"
  )


def attach_device(client, device_id, auth):
    """Attach the device to the gateway.

    Publishes an attach message for device_id through the gateway's client.

    Args:
        client: MQTT client connected as the gateway; its publish() method is
            called with the topic, the payload, and qos=1.
        device_id: ID of the device to attach; used to build the
            "/devices/{device_id}/attach" topic.
        auth: Authorization token for the device (an empty string sends an
            empty token). It is converted with str() and JSON-escaped.
    """
    # Local import keeps this function self-contained.
    import json

    # [START iot_attach_device]
    attach_topic = "/devices/{}/attach".format(device_id)
    # json.dumps quotes and escapes the token, so a quote or backslash in it
    # cannot produce an invalid JSON payload. For ordinary tokens the output
    # is byte-identical to plain string interpolation.
    quoted_auth = json.dumps(str(auth), ensure_ascii=False)
    attach_payload = '{{"authorization" : {}}}'.format(quoted_auth)
    client.publish(attach_topic, attach_payload, qos=1)
    # [END iot_attach_device]


def detach_device(client, device_id):
    """Detach the device from the gateway.

    Prints the detach topic for device_id, then publishes an empty JSON
    object to it through client with qos=1.
    """
    # [START iot_detach_device]
    topic = f"/devices/{device_id}/detach"
    print(f"Detaching: {topic}")
    client.publish(topic, "{}", qos=1)
    # [END iot_detach_device]


def get_client(
  project_id,
  cloud_region,
  registry_id,
  device_id,
  private_key_file,
  algorithm,
  ca_certs,
  mqtt_bridge_hostname,
  mqtt_bridge_port,
):
  """Create the MQTT client, connect it, and subscribe to the device topics.

  The client_id is a unique string that identifies this device; it is built
  as projects/{project}/locations/{region}/registries/{registry}/devices/{device}.

  Returns:
    The Paho client, after connect() and both subscribe() calls were issued.
  """
  registry_path = f"projects/{project_id}/locations/{cloud_region}/registries/{registry_id}"
  client_id = f"{registry_path}/devices/{device_id}"
  print(f"Device client_id is '{client_id}'")

  client = mqtt.Client(client_id=client_id)

  # The JWT is sent in the password field to authorize the device; the
  # username is a fixed placeholder.
  password = create_jwt(project_id, private_key_file, algorithm)
  client.username_pw_set(username="unused", password=password)

  # Enable SSL/TLS support.
  client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

  # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
  # describes additional callbacks that Paho supports. In this example, the
  # callbacks just print to standard out.
  client.on_connect = on_connect
  client.on_disconnect = on_disconnect
  client.on_publish = on_publish
  client.on_message = on_message

  # Connect to the MQTT bridge.
  client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

  # The topic on which the device receives configuration updates,
  # subscribed at QoS 1.
  config_topic = f"/devices/{device_id}/config"
  client.subscribe(config_topic, qos=1)

  # The wildcard topic on which the device receives commands. Note that this
  # subscription uses QoS 0.
  command_topic = f"/devices/{device_id}/commands/#"
  print(f"Subscribing to {command_topic}")
  client.subscribe(command_topic, qos=0)

  return client


def listen_for_messages(
  service_account_json,
  project_id,
  cloud_region,
  registry_id,
  device_id,
  gateway_id,
  num_messages,
  private_key_file,
  algorithm,
  ca_certs,
  mqtt_bridge_hostname,
  mqtt_bridge_port,
  jwt_expires_minutes,
  duration,
  cb=None,
):
  """Listens for messages sent to the gateway and bound devices.

  Connects as the gateway, attaches device_id, subscribes to the device
  config, gateway config, and gateway error topics, then processes network
  events in a loop before detaching the device.

  Args:
    service_account_json: Not used by this function.
    project_id: Cloud project ID, passed to get_client.
    cloud_region: Cloud region, passed to get_client.
    registry_id: Registry ID, passed to get_client.
    device_id: ID of the device to attach, listen for, and detach.
    gateway_id: ID of the gateway the client connects as.
    num_messages: Not used by this function.
    private_key_file: Path to the gateway's private key, passed to get_client.
    algorithm: JWT signing algorithm, passed to get_client.
    ca_certs: CA certificate file, passed to get_client.
    mqtt_bridge_hostname: MQTT bridge host to connect to.
    mqtt_bridge_port: MQTT bridge port to connect to.
    jwt_expires_minutes: Age in minutes after which the client is rebuilt
        with a fresh token.
        NOTE(review): get_client calls create_jwt without passing a
        lifetime, so this value only controls when the loop rebuilds the
        client; confirm it does not exceed the token's real expiry.
    duration: Loop bound; the loop runs duration - 1 iterations, each ending
        in a one-second sleep.
    cb: Optional callable invoked with the client on every iteration.
  """
  # [START iot_listen_for_messages]
  global minimum_backoff_time

  jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
  jwt_exp_mins = jwt_expires_minutes
  # Use the gateway to connect to the server
  client = get_client(
    project_id,
    cloud_region,
    registry_id,
    gateway_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
  )

  attach_device(client, device_id, "")
  print("Waiting for device to attach.")
  time.sleep(5)

  # The topic devices receive configuration updates on.
  device_config_topic = "/devices/{}/config".format(device_id)
  client.subscribe(device_config_topic, qos=1)

  # The topic gateways receive configuration updates on.
  gateway_config_topic = "/devices/{}/config".format(gateway_id)
  client.subscribe(gateway_config_topic, qos=1)

  # The topic gateways receive error updates on. QoS must be 0.
  error_topic = "/devices/{}/errors".format(gateway_id)
  client.subscribe(error_topic, qos=0)

  # Process network events roughly once per second for duration - 1 rounds.
  for _ in range(1, duration):
    client.loop()

    if cb is not None:
      cb(client)

    if should_backoff:
      # If backoff time is too large, give up.
      if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
        print("Exceeded maximum backoff time. Giving up.")
        break

      # Wait the current backoff plus up to one second of random jitter,
      # double the backoff, and reconnect.
      delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
      time.sleep(delay)
      minimum_backoff_time *= 2
      client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # total_seconds() covers the whole elapsed interval; timedelta.seconds
    # holds only the sub-day remainder and wraps every 24 hours.
    elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat
    seconds_since_issue = int(elapsed.total_seconds())
    if seconds_since_issue > 60 * jwt_exp_mins:
      print("Refreshing token after {}s".format(seconds_since_issue))
      jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
      client = get_client(
        project_id,
        cloud_region,
        registry_id,
        gateway_id,
        private_key_file,
        algorithm,
        ca_certs,
        mqtt_bridge_hostname,
        mqtt_bridge_port,
      )

    time.sleep(1)

  detach_device(client, device_id)

  print("Finished.")
  # [END iot_listen_for_messages]



# Sample configuration: replace the placeholder values with your own.

# Project, registry, gateway, and device identifiers.
service_account_json = "/path/to/your/credentials.json"
project_id = "YOUR_PROJECT_ID"
cloud_region = "us-central1"
registry_id = "your-registry-id"
gateway_id = "your-gateway-id"
device_id = "your-test-device"

# Gateway credentials and MQTT bridge endpoint.
gateway_private_key_file = "/path/to/your/gateway/rsa_private.pem"
algorithm = "RS256"
ca_certs = "DigiCertGlobalRootCA.crt.pem"
mqtt_bridge_hostname = "us-central1-mqtt.clearblade.com"
mqtt_bridge_port = 8883

# Listening parameters.
num_messages = 10
jwt_expires_minutes = 60
duration = 3

# Connect as the gateway, attach the device, and listen for messages.
listen_for_messages(
  service_account_json=service_account_json,
  project_id=project_id,
  cloud_region=cloud_region,
  registry_id=registry_id,
  device_id=device_id,
  gateway_id=gateway_id,
  num_messages=num_messages,
  private_key_file=gateway_private_key_file,
  algorithm=algorithm,
  ca_certs=ca_certs,
  mqtt_bridge_hostname=mqtt_bridge_hostname,
  mqtt_bridge_port=mqtt_bridge_port,
  jwt_expires_minutes=jwt_expires_minutes,
  duration=duration,
  cb=None,
)

...