...
Code Block |
---|
import time
import datetime
import ssl
import jwt
import random
import paho.mqtt.client as mqtt
# The initial backoff time after a disconnection occurs, in seconds.
# listen_for_messages doubles this value after every backoff wait, and
# on_connect resets it to 1.
minimum_backoff_time = 1
# The maximum backoff time before giving up, in seconds. listen_for_messages
# leaves its loop once minimum_backoff_time exceeds this value.
MAXIMUM_BACKOFF_TIME = 32
# Whether to wait with exponential backoff before reconnecting. Set to True
# by on_disconnect and back to False by on_connect.
should_backoff = False
def create_jwt(project_id, private_key_file, algorithm, expires_minutes=20):
    """Creates a JWT (https://jwt.io) to establish an MQTT connection.

    Args:
        project_id: The cloud project ID this device belongs to. It is used
            as the token's audience ("aud") claim.
        private_key_file: A path to a file containing an RS256 or ES256
            private key.
        algorithm: The signing algorithm to use (RS256 or ES256).
        expires_minutes: Lifetime of the token in minutes. Defaults to 20,
            which is the lifetime this function always used before the
            parameter existed.

    Returns:
        A JWT generated from the given project_id and private key, which
        expires `expires_minutes` minutes after it was issued. Once it has
        expired, your client will be disconnected, and a new JWT will have
        to be generated.

    Raises:
        OSError: If the private key file cannot be opened or read.

    NOTE(review): earlier documentation listed ValueError for a file that
    does not contain a known key; the actual exception is whatever
    jwt.encode raises -- confirm against the installed PyJWT version.
    """
    # Read the clock once so that "exp" is exactly "iat" plus the lifetime.
    issued_at = datetime.datetime.now(tz=datetime.timezone.utc)
    token = {
        # The time that the token was issued at.
        "iat": issued_at,
        # The time the token expires.
        "exp": issued_at + datetime.timedelta(minutes=expires_minutes),
        # The audience field should always be set to the GCP project id.
        "aud": project_id,
    }

    # Read the private key file.
    with open(private_key_file, "r") as f:
        private_key = f.read()

    print(
        "Creating JWT using {} from private key file {}".format(
            algorithm, private_key_file
        )
    )

    return jwt.encode(token, private_key, algorithm=algorithm)
def error_str(rc):
    """Format a Paho return code as "<code>: <description>".

    The description is looked up with mqtt.error_string.
    """
    description = mqtt.error_string(rc)
    return "{}: {}".format(rc, description)
def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when the broker answers a connection attempt.

    Prints the human readable CONNACK result. Only when the connection was
    accepted (rc == 0) is the module-level backoff state reset, so that
    repeated refusals keep growing the backoff instead of restarting it.

    Args:
        unused_client: The client instance for this callback (unused).
        unused_userdata: The user data set on the client (unused).
        unused_flags: Response flags sent by the broker (unused).
        rc: The connection result code; 0 means the connection succeeded.
    """
    global should_backoff
    global minimum_backoff_time

    print("on_connect", mqtt.connack_string(rc))

    if rc != 0:
        # Connection refused: leave the backoff state untouched so the
        # exponential backoff keeps progressing towards giving up.
        return

    # After a successful connect, reset backoff time and stop backing off.
    should_backoff = False
    minimum_backoff_time = 1
def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback invoked when the device disconnects.

    Prints the disconnect reason and flags that the listening loop should
    wait with exponential backoff on its next iteration.
    """
    global should_backoff

    reason = error_str(rc)
    print("on_disconnect", reason)

    # Any disconnect, whatever the reason code, requests a backoff.
    should_backoff = True
def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback fired once a message has been sent to the broker.

    All three arguments are ignored; the callback only prints a marker line.
    """
    marker = "on_publish"
    print(marker)
def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription.

    Prints the payload, topic and QoS of the received message.

    Args:
        unused_client: The client instance for this callback (unused).
        unused_userdata: The user data set on the client (unused).
        message: The received message; its `payload` (bytes), `topic` and
            `qos` attributes are read.
    """
    # Payloads are arbitrary bytes. Decode leniently so that a payload that
    # is not valid UTF-8 is still printed (with U+FFFD replacement
    # characters) instead of raising UnicodeDecodeError inside the callback.
    payload = message.payload.decode("utf-8", errors="replace")
    print(
        "Received message '{}' on topic '{}' with Qos {}".format(
            payload, message.topic, str(message.qos)
        )
    )
