rabbitmq~消(xiao)息(xi)失敗后重試達(da)到(dao) TTL放到(dao)死信隊(dui)列(事務型(xing)消(xiao)息(xi)補償機(ji)制(zhi))
這是(shi)一(yi)個(ge)基于消(xiao)息(xi)的(de)(de)分布式事務(wu)的(de)(de)一(yi)部(bu)分,主(zhu)要(yao)通過(guo)消(xiao)息(xi)來實現,生(sheng)產(chan)者把消(xiao)息(xi)發到隊列后,由消(xiao)費(fei)方去執行(xing)剩下的(de)(de)邏輯,而(er)當消(xiao)費(fei)方處理失敗后,我們需要(yao)進行(xing)重(zhong)(zhong)試,即為了最(zui)現數據(ju)的(de)(de)最(zui)終一(yi)致性,在(zai)rabbitmq里(li),它有消(xiao)息(xi)重(zhong)(zhong)試和重(zhong)(zhong)試次數的(de)(de)配置,但當你配置之后,你的(de)(de)TTL達到 后,消(xiao)息(xi)不能(neng)自動放入死信隊列,所以這塊需要(yao)手工(gong)處理一(yi)下.
rabbitmq關于消息重試的配置
rabbitmq:
host: xxx
port: xxx
username: xxx
password: xxx
virtual-host: xxx
###開啟消息確認機制 confirms
publisher-confirms: true
publisher-returns: true
listener:
simple:
acknowledge-mode: manual #設置確認方式
prefetch: 1 #每次處理1條消息
retry.max-attempts: 3 # 最大重試次數
retry.enabled: true #是否開啟消費者重試(為false時關閉消費者重試,這時消費端代碼異常會一直重復收到消息)
retry.initial-interval: 2000 #重試間隔時間(單位毫秒)
default-requeue-rejected: true #該配置項是決定由于監聽器拋出異常而拒絕的消息是否被重新放回隊列。默認值為true,需要手動basicNack時這些參數諒失效了
手工實現消息重試并放入死信的方式
定義隊列的相關配置
/**
* 創建普通交換機.
*/
@Bean
public TopicExchange lindExchange() {
//消息持久化
return (TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
}
@Bean
public TopicExchange deadExchange() {
return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true).build();
}
/**
* 基于消息事務的處理方式,當消費失敗進行重試,有時間間隔,當達到超時時間,就發到死信隊列,等待人工處理.
* @return
*/
@Bean
public Queue testQueue() {
//設置死信交換機
return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)
//毫秒
.withArgument("x-message-ttl", CONSUMER_EXPIRE)
//設置死信routingKey
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build();
}
@Bean
public Queue deadQueue() {
return new Queue(LIND_DEAD_QUEUE);
}
@Bean
public Binding bindBuildersRouteKey() {
return BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER);
}
@Bean
public Binding bindDeadBuildersRouteKey() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE);
}
消費者實現的代碼
/**
* 延時隊列:不應該有RabbitListener訂閱者,應該讓它自己達到超時時間后自動轉到死信里去消費
* 消息異常處理:消費出現異常后,延時幾秒,然后從新入隊列消費,直到達到TTL超時時間,再轉到死信,證明這個信息有問題需要人工干預
*
* @param message
*/
@RabbitListener(queues = MqConfig.QUEUE)
public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
try {
System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"));
//當程序處理出現問題時,消息使用basicReject上報
int a = 0;
int b = 1 / a;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception ex) {
//出現異常手動放回隊列
Thread.sleep(2000);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
/**
* 死信隊列.
*
* @param message
*/
@RabbitListener(queues = MqConfig.LIND_DEAD_QUEUE)
public void dealSubscribe(Message message, Channel channel) throws IOException {
System.out.println("Dead Subscriber:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
消費者(zhe)這(zhe)塊,也可以直接聲明隊列和(he)綁定交換機,直接在注解(jie)上添加 QueueBinding即可.
@RabbitListener(bindings = {@QueueBinding(value = @Queue(
name = MqConfig.QUEUE,
durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = MqConfig.LIND_DL_EXCHANGE),
@Argument(name = "x-message-ttl", value = MqConfig.CONSUMER_EXPIRE,type="java.lang.Long"),
@Argument(name = "x-dead-letter-routing-key", value = MqConfig.LIND_DEAD_QUEUE)}),
exchange = @Exchange(value = MqConfig.EXCHANGE, durable = "true",type="topic")
)})
public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
}
這邊嘗試讓(rang)消(xiao)費者執行出錯,然后(hou)走到(dao)catch里(li)(li)使用basicNack方(fang)法(fa)把消(xiao)息(xi)從新放里(li)(li)隊列里(li)(li),并讓(rang)線程(cheng)讓(rang)休息(xi)2秒,以避免頻繁操作,之后(hou)就是我們希望看到(dao)的代碼
2019-12-20T17:21:31.190:Subscriber:send a message to mq
2019-12-20T17:21:33.200:Subscriber:send a message to mq
2019-12-20T17:21:35.206:Subscriber:send a message to mq
2019-12-20T17:21:37.213:Subscriber:send a message to mq
2019-12-20T17:21:39.221:Subscriber:send a message to mq
Dead Subscriber:send a message to mq
這就是一個消息隊列的補償機制,使用死信隊列也可以實現延時消息的機制,有時(shi)間再給大家(jia)分享!