概述
协议流程图
服务器搭建
下载apache-apollo
apache-apollo-1.7.1-unix-distro.tar.gz
查看帮助信息
./apollo
ProgramFiles/apache-apollo-1.7.1/bin$ ./apollo
usage: apollo [--log <log_level>] <command> [<args>]
The most commonly used apollo commands are:
create creates a new broker instance
disk-benchmark Benchmarks your disk's speed
help Display help information
version Displays the broker version
See 'apollo help <command>' for more information on a specific command.
创建服务器实例
./apollo create sunmi_broker
ProgramFiles/apache-apollo-1.7.1/bin$ ./apollo create sunmi_broker
Creating apollo instance at: sunmi_broker
Generating ssl keystore...
You can now start the broker by executing:
"/home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/bin/apollo-broker" run
Or you can setup the broker as system service and run it in the background:
sudo ln -s "/home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start
运行服务器
./bin/apollo-broker run
ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker$ ./bin/apollo-broker run
_____ .__ .__
/ _ \ ______ ____ | | | | ____
/ /_\ \\____ \ / _ \| | | | / _ \
/ | \ |_> > <_> ) |_| |_( <_> )
\____|__ / __/ \____/|____/____/\____/
\/|__| Apache Apollo (1.7.1)
Loading configuration file '/home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/etc/apollo.xml'.
INFO | OS : Linux 4.4.0-170-generic (Ubuntu 16.04.6 LTS)
INFO | JVM : OpenJDK 64-Bit Server VM 1.7.0_95 (Oracle Corporation)
INFO | Apollo : 1.7.1 (at: /home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1)
INFO | OS is restricting the open file limit to: 100000
INFO | Starting store: leveldb store at /home/lixiaogang/sunmi/ProgramFiles/apache-apollo-1.7.1/bin/sunmi_broker/data
INFO | Accepting connections at: tcp://0.0.0.0:61613
INFO | Accepting connections at: tls://0.0.0.0:61614
INFO | Accepting connections at: ws://0.0.0.0:61623/
INFO | Accepting connections at: wss://0.0.0.0:61624/
INFO | Administration interface available at: https://127.0.0.1:61681/
INFO | Administration interface available at: http://127.0.0.1:61680/
服务器配置
密码配置:users.properties
浏览器登录
http://127.0.0.1:61680/console/index.html
Username: admin Password: password
服务器操作
Android客户端
在 Android 中使用 MQTT 需要用到 Paho Android Service 库。Paho Android Service 是一个用 Java 编写的 MQTT 客户端库。
build.gradle
repositories {
    // Each repository needs its own maven { } block. Setting `url` twice in a
    // single block just overwrites the first value, so only the last URL
    // would actually be consulted.
    maven {
        // Aliyun mirror of the public Maven repositories. HTTPS is used
        // because recent Gradle versions refuse plain-HTTP repositories
        // unless they are explicitly allowed.
        url 'https://maven.aliyun.com/nexus/content/groups/public/'
    }
    maven {
        // Eclipse Paho snapshot repository.
        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    }
}
dependencies {
// Eclipse Paho MQTT v3 Java client library.
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
// Eclipse Paho Android service library (provides the MqttService declared in the manifest).
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
AndroidManifest.xml
<!-- Permissions requested by the MQTT client app: network access, network state, and wake lock. -->
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<!-- Service declarations; in a complete manifest these go inside the <application> element. -->
<service android:name="org.eclipse.paho.android.service.MqttService" /> <!-- Service class from the Paho Android service library -->
<service android:name="com.dongyk.service.MyMqttService"/> <!-- The application's own MQTT service -->
MqttService
/**
 * Abridged excerpt of the Paho Android {@code MqttService} class showing its
 * three core operations: connect, publish and subscribe. Each operation looks
 * up the {@code MqttConnection} for the given client handle and delegates to it.
 */
public class MqttService extends Service implements MqttTraceHandler {
/**
 * Connects the client identified by {@code clientHandle} to the server.
 * Note that {@code invocationContext} is not forwarded: {@code null} is
 * passed to the connection in its place.
 *
 * @param clientHandle      handle identifying the client connection to use
 * @param connectOptions    options applied to the connection
 * @param invocationContext caller-supplied context (unused by this method)
 * @param activityToken     token passed through to the connection
 * @throws MqttSecurityException declared by this method; raised by the underlying connect call
 * @throws MqttException         declared by this method; raised by the underlying connect call
 */
public void connect(String clientHandle, MqttConnectOptions connectOptions,
String invocationContext, String activityToken)
throws MqttSecurityException, MqttException {
MqttConnection client = getConnection(clientHandle);
client.connect(connectOptions, null, activityToken);
}
/**
 * Publishes a message through the client identified by {@code clientHandle}.
 *
 * @param clientHandle      handle identifying the client connection to use
 * @param topic             topic to publish to
 * @param payload           message content as bytes
 * @param qos               quality-of-service level
 * @param retained          whether the message is flagged as retained
 * @param invocationContext caller-supplied context passed through to the connection
 * @param activityToken     token passed through to the connection
 * @return the delivery token returned by the underlying connection
 * @throws MqttPersistenceException declared by this method; raised by the underlying publish call
 * @throws MqttException            declared by this method; raised by the underlying publish call
 */
public IMqttDeliveryToken publish(String clientHandle, String topic,
byte[] payload, int qos, boolean retained,
String invocationContext, String activityToken)
throws MqttPersistenceException, MqttException {
MqttConnection client = getConnection(clientHandle);
return client.publish(topic, payload, qos, retained, invocationContext,
activityToken);
}
/**
 * Subscribes the client identified by {@code clientHandle} to a topic.
 *
 * @param clientHandle      handle identifying the client connection to use
 * @param topic             topic to subscribe to
 * @param qos               quality-of-service level for the subscription
 * @param invocationContext caller-supplied context passed through to the connection
 * @param activityToken     token passed through to the connection
 */
public void subscribe(String clientHandle, String topic, int qos,
String invocationContext, String activityToken) {
MqttConnection client = getConnection(clientHandle);
client.subscribe(topic, qos, invocationContext, activityToken);
}
}
连接服务器
/**
 * Android service that owns the application's {@link MqttAndroidClient}:
 * it configures the client, connects it to the broker, logs incoming
 * messages and reconnects when the connection is lost.
 */
public class MyMqttService extends Service {
    public final String TAG = MyMqttService.class.getSimpleName();
    private static MqttAndroidClient mqttAndroidClient;
    private MqttConnectOptions mMqttConnectOptions;
    public String HOST = "tcp://172.16.17.214:61613"; // broker URI (scheme + address + port)
    public String USERNAME = "admin"; // broker user name
    public String PASSWORD = "password"; // broker password
    public static String PUBLISH_TOPIC = "tourist_enter"; // topic used for publishing
    public static String RESPONSE_TOPIC = "message_arrived"; // topic used for responses
    // Client ID: normally the device serial number, see resolveClientId().
    public String CLIENTID = resolveClientId();

    /**
     * Resolves the MQTT client ID. The device serial number is preferred, but
     * {@code Build.getSerial()} throws {@link SecurityException} when the app
     * lacks the required phone-state permission, and the serial may also be
     * reported as {@code "unknown"}. In both cases a random ID is generated
     * instead, so that the service does not crash during field initialization
     * and different devices do not end up sharing one client ID.
     *
     * @return a non-empty client identifier
     */
    @SuppressLint("MissingPermission")
    private static String resolveClientId() {
        String serial = null;
        try {
            serial = Build.VERSION.SDK_INT >= Build.VERSION_CODES.O
                    ? Build.getSerial() : Build.SERIAL;
        } catch (SecurityException e) {
            Log.w(MyMqttService.class.getSimpleName(),
                    "Device serial is not accessible, using a random client ID", e);
        }
        if (serial == null || serial.isEmpty() || Build.UNKNOWN.equals(serial)) {
            // Truncated to 23 characters, the client-ID length limit of MQTT 3.1.
            return java.util.UUID.randomUUID().toString().replace("-", "").substring(0, 23);
        }
        return serial;
    }

    /** Creates the MQTT client and prepares the connect options. */
    private void init() {
        String serverURI = HOST; // broker URI (scheme + address + port)
        mqttAndroidClient = new MqttAndroidClient(this, serverURI, CLIENTID);
        mqttAndroidClient.setCallback(mqttCallback); // receives subscribed messages
        mMqttConnectOptions = new MqttConnectOptions();
        mMqttConnectOptions.setCleanSession(true); // do not keep session state
        mMqttConnectOptions.setConnectionTimeout(10); // connect timeout, in seconds
        mMqttConnectOptions.setKeepAliveInterval(20); // keep-alive interval, in seconds
        mMqttConnectOptions.setUserName(USERNAME);
        mMqttConnectOptions.setPassword(PASSWORD.toCharArray());
    }

    /**
     * Connects to the MQTT broker if the client is not already connected and
     * the network check passes.
     */
    private void doClientConnection() {
        if (!mqttAndroidClient.isConnected() && isConnectIsNomarl()) {
            try {
                mqttAndroidClient.connect(mMqttConnectOptions, null, iMqttActionListener);
            } catch (MqttException e) {
                Log.e(TAG, "Failed to start MQTT connection", e);
            }
        }
    }

    /** Callback for subscribed topics and connection-state changes. */
    private MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // Decode explicitly as UTF-8 instead of relying on the platform charset.
            String payload = new String(message.getPayload(),
                    java.nio.charset.StandardCharsets.UTF_8);
            Log.i(TAG, "收到消息: " + payload);
            response("message arrived");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
        }

        @Override
        public void connectionLost(Throwable arg0) {
            Log.i(TAG, "连接断开 ", arg0);
            doClientConnection(); // connection lost: reconnect
        }
    };
}
发布消息
/**
 * Publishes a message to {@code PUBLISH_TOPIC} (used to simulate another
 * client publishing). The message is sent UTF-8 encoded with QoS 2 and is
 * not retained. If the MQTT client has not been created yet, or the message
 * is {@code null}, the call is logged and ignored instead of crashing.
 *
 * @param message the message text to publish
 */
