Close

Reactor - Transforming into Publisher and Maintaining the source order with flatMapSequential()

[Last Updated: Aug 11, 2020]

The flatMapSequential() methods of the Flux class transform the source elements into Publishers, then flatten these inner publishers into a single Flux, merging their values in the order of the source elements.

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)

With max concurrency (see also flatMap()):

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
                                           int maxConcurrency)

With max concurrency and prefetch (see also flatMap()):

public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
                                           int maxConcurrency,
                                           int prefetch)

Examples

public class Example1FlatMapSequentialAndParallelism {
    public static void main(String[] args) {
        System.out.println("-- flatMapSequential without parallelism --");
        Flux.just(1, 5, 9)
            .flatMapSequential(integer -> {
                System.out.println("-- starting: " + integer + " --");
                return Flux.range(integer, 4);
            })
            .subscribe(x ->
                    System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));

        System.out.println("-- flatMapSequential with parallelism --");
        Flux.just(1, 5, 9)
            .flatMapSequential(integer -> {
                System.out.println("-- starting: " + integer + " --");
                return Flux.range(integer, 4)
                           .subscribeOn(Schedulers.newParallel("myThread", 8));
            })
            .subscribe(x ->
                    System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
    }
}
-- flatMapSequential without parallelism --
-- starting: 1 --
1 - main - 21:22:58.114
2 - main - 21:22:58.129
3 - main - 21:22:58.129
4 - main - 21:22:58.129
-- starting: 5 --
5 - main - 21:22:58.129
6 - main - 21:22:58.129
7 - main - 21:22:58.129
8 - main - 21:22:58.129
-- starting: 9 --
9 - main - 21:22:58.129
10 - main - 21:22:58.129
11 - main - 21:22:58.129
12 - main - 21:22:58.129
-- flatMapSequential with parallelism --
-- starting: 1 --
-- starting: 5 --
-- starting: 9 --
1 - myThread-1 - 21:22:58.160
2 - myThread-1 - 21:22:58.160
3 - myThread-1 - 21:22:58.160
4 - myThread-1 - 21:22:58.160
5 - myThread-1 - 21:22:58.160
6 - myThread-1 - 21:22:58.160
7 - myThread-1 - 21:22:58.160
8 - myThread-1 - 21:22:58.160
9 - myThread-3 - 21:22:58.160
10 - myThread-3 - 21:22:58.160
11 - myThread-3 - 21:22:58.160
12 - myThread-3 - 21:22:58.160
Comparing with flatMap()
public class Example2FlatMapAndParallelism {
    public static void main(String[] args) {
        System.out.println("-- flatMap without parallelism --");
        Flux.just(1, 5, 9)
            .flatMap(integer -> {
                System.out.println("-- starting: " + integer + " --");
                return Flux.range(integer, 4);
            })
            .subscribe(x ->
                    System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));

        System.out.println("-- flatMap with parallelism --");
        Flux.just(1, 5, 9)
            .flatMap(integer -> {
                System.out.println("-- starting: " + integer + " --");
                return Flux.range(integer, 4)
                           .subscribeOn(Schedulers.newParallel("myThread", 8));
            })
            .subscribe(x ->
                    System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
    }
}
-- flatMap without parallelism --
-- starting: 1 --
1 - main - 21:24:12.789
2 - main - 21:24:12.789
3 - main - 21:24:12.789
4 - main - 21:24:12.789
-- starting: 5 --
5 - main - 21:24:12.789
6 - main - 21:24:12.789
7 - main - 21:24:12.789
8 - main - 21:24:12.789
-- starting: 9 --
9 - main - 21:24:12.789
10 - main - 21:24:12.789
11 - main - 21:24:12.789
12 - main - 21:24:12.789
-- flatMap with parallelism --
-- starting: 1 --
-- starting: 5 --
-- starting: 9 --
1 - myThread-1 - 21:24:12.820
2 - myThread-1 - 21:24:12.820
3 - myThread-1 - 21:24:12.820
4 - myThread-1 - 21:24:12.820
9 - myThread-3 - 21:24:12.820
10 - myThread-3 - 21:24:12.820
11 - myThread-3 - 21:24:12.820
12 - myThread-3 - 21:24:12.820
5 - myThread-2 - 21:24:12.820
6 - myThread-2 - 21:24:12.820
7 - myThread-2 - 21:24:12.820
8 - myThread-2 - 21:24:12.820

