エンタープライズギークス (Enterprise Geeks)

企業システムの企画・開発に携わる技術者集団のブログです。開発言語やフレームワークなどアプリケーション開発に関する各種情報を発信しています。ウルシステムズのエンジニア有志が運営しています。

JavaEE7をはじめよう(28) - Concurrency Utilities for Java EE (後編)

前回の記事では Concurrency Utilities for Java EEManagedExecutorServiceによる並行処理の例を紹介した。

今回はManagedScheduledExecutorServiceについて解説する。

ManagedScheduledExecutorServiceのサンプル

この仕組みは、以前の記事で紹介した1分間隔で CDI イベントを送信する例でも使用した。

例えば、 1分ごとに処理を行うサンプルは以下の通りである。

@Singleton
@Startup // アプリケーション起動時に実行する
public class NotifySender {
    /** イベント通知 */
    @Inject 
    private Event<Message> event;
    
    @Inject
    private Logger logger;
    
    /** スケジューラーに登録されたタスク。終了時に使用する */
    private ScheduledFuture<?> task;
    
    /** サーバーリソースのスケジューラ */
    @Resource(lookup 
       = "concurrent/__defaultManagedScheduledExecutorService")
    private ManagedScheduledExecutorService scheduler;
    
    @PostConstruct // 起動時に呼ばれる処理
    public void startUp() {
        // 1分に1回、イベントで通知を行う。
        task = scheduler.scheduleAtFixedRate(() -> {
            String now = LocalDateTime.now().format(
         DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss"));
            logger.info(() -> "NotifySender executed." + now);
            event.fire(new Message("TimerBot", "notified at " + now));
        }, 0, 1, TimeUnit.MINUTES);
    }
    
    /** 終了時の処理。スケジュールタスクを終了させる。 */
    @PreDestroy
    public void pufOff() {
        System.out.println("NotifySender task cancelling...");
        task.cancel(true);
    }
    
}

startUpメソッドには@PostConstructアノテーションを指定しているため、アプリケーション起動時に一度だけ呼び出される。

startUpメソッドの内部では、ManagedScheduledExecutorServicescheduleAtFixedRate を呼び出している。このメソッドに指定する引数は、以下の4つである。

  1. タスク(処理内容)
  2. 初回実行時までの間隔
  3. 次回実行までの間隔
  4. 時間の単位

ここではタスクをラムダ式で記述している。このラムダ式では、現在時刻を取得してメッセージオブジェクトに設定し、CDI のイベントを発火させている。
第2引数の初回実行時までの間隔は「0」、 第3引数の次回実行までの間隔は「1」をそれぞれ指定している。第4引数にTimeUnit.MINUTES(分)を指定しているため、初回実行時までの間隔は「0分」で、次回実行までの間隔は「1分」となる。

これにより、ラムダ式の内容が1分ごとに実行され続ける。

補足しておくと、イベント発火されたメッセージオブジェクトは、CDIのイベントにより、WebSocket の全クライアント送信メソッドに引き渡される。
よって、WebSocket で接続しているすべてのクライアントに、1分ごとに定期的にメッセージが届くようになる。
(WebSocket を使ったプッシュ通信の仕組みについては、以前の記事を参照のこと。)

ManagedScheduledExecutorServiceのメソッド

ManagedScheduledExecutorService には、以下のメソッドがある。

  • scheduleAtFixedRate - 指定間隔で周期的にタスクを実行する。
  • scheduleWithFixedDelay - タスクの終了後に指定した間隔の経過後再度タスクを実行する。
  • schedule(Callable commnad, long delay, TimeUnit unit) - 指定間隔の後1度だけタスクを実行する。
  • schedule(Callable commnad, Trigger trigger) - trigger で指定した内容に基づいてタスクを実行する。

ManagedScheduledExecutorServiceJava SE の Concurrency Utilities にあるScheduledExecutorServiceを継承しており、上記のうち最初の3つは ScheduledExecutorServiceで定義されている。

最後のTriggerインターフェースを使用するscheduleメソッドは、ManagedScheduledExecutorServiceで追加されたメソッドである。このメソッドの目的は、絶対時刻を扱うことである。そのほかのメソッドは、現在時刻を基準に実行間隔を制御しており、絶対時刻は指定できない。サーバーサイドで使用する場合は、絶対時刻を基準に処理を行うことがよくあるので、このメソッドが追加された。

使用するためには、インターフェースTriggerを実装したクラスを用意し、scheduleメソッドに渡す。

Trigger は以下の2つのメソッドを実装する。

