任桂禾 王 晶
1 北京郵電大學(xué)網(wǎng)絡(luò)與交換技術(shù)國家重點實驗室 北京 100876
2 東信北郵信息技術(shù)有限公司 北京 100191
Hadoop誕生于大數(shù)據(jù)時代,是Apache基金會受到Google開發(fā)的GFS(Google File System,谷歌文件系統(tǒng))和MapReduce計算框架的啟發(fā)引入的開源項目。Hadoop使用大量的廉價Linux PC機組成集群,可謂是大數(shù)據(jù)處理商用技術(shù)架構(gòu)的開端。Hadoop作為經(jīng)典的大數(shù)據(jù)離線處理技術(shù)架構(gòu),很好地滿足了人們對于大數(shù)據(jù)的離線處理需求[1]。
然而,隨著Web2.0的興起,琳瑯滿目的各式應(yīng)用和服務(wù)如雨后春筍般地涌現(xiàn)。這其中出現(xiàn)了以微博為代表的一批典型應(yīng)用,海量的用戶、碎片化的信息流、極快的傳播速度,使得它們對業(yè)務(wù)實時性的要求大幅度提高[2]。當(dāng)業(yè)務(wù)需求允許的時延降低到一定限度時,Hadoop架構(gòu)會達(dá)到本身的瓶頸,已經(jīng)不能滿足大數(shù)據(jù)處理的需求。Twitter出于自身的業(yè)務(wù)需求開發(fā)了Storm實時處理框架,使用流式處理架構(gòu),對傳統(tǒng)離線處理技術(shù)架構(gòu)進行了變革。
Hadoop是優(yōu)秀的大數(shù)據(jù)離線處理技術(shù)架構(gòu),主要采用的思想是“分而治之”,對大規(guī)模數(shù)據(jù)的計算進行分解,然后交由眾多的計算節(jié)點分別完成,再統(tǒng)一匯總計算結(jié)果[3]。Hadoop架構(gòu)通常的使用方式為批量收集輸入數(shù)據(jù),批量計算,然后批量吐出計算結(jié)果。然而,Hadoop結(jié)構(gòu)在處理實時性要求較高的業(yè)務(wù)時,卻顯得力不從心。本章內(nèi)容對Hadoop架構(gòu)這種瓶頸的產(chǎn)生原因進行了探究。
Hadoop架構(gòu)的核心組成部分是HDFS(Hadoop Distributed File System,Hadoop分布式文件系統(tǒng))和MapReduce分布式計算框架。HDFS采用Master/Slave體系結(jié)構(gòu),在集群中由一個主節(jié)點充當(dāng)NameNode,負(fù)責(zé)文件系統(tǒng)元數(shù)據(jù)的管理,其它多個子節(jié)點充當(dāng)Datanode,負(fù)責(zé)存儲實際的數(shù)據(jù)塊[4]。如圖1所示。
圖1 HDFS架構(gòu)
MapReduce分布式計算模型由JobTracker和TaskTracker兩類服務(wù)進程實現(xiàn),JobTracker負(fù)責(zé)任務(wù)的調(diào)度和管理,TaskTracker負(fù)責(zé)實際任務(wù)的執(zhí)行。
在筆者實施的某運營監(jiān)控系統(tǒng)項目中,業(yè)務(wù)需求為處理業(yè)務(wù)平臺產(chǎn)生的海量用戶數(shù)據(jù),展現(xiàn)業(yè)務(wù)中PV(Page View,頁面瀏覽量)、UV(Unique Visitor,獨立訪客)、營收和付費用戶數(shù)等關(guān)鍵運營指標(biāo),供領(lǐng)導(dǎo)層實時了解運營狀況,做出經(jīng)營決策。在一期項目的需求描述中,允許的計算時延是15分鐘。
根據(jù)需求,在一期項目的實施中,搭建了Hadoop平臺與Hive數(shù)據(jù)倉庫,通過編寫Hive存儲過程完成數(shù)據(jù)的處理,相當(dāng)于是一個離線的批處理過程。不同的運營指標(biāo)擁有不同的算法公式,各公式的復(fù)雜程度不同導(dǎo)致各運營指標(biāo)算法復(fù)雜度不同,因此,所需要的計算時延也各不相同,如PV指標(biāo)的計算公式相對簡單,可以在5分鐘內(nèi)完成計算,而頁面訪問成功率指標(biāo)的計算公式相對復(fù)雜,需要10分鐘以上才能完成計算。項目到達(dá)二期階段時,對實時性的要求有了進一步提高,允許的計算時延減少到5分鐘。在這種應(yīng)用場景下,Hadoop架構(gòu)已經(jīng)不能滿足需要,無法在指定的時延內(nèi)完成所有運營指標(biāo)的計算。
在以上的應(yīng)用場景中,Hadoop的瓶頸主要體現(xiàn)在以下兩點。
1)MapReduce計算框架初始化較為耗時,并不適合小規(guī)模的批處理計算。因為MapReduce框架并非輕量級框架,在運行一個作業(yè)時,需要進行很多初始化的工作,主要包括檢查作業(yè)的輸入輸出路徑,將作業(yè)的輸入數(shù)據(jù)分塊,建立作業(yè)統(tǒng)計信息以及將作業(yè)代碼的Jar文件和配置文件拷貝到HDFS上。當(dāng)輸入數(shù)據(jù)的規(guī)模很大時,框架初始化所耗費的時間遠(yuǎn)遠(yuǎn)小于計算所耗費的時間,所以初始化的時間可以忽略不計;而當(dāng)輸入數(shù)據(jù)的規(guī)模較小時,初始化所耗費的時間甚至超過了計算所耗費的時間,導(dǎo)致計算效率低下,產(chǎn)生了性能上的瓶頸。
2)Reduce任務(wù)的計算速度較慢。有的運營指標(biāo)計算公式較為復(fù)雜,為之編寫的Hive存儲過程經(jīng)Hive解釋器解析后產(chǎn)生了Reduce任務(wù),導(dǎo)致無法在指定的時延內(nèi)完成計算。這是由于Reduce任務(wù)的計算過程分為三個階段,分別是copy階段、sort階段和reduce階段。其中,copy階段要求每個計算節(jié)點從其它所有計算節(jié)點上抽取其所需的計算結(jié)果,如圖2所示。copy操作需要占用大量的網(wǎng)絡(luò)帶寬,十分耗時,從而造成Reduce任務(wù)整體計算速度較慢。
圖2 copy操作示意圖
Storm的流式處理計算模式保證了任務(wù)只需進行一次初始化,就能夠持續(xù)計算,同時使用了ZeroMQ作為底層消息隊列,有效地提高了整體架構(gòu)的數(shù)據(jù)處理效率,避免了Hadoop的瓶頸[5]。
與Hadoop主從架構(gòu)一樣,Storm也采用Master/Slave體系結(jié)構(gòu),分布式計算由Nimbus和Supervisor兩類服務(wù)進程實現(xiàn),Nimbus進程運行在集群的主節(jié)點,負(fù)責(zé)任務(wù)的指派和分發(fā),Supervisor運行在集群的從節(jié)點,負(fù)責(zé)執(zhí)行任務(wù)的具體部分。
Storm架構(gòu)中使用Spout/Bolt編程模型來對消息進行流式處理。消息流是Storm中對數(shù)據(jù)的基本抽象,一個消息流是對一條輸入數(shù)據(jù)的封裝,源源不斷輸入的消息流以分布式的方式被處理。Spout組件是消息生產(chǎn)者,是Storm架構(gòu)中的數(shù)據(jù)輸入源頭,它可以從多種異構(gòu)數(shù)據(jù)源讀取數(shù)據(jù),并發(fā)射消息流。Bolt組件負(fù)責(zé)接收Spout組件發(fā)射的信息流,并完成具體的處理邏輯。在復(fù)雜的業(yè)務(wù)邏輯中可以串聯(lián)多個Bolt組件,在每個Bolt組件中編寫各自不同的功能,從而實現(xiàn)整體的處理邏輯[6]。
Storm架構(gòu)和Hadoop架構(gòu)的總體結(jié)構(gòu)相似,各個組成部分的對比如表1所示。
表1 Storm架構(gòu)與Hadoop架構(gòu)對比
在Hadoop架構(gòu)中,主從節(jié)點分別運行JobTracker和TaskTracker進程,在Storm架構(gòu)中,主從節(jié)點分別運行Nimbus和Supervisor進程。在Hadoop架構(gòu)中,應(yīng)用程序的名稱是Job,Hadoop將一個Job解析為若干Map和Reduce任務(wù),每個Map或Reduce任務(wù)都由一個Child進程來運行,該Child進程是由TaskTracker在子節(jié)點上產(chǎn)生的子進程。在Storm架構(gòu)中,應(yīng)用程序的名稱是Topology,Storm將一個Topology劃分為若干個部分,每部分由一個Worker進程來運行,該Worker進程是Supervisor在子節(jié)點上產(chǎn)生的子進程,在每個Worker進程中存在著若干Spout和Bolt線程,分別負(fù)責(zé)Spout和Bolt組件的數(shù)據(jù)處理過程。
從應(yīng)用程序的比較中可以明顯地看到Hadoop和Storm架構(gòu)的主要不同之處。在Hadoop架構(gòu)中,應(yīng)用程序Job代表著這樣的作業(yè):輸入是確定的,作業(yè)可以在有限時間內(nèi)完成,當(dāng)作業(yè)完成時Job的生命周期走到終點,輸出確定的計算結(jié)果。而在Storm架構(gòu)中,Topology代表的并不是確定的作業(yè),而是持續(xù)的計算過程。在確定的業(yè)務(wù)邏輯處理框架下,輸入數(shù)據(jù)源源不斷地進入系統(tǒng),經(jīng)過流式處理后以較低的延遲產(chǎn)生輸出。如果不主動結(jié)束這個Topology或者關(guān)閉Storm集群,那么數(shù)據(jù)處理的過程就會持續(xù)地進行下去。
通過以上的分析,我們可以看到Storm架構(gòu)是如何解決Hadoop架構(gòu)瓶頸的。
1)Storm的Topology只需初始化一次。在將Topology提交到Storm集群的時候,集群會針對該Topology做一次初始化的工作。此后,在Topology運行過程中,對于輸入數(shù)據(jù)而言,是沒有計算框架初始化耗時的,有效地避免了計算框架初始化的時間損耗。
2)Storm使用ZeroMQ作為底層的消息隊列來傳遞消息,保證消息能夠得到快速的處理。同時,Storm采用內(nèi)存計算模式,無需借助文件存儲,直接通過網(wǎng)絡(luò)直傳中間計算結(jié)果,避免了組件之間傳輸數(shù)據(jù)的大量時間損耗。
根據(jù)業(yè)務(wù)實時性需求的變化,進行大數(shù)據(jù)處理技術(shù)架構(gòu)由Hadoop向Storm變更時,需要進行業(yè)務(wù)邏輯開發(fā)變更和計算結(jié)果輸出方式變更,在變更的同時要注意對開發(fā)成本和開發(fā)效率的考量。
當(dāng)從Hadoop架構(gòu)轉(zhuǎn)向Storm架構(gòu)后,業(yè)務(wù)邏輯需要進行重新開發(fā)。在Hadoop架構(gòu)中,業(yè)務(wù)邏輯使用HiveQL語言開發(fā)。HiveQL是Hadoop平臺提供的類SQL語言,供開發(fā)工程師編寫存儲過程以操作Hive數(shù)據(jù)倉庫中的表和數(shù)據(jù),從而完成所需的數(shù)據(jù)處理過程。在運行Hive存儲過程時,Hive解釋器會生成執(zhí)行計劃,將HiveQL語句解析成底層的MapReduce程序,提交給JobTracker去執(zhí)行[7];因此,HiveQL的開發(fā)效率較高,開發(fā)工程師無需使用JAVA語言直接編寫底層MapReduce程序,而且HiveQL的開發(fā)門檻也較低。傳統(tǒng)的數(shù)據(jù)處理一般都是在關(guān)系型數(shù)據(jù)庫如Oracle中進行,當(dāng)需要將業(yè)務(wù)邏輯從Oracle平臺遷移至Hive平臺時,Oracle數(shù)據(jù)庫開發(fā)工程師可以十分容易地進行Hive開發(fā)。
而從Hadoop架構(gòu)轉(zhuǎn)向Storm架構(gòu)后,需要開發(fā)工程師使用JAVA語言來完成業(yè)務(wù)邏輯的二次開發(fā),對開發(fā)效率和開發(fā)成本會產(chǎn)生一定的影響,這是項目規(guī)劃中需要重點考量評估的一個關(guān)鍵點。
同樣的業(yè)務(wù)邏輯,由Hadoop架構(gòu)遷移至Storm架構(gòu)中時,主要的工作量在于使用Storm編程組件實現(xiàn)HiveQL中可以直接使用的AVG、SUM、COUNT、DISTINCT以及GROUP BY等標(biāo)準(zhǔn)SQL操作。在實現(xiàn)這些功能模塊時,可以巧妙地利用Storm架構(gòu)的stream grouping特性。stream grouping定義了一系列分組方式,分組方式?jīng)Q定了消息流在各組件間如何傳遞,分組的類型主要包括shuffle grouping(隨機分組)、fields grouping(字段分組)、all grouping(全部分組)和direct grouping(直接分組)等。
例如,可以使用fields grouping字段分組機制來實現(xiàn)GROUP BY操作的功能。在運營商業(yè)務(wù)邏輯中,經(jīng)常需要統(tǒng)計分省指標(biāo),利用fields grouping機制實現(xiàn)的GROUP BY操作可以用來進行分省指標(biāo)的計算。fields grouping是這樣一種消息傳遞模式,在spout組件和bolt組件之間,按照消息中指定的某個字段來決定該消息分發(fā)至哪一個bolt。在統(tǒng)計分省指標(biāo)時,可以將省份字段設(shè)置為分組的依據(jù)。這樣,不同省份的消息可以進入不同省份對應(yīng)的bolt中,然后在每個省份對應(yīng)的bolt中對其進行處理,可以得到分省的計算指標(biāo)。
在實際應(yīng)用場景中,大數(shù)據(jù)分析處理的計算結(jié)果往往要寫入到傳統(tǒng)的關(guān)系型數(shù)據(jù)庫中,以方便對計算結(jié)果進行展示和管理。在Hadoop架構(gòu)中,可以使用Hadoop生態(tài)環(huán)境中的Sqoop工具來完成這一功能。Sqoop可以將計算結(jié)果從HDFS或Hive數(shù)據(jù)倉庫傳輸至傳統(tǒng)關(guān)系型數(shù)據(jù)庫(如Oracle和Mysql),也可以將傳統(tǒng)關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)傳輸至HDFS或Hive數(shù)據(jù)倉庫。出于程序簡潔性的考慮,可以直接將Sqoop傳輸程序插入到Hive存儲過程的結(jié)束處,在Hive計算過程完成后直接調(diào)用Sqoop傳輸程序來傳送計算結(jié)果[8]。
在Storm架構(gòu)中,輸入數(shù)據(jù)源源不斷地進入計算系統(tǒng),每時每刻都在更新計算結(jié)果。Storm的設(shè)計出于計算速度的考量,采用了內(nèi)存計算的模式,所以計算結(jié)果是存在于內(nèi)存中的。因為是使用JAVA語言進行程序編寫,所以可以直接使用JDBC的方式連接關(guān)系型數(shù)據(jù)庫,來傳輸計算結(jié)果??梢栽诹魇教幚硐到y(tǒng)的最后增加一個bolt組件,來完成這一功能[9]。
如果在每條輸入數(shù)據(jù)更新計算結(jié)果后,都寫入關(guān)系型數(shù)據(jù)庫的話,會對關(guān)系型數(shù)據(jù)庫造成較大的壓力。可以根據(jù)時延的要求,選擇以固定的時間頻率寫入關(guān)系型數(shù)據(jù)庫。
隨著互聯(lián)網(wǎng)的飛速發(fā)展,新的業(yè)務(wù)對數(shù)據(jù)處理的實時性要求不斷提高。當(dāng)傳統(tǒng)的離線處理架構(gòu)難以滿足實時性要求的時候,可以適時考慮更換大數(shù)據(jù)處理技術(shù)架構(gòu)來完成業(yè)務(wù)需求。信息社會瞬息萬變,我們需要不斷地變革和創(chuàng)新,才能為社會創(chuàng)造更好的互聯(lián)網(wǎng)服務(wù)。
[1]崔杰,李陶深,蘭紅星.基于Hadoop的海量數(shù)據(jù)存儲平臺設(shè)計與開發(fā)[J].計算機研究與發(fā)展,2012(49):12-18
[2]李美敏.解讀Web 2.0時代的微博文化[EB/OL].[2014-10-20].http://media.people.com.cn/GB/22114/206896/239176/17143067.html
[3]董新華,李瑞軒,周灣灣,等.Hadoop系統(tǒng)性能優(yōu)化與功能增強綜述[J].計算機研究與發(fā)展,2013(50):1-15
[4]林偉偉.一種改進的Hadoop數(shù)據(jù)放置策略[J].華南理工大學(xué)學(xué)報,2012(40):152-158
[5]趙建紅.基于Twitter Storm的數(shù)據(jù)實時分析處理工具研究[J].商情,2013(8):274-275
[6]胡宇舟,范濱,顧學(xué)道,等.基于Storm的云計算在自動清分系統(tǒng)中的實時處理應(yīng)用[J].2014(34):96-99
[7]沙恒,貼軍.基于Hadoop子項目——Hive的云計算性能測試[J].軟件導(dǎo)刊,2012(11):14-16
[8]NextMark.Sqoop在Hadoop和關(guān)系型數(shù)據(jù)庫之間的數(shù)據(jù)轉(zhuǎn)移[EB/OL].[2014-10-20].http://www.linuxidc.com/Linux/2014-02/97305.htm
[9]韋海清.淺談Java通過JDBC連接Oracle數(shù)據(jù)庫技術(shù)[J].計算機光盤軟件與應(yīng)用,2014(7):298-300