RabbitMQ and Micronaut - Event driven applications

Use RabbitMQ to communicate your Micronaut apps.

Authors: Iván López

Micronaut Version: 1.1.0.M1

1 Getting Started

In this guide, we are going to create two microservices that will use RabbitMQ to communicate each other in an asynchronous and decoupled way.

RabbitMQ is an open-source message-broker software that originally implemented the Advanced Message Queuing Protocol (AMQP) and has since been extended with a plug-in architecture to support Streaming Text Oriented Messaging Protocol (STOMP), Message Queuing Telemetry Transport (MQTT), and other protocols.

1.1 What you will need

To complete this guide, you will need the following:

  • Some time on your hands

  • A decent text editor or IDE

  • JDK 1.8 or greater installed with JAVA_HOME configured appropriately

1.2 Solution

We recommend you to follow the instructions in the next sections and create the app step by step. However, you can go right to the completed example.

or

Then, cd into the complete folder which you will find in the root project of the downloaded/cloned project.

2 Writing the App

Lets describe the microservices you are going to build through the tutorial.

  • books - It returns a list of books. It uses a domain consisting of a book name and isbn. It also publish a message in RabbitMQ every time a book is accessed.

  • analytics - It connects to RabbitMQ to update the analytics for every book (a counter). It also exposes an endpoint to get the analytics.

If you are using Java or Kotlin and IntelliJ IDEA make sure you have enabled annotation processing.

annotationprocessorsintellij

2.1 Books Microservice

Create the books microservice:

mn create-app example.micronaut.books.books

The previous command creates a folder named books and a Micronaut app inside it with default package: example.micronaut.books.

Create a BookController class to handle incoming HTTP requests into the books microservice:

books/src/main/java/example/micronaut/books/BookController.java
package example.micronaut.books;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.List;
import java.util.Optional;

@Controller("/books") (1)
public class BookController {

    private final BookService bookService; (2)

    public BookController(BookService bookService) {
        this.bookService = bookService;
    }

    @Get("/") (3)
    public List<Book> listAll() {
        return bookService.listAll();
    }

    @Get("/{isbn}") (4)
    Optional<Book> findBook(String isbn) {
        return bookService.findByIsbn(isbn);
    }
}
1 The class is defined as a controller with the @Controller annotation mapped to the path /books
2 Inject BookService using constructor injection.
3 The @Get annotation is used to map the listAll method to an HTTP GET request on /books.
4 The @Get annotation is used to map the findBook method to an HTTP GET request on /books/{isbn}.

The previous controller responds a List<Book>. Create the Book POJO:

books/src/main/java/example/micronaut/books/Book.java
package example.micronaut.books;

import java.util.Objects;

public class Book {
    private String isbn;
    private String name;

    public Book() {
    }

    public Book(String isbn, String name) {
        this.isbn = isbn;
        this.name = name;
    }

    public String getIsbn() {
        return isbn;
    }

