中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

rabbitmq實現(xian)延(yan)時隊列(lie)(lie)(死信隊列(lie)(lie))

基于隊列和基于消息的TTL

TTL是time to live 的簡稱,顧名思義指的是消息的存活時間。rabbitMq可以從兩種維度設置消息過期時間,分別是隊列和消息本身。
隊列消息過期時間-Per-Queue Message TTL:
通過(guo)(guo)(guo)(guo)設置隊(dui)列的x-message-ttl參數來設置指定隊(dui)列上消息(xi)的存活時(shi)間(jian),其值是一個非負整(zheng)數,單位為微秒。不同隊(dui)列的過(guo)(guo)(guo)(guo)期時(shi)間(jian)互(hu)相(xiang)之間(jian)沒有影響(xiang),即使(shi)是對(dui)于同一條消息(xi)。隊(dui)列中的消息(xi)存在隊(dui)列中的時(shi)間(jian)超過(guo)(guo)(guo)(guo)過(guo)(guo)(guo)(guo)期時(shi)間(jian)則成為死(si)信。

死信交換機DLX

隊列中的消息在以下三種情況下會變成死信
(1)消息被拒絕(basic.reject 或者 basic.nack),并且requeue=false;
(2)消息的過期時間到期了;
(3)隊列長度限制超過了。
當隊(dui)列中的(de)消息(xi)(xi)成(cheng)為死信以后,如(ru)果(guo)隊(dui)列設(she)置了(le)DLX那么(me)消息(xi)(xi)會被發(fa)送到DLX。通過x-dead-letter-exchange設(she)置DLX,通過這個x-dead-letter-routing-key設(she)置消息(xi)(xi)發(fa)送到DLX所用(yong)的(de)routing-key,如(ru)果(guo)不設(she)置默認使(shi)用(yong)消息(xi)(xi)本身的(de)routing-key.

 @Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設置死信交換機
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設置死信routingKey
        .build();
  }

實現的過程

graph TD publisher-->正常queue 正常queue-->TTL TTL-->dead.queue dead.queue-->subscriber

完整的代碼

@Component
public class AmqpConfig {
  /**
   * 主要測試一個死信隊列,功能主要實現延時消費,原理是先把消息發到正常隊列,
   * 正常隊列有超時時間,當達到時間后自動發到死信隊列,然后由消費者去消費死信隊列里的消息.
   */
  public static final String LIND_EXCHANGE = "lind.exchange";
  public static final String LIND_DL_EXCHANGE = "lind.dl.exchange";
  public static final String LIND_QUEUE = "lind.queue";
  public static final String LIND_DEAD_QUEUE = "lind.queue.dead";

  public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange";
  /**
   * 單位為微秒.
   */
  @Value("${tq.makecall.expire:60000}")
  private long makeCallExpire;

  /**
   * 創建普通交換機.
   */
  @Bean
  public TopicExchange lindExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true)
        .build();
  }

  /**
   * 創建死信交換機.
   */
  @Bean
  public TopicExchange lindExchangeDl() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
        .build();
  }

  /**
   * 創建普通隊列.
   */
  @Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//設置死信交換機
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//設置死信routingKey
        .build();
  }

  /**
   * 創建死信隊列.
   */
  @Bean
  public Queue lindDelayQueue() {
    return QueueBuilder.durable(LIND_DEAD_QUEUE).build();
  }

  /**
   * 綁定死信隊列.
   */
  @Bean
  public Binding bindDeadBuilders() {
    return BindingBuilder.bind(lindDelayQueue())
        .to(lindExchangeDl())
        .with(LIND_DEAD_QUEUE);
  }

  /**
   * 綁定普通隊列.
   *
   * @return
   */
  @Bean
  public Binding bindBuilders() {
    return BindingBuilder.bind(lindQueue())
        .to(lindExchange())
        .with(LIND_QUEUE);
  }

  /**
   * 廣播交換機.
   *
   * @return
   */
  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(LIND_FANOUT_EXCHANGE);
  }
}


//-----------------

@Component
public class Publisher {
  @Autowired
  private RabbitTemplate rabbitTemplate;


  public void publish(String message) {
    try {
      rabbitTemplate
          .convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_DELAY_QUEUE,
              message);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

//-----------------

@Component
@Slf4j
public class Subscriber {
  @RabbitListener(queues = AmqpConfig.LIND_QUEUE)
  public void customerSign(String data) {
    try {

      log.info("從隊列拿到數據 :{}", data);

    } catch (Exception ex) {
          e.printStackTrace();
    }
  }
}

posted @ 2018-11-19 19:06  張占嶺  閱讀(9795)  評論(1)    收藏  舉報