掘金 后端 ( ) • 2024-06-21 10:36

theme: juejin

一、概述

1.1 什么是 DataX

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

DataX logo

1.2 DataX 设计

为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。

DataX 设计

1.3 支持数据源

DataX 目前已经有了比较全面的插件体系,主流的 RDBMS 数据库、NOSQL、大数据计算系统都已经接入。

DataX 支持数据源

1.4 框架设计

DataX 框架设计

  • Reader:数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
  • Writer:数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
  • Framework:用于连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

1.5 运行原理

DataX 运行原理

  • Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
  • Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
  • Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5
  • TaskGroup:负责启动Task。

举例来说,用户提交了一个 DataX 作业,并且配置了 20 个并发,目的是将一个 100 张分表的 mysql 数据同步到 odps 里面。 DataX 的调度决策思路是:

  1. DataXJob 根据分库分表切分成了 100 个 Task。
  2. 根据 20 个并发,DataX 计算共需要分配 4 个 TaskGroup。
  3. 4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以 5 个并发共计运行 25 个 Task。
功能 DataX Sqoop 运行模式 单进程 MR MySQL 读写 单机压力大;读写粒度容易控制 MR模式重,写出错处理麻烦 Hive 读写 单机压力大 很好 文件格式 ORC支持 ORC不支持,可添加 分布式 不支持,可以通过调度系统规避 支持 流控 有流控功能 需要定制 统计信息 已有一些统计,上报需要定制 没有,分布式的数据收集不方便 数据校验 在core部分有校验功能 没有,分布式的数据收集不方便 监控 需要定制 需要定制 社区 开源不久,社区不活跃 一直活跃,核心部分变动很少

二、快速入门

2.1 官方地址

下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

源码地址:https://github.com/alibaba/Data

2.2 前置要求

Linux

JDK 1.8 以上,推荐 1.8

Python 2.x 或 3.x 都可以

2.3 DataX 安装部署

  1. 将下载好的 tar 包上传到服务器,然后解压
tar -zxvf datax.tar.gz -C /opt/module
  1. 进入到 DataX 目录
cd /opt/module/datax
  1. 运行自检脚本
python ./bin/datax.py ./job/job.json

执行结束,输出以下信息即配置成功

2024-06-15 19:18:41.052 [job-0] INFO  JobContainer - PerfTrace not enable!
2024-06-15 19:18:41.052 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.023s |  All Task WaitReaderTime 0.030s | Percentage 100.00%
2024-06-15 19:18:41.052 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2024-06-15 19:18:30
任务结束时刻                    : 2024-06-15 19:18:41
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

三、使用案例

3.1 从 Stream 流读取数据打印到控制台

当前示例不会在实际开发中使用,只是了解 DataX 的使用,可以当作一个 HelloWorld。

3.1.1 查看配置模板

执行以下命令,查看 streamreader 和 streamwriter 对应的配置模板:

python ./bin/datax.py -r streamreader -w streamwriter

返回的配置模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [

                        ],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
配置 是否必须 说明 clumn.type -- 列类型 column.value -- 列值 sliceRecordCount -- 记录数 parameter.encoding -- 编码 parameter.print -- 是否打印

3.1.2 根据模板编写配置文件

vim ./job/stream2stream.json

填写以下内容:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "type": "string",
                                "value": "hello datax"
                            },
                            {
                                "type": "long",
                                "value": "18"
                            }
                        ],
                        "sliceRecordCount": "10"
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

3.1.3 执行任务

执行以下命令,执行任务:

python ./bin/datax.py job/stream2stream.json

执行任务,控制台输出的执行结果:

2024-06-18 21:03:50.820 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2024-06-18 21:03:40
任务结束时刻                    : 2024-06-18 21:03:50
任务总计耗时                    :                 10s
任务平均流量                    :               13B/s
记录写入速度                    :              1rec/s
读出记录总数                    :                  10
读写失败总数                    :                   0

3.2 读取 MySQL 数据写入 HDFS

3.2.1 准备数据

在 MySQL 数据库中,创建 t_student 表

