鄧杰 童孟軍 胡文澤 林英杰 胡燚
摘? 要: Mysql數(shù)據(jù)庫表切換過程,需要保證兩個相同表同時寫入。Solr作為企業(yè)級的搜索框架,其自帶的索引更新系統(tǒng)是增量更新,實時性差。為了滿足互聯(lián)網(wǎng)企業(yè)對數(shù)據(jù)的實時性要求,設(shè)計和實現(xiàn)了一款基于Zookeeper的能夠準(zhǔn)實時更新Solr索引和實現(xiàn)數(shù)據(jù)庫雙寫的系統(tǒng)。用戶需要提交配置信息到系統(tǒng),當(dāng)Mysql數(shù)據(jù)源發(fā)生內(nèi)容變更時,系統(tǒng)就能夠?qū)崟r捕獲,將變更內(nèi)容經(jīng)過數(shù)據(jù)轉(zhuǎn)化并實時同步更新Solr索引。
關(guān)鍵詞: Zookeeper; Solr; Mysql; 實時同步; 分布式
中圖分類號:TP392? ? ? ? ? 文獻(xiàn)標(biāo)識碼:? ? ?文章編號:1006-8228(2020)02-58-04
Quasi–real-time index update system using Zookeeper and its monitoring
Deng Jie, Tong Mengjun, Hu Wenze, Lin Yingjie, Hu Yi
(College of Information Engineering, Zhejiang A&F University, Hangzhou, Zhejiang, 311300, China)
Abstract: Mysql database table switching process, it is needed to ensure that two same tables are written at the same time. As an enterprise-class search framework, Solr's built-in index update system is incremental update with poor real-time performance. In order to meet the real-time requirements of Internet enterprises, a system is designed and implemented by using Zookeeper, which can quasi–real-timely update Solr index and realize database dual write. Users need to submit configuration information to the system. When the content of Mysql data source changes, the system can capture the changed content in real time, and update Solr index synchronously in real time through the data transformation.
Key words: Zookeeper; Solr; Mysql; real-time; synchronization
0 引言
當(dāng)前互聯(lián)網(wǎng)企業(yè)對數(shù)據(jù)的實時性要求越來越高。所以本文研究的內(nèi)容著眼于實時的數(shù)據(jù)同步,整個系統(tǒng)分為數(shù)據(jù)抓取分發(fā)平臺部分和索引更新部分?;谶@個系統(tǒng)來解決下面兩方面的問題。
⑴ 很多公司會將部分?jǐn)?shù)據(jù)冗余存在搜索平臺Solr上,當(dāng)用戶請求數(shù)據(jù)時,直接從搜索平臺Solr上獲取而不是訪問數(shù)據(jù)庫(因為數(shù)據(jù)庫不支持復(fù)雜的搜索邏輯)。所以需要一款能夠?qū)崟r更新搜索索引的系統(tǒng),當(dāng)數(shù)據(jù)庫內(nèi)容發(fā)生變更時,及時更新對應(yīng)的Solr上的索引。
⑵ 有些特殊情況需要替換線上運行的數(shù)據(jù)庫mysql內(nèi)的某個表,但又因為是線上運行的數(shù)據(jù)庫,不應(yīng)該讓業(yè)務(wù)感知到這種變更,所以需要一個系統(tǒng)進(jìn)行漸進(jìn)的替換過程。
1 核心框架和相關(guān)技術(shù)介紹
1.1 系統(tǒng)運用到的核心框架
zookeeper提供分布式協(xié)調(diào)服務(wù),提供諸如統(tǒng)一命名服務(wù)、配置管理和分布式鎖、分布式消息等分布式的基礎(chǔ)服務(wù)。它是一個典型的分布式數(shù)據(jù)一致性的解決方案,分布式應(yīng)用可以基于zookeeper實現(xiàn)發(fā)布/訂閱、負(fù)載均衡、命名服務(wù)、分布式協(xié)調(diào)/通知、集群管理/master選舉、分布式鎖和分布式隊列等功能[1]。功能強(qiáng)大又好用。
kafka是一款基于發(fā)布訂閱模型的消息隊列。它是一個能夠提供實時數(shù)據(jù)傳輸?shù)钠脚_,具有高吞吐、低延遲的特點。使用它的原因:一是可以處理更多的消息,不受單臺服務(wù)器的限制,二是分區(qū)可作為并行處理的單元。這樣使所有的請求給多臺服務(wù)器處理[2]。
Kafka Connect是一種用于Kafka和其他數(shù)據(jù)系統(tǒng)之間進(jìn)行數(shù)據(jù)傳輸?shù)墓ぞ?。僅關(guān)注數(shù)據(jù)的復(fù)制,不處理其他任務(wù),對數(shù)據(jù)的傳輸進(jìn)行管理和監(jiān)控。
Debezium是Kafka Connect的一種實現(xiàn),主要用于數(shù)據(jù)庫和kafka之間的數(shù)據(jù)傳輸。它是一個CDC(Change Data Capture)系統(tǒng),能實時捕獲上游數(shù)據(jù)的變動。然后記錄到一個或者多個Kafka topic。
solr是一款開源的企業(yè)級搜索框架,其主要功能有全文檢索,分詞,拼音檢索等。通過solr索引,能夠在短時間從海量數(shù)據(jù)里得到用戶關(guān)心的數(shù)據(jù)。
1.2 系統(tǒng)運用到的主要技術(shù)
⑴ Spring技術(shù)。Spring是現(xiàn)在非常流行的一個框架。SpringMVC是Spring的一個模塊,它提供完整的MVC模型解決方案[3]。本系統(tǒng)使用REST接口來管理同步任務(wù),通過使用Spring和SpringMVC能夠很方便、快速的搭建一個后端應(yīng)用,簡化開發(fā)流程[4]。
⑵ Java多線程技術(shù)。只有一個線程的系統(tǒng),運行效率必定糟糕,運用到多線程技術(shù)后可以同時運行不同的同步任務(wù),大大的提高了運行的效率[5]。
⑶ 分布式技術(shù)。本系統(tǒng)基于搶占式任務(wù)調(diào)度方式,保證同步任務(wù)的高可用,可以在不同機(jī)器運行。
2 系統(tǒng)實現(xiàn)
2.1 實時數(shù)據(jù)管道
實時數(shù)據(jù)管道主要數(shù)據(jù)來自mysql。當(dāng)數(shù)據(jù)庫信息發(fā)生變更時,mysql內(nèi)變更前和變更后的內(nèi)容會被發(fā)送到實時數(shù)據(jù)管道上,使各個接入方在極短時間內(nèi)收到數(shù)據(jù)庫變更信息。
實時數(shù)據(jù)管道由3個核心部分組成:CDC模塊、系統(tǒng)中間體Kafka和Schema registry。
圖1是一個簡單的架構(gòu)圖,為了展示用戶如何通過管道得到數(shù)據(jù)變更。Debezium實時的抓取到mysql的數(shù)據(jù)變更并使用Avro將數(shù)據(jù)序列化得到schema和值,其中schema被提交到schema registry并返回id,之后id和序列化后的值一起發(fā)送到kafka。數(shù)據(jù)管道下游邏輯由開發(fā)人員實現(xiàn):初始化一個KafkaConsumer,訂閱指定的Kafka Topic,根據(jù)拿到的數(shù)據(jù)schema id,從schema registry里獲取到對應(yīng)的schema,然后使用Avro將拉取到的數(shù)據(jù)的值和schema反序列化成一條消息交付給用戶。本系統(tǒng)中的實時數(shù)據(jù)管道部分還增加了基于組件JMX信息的監(jiān)控,用來實時觀察數(shù)據(jù)管道的狀況。
2.2 Solr索引和Mysql數(shù)據(jù)的同步系統(tǒng)SIS
SIS目的是解決Solr索引實時更新和數(shù)據(jù)庫雙寫的問題。由服務(wù)端和客戶端兩部分組成。用戶向SIS服務(wù)端提交任務(wù),SIS客戶端從SIS服務(wù)端監(jiān)聽到新任務(wù)后,啟動任務(wù)。
⑴ SIS服務(wù)端的實現(xiàn)
SIS服務(wù)端有提交同步任務(wù)、刪除同步任務(wù)、更新同步任務(wù)三個功能。
用戶向服務(wù)端提交任務(wù),服務(wù)端創(chuàng)建/sis/task和/taskX節(jié)點。/sis/task節(jié)點為SIS同步任務(wù)根節(jié)點,/sis/task/taskX表示一個同步任務(wù)節(jié)點,其配置信息都會保存在自己內(nèi)部。
新任務(wù)提交的過程的概括如下:首先SIS服務(wù)端啟動時,首先會嘗試向zookeeper注冊/sis/task持久節(jié)點。然后用戶提交一份同步任務(wù)的配置內(nèi)容到SIS服務(wù)端。SIS服務(wù)端收到任務(wù)創(chuàng)建請求后,向zk創(chuàng)建/sis/task/taskX 持久節(jié)點,其中taskX為同步任務(wù)的名稱。同時,同步任務(wù)的配置信息會被保存到/sis/task/taskX節(jié)點內(nèi)。Zookeeper提供的分布式協(xié)調(diào)功能對同一個節(jié)點的多個創(chuàng)建請求,只會有一個請求能成功,這也保證了不會有多個相同任務(wù)被創(chuàng)建。
⑵ SIS服務(wù)端的設(shè)計
SIS客戶端的設(shè)計圍繞zookeepe展開,它負(fù)責(zé)同步任務(wù)組件創(chuàng)建,調(diào)度器初始化等工作。這里的實現(xiàn)非常復(fù)雜,在這里簡單的闡述客戶端任務(wù)的情況。
圖2是SIS客戶端創(chuàng)建任務(wù)工作流程。其中Client表示SIS服務(wù)端,它們以集群的形式運行,每一個Client都是對等的。
⑴ 每個SIS客戶端啟動時,會向/sis/task節(jié)點注冊監(jiān)聽器,監(jiān)聽該節(jié)點子節(jié)點變化情況。當(dāng)/sis/task子節(jié)點增加,刪除,內(nèi)容更新時SIS客戶端會收到通知。
⑵ SIS服務(wù)端根據(jù)用戶創(chuàng)建任務(wù)請求,創(chuàng)建/sis/task/taskX 任務(wù)節(jié)點,其中taskX為同步任務(wù)的名稱,它是一個持久節(jié)點。
⑶ 所有SIS客戶端都會得到節(jié)點/sis/task/taskX被創(chuàng)建的消息。SIS客戶端收到回調(diào)之后,都會向/sis/task/taskX節(jié)點注冊監(jiān)聽器。Zookeeper保證只會有一個客戶端請求成功,開始任務(wù)同時將同步任務(wù)的運行狀態(tài)寫入到lock節(jié)點內(nèi)部。
⑷ 如果此時運行同步任務(wù)taskX的服務(wù)器發(fā)生宕機(jī), 那么SIS和zookeeper的連接將會斷開,并且lock臨時節(jié)點將自動刪除。剩余的SIS客戶端由于添加了對同步任務(wù)的監(jiān)聽器會收到同步任務(wù)中斷的通知,又開始對/sis/task/taskX任務(wù)節(jié)點加鎖。加鎖成功的SIS客戶端,從任務(wù)節(jié)點讀取配置信息,重新啟動同步任務(wù)?;谶@個機(jī)制實現(xiàn)SIS同步任務(wù)的高可用。
⑸ 如果用戶主動提交刪除任務(wù)請求,那么SIS服務(wù)端首先將/sis/task/taskX/lock鎖節(jié)點的狀態(tài)信息更新為WAIT_FOR_CLOSE,這表示該任務(wù)節(jié)點等待刪除,隨后刪除鎖節(jié)點和/sis/task/taskX任務(wù)節(jié)點,之后所有在/sis/task/taskX任務(wù)節(jié)點注冊監(jiān)聽器的SIS客戶端都會收到鎖節(jié)點被用戶主動刪除的通知,但都不做任何響應(yīng)。任務(wù)節(jié)點刪除之后,所有向/sis/task節(jié)點注冊監(jiān)聽器的SIS客戶端收到任務(wù)節(jié)點被刪除的通知,SIS客戶端根據(jù)通知內(nèi)容判斷同步任務(wù)是否運行在自己所在服務(wù)器來同步任務(wù)和清理資源。
⑹ 如果用戶主動提交更新同步任務(wù)配置信息請求,那么SIS服務(wù)端會更新/sis/task/taskX的節(jié)點內(nèi)容。隨后所有在/sis/task節(jié)點注冊監(jiān)聽器的SIS客戶端都會收到通知,并根據(jù)通知獲取到具體哪個任務(wù)節(jié)點需要更新,隨后更新/sis/task/taskX/lock鎖節(jié)點狀態(tài)為NEED_UPDATE。之后所有在/sis/task/taskX節(jié)點注冊監(jiān)聽器的SIS客戶端收都會收到鎖節(jié)點內(nèi)容被更新的通知,并判斷對應(yīng)的同步任務(wù)是否在自己所在的服務(wù)器,如果是則再次判斷任務(wù)狀態(tài),如果為NEED_UPDATE,那么就停止老的同步任務(wù),清理資源,刪除同步任務(wù)下的鎖節(jié)點。鎖節(jié)點被刪除后,和第⑷步類似,創(chuàng)建新的同步任務(wù)。
上述六個步驟概述了SIS客戶端對任務(wù)的調(diào)度過程,基于zookeeper的SIS客戶端和服務(wù)端的實現(xiàn),讓SIS同步任務(wù)能夠高可用,即使某一臺服務(wù)器宕機(jī),同步任務(wù)也不會中斷。
2.3 實時數(shù)據(jù)管道和SIS
實時數(shù)據(jù)管道可以應(yīng)用于以下的場景:數(shù)值統(tǒng)計、實時數(shù)據(jù)分析、響應(yīng)式編程。實時數(shù)據(jù)管道能夠讓開發(fā)人員實現(xiàn)實時ETL(Extract-Transform-Load),提供實時、無限的數(shù)據(jù)流。
SIS可以解決solr索引更新延時大的缺點,實現(xiàn)索引的實時更新。并且還能夠完美解決Mysql雙寫需要開發(fā)人員在項目代碼里添加額外代碼,實現(xiàn)數(shù)據(jù)寫入兩個庫的問題。
如下是數(shù)據(jù)管道和SIS相結(jié)合實現(xiàn)mysql雙寫。
現(xiàn)在在同一個數(shù)據(jù)庫里有2張表,分別是userinfo和test表。本系統(tǒng)將實現(xiàn):當(dāng)userinfo表有內(nèi)容變更時,test表能立刻同步。用戶提交配置內(nèi)容到同步服務(wù)器,指定userinfo表的變更需要被同步到test表,點擊create創(chuàng)建數(shù)據(jù)庫雙寫同步任務(wù)。
提交的配置信息如圖3,指定SIS同步的數(shù)據(jù)來自數(shù)據(jù)管道userinfo相關(guān)的topic。中間有多個處理過程包括數(shù)據(jù)的冗余,轉(zhuǎn)換。最后數(shù)據(jù)會被寫入mysql的test表。
從圖4可以看到,Userinfo表里userid為1的數(shù)據(jù)變更前timestamp字段的值為null,當(dāng)行內(nèi)容有更新時,timestamp值會被自動更新為內(nèi)容更新的時間。
更新userinfo表userid為1的行數(shù)據(jù),將username更新為name111,如圖5所示。timestamp字段的值被更新為此行內(nèi)容變更時的時間2018-05-15 23:37:03。因為之前創(chuàng)建了mysql雙寫同步任務(wù),所以userinfo的變更內(nèi)容,會被同步到test表。test表的username值變更為name111,并且timestamp為test表userid為1的行變更時的時間 2018-05-14 23:37:04。和userinfo的timestamp值2018-05-15 23:37:03相比,同步userinfo變更內(nèi)容到test表,只花費了1秒。
2.4 對比
圖6⑴是使用本系統(tǒng)后的效果圖。highwater表示每個時間點mysql總共有多少變更數(shù)據(jù),offset表示當(dāng)前消費的數(shù)據(jù)量。從中可以看到,2條線是重合的,也就是說在每個時間點的mysql變更,本系統(tǒng)都能夠?qū)崟r的處理消費。由于監(jiān)控的原因,offset可能會高于highwater,offset高于highwater表示SIS消費是完全跟上了mysql變更。
圖6⑵是不使用本系統(tǒng)而是使用增量的方式處理消息的延時圖。可以看到下面的線offset總是經(jīng)過一段時間后才上漲,而不能做到實時的和highwater保持一致。通過對比,使用了本系統(tǒng)后,能夠做到數(shù)據(jù)的實時處理。
3 總結(jié)
本文主要研究了一個通用的,能夠服務(wù)于不同系統(tǒng)的數(shù)據(jù)同步系統(tǒng)。通過本系統(tǒng),開發(fā)人員只需要編寫一份簡單的描述文件,說明要同步的數(shù)據(jù)從哪來、到哪去,比如指定需要同步的數(shù)據(jù)是哪個mysql的表,這些數(shù)據(jù)會被同步到solr還是mysql的另一個新表等。系統(tǒng)根據(jù)配置內(nèi)容就能自動同步。有了此系統(tǒng)能夠讓開發(fā)人員專注于業(yè)務(wù)開發(fā)而不需要花費大量的精力在業(yè)務(wù)之外的代碼編寫上,提高了開發(fā)效率。
參考文獻(xiàn)(References):
[1] 倪超.從Paxos到Zookeeper:分布式一致性原理與實踐[M].北京:機(jī)械工業(yè)出版社,2015.
[2] 牟大恩.Kafka入門與實踐[M].人民郵電出版社,2017:59-89
[3] Craig Walls.Spring實戰(zhàn)(第4版) [M].人民郵電出版社,2016:187-205
[4] 明日科技.Java Web從入門到精通 [M].清華大學(xué)出版社,2012:78-89
[5] 葛一鳴,郭超.實戰(zhàn)Java高并發(fā)程序設(shè)計[M].電子工業(yè)出版社,2015:100-110
[6] 鳥哥.鳥哥的Linux私房菜[M].人民郵電出版社,2010:120-150
[7] 克雷格·沃斯.Spring Boot實戰(zhàn)[M].人民郵電出版社,2016:93-134
[8] Bruce Eckel.Java編程思想(第4版)[M].機(jī)械工業(yè)出版社,2007:135-150
[9] 瘋狂軟件.Spring+MyBatis企業(yè)應(yīng)用實戰(zhàn)[M].電子工業(yè)出版社,2017:87-102
[10] Raoul-Gabriel Urma, Mario Fusco, Alan Mycroft.Java 8 in Action[M].USA:Manning,2014:153-160