sqoop – mysql导入hive与hive导出mysql

sqoop用于关系型数据库与数仓进行数据交换,是大数据必备工具。

常见需求包含2种:

  • mysql数据T+1导入hdfs,再load到hive表分区下。
  • hive表T+1导出mysql,服务线上业务。

下面我们做一下具体演示。

下载sqoop

从官网下载sqoop(带hadoop后缀版本),

wget https://mirror.bit.edu.cn/apache/sqoop/1.4.7/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz

解压sqoop到某个目录,然后导出PATH环境变量:

export PATH=/usr/local/jdk/bin:/root/bigdata/hadoop-3.3.0/bin:/root/bigdata/spark-3.0.1/bin:/root/bigdata/hive-3.1.3/bin:/root/bigdata/sqoop-1.4.7/bin:${PATH}

export SQOOP_HOME=/root/bigdata/sqoop-1.4.7

配置sqoop

sqoop会操作HDFS、Mapreduce、Hive、Mysql,因此有一些必要的配置项。

首先拷贝环境变量配置文件:

cp conf/sqoop-env-template.sh conf/sqoop-env.sh

在sqoop-env中必须添加如下3个环境变量:

hcatlog实际就是Hive的sdk,可以编程读写hive表的schema,提供了屏蔽底层文件格式的inputformat/outputformat,可以很方便的开发MapReducer程序处理表格数据。

上述缺一不可。

下载jar依赖

直接使用sqoop命令会报错找不到某个common lang的类,我们需要放一个合适版本的jar包到lib目录下:

另外需要mysql jdbc来访问mysql:

就此安装结束。

导入

接下来把mysql数据导入到hdfs,然后加载到hive的ods表中。

准备sql语句

出于简单,我们把mysql数据库的user表的某些字段提取出来,导入到hdfs中。

作为数仓T+1任务,我们假设昨天的日期恰好是2020-10-13,因此可以提取出上述4行数据。

执行sqoop import

现在执行sqoop import,把上述SQL的数据写入到HDFS中:

sqoop import –connect jdbc:mysql://localhost:3306/mysql –username root –password baidu@123 –query ‘select host,user,password_lifetime,password_last_changed from user where date_format(password_last_changed, “%Y-%m-%d”)=”2020-10-13″ and $CONDITIONS’ –as-parquetfile –compress –compression-codec snappy –target-dir “/dw/tmall/db/user/2020_10_13” –delete-target-dir –num-mappers 1

  • –connect:mysql地址
  • –username/–password:mysql账号密码
  • -query:SQL,得到当天的增量数据,其中$CONDITION必须写到where条件里占位,后续解释。
  • –as-parquetfile:HDFS按parquetfile文件格式存储,天然的列式存储,不需要指定什么列分隔符与行分隔符之类的,支持高效压缩,可以直接被Hive表load。
  • –compress/–compression-codec:指定snappy压缩。
  • –target-dir:数据写入HDFS路径,我们写到数仓的规范路径下,/dw/{数仓名}/db/{表名}/{分区}。
  • –delete-target-dir:写入前先删除target-dir。
  • –num-mappers:只启动1个mapper执行SQL写入HDFS文件。

sqoop是支持多个Mapper并发扫描Mysql的,但是这就要求告知sqoop如何为SQL分片查询结果集,$CONDITION就是sqoop为每个mapper替换的分片过滤条件,我们还需要额外传一些信息给sqoop协助它分片。但是不同的mapper是各自执行SQL事务,如果导入期间有数据更新,很难说分片之间是否会出现数据漂移产生重复数据,所以一般我们就1个mapper处理完整的SQL结果集最靠谱。

查看HDFS

SQL结果集已经按parquet压缩格式写入到HDFS,我们看一下:

该目录下的数据可以直接load到ods hive表中。

创建ods表

我们创建ods外表,数据目录遵循规范/dw/{数仓名}/ods/{表名},并且配置和sqoop一样的parquet格式与压缩算法snappy,同时按dt日期分区。

加载HDFS到Hive

执行load命令,将hdfs文件mv到hive表的对应分区下。

然后查看hive表分区数据:

可见数据和mysql结果集完全一致,有parquet列存格式的帮助,我们完全不需要担心mysql列如何对应到hive列,也不需要担心所谓的Null值兼容性问题。

导出

数仓最终是会将统计结果导出回mysql,供业务侧访问使用,下面演示这样的流程。

创建mysql数据库

因为T+1任务每天自动执行,我们可以习惯的使用if not exists来避免报错。

创建mysql业务表

该表供线上业务直接访问,字段和要导出的hive表字段一致,字段类型兼容即可。

创建当日mysql业务表

我们先将Hive当日分区数据导出到mysql当日的临时表,因为直接导入到线上bi_user表会影响业务使用。

逻辑是先清空临时表并重新创建它,表结构与bi_user一样,现在有这两张表:

提取hive分区数据到当日mysql表

sqoop export –connect jdbc:mysql://localhost:3306/sqoop –username root –password baidu@123 –table bi_user_2020_10_13 –num-mappers 2 –hcatalog-database tmall –hcatalog-table ods_user –hcatalog-partition-keys dt –hcatalog-partition-values 2020-10-13

  • –table:指定写入的mysql表名。
  • –num-mappers:把Hive数据写入Mysql建议并发执行,因为Hive表数据不会变动,可以直接分片多个mapper写入。
  • –hcatalog-database:指定hive数据库,hcatalog就是hive的编程SDK。
  • –hcatalog-table:指定hive表。
  • –hcatalog-partition-keys:导出hive分区的key。
  • –hcatalog-partition-values:导出hive分区的value。

查看mysql表数据:

与hive表完全一致。

重命名mysql表

最终将mysql临时表bi_user_2020_10_13改名为线上表bi_user,同时将bi_user表备份一下。

mysql> rename table bi_user to bi_user_bk_2020_10_13, bi_user_2020_10_13 to bi_user;

数据表变为了:

查看bi_user已经有了最新的数仓数据:

这就是sqoop的基本用法,T+1执行数仓任务,将全量或者前日增量数据进行导入或者导出,满足数仓与业务需求。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~