掘金 后端 ( ) • 2024-06-21 10:46

理解Java并发编程中的同步工具:ReentrantLock、CountDownLatch、CyclicBarrier和Semaphore

在多线程编程中,同步和协作是至关重要的任务。Java提供了一系列的同步工具来帮助开发者更好地控制多线程的执行。本文将介绍四种常用的同步工具:ReentrantLock、CountDownLatch、CyclicBarrier和Semaphore,以及它们如何在不同场景下被应用。

ReentrantLock

ReentrantLock是一个互斥锁,它具有完全灵活的加锁/解锁机制,允许更丰富的同步结构。与内置的synchronized关键字不同,ReentrantLock提供了更高的灵活性,包括可轮询的锁请求、定时的锁等待,以及可中断的锁等待。

  • 重入性:一个线程可以重复获取它已经持有的锁,该锁保持一个持有计数器,用于跟踪锁被同一个线程获取的次数[^1^]。
  • 公平性:可以构造为公平锁或非公平锁。公平锁将锁授予等待时间最长的线程,而非公平锁不保证任何特定的顺序[^1^]。

CountDownLatch

CountDownLatch是一个同步辅助工具,允许一个或多个线程等待直到一组操作完成。它使用一个给定的计数初始化,并且每个线程在完成其操作后调用countDown()方法递减这个计数。当计数达到零时,所有等待的线程被唤醒并继续执行。

  • 简化代码CountDownLatch通常用于简化代码,使线程间的交互更易于管理和理解。
  • 一次性事件CountDownLatch适合用于只需要所有等待线程响应一次的场景,它是一个不可重用的同步工具。

CyclicBarrier

CyclicBarrier是一个同步辅助工具,它允许一组线程互相等待,直到所有线程都准备好之后再各自继续执行。它在某些迭代计算中非常有用,例如并行数据处理,每个线程处理数据的一部分,然后再合并结果。

  • 循环使用:与CountDownLatch不同,CyclicBarrier可以被重置并重用。
  • 参与者角色CyclicBarrier区分参与线程和屏障动作,参与线程可以在屏障点之前执行一些准备工作,而屏障动作则在所有线程都到达后执行。

Semaphore

Semaphore是一个计数信号量,用于管理一组资源。它可以用于控制同时访问某个特定资源的线程数量,或者用于某种资源的可用性。

  • 资源控制Semaphore可以限制同时访问某一资源或资源池的线程数。
  • 许可的概念:线程通过acquire()方法获得许可,并在完成后通过release()方法释放许可。如果没有足够的许可,acquire()方法将阻塞直到有可用的许可。

这些工具类提供了强大的多线程控制机制,使得开发者能够设计出更加精确和复杂的多线程应用程序。选择正确的同步工具对于提高程序的性能和可靠性至关重要。在实际应用中,根据具体的并发需求和场景选择合适的同步工具,可以有效地解决多线程中的同步问题。

业务背景

在多线程环境中,我面临一个挑战:执行一段资源密集型的业务逻辑,该逻辑仅需要同步执行一次。为了高效地处理这种情形,我的策略是确保在众多线程中,仅有单一线程负责执行这项任务,而其他线程则在等待这一过程完成后直接退出,无需再次执行相同的业务代码。

实现

实现逻辑为定义一个Semaphore信号变量,设置1个信号,确保只有一个线程执行。其余的线程通过lock.condition锁住,等待后续唤醒。


public class ReentrantLockTest {
    final Lock lock=new ReentrantLock();
    final Condition condition=lock.newCondition();

    final Semaphore semaphore=new Semaphore(1);

    public void execute(){
        if (semaphore.tryAcquire()){
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName()+"正常业务执行");
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                semaphore.release();
                lock.lock();
                condition.signalAll();
                lock.unlock();
            }
        }else{
            lock.lock();
            try {
                condition.await();
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
            System.out.println(Thread.currentThread().getName()+"其他业务执行");
        }
    }
    public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        Thread thread1 = new Thread(reentrantLockTest::execute);
        Thread thread2 = new Thread(reentrantLockTest::execute);
        Thread thread3 = new Thread(reentrantLockTest::execute);
        thread1.start();
        thread3.start();
        thread2.start();

       try {
           Thread.sleep(1000);
       }catch (Exception e){

       }
        new Thread(reentrantLockTest::execute).start();
        new Thread(reentrantLockTest::execute).start();;
        new Thread(reentrantLockTest::execute).start();;
    }
}

代码执行结果如下:

Thread-0正常业务执行
Thread-2其他业务执行
Thread-1其他业务执行
Thread-3正常业务执行
Thread-4其他业务执行
Thread-5其他业务执行