1. 概述

InfluxDB 是一款高性能的时间序列数据库。它支持通过类SQL查询语言进行数据插入和实时查询。

本文将演示如何:

  • 连接InfluxDB服务器
  • 创建数据库
  • 写入时间序列数据
  • 查询数据库

2. 环境准备

在项目中添加依赖:

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.8</version>
</dependency>

最新版本可在 Maven Central 获取。

同时需要安装InfluxDB实例,安装指南参考 InfluxData官网

3. 连接服务器

3.1 创建连接

通过URL和凭据创建连接:

InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);

3.2 验证连接

通信通过RESTful API进行,非持久连接。使用ping服务验证连接状态:

Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
    log.error("Error pinging server.");
    return;
} 

✅ 成功响应包含数据库版本,失败返回"unknown"

3.3 创建数据库

创建数据库时必须同时设置保留策略

保留策略定义数据存储时长。时间序列数据(如CPU/内存统计)通常需要降采样处理:

influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
  "defaultPolicy", "baeldung", "30d", 1, true);

参数说明:

  • name: 策略名称
  • database: 数据库名
  • interval: 保留时长(30天)
  • replicationFactor: 副本数(单机设为1)
  • default: 是否设为默认策略

3.4 设置日志级别

InfluxDB内部使用Retrofit,可通过拦截器设置日志:

influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

输出示例:

Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping

可用级别:BASICFULLHEADERSNONE

4. 数据写入与查询

4.1 数据点(Point)

InfluxDB的基本数据单位是Point(时间戳+键值对):

Point point = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743656L)
  .addField("used", 1015096L)
  .addField("buffer", 1010467L)
  .build();

包含内存统计、主机名和时间戳。

4.2 批量写入

时间序列数据通常包含大量小数据点,单点写入效率低下。推荐使用批量写入:

BatchPoints batchPoints = BatchPoints
  .database(dbName)
  .retentionPolicy("defaultPolicy")
  .build();

Point point1 = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1") 
  .addField("free", 4743656L)
  .addField("used", 1015096L) 
  .addField("buffer", 1010467L)
  .build();

Point point2 = Point.measurement("memory")
  .time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743696L)
  .addField("used", 1016096L)
  .addField("buffer", 1008467L)
  .build();

batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);

⚠️ 注意:

  • 时间戳是主键,相同时间戳的数据点会被覆盖
  • 批量写入必须指定数据库和保留策略

4.3 单点写入

某些场景下需要单点写入,可启用批处理模式:

influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);

参数含义:

  • 100:每100条数据提交一次
  • 200:最多200毫秒提交一次

使用前需设置默认数据库和保留策略:

influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);

然后可直接写入单点:

influxDB.write(point);

重要提示

  • 单点写入前必须设置数据库和保留策略
  • 不再需要时关闭批处理模式:
    influxDB.disableBatch();
    
  • 关闭连接会自动关闭线程池:
    influxDB.close();
    

4.4 查询结果映射

查询结果(QueryResult)可映射为POJO。先创建实体类:

@Measurement(name = "memory")
public class MemoryPoint {

    @Column(name = "time")
    private Instant time;

    @Column(name = "name")
    private String name;

    @Column(name = "free")
    private Long free;

    @Column(name = "used")
    private Long used;

    @Column(name = "buffer")
    private Long buffer;
}

注解说明:

  • @Measurement:对应measurement名称
  • @Column:映射字段名

使用InfluxDBResultMapper进行映射。

4.5 查询InfluxDB

执行查询并映射结果:

QueryResult queryResult = connection
  .performQuery("Select * from memory", "baeldung");

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());

✅ 验证要点:

  • 默认按时间戳升序排列
  • 首条记录是较早的数据点

修改排序方式:

queryResult = connection.performQuery(
  "Select * from memory order by time desc", "baeldung");
memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());

添加order by time desc实现降序排列。InfluxDB查询语法与SQL类似,详细参考官方文档

5. 总结

我们完成了:

  • 连接InfluxDB服务器
  • 创建数据库和保留策略
  • 批量/单点写入数据
  • 查询并映射结果

完整示例代码见 GitHub仓库


原始标题:Using InfluxDB with Java | Baeldung