To retry an error producing Flux or Mono sequence we can use following methods.
Flux Methods
public final Flux<T> retry()
Above method re-subscribes to this Flux sequence if it signals any error. It does so indefinitely.
public final Flux<T> retry(long numRetries)
Above method re-subscribes to this Flux sequence if it signals any error, for a fixed number of times (numRetries).
Mono Methods
public final Mono<T> retry()
public final Mono<T> retry(long numRetries)
All above methods work by re-subscribing to the upstream Flux.
Examples
Flux.retry() Example
package com.logicbig.example;
import reactor.core.publisher.Flux;
import java.time.LocalTime;
public class RetryFluxExample {
public static void main(String[] args) {
Flux.just(1, 3, 5, 7)
.map(RetryFluxExample::process)
.doOnError(System.err::println)
.retry(3)
.subscribe(e ->
System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
System.out.println("Program exits");
}
private static String process(Integer integer) {
if (integer == 5) {
throw new IllegalArgumentException("test exception");
}
return "Number: " + integer;
}
}
Number: 1 - main - 04:06:28.181
Number: 3 - main - 04:06:28.194
java.lang.IllegalArgumentException: test exception
Number: 1 - main - 04:06:28.201
Number: 3 - main - 04:06:28.201
java.lang.IllegalArgumentException: test exception
Number: 1 - main - 04:06:28.201
Number: 3 - main - 04:06:28.201
java.lang.IllegalArgumentException: test exception
Number: 1 - main - 04:06:28.202
Number: 3 - main - 04:06:28.202
java.lang.IllegalArgumentException: test exception
Program exits
If we don't use doOnError() then:
package com.logicbig.example;
import reactor.core.publisher.Flux;
import java.time.LocalTime;
public class RetryFluxExample2 {
public static void main(String[] args) {
Flux.just(1, 3, 5, 7)
.map(RetryFluxExample2::process)
.retry(3)
.subscribe(e ->
System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
System.out.println("Program exits");
}
private static String process(Integer integer) {
if (integer == 5) {
throw new IllegalArgumentException("test exception");
}
return "Number: " + integer;
}
}
Number: 1 - main - 04:16:53.356
Number: 3 - main - 04:16:53.375
Number: 1 - main - 04:16:53.386
Number: 3 - main - 04:16:53.386
Number: 1 - main - 04:16:53.386
Number: 3 - main - 04:16:53.386
Number: 1 - main - 04:16:53.387
Number: 3 - main - 04:16:53.387
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: test exception Caused by: java.lang.IllegalArgumentException: test exception at com.logicbig.example.RetryFluxExample2.lambda$main$0(RetryFluxExample2.java:11) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107) at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171) at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2125) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1999) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53) at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59) at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:110) at reactor.core.publisher.FluxRetry.subscribeOrReturn(FluxRetry.java:51) at reactor.core.publisher.Flux.subscribe(Flux.java:8311) at reactor.core.publisher.Flux.subscribeWith(Flux.java:8494) at reactor.core.publisher.Flux.subscribe(Flux.java:8295) at reactor.core.publisher.Flux.subscribe(Flux.java:8222) at reactor.core.publisher.Flux.subscribe(Flux.java:8165) at com.logicbig.example.RetryFluxExample2.main(RetryFluxExample2.java:16)
Mono.retry() Example
public class RetryMonoExample {
public static void main(String[] args) {
Mono.just(4)
.map(RetryMonoExample::process)
.doOnError(System.err::println)
.retry(3)
.subscribe(e ->
System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));
System.out.println("Program exits");
}
private static String process(Integer integer) {
if (integer % 2 == 0) {
throw new IllegalArgumentException("test exception");
}
return "Number: " + integer;
}
}
java.lang.IllegalArgumentException: test exception
java.lang.IllegalArgumentException: test exception
java.lang.IllegalArgumentException: test exception
java.lang.IllegalArgumentException: test exception
Program exits
Example ProjectDependencies and Technologies Used: - reactor-core 3.3.5.RELEASE: Non-Blocking Reactive Foundation for the JVM.
- JDK 8
- Maven 3.5.4
|