読者です 読者をやめる 読者になる 読者になる

エンタープライズギークス (Enterprise Geeks)

企業システムの企画・開発に携わる技術者集団のブログです。開発言語やフレームワークなどアプリケーション開発に関する各種情報を発信しています。ウルシステムズのエンジニア有志が運営しています。

JavaEE7をはじめよう(27) - Concurrency Utilities for Java EE (前編)

JavaEE Concurrency

今回から2回に分けて Concurrency Utilities for Java EE を紹介する。今回は前編として、Concurrency Utilities for Java EE の概要と、アプリケーションサーバー側で管理・提供するスレッドプールの1つであるManagedExecutorServiceを紹介する。

Java SE の Concurrency Utilities

Concurrency Utilities for Java EE のベースとなる Concurrency Utilities は並行処理用の API で、JDK5 からjava.util.concurrentパッケージに追加されている。
Concurrency Utilities には、スレッドプールによるスレッド管理、Futureなどの非同期処理の結果を取得する仕組み、セマフォやロック、バリアなどスレッド間の協調の仕組みが含まれている。このため、スレッドをそのまま使用するよりも安全かつ抽象度の高いプログラムを作成することができる。

Concurrency Utilities for Java EE とは

Concurrency Utilities for Java EE は、Java EE 7 から追加された サーバーサイドで動作する Concurrency Utilities である。

元々 Java EE は独自のスレッドを作成することを推奨しておらず、Concurrency Utilities のスレッドプールを使うこともできなかった。Concurrency Utilities for Java EE を用いることで、アプリケーションサーバーのリソースとしてスレッドプールを持つことが可能となり、これを利用して並行処理を行えるようになる。

そのため、Concurrency Utilities for Java EE を扱うには、 Concurrency Utilities の知識も必要である。

Concurrency Utilities for Java EE の主要なクラスは、javax.enterprise.concurrentパッケージに定義されている。

登録できるリソース

Concurrency Utilities for Java EE は、データソースなどと同様に、サーバーのリソースとして定義する必要がある。

登録する手順はサーバー製品によって異なるが、glassfish では管理コンソールやコマンドによって登録が可能だ。 管理コンソールのリソースのスクリーンショットを示す。

f:id:enterprisegeeks:20150216183137p:plain

上記のスクリーンショットにも表示されているように、Concurrency Utilities for Java EE として登録可能なリソースは以下の4種類である。

  • Managed Executor Service - 並行タスク・スレッドを実行する標準的なスレッドプール
  • Managed Scheduled Executor Service - タスクの繰り返しやタイマー実行が可能なスレッドプール
  • Managed Thread Factory - コンテナ管理用のスレッドを生成するファクトリ
  • Context Service - 並列タスクにコンテナの情報(トランザクションなど)を設定するユーティリティ

通常は、スレッドプールである最初の2つの ExecutorService のいずれかを用途にあわせて使用する。残りの2つは、スレッドプールを作成するための原始的なAPIであり、動的にスレッドプールを生成する場合などに使うものなので、使用頻度は少ないと想定される。本記事でも詳細については割愛する。

ExecutorService の取得

ExecutorService はサーバーリソースであるため、リソースを取得すれば使用できる。JNDI のルックアップを行ってもよいが、下記のように@Resourceによるインジェクションを行うのが簡単である。サーブレットCDI Bean など任意のクラスで使用可能だ。

@Resource(lookup = "concurrent/__defaultManagedExecutorService")
private ManagedExecutorService executor;


@Resource(lookup 
    = "concurrent/__defaultManagedScheduledExecutorService")
private ManagedScheduledExecutorService scheduler;

また、CDI の記事で紹介した Producer フィールドを使用することで、@Injectによるインジェクションも行える。この仕組みを利用すれば、多くのクラスでExecutorServiceを使用する場合に@Resourcelookup属性を一元化できる。

ManagedExecutorServicejava.util.concurrentパッケージのExecutorServiceを継承し、ManagedScheduledExecutorServiceは同じパッケージのScheduledExecutorServiceを継承している。 よって、通常の Concurrency Utilities と同じように利用できる。

ManagedExecutorServiceの利用例

以降では、ManagedExecutorServiceを使用して並列処理を行うことで、処理時間を短縮する例を示す。

ここで題材に取り上げるのは、複数の整数を入力してそれぞれの整数のフィボナッチ数を求める処理である。

このアプリケーションはこちらから実行できる。
(アプリケーションの起動に時間がかかる場合があるため、アクセス時にエラーとなる場合は、しばらく待ってから再度アクセスしていただきたい。ソース一式は github を参照いただきたい。)

以下に画面のスナップショットを示す。

f:id:enterprisegeeks:20160118181118p:plain

