掘金 后端 ( ) • 2024-05-01 14:47

茶艺师学微服务(实操篇4 数据迁移怎么做? 中)

前言

经过了上篇《茶艺师学微服务(实操篇3-数据迁移怎么做? 上)》,我们的数据迁移进度来到了这里:

我们开始聊聊校验逻辑。

校验逻辑

哪里需要校验?

这里打个比方,在小时候抄课文,抄完后了,对作业态度认真的同学,肯定会拿自己抄的与原文对比检查一下。
对于这次作业,课文的篇幅的固定的,不会再增加的,检查起来很简单。
假设一下,要抄的课文篇幅不断增加,那我们又还得继续抄。
在这样背景下,要保证没抄错书,当到了可以停下笔的时候,是不是希望“我抄的部分和原文对应部分的差别尽可能小”?
就像在不停机数据迁移下,在初始化好新库、新表以及往新表导入了数据,这时来个完整的新旧数据库的校验与比对,可以保证整个数据迁移工作有个比较好的开始。
当然了,数据量不大的情况,可以直接开始双写阶段,这时,不管是完全检查(全量校验),还是部分检查(增量校验),校验与修复这个动作都会伴随在整个过程里。

校验到有不一致,如何处理?

校验到有不一致的数据了,当然是要修。
只是这个“修”,我们至少有三个思路可选:

  • 哪个 goroutine 发现问题了,就由那个 goroutine 来修
  • 交给另外的 goroutine 来修
  • 当做生产者发消息给 Kafka ,让消费者来从消息队列中一个个拿去修

下面的讨论默认使用第三种思路进行,这思路的好处的能利用 Kafka 的解耦特性来对业务流进行削峰填谷,从而保护目标数据库。

Kafka 里的修复通知要按照顺序消费(修复)吗?

程序在一条条地对比数据,一条条修复数据的通知往 Kafka 里发,那么很自然会有这么一个情况:

很自然的解决思路是,在往 Kafka 的数据包里也加上时间戳,只拿有最新的时间戳的消息(修改通知)。
又或者,不用加时间戳,更不加要修改的数据,拿到消息(修改通知)时再去源表那里看一眼对应的数据,根据该数据来修改。

这样一来,就不需要考虑 Kafka 里的消息(修改通知)的顺序了。

如果发现了数据不一致,但向 Kafka 发送修复通知失败了,怎么办?

可以重试发送消息,当然也有可能重试发送失败。
因此兜底的做法是,告警,写入日志,通知人手动去修。
偷懒的做法,不用管,等下一轮的校验与修复。

校验的思路与校验方案选型

基本思路就是从源表读取数据,根据主键去目标表查找对应的数据,比对两条数据是否相等。
在里面需要考虑:

  • 数据库的类型,都能转成 Go 语言类型吗?
  • 转出来的 Go 类型,还能比较吗?
  • 浮点数之类的类型从数据库里出来,转成 GO 类型,会不会丢失精度?还能不能用于比较?
  • 还使用 kafka 做削峰填谷,发出去的数据能不能顺利转换,会不会又会丢失精度?

接着就是方案设计,基于 Go 语言,相信大家很快想到以下方案:

  1. 直接对每一张表定制其查询与比较方法
    最直接,也可能会是最累的,会写出很多重复代码的方案
  2. 借助 Go 的泛型 any
    目前 MySQL 的 driver 会对 any 适配合适的 Go 类型,比较方便。
  3. 用 Go 最底层的 []byte 来接收数据,然后直接比较 []byte
    虽然看着很底层很扎实的思路,然而有些数据库的 driver 反而会不大处理好 []byte

除此之外,还要需要结合业务方的需求。
比如他们的要求是“表 A 里的某3个字段和表 B 的对应相同就行了”,能提前得知这需求就可以避免白花功夫。

两边的数据都拿出来了,到底如何比较?

肯定不能这样比较:

if 源数据 == 目标数据 ? 

我们是以 GO 的泛型 any 来接收数据,理论上可以使用 Go 的反射来比较:

if reflect.DeepEqual(源数据, 目标数据) {
  ......
}

如果需要考虑业务方是否他们的自定义比较方法,先通过类型断言看存不存在自定义比较逻辑,有的话就用,没有的话才用 GO 的反射:

var srcAny any = 源数据
if c1, ok := srcAny.(interface {
	// 有没有自定义的比较逻辑
    ...
}); ok {
	// 有,我就用它的
    ...
} else {
	// 没有,我就用反射
    ...
	}
}

在以源表为准的阶段,如果目标表有的数据,在源表反而没有,怎么办?

第一反应,你也会是:不可能!
然而在写入目标表的过程中,有人直接链接源表,硬删除掉数据,我们又该如何应对?
那就反过来,以目标表为准查找源表没有的数据,把它修回来,这就是反向校验修复。

反向校验能不能使用 COUNT 来加速处理?

