Java - Phaser

[Updated: Jan 9, 2018, Created: Jan 8, 2018]

java.util.concurrent.Phaser offers functionality similar to CyclicBarrier and CountDownLatch, but it is more powerful and flexible. The examples below show what Phaser can do.

Examples

Phaser as Barrier

This example shows Phaser used as a repeatable barrier, similar to CyclicBarrier.

The method Phaser.arriveAndAwaitAdvance() blocks the calling party (thread) until all registered parties (registered via the Phaser.register() method) have called this method.

import java.time.LocalTime;
import java.util.concurrent.Phaser;

public class PhaserExample {
  private static final Phaser phaser = new Phaser();

  public static void main(String[] args) throws InterruptedException {
      startTask(0);
      startTask(500);
      startTask(1000);
  }

  private static void startTask(long initialDelay) throws InterruptedException {
      Thread.sleep(initialDelay);
      new Thread(PhaserExample::taskRun).start();
  }

  private static void taskRun() {
      phaser.register();//registering this thread
      print("after registering");
      for (int i = 1; i <= 2; i++) {
          try {
              //doing some work
              Thread.sleep(3000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          //the barrier point
          print("before arrive " + i);
          phaser.arriveAndAwaitAdvance();//current thread will wait for others to arrive
          print("after arrive " + i);
      }
  }

  private static void print(String msg) {
      System.out.printf("%-20s: %10s, t=%s, registered=%s, arrived=%s, unarrived=%s phase=%s%n",
              msg,
              Thread.currentThread().getName(),
              LocalTime.now(),
              phaser.getRegisteredParties(),
              phaser.getArrivedParties(),
              phaser.getUnarrivedParties(),
              phaser.getPhase()
      );
  }
}
after registering   :   Thread-0, t=20:06:59.064, registered=1, arrived=0, unarrived=1 phase=0
after registering   :   Thread-1, t=20:06:59.548, registered=2, arrived=0, unarrived=2 phase=0
after registering   :   Thread-2, t=20:07:00.550, registered=3, arrived=0, unarrived=3 phase=0
before arrive 1     :   Thread-0, t=20:07:02.071, registered=3, arrived=0, unarrived=3 phase=0
before arrive 1     :   Thread-1, t=20:07:02.549, registered=3, arrived=1, unarrived=2 phase=0
before arrive 1     :   Thread-2, t=20:07:03.552, registered=3, arrived=2, unarrived=1 phase=0
after arrive 1      :   Thread-2, t=20:07:03.553, registered=3, arrived=0, unarrived=3 phase=1
after arrive 1      :   Thread-0, t=20:07:03.553, registered=3, arrived=0, unarrived=3 phase=1
after arrive 1      :   Thread-1, t=20:07:03.553, registered=3, arrived=0, unarrived=3 phase=1
before arrive 2     :   Thread-2, t=20:07:06.554, registered=3, arrived=0, unarrived=3 phase=1
before arrive 2     :   Thread-0, t=20:07:06.555, registered=3, arrived=0, unarrived=3 phase=1
before arrive 2     :   Thread-1, t=20:07:06.557, registered=3, arrived=1, unarrived=2 phase=1
after arrive 2      :   Thread-1, t=20:07:06.558, registered=3, arrived=0, unarrived=3 phase=2
after arrive 2      :   Thread-0, t=20:07:06.558, registered=3, arrived=0, unarrived=3 phase=2
after arrive 2      :   Thread-2, t=20:07:06.558, registered=3, arrived=0, unarrived=3 phase=2

As the output shows, all three threads print 'after arrive 1' at the same moment, and likewise 'after arrive 2'. That is the point where every thread is unblocked: the last of the 3 registered parties has arrived, so the phaser advances to the next phase and the 'arrived' count is reset to 0.
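The barrier guarantee can also be checked without reading timestamps. The following is a minimal sketch, not part of the example project: the class name BarrierOrderSketch and its helper methods are hypothetical. It registers all parties up front with bulkRegister(), so that no thread can reach the barrier before the others are registered, and then verifies that no thread logs 'after i' until every thread has logged 'before i'.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical sketch: checks the ordering guarantee of arriveAndAwaitAdvance().
class BarrierOrderSketch {

    // Runs 'parties' threads through 'phases' barrier points and returns the event log.
    static List<String> run(int parties, int phases) {
        Phaser phaser = new Phaser();
        phaser.bulkRegister(parties); // register every party before any thread starts
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < parties; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 1; i <= phases; i++) {
                    log.add("before " + i);
                    phaser.arriveAndAwaitAdvance(); // blocks until all parties have arrived
                    log.add("after " + i);
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return log;
    }

    // True if, for every phase i, all "before i" entries precede the first "after i" entry.
    static boolean barrierHeld(List<String> log, int phases) {
        for (int i = 1; i <= phases; i++) {
            if (log.indexOf("after " + i) < log.lastIndexOf("before " + i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = run(3, 2);
        System.out.println(log); // the order within a phase varies from run to run
        System.out.println("barrier held: " + barrierHeld(log, 2));
    }
}
```

The order of entries within one phase differs between runs, but barrierHeld() should always report true, because a thread can only return from arriveAndAwaitAdvance() after the last party has arrived.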

Besides register() and arriveAndAwaitAdvance(), explained above, the print() method uses the following Phaser methods (the API documentation refers to them as 'monitoring' methods):

  • getRegisteredParties() returns the number of parties registered at this phaser. The count increases with each register() call and can also be set at construction time (see 'Registration count during construction time' below).
  • getArrivedParties() returns the number of registered parties that have arrived at the current phase, for example by calling arriveAndAwaitAdvance() (the barrier point). Other arrive methods are covered in the later examples.
  • getUnarrivedParties() returns the number of registered parties that have not yet arrived at the current phase.
  • getPhase() returns the current phase number. It starts at 0 and is incremented by one each time all registered parties arrive at the barrier point.
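How these four counters move can be seen in a single thread. The sketch below is illustrative only: the class PhaserCountersSketch and its snapshot() helper are hypothetical names, and it uses the non-blocking arrive() method (covered in a later example) so that one thread can play both parties.

```java
import java.util.concurrent.Phaser;

// Hypothetical sketch: watches the monitoring methods while two parties arrive.
class PhaserCountersSketch {

    // Formats the four monitoring values of the given phaser.
    static String snapshot(Phaser p) {
        return String.format("registered=%d, arrived=%d, unarrived=%d, phase=%d",
                p.getRegisteredParties(), p.getArrivedParties(),
                p.getUnarrivedParties(), p.getPhase());
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        phaser.register();
        phaser.register();
        System.out.println(snapshot(phaser)); // registered=2, arrived=0, unarrived=2, phase=0
        phaser.arrive(); // first party arrives, does not block
        System.out.println(snapshot(phaser)); // registered=2, arrived=1, unarrived=1, phase=0
        phaser.arrive(); // last party arrives: phase advances, arrival counts reset
        System.out.println(snapshot(phaser)); // registered=2, arrived=0, unarrived=2, phase=1
    }
}
```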

Dynamic Barrier Point

Unlike CyclicBarrier and CountDownLatch, a Phaser lets the number of parties synchronizing on the barrier vary dynamically: parties can be added and removed with the registering and deregistering methods.

This example shows how to do that with the register() and arriveAndDeregister() methods:

import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;

public class PhaserExample2 {
  private static final Phaser phaser = new Phaser();
  private static final AtomicBoolean unRegisteredFlag = new AtomicBoolean(false);

  public static void main(String[] args) throws InterruptedException {
      startTask(0);
      startTask(1000);
      startTask(2000);
  }

  private static void startTask(long initialDelay) throws InterruptedException {
      Thread.sleep(initialDelay);
      new Thread(PhaserExample2::taskRun).start();
  }

  private static void taskRun() {
      phaser.register();//registering this thread
      print("after registering");
      for (int i = 1; i <= 2; i++) {
          try {
              //doing some work
              Thread.sleep(5000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          print("before arrive " + i);
          phaser.arriveAndAwaitAdvance();//current thread will wait for others to arrive
          print("after arrive " + i);
          if (unRegisteredFlag.compareAndSet(false, true)) {//atomic check-and-set, so only one thread deregisters
              print("UnRegistering");
              phaser.arriveAndDeregister();// unregistering the party
              break;
          }
      }
  }
    .............
}
after registering   :   Thread-0, t=20:20:18.100, registered=1, arrived=0, unarrived=1 phase=0
after registering   :   Thread-1, t=20:20:19.084, registered=2, arrived=0, unarrived=2 phase=0
after registering   :   Thread-2, t=20:20:21.085, registered=3, arrived=0, unarrived=3 phase=0
before arrive 1     :   Thread-0, t=20:20:23.107, registered=3, arrived=0, unarrived=3 phase=0
before arrive 1     :   Thread-1, t=20:20:24.086, registered=3, arrived=1, unarrived=2 phase=0
before arrive 1     :   Thread-2, t=20:20:26.088, registered=3, arrived=2, unarrived=1 phase=0
after arrive 1      :   Thread-2, t=20:20:26.090, registered=3, arrived=0, unarrived=3 phase=1
after arrive 1      :   Thread-0, t=20:20:26.090, registered=3, arrived=0, unarrived=3 phase=1
after arrive 1      :   Thread-1, t=20:20:26.090, registered=3, arrived=0, unarrived=3 phase=1
UnRegistering       :   Thread-2, t=20:20:26.091, registered=3, arrived=0, unarrived=3 phase=1
before arrive 2     :   Thread-0, t=20:20:31.092, registered=2, arrived=0, unarrived=2 phase=1
before arrive 2     :   Thread-1, t=20:20:31.093, registered=2, arrived=1, unarrived=1 phase=1
after arrive 2      :   Thread-0, t=20:20:31.094, registered=2, arrived=0, unarrived=2 phase=2
after arrive 2      :   Thread-1, t=20:20:31.094, registered=2, arrived=0, unarrived=2 phase=2

As the output shows, one of the parties deregisters itself after the first iteration, which decreases the registered count from 3 to 2. From then on, the barrier trips as soon as the two remaining parties arrive.
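The effect of arriveAndDeregister() on the counts can be reproduced in a single thread. This is a minimal sketch under stated assumptions: the class DeregisterSketch and its methods are hypothetical, and it uses bulkRegister() and the non-blocking arrive() so that no extra threads are needed. It also illustrates a related point from the Phaser API documentation: with the default onAdvance(), a phaser terminates once its last party deregisters.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical sketch: how arriveAndDeregister() changes the party counts.
class DeregisterSketch {

    // Returns {registered before, registered after, unarrived after, phase after the rest arrive}.
    static int[] deregisterOne() {
        Phaser phaser = new Phaser();
        phaser.bulkRegister(3); // same effect as three register() calls
        int before = phaser.getRegisteredParties();
        phaser.arriveAndDeregister(); // arrives and leaves the phaser without waiting
        int after = phaser.getRegisteredParties();
        int unarrived = phaser.getUnarrivedParties();
        phaser.arrive();
        phaser.arrive(); // last remaining party arrives, so the phase advances
        return new int[] {before, after, unarrived, phaser.getPhase()};
    }

    // With the default onAdvance(), the phaser terminates when no parties remain.
    static boolean terminatesWhenEmpty() {
        Phaser phaser = new Phaser();
        phaser.register();
        phaser.arriveAndDeregister(); // the only party leaves
        return phaser.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(deregisterOne())); // [3, 2, 2, 1]
        System.out.println(terminatesWhenEmpty());            // true
    }
}
```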

Registration count during construction time

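The constructor Phaser(int parties) creates a Phaser with the given number of parties already registered (as unarrived), without any register() calls. More parties can still be added later with register(). The phaser trips (releases the waiting parties) once every registered party, whether registered through the constructor or through register(), has arrived. In the example below, the main thread is the party registered at construction time; the three task threads stay blocked until the main thread arrives via arriveAndDeregister().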

import java.util.concurrent.Phaser;

public class PhaserExample3 {
  private static final Phaser phaser = new Phaser(1);//registered with one party

  public static void main(String[] args) throws InterruptedException {
      print("before running task in main method");
      startTask();
      startTask();
      startTask();
      //doing some work
      Thread.sleep(10000);
      print("deRegistering main thread");
      phaser.arriveAndDeregister();//unregistering one party
  }

  private static void startTask() throws InterruptedException {
      Thread.sleep(300);
      new Thread(PhaserExample3::taskRun).start();
  }

  private static void taskRun() {
      print("before registering");
      phaser.register();//registering more
      try {
          //doing some work
          Thread.sleep(500);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      print("before arrive");
      phaser.arriveAndAwaitAdvance();//current thread will wait for others to arrive
      print("after arrive");
  }
    .............
}
before running task in main method:       main, t=20:31:45.039, registered=1, arrived=0, unarrived=1 phase=0
before registering  :   Thread-0, t=20:31:45.382, registered=1, arrived=0, unarrived=1 phase=0
before registering  :   Thread-1, t=20:31:45.682, registered=2, arrived=0, unarrived=2 phase=0
before arrive       :   Thread-0, t=20:31:45.883, registered=3, arrived=0, unarrived=3 phase=0
before registering  :   Thread-2, t=20:31:45.982, registered=3, arrived=1, unarrived=2 phase=0
before arrive       :   Thread-1, t=20:31:46.183, registered=4, arrived=1, unarrived=3 phase=0
before arrive       :   Thread-2, t=20:31:46.483, registered=4, arrived=2, unarrived=2 phase=0
deRegistering main thread:       main, t=20:31:55.982, registered=4, arrived=3, unarrived=1 phase=0
after arrive        :   Thread-0, t=20:31:55.984, registered=3, arrived=0, unarrived=3 phase=1
after arrive        :   Thread-2, t=20:31:55.984, registered=3, arrived=0, unarrived=3 phase=1
after arrive        :   Thread-1, t=20:31:55.984, registered=3, arrived=0, unarrived=3 phase=1
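The same idea can be written as a "start gate" in which all parties, the coordinator plus the workers, are registered through the constructor. This is a minimal sketch rather than the example project's code: the class StartGateSketch and its run() method are hypothetical. Registering the workers at construction time, instead of inside each task, means the gate cannot open before every worker is counted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical sketch: a coordinator thread releases waiting workers.
class StartGateSketch {

    static List<String> run(int workers) {
        // One party for the coordinator plus one per worker, all registered up front.
        Phaser phaser = new Phaser(1 + workers);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread thread = new Thread(() -> {
                phaser.arriveAndAwaitAdvance(); // wait at the gate
                log.add("released");
            });
            threads.add(thread);
            thread.start();
        }
        log.add("open");
        phaser.arriveAndDeregister(); // coordinator arrives without waiting and leaves
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        log.add("registered=" + phaser.getRegisteredParties());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [open, released, released, released, registered=3]
    }
}
```

No worker can log 'released' before the coordinator has logged 'open', because the phase cannot advance until the coordinator's arrival is counted.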

Taking action on Phase changes and controlling termination

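The onAdvance() method can be overridden to perform an action on each phase advance (when the phase number changes) and to control termination: returning true terminates the phaser, returning false lets it continue. The default implementation returns true only when no registered parties remain. Terminating the phaser does not stop any threads; it only means that parties are no longer synchronized (blocked) at the barrier: arriveAndAwaitAdvance() returns immediately, register() has no effect, and getPhase() returns a negative value. That value is not random: it is the phase number with the sign bit set, which is why the output below ends with phase=-2147483647, that is, Integer.MIN_VALUE + 1.

The following example also uses arrive() instead of arriveAndAwaitAdvance(). The difference is that arrive() records the arrival and returns immediately, without waiting for the other parties.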

import java.time.LocalTime;
import java.util.concurrent.Phaser;

public class PhaserExample4 {
  private static final Phaser phaser = new Phaser() {
      @Override
      protected boolean onAdvance(int phase, int registeredParties) {
          print(String.format("On Advance phase=%s, registered=%s", phase, registeredParties));
          return true;//super.onAdvance(phase, registeredParties);
      }
  };

  public static void main(String[] args) throws InterruptedException {
      print("before running task in main method");
      startTask();
      startTask();
      startTask();
  }

  private static void startTask() throws InterruptedException {
      Thread.sleep(300);
      new Thread(PhaserExample4::taskRun).start();
  }

  private static void taskRun() {
      print("before registering");
      phaser.register();//registering this thread
      try {
          //doing some work
          Thread.sleep(500);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      print("before arrive");
      phaser.arrive();//current thread will not wait
      print("after arrive");
  }

  private static void print(String msg) {
      System.out.printf("%-20s: %10s, t=%s, registered=%s, arrived=%s, unarrived=%s phase=%s%n",
              msg,
              Thread.currentThread().getName(),
              LocalTime.now(),
              phaser.getRegisteredParties(),
              phaser.getArrivedParties(),
              phaser.getUnarrivedParties(),
              phaser.getPhase()
      );
  }
}
before running task in main method:       main, t=20:33:55.295, registered=0, arrived=0, unarrived=0 phase=0
before registering  :   Thread-0, t=20:33:55.638, registered=0, arrived=0, unarrived=0 phase=0
before registering  :   Thread-1, t=20:33:55.939, registered=1, arrived=0, unarrived=1 phase=0
before arrive       :   Thread-0, t=20:33:56.141, registered=2, arrived=0, unarrived=2 phase=0
after arrive        :   Thread-0, t=20:33:56.142, registered=2, arrived=1, unarrived=1 phase=0
before registering  :   Thread-2, t=20:33:56.240, registered=2, arrived=1, unarrived=1 phase=0
before arrive       :   Thread-1, t=20:33:56.442, registered=3, arrived=1, unarrived=2 phase=0
after arrive        :   Thread-1, t=20:33:56.443, registered=3, arrived=2, unarrived=1 phase=0
before arrive       :   Thread-2, t=20:33:56.742, registered=3, arrived=2, unarrived=1 phase=0
On Advance phase=0, registered=3:   Thread-2, t=20:33:56.743, registered=3, arrived=3, unarrived=0 phase=0
after arrive        :   Thread-2, t=20:33:56.744, registered=3, arrived=3, unarrived=0 phase=-2147483647
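The negative phase in the last line can be decoded, and onAdvance() can also be used to stop after a fixed number of phases rather than after the first one. The sketch below is illustrative only: the class TerminationSketch and its helper names are hypothetical. It terminates once phase 1 has completed and recovers the phase number with getPhase() + Integer.MIN_VALUE, the expression given in the Phaser API documentation.

```java
import java.util.concurrent.Phaser;

// Hypothetical sketch: terminate after two phases and decode the negative phase.
class TerminationSketch {

    // A single-party phaser that terminates when phase 1 completes.
    static Phaser newTwoPhasePhaser() {
        return new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // true = terminate; keep the default rule for an empty phaser as well
                return phase >= 1 || registeredParties == 0;
            }
        };
    }

    // Recovers the phase number from a terminated phaser's negative getPhase() value.
    static int decodePhase(Phaser phaser) {
        return phaser.getPhase() + Integer.MIN_VALUE;
    }

    public static void main(String[] args) {
        Phaser phaser = newTwoPhasePhaser();
        phaser.arrive(); // completes phase 0
        System.out.println(phaser.getPhase() + " " + phaser.isTerminated()); // 1 false
        phaser.arrive(); // completes phase 1, onAdvance returns true
        System.out.println(phaser.isTerminated());   // true
        System.out.println(phaser.getPhase() < 0);   // true
        System.out.println(decodePhase(phaser));     // 2
        System.out.println(phaser.arrive() < 0);     // true: arrivals are ignored after termination
    }
}
```

Applied to the output above, the same arithmetic turns -2147483647 into 1.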

Example Project

Dependencies and Technologies Used :

  • JDK 1.8
  • Maven 3.3.9
