[Translation] Flume 1.8.0 User Guide - Sinks

Translated from the official Flume 1.8 user guide. Original: Flume 1.8.0 User Guide

Because of its length, the translation is split into the following five parts:

[Translation] Flume 1.8.0 User Guide (overview)

[Translation] Flume 1.8.0 User Guide - Sources

[Translation] Flume 1.8.0 User Guide - Sinks

[Translation] Flume 1.8.0 User Guide - Channels

[Translation] Flume 1.8.0 User Guide - Processors

Flume Sinks

1. HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files, with compression supported for both file types. Files can be rolled (close the current file and create a new one) periodically based on the elapsed time, data size, or number of events. It also buckets/partitions the data by attributes such as the timestamp or the machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

The following escape sequences are supported:

Alias | Description
%{host} | Substitute value of event header named "host". Arbitrary header names are supported.
%t | Unix time in milliseconds
%a | locale's short weekday name (Mon, Tue, ...)
%A | locale's full weekday name (Monday, Tuesday, ...)
%b | locale's short month name (Jan, Feb, ...)
%B | locale's long month name (January, February, ...)
%c | locale's date and time (Thu Mar 3 23:05:25 2005)
%d | day of month (01)
%e | day of month without padding (1)
%D | date; same as %m/%d/%y
%H | hour (00..23)
%I | hour (01..12)
%j | day of year (001..366)
%k | hour (0..23)
%m | month (01..12)
%n | month without padding (1..12)
%M | minute (00..59)
%p | locale's equivalent of am or pm
%s | seconds since 1970-01-01 00:00:00 UTC
%S | second (00..59)
%y | last two digits of year (00..99)
%Y | year (2010)
%z | +hhmm numeric timezone (for example, -0400)
%[localhost] | Substitute the hostname of the host where the agent is running
%[IP] | Substitute the IP address of the host where the agent is running
%[FQDN] | Substitute the canonical hostname of the host where the agent is running

Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the hostname, which may fail in some networking environments.

The file in use will have the name mangled to include ".tmp" at the end. Once the file is closed, this extension is removed. This allows excluding partially completed files in the directory. Required properties are in bold.

Note: For all of the time related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
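
For reference, a minimal sketch of the TimestampInterceptor approach on the upstream source (the source name r1 is an assumption, not part of the original example):

a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp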

 

Name | Default | Description
channel | - |
type | - | The component type name, needs to be hdfs
hdfs.path | - | HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix | - | Suffix to append to file (eg .avro - NOTE: period is not automatically added)
hdfs.inUsePrefix | - | Prefix that is used for temporal files that flume actively writes into
hdfs.inUseSuffix | .tmp | Suffix that is used for temporal files that flume actively writes into
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount | 10 | Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize | 100 | Number of events written to file before it is flushed to HDFS
hdfs.codeC | - | Compression codec. One of following: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress output file and please don't set codeC. (2) CompressedStream requires set hdfs.codeC with an available codeC.
hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas | - | Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat | Writable | Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.
hdfs.callTimeout | 10000 | Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal | - | Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab | - | Kerberos keytab for accessing secure HDFS
hdfs.proxyUser | - |
hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit | second | The unit of the round down value - second, minute or hour.
hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries | 0 | Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension.
serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.* | - |

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

The above configuration will round down the timestamp to the last 10th minute. For example, an event with a timestamp of 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
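
If compressed output is wanted, the same sink can additionally set the file type and codec from the table above; a hedged sketch (it assumes the gzip codec libraries are available to the agent):

a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip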

2. Hive Sink

This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events is committed to Hive, they become immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to the corresponding columns in the Hive table.

Name | Default | Description
channel | - |
type | - | The component type name, needs to be hive
hive.metastore | - | Hive metastore URI (eg thrift://a.b.com:9083)
hive.database | - | Hive database name
hive.table | - | Hive table name
hive.partition | - | Comma separate list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country: string, time: string) then 'Asia,India,2014-02-26-01-21' will indicate continent=Asia,country=India,time=2014-02-26-01-21
hive.txnsPerBatchAsk | 100 | Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.
heartBeatInterval | 240 | (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.
autoCreatePartitions | true | Flume will automatically create the necessary Hive partitions to stream to
batchSize | 15000 | Max number of events written to Hive in a single Hive transaction
maxOpenConnections | 500 | Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
callTimeout | 10000 | (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
serializer | - | Serializer is responsible for parsing out fields from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON
roundUnit | minute | The unit of the round down value - second, minute or hour.
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
timeZone | Local Time | Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

The following serializers are provided for the Hive sink:

JSON: Handles UTF8 encoded JSON (strict syntax) events and requires no configuration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.
DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde but is independent of the Serde of the Hive table.

Name | Default | Description
serializer.delimiter | , | (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like "\t"
serializer.fieldnames | - | The mapping from input fields to columns in hive table. Specified as a comma separated list (no spaces) of hive table column names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. Eg. 'time,,ip,message' indicates the 1st, 3rd and 4th fields in input map to time, ip and message columns in the hive table.
serializer.serdeSeparator | Ctrl-A | (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of table columns, as the fields in incoming event body do not need to be reordered to match order of table columns. Use single quotes for special characters like '\t'. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character

The following escape sequences are supported:

 

Alias | Description
%{host} | Substitute value of event header named "host". Arbitrary header names are supported.
%t | Unix time in milliseconds
%a | locale's short weekday name (Mon, Tue, ...)
%A | locale's full weekday name (Monday, Tuesday, ...)
%b | locale's short month name (Jan, Feb, ...)
%B | locale's long month name (January, February, ...)
%c | locale's date and time (Thu Mar 3 23:05:25 2005)
%d | day of month (01)
%D | date; same as %m/%d/%y
%H | hour (00..23)
%I | hour (01..12)
%j | day of year (001..366)
%k | hour (0..23)
%m | month (01..12)
%M | minute (00..59)
%p | locale's equivalent of am or pm
%s | seconds since 1970-01-01 00:00:00 UTC
%S | second (00..59)
%y | last two digits of year (00..99)
%Y | year (2010)
%z | +hhmm numeric timezone (for example, -0400)

Note: For all of the time related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.

Example Hive table:

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

The above configuration will round down the timestamp to the last 10th minute. For example, an event with the timestamp header set to 11:54:34 AM, June 12, 2012 and the 'country' header set to 'india' will evaluate to the partition (continent='asia', country='india', time='2012-06-12-11-50'). The serializer is configured to accept tab separated input containing three fields and to skip the second field.
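
The 'country' header in this example is expected to be set upstream. A minimal, hedged sketch using a static interceptor on a hypothetical source r1 (the source and interceptor names are assumptions, not part of the original configuration):

a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = country
a1.sources.r1.interceptors.i1.value = india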

3. Logger Sink

Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are in bold. This sink is the only exception which doesn't require the extra configuration explained in the Logging raw data section.

 

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be logger
maxBytesToLog | 16 | Maximum number of bytes of the Event body to log

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

4. Avro Sink

This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair. The events are taken from the configured Channel in batches of the configured batch size. Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be avro.
hostname | - | The hostname or IP address to bind to.
port | - | The port # to listen on.
batch-size | 100 | Number of events to batch together for send.
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request.
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first.
reset-connection-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
compression-type | none | This can be "none" or "deflate". The compression-type must match the compression-type of the matching AvroSource
compression-level | 6 | The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression
ssl | false | Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
trust-all-certs | false | If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.
truststore | - | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password | - | The password for the specified truststore.
truststore-type | JKS | The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers | 2 * the number of available processors in the machine | The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

5. Thrift Sink

This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname/port pair. The events are taken from the configured Channel in batches of the configured batch size.
The Thrift sink can be configured to start in secure mode by enabling kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be thrift.
hostname | - | The hostname or IP address to bind to.
port | - | The port # to listen on.
batch-size | 100 | Number of events to batch together for send.
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request.
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first.
connection-reset-interval | none | Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
ssl | false | Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a "truststore", "truststore-password" and "truststore-type"
truststore | - | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password | - | The password for the specified truststore.
truststore-type | JKS | The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude
kerberos | false | Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source.
client-principal | - | The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
client-keytab | - | The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC.
server-principal | - | The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect to.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
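
To connect to a Kerberos-enabled Thrift source, the same sink would additionally set the kerberos properties listed in the table above; a hedged sketch (the principal names and keytab path are placeholders):

a1.sinks.k1.kerberos = true
a1.sinks.k1.client-principal = flume/client.example.org@EXAMPLE.ORG
a1.sinks.k1.client-keytab = /path/to/flume-client.keytab
a1.sinks.k1.server-principal = thrift/server.example.org@EXAMPLE.ORG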

6. IRC Sink

The IRC sink takes messages from the attached channel and relays those to configured IRC destinations. Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be irc
hostname | - | The hostname or IP address to connect to
port | 6667 | The port number of remote host to connect
nick | - | Nick name
user | - | User name
password | - | User password
chan | - | channel
name | - |
splitlines | - | (boolean)
splitchars | n | line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: "\n")

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

7. File Roll Sink

Stores events on the local filesystem. Required properties are in bold.

 

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be file_roll.
sink.directory | - | The directory where files will be stored
sink.pathManager | DEFAULT | The PathManager implementation to use.
sink.pathManager.extension | - | The file extension if the default PathManager is used.
sink.pathManager.prefix | - | A character string to add to the beginning of the file name if the default PathManager is used
sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize | 100 |

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

8. Null Sink

Discards all events it receives from the channel. Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be null.
batchSize | 100 |

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

9. Hbase Sink

9.1 Hbase Sink

This sink writes data to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction.
The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent's classpath must have authentication set to kerberos (for details on how to do this, please refer to the HBase documentation).
For convenience, two serializers are provided with Flume. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase, and optionally increments a column in HBase. This is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into different columns.
The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
Required properties are in bold.

 

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be hbase
table | - | The name of the table in Hbase to write to.
columnFamily | - | The column family in Hbase to write to.
zookeeperQuorum | - | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize | 100 | Number of events to be written per txn.
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = "iCol", payload column = "pCol".
serializer.* | - | Properties to be passed to the serializer.
kerberosPrincipal | - | Kerberos user principal for accessing secure HBase
kerberosKeytab | - | Kerberos keytab for accessing secure HBase

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
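
Because the example uses RegexHbaseEventSerializer, its behaviour can be tuned through serializer.* sub-properties. A hedged sketch splitting a two-field, tab-separated body (the regex/colNames keys reflect that serializer's configuration as I understand it, and the pattern and column names are only illustrative; verify against your Flume version):

a1.sinks.k1.serializer.regex = (.*)\t(.*)
a1.sinks.k1.serializer.colNames = user,message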

9.2  AsyncHBaseSink

This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. It provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in bold.

 

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be asynchbase
table | - | The name of the table in Hbase to write to.
zookeeperQuorum | - | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
columnFamily | - | The column family in Hbase to write to.
batchSize | 100 | Number of events to be written per txn.
coalesceIncrements | false | Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
timeout | 60000 | The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction.
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer |
serializer.* | - | Properties to be passed to the serializer.

 

Note that this sink takes the Zookeeper Quorum and parent znode information in the configuration. The Zookeeper Quorum and parent node configuration may be specified in the flume configuration file; if they are not provided there, the sink will read this information from the first hbase-site.xml file in the classpath.
Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

10. MorphlineSolrSink

This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to search applications.
The ETL functionality can be customized using a morphline configuration file that defines a chain of transformation commands that pipe event records from one command to another.
Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume interceptor. Morphlines can be embedded into Hadoop components such as Flume.
Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for additional data formats can be added as morphline plugins. Any kind of data format can be indexed, Solr documents for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed.
Morphlines manipulate continuous streams of records. The data model can be described as follows: a record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava's ArrayListMultimap, which is a ListMultimap.) Note that a field can have multiple values and any two records need not use common field names.
This sink fills the body of the Flume event into the _attachment_body field of the morphline record, and copies the headers of the Flume event into record fields of the same name. The commands can then act on this data.
Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability, and can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as the Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy).
The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation.
The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink
morphlineFile | - | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file
batchSize | 1000 | The maximum number of events to take per flume transaction.
batchDurationMillis | 1000 | The maximum duration per flume transaction (ms). The transaction commits after this duration or when batchSize is exceeded, whichever comes first.
handlerClass | org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl | The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler
isProductionMode | false | This flag should be enabled for mission critical, large-scale online production systems that need to make progress without downtime when unrecoverable exceptions occur. Corrupt or malformed parser input data, parser bugs, and errors related to unknown Solr schema fields produce unrecoverable exceptions.
recoverableExceptionClasses | org.apache.solr.client.solrj.SolrServerException | Comma separated list of recoverable exceptions that tend to be transient, in which case the corresponding task can be retried. Examples include network connection errors, timeouts, etc. When the production mode flag is set to true, the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries.
isIgnoringRecoverableExceptions | false | This flag should be enabled, if an unrecoverable exception is accidentally misclassified as recoverable. This enables the sink to make progress and avoid retrying an event forever.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

11. ElasticSearchSink

This sink writes data to an elasticsearch cluster. By default, events will be written so that the Kibana graphical interface can display them - just as if logstash wrote them.
The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version, first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version: a 0.19.x client can talk to a 0.19.x cluster, 0.20.x can talk to 0.20.x, and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined, then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.
Events will be written to a new index every day. The name will be <indexName>-yyyy-MM-dd where <indexName> is the indexName parameter. The sink will start writing to a new index at midnight UTC.
Events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer by default. This behaviour can be overridden with the serializer parameter. This parameter accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of the more powerful ElasticSearchIndexRequestBuilderFactory.
The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames | - | Comma separated list of hostname:port, if the port is not present the default port '9300' will be used
indexName | flume | The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd'. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header
indexType | logs | The type to index the document to, defaults to 'log'. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header
clusterName | elasticsearch | Name of the ElasticSearch cluster to connect to
batchSize | 100 | Number of events to be written per txn.
ttl | - | TTL in days, when set will cause the expired documents to be deleted automatically, if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days. Follow http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.
serializer.* | - | Properties to be passed to the serializer.

 

Note that using the value of event headers to dynamically decide the indexName and indexType to use when storing the event is very convenient. Caution should be used with this feature, as the event submitter then has control of the indexName and indexType. Furthermore, if the elasticsearch REST client is used, the event submitter has control of the URL path used.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
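
As noted above, header substitution can also drive the index name at event time. A hedged sketch, assuming upstream events carry a 'service' header (the header name is only illustrative):

a1.sinks.k1.indexName = flume-%{service}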

12. Kite Dataset Sink

Experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines the target Dataset by loading a dataset by URI.
The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source's Avro deserializer using deserializer.schemaType = LITERAL.
Note 1: The flume.avro.schema.hash header is not supported. Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is negligible.

Property Name | Default | Description
channel | - |
type | - | Must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri | - | URI of the dataset to open
kite.repo.uri | - | URI of the repository to open (deprecated; use kite.dataset.uri instead)
kite.dataset.namespace | - | Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.dataset.name | - | Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.batchSize | 100 | Number of records to process in each batch
kite.rollInterval | 30 | Maximum wait time (seconds) before data files are released
kite.flushable.commitOnBatch | true | If true, the Flume transaction will be committed and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it's possible for temp files with committed data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders.
kite.syncable.syncOnBatch | true | Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing guarantees that data will be written on stable storage on the remote system while flushing only guarantees that data has left Flume's client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false.
kite.entityParser | avro | Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface.
kite.failurePolicy | retry | Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface.
kite.error.dataset.uri | - | URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when the kite.failurePolicy is set to save.
auth.kerberosPrincipal | - | Kerberos user principal for secure authentication to HDFS
auth.kerberosKeytab | - | Kerberos keytab location (local FS) for the principal
auth.proxyUser | - | The effective user for HDFS actions, if different from the kerberos principal
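
The guide gives no configuration example for this sink; the following is only a hedged sketch, with a placeholder dataset URI of the usual dataset:hdfs:... form:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode/flume/events
a1.sinks.k1.kite.batchSize = 100
a1.sinks.k1.kite.rollInterval = 30
# the Avro schema must arrive in the flume.avro.schema.literal or flume.avro.schema.url event header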

13. Kafka Sink

This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objectives is to integrate Flume with Kafka so that pull-based processing systems can process the data coming through various Flume sources. This currently supports the Kafka 0.9.x series of releases.
This version of Flume no longer supports older versions (0.8.x) of Kafka.
Required properties are marked in bold font.

Property Name | Default | Description
type | - | Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers | - | List of brokers Kafka-Sink will connect to, to get the list of topic partitions. This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a "topic" field, the event will be published to that topic overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named "header". (If using the substitution, it is recommended to set "auto.create.topics.enable" property of Kafka broker to true.)
flumeBatchSize | 100 | How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks | 1 | How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
useFlumeEventFormat | false | By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side.
defaultPartitionId | - | Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader | - | When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId.
allowTopicOverride | true | When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).
topicHeader | topic | When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.
kafka.producer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more producer security props | - | If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on the producer.
Other Kafka Producer Properties | - | These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms

Note that the Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the Sink. If key exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with the same key will be sent to the same partition. If the key is null, events will be sent to random partitions.
The Kafka sink also provides defaults for the key.serializer (org.apache.kafka.common.serialization.StringSerializer) and value.serializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
Deprecated Properties:

Property Name | Default | Description
brokerList | - | Use kafka.bootstrap.servers
topic | default-flume-topic | Use kafka.topic
batchSize | 100 | Use kafka.flumeBatchSize
requiredAcks | 1 | Use kafka.producer.acks

An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer are passed to the Kafka producer. The properties that are passed when creating the Kafka producer are not limited to the properties given in this example. It is also possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Security and Kafka Sink:

 

The communication channel between Flume and Kafka supports secure authentication as well as data encryption. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now, data encryption is solely provided by SSL/TLS.

Setting kafka.producer.security.protocol to any of the following values means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Sink:

 

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, the client will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA, which in turn is trusted by the Kafka brokers.

a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

If the keystore and key use different password protection, then the ssl.key.password property will provide the required additional secret for the producer keystore:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos and Kafka Sink: 

To use the Kafka sink with a Kafka cluster secured with Kerberos, set the producer.security.protocol property noted above for the producer. The Kerberos keytab and principal to be used with the Kafka brokers is specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and, optionally, the system-wide kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference of its content, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Unlike the Kafka Source or Kafka Channel, a "Client" section is not required, unless it is needed by other connecting components. Also please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

14. HTTP Sink

Behaviour of this sink is that it will take events from the channel, and send those events to a remote service using an HTTP POST request. The event content is sent as the POST body.

Error handling behaviour of this sink depends on the HTTP response returned by the target server. The sink backoff/ready status is configurable, as is the transaction commit/rollback result and whether the event contributes to the successful event drain count.

Any malformed HTTP response returned by the server where the status code is not readable will result in a backoff signal and the event is not consumed from the channel.

Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be http.
endpoint | - | The fully qualified URL endpoint to POST to
connectTimeout | 5000 | The socket connection timeout in milliseconds
requestTimeout | 5000 | The maximum request processing time in milliseconds
contentTypeHeader | text/plain | The HTTP Content-Type header
acceptHeader | text/plain | The HTTP Accept header value
defaultBackoff | true | Whether to backoff by default on receiving all HTTP status codes
defaultRollback | true | Whether to rollback by default on receiving all HTTP status codes
defaultIncrementMetrics | false | Whether to increment metrics by default on receiving all HTTP status codes
backoff.CODE | - | Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
rollback.CODE | - | Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
incrementMetrics.CODE | - | Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

 

Note that the most specific HTTP status code match is used for the backoff, rollback and incrementMetrics configuration options. If there are configuration values for both 2XX and 200 status codes, then 200 HTTP codes will use the 200 value, and all other HTTP codes in the 201-299 range will use the 2XX value.

Any empty or null events are consumed without any request being made to the HTTP endpoint.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

15. Custom Sink

A custom sink is your own implementation of the Sink interface. A custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom sink is its FQCN. Required properties are in bold.

Property Name | Default | Description
channel | - |
type | - | The component type name, needs to be your FQCN

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

 

