Implementation of RabbitMQ delay queue (dead letter queue) in C#

By | June 22, 2020

Queue-based and message-based TTL

TTL is short for time to live: the length of time a message is allowed to live. RabbitMQ can set the message expiration time at two levels, on the queue and on the message itself.
Per-queue message TTL:
Set the queue's x-message-ttl argument to control how long messages survive on that queue. The value is a non-negative integer in milliseconds. The expiration times of different queues do not affect each other, even for the same message. If a message stays in the queue longer than the expiration time, it becomes a dead letter.
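
To make the unit concrete, here is a minimal plain-Java sketch that builds the argument map for a queue with a per-queue TTL. The class `QueueTtlArgs` and the method `ttlArgs` are hypothetical names used only for illustration; the only RabbitMQ-specific part is the `x-message-ttl` key, whose value is given in milliseconds.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of any RabbitMQ library): builds queue arguments.
class QueueTtlArgs {

  // x-message-ttl must be a non-negative integer number of milliseconds.
  static Map<String, Object> ttlArgs(Duration ttl) {
    if (ttl.isNegative()) {
      throw new IllegalArgumentException("TTL must not be negative");
    }
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", ttl.toMillis());
    return args;
  }

  public static void main(String[] args) {
    // prints {x-message-ttl=60000}
    System.out.println(ttlArgs(Duration.ofSeconds(60)));
  }
}
```

With the RabbitMQ Java client, a map like this would typically be passed as the `arguments` parameter of `Channel.queueDeclare`.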

Dead letter exchange (DLX)

A message in a queue becomes a dead letter in any of the following three situations:

  • The message is rejected (basic.reject or basic.nack) with requeue=false;
  • The message's TTL has expired;
  • The queue length limit has been exceeded.

When a message in a queue becomes a dead letter and the queue has a DLX configured, the message is republished to that DLX. Set the DLX with the x-dead-letter-exchange argument, and use x-dead-letter-routing-key to set the routing key used when republishing to the DLX. If x-dead-letter-routing-key is not set, the message's own routing key is used.

  @Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//DLX
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)// dead letter routingKey
        .build();
  }
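
The dead-letter rules described above can also be written down as a small decision function. The following is an in-memory model only, assuming simplified inputs (message age, queue length, and so on); it does not talk to a broker, and the names `DeadLetterModel`, `Reason`, `deadLetterReason` and `deadLetterRoutingKey` are hypothetical.

```java
import java.util.Optional;

// In-memory model of the dead-letter rules; illustration only, no broker involved.
class DeadLetterModel {

  // The three situations in which a queued message becomes a dead letter.
  enum Reason { REJECTED, EXPIRED, LENGTH_EXCEEDED }

  // Returns why a message is dead-lettered, or empty if it stays in the queue.
  static Optional<Reason> deadLetterReason(boolean rejected, boolean requeue,
      long ageMillis, long ttlMillis, int queueLength, int maxLength) {
    if (rejected && !requeue) {
      return Optional.of(Reason.REJECTED);
    }
    if (ageMillis > ttlMillis) {
      return Optional.of(Reason.EXPIRED);
    }
    if (queueLength > maxLength) {
      return Optional.of(Reason.LENGTH_EXCEEDED);
    }
    return Optional.empty();
  }

  // Routing key used when republishing to the DLX:
  // the configured x-dead-letter-routing-key if present, else the message's own key.
  static String deadLetterRoutingKey(String configuredKey, String originalKey) {
    return configuredKey != null ? configuredKey : originalKey;
  }

  public static void main(String[] args) {
    System.out.println(deadLetterReason(false, false, 61_000, 60_000, 1, 10));
    System.out.println(deadLetterRoutingKey(null, "lind.queue"));
  }
}
```

In the queue definition above, x-dead-letter-routing-key is set, so the second function would return the configured key rather than the original one.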

Implementation of delay queue (dead letter queue)

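import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration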
public class AmqpConfig {
  public static final String LIND_EXCHANGE = "lind.exchange";
  public static final String LIND_DL_EXCHANGE = "lind.dl.exchange";
  public static final String LIND_QUEUE = "lind.queue";
  public static final String LIND_DEAD_QUEUE = "lind.queue.dead";

  public static final String LIND_FANOUT_EXCHANGE = "lindFanoutExchange";
  /**
   * Message TTL in milliseconds (default 60000 ms = 60 seconds)
   */
  @Value("${tq.makecall.expire:60000}")
  private long makeCallExpire;

  /**
   * Exchange for common messages
   */
  @Bean
  public TopicExchange lindExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_EXCHANGE).durable(true)
        .build();
  }

  /**
   * DLX
   */
  @Bean
  public TopicExchange lindExchangeDl() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true)
        .build();
  }

  /**
   * Common queue
   */
  @Bean
  public Queue lindQueue() {
    return QueueBuilder.durable(LIND_QUEUE)
        .withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)//DLX
        .withArgument("x-message-ttl", makeCallExpire)
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE)//DL routingKey
        .build();
  }

  /**
   * DL Queue
   */
  @Bean
  public Queue lindDelayQueue() {
    return QueueBuilder.durable(LIND_DEAD_QUEUE).build();
  }

  /**
   * Bind DL Queue
   */
  @Bean
  public Binding bindDeadBuilders() {
    return BindingBuilder.bind(lindDelayQueue())
        .to(lindExchangeDl())
        .with(LIND_DEAD_QUEUE);
  }

  /**
   * Bind common queue
   *
   * @return
   */
  @Bean
  public Binding bindBuilders() {
    return BindingBuilder.bind(lindQueue())
        .to(lindExchange())
        .with(LIND_QUEUE);
  }

  /**
   * Fanout Exchange
   *
   * @return
   */
  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(LIND_FANOUT_EXCHANGE);
  }
}


//-----------------

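import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component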
public class Publisher {
  @Autowired
  private RabbitTemplate rabbitTemplate;


  public void publish(String message) {
    try {
      rabbitTemplate
          // Route to the common queue, where the message waits until its TTL expires
          .convertAndSend(AmqpConfig.LIND_EXCHANGE, AmqpConfig.LIND_QUEUE,
              message);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

//-----------------

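import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component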
@Slf4j
public class Subscriber {
  // Listen on the dead letter queue so that messages arrive only after the TTL has expired
  @RabbitListener(queues = AmqpConfig.LIND_DEAD_QUEUE)
  public void customerSign(String data) {
    try {

      log.info("Get data from the queue :{}", data);

    } catch (Exception ex) {
      log.error("Failed to handle message", ex);
    }
  }
}
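
To see why this setup behaves like a delay queue, the flow can be imitated in memory. The sketch below is a simplified model, not RabbitMQ code: `DelayFlowModel` and its methods are hypothetical, time is passed in explicitly, and the two deques stand in for lind.queue and lind.queue.dead. In this pattern nothing consumes from the common queue, so each message waits there until the TTL passes and is then moved to the dead letter queue, where the consumer finally sees it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory sketch of the delay flow; no broker is involved.
class DelayFlowModel {

  private static final class Entry {
    final String body;
    final long enqueuedAt;

    Entry(String body, long enqueuedAt) {
      this.body = body;
      this.enqueuedAt = enqueuedAt;
    }
  }

  private final long ttlMillis;
  private final Deque<Entry> workQueue = new ArrayDeque<>();  // stands in for lind.queue
  private final Deque<String> deadQueue = new ArrayDeque<>(); // stands in for lind.queue.dead

  DelayFlowModel(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  // Publish: the message waits in the work queue, which has no consumer.
  void publish(String body, long nowMillis) {
    workQueue.addLast(new Entry(body, nowMillis));
  }

  // Move every message that has waited longer than the TTL to the dead queue.
  void expire(long nowMillis) {
    while (!workQueue.isEmpty()
        && nowMillis - workQueue.peekFirst().enqueuedAt > ttlMillis) {
      deadQueue.addLast(workQueue.pollFirst().body);
    }
  }

  // Consume everything currently available on the dead queue.
  List<String> consumeDead() {
    List<String> out = new ArrayList<>(deadQueue);
    deadQueue.clear();
    return out;
  }
}
```

For example, with a TTL of 60000 ms, a message published at time 0 is not available to the consumer at time 60000 but is available at time 60001, while a message published at time 30000 is still waiting.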