Publishing and subscribing to MQTT Topics from a Micronaut Application

Learn how to use Mosquitto as an MQTT broker, create a Micronaut CLI application and publish an MQTT topic, and subscribe to the MQTT topic in a different Micronaut Messaging application.

Authors: Sergio del Amo

Micronaut Version: 4.4.0

1. Getting Started

In this guide, we will create a Micronaut application written in Java.

2. What you will need

To complete this guide, you will need the following:

3. Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

4. Test Resources

For this guide, we will use Mosquitto via Test Resources. As described in the MQTT section of the Test Resources documentation, configure a mosquitto container:

This should be done in both apps for this guide.
src/main/resources/application.yml
test-resources:
  containers:
    mosquitto:
      image-name: eclipse-mosquitto
      hostnames:
        - mqtt.host
      exposed-ports:
        - mqtt.port: 1883
      ro-fs-bind:
        - "src/test-resources/mosquitto.conf": /mosquitto/config/mosquitto.conf

And then define the mosquitto configuration file:

src/test-resources/mosquitto.conf
persistence false
allow_anonymous true
connection_messages true
log_type all
listener 1883

As we have defined that Test Resources are shared in the build, both applications will make use of the same instance of Mosquitto.

In production, point the applications at your production message broker instead. For example, override the mqtt.client.server-uri property with an environment variable:

MQTT_CLIENT_SERVER_URI=tcp://production-server:1883
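
Micronaut maps environment variables onto configuration keys, which is why MQTT_CLIENT_SERVER_URI can override mqtt.client.server-uri. The following is a minimal sketch of that naming convention only (uppercase, with dots and hyphens turned into underscores). The EnvVarNames class and its toEnvVar method are hypothetical names used for illustration; they are not part of Micronaut.

```java
import java.util.Locale;

final class EnvVarNames {

    private EnvVarNames() {
    }

    // Hypothetical helper: derives the conventional environment variable
    // name for a configuration key such as "mqtt.client.server-uri".
    static String toEnvVar(String propertyKey) {
        return propertyKey
                .replace('.', '_')
                .replace('-', '_')
                .toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvVar("mqtt.client.server-uri"));
    }
}
```

The same convention should apply to any other key you need to override outside the build, such as mqtt.client.client-id.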

5. Writing the CLI (Command Line Interface) Application

Create an application using the Micronaut Command Line Interface or with Micronaut Launch.

mn create-cli-app example.micronaut.micronautguide \
    --features=yaml,graalvm,mqtt,awaitility \
    --build=maven --lang=java
If you don’t specify the --build argument, Gradle is used as the build tool.
If you don’t specify the --lang argument, Java is used as the language.
If you don’t specify the --test argument, JUnit is used for Java and Kotlin, and Spock is used for Groovy.

The previous command creates a Micronaut application with the default package example.micronaut in a directory named micronautguide.

Rename this micronautguide directory to cli.

If you use Micronaut Launch, select Command Line Application as application type and add yaml, graalvm, mqtt, and awaitility features.

If you have an existing Micronaut application and want to add the functionality described here, you can view the dependency and configuration changes from the specified features and apply those changes to your application.

5.1. Create an MqttPublisher

Create an interface to publish MQTT Topics:

cli/src/main/java/example/micronaut/TemperatureClient.java
package example.micronaut;

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;

@MqttPublisher (1)
public interface TemperatureClient {

    @Topic("house/livingroom/temperature") (2)
    void publishLivingroomTemperature(byte[] data);
}
1 Micronaut MQTT implements the interface automatically because it is annotated with @MqttPublisher.
2 To set the topic to publish to, apply the @Topic annotation to the method or an argument of the method.

5.2. Writing the CLI Command

Create an enum to allow the user to submit temperatures in Celsius or Fahrenheit:

cli/src/main/java/example/micronaut/Scale.java
package example.micronaut;

import io.micronaut.core.annotation.NonNull;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public enum Scale {
    FAHRENHEIT("Fahrenheit"),
    CELSIUS("Celsius");

    private static final Map<String,Scale> ENUM_MAP;
    private final String name;

    Scale(String name) {
        this.name = name;
    }

    static {
        Map<String,Scale> map = new ConcurrentHashMap<>();
        for (Scale instance : Scale.values()) {
            map.put(instance.getName(), instance);
        }
        ENUM_MAP = Collections.unmodifiableMap(map);
    }

    @NonNull
    public static Optional<Scale> of(@NonNull String name) {
        return Optional.ofNullable(ENUM_MAP.get(name));
    }

    @NonNull
    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return name;
    }

    public static Set<String> candidates() {
        return ENUM_MAP.keySet();
    }
}
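
One detail of this enum is easy to miss: the lookup in of is an exact, case-sensitive match on the display name. Combined with the fallback the command uses later in this guide, a value such as fahrenheit (lowercase) would be treated as Celsius. The sketch below illustrates this with a compact stand-in for the enum so that it compiles on its own; ScaleLookupSketch and resolve are hypothetical names, not part of the guide's code.

```java
import java.util.Arrays;
import java.util.Optional;

final class ScaleLookupSketch {

    // Compact stand-in for the guide's Scale enum.
    enum Scale {
        FAHRENHEIT("Fahrenheit"),
        CELSIUS("Celsius");

        private final String label;

        Scale(String label) {
            this.label = label;
        }

        // Exact, case-sensitive match, like the map lookup in the guide's enum.
        static Optional<Scale> of(String name) {
            return Arrays.stream(values())
                    .filter(scale -> scale.label.equals(name))
                    .findFirst();
        }
    }

    // Same fallback the command applies: a missing or unknown name resolves to CELSIUS.
    static Scale resolve(String name) {
        return name == null ? Scale.CELSIUS : Scale.of(name).orElse(Scale.CELSIUS);
    }

    public static void main(String[] args) {
        System.out.println(resolve("Fahrenheit")); // FAHRENHEIT
        System.out.println(resolve("fahrenheit")); // CELSIUS, because the match is case-sensitive
        System.out.println(resolve(null));         // CELSIUS
    }
}
```

If silent fallback is not what you want, one option is to reject unknown names instead of defaulting, for example by throwing from the command when the Optional is empty.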

Create a class to show completion candidates:

cli/src/main/java/example/micronaut/TemperatureScaleCandidates.java
package example.micronaut;

import java.util.ArrayList;

public class TemperatureScaleCandidates extends ArrayList<String> {

    public TemperatureScaleCandidates() {
        super(Scale.candidates());
    }
}

Replace the command:

cli/src/main/java/example/micronaut/MicronautguideCommand.java
package example.micronaut;

import io.micronaut.configuration.picocli.PicocliRunner;
import jakarta.inject.Inject;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

import java.math.BigDecimal;
import java.math.RoundingMode;

import static example.micronaut.Scale.CELSIUS;
import static example.micronaut.Scale.FAHRENHEIT;

@Command(name = "house-livingroom-temperature", (1)
         description = "Publish living room temperature",
         mixinStandardHelpOptions = true)
public class MicronautguideCommand implements Runnable {

    @Option(names = {"-t", "--temperature"}, (2)
            required = true, (3)
            description = "Temperature value")
    BigDecimal temperature;

    @Option(names = {"-s", "--scale"}, (2)
            required = false, (3)
            description = "Temperature scales ${COMPLETION-CANDIDATES}", (4)
            completionCandidates = TemperatureScaleCandidates.class) (4)
    String scale;

    @Inject
    TemperatureClient temperatureClient; (5)

    public static void main(String[] args) {
        PicocliRunner.run(MicronautguideCommand.class, args);
    }

    public void run() {
        Scale temperatureScale = scale != null ?
                Scale.of(scale).orElse(CELSIUS) : CELSIUS;
        BigDecimal celsius = (temperatureScale == FAHRENHEIT)
                ? fahrenheitToCelsius(temperature) : temperature;
        byte[] data = celsius.toPlainString().getBytes();
        temperatureClient.publishLivingroomTemperature(data); (6)
        System.out.println("Topic published");
    }

    private static BigDecimal fahrenheitToCelsius(BigDecimal temperature) {
        return temperature
                .subtract(BigDecimal.valueOf(32))
                .multiply(BigDecimal.valueOf(5/9.0))
                .setScale(2, RoundingMode.FLOOR);
    }
}
1 The picocli @Command annotation designates this class as a command.
2 Picocli @Option must have one or more names.
3 Options can be marked required to make it mandatory for the user to specify them on the command line.
4 It is possible to embed the completion candidates in the description for an option by specifying the variable ${COMPLETION-CANDIDATES} in the description text.
5 Field injection
6 Publish the MQTT Topic
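
The conversion in fahrenheitToCelsius multiplies by a double approximation of 5/9 and then rounds with RoundingMode.FLOOR. That gives 100.00 for 212, but for negative results FLOOR rounds away from zero, so the tiny excess from the approximation can shift the value by 0.01. The sketch below compares that approach with one possible alternative that stays in decimal arithmetic and rounds once during the division. TemperatureConversionSketch and both method names are hypothetical, and the alternative is only a suggestion, not the guide's method.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class TemperatureConversionSketch {

    private TemperatureConversionSketch() {
    }

    // Mirrors the command: multiply by a double approximation of 5/9, then floor.
    static BigDecimal viaDoubleFactor(BigDecimal fahrenheit) {
        return fahrenheit
                .subtract(BigDecimal.valueOf(32))
                .multiply(BigDecimal.valueOf(5 / 9.0))
                .setScale(2, RoundingMode.FLOOR);
    }

    // Alternative: multiply by 5, then divide by 9 with a single rounding step.
    static BigDecimal viaExactDivision(BigDecimal fahrenheit) {
        return fahrenheit
                .subtract(BigDecimal.valueOf(32))
                .multiply(BigDecimal.valueOf(5))
                .divide(BigDecimal.valueOf(9), 2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        BigDecimal boiling = new BigDecimal("212");
        BigDecimal minusForty = new BigDecimal("-40");

        System.out.println(viaDoubleFactor(boiling));      // 100.00
        System.out.println(viaExactDivision(boiling));     // 100.00
        System.out.println(viaDoubleFactor(minusForty));   // -40.01
        System.out.println(viaExactDivision(minusForty));  // -40.00
    }
}
```

Note that switching to HALF_UP also changes positive results that are not exact, so any test expectations would need to be reviewed before adopting it.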

Replace the generated test with this:

cli/src/test/java/example/micronaut/MicronautguideCommandTest.java
package example.micronaut;

import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.awaitility.Awaitility.await;

class MicronautguideCommandTest {

    @Test
    void testWithCommandLineOption() {
        OutputStream baos = new ByteArrayOutputStream();
        System.setOut(new PrintStream(baos));

        try (ApplicationContext ctx = ApplicationContext.run(
                CollectionUtils.mapOf("spec.name", "MicronautguideCommandTest"),
                Environment.CLI, Environment.TEST)) {

            String[] args = new String[] { "-t", "212", "-s", "Fahrenheit" };
            PicocliRunner.run(MicronautguideCommand.class, ctx, args);
            assertTrue(baos.toString().contains("Topic published"));

            TemperatureListener listener = ctx.getBean(TemperatureListener.class);
            await().atMost(5, TimeUnit.SECONDS)
                    .untilAsserted(() -> assertEquals(new BigDecimal("100.00"), listener.temperature));
        }
    }

    @Requires(property = "spec.name", value = "MicronautguideCommandTest")
    @MqttSubscriber (1)
    public static class TemperatureListener {

        private BigDecimal temperature;

        @Topic("house/livingroom/temperature") (2)
        public void receive(byte[] data) {
            temperature = new BigDecimal(new String(data, UTF_8));
        }
    }
}
1 Because it is annotated with @MqttSubscriber, Micronaut MQTT listens for messages.
2 To set the topic to listen to, apply the @Topic annotation to the method or an argument of the method.
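
The publisher and the listener agree on a simple payload format: the temperature as plain decimal text, sent as bytes. The sketch below isolates that round trip so it can be checked without a broker. PayloadRoundTripSketch, encode, and decode are hypothetical names. Unlike the command, the sketch passes UTF-8 explicitly when encoding, which is an assumption made here to match the charset the listener uses when decoding.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

final class PayloadRoundTripSketch {

    private PayloadRoundTripSketch() {
    }

    // Publisher side: plain decimal text (no exponent notation) as UTF-8 bytes.
    static byte[] encode(BigDecimal temperature) {
        return temperature.toPlainString().getBytes(StandardCharsets.UTF_8);
    }

    // Subscriber side: decode the bytes as UTF-8 text and parse the number.
    static BigDecimal decode(byte[] payload) {
        return new BigDecimal(new String(payload, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        BigDecimal sent = new BigDecimal("100.00");
        BigDecimal received = decode(encode(sent));
        System.out.println(received.equals(sent)); // true, value and scale both survive

        // toPlainString avoids exponent notation in the payload.
        System.out.println(new String(encode(new BigDecimal("1E+3")), StandardCharsets.UTF_8)); // 1000
    }
}
```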

The MQTT server URI is configured by referencing the properties we defined when we set up Mosquitto via Test Resources:

cli/src/main/resources/application.yml
mqtt:
  client:
    server-uri: tcp://${mqtt.host}:${mqtt.port}
    client-id: ${random.uuid}
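
To make the ${...} syntax concrete, here is a rough sketch of how placeholders in a value like the server URI can be expanded from a map of properties. It is not Micronaut's implementation, and it does not cover special cases such as ${random.uuid} or default values. PlaceholderSketch and resolve are hypothetical names, and the host and port values in main are made up; Test Resources supplies the real ones at run time.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderSketch {

    // Matches ${key} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private PlaceholderSketch() {
    }

    // Replaces every ${key} in the template with its value from the map.
    static String resolve(String template, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Made-up values for illustration only.
        Map<String, String> properties = Map.of("mqtt.host", "localhost", "mqtt.port", "1883");
        System.out.println(resolve("tcp://${mqtt.host}:${mqtt.port}", properties)); // tcp://localhost:1883
    }
}
```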

6. Writing an MQTT subscriber application

Create an application using the Micronaut Command Line Interface or with Micronaut Launch.

mn create-messaging-app example.micronaut.micronautguide \
    --features=yaml,graalvm,mqtt,awaitility \
    --build=maven --lang=java
If you don’t specify the --build argument, Gradle is used as the build tool.
If you don’t specify the --lang argument, Java is used as the language.
If you don’t specify the --test argument, JUnit is used for Java and Kotlin, and Spock is used for Groovy.

The previous command creates a Micronaut application with the default package example.micronaut in a directory named micronautguide.

Rename this micronautguide directory to app.

If you use Micronaut Launch, select Messaging-Driven Application as application type and add yaml, graalvm, mqtt, and awaitility features.

If you have an existing Micronaut application and want to add the functionality described here, you can view the dependency and configuration changes from the specified features and apply those changes to your application.

6.1. Configuration

The MQTT server URI is configured by referencing the properties we defined when we set up Mosquitto via Test Resources:

app/src/main/resources/application.yml
mqtt:
  client:
    server-uri: tcp://${mqtt.host}:${mqtt.port}
    client-id: ${random.uuid}

6.2. Create Subscriber

app/src/main/java/example/micronaut/TemperatureListener.java
package example.micronaut;

import io.micronaut.core.annotation.Nullable;
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;

import static java.nio.charset.StandardCharsets.UTF_8;

@MqttSubscriber (1)
public class TemperatureListener {

    private static final Logger LOG = LoggerFactory.getLogger(TemperatureListener.class);
    private BigDecimal temperature = null;

    @Topic("house/livingroom/temperature") (2)
    public void receive(byte[] data) {
        temperature = new BigDecimal(new String(data, UTF_8));
        LOG.info("temperature: {}", temperature);
    }

    @Nullable
    public BigDecimal getTemperature() {
        return temperature;
    }
}
1 Because it is annotated with @MqttSubscriber, Micronaut MQTT listens for messages.
2 To set the topic to listen to, apply the @Topic annotation to the method or an argument of the method.
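
The listener in this guide subscribes to one exact topic name. MQTT itself also defines wildcards for subscription filters: + stands for exactly one topic level and # for all remaining levels. The sketch below shows how such a filter relates to a topic like house/livingroom/temperature. It is a simplified illustration of the matching rules, not code from Micronaut MQTT or from a broker, and it ignores the special handling of topics that start with $. TopicFilterSketch and matches are hypothetical names.

```java
final class TopicFilterSketch {

    private TopicFilterSketch() {
    }

    // Returns true if the topic name is covered by the subscription filter.
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (level.equals("#")) {
                // '#' covers the parent and everything below, and is only valid as the last level.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // filter is deeper than the topic
            }
            if (!level.equals("+") && !level.equals(topicLevels[i])) {
                return false; // literal level differs
            }
        }
        // Without '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        String topic = "house/livingroom/temperature";
        System.out.println(matches("house/livingroom/temperature", topic)); // true
        System.out.println(matches("house/+/temperature", topic));          // true
        System.out.println(matches("house/#", topic));                      // true
        System.out.println(matches("house/+", topic));                      // false
    }
}
```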

6.3. Add test

app/src/test/java/example/micronaut/SubscriptionTest.java
package example.micronaut;

import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

@MicronautTest (1)
@Property(name = "spec.name", value = "SubscriptionTest") (2)
class SubscriptionTest {

    @Inject
    TemperatureClient client;

    @Inject
    TemperatureListener listener;

    @Test
    void checkSubscriptionsAreReceived() {
        client.publishLivingroomTemperature("3.145".getBytes(StandardCharsets.UTF_8));

        await().atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> assertEquals(new BigDecimal("3.145"), listener.getTemperature()));
    }

    @Requires(property = "spec.name", value = "SubscriptionTest") (3)
    @MqttPublisher (4)
    static interface TemperatureClient {

        @Topic("house/livingroom/temperature") (5)
        void publishLivingroomTemperature(byte[] data);
    }
}
1 Annotate the class with @MicronautTest so the Micronaut framework will initialize the application context and the embedded server.
2 Annotate the class with @Property to supply configuration to the test.
3 This bean loads only if the specified property is defined.
4 Micronaut MQTT implements the interface automatically because it is annotated with @MqttPublisher.
5 To set the topic to publish to, apply the @Topic annotation to the method or an argument of the method.
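
Message delivery is asynchronous, so the test cannot assert on the listener immediately after publishing. Awaitility handles this by re-running the assertion until it passes or the timeout expires. The sketch below shows the underlying polling idea using only the JDK. It is not how Awaitility is implemented. PollingWaitSketch and waitUntil are hypothetical names, and the 50 ms poll interval is an arbitrary choice.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

final class PollingWaitSketch {

    private PollingWaitSketch() {
    }

    // Polls the condition until it holds or the timeout elapses.
    static boolean waitUntil(Duration timeout, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(50); // arbitrary poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        AtomicReference<String> received = new AtomicReference<>();

        // Stands in for the broker delivering a message on another thread.
        Thread delivery = new Thread(() -> received.set("3.145"));
        delivery.start();

        boolean arrived = waitUntil(Duration.ofSeconds(5), () -> "3.145".equals(received.get()));
        System.out.println(arrived);
    }
}
```

The sketch stores the value in an AtomicReference so that the write from the delivery thread is visible to the polling thread.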

7. Running the Application

7.1. Run the Subscriber App

To run the application, use the ./mvnw mn:run command from the app directory.

Keep it running. Once you publish a topic with the CLI application, you will see a log entry.

7.2. Run the CLI

Run the CLI command, which will publish a temperature at startup.

./mvnw mn:run -Dmn.appArgs="-t 212 -s Fahrenheit"

The subscriber receives the MQTT message, as you will see in its logs:

12:09:47.280 [MQTT Call: 180d98b5-75b9-41be-a874-295289346592]
    INFO  e.micronaut.TemperatureListener - temperature: 100.00

8. Generate a Micronaut Application Native Executable with GraalVM

We will use GraalVM, the polyglot embeddable virtual machine, to generate a native executable of our Micronaut application.

Compiling native executables ahead of time with GraalVM improves startup time and reduces the memory footprint of JVM-based applications.

Only Java and Kotlin projects support using GraalVM’s native-image tool. Groovy relies heavily on reflection, which is only partially supported by GraalVM.

8.1. GraalVM installation

The easiest way to install GraalVM on Linux or Mac is to use SDKMan.io.

Java 17
sdk install java 17.0.8-graal
sdk use java 17.0.8-graal

For installation on Windows, or for manual installation on Linux or Mac, see the GraalVM Getting Started documentation.

The previous command installs Oracle GraalVM, which is free to use in production and free to redistribute, at no cost, under the GraalVM Free Terms and Conditions.

Alternatively, you can use the GraalVM Community Edition:

Java 17
sdk install java 17.0.8-graalce
sdk use java 17.0.8-graalce

8.2. Native executable generation

To generate a native executable using Maven, run:

./mvnw package -Dpackaging=native-image

The native executable is created in the target directory and can be run with target/micronautguide.

Generate GraalVM native executables for both the CLI and the messaging application, then execute both. Publish a temperature, and you will see it in the subscriber logs.

9. Next steps

Read more about Micronaut MQTT.

10. Help with the Micronaut Framework

The Micronaut Foundation sponsored the creation of this Guide. A variety of consulting and support services are available.

11. License

All guides are released with an Apache 2.0 license for the code and a Creative Commons Attribution 4.0 license for the writing and media (images…).