apache flink 入门

现在非常流行流式计算,尤其在推荐领域特别需要实时的用户行为特征,以便满足用户短期的偏好。

流式计算存在已久,早期的有storm和spark streaming,而flink作为后起之秀已经成为主流选择,这很大成都上得益于flink的特性之强大,也就是提供了基于窗口的计算能力,从而可以实现诸如:统计5分钟内访问最多的N个商品等复杂计算场景。

无论你现在是否需要流式计算,我都建议你先来体验一下flink,它的确是一个非常强大的工具。

这篇博客的目的是:

  • 从纯小白的角度,安装idea+java+flink开发环境。
  • 搭建完整的流式计算pipeline示例,包括:输入、计算、输出。

搭建过程完全参照flink官方手册,代码示例经过我的一定扩展延伸。

安装环境

在mac或者linux都可以,我个人是在mac。

我们需要安装这些东西:

然后需要给maven换源,否则后续flink项目的一些jar包是无法下载成功的。

maven默认会去当前用户HOME目录下,寻找.m2/settings.xml文件,所以我在mac上创建了这个文件:

这里只需要写2个涉及到的镜像地址(世界上有很多maven仓库,如果你碰到下载失败,可以根据aliyun提供的镜像仓库添加对应的mirror即可):

创建项目(基于命令行)

我们需要使用flink官方提供的maven脚手架来初始化项目,它做了2个事情:

  • 生成项目的目录结构
  • 生成pom.xml,里面配置了依赖、构建、打包

  • archetype开头的都是flink提供的脚手架地址,这样maven就会找到对应的脚手架来初始化项目了。
  • groupId、artifactId、version是我们项目的信息,其中artifactId决定了生成的项目目录名。
  • package挺重要的,是最终项目里面的class包名,需要写个正经的。

很快项目就初始化好了,你能看到这些信息:

然后就可以进入flink-demo1目录了:

这是一个标准的maven项目,代码放在src/main/java下面,配置文件放在src/main/resources下面。

脚手架自带2个demo文件,一个做流式计算,一个做批量计算,我们删除它们即可。