    public void setIsbn(String isbn) {
        this.isbn = isbn;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Book{" +
                "isbn='" + isbn + '\'' +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Book book = (Book) o;
        return Objects.equals(isbn, book.isbn) &&
                Objects.equals(name, book.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(isbn, name);
    }
}

To keep this guide simple there is no database persistence and the list of books is kept in memory in BookService.java:

books/src/main/java/example/micronaut/books/BookService.java
package example.micronaut.books;

import io.micronaut.context.annotation.Context;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@Context
public class BookService {

    private static final List<Book> bookStore = new ArrayList<>();

    @PostConstruct
    void init() {
        bookStore.add(new Book("1491950358", "Building Microservices"));
        bookStore.add(new Book("1680502395", "Release It!"));
        bookStore.add(new Book("0321601912", "Continuous Delivery"));
    }

    public List<Book> listAll() {
        return bookStore;
    }

    public Optional<Book> findByIsbn(String isbn) {
        return bookStore.stream()
                .filter(b -> b.getIsbn().equals(isbn))
                .findFirst();
    }
}

2.2 Analytics Microservice

Create the analytics microservice:

mn create-app example.micronaut.analytics.analytics

To keep this guide simple there is no database persistence and the books analytics is kept in memory in AnalyticsService.java

analytics/src/main/java/example/micronaut/analytics/AnalyticsService.java
package example.micronaut.analytics;

import javax.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Singleton
public class AnalyticsService {

    private final Map<Book, Long> bookAnalytics = new ConcurrentHashMap<>(); (1)

    public void updateBookAnalytics(Book book) { (2)
        bookAnalytics.compute(book, (b, v) -> {
            if (v == null) {
                return 1L;
            } else {
                return v + 1;
            }
        });
    }

    public List<BookAnalytics> listAnalytics() { (3)
        return bookAnalytics
                .entrySet()
                .stream()
                .map(e -> new BookAnalytics(e.getKey().getIsbn(), e.getValue()))
                .collect(Collectors.toList());
    }
}
1 Keep the books analytics in memory.
2 Initialize and update the analytics for the book passed as parameter.
3 Return all the analytics.

The previous service responds a List<BookAnalytics>. Create the BookAnalytics POJO:

analytics/src/main/java/example/micronaut/analytics/BookAnalytics.java
package example.micronaut.analytics;

public class BookAnalytics {

    private String bookIsbn;
    private Long count;

    public BookAnalytics() {
    }

    public BookAnalytics(String bookIsbn, Long count) {
        this.bookIsbn = bookIsbn;
        this.count = count;
    }

    public String getBookIsbn() {
        return bookIsbn;
    }

    public void setBookIsbn(String bookIsbn) {
        this.bookIsbn = bookIsbn;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }
}

Write a test:

analytics/src/test/java/example/micronaut/analytics/AnalyticsServiceTest.java
package example.micronaut.analytics;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.micronaut.test.annotation.MicronautTest;
import org.junit.jupiter.api.Test;

import javax.inject.Inject;
import java.util.List;

@MicronautTest (1)
public class AnalyticsServiceTest {

    @Inject (2)
    AnalyticsService analyticsService;

    @Test
    void testUpdateBookAnalyticsAndGetAnalytics() {
        Book b1 = new Book("1491950358", "Building Microservices");
        Book b2 = new Book("1680502395", "Release It!");

        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b2);

        List<BookAnalytics> analytics = analyticsService.listAnalytics();
        assertEquals(2, analytics.size());

        assertEquals(3, findBookAnalytics(b1, analytics).getCount().intValue());
        assertEquals(1, findBookAnalytics(b2, analytics).getCount().intValue());
    }

    private BookAnalytics findBookAnalytics(Book b, List<BookAnalytics> analytics) {
        return analytics
                .stream()
                .filter(bookAnalytics -> bookAnalytics.getBookIsbn().equals(b.getIsbn()))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Book not found"));
    }
}
1 Starting with Micronaut 1.1.0 micronaut-test-junit5 is added automatically to build.gradle (or pom.xml) when creating an application with the CLI. For more information take a look at the documentation.
2 Just inject the collaborator and @MicronautTest will take care of everything.

Create a Controller to expose the analytics:

analytics/src/main/java/example/micronaut/analytics/AnalyticsController.java
package example.micronaut.analytics;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.List;

@Controller("/analytics")
public class AnalyticsController {

    private final AnalyticsService analyticsService;

    public AnalyticsController(AnalyticsService analyticsService) {
        this.analyticsService = analyticsService;
    }

    @Get("/") (1)
    public List<BookAnalytics> listAnalytics() {
        return analyticsService.listAnalytics();
    }
}
1 Just expose the analytics.

The application doesn’t expose the method updateBookAnalytics created in AnalyticsService. This method will be invoked when reading messages from RabbitMQ.

Run the tests:

analytics $ ./gradlew test

> Task :compileJava
Note: Creating bean classes for 4 type elements

> Task :processResources
> Task :classes

> Task :compileTestJava
Note: Creating bean classes for 1 type elements

> Task :processTestResources NO-SOURCE
> Task :testClasses

> Task :test

BUILD SUCCESSFUL in 10s
4 actionable tasks: 4 executed

Finally edit application.yml to run the application on a different port that books microservice.

analytics/src/main/resources/application.yml
micronaut:
    application:
        name: analytics
    server:
        port: 8081 (1)
1 Start the Micronaut microservice on port 8081.

3 Running the app

Run books microservice:

books $ ./gradlew run

17:54:17.497 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 1261ms. Server Running: http://localhost:8080

Run analytics microservice:

analytics $ ./gradlew run

17:58:57.804 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 1130ms. Server Running: http://localhost:8081

You can run a curl command to test the application:

$ curl http://localhost:8080/books
[{"isbn":"1491950358","name":"Building Microservices"},{"isbn":"1680502395","name":"Release It!"},{"isbn":"0321601912","name":"Continuous Delivery"}]

$ curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}