With concurrency:

concurrency = 1
public class Example3FlatMapSequentialAndConcurrency {
    public static void main(String[] args) {
        System.out.println("-- flatMapSequential with parallelism and  concurrency --");
        Flux.just(1, 5, 9)
            .flatMapSequential(integer -> {
                System.out.println("-- starting: " + integer + " --");
                return Flux.range(integer, 4)
                           .subscribeOn(Schedulers.newParallel("myThread", 8));
            }, 1)
            .subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
    }
}
-- flatMapSequential with parallelism and  concurrency --
-- starting: 1 --
1 - myThread-1 - 21:25:40.432
2 - myThread-1 - 21:25:40.448
3 - myThread-1 - 21:25:40.448
4 - myThread-1 - 21:25:40.448
-- starting: 5 --
5 - myThread-2 - 21:25:40.448
6 - myThread-2 - 21:25:40.448
7 - myThread-2 - 21:25:40.448
8 - myThread-2 - 21:25:40.448
-- starting: 9 --
9 - myThread-3 - 21:25:40.448
10 - myThread-3 - 21:25:40.448
11 - myThread-3 - 21:25:40.448
12 - myThread-3 - 21:25:40.448
concurrency = 2
public class Example4FlatMapSequentialAndConcurrency {
    public static void main(String[] args) {
        System.out.println("-- flatMapSequential with parallelism and  concurrency --");
        Flux.just(1, 5, 9)
            .flatMapSequential(integer -> {
                System.out.println("-- starting: " + integer + " --");
                return Flux.range(integer, 4)
                           .subscribeOn(Schedulers.newParallel("myThread", 8));
            }, 2)
            .subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
    }
}
-- flatMapSequential with parallelism and  concurrency --
-- starting: 1 --
-- starting: 5 --
1 - myThread-2 - 21:26:21.442
2 - myThread-2 - 21:26:21.442
3 - myThread-2 - 21:26:21.442
4 - myThread-2 - 21:26:21.442
-- starting: 9 --
5 - myThread-2 - 21:26:21.442
6 - myThread-2 - 21:26:21.442
7 - myThread-2 - 21:26:21.442
8 - myThread-2 - 21:26:21.442
9 - myThread-3 - 21:26:21.442
10 - myThread-3 - 21:26:21.442
11 - myThread-3 - 21:26:21.442
12 - myThread-3 - 21:26:21.442

With concurrency and prefetch

public class Example5FlatMapSequentialConcurrencyAndPrefetch {
    public static void main(String[] args) {
        System.out.println("-- flatMapSequential with parallelism, concurrency and prefetch --");
        Flux.just(1, 5, 9)
            .flatMapSequential(integer -> {
                System.out.println("-- starting: " + integer + " --");
                return Flux.range(integer, 4)
                           .subscribeOn(Schedulers.newParallel("myThread", 8));
            }, 1, 4)
            .subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
    }
}
-- flatMapSequential with parallelism, concurrency and prefetch --
-- starting: 1 --
1 - myThread-1 - 21:28:42.495
2 - myThread-1 - 21:28:42.511
3 - myThread-1 - 21:28:42.511
4 - myThread-1 - 21:28:42.511
-- starting: 5 --
5 - myThread-2 - 21:28:42.511
6 - myThread-2 - 21:28:42.511
7 - myThread-2 - 21:28:42.511
8 - myThread-2 - 21:28:42.511
-- starting: 9 --
9 - myThread-3 - 21:28:42.511
10 - myThread-3 - 21:28:42.511
11 - myThread-3 - 21:28:42.511
12 - myThread-3 - 21:28:42.511

Example Project

Dependencies and Technologies Used:

  • reactor-core 3.3.3.RELEASE: Non-Blocking Reactive Foundation for the JVM.
  • JDK 8
  • Maven 3.5.4

Reactor - flatMapSequential() example Select All Download
  • reactor-flat-map-sequential-operation
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • Example1FlatMapSequentialAndParallelism.java

    See Also