Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点:
有关Flume的相关内容,可以参考官网文档,或者通过阅读我之前写的文章《Flume(NG)架构设计要点及配置实践》来快速了解。
基于Flume设计实现分层日志收集系统,到底有什么好处呢?我们可以先看一下,如果不分层,会带来哪些问题:
通过下图我们可以看出,这种单层日志收集系统设计,存在太多的问题,而且系统或服务越多导致整个日志收集系统越难以控制:
上图中,无论是外部还是内部,只要部署了Flume Agent的节点,都直接同内部的Kafka集群和Hadoop集群相连,所以在数据平台内部只能尽量保持Kafka和Hadoop集群正常稳定运行,也要为外部日志收集Flume Agent的数据流量的陡增和异常变化做好防控准备。再者,如需停机维护或者升级某一个集群,可能都需要通知外部所有Flume Agent所在节点的业务方,做好应对(停机)准备。
接着看,如果我们基于Flume使用分层的方式来设计日志收集系统,又有哪些优势,如下图所示:
上图中,Flume日志收集系统采用两层架构设计:第一层(L1)是日志收集层,第二层(L2)是数据平台缓冲层(汇聚层)。通过这种方式,使得日志收集系统有如下特点:
通过上面分析可见,分层无非是为了使的日志数据源节点的Flume Agent服务与数据平台的存储系统(Kafka/HDFS)进行解耦,同时能够更好地对同类型业务多节点的日志流进行一个聚合操作,并分离开独立管理。另外,可以根据实际业务需要,适当增加Flume系统分层,满足日志流数据的汇聚需要。
我们看一下,Flume日志收集系统,在我们这个示例应用中处于一个什么位置,我简单画了一下图,加了一些有关数据处理和分析的节点/组件,如下图所示:
这里,简单了解一下上图即可,由于日志收集在整个应用系统中是很重要的一个环节,所以必须保证日志收集系统设计的可靠、可用、灵活、稳定,通过上面在日志收集系统收集日志之后,数据平台所做的大量分析处理,来凸显日志收集系统的重要性,这里其他内容不做过多说明。
这里,我们主要以实时收集日志为例,说明如何构建一个相对复杂的Flume分层日志收集系统。首先,简要说明一下日志收集需求:
详细分层设计如下图所示:
上图是从实际的整个数据平台中拿出来一部分,简单便于解释说明。有关上图中所涉及到的Flume Agent的配置详情,下面会根据Flume分层的结构(L1层、L2层)来详细配置说明。由于L1层的10.10.1.101和10.10.1.102节点上部署的Flume Agent是对称的,所以下面只拿出其中一个来说明配置,不同的是,这两个节点上Flume Agent的Sink使用Failover功能,分别交叉指向L2层Flume Agent,也能够起到一定的负载均衡的作用。
下面,分别针对10.10.1.101节点上的3个Flume Agent的配置内容,分别进行说明如下:
Flume Agent名称为a1,使用Exec Source、Memory Channel、Avro Sink,这里我们的Nginx日志文件始终指向/data/nginx/logs/app_user_events.log,即使日切或小时切文件,使用tail -F就能保证日志内容都被收集。具体配置内容如下所示:
a1.sources = s1 a1.channels = mc1 a1.sinks = k1 k2 # Configure source a1.sources.s1.channels = mc1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /data/nginx/logs/app_user_events.log # Configure channel a1.channels.mc1.type = memory a1.channels.mc1.transactionCapacity = 50000 a1.channels.mc1.capacity = 100000 # Configure sinks a1.sinks.k1.channel = mc1 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 10.10.1.122 a1.sinks.k1.port = 44446 a1.sinks.k2.channel = mc1 a1.sinks.k2.type = avro a1.sinks.k2.hostname = 10.10.1.121 a1.sinks.k2.port = 44446 # Configure failover a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 9 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.maxpenalty = 10000
a2.sources = s2 a2.channels = mc2 a2.sinks = k3 k4 # Configure source a2.sources.s2.channels = mc2 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /data/nginx/logs/push_click_events.log # Configure channel a2.channels.mc2.type = memory a2.channels.mc2.capacity = 50000 a2.channels.mc2.transactionCapacity = 100000 # Configure sinks a2.sinks.k3.channel = mc2 a2.sinks.k3.type = avro a2.sinks.k3.hostname = 10.10.1.121 a2.sinks.k3.port = 44447 a2.sinks.k4.channel = mc2 a2.sinks.k4.type = avro a2.sinks.k4.hostname = 10.10.1.122 a2.sinks.k4.port = 44447 # Configure failover a2.sinkgroups = g2 a2.sinkgroups.g2.sinks = k3 k4 a2.sinkgroups.g2.processor.type = failover a2.sinkgroups.g2.processor.priority.k3 = 9 a2.sinkgroups.g2.processor.priority.k4 = 7 a2.sinkgroups.g2.processor.maxpenalty = 10000
第三方点击事件通过统一的接口上传数据,那么配置起来也比较容易,如下所示:
a3.sources = s3 a3.channels = mc3 a3.sinks = k5 k6 # Configure source a3.sources.s3.channels = mc3 a3.sources.s3.type = exec a3.sources.s3.command = tail -F /data/nginx/logs/thirdparty_click_events.log # Configure channel a3.channels.mc3.type = memory a3.channels.mc3.transactionCapacity = 50000 a3.channels.mc3.capacity = 100000 # Configure sinks a3.sinks.k5.channel = mc3 a3.sinks.k5.type = avro a3.sinks.k5.hostname = 10.10.1.121 a3.sinks.k5.port = 44446 a3.sinks.k6.channel = mc3 a3.sinks.k6.type = avro a3.sinks.k6.hostname = 10.10.1.122 a3.sinks.k6.port = 44446 # Configure failover a3.sinkgroups = g3 a3.sinkgroups.g3.sinks = k5 k6 a3.sinkgroups.g3.processor.type = failover a3.sinkgroups.g3.processor.priority.k5 = 9 a3.sinkgroups.g3.processor.priority.k6 = 7 a3.sinkgroups.g3.processor.maxpenalty = 10000
广告点击事件日志收集配置,如下所示:
a4.sources = s4 a4.channels = mc4 a4.sinks = k7 k8 # Configure source a4.sources.s4.channels = mc4 a4.sources.s4.type = exec a4.sources.s4.command = tail -F /data/nginx/logs/ad.log # Configure channel a4.channels.mc4.type = memory a4.channels.mc4.transactionCapacity = 50000 a4.channels.mc4.capacity = 100000 # Configure sinks a4.sinks.k7.channel = mc4 a4.sinks.k7.type = avro a4.sinks.k7.hostname = 10.10.1.121 a4.sinks.k7.port = 44448 a4.sinks.k8.channel = mc4 a4.sinks.k8.type = avro a4.sinks.k8.hostname = 10.10.1.122 a4.sinks.k8.port = 44448 # Configure failover a4.sinkgroups = g4 a4.sinkgroups.g4.sinks = k7 k8 a4.sinkgroups.g4.processor.type = failover a4.sinkgroups.g4.processor.priority.k7 = 10 a4.sinkgroups.g4.processor.priority.k8 = 8 a4.sinkgroups.g4.processor.maxpenalty = 10000
这种业务需求是:把App用户事件和推送点击事件合并写入文件,最后都会写入HDFS,从而进一步在Hive中进行离线分析;同时又要使这两种事件分别独立地走实时计算的流程,App用户事件实时计算流程需要实时统计用户使用App过程中行为特征,而推送点击事件实时计算需要针对某一次活动来实时分析和展示用户的参与情况。具体配置内容,如下所示:
a1.sources = s1 s2 a1.channels = fc1 fc2 fc3 a1.sinks = kk1 fk2 kk3 # Configure source: # Configure app user event source: s1 -> fc1+fc2 a1.sources.s1.channels = fc1 fc2 a1.sources.s1.type = avro a1.sources.s1.bind = 10.10.1.121 a1.sources.s1.port = 44446 a1.sources.s1.threads = 8 # Configure source # Configure push click event source: s2 -> fc2+fc3 a1.sources.s2.channels = fc2 fc3 a1.sources.s2.type = avro a1.sources.s2.bind = 10.10.1.122 a1.sources.s2.port = 44447 a1.sources.s2.threads = 4 # Configure file channel(/data1) # Configure app user event channel: fc1 ->kk1 a1.channels.fc1.type = file a1.channels.fc1.checkpointDir = /data1/flume/channels/app_user_event/checkpoint a1.channels.fc1.useDualCheckpoints = true a1.channels.fc1.backupCheckpointDir = /data1/flume/channels/app_user_event/backup a1.channels.fc1.dataDirs = /data1/flume/channels/app_user_event/data a1.channels.fc1.transactionCapacity = 100000 a1.channels.fc1.capacity = 500000 a1.channels.fc1.checkpointInterval = 60000 a1.channels.fc1.keep-alive = 5 a1.channels.fc1.maxFileSize = 5368709120 # Configure file channel(/data2) # Configure app user event + push click event: fc2 - > fk2 a1.channels.fc2.type = file a1.channels.fc2.checkpointDir = /data2/flume/channels/offline_file_event/checkpoint a1.channels.fc2.useDualCheckpoints = true a1.channels.fc2.backupCheckpointDir = /data2/flume/channels/offline_file_event/backup a1.channels.fc2.dataDirs = /data2/flume/channels/offline_file_event/data a1.channels.fc2.transactionCapacity = 100000 a1.channels.fc2.capacity = 500000 a1.channels.fc2.checkpointInterval = 60000 a1.channels.fc2.keep-alive = 5 a1.channels.fc2.maxFileSize = 5368709120 # Configure file channel(/data3) # Configure push click channel: fc3 ->kk3 a1.channels.fc3.type = file a1.channels.fc3.checkpointDir = /data3/flume/channels/push_click_event/checkpoint a1.channels.fc3.useDualCheckpoints = true a1.channels.fc3.backupCheckpointDir = /data3/flume/channels/push_click_event/backup a1.channels.fc3.dataDirs = /data3/flume/channels/push_click_event/data a1.channels.fc3.transactionCapacity = 100000 a1.channels.fc3.capacity = 500000 a1.channels.fc3.checkpointInterval = 60000 a1.channels.fc3.keep-alive = 5 a1.channels.fc3.maxFileSize = 5368709120 # Configure sink: RealtimeMessageSink(app user event) a1.sinks.kk1.type = org.shirdrn.flume.sink.RealtimeMessageSink a1.sinks.kk1.channel = fc1 a1.sinks.kk1.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092 a1.sinks.kk1.topic = json_user_event a1.sinks.kk1.serializer.class = kafka.serializer.StringEncoder a1.sinks.kk1.producer.type = async a1.sinks.kk1.message.send.max.retries = 3 a1.sinks.kk1.client.id = flume_app_user_event_2_1 a1.sinks.kk1.event.decoder.count = 8 a1.sinks.kk1.output.stat.event.batch.size = 2000 a1.sinks.kk1.event.decoder.queue.size = 1000 # Configure sink: RichRollingFileSink a1.sinks.fk2.type = org.shirdrn.flume.sink.RichRollingFileSink a1.sinks.fk2.channel = fc2 a1.sinks.fk2.batchSize = 100 a1.sinks.fk2.serializer = TEXT a1.sinks.fk2.sink.rollInterval = 60 a1.sinks.fk2.sink.directory = /data/flume/rolling_files a1.sinks.fk2.sink.file.prefix = event a1.sinks.fk2.sink.file.suffix = .log a1.sinks.fk2.sink.file.pattern = yyyyMMddHHmmss # Configure sink: RealtimeMessageSink(push click) a1.sinks.kk3.type = org.shirdrn.flume.sink.RealtimeMessageSink a1.sinks.kk3.channel = fc3 a1.sinks.kk3.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092 a1.sinks.kk3.topic = json_push_click_event a1.sinks.kk3.serializer.class = kafka.serializer.StringEncoder a1.sinks.kk3.producer.type = async a1.sinks.kk3.message.send.max.retries = 3 a1.sinks.kk3.client.id = flume_push_click_2_1 a1.sinks.kk3.event.decoder.count = 4 a1.sinks.kk3.output.stat.event.batch.size = 2000 a1.sinks.kk3.event.decoder.queue.size = 1000
上面,可以看到我们自己实现的org.shirdrn.flume.sink.RealtimeMessageSink,该Sink主要是使Flume收集的日志写入Kafka中,在Flume 1.5.0版本中还没有内置实现,所以我们自己实现了,并在其中加入了适合我们业务的处理逻辑,比如,将Nginx日志记录行解析,然后根据实时计算需要,过滤掉不需要进入Kafka(最终在Storm集群中处理)事件数据,最后转成JSON字符串的格式,写入到Kafka中的Topic里。通过上面的配置也可以看出,可以配置很多参数,例如解析线程数、队列大小等。由于我们需要将写入本地文件系统的文件按照我们自己的方式来定义,所以基于Flume内置的file_roll实现进行修改,实现了自己的org.shirdrn.flume.sink.RichRollingFileSink,该Sink主要是对文件名字符串进行格式化,能够通过文件名来获取到文件生成的时间(人类可读格式)。
上面的图中,L1层可以根据需要扩展到更多的服务器节点,在L2层根据需要进行汇聚/缓冲,具体配置内容如下所示:
a2.sources = s3 a2.channels = fc4 a2.sinks = kk4 # Configure source: s3 -> fc4 a2.sources.s3.channels = fc4 a2.sources.s3.type = avro a2.sources.s3.bind = 10.10.1.121 a2.sources.s3.port = 44448 a2.sources.s3.threads = 2 # Configure channel(/data4) # Configure Ad channel: fc4 ->kk4 a2.channels.fc4.type = file a2.channels.fc4.checkpointDir = /data4/flume/channels/ad/checkpoint a2.channels.fc4.useDualCheckpoints = true a2.channels.fc4.backupCheckpointDir = /data4/flume/channels/ad/backup a2.channels.fc4.dataDirs = /data4/flume/channels/ad/data a2.channels.fc4.transactionCapacity = 100000 a2.channels.fc4.capacity = 500000 a2.channels.fc4.checkpointInterval = 60000 a2.channels.fc4.keep-alive = 5 a2.channels.fc1.maxFileSize = 5368709120 # Configure sinks: RealtimeAdKafkaSink a2.sinks.kk4.type = org.shirdrn.flume.sink.RealtimeAdKafkaSink a2.sinks.kk4.channel = fc4 a2.sinks.kk4.metadata.broker.list = kafka01:9092,kafka02:9092,kafka03:9092 a2.sinks.kk4.topic = json_ad_event a2.sinks.kk4.serializer.class = kafka.serializer.StringEncoder a2.sinks.kk4.producer.type = async a2.sinks.kk4.message.send.max.retries = 3 a2.sinks.kk4.client.id = flume_ad_2_1 a2.sinks.kk4.event.decoder.count = 4 a2.sinks.kk4.output.stat.event.batch.size = 2500 a2.sinks.kk4.event.decoder.queue.size = 5000
这里我们简单总结一些内容,如下所示:
简单一点的监控,直接在启动的时候,开启一个Web端口,通过端口来获取Flume Agent服务的一些相关数据,命令类似:
bin/flume-ng agent -n a1 -c conf -f conf/config.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
这样便可以在Flume Agent服务节点上,浏览Web端口34545来查看,数据以JSON格式表示,比较重要的一些元数据,如channel容量、当前使用量等等,通过这些数据可以了解当前Flume的工作状态,是否需要升级扩容等等。另外,也可以通过Ganglia来收集并分析Flume Agent服务运行状态,能够更加详细地展示Flume Agent服务的状态,因为Ganglia配置相对复杂,这里就不做过多解释,感兴趣可以尝试一下。
因为Flume使用Java实现的,所以就会遇到有关JVM调优的问题,这个也比较容易。默认情况下,Flume Agent进程的堆内存设置比较小,在日志数据量比较大的情况下就需要修改并调试这些参数,以满足业务需要。设置JVM相关参数,可以修改conf/flume-env.sh文件(也可以直接在启动Flume Agent服务时指定JVM选项参数),例如修改JAVA_OPTS变量,示例如下所示:
JAVA_OPTS="-server -Xms1024m -Xmx4096m -Dcom.sun.management.jmxremote -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/data/flume/logs/gc-ad.log"
这样,可以方便地修改GC策略,一般由于Flume实时收集日志比较注重实时性,希望能够快速地响应,尽量减少GC导致暂停业务线程被挂起的时间,所以可以将GC设置为ParNew+CMS策略。将GC日志输出,在一定程度上能够更加方便地观察Flume Agent服务运行过程中JVM GC的详细情况,通过诊断来优化服务运行。
通常,在开始部署Flume日志收集系统时,上游L1层服务节点比较少,在L2层汇聚时使用默认的配置可能效果也会不错,但是如果L1层Flume Agent越来越多,就能看到L2层处理速度慢下来。L2层的Flume Agent服务一般会远远小于L1层Flume Agent服务数,这种情况下,如果L2层Flume Agent服务使用Avro Source,可以调大Avro接收线程数,示例如下:
a1.sources.s1.type = avro a1.sources.s1.bind = 10.10.1.121 a1.sources.s1.port = 44446 a1.sources.s1.threads = 8
上面默认情况下threads参数的值1,可以将该值调大,否则的话,L1层就会堆积日志记录,严重可能导致数据丢失。
Flume的易扩展性使得我们可以根据自己的业务特点来实现一些组件,那么我们在将实际业务逻辑掺杂进Flume中时,需要考虑是否非得必须这么做?如果这么做是否会影响Flume实时传输日志的速度和效率?
Flume作为一个轻量级的日志收集工具,个人认为最好将相对复杂的业务逻辑(尤其是需要与一些存储系统,如MySQL、Redis交互时)后移,放在Storm集群中去处理,或者自己实现的业务处理集群中,而Flume就让它去做其擅长的事情——路由消息。
当然,有些业务场景可能必须在Flume日志收集层去做,如根据原始非结构化的消息,无法控制不同类型的消息路由到不同的目的地,那么可能需要在收集层做一个简单的解析或格式化,实际上这是在Flume层做了一个简单的日志分发。无论如何,如果想在Flume层插入业务逻辑处理,尽量避免过于复杂的处理而影响整个日志传输速度,如果后端有实时推荐需求,日志中事件的实时性大大延迟,就会影响实施个性化推荐。
本文基于 署名-非商业性使用-相同方式共享 4.0 许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。