読者です 読者をやめる 読者になる 読者になる

エンタープライズギークス (Enterprise Geeks)

企業システムの企画・開発に携わる技術者集団のブログです。開発言語やフレームワークなどアプリケーション開発に関する各種情報を発信しています。ウルシステムズのエンジニア有志が運営しています。

Akkaで始める並行処理(3) - アクターの基本 (Java編)

Akka Java アクターモデル

前回の記事では、Scala による Akka の基本的なサンプルプログラムを紹介した。

Akka は ScalaJava それぞれに対応した API を提供している。 今回は、前回 Scala で作成したサンプルプログラムの Java 版を紹介する。

サンプルプログラムの全体構成を以下に再掲するが、詳細に関しては前回の記事を参照していただきたい。

f:id:enterprisegeeks:20161024142051p:plain

今回のサンプルプログラムも、github で公開している。

子アクターの生成

Java 版では、アクターは、抽象クラスのUntypedActorを継承して定義する。

まず、子アクターの定義のうち、staticメンバーのメッセージオブジェクトの定義について示す。

// UntypedActor を継承
public class SubActor extends UntypedActor {
    // イミュータブルオブジェクトとしてメッセージクラスを定義
    public static class Response {
        private final int answer;

        public Response(int answer) {
            this.answer = answer;
        }

        public int getAnswer() {
            return answer;
        }
    }

    public static class Request {
        private final int num;

        public Request(int num) {
            this.num = num;
        }

        public int getNum() {
            return num;
        }
    }

メッセージオブジェクトは不変(イミュータブル) にすることが推奨されている。 その理由は、可変オブジェクトを他のアクターに渡して共有すると、他のアクターから状態の変更が可能になってしまうためだ。 Java で不変オブジェクトを作るには、finalフィールドと、引数を取るコンストラクタを定義すればよい。

続いて、メッセージ受信時の処理を見ていく。

メッセージ受信処理は、onReceiveメソッドをオーバーライドして定義する。

public class SubActor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        // instanceof によるメッセージ判定
        if (message instanceof SubActor.Request) {
            System.out.println("Start sub at " + new Date());

            Thread.currentThread().sleep(1000L);

            int num = ((SubActor.Request)message).getNum();
            System.out.println("Finish sub at " + new Date());
            // 送信元へ、メッセージ返信。
            getSender().tell(new Response(num * num), getSelf());
        } else {
            // 未処理メッセージの処理を明示する
            unhandled(message);
        }
    }
}

メッセージの判定

onReceiveは任意の型のメッセージを受け取るため、引数はObject型となる。 Scala と違って、Javaにはパターンマッチに該当する機能が無いため、ここではinstanceof後にダウンキャストしてメッセージの型を判定し、アクセッサを呼び出してフィールドを取得している。 子アクターはRequestメッセージだけを受け付けるため、ここでは受け取ったメッセージがSubActor.Request型かどうかを判定している。

また、メッセージオブジェクトが想定しているものでなかった場合は、最後の else 節で、unhandledメソッドを呼び出し、 Akka のデフォルトのメッセージ処理を呼んでいる。

他のアクターへのメッセージ送信

子アクターは呼び出し元の親アクターに、計算結果メッセージを返信する。

そのための処理が、以下のコードである。

getSender().tell(new Response(num * num), getSelf())

これは Scala 版では、次のようになっていた。

sender() ! Response(num * num)

Java では記号の一部はメソッド名に使えないので、!tellになったのはわかるとして、tellの第2引数のgetSelf()Scala 版にはなかったものだ。(Scala版でgetSelf()が不要な理由は、この記事の最後で解説する。)

getSelf()は自分自身のアクター参照を取得するメソッドである。 そして、tell の第2引数はメッセージの送信者を示している。

つまり、Response メッセージを送信するのは、自分自身(子アクター)であることを、このメッセージの受信者(今回は親アクター)に伝えている。

実は、アクターのメソッド内で、getSender()で取得できるアクター参照は、tellメソッドの第2引数に設定されたアクター参照である。 後で紹介する親アクターの定義でも、子アクターにメッセージを送信する際に、tellメソッドに親アクター自身の参照を与えている。 そのため、上記コードのgetSender()で親アクターが取得できている。 tellメソッドの引数の設定により、メッセージ送信者を自由に設定できるようになっている。

親アクターの定義

続いて、親アクターの定義を見ていく。

public class MyActor extends UntypedActor {

    private int finished = 0;

    @Override
    public void onReceive(Object message) throws Exception {
        // instanceof によるメッセージ判定
        if (message instanceof SubActor.Request) {
            // 子アクターの生成と参照取得
            ActorRef subActor = getContext().actorOf(Props.create(SubActor.class));
            // メッセージ送信
            subActor.tell(message, getSelf());
        } else if (message instanceof SubActor.Response) {
            SubActor.Response res = (SubActor.Response) message;

            System.out.println("Answer is " + res.getAnswer());
            // 子アクターの停止
            getSender().tell(PoisonPill.getInstance(), getSelf());
            
            // 状態の更新と状態の条件に応じた、Akka の終了
            finished++;
            if (finished >= 10) {
                getContext().system().terminate();
            }

        } else {
            unhandled(message);
        }
    }
}

