
GZIP files stored into HDFS


Hi there,


I have made some progress, but maybe I am missing the general point. Here is my task.


1) The input consists of CSV files with a text header, stored as *.gz; see the sample below:


# zcat node_b.csv.gz
"GW1","148107752951195","7","5","2","2016-12-07 02:25:34","2033","4f77e7dad9036236e4f7da40548dc554","4f77e7dad9036236e4f7da40548dc554"
"GW1","148106672991977","7","6","4","2016-12-07 02:25:35","2033","3a718b73c50979f007c0d1313694f1c2","3a718b73c50979f007c0d1313694f1c2"
"GW1","148107758951539","7","5","2","2016-12-07 02:26:34","2033","2aefedc71ca3323098a5e937cd88eb6f","8af7f1b8c31891a15cd08aca699403f5"


2) I have to store the CSV files in HDFS in a binary form (Parquet or Avro) that Impala can use for subsequent queries. Moreover, the data must be stored according to the day of each CSV record.


I have already succeeded with uncompressed files, which I can store partitioned by the day contained in every CSV record, but now I have to start processing compressed files, and store them not as text but in a binary, structured form.
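One thing worth noting: the spooling directory source reads files as plain text, so the .gz files need to be decompressed before they land in the spool directory. A minimal sketch, assuming a hypothetical staging directory `/home/cloudera/incoming` feeding the `spoolDir` from the config below:

```shell
#!/bin/sh
# Hypothetical pre-processing step: gunzip each incoming *.csv.gz into the
# Flume spool directory so the spooldir source can read it as plain text.
IN=/home/cloudera/incoming   # assumed staging directory (not in the original config)
OUT=/home/cloudera/runs      # the configured spoolDir

for f in "$IN"/*.csv.gz; do
  [ -e "$f" ] || continue                     # skip when the glob matches nothing
  zcat "$f" > "$OUT/$(basename "${f%.gz}")"   # node_b.csv.gz -> node_b.csv
done
```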


Any clue? My current configuration:


# Sources, channels, and sinks are defined per agent name, in this case 'xtend'.
xtend.sources = source1
xtend.channels = channel1
xtend.sinks = mavenir

# For each source, channel, and sink
xtend.sources.source1.type = spooldir
xtend.sources.source1.channels = channel1
xtend.sources.source1.spoolDir = /home/cloudera/runs
xtend.sources.source1.fileHeader = false
xtend.sources.source1.fileSuffix = .done
#xtend.sources.source1.deletePolicy = immediate
xtend.sources.source1.deletePolicy = never

xtend.sources.source1.interceptors.i2.type = regex_filter
xtend.sources.source1.interceptors.i2.regex = ^ADM_INSTANCE.*

xtend.sources.source1.interceptors.i3.type = regex_extractor
xtend.sources.source1.interceptors.i3.regex = ","([0-9]{4})-([0-9]{2})-([0-9]{2})
xtend.sources.source1.interceptors.i3.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers.s1.name = myear
xtend.sources.source1.interceptors.i3.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers.s2.name = mmonth
xtend.sources.source1.interceptors.i3.serializers.s3.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers.s3.name = mday

xtend.sources.source1.interceptors.i3.serializers = s1 s2 s3
xtend.sources.source1.interceptors = i2 i3

#channel config
xtend.channels.channel1.type = memory
xtend.channels.channel1.capacity = 1000
xtend.channels.channel1.transactionCapacity = 100

# sink config
xtend.sinks.mavenir.type = hdfs
xtend.sinks.mavenir.channel = channel1
xtend.sinks.mavenir.hdfs.fileType = DataStream
xtend.sinks.mavenir.hdfs.filePrefix = xtend-
xtend.sinks.mavenir.hdfs.batchSize = 1000
xtend.sinks.mavenir.hdfs.rollSize = 268435456
xtend.sinks.mavenir.hdfs.path = hdfs://flume/events/year=%{myear}/month=%{mmonth}/day=%{mday}



# Write format can be text or writable
# use a single csv file at a time
xtend.sinks.mavenir.hdfs.maxOpenFiles = 1
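To sanity-check the regex_extractor pattern outside Flume, the same capture groups can be exercised with sed on one of the sample records (a quick illustration only, not part of the Flume setup):

```shell
#!/bin/sh
# Apply the interceptor's pattern to one sample CSV line and print the
# three capture groups that become the myear/mmonth/mday event headers.
line='"GW1","148107752951195","7","5","2","2016-12-07 02:25:34","2033","x","y"'

parts=$(echo "$line" | sed -nE 's/.*","([0-9]{4})-([0-9]{2})-([0-9]{2}).*/\1 \2 \3/p')
echo "myear/mmonth/mday: $parts"   # myear/mmonth/mday: 2016 12 07
```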






There is the Avro sink to look into. And you can always load the data as-is, lay a table over it, create the Parquet version of the same table, and run an 'INSERT INTO ... SELECT ...' on a scheduler.
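That text-to-Parquet conversion might look roughly like this in Impala SQL; a sketch only, with made-up table and column names (the real columns must match the CSV layout):

```sql
-- Sketch only: table and column names are hypothetical.
-- 1) Lay a text table over the raw CSV files loaded as-is into HDFS.
CREATE EXTERNAL TABLE events_text (
  gw STRING, epoch_ms BIGINT, f3 INT, f4 INT, f5 INT,
  event_ts STRING, code STRING, hash1 STRING, hash2 STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/flume/events';

-- 2) Create the Parquet version of the same table.
CREATE TABLE events_parquet LIKE events_text STORED AS PARQUET;

-- 3) Convert on a scheduler (e.g. a cron-driven impala-shell call).
INSERT INTO events_parquet SELECT * FROM events_text;
```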

This post links to a preso that may help.