1. 引言

在本教程中,我们将探讨如何为物联网(IoT)应用构建高效的数据管道。我们会深入了解 IoT 架构的特点,并结合实际示例,展示如何使用 MQTT Broker、Apache NiFi 和 InfluxDB 等工具,搭建一个高可扩展性的数据处理流程。

2. 物联网及其架构

2.1. 什么是物联网?

物联网(Internet of Things, IoT)指的是由物理对象组成的网络,这些对象通常被称为“物”(Things)。例如,从常见的家用灯泡到复杂的工业设备,都可以是 IoT 中的“物”。通过这个网络,我们可以将大量传感器和执行器连接到互联网,用于数据交换:

IoT Home Automation 1

这些“物”可以部署在各种环境中,比如家庭、工厂,甚至移动的货运车辆中。然而,由于它们所处环境的不确定性,比如电力供应不稳定或网络质量差,这就对 IoT 应用提出了独特的需求。

2.2. IoT 架构简介

典型的 IoT 架构通常分为四层,数据在这些层之间流动:

IoT Architecture Layers

  • 感知层:主要由传感器构成,负责从环境中采集原始数据。
  • 网络层:负责将采集到的数据通过互联网传输到处理系统。
  • 数据处理层:对原始数据进行清洗、过滤和初步分析。
  • 应用层:进行深度分析和数据管理,提供业务价值。

3. MQTT、NiFi 和 InfluxDB 简介

3.1. MQTT

MQTT(Message Queuing Telemetry Transport) 是一种轻量级的发布-订阅网络协议,现已成为 OASISISO 标准。它最初由 IBM 开发,适用于资源受限的环境,比如内存、网络带宽或电源有限的设备。

MQTT 采用客户端-服务器模型,客户端通过 TCP 连接到一个称为 MQTT Broker 的服务器。客户端可以将消息发布到一个称为 Topic 的地址,也可以订阅某个 Topic 来接收消息:

MQTT Architecture

MQTT 支持三种服务质量(QoS)

  • 最多一次(At most once)
  • 至少一次(At least once)
  • 恰好一次(Exactly once)

客户端可以在发布消息时请求 Broker 持久化该消息。此外,MQTT 还支持用户名密码认证和 SSL/TLS 加密。

常见的 MQTT Broker 实现有:

我们将在本教程中使用 Mosquitto,它由 Eclipse 基金会维护,可以轻松安装在 Raspberry Pi 等嵌入式设备上。

3.2. Apache NiFi

Apache NiFi 最初由 NSA 开发,名为 NiagaraFiles。它是一个用于自动化和管理数据流的平台,基于 Flow-based Programming(FBP) 模型,将应用定义为黑盒组件之间的数据流。

基本概念

  • FlowFile:NiFi 中流动的数据对象。
  • Processor:对 FlowFile 执行实际处理(如路由、转换等)。
  • Connection:连接 Processor 的数据通道。
  • Process Group:将多个组件组合在一起,便于组织数据流。
  • Remote Process Group(RPG):用于与远程 NiFi 实例通信。

架构图

NiFi Architecture

NiFi 是一个基于 Java 的程序,包含多个核心组件运行在 JVM 中。其主要组件包括:

  • Web Server:提供控制 API。
  • Flow Controller:核心组件,负责调度资源。
  • FlowFile Repository、Content Repository、Provenance Repository:分别用于存储 FlowFile 元数据、内容和溯源信息。

MiNiFi

NiFi 还提供了一个轻量级子项目 MiNiFi,用于边缘数据采集,支持通过 Site-to-Site 协议与 NiFi 集成:

NiFi MiNiFi C2

MiNiFi 支持通过 MiNiFi C2 协议 进行集中管理,并提供完整的数据溯源链。

3.3. InfluxDB

InfluxDB 是一个用 Go 编写的时序数据库,专为高性能的时序数据存储和查询设计,非常适合 IoT 传感器数据和实时分析。

数据模型

数据在 InfluxDB 中以 时间序列(time-series) 组织。每个时间序列包含多个点(point),每个点包含以下四个部分:

InfluxDB Point

  • Timestamp:UTC 时间戳。
  • Field Set:字段键值对,表示实际数据。
  • Tag Set(可选):标签键值对,用于索引和查询。
  • Measurement:字段和标签的容器。

此外,InfluxDB 支持 Retention Policy(保留策略),用于定义数据保留时间和副本数量。

InfluxDB 是 InfluxData 平台的一部分,该平台还包括:

InfluxDB Platform 2

4. 实战:构建 IoT 数据管道

