W3docs

Java Executor Framework

Отправляйте задачи в пулы потоков с Executor и ExecutorService — иерархия типов, фабрики и правила выбора размера.

В предыдущей главе описывалось, что такое пул потоков. Эта глава посвящена иерархии типов, с помощью которых вы взаимодействуете с пулом — интерфейсам Executor, ExecutorService и ScheduledExecutorService. Вместе они называются executor framework, введённым в Java 5 для разделения «работы» и «потоков, которые её выполняют». Вы пишете Callable<Result> и Runnable, отправляете задачи, а фреймворк берёт на себя выделение потоков, постановку в очередь и передачу результата.

Трёхуровневая иерархия

Executor          // execute(Runnable)
   |
ExecutorService   // + submit/invokeAll/invokeAny/shutdown/awaitTermination
   |
ScheduledExecutorService  // + schedule/scheduleAtFixedRate/scheduleWithFixedDelay

Программируйте на наиболее общий интерфейс, который содержит всё необходимое:

  • Executor — базовый интерфейс с одним методом. Используйте его, когда нужно запустить задачу без ожидания результата. Параметр метода с типом Executor — самый общий контракт «дай мне что-нибудь, способное выполнить Runnable».
  • ExecutorService — рабочая лошадка. Почти весь производственный код использует этот тип. Добавляет submit (с возвратом Future), массовые операции и жизненный цикл.
  • ScheduledExecutorService — когда нужно отложенное или повторяющееся выполнение.

Executor.execute — запуск без ожидания

public interface Executor {
  void execute(Runnable command);
}

Это весь интерфейс. execute принимает Runnable, запускает его в будущем и ничего не возвращает. Если задача бросает исключение, вы об этом не узнаете — оно попадёт в обработчик непойманных исключений рабочего потока.

execute подходит, когда:

  • Задача не возвращает значения.
  • Вам не нужно ждать её завершения или получать результат.
  • Вам не нужно её отменять.

Для более сложных сценариев используйте submit.

ExecutorService.submit — расширенный вариант

public interface ExecutorService extends Executor {
  <T> Future<T> submit(Callable<T> task);
  Future<?> submit(Runnable task);
  <T> Future<T> submit(Runnable task, T result);
  // ... lifecycle, bulk ops
}

submit возвращает Future, который позволяет:

  • Дождаться завершения (get() блокирует выполнение).
  • Прочитать результат (get() возвращает значение Callable).
  • Отменить задачу (cancel(boolean mayInterrupt)).
  • Поймать исключение задачи (get() перебрасывает его).

Future и Callable подробно рассмотрены в следующей главе; сейчас главное — понять разницу с execute. execute — односторонний; submit открывает обратный канал.

ExecutorService pool = Executors.newFixedThreadPool(4);
Future<Integer> result = pool.submit(() -> {
  // Callable<Integer>; can throw, returns a value
  return expensive();
});

Integer value = result.get();                       // waits, throws ExecutionException if task failed

Массовые операции: invokeAll и invokeAny

Когда у вас есть коллекция задач:

List<Callable<Integer>> tasks = makeTasks();

List<Future<Integer>> futures = pool.invokeAll(tasks);          // run all, wait for all
Integer first = pool.invokeAny(tasks);                          // run all, return first success, cancel the rest

invokeAll(tasks, timeout, unit) запускает задачи, но прерывается по дедлайну; задачи, которые не успели завершиться, возвращаются как Future, у которых isDone() равно true, но они были отменены.

invokeAny — правильный инструмент для избыточных запросов: обратитесь к трём DNS-серверам и возьмите тот, что ответит первым, остальные отмените.

ScheduledExecutorService — задержки и повторения

Когда нужна задержка или периодическое расписание:

ScheduledExecutorService sched = Executors.newScheduledThreadPool(2);

sched.schedule(() -> log("once, after 5 seconds"), 5, TimeUnit.SECONDS);

sched.scheduleAtFixedRate(this::flush, 0, 1, TimeUnit.SECONDS);
// runs at t=0, t=1, t=2, ... — even if a run takes longer, the next one queues

sched.scheduleWithFixedDelay(this::poll, 0, 1, TimeUnit.SECONDS);
// runs at t=0, then 1 second AFTER the previous finished — back-to-back delay is what's fixed

Разница между atFixedRate и withFixedDelay заключается в том, что именно фиксируется: период между запусками или промежуток между завершением и следующим запуском. Для «сбрасывать каждую секунду по часам» используйте atFixedRate; для «пауза в 1 секунду между запусками независимо от их продолжительности» — withFixedDelay.

Если запланированная задача бросает исключение, все последующие запуски молча отменяются. Планировщик ничего не логирует. Всегда оборачивайте запланированные задачи в верхнеуровневый try/catch, чтобы они продолжали работать:

sched.scheduleAtFixedRate(() -> {
  try { flush(); }
  catch (Throwable t) { log.error("flush failed", t); }
}, 0, 1, TimeUnit.SECONDS);

Забыть об этом — самая распространённая ошибка планировщика в производственном Java-коде.

Выбор размера пула

Правильный размер пула зависит от характера задач.

Для CPU-интенсивной работы действует правило: N + 1 потоков на машине с N ядрами. Каждый поток занимает одно ядро; +1 покрывает редкие моменты, когда поток ожидает данных из памяти.

Для I/O-интенсивной работы нужно значительно больше потоков. Приблизительная формула:

