编写flume-ng扩展提升吞吐

最近在公司做openresty+flume+kafka的前端日志采集,在测试flume时发现向kafka传输的带宽吞吐才20MB/s,远远无法满足需求。

找到瓶颈

不可盲目

盲目优化是很浪费时间的,一开始只能调调flume参数,改改batchSize之类的参数,结果无功而返。

最终无奈,决定静心看一下flume的架构原理。

了解架构

flume的流水线上,包括source采集磁盘日志,channel缓存采集的日志,sink将日志发往远端,按道理我需要分析出到底哪个环节拖慢了整体的流水线带宽。

对于流水线系统来说,靠后的环节慢,则会导致之前的环节全部变慢,因为流水线拥塞了。

比如sink处理慢,就导致channel填满,channel填满就导致source停止采集。

怀疑的点

source只是简单的TAILDIR模式,采集目录下的增量日志,顺序读磁盘的带宽远不止20MB/s,所以我对这个环节的怀疑是最轻的。

channel我采用了memory channel,因为source和sink各只有1个线程,锁竞争不至于成为瓶颈。

sink采用了kafka sink,采用阻塞模型,发送完一批才会从channel取下一批。

kafka集群是否存在性能问题呢?为了验证这一点,我配置了2个channel,让source采用repliacating模式复制2份流量,并为2个channel各配置1个kafka sink,发现带宽就是40MB/s了,说明kafka不是问题。

数据说话

性能调优不能靠猜,最好有数据为证,启动flume时可以配置开启metrics端口,这是一个http接口,可以查询实时flume性能指标。

启动时指定参数:

查看实时指标:

返回JSON如下:

 

主要观察channel的填充率ChannelFillPercentage,如果接近100%说明队列无法及时被消费,瓶颈在sink端。(上述json是我优化后的,发现channel基本为空)。

优化

既然瓶颈是同步阻塞推送kafka的sink端,那么显然增加sink的数量就可以加快channel消费。

一开始我未经了解,直接使用了sink gourp为channel配置了2个kafka sink,采用load balance来分发流量,结果发现仍旧20MB/s,毫无提升。

经过思考得知,sink group仍旧采用单线程工作,只是充当了2个kafka sink的代理而已,日志轮转的被交给2个kafka sink对象,即2个kafka sink对象在同一个线程里交替被调用,根本没有并行能力。

照着这个优化方向,我期望可以配置多个channel,每个channel一个sink线程,并让source将流量均匀的派发给2个channel,从而实现多线程并发。

然而事实上,flume并没有提供可以直接拿来用的source分发插件,需要我们自己开发,这是我的插件:https://github.com/owenliang/flume-ng-round-robin-channel-selector

我们实现一个自定义的Channel Selector类,然后重新编译flume生成新的jar包,覆盖到flume即可。

下面是我最终的配置,我配置了3个channel,3个kafka sink,1个source,并配置source的channel selector为我的插件,从而可以将流量轮转的发给每一个channel:

batchSize等参数当然具有一定的意义,但是仅用于优化单个pipeline(流水线),要实现线性扩展是需要在线程扩展性方面做上述优化工作的。

成果

经过优化,在4核的服务器上运行4个openresty+1个flume进程,仍旧可以跑出40MB/s的网卡外出流量,日志采集无延迟,达到了我预期中的效果。

如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~