章昱 鐘茂生 童維勤 嚴(yán)偉安
(1.江西師范大學(xué)計算機(jī)信息工程學(xué)院 江西省南昌市 330027 2.上海大學(xué) 上海市 200444)
大數(shù)據(jù)的批處理技術(shù)應(yīng)用于大規(guī)模靜態(tài)數(shù)據(jù)集的離線計算和處理,架構(gòu)設(shè)計的初衷是為了解決大規(guī)模、非實(shí)時數(shù)據(jù)計算,以吞吐量大為顯著特征。在大數(shù)據(jù)批處理技術(shù)包括兩種計算模型:MapReduce 計算模型和DAG 模型。
MapReduce 的設(shè)計理念是“計算向數(shù)據(jù)靠攏”,而不是“數(shù)據(jù)向計算靠攏”。MapReduce 將復(fù)雜的、運(yùn)行于大規(guī)模集群上的并行計算過程高度地抽象到了兩個函數(shù):Map()和Reduce()。使用簡單的編程接口,不需要掌握分布式并行編程細(xì)節(jié),可以很容易的將自己的程序運(yùn)行在分布式系統(tǒng)上,完成海量數(shù)據(jù)的計算。
MapReduce 編程模型的示意圖如圖1 所示。其中,Map操作是對一部分原始數(shù)據(jù)進(jìn)行對應(yīng)的操作,每個Map 操作都針對不同的原始數(shù)據(jù),因此Map 與Map 之間是相互獨(dú)立的,這使得他們可以充分的并行化。Reduce 操作是對每個Map 所產(chǎn)生的一部分中間結(jié)果進(jìn)行合并操作,每個Reduce所處理的Map 中間結(jié)果是互不交叉的,所有Reduce 產(chǎn)生的最終結(jié)果經(jīng)過簡單連接就形成了完整結(jié)果集。
圖1:MapReduce 編程模型示意圖
開發(fā)者只需要編寫兩個函數(shù):
Map(in_key , in_value) -> {( key, value) | j = 1 , … , k };
Reduce(key , [ value1 , … , valuem ]) -> ( key , final_value);
Map 的輸入?yún)?shù):in_key 和in_value,它指明了Map 需要處理的原始數(shù)據(jù),Map 的輸出結(jié)果:一組< key , value >對,這是經(jīng)過Map 操作后所產(chǎn)生的中間結(jié)果,用于后續(xù)Reduce的輸入。Reduce 的輸入?yún)?shù):(key , [ value1 , … , valuem ]),此操作是對這些對應(yīng)相同key 的value 值進(jìn)行歸并操作,Reduce 的輸出結(jié)果:( key , final_value ),所有Reduce 的結(jié)果并在一起就是最終結(jié)果。
下面用一個簡單的實(shí)例來對MapReduce 的編程模型進(jìn)行講解。
假定給出了10 萬本長篇英文小說的文字,如何統(tǒng)計每個字母出現(xiàn)的次數(shù)?這個問題看似簡單,但是如果使用傳統(tǒng)的數(shù)據(jù)處理方式,在單機(jī)環(huán)境下想要快速的進(jìn)行統(tǒng)計還是需要一些技巧的,主要原因是數(shù)據(jù)規(guī)模巨大,導(dǎo)致的處理速度慢和運(yùn)行內(nèi)存不足。MapReduce 計算模型下實(shí)現(xiàn)這個功能很簡單直觀,只要完成Map 和Reduce 操作的業(yè)務(wù)邏輯即可。這個任務(wù)對應(yīng)的Map 操作和Reduce 操作如表1 所示。
表1:Map 操作和Reduce 操作
MapReduce 模型工作原理如圖2 所示,其最初設(shè)計方案是將MapReduce 模型運(yùn)行在由低端計算機(jī)組成的大型集群上。集群中每臺計算機(jī)包含一個工作節(jié)點(diǎn)(Worker)、一個較快的主內(nèi)存和一個輔助存儲器。其中,工作節(jié)點(diǎn)用于數(shù)據(jù)的處理;主內(nèi)存用于暫存工作節(jié)點(diǎn)的輸出數(shù)據(jù);輔助存儲器組成了集群的全局共享存儲器,用于存儲全部的初始數(shù)據(jù)和工作節(jié)點(diǎn)的輸出數(shù)據(jù),并且計算機(jī)之間可以通過底層網(wǎng)絡(luò)實(shí)現(xiàn)輔助存儲器的同步遠(yuǎn)程互訪。
圖2:MapReduce 計算架構(gòu)工作原理圖
由圖2 可知,一個MapReduce 作業(yè)是由Map 和Reduce兩個階段組成,每一個階段包括數(shù)據(jù)輸入、計算處理和數(shù)據(jù)輸出三個步驟。其中每一個階段的輸出數(shù)據(jù)被當(dāng)作下一階段的輸入數(shù)據(jù),而且只有當(dāng)每一個計算機(jī)都將它的輸出數(shù)據(jù)寫入共享存儲器并完成數(shù)據(jù)同步后,計算機(jī)才可以讀取它前一個階段寫入共享存儲器的數(shù)據(jù)進(jìn)行數(shù)據(jù)互相訪問。除此方式以外,各個計算機(jī)之間不存在其他的數(shù)據(jù)交互方式(主節(jié)點(diǎn)Master 除外)。
1.3.1 優(yōu)點(diǎn)
(1)硬件要求低,MapReduce 模型的設(shè)計是面向由數(shù)千臺中低端計算機(jī)組成的大規(guī)模集群,并能夠保證在現(xiàn)有的異構(gòu)集群中運(yùn)行;
(2)接口化,MapReduce 模型通過簡單的接口實(shí)現(xiàn)了大規(guī)模分布式計算的自動并行化,屏蔽了需要大量并行代碼去實(shí)現(xiàn)的容錯、負(fù)載均衡和數(shù)據(jù)分布等復(fù)雜細(xì)節(jié),程序員只需關(guān)注實(shí)際操作數(shù)據(jù)的Map 函數(shù)和Reduce 函數(shù);
(3)編程語言多樣化,MapReduce 模型支持Java、C、C++、Python、Shell、PHP、Ruby 等多種開發(fā)語言;
(4)擴(kuò)展性強(qiáng),MapReduce 模型采用的Shared-Nothing結(jié)構(gòu)保證了其良好的伸縮性,同時,使其具有了各個節(jié)點(diǎn)間的松耦合性和較強(qiáng)的容錯能力,節(jié)點(diǎn)可以被任意地從集群中移除,幾乎不影響現(xiàn)有任務(wù)的執(zhí)行;
(5)數(shù)據(jù)分析低延遲,基于MapReduce模型的數(shù)據(jù)分析,無需復(fù)雜的數(shù)據(jù)預(yù)處理和寫入數(shù)據(jù)庫的過程,而是直接基于平面文件進(jìn)行分析,這種移動計算而非移動數(shù)據(jù)的計算模式可以將分析延遲最小化。
1.3.2 缺點(diǎn)
(1)無法達(dá)到數(shù)據(jù)實(shí)時處理,MapReduce 模型設(shè)計初衷是為解決大規(guī)模、而非實(shí)時數(shù)據(jù)問題,因此在大數(shù)據(jù)時代,MapReduce 并不能滿足大數(shù)據(jù)實(shí)時處理的需求;
(2)程序員負(fù)擔(dān)增加,MapReduce 模型將文件存儲格式的設(shè)計、模式信息的記錄以及數(shù)據(jù)處理算法的實(shí)現(xiàn)等工作全部交由程序員完成,從而導(dǎo)致程序員的負(fù)擔(dān)過重;
(3)I/O(Input or Output,輸入輸出)代價較高,MapReduce 的輸入數(shù)據(jù)并不能“貫穿”整個MapReduce 流程,在Map 階段結(jié)束后數(shù)據(jù)由內(nèi)存寫入本地存儲,Reduce 階段的輸入數(shù)據(jù)需要從本地存儲重新讀取,這種基于掃描的處理模式和對中間結(jié)果步步處理的執(zhí)行策略,導(dǎo)致了較高的I/O代價。
MapReduce 雖然解決了批處理領(lǐng)域中的部分需求,可是它也存在或多或少的局限性,比如I/O 代價較高。為了彌補(bǔ)這些不足,行業(yè)中衍生出了另一類基于DAG(Directed Acyclic Graph,有向無環(huán)圖)計算模型的大數(shù)據(jù)計算架構(gòu)。DAG 是數(shù)據(jù)結(jié)構(gòu)領(lǐng)域的概念。在大數(shù)據(jù)領(lǐng)域,DAG 計算模型往往指將計算任務(wù)在內(nèi)部分解為若干個子任務(wù),這些子任務(wù)之間由邏輯關(guān)系或運(yùn)行先后順序等因素被構(gòu)建成有向無環(huán)圖結(jié)構(gòu)。
本節(jié)將以Spark 計算架構(gòu)為例,介紹在批處理領(lǐng)域中DAG 模型的工作原理。DAG 是在分布式計算中非常常見的一種結(jié)構(gòu),因為其通用性強(qiáng),所以表達(dá)能力自然也強(qiáng)。比如前面介紹的MapReduce 計算模型,在本質(zhì)上是DAG 模型的一種特例。
Spark 運(yùn)行架構(gòu)如圖3 所示,其中包括集群資源管理器(Cluster Manager)、運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(Worker Node)、每個應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)和每個工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor),資源管理器可以用自帶的、Mesos 或YARN。
圖3:Spark 運(yùn)行架構(gòu)圖
(1)Application:用戶編寫的Spark 應(yīng)用程序;
(2)Driver:Spark 中的Driver 即運(yùn)行上述Application的main 函數(shù)并創(chuàng)建SparkContext;
(3)Cluter Manager:指的是對集群進(jìn)行資源管理的外部服務(wù)。
(4)Executor:是運(yùn)行在工作節(jié)點(diǎn)(Worker Node)的一個進(jìn)程,負(fù)責(zé)運(yùn)行Task;
(5)Task: 運(yùn)行在Executor 上的工作單元;
(6)RDD(Resillient Distributed Dataset,彈性分布式數(shù)據(jù)集):分布式內(nèi)存的一個抽象概念,提供了一種高度受限的共享內(nèi)存模型;
(7)Job:一個Job 包含多個RDD 及作用于相應(yīng)RDD上的各種操作;
一個Application 由一個Driver 和若干個Job 構(gòu)成,一個Job 由多個Stage 構(gòu)成,一個Stage 由多個沒有Shuffle 關(guān)系的Task 組成,包含關(guān)系如圖4 所示。
圖4:Application 結(jié)構(gòu)圖
當(dāng)執(zhí)行一個Application 時,Driver 會向集群管理器申請資源,啟動Executor,并向Executor 發(fā)送應(yīng)用程序代碼和文件,然后在Executor 上執(zhí)行Task,運(yùn)行結(jié)束后,執(zhí)行結(jié)果會返回給Driver,或者寫到HDFS 或者其它數(shù)據(jù)庫中。
與MapReduce 計算架構(gòu)相比,Spark 所采用的Executor有兩個優(yōu)點(diǎn):
(1)利用多線程來執(zhí)行具體的任務(wù)減少任務(wù)的啟動開銷;
(2)Executor 中有一個BlockManager 存儲模塊,會將內(nèi)存和磁盤共同作為存儲設(shè)備,有效減少IO 開銷。
Spark 的運(yùn)行基本流程圖如圖5 所示。分為以下4 步進(jìn)行解釋:
圖5:Spark 運(yùn)行的基本流程圖
(1)首先為應(yīng)用構(gòu)建起基本的運(yùn)行環(huán)境,即有Driver創(chuàng)建一個SparkContext,進(jìn)行資源申請、任務(wù)分配和狀態(tài)監(jiān)控;
(2)資源管理器為Executor 分配資源,并啟動Executor 進(jìn)程;
(3)SparkContext 根據(jù)RDD 的依賴關(guān)系構(gòu)建DAG 圖,DAG 圖提交給DAG Scheduler 解析成Stage,然后把一個個TaskSet 提交給底層調(diào)度器Task Scheduler 處理;Excutor向SparkContext 申請Task,Task Scheduler 將Task 發(fā)放給Executor 運(yùn)行,并提供程序代碼;
(4)Task 在Excutor 上運(yùn)行,把執(zhí)行結(jié)果反饋給Task Scheduler,然后反饋給DAG Scheduler,運(yùn)行完畢后寫入數(shù)據(jù)并釋放所有資源。
總體而言,Spark 運(yùn)行架構(gòu)具有以下特點(diǎn):
(1)每個Application 都有自己專屬的Executor 進(jìn)程,并且該進(jìn)程在Application 運(yùn)行期間一直駐留,Executor 進(jìn)程以多線程的方式運(yùn)行Task;
(2)Spark 運(yùn)行過程與資源管理器無關(guān),只要能夠獲取Executor 進(jìn)程并保持通信即可;
(3)BlockManager 將中間數(shù)據(jù)存儲于內(nèi)存或磁盤,實(shí)現(xiàn)緩存機(jī)制;
(4)Task 采用了數(shù)據(jù)本地性和推測執(zhí)行等優(yōu)化機(jī)制。
RDD 依賴關(guān)系,也就是有依賴的RDD 之間的關(guān)系,比如RDD1——>RDD2(RDD1 生成RDD2),RDD2 依賴于RDD1。這里的生成也就是RDD 的Transformation 操作。
RDD 之間的依賴關(guān)系分為窄(narrow)依賴(如圖6 (a)所示)和寬(shuffle/wide)依賴(如圖6(b)所示)。窄依賴表現(xiàn)為一個父RDD 的分區(qū)對應(yīng)于一個子RDD 的分區(qū),或多個父RDD 的分區(qū)對應(yīng)于一個子RDD 的分區(qū)。寬依賴則表現(xiàn)為存在一個父RDD 的一個分區(qū)對應(yīng)一個子RDD 的多個分區(qū)。
圖6:RDD 依賴關(guān)系圖
Stage 是Job 的基本調(diào)度單位,一個Job 會分為多組Task,每組Task 被稱為Stage,或者也被稱為TaskSet,代表一組關(guān)聯(lián)的,是相互之間沒有Shuffle 依賴關(guān)系的任務(wù)組成的任務(wù)集。
Stage 的劃分主要有三大原則:
(1)將窄依賴的RDD 歸并到同一個Stage 中;
(2)將寬依賴的RDD 前后拆分為兩個Stage,前一個Stage 寫完文件后,下一個Stage 才能開始;
(3)進(jìn)行Action 操作時,相關(guān)RDD 會歸并在同一個Stage 中,這個Stage 稱為ResultStage,沒有輸出,而是直接產(chǎn)生結(jié)果或進(jìn)行存儲。除ResultStage 外,稱為SuffleStage。
如圖7 所示是一個Stage 劃分的示意圖。其中的RDD被劃分為3 個Stage,在Stage2 中,從map 到union 都是窄依賴,這兩步操作可以形成一個流水線操作:分區(qū)7 通過map 操作生成分區(qū)9,可以不用等待分區(qū)8 到分區(qū)10 這步map 操作的計算結(jié)束,而是繼續(xù)進(jìn)行union 操作,得到分區(qū)13。這樣流水線執(zhí)行大大提高了程序的計算效率。
圖7:Stage 劃分樣例示意圖
至此,Spark 的運(yùn)行原理由整體到局部已解釋完畢,從中不難看出,Spark 使用的DAG 計算模式較之MapReduce有著諸多優(yōu)勢:
(1)不局限于Map 和Reduce 兩個算子,編程模型更靈活,表達(dá)能力更強(qiáng);
(2)Spark 提供了內(nèi)存計算,可以將SuffleStage 產(chǎn)生的中間結(jié)果保存在內(nèi)存中,較之磁盤而言,迭代的效率更高;
(3)DAG 計算模型將Tasks 分為不同Stage,同一個Stage 中的任務(wù)可以并行計算,極大的提高了程序的計算效率。
隨著處理靜態(tài)的大數(shù)據(jù)的MapReduce 計算模型、DAG模型的成熟,在各個行業(yè)都有著應(yīng)用上的發(fā)展,如交通數(shù)據(jù)分析、商業(yè)零售數(shù)據(jù)分析、電影影評數(shù)據(jù)分析、互聯(lián)網(wǎng)微博數(shù)據(jù)、用戶上網(wǎng)行為分析等。靜態(tài)分析模型用于教育領(lǐng)域?qū)π袠I(yè)數(shù)據(jù)復(fù)盤,歷史數(shù)據(jù)刻畫有著重要的作用。
隨產(chǎn)業(yè)界對大數(shù)據(jù)處理技術(shù)的進(jìn)一步需求,若數(shù)據(jù)吞吐量需求增大,實(shí)時性要求更高,各種特定業(yè)務(wù)需求越來越復(fù)雜,那么對大數(shù)據(jù)計算技術(shù)的需求就越豐富。
就拿物聯(lián)網(wǎng)來舉例,其傳感器所產(chǎn)生的數(shù)據(jù),都是不斷增加的,產(chǎn)生的速度也是極快的,而且這些數(shù)據(jù)需要被實(shí)時的進(jìn)行處理,比如全國各地所有大橋的傳感器監(jiān)測數(shù)據(jù),需要被實(shí)時處理,得到各種健康指標(biāo)來判斷橋梁是否健康,批處理技術(shù)已經(jīng)無法滿足這種需求,因此需要流處理技術(shù),如Spark Streaming,其思想是將實(shí)時流入的數(shù)據(jù)按一定時間片(通常在0.5~2 秒之間)組合成一小批一小批的數(shù)據(jù),再使用批處理進(jìn)行計算處理,由于其是構(gòu)建在Spark 基礎(chǔ)之上的,所以其與Spark 系的系統(tǒng)進(jìn)行整合,風(fēng)險和成本消耗都不會很大,如果實(shí)時需求不是特別高的話,是個不錯的選擇;普通的流處理(數(shù)據(jù)流進(jìn),計算,數(shù)據(jù)流出),每次計算不會考慮歷史數(shù)據(jù)的計算結(jié)果,即是無狀態(tài)的計算。
因此,從軟件上講,大數(shù)據(jù)處理技術(shù)為應(yīng)對產(chǎn)業(yè)界的需求,在算法上結(jié)合機(jī)器學(xué)習(xí)和人工智能,研究適用于不同數(shù)據(jù)處理類型的軟件框架,以及對以上架構(gòu)進(jìn)行整合是大數(shù)據(jù)分析和處理行業(yè)的發(fā)展趨勢。