當(dāng)然如果有錢有顏,可以直接選擇云廠商產(chǎn)品(比如環(huán)信的聊天室方案和超級社區(qū)),如果有才有time,也可以選擇平替版MQTT實現(xiàn)方案。今天小猿將介紹用環(huán)信MQTT消息云實現(xiàn)應(yīng)用內(nèi)的世界頻道,滿滿干貨,不要錯過~~
使用MQTT實現(xiàn)世界頻道-Demo效果演示
協(xié)議優(yōu)勢:
在介紹具體方案之前,我們先嘮一嘮為啥選擇MQTT協(xié)議。
- 輕量級:MQTT本身是物聯(lián)網(wǎng)的連接協(xié)議,專為受限設(shè)備和低帶寬場景使用。所以其代碼占用空間較小,同樣適用于注重SDK大小的移動應(yīng)用領(lǐng)域(比如:游戲領(lǐng)域)。
- 易集成:MQTT作為標準開放的消息協(xié)議,經(jīng)過多年演進,已支持30多種開發(fā)語言,10余種SDK,無論何種開發(fā)環(huán)境,都可以快速找到開源SDK。
- 高并發(fā):MQTT是輕量級的消息傳輸協(xié)議,2字節(jié)心跳報文,最小化傳輸和連接成本,云廠商broker產(chǎn)品都可支持千萬級并發(fā)接入,適用于高并發(fā)連接場景。
- 低成本:MQTT是基于客戶端-服務(wù)器的訂閱/發(fā)布模型,通過服務(wù)器中間件實現(xiàn)消息分發(fā),減少消息復(fù)制成本,快速實現(xiàn)一對多在線推送。
- 靈活性:MQTT協(xié)議支持多種消息特性,包括:topic主題層級、消息分級(QoS0,1,2)、遺囑消息、保留消息等,可以靈活實現(xiàn)多種業(yè)務(wù)場景。
- 衍生功能:隨著MQTT云服務(wù)的發(fā)展,部分服務(wù)器廠商已支持消息存儲、獲取在線設(shè)備列表、查看歷史消息等衍生功能,降低開發(fā)工作量與消息存儲成本。
實現(xiàn)方案:
言歸正傳,上干貨。本次技術(shù)實現(xiàn)方案包含:移動客戶端(Android)、后端服務(wù)(Java)以及MQTT服務(wù)器。這里提一下,MQTT服務(wù)器使用環(huán)信MQTT消息云,使用三方云服務(wù)比較省心,既節(jié)省開發(fā)時間,產(chǎn)品性能也不需要擔(dān)心,現(xiàn)在注冊可以直接使用環(huán)信MQTT消息云超高額度的免費版:每月100并發(fā)連接、300萬消息,完全滿足功能開發(fā)使用。
客戶端實現(xiàn):
客戶端實現(xiàn)主要包含以下兩部分:
- 底層MQTT業(yè)務(wù)集成:包含引入SDK、MQTT方法封裝、業(yè)務(wù)交互(消息收發(fā))。
- APP上層交互:在APP首頁提供世界頻道入口,實現(xiàn)心情彈幕飄窗(接收)和發(fā)送。
接下來上底層MQTT業(yè)務(wù)集成代碼。
引入SDK:
這一步環(huán)信官方文檔比較明確,就是根據(jù)自己的平臺引入相應(yīng)的mqtt客戶端sdk,這里簡單貼一下AndroidStudio的引入配置
1// 在根目錄 build.gradle repositories 下加入配置
2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
3...
4// 然后加入 MQTT 依賴
5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk
6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
方法封裝
這里貼一下對mqtt相關(guān)方法的簡單封裝,代碼在vmmqtt模塊兒的MQTTHelper類下:
1 /**
2 * Create by lzan13 on 2022/3/22
3 * 描述:MQTT 幫助類
4 */
5 object MQTTHelper {
6
7 private var mqttClient: MqttAndroidClient? = null
8
9 // 緩存主題集合
10 private val topicList = mutableListOf()
11
12 /**
13 * 鏈接MQTT
14 * @param id 用戶 Id
15 * @param token 用戶鏈接 MQTT 的 Token
16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱
17 */
18 fun connect(id: String, token: String, topic: String = "") {
19 // 處理訂閱主題
20 if (topic.isNotEmpty()) topicList.add(topic)
21
22 // 拼接鏈接地址
23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
24 // 拼接 clientId
25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"
26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
27
28 //連接參數(shù)
29 val options = MqttConnectOptions()
30 options.isAutomaticReconnect = true //設(shè)置自動重連
31 options.isCleanSession = true // 緩存
32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設(shè)置超時時間,單位:秒
33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒
34 options.userName = id // 用戶名
35 options.password = token.toCharArray() // 密碼
36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
37 // 設(shè)置MQTT監(jiān)聽
38 mqttClient?.setCallback(object : MqttCallback {
39 override fun connectionLost(t: Throwable) {
40 // 通知鏈接斷開
41 VMLog.d("MQTT 鏈接斷開 $t")
42 }
43
44 @Throws(Exception::class)
45 override fun messageArrived(topic: String, message: MqttMessage) {
46 // 通知收到消息
47 VMLog.d("MQTT 收到消息:$message")
48 // 如果未訂閱則直接丟棄
49 if (!topicList.contains(topic)) return
50 notifyEvent(topic, String(message.payload))
51 }
52
53 override fun deliveryComplete(token: IMqttDeliveryToken) {}
54 })
55 //進行連接
56 mqttClient?.connect(options, null, object : IMqttActionListener {
57 override fun onSuccess(token: IMqttToken) {
58 VMLog.d("MQTT 鏈接成功")
59 // 鏈接成功,循環(huán)訂閱緩存的主題
60 topicList.forEach { subscribe(it) }
61 }
62
63 override fun onFailure(token: IMqttToken, t: Throwable) {
64 VMLog.d("MQTT 鏈接失敗 $t")
65 }
66 })
67 }
68
69 /**
70 * 訂閱主題
71 * @param topic 主題
72 */
73 fun subscribe(topic: String) {
74 if (!topicList.contains(topic)) {
75 topicList.add(topic)
76 }
77 try {
78 //連接成功后訂閱主題
79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
80 override fun onSuccess(token: IMqttToken) {
81 VMLog.d("MQTT 訂閱成功 $topic")
82 }
83
84 override fun onFailure(token: IMqttToken, t: Throwable) {
85 VMLog.d("MQTT 訂閱失敗 $topic $t")
86 }
87 })
88 } catch (e: MqttException) {
89 e.printStackTrace()
90 }
91 }
92
93 /**
94 * 取消訂閱
95 * @param topic 主題
96 */
97 fun unsubscribe(topic: String) {
98 if (topicList.contains(topic)) {
99 topicList.remove(topic)
100 }
101 try {
102 mqttClient?.unsubscribe(topic)
103 } catch (e: MqttException) {
104 e.printStackTrace()
105 }
106 }
107
108 /**
109 * 發(fā)送 MQTT 消息
110 * @param topic 主題
111 * @param content 內(nèi)容
112 */
113 fun sendMsg(topic: String, content: String) {
114 val msg = MqttMessage()
115 msg.payload = content.encodeToByteArray() // 設(shè)置消息內(nèi)容
116 msg.qos = 0 //設(shè)置消息發(fā)送質(zhì)量,可為0,1,2.
117 // 設(shè)置消息的topic,并發(fā)送。
118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119 override fun onSuccess(asyncActionToken: IMqttToken) {
120 VMLog.d("MQTT 消息發(fā)送成功")
121 }
122
123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124 VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")
125 }
126 })
127 }
128
129 /**
130 * 通知 MQTT 事件
131 */
132 private fun notifyEvent(topic: String, data: String) {
133 LDEventBus.post(topic, data)
134 }
135 }
業(yè)務(wù)交互
和業(yè)務(wù)相關(guān)的就是在啟動APP后,使用后端服務(wù)器返回的鑒權(quán)token信息及連接封裝接口登錄環(huán)信通MQTT服務(wù)器,登錄成功后訂閱主題并監(jiān)聽消息。
1// 請求 token 成功后,調(diào)用MQTTHelper.connect()鏈接 MQTT 服務(wù)器,這里會同時傳遞監(jiān)聽的主題
2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)
3
4/**
5 * 發(fā)送匹配信息
6 */
7private fun sendMatchInfo() {
8 if (selfMatch.user.nickname.isEmpty()) return
9 // 提交自己的匹配信息到服務(wù)器
10 mViewModel.submitMatch(selfMatch)
11 val json = JSONObject()
12 json.put("content", selfMatch.content)
13 json.put("emotion", selfMatch.emotion)
14 json.put("gender", selfMatch.gender)
15 json.put("type", selfMatch.type)
16 val jsonUser = JSONObject()
17 jsonUser.put("avatar", mUser.avatar)
18 jsonUser.put("id", mUser.id)
19 jsonUser.put("nickname", mUser.nickname)
20 jsonUser.put("username", mUser.username)
21 json.put("user", jsonUser)
22 MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())
23}
24
25// 監(jiān)聽消息這里使用了一個事件總線進行通知,在上邊封裝 MQTTHelper 發(fā)送消息也使用了這個,
26// 訂閱 MQTT 事件
27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {
28 val match = JsonUtils.fromJson(it, Match::class.java)
29 // 這里收到匹配信息之后就增加一條彈幕
30 addBarrage(match)
31}
后端服務(wù)實現(xiàn)
接下來介紹后端服務(wù)實現(xiàn),主要包含以下兩部分:
- 配置連接信息:配置環(huán)信MQTT消息云連接信息。
- 獲取鑒權(quán)信息:獲取客戶端連接需要的鑒權(quán)信息。
配置連接信息
配置部分只需要按照環(huán)信后臺配置信息進行替換就好,配置在config目錄下的config.xxx.json文件內(nèi)
1/**
2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService
3 */
4config.mqtt = {
5 host: 'mqtt host', // MQTT 鏈接地址
6 appId: 'appId', // MQTT AppId
7 port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)
8 restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服務(wù) API 地址
9 clientId: 'client id', // 替換環(huán)信后臺 clientId
10 clientSecret: 'client secret', // 替換環(huán)信后臺 clientSecret
11};
獲取鑒權(quán)信息
這里主要是獲取客戶端連接所需要的鑒權(quán)信息token,為了安全token肯定是要放在服務(wù)器端生成的,廢話不多說,上代碼:
1/**
2 * Create by lzan13 on 2022/3/22
3 * 描述:MQTT 幫助類
4 */
5object MQTTHelper {
6
7 private var mqttClient: MqttAndroidClient? = null
8
9 // 緩存主題集合
10 private val topicList = mutableListOf()
11
12 /**
13 * 鏈接MQTT
14 * @param id 用戶 Id
15 * @param token 用戶鏈接 MQTT 的 Token
16 * @param topic 需要訂閱的主題,不為空就會在連接成功后進行訂閱
17 */
18 fun connect(id: String, token: String, topic: String = "") {
19 // 處理訂閱主題
20 if (topic.isNotEmpty()) topicList.add(topic)
21
22 // 拼接鏈接地址
23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
24 // 拼接 clientId
25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"
26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
27
28 //連接參數(shù)
29 val options = MqttConnectOptions()
30 options.isAutomaticReconnect = true //設(shè)置自動重連
31 options.isCleanSession = true // 緩存
32 options.connectionTimeout = CConstants.timeMinute.toInt() // 設(shè)置超時時間,單位:秒
33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包發(fā)送間隔,單位:秒
34 options.userName = id // 用戶名
35 options.password = token.toCharArray() // 密碼
36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
37 // 設(shè)置MQTT監(jiān)聽
38 mqttClient?.setCallback(object : MqttCallback {
39 override fun connectionLost(t: Throwable) {
40 // 通知鏈接斷開
41 VMLog.d("MQTT 鏈接斷開 $t")
42 }
43
44 @Throws(Exception::class)
45 override fun messageArrived(topic: String, message: MqttMessage) {
46 // 通知收到消息
47 VMLog.d("MQTT 收到消息:$message")
48 // 如果未訂閱則直接丟棄
49 if (!topicList.contains(topic)) return
50 notifyEvent(topic, String(message.payload))
51 }
52
53 override fun deliveryComplete(token: IMqttDeliveryToken) {}
54 })
55 //進行連接
56 mqttClient?.connect(options, null, object : IMqttActionListener {
57 override fun onSuccess(token: IMqttToken) {
58 VMLog.d("MQTT 鏈接成功")
59 // 鏈接成功,循環(huán)訂閱緩存的主題
60 topicList.forEach { subscribe(it) }
61 }
62
63 override fun onFailure(token: IMqttToken, t: Throwable) {
64 VMLog.d("MQTT 鏈接失敗 $t")
65 }
66 })
67 }
68
69 /**
70 * 訂閱主題
71 * @param topic 主題
72 */
73 fun subscribe(topic: String) {
74 if (!topicList.contains(topic)) {
75 topicList.add(topic)
76 }
77 try {
78 //連接成功后訂閱主題
79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
80 override fun onSuccess(token: IMqttToken) {
81 VMLog.d("MQTT 訂閱成功 $topic")
82 }
83
84 override fun onFailure(token: IMqttToken, t: Throwable) {
85 VMLog.d("MQTT 訂閱失敗 $topic $t")
86 }
87 })
88 } catch (e: MqttException) {
89 e.printStackTrace()
90 }
91 }
92
93 /**
94 * 取消訂閱
95 * @param topic 主題
96 */
97 fun unsubscribe(topic: String) {
98 if (topicList.contains(topic)) {
99 topicList.remove(topic)
100 }
101 try {
102 mqttClient?.unsubscribe(topic)
103 } catch (e: MqttException) {
104 e.printStackTrace()
105 }
106 }
107
108 /**
109 * 發(fā)送 MQTT 消息
110 * @param topic 主題
111 * @param content 內(nèi)容
112 */
113 fun sendMsg(topic: String, content: String) {
114 val msg = MqttMessage()
115 msg.payload = content.encodeToByteArray() // 設(shè)置消息內(nèi)容
116 msg.qos = 0 //設(shè)置消息發(fā)送質(zhì)量,可為0,1,2.
117 // 設(shè)置消息的topic,并發(fā)送。
118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
119 override fun onSuccess(asyncActionToken: IMqttToken) {
120 VMLog.d("MQTT 消息發(fā)送成功")
121 }
122
123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
124 VMLog.d("MQTT 消息發(fā)送失敗 ${exception.message}")
125 }
126 })
127 }
128
129 /**
130 * 通知 MQTT 事件
131 */
132 private fun notifyEvent(topic: String, data: String) {
133 LDEventBus.post(topic, data)
134 }
135}
源碼地址
核心代碼就這么多,不超過500行,這里沒有直接調(diào)用環(huán)信歷史消息接口獲取消息存儲記錄,后續(xù)可以在進行改良,簡化實現(xiàn)流程。源碼鏈接附上,配合使用效果更佳。
服務(wù)端github源碼:
https://github.com/lzan13/vmtemplateserver
客戶端github源碼:
https://gitee.com/lzan13/VMTemplateAndroid
寫在最后
MQTT協(xié)議資源占用小,并發(fā)連接高,集成簡單,特別適用于高頻數(shù)據(jù)交互場景,比如:游戲的世界廣場、視頻平臺彈幕等等等等,歡迎各位小伙伴集思廣益,基于MQTT服務(wù)實現(xiàn)更多的業(yè)務(wù)場景,享受技術(shù)帶來的便利與快樂。