掘金 后端 ( ) • 2024-05-16 13:46

highlight: a11y-light theme: channing-cyan

数据准备

为了准备相对庞大的数据量,我从GitHub上找了一些公开的真实数据,是2019年到2023年WHO公开的Covid-19数据。

数据集地址

image.png

数据插入

拉取数据集,打开如下文件夹

image.png

我们可以看到很多的csv文件,这些都是全球各个地区的每日新增报告。

容器准备

我们使用Logstash对这批文件进行导入

docker启动Logstash

使用dockerfile进行docker启动,因为在启动之前我没需要对容器进行一些操作。

# 使用官方的 Logstash 镜像
FROM logstash:7.12.0

# 维护者信息
LABEL maintainer="[email protected]"

# 安装 logstash-output-jdbc 插件
RUN logstash-plugin install logstash-output-jdbc

# 创建目录以存放 JDBC 驱动程序
RUN mkdir -p /usr/share/logstash/jdbc

# 将 MySQL 连接器 JAR 包复制到容器中
COPY ./covid19/mysql-connector-java-8.0.28.jar /usr/share/logstash/jdbc/mysql-connector-java-8.0.28.jar

# 将配置文件和管道复制到容器中
COPY ./config/  /usr/share/logstash/config/
COPY ./pipeline/ /usr/share/logstash/pipeline/
# 暴露需要的端口
EXPOSE 5044 16060

# 设置 Logstash 命令
CMD ["logstash", "-f", "/usr/share/logstash/pipeline/xxxx.conf"]

注意要点:

  1. RUN logstash-plugin install logstash-output-jdbc 此命令会执行很久,耐心等待,可能与网络有关,可尝试切换快速的节点进行尝试。安装此插件目的是为了不编写代码,通过logstash直接操作数据库进行全量同步。
  2. xxxx.conf 请把conf文件名换成你的文件名。此文件内容是下面的logstash管道配置内容。
  3. 把上面拉取到的数据集放到如下文件夹 /home/mycontainers/mylogstash/covid19/
  4. 把mysql相关驱动的jar包下载到本地,然后放到服务器上,编写Dockerfile的时候根据你的宿主机路径进行实际更改。

请注意相对路径和当前目录,避免文件找不到报错。

具体配置请参考我之前的文章 告别服务器捞日志,开启自动化日志搜集之旅

logstash管道配置

