Getting Started with ActiveMQ 5

By | July 3, 2021

This article mainly records the configuration and writing process of ActiveMQ from installation to application to the project. You quickly configure the application following this post.

You can download ActiveMQ from its official website.

Modify the configuration

First, find the activemq.xml file in the config directory. Then, find the <policyEntries> tag, add the following configuration under the tag to configure the dead letter queue.

 <!--Dead letter queue-->
                  <policyEntry topic=">" >
                    <deadLetterStrategy>
                      <!--
                              queuePrefix:dead-letter-queue-prefix
                              useQueueForQueueMessages: a-queue-to-save-dead-letter
                      -->
                      <individualDeadLetterStrategy   queuePrefix="DLQ." useQueueForQueueMessages="true" processNonPersistent="true" />
                    </deadLetterStrategy>
                  </policyEntry>

Define Maven Dependencies

Add the following dependencies to your maven config file.

<!--activemq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!--activemq pool-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

Configure SpringBoot

Add the following configurations to the yml file.

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    pool:
      enabled: true
      max-connections: 100

Producer Configurations

  • Config class
//MQ config
@Configuration
public class ActiveMQConfig {

//address specified in the yml file
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

//Bean for the queue
    @Bean
    public Queue getQueue(){
        return new ActiveMQQueue("ActiveMQQueue");
    }
// Bean for the factory
    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        return new ActiveMQConnectionFactory(brokerUrl);
    }
}
  • Producer usage

To use @Scheduled(cron = “0/5 ?”) timed task annotation, you need to add @EnableScheduling annotation to the startup class to scan periodically.

@Autowired
private JmsMessagingTemplate messagingTemplate;

@Autowired
private Queue queue;

//3s interval
@Scheduled(cron = "0/3 * * * * ?")
//transaction
@Transactional(rollbackFor = Exception.class)
public void task() {
    System.out.println("cron job starting");
    //send to the queue
    messagingTemplate.convertAndSend(queue, "MingLog");
    System.out.println("added to the queue");
}

Consumer Configurations

  • Config class
//RctiveMQ consumer config
@Configuration
public class ActiveMQConfig {

    // address specified in the yml file
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    // connect to the factory bean
    @Bean
    public ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy){
        ActiveMQConnectionFactory activeMQConnectionFactory =
        //username, password, address
                new ActiveMQConnectionFactory("admin","admin",brokerUrl);
                // define redelivery policy
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    //messageConsumer bean
    @Bean
    public RedeliveryPolicy getRedeliveryPolicy(){
        return new RedeliveryPolicy();
    }

    // listener bean
    @Bean
    public JmsListenerContainerFactory getJmsListenerContainerFactory(ActiveMQConnectionFactory connectionFactory){

        DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
        defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(2);
        return defaultJmsListenerContainerFactory;
    }
}
  • Consumer usage

Generally, we use message listeners to perform tasks/methods/businesses. If the message is a successful one, we should manually check the acknowledge() method. If the message consumption fails, we should perform manual rollback using the recover() method. If there is a single message that fails to be consumed five times, the message should enter the dead letter queue.

// Destination queue to be monitored, connection factory for containerFactory message monitoring
@JmsListener(destination = "ActiveMQQueue", containerFactory = "jmsListenerContainerFactory")
//TextMessage monitered
public void mqListenerEvent(TextMessage textMessage, Session session) throws JMSException {
    try {
        String text = textMessage.getText();
        System.out.println("message received:" + text);
        // your business logic here
        //consumption confirmation
        textMessage.acknowledge();
    }catch (Exception e){
        System.out.println("exception happens");
        e.getMessage();
        //rollback
        session.recover();
    }
}

Therefore, in general, we will maintain the dead letter queue separately and compensate and record logs for successfully consumed messages. Dead letter queue monitoring is similar to the above monitoring, except that the monitoring object is different.

// Listen to the dead letter queue
@JmsListener(destination = "ActiveMQ.DLQ")
public void receive2(TextMessage textMessage, Session session) throws JMSException {
    try {
        // logging
        System.out.println("dead-letter queue:"+textMessage.getText());

        textMessage.acknowledge();
   }catch (Exception e){
        System.out.println("exception happens");
        e.getMessage();
        //rollback
        session.recover();
    }     
}