1. 概述
本文将深入探讨 java.util.concurrent
包中的 ConcurrentSkipListMap 类。这个并发集合类提供了无锁(lock-free)的线程安全实现,特别适合需要在对数据进行操作的同时创建不可变快照的场景。
我们将通过一个实际案例展示其威力:对事件流进行排序,并获取最近60秒内的事件快照。这种需求在实时监控、日志分析等场景中非常常见。
2. 流排序逻辑
假设我们有一个来自多线程的持续事件流,需要同时处理两类数据:
- 最近60秒内的事件
- 超过60秒的历史事件
2.1 事件数据结构
首先定义事件类:
public class Event {
private ZonedDateTime eventTime;
private String content;
// 标准构造器和getter方法
}
2.2 初始化并发跳表
使用 ConcurrentSkipListMap
存储事件,通过 Comparator
按 eventTime
排序:
ConcurrentSkipListMap<ZonedDateTime, String> events
= new ConcurrentSkipListMap<>(
Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));
✅ 关键点:
- 使用
comparingLong()
提取时间戳进行比较 - 构造时传入比较器,后续操作自动排序
2.3 事件添加
添加事件无需显式同步,直接调用 put()
方法:
public void acceptEvent(Event event) {
events.put(event.getEventTime(), event.getContent());
}
⚠️ 注意:虽然线程安全,但高频写入仍需考虑性能影响
2.4 获取时间窗口数据
获取最近60秒事件
使用 tailMap()
创建不可变快照:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}
获取超过60秒的历史事件
使用 headMap()
获取历史数据:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
return events.headMap(ZonedDateTime.now().minusMinutes(1));
}
✅ 核心优势:
- 快照创建过程无锁
- 其他线程可继续写入新数据
- 返回的视图不可变,线程安全
3. 测试流排序逻辑
通过多线程测试验证实现:
3.1 测试场景设置
创建2个生产者线程,各发送100个事件:
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;
Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(
new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
);
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}
3.2 验证最近60秒事件
获取快照并验证:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsFromLastMinute();
断言检查:
- 不存在超过60秒的事件: ```java long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count();
assertEquals(eventsOlderThanOneMinute, 0);
2. 存在60秒内的事件:
```java
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventYoungerThanOneMinute > 0);
3.3 验证历史事件
测试 getEventsOlderThatOneMinute()
:
ConcurrentNavigableMap<ZonedDateTime, String> eventsOlder
= eventWindowSort.getEventsOlderThatOneMinute();
断言检查:
- 存在超过60秒的事件: ```java long eventsOlderThanOneMinute = eventsOlder .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count();
assertTrue(eventsOlderThanOneMinute > 0);
2. 不存在60秒内的事件:
```java
long eventYoungerThanOneMinute = eventsOlder
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventYoungerThanOneMinute, 0);
🔥 关键验证点:在多线程持续写入时,仍能获取准确的时间窗口快照
4. 总结
ConcurrentSkipListMap
在以下场景表现卓越:
- 需要高并发访问的有序数据结构
- 要求无锁操作的场景
- 需要创建数据快照而不阻塞写入操作
本文通过事件流排序的案例,展示了其核心优势:
- 无锁快照:
tailMap()
/headMap()
创建不可变视图 - 线程安全:多线程环境下数据一致性保证
- 自动排序:基于比较器自动维护数据顺序
完整实现代码可在 GitHub项目 中获取(Maven项目,可直接导入运行)。
⚡ 性能提示:虽然 ConcurrentSkipListMap
性能优异,但在超高并发写入场景下,仍需结合实际压测评估吞吐量。