我们使用的是sqoop1.4.7,采用sqoop export将数仓ADS层数据导入到Mysql库,通常有两种方式:
- 采用hcatalog方式,此时sqoop能够直接访问hive表数据,因此hive表可以采用任意存储格式(textfile、orc、parquet等…)。
- 采用hdfs方式,此时sqoop直接读取hdfs目录数据,按照指定分隔符完成行切分与列切分。
模式对比
hcatalog模式底层原理是基于hive的hcatalog库实现的MapReduce InputFormat和OutputFormat,直接实现对Hive元数据的访问并自动完成文件格式解析得到行记录,sqoop利用hcatalog库开发MR JOB可以轻松访问任意Hive表数据,将其通过JDBC写入到Mysql。
HDFS模式则是sqoop直接访问HDFS上的裸文件,整个过程不需要访问Hive的元数据(完全与Hive无关),需要告知sqoop如何切割文件中的行与列,因此要求我们将Hive表建成textfile格式存储,这样才能和HDFS模式无缝对接。
我们当然希望使用hcatalog模式,因为它可以使用ORC、Parquet等强schema的Hive表,具备兼容性好、存储压缩率高等各方面优势。而使用textfile建hive表的话,就会遇到一个典型问题,比如某个列值带有换行符\n,因此sqoop在解析时就会提前结束本行解析,导致后续解析失败。
hcatalog模式
在实际使用sqoop export的hcatalog模式时,我们会使用如下命令:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
dt='2020-12-30' sqoop export \ --connect "jdbc:mysql://10.10.10.1:3306/UserDB?useUnicode=true&characterEncoding=utf8" \ --username "UserDB" \ --password "UserPass" \ --table "user_abnormal_behavior_flag_day" \ --num-mappers 1 \ --hcatalog-database safe \ --hcatalog-table ads_user_abnormal_behavior_flag_day \ --hcatalog-partition-keys dt \ --hcatalog-partition-values $dt \ --update-mode allowinsert \ --update-key user_id,flag |
- 通过–hcatalog系列的参数,指定要导出的Hive数据库、表名、分区字段、分区字段值。
- 通过–update-mode allowinsert指定插入Mysql时采用upsert模式(SQL对应为insert … on duplicate),mysql的唯一键是user_id,flag。
- 另外也可以通过–columns指定只导出Hive表的某些字段,在此不做演示。
然而运行时会偶尔成功,偶尔报错,随机出现如下错误:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
21/01/05 13:51:15 ERROR mapreduce.ExportJobBase: Export job failed! 21/01/05 13:51:15 ERROR tool.ExportTool: Error during export: Export job failed! at org.apache.sqoop.mapreduce.ExportJobBase.runExport(ExportJobBase.java:445) at org.apache.sqoop.manager.MySQLManager.upsertTable(MySQLManager.java:145) at org.apache.sqoop.tool.ExportTool.exportTable(ExportTool.java:73) at org.apache.sqoop.tool.ExportTool.run(ExportTool.java:99) at org.apache.sqoop.Sqoop.run(Sqoop.java:147) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76) at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183) at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234) at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243) at org.apache.sqoop.Sqoop.main(Sqoop.java:252) |
查看yarn后台具体MapReduce失败日志:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
2020-12-31 18:34:21,164 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.NoClassDefFoundError: Could not initialize class org.apache.derby.jdbc.AutoloadedDriver40 at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.sql.DriverManager.isDriverAllowed(DriverManager.java:556) at java.sql.DriverManager.getConnection(DriverManager.java:661) at java.sql.DriverManager.getConnection(DriverManager.java:247) at org.apache.sqoop.mapreduce.db.DBConfiguration.getConnection(DBConfiguration.java:302) at org.apache.sqoop.mapreduce.AsyncSqlRecordWriter.<init>(AsyncSqlRecordWriter.java:78) at org.apache.sqoop.mapreduce.UpdateOutputFormat$UpdateRecordWriter.<init>(UpdateOutputFormat.java:97) at org.apache.sqoop.mapreduce.mysql.MySQLUpsertOutputFormat$MySQLUpsertRecordWriter.<init>(MySQLUpsertOutputFormat.java:62) at org.apache.sqoop.mapreduce.mysql.MySQLUpsertOutputFormat.getRecordWriter(MySQLUpsertOutputFormat.java:49) at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.<init>(MapTask.java:647) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:175) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:169) |
发现失败时sqoop的MR正尝试访问derby嵌入式数据库的相关class,但提示class无法找到,通过Yarn日志我们可以确认derby的相关jar包在提交MR任务时已经上传到HDFS,实在没搞明白。
粗略分析了一下sqoop 1.4.7这块代码,发现此处本应该是JDBC连接Mysql业务库,但不知道为啥连接了derby嵌入数据库,因为sqoop 1.4.7代码非常老了(2017年),用的还是java1.6编译都搞不定,所以一顿折腾放弃了加日志编译的想法。
最终选择了放弃hcatalog方式,因为看到sqoop其实只兼容到hadoop 2.6,而我们使用的是hadoop 2.8,而且也不知道到底是hadoop兼容性问题还是hive带的hcatalog库的问题,实在没时间折腾。
HDFS模式
这就简单了,我们把hive的ADS表建成textfile格式的,建表时采用默认行为即可:
- 列分隔符:\001,是一个特殊的unicode字符,正常业务字段值不会出现这玩意。
- 行分隔符:\n,我们得保证列值里面不要出现\n,否则就会导致sqoop解析时误判(说实话这个还挺容易避免的,毕竟ADS是我们清洗出来的最终数据)。
- 空值:采用\N这样的字符串表达。
1 2 3 4 5 6 7 8 |
create table if not exists safe.ads_user_abnormal_behavior_flag_day ( user_id bigint , flag string , description string ) partitioned by (dt string) stored as textfile location '/warehouse/xx/ads/ads_user_abnormal_behavior_flag_day'; |
然后导出时:
1 2 3 4 5 6 7 8 9 10 11 12 |
dt='2020-12-30' sqoop-export \ --connect "jdbc:mysql://10.10.10.1:3306/UserDB?useUnicode=true&characterEncoding=utf8" \ --username "UserDB" \ --password "Userpass" \ --table "user_abnormal_behavior_flag_day" \ --num-mappers 3 \ --fields-terminated-by '\001' \ --input-null-string '\\N' --input-null-non-string '\\N' \ --export-dir "/warehouse/xx/ads/ads_user_abnormal_behavior_flag_day/dt=${dt}" \ --update-mode allowinsert \ --update-key user_id,flag |
必须告知sqoop如何进行文件切割:
- fields-terminated-by:texfile表默认就是用\001这种罕见字符作为column的分隔符的。
- input-null-string:string类型的column如果为Null,textfile表默认会用\N存储表示,所以得告诉sqoop这个事情。
- input-null-non-string:非string类型的column如果为Null,textfile表默认也是用\N存储表示,所以得告诉sqoop这个事情。
- 换行符不用特殊说明,sqoop默认就会使用\n,正好与texfile表默认行为一致。
举个例子,假设上述texfile表中存在一行记录的后两列是Null的话,在HDFS上实际存储为:4\N\N,那么当sqoop export解析文本时遇到\N的列值就会往Mysql中插入Null,就是这样一个关系。
此时我们用export-dir指定textfile表要导出的分区hdfs路径即可。
唯一需要提示的是–input-null-string ‘\\N’中必须用单引号’\\N’,应该是了JAVA中拼接正则表达式使用的,\\在正则中会表达为\,才能最终对应到HDFS中的字符串 \N。
总结
sqoop1.4.7虽然是最新的1.x版本,但最后更新时间已经是2017年了,而sqoop2.x看起来变化很大似乎并不好用。
目前实际遇到sqoop1.4.7的问题只有sqoop export hcatalog,其他问题尚未发现。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
