Параллельные потоки в Java
Обработка потоков Java в параллельном режиме: когда parallelStream ускоряет работу, а когда замедляет.
Параллельный поток — это тот же конвейер потоков, который вы уже писали, но JVM разрешается разбивать источник на части и обрабатывать их в нескольких потоках выполнения. Изменение на месте вызова минимально:
long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
// ^^^^^^^^^^^^^^^^^или:
long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();Форма конвейера, операции, результат — всё остаётся прежним. Меняется лишь то, кто его выполняет: вместо одного потока, проходящего по источнику, несколько рабочих потоков из общего ForkJoinPool (по одному на каждое ядро процессора минус один) делят работу между собой, а координатор объединяет их частичные результаты. Когда работа на элемент достаточно велика и источник хорошо разбивается, конвейер завершается примерно за реальное время / количество ядер. Когда этого нет, параллельный вариант медленнее последовательного — а иногда и некорректен. Эта глава о том, как отличить одно от другого.
Что на самом деле делает «параллельный» режим
Последовательный поток проводит один элемент через конвейер, затем следующий. Параллельный поток:
- Разбивает источник на под-потоки через
Spliteratorисточника. Массивы,ArrayList,IntStream.rangeи подобные источники делятся чисто за O(1).LinkedList,Files.lines,Stream.iterateиStream.generateлибо делятся плохо, либо не делятся вовсе. - Запускает промежуточную цепочку каждого под-потока на рабочем потоке из общего пула.
- Объединяет частичные результаты — именно для этого в
reduceиcollectнуженcombiner.
forEach в параллельном потоке вызывает ваш Consumer из нескольких потоков одновременно и в неопределённом порядке. forEachOrdered сохраняет порядок обхода ценой синхронизации. findFirst в параллельном режиме обходится дороже, чем findAny, по той же причине — нужно координировать, чтобы найти именно первое совпадение.
Контракт — что обязан соблюдать ваш конвейер
Параллельный поток даёт корректный результат только тогда, когда конвейер соответствует трём правилам. Последовательный код, нарушающий их, всё равно работает; параллельный код, нарушающий их, тихо выдаёт бессмыслицу.
- Редуктор должен быть ассоциативным.
f(f(a, b), c) == f(a, f(b, c)).+,*,max,min, объединение множеств, конкатенация списков — всё подходит. Вычитание, деление, «первое совпадение» и «добавление в список с сохранением порядка» — не подходят. Если передать неассоциативныйBinaryOperatorвreduceилиCollectors.reducing, ответ будет зависеть от того, как JVM случайно разобьёт данные. - Конвейер должен быть без состояния. Лямбды не должны читать или изменять разделяемое изменяемое состояние. Лямбда, захватывающая и изменяющая внешний
ArrayList, инкрементирующая внешнийint[]или использующая неатомарный счётчик, создаст гонку при параллельной работе. - Конвейер должен быть свободен от побочных эффектов. Логирование допустимо; запись через потокобезопасное хранилище допустима; всё остальное — ошибка, ждущая момента, когда рабочий поток перемежит операции иначе.
Коллекторы из Collectors удовлетворяют правилам 1–3 по построению (при использовании согласно документации). Ваши лямбды внутри map, filter, reduce и peek — это то, за чем нужно следить.
Когда параллельный режим помогает (и когда нет)
Параллельный поток выигрывает только тогда, когда работа на элемент достаточно велика, чтобы перекрыть стоимость координации — разбиения, планирования, слияния и накладных расходов фреймворка. Упрощённая ментальная модель:
- Большой источник + CPU-нагруженная работа на элемент + дешёвое слияние + хорошо разбиваемый источник = параллельный режим часто выигрывает. Обработка изображений попиксельно, парсинг по записи, хеширование файлов — классические случаи.
- Маленький источник = выигрывает последовательный. Пробуждение пула обходится дороже, чем всё вычисление.
- Дешёвая работа на элемент = выигрывает последовательный.
nums.stream().mapToInt(Integer::intValue).sum()быстрее, чемparallelStream(), покаnumsне вырастет до миллионов; при малых размерах накладные расходы фреймворка доминируют. - Блокирующий ввод-вывод на элемент = параллельные потоки не тот инструмент. Общий
ForkJoinPoolрассчитан на CPU-работу; блокирующий вызов I/O занимает рабочий поток и лишает ресурсов все остальные параллельные потоки в JVM (в том числе из библиотек). Для веерного I/O используйтеCompletableFutureс ограниченным исполнителем. - Неразбиваемый источник = параллельный поток либо деградирует до последовательного, либо разбивается плохо.
Files.lines,Stream.iterate,Stream.generateиLinkedList.stream()— канонические плохие источники; массивы,ArrayListиIntStream.range— канонически хорошие.
Честный совет: по умолчанию используйте последовательный поток; переходите к параллельному только при наличии измеренного обоснования — цифр из jmh или реального времени выполнения.
Операции, поведение которых меняется в параллельном режиме
Несколько операций, смысл которых меняется при переходе конвейера в параллельный режим:
forEach— выполняется из нескольких потоков в неопределённом порядке. Если порядок важен, используйтеforEachOrdered(это стоит синхронизации).findFirst— требует координации между рабочими потоками, чтобы найти первое совпадение в порядке обхода. ИспользуйтеfindAny, если не важно, какое совпадение выбрать.limit/skip— хорошо определены на упорядоченных потоках, но в параллельном режиме обходятся дороже, поскольку JVM обязан соблюдать порядок. Если порядок не важен,stream.parallel().unordered().limit(n)дешевле.distinct/sorted— требуют координации между рабочими потоками; буфер, который они используют, является разделяемым.reduceс 3-аргументным перегружением используетcombinerдля слияния результатов рабочих потоков. При 2-аргументном перегружении JVM дважды применяет идентичность плюс аккумулятор — тот же контракт, то же правило ассоциативности.collect—Collectorsспроектированы для безопасной работы в параллельном режиме; тонкость в том, что контейнер результатов может быть обычнымHashMapилиArrayList, и параллельный сбор координирует это внутренне. Ваши подчинённые коллекторы обязаны соблюдать контракт.
Ловушка разделяемого состояния на конкретном примере
Самая распространённая ошибка в начинающем параллельном коде:
// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));ArrayList.add не является потокобезопасным; параллельные рабочие потоки либо теряют элементы, добавляют дубликаты, бросают ArrayIndexOutOfBoundsException или тихо портят список. Правильная версия выражает результат как вывод конвейера, а не его побочный эффект:
List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();toList(), как и любой другой коллектор и терминальная операция, производящая значение, рассчитаны на параллельное использование. Как только вы тянетесь к forEach, изменяющему внешнюю переменную, вы покидаете безопасный путь.
Если вам действительно нужно потокобезопасное хранилище для forEach, используйте ConcurrentLinkedQueue, AtomicLong, LongAdder или Collections.synchronizedList(...). Но почти всегда правильный ответ — «не используйте forEach для накопления; пусть конвейер сам строит результат».
ForkJoinPool и почему это важно
По умолчанию все параллельные потоки в вашей JVM делят общий пул, размер которого равен Runtime.getRuntime().availableProcessors() - 1 рабочих потоков. Это влечёт два следствия:
- Долго выполняющийся параллельный поток монополизирует пул. Любой другой параллельный поток — в том числе из библиотек — будет стоять в очереди за ним.
- Параллельный поток, выполняющий блокирующие операции (I/O, блокировки,
Thread.sleep), занимает рабочий поток без полезной работы, эффективно вдвое уменьшая размер пула, пока ждёт.
Вы можете выделить приватный пул для разового конвейера:
try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
long total = pool.submit(() ->
nums.parallelStream().mapToLong(n -> heavy(n)).sum()
).get();
}Это правильный шаг для долгих вычислений, которые вы не хотите делить с остальной JVM. Это по-прежнему неправильный шаг для блокирующего I/O — там нужны виртуальные потоки или явная цепочка CompletableFuture на ограниченном исполнителе для I/O.
Практический пример: ускорение в параллельном режиме, ловушка разделяемого состояния и ошибка ассоциативности
Программа ниже измеряет последовательную и параллельную версии CPU-нагруженного суммирования IntStream, демонстрирует гонку состояний при forEach, показывает правильную версию на коллекторах и сравнивает ассоциативный (Integer::sum) и неассоциативный ((a, b) -> a - b) редукторы при параллельном выполнении.
Что можно извлечь из запуска:
- Параллельная сумма дала тот же результат, что и последовательная, и (на любой многоядерной машине) завершилась за долю реального времени. CPU-нагруженный вызов
heavyна элемент и хорошо разбиваемый источник (int[]) — два ингредиента, которые необходимы параллельному режиму. forEach, изменявшийbadSink, либо терял элементы, либо падал с исключением. Никакое добавлениеsynchronizedздесь не поможет — это сделает параллельную версию медленнее последовательной. Исправление — не писатьforEachдля накопления: используйте коллектор или терминальную операцию, производящую результат.Integer::sumассоциативен; параллельная редукция дала тот же ответ, что и последовательная. Неассоциативный(a, b) -> a - bдал разные ответы в последовательном и параллельном режимах, потому что JVM вправе разбивать и объединять в любом ассоциативно-эквивалентном порядке. Один и тот же код — два ответа: симптом, который рано или поздно проявляется в каждой ошибке с параллельными потоками.parallel().forEach(...)печатал0..15в каком-то немонотонном порядке;parallel().forEachOrdered(...)печатал в порядке следования ценой межпоточной синхронизации. Если вашемуforEachважен порядок — вы платите за это.- Приватный
ForkJoinPool(2)запустил конвейер на выделенном пуле. Используйте это для долгих вычислительных задач, которые не должны делить общий пул с остальной JVM. Не применяйте как заплатку для блокирующего I/O — это другая проблема с другим инструментом.
Что дальше
Теперь вы можете рассуждать о любом конвейере потоков: когда его писать, как строить, что в нём ленивое, что завершается досрочно, что работает параллельно безопасно, а что нет. На столе остаётся ещё одна центральная абстракция — та, что позволяет конвейеру выражать «это значение может отсутствовать» без единого null. Следующая глава, Java Optional, посвящена Optional<T> — что это такое, где API потоков оставляет незакрытые вопросы и как использовать map, flatMap, orElse и ifPresent для написания кода, безопасного к null по построению.