吳海濤 廖 兵 王 丹 李張?zhí)?周文進(jìn)
(1.四川明星電力股份有限公司,四川 遂寧 629000 2.北京國(guó)網(wǎng)信通埃森哲信息技術(shù)有限公司,北京 100031)
目前數(shù)據(jù)中心的數(shù)據(jù)抽取方式大致為全量數(shù)據(jù)接入方式或基于全量對(duì)比的數(shù)據(jù)抽取技術(shù)。該方式處理效率低下,經(jīng)常會(huì)造成數(shù)據(jù)積壓嚴(yán)重,通常無(wú)法達(dá)到數(shù)據(jù)的實(shí)時(shí)性抽取任務(wù)要求。針對(duì)該問(wèn)題,本系統(tǒng)對(duì)現(xiàn)今的增量抽取技術(shù)進(jìn)行了研究,提出一種Change Data Capture (CDC)結(jié)合GPkafka的實(shí)時(shí)數(shù)據(jù)接入抽取技術(shù),極大地提升了目前明星電力公司中臺(tái)數(shù)據(jù)抽取實(shí)時(shí)性,達(dá)到了零數(shù)據(jù)積壓的數(shù)據(jù)入庫(kù)要求。
CDC又稱(chēng)變更數(shù)據(jù)捕獲(Change Data Capture)。CDC有兩個(gè)模式:同步和異步。同步CDC主要是采用觸發(fā)器記錄新增數(shù)據(jù),基本能夠做到實(shí)時(shí)增量抽取。而異步CDC則是通過(guò)分析已經(jīng)commit的日志記錄來(lái)得到增量數(shù)據(jù)信息,有一定的時(shí)間延遲,并且提供了到Oracle Streams的接口。同步相對(duì)比較簡(jiǎn)單,通過(guò)觸發(fā)器捕獲增量數(shù)據(jù)。而異步CDC根據(jù)實(shí)現(xiàn)的內(nèi)部機(jī)制區(qū)別,又可以分為異步HotLog模式,異步分布式HotLog模式和異步AutoLog模式[1]。
2.1 Kafka。Kafka是一個(gè)分布式消息隊(duì)列。Kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類(lèi),發(fā)送消息者稱(chēng)為Producer,消息接受者稱(chēng)為Consumer。Producer即生產(chǎn)者,向Kafka集群發(fā)送消息,在發(fā)送消息之前,會(huì)對(duì)消息進(jìn)行分類(lèi)。Topic即主題,通過(guò)對(duì)消息指定主題可以將消息分類(lèi),消費(fèi)者可以只關(guān)注自己需要的Topic中的消息。Consumer即消費(fèi)者,消費(fèi)者通過(guò)與kafka集群建立長(zhǎng)連接的方式,不斷地從集群中拉取消息,然后可以對(duì)這些消息進(jìn)行處理。
2.2 GPSS。Greenplum Stream Server(GPSS)是 一 個(gè)ETL(提取、轉(zhuǎn)換、加載)工具。GPSS服務(wù)器的一個(gè)實(shí)例從一個(gè)或多個(gè)客戶(hù)機(jī)接收流數(shù)據(jù),使用Greenplum數(shù)據(jù)庫(kù)可讀的外部表將數(shù)據(jù)轉(zhuǎn)換并插入到目標(biāo)Greenplum表中。數(shù)據(jù)源和數(shù)據(jù)格式是特定于客戶(hù)機(jī)的。數(shù)據(jù)源和數(shù)據(jù)格式由客戶(hù)端指定[2]。
整體技術(shù)流程圖如圖1所示
圖1 基于異步聯(lián)機(jī)日志結(jié)合流處理的實(shí)時(shí)數(shù)據(jù)抽取實(shí)現(xiàn)
系統(tǒng)采用可視化數(shù)據(jù)抽取流程建模技術(shù)實(shí)現(xiàn)了以上的數(shù)據(jù)抽取過(guò)程??梢暬瘮?shù)據(jù)抽取技術(shù)通過(guò)組件連接的方式形成數(shù)據(jù)處理管道。
整體數(shù)據(jù)實(shí)時(shí)抽取流程由兩個(gè)管道組成,分別是數(shù)據(jù)生產(chǎn)者和數(shù)據(jù)消費(fèi)者。
3.1 數(shù)據(jù)生產(chǎn)者管道。通過(guò)解析異步聯(lián)機(jī)日志,CDC感知數(shù)據(jù)源庫(kù)Oracle中表的數(shù)據(jù)更新、增加和刪除操作,并將操作流轉(zhuǎn)換成數(shù)據(jù)流發(fā)送到Kafka中。通過(guò)CDC讀取到的日志進(jìn)行SQL解析與轉(zhuǎn)換,將數(shù)據(jù)解析為對(duì)應(yīng)數(shù)據(jù)的JSON對(duì)象,并標(biāo)記該數(shù)據(jù)是增加、修改還是刪除數(shù)據(jù),然后將數(shù)據(jù)存入到Kafka中。需要注意的是,存儲(chǔ)到Kafka中的數(shù)據(jù),不同來(lái)源表的數(shù)據(jù)需要存儲(chǔ)到不同的Topic中,相當(dāng)于每個(gè)Topic中僅存儲(chǔ)固定的一張?jiān)幢淼脑隽繑?shù)據(jù)信息,一般情況下,使用Oracle的schema名和table名聯(lián)合起來(lái)作為T(mén)opic的名稱(chēng)[3]。
3.2 數(shù)據(jù)消費(fèi)者管道。GPKafka通過(guò)Job方式將導(dǎo)數(shù)的配置提交到GPSS,GPSS讀取Kafka對(duì)應(yīng)的Topic數(shù)據(jù),并寫(xiě)入對(duì)應(yīng)的Grennplum數(shù)據(jù)庫(kù),然后將Job的調(diào)度的信息存入到本地文件中。GPKafka消費(fèi)者根據(jù)配置的目標(biāo)倉(cāng)庫(kù)的schema和table信息,自動(dòng)生成對(duì)應(yīng)的Job配置文件,然后提交Job至GPSS中進(jìn)行執(zhí)行。GPSS接收到Job信息后,開(kāi)始根據(jù)Job配置讀取Kafka中對(duì)應(yīng)Topic中的數(shù)據(jù),然后寫(xiě)入到GreenPlum數(shù)據(jù)庫(kù)中。當(dāng)同時(shí)運(yùn)行的Job數(shù)量過(guò)多時(shí),GPKafka消費(fèi)者會(huì)根據(jù)實(shí)際情況,暫停和調(diào)度不同的Job運(yùn)行情況,以滿(mǎn)足當(dāng)前數(shù)據(jù)抽取業(yè)務(wù)的需要。最后將流程運(yùn)行階段性情況和調(diào)度情況寫(xiě)入到本地文件系統(tǒng)中[4]。
數(shù)據(jù)來(lái)源:明星電力公司中臺(tái)數(shù)據(jù),大小為從100M-5G單表。
測(cè)試環(huán)境:1臺(tái)Oracle數(shù)據(jù)庫(kù)服務(wù)器,1臺(tái)大數(shù)據(jù)中臺(tái)服務(wù)器,1個(gè)GreenPlum實(shí)例(6臺(tái)服務(wù)器,1個(gè)Master,24個(gè)Segment),Kafka集群(3臺(tái)服務(wù)器節(jié)點(diǎn)),服務(wù)器物理內(nèi)存64G,內(nèi)網(wǎng)帶寬1Gbps。測(cè)試所用源庫(kù)表以完全入庫(kù)完畢。
測(cè)試方法:對(duì)比方法:
第一,CDC讀取數(shù)據(jù)庫(kù)數(shù)據(jù)變化日志,然后通過(guò)傳統(tǒng)JDBC直接寫(xiě)入Grennplum數(shù)據(jù)庫(kù),后面稱(chēng)為(CDC+JDBC);第二,我們系統(tǒng)采用的方法:通過(guò)CDC讀取數(shù)據(jù)庫(kù)數(shù)據(jù)變化日志,然后通過(guò)GPKafka直接寫(xiě)入Grennplum數(shù)據(jù)庫(kù),后面稱(chēng)為(CDC+GPKAFKA);第三,通過(guò)JDBC實(shí)現(xiàn)全量抽取,并通過(guò)Greenplum存儲(chǔ)組件將數(shù)據(jù)寫(xiě)入Grennplum數(shù)據(jù)庫(kù),后面稱(chēng)為(JDBC全量抽?。5]。
實(shí)現(xiàn)對(duì)比結(jié)果如表3、表4所示。
表3 各個(gè)方法測(cè)試對(duì)比結(jié)果(累計(jì)讀取源Oracle數(shù)據(jù)庫(kù)效率)
表4 各個(gè)方法測(cè)試對(duì)比結(jié)果(累計(jì)寫(xiě)入目標(biāo)Greenplum數(shù)據(jù)庫(kù)效率)
通過(guò)觀察計(jì)算可以發(fā)現(xiàn)JDBC全量抽取和CDC+JDBC這兩種方式再讀取和寫(xiě)入效率上都是一致的,因?yàn)樗麄兌荚谕粋€(gè)任務(wù)中做處理。而CDC+GPKAFKA這種方式,由于讀取出數(shù)據(jù)后,暫存在Kafka中,有個(gè)消費(fèi)的過(guò)程,它的讀取和寫(xiě)入的效率是有區(qū)別的,寫(xiě)入效率遠(yuǎn)高于讀取的效率。從事實(shí)數(shù)據(jù)抽取的效率上面來(lái)看,CDC+GPKAFKA這種方式是最優(yōu)選。CDC+GPKAFKA由于同時(shí)采用了CDC異步日志分析技術(shù)和kafak流式并行寫(xiě)入技術(shù),不經(jīng)過(guò)JDBC接口因此在小表和大表,小數(shù)據(jù)改變量到大數(shù)據(jù)改變量情況下都保持了非常好的數(shù)據(jù)抽取寫(xiě)入性能[6]。
我們對(duì)多個(gè)業(yè)務(wù)數(shù)據(jù)源系統(tǒng)進(jìn)行數(shù)據(jù)實(shí)時(shí)抽取,包含的數(shù)據(jù)源表總共約5000余張,歷史存量數(shù)據(jù)約150GB,月增量約10GB。同時(shí),以上所述業(yè)務(wù)系統(tǒng)內(nèi)的原始數(shù)據(jù)源表,包含部分未做分區(qū)、分表等設(shè)計(jì)的超大表,無(wú)唯一鍵和主鍵等現(xiàn)象,其中最大的表記錄數(shù)量達(dá)到2億條,約5GB,為數(shù)據(jù)中心對(duì)于該類(lèi)表的數(shù)據(jù)實(shí)時(shí)性抽取增加了較大的困難和障礙。原始的數(shù)據(jù)抽取方式為全量數(shù)據(jù)接入方式和基于全量對(duì)比的數(shù)據(jù)抽取技術(shù)。該方式處理效率低下,通常造成數(shù)據(jù)積壓嚴(yán)重,無(wú)法達(dá)到數(shù)據(jù)的實(shí)時(shí)性抽取任務(wù)要求[7]。
針對(duì)該問(wèn)題,提出的一種CDC結(jié)合GPKafka的實(shí)時(shí)數(shù)據(jù)接入抽取技術(shù),提升了中臺(tái)數(shù)據(jù)抽取實(shí)時(shí)性。通過(guò)與CDC+JDBC方式、JDBC全量抽取方式對(duì)比,CDC+GPKAFKA有明顯的優(yōu)勢(shì),在對(duì)1億條數(shù)據(jù)量的讀寫(xiě)測(cè)試中,CDC+GPKafka的方式讀寫(xiě)數(shù)據(jù)效率分別261000條/分鐘和509000條/分鐘。達(dá)到了零數(shù)據(jù)積壓的數(shù)據(jù)入庫(kù)要求。