Java 8 Streams - Mutable Reduction

[Updated: Apr 29, 2017, Created: Nov 26, 2016]

Mutable reduction collects the desired results into a mutable container object, such as a java.util.Collection or an array.

In the Java Stream API, mutable reduction is implemented by the collect() methods.

The reduce() methods, which we discussed in the last tutorial, perform immutable reduction: they reduce the elements to a single value, producing a new result at each step instead of modifying an existing one.



reduce() vs collect()

Both terminal operations, reduce() and collect(), are categorized as reduction operations.

In collect() operations, elements are incorporated by updating the state of a mutable container object.

In reduce() operations, the result is updated by replacing the previous result with a new one.



Stream#collect() methods

(1) <R> R collect(Supplier<R> supplier,
                  BiConsumer<R,? super T> accumulator,
                  BiConsumer<R,R> combiner)

This method reduces the stream elements of type T into a mutable result container of type R.

supplier: a function that creates a new result container. In sequential execution it is called only once, whereas in parallel execution it may be called multiple times so that each parallel task gets its own fresh instance.

accumulator: an associative function that incorporates the current element into the result container (the container created by the supplier function).

combiner: in parallel execution, this function merges the partial result containers produced by different threads. It must be an associative function.


The following example uses StringBuilder as the mutable container to concatenate strings:

  List<String> list = Arrays.asList("Mike", "Nicki", "John");
  String s = list.stream().collect(StringBuilder::new,
                            (sb, s1) -> sb.append(" ").append(s1),
                            (sb1, sb2) -> sb1.append(sb2.toString())).toString();
  System.out.println(s);

Output

 Mike Nicki John

Here's the reduce() version of the same concatenation operation:

  List<String> list = Arrays.asList("Mike", "Nicki", "John");
  String s = list.stream().reduce("", (s1, s2) -> s1 + " " + s2);
  System.out.println(s);

Output

 Mike Nicki John

In an example like the one above, reduce() will be less efficient than collect(), especially for a large number of stream elements. That's because, in sequential execution, collect() creates only one container object and appends to it, whereas reduce() creates a new String for every element.
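The supplier behaviour described above can be observed by counting how often the supplier runs. The following is only a minimal sketch: the class name SupplierCountExample and the helper method join are hypothetical names introduced for illustration. The count for the parallel run depends on how the stream happens to be split, so only the sequential count is predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class SupplierCountExample {

    // Concatenates the stream elements and records how often the supplier ran.
    static String join (Stream<String> stream, AtomicInteger supplierCalls) {
        return stream.collect(
                () -> {
                    supplierCalls.incrementAndGet();
                    return new StringBuilder();
                },
                (sb, s) -> sb.append(" ").append(s),
                (sb1, sb2) -> sb1.append(sb2)).toString();
    }

    public static void main (String[] args) {
        List<String> list = Arrays.asList("Mike", "Nicki", "John");

        AtomicInteger sequentialCalls = new AtomicInteger();
        String sequential = join(list.stream(), sequentialCalls);

        AtomicInteger parallelCalls = new AtomicInteger();
        String parallel = join(list.parallelStream(), parallelCalls);

        System.out.println(sequential + " (supplier calls: " + sequentialCalls + ")");
        System.out.println(parallel + " (supplier calls: " + parallelCalls + ")");
    }
}
```

Both runs produce the same string, because the combiner appends the partial containers in encounter order. The sequential run creates exactly one StringBuilder.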



Other variants:

Class Method

IntStream

<R> R collect(Supplier<R> supplier,
              ObjIntConsumer<R> accumulator,
              BiConsumer<R,R> combiner)

LongStream

<R> R collect(Supplier<R> supplier,
              ObjLongConsumer<R> accumulator,
              BiConsumer<R,R> combiner)

DoubleStream

<R> R collect(Supplier<R> supplier,
              ObjDoubleConsumer<R> accumulator,
              BiConsumer<R,R> combiner)

Following is an IntStream example that collects the multiples of 10 into an ArrayList:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class MutableReductionExample2 {
    public static void main (String[] args) {
       IntStream stream = IntStream.range(1, 100);
       List<Integer> list = stream.parallel()
                                  .filter(i -> i % 10 == 0)
                                  .collect(ArrayList::new, ArrayList::add
                                                      , ArrayList::addAll);
       System.out.println(list);
    }
}

Output

[10, 20, 30, 40, 50, 60, 70, 80, 90]
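The primitive variants work with any mutable container, including an array. As a sketch only, the following uses DoubleStream#collect with a two-element double[] (running sum and count) to compute an average. The class name PrimitiveCollectExample and the method average are hypothetical; in practice DoubleStream#average() already covers this case.

```java
import java.util.stream.DoubleStream;

public class PrimitiveCollectExample {

    // Collects into a double[2]: index 0 holds the running sum, index 1 the count.
    static double average (DoubleStream stream) {
        double[] sumAndCount = stream.collect(
                () -> new double[2],
                (acc, d) -> { acc[0] += d; acc[1]++; },
                (left, right) -> { left[0] += right[0]; left[1] += right[1]; });
        // an empty stream yields 0 here (an arbitrary choice for this sketch)
        return sumAndCount[1] == 0 ? 0 : sumAndCount[0] / sumAndCount[1];
    }

    public static void main (String[] args) {
        System.out.println(average(DoubleStream.of(1.0, 2.0, 3.0, 4.0)));
    }
}
```

The accumulator is an ObjDoubleConsumer<double[]>, so each element arrives as a primitive double without boxing.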

(2) <R,A> R collect(Collector<? super T,A,R> collector)

Collector interface: encapsulates the same arguments used by the collect(Supplier, BiConsumer, BiConsumer) method discussed above, plus a finisher() method for an optional final type conversion from A to R and a characteristics() method that indicates the collector's properties.

package java.util.stream;

 //imports
public interface Collector<T, A, R> {

    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();

    //some static methods

    enum Characteristics {
      CONCURRENT,
      UNORDERED,
      IDENTITY_FINISH
    }
}

Note that the type 'A' of this interface corresponds to the type 'R' of the collect(Supplier, BiConsumer, BiConsumer) method discussed in (1). So, to be clear:

 T => the element type of the underlying stream,
 A => the mutable accumulation container returned by the supplier,
 R => the final result type, returned by the finisher() call and therefore by the Stream#collect() call.

Using this interface also makes collect operations reusable; this includes the ready-made collectors created by the factory methods of the Collectors class.


Implementing Collector

To understand the Collector interface clearly, let's implement a collector for our first StringBuilder example above, where we did this:

List<String> list = Arrays.asList("Mike", "Nicki", "John");
String s = list.stream().collect(StringBuilder::new,
                            (sb, s1) -> sb.append(" ").append(s1),
                            (sb1, sb2) -> sb1.append(sb2.toString())).toString();

The equivalent Collector implementation:

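import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorExample {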
    public static void main (String[] args) {
        String s = Stream.of("Mike", "Nicki", "John").collect(new
                            MyCollector());
        System.out.println(s);
    }

    private static class MyCollector implements
                        Collector<String, StringBuilder, String> {

        @Override
        public Supplier<StringBuilder> supplier () {
            return StringBuilder::new;
        }

        @Override
        public BiConsumer<StringBuilder, String> accumulator () {
            return (sb, s) -> sb.append(" ").append(s);
        }

        @Override
        public BinaryOperator<StringBuilder> combiner () {
            return StringBuilder::append;
        }

        @Override
        public Function<StringBuilder, String> finisher () {
            return stringBuilder -> stringBuilder.toString();
        }

        @Override
        public Set<Characteristics> characteristics () {
            return Collections.emptySet();
        }
    }
}

Output

 Mike Nicki John
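The same collector can also be built without a dedicated class, using the static factory Collector.of(supplier, accumulator, combiner, finisher, characteristics...). The following is a minimal sketch that passes the same four functions as MyCollector; the class name CollectorOfExample and the constant SPACE_JOINER are hypothetical. It also shows one collector instance being reused for two streams.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorOfExample {

    // Same four functions as MyCollector; no characteristics are passed.
    static final Collector<String, StringBuilder, String> SPACE_JOINER =
            Collector.of(StringBuilder::new,
                         (sb, s) -> sb.append(" ").append(s),
                         StringBuilder::append,
                         StringBuilder::toString);

    public static void main (String[] args) {
        // the collector holds no state of its own, so it can be reused
        System.out.println(Stream.of("Mike", "Nicki", "John").collect(SPACE_JOINER));
        System.out.println(Stream.of("a", "b").collect(SPACE_JOINER));
    }
}
```

Because the accumulator prepends a space to every element, each result starts with a space.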

What is the Collector#characteristics() method?

The set of characteristics returned by a collector implementation provides one or more of the following optimization hints to the stream pipeline:

  1. Characteristics.CONCURRENT

    If a collector has this characteristic and the pipeline is parallel(), the following points are important to understand:

    1. The supplier function is called only once, and a single shared instance of 'A' (the mutable result container) is used in the accumulator() function by multiple threads. In our last example, that would be the StringBuilder instance.
    2. We have to provide a thread-safe instance of 'A', because multiple threads will be updating it concurrently. In our last example, we would have to replace StringBuilder with StringBuffer.
    3. The combiner function is not used, because there are no multiple result containers to merge; the single shared container is updated directly in accumulator() by multiple threads.
    4. This hint may be ignored. The current stream implementation has only one such scenario:
      For an ordered stream (one having the Spliterator#ORDERED characteristic), if multiple threads updated the shared container, the order in which elements are added would be non-deterministic. To avoid that, the pipeline ignores the CONCURRENT characteristic for an ordered stream unless the collector also has the UNORDERED characteristic (discussed next).
      See also: Encounter order tutorial

    Let's rewrite our last example to use the CONCURRENT characteristic. We are going to replace StringBuilder with StringBuffer and specify CONCURRENT. We will try this first with an unordered and then with an ordered stream:

     public class CollectorExample2 {
       public static void main (String[] args) {
           String s = Stream.of("Mike", "Nicki", "John")
                            .parallel()
                            .unordered()
                            .collect(new MyCollector());
           System.out.println(s);
       }
    
       private static class MyCollector implements
                           Collector<String, StringBuffer, String> {
    
           @Override
           public Supplier<StringBuffer> supplier () {
               return () -> {
                   System.out.println("supplier call");
                   return new StringBuffer();
               };
           }
    
           @Override
           public BiConsumer<StringBuffer, String> accumulator () {
               return (sb, s) -> {
                   System.out.println("accumulator function call, "
                                       + "accumulator container: "
                                       + System.identityHashCode(sb)
                                       + " thread: "
                                       + Thread.currentThread().getName()
                                       + ", processing: " + s);
                   sb.append(" ").append(s);
               };
           }
    
           @Override
           public BinaryOperator<StringBuffer> combiner () {
               return (sb1, sb2) -> {
                   System.out.println("combiner function call");
                   return sb1.append(sb2);
               };
           }
    
           @Override
           public Function<StringBuffer, String> finisher () {
               return stringBuilder -> stringBuilder.toString();
           }
    
           @Override
           public Set<Characteristics> characteristics () {
               // return Collections.emptySet();
               return EnumSet.of(Collector.Characteristics.CONCURRENT);
           }
       }
     }
      

    Output

    supplier call
    accumulator function call, accumulator container: 1791741888 thread: main, processing: Nicki
    accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-2, processing: John
    accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-1, processing: Mike
     Nicki John Mike
    

    It's clear that the supplier function was called only once, even for a parallel stream. Let's modify the example a little and return an empty set from the characteristics() method; the rest is unchanged. This time we get this output:

    supplier call
    supplier call
    supplier call
    accumulator function call, accumulator container: 668386784 thread: main, processing: Nicki
    accumulator function call, accumulator container: 1697408497 thread: ForkJoinPool.commonPool-worker-1, processing: Mike
    accumulator function call, accumulator container: 1014427870 thread: ForkJoinPool.commonPool-worker-2, processing: John
    combiner function call
    combiner function call
     Mike Nicki John
    

    Now let's remove unordered() from the pipeline and put back the CONCURRENT characteristic:

    package com.logicbig.example;
    
    import java.util.Collections;
    import java.util.EnumSet;
    import java.util.Set;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;
    import java.util.stream.Stream;
    
    public class CollectorExample2 {
        public static void main (String[] args) {
            String s = Stream.of("Mike", "Nicki", "John")
                             .parallel()
                          //   .unordered()
                             .collect(new MyCollector());
            System.out.println(s);
        }
    
        private static class MyCollector implements
                            Collector<String, StringBuffer, String> {
    
            ....
            //the rest is the same
    
            @Override
            public Set<Characteristics> characteristics () {
                //  return Collections.emptySet();
                return EnumSet.of(Collector.Characteristics.CONCURRENT);
            }
        }
    }
      

    Output

    supplier call
    supplier call
    supplier call
    accumulator function call, accumulator container: 1897705563 thread: ForkJoinPool.commonPool-worker-2, processing: Mike
    accumulator function call, accumulator container: 1329552164 thread: main, processing: Nicki
    accumulator function call, accumulator container: 1674443221 thread: ForkJoinPool.commonPool-worker-1, processing: John
    combiner function call
    combiner function call
     Mike Nicki John
    

    This shows that Characteristics.CONCURRENT is ignored for an ordered stream source.



    When and why should we use the CONCURRENT characteristic?

    It is a common mistake to think that a non-thread-safe mutable container such as ArrayList requires the CONCURRENT characteristic. That is not the deciding factor: even though ArrayList is not thread-safe, each instance of the container is normally used by only one thread at a time in parallel execution.
    So when should we use CONCURRENT? One scenario (as stated by the API docs) is:
    Merging multiple result containers from multiple threads into one final result (normally done in the combiner() function) can be expensive, e.g. merging multiple HashMap instances by key. Performance can be improved by specifying the CONCURRENT characteristic and replacing HashMap with ConcurrentHashMap. Also remember that this improvement is only possible when the stream is unordered or the collector is also UNORDERED.

  2. Characteristics.UNORDERED

    This characteristic declares that the Collector does not depend on the encounter order, so the caller (the stream pipeline) is free to ignore any order the stream has. This is particularly important if we have also specified Characteristics.CONCURRENT and don't want the stream source's encounter order to cause the CONCURRENT characteristic to be ignored.

    Let's modify our last example once more. We are going to remove unordered() from the stream pipeline and include Characteristics.UNORDERED along with Characteristics.CONCURRENT in our collector implementation:

    public class CollectorExample3 {
        public static void main (String[] args) {
            String s = Stream.of("Mike", "Nicki", "John")
                             .parallel()
                             .collect(new MyCollector());
            System.out.println(s);
        }
    
        private static class MyCollector implements
                            Collector<String, StringBuffer, String> {
    
            ....
            // rest is same
    
            @Override
            public Set<Characteristics> characteristics () {
                //   return Collections.emptySet();
                return EnumSet.of(Collector.Characteristics.CONCURRENT
                                    , Characteristics.UNORDERED);
            }
        }
    }
      

    Output

    supplier call
    accumulator function call, accumulator container: 1791741888 thread: main, processing: Nicki
    accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-2, processing: John
    accumulator function call, accumulator container: 1791741888 thread: ForkJoinPool.commonPool-worker-1, processing: Mike
     Nicki John Mike
    

    This time only one instance of the mutable container is used, even though we have an ordered stream.


  3. Characteristics.IDENTITY_FINISH

    Indicates that the finisher function is the identity function and can be skipped by the pipeline. By specifying this characteristic, we tell the pipeline that we are not performing a final transformation and that the finisher function should not even be invoked; the container of type A is simply cast to R. With this, our Collector implementation becomes equivalent to using the three-parameter version of the collect method, i.e. collect(Supplier, BiConsumer, BiConsumer):

    package com.logicbig.example;
    
    import java.util.ArrayList;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;
    import java.util.stream.Stream;
    
    public class CollectorExample4 {
        public static void main (String[] args) {
            List<String> s = Stream.of("Mike", "Nicki", "John")
                                   .parallel()
                                   .collect(new MyCollector());
            System.out.println(s);
        }
    
        private static class MyCollector implements
                            Collector<String, List<String>, List<String>> {
    
    
            @Override
            public Supplier<List<String>> supplier () {
                return ArrayList::new;
            }
    
            @Override
            public BiConsumer<List<String>, String> accumulator () {
                return List::add;
            }
    
            @Override
            public BinaryOperator<List<String>> combiner () {
                return (list, list2) -> {
                    list.addAll(list2);
                    return list;
                };
            }
    
            @Override
            public Function<List<String>, List<String>> finisher () {
                // deliberately null: with IDENTITY_FINISH the pipeline never calls finisher()
                return null;
            }
    
            @Override
            public Set<Characteristics> characteristics () {
                //   return Collections.emptySet();
                return EnumSet.of(Characteristics.IDENTITY_FINISH);
            }
        }
    }
      

    Output

     [Mike, Nicki, John]
      

    If characteristics() returns an empty set in the same example, the pipeline does use the finisher and fails:

      Exception in thread "main" java.lang.NullPointerException
    	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:503)
    	at com.logicbig.example.CollectorExample4.main(CollectorExample4.java:15)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
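To tie the three characteristics together, here is a hedged sketch of the ConcurrentHashMap scenario mentioned under CONCURRENT. It counts names by their first letter in one shared map. The class name ConcurrentCollectorExample, the constant COUNT_BY_FIRST_LETTER and the grouping key are assumptions made purely for illustration. The sketch uses the overload of Collector.of that takes no finisher; that overload adds IDENTITY_FINISH by itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class ConcurrentCollectorExample {

    // Counts strings by their first character in a single shared, thread-safe map.
    static final Collector<String, ConcurrentHashMap<Character, Integer>,
                           ConcurrentHashMap<Character, Integer>> COUNT_BY_FIRST_LETTER =
            Collector.of(
                    ConcurrentHashMap::new,
                    // merge() is atomic on ConcurrentHashMap, so threads may share the map
                    (map, s) -> map.merge(s.charAt(0), 1, Integer::sum),
                    // only needed if the pipeline uses one container per thread instead
                    (left, right) -> {
                        right.forEach((k, v) -> left.merge(k, v, Integer::sum));
                        return left;
                    },
                    Collector.Characteristics.CONCURRENT,
                    Collector.Characteristics.UNORDERED);

    public static void main (String[] args) {
        Map<Character, Integer> counts = Stream.of("Mike", "Nicki", "John", "Mary")
                                               .parallel()
                                               .collect(COUNT_BY_FIRST_LETTER);
        System.out.println(counts);
        // lists all three characteristics, including the implicit IDENTITY_FINISH
        System.out.println(COUNT_BY_FIRST_LETTER.characteristics());
    }
}
```

Since the collector is both CONCURRENT and UNORDERED, the parallel pipeline can accumulate into a single map even though Stream.of(...) is an ordered source. The counts are the same either way; only the work done to produce them differs.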
      


Collectors class

The Collectors class defines various useful factory methods that create Collector instances, including collectors for converting to collections and maps, aggregation, grouping, partitioning etc.

Examples of the Collectors factory methods are included under the package com.logicbig.example.collectors in the example project.
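As a quick illustration of those factory methods, the following sketch uses Collectors.joining, Collectors.groupingBy and Collectors.partitioningBy on the names from the earlier examples. The class name CollectorsExample and its helper methods are hypothetical and are not taken from the example project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CollectorsExample {

    // aggregation: concatenates the names with a separator
    static String join (List<String> names) {
        return names.stream().collect(Collectors.joining(", "));
    }

    // grouping: maps each string length to the names of that length
    static Map<Integer, List<String>> byLength (List<String> names) {
        return names.stream().collect(Collectors.groupingBy(String::length));
    }

    // partitioning: splits the names by a predicate into a true and a false group
    static Map<Boolean, List<String>> startsWithM (List<String> names) {
        return names.stream().collect(Collectors.partitioningBy(s -> s.startsWith("M")));
    }

    public static void main (String[] args) {
        List<String> names = Arrays.asList("Mike", "Nicki", "John");
        System.out.println(join(names));
        System.out.println(byLength(names));
        System.out.println(startsWithM(names));
    }
}
```

Each factory method returns a ready-made Collector, so none of the supplier, accumulator, combiner or finisher functions has to be written by hand.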


Example project

Dependencies and Technologies Used :

  • JDK 1.8
  • Maven 3.3.9
