CompletableFutureをJSのPromise.allっぽく扱う(Java)

この記事は:birthday:アイスタイル Advent Calendar 2020:birthday:16日目の記事になります

こんにちは、sakohです。

今年のアドカレでは、できるだけ説明的なコーディングをテーマに2件書きました。
こちらは2件目。 一件目はこちら: extractor付き述語を作る(Java)

やること

複数のCompletableFuture非同期処理を待ち合わせる際に、同じ型引数であれば
JavaScriptのPromise.allのようにfluentな非同期連続処理を書けるようにする。
というのが今回のゴールです。

Javaでもこうしたい

const valueWithDelay = (value, delay) =>
  new Promise(resolve => setTimeout(() => resolve(value), delay));

Promise
  .all([
    valueWithDelay("abc", 500),
    valueWithDelay("def", 1200),
    valueWithDelay("ghi", 800),])
  .then(values => console.log(values.join(""))); // 約1200ms後、コンソールにabcdefghiが出力される

課題

Completablefuture#allOfは後続処理に値を渡せない

こんな感じでそれぞれの非同期処理を書いておいて……

private static <E> CompletableFuture<E> withDelay(E value, long delay, Executor executor) {
  return CompletableFuture.supplyAsync(() -> {
    try {
      Thread.sleep(delay);
      return value;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }, executor);
}

では、よーいどん。

    CompletableFuture<Void> futureStrings =
        CompletableFuture
            .allOf(
                withDelay("abc", 500, this.executor),
                withDelay("def", 1200, this.executor),
                withDelay("ghi", 800, this.executor));

ゴールに似たことをやろうとすると、ここでつまずきます。

待ち合わせ処理を可能にするCompletableFuture#allOfは
Void型のCompletableFutureを返却するため、
thenApplyやthenAcceptで直接事後処理を繋ぐことができず、

    CompletableFuture<String> futureString1 = withDelay("abc", 500, this.executor);
    CompletableFuture<String> futureString2 = withDelay("def", 1200, this.executor);
    CompletableFuture<String> futureString3 = withDelay("ghi", 800, this.executor);

    CompletableFuture
        .allOf(
            futureString1,
            futureString2,
            futureString3)
        .thenApply(
            nothing -> // ここに値が入ってこない……
                String.join(
                    "",
                    List.of(
                        futureString1.join(), // ……ので、ここでちまちま集めている
                        futureString2.join(),
                        futureString3.join())))
        .thenAccept(System.out::println); // 約1200ms後、標準出力にabcdefghiが出力される

このように一つ一つもとのCompletableFuture(解決済み)から
値を引き出してくる必要があります。
いかにも処理の都合に引きずられて
ごちゃごちゃした処理(Technical mumble-jumble)の多い実装ですね。

理由は理解できる

とはいえ元々のCompletableFuture達の型が一つとは限りませんし、
何も返さないこともあるでしょうから
この仕様自体には説明がつきます。

    CompletableFuture<String> futureString = this.withDelay("abc", 500, this.executor);
    CompletableFuture<Integer> futureInteger = this.withDelay(123, 1200, this.executor);
    CompletableFuture<Void> futureNothing = this.withDelay(null, 800, this.executor);

    CompletableFuture<Map<String, Integer>> futureMap =
        CompletableFuture
            .allOf(
                futureString,
                futureInteger,
                futureNothing)
            .thenApply(nothing ->
                Map.of(
                    futureString.join(),
                    futureInteger.join()));

ではどうするか

  1. だからと言って本筋にフォーカスすることを諦めたりはしません。
  2. 同じ型であれば扱えるようにできるはずです。
  3. 困ったらcollector。(次元の違う値を扱う時はだいたいこれ)

複数のCompletableFutureから単一の「Streamの」CompletableFutureへのcollectorを作りましょう。

構想

  1. 各CompletableFutureをListに集める。
    ここは問題ないですね。Collectors#toListからコピペしてもいいです。
  2. finisherに
    後続処理に「集めた各CompletableFutureの結果を一つのStreamにして返す」処理を予約した
    CompletableFuture#allOfを返させる。

記事冒頭の「いちいち各CompletableFutureから結果を取り出す」処理を
finisherに押し付けて隠蔽する形になります。
うっかりjoinして同期処理にしてしまわないように注意。

finisherに押し付ける

つまり、

    Function<List<CompletableFuture<String>>, CompletableFuture<Stream<String>>> finisher =
        futures ->
            CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(nothing -> futures.stream().map(CompletableFuture::join)); //予約だけ。

こう。
これで構想は整ったのでCollectorを作ります。

Collector完成

    Collector<
        CompletableFuture<String>,
        List<CompletableFuture<String>>,
        CompletableFuture<Stream<String>>>
        toFutureOfAll =
        Collector.of(
            ArrayList::new,
            List::add,
            (a, b) -> Stream.of(a, b).flatMap(List::stream).collect(toList()),
            futures ->
                CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(nothing -> futures.stream().map(CompletableFuture::join)));

Ta-da!

使ってみる

では早速。

    Stream
        .of(
            withDelay("abc", 500, this.executor),
            withDelay("def", 1200, this.executor),
            withDelay("ghi", 800, this.executor))
        .collect(toFutureOfAll)
        .thenApply(strings -> strings.reduce("", String::concat))
        .thenAccept(System.out::println);

よし、これでゴール達成です。

せっかくなので一般化する

今のcollectorは型引数の固定された変数なので、メソッドとして一般化しておきましょう。

  public static <E> Collector<
      CompletableFuture<E>,
      List<CompletableFuture<E>>,
      CompletableFuture<Stream<E>>> toFutureOfAll() {
    return
        Collector.of(
            ArrayList::new,
            List::add,
            (a, b) -> Stream.of(a, b).flatMap(List::stream).collect(toList()),
            futures ->
                CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(nothing -> futures.stream().map(CompletableFuture::join)));
  }

これで何型であろうが同一型・複数のCompletableFutureによる処理結果を
Promise.allよろしくfluentに扱えるようになりました。
めでたしめでたし。

おわりに

何を語るのかということがぶれないように、
適宜枝葉末節は片づけていきましょう。

おまけ

Technical mumble-jumble: (本筋と直接かかわってこない)技術的なごちゃごちゃ
という言い回しは
マルコ・エムリッヒ氏のこのスピーチで出てきて気に入っているのでよく使っています。

長いですが、この動画自体とても勉強になるのでお薦めです。

飯を食い、やがて酒を飲むでしょう 2019/12中途入社のバックエンドエンジニア。 とは言いつつフロントエンドの作業もずっとしているので フォアシュトッパーあたり。 アイコンは https://commons.wikimedia.org/wiki/File:Glencairn_Glass-pjt.jpg より。