我们假设本教程的场景是:从多个城市的观测站收集空气质量相关数据,包括臭氧、一氧化碳、二氧化硫、二氧化氮和气溶胶。

4.1. 基础设施搭建

每个气象站配备传感器,连接到 Raspberry Pi 等设备进行数据采集,然后通过无线网络上传:

IoT Infrastructure Set up

区域控制中心负责聚合数据并进行初步分析,最终将数据上传到云端的中央指挥中心。

4.2. 架构设计

我们将使用 Mosquitto、MiNiFi、NiFi 和 InfluxDB 构建如下架构:

IoT Architecture

4.3. 安装步骤

启动 Mosquitto

net start mosquitto

配置 NiFi 的 Site-to-Site 端口

<NIFI_HOME>/conf/nifi.properties 中配置:

nifi.remote.input.socket.port=1026

启动 NiFi:

<NIFI_HOME>/bin/run-nifi.bat

安装 MiNiFi 并添加 MQTT 处理器

将 NiFi 的 MQTT NAR 文件复制到 MiNiFi 的 lib 目录:

COPY <NIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar <MINIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar

启动 MiNiFi:

<MINIFI_HOME>/bin/run-minifi.bat

启动 InfluxDB

<INFLUXDB_HOME>/influxd.exe

4.4. 定义 NiFi 数据流

访问 http://localhost:8080/nifi,创建如下数据流:

NiFi Main Data Flow Combined

配置 PutInfluxDB 处理器,指定 InfluxDB 的连接 URL 和数据库名。

4.5. 定义 MiNiFi 数据流

在 NiFi 中定义如下数据流,然后导出为模板:

NiFi MiNiFi Data Flow Combined

使用 MiNiFi 工具包将模板转换为 YAML:

<MINIFI_TOOLKIT_HOME>/bin/config.bat transform config.xml config.yml

手动添加 NiFi 服务器地址:

Input Ports:
  - id: 19442f9d-aead-3569-b94c-1ad397e8291c
    name: From MiNiFi
    Port: 1026
    Host Name: localhost

config.yml 放入 <MINIFI_HOME>/conf 并重启 MiNiFi。

4.6. 测试数据管道

使用如下 Java 程序模拟传感器数据:

class Sensor implements Callable<Boolean> {
    String city;
    String station;
    String pollutant;
    String topic;
    Sensor(String city, String station, String pollutant, String topic) {
        this.city = city;
        this.station = station;
        this.pollutant = pollutant;
        this.topic = topic;
    }

    @Override
    public Boolean call() throws Exception {
        MqttClient publisher = new MqttClient(
          "tcp://localhost:1883", UUID.randomUUID().toString());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        publisher.connect(options);
        IntStream.range(0, 10).forEach(i -> {
            String payload = String.format("%1$s,city=%2$s,station=%3$s value=%4$04.2f",
              pollutant,
              city,
              station,
              ThreadLocalRandom.current().nextDouble(0, 100));
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(0);
            message.setRetained(true);
            try {
                publisher.publish(topic, message);
                Thread.sleep(1000);
            } catch (MqttException | InterruptedException e) {
                e.printStackTrace();
            }
        });
        return true;
    }
}

启动多个传感器模拟器:

ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<Boolean>> sensors = Arrays.asList(
  new Simulation.Sensor("london", "central", "ozone", "air-quality/ozone"),
  new Simulation.Sensor("london", "central", "co", "air-quality/co"),
  new Simulation.Sensor("london", "central", "so2", "air-quality/so2"),
  new Simulation.Sensor("london", "central", "no2", "air-quality/no2"),
  new Simulation.Sensor("london", "central", "aerosols", "air-quality/aerosols"));
List<Future<Boolean>> futures = executorService.invokeAll(sensors);

如果一切正常,可在 InfluxDB 查询到数据:

InfluxDB Query Result

例如,查询 measurement 为 ozone 的所有点。

5. 总结

本教程通过一个简单的空气质量监测场景,展示了如何使用 MQTT、NiFi 和 InfluxDB 构建一个可扩展的 IoT 数据管道。虽然这只是 IoT 应用的一个缩影,但它体现了数据采集、传输、处理和存储的基本流程。

⚠️ 实际项目中,架构会更加复杂,可能需要考虑数据加密、容错机制、自动化部署等高级功能。此外,还可以通过反馈机制将分析结果转化为控制指令,形成闭环。

✅ 本教程为入门级演示,适合快速搭建原型系统。对于生产环境,建议进一步优化架构和配置。


原始标题:IoT Data Pipeline with MQTT, NiFi, and InfluxDB