読者です 読者をやめる 読者になる 読者になる

エンタープライズギークス (Enterprise Geeks)

企業システムの企画・開発に携わる技術者集団のブログです。開発言語やフレームワークなどアプリケーション開発に関する各種情報を発信しています。ウルシステムズのエンジニア有志が運営しています。

Java8 Stream APIの基本(2) - 中間操作の種類と並列処理、副作用

Java 8 Stream API の中間操作の種類・並列処理・それらの注意点についてまとめておく。

プリミティブ型のStream

中間操作のメソッドを見ると、mapの他に、mapToInt,mapToLong, mapToDoubleのような、プリミティブ型への変換を行うメソッドがある。
また、 Stream の他に、 IntStream, LongStream, DoubleStreamというプリミティブ型に対応したStreamもある。
javaは歴史的な経緯から、プリミティブ型と参照型を分けているので、Streamでも同様の対応が取られているようだ。 効率が気になる場合は、プリミティブStreamの使用を検討するとよいだろう。

mapToXXXは、StreamからプリミティブStreamへの変換を行う。 プリミティブStreamには、 max, min,sum, average といった集計用のメソッドが予め用意されている。
通常のStreamでもreducecollectなどの終端操作を利用すれば集計処理は実現できるが、ロジックが煩雑になるため、プリミティブStreamを用いるほうがコードは簡潔になる。

// reduceにより年齢の合計を出し、件数で割る方式。最も複雑
double avg = list.stream().reduce(
       0.0, (res, u) -> res + u.getAge(), (r1, r2) -> r1 + r2) / list.size();
// collectでユーティリティaveragingIntを用いる方式。
double avg2 = list.stream().collect(Collectors.averagingInt(User::getAge));
// プリミティブStreamを用いる方式。
double avg3 = list.stream().mapToInt(User::getAge).average().getAsDouble();

また、プリミティブStreamからStreamへの変換を行うには、中間操作 boxed を用いる。 下記のように連番を生成して変換を行うような場合に便利だ。

IntStream.range(0, 100).boxed()
        .map(i -> "num=" + i).collect(Collectors.toList());

中間操作の種類

中間操作はstateless(状態を持たない)、 statefull(状態を持つ)の2種類がある。 またこれとは別に、中間操作・終端操作は、short-circuit(短絡的)な処理を行うものとそうでないものに分類できる。

stateless, statefullについて

一般的に中間操作と言うと、statelessな中間操作を指すことが多い。
stateless,statefullの違いを端的に言うと、処理の対象がStreamの1つの要素だけか、そうでないかの差である。
例えば、Stream#fillter はStreamの1要素が条件に合致するかを判定するだけであるため、これはstatelessな中間操作である。
一方、Stream#sorted は Streamの各要素を順番に並べ替えるメソッドで、順番に並べ替えるためにはStreamの中の全要素の比較が必要がある。
同様に Stream#distinct もStreamの中の同一の要素を1つにまとめるメソッドで、同一性を判定するためにはStreamの要素同士の比較が必要になる。
これらはstatefullな中間操作である。

Stream APIは並列処理が簡単に行える事が特徴の1つとなっている。 Streamの並列処理は簡単に言うと、要素1つの対する処理(ループ1回分の処理)をマルチスレッドで分割して実行するようなものである。
statelessな操作は、Streamの他の要素に影響を受けることなく実行できるので、並列処理と相性が良い。
一方statefullな操作は他の要素との関連があるため、並列で実行すると効率が落ちる可能性がある(後述)。

statefullな操作は、無限ストリームを対象とする場合に注意する必要がある。以下のコードは無限ループに陥る。

Stream.iterate(1000, n -> n - 1).sorted().limit(1000).collect(Collectors.toList());

これは、sortedメソッドStreamの全要素を取得して比較を行おうとするためである。
このコードを正しく動かすには、sortedlimitの順序を入れ替える必要がある。

short-circuitについて

short-circuitな処理とはStreamの全要素を対象としない操作を指す。
Stream#skipは指定された件数分Streamの要素を読み飛ばし、Stream#limit は指定された件数までしかStreamの要素を読み取らないようにする。
これらはshort-circuitな(かつ statefullな)中間操作である。

short-circuitな処理は無限ストリームの制御や、効率化のために用いる。
例えば、Streamの最初の要素を取得するには、

stream.collect(Collectors.toList()).get(0)

とするより、

stream.findFirst()

とする方が1度しかループが実行されないため効率が良い。

並列処理について

Streamに対して Stream#parallel を実行すると、Streamの各要素は並列に実行される。 逆に Stream#sequential を実行すると、逐次実行となる。

parallel/sequentialStream全体に対する設定である。 特定の中間操作のみを並列・逐次実行のどちらかに切り替えて実行するようにはなっていない。 もしこのような処理を行うのなら、一度終端操作でStreamを閉じ、新たにStreamを作成する必要がある。

順序

原則的に、Streamの処理は、逐次実行/並列実行のいずれで動作させても順番が変わらないように、内部的に順番を管理している。
以下のコードは1から10000までの数字から偶数であるものを抽出し、2000番目以降から2000個の要素を取得する処理を並列実行する。
peekは任意の処理を実行する中間操作であり、デバッグなどに用いる。)

