掘金 阅读 ( ) • 2024-04-19 16:33

file DB2是IBM的一款关系型数据库管理系统,JDBC DB2 Source Connector是一个用于通过JDBC读取外部数据源数据的连接器。Apache SeaTunnel如何支持JDBC DB2 Sink Connector?请参考本文档。

支持引擎

Spark
Flink
SeaTunnel Zeta

主要功能

使用 Xa 事务 来确保 精确一次性。因此,只支持对支持 Xa 事务 的数据库进行 精确一次性 操作。您可以设置 is_exactly_once=true 来启用它。

描述

通过 JDBC 写入数据。支持批处理模式和流式模式,支持并发写入,支持精确一次性语义(使用 XA 事务保证)。

支持的数据源信息

数据源 支持的版本 驱动程序 URL Maven DB2 不同的依赖版本有不同的驱动程序 com.ibm.db2.jdbc.app.DB2Driver jdbc:db2://127.0.0.1:50000/dbname 下载

数据库依赖

请下载与 'Maven' 相对应的支持列表,并将其复制到 '$SEATNUNNEL_HOME/plugins/jdbc/lib/' 工作目录中
例如,对于 DB2 数据源:cp db2-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

file

Sink 选项

名称 类型 必填 默认值 描述 url 字符串 是 - JDBC 连接的 URL。例如:jdbc:db2://127.0.0.1:50000/dbname driver 字符串 是 - 用于连接到远程数据源的 JDBC 类名,如果使用 DB2,则值为 com.ibm.db2.jdbc.app.DB2Driver。 user 字符串 否 - 连接实例的用户名 password 字符串 否 - 连接实例的密码 query 字符串 否 - 使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 具有更高的优先级。 database 字符串 否 - 使用此 databasetable-name 自动生成 SQL,并接收上游输入数据写入数据库。此选项与 query 互斥,并具有更高的优先级。 table 字符串 否 - 使用数据库和此表名自动生成 SQL,接收上游输入数据写入数据库。此选项与 query 互斥,并具有更高的优先级。 primary_keys 数组 否 - 此选项用于支持自动生成 SQL 时的 insertdeleteupdate 操作。 support_upsert_by_query_primary_key_exist 布尔 否 false 根据查询主键是否存在选择使用 INSERT SQL、UPDATE SQL 处理更新事件(INSERT、UPDATE_AFTER)。此配置仅在数据库不支持 upsert 语法时使用。请注意,此方法性能较低。 connection_check_timeout_sec 整数 否 30 用于等待验证连接的数据库操作完成的时间(以秒为单位)。 max_retries 整数 否 0 提交失败(executeBatch)的重试次数。 batch_size 整数 否 1000 用于批处理写入,当缓冲记录数量达到 batch_size 或时间达到 batch_interval_ms 时,数据将刷新到数据库。 batch_interval_ms 整数 否 1000 用于批处理写入,当缓冲记录数量达到 batch_size 或时间达到 batch_interval_ms 时,数据将刷新到数据库。 is_exactly_once 布尔 否 false 是否启用精确一次性语义,将使用 XA 事务。如果启用,需要设置 xa_data_source_class_name。 generate_sink_sql 布尔 否 false 基于要写入的数据库表自动生成 SQL 语句。 xa_data_source_class_name 字符串 否 - 数据库驱动程序的 XA 数据源类名,例如,DB2 为 com.db2.cj.jdbc.Db2XADataSource。其他数据源请参考附录。 max_commit_attempts 整数 否 3 事务提交失败的重试次数。 transaction_timeout_sec 整数 否 -1 事务打开后的超时时间,默认为 -1(永不超时)。请注意,设置超时可能会影响精确一次性语义。 auto_commit 布尔 否 true 默认启用自动事务提交。 common-options 否 - Sink 插件的通用参数,请参考 Sink Common Options 获取详细信息。

提示

如果未设置 partition_column,则将以单一并发方式运行;如果设置了 partition_column,则根据任务的并发度并行执行。

任务示例

简单示例:

该示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并发送到 JDBC Sink。FakeSource 生成总共 16 行数据(row.num=16),每行有两个字段,name(字符串类型)和 age(整数类型)。最终的目标表是 test_table,在表中也将有 16 行数据。在运行此作业之前,您需要在您的 DB2 中创建数据库 test 和表 test_table。如果您尚未安装和部署 SeaTunnel,请按照 安装 SeaTunnel 中的说明安装和部署 SeaTunnel。然后按照 使用 SeaTunnel 引擎快速入门 中的说明运行此作业。

# 定义运行时环境
env {
  # 您可以在这里设置 Flink 配置
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  # 这是一个示例源插件,仅用于测试和演示源插件功能
  FakeSource {
    parallelism = 1
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
  # 如果您想要获取更多关于如何配置 SeaTunnel 并查看完整的源插件列表的信息,
  # 请访问 https://seatunnel.apache.org/docs/category/source-v2
}

transform {
  # 如果您想要获取更多关于如何配置 SeaTunnel 并查看完整的转换插件列表的信息,
  # 请访问 https://seatunnel.apache.org/docs/category/transform-v2
}

生成 Sink SQL

该示例不需要编写复杂的 SQL 语句,您可以配置数据库名称和表名称,以自动生成要插入的语句。

sink {
    jdbc {
        url = "jdbc:db2://127.0.0.1:50000/dbname"
        driver = "com.ibm.db2.jdbc.app.DB2Driver"
        user = "root"
        password = "123456"
        query = "insert into test_table(name,age) values(?,?)"
        }
  #  如果您想要获取更多关于如何配置 SeaTunnel 并查看完整的接收插件列表的信息,
  #  请访问 https://seatunnel.apache.org/docs/category/sink-v2
}

sink {
    jdbc {
        url = "jdbc:db2://127.0.0.1:50000/dbname"
        driver = "com.ibm.db2.jdbc.app.DB2Driver"
        user = "root"
        password = "123456"
        # 根据数据库表名自动生成 SQL 语句
        generate_sink_sql = true
        database = test
        table = test_table
    }
}

精确一次性:

为了确保精确写入场景,我们保证精确一次性。 sink { jdbc { url = "jdbc:db2://127.0.0.1:50000/dbname" driver = "com.ibm.db2.jdbc.app.DB2Driver"

    max_retries = 0
    user = "root"
    password = "123456"
    query = "insert into test_table(name,age) values(?,?)"

    is_exactly_once = "true"

    xa_data_source_class_name = "com.db2.cj.jdbc.Db2XADataSource"
}

}

本文由 白鲸开源科技 提供发布支持!