掘金 后端 ( ) • 2024-04-19 16:34

使用 Canal 同步数据

目标

  1. 安装 canal-server、canal-admin、canal-adpater
  2. 同步多个数据库的表到一个数据库

操作步骤

  1. 下载软件包
  2. 按照官方教程进行安装
  3. 进行必要的服务配置,启动服务,从canal-admin-web进行访问验证服务安装成功
  4. 添加同步任务
  5. 验证同步过程的正确性

说明
软件版本:canal.deployer-1.1.7、canal.admin-1.1.7、canal-adapter-1.1.7

执行过程中发现该版本无法直接在 admin 启停 canal-server,确认为 bug,不想升级或更换版本的话,手动启停 canal-server 即可。

本地安装方式

1、安装启动canal-admin

成功后,可以通过 http://127.0.0.1:8089/ */ 访问,默认密码:admin/123456

2、安装启动canal-server

这里使用远程配置方式(即通过canal-admin进行配置的编辑管理,数据保存在数据库)。

下载软件包到本地后,对 conf/canary.properties 进行必要的修改,如下:

# 
canal.register.ip = 127.0.0.1

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
# 对应密码123456
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
#admin auto register
# canal-server 启动后会从 canal-admin 拉取配置信息(canal.properties),如果为true,当配置不存在时会自动注册 canal-server 的信息,并自动在数据库中创建一个关联的配置记录
canal.admin.register.auto = true
canal.admin.register.cluster = 
canal.admin.register.name = local_demo

按照上面的配置,在 canal-server 启动成功后,会自动注册 server 到 canal-admin,然后就可以直接在 canal-admin 管理和配置该 canal-server 了。

image.png

注意:如果 canal-server 启动时拉取配置失败,记得检查 canal.register.ip 的配置,确保是 canal-admin 可以访问的 IP。

3、安装启动 canal-adapter

前面安装 canal-admin 时已经创建了 canal-manager 数据库,所以这里依然选择使用远程配置的方式:

  • 在 canal_config 表中创建 id=2 的数据对应 adapter 下的 application.yml 文件。
  • 在 canal_adapter_config 表对应每个adapter的子配置文件。

注意: 目前这个远程配置功能似乎并没有实装,所以后面依然采用修改本地配置的方式。

4、测试同步(全量+增量)

准备两个数据库:test_a 和 test_b,在 test_a 创建一张 user 表并插入一批数据,在 test_b 创建同样的表但是不写入任何数据。现在需要将 test_a 中 user 表的数据同步到 test_b 的对应表,然后测试增量同步。

操作步骤:

  1. 准备数据库和表
  2. 创建 instance,命名为 instance_test_a,表示同步源为 test_a
  3. 添加 adapter(key 为 syncUser),用来将 test_a.user 中的数据同步到 test_b.user
  4. 添加 adapter config,配置RDB映射关系,并通过 outerAdapterKey : syncUser 关联到 adapter
  5. 调用接口查询所有的同步任务
  6. 调用接口执行手动ETL
  7. 验证结果
create schema test_a;
-- 创建表
CREATE TABLE test_a.user (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    password VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 插入数据
INSERT INTO test_a.user (username, password, email) VALUES
('john_doe', 'password123', '[email protected]'),
('jane_smith', 'securepass', '[email protected]'),
('alice_jones', 'alice123', '[email protected]'),
('bob_brown', 'password456', '[email protected]'),
('carol_gray', 'carolpass', '[email protected]');

-- 创建备份库和表
create schema test_b;

CREATE TABLE test_b.user (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    password VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout: 100
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test_a?useUnicode=true
      username: test
      password: test
  canalAdapters: # 适配器列表。
    - instance: instance_test_a # canal instance Name or mq topic name
      groups: # 一份数据可以被多个group同时消费, group之间并行执行, 一个group内部串行执行多个outerAdapters
        - groupId: g1
          outerAdapters:
            - name: rdb # 指定为rdb类型同步,同步到test_b库
              key: syncUser # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应
              properties:
                jdbc.driverClassName: com.mysql.jdbc.Driver
                jdbc.url: jdbc:mysql://127.0.0.1:3306/test_b?useUnicode=true
                jdbc.username: test
                jdbc.password: test
                druid.stat.enable: false
                druid.stat.slowSqlMillis: 1000

test_user.yml

dataSourceKey: defaultDS
destination: instance_test_a
groupId: g1
outerAdapterKey: syncUser
concurrent: true
dbMapping:
  database: test_a # 源数据库
  table: user # 源表
  targetTable: user #目标表
  targetPk:
    id: id
  mapAll: true
  commitBatch: 3000 # 批量提交的大小

注意
测试发现当前版本(v1.1.7)adapter 远程配置不生效,所以依然采用编辑本地配置文件(application.yml 和 conf/rdb/test_user.yml)的方式。

手动执行ETL:

curl http://127.0.0.1:8081/etl/rdb/test_user.yml -X POST
> {"succeeded":true,"resultMessage":"导入RDB 数据:6 条"}

后面直接在 test_a.user 表执行 DML 操作就会自动同步到 test_b.user 了。注意,DDL 不会同步哦!