掘金 后端 ( ) • 2024-05-20 10:32

今天想和大家聊一下,最近线上遇到的一个问题。这个问题是隔壁团队爆出来的,由于我跟他们组长关系还不错,就一起帮着看一下了,没想到一看还真让我找到了。最后发现原因其实不复杂,主要是排查过程上可能走了一些弯路。为了保护公司隐私,我会在后续的描述中把业务上的内容先屏蔽掉。

问题背景

近期,公司对接了一个大客户,该客户会向我们平台RabbitMQ发送相关的硬件数据。平台针对这类硬件数据,采用了InfluxDB来进行存储,以往都没有出现过问题。但是在对接该客户的数据后,RabbitMQ堵了。当时隔壁组的第一反应是去查看消费者的执行逻辑,看一下是否存在需要优化的点。事实是当时的消费者逻辑确实也存在问题,他们优化了一版之后,立刻进行了上线。然而消息堆积的现象依然会出现,最后没有办法,只能定时重启InfluxDB。其实到这里大家肯定猜到是InfluxDB的问题了,但实际上却不能说完全是InfluxDB的问题。

原因排查

循着背景我继续往下说,产生消息堆积的直接原因肯定是消费者处理速度跟不上。那么造成处理速度缓慢的原因又是什么呢?首先我想到的是消费者内部的处理逻辑存在长耗时的操作,但是排查下来都很正常。那显然不是程序逻辑的问题了。接着肯定是要排查中间件的问题了,我问了下隔壁组,他们说InfluxDB的CPU、IO占用都很正常。到这里就有点奇怪了,但是本着怀疑的精神我还是去看了他们组的InfluxDB监控,确实如他们所说,CPU、IO都没到瓶颈。但是我却发现了大量的Client Error,这表明有大量的客户端在请求连接InfluxDB的时候失败了。联想到RabbitMQ消息堆积的现象,我初步判断,应该是RabbitMQ消息产生的速度过快,导致消费者一直要去连接InfluxDB发送处理后的数据,最终导致InfluxDB连接耗尽,后续的消费者线程无法再与InfluxDB发起连接。

image.png 接着我去看了RabbitMQ那边的情况,有几个队列会周期性的发送大量消息,这个数量下确实会导致消费者频繁地申请InfluxDB连接。可能有的朋友会说,难道你们没有使用连接池吗?我去查了下源码,它还真没有用【手动捂脸】。这个消费者服务引用的InfluxDB组件是GitHub上的一个开源项目 有好多年没更新了。在读写数据过程中全是直接申请InfluxDB连接的,这种操作,在并发不高的情形下估计还能撑住,一旦并发量上来了,InfluxDB必然不可用了,就这消息不堆积都有鬼了。另外我发现,这个InfluxDB的配置项里面最大连接数设置的是100,我看着CPU和IO的数据,陷入了沉思(emmmm...)

image.png

解决方案

通过上面的原因分析知道,问题产生的根本原因就是没有对数据库连接数做控制。那解决的方案就是加个连接池呗,然后说到连接池这玩意儿,我之前是真没自己实现过。本着求真务实的态度,怎么着也得试试。

连接池原理

在说连接池以前先说一下对象池。众所周知,Java是一门面向对象的语言,JVM中每时每刻都在创造着新的对象。这其中有一些对象其实没有必要一直重复创建销毁的过程。过日子就得讲究一个精打细算嘛,能重复利用咱就别折腾了。这部分对象给他分配个池子,让它们泡着就行。谁要用拿去用就是,用完记得还回来就行。连接池的原理其实也是这样,我们可以定义好一组数据库连接对象实例,由需要发起数据库连接的线程自己去借用,然后完成对应的数据库操作,这样的话,无论有多少线程需要发起数据库连接,他们能创建的数据库连接的上限都不会超多连接池指定的上限。关于线程等待的问题,就涉及到连接池参数调优的问题了,这里就不讨论了。

基于连接池重写InfluxDB连接组件

连接池的实现我们可以借助Apache Commons Pool来完成。它主要由几个关键接口和类组成。最核心的是ObjectPool接口,它定义了对象池的基本操作,例如借用对象(borrowObject)、返还对象(returnObject)等。然后是PooledObjectFactory接口,这是一个用于创建和管理池对象生命周期的工厂接口。通常,当我们需要将自定义的对象放入对象池时,就需要实现这个接口。说干就干,代码走起

1、 实现PooledObjectFactory,创建一个对象工厂

package org.springframework.data.influxdb;


import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Pong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Richard
 */
public class InfluxDBPooledObjectFactory implements PooledObjectFactory<InfluxDB> {
    private static final Logger logger = LoggerFactory.getLogger(InfluxDBPooledObjectFactory.class);

