W3docs

Java CompletableFuture

Асинхронные вычисления с CompletableFuture: thenApply, thenCompose, allOf, exceptionally и типичные ошибки.

Future — это одноразовый дескриптор результата: отправил, подождал, прочитал. Цепочку он не умеет строить. Если нужно «сделать A, затем с результатом A сделать B, затем объединить B и C и передать в D» без написания стейт-машины вручную, требуется CompletableFuture — переработанная в Java 8 идея асинхронного результата вокруг композиции.

CompletableFuture<V> реализует Future<V>, поэтому весь старый API никуда не делся. Новая часть — это API комбинаторов: около тридцати методов, позволяющих строить графы потоков данных из асинхронных операций — применять функции, выполнять побочные эффекты, комбинировать несколько futures, восстанавливаться после исключений, задавать тайм-ауты — никогда не блокируя поток в ожидании промежуточного результата.

Стартовые методы

Обычно CompletableFuture создаётся не напрямую. Пайплайн начинается с одного из следующих методов:

CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void>    b = CompletableFuture.runAsync(() -> log("hello"));
CompletableFuture<String>  c = CompletableFuture.completedFuture("ready");
CompletableFuture<String>  d = CompletableFuture.failedFuture(new IOException("nope"));
Стартовый методПоведение
supplyAsync(Supplier)Выполняет Supplier в общем пуле и возвращает его значение
runAsync(Runnable)Выполняет Runnable в общем пуле без значения
completedFuture(v)Уже завершённый future с заданным значением
failedFuture(t)Уже провалившийся future с заданным throwable

У supplyAsync и runAsync есть перегрузки, принимающие явный Executor. Передавать его стоит почти всегда. По умолчанию используется ForkJoinPool.commonPool() — общий пул, масштабируемый по количеству ядер CPU, подходящий для короткой CPU-работы, но катастрофический при I/O (один медленный вызов блокирует ядро для всех). Для I/O или работы с неизвестной стоимостью всегда передавайте явный executor.

Цепочки: thenApply, thenAccept, thenRun

Простейшие комбинаторы превращают один future в другой:

CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);

CompletableFuture<String>  b = a.thenApply(n -> "value is " + n);          // transform
CompletableFuture<Void>    c = a.thenAccept(n -> System.out.println(n));    // consume, no result
CompletableFuture<Void>    d = a.thenRun(() -> System.out.println("done")); // side-effect, ignore value
МетодТип лямбдыВозвращает
thenApplyFunction<T,U>CompletableFuture<U>
thenAcceptConsumer<T>CompletableFuture<Void>
thenRunRunnableCompletableFuture<Void>

У каждого метода есть три варианта:

  • thenApply(fn) — выполняется в том же потоке, который завершил предыдущий этап
  • thenApplyAsync(fn) — выполняется в общем пуле
  • thenApplyAsync(fn, executor) — выполняется в конкретном executor

Вариант без Async самый быстрый (без переключения потока), но означает, что fn выполняется в потоке, завершившем предыдущий этап, — возможно, в I/O-потоке, который не стоит нагружать CPU-работой. Варианты *Async — более безопасный выбор по умолчанию в разнородных пайплайнах.

thenCompose — сглаживание future из future

thenApply хорош, когда функция возвращает обычное значение. Когда она возвращает ещё один CompletableFuture, вам не нужен CompletableFuture<CompletableFuture<V>> — нужен thenCompose:

CompletableFuture<User> user = lookupUser(id);
CompletableFuture<Profile> profile = user.thenCompose(u -> loadProfile(u.profileId()));
//                                          ^ Function<User, CompletableFuture<Profile>>

thenCompose — это flatMap для futures. Используйте его, когда следующий шаг сам асинхронный; thenApply — когда нет.

Комбинирование двух futures: thenCombine

Когда есть два независимых асинхронных значения и нужно их объединить:

CompletableFuture<Integer> price   = fetchPrice(symbol);
CompletableFuture<Integer> shares  = fetchShares(account);
CompletableFuture<Integer> total   = price.thenCombine(shares, (p, s) -> p * s);

thenCombine ждёт оба входа, затем применяет BiFunction к их результатам. Оба future выполняются параллельно — price и shares уже запущены к моменту регистрации thenCombine. Комбинатор выполняется в потоке, который завершается вторым.

Вариант «любой», applyToEither, берёт первый результат и игнорирует второй.

Много futures: allOf и anyOf

Когда параллелизм распространяется на коллекцию futures:

List<CompletableFuture<String>> all = ids.stream()
    .map(this::fetchAsync)
    .toList();

CompletableFuture<Void> doneAll  = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
CompletableFuture<Object> firstOne = CompletableFuture.anyOf(all.toArray(new CompletableFuture[0]));

allOf завершается, когда завершены все входные futures. Возвращает CompletableFuture<Void> — чтобы получить список результатов, нужно применить thenApply и извлечь значения обратно:

CompletableFuture<List<String>> results = doneAll.thenApply(v ->
    all.stream().map(CompletableFuture::join).toList());        // .join() never blocks here — they're all complete

anyOf возвращает значение первого завершившегося входа (как Object — невозможно выразить «любой из этих типизированных futures» одним возвращаемым типом).

Обработка ошибок: exceptionally и handle

