Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.bouncycastle.util.io.pem.PemObject;
import org.bouncycastle.util.io.pem.PemReader;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.*;
import java.security.*;
import java.security.cert.CertificateFactory;
import java.security.spec.PKCS8EncodedKeySpec;

public class MqttConnection {
    public static void main(String[] args) throws Exception {
        String projectId = "";
        String cloudRegion = "";
        String registryId = "";
        String deviceId = "";
        String privateKeyFile = "";
        String caCerts = "";
        String mqttBridgeHostname = "";
        int mqttBridgePort = 8883;
        String clientId = String.format("projects/%s/locations/%s/registries/%s/devices/%s",
            projectId, cloudRegion, registryId, deviceId);

        MqttClient client = new MqttClient("ssl://" + mqttBridgeHostname + ":" + mqttBridgePort, clientId);

        MqttConnectOptions options = new MqttConnectOptions();
        options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
        options.setSocketFactory(getSocketFactory(caCerts));
        options.setUserName("unused");
        PrivateKey privateKey = getPrivateKeyFromFile(privateKeyFile);
        String jwtToken = generateJwtToken(privateKey, projectId);
        options.setPassword(jwtToken.toCharArray());


        client.setCallback(new MqttCallback() {
            public void connectionLost(Throwable cause) {
                System.out.println("Connection lost");
            }

            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("Received message '" + new String(message.getPayload()) +
                    "' on topic '" + topic + "' with QoS " + message.getQos());
            }

            public void deliveryComplete(IMqttDeliveryToken token) {
                System.out.println("Delivery complete");
            }
        });

        client.connect(options);
        System.out.println("Connection successful");

        String mqttConfigTopic = "/devices/" + deviceId + "/config";
        client.subscribe(mqttConfigTopic, 1);
        String mqttCommandTopic = "/devices/" + deviceId + "/commands/#";
        client.subscribe(mqttCommandTopic, 0);
    }

    public static javax.net.ssl.SSLSocketFactory getSocketFactory(String caCerts) {
        try {
            CertificateFactory cf = CertificateFactory.getInstance("X.509");
            FileInputStream fis = new FileInputStream(caCerts);
            java.security.cert.X509Certificate caCert = (java.security.cert.X509Certificate) cf.generateCertificate(fis);

            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
            ks.load(null, null);
            ks.setCertificateEntry("caCert", caCert);

            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(ks);

            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(null, tmf.getTrustManagers(), null);

            return ctx.getSocketFactory();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    private static PrivateKey getPrivateKeyFromFile(String privateKeyFilePath) throws Exception {
        File file = new File(privateKeyFilePath);
        PemObject pemObject;

        try (PemReader pemReader = new PemReader(new FileReader(file))) {
            pemObject = pemReader.readPemObject();
        }

        byte[] content = pemObject.getContent();
        PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(content);
        KeyFactory keyFactory = KeyFactory.getInstance("RSA");
        return keyFactory.generatePrivate(keySpec);
    }

    private static String generateJwtToken(PrivateKey privateKey, String projectID) {
        return Jwts.builder()
            .claim("iat", System.currentTimeMillis() / 1000)
            .claim("exp", (System.currentTimeMillis() / 1000) + 1440 * 60)
            .claim("aud", projectID)
            .signWith(SignatureAlgorithm.RS256, privateKey)
            .compact();
    }
}