W3docs

Параллельные потоки в Java

Обработка потоков Java в параллельном режиме: когда parallelStream ускоряет работу, а когда замедляет.

Параллельный поток — это тот же конвейер потоков, который вы уже писали, но JVM разрешается разбивать источник на части и обрабатывать их в нескольких потоках выполнения. Изменение на месте вызова минимально:

long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
//              ^^^^^^^^^^^^^^^^^

или:

long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();

Форма конвейера, операции, результат — всё остаётся прежним. Меняется лишь то, кто его выполняет: вместо одного потока, проходящего по источнику, несколько рабочих потоков из общего ForkJoinPool (по одному на каждое ядро процессора минус один) делят работу между собой, а координатор объединяет их частичные результаты. Когда работа на элемент достаточно велика и источник хорошо разбивается, конвейер завершается примерно за реальное время / количество ядер. Когда этого нет, параллельный вариант медленнее последовательного — а иногда и некорректен. Эта глава о том, как отличить одно от другого.

Что на самом деле делает «параллельный» режим

Последовательный поток проводит один элемент через конвейер, затем следующий. Параллельный поток:

  1. Разбивает источник на под-потоки через Spliterator источника. Массивы, ArrayList, IntStream.range и подобные источники делятся чисто за O(1). LinkedList, Files.lines, Stream.iterate и Stream.generate либо делятся плохо, либо не делятся вовсе.
  2. Запускает промежуточную цепочку каждого под-потока на рабочем потоке из общего пула.
  3. Объединяет частичные результаты — именно для этого в reduce и collect нужен combiner.

forEach в параллельном потоке вызывает ваш Consumer из нескольких потоков одновременно и в неопределённом порядке. forEachOrdered сохраняет порядок обхода ценой синхронизации. findFirst в параллельном режиме обходится дороже, чем findAny, по той же причине — нужно координировать, чтобы найти именно первое совпадение.

Контракт — что обязан соблюдать ваш конвейер

Параллельный поток даёт корректный результат только тогда, когда конвейер соответствует трём правилам. Последовательный код, нарушающий их, всё равно работает; параллельный код, нарушающий их, тихо выдаёт бессмыслицу.

  1. Редуктор должен быть ассоциативным. f(f(a, b), c) == f(a, f(b, c)). +, *, max, min, объединение множеств, конкатенация списков — всё подходит. Вычитание, деление, «первое совпадение» и «добавление в список с сохранением порядка» — не подходят. Если передать неассоциативный BinaryOperator в reduce или Collectors.reducing, ответ будет зависеть от того, как JVM случайно разобьёт данные.
  2. Конвейер должен быть без состояния. Лямбды не должны читать или изменять разделяемое изменяемое состояние. Лямбда, захватывающая и изменяющая внешний ArrayList, инкрементирующая внешний int[] или использующая неатомарный счётчик, создаст гонку при параллельной работе.
  3. Конвейер должен быть свободен от побочных эффектов. Логирование допустимо; запись через потокобезопасное хранилище допустима; всё остальное — ошибка, ждущая момента, когда рабочий поток перемежит операции иначе.

Коллекторы из Collectors удовлетворяют правилам 1–3 по построению (при использовании согласно документации). Ваши лямбды внутри map, filter, reduce и peek — это то, за чем нужно следить.

Когда параллельный режим помогает (и когда нет)

Параллельный поток выигрывает только тогда, когда работа на элемент достаточно велика, чтобы перекрыть стоимость координации — разбиения, планирования, слияния и накладных расходов фреймворка. Упрощённая ментальная модель:

  • Большой источник + CPU-нагруженная работа на элемент + дешёвое слияние + хорошо разбиваемый источник = параллельный режим часто выигрывает. Обработка изображений попиксельно, парсинг по записи, хеширование файлов — классические случаи.
  • Маленький источник = выигрывает последовательный. Пробуждение пула обходится дороже, чем всё вычисление.
  • Дешёвая работа на элемент = выигрывает последовательный. nums.stream().mapToInt(Integer::intValue).sum() быстрее, чем parallelStream(), пока nums не вырастет до миллионов; при малых размерах накладные расходы фреймворка доминируют.
  • Блокирующий ввод-вывод на элемент = параллельные потоки не тот инструмент. Общий ForkJoinPool рассчитан на CPU-работу; блокирующий вызов I/O занимает рабочий поток и лишает ресурсов все остальные параллельные потоки в JVM (в том числе из библиотек). Для веерного I/O используйте CompletableFuture с ограниченным исполнителем.
  • Неразбиваемый источник = параллельный поток либо деградирует до последовательного, либо разбивается плохо. Files.lines, Stream.iterate, Stream.generate и LinkedList.stream() — канонические плохие источники; массивы, ArrayList и IntStream.range — канонически хорошие.

Честный совет: по умолчанию используйте последовательный поток; переходите к параллельному только при наличии измеренного обоснования — цифр из jmh или реального времени выполнения.

Операции, поведение которых меняется в параллельном режиме

