Mutable reductions collect the desired results into a mutable container object such as a java.util.Collection or an array.
Mutable reduction in the Java Stream API is implemented by the collect() methods.
The reduce() methods, which we discussed in the last tutorial, perform immutable reduction: they reduce the stream to a single-valued immutable result.
reduce() vs collect()
Both terminal operations, reduce() and collect(), are categorized as reduction operations.
In collect() operations, elements are incorporated by updating the state of a mutable container object.
In reduce() operations, the result is updated by replacing the previous result.
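The contrast can be sketched in a few lines (a minimal illustration, not one of this tutorial's examples):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class ReduceVsCollect {
    public static void main(String[] args) {
        // reduce(): each accumulation step replaces the previous
        // result with a new immutable value
        int sum = Stream.of(1, 2, 3).reduce(0, (a, b) -> a + b);
        System.out.println(sum); // 6

        // collect(): each accumulation step mutates the same
        // container created by the supplier (ArrayList::new)
        List<Integer> list = Stream.of(1, 2, 3)
                .collect(ArrayList::new, List::add, List::addAll);
        System.out.println(list); // [1, 2, 3]
    }
}
```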
Stream#collect() methods
(1) <R> R collect(Supplier<R> supplier,
BiConsumer<R,? super T> accumulator,
BiConsumer<R,R> combiner)
This method reduces stream elements of type T to a mutable result container of type R.
supplier: this function creates a new result container. In sequential execution it is called only once, whereas in parallel execution it may be called multiple times to get a new instance for each parallel thread.
accumulator: an associative function that incorporates the current element into the result container (the container created by the supplier function).
combiner: in parallel execution this function combines the results received from different threads. It must be an associative function.
The following example uses StringBuilder as the mutable container to concatenate the strings:
List<String> list = Arrays.asList("Mike", "Nicki", "John");
String s = list.stream().collect(StringBuilder::new,
        (sb, s1) -> sb.append(" ").append(s1),
        (sb1, sb2) -> sb1.append(sb2.toString())).toString();
System.out.println(s);
Output
Mike Nicki John
Here's the reduce() version of the same concatenation operation:
List<String> list = Arrays.asList("Mike", "Nicki", "John");
String s = list.stream().reduce("", (s1, s2) -> s1 + " " + s2);
System.out.println(s);
Output
Mike Nicki John
The reduce() operation in an example like the above will perform worse than the collect() operation, especially for a large number of stream elements. That's because this reduce() version creates a new String object on each iteration, whereas collect() creates only one container instance and mutates it.
Other variants:
Class | Method
IntStream | <R> R collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R,R> combiner)
LongStream | <R> R collect(Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R,R> combiner)
DoubleStream | <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R,R> combiner)
Following is an IntStream example that collects multiples of 10 into an ArrayList:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class MutableReductionExample2 {
    public static void main(String[] args) {
        IntStream stream = IntStream.range(1, 100);
        List<Integer> list = stream.parallel()
                                   .filter(i -> i % 10 == 0)
                                   .collect(ArrayList::new,
                                            ArrayList::add,
                                            ArrayList::addAll);
        System.out.println(list);
    }
}
Output
[10, 20, 30, 40, 50, 60, 70, 80, 90]
(2) <R,A> R collect(Collector<? super T,A,R> collector)
Collector interface: encapsulates the same three functions used by the method collect(Supplier, BiConsumer, BiConsumer) which we discussed above, plus a finisher() method for a final type conversion from A to R and a characteristics() method for indicating the collector's properties.
package java.util.stream;
//imports
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
Set<Characteristics> characteristics();
//some static methods
enum Characteristics {
CONCURRENT,
UNORDERED,
IDENTITY_FINISH
}
}
Note that the 'A' type of this interface is actually the 'R' type of the method collect(Supplier, BiConsumer, BiConsumer) which we discussed in (1). So to be clear:
T => the underlying stream element type,
A => the mutable accumulation container returned from the supplier,
R => the final type returned from the finisher() call, and hence from the Stream#collect() call.
Using this interface also allows various collect operations to be reused; in particular, the Collectors class provides factory methods that create ready-made collectors.
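For instance, the StringBuilder-based concatenation we wrote above is essentially what the factory method Collectors.joining packages up: internally it accumulates into a mutable text container (its 'A' type) and converts it to a String in the finisher. A minimal sketch:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CollectorsJoiningExample {
    public static void main(String[] args) {
        List<String> list = Arrays.asList("Mike", "Nicki", "John");
        // a ready-made Collector from the Collectors factory class;
        // it joins the elements with the given delimiter
        String s = list.stream().collect(Collectors.joining(" "));
        System.out.println(s); // Mike Nicki John
    }
}
```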
Implementing Collector
To understand the Collector interface clearly, let's implement a collector for our first StringBuilder example above, where we did this:
List<String> list = Arrays.asList("Mike", "Nicki", "John");
String s = list.stream().collect(StringBuilder::new,
        (sb, s1) -> sb.append(" ").append(s1),
        (sb1, sb2) -> sb1.append(sb2.toString())).toString();
The equivalent Collector implementation:
import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorExample {
    public static void main(String[] args) {
        String s = Stream.of("Mike", "Nicki", "John")
                         .collect(new MyCollector());
        System.out.println(s);
    }

    private static class MyCollector
            implements Collector<String, StringBuilder, String> {
        @Override
        public Supplier<StringBuilder> supplier() {
            return StringBuilder::new;
        }

        @Override
        public BiConsumer<StringBuilder, String> accumulator() {
            return (sb, s) -> sb.append(" ").append(s);
        }

        @Override
        public BinaryOperator<StringBuilder> combiner() {
            return StringBuilder::append;
        }

        @Override
        public Function<StringBuilder, String> finisher() {
            return StringBuilder::toString;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return Collections.emptySet();
        }
    }
}
Output
Mike Nicki John
What is the Collector#characteristics() method?
The set of characteristics specified by a collector implementation provides one or more of the following optimization hints to the stream pipeline.
Characteristics.CONCURRENT If a collector has this characteristic and we have a parallel() pipeline, then the following points are important to understand:
- The supplier function is called only once, and a single shared instance of 'A' (the mutable result container) is used in the accumulator function by multiple threads. In our last example that would be the StringBuilder instance.
- We have to provide a thread-safe instance for 'A' because multiple threads will be updating it in a parallel stream. In our last example we would have to replace StringBuilder with StringBuffer.
- The combiner function will not be used to combine multiple result containers, because there aren't multiple of them; the single shared result container is updated directly by multiple threads in the accumulator.
- This hint may be ignored. The current stream implementation has only one such scenario: in the case of an ordered stream (one having the Spliterator#ORDERED characteristic), if multiple threads update the shared accumulator container, the order in which results are accumulated is non-deterministic. To avoid that, the pipeline ignores the CONCURRENT characteristic for an ordered source unless the collector also has the UNORDERED characteristic (next discussion). See also: Encounter order tutorial
Let's rewrite our last example to use the CONCURRENT characteristic. We are going to replace StringBuilder with StringBuffer and specify CONCURRENT. We will do that first for an unordered and then for an ordered stream source:
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorExample2 {
    public static void main(String[] args) {
        String s = Stream.of("Mike", "Nicki", "John")
                         .parallel()
                         .unordered()
                         .collect(new MyCollector());
        System.out.println(s);
    }

    private static class MyCollector
            implements Collector<String, StringBuffer, String> {
        @Override
        public Supplier<StringBuffer> supplier() {
            return () -> {
                System.out.println("supplier call");
                return new StringBuffer();
            };
        }

        @Override
        public BiConsumer<StringBuffer, String> accumulator() {
            return (sb, s) -> {
                System.out.println("accumulator function call, "
                        + "accumulator container: "
                        + System.identityHashCode(sb)
                        + " thread: "
                        + Thread.currentThread().getName()
                        + ", processing: " + s);
                sb.append(" ").append(s);
            };
        }

        @Override
        public BinaryOperator<StringBuffer> combiner() {
            return (sb1, sb2) -> {
                System.out.println("combiner function call");
                return sb1.append(sb2);
            };
        }

        @Override
        public Function<StringBuffer, String> finisher() {
            return StringBuffer::toString;
        }

        @Override
        public Set<Characteristics> characteristics() {
            // return Collections.emptySet();
            return EnumSet.of(Characteristics.CONCURRENT);
        }
    }
}
Output
supplier call
accumulator function call, accumulator container: 1791741888 thread: main, processing: Nicki
accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-2, processing: John
accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-1, processing: Mike
Nicki John Mike
It's clear that the supplier function was called only once, even for a parallel stream. Let's modify the example a little and return an empty set from the characteristics() method; the rest is unchanged. This time we will have this output:
supplier call
supplier call
supplier call
accumulator function call, accumulator container: 668386784 thread: main, processing: Nicki
accumulator function call, accumulator container: 1697408497 thread: ForkJoinPool.commonPool-worker-1, processing: Mike
accumulator function call, accumulator container: 1014427870 thread: ForkJoinPool.commonPool-worker-2, processing: John
combiner function call
combiner function call
Mike Nicki John
Now let's remove unordered() from the pipeline and put back the CONCURRENT characteristic:
package com.logicbig.example;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
public class CollectorExample2 {
    public static void main(String[] args) {
        String s = Stream.of("Mike", "Nicki", "John")
                         .parallel()
                         // .unordered()
                         .collect(new MyCollector());
        System.out.println(s);
    }

    private static class MyCollector
            implements Collector<String, StringBuffer, String> {
        ....
        //the rest is the same
        @Override
        public Set<Characteristics> characteristics() {
            // return Collections.emptySet();
            return EnumSet.of(Characteristics.CONCURRENT);
        }
    }
}
Output
supplier call
supplier call
supplier call
accumulator function call, accumulator container: 1897705563 thread: ForkJoinPool.commonPool-worker-2, processing: Mike
accumulator function call, accumulator container: 1329552164 thread: main, processing: Nicki
accumulator function call, accumulator container: 1674443221 thread: ForkJoinPool.commonPool-worker-1, processing: John
combiner function call
combiner function call
Mike Nicki John
This proves that Characteristics.CONCURRENT is ignored for an ordered stream source.
When/why should we use the CONCURRENT characteristic? It is a common mistake to think that a mutable container like ArrayList calls for CONCURRENT. It does not: even though ArrayList is not thread-safe, each instance of the container is normally used by only one thread in parallel execution. So when should we use CONCURRENT? One scenario (as stated by the API docs) is when merging multiple result containers from multiple threads into one final result (normally done in the combiner function) is expensive, e.g. merging multiple HashMap instances by key. Performance can be improved by specifying the CONCURRENT characteristic and replacing HashMap with ConcurrentHashMap. Also remember that this performance improvement is only possible for unordered stream sources.
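That scenario can be sketched with built-in collectors: Collectors.groupingByConcurrent returns a collector with both the CONCURRENT and UNORDERED characteristics and accumulates into a single shared ConcurrentHashMap, while plain Collectors.groupingBy builds one map per thread and merges them in the combiner. The grouping key chosen here (i % 10) is just an illustrative example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentGroupingExample {
    public static void main(String[] args) {
        // one HashMap per thread, merged by key in the combiner
        Map<Integer, Long> m1 = IntStream.rangeClosed(1, 1000).boxed()
                .parallel()
                .collect(Collectors.groupingBy(i -> i % 10,
                                               Collectors.counting()));

        // single shared ConcurrentHashMap, no merge step
        ConcurrentMap<Integer, Long> m2 = IntStream.rangeClosed(1, 1000).boxed()
                .parallel()
                .collect(Collectors.groupingByConcurrent(i -> i % 10,
                                                         Collectors.counting()));

        System.out.println(m1.equals(m2)); // true
    }
}
```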
Characteristics.UNORDERED This characteristic declares that the Collector does not need to respect the encounter order, so the caller (which of course is the stream pipeline) is free to ignore any order the source has. This is particularly important if we have also specified Characteristics.CONCURRENT and don't want the stream source's encounter order to cause the CONCURRENT characteristic to be ignored. Let's modify our last example once more: we will leave unordered() out of the stream pipeline and include Characteristics.UNORDERED along with Characteristics.CONCURRENT in our collector implementation:
public class CollectorExample3 {
    public static void main(String[] args) {
        String s = Stream.of("Mike", "Nicki", "John")
                         .parallel()
                         .collect(new MyCollector());
        System.out.println(s);
    }

    private static class MyCollector
            implements Collector<String, StringBuffer, String> {
        ....
        // rest is the same
        @Override
        public Set<Characteristics> characteristics() {
            // return Collections.emptySet();
            return EnumSet.of(Characteristics.CONCURRENT,
                              Characteristics.UNORDERED);
        }
    }
}
Output
supplier call
accumulator function call, accumulator container: 1791741888 thread: main, processing: Nicki
accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-2, processing: John
accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-1, processing: Mike
Nicki John Mike
This time only one instance of the mutable container is used even though we have an ordered stream.
Characteristics.IDENTITY_FINISH Indicates that the finisher function is the identity function and can be skipped by the pipeline. By specifying this characteristic, we tell the pipeline that we are not performing a final transformation, so the finisher function should not even be invoked. With this, our Collector implementation becomes equivalent to using the three-parameter version of the collect method, i.e. collect(Supplier, BiConsumer, BiConsumer):
package com.logicbig.example;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
public class CollectorExample4 {
    public static void main(String[] args) {
        List<String> s = Stream.of("Mike", "Nicki", "John")
                               .parallel()
                               .collect(new MyCollector());
        System.out.println(s);
    }

    private static class MyCollector
            implements Collector<String, List<String>, List<String>> {
        @Override
        public Supplier<List<String>> supplier() {
            return ArrayList::new;
        }

        @Override
        public BiConsumer<List<String>, String> accumulator() {
            return List::add;
        }

        @Override
        public BinaryOperator<List<String>> combiner() {
            return (list, list2) -> {
                list.addAll(list2);
                return list;
            };
        }

        @Override
        public Function<List<String>, List<String>> finisher() {
            // never invoked because of IDENTITY_FINISH
            return null;
        }

        @Override
        public Set<Characteristics> characteristics() {
            // return Collections.emptySet();
            return EnumSet.of(Characteristics.IDENTITY_FINISH);
        }
    }
}
Output
[Mike, Nicki, John]
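Rather than implementing the interface by hand, the static factory Collector.of offers the same shortcut: its overload that takes no finisher automatically reports IDENTITY_FINISH. A minimal sketch of the same list-collecting collector:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorOfExample {
    public static void main(String[] args) {
        // the no-finisher Collector.of(...) overload reports
        // IDENTITY_FINISH automatically
        Collector<String, List<String>, List<String>> c =
                Collector.of(ArrayList::new,
                             List::add,
                             (l1, l2) -> { l1.addAll(l2); return l1; });
        List<String> list = Stream.of("Mike", "Nicki", "John")
                                  .parallel()
                                  .collect(c);
        System.out.println(list); // [Mike, Nicki, John]
    }
}
```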
If characteristics() returns an empty set in the same example, we get:
Exception in thread "main" java.lang.NullPointerException
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:503)
at com.logicbig.example.CollectorExample4.main(CollectorExample4.java:15)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Collectors class
The Collectors class provides various factory methods that create useful Collector instances, covering conversion to collections and maps, aggregation, grouping, partitioning, etc.
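A quick sketch of a few commonly used factory methods (the filter, grouping key, and predicate here are just illustrative choices):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CollectorsFactoryExample {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Mike", "Nicki", "John");

        // collect into a List
        List<String> list = names.stream()
                .filter(n -> n.length() > 4)
                .collect(Collectors.toList());
        System.out.println(list); // [Nicki]

        // group by name length
        Map<Integer, List<String>> byLength = names.stream()
                .collect(Collectors.groupingBy(String::length));
        System.out.println(byLength); // {4=[Mike, John], 5=[Nicki]}

        // partition by a predicate
        Map<Boolean, List<String>> partitioned = names.stream()
                .collect(Collectors.partitioningBy(n -> n.startsWith("M")));
        System.out.println(partitioned); // {false=[Nicki, John], true=[Mike]}
    }
}
```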