鄒紅旭,潘冠華,李 吟
(江蘇自動化研究所,江蘇 連云港 222006)
在互聯(lián)網(wǎng)產(chǎn)生的數(shù)據(jù)量迅速增長的今天,如何準(zhǔn)確高效地挖掘數(shù)據(jù)中蘊(yùn)藏的信息變得越來越有挑戰(zhàn)性[1],推薦系統(tǒng)能有效地解決這一問題。推薦系統(tǒng),是提供信息推薦的系統(tǒng),以降低用戶尋找有效信息的難度[2],增加用戶與網(wǎng)絡(luò)信息交互的體驗感為目的的算法本體。推薦系統(tǒng)始于二十世紀(jì)九十年代,歷經(jīng)了二十多年的發(fā)展后,其理論也在不斷地更新和完善[3],應(yīng)用也變得越來越廣泛。然而,面對當(dāng)今時代爆炸式增長的數(shù)據(jù)量,推薦算法的計算效率面臨很大的挑戰(zhàn),單機(jī)已經(jīng)無法滿足計算需求,因此,如何在多節(jié)點(diǎn)的計算機(jī)集群上高效地并行化運(yùn)行推薦算法成為了重中之重[4]。
為了使協(xié)同過濾算法更能適應(yīng)數(shù)據(jù)稀疏的情況,改進(jìn)相似度計算公式,設(shè)計一種既能適應(yīng)稀疏數(shù)據(jù),又適合進(jìn)行并行化改造的協(xié)同過濾算法,研究其在Spark框架下的并行化實現(xiàn)方案,并進(jìn)行調(diào)優(yōu)和驗證,使其準(zhǔn)確度和計算效率達(dá)到最大化。文獻(xiàn)[5]研究了協(xié)同過濾算法在Spark上的并行化方案,但其在實現(xiàn)過程中采用了將數(shù)據(jù)集在集群上從單個節(jié)點(diǎn)進(jìn)行廣播的方法,當(dāng)數(shù)據(jù)集非常大時該方法會有明顯的瓶頸現(xiàn)象,進(jìn)行廣播的節(jié)點(diǎn)會占用大量的計算時間,而且如果數(shù)據(jù)集大到無法在單一節(jié)點(diǎn)內(nèi)存中存放時,該方法則無法完成計算。因此,提出了避免瓶頸問題的解決方案,采用了完全分布式的計算方法,每個節(jié)點(diǎn)始終只需存儲和計算一小部分?jǐn)?shù)據(jù),提高了系統(tǒng)的可擴(kuò)展性。
谷歌公司于2003年起依次發(fā)表了三篇論文:《Google文件系統(tǒng)(Google file system,GFS)》[6]、《大型結(jié)構(gòu)化數(shù)據(jù)表(big table)》及《大數(shù)據(jù)分布式計算模型(MapReduce)》[7-8]。這三篇論文奠定了大數(shù)據(jù)技術(shù)的基礎(chǔ),后來Apache基金會根據(jù)這三篇論文,設(shè)計了Hadoop分布式數(shù)據(jù)處理系統(tǒng),從而將大數(shù)據(jù)的研究推向了高潮。作為MapReduce計算模型的替代方案,Apache Spark彌補(bǔ)了MapReduce在實現(xiàn)迭代式算法時由于多次進(jìn)行磁盤I/O而降低算法計算效率、無法滿足算法實時性要求的缺陷[9]。
Spark采用了內(nèi)存計算模型,對于迭代式算法,數(shù)據(jù)和計算中間結(jié)果都存儲在內(nèi)存中,只需開始時從磁盤加載一次數(shù)據(jù),之后都直接從內(nèi)存中加載數(shù)據(jù),省去了大量的磁盤I/O時間。Spark的系統(tǒng)架構(gòu)如圖1所示,Spark運(yùn)行程序時首先由Driver向集群管理器申請資源,包括CPU和內(nèi)存資源,獲得集群管理器分配的資源之后,Driver將應(yīng)用程序以task的形式發(fā)放到各個工作節(jié)點(diǎn),工作節(jié)點(diǎn)并行執(zhí)行各個task。
圖1 Spark的整體架構(gòu)
Item-Based推薦算法[10-13]的輸入為用戶-項目評分?jǐn)?shù)據(jù)集,該數(shù)據(jù)集由數(shù)量為m的用戶對數(shù)量為n的項目的評分構(gòu)成,用戶集合用U={u1,u2,…,um}表示,項目集合用I={i1,i2,…,in}表示。評分?jǐn)?shù)據(jù)集是不完善的,每個用戶只評論了部分項目,推薦算法的目的就是完善該數(shù)據(jù)集,預(yù)測出所有用戶對所有項目的評分。計算過程如下:
(1)計算項目間相似度,得到相似度矩陣Simn×n。這里采用修正的余弦相似度計算方法,公式如下:
(1)
(2)根據(jù)相似度進(jìn)行評分預(yù)測。使用Park采用的預(yù)測方法,計算公式如下:
如式(1)所示,相似度的計算要用到兩個項目之間的公共用戶,即集合Uij里的用戶u分別對項目i、j的評分,但當(dāng)數(shù)據(jù)稀疏時[14-16],兩個項目之間的公共用戶的數(shù)目可能會很小,導(dǎo)致相似度的計算會出現(xiàn)很大的偏差。極端情況下,當(dāng)Uij中只有一個元素時,根據(jù)式(1)計算得到的相似度為1,表示兩個項目間的相似度為100%,無論這兩個項目在實際中是否相似,都將得到這個結(jié)果。而在后續(xù)的計算中,這個值為1的相似度由于是最大值,又必然會被選入KNNi中,于是會對計算結(jié)果Pu,i的準(zhǔn)確度產(chǎn)生很大的干擾。
針對上述問題,提出改進(jìn)的相似度計算公式,在原公式(1)的基礎(chǔ)上乘以一個權(quán)重函數(shù)f(x),x表示集合Uij中元素的數(shù)目,即兩項目間的公共用戶數(shù)目,目的是對當(dāng)x很小時計算得出的相似度加一個很小的權(quán)重,使其盡量不被選入KNNi中,或者即使被選入KNNi中,也無法對Pu,i的計算產(chǎn)生很大的影響。另一方面,當(dāng)兩個項目間公共用戶數(shù)目很少時,也能從一定程度上說明這兩個項目間的相似度很小,所以乘以權(quán)重函數(shù)f(x)之后得到的相似度將更加接近這兩個項目間的真實相似度。f(x)的選取要滿足以下兩個條件:
(1)f(x)是關(guān)于x的增函數(shù);
(2)當(dāng)x足夠大時f(x)隨x的增長要趨于收斂,因為此時根據(jù)式(1)計算得到的相似度已接近真實值,f(x)的值必須趨于穩(wěn)定,才能保證加權(quán)之后對計算結(jié)果Pu,i產(chǎn)生的干擾達(dá)到最小。
考慮以上兩點(diǎn),選取f(x)=ln(x),改進(jìn)后的相似度計算公式如下:
ln(x)
(3)
其中,x表示集合Uij中元素的數(shù)目。
采用的分布式環(huán)境為Spark+Yarn模式,Spark作為分布式計算框架,Yarn作為集群管理器,根據(jù)集群配置,Spark向Yarn申請10個executor作為計算資源,每個executor包括一個CPU與500 M內(nèi)存。在Spark中,數(shù)據(jù)被切分為若干個分區(qū),不同的分區(qū)位于不同的executor中,每個executor只需對自己分區(qū)中的數(shù)據(jù)進(jìn)行計算,從而達(dá)到分布式計算的目的。分區(qū)數(shù)目的選擇是影響Spark計算效率的一個關(guān)鍵因素,分區(qū)數(shù)目太多會導(dǎo)致數(shù)據(jù)的混洗(shuffle)過程消耗更長的時間,而分區(qū)數(shù)目太少則會使join操作更加耗費(fèi)時間并且降低系統(tǒng)容錯性。在本次實驗中分區(qū)數(shù)目選取為50個。計劃好資源的申請之后就可以用如下語句啟動spark:
spark-submit --master yarn --executor-memory 500 M --executor-cores 1 --num-executors 10
Spark主要通過把數(shù)據(jù)集抽象成RDD(resilient distributed dataset,彈性分布式數(shù)據(jù)集)對象來操作數(shù)據(jù),通過對RDD的操作來間接操作數(shù)據(jù)。相似度的計算可以分為以下四個步驟:
(1)從HDFS(Hadoop distributed file system,Hadoop分布式文件系統(tǒng))中讀取包含訓(xùn)練數(shù)據(jù)的文件。需要用到數(shù)據(jù)文件中userId、itemId、rate這三個字段,分別表示用戶編號、項目編號、用戶對項目的評分。將數(shù)據(jù)利用textFile()方法讀入內(nèi)存,并轉(zhuǎn)化為格式為(userId,(itemId,rate))的元組組成的RDDdata。
(2)依次利用map、reduceByKey、mapValues算子對RDDdata操作得到RDDavgrate,該RDD代表用戶評分均值,其元素的格式為(userId,averageRate)。因為后面要多次用到該RDD,為了避免Spark重復(fù)計算該RDD,利用persist操作將其持久化到內(nèi)存中。
(3)用join算子對RDDdata和RDDavgrate進(jìn)行等值連接,將得到的結(jié)果再次與RDDdata進(jìn)行等值連接,得到RDDjoin,其元素的格式為(userId,(itemId1,rate1,itemId2,rate2,averageRate)),itemId1、itemId2表示該用戶共同評價過的兩個項目,rate1、rate2表示評分。
(4)對RDDjoin進(jìn)行map操作,將其元素的格式轉(zhuǎn)換為(itemId1_itemId2,rate1,rate2,averageRate),然后對其進(jìn)行combineByKey操作得到RDDsimilarity,其元素的格式為(itemId1_itemId2,similarity),similarity表示itemId1和itemId2之間的相似度。
相似度計算過程中用到的RDD的譜系圖如圖2所示。
圖2 相似度計算過程中的RDD譜系圖
評分的預(yù)測可以分為以下幾個步驟:
(1)對RDDdata進(jìn)行combineByKey操作,得到RDDitemList,其元素的格式為(userId,ArrayBuffer[(itemId,rate)]),其中ArrayBuffer表示scala語言中的變長數(shù)組,ArrayBuffer[(itemId,rate)]表示該用戶評價過的所有項目和對應(yīng)評分組成的數(shù)組。
(2)從HDFS中讀取測試數(shù)據(jù)集RDDtest,并與RDDitemList進(jìn)行join操作,再對結(jié)果進(jìn)行map操作得到RDDtestList,其元素的格式為(itemId1,(userId,ArrayBuffer[(itemId2,rate)])),其中itemId1、userId分別表示要進(jìn)行預(yù)測評分的用戶和項目,需要注意的是itemId1來自測試集而itemId2來自訓(xùn)練集。最后再對RDDtestList進(jìn)行combineByKey聚合操作,使其鍵保持不變,值聚合到一個ArrayBuffer中,方便后續(xù)計算。
(3)依次利用map、reduceByKey、mapValues算子對RDDdata操作得到RDDitemavg,代表項目評分均值,其元素的格式為(itemId,itemAverageRate)。
(4)對RDDsimilarity和RDDitemavg進(jìn)行join操作,再對結(jié)果進(jìn)行flatMap、combineByKey操作得到RDDknn,代表項目的最近鄰居集,其元素的格式為(itemId1,ArrayBuffer[(itemId2,similarity,itemAverageRate)])。
(5)對RDDitemavg、RDDknn、RDDtestList進(jìn)行join操作,再對結(jié)果進(jìn)行mapValues、flatMap操作得到最終結(jié)果RDDresult,其元素格式為(itemId,userId,result),result表示最終預(yù)測的評分。
評分預(yù)測過程中用到的RDD的譜系圖如圖3所示。
圖3 評分預(yù)測過程中的RDD譜系圖
根據(jù)上述步驟可知,計算過程中涉及到很多的join操作,而join操作會消耗很多的計算資源[17-18],這主要是因為join操作會進(jìn)行多次的迭代來匹配相同鍵值的元素,因此有必要對其進(jìn)行優(yōu)化。假設(shè)要對RDD1和RDD2進(jìn)行連接操作,其元素數(shù)目分別為x1和x2,如果采用join算子會進(jìn)行共x1*x2次迭代。設(shè)計自定義的Hash_Join函數(shù)替代join操作,Hash_Join函數(shù)原理如下:
假設(shè)RDD1的元素格式為(Key1,Value1),RDD2的元素格式為(Key2,Value2),首先對兩個RDD進(jìn)行map操作,利用自定義的Hash函數(shù)對兩個RDD的Key進(jìn)行轉(zhuǎn)換,使其映射到集合Buckets,原有的Key放入Value中。兩個RDD中相同的Key會被映射到相同的Bucket中。Key集合與Buckets集合是多對一的關(guān)系,目的是為了將多個Key對應(yīng)的內(nèi)容聚集到一個Bucket中。然后再對這兩個RDD進(jìn)行join操作,把兩個RDD中相同Bucket的內(nèi)容匹配到一起,最后再把每個Bucket中相同Key的內(nèi)容匹配到一起。Hash_Join函數(shù)原理如圖4所示。
假設(shè)Key的數(shù)量是Bucket數(shù)量的a倍,則每個Bucket含有a條元素,優(yōu)化后的迭代次數(shù)如下:
(4)
在對MovieLens數(shù)據(jù)集進(jìn)行測試時,進(jìn)行連接操作的數(shù)據(jù)集規(guī)模大小為百萬級,此時取a的大小為200,可以看出,優(yōu)化后的迭代次數(shù)明顯小于x1*x2。
圖4 Hash_Join原理
實驗使用UCI的公用數(shù)據(jù)集MovieLens對算法進(jìn)行測試,該數(shù)據(jù)集有三種不同大小的數(shù)據(jù),其數(shù)據(jù)規(guī)模分別為100 k、1 M和10 M,實驗中采用規(guī)模為1 M的數(shù)據(jù)集,其中包含了6 040位用戶對3 900部電影的評分,評分記錄一共有1 000 209條。實驗用到的Spark集群部署在5臺服務(wù)器上,使用的集群管理器為Yarn管理器,通過向Yarn申請executor來獲取計算資源,實驗中申請的executor數(shù)目為10個,為每個executor分配一個CPU,CPU型號為Intel(R) Xeon(R) CPU E5-2630 v3 @2.40 GHz,每個executor分配的內(nèi)存為500 M。
(1)算法準(zhǔn)確度測試。
為了衡量預(yù)測評分的準(zhǔn)確性,采用平均絕對誤差(MAE)來計算預(yù)測結(jié)果的誤差。先后把數(shù)據(jù)集分成不同比例的訓(xùn)練集和測試集,分別用改進(jìn)前的算法與改進(jìn)后的算法對測試集進(jìn)行預(yù)測,并分別對預(yù)測結(jié)果進(jìn)行MAE的計算,結(jié)果如表1所示。
表1 不同的訓(xùn)練集和測試集比例下兩種算法的MAE值
從結(jié)果中可以看出,改進(jìn)后的算法預(yù)測結(jié)果的MAE要小于改進(jìn)前的算法,說明改進(jìn)后的算法提高了預(yù)測結(jié)果的準(zhǔn)確度,并且可以看到,隨著訓(xùn)練數(shù)據(jù)集的比例逐漸減小,即訓(xùn)練數(shù)據(jù)越發(fā)稀疏時,兩種算法的MAE的差值逐漸增大,說明改進(jìn)后的算法對預(yù)測準(zhǔn)確度的提升愈加明顯,從而說明了改進(jìn)后的算法更加適合數(shù)據(jù)稀疏的情況。
(2)算法執(zhí)行時間測試。
在Spark集群上分別運(yùn)行未優(yōu)化的算法和優(yōu)化等值連接操作后的算法,并與單機(jī)運(yùn)行算法的時間進(jìn)行對比,取訓(xùn)練集與測試集比例為9∶1,數(shù)據(jù)集規(guī)模逐漸增加,實驗結(jié)果如表2所示。
表2 運(yùn)行時間對比
從結(jié)果中可以看出,當(dāng)數(shù)據(jù)規(guī)模較小時,集群算法的運(yùn)行時間并沒有比單機(jī)算法短很多,加速比并不是很高,這主要是因為Spark啟動作業(yè)、分配任務(wù)等系統(tǒng)操作以及集群上各節(jié)點(diǎn)之間的通信、互相傳遞數(shù)據(jù)等操作占用了額外的運(yùn)行時間,但隨著數(shù)據(jù)規(guī)模的擴(kuò)大,加速比逐漸提高并趨于穩(wěn)定。
經(jīng)過等值連接優(yōu)化的算法在數(shù)據(jù)規(guī)模較小時表現(xiàn)并不突出,因為在用自定義的Hash函數(shù)把兩個RDD的Key轉(zhuǎn)換為Bucket時要付出額外的計算成本,但隨著數(shù)據(jù)規(guī)模的擴(kuò)大,其性能表現(xiàn)逐漸超過了未優(yōu)化的算法,并且數(shù)據(jù)規(guī)模越大,其相比未優(yōu)化的算法節(jié)省的計算時間越多。
設(shè)計了一種在Spark平臺上運(yùn)行的改進(jìn)的Item-Based協(xié)同過濾算法,使其更適合數(shù)據(jù)稀疏的情況,并對算法中涉及的等值連接操作進(jìn)行了優(yōu)化,提高算法效率。用MovieLens數(shù)據(jù)集對算法進(jìn)行測試后得到的結(jié)果表明,算法在準(zhǔn)確度和效率方面都有更好的表現(xiàn),說明了算法既能更準(zhǔn)確地得到預(yù)測結(jié)果,又能適應(yīng)分布式計算平臺,更快地得到計算結(jié)果。