采用T+1天级同步会遇到一些问题:
- mysql表存在delete操作,无法增量同步到删除操作。
- mysql表没有更新时间字段,必须全量同步。
- mysql表大量分库分表,同步合并过程复杂&慢。
将mysql的binlog实时同步到hive,这样凌晨就不需要sqoop抽数据了,只需要直接将增量合并到全量即可。
canal架构
canal是阿里开源工具,它由server端和client端构成:
- canal client:自行使用canal client sdk开发,连接canal server消费binlog。
- canal server:代理client向mysql请求binlog(模仿slave协议),并维护client消费binlog的游标。
canal client与canal server之间的关系与rabbitmq很像,client消费server的binlog并应答ack或者rollback,而消费游标则由canal server持久化维护在本地磁盘或者zookeeper上。
另外也有一种玩法是canal server将binlog直接投递到kafka,然后canal client直接消费kafka,这种模式消费游标就是由kafka来维护了。
搭建canal
参考官方文档:https://github.com/alibaba/canal/wiki/AdminGuide
开启binlog
找到要同步的mysql数据库实例,修改my.cnf开启binlog(具体差异自行谷歌):
1 2 3 |
server-id = 1 log_bin = /var/log/mysql/mysql-bin.log binlog-format = ROW |
- server-id:实例唯一ID,在主从关系中不能冲突
- log_bin:binlog写入路径
- binlog-format:采用ROW行模式,这样binlog会记录修改前后的完整行记录,这样才能从binlog中拿到完整行记录
然后在mysql里建库建表,做测试用:
create database canal_test;
CREATE TABLE
userlog
(
id
bigint(20) NOT NULL AUTO_INCREMENT,
content
text,
PRIMARY KEY (id
)
);
记得授权一个具备slave权限的Mysql账号,我直接用的root账号所以跳过了这一步。
下载canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
观察canal.properties
一个canal-server进程可以同时消费多个数据库实例的binlog,每个数据库实例叫做1个instance,默认会消费instance内所有的database和table的binlog。
配置instance列表,需要编辑canal.properties的属性:
1 2 3 4 5 6 7 8 9 |
################################################# ######### destinations ############# ################################################# canal.destinations = example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 |
- canal.destinations:可以写多个instance名字,也就是多个数据库实例,每个instance的具体数据库地址/账号密码等配置要写在独立文件中。
- canal.conf.dir:扫描instance配置的父目录,也就是conf目录,开启了这个的话就不需要配置destinations了,它会默认发现所有instance。
example是示例用的instance,我们下面直接对这个example数据库实例做配置修改,指向我们的数据库。
修改example/instance.properties
不同的instance只有配置目录名不同,配置文件的名字一定要叫做instance.properties。
首先配置数据库地址、起始的消费偏移量(可选,不填就是从binlog末尾开始消费后续日志):
1 2 3 4 5 6 |
# position info canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= |
然后配置账号密码:
1 2 3 |
# username/password canal.instance.dbUsername=root canal.instance.dbPassword=baidu@123 |
然后配置正则,只保留我们关心的database和table:
1 2 |
# table regex canal.instance.filter.regex=canal_test\\..* |
这里有个坑就是binlog是按mysql实例为单位同步的,而我们也许只关注实例上某些db和table,所以通常我们都需要做好约束。
启动canal server
现在启动,即可开始同步example instance的binlog,如果有其他instance配置也会一起开始。
bin/startup.sh
程序是后台启动的,可以jps查看。
canal client示例
官方客户端SDK支持多种编程语言,利用客户端SDK可以轻松连接canal server进行binlog消费,可靠的ack机制让我们无需操心数据丢失。
一次SQL操作产生一条binlog,该SQL操作可能影响多行数据,了解这个对我们编写逻辑有意义。
可以查看完整官方代码示例,下面我讲一下主要流程原理。
创建连接
1 2 |
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(Config.configuration().getCanalHostname(), Config.configuration().getCanalPort()), Config.configuration().getCanalDestination(), "", ""); |
传参:
- canal server地址、端口
- canal instance的名字(即消费哪个数据库实例的binlog)
- canal server账号密码(我们没配置验证,所以传空)
连接&订阅
1 2 |
connector.connect(); connector.subscribe(".*\\..*"); |
订阅的意思是筛选要访问的database和table名字,传空字符串则使用canal instance配置的filter,如果想进一步过滤某些表就在这里写一下正则就行,例如:canal_test\\.userlog表示仅处理canal_test库下userlog表的binlog。
拉取一批binlog(message)
1 2 3 |
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); |
每次拉N条binlog回来得到message,canal server将其表达为一个batch,稍后处理完成后我们需要ack这个batch id表示处理成功,如果ack之前崩溃则下次消费仍旧可以拿到这批数据。
此时batch可能为空表示没有新的binlog产生,因为getWithoutAck是非阻塞的,我们需要做好判断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 |
如果为空就睡一会,否则message.getEntries()则可以取出这N条binlog,每条binlog代表一个SQL操作,有待进一步处理。
遍历binlog
batch内的每一个entry是一条binlog,我们循环处理:
1 2 3 4 |
for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } |
跳过transaction事务start和end的操作。
从binlog entry中解析出影响了哪些数据行:
1 2 3 4 5 6 7 |
RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } |
此处rowchange包含了此SQL所影响的所有行数据,因为update等语句都是一次性改变多行数据的。
在具体处理rowchange之前,我们可以打印一下binlog entry中的各种元信息:
1 2 3 4 5 |
EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); |
- binlog所在的binlog文件名和偏移量。
- binlog的SQL类型eventType,比如INSERT/DELETE/UPDATE等。
- schema name:数据库名。
- table name:表名。
接下来让我们看看binlog关联的rowchange到底影响了多少行数据。
遍历rowchange
1 2 3 4 5 6 7 8 9 10 11 12 |
for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } |
根据这次binlog的eventype不同(INSERT/DELETE/UPDATE…),我们需要对受影响的row做不同的处理,但通常我们只关注DELETE和非DELETE:
- DELETE的话,可以获取row修改前的列值。
- 其他情况均获取row修改后的列值。
打印row的各个列:
1 2 3 4 5 |
private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } |
因为binlog采用row format,所以binlog中包含了修改前后完整的行数据快照,这是binlog同步的关键。
我的实现
我写了一个JAVA的例子,可以将binlog轮转写入到hdfs中,以便后续hive加载,项目地址:https://github.com/owenliang/canal-demo。
写入HDFS
为了将binlog写入HDFS,所以我首先将受影响的行封装成了RowEntity对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
private void userCallback(CanalEntry.EventType eventType, Long executeTime, String db, String table, CanalEntry.RowData row) throws Exception { RowEntity rowEntity = new RowEntity(); rowEntity.setDb(db); rowEntity.setTable(table); rowEntity.setRaw_op(eventType.toString()); rowEntity.setExecuteTime(executeTime); List<CanalEntry.Column> columns = null; if (eventType == CanalEntry.EventType.DELETE) { rowEntity.setOp("DEL"); columns = row.getBeforeColumnsList(); } else { rowEntity.setOp("SET"); columns = row.getAfterColumnsList(); } Map<String, String> fields = new HashMap<String, String>(); for (CanalEntry.Column column : columns) { fields.put(column.getName(), column.getValue()); } rowEntity.setFields(fields); this.handler.handleRowEntity(rowEntity); } |
RowEntity经过格式化(采用\001列分隔符,\n行分隔符)写入到HDFS文件中:
1 2 3 4 5 6 7 8 9 10 11 12 |
public void handleRowEntity(RowEntity rowEntity) throws Exception { prepareForWriting(); String db = rowEntity.getDb(); String table = rowEntity.getTable(); String op = rowEntity.getOp(); String rawOp = rowEntity.getRaw_op(); String fields = JSONObject.toJSONString(rowEntity.getFields()); String row = String.format("%s\001%s\001%s\001%s\001%s\001%d\n", db, table, op, rawOp, fields, rowEntity.getExecuteTime()); stream.write(row.getBytes("utf-8")); } |
在mysql执行一下插入和删除操作:
insert into userlog(content) values(‘hello\n world’);
delete from userlog;
最终写入HDFS中如下格式的text数据:
1 2 3 4 5 6 |
(base) root@ubuntu:~# hdfs dfs -cat /canal_test/2020-12-29-08/binlog-8251@ubuntu.lan-1609230645018 canal_testuserlogSETINSERT{"id":"178","content":"hello\n world"}1609230639000 canal_testuserlogSETINSERT{"id":"179","content":"hello\n world"}1609230640000 canal_testuserlogSETINSERT{"id":"180","content":"hello\n world"}1609230640000 canal_testuserlogSETINSERT{"id":"181","content":"hello\n world"}1609230641000 canal_testuserlogSETINSERT{"id":"182","content":"hello\n world"}1609230641000 |
各列含义如下:
- 数据库名
- 表名
- 操作类型(归纳为2种,SET/DEL)
- 原始操作类型(INSERT/DELETE/UPDATE等原始SQL类型)
- 行数据(JSON编码的KV结构)
- binlog时间戳(从binlog entry中提取,代表该行数据的新旧)
因为涉及到HDFS操作,所以启动jar包时需要使用hadoop jar来替代java -jar。
Hive加载HDFS数据
创建hive表来加载HDFS上的binlog:
1 2 3 4 5 6 7 8 9 10 11 |
create table if not exists ods_binlog_userlog( db_name string, table_name string, op string, raw_op string, fields string, execute_time bigint ) partitioned by(dt string) stored as textfile location '/warehouse/safe/ods/ods_binlog_userlog'; |
默认textfile列分隔符就是\001,行分隔符就是\n,所以不必做特殊设置,我们写入的行文本中不会包含\001和\n字符,可以确保Hive分割正确。
然后执行load将某小时的数据加载到Hive小时级分区内:
load data inpath ‘/canal_test/2020-12-29-08/’ overwrite into table ods_binlog_userlog partition(dt=’2020-12-29-08′);
查询可以看到数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
hive> select * from ods_binlog_userlog where dt='2020-12-29-08'; OK ods_binlog_userlog.db_name ods_binlog_userlog.table_name ods_binlog_userlog.op ods_binlog_userlog.raw_op ods_binlog_userlog.fields ods_binlog_userlog.execute_time ods_binlog_userlog.dt canal_test userlog SET INSERT {"id":"172","content":"hello\n world"} 1609230400000 2020-12-29-08 canal_test userlog SET INSERT {"id":"173","content":"hello\n world"} 1609230405000 2020-12-29-08 canal_test userlog SET INSERT {"id":"174","content":"hello\n world"} 1609230406000 2020-12-29-08 canal_test userlog SET INSERT {"id":"175","content":"hello\n world"} 1609230406000 2020-12-29-08 canal_test userlog SET INSERT {"id":"176","content":"hello\n world"} 1609230406000 2020-12-29-08 canal_test userlog SET INSERT {"id":"177","content":"hello\n world"} 1609230406000 2020-12-29-08 canal_test userlog SET INSERT {"id":"178","content":"hello\n world"} 1609230639000 2020-12-29-08 canal_test userlog SET INSERT {"id":"179","content":"hello\n world"} 1609230640000 2020-12-29-08 canal_test userlog SET INSERT {"id":"180","content":"hello\n world"} 1609230640000 2020-12-29-08 canal_test userlog SET INSERT {"id":"181","content":"hello\n world"} 1609230641000 2020-12-29-08 canal_test userlog SET INSERT {"id":"182","content":"hello\n world"} 1609230641000 2020-12-29-08 |
合并至全量
先处理增量:
通过Hive SQL的get_json_object函数提取出json中的各个字段,将op=SET的记录通过window窗口机制保留同一个业务ID的最新execute_time的一条记录,最后再左连接op=DEL的记录来剔除掉删除的记录。
合并至全量:
将处理好的增量通过full outer join的方式合入到全量表,得到新的全量表即可。
举一个例子(meta_id是业务自增主键),每天凌晨将昨日的binlog合并到前天的全量dwd表中,比如${dt}是2020-12-30,那么会取2020-12-30-00~2020-12-30-23之间的binlog合并到2020-12-29的全量表,结果插入到2020-12-30的全量表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
-- 提取youhui_meta的binlog with youhui_meta_binlog as ( select op binlog_op, execute_time binlog_time, cast(get_json_object(fields,'$.meta_id') as bigint) meta_id, cast(get_json_object(fields,'$.article_id') as bigint) article_id, cast(get_json_object(fields,'$.meta_key') as string) meta_key, cast(get_json_object(fields,'$.meta_value') as string) meta_value from safe.ods_youhui_binlog where dt>='${dt}-00' and dt<='${dt}-23' and db_name='dbzdm_youhui' and table_name='youhui_meta' ), -- 保留每个meta_id的最新修改 youhui_meta_delta as ( select binlog_op, meta_id, article_id, meta_key, meta_value from ( select *, row_number() over(partition by meta_id order by binlog_time desc) row_num from youhui_meta_binlog ) tmp where tmp.row_num=1 ), -- 分离出set操作 youhui_meta_add as ( select meta_id, article_id, meta_key, meta_value from youhui_meta_delta where youhui_meta_delta.binlog_op='SET' ), -- 分离出del操作 youhui_meta_del as ( select meta_id, article_id, meta_key, meta_value from youhui_meta_delta where youhui_meta_delta.binlog_op='DEL' ), -- 更新记录合并到全量 youhui_meta_add_merged as ( select if(b.meta_id is not null, b.meta_id, a.meta_id) meta_id, if(b.meta_id is not null, b.article_id, a.article_id) article_id, if(b.meta_id is not null, b.meta_key, a.meta_key) meta_key, if(b.meta_id is not null, b.meta_value, a.meta_value) meta_value from ( select * from dwd_fact_youhui_youhui_meta where dt=date_sub('${dt}',1) ) a full outer join youhui_meta_add b on a.meta_id=b.meta_id ) -- 从新全量中删除del的记录 insert overwrite table safe.dwd_fact_youhui_youhui_meta partition(dt='${dt}') select a.meta_id, a.article_id, a.meta_key, a.meta_value from youhui_meta_add_merged a left join youhui_meta_del b on a.meta_id=b.meta_id where b.meta_id is null; |
扩展阅读:美团DB数据同步到数据仓库的架构与实践
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

你为什么什么都会 hive一顿猛操作,有鱼儿老师不会的东西吗😭