1. 概述
InfluxDB 是一款高性能的时间序列数据库。它支持通过类SQL查询语言进行数据插入和实时查询。
本文将演示如何:
- 连接InfluxDB服务器
- 创建数据库
- 写入时间序列数据
- 查询数据库
2. 环境准备
在项目中添加依赖:
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.8</version>
</dependency>
最新版本可在 Maven Central 获取。
同时需要安装InfluxDB实例,安装指南参考 InfluxData官网。
3. 连接服务器
3.1 创建连接
通过URL和凭据创建连接:
InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);
3.2 验证连接
通信通过RESTful API进行,非持久连接。使用ping服务验证连接状态:
Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
log.error("Error pinging server.");
return;
}
✅ 成功响应包含数据库版本,失败返回"unknown"
3.3 创建数据库
创建数据库时必须同时设置保留策略:
保留策略定义数据存储时长。时间序列数据(如CPU/内存统计)通常需要降采样处理:
influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
"defaultPolicy", "baeldung", "30d", 1, true);
参数说明:
name
: 策略名称database
: 数据库名interval
: 保留时长(30天)replicationFactor
: 副本数(单机设为1)default
: 是否设为默认策略
3.4 设置日志级别
InfluxDB内部使用Retrofit,可通过拦截器设置日志:
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
输出示例:
Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping
可用级别:BASIC
、FULL
、HEADERS
、NONE
4. 数据写入与查询
4.1 数据点(Point)
InfluxDB的基本数据单位是Point(时间戳+键值对):
Point point = Point.measurement("memory")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743656L)
.addField("used", 1015096L)
.addField("buffer", 1010467L)
.build();
包含内存统计、主机名和时间戳。
4.2 批量写入
时间序列数据通常包含大量小数据点,单点写入效率低下。推荐使用批量写入:
BatchPoints batchPoints = BatchPoints
.database(dbName)
.retentionPolicy("defaultPolicy")
.build();
Point point1 = Point.measurement("memory")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743656L)
.addField("used", 1015096L)
.addField("buffer", 1010467L)
.build();
Point point2 = Point.measurement("memory")
.time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743696L)
.addField("used", 1016096L)
.addField("buffer", 1008467L)
.build();
batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);
⚠️ 注意:
- 时间戳是主键,相同时间戳的数据点会被覆盖
- 批量写入必须指定数据库和保留策略
4.3 单点写入
某些场景下需要单点写入,可启用批处理模式:
influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);
参数含义:
- 100:每100条数据提交一次
- 200:最多200毫秒提交一次
使用前需设置默认数据库和保留策略:
influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);
然后可直接写入单点:
influxDB.write(point);
重要提示:
- 单点写入前必须设置数据库和保留策略
- 不再需要时关闭批处理模式:
influxDB.disableBatch();
- 关闭连接会自动关闭线程池:
influxDB.close();
4.4 查询结果映射
查询结果(QueryResult)可映射为POJO。先创建实体类:
@Measurement(name = "memory")
public class MemoryPoint {
@Column(name = "time")
private Instant time;
@Column(name = "name")
private String name;
@Column(name = "free")
private Long free;
@Column(name = "used")
private Long used;
@Column(name = "buffer")
private Long buffer;
}
注解说明:
@Measurement
:对应measurement名称@Column
:映射字段名
使用InfluxDBResultMapper
进行映射。
4.5 查询InfluxDB
执行查询并映射结果:
QueryResult queryResult = connection
.performQuery("Select * from memory", "baeldung");
InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
.toPOJO(queryResult, MemoryPoint.class);
assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());
✅ 验证要点:
- 默认按时间戳升序排列
- 首条记录是较早的数据点
修改排序方式:
queryResult = connection.performQuery(
"Select * from memory order by time desc", "baeldung");
memoryPointList = resultMapper
.toPOJO(queryResult, MemoryPoint.class);
assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());
添加order by time desc
实现降序排列。InfluxDB查询语法与SQL类似,详细参考官方文档。
5. 总结
我们完成了:
- 连接InfluxDB服务器
- 创建数据库和保留策略
- 批量/单点写入数据
- 查询并映射结果
完整示例代码见 GitHub仓库。