このアプリケーションでは、複数行に整数を入力可能で、いずれかの実行ボタンを押すと、入力された整数のフィボナッチ数を計算する。

  • 「計算開始(直列)」ボタン
    通常のforループを利用して1つずつ計算を行う。

  • 「計算開始(直列・SSE)」ボタン
    通常のforループを利用して1つずつ計算を行い、結果は SSE(後述)を使って逐次返す。

  • 「計算開始(並列)」
    ExecutorService を利用して、並列実行で計算を行う。

  • 「計算開始(並列・SSE)」ボタン
    ExecutorService を利用して、並列実行で計算を行い、結果は SSE を使って逐次返す。

入力された値が小さいと、直列と並列の差は出ないが、各行に大きい値を設定すると並列の方が全体の処理時間は小さくなる。(ただし、並列化に伴うオーバーヘッドも存在するため、個別の処理時間は並列処理の方が大きくなっている。)

SSE による並列実装の結果を見ると、並列実行では各処理がほぼ同時に計算が行われていることがわかる。

コラム「SSEとは」

SSE(Server Sent Event)は、HTML5 で追加された通信規格で、サーバーからクライアントへのテキストデータの連続的なプッシュ通信を目的にしている。 WebSocket と似ているが、以下の点で異なっている。
・HTTP 上で実現されている
・サーバーからのみデータを送信する
・テキストデータのみ送信可能
・原則的に1つのクライアントとのみ通信する
WebSocket と比較して用途は限定されるが、取り扱いは容易である。 SSE は Java EE 8 から正式に API が取り込まれることになっており、今回は独自の実装を用いて実現している。SSEの実装の解説は行わないので、興味があれば github のソースを参照いただきたい。

実装の解説

サンプルプログラムの実装を解説する。

フィボナッチ数を求めるfibメソッドのコードは次の通りである。再帰を使った素直な実装にしているため、入力値が大きくなると指数的に処理時間が増大する。

/** 非効率なフィボナッチ数の計算 */
private long fib(long n) {
    if (n <= 2) {
        return 1;
    }
    return fib(n - 1) + fib(n - 2);
}

次のcalcメソッドでは、1つの入力値ごとにfibメソッドを呼び出して、計算結果と処理時間を求めている。大きすぎる数を入力できないように、ガード条件も入れている。

/** 1つの入力値の計算処理 */
protected String calc(int index, String strNum) {
    long start = System.currentTimeMillis();
    if (!strNum.matches("^[0-9]+$")) {
       return makeJson(index, "形式不正");
    } else {
        long num = Long.parseLong(strNum);
        if (num >= 45) {
            return makeJson( index, "45未満で入力してください。");
        } else {
            long answer = fib(num);  // フィボナッチ数の計算
            long time = System.currentTimeMillis() - start;
            return makeJson(index, 
              "(" + time + "msで計算) fib(" + num + ")=" + answer);
        }
    }
}

直列処理の実装

直列処理の実装は次の通りである。これは、先ほどの画面で、「計算開始(並列)」ボタンを押したときに実行されるサーブレットの抜粋である。

ここでは単純に、forループ内で入力値の各行の行数と値に対してcalcメソッドを順番に呼び出している。ソース中の変数inputは、画面の入力フォームの各行の行数をキー、入力値を値としたMapを示している。

// Servlet#doGet内。
List<String> list = new ArrayList<>();

// 処理開始
for (Map.Entry<Integer, String> entry : input.entrySet()) {
    list.add(calc(entry.getKey(), entry.getValue()));
}

並列処理の実装

では、並列処理の実装を見てみよう。先ほどの画面で「計算開始(直列)」ボタンを押したときに実行されるサーブレットの抜粋を示す。

このサーブレットでは、ManagedExecutorServiceをフィールドexecutorでインジェクションしている。

    @Resource(lookup = "concurrent/__defaultManagedExecutorService")
    private ManagedExecutorService executor;

ManagedExecutorServiceを使用した並列処理の実装例は次のようになる。

List<Future<String>> futures = new ArrayList<>();
// 並列計算を順次開始し、計算結果のFutureリストを作成
for (Map.Entry<Integer, String> entry : input.entrySet()) {
    // 処理内容はラムダ式で定義
    Future<String> f = executor.submit(
        () -> calc(entry.getKey(), entry.getValue())
    );
    futures.add(f);
}

List<String> results = new ArrayList<>();
// 登録順に計算結果を取得
for (Future<String> f : futures) {
    try {
        // 最初の計算時間が大きい場合、ブロックするため
        // 後続の計算が終わっていても取得は待たされる。
        results.add(f.get());
    } catch (InterruptedException | ExecutionException e) {}
}