CREATE TABLE `t_student`  (
  `id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '唯一标识',
  `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '姓名',
  `gender` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '性别',
  `address` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '地址',
  `tenant_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '租户id',
  `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime NULL DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;

插入数据

INSERT INTO `t_student` VALUES ('11efea8a6d26752f5b546e18c22cffcb', 'xiaoLiang', 'male', 'zaoZhuang', 'DOS_DEFAULT_TENANCY', '2023-07-29 08:35:43', NULL);
INSERT INTO `t_student` VALUES ('2001', 'zhaoLiu', 'male', 'jiNan', 'DOS_DEFAULT_TENANCY', '2023-07-29 07:56:38', NULL);
INSERT INTO `t_student` VALUES ('2002', 'zhengQi', 'female', 'ziBo', 'DOS_DEFAULT_TENANCY', '2023-07-29 07:57:03', NULL);
INSERT INTO `t_student` VALUES ('2003', 'tianBa', 'male', 'qingDao', 'ODPT', '2023-07-29 07:57:43', NULL);
INSERT INTO `t_student` VALUES ('e20eeeb3ae1d5008a6a0d773b9b3557f', 'xiaoMing', 'female', 'dongYing', 'DOS_DEFAULT_TENANCY', '2023-07-29 08:34:55', NULL);

3.2.2 查看官方模板

执行以下命令,查看 mysqlreader 和 hdfswriter 对应的配置模板:

python ./bin/datax.py -r mysqlreader -w hdfswriter

返回的配置模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [

                        ],
                        "connection": [
                            {
                                "jdbcUrl": [

                                ],
                                "table": [

                                ]
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [

                        ],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

mysqlreader 参数解析:

配置 是否必须 说明 column 是 需要同步列名集合,使用 json 数组描述,* 代表所有列 connection.jdbcUrl 是 对数据库的 JDBC 连接信息,支持多个连接地址 connection.table 是 需要同步的表,支持多个 connection.querySql 否 自定义 SQL,优先级高,配置 querySql 后,mysqlreader 直接忽略 table/column/where password 是 数据库密码 username 是 数据库用户名 where 否 筛选条件 splitPK 否 数据分片字段,一般是主键,仅支持整型

hfdswriter 参数解析:

配置 是否必须 说明 column 是 写入数据的字段,其中 name 指定字段名,type 指定类型 compress 否 hdfs 文件压缩类型,默认不填写,就是没有压缩 defaultFS 是 hdfs 文件系统 namenode 节点地址,格式 hdfs://ip:port fieldDelimiter 是 字段分隔符 fileName 是 写入文件名 fileType 是 文件类型,目前只支持用户配置为 text 或 orc path 是 存储到 hdfs 文件系统的路径 writeMode 是 hdfswriter 定稿前数据清理处理模式

3.2.3 类型转换

MySQL

目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出MysqlReader针对Mysql类型转换列表:

DataX 内部类型 Mysql 数据类型 Long int, tinyint, smallint, mediumint, int, bigint Double float, double, decimal String varchar, char, tinytext, text, mediumtext, longtext, year Date date, datetime, timestamp, time Boolean bit, bool Bytes tinyblob, mediumblob, blob, longblob, varbinary

请注意: 目前 HdfsWriter 支持大部分 Hive 类型,请注意检查你的类型。

下面列出 HdfsWriter 针对 Hive 数据类型转换列表:

DataX 内部类型 HIVE 数据类型 Long TINYINT,SMALLINT,INT,BIGINT Double FLOAT,DOUBLE String STRING,VARCHAR,CHAR Boolean BOOLEAN Date DATE,TIMESTAMP
  • 除上述罗列字段类型外,其他类型均不支持;
  • tinyint(1) DataX视作为整形;
  • year DataX视作为字符串类型;
  • bit DataX属于未定义行为;

HDFS

目前 HdfsWriter 支持大部分 Hive 类型,请注意检查你的类型。

下面列出 HdfsWriter 针对 Hive 数据类型转换列表:

DataX 内部类型 HIVE 数据类型 Long TINYINT,SMALLINT,INT,BIGINT Double FLOAT,DOUBLE String STRING,VARCHAR,CHAR Boolean BOOLEAN Date DATE,TIMESTAMP

3.2.4 根据模板编写配置文件

vim ./job/mysql2hdfs.json 

填写以下内容:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "gender",
                            "address",
                            "tenant_id",
                            "create_time",
                            "update_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.8.102:3306/test"
                                ],
                                "table": [
                                    "t_student"
                                ]
                            }
                        ],
                        "password": "Root@123.",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "string"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "gender",
                                "type": "string"
                            },
                            {
                                "name": "address",
                                "type": "string"
                            },
                            {
                                "name": "tenantId",
                                "type": "string"
                            },
                            {
                                "name": "createTime",
                                "type": "string"
                            },
                            {
                                "name": "updateTime",
                                "type": "string"
                            }
                        ],
                        "defaultFS": "hdfs://192.168.8.102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "student.txt",
                        "fileType": "text",
                        "path": "/",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

注意:

  • MySQL 列顺序与 HDFS 列顺序必须相同,一一对应,不可以打乱排序;
  • HDFS 的列,必须有列名与类型;

3.2.5 执行任务

执行以下命令,执行任务:

python ./bin/datax.py job/mysql2hdfs.json 

执行任务,控制台输出的执行结果:

2023-09-16 22:30:13.374 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2024-06-18 22:12:00
任务结束时刻                    : 2024-06-18 22:12:13
任务总计耗时                    :                 13s
任务平均流量                    :               29B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

可以去 HDFS 上查看导入的文件。

补充一下,使用 DataX 写入 Hive 时,没有直接写入 Hive 的方式,官方给出的方式是写入到 HDFS,写入 Hive 其实有两个方式:

  1. DataX 写入 HDFS 中,然后通过 Hive 的 load data 命令将数据加载到 Hive,推荐使用此方式;
  2. DataX 直接将数据写入 Hive 表对应的路径中,这种方式不推荐。

3.2.6 关于 HDFS HA 的支持

