Reactor - Retrying Flux/Mono Sequence

[Last Updated: Aug 11, 2020]

To retry a Flux or Mono sequence that terminates with an error, we can use the following methods.

Flux Methods

public final Flux<T> retry() 

The above method re-subscribes to this Flux sequence whenever it signals an error, and it does so indefinitely.

public final Flux<T> retry(long numRetries)

The above method re-subscribes to this Flux sequence if it signals an error, up to a fixed number of times (numRetries). Once the retries are exhausted, the last error is propagated downstream.
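As a rough illustration of what numRetries means, the sketch below counts how often the source is subscribed to. The class name RetryCountSketch and the helper countSubscriptions are hypothetical names chosen for this example; the source is wrapped in Flux.defer() only so that each subscription can be counted.

```java
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryCountSketch {

  // Hypothetical helper: returns how many times a source that always fails
  // is subscribed to when retry(numRetries) is applied.
  static int countSubscriptions(long numRetries) {
      AtomicInteger subscriptions = new AtomicInteger();
      Flux.defer(() -> {
              // runs once per subscription, including each re-subscription
              subscriptions.incrementAndGet();
              return Flux.<Integer>error(new IllegalStateException("always fails"));
          })
          .retry(numRetries)
          // the second argument consumes the final error after retries are exhausted
          .subscribe(value -> { }, error -> { });
      return subscriptions.get();
  }

  public static void main(String[] args) {
      // 1 original subscription + 3 retries
      System.out.println("Subscriptions with retry(3): " + countSubscriptions(3));
  }
}
```

With retry(3) the source is subscribed to four times in total: the original subscription plus three retries.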

Mono Methods

public final Mono<T> retry()
public final Mono<T> retry(long numRetries) 

All of the above methods work by re-subscribing to the upstream Flux or Mono, so the source sequence is replayed from the beginning on each retry.
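A minimal sketch of this re-subscription behaviour, assuming a source that fails on its first two subscriptions and succeeds on the third. The class ResubscribeSketch and the method collectWithRetry are hypothetical names used only for illustration. Elements emitted before each failure reach the downstream again on every attempt.

```java
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ResubscribeSketch {

  // Hypothetical helper: collects everything the downstream receives
  // across all subscription attempts.
  static List<Integer> collectWithRetry() {
      AtomicInteger attempt = new AtomicInteger();
      return Flux.defer(() -> {
                  // evaluated again on every (re-)subscription
                  int current = attempt.incrementAndGet();
                  return Flux.just(1, 2, 3)
                             .map(i -> {
                                 // simulated transient failure on attempts 1 and 2
                                 if (i == 2 && current < 3) {
                                     throw new IllegalStateException("transient failure");
                                 }
                                 return i;
                             });
              })
              .retry(3)
              .collectList()
              .block();
  }

  public static void main(String[] args) {
      System.out.println(collectWithRetry()); // prints [1, 1, 1, 2, 3]
  }
}
```

The element 1 appears three times because each attempt starts the sequence over; retry() does not resume from the element that failed.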

Examples

Flux.retry() Example

package com.logicbig.example;

import reactor.core.publisher.Flux;
import java.time.LocalTime;

public class RetryFluxExample {
  public static void main(String[] args) {
      Flux.just(1, 3, 5, 7)
          .map(RetryFluxExample::process)
          .doOnError(System.err::println)
          .retry(3)
          .subscribe(e ->
                  System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));

      System.out.println("Program exits");
  }

  private static String process(Integer integer) {
      if (integer == 5) {
          throw new IllegalArgumentException("test exception");
      }
      return "Number: " + integer;
  }
}
Number: 1 - main - 04:06:28.181
Number: 3 - main - 04:06:28.194
java.lang.IllegalArgumentException: test exception
Number: 1 - main - 04:06:28.201
Number: 3 - main - 04:06:28.201
java.lang.IllegalArgumentException: test exception
Number: 1 - main - 04:06:28.201
Number: 3 - main - 04:06:28.201
java.lang.IllegalArgumentException: test exception
Number: 1 - main - 04:06:28.202
Number: 3 - main - 04:06:28.202
java.lang.IllegalArgumentException: test exception
Program exits

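If we don't use doOnError(), the intermediate errors are not printed. After the retries are exhausted, the last error reaches the subscriber, which has no error consumer: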

package com.logicbig.example;

import reactor.core.publisher.Flux;
import java.time.LocalTime;

public class RetryFluxExample2 {
  public static void main(String[] args) {
      Flux.just(1, 3, 5, 7)
          .map(RetryFluxExample2::process)
          .retry(3)
          .subscribe(e ->
                  System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));

      System.out.println("Program exits");
  }

  private static String process(Integer integer) {
      if (integer == 5) {
          throw new IllegalArgumentException("test exception");
      }
      return "Number: " + integer;
  }
}
Number: 1 - main - 04:16:53.356
Number: 3 - main - 04:16:53.375
Number: 1 - main - 04:16:53.386
Number: 3 - main - 04:16:53.386
Number: 1 - main - 04:16:53.386
Number: 3 - main - 04:16:53.386
Number: 1 - main - 04:16:53.387
Number: 3 - main - 04:16:53.387
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: test exception
Caused by: java.lang.IllegalArgumentException: test exception
at com.logicbig.example.RetryFluxExample2.lambda$main$0(RetryFluxExample2.java:11)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:171)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2125)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1999)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:110)
at reactor.core.publisher.FluxRetry.subscribeOrReturn(FluxRetry.java:51)
at reactor.core.publisher.Flux.subscribe(Flux.java:8311)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8494)
at reactor.core.publisher.Flux.subscribe(Flux.java:8295)
at reactor.core.publisher.Flux.subscribe(Flux.java:8222)
at reactor.core.publisher.Flux.subscribe(Flux.java:8165)
at com.logicbig.example.RetryFluxExample2.main(RetryFluxExample2.java:16)
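One way to avoid the ErrorCallbackNotImplemented exception is to pass an error consumer as the second argument of subscribe(). The following is only a sketch of that idea: the class name RetryWithErrorConsumerSketch and the run helper are hypothetical, and the results are collected into a list rather than printed directly so they are easy to inspect.

```java
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.List;

public class RetryWithErrorConsumerSketch {

  // Hypothetical helper: records every element and the final error, if any.
  static List<String> run() {
      List<String> log = new ArrayList<>();
      Flux.just(1, 3, 5, 7)
          .map(RetryWithErrorConsumerSketch::process)
          .retry(3)
          .subscribe(
                  log::add,
                  // called once, after all retries have failed
                  error -> log.add("Failed after retries: " + error.getMessage()));
      return log;
  }

  private static String process(Integer integer) {
      if (integer == 5) {
          throw new IllegalArgumentException("test exception");
      }
      return "Number: " + integer;
  }

  public static void main(String[] args) {
      run().forEach(System.out::println);
  }
}
```

Here "Number: 1" and "Number: 3" are recorded four times (once per attempt), followed by a single "Failed after retries: test exception" entry instead of an uncaught exception.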

Mono.retry() Example

package com.logicbig.example;

import reactor.core.publisher.Mono;
import java.time.LocalTime;

public class RetryMonoExample {
  public static void main(String[] args) {
      Mono.just(4)
          .map(RetryMonoExample::process)
          .doOnError(System.err::println)
          .retry(3)
          .subscribe(e ->
                  System.out.println(e + " - " + Thread.currentThread().getName() + " - " + LocalTime.now()));

      System.out.println("Program exits");
  }

  private static String process(Integer integer) {
      if (integer % 2 == 0) {
          throw new IllegalArgumentException("test exception");
      }
      return "Number: " + integer;
  }
}
java.lang.IllegalArgumentException: test exception
java.lang.IllegalArgumentException: test exception
java.lang.IllegalArgumentException: test exception
java.lang.IllegalArgumentException: test exception
Program exits
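In the example above every attempt fails, because Mono.just(4) replays the same value on each subscription. As a contrasting sketch, the code below assumes a source that is re-evaluated on every subscription and fails only transiently. Mono.fromCallable() is used for that purpose; the class name RetryMonoTransientSketch and the method fetchWithRetry are hypothetical.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryMonoTransientSketch {

  // Hypothetical helper: the callable fails on attempts 1 and 2, then succeeds.
  static String fetchWithRetry() {
      AtomicInteger attempts = new AtomicInteger();
      return Mono.fromCallable(() -> {
                  // invoked again on every (re-)subscription
                  int n = attempts.incrementAndGet();
                  if (n < 3) {
                      throw new IllegalStateException("attempt " + n + " failed");
                  }
                  return "Succeeded on attempt " + n;
              })
              .retry(3)
              .block();
  }

  public static void main(String[] args) {
      System.out.println(fetchWithRetry()); // prints Succeeded on attempt 3
  }
}
```

Only two of the three permitted retries are used here; once the callable returns a value, the Mono completes normally and no further re-subscription takes place.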

Example Project

Dependencies and Technologies Used:

  • reactor-core 3.3.5.RELEASE: Non-Blocking Reactive Foundation for the JVM.
  • JDK 8
  • Maven 3.5.4

Reactor - Flux/Mono retry() Examples
  • reactor-retry-example
    • src
      • main
        • java
          • com
            • logicbig
              • example
                • RetryFluxExample.java