上記のコードでは、入力値を順次executor.submitに渡して、非同期で計算を開始している(5~6行目)。
計算結果は、戻り値のFutureに対してgetを呼ぶことで取得している(下から3行目)。

Future#getは処理結果が取得できるようになるまで他の処理をブロックすることに注意が必要だ。このプログラムは Future#get を入力順に実行しているため、最初の方の入力値の計算で時間がかかった場合、後続の入力値の計算が終了していたとしても、結果の取得まで時間差が発生する。

CompletableFuture の使用例

複数の非同期処理を連続で行いたい場合、Java8 から追加されたCompletableFutureが使用できる。

CompletableFutureは複数の非同期計算の合成を行うフレームワークである。これを使用すると、前述のコードに出てきた、複数のFuture#getを実行した場合に待たされる問題にも柔軟に対応できる。また、CompletableFutureは非同期計算用のスレッドプールを外部から指定できるため、Java EE 環境でも問題なく利用できる。

以下に、CompletableFutureを使用して、計算結果を早い者勝ちでリストに格納する例を示す。このコードは前述のMangedExecutorServiceによる並行処理のコードをCompletableFutureを使用するように書き換えたものである。

// 早い者勝ちで結果を格納するリスト
List<String> firstComes = new CopyOnWriteArrayList<>();

// 並列処理のリストを取得する。
List<CompletableFuture<String>> futures = 
    input.entrySet().stream().map(pair ->
        // supplyAsyncで非同期処理を定義
        CompletableFuture.supplyAsync(
            () -> calc(pair.getKey(), pair.getValue())
            , executor) // サーバーのスレッドプールを必ず指定。
        // 上の計算が終わり次第、次のタスクを実行する。
        .thenApplyAsync(s ->{firstComes.add(s);return s;}, executor)
    ).collect(Collectors.toList());


// CompletableFuture.allOf を使用して、
// 全てのFutureが終了後に行う処理を記述する。
List<String> results = 
    CompletableFuture
    .allOf(futures.toArray(new CompletableFuture[]{}))
    // この処理を行わず、joinの後、直接firstComesを見てもよい。
    .thenApplyAsync(none -> firstComes, executor) 
    .join(); // joinで全てのFutureが完了するのを待つ。

このコードの主な流れは以下の通りである。

並列処理の結果は、変数firstComesに随時格納する。リストの実装には、Concurrency Utilities (Java SE) が提供するCopyOnWriteArrayListを用いている。これはスレッドセーフなArrayListで、並列処理でリストへの追加操作が行われても問題なく動作する。

画面から入力された数値とそのインデックスの集まりは Map<Integer,Integer> の形式で 変数inputに格納されてくる。ここではMapの各要素を Stream API で扱うために、entrySetを呼び出してSet<Map.Entry<Integer,String>>型でエントリーの一覧を取得している。

mapメソッドのラムダ式の 引数pairは、Map.Entry<Integer, String>型で、インデックスと入力数値のペアである。

このpairそれぞれに対して、以下のようにしてCompletableFutureオブジェクトを作成している。

  1. 最初に、CompletableFuture#supplyAsync(ラムダ式, executor)を呼び出し、executorで指定したスレッドプールを使ってフィボナッチ数の計算処理を非同期で実行することを指示する。
  2. 続けて、戻り値のCompletableFutureに対してthenApplyAsync(ラムダ式, executor)を呼び出し、計算結果をfirstComesコレクションに格納することを指示する。

このCompletableFutureオブジェクトは非同期で実行させたい処理そのもので、これをスレッドプールに渡すことで非同期処理を実行している。

コードの後半では、並列で実行した計算結果を取得している。最初にCompletableFuture.allOfで計算終了を待ち合わせたいものを指定し、joinですべての計算が終わるのを待つ。

join直前のthenApplyAsync(none -> firstComes, executor)は、allOfに渡した処理がすべて終わった後で行う処理を示している。ここでは特別な処理を行わないので、単純に処理結果が詰まっている変数firstComesを返すだけにしている。これにより、joinメソッドの戻り値として結果が取得できる。
thenApplyAsync を呼び出さない場合、joinの戻り値はvoidとなり有効な戻り値を取得できない。ただし、joinで取得している変数resultsは結局のところ、変数firstComesと同じものなので、joinで単純に待機して、firstComesを返す実装でもかまわない。

このように CompletableFuture を用いると、小さな処理を組み合わせて並列処理を構築できる。しかも、ラムダ式を使うので、処理内容を定義するために特定のインターフェースを実装するなどの手間が発生しないため、構築する手間も少なくできる。

まとめ

ここまで、ManagedExecutorServiceによる並行処理の例を紹介した。

次回ManagedScheduledExecutorService について解説する。

[前多 賢太郎]