Конкурентные коллекции Java
Потокобезопасные коллекции из java.util.concurrent — ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue — и когда применять каждую из них.
HashMap, ArrayList, ArrayDeque — это повседневные коллекции, и ни одна из них не является потокобезопасной. Использование их из нескольких потоков без внешней синхронизации приводит к потере обновлений, нарушению инвариантов и пресловутому ConcurrentModificationException в середине итерации. Классическим решением было Collections.synchronizedMap(...), оборачивающее обычный словарь в единственный большой замок. Это работает, но сериализует каждую операцию.
Пакет java.util.concurrent заменил подход «обернуть в замок» коллекциями, спроектированными для параллельного доступа с нуля: с разделёнными блокировками, копированием при записи и lock-free вариантами, настроенными под разные соотношения чтения и записи. Эта глава — обзор таких коллекций: что умеет каждый класс и какие ошибки нужно знать.
ConcurrentHashMap — рабочая лошадка
Самая используемая конкурентная коллекция в Java. Словарь с интерфейсом HashMap, который можно использовать из множества потоков без внешней синхронизации:
ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
counts.put("hits", 1);
counts.merge("hits", 1, Integer::sum); // atomic add-or-increment
counts.computeIfAbsent("misses", k -> 0);
counts.computeIfPresent("hits", (k, v) -> v + 1);Три вещи делают его быстрым при конкуренции:
- Разделение блокировок (lock striping). Разные ключи защищены разными внутренними замками, поэтому запись несвязанных ключей не блокирует друг друга.
- Lock-free чтение. Чтение вообще не берёт замок (в стационарном состоянии). Читающий поток может «гоняться» с пишущим; результат будет либо старым значением, либо новым — никогда не повреждённым.
- Атомарные составные обновления.
merge,compute,computeIfAbsentиputIfAbsentвыполняют проверку и действие атомарно. Без них несинхронизированный паттернif (!map.containsKey(k)) map.put(k, v)имеет гонку между двумя вызовами; атомарные методы её устраняют.
Используйте ConcurrentHashMap всякий раз, когда к HashMap обращается более одного потока. Это правильный выбор по умолчанию — быстрее, чем Hashtable, быстрее, чем synchronizedMap, и поддерживает атомарные составные обновления, которых другие не имеют.
Одно правило: ключи null и значения null запрещены. containsKey(k) надёжен; map.get(k) == null неоднозначен (ключ отсутствует или значение равно null). Запрет null устраняет неоднозначность.
ConcurrentSkipListMap — сортированный конкурентный словарь
Когда вам нужен TreeMap (сортированный по ключу) с параллельным доступом:
ConcurrentSkipListMap<Long, Event> byTimestamp = new ConcurrentSkipListMap<>();
byTimestamp.put(1700000000000L, e1);
byTimestamp.put(1700000005000L, e2);
byTimestamp.firstEntry(); // earliest
byTimestamp.lastEntry(); // latest
byTimestamp.subMap(start, end); // range queryРеализован на основе skip list (вероятностная альтернатива сбалансированному дереву, которую проще сделать lock-free). Поддерживает полный API NavigableMap. Медленнее ConcurrentHashMap для простого поиска по ключу; правильный выбор, когда нужна упорядоченная итерация или запросы по диапазону.
CopyOnWriteArrayList — список с преобладающим чтением
CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
listeners.add(myListener);
for (Listener l : listeners) l.onEvent(e); // never throws ConcurrentModificationExceptionПри каждой записи создаётся копия внутреннего массива. Чтение не требует ожидания — никакого замка, никакой синхронизации, никакого CME. Цена очевидна: каждое add/remove/set имеет сложность O(n), так как копирует весь массив.
Это ужасный компромисс для нагрузок с интенсивной записью. Но идеальный для нагрузки, под которую он был спроектирован:
- Небольшой список (десятки, может быть, сотни элементов).
- Чтения значительно превышают количество записей.
- Итерация происходит часто; она никогда не должна бросать
CME.
Хрестоматийный случай применения: список слушателей событий, записей конфигурации или зарегистрированных подписчиков. Чтения происходят при каждом событии; записи — при запуске или когда регистрируется компонент.
Не используйте CopyOnWriteArrayList для «всего, что можно положить в ArrayList». Для изменяемых общих коллекций, которые не маленькие и не предназначены в основном для чтения, используйте Collections.synchronizedList вокруг ArrayList или пересмотрите структуру данных.
BlockingQueue — очередь производитель/потребитель
Наиболее полезная абстракция в java.util.concurrent:
BlockingQueue<Task> queue = new ArrayBlockingQueue<>(1024);
queue.put(task); // blocks if full
queue.offer(task, 100, TimeUnit.MILLISECONDS); // blocks up to deadline
queue.add(task); // throws if full
Task t = queue.take(); // blocks if empty
Task t2 = queue.poll(100, TimeUnit.MILLISECONDS); // blocks up to deadline
Task t3 = queue.poll(); // returns null if emptyput и take — это блокирующие операции: они ожидают, пока очередь не станет неполной / непустой. Это основа фреймворка исполнителей — каждый ThreadPoolExecutor внутри содержит BlockingQueue ожидающих задач; рабочие потоки выполняют take из неё; публичный метод execute кладёт задачу в неё.
Распространённые реализации:
| Класс | Ограничена? | Когда использовать |
|---|---|---|
ArrayBlockingQueue(cap) | Да — фиксированная ёмкость | Буфер фиксированного размера; обратное давление на производителя |
LinkedBlockingQueue() | Нет (или с ограничением) | Высокопроизводительная очередь общего назначения |
SynchronousQueue | 0 — прямая передача | Каждый put ждёт take; передача между потоками |
PriorityBlockingQueue | Нет | Задачи упорядочены по приоритету (не по порядку добавления) |
DelayQueue | Нет | Каждый элемент имеет задержку; извлекается только по её истечении |
ArrayBlockingQueue — стандартный выбор для production: он ограничивает объём незавершённой работы, что критично для обратного давления. LinkedBlockingQueue без ограничения — ловушка, скрытая за Executors.newFixedThreadPool (неограниченная очередь → неограниченная память).
ConcurrentLinkedQueue и ConcurrentLinkedDeque — неограниченные lock-free варианты
ConcurrentLinkedQueue<Event> events = new ConcurrentLinkedQueue<>();
events.add(e);
Event e = events.poll(); // null if empty; doesn't blockНеблокирующие, lock-free, неограниченные. poll возвращает null вместо блокировки; метода take нет. Лучший выбор, когда:
- Нужна высокая пропускная способность.
- Допустимо, что «очередь пуста» означает быстрый возврат, а не ожидание.
- Обратное давление не требуется.
Это не BlockingQueue — выбирайте их, когда вам действительно не нужна блокирующая семантика.
Итерация: слабая согласованность
Итератор HashMap бросает ConcurrentModificationException, если словарь изменяется в процессе итерации. Конкурентные коллекции ведут себя иначе: их итераторы слабо согласованы (weakly consistent). Это означает:
- Они не бросают
ConcurrentModificationException, даже если другие потоки изменяют коллекцию. - Они гарантированно видят каждый элемент, который присутствовал в момент создания итератора.
- Они могут или не могут отражать изменения, внесённые после создания итератора.
Для большинства задач это нормально — итератор-снимок — это именно то, что нужно конкурентному коду. Обратная сторона: size() тоже «слабо согласован» — для ConcurrentHashMap это приблизительный счётчик, а не гарантированное значение снимка. Если вы используете size() как авторитетный источник, вы неправильно используете API.
Когда что выбирать
Упрощённое дерево решений:
- Хранилище ключ-значение →
ConcurrentHashMap(по умолчанию),ConcurrentSkipListMap(нужна сортировка/диапазон). - Список слушателей с преобладающим чтением →
CopyOnWriteArrayList. - Очередь задач производитель–потребитель →
ArrayBlockingQueue(ограниченная),LinkedBlockingQueue(без ограничения),SynchronousQueue(прямая передача). - Приоритетная очередь между потоками →
PriorityBlockingQueue. - Очередь с задержкой →
DelayQueue. - Высокопроизводительная lock-free без блокировки →
ConcurrentLinkedQueue/ConcurrentLinkedDeque. - Множество (Set) →
ConcurrentHashMap.newKeySet(),CopyOnWriteArraySet,ConcurrentSkipListSet.
Всякий раз, когда к обычной коллекции обращается более одного потока, выбирайте конкурентную коллекцию или оборачивайте её в Collections.synchronizedX — никогда не надейтесь на случай.
Рабочий пример: каждая коллекция делает своё дело
Программа ниже демонстрирует четыре конкурентные коллекции под общей нагрузкой — ConcurrentHashMap для подсчёта событий, CopyOnWriteArrayList слушателей, ArrayBlockingQueue для производителя/потребителя и ConcurrentLinkedQueue для lock-free добавления.
Что следует вынести из запуска:
- В разделе 1
ConcurrentHashMap.mergeвыдал точно ожидаемое значение40 000. Функция слияния (Integer::sum) выполнялась атомарно для каждого ключа, поэтому два потока, одновременно инкрементирующих один ключ, никогда не теряли обновление — атомарное составное обновление и есть суть этого. С обычнымHashMap+putвы получили бы меньшее значение и, вероятно, повреждённое внутреннее состояние. - Итератор
CopyOnWriteArrayListиз раздела 2 увидел[a, b, c](снимок в момент создания итератора). Записи, добавившиеdиeв процессе итерации, не бросилиConcurrentModificationExceptionи не были видны текущему итератору. Финальный список содержал все пять элементов — записи произошли, они просто не были видны уже запущенному итератору. ArrayBlockingQueueиз раздела 3 с ёмкостью 4 заставил производителя блокироваться приput, когда очередь была заполнена. Вывод показал, как очередь заполняется до 4, затем производитель останавливается, пока потребитель опустошает её, затем производитель возобновляется. Это обратное давление, обеспечиваемое структурой данных: производитель не может работать быстрее потребителя даже без явного кода координации.ConcurrentLinkedQueueиз раздела 4 принял записи от четырёх потоков без блокировки и без конкуренции за замки. Итоговое количество извлечённых элементов совпало с добавленными — каждый записанный элемент был успешно прочитан. Цена: нетtake()для ожидания пустой очереди;poll()возвращаетnull, и вам нужно обрабатывать это самостоятельно.- На протяжении всего примера конкурентные коллекции никогда не бросали
ConcurrentModificationException. Это исключение — особенность неконкурентных коллекций: так JVM сигнализирует «вы сломали это». Конкурентные коллекции спроектированы для модификации из нескольких потоков, поэтому им не нужен такой сигнал.
Что дальше
В следующей главе, Java Virtual Threads, рассматривается функция Java 21, которая меняет подход к количеству потоков — лёгкие потоки, планируемые JVM, которые снова делают блокирующий I/O дешёвым.