掘金 后端 ( ) • 2024-05-29 13:10

theme: orange

主题

先说说场景,同样是渐进式的数据清洗能力增强,最先摸到的是taier,基于在线vscode的交互模式,功能方方面面其实挺好,但得具备flink和纯均的基础,以指定的如数据cdc、增量同步等处理,同时考量大批量同步扩展问题,综合下来就只能去定制了自己去摸查一下远程提交这块,其中包含flink、taier、chunjun等内容

Flink

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算,这里的教程比较多,不过多做展开。 image.png

chunjun

Chunjun是一个基于Flink的批流统一的数据同步工具

  1. 基于 json , sql 快速构建数据同步任务,你只需要关注数据源的结构信息即可, 让您节省时间,专注于数据集成的开发。
  2. 基于flink 原生的input,output 相关接口来实现多种数据源之间的数据传输,同时你可以基于 flink 自己扩展插件。
  3. 多种数据源之间数据传输断点续传增量同步实时采集脏数据管理实时数据还原

两者结合

使用上来说两者结合是常态,chunjun类似于提供了一套增强的语法及提交模式,扩展了一些数据源支持、 chunjun-clients 提供了sh的远程提交模式,而需要处理的类似于实现flink的远程提交方式+chunjun-core的调用执行,以下是一些实现的问题枚举。

1.flink standalone模式

  • 可以看到flink的demo中WordCount.jar中的执行环境获取
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
  • 构造一个远程提交客户端RestClusterClient
 Configuration flinkConfig = new Configuration();
       flinkConfig.setString("jobmanager.rpc.address", "192.168.145.130"); // ?�I????JobManager???
       flinkConfig.setString("jobmanager.rpc.port", "6123"); // ?�I????JobManager RPC???
       // flinkConfig.setString("jobmanager.web.port", "8081");
       flinkConfig.setString("rest.address", "192.168.145.130");
       flinkConfig.setString("rest.port", "8081");
       flinkConfig.setString("classloader.resolve-order", "parent-first");
       // flinkConfig.setString("classloader.resolve-order", "child-first");
       // flinkConfig.setString("classloader.check-leaked-classloader", "false");
       RestClusterClient<StandaloneClusterId> clusterClient = new RestClusterClient<StandaloneClusterId>(
               flinkConfig, StandaloneClusterId.getInstance());
  • 构造flink PackagedProgram 进行jar包远程提交,进行任务执行,可理解为任务的“地面验证”,只有没有异常问题才会进入下一步。
     List<URL> urlList= getJarUrls("F:/chunjun/chunjun-dist");
    
         PackagedProgram packagedProgram = PackagedProgram.newBuilder()
                .setJarFile(jobJar)
                .setArguments(programArgs)
                .setEntryPointClassName(mainClass)
                 .setUserClassPaths(urlList)
                .setConfiguration(flinkConfig)
                .build();
                int parallelism = flinkConfig.getInteger(DEFAULT_PARALLELISM);
        // Pipeline pip = PackagedProgramUtils.getPipelineFromProgram(packagedProgram, flinkConfig, parallelism, true);
        // JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(null, pip, flinkConfig, parallelism);
        // FactoryUtil.getFactoryHelperThreadLocal
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram,
        flinkConfig, parallelism, false);
        jobGraph.getClasspaths().clear();
        jobGraph.getUserJars().clear();
        jobGraph.getUserArtifacts().clear();
    
  • 提交任务
    // 任务提交
     JobID jobID = clusterClient.submitJob(jobGraph).get(); 
    // 如果是批任务可以查看执行结果,注意,提交时会进行任务执行,可理解为任务的“地面验证”
            // result = clusterClient.requestJobResult(jobID).get();
    

2.用反射执行的时候遇到的问题比较多

  1. 先是对chunjun-client进行提交,后来发现这里面完整的职能与当前处理类似,于是与chunjun-client类似进行core包的提交。
  2. 一个setUserClassPaths这个相当于一运行依赖项的引用,这里相对比较友好,总之是有提示抛出,顺利的搞出json提交后发现sql模式有异常,原因是chunjun-core中对flink进行了一些扩展和封装,死活无法加载这些扩展内容、后来不得不把一些扩展core的扩展放入当前项目中 1716957012756.png
  3. 有关于maven引用的问题 flink-table-uberflink-table-uber-blink类似于包flink-table相关的包集合,反正这两个坑我是遇到了,动态搞了一堆外部的引用,在maven中引用相关引用包即可。 image.png

3. 提交结果检测

如此项是对mysql binlog日志的cdc捕获同步操作 image.png

  CREATE TABLE source
(
  id          bigint,
  `name`       varchar
) WITH (
    'connector' = 'binlog-x'
    ,'username' = 'root'
    ,'password' = 'root'
    ,'cat' = 'insert,delete,update'
    ,'url' = 'jdbc:mysql://192.168.145.1:3306/chunjun?useSSL=false'
    ,'host' = '192.168.145.1'
    ,'port' = '3306'
    ,'table' = 'test'
    ,'timestamp-format.standard' = 'SQL'
    );

CREATE TABLE sink
(
  id          bigint,
  `name`       varchar
)
WITH (
    'connector' = 'stream-x'
    );
insert into sink select id,`name` from source u;

CREATE TABLE table_sink
(
  id          bigint,
  `name`       varchar,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-x',
    'url' = 'jdbc:mysql://192.168.145.1:3306/chunjun?useSSL=false',
    'table-name' = 'test_log',
    'username' = 'root',
    'password' = 'root'
    );

insert into table_sink select id,`name` from source u;

总结

看理论的概念和直接上手去摸查实在是相差大,可能跟我的习惯相关, 定义目标->解决方案->实现->遇到问题->解决问题->实现目标,这个思路,反正遇到的问题挺多,桥脑到的问题也多,好在抓狂到最后问题也解决了,许多的问题尽量把自己的韧性提高一些,尽量不要降低目标,可以有PlanB的应急方案,但尽量不要放弃第一目标、

-------------------------------------------------------- 六爻卦起、知而不避,愿你的执着,因代码而美丽~、