• 
    

    
    

      99热精品在线国产_美女午夜性视频免费_国产精品国产高清国产av_av欧美777_自拍偷自拍亚洲精品老妇_亚洲熟女精品中文字幕_www日本黄色视频网_国产精品野战在线观看 ?

      支持非等值連接的分布式數(shù)據(jù)流處理系統(tǒng)

      2017-09-22 09:28:08陳明珠王曉桐房俊華張蓉
      關(guān)鍵詞:元組處理單元數(shù)據(jù)流

      陳明珠,王曉桐,房俊華,張蓉

      (華東師范大學(xué)計算機科學(xué)與軟件工程學(xué)院上海高可信計算重點實驗室,上海200062)

      支持非等值連接的分布式數(shù)據(jù)流處理系統(tǒng)

      陳明珠,王曉桐,房俊華,張蓉

      (華東師范大學(xué)計算機科學(xué)與軟件工程學(xué)院上海高可信計算重點實驗室,上海200062)

      實時處理的分布式數(shù)據(jù)流系統(tǒng)在當(dāng)今大數(shù)據(jù)時代扮演著越來越重要的角色.其中,連接查詢是大數(shù)據(jù)分析處理中最為重要且開銷較大的操作之一.然而,由于現(xiàn)實應(yīng)用產(chǎn)生的數(shù)據(jù)普遍存在傾斜分布現(xiàn)象,加之?dāng)?shù)據(jù)流本身的無界性與不可預(yù)知性,給在分布式數(shù)據(jù)流系統(tǒng)上進行連接查詢處理提出了嚴(yán)峻的挑戰(zhàn).目前工業(yè)界較為主流的數(shù)據(jù)流系統(tǒng)處理連接查詢的通用性較低,沒有提供專門針對連接操作的接口;學(xué)術(shù)界推出的數(shù)據(jù)流連接查詢原型系統(tǒng)雖然提供了接口,但大多面向等值連接,或僅能支持部分theta連接,且存在資源開銷大、負載均衡性能低等問題.本文對比分析三種典型數(shù)據(jù)流系統(tǒng),將基于Join-Matrix的連接處理技術(shù)與Storm系統(tǒng)相結(jié)合,設(shè)計并實現(xiàn)了通用的、可支持任意連接查詢的數(shù)據(jù)流處理系統(tǒng).實驗展示了本文設(shè)計的系統(tǒng)具有更加良好的吞吐量與資源優(yōu)化表現(xiàn).

      數(shù)據(jù)流處理系統(tǒng);連接處理;分布式計算

      0 引言

      隨著通信技術(shù)和硬件設(shè)備的不斷發(fā)展,尤其是無線傳感設(shè)備的廣泛應(yīng)用,數(shù)據(jù)采集越來越便捷,數(shù)據(jù)生成速度越來越快,這也決定了數(shù)據(jù)流入系統(tǒng)的速度增加.然而,由于內(nèi)存資源有限,系統(tǒng)不可能對所有的數(shù)據(jù)進行存儲,同時,流入的數(shù)據(jù)往往在流量與分布上存在動態(tài)性,這就決定了數(shù)據(jù)一旦流入系統(tǒng)便需要進行自適應(yīng)的實時處理.21世紀(jì)之前,由于硬件條件的限制,人們對數(shù)據(jù)的實時處理往往局限于對數(shù)據(jù)流進行采樣,以獲取概要信息.近年來,隨著分布式并行架構(gòu)的相關(guān)技術(shù)日益成熟及在線實時分析連續(xù)數(shù)據(jù)流的需求日益增多,分布式數(shù)據(jù)流處理系統(tǒng)得到了快速發(fā)展,這使得其能夠應(yīng)對諸如金融交易管理、網(wǎng)絡(luò)監(jiān)控管理與實時推薦等應(yīng)用成為可能.隨著開放式處理平臺的發(fā)展,工業(yè)界與學(xué)術(shù)界相繼出現(xiàn)了大量的分布式數(shù)據(jù)流處理系統(tǒng),典型的有Twitter的Storm[1],Yahoo!的S4[2]和伯克利大學(xué)的Spark Streaming[3].

      在實時處理中,連接是最為重要的且開銷較大的操作之一,其基礎(chǔ)的要求是保證來自不同數(shù)據(jù)源的元組按照連接謂詞聚集進行運算.在傳統(tǒng)數(shù)據(jù)庫領(lǐng)域中,連接技術(shù)的研究與發(fā)展頗為成熟.然而,在數(shù)據(jù)流環(huán)境下,日益增長的數(shù)據(jù)規(guī)模,有限的內(nèi)存資源,流式數(shù)據(jù)的實時動態(tài)性以及數(shù)據(jù)的傾斜分布等給數(shù)據(jù)流系統(tǒng)的連接操作帶來了嚴(yán)峻的挑戰(zhàn).以當(dāng)下最為流行的共享單車為例,應(yīng)用需要處理用戶查找當(dāng)前距離最近的單車停放點的頻繁請求.在這個語義中,單車與用戶為兩條位置實時變化的數(shù)據(jù)流,系統(tǒng)需要根據(jù)位置與時間對兩條數(shù)據(jù)流進行數(shù)據(jù)項匹配,輸出連接結(jié)果,即向用戶推送符合要求的單車位置.隨著應(yīng)用的推廣,持續(xù)增長的用戶與單車數(shù)量規(guī)模,經(jīng)緯度定位數(shù)據(jù)的實時變化,以及上下班高峰期間用戶人數(shù)的急劇增長等問題,無疑均會成為連接處理性能的限制因素.針對數(shù)據(jù)流的特性,研究人員設(shè)計并實現(xiàn)了諸多集中式數(shù)據(jù)流連接處理[4-8].近年來,隨著大數(shù)據(jù)時代的到來以及分布式計算框架的普及,研究人員針對分布式數(shù)據(jù)流連接處理做了大量的工作[9-11].

      盡管目前工業(yè)界推出的主流分布式數(shù)據(jù)流處理系統(tǒng)能夠以高效的性能進行實時數(shù)據(jù)處理,但是大都沒有提供用戶有關(guān)連接處理的編程接口,均需用戶根據(jù)具體的連接謂詞構(gòu)建不同的連接算子,不具備通用性與易用性.以Storm系統(tǒng)為例,需要根據(jù)具體的連接謂詞選擇適當(dāng)?shù)臄?shù)據(jù)分組策略,將數(shù)據(jù)分發(fā)到對應(yīng)的節(jié)點上進行連接處理.近年來,學(xué)術(shù)界也相繼開發(fā)出改進的數(shù)據(jù)流原型系統(tǒng).Squall[12]采用基于矩陣的連接模型,將連接語句翻譯成由執(zhí)行算子構(gòu)成的有向無環(huán)圖的查詢計劃,可支持任意謂詞的連接操作,并提供了連接處理的編程接口,但連接矩陣的結(jié)構(gòu)限制了系統(tǒng)擴展的靈活性,且?guī)磔^大的數(shù)據(jù)冗余存儲,只能支持無窗口模式的連接處理.BiStream[13]采用基于二分圖的連接模型,將集群分成兩個部分,可根據(jù)連接操作的負載程度動態(tài)擴展處理單元的數(shù)量,較Squall中的矩陣模型大幅度降低了數(shù)據(jù)冗余度,提高了內(nèi)存資源的利用率,但在數(shù)據(jù)分組時利用了內(nèi)容敏感的混合路由策略,因此只能支持部分theta連接,且需要人工干預(yù)數(shù)據(jù)分組的參數(shù)設(shè)置.綜上所述,目前在分布式數(shù)據(jù)流處理系統(tǒng)上進行連接操作主要存在以下2個問題.

      (1)通用性低.多數(shù)系統(tǒng)不提供通用的連接編程接口,為了處理不同的應(yīng)用需要,用戶需要自己編寫復(fù)雜的連接算子;

      (2)theta連接處理限制性高.多數(shù)系統(tǒng)只能提供等值連接處理,或僅能支持部分簡單的theta連接處理.

      面對上述問題,本文從系統(tǒng)架構(gòu)與編程模型的角度對3個典型數(shù)據(jù)流處理系統(tǒng)進行對比分析,基于Join-Matrix矩陣模型,制定均衡的數(shù)據(jù)分發(fā)策略與輕量級的數(shù)據(jù)遷移機制,與已用的系統(tǒng)相結(jié)合,設(shè)計并實現(xiàn)了通用的、可處理任意theta連接的數(shù)據(jù)流系統(tǒng),并在Storm上實現(xiàn)了系統(tǒng)原型.通過大量實驗證明本文設(shè)計的系統(tǒng)具有更加良好的性能.

      本文后續(xù)組織安排如下.第1節(jié)闡述相關(guān)工作,包括數(shù)據(jù)流連接方法的介紹;第2節(jié)介紹了若干典型的數(shù)據(jù)流處理系統(tǒng),并進行對比分析;第3節(jié)詳細介紹系統(tǒng)架構(gòu)設(shè)計、系統(tǒng)功能模塊解析和連接處理的兩項關(guān)鍵技術(shù);第4節(jié)通過實驗驗證系統(tǒng);最后,在第5節(jié)進行總結(jié)與未來工作展望.

      1 相關(guān)工作

      由于數(shù)據(jù)流的無界性與動態(tài)性等特點,且傳統(tǒng)數(shù)據(jù)庫領(lǐng)域的連接處理中包含阻塞行為,因此不能直接運用于數(shù)據(jù)流.近年來,研究人員對數(shù)據(jù)流連接處理投入了大量的研究工作.

      對稱哈希連接(SHJ)[4]是一個簡單的、基于哈希的方法,假定所有的連接狀態(tài)均需存儲于內(nèi)存中.XJoin[5]和DPHJ[6]對SHJ進行了擴展,允許將哈希表中的部分?jǐn)?shù)據(jù)存儲到磁盤中.后來許多流式連接處理均以XJoin為基礎(chǔ),RPJ[7]就是其中之一,它基于統(tǒng)計信息的刷新策略,選取最有可能被選中的元組存儲在內(nèi)存中.HMJ[8]總是同時刷新所有數(shù)據(jù)流的哈希表,用于平衡數(shù)據(jù)流的內(nèi)存分配.以上均為集中式算法,因此不能直接應(yīng)用于分布式的數(shù)據(jù)流連接處理.

      Photon[9]是谷歌公司針對連接網(wǎng)絡(luò)查詢數(shù)據(jù)流和用戶點擊廣告數(shù)據(jù)流而設(shè)計的,利用中心協(xié)調(diào)器實現(xiàn)容錯和可擴展的多條數(shù)據(jù)流連接操作.由于采取非阻塞的方式進行鍵值匹配,不支持theta連接.D-Stream[10]是Spark Streaming定義的數(shù)據(jù)流操作對象,以阻塞的方式將數(shù)據(jù)流切分成一系列離散的微批次(mini-Batch),并利用世系機制實現(xiàn)快速容錯,可支持theta連接,但是由于存在窗口大小的限制,只能獲得近似查詢結(jié)果.TimeStream[11]針對任意theta連接,提供了類似MapReduce的批量計算與流式計算兩種處理方式,但是需要連接狀態(tài)的依賴信息,通信代價較大.

      2 分布式數(shù)據(jù)流處理系統(tǒng)

      本章節(jié)介紹三種典型的分布式數(shù)據(jù)流系統(tǒng)進行連接的處理機制,包括Storm、Spark Streaming與Squall,并給出相應(yīng)的對比分析.

      Storm是可擴展的且具備容錯能力的開源分布式數(shù)據(jù)流計算系統(tǒng),在實時計算方面表現(xiàn)突出,但是在處理連接操作時,需要根據(jù)具體的連接謂詞選擇適當(dāng)?shù)臄?shù)據(jù)分組策略,將數(shù)據(jù)分發(fā)到對應(yīng)的節(jié)點上進行連接處理.Storm提供了八種數(shù)據(jù)分組策略,使用最為廣泛的是隨機分發(fā)(shuffl e grouping)與按字段分發(fā)(f i eld grouping).隨機分發(fā)策略對數(shù)據(jù)進行洗牌,盡量均勻地隨機分發(fā)到各處理節(jié)點;按字段分發(fā)策略,根據(jù)路由字段值(f i eld)對數(shù)據(jù)進行分組,相同字段的數(shù)據(jù)將被分發(fā)至同一個處理節(jié)點.隨機分發(fā)策略由于不考慮數(shù)據(jù)的字段值,即其為內(nèi)容不敏感的數(shù)據(jù)分發(fā),因此在任意分布的數(shù)據(jù)負載下,都能保持處理節(jié)點間的絕對負載均衡.然而,在執(zhí)行連接操作時,內(nèi)容不敏感破壞了基于字段的操作語義,為了保證結(jié)果的完整性與正確性,需要付出額外的資源代價,譬如廣播數(shù)據(jù)帶來的網(wǎng)絡(luò)通信負載相反,按字段分發(fā)策略根據(jù)連接謂詞,將連接屬性數(shù)值相同的數(shù)據(jù)分發(fā)到同一個處理節(jié)點,不存在額外的數(shù)據(jù)廣播操作.但是,當(dāng)數(shù)據(jù)呈現(xiàn)傾斜分布時,譬如上述共享單車的例子中,地鐵站等人口流量較高的地點停放的單車數(shù)量遠遠高于其他區(qū)域,數(shù)據(jù)的分布不均將直接導(dǎo)致處理節(jié)點的負載不均衡.

      Spark Streaming是對Spark核心API(Application Programming Interface)的擴展,用于可伸縮、高吞吐量、可容錯地處理在線數(shù)據(jù)流.Spark Streaming接收在線數(shù)據(jù)流,將若干數(shù)據(jù)流元組以阻塞的方式構(gòu)造成微批次,然后通過Spark引擎處理,并最終得到由一系列微批次的數(shù)據(jù)項構(gòu)成的數(shù)據(jù)流.與Storm類似,利用Spark的RDD(Resilient Distributed Datasets)機制提供的基于字段與混洗的分區(qū)策略,進行連接處理.然后,由于微批次的數(shù)據(jù)結(jié)構(gòu),導(dǎo)致某些離散的微批次存在丟失連接元組對的現(xiàn)象,因此只能獲得近似查詢結(jié)果.

      Squall是由洛桑聯(lián)邦理工學(xué)院數(shù)據(jù)實驗室基于Storm開發(fā)的分布式在線查詢系統(tǒng),可利用SQL(Structured Query Language)查詢語句實現(xiàn)對數(shù)據(jù)流的實時連接處理.由于Storm處理連接操作的不便性,Squall將SQL查詢語句翻譯成由執(zhí)行算子構(gòu)成的有向無環(huán)圖查詢計劃,對Storm的組件進行封裝,實現(xiàn)連接查詢的邏輯任務(wù),最終通過構(gòu)建Storm的拓撲結(jié)構(gòu)來執(zhí)行查詢計劃.盡管該系統(tǒng)提供了便捷的編程接口,但是,該系統(tǒng)不支持滑動窗口操作.此外,由于底層連接處理是基于不易擴展且結(jié)構(gòu)限制的連接矩陣模型,因此系統(tǒng)擴展的靈活性較低,資源開銷較大.

      就數(shù)據(jù)流的連接處理,表1從連接類型、連接模型、可擴展性等方面,對上述三種數(shù)據(jù)流連接系統(tǒng)進行對比.

      表1 Storm、Spark Streaming與Squall系統(tǒng)對比Tab.1 Comparison among Storm,Spark Streaming and Squall

      表1中,Squall基于矩陣模型進行數(shù)據(jù)流的連接處理,盡管其可以精確地應(yīng)對任意連接謂詞的處理,然而這卻以消耗更多的計算資源為代價.下文展示了基于優(yōu)化的連接矩陣模型的系統(tǒng)實現(xiàn).

      3 系統(tǒng)實現(xiàn)

      本章節(jié)主要從系統(tǒng)架構(gòu)設(shè)計、系統(tǒng)功能模塊解析和連接處理的關(guān)鍵技術(shù)這三個方面進行闡述.針對現(xiàn)有分布式數(shù)據(jù)流系統(tǒng)在處理連接時存在的問題,融合一種高效的數(shù)據(jù)流處理系統(tǒng),它利用高吞吐量的分布式消息隊列作為輸入數(shù)據(jù)源的適配器,集成了主流的分布式數(shù)據(jù)流處理系統(tǒng),配以開源的、基于內(nèi)存的key-value數(shù)據(jù)庫作為中間緩存,并使用目前主流的前端框架,實時動態(tài)地展示系統(tǒng)的運行狀況.

      3.1 系統(tǒng)架構(gòu)設(shè)計

      該系統(tǒng)共由四層組件構(gòu)成,分別是數(shù)據(jù)源層,連接處理層,中間緩存層與應(yīng)用展示層,如圖1所示.

      Kafka[14]是一個開源的消息發(fā)布與訂閱系統(tǒng),數(shù)據(jù)源層利用Kafka消息隊列接收現(xiàn)實應(yīng)用的用戶行為數(shù)據(jù)或日志數(shù)據(jù),作為系統(tǒng)的輸入數(shù)據(jù)適配器.連接處理層是基于分布式數(shù)據(jù)流處理系統(tǒng)Storm進行連接操作的封裝,將處理單元(bolt的task實例)組織成連接矩陣的形式.第三節(jié)將給出有關(guān)連接矩陣的設(shè)計與分析.中間緩存層利用內(nèi)存數(shù)據(jù)庫Redis[15]存放系統(tǒng)運行時需要使用的各類中間值與信號量,便于底層與上層的信息傳遞與交互.最上層為任務(wù)提交與運行狀況顯示的應(yīng)用層,用戶可自定義配置參數(shù)(Storm的bolt并行度等),提交連接查詢?nèi)蝿?wù),并利用Angular Js[16]前端架構(gòu)動態(tài)展示系統(tǒng)運行的狀況,比如處理架構(gòu)的變換和數(shù)據(jù)遷移的方向與規(guī)模等.

      圖1 系統(tǒng)架構(gòu)圖Fig.1 The architecture of system

      3.2 系統(tǒng)功能模塊解析

      為分析當(dāng)前實時數(shù)據(jù)流連接處理的任務(wù)要求,系統(tǒng)包含三大功能模塊:數(shù)據(jù)路由模塊、數(shù)據(jù)連接模塊與數(shù)據(jù)控制模塊,如圖2所示.

      圖2 數(shù)據(jù)流連接系統(tǒng)功能模塊圖Fig.2 Function block diagram of stream join processing system

      數(shù)據(jù)路由模塊實時接收輸入數(shù)據(jù)適配,按照特定的路由策略,將數(shù)據(jù)分發(fā)至數(shù)據(jù)連接模塊的對應(yīng)連接計算單元.

      數(shù)據(jù)連接模塊接收上層應(yīng)用提交的連接查詢請求,并轉(zhuǎn)為對已存儲數(shù)據(jù)進行連接的邏輯計算任務(wù),由具體的算子實施真實的連接計算.當(dāng)前數(shù)據(jù)連接模塊主要包括以下三部分業(yè)務(wù):(1)查詢分析:將連接查詢請求翻譯成由執(zhí)行算子構(gòu)成的有向無環(huán)圖的連接查詢計劃,每個算子對應(yīng)于Storm的一個組件,將算子的多個實例組織成矩陣的形式,并通過構(gòu)建Storm的拓撲結(jié)構(gòu)來執(zhí)行連接查詢計劃.第3.3節(jié)將給出有關(guān)連接矩陣的關(guān)鍵技術(shù).(2)連接計算:根據(jù)查詢分析制定的連接查詢計劃,對Storm的組件(bolt)進行封裝,實現(xiàn)具體的連接計算.多個連接算子計算,且互相獨立,可實現(xiàn)相關(guān)工作中所述的任意集中式連接算法.(3)負載匯報:連接算子周期性地向數(shù)據(jù)控制模塊匯報當(dāng)前工作負載信息.

      數(shù)據(jù)控制模塊是系統(tǒng)的核心功能模型,負責(zé)處理架構(gòu)的制定,主要包括以下三部分業(yè)務(wù): (1)負載監(jiān)控:接收數(shù)據(jù)連接模塊定期匯報的負載,根據(jù)數(shù)據(jù)流的流量變化以及當(dāng)前工作負載是否超出內(nèi)存閾值,判斷算子實例(處理單元)的組織形式是否需要調(diào)整.(2)矩陣制定:在負載監(jiān)控的基礎(chǔ)上,制定出資源消耗最小的矩陣結(jié)構(gòu),并更新數(shù)據(jù)路由模塊中的路由策略.(3)遷移計劃制定:為實現(xiàn)矩陣結(jié)構(gòu)變換時數(shù)據(jù)遷移代價的最小化,利用局部敏感的機制制定數(shù)據(jù)遷移計劃,指導(dǎo)連接模塊中的連接計算單元進行相應(yīng)的數(shù)據(jù)遷移操作.

      3.3 平臺關(guān)鍵技術(shù)

      本文系統(tǒng)設(shè)計的關(guān)鍵在于有效應(yīng)對數(shù)據(jù)傾斜與降低連接資源開銷的平衡兼顧問題.本文使用Join-Matrix矩陣模型來解決數(shù)據(jù)傾斜問題,并制定局部敏感的數(shù)據(jù)遷移機制來解決連接操作的資源開銷問題.由于篇幅的限制,有關(guān)上述兩種技術(shù)的具體細節(jié)可參考文獻[17-19].

      3.3.1 Join-Matrix矩陣模型

      Join-Matrix作為一種高性能的連接矩陣模型,方便部署于分布式環(huán)境下,支持任意連接謂詞的數(shù)據(jù)流連接操作.由于采取隨機分發(fā)元組作為路由策略,Join-Matrix可利用對元組內(nèi)容的不敏感性來有效抵御數(shù)據(jù)傾斜.為了實現(xiàn)工作節(jié)點的負載均衡以及網(wǎng)絡(luò)傳輸代價的最小化,本文在連接矩陣的模型基礎(chǔ)上,利用等周定理引申的兩條定理,即(1)在給定面積的所有矩形中,正方形的周長最小;(2)在給定周長的所有矩形中,正方形的面積最大.將系統(tǒng)中的計算資源抽象為上述定理中的面積與半周長,進而設(shè)計了一種代價高效的數(shù)據(jù)劃分方案,使其占用的資源開銷最小.

      3.3.2 數(shù)據(jù)遷移機制

      為了實現(xiàn)矩陣結(jié)構(gòu)變換時數(shù)據(jù)遷移代價的最小化,本文設(shè)計了一種局部敏感的遷移機制,在進行矩陣變換之前,確定新舊矩陣中各個處理單元之間的映射關(guān)系,計算各處理單元之間的數(shù)據(jù)重疊程度,并將重疊程度最高的設(shè)置為最終配對單元.如圖3所示,當(dāng)數(shù)據(jù)流S的流量增大時,矩陣結(jié)構(gòu)由2×2轉(zhuǎn)變?yōu)?×3.為了實現(xiàn)數(shù)據(jù)遷移量的最小化,在原矩陣的中間增加一列新的處理單元,即第5和第6號單元,將1號和3號處理單元中存儲的所有R流元組分別復(fù)制到5號與6號單元,同時將1號處理單元存儲的S流元組與2號處理單元存儲的S流元組復(fù)制到5號處理單元;將3號處理單元存儲的S流元組與4號處理單元存儲的S流元組復(fù)制到6號處理單元.

      圖3 矩陣變換(2×2轉(zhuǎn)變?yōu)?×3)Fig.3 Matrix transformation(2×2 to 2×3)

      4 實驗與系統(tǒng)展示

      本章節(jié)通過一系列對比實驗對本文設(shè)計的數(shù)據(jù)流連接查詢系統(tǒng)進行性能評估,并進行系統(tǒng)運行展示.實驗部署在一組由22個物理主機的惠普刀片機服務(wù)器集群,單個機器擁有2個4核4線程的處理器,型號為Intel Xeon E5335,主頻為250 GHz,并配有16 GB的RAM內(nèi)存與2 TB的硬盤.所有的服務(wù)器主機運行Cent OS 65 Linux操作系統(tǒng).通過控制元組處理的壓力,使得分布式系統(tǒng)達到CPU資源的飽和點.實驗結(jié)果是五次運行記錄的平均值.

      本文共使用了三種不同的系統(tǒng)來進行對比實驗,MFM是本文設(shè)計的數(shù)據(jù)流連接查詢系統(tǒng); Squall是文獻[12]中設(shè)計的基于連接矩陣模型的系統(tǒng);Storm即原生態(tài)的分布式數(shù)據(jù)流系統(tǒng),默認(rèn)設(shè)置數(shù)據(jù)的分發(fā)策略為按字段分發(fā).

      實驗中使用TPC-benchmark[20]的數(shù)據(jù)生成器dbgen生成具有Zipf分布的數(shù)據(jù)集,通過參數(shù)z調(diào)整數(shù)據(jù)的傾斜程度,默認(rèn)值為1.實驗基于全歷史記錄模型進行處理,將數(shù)據(jù)源源不斷地載入系統(tǒng),記錄系統(tǒng)運行期間占用的處理單元數(shù)量,如圖4(a)所示.隨著數(shù)據(jù)的不斷流入,由于矩陣形狀的限制,Squall占用的處理單元數(shù)量急劇增加,相反,MFM根據(jù)當(dāng)前系統(tǒng)的負載情況按需分配資源,消耗的處理單元最少.

      此外,實驗還使用了社交網(wǎng)絡(luò)微博數(shù)據(jù)集Weibo,該數(shù)據(jù)集包含了微博服務(wù)五天的數(shù)據(jù)量,數(shù)據(jù)集中每一個元組的結(jié)構(gòu)是以話題為key的字符串序列,數(shù)據(jù)總量為12 GB,涵蓋10 000個話題.實驗中基于窗口模型進行自連接操作,設(shè)置一個窗口內(nèi)的工作負載為最近24小時內(nèi)的所有元組,記錄運行期間各系統(tǒng)的吞吐量,如圖4(b)所示.由于Storm采用按字段分發(fā),隨著數(shù)據(jù)傾斜程度的增加,系統(tǒng)中某些處理單元的工作負載遠遠超過其他單元,成為系統(tǒng)中的straggler,進而降低了系統(tǒng)的吞吐量.Squall由于在矩陣變換時占用較多的處理單元,導(dǎo)致大量的數(shù)據(jù)遷移量,進而吞吐量低于本文的系統(tǒng)MFM.

      圖4 實驗結(jié)果Fig.4 Experiment results

      圖5為系統(tǒng)運行的展示界面.用戶選擇連接查詢?nèi)蝿?wù),設(shè)置處理模式(窗口模式或全歷史記錄模式)并提交執(zhí)行.圖5(a)展示了系統(tǒng)運行期間處理單元組織結(jié)構(gòu)的變換過程,即由2×2的矩陣轉(zhuǎn)換為3×4的矩陣,并給出了數(shù)據(jù)遷移的具體細節(jié).其中,綠色節(jié)點為舊矩陣的處理單元,黃色節(jié)點為新增的處理單元.圖5(b)給出了系統(tǒng)運行期間各處理節(jié)點的工作負載,從圖中可以看出,系統(tǒng)的負載均衡性能較高.

      5 總結(jié)

      分布式數(shù)據(jù)流連接操作在流式計算中占據(jù)舉足輕重的地位.由于數(shù)據(jù)流的動態(tài)特性使得相關(guān)技術(shù)的研究面臨嚴(yán)峻的挑戰(zhàn).本文以負載均衡與低資源開銷為目標(biāo),基于連接矩陣模型,設(shè)計并實現(xiàn)了代價高效的數(shù)據(jù)流連接處理系統(tǒng),可支持任意theta連接.但是,連接矩陣由于內(nèi)容不敏感的特性,存在較為嚴(yán)重的數(shù)據(jù)冗余存儲現(xiàn)象.因此,在未來的工作中,我們會進一步研究更優(yōu)的方法,將內(nèi)容敏感與連接矩陣相結(jié)合,將相關(guān)技術(shù)集成到Storm的底層源碼,并在應(yīng)用層做出改進.

      圖5 系統(tǒng)展示Fig.5 System demo

      [1]ANKIT T,SIDDARTH T,AMIT S,et al.Storm@Twitter[C]//Proceedings of SIGMOD International Conference on Management of Data.ACM,2014:147-156.

      [2]LEONARDO N,BRUCE R,ANISH N,et al.S4:Distributed stream computing platform[C]//Proceedings of the International Conference on Data Mining Workshops,2010:170-177.

      [3]CHEN G J,WIENER J L,IYER S,et al.Realtime data processing at Facebook[C]// Proceedings of SIGMOD International Conference on Management of Data.ACM,2016:1087-1098.

      [4]WILSCHUT A N,APERS P M G.Dataf l ow query execution in a parallel main-memory environment[J].Distributed and Parallel Databases,1993(1):103-123.

      [5]URHAN T,FRANKLINM J.Dynamic pipeline scheduling for improving interactive query performance[C]//Proceedings of International Conference on Very Large Data Bases.2001:501-510.

      [6]IVES Z G,FLORESCU D,FRIEDMAN M,et al.An adaptive query execution system for data integration[C]//Proceedings of SIGMOD International Conference on Management of Data.ACM,1999:299-310.

      [7]TAO Y F,YIU M L,PAPADIAS D,et al.RPJ:Producing fast join results on streams through rate-based optimization[C]//Proceedings of SIGMOD International Conference on Management of Data.ACM,2005:371-382.

      [8]MOKBEL M F,LU M,AREF W G.Hash-merge join:A non-blocking join algorithm for producing fast and early join results[C]//Proceedings of the 20th International Conference on Data Engineering.2004:251-262.

      [9]ANANTHANARAYANAN R,BASKER V,DAS S,et al.Photon:Fault-tolerant and scalable joining of continuous data streams[C]//Proceedings of SIGMOD International Conference on Management of Data.ACM,2013: 577-588.

      [10]ZAHARIA M,DAS T,LI H Y,et al.Discretized streams:Fault-tolerant streaming computation at scale[C]// Proceedings of the 24th ACM Symposium on Operating Systems Principles.2013:423-438.

      [11]QIAN Z P,HE Y,SU C Z,et al.TimeStream:Reliable stream computation in the cloud[C]//Proceedings of the 8th ACM European Conference on Computer Systems.ACM,2013:1-14.

      [12]ELSEIDY M,ELGUINDY A,VITOROVIC A,et al.Scalable and adaptive online joins[C]//Proceedings of International Conference on Very Large Data Bases,2014(7):441-452.

      [13]LIN Q,OOI B C,WANG Z K,et al.Scalable distributed stream join processing[C]//Proceedings of ACM SIGMOD International Conference on Management of Data.ACM,2015:811-825.

      [14]GOODHOPE K,KOSHY J,KREPS J,et al.Building linkedin’s real-time activity data pipeline[J].IEEE Data Eng Bull,2012,35(2):33-45.

      [15]REDIS.[DB/OL].[2017-06-01].https://redis.io/.

      [16]ANGULAR JS.[EB/OL].[2017/06-01].https://angularjs.org/.

      [17]FANG J H,ZHANG R,WANG X T,et al.Distributed stream join under workload variance[J].World Wide Web Journal,2017:1-22.

      [18]FANG J H,WANG X T,ZHANG R,et al.Flexible and adaptive stream join algorithm[C]//Proceedings of International Conference on Asia-Pacif i c Web,2016:3-16.

      [19]FANG J H,ZHANG R,WANG X T,et al.Cost-ef f ective stream join algorithm on cloud system[C]//Proceedings of CIKM International Conference on Information and Knowledge Management.ACM,2016:1773-1782.

      [20]TPC-H BENCHMARK.[EB/OL].[2017-06-01].http://www.tpc.org/tpch.

      (責(zé)任編輯:林磊)

      Distributed stream processing system for join operations

      CHEN Ming-zhu,WANG Xiao-tong,FANG Jun-hua,ZHANG Rong
      (School of Computer Science and Software Engineering,Shanghai Key Laboratory of Trustworthy Computing,East China Normal University,Shanghai 200062,China)

      Real-time stream processing system plays an increasingly important role in practical applications.Stream Join constitutes one of the most important and expensive operation in big data analysis.However,skewed data distribution in real-world applications and inherent features of streaming data,such as inf i nity and unpredictability,put great pressure on the join processing in distributed stream systems.Mainstream industrial stream systems have low versatility on join processing,providing no programming interface; though several academic stream prototype systems solve such a problem to a certain extent, they support equi-join processing only,or results in high resource utilization and severe load imbalance.In this paper,after analyzing three typical distributed stream systems,we integrate the techniques based on Join-Matrix into Storm,design and implement a general stream processing system which supports arbitrary theta joins.Experiments demonstratethat the system proposed in this paper outperforms the static-of-the-art strategies.

      stream processing system;join processing;distributed computing

      TP391

      A

      10.3969/j.issn.1000-5641.2017.05.002

      1000-5641(2017)05-0011-09

      2017-06-28

      國家大學(xué)生創(chuàng)新創(chuàng)業(yè)訓(xùn)練計劃(20160269127);國家自然科學(xué)基金(61232002);國家863計劃(2015AA015307);國家自然基金委項目(61672233)

      陳明珠,女,本科生,專業(yè)為計算機科學(xué).E-mail:101521300140@stu.ecnu.edu.cn.

      張蓉,女,教授,研究方向為分布式數(shù)據(jù)管理.E-mail:rzhang@sei.ecnu.edu.cn.

      猜你喜歡
      元組處理單元數(shù)據(jù)流
      不同生物鏈組合對黃河下游地區(qū)引黃水庫富營養(yǎng)化及藻類控制
      城市污水處理廠設(shè)備能耗及影響因素分析研究
      科技資訊(2021年10期)2021-07-28 04:04:53
      長填齡滲濾液MBR+NF組合工藝各處理單元的DOM化學(xué)多樣性
      Python核心語法
      電腦報(2021年14期)2021-06-28 10:46:22
      一種高可用負載均衡網(wǎng)絡(luò)數(shù)據(jù)采集處理的方法及系統(tǒng)
      汽車維修數(shù)據(jù)流基礎(chǔ)(下)
      海量數(shù)據(jù)上有效的top-kSkyline查詢算法*
      基于減少檢索的負表約束優(yōu)化算法
      一種提高TCP與UDP數(shù)據(jù)流公平性的擁塞控制機制
      基于數(shù)據(jù)流聚類的多目標(biāo)跟蹤算法
      濮阳市| 重庆市| 湖北省| 北海市| 报价| 伽师县| 阆中市| 汉川市| 密山市| 巨鹿县| 靖边县| 元谋县| 安福县| 大荔县| 精河县| 宜都市| 和平区| 长宁区| 宜宾县| 陇西县| 义乌市| 隆子县| 江达县| 乐至县| 祁连县| 河西区| 普陀区| 保山市| 新晃| 辰溪县| 察隅县| 上饶县| 南昌县| 城口县| 长宁县| 凌海市| 阿勒泰市| 云和县| 滦南县| 乌审旗| 靖远县|