今天想和大家聊一下,最近线上遇到的一个问题。这个问题是隔壁团队爆出来的,由于我跟他们组长关系还不错,就一起帮着看一下了,没想到一看还真让我找到了。最后发现原因其实不复杂,主要是排查过程上可能走了一些弯路。为了保护公司隐私,我会在后续的描述中把业务上的内容先屏蔽掉。
问题背景
近期,公司对接了一个大客户,该客户会向我们平台RabbitMQ发送相关的硬件数据。平台针对这类硬件数据,采用了InfluxDB来进行存储,以往都没有出现过问题。但是在对接该客户的数据后,RabbitMQ堵了。当时隔壁组的第一反应是去查看消费者的执行逻辑,看一下是否存在需要优化的点。事实是当时的消费者逻辑确实也存在问题,他们优化了一版之后,立刻进行了上线。然而消息堆积的现象依然会出现,最后没有办法,只能定时重启InfluxDB。其实到这里大家肯定猜到是InfluxDB的问题了,但实际上却不能说完全是InfluxDB的问题。
原因排查
循着背景我继续往下说,产生消息堆积的直接原因肯定是消费者处理速度跟不上。那么造成处理速度缓慢的原因又是什么呢?首先我想到的是消费者内部的处理逻辑存在长耗时的操作,但是排查下来都很正常。那显然不是程序逻辑的问题了。接着肯定是要排查中间件的问题了,我问了下隔壁组,他们说InfluxDB的CPU、IO占用都很正常。到这里就有点奇怪了,但是本着怀疑的精神我还是去看了他们组的InfluxDB监控,确实如他们所说,CPU、IO都没到瓶颈。但是我却发现了大量的Client Error,这表明有大量的客户端在请求连接InfluxDB的时候失败了。联想到RabbitMQ消息堆积的现象,我初步判断,应该是RabbitMQ消息产生的速度过快,导致消费者一直要去连接InfluxDB发送处理后的数据,最终导致InfluxDB连接耗尽,后续的消费者线程无法再与InfluxDB发起连接。
接着我去看了RabbitMQ那边的情况,有几个队列会周期性的发送大量消息,这个数量下确实会导致消费者频繁地申请InfluxDB连接。可能有的朋友会说,难道你们没有使用连接池吗?我去查了下源码,它还真没有用【手动捂脸】。这个消费者服务引用的InfluxDB组件是GitHub上的一个开源项目 有好多年没更新了。在读写数据过程中全是直接申请InfluxDB连接的,这种操作,在并发不高的情形下估计还能撑住,一旦并发量上来了,InfluxDB必然不可用了,就这消息不堆积都有鬼了。另外我发现,这个InfluxDB的配置项里面最大连接数设置的是100,我看着CPU和IO的数据,陷入了沉思(emmmm...)
解决方案
通过上面的原因分析知道,问题产生的根本原因就是没有对数据库连接数做控制。那解决的方案就是加个连接池呗,然后说到连接池这玩意儿,我之前是真没自己实现过。本着求真务实的态度,怎么着也得试试。
连接池原理
在说连接池以前先说一下对象池。众所周知,Java是一门面向对象的语言,JVM中每时每刻都在创造着新的对象。这其中有一些对象其实没有必要一直重复创建销毁的过程。过日子就得讲究一个精打细算嘛,能重复利用咱就别折腾了。这部分对象给他分配个池子,让它们泡着就行。谁要用拿去用就是,用完记得还回来就行。连接池的原理其实也是这样,我们可以定义好一组数据库连接对象实例,由需要发起数据库连接的线程自己去借用,然后完成对应的数据库操作,这样的话,无论有多少线程需要发起数据库连接,他们能创建的数据库连接的上限都不会超多连接池指定的上限。关于线程等待的问题,就涉及到连接池参数调优的问题了,这里就不讨论了。
基于连接池重写InfluxDB
连接组件
连接池的实现我们可以借助Apache Commons Pool来完成。它主要由几个关键接口和类组成。最核心的是ObjectPool
接口,它定义了对象池的基本操作,例如借用对象(borrowObject
)、返还对象(returnObject
)等。然后是PooledObjectFactory
接口,这是一个用于创建和管理池对象生命周期的工厂接口。通常,当我们需要将自定义的对象放入对象池时,就需要实现这个接口。说干就干,代码走起
1、 实现PooledObjectFactory
,创建一个对象工厂
package org.springframework.data.influxdb;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Pong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Richard
*/
public class InfluxDBPooledObjectFactory implements PooledObjectFactory<InfluxDB> {
private static final Logger logger = LoggerFactory.getLogger(InfluxDBPooledObjectFactory.class);
private final InfluxDBProperties properties;
public InfluxDBPooledObjectFactory(InfluxDBProperties properties) {
this.properties = properties;
}
/**
* 重新初始化要由池返回的实例-即从池中借用一个对象时调用
*
* @param pooledObject 一个PooledObject包装要激活的实例
* @throws Exception
*/
@Override
public void activateObject(PooledObject<InfluxDB> pooledObject) throws Exception {
logger.debug("InfluxDB connection was borrowed!");
}
/**
* 使用默认 (NORMAL) DestroyMode 销毁池不再需要的实例。
*
* @param pooledObject
* @throws Exception
*/
@Override
public void destroyObject(PooledObject<InfluxDB> pooledObject) throws Exception {
InfluxDB influxDBClient = pooledObject.getObject();
influxDBClient.close();
}
/**
* 创建可由池提供服务的实例,并将其包装在由池管理的PooledObject中
*
* @return
* @throws Exception
*/
@Override
public PooledObject<InfluxDB> makeObject() throws Exception {
InfluxDB connection = InfluxDBFactory.connect(properties.getUrl(), properties.getUsername(),
properties.getPassword());
if (properties.isGzip()) {
connection.enableGzip();
}
return new DefaultPooledObject<>(connection);
}
/**
* 取消初始化要返回到空闲对象池的实例-即从池中归还一个对象时调用
*
* @param pooledObject
* @throws Exception
*/
@Override
public void passivateObject(PooledObject<InfluxDB> pooledObject) throws Exception {
logger.debug("InfluxDB connection was returned!");
}
/**
* 确保实例可以安全地由池返回。
*
* @param pooledObject
* @return 如果obj无效并且应该从池中删除,则为false ,否则为true
*/
@Override
public boolean validateObject(PooledObject<InfluxDB> pooledObject) {
InfluxDB influxDBClient = pooledObject.getObject();
Pong pong = influxDBClient.ping();
return pong.isGood();
}
}
2、创建对象池,继承GenericObjectPool
package org.springframework.data.influxdb;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.influxdb.InfluxDB;
/**
* @author Richard
*/
public class InfluxDBClientPool extends GenericObjectPool<InfluxDB> {
public InfluxDBClientPool(PooledObjectFactory<InfluxDB> factory) {
super(factory);
}
public InfluxDBClientPool(PooledObjectFactory<InfluxDB> factory, GenericObjectPoolConfig<InfluxDB> config) {
super(factory, config);
}
public InfluxDBClientPool(PooledObjectFactory<InfluxDB> factory, GenericObjectPoolConfig<InfluxDB> config, AbandonedConfig abandonedConfig) {
super(factory, config, abandonedConfig);
}
}
3、创建对象池自动装配配置类,将对象池做成一个Bean
package org.springframework.data.influxdb;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.influxdb.InfluxDB;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
/**
* 对象池自动装配
*
* @author Richard
*/
@Configuration
@EnableConfigurationProperties(InfluxDBProperties.class)
public class InfluxDBPoolAutoConfig {
private InfluxDBClientPool pool;
@Bean("influxDBClientPool")
protected InfluxDBClientPool createInfluxDBClientPool(InfluxDBProperties properties) {
// 创建对象工厂
InfluxDBPooledObjectFactory factory = new InfluxDBPooledObjectFactory(properties);
// 设置对象池相关参数
GenericObjectPoolConfig<InfluxDB> poolConfig = new GenericObjectPoolConfig<>();
InfluxDBProperties.Pool poolProperty = properties.getPool();
poolConfig.setMaxIdle(poolProperty.getMaxIdle());
poolConfig.setMaxTotal(poolProperty.getMaxActive());
poolConfig.setMinIdle(poolProperty.getMinIdle());
poolConfig.setMaxWait(poolProperty.getMaxWait());
poolConfig.setTimeBetweenEvictionRuns(poolProperty.getTimeBetweenEvictionRuns());
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
//一定要关闭jmx,不然springboot启动会报已经注册了某个jmx的错误
poolConfig.setJmxEnabled(false);
// 新建一个对象池,传入对象工厂和配置
pool = new InfluxDBClientPool(factory, poolConfig);
initPool(poolProperty.getMinIdle(), poolProperty.getMaxIdle());
return pool;
}
/**
* 预先加载testObject对象到对象池中
*
* @param initialSize 初始化连接数
* @param maxIdle 最大空闲连接数
*/
private void initPool(int initialSize, int maxIdle) {
if (initialSize <= 0) {
return;
}
int size = Math.min(initialSize, maxIdle);
for (int i = 0; i < size; i++) {
try {
pool.addObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@PreDestroy
public void destroy() {
if (pool != null) {
pool.close();
}
}
}
4、重写InfluxDBConnectionFactory
package org.springframework.data.influxdb;
import org.influxdb.InfluxDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
public class InfluxDBConnectionFactory implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(InfluxDBConnectionFactory.class);
private InfluxDBProperties properties;
private InfluxDBClientPool pool;
public InfluxDBConnectionFactory() {
}
public InfluxDBConnectionFactory(final InfluxDBProperties properties, final InfluxDBClientPool pool) {
this.properties = properties;
this.pool = pool;
}
public InfluxDB getConnection() {
try {
return pool.borrowObject();
} catch (Exception e) {
logger.error("Error while getting connection from pool", e);
throw new RuntimeException("Fail to get InfluxDB connection");
}
}
public void returnConnection(InfluxDB connection) {
try {
if (connection != null) {
pool.returnObject(connection);
}
} catch (Exception e) {
logger.error("Error while returning connection to pool", e);
throw new RuntimeException("Fail to return InfluxDB connection");
}
}
/**
* Returns the configuration properties.
*
* @return Returns the configuration properties
*/
public InfluxDBProperties getProperties() {
return properties;
}
/**
* Sets the configuration properties.
*
* @param properties The configuration properties to set
*/
public void setProperties(final InfluxDBProperties properties) {
this.properties = properties;
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(getProperties(), "InfluxDBProperties are required");
}
}
5、重写InfluxDBTemplate
package org.springframework.data.influxdb;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.data.influxdb.converter.PointCollectionConverter;
import org.springframework.util.Assert;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class InfluxDBTemplate<T> extends InfluxDBAccessor implements InfluxDBOperations<T> {
private PointCollectionConverter<T> converter;
public InfluxDBTemplate() {
}
public InfluxDBTemplate(final InfluxDBConnectionFactory connectionFactory, final PointCollectionConverter<T> converter) {
this.setConnectionFactory(connectionFactory);
this.setConverter(converter);
}
public void setConverter(final PointCollectionConverter<T> converter) {
this.converter = converter;
}
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
Assert.notNull(converter, "PointCollectionConverter is required");
}
@Override
public void createDatabase() {
InfluxDB connection = null;
try {
connection = super.getConnection();
final String database = super.getDatabase();
connection.createDatabase(database);
} finally {
super.returnConnection(connection);
}
}
@Override
@SuppressWarnings("unchecked")
public void write(final T... payload) {
write(Arrays.asList(payload));
}
@Override
public void write(final List<T> payload) {
InfluxDB connection = null;
try {
final String database = getDatabase();
final String retentionPolicy = getConnectionFactory().getProperties().getRetentionPolicy();
final BatchPoints ops = BatchPoints.database(database)
.retentionPolicy(retentionPolicy)
.consistency(InfluxDB.ConsistencyLevel.ALL)
.build();
payload.forEach(t -> Objects.requireNonNull(converter.convert(t)).forEach(ops::point));
connection = super.getConnection();
connection.write(ops);
} finally {
super.returnConnection(connection);
}
}
@Override
public QueryResult query(final Query query) {
InfluxDB connection = null;
try {
connection = super.getConnection();
return connection.query(query);
} finally {
super.returnConnection(connection);
}
}
@Override
public QueryResult query(final Query query, final TimeUnit timeUnit) {
InfluxDB connection = null;
try {
connection = super.getConnection();
return connection.query(query, timeUnit);
} finally {
super.returnConnection(connection);
}
}
@Override
public void query(Query query, int chunkSize, Consumer<QueryResult> consumer) {
InfluxDB connection = null;
try {
connection = super.getConnection();
connection.query(query, chunkSize, consumer);
} finally {
super.returnConnection(connection);
}
}
@Override
public Pong ping() {
InfluxDB connection = null;
try {
connection = super.getConnection();
return connection.ping();
} finally {
super.returnConnection(connection);
}
}
@Override
public String version() {
InfluxDB connection = null;
try {
connection = super.getConnection();
return connection.version();
} finally {
super.returnConnection(connection);
}
}
}
结果验证
改完之后,跑了个测试验证了下,可以发现已经实现了数据库连接的复用了,剩下的就是改bug了,这个季度的OKR有了这不是【手动狗头】
结语
兄弟们,如果觉得我写的还不错的话,给我点个赞、关个注吧,谢谢你看到这~