韓雨軒 李盼穎 溫秀梅,3,* 馬兆輝 張書瑋
(1.河北建筑工程學院,河北 張家口 075000;2.哈爾濱工業(yè)大學(深圳),中國 深圳 518055;3.張家口市大數(shù)據(jù)技術(shù)創(chuàng)新中心,河北 張家口 075000)
隨著信息時代的不斷發(fā)展,人們每天在互聯(lián)網(wǎng)中產(chǎn)生的數(shù)據(jù)量迅速膨脹.對于數(shù)據(jù)信息中價值的挖掘成為當下的熱點和人們關(guān)注的焦點,而數(shù)據(jù)的價值隨著時間的流逝不斷的降低,所以人們對于獲取數(shù)據(jù)信息和處理數(shù)據(jù)信息的時效性都有了更高的要求.實時到來實時處理的數(shù)據(jù)稱為流數(shù)據(jù),也叫動態(tài)數(shù)據(jù).不同于靜態(tài)數(shù)據(jù),流數(shù)據(jù)以大量、快速、時變的流形式持續(xù)到達,需要計算框架對其進行實時數(shù)據(jù)采集和計算處理,而傳統(tǒng)的批處理框架為了提高吞吐量犧牲了實時性,無法適用于流數(shù)據(jù)的實時處理.為了解決這個需求,多種實時流計算框架相繼誕生,它是專門面向動態(tài)數(shù)據(jù)處理的計算平臺[1],具有低延遲、可擴展、高可用等特點,廣泛應用于金融服務、網(wǎng)絡監(jiān)控、電信數(shù)據(jù)管理、Web應用、生產(chǎn)制造、傳感檢測等應用領(lǐng)域.本文主要介紹兩種時下應用廣泛的流式計算框架Storm和Spark Streaming,通過實驗分析這兩種框架的結(jié)構(gòu)和工作流程,并對比分析了兩種框架之間的不同點,最后對基于Spark Streaming的實驗進行了改進[2],在集群環(huán)境中對Spark Streaming、Flume和Kafka進行了整合.
Storm是應用廣泛的流式計算框架,由stream、spout和bolt組成,其中,stream是傳輸?shù)臄?shù)據(jù)流信息,spout是數(shù)據(jù)流的源頭,也就是生產(chǎn)者,bolt是數(shù)據(jù)流在運算過程中的某個階段.整個結(jié)構(gòu)可以稱為一個拓撲(topology),如圖1所示.在Storm流式計算框架中,從數(shù)據(jù)流源頭到數(shù)據(jù)計算的過程持續(xù)運行,直到進程結(jié)束部署[3].
圖1 Storm結(jié)構(gòu)圖
在Storm中,stream由任意多個元組組成,元組(tuple)是Storm中的核心數(shù)據(jù)結(jié)構(gòu),它是由任意多個鍵值對組成的列表.
spout是Storm中數(shù)據(jù)的源頭,數(shù)據(jù)流在spout中被轉(zhuǎn)換為Storm的數(shù)據(jù)結(jié)構(gòu)tuple,作為連續(xù)的數(shù)據(jù)流傳遞到負責運算的節(jié)點[4].spout可以接收的數(shù)據(jù)源有很多種類,例如網(wǎng)站上的點擊流數(shù)據(jù)、社交軟件上產(chǎn)生的實時信息、采集器采集的日志信息等.storm將數(shù)據(jù)轉(zhuǎn)換和處理業(yè)務邏輯分離,不僅使spout的開發(fā)更為便捷,也增強了它的復用性.spout開發(fā)的主要內(nèi)容是消費數(shù)據(jù)源的實時流數(shù)據(jù).
bolt是Storm中負責處理業(yè)務邏輯的組件,它從spout接收tuple數(shù)據(jù)流,并在進行相應的邏輯計算后,將結(jié)果發(fā)送到下一級的bolt中進行下一階段的業(yè)務處理.bolt常用的計算功能包括過濾、聚合和計算.bolt的傳遞關(guān)系根據(jù)用戶的業(yè)務需要進行組建,可以形成復雜的Topology結(jié)構(gòu)[5].
Spark Streaming是基于Spark的流式計算框架,可以對動態(tài)數(shù)據(jù)進行高通量,高容錯處理,spark Streaming可以靈活選擇數(shù)據(jù)源,例如:采集系統(tǒng)獲取到的數(shù)據(jù)流,或者通過TCP套接字獲取的數(shù)據(jù)流等.它可以對獲取到的數(shù)據(jù)流進行Map、Reduce和Join等復雜操作.最終Spark Streaming將計算后的結(jié)果數(shù)據(jù)存儲到文件系統(tǒng)或分布式數(shù)據(jù)庫中[6].
Spark Streaming的工作原理和Storm不同,它本質(zhì)是Spark的改進,是將實時流式數(shù)據(jù)以細小的批處理作業(yè)進行計算,實現(xiàn)流式數(shù)據(jù)的實時處理.Spark Streaming中的對數(shù)據(jù)進行離散化(Discretized)處理,依照一定的batch size將動態(tài)接收的數(shù)據(jù)轉(zhuǎn)化為Spark Streaming中的數(shù)據(jù)結(jié)構(gòu)DStream.在Spark Streaming中,對DStream的Transformations操作會進一步變?yōu)镽DD的Transformations操作,每一步的中間結(jié)果以內(nèi)存的形式緩存[7].
計算過程根據(jù)用戶的業(yè)務需求進行設計,最終輸出到外部數(shù)據(jù)庫進行存儲,整體架構(gòu)如圖2所示.
圖2 Spark Streaming流程圖
DStream是一種抽象的數(shù)據(jù)結(jié)構(gòu),在Spark Streaming計算框架中,DStream可以表示為連續(xù)性的RDD,也就是持續(xù)性的數(shù)據(jù)流.這里的RDD包含一小段時間內(nèi)的數(shù)據(jù)流,整個計算過程中對數(shù)據(jù)的處理同樣是以RDD為單位進行的,計算過程由Spark引擎Spark core來完成[8].流程如圖3所示.
圖3 DStream流程圖
分別在Storm和Spark Streaming流計算框架中設計實驗進行分析,實驗及改進設計Storm、Spark Streaming、Flume、Kafka等技術(shù),各軟件具體版本如表1所示.
表1 軟件及對應版本表
實驗中,Spout不斷讀取數(shù)組中的語句作為數(shù)據(jù)流的來源,并以行的形式讀取到Tuple中,傳輸給SplitBolt進行單詞的切分,然后制定消息流在Bolt之間的傳輸規(guī)則——Strom消息流分組.目的是為了在進行單詞計數(shù)的時候,同一個單詞的統(tǒng)計發(fā)送到同一個WordCountBolt上,SplitBolt訂閱Spout發(fā)射的tuple,每接收到一個tuple就獲取"line"對應的值,并保存到一個字符串中:String line=input.getStringByField("line").將獲取到的數(shù)據(jù)流以空格切分為單詞,保存到字符串數(shù)組中:String[]words=line.split("").最終切分后的單詞會向下一級Bolt發(fā)射tuple:collector.emit(input,new Values(word)).形式如:{"word","hallo"}.
WordCountBolt訂閱SplitBolt的輸出,用來計算單詞的當前頻率.每接收到一個到來的tuple,會將tuple中單詞的頻率加一:map.put(word,map.get(word)+1),并向后發(fā)送該單詞的當前頻數(shù),形式如:{"word":"I","count":3}.
最終結(jié)果通過reportBolt進行輸出打印,輸出為當前單詞統(tǒng)計的當前頻率,打印結(jié)果形如:“單詞:word的當前頻次為107”.實驗過程,通過ack確認機制來確保數(shù)據(jù)傳輸?shù)目煽啃?整體設計如圖4所示.
圖4 Storm實驗設計圖
實驗中Storm的Topology設計如圖5所示.
圖5 Topology設計圖
同時,在單節(jié)點上進行Spark Streaming的實驗設計,通過監(jiān)聽Socket獲取數(shù)據(jù),該處需要提供Socket的主機名和端口號,數(shù)據(jù)保存在內(nèi)存和硬盤中.信息的獲取流程如圖6所示.9999端口為NC開啟的端口.為避免Spark Streaming只處理最新數(shù)據(jù)流信息,而不緩存舊的歷史結(jié)果,需要通過chekcpoint方法設置檢查點,通過檢查點將之前處理的RDD結(jié)果存入到指定的目錄.利用updateStateByKey函數(shù)對數(shù)據(jù)流的處理結(jié)果進行累加,然后傳入一個自定義的累加函數(shù)updateFunc:
圖6 信息獲取流程圖
iter.flatMap{case(x,y,z)=>Some(y.sum+z.getOrElse(0)).map(m=>(x,m))
通過實驗,可以看到雖然兩框架都提供了可擴展性和可容錯性,但是它們的處理模型從根本上是不一樣的.實驗中Storm使用Java語言進行程序開發(fā),編程模型為Spout/Bolt,而Spark使用Scala語言,編程模型為DStream.Storm框架每次實時處理一條接受到的event,而Spark Streaming是在一個短暫的時間間隔里處理小批量的Event,其數(shù)據(jù)結(jié)構(gòu)DStream的本質(zhì)還是RDD,而RDD是批量處理.所以說Storm可以真正實現(xiàn)毫秒級處理,而相比于Storm,Spark Streaming則有一定的時延.
對于大多數(shù)的應用,計算不會成為瓶頸,而計算完成后的數(shù)據(jù)存儲才是真正的問題.即使使用了storm,也沒辦法處理完一條數(shù)據(jù)就落到數(shù)據(jù)庫或者HDFS中.而對于一些計算一段時間的數(shù)據(jù)流的應用,無需數(shù)據(jù)來一條就處理一條,立刻要看結(jié)果的,大多都有10s或者幾秒的延遲,這樣的情況下spark streaming具有良好的適應性.
Spark Streaming支持的數(shù)據(jù)輸入源除了上述的TCP套接字之外,還可以通過Kafka、Flume、Twitter、ZeroMQ等獲取數(shù)據(jù)源[9],對獲取到的數(shù)據(jù)流進行Map、Reduce和Join等復雜計算后的結(jié)果數(shù)據(jù)存儲到文件系統(tǒng)HDFS或分布式數(shù)據(jù)庫中.故可以將Spark和Flume,Kafka進行整合,將實驗搭建在集群之上,通過zookeeper進行協(xié)調(diào)服務,利用Flume實時采集數(shù)據(jù)并寫入到Kafka,在Kafka中創(chuàng)建Topic:
bin/kafka-topics.sh--create--zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181--replication-factor 3--partitions 3--topic urlcount
Spark Streaming利用KafkaUtils.createStream方法從Kafka中拉取數(shù)據(jù)流信息,創(chuàng)建DStream并進行實時計算,實驗設計圖如圖7所示.
圖7 實驗設計圖
本文論述了Storm和Spark Streaming兩種流式計算框架的設計思想和架構(gòu)設計,通過實驗對二者架構(gòu)原理和工作流程進行了進一步的闡述和分析,并對Storm和Spark Streaming之間的不同點進行了對比和分析.在實際應用中,從數(shù)據(jù)過濾,進行解析進行處理,到計算完成后進行數(shù)據(jù)落地的過程中,計算通常不會成為瓶頸,而是受限于數(shù)據(jù)存儲的速率.所以Spark Streaming除了應用于一些需要歷史數(shù)據(jù)和實時數(shù)據(jù)結(jié)合分析的特定應用場合之外,還可以適用于對實時性要求不高或者數(shù)據(jù)量非常巨大的應用場景.