def attach_device(client, device_id, auth):
    """Publish an attach request for `device_id` through the gateway client.

    Args:
        client: The connected MQTT client of the gateway.
        device_id: The ID of the device to attach.
        auth: The value placed in the "authorization" field of the payload.
    """
    # [START iot_attach_device]
    payload = '{{"authorization" : "{}"}}'.format(auth)
    topic = "/devices/{}/attach".format(device_id)
    client.publish(topic, payload, qos=1)
    # [END iot_attach_device]
def detach_device(client, device_id):
    """Publish a detach request for `device_id` through the gateway client.

    Args:
        client: The connected MQTT client of the gateway.
        device_id: The ID of the device to detach.
    """
    # [START iot_detach_device]
    topic = "/devices/{}/detach".format(device_id)
    announcement = "Detaching: {}".format(topic)
    print(announcement)
    # The detach request carries an empty JSON object as its payload.
    client.publish(topic, "{}", qos=1)
    # [END iot_detach_device]
def get_client(
    project_id,
    cloud_region,
    registry_id,
    device_id,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
):
    """Create, configure and connect the MQTT client for a device.

    The client_id is a unique string that identifies this device; it is
    built from the project, region, registry and device IDs in the
    "projects/.../locations/.../registries/.../devices/..." format.

    The returned client authenticates with a freshly created JWT, uses TLS,
    has the module-level print callbacks registered, is connected to the
    given bridge host and port, and is subscribed to the device's config and
    commands topics.
    """
    client_id = "projects/{}/locations/{}/registries/{}/devices/{}".format(
        project_id, cloud_region, registry_id, device_id
    )
    print("Device client_id is '{}'".format(client_id))

    client = mqtt.Client(client_id=client_id)

    # The username field is ignored by the service; the password field
    # carries the JWT that authorizes the device.
    token = create_jwt(project_id, private_key_file, algorithm)
    client.username_pw_set(username="unused", password=token)

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. Here the callbacks
    # only print to standard out and maintain the backoff state.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # Topic for configuration updates, and topic filter for commands.
    config_topic = "/devices/{}/config".format(device_id)
    command_topic = "/devices/{}/commands/#".format(device_id)

    # Configuration updates are subscribed to at QoS 1.
    client.subscribe(config_topic, qos=1)

    # Commands are subscribed to at QoS 0.
    print("Subscribing to {}".format(command_topic))
    client.subscribe(command_topic, qos=0)

    return client
