1. 概述

Map 是 Java 集合中最常用的数据结构之一。

关键点:

  • HashMap 是非线程安全的
  • Hashtable 通过同步操作实现线程安全,但效率低下
  • Collections.synchronizedMap 同样存在性能瓶颈

在 Java 1.5 中,Java 集合框架引入了 ConcurrentMap,专门解决高并发场景下线程安全与吞吐量的矛盾问题。以下讨论基于 Java 1.8。

2. ConcurrentMap 接口

ConcurrentMapMap 接口的扩展,为线程安全与原子操作提供了规范。

核心特性:

  1. 禁用 null 支持:重写了以下默认方法,禁止 null 键/值

    • getOrDefault
    • forEach
    • replaceAll
    • computeIfAbsent
    • computeIfPresent
    • compute
    • merge
  2. 原子操作支持:以下方法无默认实现,保证原子性

    • putIfAbsent
    • remove
    • replace(key, oldValue, newValue)
    • replace(key, value)

其余方法直接继承 Map,行为保持一致。

3. ConcurrentHashMap

ConcurrentHashMapConcurrentMap 的开箱即用实现。

底层机制:

  • 由节点数组(Java 8 前为分段锁)构成的哈希表
  • 更新操作主要使用 CAS
  • 桶结构懒加载(首次插入时初始化)
  • 每个桶通过头节点实现独立锁,读操作无阻塞

⚠️ Java 8 变更:

  • 旧版本:concurrencyLevel 参数控制分段数量
  • 新版本:构造函数仅保留向后兼容性,参数仅影响初始容量
// 构造函数示例
public ConcurrentHashMap(
  int initialCapacity, float loadFactor, int concurrencyLevel)

3.1 线程安全保证

ConcurrentMap 保证多线程环境下键值操作的内存一致性:

线程 A 在插入键/值之前的操作 → 线程 B 访问/删除该键/值之后的操作

踩坑案例:HashMap 并发问题

@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
    Map<String, Integer> map = new HashMap<>();
    List<Integer> sumList = parallelSum100(map, 100);

    assertNotEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertTrue(wrongResultCount > 0);
}

private List<Integer> parallelSum100(Map<String, Integer> map, 
  int executionTimes) throws InterruptedException {
    List<Integer> sumList = new ArrayList<>(1000);
    for (int i = 0; i < executionTimes; i++) {
        map.put("test", 0);
        ExecutorService executorService = 
          Executors.newFixedThreadPool(4);
        for (int j = 0; j < 10; j++) {
            executorService.execute(() -> {
                for (int k = 0; k < 10; k++)
                    map.computeIfPresent(
                      "test", 
                      (key, value) -> value + 1
                    );
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        sumList.add(map.get("test"));
    }
    return sumList;
}

正确姿势:ConcurrentHashMap

@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect() 
  throws Exception {
    Map<String, Integer> map = new ConcurrentHashMap<>();
    List<Integer> sumList = parallelSum100(map, 1000);

    assertEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertEquals(0, wrongResultCount);
}

3.2 Null 键/值处理

严格限制:

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
    concurrentMap.put(null, new Object());
}

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
    concurrentMap.put("test", null);
}

特殊场景:计算类方法

  • compute*merge 允许返回 null
  • 返回 null 表示删除映射(若存在)或保持不存在
@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
    Object oldValue = new Object();
    concurrentMap.put("test", oldValue);
    concurrentMap.compute("test", (s, o) -> null);

    assertNull(concurrentMap.get("test"));
}

3.3 Stream 支持

Java 8 为 ConcurrentHashMap 增加了 Stream 支持:

  • ✅ 批量操作(顺序/并行)允许并发修改
  • ✅ 不会抛出 ConcurrentModificationException
  • ✅ 新增 forEach*searchreduce* 方法支持遍历与归约

3.4 性能对比

简单粗暴的基准测试(4 线程执行 50 万次读写):

@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster() 
  throws Exception {
    Map<String, Object> hashtable = new Hashtable<>();
    Map<String, Object> synchronizedHashMap = 
      Collections.synchronizedMap(new HashMap<>());
    Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();

    long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
    long syncHashMapAvgRuntime = 
      timeElapseForGetPut(synchronizedHashMap);
    long concurrentHashMapAvgRuntime = 
      timeElapseForGetPut(concurrentHashMap);

    assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
    assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}

