MQTT C

T113 Linux

Posted by LXG on January 8, 2024

Github

paho.mqtt.c


github/paho.mqtt.c$ tree -L 1
.
├── about.html
├── android
├── appveyor.yml
├── build
├── build.xml
├── cbuild.bat
├── cmake
├── CMakeLists.txt
├── CMakeLists.txt.user
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── deploy_rsa.enc
├── dist
├── doc
├── docs
├── edl-v10
├── epl-v20
├── LICENSE
├── Makefile
├── NOTICE
├── notice.html
├── PULL_REQUEST_TEMPLATE.md
├── README.md
├── SECURITY.md
├── src
├── test
├── test_package
├── travis-build.sh
├── travis-deploy.sh
├── travis-install.sh
├── travis-setup-deploy.sh
├── version.major
├── version.minor
└── version.patch

编译和安装

修改编译脚本

PAHO_WITH_SSL = true

PAHO_BUILD_SAMPLES = true


## build options
# Cache-backed feature switches for the build. In this write-up PAHO_WITH_SSL
# and PAHO_BUILD_SAMPLES are set to TRUE so the SSL-enabled libraries and the
# sample programs are built. These are only defaults: a value already stored in
# CMakeCache.txt, or one passed on the command line with -D<NAME>=<VALUE>,
# takes precedence because the entries are not FORCEd.
set(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ")
set(PAHO_WITH_LIBUUID FALSE CACHE BOOL "Flag that defines whether libuuid or a custom uuid implementation should be used")
set(PAHO_BUILD_SHARED TRUE CACHE BOOL "Build shared library")
set(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
set(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)")
set(PAHO_BUILD_SAMPLES TRUE CACHE BOOL "Build sample programs")
set(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
set(PAHO_ENABLE_TESTING TRUE CACHE BOOL "Build tests and run")
set(PAHO_ENABLE_CPACK TRUE CACHE BOOL "Enable CPack")
set(PAHO_HIGH_PERFORMANCE FALSE CACHE BOOL "Disable tracing and heap tracking")
set(PAHO_USE_SELECT FALSE CACHE BOOL "Revert to select system call instead of poll")

编译步骤


1. git clone https://github.com/eclipse/paho.mqtt.c.git

2. cd paho.mqtt.c

3. mkdir build

4. cd build

5. cmake .. 生成makefile

6. make 使用make命令构建mqtt c库

7. sudo make install 安装到系统目录中

Ubuntu 安装目录


github/paho.mqtt.c/build$ ls -al /usr/local/lib/libpaho-mqtt3*
lrwxrwxrwx 1 root root     19  1月  5 10:18 /usr/local/lib/libpaho-mqtt3a.so -> libpaho-mqtt3a.so.1
lrwxrwxrwx 1 root root     24  1月  5 10:18 /usr/local/lib/libpaho-mqtt3a.so.1 -> libpaho-mqtt3a.so.1.3.13
-rw-r--r-- 1 root root 257040  1月  8 10:58 /usr/local/lib/libpaho-mqtt3a.so.1.3.13
lrwxrwxrwx 1 root root     20  1月  8 10:58 /usr/local/lib/libpaho-mqtt3as.so -> libpaho-mqtt3as.so.1
lrwxrwxrwx 1 root root     25  1月  8 10:58 /usr/local/lib/libpaho-mqtt3as.so.1 -> libpaho-mqtt3as.so.1.3.13
-rw-r--r-- 1 root root 284648  1月  8 10:58 /usr/local/lib/libpaho-mqtt3as.so.1.3.13
lrwxrwxrwx 1 root root     19  1月  5 10:18 /usr/local/lib/libpaho-mqtt3c.so -> libpaho-mqtt3c.so.1
lrwxrwxrwx 1 root root     24  1月  5 10:18 /usr/local/lib/libpaho-mqtt3c.so.1 -> libpaho-mqtt3c.so.1.3.13
-rw-r--r-- 1 root root 221888  1月  8 10:58 /usr/local/lib/libpaho-mqtt3c.so.1.3.13
lrwxrwxrwx 1 root root     20  1月  8 10:58 /usr/local/lib/libpaho-mqtt3cs.so -> libpaho-mqtt3cs.so.1
lrwxrwxrwx 1 root root     25  1月  8 10:58 /usr/local/lib/libpaho-mqtt3cs.so.1 -> libpaho-mqtt3cs.so.1.3.13
-rw-r--r-- 1 root root 257552  1月  8 10:58 /usr/local/lib/libpaho-mqtt3cs.so.1.3.13


so库源码


github/paho.mqtt.c/src$ tree -L 1
.
├── Base64.c
├── Base64.h
├── Clients.c
├── Clients.h
├── CMakeLists.txt
├── Heap.c
├── Heap.h
├── Keysight_ws_add
├── LinkedList.c
├── LinkedList.h
├── Log.c
├── Log.h
├── Messages.c
├── Messages.h
├── MQTTAsync.c
├── MQTTAsync.h
├── MQTTAsyncUtils.c
├── MQTTAsyncUtils.h
├── MQTTClient.c
├── MQTTClient.h
├── MQTTClientPersistence.h
├── MQTTExportDeclarations.h
├── MQTTPacket.c
├── MQTTPacket.h
├── MQTTPacketOut.c
├── MQTTPacketOut.h
├── MQTTPersistence.c
├── MQTTPersistenceDefault.c
├── MQTTPersistenceDefault.h
├── MQTTPersistence.h
├── MQTTProperties.c
├── MQTTProperties.h
├── MQTTProtocolClient.c
├── MQTTProtocolClient.h
├── MQTTProtocol.h
├── MQTTProtocolOut.c
├── MQTTProtocolOut.h
├── MQTTReasonCodes.c
├── MQTTReasonCodes.h
├── MQTTSubscribeOpts.h
├── MQTTTime.c
├── MQTTTime.h
├── MQTTVersion.c
├── mutex_type.h
├── OsWrapper.c
├── OsWrapper.h
├── Proxy.c
├── Proxy.h
├── samples
├── SHA1.c
├── SHA1.h
├── SocketBuffer.c
├── SocketBuffer.h
├── Socket.c
├── Socket.h
├── SSLSocket.c
├── SSLSocket.h
├── StackTrace.c
├── StackTrace.h
├── Thread.c
├── Thread.h
├── Tree.c
├── Tree.h
├── utf-8.c
├── utf-8.h
├── VersionInfo.h.in
├── WebSocket.c
└── WebSocket.h

samples

Demo源码


github/paho.mqtt.c/src/samples$ tree
.
├── CMakeLists.txt
├── MQTTAsync_publish.c
├── MQTTAsync_publish_time.c
├── MQTTAsync_subscribe.c
├── MQTTClient_publish_async.c
├── MQTTClient_publish.c
├── MQTTClient_subscribe.c
├── paho_c_pub.c
├── paho_cs_pub.c
├── paho_cs_sub.c
├── paho_c_sub.c
├── pubsub_opts.c
└── pubsub_opts.h

编译输出


github/paho.mqtt.c/build/src/samples$ tree -L 1
.
├── CMakeFiles
├── cmake_install.cmake
├── Makefile
├── MQTTAsync_publish
├── MQTTAsync_publish_time
├── MQTTAsync_subscribe
├── MQTTClient_publish
├── MQTTClient_publish_async
├── MQTTClient_subscribe
├── paho_c_pub
├── paho_cs_pub
├── paho_cs_sub
└── paho_c_sub

编译脚本


## compilation/linkage settings
# Header search path for every sample in this directory: the sample directory
# itself, the library sources, and the top-level binary directory.
include_directories(
    .
    ${PROJECT_SOURCE_DIR}/src
    ${PROJECT_BINARY_DIR}
)

# sample files c

if(PAHO_BUILD_SHARED)
    # Command-line utilities; each one is built together with the shared
    # option handling in pubsub_opts.c.
    add_executable(paho_c_pub paho_c_pub.c pubsub_opts.c)
    add_executable(paho_c_sub paho_c_sub.c pubsub_opts.c)
    add_executable(paho_cs_pub paho_cs_pub.c pubsub_opts.c)
    add_executable(paho_cs_sub paho_cs_sub.c pubsub_opts.c)

    set_target_properties(
        paho_c_pub
        paho_c_sub
        paho_cs_pub
        paho_cs_sub
        PROPERTIES COMPILE_DEFINITIONS "PAHO_MQTT_IMPORTS=1"
    )

    # paho_c_* link the asynchronous SSL library, paho_cs_* the synchronous one.
    target_link_libraries(paho_c_pub paho-mqtt3as)
    target_link_libraries(paho_c_sub paho-mqtt3as)
    target_link_libraries(paho_cs_pub paho-mqtt3cs)
    target_link_libraries(paho_cs_sub paho-mqtt3cs)

    # Minimal API examples, one source file each.
    add_executable(MQTTAsync_subscribe MQTTAsync_subscribe.c)
    add_executable(MQTTAsync_publish MQTTAsync_publish.c)
    add_executable(MQTTAsync_publish_time MQTTAsync_publish_time.c)
    add_executable(MQTTClient_subscribe MQTTClient_subscribe.c)
    add_executable(MQTTClient_publish MQTTClient_publish.c)
    add_executable(MQTTClient_publish_async MQTTClient_publish_async.c)

    set_target_properties(
        MQTTAsync_subscribe
        MQTTAsync_publish
        MQTTAsync_publish_time
        MQTTClient_subscribe
        MQTTClient_publish
        MQTTClient_publish_async
        PROPERTIES COMPILE_DEFINITIONS "PAHO_MQTT_IMPORTS=1"
    )

    # MQTTAsync_* examples link paho-mqtt3a, MQTTClient_* examples paho-mqtt3c.
    target_link_libraries(MQTTAsync_subscribe paho-mqtt3a)
    target_link_libraries(MQTTAsync_publish paho-mqtt3a)
    target_link_libraries(MQTTAsync_publish_time paho-mqtt3a)
    target_link_libraries(MQTTClient_subscribe paho-mqtt3c)
    target_link_libraries(MQTTClient_publish paho-mqtt3c)
    target_link_libraries(MQTTClient_publish_async paho-mqtt3c)

    # Install all ten sample programs.
    install(
        TARGETS
            paho_c_sub
            paho_c_pub
            paho_cs_sub
            paho_cs_pub
            MQTTAsync_subscribe
            MQTTAsync_publish
            MQTTAsync_publish_time
            MQTTClient_subscribe
            MQTTClient_publish
            MQTTClient_publish_async
        RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
        LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
    )
endif()

对应关系

| 可执行文件 | 动态库 | 说明 |
| --- | --- | --- |
| MQTTAsync_subscribe | paho-mqtt3a | 异步消息订阅 |
| MQTTAsync_publish | paho-mqtt3a | 异步消息推送 |
| MQTTAsync_publish_time | paho-mqtt3a | 循环异步时间消息推送 |
| MQTTClient_subscribe | paho-mqtt3c | 同步消息订阅 |
| MQTTClient_publish | paho-mqtt3c | 同步消息推送 |
| MQTTClient_publish_async | paho-mqtt3c | 同步消息推送 + 异步回调 |
| paho_c_sub | paho-mqtt3as | 异步消息订阅 + SSL 加密 |
| paho_c_pub | paho-mqtt3as | 异步消息推送 + SSL 加密 |
| paho_cs_sub | paho-mqtt3cs | 同步消息订阅 + SSL 加密 |
| paho_cs_pub | paho-mqtt3cs | 同步消息推送 + SSL 加密 |

samples 测试

产品发布一般使用的是异步消息 + SSL 加密的方式

paho_c_sub 和 paho_c_pub 测试


Usage: paho_c_sub [topicname] [-t topic] [-c connection] [-h host] [-p port]
       [-q qos] [-i clientid] [-u username] [-P password] [-k keepalive_timeout]
       [-V MQTT-version] [--quiet] [--trace trace-level]
       [-R] [--no-delimiter]
       [--will-topic topic] [--will-payload message] [--will-qos qos] [--will-retain]
       [--cafile filename] [--capath dirname] [--cert filename] [--key filename]
       [--keypass string] [--ciphers string] [--insecure]


设备 A7BAMW320000062B(发布端)发消息给设备 A7BAMWFK110002201B(订阅端):先在订阅端启动 paho_c_sub,再在发布端执行 paho_c_pub。

./usr/bin/paho_c_sub -h tcp://60.205.112.44 -p 1883 -t "client/android/A7BAMWFK110002201B" -i "client@@@A7BAMWFK110002201B" -u c8ba38e7efa3f779 -P 93848b831eb344cf

./usr/bin/paho_c_pub -h tcp://60.205.112.44 -p 1883 -t "client/android/A7BAMWFK110002201B" -i "client@@@A7BAMW320000062B" -u 6af17ba6c2ea46b7 -P 64f49ec8b875dc40 -m "Hello, World!"

paho_c_sub 源码


/**
 * Message-arrived callback: writes each received payload to stdout.
 *
 * In verbose mode the payload length and the topic name are printed first.
 * The payload is followed by opts.delimiter unless no delimiter is configured
 * or the payload is longer than the delimiter and already ends with it.
 * For struct_version 1 messages the properties are logged in verbose mode.
 * The message and the topic name are released before the function returns 1.
 */
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
	const char *body = (char*)message->payload;
	int already_terminated = 0;

	if (opts.verbose)
		printf("%d %s\t", message->payloadlen, topicName);

	if (opts.delimiter != NULL)
	{
		size_t delimlen = strlen(opts.delimiter);

		/* Only a payload strictly longer than the delimiter is inspected for a trailing delimiter. */
		if (message->payloadlen > delimlen)
			already_terminated =
				strncmp(opts.delimiter, &body[message->payloadlen - delimlen], delimlen) == 0;
	}

	if (opts.delimiter == NULL || already_terminated)
		printf("%.*s", message->payloadlen, body);
	else
		printf("%.*s%s", message->payloadlen, body, opts.delimiter);

	if (message->struct_version == 1 && opts.verbose)
		logProperties(&message->properties);

	fflush(stdout);

	/* Hand the message and the topic string back to the library. */
	MQTTAsync_freeMessage(&message);
	MQTTAsync_free(topicName);
	return 1;
}

/**
 * Starts a new asynchronous connection attempt with MQTTAsync_connect(),
 * reusing the connect options in conn_opts.
 *
 * A failure to start the attempt is reported on stderr unless opts.quiet is
 * set; the function itself does not retry.
 *
 * NOTE(review): `client` and `conn_opts` are not declared in this function, so
 * they must come from file scope. The subscriber's main() declares its own
 * local `client` — confirm the handle used here is the one main() created.
 */
void reConnect(void)
{
    /* Kept local so this callback-driven path does not write to an `rc` owned by other code. */
    int rc;

    printf("reConnect now\n");

    rc = MQTTAsync_connect(client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS && !opts.quiet)
        fprintf(stderr, "reConnect---Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
}

/**
 * Connection-lost callback: logs the reason and starts a reconnect.
 *
 * @param context  application context supplied when the callbacks were registered (unused here)
 * @param cause    reason for the disconnection; may be NULL, so it is never passed to
 *                 printf's %s directly
 *
 * NOTE(review): main() also sets conn_opts.automaticReconnect = 1, so the
 * library may retry on its own as well — confirm that an additional manual
 * reconnect is intended.
 */
void connectionLost(void* context, char* cause) {
    printf("connectionLost: %s\n", cause ? cause : "unknown");
    reConnect();
}

/**
 * Delivery-complete callback: only writes a trace line to stdout.
 * Neither the context nor the token is inspected.
 */
void deliveryComplete(void* context, MQTTAsync_token token)
{
    fputs("deliveryComplete\n", stdout);
}

/**
 * Subscribe-failure callback: reports the error and asks main() to stop.
 *
 * @param context   application context (unused here)
 * @param response  failure details; checked for NULL before use, the same way
 *                  onConnectFailure treats its response argument
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
	if (!opts.quiet)
		fprintf(stderr, "Subscribe failed, rc %s\n",
			response ? MQTTAsync_strerror(response->code) : "none");
	/* Signal the main loop that the program should terminate. */
	finished = 1;
}

/**
 * Connect-failure callback: reports the error and immediately retries.
 *
 * @param context   application context (unused here)
 * @param response  failure details, or NULL, in which case "none" is printed
 *
 * NOTE(review): the retry is issued straight from the callback with no delay
 * or attempt limit — confirm this cannot turn into a tight retry loop when the
 * broker is unreachable.
 */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
	if (!opts.quiet)
	{
		const char* reason = "none";

		if (response)
			reason = MQTTAsync_strerror(response->code);
		fprintf(stderr, "Connect failed, rc %s\n", reason);
	}

	/* `finished` is deliberately left untouched: the program keeps running and tries again. */
	reConnect();
}


/**
 * Connect-success callback (onSuccess5 variant): subscribes to opts.topic.
 *
 * The client handle arrives through `context`. If the subscribe request cannot
 * be started, the error is reported (unless opts.quiet) and `finished` is set
 * so that main() stops.
 *
 * QoS levels:
 *   0 - at most once
 *   1 - at least once
 *   2 - exactly once
 */
void onConnect5(void* context, MQTTAsync_successData5* response)
{
	MQTTAsync handle = (MQTTAsync)context;
	MQTTAsync_callOptions sub_opts = MQTTAsync_callOptions_initializer;
	int status;

	if (opts.verbose)
		printf("Subscribing to topic %s with client %s at QoS %d\n", opts.topic, opts.clientid, opts.qos);

	/* Route the outcome of the subscribe request to the subscribe callbacks. */
	sub_opts.context = handle;
	sub_opts.onFailure5 = onSubscribeFailure5;
	sub_opts.onSuccess5 = onSubscribe5;

	status = MQTTAsync_subscribe(handle, opts.topic, opts.qos, &sub_opts);
	if (status == MQTTASYNC_SUCCESS)
		return;

	if (!opts.quiet)
		fprintf(stderr, "Failed to start subscribe, return code %s\n", MQTTAsync_strerror(status));
	finished = 1;
}


int main(int argc, char** argv)
{
	MQTTAsync client;
	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
	MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
	MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
	MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
	int rc = 0;
	
	//-------------------------------------------参数解析----------------------------------------
	if (getopts(argc, argv, &opts) != 0) {
            usage(&opts, (pubsub_opts_nameValue*)infos, program_name);
        }
        //--------------------------------------创建MQTTAsync对象------------------------------------
	rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE,
			NULL, &create_opts);

        //------------------------------------设置消息回调-------------------------------------------
	rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, deliveryComplete);

        //--------------------------------------连接状态回调----------------------------------------
	if (opts.MQTTVersion == MQTTVERSION_5)
	{
		MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5;
		conn_opts = conn_opts5;
		conn_opts.onSuccess5 = onConnect5;
		conn_opts.onFailure5 = onConnectFailure5;
		conn_opts.cleanstart = 1;
	}
	else
	{
		conn_opts.onSuccess = onConnect;
		conn_opts.onFailure = onConnectFailure;
		conn_opts.cleansession = 1;
	}
	
	// keepAliveInterval(保持连接间隔)是一个重要的参数,它定义了客户端与服务器之间在无任何数据传输的情况下,为了维持TCP连接的活跃状态,客户端需要多长时间发送一次心跳(PINGREQ)消息
	conn_opts.keepAliveInterval = opts.keepalive;
	conn_opts.username = opts.username;
	conn_opts.password = opts.password;
	conn_opts.MQTTVersion = opts.MQTTVersion;
	conn_opts.context = client;
	
	// 用于控制客户端在与服务器断开连接后是否自动尝试重新建立连接。当设置为 true 时,如果客户端检测到与服务器的连接中断,它会按照一定的重连策略(如线性退避、指数退避等)自动尝试重新连接到服务器
	conn_opts.automaticReconnect = 1;
	conn_opts.httpProxy = opts.http_proxy;
	conn_opts.httpsProxy = opts.https_proxy;

        // 通过设置 will_topic 和 will_message,即使设备突然离线,其他 MQTT 客户端也能通过监听此 will_topic 获知该设备的离线状态或者接收到设备预先设定好的离线信息。
        // 这对于监控系统中的设备状态、确保数据完整性以及执行相应的故障恢复操作非常有用。
	if (opts.will_topic) 	/* will options */
	{
		will_opts.message = opts.will_payload;
		will_opts.topicName = opts.will_topic;
		will_opts.qos = opts.will_qos;
		will_opts.retained = opts.will_retain;
		conn_opts.will = &will_opts;
	}

        // 如果你关心数据安全性,应当优先考虑使用 wss:// 或 ssl://(或等同于 tls:// 的方案),它们能为MQTT连接提供加密保护。
        // 而 tcp:// 方案适用于不需要加密或者已通过其他手段保证网络环境安全的情况。
	if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 ||
			strncmp(opts.connection, "wss://", 6) == 0))
	{
		ssl_opts.verify = (opts.insecure) ? 0 : 1;
		ssl_opts.CApath = opts.capath;
		ssl_opts.keyStore = opts.cert;
		ssl_opts.trustStore = opts.cafile;
		ssl_opts.privateKey = opts.key;
		ssl_opts.privateKeyPassword = opts.keypass;
		ssl_opts.enabledCipherSuites = opts.ciphers;
		conn_opts.ssl = &ssl_opts;
	}

        // 连接MQTT
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		if (!opts.quiet)
			fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
		exit(EXIT_FAILURE);
	}

	while (!subscribed)
		mysleep(100);

	if (finished)
		goto exit;

	while (!finished)
		mysleep(100);

	disc_opts.onSuccess = onDisconnect;
	if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
	{
		if (!opts.quiet)
			fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
		exit(EXIT_FAILURE);
	}

	while (!disconnected)
		mysleep(100);