"hadoopConfig": {
    "dfs.nameservices": "ns",
    "dfs.ha.namenodes.ns": "nn1,nn2",
    "dfs.namenode.rpc-address.ns.nn1": "主机名:端口",
    "dfs.namenode.rpc-address.ns.nn2": "主机名:端口",
    "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}

将上面的配置,直接添加到 hdfswriter 中的 parameter 中即可。

3.3 读取 HDFS 数据写入 MySQL

与上面示例相反,配置基本上一致,具体步骤略... ...

模板如下,对应修改一下配置即可:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [

                        ],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [

                        ],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": [

                                ]
                            }
                        ],
                        "password": "",
                        "preSql": [

                        ],
                        "session": [

                        ],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

3.4 读取 MySQL 数据写入 Elasticsearch

3.4.1 查看官方模板

执行以下命令,查看 mysqlreader 和 hdfswriter 对应的配置模板:

./bin/datax.py -r mysqlreader -w elasticsearchwriter

命令报错,并没有获取到对应的模板,我们可以从官网上直接获取模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [

                        ],
                        "connection": [
                            {
                                "jdbcUrl": [

                                ],
                                "table": [

                                ]
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "http://xxx:9999",
                        "accessId": "xxxx",
                        "accessKey": "xxxx",
                        "index": "test-1",
                        "type": "default",
                        "cleanup": true,
                        "settings": {
                            "index": {
                                "number_of_shards": 1,
                                "number_of_replicas": 0
                            }
                        },
                        "discovery": false,
                        "batchSize": 1000,
                        "splitter": ",",
                        "column": [
                            {
                                "name": "pk",
                                "type": "id"
                            },
                            {
                                "name": "col_ip",
                                "type": "ip"
                            },
                            {
                                "name": "col_double",
                                "type": "double"
                            },
                            {
                                "name": "col_long",
                                "type": "long"
                            },
                            {
                                "name": "col_integer",
                                "type": "integer"
                            },
                            {
                                "name": "col_keyword",
                                "type": "keyword"
                            },
                            {
                                "name": "col_text",
                                "type": "text",
                                "analyzer": "ik_max_word"
                            },
                            {
                                "name": "col_geo_point",
                                "type": "geo_point"
                            },
                            {
                                "name": "col_date",
                                "type": "date",
                                "format": "yyyy-MM-dd HH:mm:ss"
                            },
                            {
                                "name": "col_nested1",
                                "type": "nested"
                            },
                            {
                                "name": "col_nested2",
                                "type": "nested"
                            },
                            {
                                "name": "col_object1",
                                "type": "object"
                            },
                            {
                                "name": "col_object2",
                                "type": "object"
                            },
                            {
                                "name": "col_integer_array",
                                "type": "integer",
                                "array": true
                            },
                            {
                                "name": "col_geo_shape",
                                "type": "geo_shape",
                                "tree": "quadtree",
                                "precision": "10m"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

elasticsearchwriter 参数解析:

配置 是否必须 默认值 说明 endpoint 是 Elasticsearch 的连接地址 accessId 否 http auth 中的 user accessKey http auth 中的 password index 是 Elasticsearch 的 Index 名 type 否 index 名 Elasticsearch 中 Index 的 type 名 cleanup 否 false 是否删除万年青 number_of_shards 否 Elasticsearch 分片 number_of_replicas 否 Elasticsearch 副本 discovery 否 false 启用节点发现将(轮询)并定期更新客户机中的服务器列表 batchSize 否 1000 每次批量数据的条数 splitter 否 -,- 如果插入数据是array,就使用指定分隔符 column 是 Elasticsearch 所支持的字段类型,样例中包含了全部

3.4.2 根据模板编写配置文件

vim ./job/mysql2es.json

填写以下内容:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "gender",
                            "address",
                            "create_time",
                            "update_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.8.102:3306/test"
                                ],
                                "table": [
                                    "t_student"
                                ]
                            }
                        ],
                        "password": "Root@123.",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "http://192.168.8.102:9200",
                        "index": "student",
                        "cleanup": true,
                        "settings": {
                            "index": {
                                "number_of_shards": 1,
                                "number_of_replicas": 2
                            }
                        },
                        "discovery": false,
                        "batchSize": 1000,
                        "splitter": ",",
                        "column": [
                            {
                                "name": "pk",
                                "type": "id"
                            },
                            {
                                "name": "name",
                                "type": "text"
                            },
                            {
                                "name": "gender",
                                "type": "text"
                            },
                            {
                                "name": "address",
                                "type": "text"
                            },
                            {
                                "name": "create_time",
                                "type": "date",
                                "format":"yyyy-MM-dd HH:mm:ss"
                            },
                            {
                                "name": "update_time",
                                "type": "date",
                                "format":"yyyy-MM-dd HH:mm:ss"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

3.4.3 执行任务

执行以下命令,执行任务:

python ./bin/datax.py job/mysql2es.json

执行任务,控制台输出的执行结果:

任务启动时刻                    : 2024-06-19 21:54:38
任务结束时刻                    : 2024-06-19 21:54:50
任务总计耗时                    :                 11s
任务平均流量                    :               21B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

到 Elasticsearch 上查看导入的数据,看是否导入成功。