読者です 読者をやめる 読者になる 読者になる

エンタープライズギークス (Enterprise Geeks)

企業システムの企画・開発に携わる技術者集団のブログです。開発言語やフレームワークなどアプリケーション開発に関する各種情報を発信しています。ウルシステムズのエンジニア有志が運営しています。

JavaEE7をはじめよう(21) - WebSocketによるプッシュ通信とCDIのイベント通知

前回までは、WebSocket を使ってクライアントからのリクエストに対して応答を返す方法を中心に解説してきた。

WebSocket ではクライアントとサーバーで相互通信を行えるため、クライアントからのリクエストを契機にするだけでなく、サーバー側から任意のタイミングで、クライアントにプッシュ通信することも可能である。この仕組みは、時報のように定期的なタイミングで処理を行ったり、株価の更新のように何らかのイベント発生を契機として通知を行う処理に適用できる。

時刻通知アプリケーション

今回のサンプルは、前回の記事で紹介した WebSocket の チャットアプリケーション に含まれている。 内容としては、チャットアプリケーションを使用している全てのクライアントへ1分間隔で現在時刻を通知するというものだ。

上記アプリケーションのページを開いて放置しておくと、1分おきに現在時刻が通知される。(アプリケーションの起動に時間がかかる場合があるため、アクセス時にエラーとなる場合は、しばらく待ってから再度アクセスしていただきたい)。

クライアントのSessionをどうやって取得するか

サーバー側からメッセージをプッシュ通信するにあたっての課題は、クライアントのjavax.websocket.Sessionをどうやって取得するかである。

これまでの記事で解説したとおり、クライアントにメッセージを送信するためには、javax.websocket.Sessionインスタンスが必要だ。クライアントからの受信を契機に応答を返すのであれば、サーバーエンドポイントに@OnMessageを付与したメソッドを用意すればよかったのだが、サーバー側が主体となってプッシュ通信を行う場合にはこの方法は使えない。

サーバーからプッシュ通信を行う機能は、次に示すコードのように実装したくなるのが自然だろう。

