[apache FLUME] Get variable in JSON from kafka source and store in special dir in HDFS

Hi all,

 

I'm using Apache Flume 1.5.2.
I need to extract the timestamp components (year, month, day, hour) from the JSON event and use them as variables in the HDFS directory path, like hdfs://FLUME/%{year}/%{mounth}/%{day}/%{hour}/filename.log

 

For example, in the source I have JSON events like this on Kafka:
{"request":{uri:blabla},"@timestamp":"2019-03-04T13:26:47.000Z"}

and I need to store it on HDFS at:
/FLUME/2019/03/04/13/filename.log

I'm not sure whether I can use the timestamp interceptor for this.

 

Thanks,

 

Regards

Re: [apache FLUME] Get variable in JSON from kafka source and store in special dir in HDFS

agent_03.sources.zk_kafka.interceptors = i1
agent_03.sources.zk_kafka.interceptors.i1.type = regex_extractor
agent_03.sources.zk_kafka.interceptors.i1.regex = .*timestamp\\":\\"(\\d\\d\\d\\d)\\-(\\d\\d)\\-(\\d\\d)T(\\d\\d).*
agent_03.sources.zk_kafka.interceptors.i1.serializers = s1 s2 s3 s4
agent_03.sources.zk_kafka.interceptors.i1.serializers.s1.name = year
agent_03.sources.zk_kafka.interceptors.i1.serializers.s2.name = mounth
agent_03.sources.zk_kafka.interceptors.i1.serializers.s3.name = day
agent_03.sources.zk_kafka.interceptors.i1.serializers.s4.name = hour

# Describe the sink
agent_03.sinks.hdfs_sink.type = hdfs
agent_03.sinks.hdfs_sink.hdfs.fileType = DataStream
agent_03.sinks.hdfs_sink.hdfs.writeFormat = Text
agent_03.sinks.hdfs_sink.channel = channel1
agent_03.sinks.hdfs_sink.hdfs.path = hdfs://localhost:8020/STATS_FLUME/%{topic}/%{year}/%{mounth}/%{day}/%{hour}

 

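It works with this configuration: the regex_extractor interceptor stores each capture group in an event header (year, mounth, day, hour), and the HDFS sink path references those headers with %{...}.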
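
To check the regex outside Flume, here is a minimal Python sketch of what the interceptor and the path escaping do with the sample event from the question. It is only an illustration, not Flume code: the function names `extract_headers` and `build_path` and the topic value `my_topic` are hypothetical, and it assumes the pattern behaves the same in Python's `re` as in Java's regex engine (which should hold for a pattern this simple).

```python
import re

# The regex from the interceptor config, as the regex engine sees it once the
# properties file is loaded (each `\\` in the file becomes a single `\`).
PATTERN = re.compile(r'.*timestamp\":\"(\d\d\d\d)\-(\d\d)\-(\d\d)T(\d\d).*')

# Header names in serializer order (s1..s4), spelled as in the config above.
HEADER_NAMES = ("year", "mounth", "day", "hour")


def extract_headers(body):
    """Map each capture group to its header name; empty dict if no match."""
    match = PATTERN.search(body)
    if match is None:
        return {}
    return dict(zip(HEADER_NAMES, match.groups()))


def build_path(template, headers):
    """Replace every %{name} in the template with the matching header value.

    Assumption for this sketch: a missing header is replaced by an empty string.
    """
    return re.sub(r'%\{(\w+)\}', lambda m: headers.get(m.group(1), ""), template)


SAMPLE_EVENT = '{"request":{uri:blabla},"@timestamp":"2019-03-04T13:26:47.000Z"}'
PATH_TEMPLATE = "hdfs://localhost:8020/STATS_FLUME/%{topic}/%{year}/%{mounth}/%{day}/%{hour}"

headers = extract_headers(SAMPLE_EVENT)
headers["topic"] = "my_topic"  # hypothetical value; the path expects a `topic` header
print(build_path(PATH_TEMPLATE, headers))
```

Note that the values come straight from the event text, so the hour is the one written in the JSON (UTC here, because of the trailing Z), not the agent's local time.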