    private final InfluxDBProperties properties;

    public InfluxDBPooledObjectFactory(InfluxDBProperties properties) {
        this.properties = properties;
    }

    /**
     * 重新初始化要由池返回的实例-即从池中借用一个对象时调用
     *
     * @param pooledObject 一个PooledObject包装要激活的实例
     * @throws Exception
     */
    @Override
    public void activateObject(PooledObject<InfluxDB> pooledObject) throws Exception {
        logger.debug("InfluxDB connection was borrowed!");
    }

    /**
     * 使用默认 (NORMAL) DestroyMode 销毁池不再需要的实例。
     *
     * @param pooledObject
     * @throws Exception
     */
    @Override
    public void destroyObject(PooledObject<InfluxDB> pooledObject) throws Exception {
        InfluxDB influxDBClient = pooledObject.getObject();
        influxDBClient.close();
    }

    /**
     * 创建可由池提供服务的实例,并将其包装在由池管理的PooledObject中
     *
     * @return
     * @throws Exception
     */
    @Override
    public PooledObject<InfluxDB> makeObject() throws Exception {
        InfluxDB connection = InfluxDBFactory.connect(properties.getUrl(), properties.getUsername(),
                properties.getPassword());
        if (properties.isGzip()) {
            connection.enableGzip();
        }
        return new DefaultPooledObject<>(connection);
    }

    /**
     * 取消初始化要返回到空闲对象池的实例-即从池中归还一个对象时调用
     *
     * @param pooledObject
     * @throws Exception
     */
    @Override
    public void passivateObject(PooledObject<InfluxDB> pooledObject) throws Exception {
        logger.debug("InfluxDB connection was returned!");
    }

    /**
     * 确保实例可以安全地由池返回。
     *
     * @param pooledObject
     * @return 如果obj无效并且应该从池中删除,则为false ,否则为true
     */
    @Override
    public boolean validateObject(PooledObject<InfluxDB> pooledObject) {
        InfluxDB influxDBClient = pooledObject.getObject();
        Pong pong = influxDBClient.ping();
        return pong.isGood();
    }
}

2、创建对象池,继承GenericObjectPool

package org.springframework.data.influxdb;

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.influxdb.InfluxDB;

/**
 * @author Richard
 */
public class InfluxDBClientPool extends GenericObjectPool<InfluxDB> {
    public InfluxDBClientPool(PooledObjectFactory<InfluxDB> factory) {
        super(factory);
    }

    public InfluxDBClientPool(PooledObjectFactory<InfluxDB> factory, GenericObjectPoolConfig<InfluxDB> config) {
        super(factory, config);
    }

    public InfluxDBClientPool(PooledObjectFactory<InfluxDB> factory, GenericObjectPoolConfig<InfluxDB> config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}

3、创建对象池自动装配配置类,将对象池做成一个Bean

package org.springframework.data.influxdb;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.influxdb.InfluxDB;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PreDestroy;

/**
 * 对象池自动装配
 *
 * @author Richard
 */
@Configuration
@EnableConfigurationProperties(InfluxDBProperties.class)
public class InfluxDBPoolAutoConfig {

    private InfluxDBClientPool pool;


    @Bean("influxDBClientPool")
    protected InfluxDBClientPool createInfluxDBClientPool(InfluxDBProperties properties) {
        // 创建对象工厂
        InfluxDBPooledObjectFactory factory = new InfluxDBPooledObjectFactory(properties);
        // 设置对象池相关参数
        GenericObjectPoolConfig<InfluxDB> poolConfig = new GenericObjectPoolConfig<>();
        InfluxDBProperties.Pool poolProperty = properties.getPool();
        poolConfig.setMaxIdle(poolProperty.getMaxIdle());
        poolConfig.setMaxTotal(poolProperty.getMaxActive());
        poolConfig.setMinIdle(poolProperty.getMinIdle());
        poolConfig.setMaxWait(poolProperty.getMaxWait());
        poolConfig.setTimeBetweenEvictionRuns(poolProperty.getTimeBetweenEvictionRuns());
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);
        //一定要关闭jmx,不然springboot启动会报已经注册了某个jmx的错误
        poolConfig.setJmxEnabled(false);
        // 新建一个对象池,传入对象工厂和配置
        pool = new InfluxDBClientPool(factory, poolConfig);
        initPool(poolProperty.getMinIdle(), poolProperty.getMaxIdle());
        return pool;
    }

