Java-фреймворк Fork/Join
Рекурсивное разбиение задач и их параллелизация с помощью Fork/Join: ForkJoinPool, RecursiveTask, work-stealing.
Обычный пул потоков отлично подходит для «множества независимых задач». Но он плохо справляется с «одной большой задачей, которую можно рекурсивно разбить на меньшие версии самой себя». Для второго сценария — работы в стиле «разделяй и властвуй» — в Java есть специализированный исполнитель: ForkJoinPool. Именно он лежит в основе parallelStream, CompletableFuture.supplyAsync (когда исполнитель не задан), а также любого кода, написанного с использованием RecursiveTask/RecursiveAction.
Секрет ForkJoinPool — work-stealing («кража работы»): у каждого рабочего потока есть собственная двусторонняя очередь, и когда она пустеет, поток крадёт задачу с нижнего конца очереди другого потока. Это обеспечивает автоматическую балансировку нагрузки — быстрые потоки помогают медленным без какой-либо явной координации.
Когда стоит применять
Fork/join — правильный инструмент для:
- Рекурсивного разбиения задач. Быстрая сортировка, сортировка слиянием, обход деревьев, рекурсивные числовые алгоритмы, умножение матриц с делением пополам.
- Задач, нагружающих CPU, которые можно распараллелить на примерно равные части.
- Работы с адаптивной гранулярностью: разбивать, если фрагмент большой; выполнять напрямую, если маленький.
Это неправильный инструмент для:
- I/O-bound задач. Заблокированный поток не крадёт задачи — а размер пула по умолчанию равен числу ядер CPU. Заблокируйте один поток — потеряете ядро.
- Независимых несвязанных задач. Обычный
ThreadPoolExecutorпроще и столь же быстр для такого сценария. - Задач, зависящих от фиксированного внешнего расписания. Используйте
ScheduledExecutorService.
Удобная ментальная модель: если вы бы использовали parallelStream для этой работы, fork/join — это тот же сценарий, выраженный напрямую. (Fork/join появился в Java 7; parallelStream в Java 8 был построен поверх него.)
Три основных класса
ForkJoinPool pool; // the executor
RecursiveTask<V>; // an abstract task returning V
RecursiveAction; // an abstract task returning nothingВы расширяете RecursiveTask или RecursiveAction, переопределяете метод compute(), внутри compute() решаете — разбить задачу или выполнить напрямую — и вызываете fork()/join() на подзадачах.
class Sum extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000;
private final long[] data;
private final int lo, hi;
Sum(long[] data, int lo, int hi) {
this.data = data; this.lo = lo; this.hi = hi;
}
@Override
protected Long compute() {
int len = hi - lo;
if (len <= THRESHOLD) {
long s = 0;
for (int i = lo; i < hi; i++) s += data[i];
return s;
}
int mid = lo + len / 2;
Sum left = new Sum(data, lo, mid);
Sum right = new Sum(data, mid, hi);
left.fork(); // schedule left to run on another worker
long rightResult = right.compute(); // run right on this worker (avoid extra task)
long leftResult = left.join(); // wait for left
return leftResult + rightResult;
}
}
ForkJoinPool pool = new ForkJoinPool();
long total = pool.invoke(new Sum(data, 0, data.length));Схема — проверить порог → разбить → форкнуть одну половину → вычислить другую → объединить — является каноническим идиомом fork/join. Приём «вычислять одну половину здесь, а не форкать обе» позволяет избежать создания лишней задачи и даёт небольшой, но реальный выигрыш.
Порог имеет значение
Самое важное решение: когда прекращать разбиение. Слишком маленький порог порождает тысячи задач для тривиальных кусочков — накладные расходы начинают доминировать над полезной работой. Слишком большой — и ядра не удаётся задействовать полностью: многие потоки простаивают, пока один «грызёт» большой фрагмент.
Практические правила:
- Тело задачи должно занимать не менее 10 микросекунд. При меньшем времени накладные расходы на управление задачами сопоставимы с самой работой.
- Задайте порог как константу, которую можно настраивать.
100,1000,10000— типичные значения для примитивных массивов; правильное число зависит от стоимости обработки одного элемента. - Для очень небольших входных данных переключайтесь на чисто последовательную реализацию. Накладные расходы на разбиение и форкинг бессмысленны для данных, умещающихся в кэше.
fork(), join(), invoke()
Три операции над RecursiveTask:
| Метод | Поведение |
|---|---|
fork() | Запланировать задачу в текущем пуле; вернуться немедленно |
join() | Дождаться задачи и вернуть её результат (или пробросить возникшее исключение) |
invoke() | Комбинация fork + join для текущего потока — синхронное выполнение |
compute() | Запустить тело задачи непосредственно в вызывающем потоке (без форка) |
В приведённом выше паттерне left.fork(); right.compute(); left.join(); всё делается правильно — форкаем одну половину в другой поток, выполняем другую здесь, затем ждём завершения форка.
Не стоит писать left.fork(); right.fork(); left.join(); right.join();. Правая часть форкается, а текущий поток ждёт — нет потока выполнения, который реально запустит right, пока поток не дойдёт до join. Такая комбинация впустую тратит временной слот текущего потока.
Общий пул
ForkJoinPool.commonPool() — общий пул на уровне JVM, размер которого по умолчанию равен Runtime.getRuntime().availableProcessors() - 1. Он лежит в основе:
Stream.parallelStream()CompletableFuture.supplyAsync(supplier)(перегрузка без исполнителя)Arrays.parallelSort()
Размер общего пула можно настроить через системное свойство java.util.concurrent.ForkJoinPool.common.parallelism при запуске JVM. Не используйте общий пул для I/O — один блокирующий вызов занимает поток, который разделяет вся JVM.
Work-stealing в картинках
worker-1 deque: [t1 t2 t3 t4] (it forked these; t4 just got pushed)
worker-2 deque: [] (empty — workers steal)
worker-3 deque: [t10 t11] (still has its own)
worker-2 finds its deque empty; steals t1 from the BOTTOM of worker-1's deque
worker-1 keeps pulling its own tasks from the TOPДвусторонняя очередь — сердце этой конструкции: потоки-владельцы добавляют и извлекают задачи с одного конца (LIFO — локальность обращений для попаданий в кэш), а похитители берут с другого (FIFO — минимальная конкуренция с владельцем). Именно поэтому fork/join показывает такую высокую производительность: потоки редко конкурируют за структуры данных друг друга даже при высокой нагрузке.
Пример на практике: параллельная сумма против последовательной
Программа ниже суммирует массив из 10 миллионов элементов двумя способами — последовательным циклом и рекурсией с fork/join — и выводит реальное время выполнения каждого.
Что можно извлечь из запуска:
- Версия с fork/join оказалась в несколько раз быстрее последовательного цикла. На машине с
Nядрами теоретический предел — примерноN×— реальное число было меньше, потому что JVM, GC и другие потоки JVM также претендовали на CPU, а нагрузка при пороговом значении распределяется не идеально. Тем не менее это существенное ускорение за несколько строк рекурсивного кода. - Обе суммы совпали. Это проверка корректности разбиения и слияния: каждый лист суммировал свой непересекающийся срез; шаг объединения (
l + r) складывал их; никакого двойного счёта или гонок данных, поскольку каждый лист писал в свою локальную переменную. - Вариант
SumTinyс порогом10оказался медленнее последовательного цикла. При 10 миллионах элементов, разбитых до фрагментов по 10, создаётся около 2 миллионов задач — и накладные расходы на управление задачами затмевают саму работу сложения. Порог — реальный рычаг управления; измеряйте его на представительных входных данных. - Паттерн
left.fork(); long r = right.compute(); long l = left.join();использовал на одну задачу меньше, чемfork(); fork(); join(); join();. Текущий поток свободен во времяcompute()— использовать его для одной из половин экономит целое выделение задачи. Это небольшой, но накопительный выигрыш на многих реальных нагрузках. ForkJoinPool.commonPool()был исполнителем в этом демо. Для разового запуска общий пул вполне подходит. Для долго работающей программы, которая смешивает fork/join-работу с параллельными потоками (parallel) и асинхронными future, выделите тяжёлой fork/join-нагрузке собственный пул — общий пул предназначен для коротких всплесков, а не для непрерывных интенсивных вычислений.
Что дальше
Следующая глава, Java Concurrent Collections, охватывает структуры данных, спроектированные для одновременного обращения множества потоков: ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue и остальные классы java.util.concurrent.