List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) list.add(i);
// 並列実行
List<Integer> result =  
    list.stream().parallel()
        .filter(x -> x % 2 == 0)
        .peek(s -> System.out.println(s + ":" + Thread.currentThread()))
        .skip(2000)
        .limit(2000)
        .collect(Collectors.toList());
// 結果の確認
System.out.printf("from %d to %d\n", result.get(0), result.get(1999));
int start = result.get(0);
for (int i : result) {
    if (i != start) throw new RuntimeException("" + i);
    start += 2;
}

このコードを実行し、peekが出力する内容を見ると、複数のスレッドで処理が実行されている事がわかる、そして、結果のリストの順序は入力のリストと同じになる。

statefullな操作との関連

statefullな操作を並列処理で行うと効率が悪くなる場合があるため、逐次実行での処理を検討した方がよい。
これは、複数スレッドに分割して割り当てられた各要素が操作に必要になったり、前述した順序付け制約のために全要素を取得する必要があったりするからである(後述)。
実際、skip,limit,distinctJavadocでも、なるべく逐次実行にすべきことや、Stream#unorderedを使うことで効率を改善できる可能性を示唆している。

Stream#unorderedはストリーム生成元から順序を保証しないようにする命令であり、並列処理の結果の順序が問題にならないのなら効率を改善できる可能性がある。
ただし、unorderedを用いたからと言って必ず効率が良くなったり、順序がバラバラになるわけでもないようだ。
これを以下のコードで試してみた。

public static void main(String[] args) {
    // 有限ストリームのテスト
    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < 2000; i++) list.add(i);
    test(list.stream());

    // 無限ストリームのテスト
    test(Stream.iterate(0, n -> n + 1));
}

public static void test(Stream<Integer> stream) {
    List<Integer> result =  
        stream.parallel()
            .unordered()
            .map(x -> x + 1)
            .skip(1000)
            .limit(300)
            .collect(Collectors.toList());
    System.out.printf("from %d to %d\n", result.get(0), result.get(result.size()-1));
    System.out.println(result);
}

上記のコードを実行すると、Listから生成した有限ストリームの場合、順序は変わらなかったが、無限ストリームの場合は取得される結果が一意に定まらなかった。
(ただしこの結果は、要素数や実行環境によって変わる可能性がある)。

また、次のように無限ストリームに対して、並列処理でunorderedなしでskipを実行すると無限ループに陥る。

Stream.iterate(1, n -> n + 1)
    .parallel()
    .skip(1000)
    .limit(1000)
    .forEach(System.out::println);

skip逐次実行の場合、先頭から要素を回数分読み飛ばせばよいだけの、非常に単純な処理である。
しかし並列実行の場合、順序を保証するためには、各スレッドに分割された要素の処理を全て行ってからでないと、 元々の要素の順番がわからない。よって、全要素を評価しようとして無限ループに陥る。
この問題は、sequentialまたはunordered を指定することで対処できる。

副作用

Streamの外部にある変数の状態を、Stream内部で変更するような処理(副作用を起こす処理)は避けた方が無難である。
ラムダ式は外部にある参照の変わらない変数(実質final変数)を保持できる。 ただし以下にあるように、外部参照の状態を変更すると、statelessな操作も実質的にstatelessではなくなってしまう。 Javadocに記載されているが、Streamの結果をforEach で外部のリストに格納するようなコードは避けた方が良い。

List<String> result = new ArrayList<>(); //実質的にfinalな変数

Stream.of(1,2,3,4,5).map(n -> "" + n)
        .forEach(n -> result.add(n) );
System.out.println(result);

これは問題なく動作するように見えるが、Streamを並列実行に変更すると、resultへの格納順が不定となってしまうし、同期化されないので状態が不正となる可能性がある。
これを解消するためにはこのような外部変数を使うコードを避け、Streamだけで完結するようにするべきである。

List<String> result = Stream.of(1,2,3,4,5).map(n -> "" + n)
        .parallel().collect(Collectors.toList());
System.out.println(result);

以下のように2つのリストの各要素を1組ずつ組み合わせる場合も、問題がある。

public static Stream<String> fullNames(List<String> first, List<String> last) {
    // メソッド内では逐次実行としているが,
    Iterator<String> firstItr = first.iterator();
    return last.stream().map(l -> firstItr.next() + " " + l);
}

public static void main(String[] args) {
   List<String> firstNames = Arrays.asList("taro", "eiji", "ichiro");
   List<String> lasttNames = Arrays.asList("yamada", "tanaka", "takahashi");
   
    fullNames(firstNames, lasttNames)
            .parallel() // 呼び出し方で並列実行に設定できる。
            .forEach(System.out::println); // 姓と名が対応しない。
}

上記の通り、Streamを戻り値とする場合は、呼び出し側で並列実行に変更される可能性を考慮するべきであり、それを避けるならメソッド内で終端処理を実施するべきだろう。
ただし、Streamで様々な処理を行うには2つのStreamを合成するような処理が必要となってくる。 並列実行されても順序を保ちつつStreamを合成する方法については後日解説する。

[前多 賢太郎]