    /**
     * 预先加载testObject对象到对象池中
     *
     * @param initialSize 初始化连接数
     * @param maxIdle     最大空闲连接数
     */
    private void initPool(int initialSize, int maxIdle) {
        if (initialSize <= 0) {
            return;
        }

        int size = Math.min(initialSize, maxIdle);
        for (int i = 0; i < size; i++) {
            try {
                pool.addObject();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @PreDestroy
    public void destroy() {
        if (pool != null) {
            pool.close();
        }
    }
}

4、重写InfluxDBConnectionFactory

package org.springframework.data.influxdb;

import org.influxdb.InfluxDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

public class InfluxDBConnectionFactory implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(InfluxDBConnectionFactory.class);


    private InfluxDBProperties properties;

    private InfluxDBClientPool pool;

    public InfluxDBConnectionFactory() {

    }

    public InfluxDBConnectionFactory(final InfluxDBProperties properties, final InfluxDBClientPool pool) {
        this.properties = properties;
        this.pool = pool;
    }

    public InfluxDB getConnection() {
        try {
            return pool.borrowObject();
        } catch (Exception e) {
            logger.error("Error while getting connection from pool", e);
            throw new RuntimeException("Fail to get InfluxDB connection");
        }
    }

    public void returnConnection(InfluxDB connection) {
        try {
            if (connection != null) {
                pool.returnObject(connection);
            }
        } catch (Exception e) {
            logger.error("Error while returning connection to pool", e);
            throw new RuntimeException("Fail to return InfluxDB connection");
        }
    }

    /**
     * Returns the configuration properties.
     *
     * @return Returns the configuration properties
     */
    public InfluxDBProperties getProperties() {
        return properties;
    }

    /**
     * Sets the configuration properties.
     *
     * @param properties The configuration properties to set
     */
    public void setProperties(final InfluxDBProperties properties) {
        this.properties = properties;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(getProperties(), "InfluxDBProperties are required");
    }
}

5、重写InfluxDBTemplate

package org.springframework.data.influxdb;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.data.influxdb.converter.PointCollectionConverter;
import org.springframework.util.Assert;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class InfluxDBTemplate<T> extends InfluxDBAccessor implements InfluxDBOperations<T> {
    private PointCollectionConverter<T> converter;

    public InfluxDBTemplate() {

    }

    public InfluxDBTemplate(final InfluxDBConnectionFactory connectionFactory, final PointCollectionConverter<T> converter) {
        this.setConnectionFactory(connectionFactory);
        this.setConverter(converter);
    }


    public void setConverter(final PointCollectionConverter<T> converter) {
        this.converter = converter;
    }

    @Override
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        Assert.notNull(converter, "PointCollectionConverter is required");
    }

    @Override
    public void createDatabase() {
        InfluxDB connection = null;
        try {
            connection = super.getConnection();
            final String database = super.getDatabase();
            connection.createDatabase(database);
        } finally {
            super.returnConnection(connection);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public void write(final T... payload) {
        write(Arrays.asList(payload));
    }

    @Override
    public void write(final List<T> payload) {
        InfluxDB connection = null;
        try {
            final String database = getDatabase();
            final String retentionPolicy = getConnectionFactory().getProperties().getRetentionPolicy();
            final BatchPoints ops = BatchPoints.database(database)
                    .retentionPolicy(retentionPolicy)
                    .consistency(InfluxDB.ConsistencyLevel.ALL)
                    .build();
            payload.forEach(t -> Objects.requireNonNull(converter.convert(t)).forEach(ops::point));
            connection = super.getConnection();
            connection.write(ops);
        } finally {
            super.returnConnection(connection);
        }
    }

    @Override
    public QueryResult query(final Query query) {
        InfluxDB connection = null;
        try {
            connection = super.getConnection();
            return connection.query(query);
        } finally {
            super.returnConnection(connection);
        }
    }

    @Override
    public QueryResult query(final Query query, final TimeUnit timeUnit) {
        InfluxDB connection = null;
        try {
            connection = super.getConnection();
            return connection.query(query, timeUnit);
        } finally {
            super.returnConnection(connection);
        }
    }

    @Override
    public void query(Query query, int chunkSize, Consumer<QueryResult> consumer) {
        InfluxDB connection = null;
        try {
            connection = super.getConnection();
            connection.query(query, chunkSize, consumer);
        } finally {
            super.returnConnection(connection);
        }
    }

    @Override
    public Pong ping() {
        InfluxDB connection = null;
        try {
            connection = super.getConnection();
            return connection.ping();
        } finally {
            super.returnConnection(connection);
        }
    }

    @Override
    public String version() {
        InfluxDB connection = null;
        try {
            connection = super.getConnection();
            return connection.version();
        } finally {
            super.returnConnection(connection);
        }
    }
}

结果验证

改完之后,跑了个测试验证了下,可以发现已经实现了数据库连接的复用了,剩下的就是改bug了,这个季度的OKR有了这不是【手动狗头】

image.png

结语

兄弟们,如果觉得我写的还不错的话,给我点个赞、关个注吧,谢谢你看到这~