掘金 后端 ( ) • 2023-05-26 11:15

缓存池 BufferPool

大家一定都了解Java的线程池,线程池有什么好处呢?如果没有线程池,我们每次创建线程都要新建一个线程,这样对CPU的消耗比较大。那么利用线程池我们可以对已经创建好的线程复用,线程就不用频繁创建和销毁了。

同样,我们的内存池也是这个原理,producerBatch需要空间存储消息的时候,就去缓存池申请一块内存,而不用频繁地创建和销毁内存,也就避免了频繁地GC

BufferPool简介

下面的结构图简单说明了BufferPool的组成结构和处理缓存的流程:

8938D7B4-99FC-49B0-8EAB-195FD929477B.png

整个BufferPool的大小默认为32M,内部内存区域分为两块:固定大小内存块集合free、非池化缓存nonPooledAvailableMemory。固定大小内存块默认大小为16k。当ProducerBatch向BufferPool申请一个大小为size的内存块时,BufferPool会根据size的大小判断由哪个内存区域分配内存块。同时,free和nonPooledAvailableMemory这两块区域的内存可以交换。

接下来,我们通过代码来学习Kafka底层提供的高效的内存池设计。

类BufferPool

重要字段如下:

public class BufferPool {

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";

    private final long totalMemory;//默认32M
    private final int poolableSize;//池化大小16k
    private final ReentrantLock lock;//分配和回收时用的锁。
    private final Deque<ByteBuffer> free;//池化的内存
    private final Deque<Condition> waiters;//阻塞线程对应的Condition集合
    private long nonPooledAvailableMemory;//非池化可使用的内存
  • totalMemory:整个BufferPool内存大小,默认是32M。
  • poolableSize:池化缓存区一块内存块的大小,默认是16k。
  • lock:类型是ReentrantLock。因为会有多线程并发和回收ByteBuffer,所以使用锁控制并发,保证了线程的安全。
  • free:类型是Deque<ByteBuffer>。缓存了指定大小的ByteBuffer对象。
  • waiters:类型是Deque<Condition>队列。因为会有申请不到足够内存的线程,线程为了等待其他线程释放内存而阻塞等待,对应的Condition对象会进入该队列。
  • nonPooledAvailableMemory:非池化可使用的内存。

接下来,我再来介绍下重要的方法。

allocate()方法是向BufferPool申请ByteBuffer。

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    //1.验证申请的内存是否大于总内存
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    //2.加锁,保证线程安全。
    this.lock.lock();

    if (this.closed) {
        this.lock.unlock();
        throw new KafkaException("Producer closed while allocating memory");
    }

    try {
        // check if we have a free buffer of the right size pooled
        //3.申请内存的大小是否是池化的内存大小,16k
        if (size == poolableSize && !this.free.isEmpty())
            //如果是就从池里Bytebuffer
            return this.free.pollFirst();
        // 池化内存空间的大小
        int freeListSize = freeSize() * this.poolableSize;
        //4.如果非池化可以空间加池化内存空间大于等于要申请的空间
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
     
            // 如果申请的空间大小小于池化的大小,就从free队列里拿出一个池化的大小的Bytebuffer加到nonPooledAvailableMemory中
            // 5.如果一个池化的大小的Bytebuffer不满足size,就持续释放池化内存Bytebuffer直到满足为止。
            freeUp(size);
            this.nonPooledAvailableMemory -= size;
            //如果非池化可以空间加池化内存空间大于要申请的空间
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            //创建对应的Condition
            Condition moreMemory = this.lock.newCondition();
            try {
                //线程最长阻塞时间
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                //放入waiters集合中
                this.waiters.addLast(moreMemory);
               
                // 没有足够的空间就一直循环
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        //空间不够就阻塞,并设置超时时间。
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        recordWaitTime(timeNs);
                    }

                    if (this.closed)
                        throw new KafkaException("Producer closed while allocating memory");

                    if (waitingTimeElapsed) {
                        this.metrics.sensor("buffer-exhausted-records").record();
                        throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;

                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    //ByteBuffer池化集合里是否有元素
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        //尝试给nonPooledAvailableMemory扩容
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        //累计分配了多少空间
                        accumulated += got;
                    }
                }
                accumulated = 0;
            } finally {
                this.nonPooledAvailableMemory += accumulated;//把已经分配的内存还回nonPooledAvailableMemory
                this.waiters.remove(moreMemory);//删除对应的condition
            }
        }
    } finally {
        // signal any additional waiters if there is more memory left
        // over for them
        try {
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            // Another finally... otherwise find bugs complains
            lock.unlock();
        }
    }

    if (buffer == null)
        //非池化ByteBuffer分配内存
        return safeAllocateByteBuffer(size);
    else
        return buffer;
}

这里先明确三个变量:

  • free:由固定大小ByteBuffer组成的集合。
  • nonPooledAvailableMemory:非池化可利用的内存。
  • size:申请的ByteBuffer大小。

第一步,验证申请的空间大小size是否大于总内存,BufferPool的总内存默认是32M。如果比总内存还大,就抛出异常。

第二步,因为会涉及到Deque<ByteBuffer>的操作,而Deque<ByteBuffer>不是线程安全的,这里要加锁,防止多线程操作引起的问题。

第三步,如果free不为空,而且申请的空间size和free的元素的大小相同,就从free中拿出一个ByteBuffer并返回,ByteBuffer申请成功。

第四步,如果不满足上述条件,free加上nonPooledAvailableMemory比要申请的大,就调用freeUp(size)方法凑齐足够的空间给size。

freeUp(size)方法源码参考下面:

private void freeUp(int size) {
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

只要固定大小ByteBuffer集合不为空且非池化可利用空间小于申请的size,就不断从free里往nonPooledAvailableMemory添加ByteBuffer,直到满足size的大小。

第五步,如果nonPooledAvailableMemory加上free的空间小于size的大小,就意味着现在的剩余空间满足不了size的大小。要随着其他线程对内存的释放一点点累加到满足size大小。那该怎么办呢?

首先定义int型变量accumulated作为标记已经获得了多大的空间。定义这个线程的Condition,并放入Condition的集合waiters中。然后进入到while循环中,当累加的空间和size一样大了才跳出循环。进入while循环后先通过await()阻塞线程,等待其他线程释放内存。当其他线程释放内存时,会唤醒这里的阻塞。假如有线程释放内存且唤醒这里的阻塞了,那么先看size是否满足释放free里的ByteBuffer的条件,如果满足就从free里取出一个ByteBuffer,否则再调用freeUp()给nonPooledAvailableMemory扩容。如果累计的空间还是不满足size的大小,那就再次await()等待下次有线程释放空间。

我们再来分析下释放空间的代码,deallocate()方法:

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        //如果是池化ByteBuffer大小的ByteBuffer
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            //否则释放到nonPooledAvailableMemory
            this.nonPooledAvailableMemory += size;
        }
        //拿出一个condition,并signal,唤醒阻塞。
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}

先判断size是否和池化ByteBuffer的大小一样,如果满足就把要释放的ByteBuffer放回free里,否则非池化可利用缓存会回收这个ByteBuffer。因为有ByteBuffer回收了,我们就要看阻塞线程的Condition集合waiters是否为空,如果不为空就取第一个Condition并唤醒阻塞。