MQTT

Message Queuing Telemetry Transport,消息队列遥测传输协议

Posted by LXG on December 27, 2019

mqtt.org-官网

MQTT中文网

MQTT协议3.1.1

MQTT实战

MQTT Wiki-github

概述

mqtt_concept

协议流程图

mqtt_process

服务器搭建

实战

下载apache-apollo

apache-apollo-1.7.1-unix-distro.tar.gz

apollo_download

查看帮助信息

./apollo


ProgramFiles/apache-apollo-1.7.1/bin$ ./apollo
usage: apollo [--log <log_level>] <command> [<args>]

The most commonly used apollo commands are:
    create           creates a new broker instance
    disk-benchmark   Benchmarks your disk's speed
    help             Display help information
    version          Displays the broker version

See 'apollo help <command>' for more information on a specific command.

创建服务器实例

./apollo create sunmi_broker


ProgramFiles/apache-apollo-1.7.1/bin$ ./apollo create sunmi_broker
Creating apollo instance at: sunmi_broker
Generating ssl keystore...

You can now start the broker by executing:

   "/home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/bin/apollo-broker" run

Or you can setup the broker as system service and run it in the background:

   sudo ln -s "/home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/bin/apollo-broker-service" /etc/init.d/
   /etc/init.d/apollo-broker-service start

运行服务器

./bin/apollo-broker run


ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker$ ./bin/apollo-broker run

    _____                .__  .__
   /  _  \ ______   ____ |  | |  |   ____
  /  /_\  \\____ \ /  _ \|  | |  |  /  _ \
 /    |    \  |_> >  <_> )  |_|  |_(  <_> )
 \____|__  /   __/ \____/|____/____/\____/
         \/|__|  Apache Apollo (1.7.1)


Loading configuration file '/home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/etc/apollo.xml'.
INFO  | OS     : Linux 4.4.0-170-generic (Ubuntu 16.04.6 LTS)
INFO  | JVM    : OpenJDK 64-Bit Server VM 1.7.0_95 (Oracle Corporation)
INFO  | Apollo : 1.7.1 (at: /home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1)
INFO  | OS is restricting the open file limit to: 100000
INFO  | Starting store: leveldb store at /home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/data
INFO  | Accepting connections at: tcp://0.0.0.0:61613
INFO  | Accepting connections at: tls://0.0.0.0:61614
INFO  | Accepting connections at: ws://0.0.0.0:61623/
INFO  | Accepting connections at: wss://0.0.0.0:61624/
INFO  | Administration interface available at: https://127.0.0.1:61681/
INFO  | Administration interface available at: http://127.0.0.1:61680/

服务器配置

mqtt_etc

密码配置:users.properties

浏览器登录

http://127.0.0.1:61680/console/index.html

Username: admin Password: password

apolo_login

服务器操作

apollo_server

Android客户端

Paho Android Service

Android中使用MQTT需要使用到Paho Android Service库,Paho Android Service是一个用Java编写的MQTT客户端库

build.gradle


repositories {
    maven {
        url 'http://maven.aliyun.com/nexus/content/groups/public/'
        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    }
}


dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

AndroidManifest.xml


    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />

    <service android:name="org.eclipse.paho.android.service.MqttService" /> <!--MqttService-->
    <service android:name="com.dongyk.service.MyMqttService"/> <!--MyMqttService-->

MqttService


public class MqttService extends Service implements MqttTraceHandler {

  // 连接服务器
  public void connect(String clientHandle, MqttConnectOptions connectOptions,
      String invocationContext, String activityToken)
      throws MqttSecurityException, MqttException {
	  MqttConnection client = getConnection(clientHandle);
	  client.connect(connectOptions, null, activityToken);

  }

  // 发布消息
  public IMqttDeliveryToken publish(String clientHandle, String topic,
      byte[] payload, int qos, boolean retained,
      String invocationContext, String activityToken)
      throws MqttPersistenceException, MqttException {
    MqttConnection client = getConnection(clientHandle);
    return client.publish(topic, payload, qos, retained, invocationContext,
        activityToken);
  }

  // 订阅消息
  public void subscribe(String clientHandle, String topic, int qos,
      String invocationContext, String activityToken) {
    MqttConnection client = getConnection(clientHandle);
    client.subscribe(topic, qos, invocationContext, activityToken);
  }

}

连接服务器


public class MyMqttService extends Service {

