sqoop – mysql导入hive与hive导出mysql
sqoop用于关系型数据库与数仓进行数据交换,是大数据必备工具。
常见需求包含2种:
- mysql数据T+1导入hdfs,再load到hive表分区下。
- hive表T+1导出mysql,服务线上业务。
下面我们做一下具体演示。
下载sqoop
从官网下载sqoop(带hadoop后缀版本),
wget https://mirror.bit.edu.cn/apache/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
解压sqoop到某个目录,然后导出PATH环境变量:
export PATH=/usr/local/jdk/bin:/root/bigdata/hadoop-3.3.0/bin:/root/bigdata/spark-3.0.1/bin:/root/bigdata/hive-3.1.3/bin:/root/bigdata/sqoop-1.4.7/bin:${PATH}
export SQOOP_HOME=/root/bigdata/sqoop-1.4.7
配置sqoop
sqoop会操作HDFS、Mapreduce、Hive、Mysql,因此有一些必要的配置项。
首先拷贝环境变量配置文件:
cp conf/sqoop-env-template.sh conf/sqoop-env.sh
在sqoop-env中必须添加如下3个环境变量:
1 2 3 |
export HADOOP_HOME=/root/bigdata/hadoop-3.3.0 export HCAT_HOME=/root/bigdata/hive-3.1.3/hcatalog export HIVE_HOME=/root/bigdata/hive-3.1.3 |
hcatlog实际就是Hive的sdk,可以编程读写hive表的schema,提供了屏蔽底层文件格式的inputformat/outputformat,可以很方便的开发MapReducer程序处理表格数据。
上述缺一不可。
下载jar依赖
直接使用sqoop命令会报错找不到某个common lang的类,我们需要放一个合适版本的jar包到lib目录下:
1 2 |
wget https://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar mv commons-lang-2.6.jar lib/ |
另外需要mysql jdbc来访问mysql:
1 2 |
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar mv mysql-connector-java-5.1.49.jar lib/ |
就此安装结束。
导入
接下来把mysql数据导入到hdfs,然后加载到hive的ods表中。
准备sql语句
出于简单,我们把mysql数据库的user表的某些字段提取出来,导入到hdfs中。
1 2 3 4 5 6 7 8 9 10 11 12 |
mysql> use mysql; Database changed mysql> select host,user,password_lifetime,password_last_changed from user where date_format(`password_last_changed`, '%Y-%m-%d')='2020-10-13'; +-----------+------------------+-------------------+-----------------------+ | host | user | password_lifetime | password_last_changed | +-----------+------------------+-------------------+-----------------------+ | localhost | mysql.session | NULL | 2020-10-13 05:27:09 | | localhost | mysql.sys | NULL | 2020-10-13 05:27:09 | | localhost | debian-sys-maint | NULL | 2020-10-13 05:27:13 | | % | root | NULL | 2020-10-13 05:51:52 | +-----------+------------------+-------------------+-----------------------+ 4 rows in set (0.00 sec) |
作为数仓T+1任务,我们假设昨天的日期恰好是2020-10-13,因此可以提取出上述4行数据。
执行sqoop import
现在执行sqoop import,把上述SQL的数据写入到HDFS中:
sqoop import –connect jdbc:mysql://localhost:3306/mysql –username root –password baidu@123 –query ‘select host,user,password_lifetime,password_last_changed from user where date_format(
password_last_changed
, “%Y-%m-%d”)=”2020-10-13″ and $CONDITIONS’ –as-parquetfile –compress –compression-codec snappy –target-dir “/dw/tmall/db/user/2020_10_13” –delete-target-dir –num-mappers 1
- –connect:mysql地址
- –username/–password:mysql账号密码
- -query:SQL,得到当天的增量数据,其中$CONDITION必须写到where条件里占位,后续解释。
- –as-parquetfile:HDFS按parquetfile文件格式存储,天然的列式存储,不需要指定什么列分隔符与行分隔符之类的,支持高效压缩,可以直接被Hive表load。
- –compress/–compression-codec:指定snappy压缩。
- –target-dir:数据写入HDFS路径,我们写到数仓的规范路径下,/dw/{数仓名}/db/{表名}/{分区}。
- –delete-target-dir:写入前先删除target-dir。
- –num-mappers:只启动1个mapper执行SQL写入HDFS文件。
sqoop是支持多个Mapper并发扫描Mysql的,但是这就要求告知sqoop如何为SQL分片查询结果集,$CONDITION就是sqoop为每个mapper替换的分片过滤条件,我们还需要额外传一些信息给sqoop协助它分片。但是不同的mapper是各自执行SQL事务,如果导入期间有数据更新,很难说分片之间是否会出现数据漂移产生重复数据,所以一般我们就1个mapper处理完整的SQL结果集最靠谱。
查看HDFS
SQL结果集已经按parquet压缩格式写入到HDFS,我们看一下:
1 2 3 4 5 |
root@ubuntu:~/bigdata# hdfs dfs -ls /dw/tmall/db/user/2020_10_13 Found 3 items drwxr-xr-x - root supergroup 0 2020-10-22 06:09 /dw/tmall/db/user/2020_10_13/.metadata drwxr-xr-x - root supergroup 0 2020-10-22 06:09 /dw/tmall/db/user/2020_10_13/.signals -rw-r--r-- 1 root supergroup 1291 2020-10-22 06:09 /dw/tmall/db/user/2020_10_13/5dec0a19-cf0a-4d30-af9c-46fa3fe364ec.parquet |
该目录下的数据可以直接load到ods hive表中。
创建ods表
1 2 3 4 5 6 7 8 9 10 11 |
create external table if not exists tmall.ods_user( `host` string, `user` string, `password_lifetime` string, `password_last_changed` string ) comment 'ODS mysql user' partitioned by (`dt` string) stored as parquet location '/dw/tmall/ods/ods_user' tblproperties("parquet.compression"="SNAPPY"); |
我们创建ods外表,数据目录遵循规范/dw/{数仓名}/ods/{表名},并且配置和sqoop一样的parquet格式与压缩算法snappy,同时按dt日期分区。
加载HDFS到Hive
1 2 3 4 |
hive> load data inpath "/dw/tmall/db/user/2020_10_13" overwrite into table tmall.ods_user partition(dt='2020-10-13'); Loading data to table tmall.ods_user partition (dt=2020-10-13) OK Time taken: 2.696 seconds |
执行load命令,将hdfs文件mv到hive表的对应分区下。
然后查看hive表分区数据:
1 2 3 4 5 6 7 8 9 |
hive> select * from tmall.ods_user where `dt`='2020-10-13'; OK ods_user.host ods_user.user ods_user.password_lifetime ods_user.password_last_changed ods_user.dt localhost mysql.session NULL 1602566829000 2020-10-13 localhost mysql.sys NULL 1602566829000 2020-10-13 localhost debian-sys-maint NULL 1602566833000 2020-10-13 % root NULL 1602568312000 2020-10-13 Time taken: 3.623 seconds, Fetched: 4 row(s) |
可见数据和mysql结果集完全一致,有parquet列存格式的帮助,我们完全不需要担心mysql列如何对应到hive列,也不需要担心所谓的Null值兼容性问题。
导出
数仓最终是会将统计结果导出回mysql,供业务侧访问使用,下面演示这样的流程。
创建mysql数据库
1 |
create database if not exists sqoop; |
因为T+1任务每天自动执行,我们可以习惯的使用if not exists来避免报错。
创建mysql业务表
1 2 3 4 5 6 |
create table if not exists sqoop.bi_user( `host` varchar(256), `user` varchar(256), `password_lifetime` varchar(256), `password_last_changed` varchar(256) ); |
该表供线上业务直接访问,字段和要导出的hive表字段一致,字段类型兼容即可。
创建当日mysql业务表
我们先将Hive当日分区数据导出到mysql当日的临时表,因为直接导入到线上bi_user表会影响业务使用。
1 2 |
drop table if exists bi_user_2020_10_13; create table sqoop.bi_user_2020_10_13 like sqoop.bi_user; |
逻辑是先清空临时表并重新创建它,表结构与bi_user一样,现在有这两张表:
1 2 3 4 5 6 7 8 |
mysql> show tables; +--------------------+ | Tables_in_sqoop | +--------------------+ | bi_user | | bi_user_2020_10_13 | +--------------------+ 2 rows in set (0.00 sec) |
提取hive分区数据到当日mysql表
sqoop export –connect jdbc:mysql://localhost:3306/sqoop –username root –password baidu@123 –table bi_user_2020_10_13 –num-mappers 2 –hcatalog-database tmall –hcatalog-table ods_user –hcatalog-partition-keys dt –hcatalog-partition-values 2020-10-13
- –table:指定写入的mysql表名。
- –num-mappers:把Hive数据写入Mysql建议并发执行,因为Hive表数据不会变动,可以直接分片多个mapper写入。
- –hcatalog-database:指定hive数据库,hcatalog就是hive的编程SDK。
- –hcatalog-table:指定hive表。
- –hcatalog-partition-keys:导出hive分区的key。
- –hcatalog-partition-values:导出hive分区的value。
查看mysql表数据:
1 2 3 4 5 6 7 8 9 10 |
mysql> select * from bi_user_2020_10_13; +-----------+------------------+-------------------+-----------------------+ | host | user | password_lifetime | password_last_changed | +-----------+------------------+-------------------+-----------------------+ | localhost | mysql.session | NULL | 1602566829000 | | localhost | mysql.sys | NULL | 1602566829000 | | localhost | debian-sys-maint | NULL | 1602566833000 | | % | root | NULL | 1602568312000 | +-----------+------------------+-------------------+-----------------------+ 4 rows in set (0.00 sec) |
与hive表完全一致。
重命名mysql表
最终将mysql临时表bi_user_2020_10_13改名为线上表bi_user,同时将bi_user表备份一下。
mysql> rename table bi_user to bi_user_bk_2020_10_13, bi_user_2020_10_13 to bi_user;
数据表变为了:
1 2 3 4 5 6 7 |
mysql> show tables; +-----------------------+ | Tables_in_sqoop | +-----------------------+ | bi_user | | bi_user_bk_2020_10_13 | +-----------------------+ |
查看bi_user已经有了最新的数仓数据:
1 2 3 4 5 6 7 8 9 10 |
mysql> select * from bi_user; +-----------+------------------+-------------------+-----------------------+ | host | user | password_lifetime | password_last_changed | +-----------+------------------+-------------------+-----------------------+ | localhost | mysql.session | NULL | 1602566829000 | | localhost | mysql.sys | NULL | 1602566829000 | | localhost | debian-sys-maint | NULL | 1602566833000 | | % | root | NULL | 1602568312000 | +-----------+------------------+-------------------+-----------------------+ 4 rows in set (0.00 sec) |
这就是sqoop的基本用法,T+1执行数仓任务,将全量或者前日增量数据进行导入或者导出,满足数仓与业务需求。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~