反向校验,可以一口气取出目标表一组数据,直接与源表的比对看有没有少的。
然而数据库(至少是 MySQL)是有个 COUNT ,直接返回一张表里面的记录数,这样目标表、源表的 COUNT 一比较不就知道源表有没有少数据?
假设用于初始化目标表的数据是昨天23:59:59的,那么使用 COUNT(*) WHERE ctime < 今天的零点 ,发现 COUNT 相等,那么源表数据是没有被删的。
当然了还是那句话,真有人直接连数据库硬删了,这招就不好使了。
这时虽然能通过分时段的 COUNT 快速判断被删掉的数据是位于哪个时间端的,但最后还是得一条条对比把它给找出来。

如果是异构数据库的场合

上面说的,还只是在同构数据库之间的校验和修复。
在异构数据库的场合下,还要考虑:

  • 当从源表拿出数据了,却没法用主键在目标表里找数据
    这时得从唯一索引或者“外键”那里想想办法。
  • 源表的一些字段丢弃了,或者目标表有新数据(还是根据源表的算出来的)
    别想通用校验方案了,老老实实地为它定制吧

校验的时机

唤起校验与修复的时机,我们有两个选择:

  • 定时任务 在主体业务低峰时进行
  • 动态判断当前任务负载,负载高时校验挂起,负载底时继续

需要注意的是,判断负载,比起看 CPU 与内存,更应该看数据库,因为性能瓶颈往往是在数据库上。
也可以在考虑数据库的基础上再结合 CPU 与内存的情况。

数据修复

由于我们选择的校验方案是,在检查出数据对不上的情况下,发信号给 Kafka ,让它通知去修。
而这异常信号,可以很简单明了的设计成以下的样子:

const (
	// InconsistentEventTypeTargetMissing 校验的目标数据,缺了这一条
	InconsistentEventTypeTargetMissing = "target_missing"
	// InconsistentEventTypeNEQ 不相等
	InconsistentEventTypeNEQ = "neq"
	// InconsistentEventTypeBaseMissing 校验的源数据,缺了这一条(反向校验用)
	InconsistentEventTypeBaseMissing = "base_missing"
)

那么当拿到了这样的信号,该怎么做呢?
一种很老实的做法就是,对于这些信号,我们每一种都针对处理,就是:

  • InconsistentEventTypeTargetMissing 目标库少了这数据,就再读一下源库的数据,找到了源库的数据,再写入目标库;如果源库已经没有了该数据,那就不管了。
  • InconsistentEventTypeNEQ 两个数的数据不相等,那么在读一下源库的数据,拿其去更新目标库的数据;如果源库已经没有了该数据,那就把目标库对应 ID 的数据删掉。
  • InconsistentEventTypeBaseMissing 源库少了这数据,那么删掉目标库对应 ID 的数据

另外一种做法是,可以把 InconsistentEventTypeTargetMissingInconsistentEventTypeNEQ 放在一起处理:

  • 当查找源库的数据,没找到,把目标库对应 ID 的数据删掉
  • 能找到,就用该数据更新目标库对应 ID 的数据(如果是与源库数据冲突,就更新,否则插入数据)
// Clause 更新
// Create 新建并插入 
f.target.Clauses(clause.OnConflict{
				DoUpdates: clause.AssignmentColumns(f.columns),
			}).Create(&t).Error

其实,还有一个最一了百了的做法,把三种信号一并处理了:

  • 当查找源库的数据,没找到,把目标库对应 ID 的数据删掉
  • 能找到,就用该数据更新目标库对应 ID 的数据(如果是与源库数据冲突,就更新,否则插入数据)

这三种写法可以用下图概括:

双写

我们准备好了 数据校验与修复 ,接着开始准备如何同时读写源库与目标库方法了。

先设定好该四个阶段的“开关”:

const (
	patternDstOnly  = "DST_ONLY" // 只操作目标库
	patternSrcOnly  = "SRC_ONLY" // 只操作源库
	patternDstFirst = "DST_FIRST" // 以目标库为准
	patternSrcFirst = "SRC_FIRST" // 以源库为准
)

根据这开关,来决定如何操作目标库与源库。

最直接的做法,就是操作两个库的表。
换句话说,就是两个库的增删改查都挨个写一遍。

那么有没有更通用的做法?
答案是有的,我们可以利用 gorm 的 ConnPool 接口,它是与数据库直接打交道的接口,可以在这里操作两个表。
只不过要实现 ConnPool 接口稍微有点复杂:
因为它还需要一个启动器,因此还要把这启动器一并装饰起来:

到这里还没完,因为数据库最后看的是事务,因此要用这个 ConnPool 去实现“事务接口”:

在这里我们可以看到,除了 ConnPool ,它还至少要求 TxConmittter

最后大致是这样子的:

到这里,就可以不用考虑这些表里的具体增删改查行为,仅仅通过“开关”选择操作模式从而实现对两个数据库的操作

结语

我们已经到了双写阶段,在双写的过程中,我们如何确认新写的数据是对的呢?

这就是下一篇要讨论的 增量校验与修复 ,敬请期待。