Support Questions


CSV files stored in partitions on HDFS

Explorer

Hi everyone,

 

I've been using Cloudera for just one week, so I'm sorry for my ignorance.

 

My aim is the following. There is a server producing CSV files every hour (more than 50 MB raw), stored as GZIP. I've got an upload mechanism that uploads them into the Flume input directory.

 

But now I need to store them in HDFS in the "partition" structure shown below. I have been told this is required in order to let Impala read the data effectively.

 

/hive/warehouse/test/fact_my_service/year=2017/month=2/day=21

 

Can you please share some hints on how to get my data stored that way and how to make Impala understand the already stored data?

 

Thank you,

Milan 

9 REPLIES

Champion

@Milanovo

 

There are two questions:
1. How to make Impala understand already stored data?
2. How to get the data stored?

 

Answer to the first question:
Impala is an SQL-like query engine, so you can create an 'external' table on top of the existing data as follows.

CREATE EXTERNAL TABLE IF NOT EXISTS tblname(
col1 datatype,
col2 datatype)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/hive/warehouse/.....';

 

Answer to the 2nd question:

Once you mention the location at table creation, it will pick up the files there internally. You can also use the partition option, and there is no need to alter/add/modify your location.
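
For the partition layout you described, a minimal sketch could look like the following (column names are only placeholders, so adjust them to your real schema):

CREATE EXTERNAL TABLE IF NOT EXISTS fact_my_service (
col1 STRING,
col2 STRING)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/hive/warehouse/test/fact_my_service';

-- each existing day directory then has to be registered as a partition
ALTER TABLE fact_my_service ADD PARTITION (year=2017, month=2, day=21);

Once the partitions are registered this way, Impala can prune to the matching day directories when you filter on year/month/day.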

Explorer
Thank you,
I have just replied to csguna with an additional request. If you have a clue, I will appreciate it.

milan

Champion

But now I need to store them in HDFS in the "partition" structure shown below. I have been told this is required in order to let Impala read the data effectively.

 /hive/warehouse/test/fact_my_service/year=2017/month=2/day=21

 

 

Answer: You can use the HDFS sink escape sequences, like

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d 

 

%d	day of month (01)
%m	month (01..12)
%Y	year (2010)

Refer to the Apache Flume user guide - HDFS sink:

https://flume.apache.org/FlumeUserGuide.html
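
For the directory layout in your question, a sketch of the sink path built from those escape sequences could be (agent and sink names here are placeholders, and the events need a timestamp header for the escapes to resolve):

a1.sinks.k1.hdfs.path = /hive/warehouse/test/fact_my_service/year=%Y/month=%m/day=%d

Note that %m and %d are zero-padded, so the directories come out as month=02/day=21 rather than month=2/day=21.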

 

Can you please share some hints on how to get my data stored that way and how to make Impala understand the already stored data?

 

If you have a common shared metastore for Hive and Impala,

you can create an external table and point it at the location:

 

CREATE EXTERNAL TABLE table_name
(userid INT, movid STRING, age TINYINT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '<hive warehouse or any location of your data>';

Note - Make sure to run

INVALIDATE METADATA;

Since we have created the table outside Impala, we have to refresh Impala's view of the Hive metastore before querying.

 

 

 

Explorer

Thank you for your reply.

 

Right now I have a table in Impala with the desired structure. A manual lookup is working 🙂 Moreover, I have a script which creates a new partition on a daily basis. Today I got a new one:

 

/hive/warehouse/test/fact_my_service/year=2017/month=2/day=22

 

The above part is done. But the next part is more complex than I initially thought. Maybe it's better to share a real example with you. Below is one record of a CSV file. Typically one CSV file contains circa 200 records. The client delivers a bunch of CSV files in GZIP to Flume every hour.

 

"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","2017-02-21 15:25:33","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

 

All I need is this:

 

1) Convert as many CSV files as possible from a single day into Parquet (a binary format, for performance) and store them in HDFS as described below.

 

2) According to the date contained in the CSV files, the corresponding partition in HDFS has to be chosen. The sample above has to be stored in the partition created just today. But if by chance the client delivered a CSV with an older date (due to a crash or MW), that CSV file has to be stored in the appropriate older partition. I assume I am missing some conversion and lookup script here.

 

3) Add an extra field to the current CSV structure. As a partition key, I have to add one new field to the current CSV structure. This is requested by Impala and Tableau for quick searches. So again, I need a hint on how to modify the existing data.

 

 

Thank you in advance.

Milan 

 

 

Champion

Let me share my thoughts. Please correct me if I am wrong in understanding your issue.

 

1. You can use the below when you are creating the table in Impala.

STORED AS PARQUET
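
For example, a partitioned Parquet table and a conversion from an existing text-backed table could be sketched like this (table and column names are only placeholders):

CREATE TABLE fact_my_service_parquet (
col1 STRING,
col2 STRING)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;

-- dynamic partition insert: the partition columns come last in the SELECT
INSERT INTO fact_my_service_parquet PARTITION (year, month, day)
SELECT col1, col2, year, month, day FROM fact_my_service;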

 

2. For example, with a school_records table partitioned on a year column, there is a separate data directory for each different year value, and all the data for that year is stored in a data file in that directory. A query that includes a WHERE condition such as YEAR=1966 or YEAR IN (1989, 1999) can examine only the data files from the appropriate directories. (Quoted from the Cloudera Impala documentation.)

 

3. Would you consider writing a custom interceptor to add the field to the event header, or you could use the UUID interceptor for a unique ID? As a second option, though I am not sure about it, you could pull the data from HDFS and run a Python script to add the new field.
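
If you go the UUID interceptor route, a minimal sketch of the configuration (agent and source names are placeholders) could be:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
# header name that carries the generated unique id (default is "id")
a1.sources.r1.interceptors.i1.headerName = id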

Explorer

Ad 1 and 3)  Clear.

Ad 2) I didn't get that. Let me show you what I need:

 

I have 3 partitions

 

root@hdp-node1:~ # hdfs dfs -ls /hive/warehouse/default/fact_xtend/year=2017/month=2
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=20
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=21
drwxrwxrwt - hive hdfs 0 2017-02-22 12:32 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=22

Now, when Flume is processing the below records from a single CSV file:

 

"admxix12","14877123338113","1","51","4","2017-02-20 18:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

"admxix12","14877123338113","1","51","4","2017-02-21 10:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

 

I need to have them stored like this:

 

drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=20

"admxix12","14877123338113","1","51","4","2017-02-20 18:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

 

drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=21

"admxix12","14877123338113","1","51","4","2017-02-21 10:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

 

drwxrwxrwt - hive hdfs 0 2017-02-22 12:32 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=22

"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

 

Indeed in binary form.

 

Champion

Below are the prerequisites for your requirements.

 

1. We need a timestamp interceptor to inject the timestamp into every event header, if you don't have one already in your flume-conf.properties.

For example:

 

tail1.sources.src1.interceptors  = ic1
tail1.sources.src1.interceptors.ic1.type = timestamp

 

2. If it is a multi-tier Flume agent architecture, it is recommended to use hdfs.useLocalTimeStamp, which will use a timestamp generated by the Flume agent running the HDFS sink.

 

tail1.sinks.sink1.hdfs.useLocalTimeStamp = true

 

3. To make all the files that get generated in a month land in the same month folder, all we have to do is use the below config - just month and year:

tail1.sinks.sink1.hdfs.path = flume/collector1/%m-%Y

 

 

Explorer

Thanks again for the hint. Below you can see how I modified the configuration to use serializers. Now it does what I want: I am able to read the date from every event and store it into the proper partition.

 

The NEXT topic is how to make Flume read incoming *.GZIP files and store them uncompressed in HDFS. I tried to use the following, but without success:

 

xtend.sinks.mavenir.hdfs.codeC =gzip
xtend.sinks.mavenir.hdfs.fileType=CompressedStream

 

It just stores the binary file into one concrete partition.

 

My existing code:

 

# Sources, channels, and sinks are defined per agent name, in this case 'xtend'.
xtend.sources = source1
xtend.channels = channel1
xtend.sinks = mavenir

# For each source, channel, and sink
xtend.sources.source1.type = spooldir
xtend.sources.source1.channels = channel1
xtend.sources.source1.spoolDir = /home/cloudera/runs
xtend.sources.source1.fileHeader = false
xtend.sources.source1.fileSuffix = .done
#xtend.sources.source1.deletePolicy = immediate
xtend.sources.source1.deletePolicy = never
xtend.sources.source1.consumeOrder=oldest
xtend.sources.source1.pollDelay=15000
xtend.sources.source1.decodeErrorPolicy=IGNORE

# drop events whose body starts with ADM_INSTANCE
xtend.sources.source1.interceptors.i2.type = regex_filter
xtend.sources.source1.interceptors.i2.regex = ^ADM_INSTANCE.*
xtend.sources.source1.interceptors.i2.excludeEvents=true

# extract year, month, and day from the first date field into the myear/mmonth/mday headers
xtend.sources.source1.interceptors.i3.type = regex_extractor
xtend.sources.source1.interceptors.i3.regex = ","([0-9]{4})-([0-9]{2})-([0-9]{2})

xtend.sources.source1.interceptors.i3.serializers.s1.name = myear
xtend.sources.source1.interceptors.i3.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

xtend.sources.source1.interceptors.i3.serializers.s2.name = mmonth
xtend.sources.source1.interceptors.i3.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

xtend.sources.source1.interceptors.i3.serializers.s3.name = mday
xtend.sources.source1.interceptors.i3.serializers.s3.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer

xtend.sources.source1.interceptors.i3.serializers = s1 s2 s3
xtend.sources.source1.interceptors = i2 i3

#channel config
xtend.channels.channel1.type = memory
xtend.channels.channel1.capacity = 1000
xtend.channels.channel1.transactionCapacity = 100

# sink config
xtend.sinks.mavenir.type = hdfs
xtend.sinks.mavenir.channel = channel1
xtend.sinks.mavenir.hdfs.fileType = DataStream
xtend.sinks.mavenir.hdfs.filePrefix = xtend-
xtend.sinks.mavenir.hdfs.batchSize = 1000
xtend.sinks.mavenir.hdfs.rollSize = 268435456
xtend.sinks.mavenir.hdfs.useLocalTimeStamp=false
xtend.sinks.mavenir.hdfs.path = hdfs://flume/events/year=%{myear}/month=%{mmonth}/day=%{mday}

 

 

# Write format can be text or writable
xtend.sinks.mavenir.hdfs.writeFormat=Text
# use a single csv file at a time
xtend.sinks.mavenir.hdfs.maxOpenFiles = 1

Champion

Use an event deserializer.

You can use BlobDeserializer if you want to parse the whole file inside one event,

or you can use LINE for one event per line of text input.

 

Refer to the link:

https://flume.apache.org/FlumeUserGuide.html#event-deserializers
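
For the spooldir source shown earlier, the deserializer is set on the source. A sketch, reusing the source name from your configuration (whether this alone covers the GZIP case is something to verify against your files):

# default behaviour: one event per line of text
xtend.sources.source1.deserializer = LINE
# alternative: read the whole file as a single event
#xtend.sources.source1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
#xtend.sources.source1.deserializer.maxBlobLength = 100000000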