W3docs

Java-фреймворк Fork/Join

Рекурсивное разбиение задач и их параллелизация с помощью Fork/Join: ForkJoinPool, RecursiveTask, work-stealing.

Обычный пул потоков отлично подходит для «множества независимых задач». Но он плохо справляется с «одной большой задачей, которую можно рекурсивно разбить на меньшие версии самой себя». Для второго сценария — работы в стиле «разделяй и властвуй» — в Java есть специализированный исполнитель: ForkJoinPool. Именно он лежит в основе parallelStream, CompletableFuture.supplyAsync (когда исполнитель не задан), а также любого кода, написанного с использованием RecursiveTask/RecursiveAction.

Секрет ForkJoinPoolwork-stealing («кража работы»): у каждого рабочего потока есть собственная двусторонняя очередь, и когда она пустеет, поток крадёт задачу с нижнего конца очереди другого потока. Это обеспечивает автоматическую балансировку нагрузки — быстрые потоки помогают медленным без какой-либо явной координации.

Когда стоит применять

Fork/join — правильный инструмент для:

  • Рекурсивного разбиения задач. Быстрая сортировка, сортировка слиянием, обход деревьев, рекурсивные числовые алгоритмы, умножение матриц с делением пополам.
  • Задач, нагружающих CPU, которые можно распараллелить на примерно равные части.
  • Работы с адаптивной гранулярностью: разбивать, если фрагмент большой; выполнять напрямую, если маленький.

Это неправильный инструмент для:

  • I/O-bound задач. Заблокированный поток не крадёт задачи — а размер пула по умолчанию равен числу ядер CPU. Заблокируйте один поток — потеряете ядро.
  • Независимых несвязанных задач. Обычный ThreadPoolExecutor проще и столь же быстр для такого сценария.
  • Задач, зависящих от фиксированного внешнего расписания. Используйте ScheduledExecutorService.

Удобная ментальная модель: если вы бы использовали parallelStream для этой работы, fork/join — это тот же сценарий, выраженный напрямую. (Fork/join появился в Java 7; parallelStream в Java 8 был построен поверх него.)

Три основных класса

ForkJoinPool pool;                                    // the executor
RecursiveTask<V>;                                     // an abstract task returning V
RecursiveAction;                                      // an abstract task returning nothing

Вы расширяете RecursiveTask или RecursiveAction, переопределяете метод compute(), внутри compute() решаете — разбить задачу или выполнить напрямую — и вызываете fork()/join() на подзадачах.

class Sum extends RecursiveTask<Long> {
  private static final int THRESHOLD = 1000;
  private final long[] data;
  private final int lo, hi;

  Sum(long[] data, int lo, int hi) {
    this.data = data; this.lo = lo; this.hi = hi;
  }

  @Override
  protected Long compute() {
    int len = hi - lo;
    if (len <= THRESHOLD) {
      long s = 0;
      for (int i = lo; i < hi; i++) s += data[i];
      return s;
    }
    int mid = lo + len / 2;
    Sum left  = new Sum(data, lo, mid);
    Sum right = new Sum(data, mid, hi);
    left.fork();                                      // schedule left to run on another worker
    long rightResult = right.compute();               // run right on this worker (avoid extra task)
    long leftResult  = left.join();                   // wait for left
    return leftResult + rightResult;
  }
}

ForkJoinPool pool = new ForkJoinPool();
long total = pool.invoke(new Sum(data, 0, data.length));

Схема — проверить порог → разбить → форкнуть одну половину → вычислить другую → объединить — является каноническим идиомом fork/join. Приём «вычислять одну половину здесь, а не форкать обе» позволяет избежать создания лишней задачи и даёт небольшой, но реальный выигрыш.

Порог имеет значение

Самое важное решение: когда прекращать разбиение. Слишком маленький порог порождает тысячи задач для тривиальных кусочков — накладные расходы начинают доминировать над полезной работой. Слишком большой — и ядра не удаётся задействовать полностью: многие потоки простаивают, пока один «грызёт» большой фрагмент.

Практические правила:

  • Тело задачи должно занимать не менее 10 микросекунд. При меньшем времени накладные расходы на управление задачами сопоставимы с самой работой.
  • Задайте порог как константу, которую можно настраивать. 100, 1000, 10000 — типичные значения для примитивных массивов; правильное число зависит от стоимости обработки одного элемента.
  • Для очень небольших входных данных переключайтесь на чисто последовательную реализацию. Накладные расходы на разбиение и форкинг бессмысленны для данных, умещающихся в кэше.

fork(), join(), invoke()

Три операции над RecursiveTask:

