掘金 后端 ( ) • 2024-04-19 12:58

SynchronousQueue是一个没有存储空间的阻塞队列,它是java.util.concurrent包中的一部分。每一个put操作必须等待一个take操作,反之亦然。SynchronousQueue内部并不维护任何元素的存储,可以认为它是一种线程之间一对一传递消息的机制。

核心特性

  • 不存储元素SynchronousQueue不是用来存储元素的,每个插入操作必须等待另一个线程的移除操作。
  • 公平性选择:构造SynchronousQueue时可以选择公平策略,如果设置为公平模式,队列会按照线程等待的顺序来处理线程的插入和移除请求。

用途

SynchronousQueue通常用于传递性场景和线程池的工作队列。例如,在ThreadPoolExecutor中,如果创建了一个基于SynchronousQueue的线程池,提交任务时若有空闲线程则立即执行,若没有空闲线程则尝试创建新线程执行,如果线程数已达最大,则执行拒绝策略。

源码解析(简化)

SynchronousQueue的实现涉及到复杂的并发控制。以下是它的简化源码逻辑:

public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // 省略了锁和条件变量的实现细节
    
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 将元素提供给正在等待的消费者
        if (!transferer.transfer(e, true, 0L)) {
            // 如果没有正在等待的消费者,则阻塞生产者线程直到元素被消费
            // 中断策略等逻辑在实际源码中有详细的实现,这里简化
            // transferer.transfer 实现了元素的传递逻辑
        }
    }

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0L);
        if (e != null)
            return e;
        // 如果没有可用的元素,则阻塞消费者线程直到生产者线程插入元素
        // 中断策略等逻辑在实际源码中有详细的实现,这里简化
    }

    // 其他方法...
}

代码演示

这是一个SynchronousQueue的简单使用示例:

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                String event = queue.take();
                System.out.println("消费了一个事件: " + event);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                String event = "四月之声";
                queue.put(event);
                System.out.println("生产了一个事件: " + event);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        consumer.start();
        producer.start();
    }
}

注意事项

  • 阻塞行为SynchronousQueue的操作几乎在所有情况下都可能阻塞,因为它不存储元素。
  • 中断:等待的线程可以响应中断,当线程在puttake时阻塞,如果线程被中断,将抛出InterruptedException
  • 元素空间:因为队列内部没有存储空间,所以peekcontainsclearisEmpty等方法不是很有意义,这些方法的实现总是返回特定的值(例如isEmpty总是返回true)。

SynchronousQueue是一个非常低层次的同步机制,它为高并发场景提供了一种线程间单个元素的交换方式。在设计高并发系统时,SynchronousQueue可以作为构建更高级同步结构的基础组件。