1. 概述

本文将介绍如何在 Java 项目中集成 MQTT 消息通信,使用的是 Eclipse Paho 项目 提供的客户端库。

MQTT 因其轻量、高效,在物联网(IoT)场景中被广泛采用。通过 Paho 客户端,我们可以快速实现消息的发布与订阅,无需关心底层协议细节。

2. MQTT 简要介绍

MQTT(MQ Telemetry Transport)是一种轻量级的消息协议,最初为低功耗设备设计,适用于工业控制、传感器等资源受限的场景。

随着物联网的发展,MQTT 被 OASIS 和 ISO 标准化,成为主流的 IoT 通信协议之一。

✅ 核心特点:

  • 基于 发布-订阅(Publish-Subscribe) 模式
  • 消息通过 主题(Topic) 进行路由
  • 支持通配符订阅,灵活匹配多个主题

📌 主题命名示例:

  • 简单字符串:oiltemp
  • 路径风格:motor/1/rpm

📌 通配符说明:

  • +:单层通配符,如 motor/+/rpm 匹配 motor/1/rpmmotor/2/rpm
  • #:多层通配符,如 sensor/# 匹配所有以 sensor/ 开头的主题

客户端通过订阅特定主题来接收消息,发布者将消息发送到指定主题,由 Broker 负责转发。

3. 项目依赖配置

使用 Maven 构建项目时,只需引入 Paho 客户端依赖即可:

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.0</version>
</dependency>

📌 提示:最新版本可在 Maven Central 查询。

4. 客户端初始化

使用 Paho 的第一步是获取 IMqttClient 接口的实现,用于连接 Broker 并收发消息。

Paho 提供了两个实现:

  • MqttClient:同步客户端,API 简单直观 ✅
  • MqttAsyncClient:异步客户端,适合高并发场景

本文以同步客户端为例,初始化分为两步:创建实例 + 连接服务器。

4.1 创建 IMqttClient 实例

String publisherId = UUID.randomUUID().toString();
IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883", publisherId);

📌 参数说明:

  • 第一个参数:Broker 的地址(Endpoint),格式为 tcp://host:port
  • 第二个参数:客户端唯一标识(Client ID),建议使用 UUID 避免冲突

⚠️ 注意:使用的 iot.eclipse.org:1883 是 Paho 官方提供的公共测试 Broker,无需认证,适合学习和测试。

你也可以使用其他构造函数自定义消息持久化机制或线程池(ScheduledExecutorService),但大多数场景默认即可。

4.2 连接 Broker

创建实例后,需调用 connect() 方法建立连接。可通过 MqttConnectOptions 配置连接参数:

MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);     // 断线自动重连
options.setCleanSession(true);           // 每次连接都新建会话,不恢复历史消息
options.setConnectionTimeout(10);        // 连接超时 10 秒
publisher.connect(options);

✅ 常用配置项:

  • setAutomaticReconnect(true):网络异常后自动重连,生产环境建议开启
  • setCleanSession(false):保留会话状态,用于接收离线消息(QoS > 0 时有效)
  • setUserName() / setPassword():设置认证信息

默认值通常够用,按需调整即可。

5. 发送消息

消息发送非常简单,调用 publish() 方法即可:

MqttMessage msg = readEngineTemp();
msg.setQos(0);
msg.setRetained(true);
client.publish(TOPIC, msg);

📌 核心参数:

  • QoS(服务质量)
    • 0:最多一次(fire-and-forget),不保证送达 ❌
    • 1:至少一次,可能重复 ✅
    • 2:恰好一次,最可靠但开销最大 ✅✅
  • Retained(保留消息):Broker 会保存该消息,新订阅者立即收到“最后已知值”

示例中的 EngineTemperatureSensor 模拟传感器数据生成:

public class EngineTemperatureSensor implements Callable<Void> {

    private final IMqttClient client;
    private static final String TOPIC = "engine/temperature";
    private final Random rnd = new Random();

    public EngineTemperatureSensor(IMqttClient client) {
        this.client = client;
    }

    @Override
    public Void call() throws Exception {        
        if (!client.isConnected()) {
            return null;
        }           
        MqttMessage msg = readEngineTemp();
        msg.setQos(0);
        msg.setRetained(true);
        client.publish(TOPIC, msg);        
        return null;        
    }

    private MqttMessage readEngineTemp() {             
        double temp = 80 + rnd.nextDouble() * 20.0;        
        byte[] payload = String.format("T:%04.2f", temp).getBytes();        
        return new MqttMessage(payload);           
    }
}

💡 踩坑提醒:payload 必须是 byte[] 类型,字符串需手动 .getBytes() 转换。

6. 接收消息

订阅消息使用 subscribe() 方法,指定主题、QoS 和回调处理器:

CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
    byte[] payload = msg.getPayload();
    String data = new String(payload);
    System.out.println("Received: " + data);
    receivedSignal.countDown();
});    

// 等待 10 条消息或超时 1 分钟
receivedSignal.await(1, TimeUnit.MINUTES);

📌 关键点:

  • 使用 lambda 实现 IMqttMessageListener,简洁高效
  • CountDownLatch 用于同步测试,生产环境可用其他机制
  • 回调正常返回即表示消息处理成功,Paho 自动 ACK
  • 若回调抛出异常,客户端将关闭 ❌(QoS 0 消息会丢失)

⚠️ 注意事项:

  • QoS 1/2 消息在客户端重连后可重新接收
  • 同一个 IMqttClient 实例可同时用于发布和订阅,无需分开创建

7. 总结

通过 Eclipse Paho 客户端,Java 应用可以轻松集成 MQTT 协议,实现高效、可靠的设备通信。

✅ 优势:

  • API 简洁,上手快
  • 自动处理重连、ACK、会话管理
  • 支持灵活配置,满足生产需求

代码示例已上传至 GitHub:https://github.com/eugenp/tutorials/tree/master/libraries-server

📌 建议:生产环境务必配置认证、加密(TLS)和合理的 QoS 策略,避免安全与可靠性问题。


原始标题:MQTT Client in Java