Testing Kafka Listener using Testcontainers with the Micronaut Framework

This guide shows how to test a Kafka Listener using Testcontainers in a Micronaut Framework application.

Authors: Sergio del Amo

Micronaut Version: 4.4.2

In this guide you will learn how to

1. What you will need

To complete this guide, you will need the following:

2. Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

3. What we are going to achieve in this guide

We are going to create a Micronaut project with Kafka, Micronaut Data JPA and MySQL, where we implement a Kafka Listener which receives an event payload and persists the event data in the database. Then we will test this Kafka Listener using the Testcontainers Kafka and MySQL modules in conjunction with Awaitility.

The generated application includes the Awaitility library as a test dependency, which we can use to assert the expectations of an asynchronous process flow.

4. Writing the Application

Create an application using the Micronaut Command Line Interface or with Micronaut Launch.

mn create-app example.micronaut.micronautguide \
    --features=assertj,data-jpa,kafka,testcontainers,mysql,awaitility \
    --build=maven \
    --lang=java
If you don’t specify the --build argument, Gradle is used as the build tool.
If you don’t specify the --lang argument, Java is used as the language.
If you don’t specify the --test argument, JUnit is used for Java and Kotlin, and Spock is used for Groovy.

The previous command creates a Micronaut application with the default package example.micronaut in a directory named micronautguide.

If you use Micronaut Launch, select Micronaut Application as the application type and add the assertj, data-jpa, kafka, testcontainers, mysql, and awaitility features.

If you have an existing Micronaut application and want to add the functionality described here, you can view the dependency and configuration changes from the specified features and apply those changes to your application.

5. Testcontainers Dependencies

The generated application contains the following Testcontainers dependencies:

pom.xml
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
pom.xml
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>mysql</artifactId>
    <scope>test</scope>
</dependency>

6. Datasource Configuration

The generated application contains the following configuration:

testcontainers/src/main/resources/application.properties
datasources.default.db-type=mysql
datasources.default.dialect=MYSQL
jpa.default.properties.hibernate.hbm2ddl.auto=update
jpa.default.entity-scan.packages=example.micronaut
datasources.default.driver-class-name=com.mysql.cj.jdbc.Driver
Note the configuration does not contain any database URL, username or password. It does not contain the Kafka bootstrap servers' location either. We will configure them in the test or via Micronaut Test Resources.

7. Getting Started

We are going to implement a Kafka Listener that listens to a topic named product-price-changes. When it receives a message, it extracts the product code and price from the event payload and updates the price of that product in the MySQL database.

8. Create JPA entity

First, let us create a JPA entity, Product.java.

testcontainers/src/main/java/example/micronaut/Product.java
package example.micronaut;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;

import java.math.BigDecimal;

@Entity
@Table(name = "products")
public class Product {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String code;

    @Column(nullable = false)
    private String name;

    @Column(nullable = false)
    private BigDecimal price;

    public Product() {}

    public Product(Long id, String code, String name, BigDecimal price) {
        this.id = id;
        this.code = code;
        this.name = name;
        this.price = price;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }
}

9. Create Micronaut Data JPA repository

Let us create a Micronaut Data JPA repository interface for the Product entity and add methods to find a product for a given code and update the price for the given product code as follows:

testcontainers/src/main/java/example/micronaut/ProductRepository.java
package example.micronaut;

import io.micronaut.data.annotation.Query;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;

import java.math.BigDecimal;
import java.util.Optional;

@Repository (1)
public interface ProductRepository extends JpaRepository<Product, Long> { (2)

    Optional<Product> findByCode(String code);

    @Query("update Product p set p.price = :price where p.code = :productCode") (3)
    void updateProductPrice(String productCode, BigDecimal price);
}
1 Annotate with @Repository to allow compile time implementations to be added.
2 By extending JpaRepository, you enable automatic generation of CRUD (Create, Read, Update, Delete) operations and JPA specific methods like merge and flush.
3 You can use the @Query annotation to specify an explicit query.

10. Create the event payload java bean

Let us create a domain object named ProductPriceChangedEvent as a record representing the structure of the event payload that we are going to receive from the Kafka topic.

testcontainers/src/main/java/example/micronaut/ProductPriceChangedEvent.java
package example.micronaut;

import io.micronaut.serde.annotation.Serdeable;

import java.math.BigDecimal;

@Serdeable (1)
public record ProductPriceChangedEvent(String productCode, BigDecimal price) {}
1 Declare the @Serdeable annotation at the type level in your source code to allow the type to be serialized or deserialized.

11. Implement Kafka Listener

Finally, let us implement the Kafka listener which handles the messages received from the product-price-changes topic and updates the product price in the database.

To listen to Kafka messages you can use the @KafkaListener annotation to define a message listener.

testcontainers/src/main/java/example/micronaut/ProductPriceChangedEventHandler.java
package example.micronaut;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import jakarta.inject.Singleton;
import jakarta.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

@Singleton (1)
@Transactional (2)
class ProductPriceChangedEventHandler {

    private static final Logger LOG = LoggerFactory.getLogger(ProductPriceChangedEventHandler.class);

    private final ProductRepository productRepository;

    ProductPriceChangedEventHandler(ProductRepository productRepository) { (3)
        this.productRepository = productRepository;
    }

    @Topic("product-price-changes") (4)
    @KafkaListener(offsetReset = EARLIEST, groupId = "demo") (5)
    public void handle(ProductPriceChangedEvent event) {
        LOG.info("Received a ProductPriceChangedEvent with productCode: {}", event.productCode());
        productRepository.updateProductPrice(event.productCode(), event.price());
    }
}
1 Use jakarta.inject.Singleton to designate a class as a singleton.
2 You can declare a method or class as transactional with the jakarta.transaction.Transactional annotation.
3 Use constructor injection to inject a bean of type ProductRepository.
4 The @Topic annotation indicates which topic(s) to subscribe to.
5 The @KafkaListener is used with offsetReset set to EARLIEST which makes the listener start listening to messages from the beginning of the partition.

Let us assume that there is an agreement between the sender and receiver that the payload will be sent in the following JSON format:

{
    "productCode": "P100",
    "price": 25.00
}
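
To make the listener's flow concrete without Kafka or MySQL, here is a minimal plain-Java sketch. `PriceChange` and `InMemoryPriceStore` are hypothetical stand-ins for ProductPriceChangedEvent and ProductRepository; they are not part of the guide's application. The sketch also illustrates one property of an update-by-code statement: an event for an unknown product code changes nothing.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class PriceUpdateSketch {

    // Hypothetical stand-in for ProductPriceChangedEvent.
    public record PriceChange(String productCode, BigDecimal price) {}

    // Hypothetical in-memory stand-in for ProductRepository, keyed by product code.
    public static final class InMemoryPriceStore {
        private final Map<String, BigDecimal> prices = new HashMap<>();

        public void save(String code, BigDecimal price) {
            prices.put(code, price);
        }

        // Mirrors "update ... where code = :productCode": only existing entries change.
        // Returns the number of entries updated (0 or 1).
        public int updatePrice(String code, BigDecimal price) {
            return prices.replace(code, price) != null ? 1 : 0;
        }

        public BigDecimal priceOf(String code) {
            return prices.get(code);
        }
    }

    // Same shape as the listener method: read code and price from the event, then update.
    public static int handle(InMemoryPriceStore store, PriceChange event) {
        return store.updatePrice(event.productCode(), event.price());
    }

    public static void main(String[] args) {
        InMemoryPriceStore store = new InMemoryPriceStore();
        store.save("P100", BigDecimal.TEN);

        int updated = handle(store, new PriceChange("P100", new BigDecimal("25.00")));
        int ignored = handle(store, new PriceChange("P999", new BigDecimal("1.00")));

        System.out.println(updated + " " + store.priceOf("P100")); // prints "1 25.00"
        System.out.println(ignored + " " + store.priceOf("P999")); // prints "0 null"
    }
}
```

Because the repository method in this guide returns void, a message for a missing product is silently ignored; whether that is acceptable depends on your application.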

12. Testing

12.1. Write Test for Kafka Listener

We are going to write a test for the Kafka event listener ProductPriceChangedEventHandler by sending a message to the product-price-changes topic and verifying the updated product price in the database.

To start our Micronaut context successfully, Kafka and the MySQL database must be up and running, and the Micronaut context must be configured to talk to them.

Create a @KafkaClient to simplify publishing events in the test.

testcontainers/src/test/java/example/micronaut/ProductPriceChangesClient.java
package example.micronaut;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient (1)
public interface ProductPriceChangesClient {

    @Topic("product-price-changes") (2)
    void send(@KafkaKey String productCode, (3)
              ProductPriceChangedEvent event);
}
1 The @KafkaClient annotation is used to designate this interface as a client.
2 The @Topic annotation indicates which topics the @KafkaClient should publish to.
3 The Kafka key can be specified by providing a parameter annotated with @KafkaKey. If no such parameter is specified the record is sent with a null key.

We will use the Testcontainers library to spin up Kafka and MySQL database instances as Docker containers and configure the application to talk to them as follows:

testcontainers/src/test/java/example/micronaut/ProductPriceChangedEventHandlerTest.java
package example.micronaut;

import io.micronaut.context.annotation.Property;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@MicronautTest(transactional = false) (1)
@Property(name = "datasources.default.driver-class-name", value = "org.testcontainers.jdbc.ContainerDatabaseDriver") (2)
@Property(name = "datasources.default.url", value = "jdbc:tc:mysql:8:///db") (3)
@Testcontainers(disabledWithoutDocker = true) (4)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) (5)
class ProductPriceChangedEventHandlerTest implements TestPropertyProvider {  (6)

    @Container (7)
    static final KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.3.3")
    );

    @Override
    public @NonNull Map<String, String> getProperties() {  (6)
        if (!kafka.isRunning()) {
            kafka.start();
        }
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
    }

    @Test
    void shouldHandleProductPriceChangedEvent(
            ProductPriceChangesClient productPriceChangesClient,
            ProductRepository productRepository
    ) {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        Long id = productRepository.save(product).getId(); (8)

        ProductPriceChangedEvent event = new ProductPriceChangedEvent(
                "P100",
                new BigDecimal("14.50")
        );

        productPriceChangesClient.send(event.productCode(), event); (9)

        await() (10)
                .pollInterval(Duration.ofSeconds(3))
                .atMost(10, SECONDS)
                .untilAsserted(() -> {
                    Optional<Product> optionalProduct = productRepository.findByCode("P100");
                    assertThat(optionalProduct).isPresent();
                    assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
                    assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
                });

        productRepository.deleteById(id);
    }
}
1 Annotate the class with @MicronautTest so the Micronaut framework will initialize the application context and the embedded server. By default, each @Test method will be wrapped in a transaction that will be rolled back when the test finishes. This behaviour is changed by setting transactional to false.
2 Annotate the class with @Property to supply the driver class name configuration to the test.
3 We have configured the Testcontainers special JDBC URL to spin up a MySQL container and configure it as a DataSource with Micronaut application context.
4 Disable the test if Docker is not present.
5 Classes that implement TestPropertyProvider must use this annotation to create a single class instance for all tests (not necessary in Spock tests).
6 When you need dynamic properties definition, implement the TestPropertyProvider interface. Override the method .getProperties() and return the properties you want to expose to the application.
7 We have used the Testcontainers JUnit 5 Extension annotations @Testcontainers and @Container to spin up a Kafka container and registered the bootstrap-servers location using TestPropertyProvider mechanism.
8 We have created a Product record in the database before sending the event.
9 During the test, we sent a message to the product-price-changes topic using ProductPriceChangesClient with productCode as key and ProductPriceChangedEvent instance as value.
10 As Kafka message processing is asynchronous, we use the Awaitility library to check every 3 seconds, for up to 10 seconds, whether the product price in the database has been updated to the expected value. If the message is consumed and processed within 10 seconds, the test passes; otherwise, it fails.
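
The polling behaviour described in callout 10 can be pictured with plain Java. The sketch below is not Awaitility's implementation; `PollingSketch` and its `untilAsserted` method are hypothetical names that only mimic the idea: run an assertion repeatedly at a fixed interval and rethrow the last failure once there is no time left for another attempt.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingSketch {

    // Re-runs the assertion every pollInterval until it passes or the atMost budget runs out.
    public static void untilAsserted(Duration pollInterval, Duration atMost, Runnable assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError failure) {
                if (System.nanoTime() + pollInterval.toNanos() > deadline) {
                    throw failure; // no time for another attempt: report the last failure
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulates a price that only becomes visible on the third check.
        untilAsserted(Duration.ofMillis(50), Duration.ofSeconds(2), () -> {
            if (attempts.incrementAndGet() < 3) {
                throw new AssertionError("price not updated yet");
            }
        });
        System.out.println("passed after " + attempts.get() + " attempts"); // prints "passed after 3 attempts"
    }
}
```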

13. Testing the Application

To run the tests:

./mvnw test

You should see the Kafka and MySQL Docker containers start, and all tests should pass.

You will also notice that the containers are stopped and removed automatically after the tests have run.

14. Testing Kafka integration with Test Resources

We are going to simplify testing with Micronaut Test Resources:

Micronaut Test Resources adds support for managing external resources which are required during development or testing.

14.1. Removing Testcontainers Dependencies

Remove the Testcontainers dependencies from your build files.

14.2. Configure Test Resources

You can enable test resources support simply by setting the property micronaut.test.resources.enabled (either in your POM or via the command-line).

14.3. Test Resources Kafka

When the application is started locally (either under test or by running the application), resolution of the property kafka.bootstrap.servers is detected. The Test Resources service then starts a local Kafka Docker container and injects the properties required to use it as the broker.
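
As a rough mental model of resolving a property on demand, consider the plain-Java sketch below. It is purely conceptual and does not use or reproduce the Test Resources API; `LazyPropertySketch`, its methods, and the `localhost:19092` address are all made up for illustration. The idea it shows is that a value already present in the configuration is used as-is, and a resolver is only invoked when the property is missing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class LazyPropertySketch {

    private final Map<String, String> configured = new HashMap<>();
    private final Map<String, Supplier<String>> resolvers = new HashMap<>();

    public void configure(String name, String value) {
        configured.put(name, value);
    }

    public void registerResolver(String name, Supplier<String> resolver) {
        resolvers.put(name, resolver);
    }

    // Explicit configuration wins; otherwise a resolver supplies the value on demand.
    public Optional<String> resolve(String name) {
        String value = configured.get(name);
        if (value != null) {
            return Optional.of(value);
        }
        Supplier<String> resolver = resolvers.get(name);
        return resolver == null ? Optional.empty() : Optional.of(resolver.get());
    }

    public static void main(String[] args) {
        LazyPropertySketch properties = new LazyPropertySketch();
        // Hypothetical resolver: a real one would start a container and return its address.
        properties.registerResolver("kafka.bootstrap.servers", () -> {
            System.out.println("starting a throwaway broker");
            return "localhost:19092";
        });

        System.out.println(properties.resolve("kafka.bootstrap.servers").orElse("unresolved"));
        System.out.println(properties.resolve("datasources.default.url").orElse("unresolved"));
    }
}
```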

14.4. Simpler Test with Test Resources

Thanks to Test Resources, we can simplify the test as follows:

testresources/src/test/java/example/micronaut/ProductPriceChangedEventHandlerTest.java
package example.micronaut;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.Optional;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@MicronautTest(transactional = false) (1)
class ProductPriceChangedEventHandlerTest {

    @Test
    void shouldHandleProductPriceChangedEvent(
            ProductPriceChangesClient productPriceChangesClient,
            ProductRepository productRepository
    ) {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        Long id = productRepository.save(product).getId(); (2)

        ProductPriceChangedEvent event = new ProductPriceChangedEvent(
                "P100",
                new BigDecimal("14.50")
        );

        productPriceChangesClient.send(event.productCode(), event); (3)

        await() (4)
                .pollInterval(Duration.ofSeconds(3))
                .atMost(10, SECONDS)
                .untilAsserted(() -> {
                    Optional<Product> optionalProduct = productRepository.findByCode("P100");
                    assertThat(optionalProduct).isPresent();
                    assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
                    assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
                });

        productRepository.deleteById(id);
    }
}
1 Annotate the class with @MicronautTest so the Micronaut framework will initialize the application context and the embedded server. By default, each @Test method will be wrapped in a transaction that will be rolled back when the test finishes. This behaviour is changed by setting transactional to false.
2 We have created a Product record in the database before sending the event.
3 During the test, we sent a message to the product-price-changes topic using ProductPriceChangesClient with productCode as key and ProductPriceChangedEvent instance as value.
4 As Kafka message processing is asynchronous, we use the Awaitility library to check every 3 seconds, for up to 10 seconds, whether the product price in the database has been updated to the expected value. If the message is consumed and processed within 10 seconds, the test passes; otherwise, it fails.
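
One detail worth keeping in mind when asserting on prices: `BigDecimal.equals` compares scale as well as value, so 14.50 and 14.5 are not equal under `equals`, even though `compareTo` treats them as the same number. The price assertion in the test therefore passes only if the value read back from the database has scale 2, which is an assumption about the column mapping. The plain-Java sketch below (the class name `PriceComparisonSketch` is hypothetical) shows the difference. AssertJ also offers `isEqualByComparingTo` for a scale-insensitive comparison.

```java
import java.math.BigDecimal;

public class PriceComparisonSketch {

    // Scale-sensitive: true only if value and scale both match.
    public static boolean sameValueAndScale(BigDecimal a, BigDecimal b) {
        return a.equals(b);
    }

    // Scale-insensitive: true if the numeric values match.
    public static boolean sameValue(BigDecimal a, BigDecimal b) {
        return a.compareTo(b) == 0;
    }

    public static void main(String[] args) {
        BigDecimal expected = new BigDecimal("14.50");
        BigDecimal stored = new BigDecimal("14.5");

        System.out.println(sameValueAndScale(expected, stored)); // prints "false"
        System.out.println(sameValue(expected, stored));         // prints "true"
    }
}
```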

If you run the test, you will see a MySQL container and Kafka container being started by Test Resources through integration with Testcontainers to provide throwaway containers for testing.

14.5. Micronaut Test Resources Goals

  • zero-configuration: without adding any configuration, test resources should be spawned and the application configured to use them. Configuration is only required for advanced use cases.

  • classpath isolation: use of test resources shouldn’t leak into your application classpath, nor your test classpath.

  • compatible with GraalVM native: if you build a native binary, or run tests in native mode, test resources should be available

  • easy to use: the Micronaut build plugins for Gradle and Maven should handle the complexity of figuring out the dependencies for you

  • extensible: you can implement your own test resources, in case the built-in ones do not cover your use case

  • technology agnostic: while lots of test resources use Testcontainers under the hood, you can use any other technology to create resources

15. Summary

We have learned how to test Kafka message listeners using a real Kafka instance with Testcontainers and verified the expected result using Awaitility. If we are using Kafka and MySQL in production, it is often the best approach to test with real Kafka and MySQL instances in order to allow our test suite to provide us with more confidence about the correctness of our code.

17. Help with the Micronaut Framework

The Micronaut Foundation sponsored the creation of this Guide. A variety of consulting and support services are available.

18. License

All guides are released with an Apache 2.0 license for the code and a Creative Commons Attribution 4.0 license for the writing and media (images…).