Структурированный параллелизм в Java
Управляйте конкурентными подзадачами как единым блоком работы в Java с помощью структурированного параллелизма (StructuredTaskScope).
Структурированный параллелизм рассматривает группу конкурентных подзадач как единый блок работы: они запускаются вместе, завершаются вместе, и если одна из них завершается с ошибкой или вызывающий код отменяется, остальные тоже отменяются — никаких "осиротевших" потоков, переживающих блок, который их создал. Модель реализована в java.util.concurrent.StructuredTaskScope (предварительный API, представленный в Java 21) и опирается на те же виртуальные потоки, рассмотренные ранее в этой части. Цель проста: сделать конкурентный код таким же простым для чтения, отладки и понимания, как обычный последовательный метод.
В этой главе объясняется, почему важно слово "структурированный", анатомия области задач, две встроенные политики завершения, как распространяются дедлайны и отмены, а также приведён работающий пример. Предполагается, что вы знакомы с фреймворком исполнителей и Callable/Future.
Почему "структурированный"?
Классические пулы потоков являются неструктурированными: вы submit задачу в общий ExecutorService и получаете обратно Future, время жизни которого не связано с методом, его создавшим. Задача может пережить своего вызывающего, ошибка в одной задаче невидима для её "соседей", а отмена требует ручной настройки. Результатом являются утечки потоков и запутанная обработка ошибок.
Структурированный параллелизм заимствует дисциплину структурированного потока управления: подобно тому, как блок try ограничивает область своих операторов, область задач ограничивает свои подзадачи. Подзадачи, порождённые внутри блока, должны все завершиться до выхода из блока. Время жизни вложено аккуратно, поэтому дамп потоков и трассировка стека действительно показывают, кто что запустил.
| Аспект | Неструктурированный (общий пул ExecutorService) | Структурированный (StructuredTaskScope) |
|---|---|---|
| Время жизни подзадачи | Независимо от вызывающего | Ограничено охватывающим блоком |
| Ошибка в одной подзадаче | Скрыта в Future до вызова get | Может прервать всю область |
| Отмена | Ручная, легко забыть | Автоматическая при сбое или прерывании |
| Очистка ресурсов | На ваше усмотрение | close() ожидает завершения каждой подзадачи |
Структура области задач
Область задач — это AutoCloseable, поэтому она живёт в блоке try-with-resources. Вы fork подзадачи (каждая возвращает дескриптор Subtask), вызываете join() для ожидания, затем считываете каждый результат. Политика ShutdownOnFailure отменяет оставшиеся подзадачи, как только одна из них бросает исключение:
import java.util.concurrent.StructuredTaskScope;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
StructuredTaskScope.Subtask<String> user = scope.fork(() -> fetchUser(id));
StructuredTaskScope.Subtask<Integer> order = scope.fork(() -> fetchOrderCount(id));
scope.join(); // wait for both branches
scope.throwIfFailed(); // rethrow if either branch failed
return new Profile(user.get(), order.get());
} // close() guarantees both subtasks have ended before we leaveЕсли fetchUser бросает исключение, ShutdownOnFailure прерывает всё ещё выполняющийся fetchOrderCount, join() возвращает управление, а throwIfFailed() повторно бросает исходную причину, обёрнутую в ExecutionException. Вы никогда не допускаете утечки потока.
Встроенные политики завершения
Два предоставляемых шаблона покрывают типичные случаи; для всего остального вы наследуете StructuredTaskScope.
| Политика | Завершается когда | Используется для |
|---|---|---|
ShutdownOnFailure | Все успешны, или одна завершается с ошибкой | Разветвление, когда нужен каждый результат (типичный случай) |
ShutdownOnSuccess<T> | Первый успех, или все завершились с ошибкой | Гонка резервных источников; берёт самый быстрый ответ |
ShutdownOnSuccess возвращает победителя через result() и отменяет проигравших:
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> queryMirrorA());
scope.fork(() -> queryMirrorB());
scope.join();
return scope.result(); // the first one to return; the slower is cancelled
}Дедлайны и отмены распространяются
К области задач можно присоединиться с дедлайном; когда он истекает, незавершённые подзадачи отменяются:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
scope.fork(() -> slowService());
scope.joinUntil(Instant.now().plusSeconds(2)); // throws TimeoutException if late
scope.throwIfFailed();
}Отмена является кооперативной и распространяется вниз: если поток, которому принадлежит область, прерывается, все подзадачи прерываются в свою очередь. Поскольку каждая подзадача выполняется на собственном виртуальном потоке, порождение тысяч из них обходится дёшево — область, а не фиксированный размер пула, является единицей, о которой вы рассуждаете.
Рабочий пример: разветвление, сбой и объединение списка
StructuredTaskScope является предварительной функцией, поэтому для сохранения работоспособности этого примера на стабильном JDK мы моделируем ту же идею с исполнителем "по одному виртуальному потоку на задачу": блок try-with-resources, ограничивающий группу подзадач, который выходит только после завершения каждого потока подзадачи. Он разветвляет два вызова конкурентно, затем показывает, как сбой прерывает единицу работы, и как invokeAll объединяет сразу целый список.
Что можно извлечь из выполнения:
- Обе подзадачи сообщили
is virtual : true— каждыйsubmitзапускался на собственном виртуальном потоке, таком же лёгком носителе, который используетStructuredTaskScope.fork, поэтому запуск одного потока на подзадачу обходится дёшево. - Блок успешного пути напечатал
ran concurrently (<320ms): true, хотя два получения ждут 120мс и 200мс: они перекрывались, поэтому реальное время определяется самым медленным ветвлением (~200мс), а не суммой (320мс). Это перекрытие и есть весь смысл разветвления. - Выход из блока try-with-resources вызвал
close(), который блокировался до завершения каждого потока подзадачи — область является единицей времени жизни, именно ту дисциплину, которуюStructuredTaskScopeобеспечивает по построению. - В разделе сбоя программа напечатала
caught: IllegalStateException -> upstream said no: ошибка, брошенная внутри подзадачи, всплывает в точке объединения, обёрнутая вExecutionException, аgetCause()возвращает исходное исключение. - После перехвата сбоя было напечатано
sibling cancelled: true— мы отменили всё ещё выполняющуюся веткуgood, чтобы никакой "сирота" не пережил блок, что именноShutdownOnFailureделает за вас автоматически; здесь мы сделали это вручную для наглядности механизма.
Связанные темы
- Виртуальные потоки — лёгкие потоки, на которых выполняется каждая подзадача.
- Современные виртуальные потоки — практические шаблоны и подводные камни.
- Фреймворк исполнителей — неструктурированная база, которую заменяет эта модель.
CallableиFuture— типы задачи и результата, используемые в точке объединения.CompletableFuture— компоновка асинхронных результатов без блокирующих объединений.