Below are some tips on how to get the most out of Spring Boot and RabbitMQ.
You can register a Jackson2JsonMessageConverter bean in Spring Boot so that RabbitTemplate converts message payloads to JSON:
RabbitmqConfig.java | @Configuration public class RabbitmqConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(converter); return template; } @Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); } } |
DummyProducer.java | @Service public class DummyProducer { /* Uses the JSON converter configured in RabbitmqConfig.java */ private final RabbitTemplate rabbitTemplate; public DummyProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /* DummyMessage is a simple POJO */ public void sendDummy(DummyMessage message) { rabbitTemplate.convertAndSend("x.dummy", "", message); } } |
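The producer above sends a DummyMessage, which is only described as a simple POJO. A minimal sketch of such a class might look like the following. The field names `content` and `publishOrder` are assumptions made for illustration and are not taken from the original code. With its default settings, Jackson typically relies on a no-argument constructor plus getters and setters to map the object to and from JSON.

```java
// Hypothetical payload class; the field names are illustrative assumptions.
// In a real project this would normally be a public class in DummyMessage.java.
class DummyMessage {

    private String content;
    private int publishOrder;

    // No-argument constructor, used by Jackson when deserializing.
    public DummyMessage() {
    }

    public DummyMessage(String content, int publishOrder) {
        this.content = content;
        this.publishOrder = publishOrder;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public int getPublishOrder() {
        return publishOrder;
    }

    public void setPublishOrder(int publishOrder) {
        this.publishOrder = publishOrder;
    }

    @Override
    public String toString() {
        return "DummyMessage{content='" + content + "', publishOrder=" + publishOrder + "}";
    }
}
```

With a class shaped like this, a call such as sendDummy(new DummyMessage("hello", 1)) would be serialized by the configured converter rather than by Java serialization.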
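When several consumers read from the same queue (for example, when you use concurrency), you may want to distribute the load among them. The prefetch count controls how many unacknowledged messages the broker delivers to each consumer at a time, and in Spring Boot you can set it with the spring.rabbitmq.listener.simple.prefetch property. With that property set to 50, up to 50 messages are prefetched for each of the two consumers of the listener below. This is useful if you have fast producers and slow consumers. If not specified, the default prefetch value is 250.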
Prefetch example | @RabbitListener(queues = "dd.hr.accounting", concurrency = "2") public void listen(String message) throws IOException { var emp = objectMapper.readValue(message, Employee.class); log.info("Accounting Employee is {}", emp); } |
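To make the effect of the prefetch value more concrete, here is a toy, single-threaded simulation in plain Java. It does not talk to RabbitMQ. It assumes a simplified model in which the broker hands out messages round-robin to any consumer holding fewer than `prefetch` unacknowledged messages, and refills a slot as soon as a message is acknowledged. The class name, the processing costs and the message counts are all hypothetical, and prefetch is assumed to be at least 1.

```java
import java.util.Arrays;

// Toy model of broker dispatch under a prefetch limit; not the RabbitMQ API.
class PrefetchSimulation {

    // Outcome of one simulated run.
    static final class Result {
        final int[] processed;  // messages handled by each consumer
        final long finishTime;  // time at which the last message completes

        Result(int[] processed, long finishTime) {
            this.processed = processed;
            this.finishTime = finishTime;
        }
    }

    /**
     * @param totalMessages messages waiting in the queue at time 0
     * @param prefetch      maximum unacknowledged messages per consumer (>= 1)
     * @param costs         hypothetical processing time per message, per consumer
     */
    static Result simulate(int totalMessages, int prefetch, int[] costs) {
        int n = costs.length;
        int[] unacked = new int[n];     // delivered to the consumer, not yet acked
        int[] processed = new int[n];
        long[] nextDone = new long[n];  // completion time of the message in progress
        int remaining = totalMessages;

        // Initial dispatch: round-robin until every consumer is full or the queue is empty.
        boolean delivered = true;
        while (remaining > 0 && delivered) {
            delivered = false;
            for (int i = 0; i < n && remaining > 0; i++) {
                if (unacked[i] < prefetch) {
                    unacked[i]++;
                    remaining--;
                    delivered = true;
                }
            }
        }
        for (int i = 0; i < n; i++) {
            nextDone[i] = costs[i];
        }

        long now = 0;
        while (true) {
            // Pick the consumer that finishes its current message first (lowest index wins ties).
            int next = -1;
            for (int i = 0; i < n; i++) {
                if (unacked[i] > 0 && (next < 0 || nextDone[i] < nextDone[next])) {
                    next = i;
                }
            }
            if (next < 0) {
                break; // nothing left in flight
            }
            now = nextDone[next];
            processed[next]++;
            unacked[next]--;       // the ack frees one prefetch slot
            if (remaining > 0) {   // the broker refills that slot immediately
                unacked[next]++;
                remaining--;
            }
            nextDone[next] = now + costs[next];
        }
        return new Result(processed, now);
    }

    public static void main(String[] args) {
        int[] costs = {1, 10}; // consumer 0 is fast, consumer 1 is slow
        for (int prefetch : new int[] {50, 1}) {
            Result r = simulate(100, prefetch, costs);
            System.out.println("prefetch=" + prefetch
                    + " processed=" + Arrays.toString(r.processed)
                    + " finishTime=" + r.finishTime);
        }
    }
}
```

In this model, 100 messages with a prefetch of 50 are split 50/50 and the slow consumer finishes at time 500, while a prefetch of 1 lets the fast consumer take 91 of the messages and the run finishes at time 91. Real brokers and networks behave less neatly, but the trend is the reason to lower prefetch when consumers are slow or uneven.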
There might be times when you want different prefetch values for different consumers. In that case, you can create a dedicated listener container factory with its own prefetch count and reference it from the listener.
RabbitmqConfig.java | @Bean public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { var factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); /* One unacknowledged message per consumer, as the factory name suggests */ factory.setPrefetchCount(1); return factory; } |
Multiple Prefetch Consumer | @RabbitListener(queues = "q.scheduler", concurrency = "2", containerFactory = "prefetchOneContainerFactory") public void listenScheduler(DummyMessage message) throws InterruptedException { log.info("Listening scheduler {}", message); } |
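Sometimes you want a listener to send its result on to another exchange or queue. You can do this with the @SendTo annotation: the method's return value is published as a new message, and the annotation value has the form exchange/routingKey. In the example below, "x.invoice.cancel/" targets the x.invoice.cancel exchange with an empty routing key.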
SendTo example | @RabbitHandler @SendTo("x.invoice.cancel/") public PaymentCancelStatus handleInvoiceCancelled(InvoiceCancelledMessage message) { var randomStatus = ThreadLocalRandom.current().nextBoolean(); return new PaymentCancelStatus(randomStatus, LocalDate.now(), message.getInvoiceNumber()); } |
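The trailing slash in the @SendTo value is easy to misread, so here is a rough plain-Java illustration of the exchange/routingKey convention. `SendToAddress` is a hypothetical helper written for this note. It is not Spring's own parsing code and only mirrors the convention under the assumption that a value without a slash is treated as a routing key on the default (empty-named) exchange.

```java
// Hypothetical helper that splits an "exchange/routingKey" string; not a Spring class.
class SendToAddress {

    final String exchange;
    final String routingKey;

    SendToAddress(String exchange, String routingKey) {
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    static SendToAddress parse(String value) {
        int slash = value.indexOf('/');
        if (slash < 0) {
            // No slash: assume the default exchange and use the whole value as routing key.
            return new SendToAddress("", value);
        }
        // Text before the first slash is the exchange, text after it is the routing key.
        return new SendToAddress(value.substring(0, slash), value.substring(slash + 1));
    }

    public static void main(String[] args) {
        SendToAddress address = parse("x.invoice.cancel/");
        System.out.println("exchange='" + address.exchange
                + "' routingKey='" + address.routingKey + "'");
    }
}
```

Read this way, "x.invoice.cancel/" names an exchange with an empty routing key, while a value such as "/q.some-queue" (a made-up name) would name a routing key on the default exchange.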
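You also have the option to set up a default handler, which is invoked when no other @RabbitHandler method matches the type of the incoming message.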
Default handler example | @Service @RabbitListener(queues = "q.invoice") public class InvoiceConsumer { ..... @RabbitHandler(isDefault = true) public void handleDefault(Object message) { log.info("on handleDefault : {}", message); } } |
Using Java to Create Exchanges, Queues and Bindings
You can use Spring Boot to create exchanges and queues and to bind them together, either as individual beans or as a single Declarables bean.
Creating Exchange, Queue and Binding example | @Configuration public class RabbitmqSchemaConfig { @Bean public FanoutExchange createFanoutExchange() { return new FanoutExchange("dd.another-dummy-exchange", true, false, null); } @Bean public Queue createQueue() { return new Queue("dd.another-dummy-queue"); } @Bean public Binding createBinding() { /* Equivalent to: new Binding("dd.another-dummy-queue", DestinationType.QUEUE, "dd.another-dummy-exchange", "", null) */ return BindingBuilder.bind(createQueue()).to(createFanoutExchange()); } /* Alternatively, declare the exchange, queue and binding above in one go */ @Bean public Declarables createRabbitmqSchema() { return new Declarables(new FanoutExchange("dd.another-dummy-exchange", true, false, null), new Queue("dd.another-dummy-queue"), new Binding("dd.another-dummy-queue", DestinationType.QUEUE, "dd.another-dummy-exchange", "", null)); } } |
You can also configure a consumer to create the queue, exchange and binding if they do not already exist.
Auto-declaring consumer example | @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dd.auto-dummy-queue", durable = "true"), exchange = @Exchange(name = "dd.auto-dummy-exchange", type = ExchangeTypes.DIRECT, durable = "true"), key = "routing-key", ignoreDeclarationExceptions = "true")) public void listenDummy(DummyMessage message) { log.info("{}", message); } |
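The examples in this section bind one queue to a fanout exchange with an empty routing key and another to a direct exchange with the key "routing-key". The difference between the two exchange types can be sketched with a toy in-memory model. This is plain Java for illustration only: the `RoutingSketch` class and its nested types are hypothetical and are not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of exchange routing; not the RabbitMQ or Spring AMQP API.
class RoutingSketch {

    enum ExchangeType { FANOUT, DIRECT }

    // One binding between an exchange and a queue.
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    // Returns the queues that would receive a message published with the given routing key.
    static List<String> route(ExchangeType type, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding binding : bindings) {
            // Fanout ignores the routing key; direct requires an exact match.
            if (type == ExchangeType.FANOUT || binding.bindingKey.equals(routingKey)) {
                targets.add(binding.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> fanoutBindings = new ArrayList<>();
        fanoutBindings.add(new Binding("dd.another-dummy-queue", ""));
        System.out.println(route(ExchangeType.FANOUT, fanoutBindings, "anything"));

        List<Binding> directBindings = new ArrayList<>();
        directBindings.add(new Binding("dd.auto-dummy-queue", "routing-key"));
        System.out.println(route(ExchangeType.DIRECT, directBindings, "routing-key"));
        System.out.println(route(ExchangeType.DIRECT, directBindings, "other-key"));
    }
}
```

This is why the fanout binding can leave its routing key empty, while the direct binding only receives messages published with a key equal to "routing-key".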