王珣
(陜西學前師范學院 教學設(shè)備與實驗室管理處, 西安 710061)
基于Spark平臺的大數(shù)據(jù)挖掘技術(shù)研究
王珣
(陜西學前師范學院 教學設(shè)備與實驗室管理處, 西安 710061)
大數(shù)據(jù)具備數(shù)據(jù)量大、富于多樣性的特點。因此在大數(shù)據(jù)分析方面,無論是對處理速度還是實時性都具有較高的要求。數(shù)據(jù)挖掘技術(shù)是從海量數(shù)據(jù)里采用某種建模算法,用來尋找隱藏在數(shù)據(jù)背后的信息,從而讓大數(shù)據(jù)產(chǎn)生更大的價值。Spark框架是一個針對超大數(shù)據(jù)集合的低延遲的集群分布式計算系統(tǒng)。本文基于該框架,對大數(shù)據(jù)挖掘技術(shù)進行了具體研究,首先完成了基于Yarn部署上Spark集群搭建,然后提出并實現(xiàn)了并行Apriori算法,該算法成功補充了Spark MLlib分布式機器學習庫中所缺乏的關(guān)聯(lián)分析問題的分布式算法。
大數(shù)據(jù); 數(shù)據(jù)挖掘; Spark
如今我國已進入大數(shù)據(jù)時代,每年都會產(chǎn)生海量的數(shù)據(jù)。預計到2020年,我們每年產(chǎn)生的數(shù)據(jù)總量將超過8.5ZB[1],涉及到金融,互聯(lián)網(wǎng),醫(yī)療等各個領(lǐng)域,這必然對大數(shù)據(jù)的挖掘和分析提出了更嚴峻的挑戰(zhàn)。因此,在大數(shù)據(jù)時代,要進行各行各業(yè)、各種應(yīng)用場景的天量數(shù)據(jù)挖掘,必然需要一個效率高、結(jié)果準確的計算平臺來進行處理。
Spark由加州大學伯克利分校的AMP實驗室出品,它是一個開源的計算框架,該框架可用于處理大數(shù)據(jù)的高性能分布式并行計算。Spark的主要優(yōu)點在于:支持Python、Java等多語言編程,使用方便;可在大數(shù)據(jù)集上進行多種復雜查詢;兼容性強,可以兼容Yarn,Mesos等多個框架;處理快速且結(jié)果精確。目前,Spark已經(jīng)被應(yīng)用或?qū)⒁粦?yīng)用到國內(nèi)外很多公司的各類應(yīng)用場景中。MLlib,作為Spark平臺中的分布式機器學習庫,承擔著對機器學習算法實現(xiàn)的功能。它在經(jīng)歷過歷次擴充改進后,正逐步完善。然而,在數(shù)據(jù)挖掘和分布式機器學習方面,傳統(tǒng)的Spark MLlib庫仍有一些缺陷,欠缺關(guān)于關(guān)聯(lián)分析這類的算法內(nèi)容,這給在需要應(yīng)用關(guān)聯(lián)分析處理眾多應(yīng)用場景的大數(shù)據(jù)時帶來諸多不便,因此,我們有必要對相應(yīng)的算法進行改進、擴充和定制,使其能夠更加適用于數(shù)據(jù)挖掘和分布式機器學習技術(shù)。
一般來講,Spark是采用集群模式應(yīng)用于實際生產(chǎn)場景中的,因此構(gòu)建好Spark分布式集群是基于Spark進行大數(shù)據(jù)挖掘技術(shù)的研究與實現(xiàn)的關(guān)鍵。Spark開發(fā)環(huán)境及其分布式集群的構(gòu)建,主要包括以下幾個方面:
1.1 硬件系統(tǒng)要求
為了保證良好的運行性和兼容性,所有構(gòu)建Spark分布式集群所用的物理主機均采用Linux 操作系統(tǒng)。本文采用的測試環(huán)境,由搭建在1臺主機上的3臺虛擬機組成。在此基礎(chǔ)上搭建Spark分布式集群,包括2個Worker節(jié)點和1個Master節(jié)點。Master節(jié)點作為單機編寫和調(diào)試Spark分布式應(yīng)用程序的機器,配置必須高于Worker節(jié)點。Master節(jié)點的機器配置為4G內(nèi)存和4核處理器,Worker節(jié)點機器配置為2G內(nèi)存和2核處理器。各節(jié)點硬盤為基于PCIE[2]的SSD固態(tài)硬盤,這種硬盤讀寫速度快,可以有效提高工作及運行效率。上述集群構(gòu)成形式,既可以減少Spark集群運行成本,降低環(huán)境構(gòu)建失敗概率,又可以根據(jù)需要隨時對節(jié)點數(shù)量進行增減。
1.2 構(gòu)造分布式Spark集群
本文選取Spark版本為Spark1.1。此版本下構(gòu)造分布式Spark集群,首先需要安裝Scala語言,然后將每臺虛擬機上的slaves文件內(nèi)容修改為集群上每個Worker節(jié)點的主機名,并修改集群每個節(jié)點的Spark安裝目錄下的Spark-env.sh文件;接著配置系統(tǒng)的jdk環(huán)境變量,修改系統(tǒng)Scala的安裝路徑為SCALA_HOME;集群中Master節(jié)點的主機名或IP地址采用SPARK_MASTER_IP的屬性值,其他項默認;最后,確保該集群中的所有節(jié)點的Spark-evn.sh文件和slaves文件的內(nèi)容完全相同。以上配置完成后便可通過jps命令查看集群的啟動情況。
1.3 配置Spark的IDE開發(fā)環(huán)境
IDEA作為Scala語言開發(fā)環(huán)境,是良好支持Scala的IDE,故選擇其為Spark應(yīng)用程序的編程和開發(fā)環(huán)境。但為了避免IDEA在使用過程中產(chǎn)生的過量緩存文件過量占用和消耗I/O資源,選擇SSD固態(tài)硬盤存儲文件以提高性能。
馬戴一生羈旅,東游江浙,南極瀟湘吳越,西至汧隴,北抵幽燕大漠,跋山涉水,足跡甚遠,嘗盡仕途坎坷的悲苦辛酸,因此他的羈旅之作除了對山水的描摹和懷古傷今之愁以外,也有懷鄉(xiāng)思歸的深切悲痛,以及和著血淚的生活體驗,這些詩歌不僅是其內(nèi)心情感的集中寫照,也是其政治命運的真實反映,在飽含深情的描繪中,呈現(xiàn)出了既同于中晚唐詩人寫作的共性——精于五律,格律嚴整的藝術(shù)成就,也展現(xiàn)了其羈旅行役詩獨特的典雅、清奇的藝術(shù)風格。
IDEA配置完成后,即可以開始進行Spark程序測試。
2.1 Apriori算法概念和核心步驟
Apriori算法是一種挖掘關(guān)聯(lián)規(guī)則的頻繁項集算法,Apriori算法多次掃描交易數(shù)據(jù)庫,每次利用候選頻繁集產(chǎn)生頻繁集。它的主要步驟可分為定義最小支持度,篩選所有頻繁項集和根據(jù)置信度產(chǎn)生關(guān)聯(lián)規(guī)則。
2.2 Apriori算法基于Spark的分布式實現(xiàn)
Apriori算法基于Spark分布式集群的基本流程圖,如圖1所示。
圖1 分布式Apriori算法的實現(xiàn)流程圖
算法的主要思路為:
(1) 產(chǎn)生頻繁1項集L1。將事務(wù)集T以RDD
(1) 得到頻繁1項集F1并保存,以下為該步驟核心代碼:
valfim1=transactions.flatMap(line=>line).Map((,1)).ReduceByKey(_+_).Filter(_._2>minSupport)savefim1(fim1,output+”result-1”,sum)defsavefim1(fim:RDD[(string,int)],path:string,count:double):Unit={fim.map(line=>{line._1+“:%.2f”.format(line._2/count)}).Coalesce(1,true).SaveAsTextFile(path)}
(2) 頻繁1項集L1自連接產(chǎn)生C1,以C1作為對比,對數(shù)據(jù)庫進行掃描以產(chǎn)生fim2,將fim2保存下來,以下為該步驟核心代碼。
var(trans,newfim)=LItofim2(fimI.rnap(_._1).collect,transactions,minSupport)save(newfim,output+"result-2",sum)defL1tofim2(L1:Array[String],trans:RDD[(String,Int)],minSupport:Double):RDD[(List[String],Int)]={valL1c=Ll.sizevalcitems=scala.collection.mutable.ArrayBuffer[List[String]]()for(i<-0untilLlc){ for(j<-i+1untilLlc){ Citems+=List(L1(i),L1(j)).sortWith(_<_) }}valbccFI=sc.broadcast(citems)valtemp1=transflatMap(linc=>{) vartmp=scala.collection.mutable.Set[(List[String],Int)]() for(citem<-bccFLvalue){ valtc=isContain(line._l.split("").toSet,citem.toSet) if(tc=1){ tmp+=citem一>line._2 }}tmp})valnewfim=temp1.ReduceByKey(_+_).Filter(_._2>minSupport).cachebccFIunpersist()returnnewfim
(3) 循環(huán)產(chǎn)生3項集到8項集。以下為核心代碼。
varfimk=newfim.collectfor(k<一3to8){ valtemp=mine(fimk.map(_._)1),trans,minSup-port) save(temp_2,output+"result-"+k,sum) fimk=temp.2.collect trans=temp._1}defisFrequent(orderitems:List[String],Lmap:scala.col-lection.mutable.Map[...]):Boolean valoCc=orderitems.sizefor(i<-0tooCc一3){ val11=orderitems.slice(0,i) val12}rderitems.slice(i+1,oCc) valkey=11.foldRight(12){(n:String,l2:List[String])=>n::12} valkeyl=key.slice(0,key.size-1)valvalues=Lmap.get(keyl)match{caseSome(n)=>ncaseNone=>List()if(!(values.exists(_=orderitems(oCc-1)))){returnfalse }}returntrue}defcombine(line,List[String]):scala.collection.mutable.ArrayBuffer[List[String]]={valCitems=scala.collection.mutable.ArrayBuffer[List[String]]()valtarray=line.2toArrayvaltc=tarray.sizefor(i<-0untiltc)foro(i<-1+1untiltc)} citems+=(tarray(j):aarray(i):aine._1).sortWith(_<_)}}citemsdefisContain():Int={varcontain=Iset.find(item=>{if(!trans.contains(item)){contain=0true}elseFalse})Contain}
3.1 實驗環(huán)境與條件
分布式Apriori算法的測試環(huán)境為由前面搭建好的Spark on Yarn集群。單機Apriori算法的測試環(huán)境為該集群中的Master節(jié)點。本文以chess標準數(shù)據(jù)集[3]作為待測數(shù)據(jù)集,每一個候選集的編號為該數(shù)據(jù)集每一行的第一個數(shù)字,最小支持度選為85%,頻繁項集K設(shè)為8,然后將所設(shè)計的算法打包并以包的形式傳到Spark上集群上運行,以進行數(shù)據(jù)挖掘。算法運行過程中需要依次輸入數(shù)據(jù)集路徑和輸出文件夾路徑。數(shù)據(jù)集路徑用于輸入數(shù)據(jù)的存儲和管理,自身存放于HDFS上的data文件夾下;輸出文件夾路徑用于存放需要輸入各項頻繁項集的結(jié)果,共包含K個文件夾。每個文件中的內(nèi)容的格式都為“項集:置信度”。
3.2 實驗結(jié)果分析
輸出結(jié)果存放于result-1至result-8這8個文件中,集群中Worker節(jié)點都打開的時候,程序總體運行時間為74 s。每個文件中的項集數(shù)依次為:984、690、517、358、177、105、32和15個,如圖2所示。
圖2 各文件中頻繁項集數(shù)
當最多只有一個Worker節(jié)點工作,其它條件不變時,測試結(jié)果,如圖3所示。
圖3 不同數(shù)量節(jié)點運行時間
從圖3中可以看出,第1類:當Spark集群中只有一個Master節(jié)點和一個Worker節(jié)點時,節(jié)點運行所消耗的時間為108s;第2類:當Spark集群中的兩個Worker節(jié)點同時運行時,所花費的時間為60 s;第3類:單機模式下,也就是當Spark集群只打開Master節(jié)點,兩個Worker節(jié)點都被關(guān)閉時,算法運行所消耗的時間為195 s??梢钥闯?,不同模式下的分布式并行Apriori算法運行具有較大的差異。在算法運行過程中會產(chǎn)生大量的候選集,頻繁與HDFS進行交互,導致時間的消耗。另外圖中第4類情況反映的是當Master節(jié)點運行java編寫的單機Apriori算法時的運行情況,所消耗時間長達759 s。
對于Apriori算法處理相同數(shù)據(jù)集時,Spark集群中的所有節(jié)點在都打開的情況下所消耗的時間遠遠少于單機模式或只有一個Worker節(jié)點和Master節(jié)點打開時所花費的時間,主要原因在于集群中的工作節(jié)點越多使得集群總體配置越高,處理速度自然也就越快。同時Spark支持可伸縮計算的特性也很多提高了原有的大數(shù)據(jù)集的效率。另外,我們也還發(fā)現(xiàn),不同的編程語言也對算法運行結(jié)果有著很大的差別。這是因為Spark框架還支持內(nèi)存計算,部分算法被放入內(nèi)存中計算,使得Apriori算法的效率在原有的基礎(chǔ)上得到極大提高,這也正是Spark框架的優(yōu)勢之一。但Spark集群運行分布式并行Apriori算法一般更適用于處理較大規(guī)模型的數(shù)據(jù)集,在處理小型數(shù)據(jù)集時,Spark集群運行分布式并行Apriori算法的效率要比單機模式下低。原因在于Spark集群在處理數(shù)據(jù)集時,需要頻繁地與HFDS交互,對數(shù)據(jù)進行RDD分塊和封裝,還有DAG備份恢復等一系列工作。所以Spark集群模式更適用于較大型數(shù)據(jù)集情況。
本文對基于Spark的大數(shù)據(jù)挖掘技術(shù)進行了研究,并提出了基于Spark平臺分布式Apriori算法,有效彌補了MLlib化中的不足,即缺少的關(guān)聯(lián)分析類算法,該算法可以應(yīng)用到關(guān)聯(lián)分析大規(guī)模數(shù)據(jù)的場景當中。本文首先搭建起Spark on yarn的分布式Spark生產(chǎn)測試環(huán)境,即由3個以上節(jié)點構(gòu)成的集群,然后再在所搭建好的集群上對文中算法進行了實驗。實驗以經(jīng)典算法Apriori為測試算法,測試對象為GB級別的大數(shù)據(jù)集,采用了Scala語言和Spark RDD的分布式算子分別對其進行編碼并運行,同時還比較了其與Apriori算法在運行Java語言所編寫的單機模式下運行結(jié)果及效率。
[1] 中國產(chǎn)業(yè)調(diào)研網(wǎng).2015年中國大數(shù)據(jù)行業(yè)現(xiàn)狀研究分析與市場前景預測報告[EB/OL].http://www.cir.cn/2015-01/DaShuJuHangYeYanJiuFenXi219.html,2017-01-13.
[2] 道客巴巴.基于Spark的大數(shù)據(jù)挖掘技術(shù)的研究與實現(xiàn)[EB/OL].http://www.doc88.com/p-7758265704891.html,2017-01-14.
[3] 道客巴巴.基于Spark的大數(shù)據(jù)挖掘技術(shù)的研究與實現(xiàn)[EB/OL].http://www.doc88.com/p-7758265704891.html,2017-01-14.
Research on Technology of Big Data Mining Based on Spark
Wang Xun
(Section of Teaching Equipment & Lab Management, Shaanxi Xueqian Normal University, Xi’an 710061, China)
Because big data have the characteristics of large amount of data and rich diversity, it must be demanding large data analysis both in processing speed and real-time requirements. Data mining technology is to use some modeling algorithm from massive data, to look for hidden information behind the data, so that big data can produce greater value. Spark framework is a low latency cluster distributed computing system for super large data sets. Based on the framework, this paper studies the big data mining technology. This paper designs and implements the Yarn deployment on the Spark cluster firstly, and then proposes and implements parallel Apriori algorithm. This algorithm successfully adds to the distributed algorithm of association analysis by the lack of Spark MLlib distributed machine learning repository.
Big data; Data mining; Spark
王珣(1982-),男,漢中人,工程師,經(jīng)濟學碩士,研究方向:信息管理與信息技術(shù)。
1007-757X(2017)06-0064-03
文獻標志碼:
2017.02.05)