CompletableFuture может завершиться с ошибкой (исключение на любом этапе порождает failed future ниже по цепочке). Комбинаторы для восстановления и преобразования:

CompletableFuture<String> safe = riskyAsync()
    .exceptionally(ex -> "fallback for: " + ex.getMessage());

CompletableFuture<String> either = riskyAsync()
    .handle((value, ex) -> ex == null ? value : "fallback");
МетодКогда выполняетсяЧто возвращает
exceptionally(fn)Только при ошибке; получает причинуВосстановленное значение
handle(bi)Всегда; получает (value, ex) (одно из них null)Преобразованное значение
whenComplete(bi)Всегда; получает (value, ex)Тот же future, только побочный эффект

exceptionally — простой путь «поймать и заменить». handle — более универсальный вариант «выполнять всегда, решать по результату» — полезен, когда нужно логировать каждое завершение независимо от успеха.

orTimeout и completeOnTimeout

Java 9 добавила тайм-ауты прямо в API futures:

CompletableFuture<String> withDeadline = riskyAsync()
    .orTimeout(2, TimeUnit.SECONDS);                  // completes exceptionally if not done in 2s

CompletableFuture<String> withDefault = riskyAsync()
    .completeOnTimeout("fallback", 2, TimeUnit.SECONDS);

Они позволяют задавать дедлайны без написания собственного наблюдателя. Используют внутренние запланированные потоки, поэтому дёшевы в подключении.

Не блокируйте внутри асинхронных этапов

Самая распространённая ошибка с CompletableFuture: вызов .get() или .join() внутри Async-этапа. Это поток пула executor, стоящий в ожидании ещё одного потока из того же пула — под нагрузкой можно получить дедлок всего пула.

// WRONG — joining inside an async stage on the common pool
CompletableFuture.supplyAsync(() -> {
  Integer x = anotherFuture().join();                 // blocks a pool thread
  return x * 2;
});

// RIGHT — compose instead of join
anotherFuture().thenApply(x -> x * 2);

Если хочется написать .get() внутри Async-этапа, нужно использовать thenCompose/thenApply.

Использование собственного executor

Пул по умолчанию подходит для короткой CPU-работы. Для I/O или любой блокирующей операции используйте свой:

ExecutorService io = Executors.newFixedThreadPool(50, namedFactory("io"));
ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), namedFactory("cpu"));

CompletableFuture.supplyAsync(this::loadFromDb, io)
    .thenApplyAsync(this::transform, cpu)
    .thenAcceptAsync(this::sendToClient, io);

Каждый шаг выполняется в правильном пуле. Общий пул остаётся свободным для parallelStream и других нужд фреймворка. Такое смешение — суть хорошо ведущего себя асинхронного Java-кода.

Пример: небольшой асинхронный пайплайн

Программа ниже получает «пользователя» и «профиль» параллельно, объединяет их, применяет дедлайн и восстанавливается после ошибки.

java— editable, runs on the server

Что важно вынести из запуска:

  • Раздел 1 использовал thenCombine для двух независимых запросов. Они выполнялись параллельноname (50 мс) и age (80 мс) уже были запущены до присоединения комбинатора. Объединённый future завершился вскоре после медленнейшего. Это и есть параллелизм: асинхронный пайплайн не ждёт каждого шага, а выстраивает шаги в виде графа.
  • Раздел 2 использовал thenCompose для цепочки шагов, каждый из которых сам асинхронный. thenApply дал бы CompletableFuture<CompletableFuture<String>> — бесполезно. thenCompose сглаживает, как flatMap для потоков и Optional.
  • Раздел 3 использовал allOf над списком и затем thenApply для извлечения значений. Сам allOf возвращает Void; сбор результатов — это отдельный поток над (уже завершёнными) futures через join(). Вызовы join() здесь не блокируют, потому что allOf уже завершён.
  • Раздел 4 показал exceptionally, восстанавливающийся после выброса исключения. Вышестоящий future провалился; нижестоящий вернул строку-заглушку. Без exceptionally (или handle) ошибка распространилась бы до .join() как CompletionException.
  • Раздел 5 использовал orTimeout для применения дедлайна 100 мс к задаче в 500 мс. Future завершился с ошибкой TimeoutException; join повторно бросил её внутри CompletionException. Это правильный шаблон: «хочу этот результат, но только если он появится достаточно быстро».
  • Раздел 6 использовал handle для ветвления по успеху/ошибке в одном шаге. handle всегда выполняется и получает оба значения (value, ex) — одно из них null. Удобно, когда нужен единый хвост пайплайна независимо от того, успешна ли операция.

Что дальше

Следующая глава, Java Fork/Join, рассматривает ForkJoinPool — пул с похищением работы, лежащий в основе параллельных потоков и общего пула CompletableFuture, а также правильный инструмент для CPU-работы по принципу «разделяй и властвуй».

Практика

Практика
Вы пишете `CompletableFuture.supplyAsync(() -> { Integer x = otherFuture().get(); return x * 2; })`. Внутри лямбды вы вызываете `.get()` для другого future, отправленного в тот же пул по умолчанию. В чём риск?
Вы пишете `CompletableFuture.supplyAsync(() -> { Integer x = otherFuture().get(); return x * 2; })`. Внутри лямбды вы вызываете `.get()` для другого future, отправленного в тот же пул по умолчанию. В чём риск?
Was this page helpful?