threads = cores * (1 + (wait_time / compute_time))

Если задачи 90% времени ожидают ответа от базы данных, множитель равен 10 — 80 потоков на 8 ядрах. Точное число зависит от конкретного паттерна I/O; профилируйте и настраивайте.

На практике запускайте два пула: небольшой для CPU-работы и большой для I/O. Не смешивайте их — медленный запрос к базе данных внутри потока CPU-пула блокирует ядро, которое должно вычислять.

Виртуальные потоки Java 21 кардинально меняют эти расчёты: блокировка на I/O больше не тратит платформенный поток, поэтому можно использовать executor «один виртуальный поток на задачу» и вообще не думать о размере пула. Это рассматривается в конце раздела.

Фабрики Executors — краткий справочник

Фабричные методы возвращают ExecutorService (или его подынтерфейс). Каждый из них — это ThreadPoolExecutor с определёнными настройками:

ФабрикаКонфигурацияКогда использовать
newFixedThreadPool(n)core=max=n, неограниченный LinkedBlockingQueueПредсказуемый параллелизм; ловушка — неограниченная очередь
newCachedThreadPoolcore=0, max=MAX_VALUE, SynchronousQueue, keep-alive 60 сКороткие всплески задач; ловушка — неограниченное число потоков
newSingleThreadExecutorТо же, что newFixedThreadPool(1), но пул нельзя перенастроитьПоследовательный единственный рабочий поток
newScheduledThreadPool(n)n основных потоков, планировщик очередиПериодические задачи
newWorkStealingPoolJava 8+: ForkJoinPool с параллелизмом = числу ядерCPU-интенсивная работа, рекурсивные подзадачи
newVirtualThreadPerTaskExecutorJava 21+: один виртуальный поток на задачуI/O-интенсивная работа, веб-серверы

Не используйте newFixedThreadPool и newCachedThreadPool для путей с повышенной нагрузкой — у обоих есть неограниченные оси роста. Используйте new ThreadPoolExecutor(...) напрямую с ограниченной очередью.

Стандартная последовательность завершения работы

Пул, который никогда не завершается, удерживает свои не-демонические рабочие потоки, препятствуя выходу из JVM. Каждый созданный пул требует одного и того же паттерна очистки:

ExecutorService pool = Executors.newFixedThreadPool(4);
try {
  // ... submit work, gather results ...
} finally {
  pool.shutdown();
  try {
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      pool.shutdownNow();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
  } catch (InterruptedException e) {
    pool.shutdownNow();
    Thread.currentThread().interrupt();
  }
}

Или, начиная с Java 19, то же самое через try-with-resources:

try (var pool = Executors.newFixedThreadPool(4)) {
  pool.submit(...);
  pool.submit(...);
}                                                    // close() runs shutdown + awaitTermination

ExecutorService.close() в Java 19 выполняет корректное завершение и ждёт бесконечно; сочетайте с watchdog, если бесконечное ожидание недопустимо.

Развёрнутый пример: фреймворк от начала до конца

Программа ниже использует каждый из трёх интерфейсов — Executor для запуска без ожидания, ExecutorService для получения результатов и ScheduledExecutorService для периодического выполнения — всё в одном месте.

java— editable, runs on the server

Что стоит вынести из запуска:

  • В разделе 2 используется try (ExecutorService pool = ...) — паттерн Java 19 с закрытием по выходу из области видимости. close() пула выполняет shutdown() и ждёт завершения. Это наиболее чистый способ завершения; для более старого кода или при жёстких временных ограничениях возвращайтесь к последовательности shutdown + awaitTermination + shutdownNow.
  • В разделе 3 три задачи (50/80/20 мс) выполнялись на 4 рабочих потоках. invokeAll вернул результат только после завершения самой медленной задачи — примерно через 80 мс. Это и есть контракт «ждать всех». Сумма результатов Future соответствовала сумме возвращённых значений в порядке отправки.
  • В разделе 4 та же структура использовалась с invokeAny. Самая быстрая задача (50 мс) вернула результат первой; остальные были отменены. invokeAny идеально подходит для паттернов «первый успешный ответ» — DNS-запросы к нескольким серверам, загрузка с зеркал, гонка по задержке.
  • В разделе 5 использовался scheduleAtFixedRate с периодом 60 мс. Каждое срабатывание происходило в потоке пула планировщика. Обёртка try/catch внутри тела — это производственный паттерн: если запланированная задача бросает исключение, планировщик молча отменяет все последующие запуски. Оборачивание тела в верхнеуровневый catch предотвращает это.
  • Запланированная задача явно отменялась через cancel(false) перед завершением программы. Отмена и остановка планировщика позволяют JVM завершить работу; без этого планировщик удерживает не-демонические потоки и программа зависает. То же самое относится к каждому создаваемому вами executor.

Что дальше

В следующей главе, Java Callable and Future, подробно рассматривается сторона обработки результатов submitCallable<V>, Future<V>, отмена и стандартные идиомы получения значения из асинхронной задачи.

Практика

Практика
Вы запланировали задачу с помощью `scheduleAtFixedRate`, и на третьем запуске она бросает `RuntimeException`. Что происходит с последующими запусками?
Вы запланировали задачу с помощью `scheduleAtFixedRate`, и на третьем запуске она бросает `RuntimeException`. Что происходит с последующими запусками?
Was this page helpful?