Javaは、ストリーム操作を並列化してマルチコアシステムを活用できます。この記事では、適切な例を使用して、並列ストリームがパフォーマンスをどのように改善できるかを示します。
Javaのストリーム
ストリーム Javaでは、データのコンジットとして表される一連のオブジェクトです。通常、ソースがあります データの場所と宛先 それが送信される場所。ストリームはリポジトリではないことに注意してください。代わりに、配列やコレクションなどのデータソースで動作します。パッセージの中間ビットは、実際にはストリームと呼ばれます。送信のプロセス中、ストリームは通常、フィルタリングや並べ替えなどの1つ以上の可能な変換を通過します。または、データを操作する他のプロセスの場合もあります。これにより、通常、プログラマーのニーズに応じて、元のデータが別の形式にカスタマイズされます。したがって、適用された操作に従って新しいストリームが作成されます。たとえば、ストリームが並べ替えられると、新しいストリームが生成され、その結果が並べ替えられます。これは、新しいデータが元の形式ではなく、元のデータの変換されたコピーであることを意味します。
シーケンシャルストリーム
Javaでのストリーム操作は、並列として明示的に指定されていない限り、順次処理されます。それらは基本的に、パイプラインを処理するために単一のスレッドを使用する非並列ストリームです。基盤となるシステムが並列実行をサポートしている場合でも、シーケンシャルストリームがマルチコアシステムを利用することはありません。たとえば、ストリームを処理するためにマルチスレッドを適用するとどうなりますか?それでも、一度に1つのコアで動作します。ただし、特定のコアに明示的に固定されていない限り、あるコアから別のコアにホップする可能性があります。たとえば、4つの異なるスレッドと4つの異なるコアでの処理は、前者が後者と一致しない場合は明らかに異なります。シングルコア環境で複数のスレッドを実行することはかなり可能ですが、並列処理はまったく別のジャンルです。プログラムは、それをサポートする環境で実行することとは別に、並列プログラミングのためにゼロから設計する必要があります。これが、並列プログラミングが複雑な分野である理由です。
アイデアをさらに説明するために例を試してみましょう。
package org.mano.example; import java.util.Arrays; import java.util.List; public class Main2 { public static oid main(String[] args) { List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9); list.stream().forEach(System.out::println); System.out.println(); list.parallelStream().forEach(System.out::println); } }
出力
123456789 685973214
この例は、動作中のqシーケンシャルストリームとqパラレルストリームの図です。 list.stream() println()を使用して、単一のスレッドで順番に動作します 手術。 list.parallelStream() 一方、基盤となるマルチコア環境を最大限に活用して、並列処理されます。興味深い点は、前のプログラムの出力にあります。シーケンシャルストリームの場合、リストの内容は順序付けられた順序で印刷されます。一方、並列ストリームの出力は順序付けられておらず、プログラムが実行されるたびにシーケンスが変更されます。これは、少なくとも1つのことを意味します。 list.parallelStream()の呼び出しです。 メソッドはprintlnを作成します ステートメントは複数のスレッドで動作します。これはlist.stream() シングルスレッドで行います。
パラレルストリーム
並列ストリームを使用する主な動機は、プログラム全体が並列化されていない場合でも、ストリーム処理を並列プログラミングの一部にすることです。パラレルストリームはマルチコアプロセッサを活用するため、パフォーマンスが大幅に向上します。他の並列プログラミングとは異なり、それらは複雑でエラーが発生しやすいです。ただし、Javaストリームライブラリは、それを簡単かつ信頼できる方法で実行する機能を提供します。プログラム全体が並列化されていない可能性があります。ただし、少なくともストリームを処理する部分は並列化できます。いくつかのメソッドを呼び出すことができ、残りは処理されるという意味で、実際には非常に単純です。それを行うにはいくつかの方法があります。そのような方法の1つは、 parallelStream()を呼び出して並列ストリームを取得することです。 コレクションによって定義されたメソッド 。もう1つの方法は、 parallel()を呼び出すことです。 BaseStreamによって定義されたメソッド シーケンシャルストリーム。シーケンシャルストリームは、呼び出しによって並列化されます。基盤となるプラットフォームは、マルチコアシステムなどの並列プログラミングをサポートする必要があることに注意してください。それ以外の場合、呼び出しには意味がありません。このような場合、呼び出しを行ったとしても、ストリームは順番に処理されます。呼び出しがすでに並列ストリームで行われた場合、それは何もせず、単にストリームを返します。
ストリームに適用された並列処理の結果が順次処理で得られるものと同じになるようにするには、並列ストリームはステートレスで、干渉がなく、連想的である必要があります。
簡単な例
package org.mano.example; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; public class Main { public static void main(String[] args) { List<Employee> employees = Arrays.asList( new Employee(1276, "FFF",2000.00), new Employee(7865, "AAA",1200.00), new Employee(4975, "DDD",3000.00), new Employee(4499, "CCC",1500.00), new Employee(9937, "GGG",2800.00), new Employee(5634, "HHH",1100.00), new Employee(9276, "BBB",3200.00), new Employee(6852, "EEE",3400.00)); System.out.println("Original List"); printList(employees); // Using sequential stream long start = System.currentTimeMillis(); List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.println("sorted using sequential stream"); printList(sortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); // Using parallel stream start = System.currentTimeMillis(); List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList()); end = System.currentTimeMillis(); System.out.println("sorted using parallel stream"); printList(anotherSortedItems); System.out.println("Total the time taken process :" + (end - start) + " milisec."); double totsal=employees.parallelStream() .map(e->e.getSalary()) .reduce(0.00,(a1,a2)->a1+a2); System.out.println("Total Salary expense: "+totsal); Optional<Employee> maxSal=employees.parallelStream() .reduce((Employee e1, Employee e2)-> e1.getSalary()<e2.getSalary()?e2:e1); if(maxSal.isPresent()) System.out.println(maxSal.get().toString()); } public static void printList(List<Employee> list) { for (Employee e : list) System.out.println(e.toString()); } } package org.mano.example; public class Employee { private int empid; private String name; private double salary; public Employee() { super(); } public Employee(int empid, String name, double salary) { super(); this.empid = empid; this.name = name; this.salary = salary; } public int getEmpid() { return empid; } public void setEmpid(int empid) { this.empid = empid; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getSalary() { return salary; } public void setSalary(double salary) { this.salary = salary; } @Override public String toString() { return "Employee [empid=" + empid + ", name=" + name + ", salary=" + salary + "]"; } }
前のコードでは、順次実行を使用してストリーム1に並べ替えを適用した方法に注意してください。
List<Employee> sortedItems = employees.stream() .sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
並列実行は、コードをわずかに変更することで実現されます。
List<Employee> anotherSortedItems = employees .parallelStream().sorted(Comparator .comparing(Employee::getName)) .collect(Collectors.toList());
また、システム時間を比較して、コードのどの部分に時間がかかるかを把握します。並列操作は、並列ストリームが parallelStream()によって明示的に取得されると開始されます。 方法。 reduce()と呼ばれる別の興味深いメソッドがあります 。このメソッドを並列ストリームに適用すると、操作が異なるスレッドで発生する可能性があります。
ただし、必要に応じて、常にパラレルとシーケンシャルを切り替えることができます。パラレルストリームをシーケンシャルに変更する場合は、 sequenceal()を呼び出すことで変更できます。 BaseStreamで指定されたメソッド 。最初のプログラムで見たように、ストリームで実行される操作は、要素の順序に従って順序付けまたは順序付け解除できます。これは、順序がデータソースに依存することを意味します。ただし、これは並列ストリームの場合の状況ではありません。パフォーマンスを向上させるために、それらは並行して処理されます。これはシーケンスなしで実行されるため、ストリームの各パーティションは調整なしで他のパーティションとは独立して処理されるため、結果は予期せず順序付けられません。ただし、順序付けする並列ストリームの各要素に対して具体的に操作を実行する場合は、 forEachOrdered()を検討できます。 forEach()の代わりとなるメソッド メソッド。
結論
ストリームAPIは長い間Javaの一部でしたが、並列処理の微調整を追加することは非常に歓迎され、同時に非常に興味深い機能です。最近のマシンはマルチコアであり、並列プログラミングの設計が複雑であるという汚名があるため、これは特に当てはまります。 Javaによって提供されるAPIは、順次実行の全体的な設計を備えたJavaプログラムに並列プログラミングの微調整を組み込む機能を提供します。これはおそらくこの機能の最良の部分です。