- 印刷する
- PDF
Cloud IoT Coreのサンプルコード
- 印刷する
- PDF
Classic/VPC環境で利用できます。
Cloud IoT Coreからダウンロード証明書を使用して、MQTTプロトコルに基づき Cloud IoT Coreサーバにメッセージを発行し、購読するサンプルを説明します。
Java
Cloud IoT Coreにアクセスし、計5回のメッセージの発行および購読を繰り返してからプログラムが終了するサンプルコードです。メッセージの発行は factory/room1/temperature
トピックで発生し、Cloud IoT Coreを経て再発行された alertトピックを購読します。
環境設定
サンプルファイルを実行するには MQTT、TLSライブラリが必要です。
- maven
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>${version}</version>
</dependency>
</dependencies>
- gradle
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:${version}'
implementation 'org.bouncycastle:bcpkix-jdk15on:${version}'
サンプルコード
Javaのサンプルコードは次の通りです。
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMKeyPair;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.security.*;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
public class MqttExample {
public static void main(String[] args) throws Exception {
String clientId = "myClientTest";
/*
*** Modify file path and server ***
*/
// MQTT server hostname
String iotServer = "ssl://msg01.cloudiot.ntruss.com";
String rootCaCertFilePath = "/<Your>/<file>/<path>/rootCaCert.pem";
String clientCaChainFilePath = "/<Your>/<file>/<path>/caChain.pem";
String clientCertFilePath = "/<Your>/<file>/<path>/cert.pem";
String clientKeyFilePath = "/<Your>/<file>/<path>/private.pem";
String password = "";
/*
Make Root CA TrustManager and Client Keystore
*/
MqttExample mqttExample = new MqttExample();
// CA certificate is used to **authenticate server**
TrustManagerFactory tmf = mqttExample.getTrustManagerFactory(rootCaCertFilePath);
// client key, certificates and cachain are sent to server to authenticate client
KeyManagerFactory kmf = mqttExample.getKeyManagerFactory(clientCaChainFilePath, clientCertFilePath, clientKeyFilePath, password);
/*
MQTT Connection
*/
// Set MQTT Client
MqttClient mqttClient = new MqttClient(iotServer, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setConnectionTimeout(60);
connOpts.setKeepAliveInterval(60);
connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
// create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1.2");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
connOpts.setSocketFactory(context.getSocketFactory());
mqttClient.connect(connOpts);
System.out.println("=== Successfully Connected");
/*
Subscribe & Publish Message
Publish message on the device ---> IoT Platform(alert republish action) ---> Subscribe message on the device
*/
mqttClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable throwable) {
System.out.println("=== Connection lost.");
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("<<< Subscribe from IoT server. topic : " + s + ", message : " + mqttMessage.toString());
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println(">>> Publish to IoT server.");
}
});
/*
Topic & Message Example
*/
String sendTopic = "factory/room1/temperature";
// Test for MQTT republish
// In order to receive alert topic message, alert republish action must be set in Rule engine.
String alertTopic = "alert";
// Console guide example message
String message = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}";
/*
Subscribe message
*/
boolean isConnected = mqttClient.isConnected();
if (isConnected) {
mqttClient.subscribe(alertTopic);
// You can check whether the message is being delivered normally.
// mqttClient.subscribe(sendTopic);
}
/*
Publish message
*/
if (isConnected) {
for (int i=0; i < 5; i++) {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(0);
mqttMessage.setRetained(false);
try {
MqttTopic topic = mqttClient.getTopic(sendTopic);
MqttDeliveryToken token = topic.publish(mqttMessage);
token.waitForCompletion();
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
Enable disconnect() and close() if you needed
*/
// mqttClient.disconnect();
// mqttClient.close();
}
/*
Code for Certificate Access
*/
private MqttExample() {
init();
}
private KeyManagerFactory getKeyManagerFactory(String clientCaChainFilePath, String clientCertFilePath, String clientKeyFilePath, String password) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
// load client private key
KeyPair key = getClientKey(clientKeyFilePath);
// load client Cert
Certificate clientCert = getClientCert(clientCertFilePath);
// load client CA Chain
List<Certificate> caChain = getClientCaChain(clientCaChainFilePath, clientCert);
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
int caChainSize = caChain.size();
Certificate[] caChainArray = caChain.toArray(new Certificate[caChainSize]);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), caChainArray);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
return kmf;
}
private TrustManagerFactory getTrustManagerFactory(String rootCaCertFilePath) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
// load CA certificate
X509Certificate rootCaCert = getRootCaCert(rootCaCertFilePath);
KeyStore rootCaKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
rootCaKeyStore.load(null, null);
rootCaKeyStore.setCertificateEntry("ca-certificate", rootCaCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
tmf.init(rootCaKeyStore);
return tmf;
}
private void init() {
Security.addProvider(new BouncyCastleProvider());
}
private KeyPair getClientKey(String clientKeyFilePath) throws IOException {
PEMParser pemParser = new PEMParser(new FileReader(clientKeyFilePath));
Object object = pemParser.readObject();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
KeyPair key = converter.getKeyPair((PEMKeyPair) object);
pemParser.close();
return key;
}
private List<Certificate> getClientCaChain(String clientCaChainFilePath, Certificate clientCert) throws CertificateException, IOException {
X509Certificate cert;
List<Certificate> caChain = new ArrayList<Certificate>();
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(clientCaChainFilePath));
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
cert = (X509Certificate) cf.generateCertificate(bis);
caChain.add(cert);
}
caChain.add(0, clientCert);
return caChain;
}
private X509Certificate getClientCert(String clientCertFilePath) throws CertificateException, IOException {
return getX509Certificate(clientCertFilePath);
}
private X509Certificate getRootCaCert(String rootCaCertFilePath) throws CertificateException, IOException {
return getX509Certificate(rootCaCertFilePath);
}
private X509Certificate getX509Certificate(String rootCaCertFilePath) throws CertificateException, IOException {
X509Certificate caCert = null;
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(rootCaCertFilePath));
CertificateFactory cf = CertificateFactory.getInstance("X.509");
while (bis.available() > 0) {
caCert = (X509Certificate) cf.generateCertificate(bis);
}
return caCert;
}
}
動作結果
Javaサンプルコードの動作結果は次の通りです。
=== Successfully Connected
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
Python
Cloud IoT Coreにアクセスし、計5回のメッセージの発行および購読を繰り返してからプログラムが終了するサンプルコードです。メッセージの発行は factory/room1/temperature
トピックで発生し、Cloud IoT Coreを経て再発行された alertトピックを購読します。
環境設定
サンプルファイルを実行するには paho-mqttライブラリのインストールが必要です。
pip install paho-mqtt //python 2.x
python -m pip install paho-mqtt // python 2.7.9+
python3 -m pip install paho-mqtt //python 3.x
サンプルファイル内の Full Certificate Chain値の場合、証明書とチェーン証明書を接続した Full Certificate Chainファイルを保存するパスを意味します。Pythonサンプルの実行時、Full Certificate Chainファイルが設定されたパスに自動的に別途保存されます。
サンプルコード
mqttTLSClient.py
# coding=utf-8 import argparse import logging import paho.mqtt.client as mqttClient import ssl import time import json def main(): # init initLogger() initParameter() appendCert() # Connect MQTT Broker client = mqttClient.Client() if not connectTls(client, hostname, port, rootCa, fullCertChain, private): client.loop_stop() exit(1) attempts = 0 while attempts < 5: # Subscribe message client.subscribe("alert",0) # Publish Message to Message broker publish(client, topic, payload) time.sleep(publishDelay) attempts += 1 time.sleep(5) def initLogger(): global log log = logging.getLogger(__name__) log.setLevel(logging.ERROR) # stream handler (print to console) streamHandler = logging.StreamHandler() # file formatter formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] (%(filename)s:%(funcName)s:%(lineno)d) : %(message)s') streamHandler.setFormatter(formatter) log.addHandler(streamHandler) def initParameter(): global rootCa global caChain global cert global private global fullCertChain global hostname global port global publishDelay global topic global payload global connected rootCa = '/<Your>/<file>/<path>/rootCaCert.pem' caChain = '/<Your>/<file>/<path>/caChain.pem' cert = '/<Your>/<file>/<path>/cert.pem' private = '/<Your>/<file>/<path>/private.pem' hostname = 'msg01.cloudiot.ntruss.com' port = 8883 publishDelay = 1 # sec topic = "factory/room1/temperature" payload = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}" connected = False #fullCertChain is automatically generated by cert and caChain fullCertChain = './fullCertChain.pem' def appendCert(): filenames = [cert, caChain] with open(fullCertChain,'w') as outfile: for fname in filenames : with open(fname) as infile: outfile.write(infile.read()+"\n") def connectTls(client, hostname, port, rootCa, fullCertChain, clientKey): client.on_connect = on_connect client.on_message = on_message client.on_publish = on_publish client.tls_set(ca_certs=rootCa, certfile=fullCertChain, keyfile=clientKey, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) ssl.match_hostname = lambda cert, hostname: True client.tls_insecure_set(False) client.connect(hostname, port=port) client.loop_start() attempts = 0 while not connected and attempts < 5: # Wait for connection time.sleep(1) attempts += 1 if not connected: return False return True def on_connect(client, userdata, flags, rc): if rc == 0: log.info("=== Successfully Connected") global connected # Use global variable connected = True # Signal connection else: log.error("=== Connection lost") def on_publish(client, userdata, result): print(">>> Publish to IoT server.") def publish(client, topic, payload): try: client.publish(topic, payload) except Exception as e: print("[ERROR] Could not publish data, error: {}".format(e)) def on_message(client, userdata, message): print("<<< Subscribe from IoT server. topic : "+ message.topic + ", message : " + str(message.payload.decode("utf-8"))) if __name__ == "__main__": main()
動作結果
mqttTLSClient.py実行結果は次の通りです。
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
<<< Subscribe from IoT server. topic : alert, message : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
Node JS(JavaScript)
Cloud IoT Coreにアクセスし、計5回のメッセージの発行および購読を繰り返してからプログラムが終了するサンプルコードです。メッセージの発行は factory/room1/temperature
トピックで発生し、Cloud IoT Coreを経て再発行された alertトピックを購読します。
環境設定
Node Js(JavaScript)サンプルは、mqtt libraryライブラリのインストールが必要です。ライブラリのインストール前に npm(node package manager)をあらかじめインストールしている必要があります。
package.jsonファイル MqttClient.jsとファイルを同じフォルダに配置し、npm install
コマンドでノードライブラリパッケージをダウンロードしてから実行できます。
- package.json
{
"name": "cloud-iot-mqtt-sample",
"version": "0.1.0",
"dependencies": { "mqtt": "^1.14.1" }
}
サンプルコード
- MqttClient.js
const mqtt = require('mqtt'); const fs = require('fs'); const path = require('path'); const HOST = "msg01.cloudiot.ntruss.com"; const PORT = 8883; const PUBLISH_TOPIC = `factory/room1/temperature`; const SUBSCRIBE_TOPIC = `alert`; const QOS = 0; const certFolderPath = "/<Your>/<file>/<path>/"; const KEY = fs.readFileSync(certFolderPath + `private.pem`); const CERT = fs.readFileSync(certFolderPath + `cert.pem`); const CA_CHAIN = fs.readFileSync(certFolderPath + `caChain.pem`); const TRUSTED_CA_LIST = fs.readFileSync(certFolderPath + `rootCaCert.pem`); const FULL_CERTIFICATION = CERT+"\n"+CA_CHAIN; let receivedIndex = 0; const connectionInfo = { port: PORT, host: HOST, key: KEY, cert: FULL_CERTIFICATION, ca: TRUSTED_CA_LIST, rejectUnauthorized: true, protocol: 'mqtts', connectTimeout: 60 * 1000, keepalive: 1000, } const client = mqtt.connect(connectionInfo); // Connetion to mqtt message broker // client.on('connect', async function () { console.log('=== Successfully Connected'); for (let i = 0; i < 5; i++) { await sleep(1000); sendMessage(); } }) // Subscribe to Message // function sendMessage() { console.log(">>> Publish to IoT server.") client.publish(PUBLISH_TOPIC, getMsg()) } // Subscribe to Message // client.subscribe(SUBSCRIBE_TOPIC, {qos:QOS}); // Receiving Message // client.on('message', async function (topic, message) { console.log(`<<< Subscribe from IoT server. topic : "${topic}", message(${receivedIndex}) : ${message.toString()}`); receivedIndex++; if (receivedIndex == 5) { client.end(); console.log("=== Complete.") } }) client.on('close', function () { console.log("connection Close"); }) client.on('error', function (err) { console.log(err.toString()); }) function getMsg () { const timeStamp = Math.floor(new Date() / 1000); const msg = `{"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}`; return msg; } const sleep = (ms) => { return new Promise(resolve=>{ setTimeout(resolve,ms); }) }
動作結果
Node JS(JavaScript)のサンプルコードの動作結果は次のとおりです。
=== Successfully Connected
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(0) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(1) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(2) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(3) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : "factory/room1/temperature", message(4) : {"battery":9,"date":"2016-12-15","deviceId":"device_1","deviceType":"temperature","time":"15:12:00","value":35}
=== Complete.
connection Close
C
Cloud IoT Coreにアクセスし、計5回のメッセージの発行および購読を繰り返してからプログラムが終了するサンプルコードです。メッセージの発行は factory/room1/temperature
トピックで発生し、Cloud IoT Coreを経て再発行された alertトピックを購読します。
環境設定
このサンプルファイルを実行するためには paho mqttライブラリのインストールが必要です。
CentOS
$ sudo yum install build-essential gcc make cmake cmake-gui cmake-curses-gui $ sudo yum install openssl-devel $ git clone https://github.com/eclipse/paho.mqtt.c $ cd paho.mqtt.c $ git checkout v1.3.8 $ make $ sudo make install
Ubuntu
$ sudo apt-get install build-essential gcc make cmake cmake-gui cmake-curses-gui $ sudo apt-get install libssl-dev $ git clone https://github.com/eclipse/paho.mqtt.c.git $ cd paho.mqtt.c $ git checkout v1.3.8 $ cmake -Bbuild -H. -DPAHO_ENABLE_TESTING=OFF -DPAHO_BUILD_STATIC=ON \ -DPAHO_WITH_SSL=ON -DPAHO_HIGH_PERFORMANCE=ON $ sudo cmake --build build/ --target install $ sudo ldconfig
サンプルファイル内の Full Certificate Chain Path値は、証明書とチェーン証明書を接続した Full Certificate Chainファイルを保存するパスを意味します。サンプルを実行すると、Full Certificate Chainファイルが設定されたパスに自動的に別途保存されます。
サンプルコード
mqttClient.c
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "MQTTClient.h" #define QOS 1 #define TIMEOUT 10000L volatile MQTTClient_deliveryToken deliveredtoken; // Generate fullCertChain by cert and caChain void appendCert(char* cert, char* caChain, char* fullCertChain) { FILE* fpw = fopen(fullCertChain,"w"); FILE* fpr = fopen(cert,"r"); int size; char* buffer; fseek(fpr, 0, SEEK_END); size = ftell(fpr); buffer = (char*)malloc(size + 1); memset(buffer, 0, size + 1); fseek(fpr, 0, SEEK_SET); fread(buffer, size, 1, fpr); fputs(buffer, fpw); fclose(fpr); free(buffer); fputs("\n", fpw); fpr = fopen(caChain,"r"); fseek(fpr, 0, SEEK_END); size = ftell(fpr); buffer = (char*)malloc(size + 1); memset(buffer, 0, size + 1); fseek(fpr, 0, SEEK_SET); fread(buffer, size, 1, fpr); fputs(buffer, fpw); fputs("\n", fpw); free(buffer); fclose(fpr); fclose(fpw); } void delivered( void *context, MQTTClient_deliveryToken dt ) { printf(">>> Publish to IoT server.\n"); deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { char* payload = (char*)message->payload; printf("<<< Subscribe from IoT server. topic : %s , message : %s\n", topicName, payload); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost( void *context, char *cause ) { printf("\nConnection lost\n"); printf(" cause: %s\n", cause); } int main(int argc, char *argv[]) { char clientId[] = "myClientTest"; // Modify file path and server // MQTT server hostname char iotServer[] = "ssl://msg01.cloudiot.ntruss.com:8883"; char rootCaCertFilePath[] = "/<Your>/<file>/<path>/rootCaCert.pem"; char clientCaChainFilePath[] = "/<Your>/<file>/<path>/caChain.pem"; char clientCertFilePath[] = "/<Your>/<file>/<path>/cert.pem"; char clientKeyFilePath[] = "/<Your>/<file>/<path>/private.pem"; // fullCertChain is automatically generated by cert and caChain char fullCertChainPath[] = "./fullCertChain.pem"; char password[] = ""; // Topic & Message Example char sendTopic[] = "factory/room1/temperature"; char alertTopic[] = "alert"; char payload[] = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}"; MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; int setCallBack; int attempts; MQTTClient_create(&client, iotServer, clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); // Generate fullCertChain by cert and caChain appendCert(clientCertFilePath, clientCaChainFilePath, fullCertChainPath); // MQTT Connection parameters ssl_opts.verify = 1; ssl_opts.trustStore = rootCaCertFilePath; ssl_opts.keyStore = fullCertChainPath; ssl_opts.privateKey = clientKeyFilePath; conn_opts.ssl = &ssl_opts; conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.serverURIcount = 0; conn_opts.serverURIs = NULL; // Set call back if((setCallBack = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS) { printf("ERROR : MQTTClient_setCallbacks, setCallBack = %d\n", setCallBack); return setCallBack; } // Connect MQTT Broker if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("ERROR : Failed to connect, return code : %d\n", rc); exit(-1); } // Subscribe message rc = MQTTClient_subscribe(client, alertTopic, 0); pubmsg.payload = payload; pubmsg.payloadlen = strlen(payload); pubmsg.qos = QOS; pubmsg.retained = 0; for(attempts=0; attempts<5; attempts++){ // Publish message MQTTClient_publishMessage(client, sendTopic, &pubmsg, &token); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); sleep(2); //sec } sleep(2); //sec // Disconnect MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }
サンプルコードの実行
Cサンプルコードを実行する方法は次の通りです。
$ gcc -o {executable file name} {source file name}.c -l paho-mqtt3cs
$ ./{executable file name}
動作結果
Cサンプルコードの動作結果は以下の通りです。
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
C++
Cloud IoT Coreにアクセスし、計5回のメッセージの発行および購読を繰り返してからプログラムが終了するサンプルコードです。メッセージの発行は factory/room1/temperature
トピックで発生し、Cloud IoT Coreを経て再発行された alertトピックを購読します。
環境設定
このサンプルファイルを実行するためには Paho MQTTライブラリのインストールが必要です。
CentOS
$ sudo yum install build-essential gcc make cmake cmake-gui cmake-curses-gui $ sudo yum install openssl-devel $ git clone https://github.com/eclipse/paho.mqtt.c $ cd paho.mqtt.c $ git checkout v1.3.8 $ make $ sudo make install
Ubuntu
$ sudo apt-get install build-essential gcc make cmake cmake-gui cmake-curses-gui $ sudo apt-get install libssl-dev $ git clone https://github.com/eclipse/paho.mqtt.c.git $ cd paho.mqtt.c $ git checkout v1.3.8 $ cmake -Bbuild -H. -DPAHO_ENABLE_TESTING=OFF -DPAHO_BUILD_STATIC=ON \ -DPAHO_WITH_SSL=ON -DPAHO_HIGH_PERFORMANCE=ON $ sudo cmake --build build/ --target install $ sudo ldconfig
サンプルファイル内の Full Certificate Chain Path値は、証明書とチェーン証明書を接続した Full Certificate Chainファイルを保存するパスを意味します。サンプルを実行すると、Full Certificate Chainファイルが設定されたパスに自動的に別途保存されます。
サンプルコード
- mqttClient.cpp
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <iostream> #include <fstream> #include "MQTTClient.h" #define QOS 1 #define TIMEOUT 10000L volatile MQTTClient_deliveryToken deliveredtoken; // Generate fullCertChain by cert and caChain void appendCert(char* cert, char* caChain, char* fullCertChain) { std::ifstream fin; std::ofstream fout; int size; char* buffer; fin.open(cert); fout.open(fullCertChain); fin.seekg(0,std::ios::end); size = fin.tellg(); fin.seekg(0, std::ios::beg); buffer = (char*)malloc(size + 1); memset(buffer, 0, size + 1); fin.read(&buffer[0], size); fout << buffer << std::endl; free(buffer); fin.close(); fin.open(caChain); fin.seekg(0,std::ios::end); size = fin.tellg(); fin.seekg(0, std::ios::beg); buffer = (char*)malloc(size + 1); memset(buffer, 0, size + 1); fin.read(&buffer[0], size); fout << buffer << std::endl; free(buffer); fin.close(); fout.close(); } void delivered( void *context, MQTTClient_deliveryToken dt ) { std::cout << ">>> Publish to IoT server." << std::endl; deliveredtoken = dt; } int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message) { char* payload = (char*)message->payload; std::cout << "<<< Subscribe from IoT server. topic : " << topicName << " , message : " << payload << std::endl; MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } void connlost( void *context, char *cause ) { std::cout << std::endl << "Connection lost" << std::endl; std::cout << " cause: " << cause << std::endl; } int main(int argc, char *argv[]) { char clientId[] = "myClientTest"; // Modify file path and server // MQTT server hostname char iotServer[] = "ssl://msg01.cloudiot.ntruss.com:8883"; char rootCaCertFilePath[] = "/<Your>/<file>/<path>/rootCaCert.pem"; char clientCaChainFilePath[] = "/<Your>/<file>/<path>/caChain.pem"; char clientCertFilePath[] = "/<Your>/<file>/<path>/cert.pem"; char clientKeyFilePath[] = "/<Your>/<file>/<path>/private.pem"; // fullCertChain is automatically generated by cert and caChain char fullCertChainPath[] = "./fullCertChain.pem"; char password[] = ""; // Topic & Message Example char sendTopic[] = "factory/room1/temperature"; char alertTopic[] = "alert"; char payload[] = "{ \"deviceId\": \"device_1\", \"deviceType\": \"temperature\", \"value\": 35, \"battery\": 9, \"date\": \"2016-12-15\", \"time\": \"15:12:00\"}"; MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_SSLOptions ssl_opts = MQTTClient_SSLOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; int setCallBack; int attempts; MQTTClient_create(&client, iotServer, clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL); // Generate fullCertChain by cert and caChain appendCert(clientCertFilePath, clientCaChainFilePath, fullCertChainPath); // MQTT Connection parameters ssl_opts.verify = 1; ssl_opts.trustStore = rootCaCertFilePath; ssl_opts.keyStore = fullCertChainPath; ssl_opts.privateKey = clientKeyFilePath; conn_opts.ssl = &ssl_opts; conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.serverURIcount = 0; conn_opts.serverURIs = NULL; // Set call back if((setCallBack = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS) { std::cout << "ERROR : MQTTClient_setCallbacks, setCallBack = " << setCallBack << std::endl; return setCallBack; } // Connect MQTT Broker if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { std::cout << "ERROR : Failed to connect, return code : " << rc << std::endl; exit(-1); } // Subscribe message rc = MQTTClient_subscribe(client, alertTopic, 0); pubmsg.payload = payload; pubmsg.payloadlen = strlen(payload); pubmsg.qos = QOS; pubmsg.retained = 0; for(attempts=0; attempts<5; attempts++){ // Publish message MQTTClient_publishMessage(client, sendTopic, &pubmsg, &token); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); sleep(2); //sec } sleep(2); //sec // Disconnect MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return rc; }
サンプルコードの実行
C++サンプルコードを実行する方法は次の通りです。
$ g++ -o {executable file name} {source file name}.cpp -l paho-mqtt3cs
$ ./{executable file name}
動作結果
C++サンプルコードの作成結果は以下のとおりです。
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}
>>> Publish to IoT server.
<<< Subscribe from IoT server. topic : alert , message : {"deviceType":"temperature","date":"2016-12-15","time":"15:12:00","battery":9,"deviceId":"device_1","value":35}