exit:
	MQTTAsync_destroy(&client);

	return EXIT_SUCCESS;

}

paho_c_pub 源码


/**
 * Starts the asynchronous connection of the publisher client.
 *
 * The attempt is started with the connect options in conn_opts; if it cannot
 * be started, the error is printed to stderr and the process exits with
 * EXIT_FAILURE.
 *
 * NOTE(review): the setup of conn_opts appears to be omitted from this
 * excerpt — confirm against the full source file.
 *
 * @param client  handle of the client to connect
 */
void myconnect(MQTTAsync client)
{
	/* Local result code, matching mypublish() which also keeps its own rc. */
	int rc;

	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc));
		exit(EXIT_FAILURE);
	}
}

/**
 * Publishes one message to opts.topic with the configured QoS and retain flag.
 *
 * @param client   handle of the client to send with
 * @param datalen  number of payload bytes
 * @param data     payload
 * @return the result code of MQTTAsync_send()
 */
int mypublish(MQTTAsync client, int datalen, char* data)
{
	int result;

	/* Printed unconditionally: the opts.verbose guard is disabled in this copy. */
	printf("Publishing data of length %d\n", datalen);

	result = MQTTAsync_send(client, opts.topic, datalen, data, opts.qos, opts.retained, &pub_opts);
	if (result == MQTTASYNC_SUCCESS)
		return result;

	/* Errors are only reported in verbose, non-quiet mode. */
	if (opts.verbose && !opts.quiet)
		fprintf(stderr, "Error from MQTTAsync_send: %s\n", MQTTAsync_strerror(result));

	return result;
}


