The FlatMapSequential() methods of Flux class transforms the elements into Publishers, then flatten these inner publishers into a single Flux, but merge them in the order of their source element.
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper)
With max concurrency (see also flatMap()):
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency
With max concurrency and prefetch (see also flatMap()):
public final <R> Flux<R> flatMapSequential(Function<? super T,? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch)
Examples
public class Example1FlatMapSequentialAndParallelism {
public static void main(String[] args) {
System.out.println("-- flatMapSequential without parallelism --");
Flux.just(1, 5, 9)
.flatMapSequential(integer -> {
System.out.println("-- starting: " + integer + " --");
return Flux.range(integer, 4);
})
.subscribe(x ->
System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
System.out.println("-- flatMapSequential with parallelism --");
Flux.just(1, 5, 9)
.flatMapSequential(integer -> {
System.out.println("-- starting: " + integer + " --");
return Flux.range(integer, 4)
.subscribeOn(Schedulers.newParallel("myThread", 8));
})
.subscribe(x ->
System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-- flatMapSequential without parallelism --
-- starting: 1 --
1 - main - 21:22:58.114
2 - main - 21:22:58.129
3 - main - 21:22:58.129
4 - main - 21:22:58.129
-- starting: 5 --
5 - main - 21:22:58.129
6 - main - 21:22:58.129
7 - main - 21:22:58.129
8 - main - 21:22:58.129
-- starting: 9 --
9 - main - 21:22:58.129
10 - main - 21:22:58.129
11 - main - 21:22:58.129
12 - main - 21:22:58.129
-- flatMapSequential with parallelism --
-- starting: 1 --
-- starting: 5 --
-- starting: 9 --
1 - myThread-1 - 21:22:58.160
2 - myThread-1 - 21:22:58.160
3 - myThread-1 - 21:22:58.160
4 - myThread-1 - 21:22:58.160
5 - myThread-1 - 21:22:58.160
6 - myThread-1 - 21:22:58.160
7 - myThread-1 - 21:22:58.160
8 - myThread-1 - 21:22:58.160
9 - myThread-3 - 21:22:58.160
10 - myThread-3 - 21:22:58.160
11 - myThread-3 - 21:22:58.160
12 - myThread-3 - 21:22:58.160
Comparing with flatMap()
public class Example2FlatMapAndParallelism {
public static void main(String[] args) {
System.out.println("-- flatMap without parallelism --");
Flux.just(1, 5, 9)
.flatMap(integer -> {
System.out.println("-- starting: " + integer + " --");
return Flux.range(integer, 4);
})
.subscribe(x ->
System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
System.out.println("-- flatMap with parallelism --");
Flux.just(1, 5, 9)
.flatMap(integer -> {
System.out.println("-- starting: " + integer + " --");
return Flux.range(integer, 4)
.subscribeOn(Schedulers.newParallel("myThread", 8));
})
.subscribe(x ->
System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-- flatMap without parallelism --
-- starting: 1 --
1 - main - 21:24:12.789
2 - main - 21:24:12.789
3 - main - 21:24:12.789
4 - main - 21:24:12.789
-- starting: 5 --
5 - main - 21:24:12.789
6 - main - 21:24:12.789
7 - main - 21:24:12.789
8 - main - 21:24:12.789
-- starting: 9 --
9 - main - 21:24:12.789
10 - main - 21:24:12.789
11 - main - 21:24:12.789
12 - main - 21:24:12.789
-- flatMap with parallelism --
-- starting: 1 --
-- starting: 5 --
-- starting: 9 --
1 - myThread-1 - 21:24:12.820
2 - myThread-1 - 21:24:12.820
3 - myThread-1 - 21:24:12.820
4 - myThread-1 - 21:24:12.820
9 - myThread-3 - 21:24:12.820
10 - myThread-3 - 21:24:12.820
11 - myThread-3 - 21:24:12.820
12 - myThread-3 - 21:24:12.820
5 - myThread-2 - 21:24:12.820
6 - myThread-2 - 21:24:12.820
7 - myThread-2 - 21:24:12.820
8 - myThread-2 - 21:24:12.820
With concurrency:
concurrency = 1
public class Example3FlatMapSequentialAndConcurrency {
public static void main(String[] args) {
System.out.println("-- flatMapSequential with parallelism and concurrency --");
Flux.just(1, 5, 9)
.flatMapSequential(integer -> {
System.out.println("-- starting: " + integer + " --");
return Flux.range(integer, 4)
.subscribeOn(Schedulers.newParallel("myThread", 8));
}, 1)
.subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-- flatMapSequential with parallelism and concurrency --
-- starting: 1 --
1 - myThread-1 - 21:25:40.432
2 - myThread-1 - 21:25:40.448
3 - myThread-1 - 21:25:40.448
4 - myThread-1 - 21:25:40.448
-- starting: 5 --
5 - myThread-2 - 21:25:40.448
6 - myThread-2 - 21:25:40.448
7 - myThread-2 - 21:25:40.448
8 - myThread-2 - 21:25:40.448
-- starting: 9 --
9 - myThread-3 - 21:25:40.448
10 - myThread-3 - 21:25:40.448
11 - myThread-3 - 21:25:40.448
12 - myThread-3 - 21:25:40.448
concurrency = 2
public class Example4FlatMapSequentialAndConcurrency {
public static void main(String[] args) {
System.out.println("-- flatMapSequential with parallelism and concurrency --");
Flux.just(1, 5, 9)
.flatMapSequential(integer -> {
System.out.println("-- starting: " + integer + " --");
return Flux.range(integer, 4)
.subscribeOn(Schedulers.newParallel("myThread", 8));
}, 2)
.subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-- flatMapSequential with parallelism and concurrency --
-- starting: 1 --
-- starting: 5 --
1 - myThread-2 - 21:26:21.442
2 - myThread-2 - 21:26:21.442
3 - myThread-2 - 21:26:21.442
4 - myThread-2 - 21:26:21.442
-- starting: 9 --
5 - myThread-2 - 21:26:21.442
6 - myThread-2 - 21:26:21.442
7 - myThread-2 - 21:26:21.442
8 - myThread-2 - 21:26:21.442
9 - myThread-3 - 21:26:21.442
10 - myThread-3 - 21:26:21.442
11 - myThread-3 - 21:26:21.442
12 - myThread-3 - 21:26:21.442
With concurrency and prefetch
public class Example5FlatMapSequentialConcurrencyAndPrefetch {
public static void main(String[] args) {
System.out.println("-- flatMapSequential with parallelism, concurrency and prefetch --");
Flux.just(1, 5, 9)
.flatMapSequential(integer -> {
System.out.println("-- starting: " + integer + " --");
return Flux.range(integer, 4)
.subscribeOn(Schedulers.newParallel("myThread", 8));
}, 1, 4)
.subscribe(x -> System.out.println(x + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
}
}
-- flatMapSequential with parallelism, concurrency and prefetch --
-- starting: 1 --
1 - myThread-1 - 21:28:42.495
2 - myThread-1 - 21:28:42.511
3 - myThread-1 - 21:28:42.511
4 - myThread-1 - 21:28:42.511
-- starting: 5 --
5 - myThread-2 - 21:28:42.511
6 - myThread-2 - 21:28:42.511
7 - myThread-2 - 21:28:42.511
8 - myThread-2 - 21:28:42.511
-- starting: 9 --
9 - myThread-3 - 21:28:42.511
10 - myThread-3 - 21:28:42.511
11 - myThread-3 - 21:28:42.511
12 - myThread-3 - 21:28:42.511
Example ProjectDependencies and Technologies Used: - reactor-core 3.3.3.RELEASE: Non-Blocking Reactive Foundation for the JVM.
- JDK 8
- Maven 3.5.4
|