
GZIP files stored into HDFS

Hi there,

 

I have made some progress, but maybe I am missing the general point. Here is my task.

 

1) Input are CSV files with text header stored as *.GZ, see sample below 

 

# zcat node_b.csv.gz
ADM_INSTANCE,DIALOGUE_ID,DIALOGUE_TYPE,EVENT_TYPE,DIALOGUE_STATE,EVENT_TIME,DIALOGUE_SHORT_CODE,DIALOGUE_INITIATOR,DIALOGUE_RECEIVER
"GW1","148107752951195","7","5","2","2016-12-07 02:25:34","2033","4f77e7dad9036236e4f7da40548dc554","4f77e7dad9036236e4f7da40548dc554"
"GW1","148106672991977","7","6","4","2016-12-07 02:25:35","2033","3a718b73c50979f007c0d1313694f1c2","3a718b73c50979f007c0d1313694f1c2"
"GW1","148107758951539","7","5","2","2016-12-07 02:26:34","2033","2aefedc71ca3323098a5e937cd88eb6f","8af7f1b8c31891a15cd08aca699403f5"

 

2) I have to store the CSV files into HDFS in a binary format (Parquet or Avro) that Impala can use for subsequent queries. Moreover, the data must be partitioned according to the day of each CSV record.

 

I have already succeeded with uncompressed files, which I can store according to the day contained in every CSV record, but now I have to start processing compressed files, and store them not as text but as binary, structured data.
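One workaround worth considering (a sketch only; the spooldir source reads raw lines, so gzipped content would arrive garbled): decompress the .gz files into the spool directory before Flume picks them up. The directory names here are assumptions, adjust to your setup.

```shell
#!/bin/sh
# Sketch: decompress incoming *.gz CSVs into the Flume spool directory.
# Files are written under a temporary hidden name first and then renamed,
# so the spooldir source never sees a half-written file.
spool_gunzip() {
  src_dir=$1
  spool_dir=$2
  for f in "$src_dir"/*.gz; do
    [ -e "$f" ] || continue          # no .gz files present
    base=$(basename "$f" .gz)
    gunzip -c "$f" > "$spool_dir/.$base.tmp" \
      && mv "$spool_dir/.$base.tmp" "$spool_dir/$base" \
      && rm -f "$f"
  done
}

# Example (paths are illustrative):
# spool_gunzip /home/cloudera/incoming /home/cloudera/runs
```

Run it from cron shortly before each Flume poll interval, or inotify-driven if the volume is high.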

 

Any clue? Here is my current configuration:

 

# Sources, channels, and sinks are defined per agent name, in this case 'xtend'.
xtend.sources = source1
xtend.channels = channel1
xtend.sinks = mavenir

# For each source, channel, and sink
xtend.sources.source1.type = spooldir
xtend.sources.source1.channels = channel1
xtend.sources.source1.spoolDir = /home/cloudera/runs
xtend.sources.source1.fileHeader = false
xtend.sources.source1.fileSuffix = .done
#xtend.sources.source1.deletePolicy = immediate
xtend.sources.source1.deletePolicy = never
xtend.sources.source1.consumeOrder=oldest
xtend.sources.source1.pollDelay=15000
xtend.sources.source1.decodeErrorPolicy=IGNORE

xtend.sources.source1.interceptors.i2.type = regex_filter
xtend.sources.source1.interceptors.i2.regex = ^ADM_INSTANCE.*
xtend.sources.source1.interceptors.i2.excludeEvents=true

xtend.sources.source1.interceptors.i3.type = regex_extractor
xtend.sources.source1.interceptors.i3.regex = ","([0-9]{4})-([0-9]{2})-([0-9]{2})

xtend.sources.source1.interceptors.i3.serializers.s1.name = myear
xtend.sources.source1.interceptors.i3.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

xtend.sources.source1.interceptors.i3.serializers.s2.name = mmonth
xtend.sources.source1.interceptors.i3.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

xtend.sources.source1.interceptors.i3.serializers.s3.name = mday
xtend.sources.source1.interceptors.i3.serializers.s3.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

xtend.sources.source1.interceptors.i3.serializers = s1 s2 s3
xtend.sources.source1.interceptors = i2 i3

#channel config
xtend.channels.channel1.type = memory
xtend.channels.channel1.capacity = 1000
xtend.channels.channel1.transactionCapacity = 100

# sink config
xtend.sinks.mavenir.type = hdfs
xtend.sinks.mavenir.channel = channel1
xtend.sinks.mavenir.hdfs.fileType = DataStream
xtend.sinks.mavenir.hdfs.filePrefix = xtend-
xtend.sinks.mavenir.hdfs.batchSize = 1000
xtend.sinks.mavenir.hdfs.rollSize = 268435456
xtend.sinks.mavenir.hdfs.useLocalTimeStamp=false
xtend.sinks.mavenir.hdfs.path = hdfs://flume/events/year=%{myear}/month=%{mmonth}/day=%{mday}

# Write format can be text or writable
xtend.sinks.mavenir.hdfs.writeFormat = Text
# use a single csv file at a time
xtend.sinks.mavenir.hdfs.maxOpenFiles = 1
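As a quick sanity check that the i3 interceptor pattern really pulls the date out of a record, the same expression can be tried from the shell (a sketch using grep's POSIX ERE against the sample line above):

```shell
# Apply the interceptor's pattern (","YYYY-MM-DD) to a sample record and
# split the match into the year/month/day headers used in hdfs.path.
line='"GW1","148107752951195","7","5","2","2016-12-07 02:25:34","2033","a","b"'
match=$(printf '%s\n' "$line" | grep -oE '","[0-9]{4}-[0-9]{2}-[0-9]{2}')
date=$(printf '%s' "$match" | cut -c4-)   # drop the leading "," literal
year=${date%%-*}
rest=${date#*-}
month=${rest%%-*}
day=${rest##*-}
echo "$year $month $day"                  # prints: 2016 12 07
```

The leading `","` in the pattern is what keeps it from matching digits inside the other quoted fields.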

Re: GZIP files stored into HDFS

There is the Avro sink to look into. And you can always load the data as-is, lay a table over it, create a Parquet version of the same table, and run an INSERT INTO ... SELECT on a scheduler.
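A rough sketch of that approach in Impala SQL. All table names and paths here are illustrative assumptions; the columns come from the sample header in the question, and since the sample fields are double-quoted, the conversion strips the quotes.

```sql
-- 1) External text table over the raw CSV lines delivered by Flume.
CREATE EXTERNAL TABLE dialogue_raw (
  adm_instance STRING,
  dialogue_id STRING,
  dialogue_type STRING,
  event_type STRING,
  dialogue_state STRING,
  event_time STRING,
  dialogue_short_code STRING,
  dialogue_initiator STRING,
  dialogue_receiver STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/flume/events/staging';

-- 2) Parquet table, partitioned by day, for Impala queries.
CREATE TABLE dialogue (
  adm_instance STRING,
  dialogue_id STRING,
  dialogue_type STRING,
  event_type STRING,
  dialogue_state STRING,
  event_time TIMESTAMP,
  dialogue_short_code STRING,
  dialogue_initiator STRING,
  dialogue_receiver STRING
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;

-- 3) Scheduled conversion (e.g. from cron via impala-shell -q "...").
INSERT INTO dialogue PARTITION (year, month, day)
SELECT
  regexp_replace(adm_instance, '"', ''),
  regexp_replace(dialogue_id, '"', ''),
  regexp_replace(dialogue_type, '"', ''),
  regexp_replace(event_type, '"', ''),
  regexp_replace(dialogue_state, '"', ''),
  CAST(regexp_replace(event_time, '"', '') AS TIMESTAMP),
  regexp_replace(dialogue_short_code, '"', ''),
  regexp_replace(dialogue_initiator, '"', ''),
  regexp_replace(dialogue_receiver, '"', ''),
  year(CAST(regexp_replace(event_time, '"', '') AS TIMESTAMP)),
  month(CAST(regexp_replace(event_time, '"', '') AS TIMESTAMP)),
  day(CAST(regexp_replace(event_time, '"', '') AS TIMESTAMP))
FROM dialogue_raw;
```

With this layout the day partitioning happens in SQL at conversion time, so the Flume side no longer has to extract the date from each record.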

This post links to a presentation that may help.

http://community.cloudera.com/t5/Data-Ingestion-Integration/Apache-Flume-and-parquet/m-p/8548