エンタープライズギークス (Enterprise Geeks)

企業システムの企画・開発に携わる技術者集団のブログです。開発言語やフレームワークなどアプリケーション開発に関する各種情報を発信しています。ウルシステムズのエンジニア有志が運営しています。

Akkaで始める並行処理(2) - アクターの基本 (Scala編)

前回は、Akka と アクターモデルの概要について説明した。 今回は、2つのアクター間でメッセージを交換する基本的なプログラムを紹介する。

今回のサンプルプログラムでは、Scala を用いる。 この連載では主に Scala によるプログラムを扱うが、次回は Java で同等のプログラムを作成したものを掲載する予定だ。

全てのソースコードは、githubリポジトリにある。

全体の処理の流れ

今回のサンプルでは、親と子の2つのアクターを作成し、以下のように動作させることを目的としている。

  1. 親アクターが子アクターにメッセージとして整数値を送信する。
  2. 子アクターは親アクターから整数値を受け取ると、その値の2乗した値を親に送り返す。

f:id:enterprisegeeks:20161024142051p:plain

親アクター(MyApp)と子アクター(SubApp)は、それぞれが独立したタスクを実行するオブジェクトである。
親アクターが、クライアント(ApplicationMain)からRequestメッセージを受け付けると、子アクターを生成して処理を行う。親アクターと子アクターはRequestResponseのメッセージを介して通信を行う。このサンプルでは、クライアントから親アクター、親アクターから子アクターへのメッセージはどちらも同じRequestオブジェクトを使っている。

クラス構造

今回のサンプルの全体のイメージをクラス図で示す。 なお、Akka のクラスやメソッドなどで、サンプルで解説しない部分については割愛してある。

f:id:enterprisegeeks:20161025123005p:plain

子アクターの定義

まずは、子アクターの実装から見ていこう。

object SubActor{
  // メッセージ用のオブジェクトを定義
  case class Request(num:Int)
  case class Response(answer:Int)
}
// アクター trait を継承して、アクターを定義する
class SubActor extends Actor{
  // receive メソッドを実装する。
  def receive = {
    case Request(n) =>
      // 検証のため、時間の出力とアクターを実行しているスレッドを待機する。
      println("start task at" + new Date() )
      Thread.sleep(1000L)
      println("finish task at" + new Date())
      // sender は、メッセージを送信したアクター(親アクター)を指す。
      sender() ! Response(n * n);
  }
}

メッセージオブジェクトの定義

ここでは、アクター間でやり取りするメッセージオブジェクト用に、Request, Responseという2つのケースクラスを定義して、SubActorのコンパニオンオブジェクトのフィールドに持たせた。2つのケースクラスはそれぞれ整数値のフィールドを1つ持っている。

Actorトレイトの継承

class SubActor extends Actor が 子アクタークラスの定義である。アクタークラスを定義するには、Actorトレイトを継承すればよい。

receiveメソッドの定義

メッセージを受信した際の処理は、receiveメソッドに定義する。ここでは、親アクターからRequestメッセージを受け取った場合に、Responseメッセージを返すという内容を定義している。

receiveメソッドは、Any型の引数をとり、Unit型の値を返す部分関数(PartialFunction)である。

receiveメソッドでは、パターンマッチを使って、receiveメソッドの引数、つまりメッセージオブジェクトがRequest型かどうかを判定し、該当する場合には、Request型のオブジェクトのフィールドnumの値を変数nに設定している。

caseの条件に一致しないメッセージを受信した場合は、Akka がデフォルトのメッセージ処理を適用し、それでも処理できないメッセージは未処理メッセージとして記録する。

caseの後の処理ブロックでは、検証のためコンソールに時刻を出力した後、アクターを実行しているスレッドを待機する処理を入れている。

なお、スレッドを停止する処理はアプリケーション全体のパフォーマンス劣化を招くため本来は書いてはいけない。時間のかかる処理をどうやって定義するかは次回以降に解説する。

その後、sender()でメッセージを送信したActorRef(ここでは親アクターの参照)を取得し、メッセージResponseオブジェクトを生成し、nの2乗値を設定して、それをメッセージとして親アクターに送り返している。

以上が子アクターの実装である。

親アクターの定義

続いて、親アクターの定義を見ていく。

class MyActor extends Actor{
  // アクターでは状態をフィールドとして定義できる
  var finished = 0

  def receive = {

    case req:Request =>
      // 子アクターを生成して、メッセージ送信
      val child = context.actorOf(Props[SubActor])
      child ! req
    case Response(n) =>
      // 子アクターからの応答を受領したら、コンソールに出力
      println("Answer is " + n)
      // 子アクターを停止
      sender() ! PoisonPill
      // receive 内ではフィールドの更新は自由
      finished = finished +1
      // 10回以上、応答を受けたら、停止するようにする。
      if (finished >= 10) {
        context.system.terminate()
      }
  }
}

Actorのフィールドは排他制御が不要

親アクターでは、状態を管理するためにfinishedフィールドを定義している。 アクターにはフィールドを定義可能で、receiveメソッド内であればフィールドの値を更新しても問題ない。

