I've recently been building an openresty+flume+kafka front-end log collection pipeline at work. While testing Flume I found the throughput to Kafka was only about 20MB/s, nowhere near what we needed.
Finding the Bottleneck

Don't Optimize Blindly
Blind optimization is a waste of time. At first all I could do was fiddle with Flume parameters such as batchSize, which got me nowhere. In the end I gave up on guessing and sat down to study Flume's architecture.
Understanding the Architecture
Flume's pipeline consists of a source that collects logs from disk, a channel that buffers the collected events, and a sink that ships them to the remote end. Logically, I needed to work out which stage was dragging down the pipeline's overall bandwidth.
In a pipelined system, a slow downstream stage slows every stage before it, because the pipeline backs up: if the sink is slow, the channel fills up, and once the channel is full the source stops collecting.
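A minimal sketch of this backpressure effect (plain Java, not Flume code; a bounded in-memory queue stands in for the channel): the deliberately slow consumer eventually blocks the producer's put().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {
    public static void main(String[] args) {
        // A small bounded queue plays the role of the Flume channel.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(10);

        // "Source": produces as fast as it can; put() blocks once the
        // queue is full, so a slow sink stalls the source transitively.
        Thread source = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    channel.put("event-" + i);
                    System.out.println("produced event-" + i);
                }
            } catch (InterruptedException ignored) {
            }
        });

        // "Sink": deliberately slow, draining one event every 100ms.
        Thread sink = new Thread(() -> {
            try {
                while (true) {
                    String event = channel.take();
                    Thread.sleep(100); // simulate a slow remote send
                    System.out.println("drained " + event);
                }
            } catch (InterruptedException ignored) {
            }
        });

        source.start();
        sink.start();
    }
}
```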
Points of Suspicion
The source is a plain TAILDIR source collecting incremental logs from a directory. Sequential disk reads deliver far more than 20MB/s, so this stage was my weakest suspect.
For the channel I chose a memory channel; with only one source thread and one sink thread, lock contention could hardly be the bottleneck.
The sink is a Kafka sink using a blocking model: it only takes the next batch from the channel after the previous batch has been fully sent.
Could the Kafka cluster itself have a performance problem? To verify this, I configured 2 channels, set the source to replicating mode to duplicate the traffic into both, and gave each channel its own Kafka sink. Throughput promptly hit 40MB/s, so Kafka was not the problem.
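For reference, that replicating experiment boils down to a configuration along these lines (a sketch with placeholder names; replicating is in fact Flume's default channel selector):

```properties
# duplicate every event from the source into both channels
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# one independent Kafka sink per channel
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```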
Let the Data Speak
Performance tuning shouldn't rely on guesswork; it's best to back it with data. Flume can be started with a metrics port enabled, an HTTP endpoint that exposes real-time performance metrics.
Pass these options at startup:
```
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
```
Then query the live metrics:
```shell
curl localhost:34545/metrics
```
It returns JSON like the following:
```json
{
  "SINK.k1": {
    "ConnectionCreatedCount": "0",
    "BatchCompleteCount": "0",
    "BatchEmptyCount": "257",
    "EventDrainAttemptCount": "0",
    "StartTime": "1519711797903",
    "BatchUnderflowCount": "3",
    "ConnectionFailedCount": "0",
    "ConnectionClosedCount": "0",
    "Type": "SINK",
    "RollbackCount": "0",
    "EventDrainSuccessCount": "40713702",
    "KafkaEventSendTimer": "1545475",
    "StopTime": "0"
  },
  "SINK.k2": {
    "ConnectionCreatedCount": "0",
    "BatchCompleteCount": "0",
    "BatchEmptyCount": "256",
    "EventDrainAttemptCount": "0",
    "StartTime": "1519711799103",
    "BatchUnderflowCount": "3",
    "ConnectionFailedCount": "0",
    "ConnectionClosedCount": "0",
    "Type": "SINK",
    "RollbackCount": "0",
    "EventDrainSuccessCount": "40713702",
    "KafkaEventSendTimer": "1555840",
    "StopTime": "0"
  },
  "SINK.k3": {
    "ConnectionCreatedCount": "0",
    "BatchCompleteCount": "0",
    "BatchEmptyCount": "256",
    "EventDrainAttemptCount": "0",
    "StartTime": "1519711799335",
    "BatchUnderflowCount": "3",
    "ConnectionFailedCount": "0",
    "ConnectionClosedCount": "0",
    "Type": "SINK",
    "RollbackCount": "0",
    "EventDrainSuccessCount": "40713702",
    "KafkaEventSendTimer": "1556911",
    "StopTime": "0"
  },
  "CHANNEL.c3": {
    "ChannelCapacity": "1000000",
    "ChannelFillPercentage": "0.0",
    "Type": "CHANNEL",
    "EventTakeSuccessCount": "40713702",
    "ChannelSize": "0",
    "EventTakeAttemptCount": "40713962",
    "StartTime": "1519711796033",
    "EventPutAttemptCount": "40713702",
    "EventPutSuccessCount": "40713702",
    "StopTime": "0"
  },
  "CHANNEL.c2": {
    "ChannelCapacity": "1000000",
    "ChannelFillPercentage": "0.0",
    "Type": "CHANNEL",
    "EventTakeSuccessCount": "40713702",
    "ChannelSize": "0",
    "EventTakeAttemptCount": "40713962",
    "StartTime": "1519711796033",
    "EventPutAttemptCount": "40713702",
    "EventPutSuccessCount": "40713702",
    "StopTime": "0"
  },
  "CHANNEL.c1": {
    "ChannelCapacity": "1000000",
    "ChannelFillPercentage": "0.0",
    "Type": "CHANNEL",
    "EventTakeSuccessCount": "40713702",
    "ChannelSize": "0",
    "EventTakeAttemptCount": "40713963",
    "StartTime": "1519711796033",
    "EventPutAttemptCount": "40713702",
    "EventPutSuccessCount": "40713702",
    "StopTime": "0"
  },
  "SOURCE.src_taildir": {
    "EventReceivedCount": "122141106",
    "AppendBatchAcceptedCount": "1566",
    "Type": "SOURCE",
    "AppendReceivedCount": "0",
    "EventAcceptedCount": "122141106",
    "StartTime": "1519711796541",
    "AppendAcceptedCount": "0",
    "OpenConnectionCount": "0",
    "AppendBatchReceivedCount": "1566",
    "StopTime": "0"
  }
}
```
The key metric is the channel fill ratio, ChannelFillPercentage: if it approaches 100%, the channel isn't being drained in time and the bottleneck is on the sink side. (The JSON above was captured after my optimization, which is why the channels are basically empty.)
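To watch this metric continuously during a load test, a one-liner along these lines does the job (assumes python3 is available for pretty-printing the JSON):

```shell
watch -n 1 "curl -s localhost:34545/metrics | python3 -m json.tool | grep -E 'CHANNEL|ChannelFillPercentage'"
```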
Optimization
Since the bottleneck is the sink, which pushes to Kafka in a synchronous, blocking fashion, the obvious move is to add more sinks so the channel gets drained faster.
At first, without doing my homework, I used a sink group to attach 2 Kafka sinks to the channel, with a load_balance processor distributing the traffic, only to find throughput still stuck at 20MB/s with zero improvement.
After some thought I understood why: a sink group still runs on a single thread and merely acts as a proxy for the 2 Kafka sinks. Events are handed to the 2 sink objects in rotation, i.e. the 2 Kafka sinks are invoked alternately on the same thread, so there is no parallelism at all.
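For the record, that dead-end configuration looked roughly as follows (a sketch with placeholder names; note that both sinks drain the same channel on one shared sink-runner thread):

```properties
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

# both sinks read from the same channel -- still a single thread of work
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
```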
Following this direction, what I wanted was multiple channels, each with its own sink thread, and a source that spreads traffic evenly across the channels, giving true multi-threaded concurrency.
In practice, however, Flume ships no ready-made plugin for this kind of source-side distribution; you have to write your own. Here is mine: https://github.com/owenliang/flume-ng-round-robin-channel-selector.
```java
package org.apache.flume.channel;

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;

/**
 * Background: the Kafka sink is a single thread making synchronous
 * calls, so its throughput cannot be pushed any higher.
 *
 * Solution: a custom channel selector that distributes the source's
 * traffic evenly across multiple channels, each of which is drained
 * by its own independent Kafka sink.
 *
 * Configuration: on the source, set
 * selector.type=org.apache.flume.channel.RRChannelSelector
 */
public class RRChannelSelector extends AbstractChannelSelector {

    private static final List<Channel> EMPTY_LIST = new ArrayList<>();

    private int rrIndex = 0;

    @Override
    public List<Channel> getRequiredChannels(Event event) {
        List<Channel> allChannels = getAllChannels();

        // Advance the round-robin cursor and pick exactly one channel.
        int index = rrIndex;
        rrIndex = (rrIndex + 1) % allChannels.size();

        List<Channel> result = new ArrayList<>();
        result.add(allChannels.get(index));
        return result;
    }

    @Override
    public List<Channel> getOptionalChannels(Event event) {
        return RRChannelSelector.EMPTY_LIST;
    }

    @Override
    public void configure(Context context) {
        // no configurable options
    }
}
```
We implement this custom Channel Selector class, rebuild Flume to produce a new jar, and overwrite the jar in the Flume installation.
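Alternatively, if you'd rather not rebuild Flume itself, the flume-ng launcher also picks up jars from a plugins.d directory. A deployment sketch under assumed paths ($FLUME_HOME and the plugin directory name are placeholders):

```shell
# compile against the jars shipped with Flume and package the class
javac -d . -cp "$FLUME_HOME/lib/*" RRChannelSelector.java
jar cf rr-channel-selector.jar org/apache/flume/channel/RRChannelSelector.class

# $FLUME_HOME/plugins.d/<plugin-name>/lib is scanned automatically at startup
mkdir -p "$FLUME_HOME/plugins.d/rr-selector/lib"
cp rr-channel-selector.jar "$FLUME_HOME/plugins.d/rr-selector/lib/"
```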
Below is my final configuration: 3 channels, 3 Kafka sinks, and 1 source, with the source's channel selector set to my plugin so that traffic is rotated across every channel:
```properties
# Describe the agent
agent_zdm1.sources = src_taildir
agent_zdm1.sinks = k1 k2 k3
agent_zdm1.channels = c1 c2 c3

# Describe/configure the source
agent_zdm1.sources.src_taildir.type = TAILDIR
agent_zdm1.sources.src_taildir.positionFile = /root/log-analyze/taildir/taildir_position.json
agent_zdm1.sources.src_taildir.filegroups = f1
agent_zdm1.sources.src_taildir.filegroups.f1 = /data/logs/collect/.*
agent_zdm1.sources.src_taildir.batchSize = 100000
agent_zdm1.sources.src_taildir.selector.type = org.apache.flume.channel.RRChannelSelector
#agent_zdm1.sources.src_taildir.backoffSleepIncrement = 2
#agent_zdm1.sources.src_taildir.maxBackoffSleep = 10

# Describe the sinks
agent_zdm1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent_zdm1.sinks.k1.kafka.topic = analytics-zcollect
agent_zdm1.sinks.k1.kafka.bootstrap.servers = localhost:9092
agent_zdm1.sinks.k1.kafka.flumeBatchSize = 1000
agent_zdm1.sinks.k1.kafka.producer.acks = 0
agent_zdm1.sinks.k1.kafka.producer.linger.ms = 100
agent_zdm1.sinks.k1.kafka.producer.batch.size = 100000

agent_zdm1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent_zdm1.sinks.k2.kafka.topic = analytics-zcollect
agent_zdm1.sinks.k2.kafka.bootstrap.servers = localhost:9092
agent_zdm1.sinks.k2.kafka.flumeBatchSize = 1000
agent_zdm1.sinks.k2.kafka.producer.acks = 0
agent_zdm1.sinks.k2.kafka.producer.linger.ms = 500
agent_zdm1.sinks.k2.kafka.producer.batch.size = 100000

agent_zdm1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
agent_zdm1.sinks.k3.kafka.topic = analytics-zcollect
agent_zdm1.sinks.k3.kafka.bootstrap.servers = localhost:9092
agent_zdm1.sinks.k3.kafka.flumeBatchSize = 1000
agent_zdm1.sinks.k3.kafka.producer.acks = 0
agent_zdm1.sinks.k3.kafka.producer.linger.ms = 500
agent_zdm1.sinks.k3.kafka.producer.batch.size = 100000

# Use channels which buffer events in memory
agent_zdm1.channels.c1.type = memory
agent_zdm1.channels.c1.capacity = 1000000
agent_zdm1.channels.c1.transactionCapacity = 100000

agent_zdm1.channels.c2.type = memory
agent_zdm1.channels.c2.capacity = 1000000
agent_zdm1.channels.c2.transactionCapacity = 100000

agent_zdm1.channels.c3.type = memory
agent_zdm1.channels.c3.capacity = 1000000
agent_zdm1.channels.c3.transactionCapacity = 100000

# Bind the source and sinks to the channels
agent_zdm1.sources.src_taildir.channels = c1 c2 c3
agent_zdm1.sinks.k1.channel = c1
agent_zdm1.sinks.k2.channel = c2
agent_zdm1.sinks.k3.channel = c3
```
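To launch the agent with this file and the metrics endpoint from earlier enabled (the config file path here is an assumption; adjust to your layout):

```shell
bin/flume-ng agent \
  --name agent_zdm1 \
  --conf conf \
  --conf-file conf/agent_zdm1.conf \
  -Dflume.monitoring.type=http \
  -Dflume.monitoring.port=34545
```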
Parameters like batchSize do matter, but they only optimize a single pipeline. To scale throughput linearly, you need the thread-scalability work described above.
Results
After the optimization, even with 4 openresty processes and 1 Flume process sharing a 4-core server, the box still pushes 40MB/s of outbound NIC traffic with no log collection lag, which is exactly the result I was hoping for.