In the first part of this tutorial on the Java 8 Stream API, we covered what streams are and some of their most common operations.
Without further ado, let's continue with the methods used to program streams in a functional style. After that, we'll take a look at parallel streams.
map() is used to transform the value or the type of the elements of a stream:

<R> Stream<R> map(Function<? super T,? extends R> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper)
LongStream mapToLong(ToLongFunction<? super T> mapper)
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper)

As you can see, map() takes a Function to convert the elements of a stream of type T to type R, returning a stream of that type R:
Stream.of('a', 'b', 'c', 'd', 'e')
    .map(c -> (int)c)
    .forEach(i -> System.out.format("%d ", i));

The output:

97 98 99 100 101
There are versions for transforming to primitive types. For example:
IntStream.of(100, 110, 120, 130, 140)
    .mapToDouble(i -> i/3.0)
    .forEach(i -> System.out.format("%.2f ", i));

Will output:

33.33 36.67 40.00 43.33 46.67
flatMap() is used to flatten (or combine) the elements of a stream into one (new) stream:

<R> Stream<R> flatMap(Function<? super T,
        ? extends Stream<? extends R>> mapper)
DoubleStream flatMapToDouble(Function<? super T,
        ? extends DoubleStream> mapper)
IntStream flatMapToInt(Function<? super T,
        ? extends IntStream> mapper)
LongStream flatMapToLong(Function<? super T,
        ? extends LongStream> mapper)
From its signature (and the signatures of the primitive versions) we can see that, in contrast to map(), which returns a single value, flatMap() must return a Stream. If flatMap() maps to null, the return value will be an empty stream, not null itself.
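For instance, a minimal sketch of the null case (per the method's contract, a null mapped stream is replaced with an empty one):

long count = Stream.of("a", null, "b")
    .flatMap(s -> s == null ? null : Stream.of(s))
    .count(); // 2 — the null element contributes nothing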
Let's see how this works. Suppose we have a stream comprising lists of characters:
List<Character> aToD = Arrays.asList('a', 'b', 'c', 'd');
List<Character> eToG = Arrays.asList('e', 'f', 'g');
Stream<List<Character>> stream = Stream.of(aToD, eToG);
We want to convert all the characters to their int representation. Notice in the code below that we can't use map() anymore; c represents an object of type List<Character>, not Character:

stream.map(c -> (int)c) // won't compile
Instead, we need to get the elements of the lists into one stream and then convert each character to an int. Fortunately, we have flatMap() to combine the list elements into a single Stream object:

stream
    .flatMap(l -> l.stream())
    .map(c -> (int)c)
    .forEach(i -> System.out.format("%d ", i));
This outputs the following:

97 98 99 100 101 102 103

flatMap() returns a stream while map() returns an element.
Using peek() (which just executes the provided expression and returns a new stream with the same elements as the original one) after flatMap() may clarify how the elements are processed:

stream
    .flatMap(l -> l.stream())
    .peek(System.out::print)
    .map(c -> (int)c)
    .forEach(i -> System.out.format("%d ", i));
As you can see from the output, the stream returned from flatMap() is passed through the pipeline, as if we were working with a stream of single elements rather than a stream of lists of elements:

a97 b98 c99 d100 e101 f102 g103

This way, with flatMap() you can convert a Stream<List<Object>> to a Stream<Object>. However, the important concept is that this method returns a stream and not a single element (as map() does).
A reduction is an operation that takes many elements and combines them to reduce them into a single value or object. Reduction is done by applying an operation multiple times.
Some examples of reductions include summing N elements, finding the maximum of N numbers, or counting elements.

In the following example, we use a for loop to reduce an array of numbers to their sum:
int[] numbers = {1, 2, 3, 4, 5, 6};
int sum = 0;
for(int n : numbers) {
    sum += n;
}
Of course, making reductions with streams instead of loops has benefits, such as easier parallelization and improved readability.
The Stream interface has two methods for reduction:

collect()
reduce()
We can implement reductions with both of these methods, but collect() helps us implement a type of reduction called mutable reduction, where a container (like a Collection) is used to accumulate the result of the operation.

The other reduction operation, reduce(), has three versions:
Optional<T> reduce(BinaryOperator<T> accumulator)

T reduce(T identity,
         BinaryOperator<T> accumulator)

<U> U reduce(U identity,
             BiFunction<U,? super T,U> accumulator,
             BinaryOperator<U> combiner)
Remember that a BinaryOperator<T> is equivalent to a BiFunction<T, T, T>, where the two arguments and the return type are all of the same type.
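A quick sketch of that equivalence (both interfaces live in java.util.function, and BinaryOperator<T> in fact extends BiFunction<T,T,T>):

BinaryOperator<Integer> add = (a, b) -> a + b;
BiFunction<Integer, Integer, Integer> alsoAdd = add; // same function, seen as a BiFunction
System.out.println(add.apply(2, 3)); // 5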
Let's start with the version that takes one argument. This is equivalent to:
boolean elementsFound = false;
T result = null;
for (T element : stream) {
    if (!elementsFound) {
        elementsFound = true;
        result = element;
    } else {
        result = accumulator.apply(result, element);
    }
}
return elementsFound ? Optional.of(result)
                     : Optional.empty();
This code just applies a function to each element, accumulating the result, and returns an Optional wrapping that result, or an empty Optional if there were no elements.

Let's look at a concrete example. We just saw how a sum is a reduce operation:
int[] numbers = {1, 2, 3, 4, 5, 6};
int sum = 0;
for(int n : numbers) {
    sum += n;
}

Here, the accumulator operation is:

sum += n; // or sum = sum + n
This translates to:
OptionalInt total = IntStream.of(1, 2, 3, 4, 5, 6)
    .reduce( (sum, n) -> sum + n );
Notice how the primitive version of Stream uses the primitive version of Optional.
This is what happens step by step:

1. The first two elements of the stream are passed as the arguments of the BinaryOperator represented by the lambda expression (sum, n) -> sum + n.
2. The result becomes the first argument of the next call to the BinaryOperator, with the next element as the second argument, and so on until all the elements are consumed.

However, what if you need to have an initial value? For cases like that, we have the version that takes two arguments:
T reduce(T identity, BinaryOperator<T> accumulator)
The first argument is the initial value, and it is called the identity because, strictly speaking, this value must be an identity for the accumulator function. In other words, for each value v, accumulator.apply(identity, v) must be equal to v.
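For instance, 0 is an identity for addition, while 4 is not (we'll see below why this matters):

BinaryOperator<Integer> sum = (a, b) -> a + b;
System.out.println(sum.apply(0, 5)); // 5 — the identity requirement holds
System.out.println(sum.apply(4, 5)); // 9 — not 5, so 4 is not an identity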
This version of reduce() is equivalent to:
T result = identity;
for (T element : stream) {
    result = accumulator.apply(result, element);
}
return result;
Notice that this version does not return an Optional object because if the stream is empty, the identity value is returned.
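A quick sketch of the empty-stream case:

int total = IntStream.empty()
    .reduce(0, (sum, n) -> sum + n); // 0, the identity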
For example, the sum example can be rewritten as:
int total = IntStream.of(1, 2, 3, 4, 5, 6)
    .reduce( 0,
             (sum, n) -> sum + n ); // 21
Or using a different initial value:
int total = IntStream.of(1, 2, 3, 4, 5, 6)
    .reduce( 4,
             (sum, n) -> sum + n ); // 25
However, notice that in the example above, the first value cannot be considered an identity because, for instance, 4 + 1 is not equal to 1.

This can bring some problems when working with parallel streams, which we'll review in a few moments.
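To preview the problem: a parallel stream is split into chunks, and the seed is applied once per chunk, so a non-identity seed gets added multiple times. A minimal sketch (the exact result depends on how the runtime splits the stream):

int total = IntStream.of(1, 2, 3, 4, 5, 6)
    .parallel()
    .reduce(4, (sum, n) -> sum + n); // no longer 25; e.g. 45 if split into six chunks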
Now, notice that with these versions, you take elements of type T and return a reduced value of type T as well. However, if you want to return a reduced value of a different type, you have to use the three-argument version of reduce():
<U> U reduce(U identity,
             BiFunction<U,? super T, U> accumulator,
             BinaryOperator<U> combiner)
This is equivalent to using:
U result = identity;
for (T element : stream) {
    result = accumulator.apply(result, element);
}
return result;
Consider, for example, that we want to get the sum of the lengths of all the strings in a stream, so we take strings (type T), and we want an integer result (type U).

In that case, we use reduce() like this:
int length =
    Stream.of("Parallel", "streams", "are", "great")
        .reduce(0,
                (accumInt, str) ->
                    accumInt + str.length(), // accumulator
                (accumInt1, accumInt2) ->
                    accumInt1 + accumInt2); // combiner
We can make it clearer by adding the argument types:
int length =
    Stream.of("Parallel", "streams", "are", "great")
        .reduce(0,
                (Integer accumInt, String str) ->
                    accumInt + str.length(), // accumulator
                (Integer accumInt1, Integer accumInt2) ->
                    accumInt1 + accumInt2); // combiner
As the accumulator function adds a mapping (transformation) step to the accumulation, this version of reduce() can be written as a combination of map() and the other versions of reduce() (you may know this as the map-reduce pattern):
int length =
    Stream.of("Parallel", "streams", "are", "great")
        .mapToInt(s -> s.length())
        .reduce(0,
                (sum, strLength) ->
                    sum + strLength);
Or simply:
int length = Stream.of("Parallel", "streams", "are", "great")
    .mapToInt(s -> s.length())
    .sum();
In fact, the calculation operations that we learned about in the first part are implemented as reduce operations under the hood:

average
count
max
min
sum
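For instance, count() is documented as a special case of reduction, equivalent to mapping each element to 1 and summing:

long n = Stream.of("a", "b", "c")
    .mapToLong(e -> 1L)
    .sum(); // 3, the same as .count()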
Also, notice that if we return a value of the same type, the combiner function is no longer necessary (it turns out that this function is the same as the accumulator function), so in this case it's better to use the two-argument version.

It's recommended to use the three-argument version of reduce() when:

- The reduced value has a different type than the stream elements (otherwise, the combiner is redundant).
- You want to avoid the extra step of a separate map() operation.
On the other hand, collect() has two versions:
<R,A> R collect(Collector<? super T,A,R> collector)

<R> R collect(Supplier<R> supplier,
              BiConsumer<R,? super T> accumulator,
              BiConsumer<R,R> combiner)
The first version uses predefined collectors from the Collectors class, while the second one allows you to create your own collectors. Primitive streams (like IntStream) only have this last version of collect().
Remember that collect() performs a mutable reduction on the elements of a stream, which means that it uses a mutable object for accumulating, like a Collection or a StringBuilder. In contrast, reduce() combines two elements to produce a new one and represents an immutable reduction.

However, let's start with the version that takes three arguments, as it's similar to the reduce() version that also takes three arguments.
As you can see from its signature, it first takes a Supplier that returns the object that will be used and returned as a container (accumulator).
The second parameter is an accumulator function, which takes the container and the element to be added to it.
The third parameter is the combiner function, which merges the intermediate results into the final one (useful when working with parallel streams).
This version of collect() is equivalent to:
R result = supplier.get();
for (T element : stream) {
    accumulator.accept(result, element);
}
return result;
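As a quick illustration of how the three arguments cooperate, here's a minimal sketch (assuming we just want to concatenate strings) that accumulates into a StringBuilder:

String s = Stream.of("a", "b", "c")
    .collect(StringBuilder::new,    // the container
             StringBuilder::append, // add one element
             StringBuilder::append) // merge two partial containers
    .toString(); // "abc"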
Similarly, if we want to "reduce" or "collect" all the elements of a stream into a List, we can do it like this:
List<Integer> list =
    Stream.of(1, 2, 3, 4, 5)
        .collect(
            () -> new ArrayList<>(),  // Creating the container
            (l, i) -> l.add(i),       // Adding an element
            (l1, l2) -> l1.addAll(l2) // Combining elements
        );
We can make it clearer by adding the argument types:

List<Integer> list =
    Stream.of(1, 2, 3, 4, 5)
        .collect(
            () -> new ArrayList<Integer>(),
            (List<Integer> l, Integer i) -> l.add(i),
            (List<Integer> l1, List<Integer> l2) -> l1.addAll(l2)
        );
We can also use method references:

List<Integer> list =
    Stream.of(1, 2, 3, 4, 5)
        .collect(
            ArrayList::new,
            ArrayList::add,
            ArrayList::addAll
        );
The previous version of collect() is useful for learning how collectors work, but in practice, it's better to use the other version.
Some common collectors of the Collectors class are:

toList: Accumulates elements into a List.
toSet: Accumulates elements into a Set.
toCollection: Accumulates elements into a Collection implementation.
toMap: Accumulates elements into a Map.
joining: Concatenates elements into a String.
groupingBy: Groups elements of type T in lists according to a classification function, into a map with keys of type K.
partitioningBy: Partitions elements of type T in lists according to a predicate, into a map.

Since calculation methods can be implemented as reductions, the Collectors class also provides them as collectors:
averagingInt/averagingLong/averagingDouble: Return the average of the input elements.
counting: Counts the input elements.
maxBy: Returns the maximum element according to a given Comparator.
minBy: Returns the minimum element according to a given Comparator.
summingInt/summingLong/summingDouble: Return the sum of the input elements.

This way, we can rewrite our previous example:
List<Integer> list =
    Stream.of(1, 2, 3, 4, 5)
        .collect(
            ArrayList::new,
            ArrayList::add,
            ArrayList::addAll
        );
As:
List<Integer> list =
    Stream.of(1, 2, 3, 4, 5)
        .collect(Collectors.toList()); // [1, 2, 3, 4, 5]
Since all these methods are static, we can use static imports:

import static java.util.stream.Collectors.*;
...
List<Integer> list =
    Stream.of(1, 2, 3, 4, 5)
        .collect(toList()); // [1, 2, 3, 4, 5]
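toMap and toCollection work in a similar way. For instance, a small sketch with toMap (note that, by default, toMap throws on duplicate keys, so this assumes the keys are distinct):

Map<String, Integer> lengths =
    Stream.of("a", "simple", "string")
        .collect(toMap(s -> s, String::length)); // {a=1, simple=6, string=6} (order not guaranteed)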
If we are working with streams of strings, we can join all the elements into one String with:

String s = Stream.of("a", "simple", "string")
    .collect(joining()); // "asimplestring"
We can also pass a separator:

String s = Stream.of("a", "simple", "string")
    .collect(joining(" ")); // "a simple string"
And a prefix and a suffix:
String s = Stream.of("a", "simple", "string")
    .collect(
        joining(" ", "This is ", ".")
    ); // "This is a simple string."
The calculation methods are easy to use. Except for counting(), they either take a Function to produce a value to apply the operation to, or (in the case of maxBy and minBy) a Comparator to produce the result:
double avg = Stream.of(1, 2, 3)
    .collect(averagingInt(i -> i * 2)); // 4.0

long count = Stream.of(1, 2, 3)
    .collect(counting()); // 3

Stream.of(1, 2, 3)
    .collect(maxBy(Comparator.naturalOrder()))
    .ifPresent(System.out::println); // 3

Integer sum = Stream.of(1, 2, 3)
    .collect(summingInt(i -> i)); // 6
The Collectors class also provides two functions to group the elements of a stream into lists, in a kind of SQL GROUP BY style.

The first method is groupingBy(), and it has three versions. This is the first one:

groupingBy(Function<? super T,? extends K> classifier)
It takes a Function that classifies elements of type T, groups them into lists, and returns the result in a Map where the keys (of type K) are the values returned by the Function.

For example, if we want to group a stream of numbers by the range they belong to (tens, twenties, etc.), we can do it with something like this:
Map<Integer, List<Integer>> map =
    Stream.of(2, 34, 54, 23, 33, 20, 59, 11, 19, 37)
        .collect( groupingBy(i -> i/10 * 10) );
The moment you compare this code with the iterative version (using a for loop), you realize the power of streams and collect(). Just look at how many lines of code the traditional implementation needs:
List<Integer> numbers =
    Arrays.asList(2, 34, 54, 23, 33, 20, 59, 11, 19, 37);
Map<Integer, List<Integer>> map = new HashMap<>();

for(Integer i : numbers) {
    int key = i/10 * 10;
    List<Integer> list = map.get(key);

    if(list == null) {
        list = new ArrayList<>();
        map.put(key, list);
    }
    list.add(i);
}
In the end, both strategies return the same map:

{0=[2], 50=[54, 59], 20=[23, 20], 10=[11, 19], 30=[34, 33, 37]}
As you may have noticed, the second version takes a downstream collector as an additional argument:
groupingBy(Function<? super T,? extends K> classifier,
           Collector<? super T,A,D> downstream)
A downstream collector is a collector that is applied to the results of another collector.
We can use any collector here, for instance, to count the elements in each group of the previous example:
Map<Integer, Long> map =
    Stream.of(2, 34, 54, 23, 33, 20, 59, 11, 19, 37)
        .collect(
            groupingBy(i -> i/10 * 10,
                       counting()
            )
        );
Notice how the type of the values of the Map changes to reflect the type returned by the downstream collector, counting().

This will return the following map:

{0=1, 50=2, 20=2, 10=2, 30=3}
We can even use another groupingBy() to classify the elements at a second level. For instance, instead of counting, we can further classify the elements as even or odd:
Map<Integer, Map<String, List<Integer>>> map =
    Stream.of(2,34,54,23,33,20,59,11,19,37)
        .collect(groupingBy(i -> i/10 * 10,
                            groupingBy(i -> i%2 == 0 ? "EVEN" : "ODD")
                 )
        );
This will return the following map (with a little formatting):

{
    0 = {EVEN=[2]},
    50 = {EVEN=[54], ODD=[59]},
    20 = {EVEN=[20], ODD=[23]},
    10 = {ODD=[11, 19]},
    30 = {EVEN=[34], ODD=[33, 37]}
}
The key of the high-level map is an Integer because the first groupingBy() returns one. The type of the values of the high-level map changed (again) to reflect the type returned by the downstream collector, groupingBy(). In this case, a String is returned; this will be the type of the keys of the second-level map, and since we are working with an Integer stream, the values have a type of List<Integer>.
Seeing the output of these examples, you may be wondering, is there a way to order the results?
Well, TreeMap is a Map implementation that keeps its entries sorted by key. Fortunately, the third version of groupingBy() has a Supplier argument that lets us choose the type of the resulting Map:
groupingBy(Function<? super T,? extends K> classifier,
           Supplier<M> mapFactory,
           Collector<? super T,A,D> downstream)
This way, we can pass a TreeMap supplier:
Map<Integer, Map<String, List<Integer>>> map =
    Stream.of(2,34,54,23,33,20,59,11,19,37)
        .collect( groupingBy(i -> i/10 * 10,
                             TreeMap::new,
                             groupingBy(i -> i%2 == 0 ? "EVEN" : "ODD")
                  )
        );
This will return the following map:
{
    0 = {EVEN=[2]},
    10 = {ODD=[11, 19]},
    20 = {EVEN=[20], ODD=[23]},
    30 = {EVEN=[34], ODD=[33, 37]},
    50 = {EVEN=[54], ODD=[59]}
}
The second method for grouping is partitioningBy(). The difference from groupingBy() is that partitioningBy() will return a Map with a Boolean as the key type, which means there are only two groups, one for true and one for false.
There are two versions of this method. The first one is:
partitioningBy(Predicate<? super T> predicate)
It partitions the elements according to a Predicate and organizes them into a Map<Boolean, List<T>>.

For example, if we want to partition a stream of numbers into the ones that are less than 50 and the ones that aren't, we can do it this way:
Map<Boolean, List<Integer>> map =
    Stream.of(45, 9, 65, 77, 12, 89, 31, 12)
        .collect(partitioningBy(i -> i < 50));

This will return the following map:

{false=[65, 77, 89], true=[45, 9, 12, 31, 12]}
As you can see, because of the Predicate, the map will always have two entries.

And like groupingBy(), this method has a second version that takes a downstream collector. For example, if we want to remove duplicates, we just have to collect the elements into a Set, like this:
Map<Boolean, Set<Integer>> map =
    Stream.of(45, 9, 65, 77, 12, 89, 31, 12)
        .collect(
            partitioningBy(i -> i < 50,
                           toSet()
            )
        );
This will produce the following Map:

{false=[65, 89, 77], true=[9, 12, 45, 31]}
However, unlike groupingBy(), there's no version that allows us to change the type of the Map returned. But that's not a problem; we only need two keys to get our groups:

Set<Integer> lessThan50 = map.get(true);
Set<Integer> moreThan50 = map.get(false);
Until now, all the examples have used sequential streams, where each element is processed one by one.
Parallel streams split the stream into multiple parts. Each part is processed by a different thread at the same time (in parallel).
Under the hood, parallel streams use the Fork/Join Framework.
This means that, by default, the number of threads available to process parallel streams equals the number of available cores in your machine's processor (CPU).
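You can check both numbers yourself; a quick sketch (the common pool's parallelism is typically the number of cores minus one, because the thread invoking the terminal operation also participates):

import java.util.concurrent.ForkJoinPool;
...
System.out.println(Runtime.getRuntime().availableProcessors()); // e.g. 8
System.out.println(ForkJoinPool.commonPool().getParallelism()); // e.g. 7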
To create a parallel stream, just use the parallel() method:

Stream<String> parallelStream =
    Stream.of("a","b","c").parallel();
To create a parallel stream from a Collection, use the parallelStream() method:

List<String> list = Arrays.asList("a","b","c");
Stream<String> parStream = list.parallelStream();
You can turn a parallel stream into a sequential one with the sequential() method:

stream
    .parallel()
    .filter(...)
    .sequential()
    .forEach(...);
Check if a stream is parallel with isParallel():

stream.parallel().isParallel(); // true
And turn an ordered stream into an unordered one (or ensure that the stream is unordered) with unordered():

stream
    .parallel()
    .unordered()
    .collect(...);
But how do parallel streams work? Let's start with the simplest example:
Stream.of("a","b","c","d","e")
    .forEach(System.out::print);

Printing a list of elements with a sequential stream will output the expected result:

abcde
However, when using a parallel stream:
Stream.of("a","b","c","d","e")
    .parallel()
    .forEach(System.out::print);
The output can be different for each execution:
cbade // One execution
cebad // Another execution
cbdea // Yet another execution
Going back to the definition of parallel streams, this output starts making sense. The differences in output come from thread scheduling; a different thread (on a different core) may process a given element each time the code is executed.
Thus parallel streams are more appropriate for operations where the order of processing doesn't matter and the operations don't need to keep a state (stateless and independent operations).
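As an aside, if you do need the encounter order while keeping the stream parallel, forEachOrdered() preserves it (at the cost of some of the parallelism benefit):

Stream.of("a","b","c","d","e")
    .parallel()
    .forEachOrdered(System.out::print); // always prints abcde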
An example that shows this difference is the use of findFirst() versus findAny().

In the first part, we mentioned that the findFirst() method returns the first element of a stream. But since we're using parallel streams, this method has to "know" which element is the first one:
long start = System.nanoTime();
String first = Stream.of("a","b","c","d","e")
    .parallel().findFirst().get();
double duration = (System.nanoTime() - start) / 1_000_000.0;
System.out.println(
    first + " found in " + duration + " milliseconds");

The output:

a found in 2.436155 milliseconds
Because of that, if the order doesn't matter, it's better to use findAny() with parallel streams:
long start = System.nanoTime();
String any = Stream.of("a","b","c","d","e")
    .parallel().findAny().get();
double duration = (System.nanoTime() - start) / 1_000_000.0;
System.out.println(
    any + " found in " + duration + " milliseconds");

The output:

c found in 0.063169 milliseconds
Since a parallel stream is processed by multiple cores, it's reasonable to believe that it will be processed faster than a sequential stream. But as you can see with findFirst(), this is not always the case. Stateful operations are another example:
Stream<T> distinct()
Stream<T> sorted()
Stream<T> sorted(Comparator<? super T> comparator)
Stream<T> limit(long maxSize)
Stream<T> skip(long n)
The stateful operations above incorporate state from previously processed elements and usually need to go through the entire stream to produce a result. Thus they work better with sequential streams, which traverse the whole stream anyway.

But don't believe that performance will improve in all cases by first executing the stateful operations and only then turning the stream into a parallel one; the parallel or sequential mode applies to the whole pipeline, as the following example shows:
double start = System.nanoTime();
Stream.of("b","d","a","c","e")
    .sorted()
    .filter(s -> {
        System.out.println("Filter:" + s);
        return !"d".equals(s);
    })
    .parallel()
    .map(s -> {
        System.out.println("Map:" + s);
        return s += s;
    })
    .forEach(System.out::println);
double duration = (System.nanoTime() - start) / 1_000_000;
System.out.println(duration + " milliseconds");
One might think that the stream is sorted and filtered sequentially, but the output shows something else:
Filter:c
Map:c
cc
Filter:a
Map:a
aa
Filter:b
Map:b
bb
Filter:d
Filter:e
Map:e
ee
79.470779 milliseconds
Compare this with the output of the sequential version (just comment out .parallel()):
Filter:a
Map:a
aa
Filter:b
Map:b
bb
Filter:c
Map:c
cc
Filter:d
Filter:e
Map:e
ee
1.554562 milliseconds
Clearly, the sequential version performed better; it took 78 milliseconds less.
But if we have an independent or stateless operation, where order doesn't matter, such as counting the even numbers in a large range, the parallel version will perform better:
double start = System.nanoTime();
long c = IntStream.rangeClosed(0, 1_000_000_000)
    .parallel()
    .filter(i -> i % 2 == 0)
    .count();
double duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Got " + c + " in " + duration + " milliseconds");
The parallel version output:

Got 500000001 in 738.678448 milliseconds

The sequential version output:

Got 500000001 in 1275.271882 milliseconds
In summary, parallel streams don't always perform better than sequential streams when it comes to stateful operations, but they usually perform better when ordering is not an issue and operations are independent and stateless.
This, together with the fact that parallel streams process elements independently and that their order cannot be guaranteed, is the most important thing you need to know.
In practice, how do you know when to use sequential or parallel streams for better performance?
Here are some rules:
Avoid parallel streams for stateful (like sorted()) and order-based (like findFirst()) operations; sequential streams do just fine (if not better) in these cases.

In concurrent environments, assignments are bad.
This is because if you mutate the state of variables (especially if they are shared by more than one thread), you may run into invalid states.
Consider this example, which implements the factorial of 10 in a very particular way:
class Total {
    public long total = 1;
    public void multiply(long n) { total *= n; }
}
...
Total t = new Total();
LongStream.rangeClosed(1, 10)
    .forEach(t::multiply);
System.out.println(t.total);
Here, we are using a variable to gather the result of the factorial. The output of executing this snippet of code is:

3628800
However, when we turn the stream into a parallel one:
LongStream.rangeClosed(1, 10)
    .parallel()
    .forEach(t::multiply);
Sometimes we get the correct result and other times we don't.
The problem is caused by the multiple threads accessing the variable total concurrently. Yes, we can synchronize the access to this variable, but that defeats the purpose of parallelism.
Here's where reduce() comes in handy. Remember that reduce() combines the elements of a stream into a single one.
With parallel streams, this method creates intermediate values and then combines them, avoiding the ordering problem while still allowing streams to be processed in parallel; there's no shared mutable state, because the accumulation happens entirely inside the reduction process.
The only requirement is that the applied reducing operation must be associative.
This means that the operation op must follow this equality:

(a op b) op c == a op (b op c)

Or:

a op b op c op d == (a op b) op (c op d)

So we can evaluate (a op b) and (c op d) in parallel.
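Subtraction, for example, is not associative (and 0 is not an identity for it either), so reducing with it in parallel can return a different value on each run. A small sketch:

int seq = IntStream.of(1, 2, 3, 4)
    .reduce(0, (a, b) -> a - b); // always -10 sequentially
int par = IntStream.of(1, 2, 3, 4)
    .parallel()
    .reduce(0, (a, b) -> a - b); // unpredictable; varies with the split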
We can implement our example using parallel() and reduce() in this way:

long tot = LongStream.rangeClosed(1, 10)
    .parallel()
    .reduce(1, (a,b) -> a*b);
System.out.println(tot);
When we execute this snippet of code, it produces the correct result every time (3628800). reduce() guarantees that each thread accumulates its own partial result and that those partial results are then combined, so the threads never interfere with each other and throw off the result.
Plus, if we time the execution of the first snippet and this last one, we can see a drastic improvement in performance.
We can safely use collect() with parallel streams if we follow the same requirements of associativity and identity (for example, combining any partially accumulated result with an empty result container must produce an equivalent result).
Or, if we are grouping with the Collectors class and ordering is not important, we can use groupingByConcurrent(), the concurrent version of groupingBy().
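For example, a sketch that reuses our earlier grouping; groupingByConcurrent() returns a ConcurrentMap, letting all threads write to a single concurrent container instead of merging per-thread maps:

ConcurrentMap<Integer, List<Integer>> byTens =
    Stream.of(2, 34, 54, 23, 33, 20, 59, 11, 19, 37)
        .parallel()
        .collect(Collectors.groupingByConcurrent(i -> i/10 * 10));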
If you understand when to use parallel streams and the issues associated with concurrent execution, you should be ready to use parallel streams in practice!
We've touched on the most important parts of the Stream interface. I hope you found streams useful. Please post all your comments and feedback in the discussion section below. Thanks for reading.