Akkaで始める並行処理(2) - アクターの基本 (Scala編)
前回は、Akka と アクターモデルの概要について説明した。 今回は、2つのアクター間でメッセージを交換する基本的なプログラムを紹介する。
今回のサンプルプログラムでは、Scala を用いる。 この連載では主に Scala によるプログラムを扱うが、次回は Java で同等のプログラムを作成したものを掲載する予定だ。
全てのソースコードは、githubリポジトリにある。
全体の処理の流れ
今回のサンプルでは、親と子の2つのアクターを作成し、以下のように動作させることを目的としている。
- 親アクターが子アクターにメッセージとして整数値を送信する。
- 子アクターは親アクターから整数値を受け取ると、その値の2乗した値を親に送り返す。
親アクター(MyApp
)と子アクター(SubApp
)は、それぞれが独立したタスクを実行するオブジェクトである。
親アクターが、クライアント(ApplicationMain
)からRequest
メッセージを受け付けると、子アクターを生成して処理を行う。親アクターと子アクターはRequest
とResponse
のメッセージを介して通信を行う。このサンプルでは、クライアントから親アクター、親アクターから子アクターへのメッセージはどちらも同じRequest
オブジェクトを使っている。
クラス構造
今回のサンプルの全体のイメージをクラス図で示す。 なお、Akka のクラスやメソッドなどで、サンプルで解説しない部分については割愛してある。
子アクターの定義
まずは、子アクターの実装から見ていこう。
object SubActor{ // メッセージ用のオブジェクトを定義 case class Request(num:Int) case class Response(answer:Int) } // アクター trait を継承して、アクターを定義する class SubActor extends Actor{ // receive メソッドを実装する。 def receive = { case Request(n) => // 検証のため、時間の出力とアクターを実行しているスレッドを待機する。 println("start task at" + new Date() ) Thread.sleep(1000L) println("finish task at" + new Date()) // sender は、メッセージを送信したアクター(親アクター)を指す。 sender() ! Response(n * n); } }
メッセージオブジェクトの定義
ここでは、アクター間でやり取りするメッセージオブジェクト用に、Request
, Response
という2つのケースクラスを定義して、SubActor
のコンパニオンオブジェクトのフィールドに持たせた。2つのケースクラスはそれぞれ整数値のフィールドを1つ持っている。
Actor
トレイトの継承
class SubActor extends Actor
が 子アクタークラスの定義である。アクタークラスを定義するには、Actor
トレイトを継承すればよい。
receive
メソッドの定義
メッセージを受信した際の処理は、receive
メソッドに定義する。ここでは、親アクターからRequest
メッセージを受け取った場合に、Response
メッセージを返すという内容を定義している。
receive
メソッドは、Any
型の引数をとり、Unit
型の値を返す部分関数(PartialFunction
)である。
receive
メソッドでは、パターンマッチを使って、receive
メソッドの引数、つまりメッセージオブジェクトがRequest
型かどうかを判定し、該当する場合には、Request
型のオブジェクトのフィールドnum
の値を変数n
に設定している。
case
の条件に一致しないメッセージを受信した場合は、Akka がデフォルトのメッセージ処理を適用し、それでも処理できないメッセージは未処理メッセージとして記録する。
case
の後の処理ブロックでは、検証のためコンソールに時刻を出力した後、アクターを実行しているスレッドを待機する処理を入れている。
なお、スレッドを停止する処理はアプリケーション全体のパフォーマンス劣化を招くため本来は書いてはいけない。時間のかかる処理をどうやって定義するかは次回以降に解説する。
その後、sender()
でメッセージを送信したActorRef
(ここでは親アクターの参照)を取得し、メッセージResponse
オブジェクトを生成し、n
の2乗値を設定して、それをメッセージとして親アクターに送り返している。
以上が子アクターの実装である。
親アクターの定義
続いて、親アクターの定義を見ていく。
class MyActor extends Actor{ // アクターでは状態をフィールドとして定義できる var finished = 0 def receive = { case req:Request => // 子アクターを生成して、メッセージ送信 val child = context.actorOf(Props[SubActor]) child ! req case Response(n) => // 子アクターからの応答を受領したら、コンソールに出力 println("Answer is " + n) // 子アクターを停止 sender() ! PoisonPill // receive 内ではフィールドの更新は自由 finished = finished +1 // 10回以上、応答を受けたら、停止するようにする。 if (finished >= 10) { context.system.terminate() } } }
Actor
のフィールドは排他制御が不要
親アクターでは、状態を管理するためにfinished
フィールドを定義している。
アクターにはフィールドを定義可能で、receive
メソッド内であればフィールドの値を更新しても問題ない。
Java ではマルチスレッドで実行されるオブジェクトのフィールドを更新する際は、synchronized
などの排他制御が必要となる。
一方、Akka の receive
メソッドは複数スレッドから同時にメッセージが送信されたとしても、同時に1回しか実行されないように Akka が調整するため、排他制御の考慮は不要である。
複数種類のメッセージの取り扱い
親アクターでは、case
を2つ記述している。複数種類のメッセージを取り扱うには、case
を複数並べるようにする。
1つめのcase
は、親アクターが Request
型のメッセージを受け取った際の処理である。
子アクターの場合と異なり、メッセージオブジェクトのフィールドを見る必要が無いため、メッセージオブジェクト全体をreq
という変数に格納している。
ここでは、リクエストを受けるたびに、子アクターを生成してメッセージを送信している。
子アクターの生成と終了
context.actorOf(Props[SubActor])
で、子アクターの生成を行っている。
context
はActor
トレイトが持っているプロパティで、アクターの情報を保持するフィールドである。
Akka では コンストラクタではなく、actorOf
メソッドでアクターインスタンスの生成を行い、生成時の引数などは Props
を使う約束になっている。 Props
はアクター生成時に設定する初期値などを保持するオブジェクトである。
今回は特に初期値などを設定していないが、アクターが何らかの初期値を必要としている場合は Props
に初期値を設定して actorOf
に渡す。
詳細については、次回以降の記事で解説する。
また、Scala では変数の型宣言が省略できるため気づきづらいが、actorOf
の戻り値は、Actor
ではなく、ActorRef
というアクターインスタンスへの参照を保持するオブジェクトになっている。
ActorRef
を介した仕組みにすることで耐障害性や位置透過性などの考慮をしている。詳細については、次回以降の記事で解説する。
2つめのcase
は、子アクターからResponse
メッセージを受け取った際の処理である。
処理結果をコンソールに出力した後、子アクターにPoisonPill
という組み込みの停止用メッセージを送り、子アクターを停止している。
Akkaシステムの終了
その後、子アクターが処理を終えた回数をフィールドfinished
に設定し、10回以上処理が終わったら、アクター全体をcontext.system.terminate()
で終了するようにしている。context.system
で保持するActorSystem
は、Akka 全体の設定などを管理するオブジェクトである。
以上が親アクターの定義である。
Request-Reply パターン
Akka の処理は全て非同期で行われるため、!
メソッドでメッセージを送信すると送信結果を待たずに、次の処理が行われる。
そこで、子アクターが送信結果を親アクターに送り返すことで、同期処理のように処理結果を受け取ることができる。
このような処理方式は、システム間連携に関するパターンを定義した、エンタープライズインテグレーションパターン(EIP) では、Request-Reply パターンと呼んでいる。
Akka アプリケーションの開始
最後に、Akka アプリケーションを開始するmain
メソッドを紹介する。
object ApplicationMain { def main(args:Array[String]):Unit = { // 初期化処理 val system = ActorSystem("MyActorSystem") val myActor = system.actorOf(Props[MyActor], "myActor") // 1から10までのリクエストを親アクターに送信する。 (1 to 10).foreach(n => myActor ! Request(n)) // system.terminate が呼ばれるまで待機する。 Await.ready(system.whenTerminated, Duration(1, TimeUnit.MINUTES)) } }
Akka を使うには、ActorSystem
を作成する必要がある。上記は最も簡単な作成方法である。
ActorSystem
は Akka アプリケーションの動作環境であり、Akka 全体の設定を保持し、全てのアクターインスタンスを管理し、アクターを動作させるためのスレッドープールを保持する。
重量級のオブジェクトであるため通常はアプリケーションで1つだけ作成する。
ActorSystem
から、アクターを生成してメッセージを送信し、処理を開始する。
ここでは親アクターを1つ生成して、1から10までのRequest
を送っている。
その後、ActorSystem
が終了するまで待機している。
実行結果
環境により、順序は異なる可能性があるが、実行すると以下のような結果が得られる。
start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 start task atMon Oct 17 16:31:18 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 Answer is 36 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 finish task atMon Oct 17 16:31:19 JST 2016 Answer is 100 Answer is 1 Answer is 64 Answer is 81 Answer is 4 Answer is 25 Answer is 9 Answer is 16 Answer is 49
子アクターが10件生成され、ほぼ同時にメッセージ処理が起動し、1秒後にその結果をほぼ同時に順不同で返している。 子アクターではスレッドを停止する処理が入っているにもかかわらず、同時に処理が行われていることから、各子アクターの処理が Akka のスレッドプールで並列に実行されていることがわかるだろう。
もし順序を固定にしたい場合は、子アクターの生成を一度だけ行い、親アクターのフィールドに設定することで実現できる。 これは1つのアクターに対しては、到達したメッセージの順序で処理を行うことが保証されているためだ。 ただし、その場合子アクターの処理は並列にならないため、処理時間がかかることに注意が必要となる。
まとめ
アクターモデルを使うことで、マルチスレッドプログラミングにありがちな、状態更新時のロックや、スレッド間の協調といった煩雑な処理を意識することなく、非同期処理を書くことができる。
次回は、このサンプルを Java で書くとどうなるかについて解説する。 次回以降は、Akka の機能をさらに掘り下げ、Akka の設定や、障害からの復帰、ルーター、テスト、クラスタリングなどを扱っていく。
[前多 賢太郎]