Java8 Stream APIの基本(2) - 中間操作の種類と並列処理、副作用
Java 8 Stream API の中間操作の種類・並列処理・それらの注意点についてまとめておく。
プリミティブ型のStream
中間操作のメソッドを見ると、map
の他に、mapToInt
,mapToLong
, mapToDouble
のような、プリミティブ型への変換を行うメソッドがある。
また、 Stream
の他に、 IntStream
, LongStream
, DoubleStream
というプリミティブ型に対応したStreamもある。
javaは歴史的な経緯から、プリミティブ型と参照型を分けているので、Streamでも同様の対応が取られているようだ。
効率が気になる場合は、プリミティブStreamの使用を検討するとよいだろう。
mapToXXX
は、StreamからプリミティブStreamへの変換を行う。 プリミティブStreamには、 max
, min
,sum
, average
といった集計用のメソッドが予め用意されている。
通常のStreamでもreduce
やcollect
などの終端操作を利用すれば集計処理は実現できるが、ロジックが煩雑になるため、プリミティブStreamを用いるほうがコードは簡潔になる。
// reduceにより年齢の合計を出し、件数で割る方式。最も複雑 double avg = list.stream().reduce( 0.0, (res, u) -> res + u.getAge(), (r1, r2) -> r1 + r2) / list.size(); // collectでユーティリティaveragingIntを用いる方式。 double avg2 = list.stream().collect(Collectors.averagingInt(User::getAge)); // プリミティブStreamを用いる方式。 double avg3 = list.stream().mapToInt(User::getAge).average().getAsDouble();
また、プリミティブStreamからStreamへの変換を行うには、中間操作 boxed
を用いる。
下記のように連番を生成して変換を行うような場合に便利だ。
IntStream.range(0, 100).boxed() .map(i -> "num=" + i).collect(Collectors.toList());
中間操作の種類
中間操作はstateless(状態を持たない)、 statefull(状態を持つ)の2種類がある。 またこれとは別に、中間操作・終端操作は、short-circuit(短絡的)な処理を行うものとそうでないものに分類できる。
stateless, statefullについて
一般的に中間操作と言うと、statelessな中間操作を指すことが多い。
stateless,statefullの違いを端的に言うと、処理の対象がStreamの1つの要素だけか、そうでないかの差である。
例えば、Stream#fillter
はStreamの1要素が条件に合致するかを判定するだけであるため、これはstatelessな中間操作である。
一方、Stream#sorted
は Streamの各要素を順番に並べ替えるメソッドで、順番に並べ替えるためにはStreamの中の全要素の比較が必要がある。
同様に Stream#distinct
もStreamの中の同一の要素を1つにまとめるメソッドで、同一性を判定するためにはStreamの要素同士の比較が必要になる。
これらはstatefullな中間操作である。
Stream APIは並列処理が簡単に行える事が特徴の1つとなっている。
Streamの並列処理は簡単に言うと、要素1つの対する処理(ループ1回分の処理)をマルチスレッドで分割して実行するようなものである。
statelessな操作は、Streamの他の要素に影響を受けることなく実行できるので、並列処理と相性が良い。
一方statefullな操作は他の要素との関連があるため、並列で実行すると効率が落ちる可能性がある(後述)。
statefullな操作は、無限ストリームを対象とする場合に注意する必要がある。以下のコードは無限ループに陥る。
Stream.iterate(1000, n -> n - 1).sorted().limit(1000).collect(Collectors.toList());
これは、sorted
メソッドがStream
の全要素を取得して比較を行おうとするためである。
このコードを正しく動かすには、sorted
とlimit
の順序を入れ替える必要がある。
short-circuitについて
short-circuitな処理とはStream
の全要素を対象としない操作を指す。
Stream#skip
は指定された件数分Stream
の要素を読み飛ばし、Stream#limit
は指定された件数までしかStream
の要素を読み取らないようにする。
これらはshort-circuitな(かつ statefullな)中間操作である。
short-circuitな処理は無限ストリームの制御や、効率化のために用いる。
例えば、Stream
の最初の要素を取得するには、
stream.collect(Collectors.toList()).get(0)
とするより、
stream.findFirst()
とする方が1度しかループが実行されないため効率が良い。
並列処理について
Stream
に対して Stream#parallel
を実行すると、Stream
の各要素は並列に実行される。
逆に Stream#sequential
を実行すると、逐次実行となる。
parallel/sequential
はStream
全体に対する設定である。
特定の中間操作のみを並列・逐次実行のどちらかに切り替えて実行するようにはなっていない。
もしこのような処理を行うのなら、一度終端操作でStream
を閉じ、新たにStream
を作成する必要がある。
順序
原則的に、Stream
の処理は、逐次実行/並列実行のいずれで動作させても順番が変わらないように、内部的に順番を管理している。
以下のコードは1から10000までの数字から偶数であるものを抽出し、2000番目以降から2000個の要素を取得する処理を並列実行する。
(peek
は任意の処理を実行する中間操作であり、デバッグなどに用いる。)
List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) list.add(i); // 並列実行 List<Integer> result = list.stream().parallel() .filter(x -> x % 2 == 0) .peek(s -> System.out.println(s + ":" + Thread.currentThread())) .skip(2000) .limit(2000) .collect(Collectors.toList()); // 結果の確認 System.out.printf("from %d to %d\n", result.get(0), result.get(1999)); int start = result.get(0); for (int i : result) { if (i != start) throw new RuntimeException("" + i); start += 2; }
このコードを実行し、peek
が出力する内容を見ると、複数のスレッドで処理が実行されている事がわかる、そして、結果のリストの順序は入力のリストと同じになる。
statefullな操作との関連
statefullな操作を並列処理で行うと効率が悪くなる場合があるため、逐次実行での処理を検討した方がよい。
これは、複数スレッドに分割して割り当てられた各要素が操作に必要になったり、前述した順序付け制約のために全要素を取得する必要があったりするからである(後述)。
実際、skip
,limit
,distinct
のJavadocでも、なるべく逐次実行にすべきことや、Stream#unordered
を使うことで効率を改善できる可能性を示唆している。
Stream#unordered
はストリーム生成元から順序を保証しないようにする命令であり、並列処理の結果の順序が問題にならないのなら効率を改善できる可能性がある。
ただし、unordered
を用いたからと言って必ず効率が良くなったり、順序がバラバラになるわけでもないようだ。
これを以下のコードで試してみた。
public static void main(String[] args) { // 有限ストリームのテスト List<Integer> list = new ArrayList<>(); for (int i = 0; i < 2000; i++) list.add(i); test(list.stream()); // 無限ストリームのテスト test(Stream.iterate(0, n -> n + 1)); } public static void test(Stream<Integer> stream) { List<Integer> result = stream.parallel() .unordered() .map(x -> x + 1) .skip(1000) .limit(300) .collect(Collectors.toList()); System.out.printf("from %d to %d\n", result.get(0), result.get(result.size()-1)); System.out.println(result); }
上記のコードを実行すると、List
から生成した有限ストリームの場合、順序は変わらなかったが、無限ストリームの場合は取得される結果が一意に定まらなかった。
(ただしこの結果は、要素数や実行環境によって変わる可能性がある)。
また、次のように無限ストリームに対して、並列処理でunordered
なしでskip
を実行すると無限ループに陥る。
Stream.iterate(1, n -> n + 1) .parallel() .skip(1000) .limit(1000) .forEach(System.out::println);
skip
は逐次実行の場合、先頭から要素を回数分読み飛ばせばよいだけの、非常に単純な処理である。
しかし並列実行の場合、順序を保証するためには、各スレッドに分割された要素の処理を全て行ってからでないと、
元々の要素の順番がわからない。よって、全要素を評価しようとして無限ループに陥る。
この問題は、sequential
またはunordered
を指定することで対処できる。
副作用
Stream
の外部にある変数の状態を、Stream
内部で変更するような処理(副作用を起こす処理)は避けた方が無難である。
ラムダ式は外部にある参照の変わらない変数(実質final変数)を保持できる。
ただし以下にあるように、外部参照の状態を変更すると、statelessな操作も実質的にstatelessではなくなってしまう。
Javadocに記載されているが、Stream
の結果をforEach
で外部のリストに格納するようなコードは避けた方が良い。
List<String> result = new ArrayList<>(); //実質的にfinalな変数 Stream.of(1,2,3,4,5).map(n -> "" + n) .forEach(n -> result.add(n) ); System.out.println(result);
これは問題なく動作するように見えるが、Stream
を並列実行に変更すると、result
への格納順が不定となってしまうし、同期化されないので状態が不正となる可能性がある。
これを解消するためにはこのような外部変数を使うコードを避け、Stream
だけで完結するようにするべきである。
List<String> result = Stream.of(1,2,3,4,5).map(n -> "" + n) .parallel().collect(Collectors.toList()); System.out.println(result);
以下のように2つのリストの各要素を1組ずつ組み合わせる場合も、問題がある。
public static Stream<String> fullNames(List<String> first, List<String> last) { // メソッド内では逐次実行としているが, Iterator<String> firstItr = first.iterator(); return last.stream().map(l -> firstItr.next() + " " + l); } public static void main(String[] args) { List<String> firstNames = Arrays.asList("taro", "eiji", "ichiro"); List<String> lasttNames = Arrays.asList("yamada", "tanaka", "takahashi"); fullNames(firstNames, lasttNames) .parallel() // 呼び出し方で並列実行に設定できる。 .forEach(System.out::println); // 姓と名が対応しない。 }
上記の通り、Stream
を戻り値とする場合は、呼び出し側で並列実行に変更される可能性を考慮するべきであり、それを避けるならメソッド内で終端処理を実施するべきだろう。
ただし、Stream
で様々な処理を行うには2つのStream
を合成するような処理が必要となってくる。
並列実行されても順序を保ちつつStream
を合成する方法については後日解説する。
[前多 賢太郎]