canal初体验 – 同步binlog到hive

采用T+1天级同步会遇到一些问题:

  • mysql表存在delete操作,无法增量同步到删除操作。
  • mysql表没有更新时间字段,必须全量同步。
  • mysql表大量分库分表,同步合并过程复杂&慢。

将mysql的binlog实时同步到hive,这样凌晨就不需要sqoop抽数据了,只需要直接将增量合并到全量即可。

canal架构

canal是阿里开源工具,它由server端和client端构成:

  • canal client:自行使用canal client sdk开发,连接canal server消费binlog。
  • canal server:代理client向mysql请求binlog(模仿slave协议),并维护client消费binlog的游标。

canal client与canal server之间的关系与rabbitmq很像,client消费server的binlog并应答ack或者rollback,而消费游标则由canal server持久化维护在本地磁盘或者zookeeper上。

另外也有一种玩法是canal server将binlog直接投递到kafka,然后canal client直接消费kafka,这种模式消费游标就是由kafka来维护了。

搭建canal

参考官方文档:https://github.com/alibaba/canal/wiki/AdminGuide

开启binlog

找到要同步的mysql数据库实例,修改my.cnf开启binlog(具体差异自行谷歌):

  • server-id:实例唯一ID,在主从关系中不能冲突
  • log_bin:binlog写入路径
  • binlog-format:采用ROW行模式,这样binlog会记录修改前后的完整行记录,这样才能从binlog中拿到完整行记录

然后在mysql里建库建表,做测试用:

create database canal_test;

CREATE TABLE userlog (
id bigint(20) NOT NULL AUTO_INCREMENT,
content text,
PRIMARY KEY (id)
);

记得授权一个具备slave权限的Mysql账号,我直接用的root账号所以跳过了这一步。

下载canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

观察canal.properties

一个canal-server进程可以同时消费多个数据库实例的binlog,每个数据库实例叫做1个instance,默认会消费instance内所有的database和table的binlog。

配置instance列表,需要编辑canal.properties的属性:

  • canal.destinations:可以写多个instance名字,也就是多个数据库实例,每个instance的具体数据库地址/账号密码等配置要写在独立文件中。
  • canal.conf.dir:扫描instance配置的父目录,也就是conf目录,开启了这个的话就不需要配置destinations了,它会默认发现所有instance。

example是示例用的instance,我们下面直接对这个example数据库实例做配置修改,指向我们的数据库。

修改example/instance.properties

不同的instance只有配置目录名不同,配置文件的名字一定要叫做instance.properties。

首先配置数据库地址、起始的消费偏移量(可选,不填就是从binlog末尾开始消费后续日志):

然后配置账号密码:

然后配置正则,只保留我们关心的database和table:

这里有个坑就是binlog是按mysql实例为单位同步的,而我们也许只关注实例上某些db和table,所以通常我们都需要做好约束。

启动canal server

现在启动,即可开始同步example instance的binlog,如果有其他instance配置也会一起开始。

bin/startup.sh

程序是后台启动的,可以jps查看。

canal client示例

官方客户端SDK支持多种编程语言,利用客户端SDK可以轻松连接canal server进行binlog消费,可靠的ack机制让我们无需操心数据丢失。

一次SQL操作产生一条binlog,该SQL操作可能影响多行数据,了解这个对我们编写逻辑有意义。

可以查看完整官方代码示例,下面我讲一下主要流程原理。

创建连接

传参:

  • canal server地址、端口
  • canal instance的名字(即消费哪个数据库实例的binlog)
  • canal server账号密码(我们没配置验证,所以传空)

连接&订阅

订阅的意思是筛选要访问的database和table名字,传空字符串则使用canal instance配置的filter,如果想进一步过滤某些表就在这里写一下正则就行,例如:canal_test\\.userlog表示仅处理canal_test库下userlog表的binlog。

拉取一批binlog(message)

每次拉N条binlog回来得到message,canal server将其表达为一个batch,稍后处理完成后我们需要ack这个batch id表示处理成功,如果ack之前崩溃则下次消费仍旧可以拿到这批数据。

此时batch可能为空表示没有新的binlog产生,因为getWithoutAck是非阻塞的,我们需要做好判断:

如果为空就睡一会,否则message.getEntries()则可以取出这N条binlog,每条binlog代表一个SQL操作,有待进一步处理。

遍历binlog

batch内的每一个entry是一条binlog,我们循环处理:

跳过transaction事务start和end的操作。

从binlog entry中解析出影响了哪些数据行:

此处rowchange包含了此SQL所影响的所有行数据,因为update等语句都是一次性改变多行数据的。

在具体处理rowchange之前,我们可以打印一下binlog entry中的各种元信息:

  • binlog所在的binlog文件名和偏移量。
  • binlog的SQL类型eventType,比如INSERT/DELETE/UPDATE等。
  • schema name:数据库名。
  • table name:表名。

接下来让我们看看binlog关联的rowchange到底影响了多少行数据。

遍历rowchange

根据这次binlog的eventype不同(INSERT/DELETE/UPDATE…),我们需要对受影响的row做不同的处理,但通常我们只关注DELETE和非DELETE:

  • DELETE的话,可以获取row修改前的列值。
  • 其他情况均获取row修改后的列值。

打印row的各个列:

因为binlog采用row format,所以binlog中包含了修改前后完整的行数据快照,这是binlog同步的关键。

我的实现

我写了一个JAVA的例子,可以将binlog轮转写入到hdfs中,以便后续hive加载,项目地址:https://github.com/owenliang/canal-demo

写入HDFS

为了将binlog写入HDFS,所以我首先将受影响的行封装成了RowEntity对象:

RowEntity经过格式化(采用\001列分隔符,\n行分隔符)写入到HDFS文件中:

在mysql执行一下插入和删除操作:

insert into userlog(content) values(‘hello\n world’);

delete from userlog;

最终写入HDFS中如下格式的text数据:

各列含义如下:

  • 数据库名
  • 表名
  • 操作类型(归纳为2种,SET/DEL)
  • 原始操作类型(INSERT/DELETE/UPDATE等原始SQL类型)
  • 行数据(JSON编码的KV结构)
  • binlog时间戳(从binlog entry中提取,代表该行数据的新旧)

因为涉及到HDFS操作,所以启动jar包时需要使用hadoop jar来替代java -jar。

Hive加载HDFS数据

创建hive表来加载HDFS上的binlog:

默认textfile列分隔符就是\001,行分隔符就是\n,所以不必做特殊设置,我们写入的行文本中不会包含\001和\n字符,可以确保Hive分割正确。

然后执行load将某小时的数据加载到Hive小时级分区内:

load data inpath ‘/canal_test/2020-12-29-08/’ overwrite into table ods_binlog_userlog partition(dt=’2020-12-29-08′);

查询可以看到数据:

合并至全量

先处理增量:

通过Hive SQL的get_json_object函数提取出json中的各个字段,将op=SET的记录通过window窗口机制保留同一个业务ID的最新execute_time的一条记录,最后再左连接op=DEL的记录来剔除掉删除的记录。

合并至全量:

将处理好的增量通过full outer join的方式合入到全量表,得到新的全量表即可。

举一个例子(meta_id是业务自增主键),每天凌晨将昨日的binlog合并到前天的全量dwd表中,比如${dt}是2020-12-30,那么会取2020-12-30-00~2020-12-30-23之间的binlog合并到2020-12-29的全量表,结果插入到2020-12-30的全量表:

 

扩展阅读:美团DB数据同步到数据仓库的架构与实践

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~