楊建
(江西工程學(xué)院 江西新余 338000)
現(xiàn)階段,MQTT協(xié)議主要有兩種類型,一種應(yīng)用于TCP、IP 網(wǎng)絡(luò),另一種應(yīng)用于傳感器網(wǎng)絡(luò),能夠提供有序、無損、雙向連接。目前,MQTT協(xié)議主要針對M2M、物聯(lián)網(wǎng)應(yīng)用開發(fā),在設(shè)計(jì)初期充分遵循簡單、輕量原則,減少對網(wǎng)絡(luò)帶寬與設(shè)備資源的實(shí)際需求,確??煽啃?、保證性地交付。MQTT協(xié)議使用發(fā)布、訂閱消息模式,將有效信息分配給一個或多個接收者,在實(shí)現(xiàn)同應(yīng)用程序解耦的過程中,屏蔽負(fù)載內(nèi)容,現(xiàn)已廣泛應(yīng)用于IoT、Mobile Internet、智能家居、穿戴和辦公等智能硬件、車載自組織網(wǎng)絡(luò)和電力能源等行業(yè)。
MQTT協(xié)議中存在發(fā)布者、代理者和訂閱者3種角色,發(fā)布者負(fù)責(zé)向代理者發(fā)布消息,代理者負(fù)責(zé)訂閱者轉(zhuǎn)發(fā)消息,其中,消息發(fā)布者、訂閱者均為客戶端,消息代理者為服務(wù)端。MQTT協(xié)議報文通常由Fixed header、Variable header 與Payload 共同組成,主要消息類型有11 種。MQTT 協(xié)議能夠提供質(zhì)量截然不同的3 種消息傳遞服務(wù)。在QOS0 至多一次服務(wù)等級中,MQTT 協(xié)議消息按照基礎(chǔ)TCP、IP網(wǎng)絡(luò)交付,有可能發(fā)生丟失或者產(chǎn)生重復(fù)。這種消息傳遞服務(wù)質(zhì)量等級可以應(yīng)用在非重要情況下。在QOS1 至少一次服務(wù)等級中,MQTT 協(xié)議消息可以確保及時送達(dá),有可能產(chǎn)生重復(fù)。在QOS2恰好一次服務(wù)等級中,MQTT 協(xié)議消息可以確保送達(dá)且僅被送達(dá)一次[1]。
在MQTT 協(xié)議消息系統(tǒng)的功能性需求上,主要針對一般用戶和系統(tǒng)用戶實(shí)現(xiàn)。通過對MQTT協(xié)議消息系統(tǒng)的設(shè)計(jì),IoT物模型管理能夠?qū)υO(shè)備功能實(shí)施數(shù)字化定義,云端建構(gòu)實(shí)體數(shù)據(jù)模型后,描述模型功能,便捷化管理設(shè)備。系統(tǒng)可以提供IoT設(shè)備狀態(tài)管理,通過上線、下線、離線監(jiān)控,實(shí)時獲取設(shè)備運(yùn)行狀態(tài)、健康狀態(tài)的數(shù)據(jù)參數(shù),據(jù)此可以搭建設(shè)備IoT數(shù)據(jù)平臺,為診斷設(shè)備故障、預(yù)警夯實(shí)基礎(chǔ)。端到端的消息路由可以實(shí)現(xiàn)對數(shù)據(jù)的靈活控制,提高邊緣計(jì)算節(jié)點(diǎn)的安全性,便于數(shù)據(jù)在設(shè)備、函數(shù)應(yīng)用與IoT Hub間流轉(zhuǎn)[2]。
在數(shù)據(jù)、監(jiān)控狀態(tài)上報、匯聚到系統(tǒng)中的規(guī)則引擎后,根據(jù)預(yù)先設(shè)定規(guī)則轉(zhuǎn)發(fā)消息至主體設(shè)備、業(yè)務(wù)系統(tǒng)、App、數(shù)據(jù)庫、消息中間件等。系統(tǒng)安全模塊通過在設(shè)備連接階段進(jìn)行設(shè)備身份認(rèn)證,對非法連接請求進(jìn)行拒絕,確保連接階段設(shè)備合法。與此同時,消息系統(tǒng)通過控制訂閱主題進(jìn)行權(quán)限區(qū)分,防止因數(shù)據(jù)泄密、越級訪問產(chǎn)生網(wǎng)絡(luò)阻塞等。為保證MQTT協(xié)議消息系統(tǒng)的可用性始終處于高水平,系統(tǒng)需要在集群工作管理模式下,對外部提供一致性服務(wù),確保集群工作過程中的消息發(fā)布和消息訂閱操作,能夠通過正確處理消息的方式,確保消息在各節(jié)點(diǎn)的共享。同時,系統(tǒng)可以通過IoT 設(shè)備調(diào)用服務(wù)、IoT 設(shè)備遠(yuǎn)程完整性監(jiān)控等通用信息服務(wù)能力,支持IoT中心所接收的實(shí)時傳感器數(shù)據(jù)進(jìn)行可視化展示、IoT 場景數(shù)據(jù)采集存儲服務(wù)、第三方服務(wù)等業(yè)務(wù)相關(guān)應(yīng)用,在系統(tǒng)信息監(jiān)控過程中,對主題建模最佳數(shù)量、序列與角度、CPU使用率、連接總數(shù)、活躍數(shù)、最大并發(fā)數(shù)、客戶端間發(fā)布和訂閱信息,監(jiān)控服務(wù)器實(shí)時運(yùn)行狀態(tài)。
為保證MQTT 協(xié)議消息系統(tǒng)的實(shí)用價值,在非功能性需求上,需要在消息遲延時間、程序并發(fā)性和系統(tǒng)可擴(kuò)展性幾方面進(jìn)行對應(yīng)約束[3]。MQTT 協(xié)議消息系統(tǒng)在傳遞IoT場景中設(shè)備消息方面發(fā)揮主要作用,傳遞消息的及時性,直接決定IoT設(shè)備是否能夠通過相關(guān)指令迅速進(jìn)行對應(yīng)操作,因而,需要MQTT協(xié)議消息系統(tǒng)在獲得消息后,通過端到端的消息路由實(shí)時分發(fā)消息,系統(tǒng)性能越好。在設(shè)計(jì)MQTT 協(xié)議消息系統(tǒng)過程中,需要側(cè)重考慮系統(tǒng)在程序上的并發(fā)性,一旦系統(tǒng)所接入的設(shè)備數(shù)量大規(guī)模增長,并發(fā)數(shù)逐漸增高,需要考慮并發(fā)連接支持高并發(fā)的連接數(shù),以此保證消息系統(tǒng)的穩(wěn)定程度。伴隨業(yè)務(wù)擴(kuò)展、新需求到來,為便于MQTT協(xié)議消息系統(tǒng)在設(shè)計(jì)過程中有效實(shí)施擴(kuò)展,需要通過集群方式對系統(tǒng)進(jìn)行部署,開發(fā)新功能、提供統(tǒng)一接口。
Netty 是經(jīng)由JBOSS 應(yīng)用服務(wù)器所提供的Java 網(wǎng)絡(luò)開源應(yīng)用框架,功能強(qiáng)大。Netty提供全新方式開發(fā)Web server、客戶端過程,該方式易用性、擴(kuò)展性顯著增強(qiáng),在復(fù)雜的內(nèi)部環(huán)境基礎(chǔ)上,允許更高的吞吐量。一以貫之,Netty 能夠提供異步通信方式、事件驅(qū)動策略的Web 應(yīng)用程序框架與工具,快速實(shí)現(xiàn)Web server、客戶端管理程序的高性能、高可靠性開發(fā)。Netty使用傳輸控制協(xié)議,為多客戶端提供消息查詢服務(wù),促使該過程相對簡單?,F(xiàn)階段,Netty 的核心組件主要包括由java.nio.channels包定義的Channel通道、Callback回調(diào)、異步模型Future、可擴(kuò)展的事件模型和Netty、代碼的主要擴(kuò)展與定制點(diǎn)Channel Handler[4]。其中,Channel 通道通常代表一個實(shí)體的開放性連接,異步模型Future提供在操作完成過程中通知應(yīng)用程序的方式,其能夠在未來某一時刻完成,提供對結(jié)果的訪問,Channel Handler是一個父接口,本身并未提供過多方法。
現(xiàn)階段,MQTT協(xié)議消息系統(tǒng)是IoT重要的傳輸協(xié)議,應(yīng)用廣泛,負(fù)責(zé)進(jìn)行設(shè)備接入和采集數(shù)據(jù),需要根據(jù)業(yè)務(wù)實(shí)際需求將獲取的數(shù)據(jù)存儲至Mysql 數(shù)據(jù)庫、統(tǒng)一上報到相關(guān)業(yè)務(wù)系統(tǒng)等。在搭建MQTT協(xié)議消息系統(tǒng)過程中主要以集群方式實(shí)現(xiàn),通過集群的方式可以解決基于MQTT協(xié)議的Rabbit MQ消息收發(fā),實(shí)現(xiàn)遠(yuǎn)程收發(fā)、AMQP與MQTT間的收發(fā)。單個節(jié)點(diǎn)能夠提供完整服務(wù),可以支撐MQTT協(xié)議消息的傳遞,多節(jié)點(diǎn)協(xié)調(diào)對外提供一致性服務(wù)。
每單個節(jié)點(diǎn)的MQTT 代理服務(wù)端在搭建過程中,使用Netty 技術(shù)建構(gòu)Boss Event Loop 線程模型,用以處理客戶端連接請求,建構(gòu)Work Event Loop線程模型,用以處理客戶端的讀寫請求。消息入站后進(jìn)行字節(jié)解碼,消息出站前將MQTT 客戶端對象編譯成為字節(jié)[5]。Engine x 是高性能Web 服務(wù)器,其作為云上數(shù)據(jù)庫的存儲引擎,可以通過數(shù)據(jù)流代理Stream 模塊實(shí)現(xiàn)協(xié)議的代理轉(zhuǎn)發(fā)tcp報文,主要用于數(shù)據(jù)流代理和負(fù)載均衡等。分布式應(yīng)用程序協(xié)調(diào)服務(wù)軟件zookeeper 作為服務(wù)集群,MQTT 協(xié)議消息系統(tǒng)的總體設(shè)計(jì)主要由傳感器、車載裝置、智慧工廠等物模型與各個主題模塊、設(shè)備運(yùn)行狀態(tài)管理模塊、消息路由模塊、安全模塊(SE)、集群模塊、監(jiān)控模塊組成。
服務(wù)器終端能夠在消息系統(tǒng)中接受、發(fā)送、解析和計(jì)算IoT設(shè)備控制命令與數(shù)據(jù)指令,全監(jiān)控IoT設(shè)備與數(shù)據(jù)存儲設(shè)備信息,主要在下述幾個模塊進(jìn)行。
模塊一,傳感器、車載裝置、智慧工廠等物模型與各個主題模塊。消息系統(tǒng)通過TSL 語言描述物模型,采取JSON格式對物模型數(shù)據(jù)進(jìn)行類似格式上報,在主題實(shí)現(xiàn)過程中將IoT 設(shè)備主題劃分為上行、下行兩類數(shù)據(jù)。
模塊二,設(shè)備運(yùn)行狀態(tài)管理模塊。該模塊是后續(xù)消息發(fā)布和訂閱操作實(shí)現(xiàn)的基礎(chǔ),需要在連接設(shè)備時通過發(fā)送報文連接消息系統(tǒng)。消息系統(tǒng)通過定義Session信息實(shí)現(xiàn)持久和非持久會話的存儲[6]。一旦設(shè)備成功與消息系統(tǒng)建立連接,通過維護(hù)TCP長連接,檢測設(shè)備運(yùn)行狀態(tài),及時將不夠活躍的連接斷開,充分將服務(wù)器內(nèi)存資源釋放出來?;贛QTT 協(xié)議規(guī)范,始終保持連接的設(shè)備需要主動向服務(wù)節(jié)點(diǎn)定時傳送PING 包REQ心跳請求,系統(tǒng)接收請求后及時進(jìn)行將心跳響應(yīng)(PINGRESP)返回至客戶端,進(jìn)而保持設(shè)備連接狀態(tài),成功測試網(wǎng)絡(luò)處于通順狀態(tài)。為實(shí)現(xiàn)設(shè)備心跳,需要使用Netty 技術(shù)實(shí)現(xiàn)連接讀寫事件、空閑事件檢查,根據(jù)設(shè)備心跳時間觸發(fā)Netty核心組件,找到設(shè)備對應(yīng)連接并予以關(guān)閉,釋放服務(wù)器資源。
模塊三,消息路由模塊。在實(shí)現(xiàn)即時消息推送過程中,通過成功連接設(shè)備端、系統(tǒng),首先發(fā)送報文請求,進(jìn)行協(xié)議處理,提取消息發(fā)布主題、消息傳遞服務(wù)質(zhì)量級別、消息內(nèi)容,進(jìn)行封裝后調(diào)用發(fā)布權(quán)限,驗(yàn)證設(shè)備是否具備。在消息推送前需要進(jìn)行主題訂閱匹配查詢,找出各層級設(shè)備訂閱列表和訂閱該主題消息的設(shè)備,寫入消息到設(shè)備對應(yīng)消息隊(duì)列后,將消息推送給指定客戶端。在實(shí)現(xiàn)離線消息推送過程中,需要結(jié)合MQTT協(xié)議規(guī)范存儲設(shè)備離線消息。在消息推送后進(jìn)行主題訂閱匹配查詢,通過設(shè)備狀態(tài)查詢模塊查找設(shè)備在線情況,便于系統(tǒng)通過離線數(shù)據(jù)寫入模塊,將設(shè)備離線消息寫入數(shù)據(jù)庫[7]。一旦系統(tǒng)初始化開始,為保證離線消息毫秒級查詢,需要在處理邏輯中調(diào)用generate Row Key 生成消息的行鍵。最終,將消息存儲在列表里批量提交,通過后臺程序及時刪除推送過的消息。在橋接數(shù)據(jù)到Kafka開源流處理平臺時,需要根據(jù)預(yù)設(shè)規(guī)則將主題消息篩選,通過寫入對應(yīng)主題,促使其他業(yè)務(wù)訂閱平臺主題消費(fèi)消息。
模塊四,安全模塊(SE)。使用“一型號一密鑰”的方式驗(yàn)證設(shè)備注冊情況,使用一機(jī)器一密鑰的方式驗(yàn)證設(shè)備認(rèn)證情況。設(shè)備連接時,通過提取clientid、username和password這3個參數(shù)解析連接報文,向設(shè)備身份驗(yàn)證服務(wù)發(fā)送請求。使用標(biāo)識、IP、用戶名、密碼和主題為參數(shù)發(fā)起請求權(quán)限,處理ACL HTTP請求。
模塊五,集群模塊。集群通常由節(jié)點(diǎn)構(gòu)成,節(jié)點(diǎn)不規(guī)則分布于JVM、物理機(jī)之上,一旦集群系統(tǒng)被觸發(fā),節(jié)點(diǎn)將自動加入集群,節(jié)點(diǎn)狀態(tài)變化通過gossip 通信協(xié)議傳輸至集群每個節(jié)點(diǎn),確保集群狀態(tài)一致性[8]。因MQTT 協(xié)議消息系統(tǒng)通過去中心化集群方式搭建,存在多個服務(wù)節(jié)點(diǎn),IoT 設(shè)備發(fā)布消息后,節(jié)點(diǎn)需要精準(zhǔn)轉(zhuǎn)發(fā)消息,確保消息路由推送消息的正確與完整程度。
模塊六,監(jiān)控模塊。該模塊通過變量方式存儲各節(jié)點(diǎn)設(shè)備連接數(shù)、主題訂閱數(shù)、客戶端等信息,在數(shù)據(jù)庫中,一旦需要查看相應(yīng)信息,可以依靠腳本及時獲取信息并定時更新。
在通信過程,需要設(shè)備和服務(wù)器間的通信協(xié)議、服務(wù)器與客戶端間利用AMQPAMQP傳輸?shù)耐ㄐ艆f(xié)議、服務(wù)器與移動通信終端間利用MQTT協(xié)議傳輸?shù)耐ㄐ艆f(xié)議[9]。實(shí)現(xiàn)MQTT協(xié)議消息系統(tǒng)代理服務(wù)器工作,需要科學(xué)管理已經(jīng)訂閱的消息,通過多終端轉(zhuǎn)發(fā)消息,查詢存儲信息,同時,通過設(shè)置用戶權(quán)限實(shí)現(xiàn)“1&X”服務(wù)關(guān)系。在多終端服務(wù)端口,用戶可以通過ID查詢訂閱成功的MQTT 主題,通過多終端對設(shè)備實(shí)施遠(yuǎn)程控制。
Netty 的MQTT 協(xié)議消息系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn),使用客戶端工具進(jìn)行消息系統(tǒng)的功能測試。在測試過程中,客戶端工具通過創(chuàng)建多樣化連接設(shè)置的客戶端滿足測試需求,選用MQTT Box 作為主測試工具,驗(yàn)證MQTT協(xié)議消息系統(tǒng)是否滿足系統(tǒng)功能需求。在功能測試過程中,主要對IoT 設(shè)備身份認(rèn)證ID2、發(fā)布消息、消息訂閱主題(Subscribe)、接收推送消息、自定義心跳包結(jié)構(gòu)體、消息重投功能(republish)進(jìn)行[10]。
針對發(fā)布消息、消息訂閱主題、接收推送消息、自定義心跳包結(jié)構(gòu)體、消息重投功能的測試通常由用例編號、用例名稱、用例標(biāo)題、測試目的、測試步驟(預(yù)置條件、測試步驟)、預(yù)期結(jié)果、測試結(jié)果等共同組成。選用MQTT.fx,使用Java 語言編寫的客戶端工具,針對MQTT 客戶端進(jìn)行Topic 訂閱和消息發(fā)布,同時啟動客戶端A 和客戶端B,A 用于消息發(fā)布,B 用于Topic 訂閱,測試MQTT 協(xié)議消息系統(tǒng)是否支持QOS2 恰好一次服務(wù)等級質(zhì)量[11]。
在單節(jié)點(diǎn)性能測試上,可以通過兩臺機(jī)器執(zhí)行程序模擬,多次執(zhí)行命令設(shè)置連接數(shù),通過Web監(jiān)控頁面連接成功率,5 000 連接數(shù)的CPU 占用率為10.2%,10 000連接數(shù)的CPU占用率為18.5%,20 000連接數(shù)的CPU 占用率為28.3%,30 000 連接數(shù)的CPU 占用率為41.6%,38 000 連接數(shù)的CPU 占用率為55.1%,結(jié)果均為成功。可見連接數(shù)在38 000內(nèi),設(shè)備連接較為穩(wěn)定。在集群性能測試過程中,通過Jmeter 測試軟件模擬設(shè)置并發(fā)連接數(shù)及相關(guān)參數(shù)、點(diǎn)擊執(zhí)行,生成測試報告。MQTT協(xié)議消息系統(tǒng)在集群環(huán)境下可以處理更多設(shè)備連接數(shù),節(jié)點(diǎn)集群處理100 000 連接非常穩(wěn)定,CPU 負(fù)載較低,消息時延在1 s內(nèi),滿足對系統(tǒng)需求。
綜上所述,Netty 提供API 接口從網(wǎng)絡(luò)處理代碼中解耦核心業(yè)務(wù)邏輯與輔助功能,完全基于Java NIO(無阻塞的輸入/輸出)中的Buffer 實(shí)現(xiàn)內(nèi)部可擴(kuò)展性解決方案,在NIO通道進(jìn)行交互的過程中,將數(shù)據(jù)移進(jìn)移出通道,架構(gòu)編程服務(wù)器與客戶端框架結(jié)構(gòu),實(shí)現(xiàn)服務(wù)端、客戶端要求的功能。以Netty 技術(shù)為支撐的MQTT協(xié)議消息系統(tǒng)屬于IoT服務(wù)運(yùn)營平臺組成部分之一,與IoT設(shè)備管理子系統(tǒng)同處于IoT服務(wù)運(yùn)營平臺接入層底部,作為平臺重要的基礎(chǔ),MQTT協(xié)議消息系統(tǒng)可以為平臺上層應(yīng)用提供強(qiáng)大的數(shù)據(jù)支撐,與接入層的規(guī)則引擎、IoT 應(yīng)用微服務(wù)、數(shù)據(jù)存儲管理系統(tǒng)通過HTTP API 網(wǎng)關(guān)、RPC 服務(wù)器等實(shí)現(xiàn)服務(wù)間的協(xié)同。測試結(jié)果表明,MQTT協(xié)議消息系統(tǒng)能夠滿足預(yù)期設(shè)計(jì)目標(biāo),可利用在多種物聯(lián)網(wǎng)系統(tǒng)中。