掘金 后端 ( ) • 2024-05-09 09:52

CAS介绍

CAS(Compare and Swap)即比较并交换,是Java并发编程中一种非常重要的无锁算法实现方式。它主要用于在多线程环境下实现非阻塞同步,提高程序的执行效率。下面将从原理、底层实现、使用方法、应用场景、与其他技术的区别、容易产生的问题及解决方法几个方面详细解释Java线程中的CAS。

原理

CAS操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)。如果内存位置V的值与预期原值A相匹配,那么将内存位置V的值更新为新值B。否则,不做任何操作。整个过程是一个原子操作,保证了多线程环境下的数据一致性。

底层实现

在Java中,CAS操作是通过JNI(Java Native Interface)调用CPU硬件提供的特殊指令来实现的,如Intel的CMPXCHG8B、CMPXCHG16B指令。这些指令能够确保操作的原子性。在Java中,主要通过java.util.concurrent.atomic包下的原子类(如AtomicInteger)来使用CAS。

public class CasExample {
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
        count.incrementAndGet(); // 使用CAS自增
    }

    public static void main(String[] args) {
        CasExample example = new CasExample();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    example.increment();
                }
            }).start();
        }

        //当前线程组中活动线程的数量,这包括调用该方法的当前线程在内。
        while (Thread.activeCount() > 1) { // 等待所有线程执行完毕
            Thread.yield();
        }
        System.out.println(example.count.get()); // 输出最终结果
    }
}

底层代码

直接查看底层实现代码可能会涉及到JDK源码和native代码,这里我将概述其工作流程和关键点,因为直接的底层汇编或C/C++实现细节会根据不同的JVM实现和操作系统有所差异,但核心逻辑遵循以下步骤:

Java层面的抽象

在Java层面,如AtomicIntegerincrementAndGet方法,大致逻辑如下伪代码所示:

public int incrementAndGet() {
    int current;
    int next;
    do {
        current = get(); // 获取当前值
        next = current + 1; // 计算新值
    } while (!compareAndSet(current, next)); // CAS操作,失败则重试
    return next;
}

这里的compareAndSet方法就是利用了底层的CAS指令。

Native层实现概览

对于compareAndSet等原子操作,Java通过JNI调用进入native代码,最终会调用到操作系统的底层硬件指令。以HotSpot虚拟机为例,这部分操作主要由unsafe.cpp文件中的相关函数完成,比如Unsafe::compareAndSwapInt方法。但直接的C++实现依赖于具体的CPU架构,例如在x86架构上,会调用到类似于下面这样的汇编指令:

lock cmpxchg [memory], reg32 // 对应32位整型的CAS操作
  • lock前缀确保了指令的原子性和内存的锁定,防止其他CPU同时修改这段内存。
  • cmpxchg指令执行比较并交换操作,它会比较内存地址指向的值是否与期望值相等,如果相等,则用新值替换旧值;如果不等,则不做任何操作。

应用场景

  1. 计数器:如统计访问次数。
  2. 非阻塞数据结构:如无锁队列、无锁栈等。
  3. 原子变量更新:如AtomicIntegerAtomicBoolean等原子类的使用。

与其他技术的区别

  • 与锁(synchronized、ReentrantLock)相比:CAS是非阻塞同步,不会导致线程挂起等待,提高了并发性能;但可能导致ABA问题,且在竞争激烈时可能引发大量的重试,降低效率。
  • 与volatile相比:volatile只能保证可见性和部分有序性,而CAS能在此基础上实现原子性更新。

容易产生的问题及解决方法

  1. ABA问题:一个值从A变为B再变回A,CAS检查时认为没有变化,但实际上发生了变化。解决办法是使用版本号或者添加额外标记,如AtomicStampedReference
  2. 循环时间长开销大:在高并发下,CAS失败重试可能导致大量线程进行自旋,消耗CPU资源。可以通过调整JVM参数减少自旋次数,或者使用锁(如LockSupport.parkNanos)来减少CPU占用。
  3. 只能保证一个共享变量的原子操作:对于需要同时更新多个共享变量的情况,可以使用AtomicReference包装一个对象,或者使用java.util.concurrent包下的更高级并发工具类,如AtomicIntegerArrayAtomicReferenceArray等。