private long timeElapseForGetPut(Map<String, Object> map) 
  throws InterruptedException {
    ExecutorService executorService = 
      Executors.newFixedThreadPool(4);
    long startTime = System.nanoTime();
    for (int i = 0; i < 4; i++) {
        executorService.execute(() -> {
            for (int j = 0; j < 500_000; j++) {
                int value = ThreadLocalRandom
                  .current()
                  .nextInt(10000);
                String key = String.valueOf(value);
                map.put(key, value);
                map.get(key);
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return (System.nanoTime() - startTime) / 500_000;
}

测试结果(纳秒/操作):

Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2

结论:

  • ✅ 高并发场景首选 ConcurrentHashMap
  • ✅ 单线程场景 HashMap 更简单高效

3.5 踩坑指南

关键注意事项:

  1. 聚合状态方法的弱一致性

    • size()isEmpty()containsValue() 在并发更新时可能不准确

      @Test
      public void givenConcurrentMap_whenUpdatingAndGetSize_thenError() 
        throws InterruptedException {
        Runnable collectMapSizes = () -> {
            for (int i = 0; i < MAX_SIZE; i++) {
                mapSizes.add(concurrentMap.size());
            }
        };
        Runnable updateMapData = () -> {
            for (int i = 0; i < MAX_SIZE; i++) {
                concurrentMap.put(String.valueOf(i), i);
            }
        };
        executorService.execute(updateMapData);
        executorService.execute(collectMapSizes);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
      
        assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
        assertEquals(MAX_SIZE, concurrentMap.size());
      }
      
    • ✅ 监控/估算场景可用

    • ❌ 强一致性需求需外部同步

  2. hashCode 陷阱

    • 大量相同 hashCode 的键会严重降低性能
    • 可比较键(Comparable)会利用比较顺序优化
  3. 迭代器特性

    • 弱一致性(非快速失败)
    • 单线程设计
    • 不抛 ConcurrentModificationException
  4. 容量初始化

    public ConcurrentHashMap(
      int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (initialCapacity < concurrencyLevel) {
            initialCapacity = concurrencyLevel; // 容量不低于并发级别
        }
    }
    
  5. 重映射函数原则

    • 保持函数快速、简短、简单
    • 避免阻塞操作
  6. 排序需求

    • 键无序 → 需排序时用 ConcurrentSkipListMap

4. ConcurrentNavigableMap

当需要键排序时,使用 ConcurrentNavigableMap(默认升序)。

并发兼容的视图方法:

  • subMap
  • headMap
  • tailMap
  • descendingMap

弱内存一致性的键集视图:

  • navigableKeySet
  • keySet
  • descendingKeySet

5. ConcurrentSkipListMap

ConcurrentSkipListMapTreeMap 的并发版本,基于跳表(SkipList)实现:

  • 平均时间复杂度:O(log n)
  • 支持键的插入、删除、更新和访问的线程安全

并发导航对比:

@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() 
    throws InterruptedException {
    NavigableMap<Integer, Integer> skipListMap
      = new ConcurrentSkipListMap<>();
    int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
 
    assertEquals(10000 * 4, count);
}

@Test
public void givenTreeMap_whenNavConcurrently_thenCountError() 
    throws InterruptedException {
    NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
    int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
 
    assertNotEquals(10000 * 4, count);
}

private int countMapElementByPollingFirstEntry(
    NavigableMap<Integer, Integer> navigableMap, 
    int elementCount, 
    int concurrencyLevel) throws InterruptedException {
 
    for (int i = 0; i < elementCount * concurrencyLevel; i++) {
        navigableMap.put(i, i);
    }
    
    AtomicInteger counter = new AtomicInteger(0);
    ExecutorService executorService
      = Executors.newFixedThreadPool(concurrencyLevel);
    for (int j = 0; j < concurrencyLevel; j++) {
        executorService.execute(() -> {
            for (int i = 0; i < elementCount; i++) {
                if (navigableMap.pollFirstEntry() != null) {
                    counter.incrementAndGet();
                }
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return counter.get();
}

6. 总结

本文深入探讨了 ConcurrentMap 接口及 ConcurrentHashMap 的核心特性,并介绍了需要键排序时的 ConcurrentNavigableMapConcurrentSkipListMap

关键结论:

  • ✅ 高并发场景优先选择 ConcurrentHashMap
  • ✅ 需要排序时用 ConcurrentSkipListMap
  • ❌ 避免在并发中使用 HashMap/Hashtable
  • ⚠️ 注意聚合状态方法的弱一致性

完整示例代码见 GitHub 项目


原始标题:A Guide to ConcurrentMap | Baeldung