$ curl http://localhost:8081/analytics
[]

Please note that getting the analytics returns an empty list because the applications are not communicating to each other (yet).

4 RabbitMQ and Micronaut

4.1 Install RabbitMQ via Docker

The fastest way to start using RabbitMQ is via Docker:

docker run --rm -it \
        -p 5672:5672 \
        -p 15672:15672 \
        rabbitmq:3.7.11-management

4.2 Books Microservice

Modify build.gradle to add rabbitmq dependency.

books/build.gradle
dependencies {
    ...
    ..
    .
    compile 'io.micronaut.configuration:micronaut-rabbitmq:1.1.0.M2'
}

By default Micronaut will connect to a RabbitMQ instance running on localhost so it is not necessary to add anything to application.yml. In case you want to change the configuration, add the following:

books/src/main/resources/application.yml
rabbitmq:
    uri: amqp://localhost:5672

Create RabbitMQ exchange, queue and binding

Before being able to send and receive messages using RabbitMQ it is necessary to define the exchange, queue and binding. One option is create them directly in the RabbitMQ Admin UI available on http://localhost:15672. Use guest for both username and password.

Another option is create them programatically with Micronaut. Create the class ChannelPoolListener.java:

books/src/main/java/example/micronaut/books/ChannelPoolListener.java
package example.micronaut.books;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import io.micronaut.configuration.rabbitmq.connect.ChannelInitializer;

import javax.inject.Singleton;
import java.io.IOException;

@Singleton
public class ChannelPoolListener extends ChannelInitializer {

    @Override
    public void initialize(Channel channel) throws IOException {
        channel.exchangeDeclare("micronaut", BuiltinExchangeType.DIRECT, true); (1)
        channel.queueDeclare("analytics", true, false, false, null); (2)
        channel.queueBind("analytics", "micronaut", "analytics"); (3)
    }
}
1 Define an exchange named micronaut. From the producer point of view everything is sent to the exchange with the appropriate routing key
2 Define a queue named analytics. The consumer will listen for messages in that queue.
3 Define a binding between the exchange and the queue using the routing key analytics.

Create RabbitMQ client (producer)

Let’s create an interface to send messages to RabbitMQ. Micronaut will implement the interface at compilation time:

books/src/main/java/example/micronaut/books/AnalyticsClient.java
package example.micronaut.books;

import io.micronaut.configuration.rabbitmq.annotation.Binding;
import io.micronaut.configuration.rabbitmq.annotation.RabbitClient;

@RabbitClient("micronaut") (1)
public interface AnalyticsClient {

    @Binding("analytics") (2)
    void updateAnalytics(Book book); (3)
}
1 Set the exchange used to send the messages.
2 Set the routing key.
3 Send the Book POJO. Micronaut will automatically convert it to JSON before sending it.

Send Analytics information automatically

Sending a message to RabbitMQ is as simple as injecting AnalyticsClient and calling updateAnalytics method. The goal is to do it automatically every time a book is returned, i.e., every time there is a call to http://localhost:8080/books/{isbn}. To achieve this we are going to create an Http Server Filter. Create the file AnalyticsFilter.java:

books/src/main/java/example/micronaut/books/AnalyticsFilter.java
package example.micronaut.books;

import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.filter.HttpServerFilter;
import io.micronaut.http.filter.ServerFilterChain;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;

import java.util.Optional;

@Filter("/books/?*") (1)
public class AnalyticsFilter implements HttpServerFilter { (2)

    private final AnalyticsClient analyticsClient; (3)