Несколько операций, смысл которых меняется при переходе конвейера в параллельный режим:

  • forEach — выполняется из нескольких потоков в неопределённом порядке. Если порядок важен, используйте forEachOrdered (это стоит синхронизации).
  • findFirst — требует координации между рабочими потоками, чтобы найти первое совпадение в порядке обхода. Используйте findAny, если не важно, какое совпадение выбрать.
  • limit / skip — хорошо определены на упорядоченных потоках, но в параллельном режиме обходятся дороже, поскольку JVM обязан соблюдать порядок. Если порядок не важен, stream.parallel().unordered().limit(n) дешевле.
  • distinct / sorted — требуют координации между рабочими потоками; буфер, который они используют, является разделяемым.
  • reduce с 3-аргументным перегружением использует combiner для слияния результатов рабочих потоков. При 2-аргументном перегружении JVM дважды применяет идентичность плюс аккумулятор — тот же контракт, то же правило ассоциативности.
  • collectCollectors спроектированы для безопасной работы в параллельном режиме; тонкость в том, что контейнер результатов может быть обычным HashMap или ArrayList, и параллельный сбор координирует это внутренне. Ваши подчинённые коллекторы обязаны соблюдать контракт.

Ловушка разделяемого состояния на конкретном примере

Самая распространённая ошибка в начинающем параллельном коде:

// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));

ArrayList.add не является потокобезопасным; параллельные рабочие потоки либо теряют элементы, добавляют дубликаты, бросают ArrayIndexOutOfBoundsException или тихо портят список. Правильная версия выражает результат как вывод конвейера, а не его побочный эффект:

List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();

toList(), как и любой другой коллектор и терминальная операция, производящая значение, рассчитаны на параллельное использование. Как только вы тянетесь к forEach, изменяющему внешнюю переменную, вы покидаете безопасный путь.

Если вам действительно нужно потокобезопасное хранилище для forEach, используйте ConcurrentLinkedQueue, AtomicLong, LongAdder или Collections.synchronizedList(...). Но почти всегда правильный ответ — «не используйте forEach для накопления; пусть конвейер сам строит результат».

ForkJoinPool и почему это важно

По умолчанию все параллельные потоки в вашей JVM делят общий пул, размер которого равен Runtime.getRuntime().availableProcessors() - 1 рабочих потоков. Это влечёт два следствия:

  • Долго выполняющийся параллельный поток монополизирует пул. Любой другой параллельный поток — в том числе из библиотек — будет стоять в очереди за ним.
  • Параллельный поток, выполняющий блокирующие операции (I/O, блокировки, Thread.sleep), занимает рабочий поток без полезной работы, эффективно вдвое уменьшая размер пула, пока ждёт.

Вы можете выделить приватный пул для разового конвейера:

try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
    long total = pool.submit(() ->
        nums.parallelStream().mapToLong(n -> heavy(n)).sum()
    ).get();
}

Это правильный шаг для долгих вычислений, которые вы не хотите делить с остальной JVM. Это по-прежнему неправильный шаг для блокирующего I/O — там нужны виртуальные потоки или явная цепочка CompletableFuture на ограниченном исполнителе для I/O.

Практический пример: ускорение в параллельном режиме, ловушка разделяемого состояния и ошибка ассоциативности

Программа ниже измеряет последовательную и параллельную версии CPU-нагруженного суммирования IntStream, демонстрирует гонку состояний при forEach, показывает правильную версию на коллекторах и сравнивает ассоциативный (Integer::sum) и неассоциативный ((a, b) -> a - b) редукторы при параллельном выполнении.

java— editable, runs on the server

Что можно извлечь из запуска:

  • Параллельная сумма дала тот же результат, что и последовательная, и (на любой многоядерной машине) завершилась за долю реального времени. CPU-нагруженный вызов heavy на элемент и хорошо разбиваемый источник (int[]) — два ингредиента, которые необходимы параллельному режиму.
  • forEach, изменявший badSink, либо терял элементы, либо падал с исключением. Никакое добавление synchronized здесь не поможет — это сделает параллельную версию медленнее последовательной. Исправление — не писать forEach для накопления: используйте коллектор или терминальную операцию, производящую результат.
  • Integer::sum ассоциативен; параллельная редукция дала тот же ответ, что и последовательная. Неассоциативный (a, b) -> a - b дал разные ответы в последовательном и параллельном режимах, потому что JVM вправе разбивать и объединять в любом ассоциативно-эквивалентном порядке. Один и тот же код — два ответа: симптом, который рано или поздно проявляется в каждой ошибке с параллельными потоками.
  • parallel().forEach(...) печатал 0..15 в каком-то немонотонном порядке; parallel().forEachOrdered(...) печатал в порядке следования ценой межпоточной синхронизации. Если вашему forEach важен порядок — вы платите за это.
  • Приватный ForkJoinPool(2) запустил конвейер на выделенном пуле. Используйте это для долгих вычислительных задач, которые не должны делить общий пул с остальной JVM. Не применяйте как заплатку для блокирующего I/O — это другая проблема с другим инструментом.

Что дальше

Теперь вы можете рассуждать о любом конвейере потоков: когда его писать, как строить, что в нём ленивое, что завершается досрочно, что работает параллельно безопасно, а что нет. На столе остаётся ещё одна центральная абстракция — та, что позволяет конвейеру выражать «это значение может отсутствовать» без единого null. Следующая глава, Java Optional, посвящена Optional<T> — что это такое, где API потоков оставляет незакрытые вопросы и как использовать map, flatMap, orElse и ifPresent для написания кода, безопасного к null по построению.

Практика

Практика
`nums.parallelStream().reduce(0, (a, b) -> a - b)` возвращает другой ответ, чем аналог с `stream()`. Почему?
`nums.parallelStream().reduce(0, (a, b) -> a - b)` возвращает другой ответ, чем аналог с `stream()`. Почему?
Was this page helpful?