掘金 后端 ( ) • 2024-04-20 11:59

Phaser 是 Java 并发包(java.util.concurrent)中一个非常有用的同步辅助类,它允许执行并发多阶段任务。当我们有一系列要分步进行的操作,且每一步都需要多个线程并发执行,而且一个步骤完成之前,下一个步骤不能开始时,Phaser 是一个理想的选择。

核心特性

  • 灵活的任务协调: Phaser提供了一个灵活的方式来同步一个或多个任务的多个阶段。
  • 动态的线程管理: 可以注册任意数量的参与者(线程),也可以随时注销。

用途

Phaser常用于并发编程,其中任务可以分为多个阶段执行。它可以替代CountDownLatchCyclicBarrier,因为它提供了更为灵活的功能。

源码解析(简化)

Phaser内部维护了一个表示当前阶段(generation)的状态,以及注册的参与者数量。以下是核心的简化源码逻辑:

public class Phaser {
    // 注册的任务数
    private final AtomicInteger state;

    // 注册一个新的任务
    public int register() {
        // 更新状态表示一个新的参与者加入
    }

    // 到达当前阶段并等待前进
    public int arriveAndAwaitAdvance() {
        // 到达,然后等待其他参与者到达
    }

    // 到达,并且注销
    public int arriveAndDeregister() {
        // 到达当前阶段,并且减少参与者的数量
    }
    
    // 其他方法...
}

代码演示

假设我们有一个任务,分为三个阶段执行,每个阶段有多个线程参与:

import java.util.concurrent.Phaser;

public class PhaserDemo {

    public static void main(String[] args) {
        // 创建一个Phaser,并注册主线程
        final Phaser phaser = new Phaser(1);

        for(int i = 0; i < 3; i++) { // 创建三个线程
            final int taskId = i;
            Thread task = new Thread(() -> {
                phaser.register(); // 注册各个任务线程
                for(int phase = 0; phase < 3; phase++) {
                    System.out.printf("任务%d, 阶段%d\n", taskId, phase);
                    phaser.arriveAndAwaitAdvance(); // 等待其他任务到达当前阶段
                }
                phaser.arriveAndDeregister(); // 任务完成,注销
            });
            task.start();
        }

        // 主线程解除注册,等待子任务完成
        while (!phaser.isTerminated()) {
            int phase = phaser.getPhase();
            phaser.arriveAndAwaitAdvance(); // 等待所有的Phaser参与者到达
            System.out.println("阶段 " + phase + " 完成");
        }
    }
}

在这个例子中,我们创建了一个Phaser并注册了主线程(new Phaser(1)),然后创建了3个参与的线程,每个线程都注册自己(phaser.register())。每个任务完成一阶段的工作之后,会调用phaser.arriveAndAwaitAdvance(),等待其他任务完成同一阶段。每个任务完成后,调用phaser.arriveAndDeregister()注销自己。主线程在所有任务完成后继续执行。

注意事项

  • 动态注册和注销: Phaser 的注册和注销操作是动态的,任务可以在任何时刻注册或注销。
  • 适应性: Phaser 可以适应不同的任务和阶段,不需要预先知道并发任务的具体数量。
  • 阶段跟踪: 可以通过getPhase()获取当前阶段的信息。

Phaser 相较于其他并发辅助类(如CyclicBarrierCountDownLatch)提供了更为灵活的多阶段同步能力,适用于复杂的并发任务流程控制。它在处理多阶段任务时的能力,使得编写并发程序时对任务同步控制更加精细和灵活。