Kafka and the Micronaut Framework - Event-Driven Applications

Use Kafka to communicate between your Micronaut applications.

Authors: Burt Beckwith

Micronaut Version: 4.6.3

1. Getting Started

In this guide, we will create two Micronaut microservices written in Java that use Kafka to communicate with each other in an asynchronous and decoupled way.

2. What you will need

To complete this guide, you will need the following:

  • Some time on your hands

  • A decent text editor or IDE

  • JDK 17 or greater installed with JAVA_HOME configured appropriately

  • Docker and Docker Compose installed if you will be running Kafka in Docker, and for running tests.

3. Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

4. Writing the application

Let’s describe the microservices you will build through the guide.

  • books - It returns a list of books. It uses a domain consisting of a book name and ISBN. It also publishes a message in Kafka every time a book is accessed.

  • analytics - It connects to Kafka to update the analytics for every book (a counter). It also exposes an endpoint to get the analytics.
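The flow between the two microservices can be sketched without Kafka at all. The following minimal example uses an in-memory BlockingQueue as a stand-in for the Kafka topic. The class and method names (TopicSketch, publish, drainInto) are hypothetical and are not part of the guide's code; the sketch only illustrates the idea that the producer records an event and the consumer updates a counter later, independently.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the "analytics" topic: an in-memory queue, not Kafka.
class TopicSketch {

    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();

    // Producer side (books): record that the book with this ISBN was accessed.
    void publish(String isbn) {
        topic.add(isbn);
    }

    // Consumer side (analytics): process all pending messages, one counter per ISBN.
    void drainInto(Map<String, Long> counters) {
        String isbn;
        while ((isbn = topic.poll()) != null) {
            counters.merge(isbn, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        TopicSketch sketch = new TopicSketch();
        Map<String, Long> counters = new ConcurrentHashMap<>();

        sketch.publish("1491950358");
        sketch.publish("1491950358");
        sketch.publish("1680502395");

        // The consumer runs later and independently of the producer calls above.
        sketch.drainInto(counters);
        System.out.println(counters.get("1491950358")); // 2
        System.out.println(counters.get("1680502395")); // 1
    }
}
```

The producer never calls the consumer directly, which is the decoupling that Kafka provides between the books and analytics microservices.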

4.1. Books Microservice

Create the books microservice using the Micronaut Command Line Interface or with Micronaut Launch.

mn create-app --features=kafka,reactor,graalvm,serialization-jackson example.micronaut.books --build=maven --lang=java
If you don’t specify the --build argument, Gradle with the Kotlin DSL is used as the build tool.
If you don’t specify the --lang argument, Java is used as the language.
If you don’t specify the --test argument, JUnit is used for Java and Kotlin, and Spock is used for Groovy.

If you use Micronaut Launch, select Micronaut Application as application type and add the kafka, reactor, graalvm, and serialization-jackson features.

The previous command creates a directory named books and a Micronaut application inside it with default package example.micronaut.

In addition to the dependencies added by the above features, we also need a test dependency for the Awaitility library:

pom.xml
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.2.2</version>
    <scope>test</scope>
</dependency>

Create a Book POJO:

books/src/main/java/example/micronaut/Book.java
package example.micronaut;

import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;

import java.util.Objects;

@Serdeable
public class Book {

    private final String isbn;
    private final String name;

    @Creator
    public Book(String isbn, String name) {
        this.isbn = isbn;
        this.name = name;
    }

    public String getIsbn() {
        return isbn;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "Book{" +
                "isbn='" + isbn + '\'' +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Book other = (Book) o;
        return Objects.equals(isbn, other.isbn) &&
                Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(isbn, name);
    }
}

To keep this guide simple there is no database persistence - BookService keeps the list of books in memory:

books/src/main/java/example/micronaut/BookService.java
package example.micronaut;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@Singleton
public class BookService {

    private final List<Book> bookStore = new ArrayList<>();

    @PostConstruct
    void init() {
        bookStore.add(new Book("1491950358", "Building Microservices"));
        bookStore.add(new Book("1680502395", "Release It!"));
        bookStore.add(new Book("0321601912", "Continuous Delivery"));
    }

    public List<Book> listAll() {
        return bookStore;
    }

    public Optional<Book> findByIsbn(String isbn) {
        return bookStore.stream()
                .filter(b -> b.getIsbn().equals(isbn))
                .findFirst();
    }
}

Create a BookController class to handle incoming HTTP requests to the books microservice:

books/src/main/java/example/micronaut/BookController.java
package example.micronaut;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.List;
import java.util.Optional;

@Controller("/books") (1)
class BookController {

    private final BookService bookService;

    BookController(BookService bookService) { (2)
        this.bookService = bookService;
    }

    @Get (3)
    List<Book> listAll() {
        return bookService.listAll();
    }

    @Get("/{isbn}") (4)
    Optional<Book> findBook(String isbn) {
        return bookService.findByIsbn(isbn);
    }
}
1 The @Controller annotation defines the class as a controller mapped to the root URI /books
2 Use constructor injection to inject a bean of type BookService.
3 The @Get annotation maps the listAll method to an HTTP GET request on /books.
4 The @Get annotation maps the findBook method to an HTTP GET request on /books/{isbn}.

4.2. Analytics Microservice

Create the analytics microservice using the Micronaut Command Line Interface or with Micronaut Launch.

mn create-app --features=kafka,graalvm,serialization-jackson example.micronaut.analytics --build=maven --lang=java
If you don’t specify the --build argument, Gradle with the Kotlin DSL is used as the build tool.
If you don’t specify the --lang argument, Java is used as the language.
If you don’t specify the --test argument, JUnit is used for Java and Kotlin, and Spock is used for Groovy.

If you use Micronaut Launch, select Micronaut Application as application type and add the kafka, graalvm, and serialization-jackson features.

Create a Book POJO:

analytics/src/main/java/example/micronaut/Book.java
package example.micronaut;

import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;

import java.util.Objects;

@Serdeable
public class Book {

    private final String isbn;
    private final String name;

    @Creator
    public Book(String isbn, String name) {
        this.isbn = isbn;
        this.name = name;
    }

    public String getIsbn() {
        return isbn;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "Book{" +
                "isbn='" + isbn + '\'' +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Book other = (Book) o;
        return Objects.equals(isbn, other.isbn) &&
                Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(isbn, name);
    }
}
This Book POJO is the same as the one in the books microservice. In a real application this would be in a shared library but to keep things simple we’ll just duplicate it.

Create a BookAnalytics POJO:

analytics/src/main/java/example/micronaut/BookAnalytics.java
package example.micronaut;

import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;

@Serdeable
public class BookAnalytics {

    private final String bookIsbn;
    private final long count;

    @Creator
    public BookAnalytics(String bookIsbn, long count) {
        this.bookIsbn = bookIsbn;
        this.count = count;
    }

    public String getBookIsbn() {
        return bookIsbn;
    }

    public long getCount() {
        return count;
    }
}

To keep this guide simple there is no database persistence - AnalyticsService keeps book analytics in memory:

analytics/src/main/java/example/micronaut/AnalyticsService.java
package example.micronaut;

import jakarta.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Singleton
public class AnalyticsService {

    private final Map<Book, Long> bookAnalytics = new ConcurrentHashMap<>(); (1)

    public void updateBookAnalytics(Book book) { (2)
        bookAnalytics.compute(book, (k, v) -> {
            return v == null ? 1L : v + 1;
        });
    }

    public List<BookAnalytics> listAnalytics() { (3)
        return bookAnalytics
                .entrySet()
                .stream()
                .map(e -> new BookAnalytics(e.getKey().getIsbn(), e.getValue()))
                .collect(Collectors.toList());
    }
}
1 Keep the book analytics in memory
2 Initialize and update the analytics for the book passed as parameter
3 Return all the analytics
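The map is a ConcurrentHashMap because the counters may be updated and read from different threads. The following sketch, which is not part of the guide's code, shows why the compute call is a safe way to increment: several threads increment the same key and no update is lost. The names CounterSketch and hammer are hypothetical, and the key is simplified to a plain ISBN string.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CounterSketch {

    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    // Same compute() idiom as AnalyticsService.updateBookAnalytics: atomic per key.
    void increment(String isbn) {
        counts.compute(isbn, (k, v) -> v == null ? 1L : v + 1);
    }

    long countFor(String isbn) {
        return counts.getOrDefault(isbn, 0L);
    }

    // Runs `threads` workers that each increment the same key `perThread` times.
    static long hammer(int threads, int perThread) {
        CounterSketch sketch = new CounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    sketch.increment("1491950358");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.countFor("1491950358");
    }

    public static void main(String[] args) {
        System.out.println(hammer(4, 1_000)); // 4000
    }
}
```

With a plain HashMap and a separate get followed by put, concurrent increments could overwrite each other and the final count could be lower than expected.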

Write a test for AnalyticsService:

analytics/src/test/java/example/micronaut/AnalyticsServiceTest.java
package example.micronaut;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;

import jakarta.inject.Inject;
import java.util.List;

@MicronautTest
class AnalyticsServiceTest {

    @Inject
    AnalyticsService analyticsService;

    @Test
    void testUpdateBookAnalyticsAndGetAnalytics() {
        Book b1 = new Book("1491950358", "Building Microservices");
        Book b2 = new Book("1680502395", "Release It!");

        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b2);

        List<BookAnalytics> analytics = analyticsService.listAnalytics();
        assertEquals(2, analytics.size());

        assertEquals(3, findBookAnalytics(b1, analytics).getCount());
        assertEquals(1, findBookAnalytics(b2, analytics).getCount());
    }

    private BookAnalytics findBookAnalytics(Book b, List<BookAnalytics> analytics) {
        return analytics
                .stream()
                .filter(bookAnalytics -> bookAnalytics.getBookIsbn().equals(b.getIsbn()))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Book not found"));
    }
}

Create a Controller to expose the analytics:

analytics/src/main/java/example/micronaut/AnalyticsController.java
package example.micronaut;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.List;

@Controller("/analytics")
class AnalyticsController {

    private final AnalyticsService analyticsService;

    AnalyticsController(AnalyticsService analyticsService) {
        this.analyticsService = analyticsService;
    }

    @Get (1)
    List<BookAnalytics> listAnalytics() {
        return analyticsService.listAnalytics();
    }
}
1 Just expose the analytics

The application doesn’t expose the method updateBookAnalytics created in AnalyticsService. This method will be invoked when reading messages from Kafka.

To run the tests:

analytics
./mvnw test

Modify the Application class to use dev as a default environment:

The Micronaut framework supports the concept of one or many default environments. A default environment is one that is only applied if no other environments are explicitly specified or deduced.

analytics/src/main/java/example/micronaut/Application.java
package example.micronaut;

import io.micronaut.runtime.Micronaut;

import static io.micronaut.context.env.Environment.DEVELOPMENT;

public class Application {

    public static void main(String[] args) {
        Micronaut.build(args)
                .mainClass(Application.class)
                .defaultEnvironments(DEVELOPMENT)
                .start();
    }
}

Create src/main/resources/application-dev.yml. The Micronaut framework applies this configuration file only for the dev environment.

analytics/src/main/resources/application-dev.yml
micronaut:
  server:
    port: 8081 (1)
1 Start the analytics microservice on port 8081
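The rule for default environments can be written down as a small function. This is a simplified sketch of the behavior described above, not the framework's implementation; the class EnvironmentSketch and its resolve method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class EnvironmentSketch {

    // Explicit and deduced environments win; defaults apply only when both are empty.
    static List<String> resolve(List<String> explicit, List<String> deduced, List<String> defaults) {
        Set<String> active = new LinkedHashSet<>(deduced);
        active.addAll(explicit);
        if (active.isEmpty()) {
            active.addAll(defaults);
        }
        return new ArrayList<>(active);
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        List<String> defaults = Arrays.asList("dev");

        // Nothing specified or deduced: the default "dev" applies, so application-dev.yml is read.
        System.out.println(resolve(none, none, defaults)); // [dev]

        // An explicitly specified environment replaces the default.
        System.out.println(resolve(Arrays.asList("prod"), none, defaults)); // [prod]

        // A deduced environment (for example "test" while running tests) also replaces it.
        System.out.println(resolve(none, Arrays.asList("test"), defaults)); // [test]
    }
}
```

Under this reading, port 8081 is used when you start the analytics microservice locally with no environment set, and it is not applied when another environment is active.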

5. Running the application

Start the books microservice:

books
./mvnw mn:run
16:35:55.614 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 576ms. Server Running: http://localhost:8080

Start the analytics microservice:

analytics
./mvnw mn:run
16:35:55.614 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 623ms. Server Running: http://localhost:8081

You can use curl to test the application:

curl http://localhost:8080/books
[{"isbn":"1491950358","name":"Building Microservices"},{"isbn":"1680502395","name":"Release It!"},{"isbn":"0321601912","name":"Continuous Delivery"}]
curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}
curl http://localhost:8081/analytics
[]

Note that getting the analytics returns an empty list because the applications are not communicating with each other (yet).

6. Test Resources

When the application is started locally, either under test or by running the application, the Test Resources service detects the attempt to resolve the property kafka.bootstrap.servers, starts a local Kafka Docker container, and injects the properties required to use it as the broker.

When running in production, you should set this property to the location of your production Kafka instance via an environment variable.

KAFKA_BOOTSTRAP_SERVERS=production-server:9092

For more information, see the Kafka section of the Test Resources documentation.
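The environment variable above sets the kafka.bootstrap.servers property because the variable name is translated into a property name. The following sketch shows the basic idea under a simplifying assumption: lower-case the name and turn each underscore into a dot. The class EnvNameSketch is hypothetical, and the framework's real conversion considers more naming variants than this.

```java
import java.util.Locale;

class EnvNameSketch {

    // Simplified assumption: lower-case the name and replace every underscore with a dot.
    static String toPropertyName(String envVar) {
        return envVar.toLowerCase(Locale.ROOT).replace('_', '.');
    }

    public static void main(String[] args) {
        System.out.println(toPropertyName("KAFKA_BOOTSTRAP_SERVERS")); // kafka.bootstrap.servers
    }
}
```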

7. Kafka and the Micronaut Framework

7.1. Install Kafka

A fast way to start using Kafka is via Docker. Create this docker-compose.yml file:

docker/docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - 2181:2181 (1)
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092 (2)
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
1 Zookeeper uses port 2181 by default, but you can change the value if necessary.
2 Kafka uses port 9092 by default, but you can change the value if necessary.

Start Zookeeper and Kafka (use CTRL-C to stop both):

docker-compose up

7.2. Books Microservice

The generated code will use the Test Resources plugin to start a local Kafka broker inside Docker, and configure the connection URL.

7.2.1. Create Kafka client (producer)

Let’s create an interface to send messages to Kafka. The Micronaut framework will implement the interface at compilation time:

books/src/main/java/example/micronaut/AnalyticsClient.java
package example.micronaut;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import reactor.core.publisher.Mono;

@KafkaClient
public interface AnalyticsClient {

    @Topic("analytics") (1)
    Mono<Book> updateAnalytics(Book book); (2)
}
1 Set the topic name
2 Send the Book POJO. The framework will automatically convert it to JSON before sending it.

7.2.2. Create Tests

We could use mocks to test the message sending logic between BookController, AnalyticsFilter, and AnalyticsClient, but it’s more realistic to use a running Kafka broker. This is why Test Resources is used to run Kafka inside a Docker container.

Write a test for BookController to verify that a message is published to Kafka when a book is retrieved, and that no message is published when the book is not found:

books/src/test/java/example/micronaut/BookControllerTest.java
package example.micronaut;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import jakarta.inject.Inject;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@MicronautTest
@TestInstance(PER_CLASS) (1)
class BookControllerTest {

    private static final Collection<Book> received = new ConcurrentLinkedDeque<>();

    @Inject
    AnalyticsListener analyticsListener; (2)

    @Inject
    @Client("/")
    HttpClient client; (3)

    @Test
    void testMessageIsPublishedToKafkaWhenBookFound() {
        String isbn = "1491950358";

        Optional<Book> result = retrieveGet("/books/" + isbn); (4)
        assertNotNull(result);
        assertTrue(result.isPresent());
        assertEquals(isbn, result.get().getIsbn());

        await().atMost(5, SECONDS).until(() -> !received.isEmpty()); (5)

        assertEquals(1, received.size()); (6)
        Book bookFromKafka = received.iterator().next();
        assertNotNull(bookFromKafka);
        assertEquals(isbn, bookFromKafka.getIsbn());
    }

    @Test
    void testMessageIsNotPublishedToKafkaWhenBookNotFound() throws Exception {
        assertThrows(HttpClientResponseException.class, () -> {
            retrieveGet("/books/INVALID");
        });

        Thread.sleep(5_000); (7)
        assertEquals(0, received.size());
    }

    @AfterEach
    void cleanup() {
        received.clear();
    }

    @KafkaListener(offsetReset = EARLIEST)
    static class AnalyticsListener {

        @Topic("analytics")
        void updateAnalytics(Book book) {
            received.add(book);
        }
    }

    private Optional<Book> retrieveGet(String url) {
        return client.toBlocking().retrieve(HttpRequest.GET(url), Argument.of(Optional.class, Book.class));
    }
}
1 Create a single class instance for all tests. Classes that implement TestPropertyProvider must use this annotation (not necessary in Spock tests).
2 Dependency injection for the AnalyticsListener class declared below, a Kafka listener class that replicates the functionality of the class of the same name in the analytics microservice
3 Dependency injection for an HTTP client that the Micronaut framework will implement at compile time to make calls to BookController
4 Use the HttpClient to retrieve a Book, which will trigger sending a message with Kafka
5 Wait a few seconds for the message to arrive; it should happen very quickly, but the message will be sent on a separate thread
6 Verify that the message was received and has the correct data
7 Wait a few seconds to make sure no message is sent
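The waiting step in callout 5 polls a condition until it becomes true or a time limit passes, which is usually faster than a fixed sleep. The following sketch illustrates that idea in plain Java. It is not Awaitility's implementation, and the names PollingSketch and waitUntil are hypothetical. Unlike this sketch, which returns false on timeout, Awaitility throws a ConditionTimeoutException.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class PollingSketch {

    // Checks `condition` every `intervalMillis` until it holds or `timeoutMillis` has elapsed.
    // Returns true if the condition became true in time, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true after roughly 100 ms, well before the 5 second limit.
        boolean arrived = waitUntil(() -> System.currentTimeMillis() - start >= 100, 5_000, 10);
        System.out.println(arrived); // true
    }
}
```

The negative test in callout 7 cannot use this approach, because there is no condition to wait for: it has to let the full interval pass before checking that nothing arrived.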

7.2.3. Send Analytics information automatically

Sending a message to Kafka is as simple as injecting AnalyticsClient and calling the updateAnalytics method. The goal is to do it automatically every time a book is returned, i.e., every time there is a call to http://localhost:8080/books/{isbn}. To achieve this we will create an HTTP server filter. Create the AnalyticsFilter class:

books/src/main/java/example/micronaut/AnalyticsFilter.java
package example.micronaut;

import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.filter.HttpServerFilter;
import io.micronaut.http.filter.ServerFilterChain;
import reactor.core.publisher.Flux;
import org.reactivestreams.Publisher;

import java.util.Optional;

@Filter("/books/?*") (1)
public class AnalyticsFilter implements HttpServerFilter { (2)

    private final AnalyticsClient analyticsClient;

    public AnalyticsFilter(AnalyticsClient analyticsClient) { (3)
        this.analyticsClient = analyticsClient;
    }

    @Override
    public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request,
                                                      ServerFilterChain chain) { (4)
        return Flux
                .from(chain.proceed(request)) (5)
                .flatMap(response -> {
                    Book book = response.getBody(Book.class).orElse(null); (6)
                    if (book == null) {
                        return Flux.just(response);
                    }
                    return Flux.from(analyticsClient.updateAnalytics(book)).map(b -> response); (7)
                });
    }
}
1 Annotate the class with @Filter and define the Ant-style matcher pattern to intercept all calls to the desired URIs
2 The class must implement HttpServerFilter
3 Dependency injection for the Kafka AnalyticsClient
4 Implement the doFilter method
5 Execute the request; this will invoke the controller action
6 Get the response from the controller and return the body as a Book
7 If the book is found, use the Kafka client to send a message
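To see which URIs the pattern in callout 1 is meant to select, the following sketch translates an Ant-style pattern into a regular expression using the conventional rules: ? matches one character, * matches zero or more characters within a path segment, and ** matches across segments. This is an illustration only, with hypothetical names (AntPatternSketch, toRegex, matches); the framework's own matcher may differ in edge cases.

```java
import java.util.regex.Pattern;

class AntPatternSketch {

    // Translates an Ant-style pattern into a regular expression (simplified rules).
    static Pattern toRegex(String antPattern) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < antPattern.length(); i++) {
            char c = antPattern.charAt(i);
            if (c == '*') {
                boolean doubleStar = i + 1 < antPattern.length() && antPattern.charAt(i + 1) == '*';
                if (doubleStar) {
                    regex.append(".*");    // "**" may cross path separators
                    i++;
                } else {
                    regex.append("[^/]*"); // "*" stays within one path segment
                }
            } else if (c == '?') {
                regex.append("[^/]");      // "?" is exactly one non-separator character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String antPattern, String path) {
        return toRegex(antPattern).matcher(path).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("/books/?*", "/books/1491950358")); // true
        System.out.println(matches("/books/?*", "/books"));            // false
        System.out.println(matches("/books/?*", "/books/"));           // false
    }
}
```

Under these rules the pattern requires at least one character after /books/, so it targets requests for a single book. Even if a request for the whole list did reach the filter, the body would not convert to a Book and the filter would return the response unchanged.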

7.3. Analytics Microservice

7.3.1. Create Kafka consumer

Create a new class to act as a consumer of the messages sent to Kafka by the books microservice. The Micronaut framework will implement logic to invoke the consumer at compile time. Create the AnalyticsListener class:

analytics/src/main/java/example/micronaut/AnalyticsListener.java
package example.micronaut;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;

@Requires(notEnv = Environment.TEST) (1)
@KafkaListener (2)
public class AnalyticsListener {

    private final AnalyticsService analyticsService; (3)

    public AnalyticsListener(AnalyticsService analyticsService) { (3)
        this.analyticsService = analyticsService;
    }

    @Topic("analytics") (4)
    public void updateAnalytics(Book book) {
        analyticsService.updateBookAnalytics(book); (5)
    }
}
1 Do not load this bean for the test environment - this lets us run the tests without having Kafka running
2 Annotate the class with @KafkaListener to indicate that this bean will consume messages from Kafka
3 Constructor injection for AnalyticsService
4 Annotate the method with @Topic and specify the topic name to use
5 Call AnalyticsService to update the analytics for the book

7.4. Running the application

Start the books microservice:

books
./mvnw mn:run
16:35:55.614 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 576ms. Server Running: http://localhost:8080

Execute a curl request to get one book:

curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}

Start the analytics microservice:

analytics
./mvnw mn:run
16:35:55.614 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 623ms. Server Running: http://localhost:8081

The application will consume and process the message automatically after startup.

Now, use curl to see the analytics:

curl http://localhost:8081/analytics
[{"bookIsbn":"1491950358","count":1}]

Update the curl command to the books microservice to retrieve other books and repeat the invocations, then re-run the curl command to the analytics microservice to see that the counts increase.

8. Generate Micronaut Application Native Executables with GraalVM

We will use GraalVM, an advanced JDK with ahead-of-time Native Image compilation, to generate a native executable of this Micronaut application.

Compiling Micronaut applications ahead of time with GraalVM significantly improves startup time and reduces the memory footprint of JVM-based applications.

Only Java and Kotlin projects support using GraalVM’s native-image tool. Groovy relies heavily on reflection, which is only partially supported by GraalVM.

8.1. Native Executable Generation

The easiest way to install GraalVM on Linux or Mac is to use SDKMan.io.

Java 21
sdk install java 21.0.5-graal

For installation on Windows, or for a manual installation on Linux or Mac, see the GraalVM Getting Started documentation.

The previous command installs Oracle GraalVM, which is free to use in production and free to redistribute, at no cost, under the GraalVM Free Terms and Conditions.

Alternatively, you can use the GraalVM Community Edition:

Java 21
sdk install java 21.0.2-graalce

To generate native executables for each application using Maven, run:

./mvnw package -Dpackaging=native-image

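The native executable is created in the target directory and can be run with target/micronautguide. If you generated the projects with the commands in this guide, the executable is likely named after the project's Maven artifact ID instead (for example, target/books or target/analytics).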

It is possible to customize the name of the native executable or pass additional build arguments using the Maven plugin for GraalVM Native Image building. Declare the plugin as follows:

pom.xml
<plugin>
    <groupId>org.graalvm.buildtools</groupId>
    <artifactId>native-maven-plugin</artifactId>
    <version>0.10.3</version>
    <configuration>
        <imageName>mn-graalvm-application</imageName> (1)
        <buildArgs>
            <buildArg>-Ob</buildArg> (2)
        </buildArgs>
    </configuration>
</plugin>
1 The native executable name will now be mn-graalvm-application.
2 It is possible to pass extra build arguments to native-image. For example, -Ob enables the quick build mode.

Start the native executables for the two microservices and run the same curl request as before to check that everything works with GraalVM.

9. Next Steps

Read more about Kafka support in the Micronaut framework.

Read more about Test Resources in Micronaut.

10. License

All guides are released under an Apache 2.0 license for the code and a Creative Commons Attribution 4.0 license for the writing and media (images, etc.).