МетодПоведение
fork()Запланировать задачу в текущем пуле; вернуться немедленно
join()Дождаться задачи и вернуть её результат (или пробросить возникшее исключение)
invoke()Комбинация fork + join для текущего потока — синхронное выполнение
compute()Запустить тело задачи непосредственно в вызывающем потоке (без форка)

В приведённом выше паттерне left.fork(); right.compute(); left.join(); всё делается правильно — форкаем одну половину в другой поток, выполняем другую здесь, затем ждём завершения форка.

Не стоит писать left.fork(); right.fork(); left.join(); right.join();. Правая часть форкается, а текущий поток ждёт — нет потока выполнения, который реально запустит right, пока поток не дойдёт до join. Такая комбинация впустую тратит временной слот текущего потока.

Общий пул

ForkJoinPool.commonPool() — общий пул на уровне JVM, размер которого по умолчанию равен Runtime.getRuntime().availableProcessors() - 1. Он лежит в основе:

  • Stream.parallelStream()
  • CompletableFuture.supplyAsync(supplier) (перегрузка без исполнителя)
  • Arrays.parallelSort()

Размер общего пула можно настроить через системное свойство java.util.concurrent.ForkJoinPool.common.parallelism при запуске JVM. Не используйте общий пул для I/O — один блокирующий вызов занимает поток, который разделяет вся JVM.

Work-stealing в картинках

worker-1 deque:  [t1 t2 t3 t4]            (it forked these; t4 just got pushed)
worker-2 deque:  []                       (empty — workers steal)
worker-3 deque:  [t10 t11]                (still has its own)

worker-2 finds its deque empty; steals t1 from the BOTTOM of worker-1's deque
worker-1 keeps pulling its own tasks from the TOP

Двусторонняя очередь — сердце этой конструкции: потоки-владельцы добавляют и извлекают задачи с одного конца (LIFO — локальность обращений для попаданий в кэш), а похитители берут с другого (FIFO — минимальная конкуренция с владельцем). Именно поэтому fork/join показывает такую высокую производительность: потоки редко конкурируют за структуры данных друг друга даже при высокой нагрузке.

Пример на практике: параллельная сумма против последовательной

Программа ниже суммирует массив из 10 миллионов элементов двумя способами — последовательным циклом и рекурсией с fork/join — и выводит реальное время выполнения каждого.

java— editable, runs on the server

Что можно извлечь из запуска:

  • Версия с fork/join оказалась в несколько раз быстрее последовательного цикла. На машине с N ядрами теоретический предел — примерно — реальное число было меньше, потому что JVM, GC и другие потоки JVM также претендовали на CPU, а нагрузка при пороговом значении распределяется не идеально. Тем не менее это существенное ускорение за несколько строк рекурсивного кода.
  • Обе суммы совпали. Это проверка корректности разбиения и слияния: каждый лист суммировал свой непересекающийся срез; шаг объединения (l + r) складывал их; никакого двойного счёта или гонок данных, поскольку каждый лист писал в свою локальную переменную.
  • Вариант SumTiny с порогом 10 оказался медленнее последовательного цикла. При 10 миллионах элементов, разбитых до фрагментов по 10, создаётся около 2 миллионов задач — и накладные расходы на управление задачами затмевают саму работу сложения. Порог — реальный рычаг управления; измеряйте его на представительных входных данных.
  • Паттерн left.fork(); long r = right.compute(); long l = left.join(); использовал на одну задачу меньше, чем fork(); fork(); join(); join();. Текущий поток свободен во время compute() — использовать его для одной из половин экономит целое выделение задачи. Это небольшой, но накопительный выигрыш на многих реальных нагрузках.
  • ForkJoinPool.commonPool() был исполнителем в этом демо. Для разового запуска общий пул вполне подходит. Для долго работающей программы, которая смешивает fork/join-работу с параллельными потоками (parallel) и асинхронными future, выделите тяжёлой fork/join-нагрузке собственный пул — общий пул предназначен для коротких всплесков, а не для непрерывных интенсивных вычислений.

Что дальше

Следующая глава, Java Concurrent Collections, охватывает структуры данных, спроектированные для одновременного обращения множества потоков: ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue и остальные классы java.util.concurrent.

Практика

Практика
Внутри `compute()` класса `RecursiveTask` у вас есть две подзадачи: `left` и `right`. Какой паттерн вызовов является каноническим идиомом fork/join?
Внутри `compute()` класса `RecursiveTask` у вас есть две подзадачи: `left` и `right`. Какой паттерн вызовов является каноническим идиомом fork/join?
Was this page helpful?