spark系列 – spark-sql编程使用

体验过spark RDD编程,我们能感受到几个问题:

  • RDD最多支持到K-V形式,对编程模型有点束缚。– 不如SQL好用~
  • RDD中的数据有类型,因此各种算子都需要持续关注模板类型。 — 能不能隐式类型?
  • RDD没法和Hive表结构天然对应,导致spark清洗数据导出到Hive不方便。–能否支持table形式?

那么这些问题都由spark-sql库来支持:

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed.

spark-sql可以让我们创建table表格,定义表格字段与类型,然后直接SQL操作数据,最后还可以把table写到HIve中去,供后续数仓使用。

同时,spark-sql和spark-core的RDD可以互相转换,下面的例子我们会逐步演示。

下面的示例代码地址:https://github.com/owenliang/spark-sql-demo

引入maven依赖

除了spark-core外,我们额外引入spark-sql库。

后续演示代码用了fastjson库,也引入进来。

初始化连接

spark-sql需要使用SparkSession来初始化与spark的连接,这和spark-core是不一样的。

定义Person类

后续我们会定义Person表,所以我们先定义一个Person的Bean类型:

它有name和age两个字段,后续会对应到table的2个列,另外用fastjson注解了一下JSON序列化。

例子1:加载JSON文本为Dataframe

我们接下来在HDFS中生成一个JSON文本文件,每行是一个Person的JSON字符串。

然后我们利用spark-sql的api,将JSON文本加载为一个Dataframe,它就是spark中对table的抽象。

所谓Dataframe,在Java代码中的具体表现就是Dataset<Row>的一个数据集,我们并不会直接看到Dataframe的代码字样。

Dataset是一种更高级的RDD,它可以和JavaRDD互相转化。

Dataset<>的模板参数也支持任意可序列化类型,但是当Dataset用<Row>作为模板参数时,我们把它叫做Dataframe,此时它是一个table,可以直接执行SQL。

所以Dataset是高级的RDD,而Dataset<Row>是更高级的Dataset,我们把它叫做Dataframe。

  • 我们打开HDFS文件,随机写入1000个PersonBean的JSON字符串,用换行符分隔,为这些Person生成唯一的name和随机的age。
  • 利用sparkSession.read().json()加载 HDFS文件,可以直接得到Dataset<Row>,也就是DataFrame(简称DF)。
  • 既然DF是表格,我们直接可以打印表格的schema,就像mysql show create table一样,也可以通过show打印一些数据样例。

程序输出如下:

可以看到age列是long类型,name列是string类型,这些都是spark-sql自动从json中识别出来的。

这里Row只是一种特殊的类型,它里面存储了表格的列类型和内容,我们还需要通过下面的几个例子来加深理解。

例子2:加载JSON文本,手动转为Dataframe

上个例子中,我们直接利用spark-sql的read().json()方法把json文本解析成了DataFrame。

现在我们把例子1的代码改成手动版,用于加深Dataset理解。

  • 利用read().text()来加载json文件,虽然返回的也是Dataset<Row>,但是这里的Row里面只有1列原始JSON文本,并没有解析成字段。
  • 利用df的javaRDD()转回JavaRDD,方便进行编程操作。
  • 对javaRDD进行map处理,传入function取出Row的第0列,利用fastjson解析到PersonBean,返回一个RDD<PersonBean>。
  • 最后通过sparkSession的createDataFrame,把RDD<Person>转成Dataset<Row>,这一步spark会自动根据PersonBean的字段来创建Dataframe的列schema。

可见,spark-sql能够从RDD转换为DataFrame,而且schema能够自动从java bean的字段解析得到,还有另外一种方式是createDataFrame时手动传入schema(大家可以文章头部的官方文档)。

例子3:Dataframe执行SQL

说了半天dataframe怎么加载,现在我们看看怎么写SQL:

  • read().json()把json文件加载为dataframe。
  • 在当前sparkSession下,把上述dataframe创建为会话级的临时视图表people。
  • 直接在sparkSession下执行SQL查询people表,就像使用mysql/hive一样,其返回值还是dataframe,里面统计了每个年龄有多少人,包含age和cnt两个字段。
  • show()打印结果集的前几行。
  • 直接对DataFrame执行map(Dataset是一种高级的RDD),传入MapFunction把每行Row的cnt列返回,结果Dataset是Integer数据类型,你会发现返回的Dataset需要传入的Encoders.INT()序列化方法,这个和RDD不太一样(RDD只要求类型实现Seriablizable接口即可,而Dataset则要求传入一种序列化方法)。
  • 最后再执行一波reduce把所有计数汇总起来。

Dataset是spark-sql的抽象,它比RDD性能更好的原因是,它采用特殊的数据序列化方法,可以在不反序列化数据的情况下直接对某个列进行条件过滤或者聚合统计等,所以如果我们要创建Dataset必须传入Encoders。

Encoders不需要我们自己实现,常见类型都内置,自定义Java bean类型spark-sql会自动反射字段为schema,也支持手动挡配置一个schema出来。

例子4:保存DataFrame到Hive

经过spark清洗的dataframe,最后一般要写入HIVE表。

建议用SQL方式来完成数据的插入,这样就和用HIve没啥区别,不容易坑。

  • 加载json为dataframe。
  • 将dataframe覆盖写出到某个hive表的目录下,这里用save方法直接保存到hdfs路径下。
  • 最后再通过load直接加载某个路径下的数据。

show将输出表格内容:

实际情况中,建议把dataframe通过createOrReplaceTempView注册为临时表,然后直接sess.sql将数据insert overwrite到hive表中(通常可能涉及到和老表的数据merge,所以SQL是更加实际的)。

无论hive是内表还是外表,如果hive是带分区的,我们都可以直接向hdfs对应分区路径写入数据,然后再去通过hive命令修改hive表的metadata添加该分区信息。

但是上述先写数据再改meta的方法太不实用,实际情况我们还是通过临时表+sql的方式向hive表导入数据更加方便,此时如果需要hive动态分区特性也可以通过配置开启,和hive原生体验基本一致。

总结

spark-sql的意义是把数据清洗后转换为dataframe表格结构,再进一步写入到hive表,这样就可以衔接起ETL和数仓两个部分了。

由于现在还没开始讲hive,所以spark和hive更多的交互留在以后再做更多举例。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~