Understanding Java 9 Reactive Streams

[Updated: Oct 25, 2017, Created: Oct 24, 2017]

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. Java 9 supports it through the java.util.concurrent.Flow API, which is based on an asynchronous publish-subscribe model.

Non-blocking Back Pressure

Back pressure is a mechanism that keeps a subscriber (consumer) from receiving more data than its capabilities or resources allow. The subscriber asynchronously tells the publisher (producer) how many items it is ready to receive, so the publisher has to reduce its rate to match.

The Reactive Stream API

Java 9 provides a set of interfaces that define reactive streams. All of them are declared as static nested interfaces of the java.util.concurrent.Flow class.

Subscriber Interface

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription); // (1)
    public void onNext(T item);                         // (2)
    public void onError(Throwable throwable);           // (3)
    public void onComplete();                           // (4)
}

(1) Invoked after a Publisher has completed the subscription for this Subscriber, but before any of the Subscription's items are sent. The newly created Subscription object is passed via this method. The Subscriber typically assigns it to an instance variable for later use.

(2) Invoked with the Subscription's next item of type T.

(3) Invoked upon an unrecoverable error encountered by a Publisher or Subscription. No other Subscriber methods are invoked afterwards.

(4) Invoked when it is known that no further Subscriber method invocations, including onNext(), will occur for a Subscription that has not already been terminated by an error.

Subscription Interface

An instance of this interface is what the Publisher passes to the Subscriber.onSubscribe() method.

public static interface Subscription {
    public void request(long n); // (1)
    public void cancel();        // (2)
}

(1) This is the key method behind the non-blocking back pressure concept. The Subscriber uses it to request n more items for consumption. This way the Subscriber controls how many items it is currently able to receive, typically limiting consumption according to the resources it has.

(2) Used by the Subscriber to cancel its subscription. After this call the Subscriber will eventually stop receiving items, although items that are already in flight may still arrive.
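The interplay of request() and cancel() can be sketched as follows. This is a minimal illustration rather than a definitive implementation: the class RequestAndCancelSketch, the LimitedSubscriber type and the run() helper are hypothetical names introduced here, and the publisher is the JDK's SubmissionPublisher, which is described later in this tutorial.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class RequestAndCancelSketch {

    // Hypothetical subscriber: pulls one item at a time and cancels after `limit` items.
    static class LimitedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        LimitedSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel(); // no more demand, stop the flow
                done.countDown();
            } else {
                subscription.request(1); // ready for exactly one more item
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..10 and returns what a subscriber limited to `limit` items received.
    static List<Integer> run(int limit) {
        LimitedSubscriber subscriber = new LimitedSubscriber(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 10; i++) {
                publisher.submit(i);
            }
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [1, 2, 3]
    }
}
```

Because the subscriber never has more than one outstanding request, the publisher cannot deliver a fourth item even though ten were submitted.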

Publisher Interface

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber); // (1)
}

(1) Used by Subscribers to subscribe for receiving items of type T.
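Since Publisher is a functional interface, it can be written as a lambda. The following is only a sketch under simplifying assumptions: the just() helper and the class name SingleItemPublisherSketch are hypothetical, delivery is synchronous on the caller's thread, and the code is not thread-safe, so it should not be read as a fully specification-compliant publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class SingleItemPublisherSketch {

    // Hypothetical helper: a publisher that emits one value on demand, then completes.
    // Synchronous and not thread-safe; for illustration only.
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean finished;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (finished) {
                    return; // already emitted, failed, or cancelled
                }
                finished = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("n must be positive: " + n));
                    return;
                }
                subscriber.onNext(value);
                if (!cancelled) { // the subscriber may have cancelled inside onNext()
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
                cancelled = true;
            }
        });
    }

    // Subscribes to just("hello") and records the callbacks in the order they happen.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        just("hello").subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                events.add("onNext: " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onNext: hello, onComplete]
    }
}
```

Note that nothing is emitted until the subscriber calls request(), which is the demand-driven behavior the Flow interfaces are designed around.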

Processor Interface

A component that acts as both a Subscriber and a Publisher. It consumes items of type T and publishes items of type R.

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
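One common way to build a Processor is to extend SubmissionPublisher (described in the next section) and implement the Subscriber side by hand. The sketch below follows that pattern; MappingProcessor, ProcessorSketch and run() are hypothetical names chosen for this illustration, and the stage simply maps each incoming item with a function before republishing it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class ProcessorSketch {

    // Hypothetical processor: receives T items and republishes function.apply(item) as R.
    static class MappingProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> function;
        private Flow.Subscription subscription;

        MappingProcessor(Function<? super T, ? extends R> function) {
            this.function = function;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(function.apply(item)); // forward the mapped item downstream
            subscription.request(1);      // then ask upstream for the next one
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // propagate the failure downstream
        }

        @Override
        public void onComplete() {
            close(); // propagate completion downstream
        }
    }

    // Sends the words through a String -> Integer processor and collects the lengths.
    static List<Integer> run(List<String> words) {
        List<Integer> lengths = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        MappingProcessor<String, Integer> processor = new MappingProcessor<>(String::length);
        source.subscribe(processor);
        // consume() subscribes a simple consumer and completes when the processor closes
        CompletableFuture<Void> done = processor.consume(lengths::add);
        words.forEach(source::submit);
        source.close();
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "bb", "ccc"))); // [1, 2, 3]
    }
}
```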

SubmissionPublisher class

This is the only concrete Publisher implementation that Java 9 provides with the Flow API. Its submit() method asynchronously publishes the given item to each current subscriber, and blocks while the buffer of any subscriber is full. In the following example we will see how to use this class and, at the same time, get familiar with the interfaces described above.

Example

Implementing the Subscriber Interface

package com.logicbig.example;

import java.util.concurrent.Flow;

public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }

  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // conditionally use subscription.request(n);
  }

  @Override
  public void onError(Throwable throwable) {
      System.out.println("error: " + throwable);
  }

  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

Using SubmissionPublisher

package com.logicbig.example;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {
  public static void main(String[] args) {
      // a single worker thread delivers the items to the subscriber
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());
      sb.submit("item 1");
      sb.submit("item 2");
      sb.submit("item 3");

      // shutdown() lets already submitted tasks finish, then releases the thread
      executor.shutdown();
  }
}

Output

onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: item 1
item: item 2

Note that even though the publisher submitted three items, MySubscriber received only two. That is because MySubscriber#onSubscribe() called subscription.request(2) and nothing was requested afterwards, so the third item stays in the subscription's buffer. We can also call Subscription.request(n) in the onNext() method to control item consumption dynamically, and in that way make use of non-blocking back pressure.
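As a sketch of that second option, the variant below requests one more item from onNext() each time it has handled the previous one, so all three items arrive. The names OneAtATimeSketch, OneAtATimeSubscriber and run() are hypothetical, and closing the publisher (which the example above does not do) is added here so that onComplete() is signalled once the buffered items have been consumed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class OneAtATimeSketch {

    // Hypothetical variant of MySubscriber that keeps requesting one item at a time.
    static class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch terminated = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            events.add("item: " + item);
            subscription.request(1); // signal readiness for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("error: " + throwable);
            terminated.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            terminated.countDown();
        }
    }

    // Publishes three items, closes the publisher and returns the recorded callbacks.
    static List<String> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("item 1");
            publisher.submit("item 2");
            publisher.submit("item 3");
        } // close() leads to onComplete() after the buffered items are delivered
        try {
            subscriber.terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        // [item: item 1, item: item 2, item: item 3, onComplete]
        System.out.println(run());
    }
}
```

A real subscriber would usually call request() only when it actually has capacity, for example after finishing some slow processing of the current item.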

Example Project

Dependencies and Technologies Used:

  • JDK 9.0.1