public static void publish(String message) {
    final String logTag = MyMqttService.class.getSimpleName();
    if (mqttAndroidClient == null) {
        Log.w(logTag, "publish() called before the MQTT client was created; message dropped");
        return;
    }
    if (message == null) {
        Log.w(logTag, "publish() called with a null message; nothing to send");
        return;
    }
    final String topic = PUBLISH_TOPIC;
    final int qos = 2;
    final boolean retained = false;
    try {
        // Arguments: topic, payload bytes, quality of service, retained flag.
        mqttAndroidClient.publish(topic,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8), qos, retained);
    } catch (MqttException e) {
        Log.e(logTag, "Failed to publish to " + topic, e);
    }
}
订阅消息
/** Delay before retrying a failed connection attempt, in milliseconds. */
private static final long RECONNECT_DELAY_MS = 5000L;

/**
 * Main-thread handler used to schedule delayed reconnect attempts.
 * NOTE(review): pending callbacks should be removed when the service is
 * destroyed; confirm against the service's onDestroy(), not shown here.
 */
private final android.os.Handler reconnectHandler =
        new android.os.Handler(android.os.Looper.getMainLooper());

/**
 * Listener for the outcome of the connect call. On success it subscribes to
 * {@code PUBLISH_TOPIC} with QoS 2; on failure it schedules another
 * connection attempt after {@link #RECONNECT_DELAY_MS}.
 */
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
    @Override
    public void onSuccess(IMqttToken arg0) {
        Log.i(TAG, "连接成功 ");
        try {
            mqttAndroidClient.subscribe(PUBLISH_TOPIC, 2); // arguments: topic, quality of service
        } catch (MqttException e) {
            Log.e(TAG, "Failed to subscribe to " + PUBLISH_TOPIC, e);
        }
    }

    @Override
    public void onFailure(IMqttToken arg0, Throwable arg1) {
        // Log.e tolerates a null throwable, unlike arg1.printStackTrace().
        Log.e(TAG, "连接失败 ", arg1);
        // Retry after a delay: reconnecting immediately turns into a tight,
        // endless loop while the broker is unreachable (easy to reproduce by
        // shutting the server down).
        reconnectHandler.postDelayed(new Runnable() {
            @Override
            public void run() {
                doClientConnection();
            }
        }, RECONNECT_DELAY_MS);
    }
};
客户端线程
USER PID PPID VSIZE RSS WCHAN PC NAME
u0_a98 27387 1041 1004852 49584 SyS_epoll_ 00000000 S com.wildma.mqttandroidclient
u0_a98 27392 27387 1004852 49584 futex_wait 00000000 S Jit thread pool
u0_a98 27393 27387 1004852 49584 do_sigtime 00000000 S Signal Catcher
u0_a98 27394 27387 1004852 49584 poll_sched 00000000 S JDWP
u0_a98 27395 27387 1004852 49584 futex_wait 00000000 S ReferenceQueueD
u0_a98 27396 27387 1004852 49584 futex_wait 00000000 S FinalizerDaemon
u0_a98 27397 27387 1004852 49584 futex_wait 00000000 S FinalizerWatchd
u0_a98 27398 27387 1004852 49584 futex_wait 00000000 S HeapTaskDaemon
u0_a98 27399 27387 1004852 49584 binder_thr 00000000 S Binder:27387_1
u0_a98 27400 27387 1004852 49584 binder_thr 00000000 S Binder:27387_2
u0_a98 27401 27387 1004852 49584 futex_wait 00000000 S Profile Saver
u0_a98 27403 27387 1004852 49584 SyS_epoll_ 00000000 S RenderThread
u0_a98 27405 27387 1004852 49584 futex_wait 00000000 S hwuiTask1
u0_a98 27406 27387 1004852 49584 futex_wait 00000000 S hwuiTask2
u0_a98 27407 27387 1004852 49584 binder_thr 00000000 S Binder:27387_3
u0_a98 27409 27387 1004852 49584 sk_wait_da 00000000 S MQTT Rec: LA01K // 接收
u0_a98 27410 27387 1004852 49584 futex_wait 00000000 S MQTT Snd: LA01K // 发送
u0_a98 27411 27387 1004852 49584 futex_wait 00000000 S MQTT Call: LA01
客户端connect
mqttv3 工作原理
org.eclipse.paho.client.mqttv3 源码解析(1)