俞華鋒
(浙江經(jīng)濟(jì)職業(yè)技術(shù)學(xué)院,浙江 杭州 310018)
在大數(shù)據(jù)處理領(lǐng)域中,Spark 云平臺(tái)越來越受到歡迎, 現(xiàn)在已經(jīng)演變成一個(gè)高速發(fā)展應(yīng)用廣泛的計(jì)算平臺(tái),在各大電子商務(wù)網(wǎng)站都有使用。 Spark 云平臺(tái)適用于大數(shù)據(jù)處理的各個(gè)場(chǎng)合,與Hadoop 平臺(tái)有相似的地方, 但優(yōu)于它的是,Spark 計(jì)算的中間結(jié)果可以保存在內(nèi)存中, 從而不再需要讀寫HDFS, 因此Spark 能更好地適用于大數(shù)據(jù)領(lǐng)域的離線批處理、 數(shù)據(jù)挖掘、 機(jī)器學(xué)習(xí)、SQL 類處理、流式/實(shí)時(shí)計(jì)算、圖計(jì)算等各種不同類型的需要迭代計(jì)算的地方。
通過Spark 云平臺(tái)可以使得處理大數(shù)據(jù)的任務(wù)執(zhí)行的很快, 處理性能和效率很高。 當(dāng)然如果我們要使用Spark 開發(fā)出高效率和高性能的云計(jì)算平臺(tái), 就必須對(duì)其各個(gè)方面進(jìn)行合理的設(shè)置和優(yōu)化, 否則Spark平臺(tái)的執(zhí)行效率可能會(huì)很低。 因此,如果要發(fā)揮Spark本身的優(yōu)勢(shì), 就必須對(duì)其各個(gè)方面進(jìn)行綜合分析,并進(jìn)行合理的設(shè)置和優(yōu)化, 才能提高其性能。 本文主要探討如何設(shè)置和優(yōu)化Spark 平臺(tái),來提高其性能,從而來提高大數(shù)據(jù)計(jì)算作業(yè)的執(zhí)行速度和執(zhí)行效率。
Spark 云平臺(tái)是處理Stream 數(shù)據(jù)的框架, 它是將數(shù)據(jù)分割成很小的時(shí)間片斷,以batch 批量處理的方式來處理Stream 數(shù)據(jù)。 這種批量處理的方式使得它可以同時(shí)兼顧實(shí)時(shí)和批量數(shù)據(jù)處理的邏輯和算法, 方便了需要將歷史和實(shí)時(shí)數(shù)據(jù)進(jìn)行挖掘和分析的應(yīng)用場(chǎng)合。Spark 云平臺(tái)通過序列化及類加載機(jī)制, 運(yùn)行在JAVA虛擬機(jī)上, 采用分布式方式執(zhí)行各個(gè)任務(wù)。 執(zhí)行任務(wù)的流程如圖1 所示。
圖1 執(zhí)行任務(wù)的流程圖
在Client 端機(jī)子上通過spark-submit 命令提交任務(wù)給 Master 機(jī)子后, 就會(huì)啟動(dòng)一個(gè)屬于該任務(wù)的Driver進(jìn)程。 Driver 進(jìn)程根據(jù)部署模式,可能在本地機(jī)子上啟動(dòng),也可能在分布式集群中的某個(gè)Worker 工作節(jié)點(diǎn)上啟動(dòng)。 Driver 進(jìn)程啟動(dòng)以后,它向集群管理器申請(qǐng)執(zhí)行Spark 任務(wù)所需的資源Executor 進(jìn)程。 集群管理器收到Driver 進(jìn)程的申請(qǐng)后, 會(huì)根據(jù)申請(qǐng)的參數(shù), 在不同的Worker 節(jié)點(diǎn)上,啟動(dòng)相應(yīng)數(shù)量的Executor 進(jìn)程。
Driver 進(jìn)程申請(qǐng)到了所需的資源之后,便開始調(diào)度和執(zhí)行任務(wù)代碼。 Driver 進(jìn)程首先將Spark 的任務(wù)代碼拆分成很多部分,每一部分稱之為stage,即每個(gè)stage對(duì)應(yīng)一部分代碼。 同時(shí)每個(gè)stage 新建一批子任務(wù),然后由各個(gè)Worker 節(jié)點(diǎn)上的Executor 進(jìn)程來執(zhí)行這些子任務(wù)。 第一個(gè)stage 的所有子任務(wù)執(zhí)行完畢之后,就將中間計(jì)算結(jié)果存儲(chǔ)到Worker 節(jié)點(diǎn)的本地磁盤中。 一個(gè)stag 執(zhí)行完以后,Driver 進(jìn)程就會(huì)執(zhí)行下一個(gè)stage,一直執(zhí)行到全部任務(wù)執(zhí)行完畢, 并且計(jì)算完所有的數(shù)據(jù)才停止。
Spark 的性能優(yōu)化, 需要我們根據(jù)不同的大數(shù)據(jù)應(yīng)用場(chǎng)景,對(duì)Spark 的各項(xiàng)任務(wù)進(jìn)行綜合的考慮,并提供多方面的技術(shù)解決方案來進(jìn)行優(yōu)化, 才能獲得最佳性能。 本文主要從開發(fā)Spark 任務(wù)時(shí)的優(yōu)化、資源調(diào)度時(shí)的優(yōu)化設(shè)置和數(shù)據(jù)傾斜時(shí)的優(yōu)化處理這三個(gè)方面進(jìn)行探討。
在開發(fā)Spark 任務(wù)時(shí),應(yīng)根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場(chǎng)景, 將一些性能優(yōu)化的基本原則靈活地運(yùn)用到Spark 任務(wù)中,例如,避免RDD 的重復(fù)設(shè)計(jì),合理的配置Spark 的各個(gè)參數(shù)以及一些特殊操作的優(yōu)化等等。
在開發(fā)一個(gè)Spark 任務(wù)時(shí),首先根據(jù)任務(wù)相對(duì)應(yīng)的數(shù)據(jù)源創(chuàng)建一個(gè)初始的彈性分布式數(shù)據(jù)集RDD,然后對(duì)創(chuàng)建的這個(gè)RDD 執(zhí)行映射或歸約的操作,得到下一個(gè)中間的彈性分布式數(shù)據(jù)集RDD,然后對(duì)中間的RDD再執(zhí)行映射或歸約的操作, 直到計(jì)算出最終的結(jié)果。在上述的循環(huán)往復(fù)的操作過程中,不同的 RDD 會(huì)通過映射或歸約操作得到一系列的RDD 串。對(duì)于同一份數(shù)據(jù)源一般只應(yīng)該創(chuàng)建一個(gè)RDD, 如果創(chuàng)建了多個(gè)RDD, Spark 云平臺(tái)會(huì)對(duì)不同的RDD 分別進(jìn)行計(jì)算,得到的結(jié)果相似,失去了參考價(jià)值,因此增加了Spark 任務(wù)的資源開銷。
在設(shè)計(jì)RDD 時(shí), 除了上述原則外, 還要在對(duì)不同的數(shù)據(jù)執(zhí)行映射或歸約的操作時(shí), 盡量地復(fù)用同一個(gè)已經(jīng)存在的RDD。 例如,已經(jīng)創(chuàng)建了一個(gè)
根據(jù)具體的業(yè)務(wù)開發(fā)好Spark 任務(wù)代碼后,就應(yīng)該為相應(yīng)的任務(wù)配置相應(yīng)的資源。 我們可以通過sparksubmit 命令來設(shè)置特定任務(wù)的資源參數(shù)。 如果資源參數(shù)設(shè)置不合理, 就會(huì)導(dǎo)致集群的資源沒有發(fā)揮應(yīng)有的性能, 任務(wù)執(zhí)行會(huì)比較緩慢。 如果設(shè)置的資源參數(shù)過大,超過了集群能夠提供的極限,就會(huì)出現(xiàn)各種異常??傊灾Y源參數(shù)要設(shè)置合理,否則就會(huì)導(dǎo)致Spark任務(wù)的執(zhí)行效率低下, 無法達(dá)到預(yù)期的性能。 因此我們需要對(duì)資源參數(shù)進(jìn)行設(shè)置和優(yōu)化處理。
怎么樣對(duì)Spark 的資源參數(shù)進(jìn)行優(yōu)化配置呢?主要是通過調(diào)節(jié)和優(yōu)化num-executors 和executor-memory 等參數(shù),來提高資源使用的效率,發(fā)揮集群的優(yōu)勢(shì),從而提升Spark 任務(wù)的執(zhí)行性能。
num-executors 參數(shù)的作用是,設(shè)置Spark 執(zhí)行一個(gè)任務(wù)時(shí)需要執(zhí)行多少個(gè)Executor 進(jìn)程。 Driver 進(jìn)程在向Spark 云平臺(tái)申請(qǐng)資源時(shí),系統(tǒng)會(huì)按照num-executors 參數(shù)設(shè)置的數(shù)量,在各個(gè)worker 工作節(jié)點(diǎn)上,啟動(dòng)numexecutors 個(gè)Executor 進(jìn)程。 這個(gè)參數(shù)如果不設(shè)置的話,系統(tǒng)只會(huì)啟動(dòng)很少量的Executor 進(jìn)程, 這樣就會(huì)導(dǎo)致運(yùn)行效率非常低,速度非常慢。num-executors 參數(shù)一般設(shè)置50~100 比較合適,執(zhí)行任務(wù)時(shí)集群管理器會(huì)啟動(dòng)50~100 個(gè)左右的Executor 進(jìn)程, 大部分隊(duì)列可以得到充分的資源,達(dá)到性能最優(yōu)化。 當(dāng)然如果設(shè)置的太少,就發(fā)揮不了集群資源的優(yōu)勢(shì),造成資源浪費(fèi)。
executor-memory 參數(shù)用于設(shè)置Executor 進(jìn)程的內(nèi)存。 Executor 內(nèi)存的大小很大程度上影響著Spark 任務(wù)執(zhí)行的速度。 我們可以把Executor 進(jìn)程的內(nèi)存大小設(shè)置為4-8G, 具體設(shè)置多少還得根據(jù)資源隊(duì)列的最大內(nèi)存限制是多少。 num-executors 和executor-memory 的積就是某個(gè)Spark 任務(wù)執(zhí)行的總內(nèi)存量, 如果超過了隊(duì)列的最大內(nèi)存量,性能也會(huì)下降。
在大數(shù)據(jù)業(yè)務(wù)處理中經(jīng)常會(huì)遇到的問題是數(shù)據(jù)傾斜。 例如, 在進(jìn)行shuffle 操作時(shí), 可能會(huì)出現(xiàn)這種情況,大部分key 對(duì)應(yīng)幾條數(shù)據(jù),系統(tǒng)很快就處理完了,但是個(gè)別key 可能對(duì)應(yīng)了百萬級(jí)別的數(shù)據(jù), 系統(tǒng)可能需要花費(fèi)很長時(shí)間來處理。 而最長的task 花費(fèi)的時(shí)間決定了整個(gè)Spark 任務(wù)的執(zhí)行時(shí)間,此時(shí)的Spark 任務(wù)的執(zhí)行時(shí)間會(huì)很長。 數(shù)據(jù)傾斜的優(yōu)化處理就是使用各種解決方案來解決數(shù)據(jù)傾斜的問題, 以縮短任務(wù)的執(zhí)行時(shí)間,從而保證Spark 的執(zhí)行效率和性能。
由于大數(shù)據(jù)計(jì)算業(yè)務(wù)的需要, 經(jīng)常會(huì)對(duì)Hive 數(shù)據(jù)源執(zhí)行分析操作。 由于Hive 數(shù)據(jù)源中的數(shù)據(jù)不均勻,出現(xiàn)數(shù)據(jù)傾斜的幾率非常大, 在這種情況下, 我們可以先對(duì)數(shù)據(jù)根據(jù)key 進(jìn)行聚合操作, 即所謂的ETL 預(yù)處理, 然后,Spark 再對(duì)ETL 預(yù)處理之后的數(shù)據(jù)進(jìn)行處理。 由于Spark 處理的數(shù)據(jù)是聚合后的數(shù)據(jù),它就不需要使用原先的shuffle 操作了,也不會(huì)發(fā)生數(shù)據(jù)傾斜了。
如果在shuffle 操作時(shí), 就少數(shù)幾個(gè)鍵值會(huì)造成數(shù)據(jù)傾斜, 當(dāng)然這少數(shù)幾個(gè)鍵值對(duì)任務(wù)本身的影響不大的話, 我們可以過濾掉這幾個(gè)鍵值。 因?yàn)檫@些鍵值被丟棄了,就不參加運(yùn)算了,也就不會(huì)產(chǎn)生數(shù)據(jù)傾斜。 例如, 我們可以使用where 子句過濾掉上述的鍵值,在Spark Core 中對(duì)RDD 也執(zhí)行相同的過濾操作, 過濾掉產(chǎn)生數(shù)據(jù)傾斜的鍵值。
本文首先闡述了Spark 云平臺(tái)性能優(yōu)化的意義,然后闡述了Spark 運(yùn)行的基本原理,最后探討了Spark 云平臺(tái)性能的優(yōu)化方法。 希望能對(duì)Spark 云平臺(tái)的研究提供一定的參考。 當(dāng)然本文只是簡單的提出了性能優(yōu)化的一些方法, 具體的實(shí)現(xiàn)和優(yōu)化處理的方法有待進(jìn)一步的研究與完善。