状態用に定義したフィールドに対して、synchronizedなどの同期化が不要なのは、Scala と同様である。

子アクターの生成は、ActorContext を取得して actorOf メソッドにアクターインスタンス生成用のパラメータを Props で指定することにより行う。 やはり Scala と同様に、コンストラクタで生成を行ってはいけない。この詳細については今後の記事で解説する。

getContext().actorOf(Props.create(SubActor.class))

その他、instanceofによるメッセージ判定や、unhandledの呼び出し、tellの呼び出し方法は、先ほど示した子アクターのコードと同様だ。

アプリケーションの起動

最後に Akka アプリケーションを起動するコードを紹介する。ActorSystemを作成し、親アクターを生成して、メッセージを送信していく。

public class ApplicationMain {

    public static void main(String[] args) 
            throws  TimeoutException,InterruptedException{
        
        ActorSystem system = ActorSystem.create("MyActorSystem");
        ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myActor");

        for(int i = 0; i < 10; i++) {
            myActor.tell(new SubActor.Request(i), null);
        }
        Await.ready(system.whenTerminated(), Duration.apply(1, TimeUnit.MINUTES));
    }
}

ほぼ、Scala 版と同様であるが、 親アクターに対するtellメソッドの第2引数にはnullを指定している。 これは、Mainクラスは、Akka アプリケーションの外にあるため、送信者のアクターを指定できないためだ。

Scala版とJava版の違いについて

Scala版とJava版の差異についてまとめる。

メッセージオブジェクトの定義

メッセージオブジェクトは、不変オブジェクトとして定義する。

Java版では、finalフィールドと getter、引数有りのコンストラクタが必要だ。

一方 Scala では、case class Request(num:Int)のように、ケースクラスの定義だけで上記とほぼ同等の定義が可能だ。

メッセージ判定処理の方法

Java版ではinstanceofによる型判定を行ったが、Scala 版ではパターンマッチングによる判定が使える。

unhandledの呼び出し

Java版では、onReceiveメソッドで必ず unhandledメソッドを呼ぶ必要がある。 ここは間違いを起こしやすいので注意が必要だ。

tell メソッドの送信者の明示

Java 版では、tellメソッドで、送信者の指定を明示する必要がある。ここも、Scala版と大きく異なる点なので注意が必要だ。

少々余談となるが、Scala版ではなぜ、これが不要になるのかを解説する。

実は、Scala 版の ! メソッドの本当の定義は以下のようになっている。

def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit

Scala ではメソッド引数の括弧を複数個定義できる。2個目の括弧には、Javatellメソッド同様に、アクター参照を受け取るようになっている。 どうして、Scala 版では第2引数を指定しなくてよいかというと、引数のimplicitキーワードが関係している。 これは implicit parameter と呼ばれ、引数の値をスコープ内で implicit 宣言された値から自動的に補完する機能である。

Scala では Actorトレイト内に、以下のようにselfが implicit 宣言されている。

implicit final val self = context.self

このため、!メソッドにsenderを明示的に指定しない場合は、selfが自動的に渡るようになっている。 また、 Actor.noSender はメソッドの引数がない場合のデフォルト値で null に相当する。 よって、 mainメソッドなど、implicit宣言がない場所で呼んだ場合は、この値がデフォルト値で設定されることになる。

アクターへの機能追加の方法

アクターに機能を追加する方法も、言語仕様の違いのため ScalaJava では異なる。

例えば、Scala 版では アクターにログ機能を追加するには、以下のようにActorLogingトレイトの多重継承を行う。

class SomeActor extends Actor with ActorLogging{

  def receive = {
    case _ => log.info("messeage")
  }
}

一方、Java では、トレイトの多重継承の代わりに、委譲で機能を追加する。

public class SubActor extends UntypedActor {

    private LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void onReceive(Object message) throws Exception {
        log.info("message");
    }
}

まとめ

Java 版の Akka のサンプルプログラムを紹介した。 Scala 版と同じAPIを提供しているため、機能の違いは無いが、JavaScalaの言語仕様の違いによって記述の仕方はだいぶ異なる。 開発メンバーの言語の習熟度や参考情報の有無などを考慮して、どちらで開発するかを選択するべきだろう。

ちなみに、Akka に関する書籍やWeb上の資料は Scalaの方が多い。 その理由としては、Scalaの方がパターンマッチングなどの機能を使って簡潔にプログラムを書けるからだと思われる。

この連載でも今後は主に Scala でサンプルコードを紹介していく。

次回は、Props と アクター階層について解説を行う。

[前多 賢太郎]