def listen_for_messages(
    service_account_json,
    project_id,
    cloud_region,
    registry_id,
    device_id,
    gateway_id,
    num_messages,
    private_key_file,
    algorithm,
    ca_certs,
    mqtt_bridge_hostname,
    mqtt_bridge_port,
    jwt_expires_minutes,
    duration,
    cb=None,
):
    """Listens for messages sent to the gateway and bound devices.

    Connects as the gateway, attaches `device_id`, subscribes to the device
    config, gateway config and gateway error topics, then services the MQTT
    connection in a loop before detaching the device again.

    Args:
        service_account_json: Not used by this function.
        project_id: The cloud project ID.
        cloud_region: The cloud region of the registry.
        registry_id: The registry ID.
        device_id: The ID of the device attached through the gateway.
        gateway_id: The ID of the gateway used for the MQTT connection.
        num_messages: Not used by this function.
        private_key_file: Path to the gateway's private key file.
        algorithm: The JWT signing algorithm (RS256 or ES256).
        ca_certs: Path to the CA certificate file used for TLS.
        mqtt_bridge_hostname: Hostname of the MQTT bridge.
        mqtt_bridge_port: Port of the MQTT bridge.
        jwt_expires_minutes: Minutes after which a new client with a new
            JWT is created.
        duration: Loop bound. The loop iterates over range(1, duration),
            i.e. duration - 1 times, sleeping one second per iteration.
        cb: Optional callable invoked with the client on every iteration.
    """
    # [START iot_listen_for_messages]
    global minimum_backoff_time

    def _connect_gateway():
        """Create a connected client for the gateway with a fresh JWT."""
        return get_client(
            project_id,
            cloud_region,
            registry_id,
            gateway_id,
            private_key_file,
            algorithm,
            ca_certs,
            mqtt_bridge_hostname,
            mqtt_bridge_port,
        )

    jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)

    # Use gateway to connect to server.
    client = _connect_gateway()

    attach_device(client, device_id, "")
    print("Waiting for device to attach.")
    time.sleep(5)

    # The topic devices receive configuration updates on.
    device_config_topic = "/devices/{}/config".format(device_id)
    client.subscribe(device_config_topic, qos=1)

    # The topic gateways receive configuration updates on.
    gateway_config_topic = "/devices/{}/config".format(gateway_id)
    client.subscribe(gateway_config_topic, qos=1)

    # The topic gateways receive error updates on. QoS must be 0.
    error_topic = "/devices/{}/errors".format(gateway_id)
    client.subscribe(error_topic, qos=0)

    # Service the connection for duration - 1 iterations of roughly one
    # second each.
    # NOTE(review): range(1, duration) looks like an off-by-one if
    # `duration` is meant to be the number of seconds -- confirm intent.
    for _ in range(1, duration):
        client.loop()
        if cb is not None:
            cb(client)

        if should_backoff:
            # If backoff time is too large, give up.
            if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
                print("Exceeded maximum backoff time. Giving up.")
                break

            # Wait the current backoff plus up to one second of jitter,
            # then double the backoff for the next attempt and reconnect.
            delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
            time.sleep(delay)
            minimum_backoff_time *= 2
            client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

        # total_seconds() covers the whole interval; timedelta.seconds is
        # only the 0..86399 seconds component, so it wraps after a day and
        # misreports negative intervals.
        elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - jwt_iat
        seconds_since_issue = int(elapsed.total_seconds())
        if seconds_since_issue > 60 * jwt_expires_minutes:
            print("Refreshing token after {}s".format(seconds_since_issue))
            jwt_iat = datetime.datetime.now(tz=datetime.timezone.utc)
            # NOTE(review): the previous client is not disconnected, and the
            # device attach and the subscriptions made above are not
            # repeated on the new client -- confirm whether that is wanted.
            # NOTE(review): create_jwt issues tokens that expire after 20
            # minutes by default, so a jwt_expires_minutes above 20
            # refreshes only after the token has already expired.
            client = _connect_gateway()

        time.sleep(1)

    detach_device(client, device_id)

    print("Finished.")
    # [END iot_listen_for_messages]
# Placeholder settings for the sample run; replace them with real values.

# Project, registry and device identity.
service_account_json = "/path/to/your/credentials.json"
project_id = "YOUR_PROJECT_ID"
cloud_region = "us-central1"
registry_id = "your-registry-id"
gateway_id = "your-gateway-id"
device_id = "your-test-device"

# Gateway credentials and TLS root certificate.
gateway_private_key_file = "/path/to/your/gateway/rsa_private.pem"
algorithm = "RS256"
ca_certs = "DigiCertGlobalRootCA.crt.pem"

# MQTT bridge endpoint.
mqtt_bridge_hostname = "us-central1-mqtt.clearblade.com"
mqtt_bridge_port = 8883

# Run parameters.
num_messages = 10
jwt_expires_minutes = 60
duration = 3

# Attach the device through the gateway and listen for messages.
listen_for_messages(
    service_account_json=service_account_json,
    project_id=project_id,
    cloud_region=cloud_region,
    registry_id=registry_id,
    device_id=device_id,
    gateway_id=gateway_id,
    num_messages=num_messages,
    private_key_file=gateway_private_key_file,
    algorithm=algorithm,
    ca_certs=ca_certs,
    mqtt_bridge_hostname=mqtt_bridge_hostname,
    mqtt_bridge_port=mqtt_bridge_port,
    jwt_expires_minutes=jwt_expires_minutes,
    duration=duration,
    cb=None,
)
...