この記事は:birthday:アイスタイル Advent Calendar 2020:birthday:16日目の記事になります
こんにちは、sakohです。
今年のアドカレでは、できるだけ説明的なコーディングをテーマに2件書きました。
こちらは2件目。 一件目はこちら: extractor付き述語を作る(Java)
やること
複数のCompletableFuture非同期処理を待ち合わせる際に、同じ型引数であれば
JavaScriptのPromise.allのようにfluentな非同期連続処理を書けるようにする。
というのが今回のゴールです。
Javaでもこうしたい
const valueWithDelay = (value, delay) =>
new Promise(resolve => setTimeout(() => resolve(value), delay));
Promise
.all([
valueWithDelay("abc", 500),
valueWithDelay("def", 1200),
valueWithDelay("ghi", 800),])
.then(values => console.log(values.join(""))); // 約1200ms後、コンソールにabcdefghiが出力される
課題
Completablefuture#allOfは後続処理に値を渡せない
こんな感じでそれぞれの非同期処理を書いておいて……
private static <E> CompletableFuture<E> withDelay(E value, long delay, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(delay);
return value;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executor);
}
では、よーいどん。
CompletableFuture<Void> futureStrings =
CompletableFuture
.allOf(
withDelay("abc", 500, this.executor),
withDelay("def", 1200, this.executor),
withDelay("ghi", 800, this.executor));
ゴールに似たことをやろうとすると、ここでつまずきます。
待ち合わせ処理を可能にするCompletableFuture#allOfは
Void型のCompletableFutureを返却するため、
thenApplyやthenAcceptで直接事後処理を繋ぐことができず、
CompletableFuture<String> futureString1 = withDelay("abc", 500, this.executor);
CompletableFuture<String> futureString2 = withDelay("def", 1200, this.executor);
CompletableFuture<String> futureString3 = withDelay("ghi", 800, this.executor);
CompletableFuture
.allOf(
futureString1,
futureString2,
futureString3)
.thenApply(
nothing -> // ここに値が入ってこない……
String.join(
"",
List.of(
futureString1.join(), // ……ので、ここでちまちま集めている
futureString2.join(),
futureString3.join())))
.thenAccept(System.out::println); // 約1200ms後、標準出力にabcdefghiが出力される
このように一つ一つもとのCompletableFuture(解決済み)から
値を引き出してくる必要があります。
いかにも処理の都合に引きずられて
ごちゃごちゃした処理(Technical mumble-jumble)の多い実装ですね。
理由は理解できる
とはいえ元々のCompletableFuture達の型が一つとは限りませんし、
何も返さないこともあるでしょうから
この仕様自体には説明がつきます。
CompletableFuture<String> futureString = this.withDelay("abc", 500, this.executor);
CompletableFuture<Integer> futureInteger = this.withDelay(123, 1200, this.executor);
CompletableFuture<Void> futureNothing = this.withDelay(null, 800, this.executor);
CompletableFuture<Map<String, Integer>> futureMap =
CompletableFuture
.allOf(
futureString,
futureInteger,
futureNothing)
.thenApply(nothing ->
Map.of(
futureString.join(),
futureInteger.join()));
ではどうするか
- だからと言って本筋にフォーカスすることを諦めたりはしません。
- 同じ型であれば扱えるようにできるはずです。
- 困ったらcollector。(次元の違う値を扱う時はだいたいこれ)
複数のCompletableFutureから単一の「Streamの」CompletableFutureへのcollectorを作りましょう。
構想
- 各CompletableFutureをListに集める。
ここは問題ないですね。Collectors#toListからコピペしてもいいです。 - finisherに
後続処理に「集めた各CompletableFutureの結果を一つのStreamにして返す」処理を予約した
CompletableFuture#allOfを返させる。
記事冒頭の「いちいち各CompletableFutureから結果を取り出す」処理を
finisherに押し付けて隠蔽する形になります。
うっかりjoinして同期処理にしてしまわないように注意。
finisherに押し付ける
つまり、
Function<List<CompletableFuture<String>>, CompletableFuture<Stream<String>>> finisher =
futures ->
CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(nothing -> futures.stream().map(CompletableFuture::join)); //予約だけ。
こう。
これで構想は整ったのでCollectorを作ります。
Collector完成
Collector<
CompletableFuture<String>,
List<CompletableFuture<String>>,
CompletableFuture<Stream<String>>>
toFutureOfAll =
Collector.of(
ArrayList::new,
List::add,
(a, b) -> Stream.of(a, b).flatMap(List::stream).collect(toList()),
futures ->
CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(nothing -> futures.stream().map(CompletableFuture::join)));
Ta-da!
使ってみる
では早速。
Stream
.of(
withDelay("abc", 500, this.executor),
withDelay("def", 1200, this.executor),
withDelay("ghi", 800, this.executor))
.collect(toFutureOfAll)
.thenApply(strings -> strings.reduce("", String::concat))
.thenAccept(System.out::println);
よし、これでゴール達成です。
せっかくなので一般化する
今のcollectorは型引数の固定された変数なので、メソッドとして一般化しておきましょう。
public static <E> Collector<
CompletableFuture<E>,
List<CompletableFuture<E>>,
CompletableFuture<Stream<E>>> toFutureOfAll() {
return
Collector.of(
ArrayList::new,
List::add,
(a, b) -> Stream.of(a, b).flatMap(List::stream).collect(toList()),
futures ->
CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(nothing -> futures.stream().map(CompletableFuture::join)));
}
これで何型であろうが同一型・複数のCompletableFutureによる処理結果を
Promise.allよろしくfluentに扱えるようになりました。
めでたしめでたし。
おわりに
何を語るのかということがぶれないように、
適宜枝葉末節は片づけていきましょう。
おまけ
Technical mumble-jumble: (本筋と直接かかわってこない)技術的なごちゃごちゃ
という言い回しは
マルコ・エムリッヒ氏のこのスピーチで出てきて気に入っているのでよく使っています。
長いですが、この動画自体とても勉強になるのでお薦めです。