魏連秋,張義紅,張建光,馬 倩,邢春燕,劉 偉
(1.衡水學院數(shù)學與計算機科學系,河北 衡水 053000;2.衡水學院馬克思主義學院,河北 衡水 053000)
消息傳輸過程中,可能會出現(xiàn)網(wǎng)絡異常、程序異常、機器異常等各種突發(fā)情況,無法保證消息正常傳輸。一條消息從生產(chǎn)到消費的整個過程中有生產(chǎn)者生產(chǎn)、消息代理存儲、消費者消費三個過程需要保證消息的可靠性。通過對消息傳輸流程進行分析,發(fā)現(xiàn)若能保證生產(chǎn)過程可靠性、消息代理存儲過程可靠性、消費過程可靠性,則基本可以實現(xiàn)消息傳輸?shù)目煽啃?。本文通過對現(xiàn)有消息中間件實現(xiàn)機制的分析,在遵循AMQP協(xié)議的基礎上,結(jié)合實際業(yè)務應用場景,權(quán)衡消息的可靠傳輸,對消息生產(chǎn)過程、消息代理存儲過程、消息消費過程三個過程進行分析,提出一種分布式系統(tǒng)中消息可靠性傳輸方案,實現(xiàn)消息傳輸?shù)母呖煽啃訹1]。
在生產(chǎn)者發(fā)出消息之后可能會出現(xiàn)網(wǎng)絡擁塞、服務器宕機等突發(fā)問題,從而導致消息的丟失。如果不對消息生產(chǎn)過程的可靠性加以設計,生產(chǎn)者將無法感知消息是否已正常送達消息代理,進而將導致消息的丟失。如果消息在生產(chǎn)過程中,能感知到傳輸過程中消息傳輸失敗,生產(chǎn)者可以進行重新傳輸?shù)群罄m(xù)動作以確保消息生產(chǎn)過程的可靠性。因此,在消息生產(chǎn)過程中,設計一種高級發(fā)布確認方案以保證消息生產(chǎn)可靠性是十分必要的。
RabbitMQ提供了publisher confirm機制來避免消息發(fā)送到MQ過程中丟失,這種機制要給每個消息指定一個唯一ID,消息發(fā)送到MQ以后,會返回一個結(jié)果給發(fā)送者,表示消息是否發(fā)送成功。
publisher發(fā)送消息的配置為publisher-confirmtype。這里支持兩種類型:一種是simple:同步等待confirm結(jié)果,直到超時;另一種是correlated:異步回調(diào),定義ConfirmCallback,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback。相對而言,異步確認性價比較高,因而在此采用correlated:異步回調(diào)。
(1)消息成功發(fā)送到交換機,如圖1所示返回ack。
圖1 消息成功投遞到交換機
(2)消息未成功投遞到交換機,如圖2所示返回nack。
圖2 消息未成功發(fā)送到交換機
在僅開啟publisher confirm機制的情況下,交換機接收到消息后,會直接給消息生產(chǎn)者發(fā)送確認信息,如果發(fā)現(xiàn)該消息不可路由,那么消息會被直接丟棄,此時生產(chǎn)者是不知道消息被丟棄了。這顯然不是我們所希望的,于是提出mandatory+publish-return方案。
設置mandatory參數(shù)可以在當消息傳輸過程中不可達目的地時將消息返回給生產(chǎn)者??梢允褂萌鐖D3所示方案,當把mandatory參數(shù)設置為true時,如果交換機無法將消息進行路由時,會將該消息返回給生產(chǎn)者,當把該參數(shù)設置為false時,如果發(fā)現(xiàn)消息無法進行路由,則直接丟棄。
圖3 mandatory+publish-return方案
在只使用mandatory參數(shù)的情況下,如果交換機無法將消息進行路由時會提示Returned message but no callback available,說明設置mandatory參數(shù)后,如果消息無法被路由,則會返回給生產(chǎn)者,此時是通過回調(diào)的方式進行的,但由于還沒有回調(diào)函數(shù),所以生產(chǎn)者需要設置相應的回調(diào)函數(shù)才能接受該消息。此時就需要設置生產(chǎn)者回執(zhí):publisher-return,實現(xiàn)一個ReturnCallback接口,這樣當消息傳輸?shù)浇粨Q機而沒有路由到隊列時,就返回ACK及路由失敗原因。此時再運行則Returned message but no callback available提示消息,而可以看到生產(chǎn)者接收到了被退回的消息,并帶上了消息被退回的原因:NO_ROUTE。
有了mandatory+publisher-return方案,我們獲得了對無法路由消息的感知能力,此時我們可以通過日志的形式得到反饋并手動重新處理。但實際上,這樣做無形中增加了生產(chǎn)者的復雜性,需要添加處理這些被退回的消息的邏輯。如果既不想增加生產(chǎn)者的復雜性,又不丟失消息,怎么辦?在RabbitMQ中,有一種備份交換機的機制存在,可以較好地解決這一問題。
消息消費可靠性是指在分布式系統(tǒng)中,保證消息在傳輸和處理過程中不會丟失或重復,為了實現(xiàn)可靠的消息消費,一般采取如下措施。
RabbitMQ為了能夠把消息正確發(fā)送給消費者,提供了消息確認機制,即告訴RabbitMQ消息已經(jīng)收到,使用消息確認機制可以使消費者成功處理消息后再從隊列中刪除該消息。
消費者獲取消息后,應該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理該消息。若出現(xiàn)這樣的場景:RabbitMQ傳輸消息給消費者,消費者獲取消息后返回ACK,RabbitMQ刪除消息,此時消費者突然宕機消息未成功處理,這樣,消息就丟失了。因此消費者返回ACK的時機非常重要。一般RabbitMQ允許配置三種確認模式:①none:關閉ACK,RabbitMQ假定消費者獲取消息后會成功處理,消息傳輸后立即被刪除,此種模式下消息傳輸是不可靠的,可能丟失,因而不可?。虎赼uto:自動ACK,由spring監(jiān)測listener代碼是否出現(xiàn)異常,沒有異常則返回ACK。此種模式類似于事務機制;③manual:手動ACK,需要在業(yè)務代碼結(jié)束后,手動調(diào)用API發(fā)送ACK,這種模式需要根據(jù)具體情況,判斷什么時候發(fā)送ACK,使用較為復雜。綜合考慮推薦使用默認的auto模式[2]。
當消息數(shù)量變得過多或處理速度變慢時,可以使用消費者端限流來控制消息的傳遞速率。通過限制消息的最大數(shù)量或大小避免消費者宕機,從而提高消費者的可靠性。
為了保障分布式系統(tǒng)中消息可靠性傳輸,筆者設計了如下解決方案。
生產(chǎn)者發(fā)送消息給交換機,若不能發(fā)送成功則利用publisher confirm機制重新傳輸,若交換機不能成功將消息路由到隊列則利用備份交換機機制將消息傳輸?shù)街付犃羞M行消費;若消費者不能成功消費消息則利用延遲重試機制在指定次數(shù)內(nèi)進行循環(huán)延遲重試;若循環(huán)結(jié)束仍舊不能成功消費則利用死信交換機將消息傳輸?shù)剿佬抨犃羞M行消費。其中的交換機、隊列設置均設為持久的,消息視具體情況而定,使用如圖4所示傳輸方案,這樣可最大限度保證消息可靠性傳輸與消費。
圖4 發(fā)布確認+帶重試次數(shù)延遲消費的可靠性傳輸方案
生產(chǎn)端核心代碼如下:
在消費端對消息的處理邏輯中,還應考慮冪等性,即對同一條消息多次處理所產(chǎn)生的結(jié)果應該是相同的,這樣出現(xiàn)重復處理的情況也不會對系統(tǒng)產(chǎn)生影響。另外,在分布式環(huán)境中,我們還可以采用副本集群模式,即在RabbitMQ集群中,多個節(jié)點會同步存儲隊列中的消息,如果某個節(jié)點發(fā)生故障,其他節(jié)點可以接替它繼續(xù)處理數(shù)據(jù),從而防止消息丟失。通過以上解決方案的合理選擇和組合,可以提高RabbitMQ性能和可靠性,從而確保分布式系統(tǒng)的穩(wěn)定性和可用性。