葉 標(biāo), 馮 拔, 李 祥
(1.長(zhǎng)沙中車智馭新能源科技有限公司, 長(zhǎng)沙 410083; 2.中車時(shí)代電動(dòng)汽車股份有限公司, 湖南 株洲 412007)
隨著新能源車輛持續(xù)增漲[1],為滿足GB/T 32960.3—2016 《電動(dòng)汽車遠(yuǎn)程服務(wù)與管理系統(tǒng)技術(shù)規(guī)范 第3部分:通訊協(xié)議及數(shù)據(jù)格式》和國(guó)家及地方對(duì)新能源客車數(shù)據(jù)進(jìn)行實(shí)時(shí)監(jiān)管的要求,各車企紛紛搭建自己的車輛數(shù)據(jù)監(jiān)控平臺(tái)[2]。隨著平臺(tái)接入車輛數(shù)據(jù)體量和種類的逐年增長(zhǎng),以及為實(shí)現(xiàn)對(duì)車輛關(guān)鍵零部件數(shù)據(jù)的實(shí)時(shí)、不間斷采集,車輛數(shù)據(jù)發(fā)送頻率、流量也在不斷增加,使車輛監(jiān)管平臺(tái)面臨高并發(fā)壓力以及數(shù)據(jù)延時(shí)長(zhǎng)等問(wèn)題[3]?;诖耍疚脑O(shè)計(jì)和實(shí)現(xiàn)一個(gè)能解決上述問(wèn)題,同時(shí)還能提高業(yè)務(wù)處理能力、兼具更優(yōu)擴(kuò)展性的車輛監(jiān)控平臺(tái)。
本文平臺(tái)通過(guò)CAN總線技術(shù)采集車輛數(shù)據(jù),基于Netty網(wǎng)絡(luò)應(yīng)用程序架構(gòu)[4]搭建通信網(wǎng)關(guān),利用Netty架構(gòu)異步非阻塞[5]、事件驅(qū)動(dòng)等特性,解決大批量車載終端連接的高并發(fā)壓力,同時(shí)將耗時(shí)的數(shù)據(jù)處理工作綁定自定義線程池進(jìn)行異步處理;復(fù)雜的數(shù)據(jù)處理業(yè)務(wù)交由SpringCloud微服務(wù)集群進(jìn)行分布式處理,通過(guò)優(yōu)化線程池參數(shù)、自定義解碼和編碼方式、自定義通信協(xié)議,進(jìn)一步提升平臺(tái)的高并發(fā)性和低時(shí)延性。
Netty 框架是基于Reactor模型[6]開發(fā)的,通過(guò)異步事件驅(qū)動(dòng)和epoll I/O多路復(fù)寫技術(shù)實(shí)現(xiàn)終端設(shè)備的高并發(fā)接入,其模型如圖1所示。Netty服務(wù)端推薦采用主從多線程池模型,主線程池(BossGroup)和從線程池(WorkGroup)都是NioEventLoopGroup類型,主要功能是維護(hù)EventLoop子線程和Selector選擇器;EventLoop線程通過(guò)選擇器,輪詢與其綁定的Socket的網(wǎng)絡(luò)通信,并執(zhí)行處理任務(wù)。其中主線程池用來(lái)處理設(shè)備的連接,將建立的Socket信道注冊(cè)到WorkGroup線程池中NioEventLoop子線程中的Selector中,從而實(shí)現(xiàn)與Worker線程進(jìn)行綁定。從線程池處理Socket信道數(shù)據(jù)讀寫,數(shù)據(jù)讀寫任務(wù)完成后,其他任務(wù)相繼根據(jù)注冊(cè)到NioServerSocketChannel的順序依次進(jìn)行。當(dāng)存在多個(gè)設(shè)備與服務(wù)器建立連接時(shí),服務(wù)器通過(guò)Selector監(jiān)聽每個(gè)信道上是否需要進(jìn)行讀寫。這種非阻塞的讀寫過(guò)程不僅能提升 I/O 線程的運(yùn)行效率,同時(shí)能夠?qū)崿F(xiàn)線程并發(fā)進(jìn)行多個(gè)設(shè)備連接和數(shù)據(jù)讀寫操作。Netty服務(wù)端架構(gòu)[7]包含Boss和Worker兩個(gè)事件循環(huán)組(NioEventLoopGroup),Boss組主要負(fù)責(zé)處理連接事件并將請(qǐng)求分發(fā)給Worker組,由Worker組負(fù)責(zé)IO讀寫和業(yè)務(wù)處理。
圖1 網(wǎng)關(guān)Netty模型圖
Kafka作為信息中間件模型[8]有兩種信息傳遞方式:點(diǎn)對(duì)點(diǎn)的傳遞模式和分布式的發(fā)布-訂閱模式。本文設(shè)計(jì)的監(jiān)控平臺(tái)包含多個(gè)數(shù)據(jù)處理業(yè)務(wù),每個(gè)業(yè)務(wù)模塊包含一個(gè)消費(fèi)者,符合多個(gè)消費(fèi)者消費(fèi)一個(gè)Topic的應(yīng)用場(chǎng)景,因此本文使用分布式的發(fā)布-訂閱模式。
Kafka作為一個(gè)分布式的發(fā)布/訂閱信息傳遞的中間件模型,具有可擴(kuò)展、高吞吐量、有序化、持久化等特性,常用于處理異步通信、應(yīng)用解耦以及流量控制等問(wèn)題。同時(shí)Kafka模型的分布式體現(xiàn)在Kafka采用集群的方式在多臺(tái)Kafka服務(wù)器上運(yùn)行,并通過(guò)Zookeeper對(duì)集群進(jìn)行管理[9],支持動(dòng)態(tài)水平擴(kuò)展,同一個(gè)Topic的消息可以存儲(chǔ)在不同的Partition分區(qū)來(lái)提高吞吐率。在消費(fèi)者讀取信息時(shí),每個(gè)分區(qū)為信息標(biāo)記位置,并通過(guò)偏移量記錄當(dāng)前消費(fèi)信息的位置,從而保證每個(gè)Partition內(nèi)的信息有序。Kafka從底層看是一條信息隊(duì)列,并將存入隊(duì)列的信息數(shù)據(jù)進(jìn)行持久化,在Kafka處理系統(tǒng)明確指出數(shù)據(jù)已經(jīng)被處理完之前,都可以將數(shù)據(jù)恢復(fù),消費(fèi)者通過(guò)請(qǐng)求Broke中轉(zhuǎn)站,從Topic中獲取數(shù)據(jù)。
CAN總線技術(shù)(ControllerArea Network)[10],作為廣泛應(yīng)用在客車內(nèi)部控制器之間的通訊協(xié)議,具有通信及時(shí)性高、抗干擾強(qiáng)、傳輸誤碼率低、成本低等特點(diǎn)。多個(gè)控制模塊通過(guò)CAN控制器連接到CAN總線網(wǎng)絡(luò),CAN總線通過(guò)廣播的方式,將其報(bào)文數(shù)據(jù)傳輸?shù)礁鱾€(gè)CAN模塊,每個(gè)報(bào)文具有8個(gè)字節(jié)長(zhǎng)度和唯一標(biāo)識(shí)符ID。同時(shí)車載終端設(shè)備通過(guò)兩根CAN總線連接到CAN總線網(wǎng)絡(luò)并獲取客車的電機(jī)、電池、電控三電數(shù)據(jù)以及其他相關(guān)零部件的控制信息。
基于SpringCloud的微服務(wù)架構(gòu)[11]將車輛監(jiān)控平臺(tái)的不同功能模塊拆分成相互獨(dú)立的微服務(wù)應(yīng)用,每個(gè)微服務(wù)應(yīng)用專注于單一的業(yè)務(wù)功能。微服務(wù)應(yīng)用又是基于SpringBoot架構(gòu)搭建,相互之間通過(guò)輕量級(jí)Http協(xié)議進(jìn)行通信,易于開發(fā)和運(yùn)維。
本文設(shè)計(jì)的新能源客車數(shù)據(jù)監(jiān)管平臺(tái),通過(guò)CAN總線技術(shù)采集整車關(guān)鍵零部件信息,基于Netty網(wǎng)絡(luò)通信開發(fā)架構(gòu)和自定義編解碼方式,實(shí)現(xiàn)車載終端數(shù)據(jù)的采集與轉(zhuǎn)發(fā),實(shí)現(xiàn)終端到網(wǎng)關(guān)之間的可靠傳輸。引入Kafka信息中間件,基于其異步通信的特性提高網(wǎng)關(guān)的并發(fā)量;利用應(yīng)用解耦的特性將數(shù)據(jù)流式處理模塊與存儲(chǔ)分析模塊和業(yè)務(wù)模塊進(jìn)行解耦;基于B/S模式構(gòu)建Web應(yīng)用平臺(tái)對(duì)車輛數(shù)據(jù)進(jìn)行監(jiān)測(cè)。平臺(tái)總體架構(gòu)如圖2所示。新能源客車數(shù)據(jù)監(jiān)管平臺(tái)分為4個(gè)核心功能模塊。
1) 數(shù)據(jù)采集模塊?;谲囕d終端T-Box,通過(guò)CAN總線連接整車控制器,采集車輛電機(jī)、電池、電控的核心三電信息數(shù)據(jù),是轉(zhuǎn)發(fā)系統(tǒng)的數(shù)據(jù)來(lái)源。
2) 數(shù)據(jù)流式并發(fā)處理模塊?;贜etty服務(wù)端模型搭建的網(wǎng)關(guān),通過(guò)Redis緩存終端設(shè)備與服務(wù)器之間的連接信息,可支持?jǐn)U展GB 32960以外的其他設(shè)備通訊協(xié)議。終端與網(wǎng)關(guān)建立連接SocketChannel的信道,上傳采集數(shù)據(jù),終端上傳的數(shù)據(jù)為十六進(jìn)制原始碼流,網(wǎng)關(guān)接收后存放在ByteBuf 緩沖對(duì)象中。在傳輸過(guò)程中,可能會(huì)發(fā)生TCP粘包的問(wèn)題,針對(duì)此問(wèn)題,通過(guò)自定義Decoder解碼器[9],解析數(shù)據(jù)中標(biāo)志本包數(shù)據(jù)單元長(zhǎng)度的對(duì)應(yīng)字節(jié)位,將解析出來(lái)的數(shù)據(jù)長(zhǎng)度與本包數(shù)據(jù)的實(shí)際長(zhǎng)度進(jìn)行對(duì)比,如果長(zhǎng)度不夠,則通過(guò)重置ByteBuf索引位置,與下一包數(shù)據(jù)合并,直到讀取出滿足數(shù)據(jù)單元長(zhǎng)度規(guī)定的完整的一包數(shù)據(jù),再將此包數(shù)據(jù)傳遞到下一個(gè)事務(wù)處理器(Handler)中。當(dāng)有完整數(shù)據(jù)進(jìn)入Handler時(shí),首先對(duì)數(shù)據(jù)內(nèi)容進(jìn)行解析,解析出終端對(duì)應(yīng)的車架號(hào)、數(shù)據(jù)采集時(shí)間等字段組成關(guān)鍵字,然后通過(guò)調(diào)用Kafka的生產(chǎn)者(Producer)將數(shù)據(jù)存入Kafka中,再通過(guò)Flink流式計(jì)算技術(shù)對(duì)原始報(bào)文數(shù)據(jù)進(jìn)行實(shí)時(shí)解析處理,并根據(jù)解析后數(shù)據(jù)特性進(jìn)行分類,存入不同的數(shù)據(jù)集。
3) 數(shù)據(jù)存儲(chǔ)分析模塊。包含Hbase、Mysql、Redis數(shù)據(jù)庫(kù),分別以集群的方式部署并通過(guò)Zookeeper進(jìn)行監(jiān)管,其中Hbase存儲(chǔ)Flink計(jì)算后的數(shù)據(jù)集以及未處理的原始報(bào)文,采用HIVE、Hbase、Imapla結(jié)合的方式,配置HIVE與Hbase數(shù)據(jù)同步,通過(guò)Kettle工具進(jìn)行數(shù)據(jù)清洗,并基于ElasticSearch進(jìn)行建模分析。 Redis集群存儲(chǔ)實(shí)時(shí)性要求較高的報(bào)文信息,如實(shí)時(shí)報(bào)警信息、車輛定位信息等。Mysql集群用于存儲(chǔ)大數(shù)據(jù)分析過(guò)后生成的預(yù)警信息和運(yùn)營(yíng)統(tǒng)計(jì)報(bào)表信息。
4) 業(yè)務(wù)處理模塊?;赟pringCloud架構(gòu)開發(fā),單個(gè)應(yīng)用服務(wù)又是基于Springboot架構(gòu),通過(guò)集成大量的框架,并盡可能地自動(dòng)配置Spring,以簡(jiǎn)化整個(gè)項(xiàng)目的搭建和開發(fā)過(guò)程。各服務(wù)之間使用RESTFUL API相互協(xié)調(diào),通過(guò)Eureka注冊(cè)中心獲取和管理服務(wù),通過(guò)Ribbon維持各個(gè)分布式服務(wù)器間的負(fù)載均衡,通過(guò)Hystrix熔斷機(jī)制保護(hù)微服務(wù)鏈路。前端使用Vue架構(gòu),基于MVVM[12](Model View ViewModel)的模式,實(shí)現(xiàn)后端傳遞數(shù)據(jù)與前端頁(yè)面的雙向綁定。將項(xiàng)目使用到的前端靜態(tài)資源如js、css等單獨(dú)存放在靜態(tài)資源服務(wù)器上由前端維護(hù),從而實(shí)現(xiàn)前后端分離。業(yè)務(wù)處理模塊的主要功能有:
圖2 系統(tǒng)總體架構(gòu)圖
①車輛行駛狀態(tài)監(jiān)控。車輛通過(guò)終端與平臺(tái)建立通信連接后,平臺(tái)用戶可以對(duì)車輛實(shí)時(shí)行駛狀態(tài)數(shù)據(jù)進(jìn)行監(jiān)控。通過(guò)WebSocket客戶端處理上傳數(shù)據(jù)并發(fā)送給WebSocket服務(wù)器,再通過(guò)WebSocket服務(wù)器實(shí)時(shí)推送給平臺(tái)。
②預(yù)警信息推送。平臺(tái)通過(guò)大數(shù)據(jù)建模對(duì)車輛關(guān)鍵數(shù)據(jù)進(jìn)行分析、判斷,如車輛上傳數(shù)據(jù)達(dá)到預(yù)警閾值,則按照特定的規(guī)則生成預(yù)警信息,并通過(guò)語(yǔ)音、短信、APP的方式推送給車輛相關(guān)負(fù)責(zé)人。
③遠(yuǎn)程控制。車輛通過(guò)終端與平臺(tái)建立通信連接后,平臺(tái)用戶可以對(duì)車輛下發(fā)控制指令,通過(guò)獲取終端設(shè)備與Netty網(wǎng)關(guān)服務(wù)器間的連接信息,然后根據(jù)控制信息內(nèi)容封裝成原始碼流,再通過(guò)Netty網(wǎng)關(guān)下發(fā)給車載終端,從而實(shí)現(xiàn)車輛遠(yuǎn)程控制。
④生成統(tǒng)計(jì)報(bào)表。根據(jù)大數(shù)據(jù)模型生成車輛運(yùn)營(yíng)里程統(tǒng)計(jì)報(bào)表、故障統(tǒng)計(jì)報(bào)表、累計(jì)能耗統(tǒng)計(jì)等信息,平臺(tái)通過(guò)設(shè)置統(tǒng)計(jì)時(shí)間緯度生成統(tǒng)計(jì)報(bào)表,同時(shí)支持報(bào)表導(dǎo)入導(dǎo)出。
基于Netty 服務(wù)端框架開發(fā)與設(shè)備進(jìn)行通信的網(wǎng)關(guān),通過(guò)異步事件驅(qū)動(dòng)實(shí)現(xiàn)終端設(shè)備的高并發(fā)接入。終端與網(wǎng)關(guān)建立連接后,開始接收終端上傳的數(shù)據(jù),數(shù)據(jù)通過(guò)服務(wù)端非阻塞 I/O 模型的 Socket 通道進(jìn)行讀寫,數(shù)據(jù)讀寫任務(wù)完成后,其他任務(wù)根據(jù)注冊(cè)在SocketChannel的順序依次進(jìn)行。這種非阻塞的讀寫過(guò)程不僅能提升 I/O 線程的運(yùn)行效率,同時(shí)能實(shí)現(xiàn)線程并發(fā)進(jìn)行多個(gè)連接的和數(shù)據(jù)讀寫操作。
Netty的服務(wù)端架構(gòu)在處理設(shè)備連接以及I/O讀寫處理后會(huì)觸發(fā)ChannelPipeline中的自定義Handler線程池,通過(guò)異步的方式進(jìn)行終端設(shè)備信息鑒權(quán)、消息回復(fù)等業(yè)務(wù)處理,同時(shí)根據(jù)消息類型將數(shù)據(jù)存入不同的Topic中,由后續(xù)微服務(wù)業(yè)務(wù)線程消費(fèi)數(shù)據(jù)進(jìn)行業(yè)務(wù)流程,數(shù)據(jù)處理流程如圖3所示。
圖3 數(shù)據(jù)處理流程圖
在本文設(shè)計(jì)的平臺(tái)中,終端與Netty服務(wù)端之間以國(guó)標(biāo)GB/T 32960協(xié)議作為通信協(xié)議傳輸國(guó)標(biāo)數(shù)據(jù),協(xié)議格式見表1;并基于此協(xié)議擴(kuò)展了企標(biāo)數(shù)據(jù)通訊傳輸協(xié)議,協(xié)議格式見表2。表2中的企標(biāo)數(shù)據(jù)包數(shù)據(jù)結(jié)構(gòu)封裝在表1國(guó)標(biāo)協(xié)議數(shù)據(jù)包中的數(shù)據(jù)單元部分。兩種協(xié)議都是以TCP/IP網(wǎng)絡(luò)控制協(xié)議作為底層通信承載協(xié)議。數(shù)據(jù)接收端在接收到傳輸數(shù)據(jù)后應(yīng)先對(duì)數(shù)據(jù)格式進(jìn)行判斷,如果符合協(xié)議格式要求,則進(jìn)行數(shù)據(jù)業(yè)務(wù)處理;如果不符合則丟棄數(shù)據(jù),并斷開連接。
表1 國(guó)標(biāo)協(xié)議數(shù)據(jù)包結(jié)構(gòu)
表2 企標(biāo)數(shù)據(jù)包數(shù)據(jù)結(jié)構(gòu)
為進(jìn)一步提升轉(zhuǎn)發(fā)系統(tǒng)的性能,本文通過(guò)以下措施進(jìn)行優(yōu)化:
1) 將數(shù)據(jù)解析、持久化等耗時(shí)操作,加入異步線程池。
2) 通過(guò)自定義Handler 并將其分配到獨(dú)立的線程池,然后將線程池以及Handler添加到ChannelPipeline容器中,實(shí)現(xiàn)Handler的自定義線程池與WorkGroup線程池完全分離,從而最大限度避免長(zhǎng)時(shí)間程序阻塞。同時(shí)自定義線程池的大小,設(shè)置最大線程數(shù)N的參考算法[13]如下:
N=Ncpu×Ucpu×(1+W/C)
式中:Ncpu為核心CPU數(shù)目,通過(guò)Runtime.getRuntime().availableProcessors() 獲得;Ucpu為CPU使用率,一般設(shè)為1;W為線程阻塞時(shí)間;C為線程計(jì)算時(shí)間。
本文自定義線程屬于計(jì)算密集型線程,結(jié)合實(shí)際運(yùn)行環(huán)境,設(shè)置N為Ncpu+1。
3) 調(diào)整Netty服務(wù)端與客戶端的參數(shù)設(shè)置:設(shè)置TCP_NODELAY來(lái)關(guān)閉nagle算法,避免數(shù)據(jù)包堆積,從而降低延遲;設(shè)置SO_RCVBUF、SO_SNDBUF來(lái)調(diào)整接收方和發(fā)送方的收發(fā)緩存區(qū)大小,避免擁堵。
使用Jmeter模擬車載終端設(shè)備以10 s一包的頻率向網(wǎng)關(guān)發(fā)送報(bào)文。通過(guò)設(shè)置并發(fā)線程數(shù)為3 000,輪發(fā)請(qǐng)求間隔為10 s,重復(fù)30次,然后在網(wǎng)關(guān)程序中定義一個(gè)計(jì)數(shù)器,統(tǒng)計(jì)所有連接的通道數(shù)。測(cè)試結(jié)果如圖4所示:當(dāng)并發(fā)量為3 000時(shí),通過(guò)對(duì)30次試驗(yàn)取平均值,本文采用的自定義編碼解碼方式網(wǎng)關(guān)與采用傳統(tǒng)Java序列化的網(wǎng)關(guān)每秒最大接入量分別為2 047臺(tái)和1 868臺(tái),相較之下基于自定義編碼的網(wǎng)關(guān)接入量提升了9%左右。
自定義Handler時(shí)將解析數(shù)據(jù)寫入Kafka的Topic中,統(tǒng)計(jì)不同并發(fā)量下300 s內(nèi)存入Topic中的數(shù)據(jù)量作為事物處理總數(shù)(簡(jiǎn)稱TC),將TC作為通信性能測(cè)試的結(jié)果,其統(tǒng)計(jì)數(shù)如圖5所示,處理請(qǐng)求效率平均提升30%左右。
圖5 通信性能統(tǒng)計(jì)圖
使用Postman設(shè)置Monitor監(jiān)控任務(wù)對(duì)Web應(yīng)用平臺(tái)接口的響應(yīng)時(shí)間,并以視圖的形式展示接口響應(yīng)時(shí)間的測(cè)試結(jié)果。對(duì)Web應(yīng)用平臺(tái)接口進(jìn)行單次6 h的監(jiān)控,共計(jì)20次,監(jiān)控平均響應(yīng)時(shí)間為0.74 s,傳統(tǒng)平臺(tái)接口平均響應(yīng)時(shí)間為0.8 s,相較之下,平均響應(yīng)速度提高了8%左右,部分Web平臺(tái)響應(yīng)測(cè)試結(jié)果如圖6所示。
圖6 Web應(yīng)用平臺(tái)響應(yīng)測(cè)試統(tǒng)計(jì)圖
本文提出了一種新能源客車數(shù)據(jù)監(jiān)管平臺(tái)設(shè)計(jì)方法,解決了監(jiān)控平臺(tái)在實(shí)際生產(chǎn)環(huán)境中面臨的高并發(fā)壓力和數(shù)據(jù)延時(shí)長(zhǎng)的問(wèn)題,提升了平臺(tái)業(yè)務(wù)處理能力和接口請(qǐng)求響應(yīng)速率,適用于包含GB/T 32960和企標(biāo)的多種協(xié)議,滿足企業(yè)日常監(jiān)管新能源客車上傳數(shù)據(jù)業(yè)務(wù)需求的同時(shí),具有較好的擴(kuò)展性。