解决ABA代码示例

public class ABAProblemSolution {
    public static void main(String[] args) {
        // 初始化引用和版本戳
        AtomicStampedReference<String> reference = new AtomicStampedReference<>("A", 0);
        
        // 线程1尝试更新A->B->A
        new Thread(() -> {
            int stamp = reference.getStamp(); // 获取初始版本戳
            reference.compareAndSet("A", "B", stamp, stamp + 1); // A->B,版本+1
            reference.compareAndSet("B", "A", stamp + 1, stamp + 2); // B->A,版本再+1
        }).start();
        
        // 线程2模拟检查ABA问题
        new Thread(() -> {
            try {
                Thread.sleep(100); // 确保线程1先执行
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            int initialStamp = reference.getStamp(); // 获取当前版本戳
            String initialValue = reference.getReference(); // 获取当前值
            
            // 模拟线程在此期间被暂停,ABA问题可能已经发生
            System.out.println("Initial Value: " + initialValue + ", Stamp: " + initialStamp);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            // 再次检查值和版本戳
            if (reference.compareAndSet(initialValue, "C", initialStamp, initialStamp + 1)) {
                System.out.println("Successfully changed A->C");
            } else {
                System.out.println("ABA detected, change failed!");
            }
        }).start();
    }
}

AtomicInteger介绍

AtomicInteger是Java并发包(java.util.concurrent.atomic)中的一个类,它提供了一种线程安全的方式去进行整数的原子性操作,如增加、减少、设置等,而无需显式地加锁。AtomicInteger的核心在于使用了CAS(Compare and Swap,比较并交换)操作,这是一种乐观锁机制,能够在多线程环境下高效地更新共享变量,避免了传统锁的开销。

底层实现

AtomicInteger的底层实现依赖于JNI(Java Native Interface)调用,最终会转化为CPU级别的原子指令,如x86架构的CMPXCHG指令。这些指令保证了在多线程环境下对内存的读改写操作是不可分割的,从而避免了数据不一致的问题。

使用方法

AtomicInteger提供了多种原子操作方法,如get()set(int newValue)incrementAndGet()decrementAndGet()等。下面是一些常用方法的简要说明和示例:

  • get() :获取当前值。
  • set(int newValue) :设置新的值。
  • incrementAndGet() :原子性地增加1并返回新值。
  • decrementAndGet() :原子性地减少1并返回新值。
  • compareAndSet(int expect, int update) :如果当前值等于预期值expect,则以原子方式将该值设置为update,返回true;否则,不做任何操作并返回false。

应用场景

AtomicInteger非常适合用于以下场景:

  • 计数器:如统计网站访问次数、数据库操作计数等。
  • 线程安全变量更新:在不需要复杂锁机制的情况下,进行简单变量的线程安全更新。
  • 无锁并发编程:构建高性能的无锁数据结构,如无锁栈、无锁队列等。

AtomicIntegerArray介绍

AtomicIntegerArray是Java并发包(java.util.concurrent.atomic)中的一个类,它允许对整型数组的元素进行原子性的更新操作。与AtomicInteger相似,AtomicIntegerArray也是基于CAS(Compare and Swap)操作来保证更新操作的原子性,从而避免了传统的锁机制,减少了线程间的上下文切换,提升了并发性能。

底层实现

AtomicIntegerArray的底层同样依赖于JNI(Java Native Interface)调用,利用了CPU的原子指令来实现元素的原子性更新。这意味着,即使是多线程环境下对数组元素的修改,也能确保每次修改的完整性,不会出现数据不一致的问题。

使用方法

AtomicIntegerArray提供了针对数组元素的一系列原子操作方法,包括但不限于:

