JavaEE7をはじめよう(27) - Concurrency Utilities for Java EE (前編)
今回から2回に分けて Concurrency Utilities for Java EE を紹介する。今回は前編として、Concurrency Utilities for Java EE の概要と、アプリケーションサーバー側で管理・提供するスレッドプールの1つであるManagedExecutorService
を紹介する。
Java SE の Concurrency Utilities
Concurrency Utilities for Java EE のベースとなる Concurrency Utilities は並行処理用の API で、JDK5 からjava.util.concurrent
パッケージに追加されている。
Concurrency Utilities には、スレッドプールによるスレッド管理、Future
などの非同期処理の結果を取得する仕組み、セマフォやロック、バリアなどスレッド間の協調の仕組みが含まれている。このため、スレッドをそのまま使用するよりも安全かつ抽象度の高いプログラムを作成することができる。
Concurrency Utilities for Java EE とは
Concurrency Utilities for Java EE は、Java EE 7 から追加された サーバーサイドで動作する Concurrency Utilities である。
元々 Java EE は独自のスレッドを作成することを推奨しておらず、Concurrency Utilities のスレッドプールを使うこともできなかった。Concurrency Utilities for Java EE を用いることで、アプリケーションサーバーのリソースとしてスレッドプールを持つことが可能となり、これを利用して並行処理を行えるようになる。
そのため、Concurrency Utilities for Java EE を扱うには、 Concurrency Utilities の知識も必要である。
Concurrency Utilities for Java EE の主要なクラスは、javax.enterprise.concurrent
パッケージに定義されている。
登録できるリソース
Concurrency Utilities for Java EE は、データソースなどと同様に、サーバーのリソースとして定義する必要がある。
登録する手順はサーバー製品によって異なるが、glassfish では管理コンソールやコマンドによって登録が可能だ。 管理コンソールのリソースのスクリーンショットを示す。
上記のスクリーンショットにも表示されているように、Concurrency Utilities for Java EE として登録可能なリソースは以下の4種類である。
- Managed Executor Service - 並行タスク・スレッドを実行する標準的なスレッドプール
- Managed Scheduled Executor Service - タスクの繰り返しやタイマー実行が可能なスレッドプール
- Managed Thread Factory - コンテナ管理用のスレッドを生成するファクトリ
- Context Service - 並列タスクにコンテナの情報(トランザクションなど)を設定するユーティリティ
通常は、スレッドプールである最初の2つの ExecutorService のいずれかを用途にあわせて使用する。残りの2つは、スレッドプールを作成するための原始的なAPIであり、動的にスレッドプールを生成する場合などに使うものなので、使用頻度は少ないと想定される。本記事でも詳細については割愛する。
ExecutorService の取得
ExecutorService はサーバーリソースであるため、リソースを取得すれば使用できる。JNDI のルックアップを行ってもよいが、下記のように@Resource
によるインジェクションを行うのが簡単である。サーブレットや CDI Bean など任意のクラスで使用可能だ。
@Resource(lookup = "concurrent/__defaultManagedExecutorService") private ManagedExecutorService executor; @Resource(lookup = "concurrent/__defaultManagedScheduledExecutorService") private ManagedScheduledExecutorService scheduler;
また、CDI の記事で紹介した Producer フィールドを使用することで、@Inject
によるインジェクションも行える。この仕組みを利用すれば、多くのクラスでExecutorService
を使用する場合に@Resource
のlookup
属性を一元化できる。
ManagedExecutorService
はjava.util.concurrent
パッケージのExecutorService
を継承し、ManagedScheduledExecutorService
は同じパッケージのScheduledExecutorService
を継承している。
よって、通常の Concurrency Utilities と同じように利用できる。
ManagedExecutorService
の利用例
以降では、ManagedExecutorService
を使用して並列処理を行うことで、処理時間を短縮する例を示す。
ここで題材に取り上げるのは、複数の整数を入力してそれぞれの整数のフィボナッチ数を求める処理である。
このアプリケーションはこちらから実行できる。
(アプリケーションの起動に時間がかかる場合があるため、アクセス時にエラーとなる場合は、しばらく待ってから再度アクセスしていただきたい。ソース一式は github を参照いただきたい。)
以下に画面のスナップショットを示す。
このアプリケーションでは、複数行に整数を入力可能で、いずれかの実行ボタンを押すと、入力された整数のフィボナッチ数を計算する。
「計算開始(直列)」ボタン
通常のfor
ループを利用して1つずつ計算を行う。「計算開始(直列・SSE)」ボタン
通常のfor
ループを利用して1つずつ計算を行い、結果は SSE(後述)を使って逐次返す。「計算開始(並列)」
ExecutorService を利用して、並列実行で計算を行う。「計算開始(並列・SSE)」ボタン
ExecutorService を利用して、並列実行で計算を行い、結果は SSE を使って逐次返す。
入力された値が小さいと、直列と並列の差は出ないが、各行に大きい値を設定すると並列の方が全体の処理時間は小さくなる。(ただし、並列化に伴うオーバーヘッドも存在するため、個別の処理時間は並列処理の方が大きくなっている。)
SSE による並列実装の結果を見ると、並列実行では各処理がほぼ同時に計算が行われていることがわかる。
コラム「SSEとは」
SSE(Server Sent Event)は、HTML5 で追加された通信規格で、サーバーからクライアントへのテキストデータの連続的なプッシュ通信を目的にしている。 WebSocket と似ているが、以下の点で異なっている。
・HTTP 上で実現されている
・サーバーからのみデータを送信する
・テキストデータのみ送信可能
・原則的に1つのクライアントとのみ通信する
WebSocket と比較して用途は限定されるが、取り扱いは容易である。 SSE は Java EE 8 から正式に API が取り込まれることになっており、今回は独自の実装を用いて実現している。SSEの実装の解説は行わないので、興味があれば github のソースを参照いただきたい。
実装の解説
サンプルプログラムの実装を解説する。
フィボナッチ数を求めるfib
メソッドのコードは次の通りである。再帰を使った素直な実装にしているため、入力値が大きくなると指数的に処理時間が増大する。
/** 非効率なフィボナッチ数の計算 */ private long fib(long n) { if (n <= 2) { return 1; } return fib(n - 1) + fib(n - 2); }
次のcalc
メソッドでは、1つの入力値ごとにfib
メソッドを呼び出して、計算結果と処理時間を求めている。大きすぎる数を入力できないように、ガード条件も入れている。
/** 1つの入力値の計算処理 */ protected String calc(int index, String strNum) { long start = System.currentTimeMillis(); if (!strNum.matches("^[0-9]+$")) { return makeJson(index, "形式不正"); } else { long num = Long.parseLong(strNum); if (num >= 45) { return makeJson( index, "45未満で入力してください。"); } else { long answer = fib(num); // フィボナッチ数の計算 long time = System.currentTimeMillis() - start; return makeJson(index, "(" + time + "msで計算) fib(" + num + ")=" + answer); } } }
直列処理の実装
直列処理の実装は次の通りである。これは、先ほどの画面で、「計算開始(並列)」ボタンを押したときに実行されるサーブレットの抜粋である。
ここでは単純に、for
ループ内で入力値の各行の行数と値に対してcalc
メソッドを順番に呼び出している。ソース中の変数input
は、画面の入力フォームの各行の行数をキー、入力値を値としたMap
を示している。
// Servlet#doGet内。 List<String> list = new ArrayList<>(); // 処理開始 for (Map.Entry<Integer, String> entry : input.entrySet()) { list.add(calc(entry.getKey(), entry.getValue())); }
並列処理の実装
では、並列処理の実装を見てみよう。先ほどの画面で「計算開始(直列)」ボタンを押したときに実行されるサーブレットの抜粋を示す。
このサーブレットでは、ManagedExecutorService
をフィールドexecutor
でインジェクションしている。
@Resource(lookup = "concurrent/__defaultManagedExecutorService") private ManagedExecutorService executor;
ManagedExecutorService
を使用した並列処理の実装例は次のようになる。
List<Future<String>> futures = new ArrayList<>(); // 並列計算を順次開始し、計算結果のFutureリストを作成 for (Map.Entry<Integer, String> entry : input.entrySet()) { // 処理内容はラムダ式で定義 Future<String> f = executor.submit( () -> calc(entry.getKey(), entry.getValue()) ); futures.add(f); } List<String> results = new ArrayList<>(); // 登録順に計算結果を取得 for (Future<String> f : futures) { try { // 最初の計算時間が大きい場合、ブロックするため // 後続の計算が終わっていても取得は待たされる。 results.add(f.get()); } catch (InterruptedException | ExecutionException e) {} }
上記のコードでは、入力値を順次executor.submit
に渡して、非同期で計算を開始している(5~6行目)。
計算結果は、戻り値のFuture
に対してget
を呼ぶことで取得している(下から3行目)。
Future#get
は処理結果が取得できるようになるまで他の処理をブロックすることに注意が必要だ。このプログラムは Future#get
を入力順に実行しているため、最初の方の入力値の計算で時間がかかった場合、後続の入力値の計算が終了していたとしても、結果の取得まで時間差が発生する。
CompletableFuture の使用例
複数の非同期処理を連続で行いたい場合、Java8 から追加されたCompletableFuture
が使用できる。
CompletableFuture
は複数の非同期計算の合成を行うフレームワークである。これを使用すると、前述のコードに出てきた、複数のFuture#get
を実行した場合に待たされる問題にも柔軟に対応できる。また、CompletableFuture
は非同期計算用のスレッドプールを外部から指定できるため、Java EE 環境でも問題なく利用できる。
以下に、CompletableFuture
を使用して、計算結果を早い者勝ちでリストに格納する例を示す。このコードは前述のMangedExecutorService
による並行処理のコードをCompletableFuture
を使用するように書き換えたものである。
// 早い者勝ちで結果を格納するリスト List<String> firstComes = new CopyOnWriteArrayList<>(); // 並列処理のリストを取得する。 List<CompletableFuture<String>> futures = input.entrySet().stream().map(pair -> // supplyAsyncで非同期処理を定義 CompletableFuture.supplyAsync( () -> calc(pair.getKey(), pair.getValue()) , executor) // サーバーのスレッドプールを必ず指定。 // 上の計算が終わり次第、次のタスクを実行する。 .thenApplyAsync(s ->{firstComes.add(s);return s;}, executor) ).collect(Collectors.toList()); // CompletableFuture.allOf を使用して、 // 全てのFutureが終了後に行う処理を記述する。 List<String> results = CompletableFuture .allOf(futures.toArray(new CompletableFuture[]{})) // この処理を行わず、joinの後、直接firstComesを見てもよい。 .thenApplyAsync(none -> firstComes, executor) .join(); // joinで全てのFutureが完了するのを待つ。
このコードの主な流れは以下の通りである。
並列処理の結果は、変数firstComes
に随時格納する。リストの実装には、Concurrency Utilities (Java SE) が提供するCopyOnWriteArrayList
を用いている。これはスレッドセーフなArrayList
で、並列処理でリストへの追加操作が行われても問題なく動作する。
画面から入力された数値とそのインデックスの集まりは Map<Integer,Integer>
の形式で 変数input
に格納されてくる。ここではMap
の各要素を Stream API で扱うために、entrySet
を呼び出してSet<Map.Entry<Integer,String>>
型でエントリーの一覧を取得している。
map
メソッドのラムダ式の 引数pair
は、Map.Entry<Integer, String>
型で、インデックスと入力数値のペアである。
このpair
それぞれに対して、以下のようにしてCompletableFuture
オブジェクトを作成している。
- 最初に、
CompletableFuture#supplyAsync(ラムダ式, executor)
を呼び出し、executor
で指定したスレッドプールを使ってフィボナッチ数の計算処理を非同期で実行することを指示する。 - 続けて、戻り値の
CompletableFuture
に対してthenApplyAsync(ラムダ式, executor)
を呼び出し、計算結果をfirstComes
コレクションに格納することを指示する。
このCompletableFuture
オブジェクトは非同期で実行させたい処理そのもので、これをスレッドプールに渡すことで非同期処理を実行している。
コードの後半では、並列で実行した計算結果を取得している。最初にCompletableFuture.allOf
で計算終了を待ち合わせたいものを指定し、join
ですべての計算が終わるのを待つ。
join
直前のthenApplyAsync(none -> firstComes, executor)
は、allOf
に渡した処理がすべて終わった後で行う処理を示している。ここでは特別な処理を行わないので、単純に処理結果が詰まっている変数firstComes
を返すだけにしている。これにより、join
メソッドの戻り値として結果が取得できる。
thenApplyAsync
を呼び出さない場合、join
の戻り値はvoid
となり有効な戻り値を取得できない。ただし、join
で取得している変数results
は結局のところ、変数firstComes
と同じものなので、join
で単純に待機して、firstComes
を返す実装でもかまわない。
このように CompletableFuture
を用いると、小さな処理を組み合わせて並列処理を構築できる。しかも、ラムダ式を使うので、処理内容を定義するために特定のインターフェースを実装するなどの手間が発生しないため、構築する手間も少なくできる。
まとめ
ここまで、ManagedExecutorService
による並行処理の例を紹介した。
次回はManagedScheduledExecutorService
について解説する。
[前多 賢太郎]