    public final   String             TAG            = MyMqttService.class.getSimpleName();
    private static MqttAndroidClient  mqttAndroidClient;
    private        MqttConnectOptions mMqttConnectOptions;
    public         String             HOST           = "tcp://172.16.17.214:61613";//服务器地址(协议+地址+端口号)
    public         String             USERNAME       = "admin";//用户名
    public         String             PASSWORD       = "password";//密码
    public static  String             PUBLISH_TOPIC  = "tourist_enter";//发布主题
    public static  String             RESPONSE_TOPIC = "message_arrived";//响应主题
    @SuppressLint("MissingPermission")
    public         String             CLIENTID       = Build.VERSION.SDK_INT >= Build.VERSION_CODES.O
            ? Build.getSerial() : Build.SERIAL;//客户端ID,一般以客户端唯一标识符表示,这里用设备序列号表示


    private void init() {
        String serverURI = HOST; //服务器地址(协议+地址+端口号)
        mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
        mqttAndroidClient.setCallback(mqttCallback); //设置监听订阅消息的回调
        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setCleanSession(true); //设置是否清除缓存
        mMqttConnectOptions.setConnectionTimeout(10); //设置超时时间,单位:秒
        mMqttConnectOptions.setKeepAliveInterval(20); //设置心跳包发送间隔,单位:秒
        mMqttConnectOptions.setUserName(USERNAME); //设置用户名
        mMqttConnectOptions.setPassword(PASSWORD.toCharArray()); //设置密码
    }

    /**
     * 连接MQTT服务器
     */
    private void doClientConnection() {
        if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
            try {
                mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    //订阅主题的回调
    private MqttCallback mqttCallback = new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.i(TAG, "收到消息: " + new String(message.getPayload()));
            response("message arrived");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {

        }

        @Override
        public void connectionLost(Throwable arg0) {
            Log.i(TAG, "连接断开 ");
            doClientConnection();//连接断开,重连
        }
    };

}

发布消息


    /**
     * 发布 (模拟其他客户端发布消息)
     *
     * @param message 消息
     */
    public static void publish(String message) {
        String topic = PUBLISH_TOPIC;
        Integer qos = 2;
        Boolean retained = false;
        try {
            //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
            mqttAndroidClient.publish(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

订阅消息


    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken arg0) {
            Log.i(TAG, "连接成功 ");
            try {
                mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2);//订阅主题,参数:主题、服务质量
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            arg1.printStackTrace();
            Log.i(TAG, "连接失败 ");
            doClientConnection();//连接失败,重连(可关闭服务器进行模拟)
        }
    };

客户端线程


USER      PID   PPID  VSIZE  RSS   WCHAN            PC  NAME
u0_a98    27387 1041  1004852 49584 SyS_epoll_ 00000000 S com.wildma.mqttandroidclient
u0_a98    27392 27387 1004852 49584 futex_wait 00000000 S Jit thread pool
u0_a98    27393 27387 1004852 49584 do_sigtime 00000000 S Signal Catcher
u0_a98    27394 27387 1004852 49584 poll_sched 00000000 S JDWP
u0_a98    27395 27387 1004852 49584 futex_wait 00000000 S ReferenceQueueD
u0_a98    27396 27387 1004852 49584 futex_wait 00000000 S FinalizerDaemon
u0_a98    27397 27387 1004852 49584 futex_wait 00000000 S FinalizerWatchd
u0_a98    27398 27387 1004852 49584 futex_wait 00000000 S HeapTaskDaemon
u0_a98    27399 27387 1004852 49584 binder_thr 00000000 S Binder:27387_1
u0_a98    27400 27387 1004852 49584 binder_thr 00000000 S Binder:27387_2
u0_a98    27401 27387 1004852 49584 futex_wait 00000000 S Profile Saver
u0_a98    27403 27387 1004852 49584 SyS_epoll_ 00000000 S RenderThread
u0_a98    27405 27387 1004852 49584 futex_wait 00000000 S hwuiTask1
u0_a98    27406 27387 1004852 49584 futex_wait 00000000 S hwuiTask2
u0_a98    27407 27387 1004852 49584 binder_thr 00000000 S Binder:27387_3
u0_a98    27409 27387 1004852 49584 sk_wait_da 00000000 S MQTT Rec: LA01K  // 接收
u0_a98    27410 27387 1004852 49584 futex_wait 00000000 S MQTT Snd: LA01K  // 发送
u0_a98    27411 27387 1004852 49584 futex_wait 00000000 S MQTT Call: LA01

客户端connect

mqtt_connect

mqttv3 工作原理

org.eclipse.paho.client.mqttv3 源码解析(1)

mqtt_publish

服务器状态

连接设备

mqtt_connectors

订阅主题

mqtt_topics