/**
 * Connect-success callback of the publisher.
 *
 * For the one-shot modes (empty message, -m message, or file contents) the
 * message is published right away. If the payload cannot be read, or the
 * publish cannot be started, `toStop` is set so the main loop ends; in that
 * case no publish callback would follow to set it. Finally `connected` is set.
 *
 * @param context   the client handle registered as callback context
 * @param response  success data (unused here)
 */
void onConnect(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	int rc = MQTTASYNC_SUCCESS;

	/* Printed unconditionally: the opts.verbose guard is disabled in this copy. */
	printf("Connected\n");

	if (opts.null_message == 1)
		rc = mypublish(client, 0, "");
	else if (opts.message)
		rc = mypublish(client, (int)strlen(opts.message), opts.message);
	else if (opts.filename)
	{
		int data_len = 0;
		char* buffer = readfile(&data_len, &opts);

		if (buffer == NULL)
			toStop = 1;
		else
		{
			rc = mypublish(client, data_len, buffer);
			free(buffer);
		}
	}

	/* A publish that failed to start will never reach onPublish, so stop here. */
	if (rc != MQTTASYNC_SUCCESS)
		toStop = 1;

	connected = 1;
}

/**
 * Publish-success callback: logs the success, requests shutdown when the
 * program was started in a one-shot mode, and sets `published`.
 */
