于金良 朱志祥 李聰穎
摘 要:為了解決實(shí)時(shí)流式數(shù)據(jù)的采集問題,研究了一種分布式消息隊(duì)列Kafka,它可以實(shí)時(shí)采集流式數(shù)據(jù),處理數(shù)據(jù)時(shí)先從它訂閱,就可以以流數(shù)據(jù)的形式處理數(shù)據(jù)。該隊(duì)列具有部署簡單、易于管理、吞吐量高、高容錯(cuò)性等優(yōu)點(diǎn)。經(jīng)測(cè)試,該隊(duì)列可以滿足實(shí)際生產(chǎn)中對(duì)吞吐量的需求。
關(guān)鍵詞:分布式;消息隊(duì)列;主題;流數(shù)據(jù)
中圖分類號(hào):TP274.2 文獻(xiàn)標(biāo)識(shí)碼:A 文章編號(hào):2095-1302(2016)08-00-03
0 引 言
當(dāng)前大數(shù)據(jù)處理的數(shù)據(jù)形式主要分為兩種。一種是固定的批量數(shù)據(jù),這種數(shù)據(jù)是一個(gè)文件或者一個(gè)數(shù)據(jù)庫,其數(shù)據(jù)量是固定的,我們只需一次讀取,然后進(jìn)行計(jì)算;另外一種是活躍的流式數(shù)據(jù),這種數(shù)據(jù)是一個(gè)數(shù)據(jù)流,是實(shí)時(shí)生成、可傳輸?shù)?,如一個(gè)網(wǎng)站的流量page views、用戶搜索的內(nèi)容等,這些數(shù)據(jù)是實(shí)時(shí)的,且數(shù)據(jù)量很大,其采集比較困難,傳統(tǒng)的消息隊(duì)列很難應(yīng)用到此種場(chǎng)景中。要想處理這些數(shù)據(jù)就需要先采集這些數(shù)據(jù),本文介紹了一種可以實(shí)時(shí)采集這些數(shù)據(jù)的分布式消息隊(duì)列Kafka。
1 簡介
Kafka是LinkedIn于2010年12月份開源的消息系統(tǒng),它主要用于處理活躍的流式數(shù)據(jù)。活躍的流式數(shù)據(jù)在Web網(wǎng)站的應(yīng)用中十分常見,包括網(wǎng)站的pv、用戶訪問的內(nèi)容、搜索的內(nèi)容等。這些數(shù)據(jù)通常以日志的形式記錄下來,然后每隔一段時(shí)間進(jìn)行一次統(tǒng)計(jì)處理。
傳統(tǒng)的日志分析系統(tǒng)提供了一種離線處理日志信息的可擴(kuò)展方案,但若要進(jìn)行實(shí)時(shí)處理,通常會(huì)有較大延遲。而現(xiàn)有的消息(隊(duì)列)系統(tǒng)能夠很好地處理實(shí)時(shí)或者近似實(shí)時(shí)的應(yīng)用,但未處理的數(shù)據(jù)通常不會(huì)寫到磁盤上,這對(duì)于Hadoop之類(一小時(shí)或者一天只處理一部分?jǐn)?shù)據(jù))的離線應(yīng)用而言,可能會(huì)存在些許問題。Kafka正是為了解決以上問題而設(shè)計(jì)的,它能夠很好地提供離線和在線應(yīng)用。
Kafka對(duì)消息按topic進(jìn)行歸類保存,消息的發(fā)布者稱為生產(chǎn)者(Producer),消息的接收者稱為消費(fèi)者(Consumer)。Kafka是分布式,它的集群由多個(gè)Kafka實(shí)例組成,每個(gè)實(shí)例是一個(gè)Broker。Kafka集群的信息及生產(chǎn)者與消費(fèi)者的元數(shù)據(jù)都由Zookeeper保存,它本身無需保存這些數(shù)據(jù)。
2 設(shè)計(jì)原理
Kafka的設(shè)計(jì)初衷是希望作為一個(gè)統(tǒng)一的數(shù)據(jù)收集平臺(tái),能夠?qū)崟r(shí)收集數(shù)據(jù)、支撐大數(shù)據(jù),并具備良好的容錯(cuò)能力。
2.1 存儲(chǔ)
Kafka不會(huì)在消息被消費(fèi)后直接刪除,而是將消息持久化在磁盤中,使用文件存儲(chǔ)消息,而文件系統(tǒng)的優(yōu)化幾乎是不可能的,為了提高性能,采用了緩存/直接內(nèi)存映射的方法。為了減少對(duì)磁盤的訪問次數(shù),Broker將數(shù)據(jù)暫時(shí)緩存起來,當(dāng)消息的數(shù)量達(dá)到一定值時(shí),再flush到磁盤中,減少了在磁盤I/O上消耗的時(shí)間。
2.2 高吞吐量
為了提高Kafka的吞吐量,它采用了批量傳輸發(fā)送的方法,即生產(chǎn)者發(fā)布消息時(shí),先將消息緩存,當(dāng)達(dá)到一定量時(shí),批量發(fā)送到Broker;對(duì)于消費(fèi)者也一樣,Broker會(huì)批量發(fā)送多條消息??紤]到網(wǎng)絡(luò)I/O,Kafka將在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)進(jìn)行壓縮,它支持的壓縮方式有g(shù)zip/snappy等。而在創(chuàng)建主題時(shí),可以分為多個(gè)分區(qū),進(jìn)一步提高讀寫的吞吐量。
2.3 負(fù)載均衡
生產(chǎn)者根據(jù)用戶指定的算法,將消息發(fā)送到指定的分區(qū);存在多個(gè)分區(qū),每個(gè)分區(qū)都有自己的副本,每個(gè)副本分布在不同的Broker節(jié)點(diǎn)上;多個(gè)分區(qū)需要選取出主分區(qū),主分區(qū)負(fù)責(zé)讀寫,并由Zookeeper負(fù)責(zé)故障恢復(fù);通過Zookeeper管理Broker與消費(fèi)者的動(dòng)態(tài)加入與離開。
2.4 自動(dòng)擴(kuò)容
由于在大數(shù)據(jù)行業(yè)中數(shù)據(jù)量的大小難以估計(jì),Kafka支持集群的橫向擴(kuò)展,當(dāng)需要增加Broker結(jié)點(diǎn)時(shí),新增的Broker、生產(chǎn)者、消費(fèi)者會(huì)向Zookeeper注冊(cè),并及時(shí)作出調(diào)整。
3 技術(shù)架構(gòu)
Kafka是使用scala語言開發(fā)的,同時(shí)支持多種編程語言的客戶端(c++、java、python、go等),其總體架構(gòu)如圖1所示。
Kafka的消息分為以下幾個(gè)層次:
(1)主題(Topic):一類消息,例如頁面瀏覽日志,用戶搜索日志等都可以以主題的形式存在,Kafka集群能夠同時(shí)負(fù)責(zé)一個(gè)或多個(gè)主題的分發(fā)。
(2)分區(qū)(Partition):是在主題物理上的分組,一個(gè)主題可以分為多個(gè)分區(qū),每個(gè)分區(qū)是一個(gè)有序的隊(duì)列。分區(qū)中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。
(3) 消息(Message):最小訂閱單元。
具體流程如下所示:
(1)生產(chǎn)者根據(jù)指定的分區(qū)方法(round-robin、hash等),將消息發(fā)布到指定主題的分區(qū)中;
(2)Kafka集群接收到生產(chǎn)者發(fā)過來的消息后,將其持久化到硬盤,并保留消息指定時(shí)長(可配置),而不關(guān)注消息是否被消費(fèi);
(3)消費(fèi)者從Kafka集群拉數(shù)據(jù),并控制獲取消息的offset。
4 主題與分區(qū)
一個(gè)topic是對(duì)一組消息的歸納。Kafka對(duì)每個(gè)主題的日志進(jìn)行了分區(qū),如圖2所示。
每個(gè)分區(qū)都由一系列有序的、不可變的消息組成,這些消息被連續(xù)追加到分區(qū)中。分區(qū)中的每個(gè)消息都有一個(gè)連續(xù)的序列號(hào)叫做offset,用來在分區(qū)中唯一的標(biāo)識(shí)這個(gè)消息。
在一個(gè)可配置的時(shí)間段內(nèi),Kafka集群保留所有發(fā)布的消息,不管這些消息有沒有被消費(fèi)。比如將消息的保存策略設(shè)置為2天,那么在一個(gè)消息發(fā)布到Kafka的兩天時(shí)間內(nèi),它都可以被消費(fèi)。之后它將被丟棄以釋放空間。Kafka的性能是和數(shù)據(jù)量無關(guān)的常量級(jí),所以保留太多的數(shù)據(jù)并不是問題。
實(shí)際上每個(gè)消費(fèi)者唯一需要維護(hù)的是已消費(fèi)消息在消息隊(duì)列中的位置,即offset。一般情況下隨著消費(fèi)者不斷的讀取消息,offset的值隨之不斷增加,其實(shí)消費(fèi)者可以以任意的順序讀取消息,比如它可以將offset設(shè)置成一個(gè)舊值來重讀之前的消息。
結(jié)合以上特點(diǎn)可以發(fā)現(xiàn),Kafka消費(fèi)者是輕量級(jí)的,它們可以在不對(duì)集群和其他消費(fèi)者造成影響的情況下讀取消息。
將日志分區(qū)可以達(dá)到以下目的:首先這使得每個(gè)日志的數(shù)量不會(huì)太大,可以在單個(gè)服務(wù)上保存。另外每個(gè)分區(qū)可以單獨(dú)發(fā)布和消費(fèi),為并發(fā)操作主題提供了一種可能。
4.1 分布式
每個(gè)分區(qū)在Kafka集群的若干服務(wù)中都有副本,這些持有副本的服務(wù)可以共同處理數(shù)據(jù)和請(qǐng)求,副本數(shù)量可以配置。副本使Kafka具備了容錯(cuò)能力。
每個(gè)分區(qū)都由一個(gè)服務(wù)器作為主服務(wù),零或若干服務(wù)器作為從服務(wù),主服務(wù)負(fù)責(zé)處理消息的讀和寫,從服務(wù)則復(fù)制主服務(wù),若主服務(wù)宕機(jī)了,從服務(wù)中的一臺(tái)則會(huì)自動(dòng)成為主服務(wù)。集群中的每個(gè)服務(wù)都會(huì)同時(shí)扮演兩個(gè)角色:作為它所持有的一部分分區(qū)的主,同時(shí)作為其他分區(qū)的從,這樣集群就會(huì)有較好的負(fù)載均衡。
4.2 生產(chǎn)者
生產(chǎn)者(Producer)將消息發(fā)布到它指定的主題中,并決定發(fā)布到哪個(gè)分區(qū)中。一般是由負(fù)載均衡機(jī)制隨機(jī)選擇一個(gè)分區(qū),也可通過特定的分區(qū)函數(shù)來選擇分區(qū)。使用更多的是第二種。
4.3 消費(fèi)者
發(fā)布消息通常有兩種模式:隊(duì)列模式(queuing)和發(fā)布-訂閱模式(publish-subscribe)。隊(duì)列模式中,消費(fèi)者(Consumers)可以同時(shí)從服務(wù)端讀取消息,每個(gè)消息只被其中一個(gè)消費(fèi)者讀到;發(fā)布-訂閱模式中消息被廣播到所有的消費(fèi)者中。多個(gè)消費(fèi)者可以加入到一個(gè)消費(fèi)者組中,共同競爭一個(gè)主題,主題中的消息將被分發(fā)到組中的一個(gè)成員中。同一組中的消費(fèi)者可以在不同的程序中、不同的機(jī)器上。如果所有的消費(fèi)者都在一個(gè)組中,這就成為了傳統(tǒng)的隊(duì)列模式,在消費(fèi)者組中實(shí)現(xiàn)負(fù)載均衡。
5 性能測(cè)試
通過一臺(tái)4核、8 G內(nèi)存的臺(tái)式機(jī),向一個(gè)擁有5臺(tái)Broker的Kafka集群發(fā)布、訂閱消息。消息的平均大小為100個(gè)字節(jié)。測(cè)試結(jié)果如下:
生產(chǎn)者:每秒可發(fā)布30萬條消息,且可通過調(diào)整request.required.acks參數(shù)來保證數(shù)據(jù)的可靠性。
消費(fèi)者:每秒可消費(fèi)5萬條數(shù)據(jù)。通過使用java語言編寫Kafka生產(chǎn)者與消費(fèi)者對(duì)Kafka性能進(jìn)行測(cè)試。
6 結(jié) 語
分布式消息隊(duì)列Kafka具有部署簡單、易于管理、高吞吐量、高容錯(cuò)性等優(yōu)點(diǎn),經(jīng)測(cè)試,可以滿足實(shí)際生產(chǎn)中對(duì)吞吐量的需求。
參考文獻(xiàn)
[1]孫韓林.一種基于云計(jì)算的網(wǎng)絡(luò)流量分析系統(tǒng)結(jié)構(gòu)[J].西安郵電大學(xué)學(xué)報(bào),2013,18(4):75-79.
[2]金澈清,錢衛(wèi)寧,周傲英.流數(shù)據(jù)分析與管理綜述[J].軟件學(xué)報(bào),2004,15(8):1172-1181.
[3]曹婧華,冉彥中,許志軍.分布式消息隊(duì)列的設(shè)計(jì)與實(shí)現(xiàn)[J].河南科技大學(xué)學(xué)報(bào)(自然科學(xué)版),2010,31(4):35-38,109.
[4]王博,陳莉君.Hadoop遠(yuǎn)程過程調(diào)用機(jī)制的分析和應(yīng)用[J].西安郵電學(xué)院學(xué)報(bào),2012,17(6):74-77.
[5]盧本捷.分布式消息隊(duì)列的理論、實(shí)現(xiàn)與應(yīng)用[D].武漢:華中科技大學(xué),2004.
[6]崔小燕.Linux集群系統(tǒng)分析[J].西安郵電學(xué)院學(xué)報(bào),2006,11(5):103-106.
[7]于自強(qiáng).海量流數(shù)據(jù)挖掘相關(guān)問題研究[D].濟(jì)南:山東大學(xué),2015.
[8]謝曉燕,張靜雯.一種基于Linux集群技術(shù)的負(fù)載均衡方法[J].西安郵電大學(xué)學(xué)報(bào),2014,19(3):64-68.
[9]陸慶,周世杰,秦志光,等.消息隊(duì)列中間件系統(tǒng)中消息隊(duì)列與消息分發(fā)技術(shù)研究[J].計(jì)算機(jī)應(yīng)用研究,2003(8):51-53.