陳曉 朱志祥 梁小江
摘 要:海量數(shù)據(jù)的實(shí)時(shí)處理不僅要求計(jì)算框架快速高效,同時(shí)要求流處理過(guò)程中產(chǎn)生的中間數(shù)據(jù)的存儲(chǔ)過(guò)程同樣高效,因此,可通過(guò)提高Spark Streaming對(duì)中間結(jié)果數(shù)據(jù)的處理速度來(lái)提升流處理效率。為提高Spark Streaming處理中間結(jié)果的效率,文中選擇HBase作為中間數(shù)據(jù)存儲(chǔ)系統(tǒng),并通過(guò)分析Spark Streaming的架構(gòu)及HBase的存儲(chǔ)原理,給出了Spark Streaming向HBase寫(xiě)入數(shù)據(jù)的方法并進(jìn)行優(yōu)化。通過(guò)對(duì)Spark Streaming存儲(chǔ)過(guò)程的優(yōu)化,可以一定程度上提高實(shí)時(shí)數(shù)據(jù)的流處理效率。
關(guān)鍵詞:Spark Streaming;HBase;大數(shù)據(jù);內(nèi)存計(jì)算;流處理
中圖分類號(hào):TP274.2 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):2095-1302(2016)04-00-03
0 引 言
隨著移動(dòng)互聯(lián)網(wǎng)、社交網(wǎng)絡(luò)等領(lǐng)域的快速發(fā)展,數(shù)據(jù)量呈指數(shù)式增長(zhǎng),大數(shù)據(jù)時(shí)代全面來(lái)臨。在這個(gè)高速發(fā)展的時(shí)代,數(shù)據(jù)的變化速度也越來(lái)越快,對(duì)數(shù)據(jù)處理和響應(yīng)時(shí)間的要求也更加苛刻,數(shù)據(jù)的實(shí)時(shí)分析和流式處理變得尤為重要。例如,在移動(dòng)通信領(lǐng)域,對(duì)海量數(shù)據(jù)進(jìn)行實(shí)時(shí)的挖掘分析,可以準(zhǔn)確識(shí)別類似于詐騙的電信請(qǐng)求,從而有效避免電信詐騙的發(fā)生。再比如,通過(guò)對(duì)移動(dòng)人口數(shù)據(jù)的實(shí)時(shí)挖掘分析,快速預(yù)測(cè)可能的突發(fā)事件。Spark Streaming是建立在Spark上的實(shí)時(shí)計(jì)算框架,擁有基于內(nèi)存的高速執(zhí)行引擎,并且提供豐富的接口和API,被廣泛用于實(shí)時(shí)數(shù)據(jù)流的分析處理。
對(duì)海量實(shí)時(shí)數(shù)據(jù)進(jìn)行處理,必然會(huì)產(chǎn)生大量的中間數(shù)據(jù),如何高效存儲(chǔ)Spark Streaming處理過(guò)程中產(chǎn)生的數(shù)據(jù)也是大數(shù)據(jù)處理過(guò)程中常見(jiàn)的問(wèn)題。HBase的LSM樹(shù)型存儲(chǔ)結(jié)構(gòu)使其具有實(shí)時(shí)讀寫(xiě)數(shù)據(jù)的功能。使用HBase作為Spark Streaming中間數(shù)據(jù)的存儲(chǔ)數(shù)據(jù)庫(kù)可大大提高數(shù)據(jù)處理的效率。
本文基于實(shí)時(shí)數(shù)據(jù)的流式處理過(guò)程,給出了Spark Streaming將數(shù)據(jù)寫(xiě)入HBase的具體方法,并在此基礎(chǔ)上進(jìn)行了優(yōu)化。
1 Spark Streaming簡(jiǎn)介
Spark Streaming是Spark生態(tài)系統(tǒng)的重要組成部分,主要用于實(shí)時(shí)數(shù)據(jù)流的處理。Spark Streaming的工作原理是將流式計(jì)算分解成一系列短小的批處理作業(yè),本質(zhì)上也是數(shù)據(jù)的批量處理,只是將時(shí)間跨度控制在數(shù)十毫秒到數(shù)秒之間。這里批處理的引擎依然為Spark,Spark Streaming將輸入數(shù)據(jù)按照批量大小(此處指時(shí)間跨度如1秒)分成一段一段的數(shù)據(jù)(Discretized Stream),然后每一段數(shù)據(jù)都會(huì)轉(zhuǎn)換成Spark中的彈性數(shù)據(jù)集(Resilient Distributed Dataset,RDD),最后將Spark Streaming中對(duì)DStream的具體操作都轉(zhuǎn)換成Spark中對(duì)RDD的操作,并將中間結(jié)果暫存在內(nèi)存中。整個(gè)流式數(shù)據(jù)處理任務(wù)可以根據(jù)需求對(duì)中間數(shù)據(jù)加以利用,比如疊加,或者將結(jié)果存儲(chǔ)到外部設(shè)備,例如文件系統(tǒng)HDFS,或者外部數(shù)據(jù)庫(kù)Hive,HBase。
HBase – Hadoop Database是Apache Hadoop的一個(gè)子項(xiàng)目,是一個(gè)高可靠性、高性能、面向列、可伸縮的分布式存儲(chǔ)系統(tǒng)。HBase采用LSM樹(shù)的存儲(chǔ)結(jié)構(gòu),這種結(jié)構(gòu)的核心在于每一次執(zhí)行插入操作時(shí)數(shù)據(jù)都會(huì)先進(jìn)入MemStore(內(nèi)存緩沖區(qū)),當(dāng)MemStore達(dá)到上限時(shí),HBase會(huì)將內(nèi)存中的數(shù)據(jù)輸出為有序的StoreFile文件數(shù)據(jù)。而在HBase中數(shù)據(jù)列是由列簇來(lái)組織的,所以每一個(gè)列簇都會(huì)有對(duì)應(yīng)的一個(gè)數(shù)據(jù)結(jié)構(gòu),HBase將列簇的存儲(chǔ)數(shù)據(jù)結(jié)構(gòu)抽象為Store,一個(gè)Store代表一個(gè)列簇。這樣在Store中會(huì)形成很多個(gè)小的StoreFile,當(dāng)這些小的File數(shù)量達(dá)到閾值時(shí),HBase會(huì)用一個(gè)線程來(lái)把這些小File合并成一個(gè)大File。這樣,HBase就把效率低下的文件中的插入、移動(dòng)操作轉(zhuǎn)變成了單純的文件輸出、合并操作。從而使HBase的讀寫(xiě)數(shù)據(jù)速度非???,能夠支持實(shí)時(shí)讀寫(xiě)。所以在對(duì)海量實(shí)時(shí)數(shù)據(jù)進(jìn)行處理時(shí)通常使用HBase作為數(shù)據(jù)存儲(chǔ)系統(tǒng)。
2 Spark Streaming寫(xiě)入數(shù)據(jù)到HBase
2.1 實(shí)現(xiàn)方法
Spark Streaming向HBase寫(xiě)入數(shù)據(jù)時(shí)需要對(duì)每一條數(shù)據(jù)執(zhí)行插入操作,通常會(huì)采用輸出方法foreachRDD(func),將func(此處指將數(shù)據(jù)插入HBase表格)作用于DStream的每一個(gè)RDD。
在上述代碼中,countBase為待處理的DStream,首先對(duì)countBase進(jìn)行foreachRDD操作,然后對(duì)每個(gè)RDD進(jìn)行操作。此處依據(jù)項(xiàng)目需求對(duì)每個(gè)RDD進(jìn)行非空判斷,然后對(duì)每個(gè)RDD執(zhí)行foreach操作,進(jìn)而對(duì)RDD的每條數(shù)據(jù)record調(diào)用writeToHBase方法,實(shí)現(xiàn)數(shù)據(jù)寫(xiě)入HBase表格。其中zkQuorum為HBase的zookeeper服務(wù)的主機(jī)名配置信息,row為HBase表的行鍵,family為表的列簇,key為表的列,value為列的值。writeToHBase方法為自定義的將數(shù)據(jù)寫(xiě)入HBase的方法。
由上述代碼可看出,在向外部HBase數(shù)據(jù)庫(kù)寫(xiě)數(shù)據(jù)時(shí),通常要先創(chuàng)建與數(shù)據(jù)庫(kù)的連接,并獲取HTable實(shí)例,其中HTable為操作HBase表格的接口,通過(guò)HTable對(duì)象對(duì)HBase表格中的數(shù)據(jù)進(jìn)行增,刪,查詢等操作。對(duì)RDD的每條數(shù)據(jù)調(diào)用writeToHBase進(jìn)行寫(xiě)入操作之前先對(duì)setTable對(duì)象進(jìn)行序列化,即對(duì)每條數(shù)據(jù)都創(chuàng)建了連接,獲取HTable實(shí)例的confTable方法具體代碼如下,其中tableName為建立連接的表格名稱。
2.2 優(yōu)化方法
方法一成功將DStream數(shù)據(jù)寫(xiě)入HBase數(shù)據(jù)庫(kù),但是資源開(kāi)銷較大。Sparking Streaming在向HBase寫(xiě)入數(shù)據(jù)時(shí),必須給每條數(shù)據(jù)都創(chuàng)建一次連接,獲取一個(gè)HTable實(shí)例,但是創(chuàng)建連接是一項(xiàng)非常耗時(shí)的操作,通常耗時(shí)數(shù)秒才能完成。在資源高度緊張的環(huán)境下,每秒都有幾千個(gè)請(qǐng)求,為每條數(shù)據(jù)單獨(dú)創(chuàng)建HTable實(shí)例是非常消耗資源的。基于此提出優(yōu)化方法,減少建立連接與創(chuàng)建HTable實(shí)例的次數(shù),從而降低資源消耗,提高數(shù)據(jù)寫(xiě)入HBase表的效率。方法二的代碼如下:
依據(jù)方法二依次對(duì)countBase執(zhí)行foreachRDD與foreachPartition操作,為每個(gè)分區(qū)創(chuàng)建一個(gè)confTable對(duì)象。對(duì)于RDD一個(gè)分區(qū)內(nèi)的所有數(shù)據(jù),這一個(gè)confTable對(duì)象是共用的。相比于給RDD中所有數(shù)據(jù)都實(shí)例化一個(gè)HTable,方法二明顯減少了實(shí)例創(chuàng)建次數(shù),大大提升了Spark Streaming向HBase寫(xiě)入數(shù)據(jù)的性能。
方法一和方法二的操作流程見(jiàn)圖2和圖3所示。
上圖中方形表示一個(gè)RDD,每一個(gè)橢圓形代表RDD的一個(gè)分區(qū)Partition,分區(qū)里的每個(gè)圓形代表RDD的一條數(shù)據(jù)record。上圖展示了DStream經(jīng)過(guò)foreachRDD操作后對(duì)每個(gè)RDD的操作。由圖2和圖3可知,方法一對(duì)RDD經(jīng)過(guò)foreach操作后對(duì)每條數(shù)據(jù)record都要經(jīng)過(guò)創(chuàng)建連接然后才能寫(xiě)入HBase表格。方法二先對(duì)RDD進(jìn)行foreachPartition操作,然后對(duì)一個(gè)分區(qū)創(chuàng)建一個(gè)連接,連接創(chuàng)建后對(duì)該分區(qū)foreach每條數(shù)據(jù)record進(jìn)行寫(xiě)入操作。對(duì)比兩圖可看出優(yōu)化后的方法創(chuàng)建連接的次數(shù)明顯比原始方法少。而對(duì)于整個(gè)處理任務(wù)來(lái)說(shuō),建立連接,實(shí)例化HTable對(duì)象消耗資源過(guò)多,所以優(yōu)化后的方法二性能大大提升。
HTable是HBase客戶端,實(shí)現(xiàn)對(duì)HBase表的增加 (Create)、查詢(Retrieve)、更新(Update)和刪除(Delete)。但是HTable適合對(duì)單表操作,對(duì)讀或?qū)懖僮鞫疾皇蔷€程安全的。對(duì)于寫(xiě)操作(Put或Delete),如果多線程共享一個(gè)HTable實(shí)例,寫(xiě)緩沖區(qū)可能會(huì)被破壞。對(duì)于讀操作,一些被scan使用的字段同時(shí)被多個(gè)線程共享,如果此時(shí)有Get操作,不能保證數(shù)據(jù)的一致性。而大量數(shù)據(jù)的事實(shí)分析通常是多線程運(yùn)作,為了解決線程安全問(wèn)題,我們使用HTablePool類創(chuàng)建一個(gè)HTable的對(duì)象池,讓多個(gè)HTable實(shí)例共享一個(gè)Configuration,使用時(shí)通過(guò)getTable方法獲取一個(gè)HTable對(duì)象,然后可以進(jìn)行各種增加(Create)、刪除(Delete)和更新(Update)等操作,使用完后調(diào)用close()方法可將HTable對(duì)象歸還到池中。方法三代碼如下:
def confTable(zkQuorum:String,tableName:String): HTable = {
import org.apache.hadoop.fs.Path
val conf = HBaseConfiguration.create()
conf.addResource(new Path(“/etc/HBase/conf/core-site.xml”))
conf.addResource(new Path(“/etc/HBase/conf/HBase-site.xml”))
conf.set(“HBase.zookeeper.quorum”,zkQuorum)
val pool = new HTablePool(conf, SIZE);
val hTable = pool.getTable(tableName);
return hTable
}
由于HTablePool僅僅作為HTable的連接池,里面維護(hù)的HTable使用的Configuration是同一個(gè),所以本質(zhì)上所有的HTable共用同一個(gè)HConnection。而對(duì)于創(chuàng)建一個(gè)連接,HConnection的創(chuàng)建所損耗的資源遠(yuǎn)遠(yuǎn)多于創(chuàng)建一個(gè)HTable。既然HTable的創(chuàng)建是輕量級(jí)的,那么共享一個(gè)HConnection的HTablePool實(shí)際價(jià)值就不大。只要保證HConnection實(shí)例是唯一的,全局共享的,然后在每次操作HBase表時(shí)根據(jù)HConnection對(duì)象來(lái)重新創(chuàng)建,使用完成之后及時(shí)關(guān)閉即可。
最簡(jiǎn)單的創(chuàng)建HConnection實(shí)例的方式是HConnectionManager.createConnection()。HConnectionManager是一個(gè)不可實(shí)例化的類,專門(mén)用于創(chuàng)建HConnection。該方法創(chuàng)建了一個(gè)連接到集群的HConnection實(shí)例,該實(shí)例被創(chuàng)建的程序管理。通過(guò)這個(gè)HConnection實(shí)例,可以使用HConnection.getTable(byte[])方法取得HTableInterface implementations的實(shí)現(xiàn)。方法四的代碼如下:
def confTable(zkQuorum:String,tableName:String): HTableInterface = {
import org.apache.hadoop.fs.Path
val conf = HBaseConfiguration.create()
conf.addResource(new Path(“/etc/HBase/conf/core-site.xml”))
conf.addResource(new Path(“/etc/HBase/conf/HBase-site.xml”))
conf.set(“HBase.zookeeper.quorum”,zkQuorum)
val connection = HConnectionManager.createConnection(conf);
val table = connection.getTable(“tablename”);
return table
}
3 結(jié) 語(yǔ)
本文先給出了海量數(shù)據(jù)的實(shí)時(shí)流處理過(guò)程中將數(shù)據(jù)存儲(chǔ)到HBabe中的方法。通過(guò)Spark Streaming直接將中間數(shù)據(jù)實(shí)時(shí)寫(xiě)入HBase表格,在解決中間數(shù)據(jù)存儲(chǔ)問(wèn)題的同時(shí),確保了流處理過(guò)程的高效率,并進(jìn)一步優(yōu)化此方法,將數(shù)據(jù)的處理速度大大提高,更大程度的提升流處理的效率。
參考文獻(xiàn)
[1] Spark[EB/OL].http://spark.apache.org//. 2015
[2] Apache HBase[EB/OL]. http://HBase.apache.org/. 2015
[3] 卓海藝.基于HBase的海量數(shù)據(jù)實(shí)時(shí)查詢系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn)[D].北京:北京郵電大學(xué), 2013.
[4] 張榆,馬友忠,孟小峰.一種基于HBase的高效空間關(guān)鍵字查詢策略[J].小型微型計(jì)算機(jī)系統(tǒng), 2012,33(10):2141-2146.
[5] 夏俊鸞,邵賽賽.Spark Streaming: 大規(guī)模流式數(shù)據(jù)處理的新貴[EB/OL]. http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data. 2014.