  • getNextRunTime - 次回の処理を実行する時刻を決定する。
  • skipRun - 処理の実行をスキップするか判定する。一度スキップすると、次回以降処理は実行されない。前回の処理結果が大幅に遅延したり、異常終了したりしたときに、処理を続行させたくない場合に使用する。

例として、毎時0分ちょうどに処理を開始するTriggerの実装例を示す。

public class HourTrigger implements Trigger{
    
    /**
     * 次回処理の実行時間を返す
     * @param last 前回実行時の情報。初回はnull。
     * @param scheduledTime executorが設定した実行予定時刻。
     * @return 調整後の次回実行時刻
     */
    @Override
    public Date getNextRunTime(LastExecution last,
            Date scheduledTime) {
        
        if (last == null) {
            // 初回実行
            Calendar cal = Calendar.getInstance();
            cal.setTime(scheduledTime);
            // 時刻を調整し、次の0分0秒にあわせる
            cal.add(Calendar.HOUR, 1);
            cal.set(Calendar.MINUTE, 0);
            cal.set(Calendar.SECOND, 0);
            cal.set(Calendar.MILLISECOND, 0);
            return cal.getTime();
        }
        // 次回以降は前回実行開始時刻 + 1時間
        return new Date(last.getRunStart().getTime() + 3600 * 1000);
    }

    /**
     * タスクの実行をスキップするか判定する。
     * @param last 前回実行時の情報
     * @param scheduledRunTime 実行予定時刻(過去になる事もありえる)
     * @return true - 実行スキップ
     */
    @Override
    public boolean skipRun(LastExecution last, Date scheduledRunTime) {
        // 前回処理が長引き、
        // すでに実行予定時刻を超過している場合は処理しない。
        return System.currentTimeMillis() 
                  - scheduledRunTime.getTime() > 0;
    }
}

Concurrency Utilities for Java EE 以外の並行処理

ここまで、2回に分けて Concurrency Utilities for Java EE による並行処理を説明してきたが、最後に Java EE におけるその他の並行処理について説明しておこう。

Java EE には、Concurrency Utilities for Java EE 以外にも、次のような並行処理の仕組みがある。

以下に、非同期サーブレットの例を紹介する。

// asyncSupported=trueによって、非同期処理可能であることを宣言。
@WebServlet(urlPatterns = {"/asyncSample"}, asyncSupported = true)
public class AsyncSampleServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response)
            throws ServletException, IOException {

        response.setContentType("text/plain");
        response.getWriter().println("output by " 
                + Thread.currentThread().getName());
        response.getWriter().flush();
    
    // 非同期処理の開始
        // サーブレットが終了してもAsyncContextがレスポンスを保持
        AsyncContext ac = request.startAsync();
        ac.start(() -> {
            try {
                response.getWriter().println(
                        "process in async thread");
                Thread.sleep(3000);
                response.getWriter().println(
                        "output by "
                        + Thread.currentThread().getName());
                response.getWriter().close();
                ac.complete(); // 非同期処理の終了
            } catch (Exception e) {}
        });
    // 非同期処理の結果を待たず、サーブレットは終了
    }
}

上記のサンプルでは、AsyncContextを使用することで、AsyncContextが生成するスレッド内で時間のかかる処理を実行し、サーブレットの処理は早々に終了させている。

AsyncContext 内でレスポンスを使用しているので、クライアント(ブラウザ)側は非同期処理が終わるまで待たされるが、サーブレットの処理を行ったスレッドは解放されるため、サーブレットを処理するスレッドが長時間ブロックされることなく、別のクライアントからの接続を受け付けることが可能である。

非同期という言葉が示すように、これらの機能は時間のかかる処理を別のスレッドで行うことを目的としている。

Concurrency Utilities for Java EE は、より汎用的な並行処理の仕組みを提供する。 上記の非同期処理のほかにも、時間のかかる処理を並列化して処理速度を向上したり、タスクの定期実行を行ったりと、様々な用途に使用できる。

どのような処理を実装するかによって、使用する技術を検討するとよいだろう。

まとめ

2回に分けて Concurrency Utilities for Java EE の代表的なAPI であるManagedExecutorServiceManagedScheduledExecutorServiceを紹介した。

Java の Webアプリケーション開発では、スレッドなどの並行処理はサーバーやフレームワークで行うのが一般的だったが、Concurrency Utilities for Java EE の導入により、開発者が独自に並行処理を実装しやすくなった。

参考情報

[前多 賢太郎]