教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

RabbitMQ:偽延時隊列

更新時間:2018年10月24日15時50分 來源:傳智播客 瀏覽次數(shù):

  一、什么是延時隊列

  所謂延時隊列是指消息push到隊列后,監(jiān)聽的消費者不能第一時間獲取消息,需要等到指定時間才能消費。

  一般在業(yè)務里面需要對某些消息做定時發(fā)送,不想走定時任務或者是用戶下單之后多長時間自動失效類似的場景可以考慮通過延時隊列實現(xiàn)。

  二、RabbitMQ實現(xiàn)

  MQ本身并不支持直接的延時隊列實現(xiàn),但是我們可以通過RabbitMQ的消息TTL和Dead Letter規(guī)則來實現(xiàn)

  Time TO Live (TTL): RabbitMQ可以針對Queue設置x-expires 或者 針對Message設置 x-message-ttl,來控制消息的生存時間

  Dead Letter 死信 RabbitMQ官網(wǎng)這樣定義死信消息:

  . 消息被拒絕(basic.reject或basic.nack)并且requeue=false.

  . 消息TTL過期

  隊列達到最大長度(隊列滿了,無法再添加數(shù)據(jù)到mq中)

  Dead Letter Exchanges(DLX)死信交換機 MQ默認的死信消息是丟棄的,但是我們可以通過設置以下兩個屬性讓死信消息轉(zhuǎn)發(fā)到我們指定的隊列。

  x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange

  x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送

  延時隊列實現(xiàn): 了解了MQ隊列的TTL和Dead Letter之后,我們就可以通過這兩個特性來實現(xiàn),首先我們通過設置消息或者隊列的TTL來設置消息在指定時間后成為死信,再設置死信消息的路由轉(zhuǎn)發(fā)規(guī)則到特定隊列,消費者通過監(jiān)聽這個特定隊列就能實現(xiàn)延時隊列的效果。

  代碼實現(xiàn)

  生產(chǎn)者發(fā)送消息:ttlQueue存放過期時間的隊列,deadLetterQueue死信轉(zhuǎn)發(fā)隊列,seconds是過期時間

  public static void sendTTLMsg(String ttlQueue, String deadLetterQueue, Object msg, Integer seconds) {

  MqSender.getInstance().setHost(RABBIT_MQ_HOST);

  // 獲取到連接以及MQ通道

  Connection connection;

  try {

  connection = MqSender.getInstance().newConnection();

  // 從連接中創(chuàng)建通道

  Channel channel = connection.createChannel();

  // 配置

  Map args = new HashMap();

  args.put("x-dead-letter-exchange", "");

  args.put("x-dead-letter-routing-key", deadLetterQueue);

  channel.queueDeclare(deadLetterQueue, true, false, false, null);

  channel.queueDeclare(ttlQueue, true, false, false, args);

  // 發(fā)送消息

  channel.basicPublish("", ttlQueue, new AMQP.BasicProperties.Builder().expiration(String.valueOf(seconds)).build(), MAPPER.writeValueAsBytes(msg));

  channel.close();

  connection.close();

  } catch (IOException e) {

  e.printStackTrace();

  }

  }

  消費者通過監(jiān)聽deadLetterQueue來實現(xiàn)延時消息監(jiān)聽

  三、 延時隊列的問題

  通過我們測試發(fā)現(xiàn),這種方式實現(xiàn)的延時隊列,在隊列設置TTL的情況下是可以正常的,但是如果根據(jù)消息設置了不同的TTL,就會有問題,因為MQ本質(zhì)上還是消息隊列中間件,隊列是遵循先進先出的,如果有兩個消息先后入隊,但是后入隊的消息TTL小于前面的消息,它必須等待之前的消息被消費完后才能挪到隊列頭部,這樣不同延時消息就會出現(xiàn)問題。

  通過RabbitMQ官網(wǎng)的文檔也介紹了這個問題:

  Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered)

  所以我才稱之為MQ的偽延時隊列,這種延時隊列在消息TTL不同的情況下并不能實現(xiàn)真正的延時消費。

  四、解決RabbitMQ的偽延時方案

  既然RabbitMQ無法支持不同TTL消息的延時消費,那么如果我們要實現(xiàn)這種功能,有什么方案呢,在實際業(yè)務開發(fā)中,我們有這樣的解決方案:

  首先我們會創(chuàng)建多級延時消費隊列(比如兩分鐘,三十分鐘,一天三種,具體可以根據(jù)業(yè)務量和訪問量還有時間精確度來劃分,這里的兩分鐘、三十分鐘是指隊列統(tǒng)一的TTL),push消費隊列的時候,會根據(jù)需要延時的時間,丟到不同的消費隊列,比如小于三十分鐘的我們push到兩分鐘隊列,三十分鐘到一天的放入三十分鐘隊列,超過一天的放入一天隊列,在死信隊列的監(jiān)聽器做同樣的判斷,如果是小于等于當前時間消息的,立馬消費,否則按照上述規(guī)則繼續(xù)循環(huán)到不同的延時隊列

  這種方案解決了多級延時消費的問題,并且能夠較大程度地避免了消息的重復循環(huán),降低MQ的壓力,但是缺點也比較明顯,因為最低是兩分鐘的延時,理論上來說最多會有兩分鐘的誤差,如果對時間要求性比較高的,可以適當調(diào)低最低一級別的延時TTL,比如一分鐘或者三十秒

  類似代碼如下:cts是需要消費掉的時間戳

  long now = System.currentTimeMillis();

  long cts = Long.valueOf(feedComment.getCts());

  if (cts - now <= 30 * 60 * 1000) {

  MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_2MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 2 * 60);

  } else if (cts - now <= 24 * 60 * 60 * 1000) {

  MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_30MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 30 * 60);

  } else {

  MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_24HOUR, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 24 * 60 * 60);

  }



作者:傳智播客JavaEE培訓學院
首發(fā):http://java.itcast.cn/

0 分享到:
和我們在線交談!