转载

Flume-ng 高级功能配置

看看flume的高级功能:

1    flume channel selectors

如果没有特殊说明,则默认是replicating模式。 还有Multiplexing、Custom模式可以选择。

1    Replicating Channel Selector

需要设置以下的属性:

selector.type    默认值是replicating,用来设置该组件的名称

selector.optional    设置selector的channel

a1.sources = r1 a1.channels = c1 c2 c3 a1.source.r1.selector.type = replicating a1.source.r1.channels = c1 c2 c3 a1.source.r1.selector.optional = c3

在上面的配置中,c3是optional的channel,如果向c3写入失败,则会被忽略,c1和c2没有被标记为optional,如果向c2和c1写入失败则会导致事件失败。

2    Multiplexing Channel Selector

需要设置以下属性:

selector.type    默认值是replicating,可以设置为multiplexing

selector.header    默认值是flume.selector.header

selector.default

selector.mapping.*

a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4

2    flume sink processors

sink groups允许给一个实体设置多个sinks,sink processors可以使在sink group中所有sink具有负载均衡的能力,或者在一个sink失效后切换到另一个sink的fail over模式。

需要设置以下属性:

sinks    空格分隔的一系列sinks列表

processor.type    该组件的名称,可以使default、failover、load_balance

a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance

默认的sink processor只接受一个sink,用户不一定非得创建sink group。

1    failover sink processor

failover机制维护一个sink优先级列表,保证有效事件可以被处理掉。

需要设置以下属性:

sinks    在同一组sinks中的以空格分隔的sink列表

processor.type    默认值是default,在此可以设置为failover

processor.priority.<sinkName>    sink在该sink group中的优先级

processor.maxpenalty    默认值3000毫秒,故障转移的默认时间

a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000

2    load balancing sink processor

load balancing sink processor给多个sinks之间提供负载均衡,它维护一个可用sink索引,它支持通过round_robin和random两种方法进行负载分配,默认的选择方式是round_type类型的,也可以通过配置文件进行更改。当被选择器被调用的时候,它不会屏蔽故障的sink,继续尝试访问每一个可用的sink,如果所有的sink都故障了,选择器则无法给sink传播数据。如果backoff被开启,则sink processor会屏蔽故障的sink,选择器会在一个给定的超时时间内移除它们,当超时时间完毕后,sink还是无法访问,则超时时间以指数方式增长。

需要设置以下属性:

processor.sinks    在同一组sinks中的以空格分隔的sink列表

processor.type    默认是default,在此可以设置成load_balance

processor.backoff    默认是false

processor.selector    默认是round_robin,还可以选择random

processor.selector.maxTimeOut    默认是30000毫秒,屏蔽故障sink的时间

a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = random

3    Event Serializers

file_roll sink和hdfs sink都支持EventSerializer接口,    

4    Body Text Serializer

别名是text,该拦截器会把事件的body输出,不做任何改变,而事件的头部会被忽略。

可以配置以下参数:

appendNewline    默认值是true,在每个事件的后面追加一个新行。

a1.sinks = k1 a1.sinks.k1.type = file_roll a1.sinks.k1.channel = c1 a1.sinks.k1.sink.directory = /var/log/flume a1.sinks.k1.sink.serializer = text a1.sinks.k1.sink.serializer.appendNewline = false

5    Avro Event Serializer

别名是avro_event,该拦截器会把flume的事件插入一个avro容器文件中,

可设置的参数如下:

syncIntervalBytes    默认值是2048000,avro的同步间隔,单位是bytes

compressionCodec    默认值是null,avro的压缩codec。

a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S a1.sinks.k1.serializer = avro_event a1.sinks.k1.serializer.compressionCodec = snappy

6    monitor flume

1    ganglia report

flume可以报告他的metrics到ganglia3,只要你在启动flume agent的时候设置一些参数即可,也可以把这些参数设置在flume-env.sh配置文件中。需要设置的参数如下,这些参数的前缀如下flume.monitoring.:

type    该组件的名称,这里设置为ganglia

hosts    逗号分隔的ganglia服务器列表,hostname:port

pollFrequency    默认值是60秒,flume向ganglia报告metrics的时间间隔

isGanglia3    默认是false,ganglia server的版本在3以上,flume 发送的是ganglia3.1的数据格式

启动flume agent

flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

2    JSON report

flume也可以报告JSON格式的report,为了开启JSON report,在flume机器上启动了一个web server。需要在客户端启动时设置以下参数:

type    该组件的名称,这里设置为http

port    该服务监听的端口,默认是41414

启动flume agent

flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

然后通过http://<hostname>:<port>/metrics来查看值

7    Flume Interceptors

flume拦截器可以修改或者删除事件,flume还支持连接器链,事件可以经过一系列拦截器。多个拦截器在配置文件中以空格分隔,拦截器的顺序就是事件处理的顺序,只有一个拦截器通过之后才会进行到下一个拦截器。

1    Timestamp Interceptor

该拦截器会插入到事件头中,会在事件头中插入一个key是timestamp的KV对,value的值是相关的timestamp。该拦截器可以保护相关的已经存在的timestamp。可以设置以下参数:

type    该组件的名称,此处是timestamp

preserveExisting    默认值是false,如果timestamp已经存在,应该被保护

a1.sources = r1 a1.channels = c1 a1.sources.r1.channels =  c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp

2    Host Interceptor

该拦截器会插入当前agent运行机器的hostname或者ip,插入KV对,key名称是host,可以设置以下值:

type    该组件的名称,此处是host

preserveExisting    默认值是false,如果在头部已经存在host,则应该被保护

useIP    默认值是true,使用ip,而非hostname

hostHeader        默认值是host,头部的key名称

a1.sources = r1 a1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = host a1.sources.r1.interceptors.i1.hostHeader = hostname

3    Static Interceptor

该拦截器允许用户追加静态头部在所有事件中,可以设置以下参数:

type    该组件的名称,此处是static

preserveExisting    默认值是true,如果头部已经存在,则应该被保护

key    默认值是key,头部创建的key名称

value    默认值是value,静态value值

a1.sources = r1 a1.channels = c1 a1.sources.r1.channels =  c1 a1.sources.r1.type = seq a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = datacenter a1.sources.r1.interceptors.i1.value = NEW_YORK
正文到此结束
Loading...