馬 躍,顏睿陽(yáng),孫建偉
(中國(guó)科學(xué)院大學(xué),北京 100049)
(中國(guó)科學(xué)院 沈陽(yáng)計(jì)算技術(shù)研究所,沈陽(yáng) 110168)
隨著智能終端普及率的提高,大部分網(wǎng)民已經(jīng)從傳統(tǒng)的PC接入網(wǎng)絡(luò)變?yōu)橐苿?dòng)終端接入,即時(shí)通信的發(fā)展前景一片光明.然而基于私有通信協(xié)議開(kāi)發(fā)的即時(shí)通信類(lèi)應(yīng)用,嚴(yán)重影響了擴(kuò)展網(wǎng)絡(luò)功能和技術(shù)的進(jìn)步的步伐,因此急需一種基于標(biāo)準(zhǔn)協(xié)議并能滿足大眾業(yè)務(wù)需求的消息推送平臺(tái).MQTT協(xié)議是Android系統(tǒng)中消息推送的實(shí)現(xiàn)技術(shù)之一,由于此協(xié)議的簡(jiǎn)單、便捷,目前已得到了眾多應(yīng)用.但傳統(tǒng)MQTT服務(wù)器在分布式部署方面存在較大不足,基于RocketMQ具有高性能、高可靠、高實(shí)時(shí)、分布式的特點(diǎn)[1],本文提出了一種基于RocketMQ的MQTT消息推送服務(wù)器并實(shí)現(xiàn)了分布式部署.
本文主要分為4個(gè)部分,第1部分對(duì)RocketMQ進(jìn)行了概述.第2部分設(shè)計(jì)并實(shí)現(xiàn)了基于RocketMQ的MQTT消息推送服務(wù)器.第3部分介紹了服務(wù)器的分布式部署情況.第4部分對(duì)本文設(shè)計(jì)的消息推送服務(wù)器進(jìn)行了測(cè)試和分析.第5部分對(duì)本文進(jìn)行了總結(jié)以及對(duì)未來(lái)工作進(jìn)行了概述.
RocketMQ是阿里巴巴消息中間團(tuán)隊(duì)開(kāi)發(fā)的一種基于隊(duì)列模型的消息中間件,具有高性能、高可靠、高實(shí)時(shí)、分布式特點(diǎn)[1],其中Producer、Consumer以及隊(duì)列都可以實(shí)現(xiàn)分布式.發(fā)送消息時(shí),Producer以此發(fā)送消息到各個(gè)隊(duì)列,這個(gè)隊(duì)列集合稱(chēng)為T(mén)opic.一個(gè)Consumer實(shí)例消費(fèi)一個(gè)Topic對(duì)應(yīng)的所有隊(duì)列的情況稱(chēng)為廣播消費(fèi),多個(gè)Consumer實(shí)例平均消費(fèi)一個(gè)topic對(duì)應(yīng)的隊(duì)列集合的情況稱(chēng)為集群消費(fèi)[2].其通信組件使用了Netty-4.0.9.Final,并在此之上進(jìn)行了簡(jiǎn)單的協(xié)議封裝.其消息格式如表1所示.
表1 RocketMQ消息格式
其中l(wèi)ength為4字節(jié)整數(shù),其數(shù)值等于后3部分長(zhǎng)度的和.Header length為4字節(jié)整數(shù),其數(shù)值等于header data的長(zhǎng)度;Header data部分使用json序列化數(shù)據(jù);Body data使用阿里自定義的二進(jìn)制序列化數(shù)據(jù).
本文所設(shè)計(jì)的消息推送服務(wù)器結(jié)構(gòu)如圖1所示.整個(gè)服務(wù)器分為3層.第一層為MQTT Broker,主要負(fù)責(zé)將客戶端與服務(wù)器進(jìn)行連接,并且基于pub/sub機(jī)制,實(shí)現(xiàn)MQTT協(xié)議的相關(guān)功能;第二層為協(xié)議轉(zhuǎn)換層,主要將MQTT消息包轉(zhuǎn)換為RocketMQ支持的消息格式,其負(fù)責(zé)實(shí)現(xiàn)MQTT Broker與RocketMQ的對(duì)接,將消息推送到RocketMQ的消息隊(duì)列中以及將消息推送到MQTT Broker端;第三層為RocketMQ broker,主要負(fù)責(zé)將第二層發(fā)送過(guò)來(lái)的消息發(fā)送到總線當(dāng)中,與分布式服務(wù)器進(jìn)行通訊,完成消息的分發(fā)與接收.
本文服務(wù)器的功能主要是實(shí)現(xiàn)消息推送與pub/sub功能,并且在該功能的基礎(chǔ)上,實(shí)現(xiàn)用戶個(gè)體之間的即時(shí)通信、預(yù)訂閱等相關(guān)功能.即時(shí)通信可以傳輸文本、圖片、語(yǔ)音、視頻和文件等消息,預(yù)訂閱用于客戶端第一次連接到代理服務(wù)器時(shí),幫助用戶預(yù)先訂閱話題,從而達(dá)到節(jié)省流量,優(yōu)化用戶體驗(yàn)的目的.
圖1 服務(wù)器結(jié)構(gòu)
現(xiàn)如今,比較主流的MQTT協(xié)議代理有IBM Websphere MQ Telemetry和Mosquitto[3].本文選取了Mosquitto作為MQTT的消息代理.Mosquitto是一款輕量級(jí)開(kāi)源的消息代理,它相對(duì)完整的實(shí)現(xiàn)了MQTT協(xié)議中的各項(xiàng)功能,并能夠完美運(yùn)行在Linux系統(tǒng)中.所以,本文選取并在其開(kāi)源項(xiàng)目的基礎(chǔ)上,對(duì)其進(jìn)行了優(yōu)化,使其能夠?qū)崿F(xiàn)多終端同步.
通過(guò)對(duì)比MQTT協(xié)議的格式以及RocketMQ消息的格式,不難發(fā)現(xiàn)兩者都是基于Topic模塊,而且兩者參數(shù)存在很多相同.所以,我們通過(guò)協(xié)議轉(zhuǎn)換模塊,將mosquitto接收到的Topic、QoS及payload等相關(guān)參數(shù),轉(zhuǎn)換成RocketMQ的Topic、QoS及Body等參數(shù),并經(jīng)由producer發(fā)送至總線中,由Consumer負(fù)責(zé)消費(fèi).流程圖如圖2所示.
消息推送模塊完成消息發(fā)布到被消費(fèi)的具體流程:
1) 當(dāng)客戶端發(fā)布消息時(shí),由輕量級(jí)Mosquitto消息代理將消息接受并且返回給客戶端消息送達(dá)狀態(tài).
2) 將MQTT消息包內(nèi)的Topic、payload以及Qos等相關(guān)信息進(jìn)行提取,Topic在原有基礎(chǔ)上增加_rocketmq組成新的Topic傳遞給rocketmq;將payload格式由mqtt_string轉(zhuǎn)換為string,并賦值給Body傳遞給rocketmq;將MQTT的QoS參數(shù)直接傳遞給Rocketmq的QoS參數(shù).組成新的數(shù)據(jù)包之后,經(jīng)由RocketMQ broker將消息推送到消息總線中,并由總線返回消息送達(dá)狀態(tài).
3) RocketMQ的Consumer根據(jù)Topic對(duì)總線中的消息進(jìn)行消費(fèi),并將RocketMQ的Topic直接傳遞給MQTT 的 Topic,即 topic_rocketmq.并將 Body和QoS重新按照MQTT格式進(jìn)行封裝,并且推送給訂閱了該主題的客戶端.
圖2 消息流程圖
對(duì)服務(wù)器的架構(gòu)角色分配如表2所示.
圖3中,由于Name Server是一個(gè)幾乎無(wú)狀態(tài)節(jié)點(diǎn),故部署為集群,節(jié)點(diǎn)之間無(wú)任何消息同步[4].Producer與Consumer完全無(wú)狀態(tài),集群部署,Producer選擇Name Server集群中的一個(gè)隨機(jī)節(jié)點(diǎn)建立長(zhǎng)連接[5].Consumer基本與Producer相同,只是其要想提供Topic服務(wù)的Master、Slave建立長(zhǎng)連接并定時(shí)發(fā)送心跳.其即可以從Master訂閱消息,也可以從Slave訂閱消息.
表2 服務(wù)器架構(gòu)角色
Broker可分為Master與Slave.通過(guò)指定相同的BrokerName,不同的BrokerId來(lái)定義 Master與Slave的對(duì)應(yīng)關(guān)系.想要一個(gè)代理成為Master,設(shè)置BrokerId為0,設(shè)置BrokerId的值非0則為Slave.Master可以部署多個(gè).每個(gè)Broker與NameServer集群中的所有節(jié)點(diǎn)建立長(zhǎng)連接,定時(shí)向所有的NameServer注冊(cè)Topic信息[3].
圖3 分布式部署結(jié)構(gòu)圖
Host文件配置:
本部分將對(duì)本文所設(shè)計(jì)的服務(wù)器在增加負(fù)載的情況下進(jìn)行性能和魯棒性的測(cè)試.服務(wù)器配置如表3所示.
表3 服務(wù)器配置
本文通過(guò)使用Erlang語(yǔ)言開(kāi)發(fā)的,開(kāi)源測(cè)試工具emqtt_benchmark模擬客戶端登錄并發(fā)送消息對(duì)服務(wù)器進(jìn)行負(fù)載壓力測(cè)試.測(cè)試網(wǎng)絡(luò)結(jié)構(gòu)圖如圖4所示.其中,黑色連接線代表網(wǎng)絡(luò)連接,箭頭表示數(shù)據(jù)流向.
圖4 測(cè)試網(wǎng)絡(luò)結(jié)構(gòu)
兩臺(tái)terminal模擬用戶起大量線程,不間斷的向broker發(fā)送大小為2 K的消息,測(cè)試結(jié)果如圖5所示.
圖5 TPS測(cè)試結(jié)果
通過(guò)測(cè)試,單機(jī)Broker的TPS可達(dá)到15 900左右,繼續(xù)增大消息發(fā)送量時(shí),可以看到TPS有下降的趨勢(shì).通過(guò)測(cè)試可以觀測(cè)服務(wù)器在大量消息情況下依然能夠保持一個(gè)穩(wěn)定的狀態(tài),基本能夠滿足日常服務(wù)器需求.
接下來(lái)模擬大量用戶進(jìn)行連接,并在連接完成后進(jìn)行消息收發(fā),測(cè)試其CPU占有率及消息全部送達(dá)所需時(shí)間.測(cè)試結(jié)果如表4所示.
表4 服務(wù)器性能測(cè)試結(jié)果
由表中數(shù)據(jù)可以得出,本文所設(shè)計(jì)的消息推送服務(wù)器,響應(yīng)速度基本滿足當(dāng)前移動(dòng)互聯(lián)網(wǎng)領(lǐng)域中的需求,服務(wù)器負(fù)載也在可接受范圍內(nèi),連接并發(fā)數(shù)量尚可,服務(wù)器性能穩(wěn)定.
本文在研究MQTT協(xié)議以及RocketMQ的基礎(chǔ)上,論述了基于RocketMQ的MQTT消息推送服務(wù)器的設(shè)計(jì),并對(duì)其進(jìn)行了實(shí)現(xiàn).本文重點(diǎn)闡述了消息推送服務(wù)器的消息接收和發(fā)送、消息格式轉(zhuǎn)換功能、消息推送服務(wù)器的性能測(cè)試,同時(shí)與客戶端結(jié)合完成了消息推送服務(wù)器的功能測(cè)試.最后根據(jù)測(cè)試結(jié)果可以看出,本文設(shè)計(jì)的服務(wù)器完成了消息收發(fā)、協(xié)議轉(zhuǎn)換等功能要求,同時(shí)具有良好的抗壓能力和魯棒性.
下一步的工作主要集中在提高服務(wù)器的并發(fā)處理能力上,同時(shí)進(jìn)一步提高服務(wù)器的性能及魯棒性.
1 架構(gòu)說(shuō):阿里中間件技術(shù):消息中間件篇.http://www.jiagoushuo.com/article/1000141.html.[2016-04-18].
2 Apache RocketMQ:An open source distributed messaging and streaming data platform.http://rocketmq.apache.org.[2016-12-29].
3 Mosquitto.An open source MQTT v3.1/v3.1.1 broker.http://mosquitto.org/.[2015].
4 Kobejayandy.RocketMQ 入 門(mén).http://blog.csdn.net/kobejay andy/article/details/52831213.[2016-10-16].
5 歐志芳.基于RocketMQ實(shí)現(xiàn)異構(gòu)數(shù)據(jù)庫(kù)同步.網(wǎng)絡(luò)安全技術(shù)與應(yīng)用,2016,(12):99-100.[doi:10.3969/j.issn.1009-6833.2016.12.066]