Juejin · Backend • 2024-06-18 10:35

Background


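The application is a business-data push client. It continuously reads data from the database, formats it, and pushes it to the data-warehouse front-end service. That front-end service is a Java application running on a small, roughly 10-year-old machine (Pentium 2030, from 2013), and it validates every record with regular expressions. The write volume is about 300 records per second, sent over a TCP channel set up between a netty-client and a netty-server.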

Each record averages about 65 characters and occupies 66 bytes.
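A quick back-of-the-envelope calculation puts these numbers in perspective. The 300 records/s and 66 bytes/record come from above; the 96-byte per-entry overhead is an assumption based on Netty 4.1's default for the `io.netty.transport.outboundBufferEntrySizeOverhead` system property, which Netty adds to each message's size when it counts pending bytes. `PushVolume` is a hypothetical helper class.

```java
// Back-of-the-envelope sizing for the push client (hypothetical helper class).
final class PushVolume {
    static final int RECORDS_PER_SECOND = 300; // from the article
    static final int BYTES_PER_RECORD = 66;    // from the article
    // Assumption: Netty 4.1's default per-entry overhead (io.netty.transport.outboundBufferEntrySizeOverhead).
    static final int ASSUMED_ENTRY_OVERHEAD = 96;

    /** Raw payload bytes produced per second. */
    static long payloadBytesPerSecond() {
        return (long) RECORDS_PER_SECOND * BYTES_PER_RECORD;
    }

    /** Bytes counted as "pending" per second (payload plus per-entry overhead) if nothing drains. */
    static long pendingBytesPerSecond() {
        return (long) RECORDS_PER_SECOND * (BYTES_PER_RECORD + ASSUMED_ENTRY_OVERHEAD);
    }

    /** Seconds until a completely stalled channel exceeds the given high watermark. */
    static double secondsToHighWaterMark(long highWaterMark) {
        return (double) highWaterMark / pendingBytesPerSecond();
    }

    public static void main(String[] args) {
        System.out.println("payload bytes/s: " + payloadBytesPerSecond()); // 19800
        System.out.println("pending bytes/s: " + pendingBytesPerSecond()); // 48600
        System.out.printf("seconds to a 64 KiB high watermark: %.2f%n", secondsToHighWaterMark(64 * 1024));
    }
}
```

Under these assumptions a fully stalled receiver would push the pending count past Netty's default 64 KiB high watermark in well under two seconds, so a writability signal exists long before the heap is at risk; it only helps if the producer actually looks at it.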

Discovering the problem

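The host's monitoring kept alerting that the application's memory usage was sitting at the danger line. The application log showed frequent OOM errors (memory could not be allocated) while writing data to the TCP channel. Because the application had not been configured to dump the heap on OOM, we changed the startup parameters and restarted it.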

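A reminder: applications should always be configured to write a heap dump on OOM, so that problems can be found and pinpointed quickly.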

  • Added HeapDumpOnOutOfMemoryError and HeapDumpPath (the location of the dump file) to the startup parameters:
java -Xmx6G -Xms6G -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/serverPath/applicationName.hprof -XX:+UseG1GC -jar app.jar > app.log &
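It is worth confirming that the flags actually took effect in the running JVM. A minimal sketch, assuming a HotSpot-based JDK (the `com.sun.management.HotSpotDiagnosticMXBean` interface is HotSpot-specific). It reads the options of the JVM it runs in, so in practice it would be called from inside the application, for example at startup. `HeapDumpFlagCheck` is a hypothetical class name.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import com.sun.management.VMOption;
import java.lang.management.ManagementFactory;

// Reports whether heap-dump-on-OOM is enabled in the current JVM (HotSpot only; hypothetical helper).
final class HeapDumpFlagCheck {
    private static final HotSpotDiagnosticMXBean HOTSPOT =
            ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);

    /** Current value of a HotSpot VM option, as a string. */
    static String option(String name) {
        VMOption opt = HOTSPOT.getVMOption(name); // throws IllegalArgumentException for unknown names
        return opt.getValue();
    }

    static boolean heapDumpOnOomEnabled() {
        return Boolean.parseBoolean(option("HeapDumpOnOutOfMemoryError"));
    }

    public static void main(String[] args) {
        System.out.println("HeapDumpOnOutOfMemoryError = " + option("HeapDumpOnOutOfMemoryError"));
        System.out.println("HeapDumpPath               = " + option("HeapDumpPath"));
        if (!heapDumpOnOomEnabled()) {
            System.err.println("WARNING: no heap dump will be written on OutOfMemoryError");
        }
    }
}
```

To our knowledge both options are "manageable" flags in HotSpot, so they can also be switched on in an already-running process (for example with `jinfo -flag +HeapDumpOnOutOfMemoryError <pid>`) without a restart; verify this on your JDK version before relying on it.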

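Checking memory with jstat -gc [pid] showed OU (old-generation used) climbing until it was almost equal to OC (old-generation capacity). FGC and FGCT (full GC count and time) kept increasing, yet the memory held in OU never came down.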
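The same OU/OC trend can be watched from inside the process through the standard `java.lang.management` API. A sketch: for each heap pool whose name looks like an old generation, it prints current used (roughly jstat's OU), committed (roughly OC) and used right after the last collection. The name filter is an assumption, because pool names depend on the collector (`G1 Old Gen` with the `-XX:+UseG1GC` flag above, `Tenured Gen` with the serial collector); `OldGenWatch` is a hypothetical class.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;

// In-process view of roughly what `jstat -gc` reports as OU / OC (hypothetical helper).
final class OldGenWatch {
    /** One line per old-generation heap pool: used, committed and used-after-last-GC, in KB. */
    static List<String> oldGenReport() {
        List<String> lines = new ArrayList<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            String name = pool.getName();
            // Assumption: collector-specific pool names, e.g. "G1 Old Gen", "PS Old Gen", "Tenured Gen".
            boolean looksOld = name.contains("Old Gen") || name.contains("Tenured Gen");
            MemoryUsage now = pool.getUsage();
            if (pool.getType() != MemoryType.HEAP || !looksOld || now == null) {
                continue;
            }
            MemoryUsage afterGc = pool.getCollectionUsage(); // null if the pool does not support it
            long afterGcKb = afterGc == null ? -1 : afterGc.getUsed() / 1024;
            lines.add(String.format("%s used=%dKB committed=%dKB usedAfterLastGc=%dKB",
                    name, now.getUsed() / 1024, now.getCommitted() / 1024, afterGcKb));
        }
        return lines;
    }

    public static void main(String[] args) throws InterruptedException {
        // Sample a few times; a rising usedAfterLastGc means collections cannot reclaim the old gen.
        for (int i = 0; i < 3; i++) {
            oldGenReport().forEach(System.out::println);
            Thread.sleep(5_000);
        }
    }
}
```

The after-GC figure is the telling one: if it keeps rising even though full GCs are running, the old generation is filled with objects that are still reachable, which matches the symptom above.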


Handling the incident

Analyzing the heap dump

We opened the dump file in JProfiler to look at heap usage.



ChannelOutboundBuffer is holding 4828 MB of memory, roughly four-fifths of the 6 GB heap.

Root cause

Let's analyze how Netty writes out a TCP message.

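After the Netty client connects to the receiver's Netty server, a channel object is created, and messages are written with the channel's writeAndFlush method. When binding the EventLoopGroup to the Bootstrap we configured NioSocketChannel as the channel class, so this walkthrough follows NioSocketChannel's writeAndFlush. NioSocketChannel does not override that method; the implementation it runs is inherited through AbstractNioByteChannel and AbstractNioChannel from AbstractChannel: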


@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline.writeAndFlush(msg);
}

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return pipeline.writeAndFlush(msg, promise);
}

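The pipeline hands the call to AbstractChannelHandlerContext, where the write is actually performed by this method: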

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}
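The key branch above is `executor.inEventLoop()`: if the caller already runs on the channel's event-loop thread, the write happens inline; otherwise it is wrapped in a `WriteTask` and queued on that event loop. A database-polling thread is not the event loop, so its writes take the queued path. The sketch below models only that dispatch rule with a plain single-thread executor; `MiniEventLoop` is a hypothetical class, not Netty's `EventLoop`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Toy model (not Netty's EventLoop) of the "inline if on the event loop, else submit a task" rule.
final class MiniEventLoop implements AutoCloseable {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        loopThread.set(t); // remember which thread plays the event loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    /** Runs writeOp inline when already on the loop thread, otherwise queues it (cf. WriteTask). */
    Future<?> write(Runnable writeOp) {
        if (inEventLoop()) {
            writeOp.run();
            return CompletableFuture.completedFuture(null);
        }
        return executor.submit(writeOp);
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    /** Name of the thread that runs a write issued from a non-loop thread (e.g. a DB poller). */
    static String threadOfQueuedWrite() {
        try (MiniEventLoop loop = new MiniEventLoop()) {
            AtomicReference<String> seen = new AtomicReference<>();
            loop.write(() -> seen.set(Thread.currentThread().getName())).get();
            return seen.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True if a write issued from the loop thread completes before the call returns (i.e. inline). */
    static boolean nestedWriteRunsInline() {
        try (MiniEventLoop loop = new MiniEventLoop()) {
            List<String> order = new CopyOnWriteArrayList<>();
            loop.write(() -> {
                loop.write(() -> order.add("nested write"));
                order.add("after nested call");
            }).get();
            return order.equals(Arrays.asList("nested write", "after nested call"));
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("queued write ran on: " + threadOfQueuedWrite());
        System.out.println("nested write ran inline: " + nestedWriteRunsInline());
    }
}
```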

Because we call writeAndFlush (flush == true), the method that actually runs on the event-loop thread is invokeWriteAndFlush:

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        invokeExceptionCaught(t);
    }
}

Along this path the message passes through the ChannelOutboundHandlers and is then flushed. At the head of the pipeline the calls end up in AbstractUnsafe's write and flush, which call ChannelOutboundBuffer's addMessage and addFlush respectively:

/**
 * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
 * the message was written.
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // Touch the message to make it easier to debug buffer leaks.

    // this save both checking against the ReferenceCounted interface
    // and makes better use of virtual calls vs interface ones
    if (msg instanceof AbstractReferenceCountedByteBuf) {
        ((AbstractReferenceCountedByteBuf) msg).touch();
    } else {
        ReferenceCountUtil.touch(msg);
    }

    // After adding the message to the unflushed list, increment the pending byte count.
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

/**
 * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
 * and so you will be able to handle them.
 */
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}
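The two methods maintain three pointers over a singly linked list: entries starting at `flushedEntry` have been flushed and may be written to the socket, entries starting at `unflushedEntry` were added but not yet flushed, and `tailEntry` is the last entry. A stripped-down model of just that bookkeeping (no promises, cancellation or byte counting; `ToyOutboundBuffer` is hypothetical, its remove logic mirrors the shape of Netty's) makes the invariant easier to see. Note that nothing in either method limits how long the list can get.

```java
// Toy model of ChannelOutboundBuffer's linked-list bookkeeping (hypothetical, simplified).
final class ToyOutboundBuffer {
    private static final class Entry {
        final String msg;
        Entry next;
        Entry(String msg) { this.msg = msg; }
    }

    private Entry flushedEntry;   // first flushed entry, ready to be written out
    private Entry unflushedEntry; // first entry added since the last addFlush()
    private Entry tailEntry;      // last entry in the list
    private int flushed;          // flushed entries not yet written

    void addMessage(String msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry == null) {
            return; // nothing added since the last flush
        }
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed++;
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }

    /** Removes the first flushed message, as if it had been written to the socket; null if none. */
    String removeWritten() {
        if (flushed == 0) {
            return null;
        }
        Entry e = flushedEntry;
        if (--flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
        return e.msg;
    }

    int flushedCount() { return flushed; }

    int unflushedCount() {
        int n = 0;
        for (Entry e = unflushedEntry; e != null; e = e.next) n++;
        return n;
    }

    public static void main(String[] args) {
        ToyOutboundBuffer buf = new ToyOutboundBuffer();
        buf.addMessage("r1");
        buf.addMessage("r2");
        System.out.println("before flush: flushed=" + buf.flushedCount() + ", unflushed=" + buf.unflushedCount());
        buf.addFlush();
        System.out.println("after flush:  flushed=" + buf.flushedCount() + ", unflushed=" + buf.unflushedCount());
        System.out.println("written: " + buf.removeWritten());
    }
}
```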


Once the entries are marked as flushed, flush0() is triggered:

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            // Check if we need to generate the exception at all.
            if (!outboundBuffer.isEmpty()) {
                if (isOpen()) {
                    outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        handleWriteError(t);
    } finally {
        inFlush0 = false;
    }
}

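flush0() then calls doWrite(), implemented in NioSocketChannel as shown below. The number of write attempts per call is bounded by ChannelConfig.getWriteSpinCount():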

/**
 * Returns the maximum loop count for a write operation until
 * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
 * It is similar to what a spin lock is used for in concurrency programming.
 * It improves memory utilization and write throughput depending on
 * the platform that JVM runs on.  The default value is {@code 16}.
 */
int getWriteSpinCount();

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            // All written so clear OP_WRITE
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        // Ensure the pending writes are made of ByteBufs only.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();

        // Always use nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf so use non-gathering write
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Adapt the max bytes per gathering write based on how much was actually written
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                // We limit the max amount to int above so cast is safe
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}

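doWrite() makes up to 16 write attempts per call (the default writeSpinCount), and returns early through incompleteWrite(true), which registers interest in OP_WRITE, as soon as the socket accepts zero bytes, for example because the socket send buffer is full while the slow receiver cannot keep up. Whatever has not been written stays in ChannelOutboundBuffer. Meanwhile writeAndFlush keeps appending messages to that buffer without any limit, so when messages are added to the buffer faster than the buffer can be written to the network, they pile up until the heap is exhausted and the OOM occurs.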
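To see why a slow receiver turns into unbounded heap growth, here is a toy simulation, not Netty code. `SlowSocket` accepts at most a fixed number of bytes per simulated second (standing in for the old receiver's throughput), and `doWrite` mimics the shape of the loop above: up to 16 attempts, stopping at the first zero-byte write. The producer keeps appending 300 records of 66 bytes per second regardless. The 12,000 bytes/s drain rate is an arbitrary assumption, and all class names are hypothetical.

```java
// Toy simulation (not Netty code): a producer that appends faster than the socket drains.
final class BacklogSimulation {
    static final int WRITE_SPIN_COUNT = 16;  // Netty's default writeSpinCount
    static final int RECORDS_PER_SECOND = 300;
    static final int RECORD_BYTES = 66;

    /** Stand-in for a socket whose send buffer can take `bytesPerSecond` bytes each simulated second. */
    static final class SlowSocket {
        private final long bytesPerSecond;
        private long budget;

        SlowSocket(long bytesPerSecond) {
            this.bytesPerSecond = bytesPerSecond;
        }

        void nextSecond() {
            budget = bytesPerSecond;
        }

        /** Accepts as much as fits right now; 0 means the send buffer is full. */
        long write(long requested) {
            long accepted = Math.min(requested, budget);
            budget -= accepted;
            return accepted;
        }
    }

    /** Same shape as doWrite above: at most WRITE_SPIN_COUNT attempts, stop on a zero-byte write. */
    static long doWrite(long pending, SlowSocket socket) {
        int spin = WRITE_SPIN_COUNT;
        do {
            if (pending == 0) {
                return 0;            // everything written (Netty: clearOpWrite())
            }
            long written = socket.write(pending);
            if (written <= 0) {
                return pending;      // Netty: incompleteWrite(true), wait for OP_WRITE
            }
            pending -= written;
            --spin;
        } while (spin > 0);
        return pending;              // spin budget used up; Netty schedules another flush
    }

    /** Bytes still queued after producing at the article's rate for `seconds` seconds. */
    static long pendingAfter(int seconds, long drainBytesPerSecond) {
        SlowSocket socket = new SlowSocket(drainBytesPerSecond);
        long pending = 0;
        for (int s = 0; s < seconds; s++) {
            socket.nextSecond();
            for (int r = 0; r < RECORDS_PER_SECOND; r++) {
                pending += RECORD_BYTES;             // writeAndFlush never refuses a message...
                pending = doWrite(pending, socket);  // ...the flush only drains what it can
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        long drain = 12_000; // assumed receiver throughput in bytes/s
        for (int seconds : new int[] {1, 60, 3600}) {
            System.out.println(seconds + " s -> " + pendingAfter(seconds, drain) + " bytes queued");
        }
    }
}
```

Under these assumptions the queue grows by 19,800 - 12,000 = 7,800 payload bytes every second and never shrinks. Raising writeSpinCount would not help, because the loop already stops at the first zero-byte write; only slowing the producer or speeding up the receiver changes the outcome.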

Solution

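While reading the Netty source we noticed that ChannelOutboundBuffer keeps a running count of pending bytes with two methods: incrementPendingOutboundBytes, called from addMessage when a message is queued, and decrementPendingOutboundBytes, called when entries are removed after being written (and from addFlush for cancelled entries).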

/**
 * Increment the pending bytes which will be written at some point.
 * This method is thread-safe!
 */
void incrementPendingOutboundBytes(long size) {
    incrementPendingOutboundBytes(size, true);
}

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

/**
 * Decrement the pending bytes which will be written at some point.
 * This method is thread-safe!
 */
void decrementPendingOutboundBytes(long size) {
    decrementPendingOutboundBytes(size, true, true);
}

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}

private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

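Every increment and decrement compares the new total against the channel's watermarks: rising above the high watermark marks the channel unwritable, and falling below the low watermark marks it writable again, each transition firing a channelWritabilityChanged event. The defaults are defined in WriteBufferWaterMark: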
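The effect is hysteresis: the channel becomes unwritable only after pending bytes rise above the high watermark, and writable again only after they fall below the low watermark, so writability does not flap around a single threshold. A minimal single-threaded model of that rule, using Netty's default 32 KiB / 64 KiB values and a hypothetical `WatermarkModel` class (Netty itself uses atomic updaters and fires events; this sketch just counts transitions):

```java
// Single-threaded model of ChannelOutboundBuffer's writability rule (hypothetical class).
final class WatermarkModel {
    private final long low;
    private final long high;
    private long pending;
    private boolean writable = true;
    private int writabilityChanges; // stands in for channelWritabilityChanged events

    WatermarkModel(long low, long high) {
        if (low > high) {
            throw new IllegalArgumentException("low watermark must not exceed high watermark");
        }
        this.low = low;
        this.high = high;
    }

    /** Like incrementPendingOutboundBytes: flips to unwritable only when pending exceeds high. */
    void increment(long size) {
        if (size == 0) return;
        pending += size;
        if (pending > high && writable) {
            writable = false;
            writabilityChanges++;
        }
    }

    /** Like decrementPendingOutboundBytes: flips back to writable only when pending drops below low. */
    void decrement(long size) {
        if (size == 0) return;
        pending -= size;
        if (pending < low && !writable) {
            writable = true;
            writabilityChanges++;
        }
    }

    boolean isWritable() { return writable; }
    long pending() { return pending; }
    int writabilityChanges() { return writabilityChanges; }

    public static void main(String[] args) {
        WatermarkModel m = new WatermarkModel(32 * 1024, 64 * 1024); // Netty's defaults
        for (int i = 0; i < 1200; i++) {
            m.increment(66); // one 66-byte record; 1200 records = 79,200 bytes > 64 KiB
        }
        System.out.println("after 1200 records: writable=" + m.isWritable() + ", pending=" + m.pending());
        m.decrement(40_000); // 39,200 left: below high but above low, so still unwritable
        System.out.println("after draining 40000 bytes: writable=" + m.isWritable());
    }
}
```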

public final class WriteBufferWaterMark {

    private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
    private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

    public static final WriteBufferWaterMark DEFAULT =
            new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);

    // ... (constructors and accessors omitted)
}

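So the fix is to configure suitable watermarks when creating the channel, and to check the return value of channel.isWritable() before writing instead of calling writeAndFlush unconditionally. isWritable() simply reports the flag maintained above: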

/**
 * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
 * not exceed the write watermark of the {@link Channel} and
 * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
 * {@code false}.
 */
public boolean isWritable() {
    return unwritable == 0;
}
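Putting the two pieces together, the producer should stop pulling rows from the database while `isWritable()` is false and resume once writability comes back, so the backlog stays in the database instead of the heap. In real Netty code that would typically mean setting `ChannelOption.WRITE_BUFFER_WATER_MARK` to a `new WriteBufferWaterMark(low, high)` on the Bootstrap and reacting to `channelWritabilityChanged` in a handler. The sketch below avoids a Netty dependency and uses a hypothetical `ToyChannel` with the same high/low rule, so it only illustrates the control flow; the drain rates and the one-pass-per-second loop are assumptions.

```java
// Toy simulation (not Netty) of a producer gated on channel writability.
final class GatedProducerDemo {
    static final int RECORD_BYTES = 66;
    static final int RECORDS_PER_SECOND = 300;

    /** Hypothetical channel with Netty-style high/low watermarks. */
    static final class ToyChannel {
        private final long low;
        private final long high;
        private long pending;
        private boolean writable = true;

        ToyChannel(long low, long high) {
            this.low = low;
            this.high = high;
        }

        boolean isWritable() { return writable; }
        long pending() { return pending; }

        void write(int bytes) {
            pending += bytes;
            if (pending > high) writable = false;
        }

        /** The network sends up to `bytes` bytes. */
        void drain(long bytes) {
            pending -= Math.min(bytes, pending);
            if (pending < low) writable = true;
        }
    }

    /** Runs the simulation and returns the largest number of pending bytes observed. */
    static long maxPending(boolean gated, int seconds, long drainPerSecond) {
        ToyChannel ch = new ToyChannel(32 * 1024, 64 * 1024);
        long max = 0;
        for (int s = 0; s < seconds; s++) {
            for (int r = 0; r < RECORDS_PER_SECOND; r++) {
                if (gated && !ch.isWritable()) {
                    break; // stop polling the database; unread rows stay in the DB
                }
                ch.write(RECORD_BYTES);
                max = Math.max(max, ch.pending());
            }
            ch.drain(drainPerSecond);
        }
        return max;
    }

    public static void main(String[] args) {
        long drain = 12_000; // assumed receiver throughput in bytes/s
        System.out.println("ungated max pending: " + maxPending(false, 600, drain));
        System.out.println("gated max pending:   " + maxPending(true, 600, drain));
    }
}
```

With gating, the worst case stays around the high watermark plus one record, while the ungated producer's backlog grows by the difference between production and drain rate every second. In a real client, the `break` corresponds to pausing the polling loop until the channel reports itself writable again.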