void onPublish(void* context, MQTTAsync_successData* response)
{
	/* One-shot modes: empty message, literal message, or file contents. */
	const int one_shot = opts.null_message || opts.message || opts.filename;

	/* Printed unconditionally: the opts.verbose guard is disabled in this copy. */
	fputs("Publish succeeded\n", stdout);

	if (one_shot)
		toStop = 1;

	published = 1;
}

/*
 * Abridged outline of the publisher's entry point: create the client, register
 * the callbacks, connect, publish until toStop is set, then disconnect and
 * release resources.
 *
 * NOTE(review): option parsing and the URL / payload setup appear to be
 * omitted from this excerpt (url stays NULL and data_len is not declared
 * here) — confirm against the full source file before reusing this code.
 */
int main(int argc, char** argv)
{

	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
	MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
	MQTTAsync client;
	char* buffer = NULL;
	char* url = NULL;
	int url_allocated = 0;
	int rc = 0;

	//----------------------------------- create the MQTT client -----------------------------------

        rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);

	//----------------------------------- register the MQTT callbacks ------------------------------------

	rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL);

	//----------------------------------- establish the MQTT connection ------------------------------------

	myconnect(client);

	// NOTE(review): as written this loop publishes back-to-back with no wait between
	// iterations; the code that fills buffer/data_len is not shown — confirm.
	while (!toStop)
	{
	                //----------------------------- publish a message ---------------------------------
			rc = mypublish(client, data_len, buffer);
	}

        //------------------------------ disconnect from MQTT and release resources --------------------------------

	if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
	{
		if (!opts.quiet)
			fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc));
		exit(EXIT_FAILURE);
	}

	// NOTE(review): disc_opts.onSuccess is never assigned in this excerpt, unlike the
	// subscriber's main, which sets it to onDisconnect before disconnecting. Confirm
	// that something sets `disconnected`, otherwise this wait does not terminate.
	while (!disconnected)
		mysleep(100);

	MQTTAsync_destroy(&client);

	// url is only freed when it was allocated by this program.
	if (url_allocated)
		free(url);

}