1. 概述

本文将深入探讨 java.util.concurrent 包中的 ConcurrentSkipListMap 类。这个并发集合类提供了无锁(lock-free)的线程安全实现,特别适合需要在对数据进行操作的同时创建不可变快照的场景。

我们将通过一个实际案例展示其威力:对事件流进行排序,并获取最近60秒内的事件快照。这种需求在实时监控、日志分析等场景中非常常见。

2. 流排序逻辑

假设我们有一个来自多线程的持续事件流,需要同时处理两类数据:

  • 最近60秒内的事件
  • 超过60秒的历史事件

2.1 事件数据结构

首先定义事件类:

public class Event {
    private ZonedDateTime eventTime;
    private String content;

    // 标准构造器和getter方法
}

2.2 初始化并发跳表

使用 ConcurrentSkipListMap 存储事件,通过 ComparatoreventTime 排序:

ConcurrentSkipListMap<ZonedDateTime, String> events
 = new ConcurrentSkipListMap<>(
 Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

✅ 关键点:

  • 使用 comparingLong() 提取时间戳进行比较
  • 构造时传入比较器,后续操作自动排序

2.3 事件添加

添加事件无需显式同步,直接调用 put() 方法:

public void acceptEvent(Event event) {
    events.put(event.getEventTime(), event.getContent());
}

⚠️ 注意:虽然线程安全,但高频写入仍需考虑性能影响

2.4 获取时间窗口数据

获取最近60秒事件

使用 tailMap() 创建不可变快照:

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
    return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

获取超过60秒的历史事件

使用 headMap() 获取历史数据:

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
    return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

✅ 核心优势:

  • 快照创建过程无锁
  • 其他线程可继续写入新数据
  • 返回的视图不可变,线程安全

3. 测试流排序逻辑

通过多线程测试验证实现:

3.1 测试场景设置

创建2个生产者线程,各发送100个事件:

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
  .rangeClosed(0, 100)
  .forEach(index -> eventWindowSort.acceptEvent(
      new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
  );

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(producer);
}

3.2 验证最近60秒事件

获取快照并验证:

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsFromLastMinute();

断言检查:

  1. 不存在超过60秒的事件: ```java long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count();

assertEquals(eventsOlderThanOneMinute, 0);


2. 存在60秒内的事件:
```java
long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventYoungerThanOneMinute > 0);

3.3 验证历史事件

测试 getEventsOlderThatOneMinute()

ConcurrentNavigableMap<ZonedDateTime, String> eventsOlder 
  = eventWindowSort.getEventsOlderThatOneMinute();

断言检查:

  1. 存在超过60秒的事件: ```java long eventsOlderThanOneMinute = eventsOlder .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count();

assertTrue(eventsOlderThanOneMinute > 0);


2. 不存在60秒内的事件:
```java
long eventYoungerThanOneMinute = eventsOlder
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventYoungerThanOneMinute, 0);

🔥 关键验证点:在多线程持续写入时,仍能获取准确的时间窗口快照

4. 总结

ConcurrentSkipListMap 在以下场景表现卓越:

  • 需要高并发访问的有序数据结构
  • 要求无锁操作的场景
  • 需要创建数据快照而不阻塞写入操作

本文通过事件流排序的案例,展示了其核心优势:

  1. 无锁快照tailMap()/headMap() 创建不可变视图
  2. 线程安全:多线程环境下数据一致性保证
  3. 自动排序:基于比较器自动维护数据顺序

完整实现代码可在 GitHub项目 中获取(Maven项目,可直接导入运行)。

⚡ 性能提示:虽然 ConcurrentSkipListMap 性能优异,但在超高并发写入场景下,仍需结合实际压测评估吞吐量。


原始标题:Guide to the ConcurrentSkipListMap