彭嬌
摘要:隨著時(shí)代的發(fā)展,“大數(shù)據(jù)”已經(jīng)成為一個(gè)耳熟能詳?shù)脑~匯,與此同時(shí),大數(shù)據(jù)處理框架Hadoop也逐漸成為數(shù)據(jù)處理挖掘行業(yè)廣泛使用的主流技術(shù),而Storm作為“實(shí)時(shí)的Hadoop”,實(shí)現(xiàn)了大規(guī)模實(shí)時(shí)數(shù)據(jù)流處理的需求,達(dá)到實(shí)時(shí)數(shù)據(jù)監(jiān)控的目的。本文將基于Storm框架,利用手機(jī)信令、Logstash、Kafka等技術(shù)實(shí)時(shí)采集人流地理位置坐標(biāo)數(shù)據(jù),以特殊高亮的形式顯示訪客熱衷的區(qū)域和訪客所在的地理區(qū)域的圖示,能夠準(zhǔn)確并及時(shí)地分析人流信息,為出行者提供最優(yōu)質(zhì)的選擇。
關(guān)鍵字: Hadoop;Storm;實(shí)時(shí)處理
引言
隨著城市規(guī)模的擴(kuò)張和交通的便利,閑暇時(shí)間的出游變的日?;5鞘薪煌赡軙?yàn)闊o法高效地利用路線和實(shí)時(shí)監(jiān)控,很容易造成交通堵塞。改善城市的出行、提高出行效率成為構(gòu)建智慧城市的當(dāng)務(wù)之急。如何在海量的交通、出行數(shù)據(jù)中,及時(shí)準(zhǔn)備分析的當(dāng)前的人流信息并進(jìn)行監(jiān)控管理,為出行者提供優(yōu)質(zhì)的交通引導(dǎo)服務(wù),減少擁堵狀況,成為城市智能規(guī)劃的核心所在。
在大數(shù)據(jù)時(shí)代,大數(shù)據(jù)處理的典型工具Hadoop是一個(gè)由Apache基金會所開發(fā)的分布式系統(tǒng)基礎(chǔ)架構(gòu),主要解決的是海量數(shù)據(jù)的存儲和分析計(jì)算問題,作為處理大數(shù)據(jù)的分布式存儲和計(jì)算框架,得到了國內(nèi)外大、中、小型企業(yè)的廣泛應(yīng)用。不過它并不是一套實(shí)時(shí)系統(tǒng)。為了解決這個(gè)問題,計(jì)算機(jī)工程師們又開發(fā)了Storm和Kafka。 Apache Storm是一套開源的分布式實(shí)時(shí)計(jì)算系統(tǒng)。最早由Nathan Marz開發(fā),在被Twitter收購后開源,并在2014年9月起成為Apache頂級開源項(xiàng)目。Storm被廣泛用于各種商業(yè)網(wǎng)站,包括 Twitter、Yelp、Groupon、百度、淘寶等。Storm的使用場景非常廣泛,例如實(shí)時(shí)分析、在線機(jī)器學(xué)習(xí)、連續(xù)計(jì)算、分部署RPC、ET 等。Storm有著非??斓奶幚硭俣?,單節(jié)點(diǎn)可以達(dá)到百萬個(gè)元組每秒,此外它還具有高擴(kuò)展、容錯、保證數(shù)據(jù)處理等特性。
本文以IDEA為開發(fā)工具,利用Hadoop生態(tài)圈中的Storm框架、Zookeeper協(xié)調(diào)服務(wù)節(jié)點(diǎn)集群、Kafka中間件等運(yùn)行環(huán)境,對人流信息數(shù)據(jù)進(jìn)行流式處理和實(shí)時(shí)分析,并將結(jié)果反饋到基于J2EE架構(gòu)的平臺中,實(shí)時(shí)顯示人流的熱力分布圖。
1 實(shí)時(shí)數(shù)據(jù)流處理框架
Storm是一個(gè)免費(fèi)開源、分布式、高容錯的實(shí)時(shí)計(jì)算系統(tǒng)。Storm令持續(xù)不斷的流計(jì)算變得容易,彌補(bǔ)了Hadoop批處理所不能滿足的實(shí)時(shí)要求。Storm經(jīng)常用于在實(shí)時(shí)分析、在線機(jī)器學(xué)習(xí)、持續(xù)計(jì)算、分布式遠(yuǎn)程調(diào)用和ETL等領(lǐng)域。
Storm 采用主從架構(gòu),主要分為兩種組件Nimbus和Supervisor,這兩種組件都是快速失敗的,沒有狀態(tài),使利用 Zookeeper來協(xié)調(diào)狀態(tài)和保存集群運(yùn)行的狀態(tài)信息,如圖1所示。
Nimbus負(fù)責(zé)在集群里面發(fā)送代碼,分配工作給機(jī)器,并且監(jiān)控狀態(tài),全局只有一個(gè)。
Supervisor會監(jiān)聽分配給它那臺機(jī)器的工作,根據(jù)需要啟動/關(guān)閉工作進(jìn)程Worker。每一個(gè)要運(yùn)行Storm的機(jī)器上都要部署一個(gè),并且,按照機(jī)器的配置設(shè)定上面分配的槽位數(shù)。
Zookeeper是Storm重點(diǎn)依賴的外部資源。Nimbus和Supervisor甚至實(shí)際運(yùn)行的Worker都是把心跳保存在Zookeeper上的。Nimbus也是根據(jù)Zookeeper上的心跳和任務(wù)運(yùn)行狀況,進(jìn)行調(diào)度和任務(wù)分配的。
Storm提交運(yùn)行的程序稱為Topology。Topology處理的最小的消息單位是一個(gè)Tuple,也就是一個(gè)任意對象的數(shù)組。Topology由Spout和Bolt構(gòu)成。Spout是發(fā)出Tuple的結(jié)點(diǎn)。Bolt可以隨意訂閱某個(gè)Spout或者Bolt發(fā)出的Tuple。Spout和Bolt都統(tǒng)稱為component。
2 實(shí)驗(yàn)與分析
2.1 軟硬件環(huán)境參數(shù)設(shè)置
硬件環(huán)境:CPU:酷睿i3主頻3.0GHz 內(nèi)存:8G 硬盤空間:1T
軟件環(huán)境:jdk-8u161-linux-x64.tar 、apache-storm-1.1.3.tar、logstash-2.4.1.tar、kafka_2.11-0.9.0.0、zookeeper-3.4.5-cdh5.7.0.tar
操作系統(tǒng):CentOS-6.5-x86_64-bin
2.2 平臺架構(gòu)及處理流程
本平臺通過Logstash實(shí)時(shí)的將采集到的數(shù)據(jù)存儲到Kafka得broker集群中,Storm集群中的節(jié)點(diǎn)對Kafka中的數(shù)據(jù)進(jìn)行處理,將處理后的數(shù)據(jù)輸出的數(shù)據(jù)庫DB中,將DB中的數(shù)據(jù)通過地圖的API上進(jìn)行展示。其中Kafka集群和Storm集群的運(yùn)行都依賴于Zookeeper。具體流程如圖2所示。
(1)數(shù)據(jù)采集
本系統(tǒng)可以依靠手機(jī)移動網(wǎng)絡(luò)信令來獲取用戶所在位置的經(jīng)緯度,從而收集獲取人流信息,通過對信令信息的相應(yīng)字段進(jìn)行分析、挖掘,并結(jié)合GIS技術(shù)實(shí)現(xiàn)自定義區(qū)域內(nèi)實(shí)時(shí)人流量的智能化統(tǒng)計(jì)分析。
(2)搭建集群環(huán)境
使用虛擬機(jī)VMWare構(gòu)建有3個(gè)節(jié)點(diǎn)的集群環(huán)境,包括一個(gè)主節(jié)點(diǎn)master,和兩個(gè)從節(jié)點(diǎn)slave1和slave2。在集群環(huán)境搭建的過程中需要完成以下步驟:
完成主機(jī)名的修改:編輯/etc/sysconfig/network文件,修改hostname的值為master。
配置hosts文件,要用于確定結(jié)點(diǎn)的IP地址,方便后續(xù)對節(jié)點(diǎn)能快速查到并訪問。
關(guān)閉防火墻:systemctl stop firewalld
安裝JDK并完成環(huán)境變量的配置:在~/.bash_profile文件中設(shè)置JDK的環(huán)境變量 JAVA_HOME和PATH。
搭建集群環(huán)境:對master節(jié)點(diǎn)進(jìn)行克隆,克隆出slave1、slave2,重復(fù)上面的步驟,修改主機(jī)名、修改主機(jī)名與ip地址的映射的host文件。
下載安裝并設(shè)置ntp,用來同步網(wǎng)絡(luò)中各個(gè)計(jì)算機(jī)的時(shí)間。
配置SSH免密碼登錄,以提高節(jié)點(diǎn)之間的訪問效率。
(3) Storm周邊環(huán)境的搭建
從圖2種我們可以看到,Storm的周邊環(huán)境包括Zookeeper、logstash、Kafka。
Zookeeper的安裝及配置:解壓Zookeeper的安裝包, 在~/.bash_profile文件中配置Zookeeper的環(huán)境變量,編輯zookeeper的配置文件zoo.cfg,添加zookeeper數(shù)據(jù)保存的路徑dataDir的參數(shù)值。
Logstash的配置及使用:Logstash 是一個(gè)開源的數(shù)據(jù)收集引擎,它具有備實(shí)時(shí)數(shù)據(jù)傳輸能力。它可以統(tǒng)一過濾來自不同源的數(shù)據(jù),并按照開發(fā)者的制定的規(guī)范輸出到目的地。解壓Logstash的安裝包, 在~/.bash_profile文件中配置Logstash的環(huán)境變量后即可使用Logstash完成數(shù)據(jù)的輸入和輸出。
Kafka的配置及使用:Kafka主要是為實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高吞吐、低延遲的處理平臺。Kafka集群包含一個(gè)或多個(gè)服務(wù)器broker,每條發(fā)布到Kafka集群的消息都有一個(gè)類別,稱為Topic,每個(gè)Topic包含一個(gè)或多個(gè)Partition,Producer負(fù)責(zé)發(fā)布消息到Kafka broker,Consumer消息消費(fèi)者,向Kafka broker讀取消息的客戶端。所以Kafka安裝配置后首先需要建立一個(gè)topic。
(4)搭建Storm架構(gòu)
使用zkServer.sh start命令啟動Zookeeper集群服務(wù)。
搭建storm的集群環(huán)境,首先解壓apache-storm,在~/.bash_profile文件中配置storm環(huán)境變量,然后編寫conf/storm-env.sh和conf/storm.yaml文件,最后就可以啟動集群的storm環(huán)境了。
(5)Storm整合Kafka并存儲展示數(shù)據(jù)
Storm整合對接Kafka,首先需要在maven的pow.xml中加載一個(gè)storm-kafka依賴,然后配置一個(gè)SpoutConfig的對象,此對象主要是配置kafka相關(guān)的環(huán)境、主題、重試策略、消費(fèi)的初始偏移量等等參數(shù)。最后將處理好的數(shù)據(jù)即Storm的tuple數(shù)據(jù)寫入到MySql中。最后可以SpringBoot構(gòu)建Web項(xiàng)目將統(tǒng)計(jì)的結(jié)果進(jìn)行展示。
3 結(jié)束語
Storm 的計(jì)算架構(gòu)處理海量人流位置數(shù)據(jù)的定位,通過與lodstash、Kafka 和mysql 結(jié)合對人流信息進(jìn)行了實(shí)時(shí)性的分析和處理,通過地圖API實(shí)時(shí)顯示人流所在地理位置的展示,能夠準(zhǔn)確并及時(shí)地分析人流信息,為出行者提供最優(yōu)質(zhì)的交通引導(dǎo)服務(wù),減少擁堵狀況。
參考文獻(xiàn):
[1] 李團(tuán)結(jié),從新法,李光明. 日志綜合管理平臺基于Storm框架的實(shí)現(xiàn)[J].中國新通信,2017(3):41-46[
[2] 2] 朱群. 基于Storm的交通信息實(shí)時(shí)處理系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)[D]. 西安電子科技大學(xué),2017.6
[3] 蔡正義. 基于大數(shù)據(jù)的城市居民出行分析建模[D]. 浙江大學(xué),2018.7