@ServerEndpoint("/sample") {
    
    // クライアントからのメッセージ受信
    @OnMessage
    public void onMessage(String message, Session client){//...}

    // サーバーからのメッセージ送信
    public void serverPush(String message) {
        List<Session> allClients = // 何らかの手段で全クライアントを取得
        for (Session client : allClients){
            client.getBasicRemote().sendText(messagae);
        }
    }
}

しかし、WebSocket では、サーバー側のエンドポイントのインスタンスはクライアントごとに生成される仕組みになっており、WebSocket 内部で管理するクライアントのSessionを取得する API は提供されていない。

この問題は、次に説明する方法で対処できる。

クライアントのSession情報をstatic変数で管理する

クライアントの接続・切断を検知したタイミングで、クライアントのSessionインスタンスを、サーバーエンドポイントクラスのstatic変数として保持することで、必要に応じて取得できるようになる。

このとき、並行処理に対して安全に処理できる必要がある(詳細は こちらの記事 を参照のこと)。
具体的な方法としては、同期化を行うコレクションを使用するか、以下のコードのようにjava.util.concurrentで提供される並行コレクションを使うとよいだろう。

@ServerEndpoint("/sample")
public class WebSocketSampleEndPoint {
    
    /** クライアントからの全接続を保持するセット */
    private final static Set<Session> sessions = 
         Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>());
    
    /** sessionsへの追加を行う。 */
    @OnOpen
    public void onOpen(Session session) {
        System.out.println(session.getId() + " was connect.");
        sessions.add(session);
    }

    /** sessionsからの削除を行う。 */
    @OnClose
    public void onClose(Session session) {
        System.out.println(session.getId() + " was disconnnet.");
        sessions.remove(session);
    }

こうすることで、プッシュ通信を行うメソッドはstaticメソッドとして実装できる。

    // WebSocketSampleEndPoint に追加
    public static void broadCast(Message message) {
        for (Session session : sessions) {
            session.getAsyncRemote().sendObject(message);
        }
    }

そして、別のプログラムからは以下のように呼び出せる。

    public void someMethod() {
        //do something..
        Message message = new Message("server", "hello!");
        WebSocketSampleEndPoint.broadCast(message);
    }

但し、この方式は通知先が固定される上に、staticメソッドを使っているため単体テストが難しくなるデメリットもある。このため、次に紹介する CDI イベントを導入して柔軟な通知を行うように改善する。

CDI イベントによるプッシュ通信の実行

サーバーからのプッシュ通信の通知先が、WebSocket のクライアントだけに限らず、複数のサーバーエンドポイントや WebSocket 以外の別のサービスなど多数ある場合にはどうすればよいだろうか。対象の通知先ごとに通知処理のロジックを記述すれば対応できるが、コードが複雑になる上に、通知先が増減した場合にはコードを変更しなければならなくなる。

通知を行う側は通知処理だけに専念し、通知先を動的に追加・変更できると、保守性や拡張性が高くなる。

一般的に、このような設計は Observer パターンとして知られている。CDI ではイベントを使用することで簡単に Observer パターンによるイベント送受信を行うことができる。

前述のプッシュ通信のコードを CDI イベントを使用した実装に書き換えてみよう。

イベントを送信する側のコード

まずは、イベントを送信する側である。

@Dependent
public class SomeClass {

    // イベントオブジェクトを注入
    @Inject
    private Event<Message> event;
    
    public void someMethodByEvent() {
        //do something..
        Message message = new Message("server", "hello!");
        // どこに送るかは、CDI イベント経由で解決する。
        event.fire(message);
    }
}

送信側では、javax.enterprise.event.Event<T>型のフィールドをインジェクションする。型パラメータのTには、イベントの送受信でやり取りするデータの型を指定する。ここでは、自作のMessage型を指定している。

イベントを送信するには、たんにEvent#fireメソッドを呼び出せばよい。引数にはEvent<T>の型パラメータTと同じ型のオブジェクトを渡す。そうすることで、CDIコンテナが、そのイベントに登録されたすべての通知先に対して通知を行う。

前述のstaticメソッドの例と比較すると、通知する相手先を記述していないことがわかるだろう。イベントを送信する側は、イベントを送信するだけで、誰に送るかは一切関知していない。このため、受信するクラスが増えたとしても、送信側は修正する必要がない。

イベントを受信する側のコード

では、イベントの受信側のコードを見てみよう。

    /**
     * CDI イベントを受信し、接続中のセッション全てにメッセージを送信する。
     *
     * サーバープッシュメソッド。
     * @param message メッセージ CDIイベントから実行するため、
     *                           引数にObservesアノテーションを付与
     */
    public void broadCast(@Observes Message message) {
        // sessionsはstatic変数
        for (Session session : sessions) {
            session.getAsyncRemote().sendObject(message);
        }
    }

受信側では、引数message@Observesアノテーションを指定している。これにより、Messageオブジェクトに対するイベントが発火した場合、このbroadCastメソッドが呼び出されるようになる。具体的には、送信側でEvent#fireを呼び出すと、CDI のイベント処理によって@Observesを付与したメソッドが実行される。

@Observesを付与する引数の型は、 送信側のEvent<T>Tと同じ型である必要がある。これは、CDI によるイベント送受信では、イベントの型によって受信用メソッドを特定するためである。そのため、このMessage型のイベントを他にも受信したいクラスが出てきた場合には、someMethod(@Observes Message message)のようなメソッドを作成すればよい。

カスタム限定子を使ったイベント処理

Producerメソッドの記事で紹介したQualifierアノテーションによるカスタム限定子を使うことで、型が同じでもイベントを種類ごとに分けることができる。

メッセージの一斉送信と個別クライアントへの送信を使い分ける例で説明する。

送信側は以下のようになる。ここでは、@BroadCast@One2Oneという2つのカスタム限定子を用意して、@InjectとあわせてEventを定義するフィールドに指定している。

@Dependent
public class Sender {

    @Inject @BroadCast
    private Event<Message> broadCastEvent;
    
    @Inject @One2One
    private Event<Message> one2oneEvent;

    /** 一斉送信用メッセージの送信 */
    public void sendBroadCast(String message) {
        Message message = new Message("server", message);
        broadCastEvent.fire(message);
    }
    /** 特定のクライアントへの送信 */
    public void sendBroadCast(String message, String clientId) {
        Message message = new Message("server", message);
        // 送信相手のIDのプロパティを追加
        message.setSendTo(clinentId); 
        one2oneEvent.fire(message);
    }

}

受信側では、Message型の引数に対して、@Observes アノテーションとカスタム限定子を指定する。こうすることで、送信側のEventに付与した カスタム限定子と同じ限定子を持つメソッドが呼び出されるようになる。また、@Observesのみを指定し、カスタム限定子を付与しないメソッドはどちらのイベントからも呼び出される。

@Dependent
public class Receiver {
    /** 一斉送信イベントに対応するメソッド */
    public void broadCast(@Observes @BroadCast Message message) {
        // sessionsはstatic変数
        for (Session session : sessions) {
            session.getAsyncRemote().sendObject(message);
        }
    }
    /** 特定クライアント送信イベントに対応するメソッド */
    public void broadCast(@Observes @One2One Message message) {
        for (Session session : sessions) {
            // 送信相手のIDを検索して、送信
            if(session.getId().equals(message.getSendTo())) {
                session.getBasicRemote().sendObject(message);
            }
        }
    }
    /** メッセージの記録 */
    public void logMessage(@Observes Message message) {
        // 限定子なしの@Observesは、
        //@BroadCast, @One2One両方のメッセージを受け取る。
        logger.trace(message.getMessage());
    }

}

このように、CDI のイベントを使うことで、受信側、送信側双方とも相手を固定せずに、イベント経由で情報のやり取りができるようになる。

定期的に通知(イベント発行)を行う処理の実装

次に、サーバー内で定期的にプッシュ通信を行う処理を見ていこう。

今回のサンプルアプリケーションでは、1分おきに現在時刻を通知する必要がある。これを実現するためには、サーバー起動時に一度だけ、通知処理を定期的に行うことを登録する必要がある。現状、Java EE で初期処理を組み込むには サーブレットinitメソッドや、EJB@StartUpアノテーションなどがあるが、今回は後者を選択した。
(余談だが、当初は CDI@ApplicationScopedを使用して実現しようとしたが、CDI の Bean はその Bean の@Injectによる要求があって初めて作成されるため、実現できなかった。)

次に示すように、クラス定義に@Startupを設定することで、@PostConstructを付与したメソッド(後述するstartUpメソッド)は、サーバー起動時に一度だけ実行される。

/**
 * 通知送信EJB シングルトン
 * 
 * サーバー起動時のみスケジューラ登録を行うため、
 * シングルトンかつスタートアップ指定。
 */
@Singleton // インスタンスは1つだけ
@Startup   // 開始時に実行する
public class NotifySender {

このクラスのフィールド定義は次の通りである。定期的に行う処理の定義には Java EE 7 から追加されたサーバー側の並行処理を行う仕組みである Concurrency Utilities for Java EE を用いた。

public class NotifySender {

    /** イベント通知用のインターフェース */
    @Inject
    private Event<Message> event;
    
    @Inject
    private Logger logger;
    
    /** スケジューラーに登録されたタスク */
    private ScheduledFuture<?> task;
    
    /** Java EE7 から追加されたConcurrency Utilitiesのスレッドプール */
    @Resource(lookup 
        = "concurrent/__defaultManagedScheduledExecutorService")
    private ManagedScheduledExecutorService scheduler;

Concurrency Utilities for Java EE は次回以降に改めて詳細を解説する予定である(→記事へ)。現状では以下の内容だけ押さえていただきたい。

  • Concurrency Utilities for Java EE は主にスレッドプールを提供する。
  • スレッドプールはサーバーのリソースとして登録する必要がある。
  • リソースなので、@Resourceでインジェクションする。
  • 今回は、定期的に行う処理を登録可能なManagedScheduledExecutorServiceをスレッドプールとして取得している。

開始時に一度だけ実行するstartUpメソッドのコードは次の通りである。

    @PostConstruct
    public void startUp() {
        // 1分に1回通知を行う。
        // 通知に、event.fireを使うことで、
        //どのオブジェクトがイベントを受け取るかは考慮不要になる。
        task = scheduler.scheduleAtFixedRate(() -> {
            // 現在時刻を取得
            String now = LocalDateTime.now().format(
                 DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"));

            logger.info(() -> "NotifySender executed." + now);

            // メッセージを作成して、イベントの通知
            event.fire(new Message("TimerBot","notified at " + now));
        }, // ここまでラムダ式
        0, 1, TimeUnit.MINUTES);
    }
    // 終了時の処理は割愛
}

定期的に行う処理はManagedScheduledExecutorService#scheduleAtFixedRateメソッドで登録できる。

第1引数には、Runnableなどの並列処理を登録する。ここでは、現在時刻を取得してイベント送信を行う処理を、 Java8 から追加されたラムダ式で記述している。

第2引数には、初回の処理を遅らせる量を指定する(ここでは「0」、すなわち遅延なしとした)。

第3引数には、次回以降の処理をどの間隔で行うかを指定する(ここでは「1」を指定した)。

第4引数には、第2および第3引数の量の単位を指定する(ここでは「分」を指定したため、第3引数と合わせて「1分間隔」を指定したことになる)。

なお、EJB を使用するならば@Scheduleアノテーションをメソッドに指定することで、そのメソッドを定期的に実行できるため、Concurrency Utilities を使う必要がない。

しかしここでは、Concurrency Utilities の実装例を示したかったことに加えて、@Scheduleアノテーションを用いる EJBEJB lite でないことから、War に含めることができず、アプリケーション全体を Ear とする必要があったため、EJB を使用しなかった。 (EJB lite とは、 War に含めることが可能な、EJB の軽量仕様を指す。)

まとめ

今回は、WebSocket でサーバーからのプッシュ通信を行うために必要な技術を紹介した。

WebSocket API の実装サンプルを参照していると、static変数でSessionの一覧を保持している例が多く、最初はどうしてなのか理解できなかったが、サーバーからプッシュ通信を行うコードを書いてみて、ようやく理解できた。
今回は、全クライアントにプッシュ通信を行う例を紹介したが、特定の条件に該当するクライアントにのみプッシュ通信を行うことも応用すれば可能だろう。
javax.websokcet.SessionにはMap型で任意の情報を保持できるgetUserPropertiesメソッドがあるため、そこに何らかの情報を持たせれば、送信対象のクライアントを絞り込むことができるはずだ。)

また、WebSocket によるプッシュ通信を題材に、CDI のイベント通知機能についても解説した。 イベント通知は、DI とは異なる機能ではあるが、DI 同様にオブジェクト間の依存を疎結合に保つため、色々と応用できるだろう。

参考資料

今回紹介したソースコードの全体は以下にある。

今回の実装を行うにあたり、以下の記事を参考にさせて頂いた。

[前多 賢太郎]