  • get(int index) :获取指定索引处的元素值。
  • set(int index, int newValue) :设置指定索引处的元素值,非原子操作,主要用于初始化。
  • getAndIncrement(int index) :原子性地将指定索引处的元素值加1并返回旧值。
  • getAndDecrement(int index) :原子性地将指定索引处的元素值减1并返回旧值。
  • compareAndSet(int index, int expectedValue, int newValue) :如果指定索引处的元素值等于预期值,则原子性地将其设置为新值,并返回true;否则不做操作并返回false。

应用场景

AtomicIntegerArray特别适合用于需要在多线程环境中对数组元素进行频繁更新的场景,例如:

  • 计数器数组:维护多个独立计数器,如统计不同类型事件的发生次数。
  • 状态标志数组:管理一组布尔型状态标志,每个元素代表一个独立的状态。
  • 并发队列:构建基于数组的简单无锁队列,通过索引控制入队和出队操作。

代码示例

public class AtomicIntegerArrayExample {

    public static void main(String[] args) {
        final int SIZE = 10; // 数组大小
        AtomicIntegerArray counts = new AtomicIntegerArray(SIZE); // 初始化数组
        
        ExecutorService executor = Executors.newFixedThreadPool(5); // 创建线程池

        // 提交任务到线程池,模拟多线程更新不同索引的计数器
        for (int i = 0; i < 5; i++) {
            final int threadIndex = i;
            executor.submit(() -> {
                for (int j = 0; j < 1000; j++) {
                    counts.getAndIncrement(threadIndex); // 每个线程更新对应索引的计数器
                }
            });
        }

        executor.shutdown(); // 关闭线程池
        while (!executor.isTerminated()) {} // 等待所有任务完成

        // 打印最终计数结果
        for (int i = 0; i < SIZE; i++) {
            System.out.println("Counter at index " + i + ": " + counts.get(i));
        }
    }
}

LongAdder介绍

LongAdder是Java并发包(java.util.concurrent.atomic)中的一个类,旨在提供在高并发环境下更高效的线程安全计数器。相比AtomicLongLongAdder通过牺牲一定的内存空间来换取更高的吞吐量,尤其适用于竞争激烈的并发场景。

其核心思想是分段更新LongAdder内部维护了一个base值和一个名为cells的Cell数组。当多个线程尝试更新值时,不是直接竞争base,而是尽可能地分散到不同的Cell中进行更新,这样大大减少了线程之间的竞争。每个Cell实质上是一个单独的AtomicLong,负责一部分计数工作。在无竞争或竞争较轻的情况下,直接更新base值;当竞争激烈时,会尝试将更新分散到较少竞争的Cell中,最后统计所有Cell和base的值得到总和。

底层实现

  1. 初始化LongAdder在开始时,只有base值,Cell数组为空。
  2. 更新:当调用add方法时,首先尝试直接更新base,如果竞争激烈,则尝试分配或使用已有的Cell数组进行更新。
  3. 读取:读取时,先读取base,然后遍历所有Cell并累加它们的值,最后求和返回总和。

方法

  • add(long x) :原子性地将给定值加到当前计数器上。
  • sumThenReset() :返回当前计数器的值并重置为0。
  • reset() :将计数器重置为0。
  • long sum() :返回当前计数器的总和。

应用场景

  • 性能监控:在高并发的Web服务器中记录请求总数、错误计数等。
  • 统计计数:大规模数据分析系统中,需要频繁更新的计数器,如日志处理中的事件计数。
  • 分布式系统指标:收集分布式系统中各个节点的局部统计信息,如活跃连接数。

代码示例

public class LongAdderExample {
    public static void main(String[] args) {
        LongAdder counter = new LongAdder();

        // 模拟多线程更新计数器
        Thread[] threads = new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }

        // 等待所有线程完成
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        // 输出最终计数结果
        System.out.println("Total count: " + counter.sum());
    }
}