input {
  file {
    path => "/home/mycontainers/mylogstash/covid19/*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    separator => ","
    columns => ["FIPS", "Admin2", "Province_State", "Country_Region", "Last_Update", "Lat", "Long_", "Confirmed", "Deaths", "Recovered", "Active", "Combined_Key", "Incident_Rate", "Case_Fatality_Ratio"]
  }

  mutate {
    rename => {
      "FIPS" => "fips"
      "Admin2" => "admin2"
      "Province_State" => "province_state"
      "Country_Region" => "country_region"
      "Last_Update" => "last_update"
      "Lat" => "lat"
      "Long_" => "long"
      "Confirmed" => "confirmed"
      "Deaths" => "deaths"
      "Recovered" => "recovered"
      "Active" => "active"
      "Combined_Key" => "combined_key"
      "Incident_Rate" => "incident_rate"
      "Case_Fatality_Ratio" => "case_fatality_ratio"
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://myes:9200"]
    index => "covid19"
  }
}

启动容器

确保配置和文件都正确后,在Dockerfile执行

docker build -t mylogstash:custom .

d3bc614f4ff86dcf65192a78ce28c9a.png

成功后我们继续执行如下命令

docker run --name mylogstashes
-p 5044:5044
-p 16060:16060
-itd --restart=always
-v /etc/localtime:/etc/localtime
-v /home/mycontainers/mylogstash/config:/usr/share/logstash/config
-v /home/mycontainers/mylogstash/pipeline:/usr/share/logstash/pipeline
-v /home/mycontainers/mylogstash/covid19:/home/mycontainers/mylogstash/covid19 --net mynetwork logstash:7.12.0

执行完成后,我们就可以去es查看数据了 使用Kibana的开发工具查询总数

image.png

好了,我们现在有400w条数据在es里面了。

数据同步

logstash

接下来我们要把es里面的数据全量同步到MySQL里面。

CREATE TABLE `covid19` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `combined_key` varchar(255) DEFAULT NULL,
  `country_region` varchar(255) DEFAULT NULL,
  `confirmed` int(11) DEFAULT NULL,
  `longitude` decimal(18,8) DEFAULT NULL,
  `source_path` varchar(255) DEFAULT NULL,
  `last_update_time` datetime DEFAULT NULL,
  `host_name` varchar(255) DEFAULT NULL,
  `province_state` varchar(255) DEFAULT NULL,
  `deaths` int(11) DEFAULT NULL,
  `latitude` decimal(18,8) DEFAULT NULL,
  `fips_code` varchar(255) DEFAULT NULL,
  `incident_rate` decimal(18,8) DEFAULT NULL,
  `case_fatality_ratio` decimal(18,8) DEFAULT NULL,
  `message_text` text,
  `admin2` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_combined_key_country_region_province_state` (`combined_key`,`country_region`,`province_state`),
  KEY `idx_last_update_time` (`last_update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

新建一个logstash容器,然后把管道配置改成如下配置

input {
  elasticsearch {
    hosts => ["http://myes:9200"]  # Elasticsearch 主机地址
    index => "covid19"          # 要同步的索引名称
    query => '{"size": 5000, "query": {"match_all": {}}}'  # 查询 JSON 格式
	docinfo => true
    scroll => "1m"  # 添加滚动时间以确保只取一次
  }
}

filter {
  # 可以在这里添加过滤器,对从 Elasticsearch 中读取的数据进行处理
  mutate {
    rename => {
      "long" => "longitude"
      "host" => "host_name"
    }
  }
}

output {
  jdbc {
    connection_string => "jdbc:mysql://192.168.31.150:3306/jycloud?user=root&password=123456&useUnicode=true&useSSL=false&characterEncoding=utf8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2B8"  # MySQL 连接字符串
    driver_jar_path => "/usr/share/logstash/jdbc/mysql-connector-java-8.0.28.jar"  # MySQL 驱动程序 JAR 包路径
    driver_class => "com.mysql.cj.jdbc.Driver"  # MySQL 驱动程序类名
    # statement => "INSERT INTO covid19 (combined_key, country_region, confirmed, longitude, source_path, last_update_time, host_name, province_state, deaths, latitude, fips_code, incident_rate, case_fatality_ratio, message_text, admin2) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"  # 插入语句,根据实际情况修改
    # 以下是从 Elasticsearch 中获取字段值并插入到 MySQL 表中对应字段的配置
    # 字段名需与 Elasticsearch 中的字段名对应
    statement => [
      "INSERT INTO covid19 (combined_key, country_region, confirmed, longitude, source_path, last_update_time, host_name, province_state, deaths, latitude, fips_code, incident_rate, case_fatality_ratio, message_text, admin2) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
      "combined_key",
      "country_region",
      "confirmed",
      "longitude",
      "source_path",
      "last_update",
      "host_name",
      "province_state",
      "deaths",
      "latitude",
      "fips_code",
      "incident_rate",
      "case_fatality_ratio",
      "message_text",
      "admin2"
    ]
  }
}

执行docker复制文件命令替换掉的原本的管道配置文件

docker cp xxx.conf 容器id:/usr/share/logstash/pipeline/xxx.conf

确保/usr/share/logstash/pipeline文件夹里只有最新的conf文件。 然后重启容器。

docker restart 容器id/容器名称

然后查询数据库,会发现数据在逐渐递增。全量同步过程非常久,因为不是批量插入,而批量插入的情况下比逐条插入要快。笔者这次全量同步执行了大概2小时。

java

上面使用了逐条插入的方法,接下来我没使用一些批量插入进行一些对比。

配置

@Configuration
public class DatabaseConfig {
    @Value("${spring.datasource.url}")
    private String url;

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        DruidDataSource dataSource =  new DruidDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(driverClassName);
        return dataSource;
    }

    //配置SqlSessionFactory-常规写法
    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSourceProxy)
            throws Exception {
        //出现invalid bound statement (not found) 使用此配置
        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
//        SqlSessionFactoryBean sqlSessionFactoryBean =
//                new SqlSessionFactoryBean();
        bean.setDataSource(dataSourceProxy);
        bean.setMapperLocations
                (new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));
        bean.setTransactionFactory
                (new SpringManagedTransactionFactory());
        return bean.getObject();
    }

}

核心代码

 @Test
    void SyncEsToMysql() {
        IndexCoordinates index = IndexCoordinates.of("covid19");
        int from = 0;
        List<Covid> list = new ArrayList<>();
        Long currentDateTimeStart = System.currentTimeMillis();
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.matchAllQuery())
                .withPageable(PageRequest.of(0, BATCH_SIZE))
                .build();
        //首次查找
        SearchScrollHits<Covid> hits = elasticsearchRestTemplate.searchScrollStart(5000, query, Covid.class, index);
        String scrollId = hits.getScrollId();
        try {
            while (hits != null && !hits.isEmpty()) {
                // 如果 resultList 的大小达到 batchSize,就插入数据库并清空 resultList
                hits.getSearchHits().forEach(hit -> list.add(hit.getContent()));
                if (list.size() >= BATCH_SIZE) {
                    //covid19Service.saveBatch(list);
                    list.clear();
                }
                //根据scrollId再次查询es
                hits = elasticsearchRestTemplate.searchScrollContinue(scrollId, 5000, Covid.class, index);
                if (hits != null) {
                    scrollId = hits.getScrollId();
                }
                log.info("from :" + from);
                from += BATCH_SIZE;
            }
            if (!list.isEmpty()) {
                //covid19Service.saveBatch(list);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.info(e.getMessage());
        } finally {
            Long currentDateTimeEnd = System.currentTimeMillis();
            log.info("from :" + from);
            log.info("time:" + (currentDateTimeEnd - currentDateTimeStart));
        }

    }

先把入库方法 covid19Service.saveBatch(list) 注释掉,跑一下看看需要多久

39a056750788ef71ae45767ec086e47.png

只查询的话需要519544毫秒,转换成分钟就是8.6分钟。

然后我再把入库方法 covid19Service.saveBatch(list) 打开试试看,执行代码,看控制台。

27780109067625ecf1d548be7035a2c.png

836151毫秒,折算成分钟就是13分钟。

查看MySQL和es,验证数据是否一致 image.png

大功告成,全量同步就说到这里了,增量同步我们下次再相遇。