Java ではマルチスレッドで実行されるオブジェクトのフィールドを更新する際は、synchronizedなどの排他制御が必要となる。 一方、Akka の receiveメソッドは複数スレッドから同時にメッセージが送信されたとしても、同時に1回しか実行されないように Akka が調整するため、排他制御の考慮は不要である。

複数種類のメッセージの取り扱い

親アクターでは、caseを2つ記述している。複数種類のメッセージを取り扱うには、caseを複数並べるようにする。

1つめのcaseは、親アクターが Request型のメッセージを受け取った際の処理である。 子アクターの場合と異なり、メッセージオブジェクトのフィールドを見る必要が無いため、メッセージオブジェクト全体をreqという変数に格納している。 ここでは、リクエストを受けるたびに、子アクターを生成してメッセージを送信している。

子アクターの生成と終了

context.actorOf(Props[SubActor]) で、子アクターの生成を行っている。 contextActorトレイトが持っているプロパティで、アクターの情報を保持するフィールドである。

Akka では コンストラクタではなく、actorOfメソッドでアクターインスタンスの生成を行い、生成時の引数などは Props を使う約束になっている。 Propsはアクター生成時に設定する初期値などを保持するオブジェクトである。 今回は特に初期値などを設定していないが、アクターが何らかの初期値を必要としている場合は Props に初期値を設定して actorOf に渡す。 詳細については、次回以降の記事で解説する。

また、Scala では変数の型宣言が省略できるため気づきづらいが、actorOfの戻り値は、Actorではなく、ActorRefというアクターインスタンスへの参照を保持するオブジェクトになっている。 ActorRefを介した仕組みにすることで耐障害性や位置透過性などの考慮をしている。詳細については、次回以降の記事で解説する。

2つめのcaseは、子アクターからResponseメッセージを受け取った際の処理である。 処理結果をコンソールに出力した後、子アクターにPoisonPillという組み込みの停止用メッセージを送り、子アクターを停止している。

Akkaシステムの終了

その後、子アクターが処理を終えた回数をフィールドfinishedに設定し、10回以上処理が終わったら、アクター全体をcontext.system.terminate()で終了するようにしている。context.systemで保持するActorSystemは、Akka 全体の設定などを管理するオブジェクトである。

以上が親アクターの定義である。

Request-Reply パターン

Akka の処理は全て非同期で行われるため、!メソッドでメッセージを送信すると送信結果を待たずに、次の処理が行われる。 そこで、子アクターが送信結果を親アクターに送り返すことで、同期処理のように処理結果を受け取ることができる。 このような処理方式は、システム間連携に関するパターンを定義した、エンタープライズインテグレーションパターン(EIP) では、Request-Reply パターンと呼んでいる。

Akka アプリケーションの開始

最後に、Akka アプリケーションを開始するmainメソッドを紹介する。

object ApplicationMain {

  def main(args:Array[String]):Unit = {

    // 初期化処理
    val system = ActorSystem("MyActorSystem")
    val myActor = system.actorOf(Props[MyActor], "myActor")

    // 1から10までのリクエストを親アクターに送信する。
    (1 to 10).foreach(n => myActor ! Request(n))

    // system.terminate が呼ばれるまで待機する。
    Await.ready(system.whenTerminated, Duration(1, TimeUnit.MINUTES))
  }
}

Akka を使うには、ActorSystemを作成する必要がある。上記は最も簡単な作成方法である。 ActorSystemは Akka アプリケーションの動作環境であり、Akka 全体の設定を保持し、全てのアクターインスタンスを管理し、アクターを動作させるためのスレッドープールを保持する。 重量級のオブジェクトであるため通常はアプリケーションで1つだけ作成する。

ActorSystemから、アクターを生成してメッセージを送信し、処理を開始する。 ここでは親アクターを1つ生成して、1から10までのRequestを送っている。
その後、ActorSystemが終了するまで待機している。

実行結果

環境により、順序は異なる可能性があるが、実行すると以下のような結果が得られる。

start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
start task atMon Oct 17 16:31:18 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
Answer is 36
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
finish task atMon Oct 17 16:31:19 JST 2016
Answer is 100
Answer is 1
Answer is 64
Answer is 81
Answer is 4
Answer is 25
Answer is 9
Answer is 16
Answer is 49

子アクターが10件生成され、ほぼ同時にメッセージ処理が起動し、1秒後にその結果をほぼ同時に順不同で返している。 子アクターではスレッドを停止する処理が入っているにもかかわらず、同時に処理が行われていることから、各子アクターの処理が Akka のスレッドプールで並列に実行されていることがわかるだろう。

もし順序を固定にしたい場合は、子アクターの生成を一度だけ行い、親アクターのフィールドに設定することで実現できる。 これは1つのアクターに対しては、到達したメッセージの順序で処理を行うことが保証されているためだ。 ただし、その場合子アクターの処理は並列にならないため、処理時間がかかることに注意が必要となる。

まとめ

アクターモデルを使うことで、マルチスレッドプログラミングにありがちな、状態更新時のロックや、スレッド間の協調といった煩雑な処理を意識することなく、非同期処理を書くことができる。

次回は、このサンプルを Java で書くとどうなるかについて解説する。 次回以降は、Akka の機能をさらに掘り下げ、Akka の設定や、障害からの復帰、ルーター、テスト、クラスタリングなどを扱っていく。

[前多 賢太郎]