    public AnalyticsFilter(AnalyticsClient analyticsClient) { (3)
        this.analyticsClient = analyticsClient;
    }

    @Override
    public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) { (4)
        return Flowable
                .fromPublisher(chain.proceed(request)) (5)
                .flatMap(response ->
                        Flowable.fromCallable(() -> {
                            Optional<Book> book = response.getBody(Book.class); (6)
                            book.ifPresent(analyticsClient::updateAnalytics); (7)

                            return response;
                        })
                );
    }
}
1 Annotate the class with @Filter and define the ANT Matcher pattern to intercept all the calls to the desire URI.
2 The class needs to implement HttpServerFilter.
3 Constructor injection for RabbitMQ AnalyticsClient.
4 Override doFilter method.
5 Execute the request. This will call the controller action.
6 Get the response from the controller and return the body as a Book.
7 If the book is found, use RabbitMQ client to send a message.

4.3 Analytics Microservice

Modify build.gradle to add rabbitmq dependency.

analytics/build.gradle
dependencies {
    ...
    ..
    .
    compile 'io.micronaut.configuration:micronaut-rabbitmq:1.1.0.M2'
}

Create RabbitMQ exchange, queue and binding

As we already did in Books Microservice, let’s create the class ChannelPoolListener.java to define the exchange, queue and binding:

analytics/src/main/java/example/micronaut/analytics/ChannelPoolListener.java
package example.micronaut.analytics;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import io.micronaut.configuration.rabbitmq.connect.ChannelInitializer;

import javax.inject.Singleton;
import java.io.IOException;

@Singleton
public class ChannelPoolListener extends ChannelInitializer {

    @Override
    public void initialize(Channel channel) throws IOException {
        channel.exchangeDeclare("micronaut", BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare("analytics", true, false, false, null);
        channel.queueBind("analytics", "micronaut", "analytics");
    }
}
Instead of copy-paste the class in every project it would be better to create a new Gradle (or Maven) module and share it among all the microservices.

Create RabbitMQ consumer

Create a new class to act as a consumer of the messages sent to RabbitMQ by the Books Microservice. Micronaut will implement the consume at compile time. Create AnalyticsListener.java

analytics/src/main/java/example/micronaut/analytics/AnalyticsListener.java
package example.micronaut.analytics;

import io.micronaut.configuration.rabbitmq.annotation.Queue;
import io.micronaut.configuration.rabbitmq.annotation.RabbitListener;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;

@Requires(notEnv = Environment.TEST) (1)
@RabbitListener (2)
public class AnalyticsListener {

    private final AnalyticsService analyticsService; (3)

    public AnalyticsListener(AnalyticsService analyticsService) { (3)
        this.analyticsService = analyticsService;
    }

    @Queue("analytics") (4)
    public void updateAnalytics(Book book) {
        analyticsService.updateBookAnalytics(book); (5)
    }
}
1 Do not load this bean for the test environment. This enable us to run the tests without having a RabbitMQ instance running.
2 Annotate the class with @RabbitListener to indicate that this bean will consume messages from RabbitMQ.
3 Constructor injection for AnalyticsService.
4 Annotate the method with @Queue. This listener will listen to messages in analytics queue.
5 Call the previously created method to update the analytics for the book.

4.4 Running the App

Run books microservice:

books $ ./gradlew run

18:19:02.206 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 1338ms. Server Running: http://localhost:8080

Execute a curl request to get one book:

$ curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}

Open RabbitMQ Admin UI on http://localhost:15672 and use guest for both username and password. Select queues and analytics queue. You can see that there is a message in the queue.

rabbitmq message

Expand the "Get messages" option and get one message. You can see all the information: exchange, routing key and the payload serialized to json:

rabbitmq message detail

Run analytics microservice:

analytics $ ./gradlew run

18:24:45.810 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 1307ms. Server Running: http://localhost:8081

The application will consume and process the message automatically after the startup. Go to RabbitMQ Admin UI and check that the message has been consumed:

rabbitmq message consumed

Now, run a curl to get the analytics:

$ curl http://localhost:8081/analytics
[{"bookIsbn":"1491950358","count":1}]

5 Next Steps

Read more about RabbitMQ support inside Micronaut.