Advanced RabbitMQ

Below are some tips on how to get the best benefits from Sping Boot and RabbitMQ

JSON Message Converter

You can use Spring Boot to create a JSON message converter

RabbitmqConfig.java
@Configuration
public class RabbitmqConfig {

	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMessageConverter(converter);

		return template;
	}

	@Bean
	public Jackson2JsonMessageConverter converter() {
		return new Jackson2JsonMessageConverter();
	}

}
DummyProducer.java
@Service
public class DummyProducer {

    // Will use the JSON convert we have configured in RabbitmqConfig.java
    private RabbitTemplate rabbitTemplate;

    public DummyProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // DummyMessage is a simple POJO
    public void sendDummy(DummyMessage message) {
        rabbitTemplate.convertAndSend("x.dummy", "", message);
    }
}

Prefetch

There might be time where you want to distribute the load amoung the consumers when you are using concurrency, there is a property value that you can set, below will prefetch 50 messages to each consumer, this feature is useful if you have fast producers and slow consumers. The default value of prefetch is 250 if not specified.


Prefetch example
@RabbitListener(queues = "dd.hr.accounting", concurrency = "2")
public void listen(String message) throws IOException {
    var emp = objectMapper.readValue(message, Employee.class);
    log.info("Accounting Employee is {}", emp);
}

There might be times when you want to have different prefetch values on different consumers, we can create prefetch container factory to use with our listener.

RabbitmqConfig.java
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneContainerFactory(
    SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        var factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPrefetchCount(50);

        return factory;
}
Multiple Prefetch Consumer
@RabbitListener(queues = "q.scheduler", concurrency = "2", containerFactory = "prefetchOneContainerFactory")
public void listenScheduler(DummyMessage message) throws InterruptedException {
    log.info("Listening scheduler {}", message);
}

SendTo

There are times that you might want the listener to send to another exchange/queue, you can use the @SendTo annotation

SendTo example
@RabbitHandler
@SendTo("x.invoice.cancel/")
public PaymentCancelStatus handleInvoiceCancelled(InvoiceCancelledMessage message) {
    var randomStatus = ThreadLocalRandom.current().nextBoolean();

    return new PaymentCancelStatus(randomStatus, LocalDate.now(), message.getInvoiceNumber());
}

Default Handler

You have an option to setup a default handler

Introduction
@Service
@RabbitListener(queues = "q.invoice")
public class InvoiceConsumer {

    .....
    
    @RabbitHandler(isDefault = true)
    public void handleDefault(Object message) {
        log.info("on handleDefault : {}", message);
    }
}

Using Java to Create Exchanges, Queues and Bindings

You can use Spring Boot to create exchanges and queues and also to perform binding.

Creating Exchange, Queue and Binding example
@Configuration
public class RabbitmqSchemaConfig {

	@Bean
	public FanoutExchange createFanoutExchange() {
		return new FanoutExchange("dd.another-dummy-exchange", true, false, null);
	}

	@Bean
	public Queue createQueue() {
		return new Queue("dd.another-dummy-queue");
	}

	@Bean
	public Binding createBinding() {
		return new Binding("dd.another-dummy-queue", DestinationType.QUEUE, "dd.another-dummy-exchange", "", null);
		return BindingBuilder.bind(createQueue()).to(createFanoutExchange());
	}

        // Below will perform the all the methods above in one go
	@Bean
	public Declarables createRabbitmqSchema() {
		return new Declarables(new FanoutExchange("dd.another-dummy-exchange", true, false, null), 
                new Queue("dd.another-dummy-queue"),
				new Binding("dd.another-dummy-queue", DestinationType.QUEUE, "dd.another-dummy-exchange", "", null));
	}
}

You can also configure a consumer to create the queue and bindings if one does not exist

Introduction
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dd.auto-dummy-queue", durable = "true"), 
    exchange = @Exchange(name = "dd.auto-dummy-exchange", 
    type = ExchangeTypes.DIRECT, durable = "true"), 
    key = "routing-key", 
    ignoreDeclarationExceptions = "true"))
public void listenDummy(DummyMessage message) {
    log.info("{}", message);
}