rm src/main/java/demo1/*

然后在src/main/java/demo1下面创建一个我们自己的程序MyStreaming.java:

然后打开pom.xml,默认依赖库很多都是provided和runtime类型的,这样并不会把依赖类打到最终的jar包中,就会导致运行jar包的时候依赖class找不到,一般我们都是希望把依赖以及项目代码全部打包到一个jar包做发布的,所以我们都改成compile:

另外,pom.xml最终是依靠maven-shade-plugin来打包jar的,我们希望它把我们的class以及依赖的所有class都打到一个jar里,为此我们需要做一些调整。

默认pom.xml中排除掉了log4j等库,这样jar中就缺少了这些依赖的class,所以我们注释掉exclude部分:

然后在transformer部分修改一下jar包的main class,指向我们自定义的程序入口:

另外,我们还需要添加一个transformers来处理一个特殊问题:flink内部用了akka库,需要把akka库的配置文件reference.conf文件也打包到jar里,最终transformers部分就是这样:

验证项目初始化正确

执行打包命令:

mvn package

在target目录下将生成包含完整依赖的jar包,并且jar包的程序入口是demo1.MyStreaming,所以我们只需要执行jar即可:

java -jar target/flink-demo1-0.1.jar

程序没有任何报错,就代表我们已经具备了开发flink应用的项目框架。

启动flink

flink由jobmanager和若干taskmanager组成,前者负责调度,后者负责执行任务。

现在我们利用brew info apache-flink看一下flink的安装目录:

然后进入/usr/local/Cellar/apache-flink/1.9.1。

编辑一下libexec/conf/flink-conf.yaml,将taskmanager可以处理的并发任务数量调高(slot就是线程,一般与核心数量相同):

启动1个jobmanager(也就是flink的master):

然后启动1个taskmanager:

打开浏览器打开http://localhost:8081,可以看到管理界面了。

打开idea IDE

利用idea打开代码,它会提示识别到pom.xml,我们点击同意则项目被识别为一个maven项目。

打开首选项,调整一下idea使用我们brew安装的maven而不是idea自带的maven:

然后右侧执行mvn package编译一下代码:

编译好之后的jar还是在target目录,右键执行即可:

编写代码

在flink中,数据需要从source流入,然后经过一系列transformation聚合统计的计算之后,再通过sink流出,其中source和sink一般就是kafka等队列。

我们要做的是自己实现source和sink,在source中源源不断的伪造”用户点击日志”,在agg阶段利用flink window窗口机制实现统计每5秒内各个用户的点击量,在sink阶段将每个window的统计结果打印出来。

flink官方给出了一个流程图如下:

更多编程上的理解大家自己阅读一下官方手册:

获取execution env

我们的程序最终打包成jar包,然后交给flink run命令来提交执行。

flink run会给我们的程序一些环境变量,这样我们main函数里可以获取到flink run指定的flink集群信息,对其进行计算图的提交。

自定义source

source就是数据源,大部分情况都是直接使用flink提供的kafka source。

为了演示,我们自定义一个DataSource,参考官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html#data-sources

新建src/main/java/demo1/MySource.java,编写如下自定义Source:

根据官方手册说法,我们继承一下SourceFunction即可,模板参数是我们吐出去的数据类型。

  • run方法:源源不断的向ctx提交输入,我们伪造日志行,其格式是”用户名 时间”。
  • cancel方法:当任务被取消的死后,需要设置标记位,以便打断死循环的run方法。

然后把这个source添加到我们的计算拓扑图里:

自定义数据解析

我们可以把日志预先解析一下,这样方便后续高效处理。

flink内置TupleN类型,对于简单的数据结构比较适用。

我们新定义一个转换类MyMapper.java:

它输入的是String原始日志,返回的是(username, timestamp)的二元组。

我们把这个类应用到flatMap方法,就可以对原始日志做进一步解析:

提取event time,设置watermark

因为后续我们要在无边界的流数据中按一定的时间窗口进行统计,所以需要知道每一条数据的时间,以便落入正确的窗口进行计算。

但是数据被划入某个时间段的window后并不是立即计算,而是需要等待watermark的更新,一旦watermark大于了该window的结束时间,那么这个window中的数据才开始被聚合计算。

有一个博客总结的非常好,我把它引用过来,大家理解一下即可:

Window: Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算

start_time,end_time: 当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间

event-time: 事件发生时间,是事件发生所在设备的当地时间,比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间

Watermarks: 可以把他理解为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的

我们要从日志中提取真正的event time,但是因为日志到达时会乱序,所以我们需要让window到期之后再延迟几秒等到delay到达的数据。

一个更好的方法是基于最新到达数据的event time,在其基础上减去一定的时间作为watermark,这样就可以让已经过期的window多等待一会,我们就使用这种方法。

自定义一个类叫做MyAssigner:

复写extractTimestamp方法,从tuple里提取出日志时间,作为event time,用于指导flink该数据落入哪个window。

复写checkAndGetNextWatermark方法,根据该event time倒退2秒作为watermark,这样就可以让已经过期的window可以延迟2秒进行最终计算。

然后设置flink基于event time来将数据划入window,同时设置我们的assginer从日志提取eventtime以及更新watermark:

transformation:按username分组

正式进入计算环节。

我们希望是统计每个user在5秒内的点击次数,所以首先要按user聚合,每个user有各自的window进行统计。

这时候需要使用keyBy方法:

transformation:划分窗口(window)

在无边界的流式数据中,我们按系统时间每5秒作为一个window,数据根据其event time落入对应区间的window,当watermark超过window的end_time时,该window中的所有数据被一次性计算。

划分窗口只需要调用window方法:

transformation:自定义统计

最终我们需要定义一个统计方法,用于给每个window做计算,其输入就是window内的所有数据,它们当然一定属于同一个username,因为此前已经keyBy分组过了。

window展开来说又有很多概念,但是都不难理解,大家可以重点来看一下官方手册window部分:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html

我演示一下用Aggregation方式来统计次数,我们新建一个MyAgg.java文件:

这个类用于对1个window做计算,accumulator表示中间计算结果,一开始需要初始化一个。

flink会不断的回调add让我们对当前窗口进行统计计算,我们需要不断的更新中间结果accumulator。

因为flink允许并行计算,可能同一个username被2个windows分别计算,最终再merge为一个结果。

getResult就是输出Output,我们做的仅仅是把中间结果直接返回即可。

把MyAgg添加到后续计算过程中:

自定义sink输出

终于到了最后,每个window经过计算最终输出了一个tuple,就是当前5秒窗口的一个计数结果,他属于某个用户。

我们可以自定义一个sink把结果打印出来(真实情况应该是写到hbase/redis等存储):

这里我直接输出到了终端,稍后我们基于本地模式调试运行代码,是可以看到每个计算环节的标准输出的。而最终代码如果打包成jar提交到flink,那么就只能去flink的web界面查看日志了。

提交计算图

最后调用env把计算任务提交到flink集群:

运行代码

本地调试

写好flink代码后,我们可以在idea中点绿色小箭头运行MyStreaming.java,它会启动一套临时的flink集群环境,并且代码中的println都可以在终端看到。

我这里运行后,每隔5秒就会输出若干window的agg计算结果:

其统计了最近5秒,各个用户的日志条数。

正式提交

我们只需要执行mvn package得到jar包,然后利用flink命令行工具正式向flink集群提交即可。

-d表示提交给flink就直接返回,如果不传则flink run命令会阻塞住,但是要注意flink run即便被杀死任务也已经提交到flink,不会因为终端程序杀死而取消任务。

我们可以打开http://localhost:8081/,查看任务的执行情况(但是并不直观。。。因为多个步骤都被flink作为一个task运行,所以看不出各个子阶段的输入输出次数):

也可以通过查看taskManager看到终端日志输出:

如果要更新job代码,那么需要通过flink命令行或者web界面先杀死正在运行的,然后再重新提交一次jar;如果不杀死旧的而是直接重新提交一次jar,那么就会生成1个新job,老job也同时在运行,这个真是不方便。

其他

并发度

从flink web可以看出,这个任务目前被分为2个并行度为1的task,分别占用1个slot(也就是线程):

  • source的task
  • window -> sink的task

为了提高计算能力,我们可以根据需要配置各个步骤的并行度,可以从几个角度配置,具体见官方手册:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/parallel.html

最简单的就是flink run提交任务的时候通过命令行参数指定一个并发度:

这样flink会自动的去增加每个计算步骤的并发线程数量,并自动做好shuffle。

因为有了更高并发度,所以flink调度的时候已经把flatmap和window计算环节单独摘了出来作为新的task,并且并行度都被提高到了3个slot(线程)。

checkpoints

为了避免flink集群节点宕机导致某些正在计算中途的数据丢失,flink通过checkpoint定期备份来确保数据不会丢失,我们要做的主要是激活这个特性即可,具体参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html

我们要做的就是激活这个特性:

checkpoints是每个flink节点都会去作的,所以checkpoint还需要我们指定一个分布式的文件系统来作存储,一般选择是hdfs,这里我就不展开了。

更复杂的拓扑

从我们上面的代码来看,每次都是基于上一个datastream执行增加新的算子得到新的datastream,整个流程是串行向后的。

实际上,我们完全可以基于上一个datastream做2个分叉出来,也就是对同一个datastream分别添加2次新算子,那么就会得到2个新的datastream,从而实现更复杂的计算拓扑。

比如我们把处理结果复制到2个sink各自打印:

关于flink入门的部分就到这里,完整代码见github:https://github.com/owenliang/flink-demo1

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~