New Contributor
Posts: 2
Registered: ‎03-04-2019

[apache FLUME] Get variable in JSON from kafka source and store in special dir in HDFS


Hi all,

 

I'm using Apache Flume 1.5.2.
I need to extract the timestamp fields (year, month, day, hour) from each JSON event and write the event to an HDFS directory built from those values, with a path like hdfs://FLUME/%{year}/%{mounth}/%{day}/%{hour}/filename.log

 

For example, the source receives JSON events like this from Kafka:
{"request":{uri:blabla},"@timestamp":"2019-03-04T13:26:47.000Z"}

and I need to store each one in HDFS under:
/FLUME/2019/03/04/13/filename.log

I'm not sure whether the timestamp interceptor can be used for this.

 

Thanks,

 

Regards


Re: [apache FLUME] Get variable in JSON from kafka source and store in special dir in HDFS

agent_03.sources.zk_kafka.interceptors = i1
agent_03.sources.zk_kafka.interceptors.i1.type = regex_extractor
agent_03.sources.zk_kafka.interceptors.i1.regex = .*timestamp\\":\\"(\\d\\d\\d\\d)\\-(\\d\\d)\\-(\\d\\d)T(\\d\\d).*
agent_03.sources.zk_kafka.interceptors.i1.serializers = s1 s2 s3 s4
agent_03.sources.zk_kafka.interceptors.i1.serializers.s1.name = year
agent_03.sources.zk_kafka.interceptors.i1.serializers.s2.name = mounth
agent_03.sources.zk_kafka.interceptors.i1.serializers.s3.name = day
agent_03.sources.zk_kafka.interceptors.i1.serializers.s4.name = hour

# Describe the sink
agent_03.sinks.hdfs_sink.type = hdfs
agent_03.sinks.hdfs_sink.hdfs.fileType = DataStream
agent_03.sinks.hdfs_sink.hdfs.writeFormat = Text
agent_03.sinks.hdfs_sink.channel = channel1
agent_03.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/STATS_FLUME/%{topic}/%{year}/%{mounth}/%{day}/%{hour}

 

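It works with this configuration: the regex_extractor interceptor copies each captured group into an event header, and the HDFS sink path references those headers.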
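
To check the regex outside Flume, here is a minimal Python sketch that applies the same pattern to the sample event from the question and fills in the sink path template. It is only an approximation of what the interceptor and sink do, not Flume code: the helper names `extract_headers` and `resolve_path` and the `my_topic` value are made up for illustration, and the double backslashes from the properties file are written as single ones here.

```python
import re

# Same pattern as interceptors.i1.regex, without the properties-file escaping.
PATTERN = re.compile(r'.*timestamp":"(\d\d\d\d)-(\d\d)-(\d\d)T(\d\d).*')

# Header names in the same order as serializers s1..s4 in the config.
HEADER_NAMES = ("year", "mounth", "day", "hour")


def extract_headers(body):
    """Map each capture group to its header name (hypothetical helper)."""
    match = PATTERN.search(body)
    if match is None:
        return {}
    return dict(zip(HEADER_NAMES, match.groups()))


def resolve_path(template, headers):
    """Replace every %{name} with the matching header value.

    Assumption: a missing header becomes an empty string.
    """
    return re.sub(r"%\{(\w+)\}",
                  lambda m: headers.get(m.group(1), ""),
                  template)


event = '{"request":{uri:blabla},"@timestamp":"2019-03-04T13:26:47.000Z"}'
headers = extract_headers(event)
headers["topic"] = "my_topic"  # placeholder value, not from the original post

path = resolve_path(
    "hdfs://localhost:8020/STATS_FLUME/%{topic}/%{year}/%{mounth}/%{day}/%{hour}",
    headers,
)
print(path)
```

Note that the hour is taken from the text of the event as-is, so it follows whatever time zone the producer wrote into @timestamp (UTC in the sample, given the trailing Z).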
