Java CompletableFuture
Асинхронные вычисления с CompletableFuture: thenApply, thenCompose, allOf, exceptionally и типичные ошибки.
Future — это одноразовый дескриптор результата: отправил, подождал, прочитал. Цепочку он не умеет строить. Если нужно «сделать A, затем с результатом A сделать B, затем объединить B и C и передать в D» без написания стейт-машины вручную, требуется CompletableFuture — переработанная в Java 8 идея асинхронного результата вокруг композиции.
CompletableFuture<V> реализует Future<V>, поэтому весь старый API никуда не делся. Новая часть — это API комбинаторов: около тридцати методов, позволяющих строить графы потоков данных из асинхронных операций — применять функции, выполнять побочные эффекты, комбинировать несколько futures, восстанавливаться после исключений, задавать тайм-ауты — никогда не блокируя поток в ожидании промежуточного результата.
Стартовые методы
Обычно CompletableFuture создаётся не напрямую. Пайплайн начинается с одного из следующих методов:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> log("hello"));
CompletableFuture<String> c = CompletableFuture.completedFuture("ready");
CompletableFuture<String> d = CompletableFuture.failedFuture(new IOException("nope"));| Стартовый метод | Поведение |
|---|---|
supplyAsync(Supplier) | Выполняет Supplier в общем пуле и возвращает его значение |
runAsync(Runnable) | Выполняет Runnable в общем пуле без значения |
completedFuture(v) | Уже завершённый future с заданным значением |
failedFuture(t) | Уже провалившийся future с заданным throwable |
У supplyAsync и runAsync есть перегрузки, принимающие явный Executor. Передавать его стоит почти всегда. По умолчанию используется ForkJoinPool.commonPool() — общий пул, масштабируемый по количеству ядер CPU, подходящий для короткой CPU-работы, но катастрофический при I/O (один медленный вызов блокирует ядро для всех). Для I/O или работы с неизвестной стоимостью всегда передавайте явный executor.
Цепочки: thenApply, thenAccept, thenRun
Простейшие комбинаторы превращают один future в другой:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> b = a.thenApply(n -> "value is " + n); // transform
CompletableFuture<Void> c = a.thenAccept(n -> System.out.println(n)); // consume, no result
CompletableFuture<Void> d = a.thenRun(() -> System.out.println("done")); // side-effect, ignore value| Метод | Тип лямбды | Возвращает |
|---|---|---|
thenApply | Function<T,U> | CompletableFuture<U> |
thenAccept | Consumer<T> | CompletableFuture<Void> |
thenRun | Runnable | CompletableFuture<Void> |
У каждого метода есть три варианта:
thenApply(fn)— выполняется в том же потоке, который завершил предыдущий этапthenApplyAsync(fn)— выполняется в общем пулеthenApplyAsync(fn, executor)— выполняется в конкретном executor
Вариант без Async самый быстрый (без переключения потока), но означает, что fn выполняется в потоке, завершившем предыдущий этап, — возможно, в I/O-потоке, который не стоит нагружать CPU-работой. Варианты *Async — более безопасный выбор по умолчанию в разнородных пайплайнах.
thenCompose — сглаживание future из future
thenApply хорош, когда функция возвращает обычное значение. Когда она возвращает ещё один CompletableFuture, вам не нужен CompletableFuture<CompletableFuture<V>> — нужен thenCompose:
CompletableFuture<User> user = lookupUser(id);
CompletableFuture<Profile> profile = user.thenCompose(u -> loadProfile(u.profileId()));
// ^ Function<User, CompletableFuture<Profile>>thenCompose — это flatMap для futures. Используйте его, когда следующий шаг сам асинхронный; thenApply — когда нет.
Комбинирование двух futures: thenCombine
Когда есть два независимых асинхронных значения и нужно их объединить:
CompletableFuture<Integer> price = fetchPrice(symbol);
CompletableFuture<Integer> shares = fetchShares(account);
CompletableFuture<Integer> total = price.thenCombine(shares, (p, s) -> p * s);thenCombine ждёт оба входа, затем применяет BiFunction к их результатам. Оба future выполняются параллельно — price и shares уже запущены к моменту регистрации thenCombine. Комбинатор выполняется в потоке, который завершается вторым.
Вариант «любой», applyToEither, берёт первый результат и игнорирует второй.
Много futures: allOf и anyOf
Когда параллелизм распространяется на коллекцию futures:
List<CompletableFuture<String>> all = ids.stream()
.map(this::fetchAsync)
.toList();
CompletableFuture<Void> doneAll = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
CompletableFuture<Object> firstOne = CompletableFuture.anyOf(all.toArray(new CompletableFuture[0]));allOf завершается, когда завершены все входные futures. Возвращает CompletableFuture<Void> — чтобы получить список результатов, нужно применить thenApply и извлечь значения обратно:
CompletableFuture<List<String>> results = doneAll.thenApply(v ->
all.stream().map(CompletableFuture::join).toList()); // .join() never blocks here — they're all completeanyOf возвращает значение первого завершившегося входа (как Object — невозможно выразить «любой из этих типизированных futures» одним возвращаемым типом).
Обработка ошибок: exceptionally и handle
CompletableFuture может завершиться с ошибкой (исключение на любом этапе порождает failed future ниже по цепочке). Комбинаторы для восстановления и преобразования:
CompletableFuture<String> safe = riskyAsync()
.exceptionally(ex -> "fallback for: " + ex.getMessage());
CompletableFuture<String> either = riskyAsync()
.handle((value, ex) -> ex == null ? value : "fallback");| Метод | Когда выполняется | Что возвращает |
|---|---|---|
exceptionally(fn) | Только при ошибке; получает причину | Восстановленное значение |
handle(bi) | Всегда; получает (value, ex) (одно из них null) | Преобразованное значение |
whenComplete(bi) | Всегда; получает (value, ex) | Тот же future, только побочный эффект |
exceptionally — простой путь «поймать и заменить». handle — более универсальный вариант «выполнять всегда, решать по результату» — полезен, когда нужно логировать каждое завершение независимо от успеха.
orTimeout и completeOnTimeout
Java 9 добавила тайм-ауты прямо в API futures:
CompletableFuture<String> withDeadline = riskyAsync()
.orTimeout(2, TimeUnit.SECONDS); // completes exceptionally if not done in 2s
CompletableFuture<String> withDefault = riskyAsync()
.completeOnTimeout("fallback", 2, TimeUnit.SECONDS);Они позволяют задавать дедлайны без написания собственного наблюдателя. Используют внутренние запланированные потоки, поэтому дёшевы в подключении.
Не блокируйте внутри асинхронных этапов
Самая распространённая ошибка с CompletableFuture: вызов .get() или .join() внутри Async-этапа. Это поток пула executor, стоящий в ожидании ещё одного потока из того же пула — под нагрузкой можно получить дедлок всего пула.
// WRONG — joining inside an async stage on the common pool
CompletableFuture.supplyAsync(() -> {
Integer x = anotherFuture().join(); // blocks a pool thread
return x * 2;
});
// RIGHT — compose instead of join
anotherFuture().thenApply(x -> x * 2);Если хочется написать .get() внутри Async-этапа, нужно использовать thenCompose/thenApply.
Использование собственного executor
Пул по умолчанию подходит для короткой CPU-работы. Для I/O или любой блокирующей операции используйте свой:
ExecutorService io = Executors.newFixedThreadPool(50, namedFactory("io"));
ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), namedFactory("cpu"));
CompletableFuture.supplyAsync(this::loadFromDb, io)
.thenApplyAsync(this::transform, cpu)
.thenAcceptAsync(this::sendToClient, io);Каждый шаг выполняется в правильном пуле. Общий пул остаётся свободным для parallelStream и других нужд фреймворка. Такое смешение — суть хорошо ведущего себя асинхронного Java-кода.
Пример: небольшой асинхронный пайплайн
Программа ниже получает «пользователя» и «профиль» параллельно, объединяет их, применяет дедлайн и восстанавливается после ошибки.
Что важно вынести из запуска:
- Раздел 1 использовал
thenCombineдля двух независимых запросов. Они выполнялись параллельно —name(50 мс) иage(80 мс) уже были запущены до присоединения комбинатора. Объединённый future завершился вскоре после медленнейшего. Это и есть параллелизм: асинхронный пайплайн не ждёт каждого шага, а выстраивает шаги в виде графа. - Раздел 2 использовал
thenComposeдля цепочки шагов, каждый из которых сам асинхронный.thenApplyдал быCompletableFuture<CompletableFuture<String>>— бесполезно.thenComposeсглаживает, какflatMapдля потоков иOptional. - Раздел 3 использовал
allOfнад списком и затемthenApplyдля извлечения значений. СамallOfвозвращаетVoid; сбор результатов — это отдельный поток над (уже завершёнными) futures черезjoin(). Вызовыjoin()здесь не блокируют, потому чтоallOfуже завершён. - Раздел 4 показал
exceptionally, восстанавливающийся после выброса исключения. Вышестоящий future провалился; нижестоящий вернул строку-заглушку. Безexceptionally(илиhandle) ошибка распространилась бы до.join()какCompletionException. - Раздел 5 использовал
orTimeoutдля применения дедлайна 100 мс к задаче в 500 мс. Future завершился с ошибкойTimeoutException;joinповторно бросил её внутриCompletionException. Это правильный шаблон: «хочу этот результат, но только если он появится достаточно быстро». - Раздел 6 использовал
handleдля ветвления по успеху/ошибке в одном шаге.handleвсегда выполняется и получает оба значения(value, ex)— одно из них null. Удобно, когда нужен единый хвост пайплайна независимо от того, успешна ли операция.
Что дальше
Следующая глава, Java Fork/Join, рассматривает ForkJoinPool — пул с похищением работы, лежащий в основе параллельных потоков и общего пула CompletableFuture, а также правильный инструмент для CPU-работы по принципу «разделяй и властвуй».