馬卿云 季航旭 趙宇海 毛克明 王國(guó)仁
1(東北大學(xué)計(jì)算機(jī)科學(xué)與工程學(xué)院 沈陽(yáng) 110169)
2(東北大學(xué)軟件學(xué)院 沈陽(yáng) 110169)
3(北京理工大學(xué)計(jì)算機(jī)學(xué)院 北京 100081)
隨著物聯(lián)網(wǎng)、移動(dòng)互聯(lián)網(wǎng)、產(chǎn)業(yè)互聯(lián)網(wǎng)和社交媒體等技術(shù)的飛速發(fā)展,每天都會(huì)產(chǎn)生大量的數(shù)據(jù),人們已經(jīng)身處大數(shù)據(jù)時(shí)代[1].根據(jù)國(guó)際數(shù)據(jù)公司(International Data Corporation, IDC)的預(yù)測(cè),到2025年,全球的數(shù)據(jù)量將是現(xiàn)在的10倍,達(dá)到175 ZB.
大數(shù)據(jù)中有著豐富的信息,并且蘊(yùn)含著巨大的價(jià)值[2].谷歌通過(guò)用戶搜索詞頻的變化成功對(duì)冬季流感進(jìn)行了預(yù)測(cè),沃爾瑪通過(guò)分析消費(fèi)者購(gòu)物行為對(duì)紙尿褲和啤酒進(jìn)行共同銷售,這些耳熟能詳?shù)陌咐加∽C了這一點(diǎn).但隨著數(shù)據(jù)產(chǎn)生速度的加快,數(shù)據(jù)量的急劇增長(zhǎng),如何對(duì)龐大的數(shù)據(jù)進(jìn)行處理成為了新的難題.傳統(tǒng)的單機(jī)處理已經(jīng)無(wú)法滿足大數(shù)據(jù)的需求,分布式的大數(shù)據(jù)處理框架應(yīng)運(yùn)而生.谷歌首先提出了用于大規(guī)模數(shù)據(jù)并行計(jì)算的編程模型MapReduce[3],引起了極大的反響,也因此促使了Hadoop[4]的誕生.之后為了改進(jìn)傳統(tǒng)MapReduce中運(yùn)行效率低下的問(wèn)題,基于內(nèi)存計(jì)算的Spark[5]被提出.時(shí)至今日,為了追求更快的處理速度、更低的時(shí)延,F(xiàn)link[6]開(kāi)始嶄露頭角,并得到了飛速的發(fā)展.
與此同時(shí),隨著云計(jì)算[7]的興起,包括谷歌、微軟、阿里巴巴等在內(nèi)的互聯(lián)網(wǎng)公司都提供了大數(shù)據(jù)存儲(chǔ)與分析的相關(guān)服務(wù),眾多企業(yè)開(kāi)始選擇將自己的業(yè)務(wù)上“云”.這些提供云服務(wù)的公司需要存儲(chǔ)和處理的數(shù)據(jù)同樣是海量的,為了更好地為客戶提供服務(wù),提供云服務(wù)的公司通常都會(huì)在各地建立數(shù)據(jù)中心[8],例如微軟和谷歌在世界各地就分布著超過(guò)十個(gè)的數(shù)據(jù)中心.各數(shù)據(jù)中心之間經(jīng)常需要聯(lián)合進(jìn)行數(shù)據(jù)分析,此時(shí)分布式大數(shù)據(jù)處理框架依然是不二之選.跨數(shù)據(jù)中心的大數(shù)據(jù)分析業(yè)務(wù)許多都是數(shù)據(jù)密集型作業(yè),作業(yè)運(yùn)行過(guò)程中,通常需要使用數(shù)據(jù)分區(qū)方法將相同鍵的數(shù)據(jù)發(fā)送到同一數(shù)據(jù)中心進(jìn)行處理,而各個(gè)數(shù)據(jù)中心之間通常相隔較遠(yuǎn),這樣會(huì)產(chǎn)生大量的網(wǎng)絡(luò)傳輸開(kāi)銷,導(dǎo)致數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸時(shí)間成為大數(shù)據(jù)分析作業(yè)的瓶頸.由于網(wǎng)絡(luò)提供商硬件設(shè)備的不同,各數(shù)據(jù)中心之間的帶寬通常差異較大,這樣便會(huì)形成異構(gòu)帶寬的分布式環(huán)境[9].當(dāng)然,即使在同構(gòu)的集群中,也可能因?yàn)槟承┕?jié)點(diǎn)上的作業(yè)搶占了帶寬而導(dǎo)致集群環(huán)境中各節(jié)點(diǎn)帶寬異構(gòu).綜上所述,在異構(gòu)帶寬環(huán)境下,如何高效地進(jìn)行數(shù)據(jù)分區(qū)是一個(gè)急需解決的問(wèn)題.
數(shù)據(jù)分區(qū)是大數(shù)據(jù)框架的一個(gè)基本功能,通過(guò)數(shù)據(jù)分區(qū)可以將各分區(qū)數(shù)據(jù)交給不同的節(jié)點(diǎn)進(jìn)行處理.常用的數(shù)據(jù)分區(qū)方式有隨機(jī)分區(qū)、Hash分區(qū)和Range分區(qū)[10].其中Hash分區(qū)和Range分區(qū)都能保證具有相同鍵的數(shù)據(jù)分發(fā)到同一節(jié)點(diǎn),這也為許多需要這種保證的算子提供了保障.現(xiàn)有的研究很少在數(shù)據(jù)分區(qū)時(shí)對(duì)節(jié)點(diǎn)的帶寬進(jìn)行考慮,在節(jié)點(diǎn)間異構(gòu)帶寬的情況下,傳統(tǒng)的數(shù)據(jù)分區(qū)方法效率低下,完成數(shù)據(jù)分區(qū)的時(shí)間開(kāi)銷較大.針對(duì)該問(wèn)題,本文提出了一種基于帶寬的數(shù)據(jù)分區(qū)方法,在帶寬異構(gòu)的集群環(huán)境下可以有效減少數(shù)據(jù)分區(qū)完成的時(shí)間.
本文的主要貢獻(xiàn)有3個(gè)方面:
1) 提出了一種基于帶寬的數(shù)據(jù)分區(qū)方法,該方法在異構(gòu)帶寬的集群下能有效減少數(shù)據(jù)分區(qū)所需的時(shí)間;
2) 在新一代大數(shù)據(jù)計(jì)算框架Flink中,對(duì)基于帶寬的數(shù)據(jù)分區(qū)方法進(jìn)行了實(shí)現(xiàn);
3) 通過(guò)實(shí)驗(yàn)對(duì)基于帶寬的數(shù)據(jù)分區(qū)方法進(jìn)行了驗(yàn)證,實(shí)驗(yàn)結(jié)果顯示該方法可以有效地減少完成數(shù)據(jù)分區(qū)所需的時(shí)間.
針對(duì)異構(gòu)集群環(huán)境下的大數(shù)據(jù)框架優(yōu)化的研究已有不少,主要的研究方向是針對(duì)節(jié)點(diǎn)間計(jì)算能力的不同,為各節(jié)點(diǎn)分配不同的數(shù)據(jù)量或者不同的計(jì)算任務(wù).如在異構(gòu)Hadoop集群中,文獻(xiàn)[11]針對(duì)集群中節(jié)點(diǎn)計(jì)算性能不同的特點(diǎn),以數(shù)據(jù)本地性策略為基礎(chǔ),通過(guò)在計(jì)算能力更強(qiáng)的節(jié)點(diǎn)放置更多的數(shù)據(jù)塊,使得計(jì)算能力強(qiáng)的節(jié)點(diǎn)處理更多的數(shù)據(jù),從而提升系統(tǒng)的性能;文獻(xiàn)[12-13]則針對(duì)異構(gòu)的Hadoop集群,考慮提交至集群的作業(yè)運(yùn)行時(shí)需要的資源大小和集群中可用資源的數(shù)量,提出了一種新的調(diào)度系統(tǒng)COSHH,該調(diào)度系統(tǒng)可以結(jié)合Hadoop中原始的調(diào)度策略進(jìn)行使用,進(jìn)一步減少異構(gòu)Hadoop集群中作業(yè)的平均完成時(shí)間,使得MapReduce模型在異構(gòu)集群中的運(yùn)行效率更高.
在異構(gòu)Spark集群中同樣有著相應(yīng)的研究,如文獻(xiàn)[14]提出了一種在異構(gòu)Spark集群下的自適應(yīng)任務(wù)調(diào)度策略,其主要考慮的是集群中各節(jié)點(diǎn)的計(jì)算能力不同,通過(guò)對(duì)各節(jié)點(diǎn)的負(fù)載和資源利用率進(jìn)行監(jiān)測(cè)來(lái)動(dòng)態(tài)地調(diào)整節(jié)點(diǎn)任務(wù)的分配;文獻(xiàn)[15]則采用了一種主動(dòng)式的數(shù)據(jù)放置策略,通過(guò)對(duì)任務(wù)所需的計(jì)算時(shí)間進(jìn)行預(yù)測(cè),在初始數(shù)據(jù)加載過(guò)程中將數(shù)據(jù)放置在適當(dāng)?shù)墓?jié)點(diǎn)上,并在作業(yè)執(zhí)行的過(guò)程中進(jìn)一步對(duì)數(shù)據(jù)的放置進(jìn)行調(diào)整,縮短作業(yè)的整體運(yùn)行時(shí)間.
對(duì)于數(shù)據(jù)在節(jié)點(diǎn)之間的傳輸,主要的研究方向是針對(duì)同構(gòu)集群中的數(shù)據(jù)傾斜問(wèn)題,比如文獻(xiàn)[16]提出了一種用于MapReduce的采樣算法,在Hadoop集群中不需要對(duì)輸入數(shù)據(jù)運(yùn)行額外的預(yù)采樣程序,就能比較精確地估計(jì)出中間結(jié)果的分布,從而均衡各節(jié)點(diǎn)的數(shù)據(jù)量;同樣是針對(duì)MapReduce框架中出現(xiàn)的數(shù)據(jù)傾斜問(wèn)題,文獻(xiàn)[17]基于對(duì)Map端中間結(jié)果的采樣,提出了一種基于動(dòng)態(tài)劃分的負(fù)載均衡方法,可以保證每個(gè)Reduce任務(wù)處理的數(shù)據(jù)量盡量均衡;文獻(xiàn)[18]則針對(duì)Spark提出了一種基于鍵重分配和分區(qū)切分的算法,該算法作用于中間結(jié)果的產(chǎn)生和shuffle過(guò)程中,同樣用于解決數(shù)據(jù)傾斜問(wèn)題;針對(duì)數(shù)據(jù)傳輸過(guò)程的優(yōu)化通常都需要使用采樣算法來(lái)獲取數(shù)據(jù)的信息,文獻(xiàn)[19]針對(duì)大規(guī)模數(shù)據(jù)流,提出了一種改進(jìn)的水塘抽樣方法,F(xiàn)link中使用該抽樣方法實(shí)現(xiàn)了Range分區(qū).
以上研究都沒(méi)有考慮在異構(gòu)帶寬情況下,如何對(duì)數(shù)據(jù)分區(qū)方法進(jìn)行優(yōu)化.對(duì)于數(shù)據(jù)密集型作業(yè),網(wǎng)絡(luò)傳輸往往是瓶頸所在,在異構(gòu)帶寬條件下,傳統(tǒng)考慮負(fù)載均衡的數(shù)據(jù)分區(qū)方法運(yùn)行效率反而低下.針對(duì)該問(wèn)題,本文通過(guò)建立基于傳輸時(shí)間的數(shù)據(jù)分發(fā)模型,提供了一種基于帶寬的數(shù)據(jù)分區(qū)方法,在異構(gòu)帶寬的集群環(huán)境下可以有效地減少數(shù)據(jù)的傳輸時(shí)間.
Flink與大多數(shù)大數(shù)據(jù)框架一樣也可以分為Master和Slave節(jié)點(diǎn),如圖1所示,其中充當(dāng)Master的稱為JobManager,充當(dāng)Slave的稱為TaskManager,除此之外,提交作業(yè)的節(jié)點(diǎn)通常稱為Client.
Fig. 1 The architecture of Flink圖1 Flink架構(gòu)圖
Flink中JobManager將接收Client提交的作業(yè),對(duì)作業(yè)進(jìn)行調(diào)度并選定TaskManager進(jìn)行任務(wù)的執(zhí)行,收集作業(yè)運(yùn)行的狀態(tài),并在作業(yè)運(yùn)行失敗時(shí)進(jìn)行容錯(cuò)和恢復(fù),TaskManager上則真正運(yùn)行著作業(yè)的各個(gè)子任務(wù)[20].通常Flink集群中會(huì)有一個(gè)JobManager和多個(gè)TaskManager.
Flink中作業(yè)會(huì)被抽象為數(shù)據(jù)流圖,通常都是一個(gè)DAG結(jié)構(gòu)[21].具體來(lái)講,作業(yè)在Client端提交后,如果是批處理作業(yè)會(huì)通過(guò)優(yōu)化器生成Optimized-Plan,如果是流處理作業(yè)則會(huì)生成StreamGraph,之后會(huì)繼續(xù)在Client統(tǒng)一轉(zhuǎn)化為JobGraph,提交給JobManager.在JobManager處接收到JobGraph之后,會(huì)將其轉(zhuǎn)化為ExecutionGraph,最后調(diào)度執(zhí)行.
大數(shù)據(jù)計(jì)算框架通常都會(huì)為用戶提供數(shù)據(jù)分區(qū)的功能[22],F(xiàn)link在其批處理API中也提供了3種常用的數(shù)據(jù)分區(qū)方法,包括Rebalance分區(qū)、Hash分區(qū)和Range分區(qū).
2.2.1 Rebalance分區(qū)
Rebalance分區(qū)是Flink中最簡(jiǎn)單的數(shù)據(jù)分區(qū)方法,通過(guò)該分區(qū)方法可以很好地均衡每個(gè)節(jié)點(diǎn)上的數(shù)據(jù),但其無(wú)法保證具有相同鍵的數(shù)據(jù)分發(fā)到同一節(jié)點(diǎn)上.Flink中使用Round-Robin算法實(shí)現(xiàn)了Rebalance分區(qū),具體算法如算法1所示:
算法1.Rebalance分區(qū)算法.
輸入:待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions、當(dāng)前分區(qū)編號(hào)partitionToSendTo;
輸出:待分區(qū)記錄的分區(qū)編號(hào)partitionToSendTo.
①partitionToSendTo++;
② IFpartitionToSendTo≥numPartitions
③partitionToSendTo=0;
④ END IF
⑤ RETURNpartitionToSendTo.
2.2.2 Hash分區(qū)
Hash分區(qū)是使用最普遍的數(shù)據(jù)分區(qū)方法,該分區(qū)方法是基于Hash算法實(shí)現(xiàn)的[23].使用該分區(qū)方法首先會(huì)根據(jù)待分區(qū)記錄的key值得到相應(yīng)的Hash值,之后利用Hash值對(duì)分區(qū)數(shù)量取余,得到的結(jié)果作為該條記錄所屬的分區(qū)編號(hào).因?yàn)橄嗤膋ey值一定有相同的Hash值,因此Hash分區(qū)可以保證鍵相同的記錄分發(fā)到同一節(jié)點(diǎn)上.具體算法如算法2所示:
算法2.Hash分區(qū)算法.
輸入:待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions;
輸出:待分區(qū)記錄的分區(qū)編號(hào)partitionToSendTo.
①key=extractKey(record); /*提取記錄的key*/
② IFkey==null
③partitionToSendTo=0;
④ ELSE
⑤hash=key.hashCode;
⑥partitionToSendTo=Hash%numPartitions;
⑦ END IF
⑧ RETURNpartitionToSendTo.
2.2.3 Range分區(qū)
Range分區(qū)是一種根據(jù)所有待分區(qū)記錄的鍵的范圍進(jìn)行數(shù)據(jù)分區(qū)的方法[24],也就是說(shuō)每個(gè)分區(qū)結(jié)果都包含互不相交的鍵在一定范圍內(nèi)的記錄,也因此使用Range分區(qū)方法時(shí)需要確定每個(gè)分區(qū)的邊界.為了確定每個(gè)分區(qū)的邊界,通常使用的方式是對(duì)輸入數(shù)據(jù)進(jìn)行抽樣.Flink中使用的抽樣算法是改進(jìn)后的蓄水池抽樣算法,各分區(qū)邊界的確定則是對(duì)抽樣數(shù)據(jù)進(jìn)行排序后按等比例獲取各個(gè)分區(qū)的邊界.
舉例來(lái)說(shuō),假設(shè)抽樣得到的數(shù)據(jù)按鍵排序后的結(jié)果為{(10,value1),(20,value2),(30,value3),(40,value4),(50,value5),(60,value6),(70,value7),(80,value8),(90,value9)},分區(qū)數(shù)量為3,則計(jì)算得到的邊界為{30,60},也就是說(shuō)key≤30的記錄將會(huì)被發(fā)往第1個(gè)分區(qū),key∈(30,60]之間的數(shù)據(jù)會(huì)發(fā)往第2個(gè)分區(qū),key>60的數(shù)據(jù)則會(huì)發(fā)往第3個(gè)分區(qū).Range分區(qū)方法通過(guò)抽樣并等比例劃分各個(gè)分區(qū)的邊界,可以在保證鍵相同的記錄發(fā)往同一節(jié)點(diǎn)的同時(shí),使得各分區(qū)擁有的數(shù)據(jù)大致相等.具體算法如算法3所示:
算法3.Range分區(qū)算法.
輸入:輸入源的分區(qū)數(shù)量numInputPartitions、待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions;
輸出:待分區(qū)記錄的分區(qū)編號(hào)partitionToSendTo.
① 使用改進(jìn)后的蓄水池抽樣算法在每個(gè)輸入源分區(qū)上進(jìn)行抽樣;
② 將各輸入源分區(qū)上的抽樣結(jié)果進(jìn)行匯總,得到sampleData[],并排序;
③ 根據(jù)分區(qū)數(shù)量numPartitions和sampleData[],計(jì)算出分區(qū)邊界rangeBoundary[];
④ 對(duì)于每條待分區(qū)的記錄record,在分區(qū)邊界rangeBoundary[]中查找出所屬的分區(qū)編號(hào)partitionToSendTo;
⑤ RETURNpartitionToSendTo.
本節(jié)我們先對(duì)最優(yōu)數(shù)據(jù)分發(fā)比例的計(jì)算建立模型.之后舉例說(shuō)明異構(gòu)帶寬的集群中不同的數(shù)據(jù)分發(fā)比例對(duì)數(shù)據(jù)分區(qū)完成時(shí)間的影響,體現(xiàn)基于帶寬的數(shù)據(jù)分區(qū)方法的重要性.最后介紹針對(duì)異構(gòu)帶寬的數(shù)據(jù)分區(qū)方法的算法流程以及在Flink中的實(shí)現(xiàn).
本節(jié)對(duì)異構(gòu)帶寬環(huán)境下各節(jié)點(diǎn)最優(yōu)數(shù)據(jù)分發(fā)比例的計(jì)算建立模型,首先對(duì)所要用到的變量進(jìn)行定義.
Di:節(jié)點(diǎn)i上的初始數(shù)據(jù)量大小;
ui:節(jié)點(diǎn)i的上行帶寬;
di:節(jié)點(diǎn)i的下行帶寬;
cost:數(shù)據(jù)分發(fā)所要花費(fèi)的總時(shí)間.
(1)
(2)
(3)
(4)
數(shù)據(jù)分發(fā)所要花費(fèi)的總時(shí)間cost則是各節(jié)點(diǎn)傳輸數(shù)據(jù)所需時(shí)間的最大值.我們的目標(biāo)是最小化數(shù)據(jù)分發(fā)所需的時(shí)間,則形式化地針對(duì)數(shù)據(jù)傳輸時(shí)間的優(yōu)化模型表示為
mincost
s.t. ?i:xi≥0,
該模型是一個(gè)典型的線性規(guī)劃問(wèn)題,使用計(jì)算機(jī)可以比較方便地求解.
考慮集群中參與數(shù)據(jù)分區(qū)的2個(gè)節(jié)點(diǎn)Slave1和Slave2,它們初始的節(jié)點(diǎn)信息如表1所示:
Table 1 Information of Nodes表1 節(jié)點(diǎn)信息表
其中Slave1節(jié)點(diǎn)上的初始數(shù)據(jù)量D1=320 MB,上行帶寬u1=2 Mbps,下行帶寬d1=10 Mbps.Slave2節(jié)點(diǎn)上的初始數(shù)據(jù)量D2=160 MB,上行帶寬u2=10 Mbps,下行帶寬d2=10 Mbps.
當(dāng)Slave1和Slave2以50%和50%的比例進(jìn)行數(shù)據(jù)分發(fā)時(shí),可以分別計(jì)算出Slave1和Slave2傳輸?shù)臄?shù)據(jù)量大小和所需時(shí)間,具體如表2所示:
Table 2 Transmission Information on Proportional 50%∶50%表2 50%∶50%比例分配數(shù)據(jù)傳輸信息表
其中Slave1需要傳出50%的數(shù)據(jù),即160 MB,接收來(lái)自Slave2的80 MB數(shù)據(jù).Slave2則需傳出80 MB數(shù)據(jù),接收來(lái)自Slave1的160 MB數(shù)據(jù).根據(jù)Slave1和Slave2的上下行帶寬可以計(jì)算得出相應(yīng)的傳輸時(shí)間,而最終數(shù)據(jù)分區(qū)完成需要取決于傳輸最慢的節(jié)點(diǎn),也就是說(shuō)以50%和50%的比例進(jìn)行數(shù)據(jù)分區(qū),最終需要花費(fèi)640 s來(lái)完成.
考慮以90%和10%的比例進(jìn)行數(shù)據(jù)分發(fā),也就是說(shuō)數(shù)據(jù)分發(fā)結(jié)束后Slave1保留90%的數(shù)據(jù),Slave2保留10%的數(shù)據(jù).同樣可以計(jì)算出各節(jié)點(diǎn)所需傳輸數(shù)據(jù)量大小和相應(yīng)的時(shí)間,如表3所示:
Table 3 Transmission Information on Proportional 90%∶10%表3 90%∶10%比例分配數(shù)據(jù)傳輸信息表
其中Slave1需要傳出10%的數(shù)據(jù),即32 MB,接收來(lái)自Slave2的144 MB的數(shù)據(jù).Slave2則需傳出144 MB數(shù)據(jù),接收來(lái)自Slave1的32 MB數(shù)據(jù).同理計(jì)算出傳輸時(shí)間后,可以得到最終傳輸結(jié)束所需時(shí)間為128 s,與分配比例為50%時(shí)相比速度提高了4倍.
以建立的最優(yōu)數(shù)據(jù)分發(fā)比例模型為基礎(chǔ),可以設(shè)計(jì)出基于帶寬的數(shù)據(jù)分區(qū)算法,如算法4所示:
算法4.基于帶寬的數(shù)據(jù)分區(qū)算法.
輸入:輸入源的分區(qū)數(shù)量numInputPartitions、待分區(qū)的記錄record、分區(qū)數(shù)量numPartitions、參與分區(qū)的節(jié)點(diǎn)信息instanceInfo;
輸出:待分區(qū)記錄的分區(qū)編號(hào)partitionToSendTo.
① 使用改進(jìn)后的蓄水池抽樣算法在每個(gè)輸入源分區(qū)上進(jìn)行抽樣;
② 將各輸入源分區(qū)上的抽樣結(jié)果進(jìn)行匯總,得到sampleData[],并排序;
③ 根據(jù)參與分區(qū)的節(jié)點(diǎn)信息instanceInfo計(jì)算出最優(yōu)數(shù)據(jù)分發(fā)比例ratio[];
④ 根據(jù)最優(yōu)數(shù)據(jù)分發(fā)比例ratio[]和得到的抽樣結(jié)果sampleData[],計(jì)算出分區(qū)邊界rangeBoundary[];
⑤ 對(duì)于每條待分區(qū)的記錄record,在分區(qū)邊界rangeBoundary[]中查找出所屬的分區(qū)編號(hào)partitionToSendTo;
⑥ RETURNpartitionToSendTo.
鑒于新一代大數(shù)據(jù)計(jì)算框架Flink的出色性能,選用Flink對(duì)基于帶寬的數(shù)據(jù)分區(qū)算法進(jìn)行了實(shí)現(xiàn).
實(shí)現(xiàn)基于帶寬的數(shù)據(jù)分區(qū)方法,需要完成最優(yōu)數(shù)據(jù)分發(fā)比例的計(jì)算和作業(yè)圖邏輯的修改.3.4節(jié)已經(jīng)提到過(guò),計(jì)算節(jié)點(diǎn)的最優(yōu)數(shù)據(jù)分發(fā)比例需要節(jié)點(diǎn)的帶寬信息和數(shù)據(jù)量.原始的Flink無(wú)法獲取集群中各節(jié)點(diǎn)的帶寬信息,考慮實(shí)現(xiàn)簡(jiǎn)便性,我們?cè)贔link的配置文件中添加了各節(jié)點(diǎn)上下行帶寬的配置項(xiàng),在Flink集群?jiǎn)?dòng)時(shí),各TaskManager會(huì)將自身的帶寬信息匯總到JobManager處.各節(jié)點(diǎn)的數(shù)據(jù)量則根據(jù)作業(yè)JobGraph中的Source算子,獲取相應(yīng)的數(shù)據(jù)源分布情況后推算得出.作業(yè)圖邏輯的修改包括采樣算法的加入和分區(qū)方法的重寫,這部分將在生成OptimizedPlan時(shí)完成,這樣可以減少JobManager處的負(fù)載.
圖2對(duì)一個(gè)基于帶寬的數(shù)據(jù)分區(qū)作業(yè)的整體流程進(jìn)行了詳細(xì)描述.如圖2中Step1~3所示,作業(yè)在Client端提交后,首先會(huì)通過(guò)優(yōu)化器優(yōu)化生成OptimizedPlan,之后將以生成的OptimizedPlan為基礎(chǔ),生成作業(yè)圖JobGraph.在作業(yè)圖生成的過(guò)程中我們添加了采樣的邏輯和用于計(jì)算分區(qū)邊界的算子,并重寫了數(shù)據(jù)分區(qū)的方法.其中計(jì)算分區(qū)邊界的算子會(huì)根據(jù)最優(yōu)數(shù)據(jù)分發(fā)比例得到數(shù)據(jù)分區(qū)的界,該結(jié)果將會(huì)通過(guò)廣播的方式發(fā)送到每個(gè)分區(qū)算子.需要注意的是,此時(shí)還在Client端,計(jì)算分區(qū)邊界的算子還沒(méi)有實(shí)際獲取到最優(yōu)數(shù)據(jù)分發(fā)比例,最優(yōu)數(shù)據(jù)分發(fā)比例的獲取需要在JobManager處完成.完成作業(yè)邏輯的修改后, 通過(guò)Step3生成的JobGraph將被提交到JobManager.
如圖2中Step4~7所示,JobManager收到作業(yè)的JobGraph后,首先會(huì)遍歷JobGraph中的算子并找到Source算子,通過(guò)Source算子中存儲(chǔ)的數(shù)據(jù)源信息去獲取待處理數(shù)據(jù)在集群中的分布情況,并結(jié)合作業(yè)的并行度選擇運(yùn)行該作業(yè)的節(jié)點(diǎn).考慮網(wǎng)絡(luò)傳輸是作業(yè)運(yùn)行的瓶頸,節(jié)點(diǎn)的選擇策略是盡可能選擇擁有數(shù)據(jù)的節(jié)點(diǎn)來(lái)運(yùn)行作業(yè),這樣根據(jù)數(shù)據(jù)本地性策略,可以減少Source算子讀取數(shù)據(jù)源時(shí)的網(wǎng)絡(luò)傳輸.確定作業(yè)運(yùn)行的節(jié)點(diǎn)后,通過(guò)節(jié)點(diǎn)的帶寬信息和初始數(shù)據(jù)量大小,使用數(shù)據(jù)分發(fā)比例計(jì)算模塊就可以計(jì)算出各節(jié)點(diǎn)的最優(yōu)數(shù)據(jù)比例,該比例將會(huì)被寫回JobGraph中用于計(jì)算分界的算子.至此,包含最終作業(yè)執(zhí)行邏輯的作業(yè)圖JobGraph才真正構(gòu)建完成.最后根據(jù)JobGraph中各算子的并行度,會(huì)生成對(duì)應(yīng)的執(zhí)行圖ExecutionGraph,執(zhí)行圖中的每個(gè)任務(wù)通過(guò)Step7將部署至對(duì)應(yīng)的節(jié)點(diǎn),進(jìn)行調(diào)度執(zhí)行.
Fig. 2 The process of bandwidth partitioning job圖2 基于帶寬的數(shù)據(jù)分區(qū)方法作業(yè)運(yùn)行過(guò)程
實(shí)驗(yàn)所用環(huán)境為4個(gè)節(jié)點(diǎn)的分布式集群,每個(gè)節(jié)點(diǎn)的處理器為Intel Xeon E5-2603 V4(6核6線程),內(nèi)存為64 GB,節(jié)點(diǎn)間通過(guò)千兆以太網(wǎng)連接,安裝的操作系統(tǒng)為CentOS7.集群上通過(guò)Standalone模式搭建了修改后的Flink集群,其中1臺(tái)master節(jié)點(diǎn)作為JobManager,另外3臺(tái)Slave節(jié)點(diǎn)作為TaskManager,使用的版本為Flink1.7.2.除此之外集群中還基于Hadoop2.7.5搭建了Hadoop集群,使用其中的HDFS作為分布式文件存儲(chǔ)系統(tǒng).集群中各節(jié)點(diǎn)帶寬的控制則通過(guò)工具Wondershaper[25]來(lái)實(shí)現(xiàn).
實(shí)驗(yàn)使用的數(shù)據(jù)是通過(guò)TPC-H[26]基準(zhǔn)測(cè)試工具生成的數(shù)據(jù)集,該工具可以生成8種表,選取了其中較大的Lineitem表和Orders表作為數(shù)據(jù)源.其中Lineitem有16個(gè)字段,前3個(gè)字段Orderkey,Partkey,Suppkey,其中Suppkey是主鍵.Orders表有9個(gè)字段,前2個(gè)字段Orderkey和Custkey,其中Custkey是主鍵.
本節(jié)從算法開(kāi)銷和算法效果2方面進(jìn)行實(shí)驗(yàn)結(jié)果的說(shuō)明與分析.
4.3.1 算法開(kāi)銷
基于帶寬的數(shù)據(jù)分區(qū)方法的算法開(kāi)銷主要包括作業(yè)圖邏輯修改、最優(yōu)數(shù)據(jù)分發(fā)比例的計(jì)算、數(shù)據(jù)采樣3部分,本文針對(duì)這3部分所需的開(kāi)銷分別進(jìn)行了實(shí)驗(yàn).
作業(yè)圖邏輯的修改發(fā)生在Flink作業(yè)圖生成的過(guò)程中,主要包括采樣算子的添加、計(jì)算分界算子的添加以及分區(qū)方法的重寫等步驟.通過(guò)與未修改作業(yè)圖邏輯進(jìn)行對(duì)比,可以得到作業(yè)圖邏輯修改所需的時(shí)間.經(jīng)過(guò)5次實(shí)驗(yàn)并取平均值,得到從作業(yè)提交到作業(yè)圖生成完畢所需的平均時(shí)間為185 ms,如果進(jìn)行作業(yè)圖邏輯的修改,所需的平均時(shí)間則為232 ms,即作業(yè)圖邏輯修改平均所需時(shí)間為47 ms.
最優(yōu)數(shù)據(jù)分發(fā)比例的計(jì)算是利用數(shù)學(xué)規(guī)劃優(yōu)化器Gurobi Optimizer[27]實(shí)現(xiàn)的數(shù)據(jù)分發(fā)比例計(jì)算模塊完成的,實(shí)驗(yàn)對(duì)不同節(jié)點(diǎn)數(shù)量下最優(yōu)比例計(jì)算所需的時(shí)間進(jìn)行了測(cè)試,每個(gè)節(jié)點(diǎn)的帶寬和數(shù)據(jù)量大小則隨機(jī)生成.實(shí)驗(yàn)結(jié)果如圖3所示,當(dāng)節(jié)點(diǎn)數(shù)量為5時(shí)所需的計(jì)算時(shí)間為32 ms,節(jié)點(diǎn)數(shù)量擴(kuò)大至1 000時(shí)計(jì)算時(shí)間仍在100 ms以內(nèi),當(dāng)節(jié)點(diǎn)數(shù)量達(dá)到6 000時(shí)所需的計(jì)算時(shí)間也僅為293 ms.
Fig. 3 The time of optimal ratio calculation圖3 最優(yōu)比例計(jì)算時(shí)間
為了測(cè)試數(shù)據(jù)采樣所需的開(kāi)銷,我們分別運(yùn)行了添加了采樣過(guò)程的作業(yè)和未添加采樣過(guò)程的作業(yè),使用作業(yè)運(yùn)行的時(shí)間差作為采樣所需的開(kāi)銷.實(shí)驗(yàn)中使用了不同數(shù)據(jù)量大小的Lineitem表作為輸入,具體實(shí)驗(yàn)結(jié)果如圖4所示,在數(shù)據(jù)量大小分別為3.6 GB,7.2 GB,14.6 GB,29.4 GB時(shí),所需的采樣時(shí)間分別為21 s,44 s,86 s,167 s,平均每GB數(shù)據(jù)所需的采樣時(shí)間約為5.8 s.
Fig. 4 The time of sampling圖4 采樣時(shí)間
總體來(lái)說(shuō),基于帶寬的數(shù)據(jù)分區(qū)方法在作業(yè)圖邏輯修改和最優(yōu)數(shù)據(jù)分發(fā)比例的計(jì)算過(guò)程中所需的時(shí)間開(kāi)銷都較小,為毫秒級(jí)別.數(shù)據(jù)采樣則相對(duì)時(shí)間開(kāi)銷較大,且與數(shù)據(jù)量大小相關(guān),但在計(jì)算資源更為充足的情況下,采樣所需時(shí)間可以進(jìn)一步減少.
4.3.2 算法效果
為了探究本文提出的算法在不同的異構(gòu)帶寬條件下的效果,我們?cè)O(shè)置了帶寬異構(gòu)程度不同的4個(gè)實(shí)驗(yàn).同時(shí)為了實(shí)驗(yàn)的方便,主要針對(duì)Slave1節(jié)點(diǎn)設(shè)置了不同的下行帶寬,這樣已經(jīng)可以涵蓋不同的數(shù)據(jù)分發(fā)比例,其他情形的異構(gòu)帶寬集群則與此類似.實(shí)驗(yàn)中各節(jié)點(diǎn)的具體帶寬如表4所示,表4中上行帶寬在前、下行帶寬在后.
Table 4 Bandwidth Information表4 帶寬信息表 Mbps
實(shí)驗(yàn)中使用的數(shù)據(jù)為3.6 GB的Lineitem表和1.63 GB的Orders表.實(shí)驗(yàn)程序會(huì)先對(duì)數(shù)據(jù)集進(jìn)行分區(qū)操作,數(shù)據(jù)分區(qū)結(jié)束之后將在每個(gè)分區(qū)中進(jìn)行一次聚合,統(tǒng)計(jì)各分區(qū)最終的記錄數(shù)量,以便計(jì)算出最終各分區(qū)數(shù)據(jù)的比例.程序的執(zhí)行模式設(shè)置為Batch模式,分區(qū)方法使用Hash分區(qū)、Range分區(qū)與基于帶寬的Bandwidth分區(qū)進(jìn)行比較,驗(yàn)證基于帶寬的Bandwidth分區(qū)效果.
因?yàn)閷?shí)驗(yàn)主要針對(duì)的是節(jié)點(diǎn)帶寬的影響,而實(shí)驗(yàn)所使用的集群TaskManager的數(shù)量是3,因此作業(yè)運(yùn)行的并行度同樣設(shè)置為3.使用的數(shù)據(jù)源則被上傳到HDFS中,3個(gè)節(jié)點(diǎn)上的數(shù)據(jù)量幾乎是相等的,因此可以認(rèn)為Source算子的每個(gè)并行度讀入的數(shù)據(jù)量都是相同的.對(duì)數(shù)據(jù)集進(jìn)行介紹時(shí)有過(guò)說(shuō)明,Lineitem表中有3個(gè)字段是主鍵,Orders表中有2個(gè)字段是主鍵,在實(shí)驗(yàn)過(guò)程中我們發(fā)現(xiàn)這2個(gè)表中并沒(méi)有明顯的數(shù)據(jù)傾斜,通過(guò)主鍵中的任一字段做數(shù)據(jù)分區(qū),實(shí)驗(yàn)結(jié)果都是相似的.后續(xù)的實(shí)驗(yàn)結(jié)果都是以各個(gè)表的第1個(gè)字段作為鍵來(lái)進(jìn)行數(shù)據(jù)分區(qū),也就是說(shuō)數(shù)據(jù)源Lineitem和Orders都使用Orderkey作為鍵進(jìn)行數(shù)據(jù)分區(qū).
如圖5所示,在實(shí)驗(yàn)1條件下,使用Bandwidth分區(qū)的作業(yè)時(shí)間在Lineitem上所需時(shí)間為198 s,在Orders上所需時(shí)間為92 s,明顯小于Hash分區(qū)和Range分區(qū)所需時(shí)間,作業(yè)運(yùn)行完成整體速度提升了為2.5~3倍.
Fig. 5 Running time in different partition modes in experiment 1圖5 實(shí)驗(yàn)1中不同分區(qū)模式下的執(zhí)行時(shí)間
在實(shí)驗(yàn)1的條件下,可以計(jì)算出3個(gè)節(jié)點(diǎn)數(shù)據(jù)的最優(yōu)分配比例為4∶48∶48,通過(guò)表5和表6可以看出,使用Bandwidth分區(qū)很好地契合了最優(yōu)數(shù)據(jù)分配比例,特別是在Lineitem上實(shí)際數(shù)據(jù)分區(qū)比例與最優(yōu)比例幾乎完全相同.
Table 5 Proportion of Lineitem After Partition in Experiment 1表5 實(shí)驗(yàn)1中Lineitem分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
Table 6 Proportion of Orders After Partition in Experiment 1表6 實(shí)驗(yàn)1中Orders分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
如圖6所示,在實(shí)驗(yàn)2條件下,使用Bandwidth分區(qū)的作業(yè)時(shí)間在Lineitem上所需時(shí)間為209 s,在Orders上所需時(shí)間為99 s,相較于Hash分區(qū)和Range分區(qū),效果同樣不錯(cuò),提升為0.6~0.7倍.
Fig. 6 Running time in different partition modes in experiment 2圖6 實(shí)驗(yàn)2中不同分區(qū)模式下的執(zhí)行時(shí)間
在實(shí)驗(yàn)2的條件下,計(jì)算出的各節(jié)點(diǎn)數(shù)據(jù)的最優(yōu)分配比例為11∶44∶44,通過(guò)表7和表8可以看出,使用Bandwidth分區(qū)后的數(shù)據(jù)分布也與最優(yōu)數(shù)據(jù)分配比例比較契合.
如圖7所示,在實(shí)驗(yàn)3條件下,使用Bandwidth分區(qū)的作業(yè)時(shí)間在Lineitem上所需時(shí)間為184 s,在Orders上Bandwidth分區(qū)所需時(shí)間為68 s,相較Hash分區(qū)所需時(shí)間194 s和88 s已經(jīng)沒(méi)有太大的優(yōu)勢(shì),但仍然能節(jié)省一些時(shí)間.
Table 7 Proportion of Lineitem After Partition in Experiment 2表7 實(shí)驗(yàn)2中Lineitem分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
Table 8 Proportion of Orders After Partition in Experiment 2表8 實(shí)驗(yàn)2中Orders分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
Fig. 7 Running time in different partition modes in experiment 3圖7 實(shí)驗(yàn)3中不同分區(qū)模式下的執(zhí)行時(shí)間
此時(shí)節(jié)點(diǎn)之間數(shù)據(jù)傳輸?shù)淖顑?yōu)比例已經(jīng)是20∶40∶40,與平均分配差距已經(jīng)沒(méi)有那么大,同時(shí)還可以發(fā)現(xiàn),此次實(shí)驗(yàn)條件下Range分區(qū)表現(xiàn)較差,分區(qū)完成所用時(shí)間與其他實(shí)驗(yàn)相比明顯變長(zhǎng).分析表9和表10可以發(fā)現(xiàn),Range分區(qū)在實(shí)驗(yàn)4中,對(duì)需要數(shù)據(jù)比例少的Slave1節(jié)點(diǎn)反而分配了更多的數(shù)據(jù),導(dǎo)致Range分區(qū)所需時(shí)間遠(yuǎn)超了其他2種分區(qū)方法.
如圖8所示,在實(shí)驗(yàn)4條件下,與Hash分區(qū)相比,Bandwidth分區(qū)所需時(shí)間反而變得更長(zhǎng).此時(shí)節(jié)點(diǎn)間的最優(yōu)比例是27∶36∶36,與平均分配比例已經(jīng)十分接近,而Range分區(qū)和Bandwidth分區(qū)需要額外的采樣時(shí)間.除此之外,結(jié)合表11和表12可以發(fā)現(xiàn),采樣得到的結(jié)果并不是特別準(zhǔn)確,導(dǎo)致并不能完全按計(jì)算得到的最優(yōu)比例進(jìn)行數(shù)據(jù)分發(fā).
Table 9 Proportion of Lineitem After Partition in Experiment 3表9 實(shí)驗(yàn)3中Lineitem分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
Table 10 Proportion of Orders After Partition in Experiment 3表10 實(shí)驗(yàn)3中Orders分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
Fig. 8 Running time in different partition modes in experiment 4圖8 實(shí)驗(yàn)4中不同分區(qū)模式下的執(zhí)行時(shí)間
Table 11 Proportion of Lineitem After Partition in Experiment 4表11 實(shí)驗(yàn)4中Lineitem分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
結(jié)合4個(gè)實(shí)驗(yàn),可以發(fā)現(xiàn)Hash分區(qū)十分穩(wěn)定,每次實(shí)驗(yàn)分區(qū)結(jié)果都十分均衡,說(shuō)明數(shù)據(jù)源中并沒(méi)有明顯的數(shù)據(jù)傾斜.Range分區(qū)和Bandwidth分區(qū)則并不能每次都保證數(shù)據(jù)按預(yù)設(shè)的比例分配,主要是因?yàn)樗鼈兌夹枰褂貌蓸铀惴▉?lái)估計(jì)數(shù)據(jù)分布,有時(shí)候采樣的結(jié)果并不是十分精確.同樣由于采樣算法的存在,Range分區(qū)和Bandwidth分區(qū)都需要額外的開(kāi)銷,這也導(dǎo)致大多數(shù)時(shí)候Range分區(qū)都比Hash分區(qū)花費(fèi)更多的時(shí)間.唯一例外的是在實(shí)驗(yàn)1中對(duì)Orders表進(jìn)行分區(qū),原因是Range分區(qū)恰好給瓶頸節(jié)點(diǎn)Slave1分配了更小的比例,而Slave1下行帶寬很小,較小的數(shù)據(jù)量就會(huì)對(duì)傳輸時(shí)間產(chǎn)生較大的影響.
Table 12 Proportion of Orders After Partition in Experiment 4表12 實(shí)驗(yàn)4中Orders分區(qū)后各節(jié)點(diǎn)數(shù)據(jù)比例 %
綜合來(lái)看,當(dāng)帶寬異構(gòu)性強(qiáng),各節(jié)點(diǎn)之間最優(yōu)數(shù)據(jù)分發(fā)比例比較不均衡時(shí),基于帶寬的數(shù)據(jù)分區(qū)方法可以取得較好的效果,甚至帶來(lái)數(shù)倍的速度提升.當(dāng)帶寬異構(gòu)性較弱時(shí),由于采樣算法需要額外的開(kāi)銷,基于帶寬的數(shù)據(jù)分區(qū)方法所需時(shí)間可能會(huì)長(zhǎng)于Hash分區(qū)方法,這種情況下可以通過(guò)更充足的計(jì)算資源來(lái)降低采樣過(guò)程所需的開(kāi)銷.在實(shí)際應(yīng)用過(guò)程中,則可以綜合考慮最優(yōu)比例的計(jì)算結(jié)果和采樣所需的時(shí)間,在速度提升較為明顯時(shí)選擇使用基于帶寬的數(shù)據(jù)分區(qū)方法.
在異構(gòu)帶寬的條件下,傳統(tǒng)的數(shù)據(jù)分區(qū)方法會(huì)因?yàn)槠款i節(jié)點(diǎn)的存在,導(dǎo)致數(shù)據(jù)分發(fā)效率低下.通過(guò)對(duì)各節(jié)點(diǎn)之間數(shù)據(jù)傳輸模型進(jìn)行分析,本文提出了一種針對(duì)異構(gòu)帶寬集群的數(shù)據(jù)分區(qū)方法,并在Flink中進(jìn)行了實(shí)現(xiàn).實(shí)驗(yàn)證明:在節(jié)點(diǎn)間帶寬異構(gòu)的情況下,基于帶寬的數(shù)據(jù)分區(qū)方法可以極大地提升數(shù)據(jù)分區(qū)完成的速度.