Java Executor Framework
Отправляйте задачи в пулы потоков с Executor и ExecutorService — иерархия типов, фабрики и правила выбора размера.
В предыдущей главе описывалось, что такое пул потоков. Эта глава посвящена иерархии типов, с помощью которых вы взаимодействуете с пулом — интерфейсам Executor, ExecutorService и ScheduledExecutorService. Вместе они называются executor framework, введённым в Java 5 для разделения «работы» и «потоков, которые её выполняют». Вы пишете Callable<Result> и Runnable, отправляете задачи, а фреймворк берёт на себя выделение потоков, постановку в очередь и передачу результата.
Трёхуровневая иерархия
Executor // execute(Runnable)
|
ExecutorService // + submit/invokeAll/invokeAny/shutdown/awaitTermination
|
ScheduledExecutorService // + schedule/scheduleAtFixedRate/scheduleWithFixedDelayПрограммируйте на наиболее общий интерфейс, который содержит всё необходимое:
Executor— базовый интерфейс с одним методом. Используйте его, когда нужно запустить задачу без ожидания результата. Параметр метода с типомExecutor— самый общий контракт «дай мне что-нибудь, способное выполнитьRunnable».ExecutorService— рабочая лошадка. Почти весь производственный код использует этот тип. Добавляетsubmit(с возвратомFuture), массовые операции и жизненный цикл.ScheduledExecutorService— когда нужно отложенное или повторяющееся выполнение.
Executor.execute — запуск без ожидания
public interface Executor {
void execute(Runnable command);
}Это весь интерфейс. execute принимает Runnable, запускает его в будущем и ничего не возвращает. Если задача бросает исключение, вы об этом не узнаете — оно попадёт в обработчик непойманных исключений рабочего потока.
execute подходит, когда:
- Задача не возвращает значения.
- Вам не нужно ждать её завершения или получать результат.
- Вам не нужно её отменять.
Для более сложных сценариев используйте submit.
ExecutorService.submit — расширенный вариант
public interface ExecutorService extends Executor {
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
// ... lifecycle, bulk ops
}submit возвращает Future, который позволяет:
- Дождаться завершения (
get()блокирует выполнение). - Прочитать результат (
get()возвращает значениеCallable). - Отменить задачу (
cancel(boolean mayInterrupt)). - Поймать исключение задачи (
get()перебрасывает его).
Future и Callable подробно рассмотрены в следующей главе; сейчас главное — понять разницу с execute. execute — односторонний; submit открывает обратный канал.
ExecutorService pool = Executors.newFixedThreadPool(4);
Future<Integer> result = pool.submit(() -> {
// Callable<Integer>; can throw, returns a value
return expensive();
});
Integer value = result.get(); // waits, throws ExecutionException if task failedМассовые операции: invokeAll и invokeAny
Когда у вас есть коллекция задач:
List<Callable<Integer>> tasks = makeTasks();
List<Future<Integer>> futures = pool.invokeAll(tasks); // run all, wait for all
Integer first = pool.invokeAny(tasks); // run all, return first success, cancel the restinvokeAll(tasks, timeout, unit) запускает задачи, но прерывается по дедлайну; задачи, которые не успели завершиться, возвращаются как Future, у которых isDone() равно true, но они были отменены.
invokeAny — правильный инструмент для избыточных запросов: обратитесь к трём DNS-серверам и возьмите тот, что ответит первым, остальные отмените.
ScheduledExecutorService — задержки и повторения
Когда нужна задержка или периодическое расписание:
ScheduledExecutorService sched = Executors.newScheduledThreadPool(2);
sched.schedule(() -> log("once, after 5 seconds"), 5, TimeUnit.SECONDS);
sched.scheduleAtFixedRate(this::flush, 0, 1, TimeUnit.SECONDS);
// runs at t=0, t=1, t=2, ... — even if a run takes longer, the next one queues
sched.scheduleWithFixedDelay(this::poll, 0, 1, TimeUnit.SECONDS);
// runs at t=0, then 1 second AFTER the previous finished — back-to-back delay is what's fixedРазница между atFixedRate и withFixedDelay заключается в том, что именно фиксируется: период между запусками или промежуток между завершением и следующим запуском. Для «сбрасывать каждую секунду по часам» используйте atFixedRate; для «пауза в 1 секунду между запусками независимо от их продолжительности» — withFixedDelay.
Если запланированная задача бросает исключение, все последующие запуски молча отменяются. Планировщик ничего не логирует. Всегда оборачивайте запланированные задачи в верхнеуровневый try/catch, чтобы они продолжали работать:
sched.scheduleAtFixedRate(() -> {
try { flush(); }
catch (Throwable t) { log.error("flush failed", t); }
}, 0, 1, TimeUnit.SECONDS);Забыть об этом — самая распространённая ошибка планировщика в производственном Java-коде.
Выбор размера пула
Правильный размер пула зависит от характера задач.
Для CPU-интенсивной работы действует правило: N + 1 потоков на машине с N ядрами. Каждый поток занимает одно ядро; +1 покрывает редкие моменты, когда поток ожидает данных из памяти.
Для I/O-интенсивной работы нужно значительно больше потоков. Приблизительная формула:
threads = cores * (1 + (wait_time / compute_time))Если задачи 90% времени ожидают ответа от базы данных, множитель равен 10 — 80 потоков на 8 ядрах. Точное число зависит от конкретного паттерна I/O; профилируйте и настраивайте.
На практике запускайте два пула: небольшой для CPU-работы и большой для I/O. Не смешивайте их — медленный запрос к базе данных внутри потока CPU-пула блокирует ядро, которое должно вычислять.
Виртуальные потоки Java 21 кардинально меняют эти расчёты: блокировка на I/O больше не тратит платформенный поток, поэтому можно использовать executor «один виртуальный поток на задачу» и вообще не думать о размере пула. Это рассматривается в конце раздела.
Фабрики Executors — краткий справочник
Фабричные методы возвращают ExecutorService (или его подынтерфейс). Каждый из них — это ThreadPoolExecutor с определёнными настройками:
| Фабрика | Конфигурация | Когда использовать |
|---|---|---|
newFixedThreadPool(n) | core=max=n, неограниченный LinkedBlockingQueue | Предсказуемый параллелизм; ловушка — неограниченная очередь |
newCachedThreadPool | core=0, max=MAX_VALUE, SynchronousQueue, keep-alive 60 с | Короткие всплески задач; ловушка — неограниченное число потоков |
newSingleThreadExecutor | То же, что newFixedThreadPool(1), но пул нельзя перенастроить | Последовательный единственный рабочий поток |
newScheduledThreadPool(n) | n основных потоков, планировщик очереди | Периодические задачи |
newWorkStealingPool | Java 8+: ForkJoinPool с параллелизмом = числу ядер | CPU-интенсивная работа, рекурсивные подзадачи |
newVirtualThreadPerTaskExecutor | Java 21+: один виртуальный поток на задачу | I/O-интенсивная работа, веб-серверы |
Не используйте newFixedThreadPool и newCachedThreadPool для путей с повышенной нагрузкой — у обоих есть неограниченные оси роста. Используйте new ThreadPoolExecutor(...) напрямую с ограниченной очередью.
Стандартная последовательность завершения работы
Пул, который никогда не завершается, удерживает свои не-демонические рабочие потоки, препятствуя выходу из JVM. Каждый созданный пул требует одного и того же паттерна очистки:
ExecutorService pool = Executors.newFixedThreadPool(4);
try {
// ... submit work, gather results ...
} finally {
pool.shutdown();
try {
if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
pool.shutdownNow();
pool.awaitTermination(5, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}Или, начиная с Java 19, то же самое через try-with-resources:
try (var pool = Executors.newFixedThreadPool(4)) {
pool.submit(...);
pool.submit(...);
} // close() runs shutdown + awaitTerminationExecutorService.close() в Java 19 выполняет корректное завершение и ждёт бесконечно; сочетайте с watchdog, если бесконечное ожидание недопустимо.
Развёрнутый пример: фреймворк от начала до конца
Программа ниже использует каждый из трёх интерфейсов — Executor для запуска без ожидания, ExecutorService для получения результатов и ScheduledExecutorService для периодического выполнения — всё в одном месте.
Что стоит вынести из запуска:
- В разделе 2 используется
try (ExecutorService pool = ...)— паттерн Java 19 с закрытием по выходу из области видимости.close()пула выполняетshutdown()и ждёт завершения. Это наиболее чистый способ завершения; для более старого кода или при жёстких временных ограничениях возвращайтесь к последовательностиshutdown+awaitTermination+shutdownNow. - В разделе 3 три задачи (50/80/20 мс) выполнялись на 4 рабочих потоках.
invokeAllвернул результат только после завершения самой медленной задачи — примерно через 80 мс. Это и есть контракт «ждать всех». Сумма результатовFutureсоответствовала сумме возвращённых значений в порядке отправки. - В разделе 4 та же структура использовалась с
invokeAny. Самая быстрая задача (50 мс) вернула результат первой; остальные были отменены.invokeAnyидеально подходит для паттернов «первый успешный ответ» — DNS-запросы к нескольким серверам, загрузка с зеркал, гонка по задержке. - В разделе 5 использовался
scheduleAtFixedRateс периодом 60 мс. Каждое срабатывание происходило в потоке пула планировщика. Обёрткаtry/catchвнутри тела — это производственный паттерн: если запланированная задача бросает исключение, планировщик молча отменяет все последующие запуски. Оборачивание тела в верхнеуровневый catch предотвращает это. - Запланированная задача явно отменялась через
cancel(false)перед завершением программы. Отмена и остановка планировщика позволяют JVM завершить работу; без этого планировщик удерживает не-демонические потоки и программа зависает. То же самое относится к каждому создаваемому вами executor.
Что дальше
В следующей главе, Java Callable and Future, подробно рассматривается сторона обработки результатов submit — Callable<V>, Future<V>, отмена и стандартные идиомы получения значения из асинхронной задачи.