Created on 02-21-2017 01:25 AM - edited 09-16-2022 04:07 AM
Hi everyone,
I've been using Cloudera for just one week, so please excuse my ignorance.
My aim is the following: there is a server producing CSV files every hour (more than 50 MB raw), stored as GZIP. I've got an upload mechanism that uploads them into the Flume input directory.
Now I need to store them in HDFS in the "partition" structure shown below. I have been told this is required in order to let Impala read the data effectively.
/hive/warehouse/test/fact_my_service/year=2017/month=2/day=21
Can you please share some hints on how to get my data stored that way and how to make Impala understand the already stored data?
Thank you,
Milan
Created 02-21-2017 08:50 AM
There are two questions:
1. How to make Impala understand already stored data?
2. How to get the data stored?
Answer to the first question:
Impala is an SQL-like query engine, so you can create an 'external' table on top of the existing data as follows.
CREATE EXTERNAL TABLE IF NOT EXISTS tblname(
col1 datatype,
col2 datatype)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/hive/warehouse/.....';
Answer to the second question:
Once you specify the location at table creation, Impala will pick up the files in that location internally. You may also want to use the partitioning option (see the sketch below), but you do not need to alter/add/modify your location.
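As a rough sketch of the partitioning option (the table and column names here are placeholders; the path is taken from your post), a partitioned external table could look like this, with each existing directory then registered as a partition:

-- a minimal sketch, assuming placeholder column names
CREATE EXTERNAL TABLE IF NOT EXISTS tblname (
  col1 STRING,
  col2 STRING)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/hive/warehouse/test/fact_my_service';

-- register an existing directory as a partition, then refresh Impala's view of it
ALTER TABLE tblname ADD IF NOT EXISTS PARTITION (year=2017, month=2, day=21);
REFRESH tblname;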
Created on 02-21-2017 10:20 PM - edited 02-21-2017 10:21 PM
Now I need to store them in HDFS in the "partition" structure shown below. I have been told this is required in order to let Impala read the data effectively.
/hive/warehouse/test/fact_my_service/year=2017/month=2/day=21
Answer: You can use the HDFS sink escape sequences, for example:
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d
%d = day of month (01), %m = month (01..12), %Y = year (2010)
Refer to the Apache Flume User Guide, HDFS sink section:
https://flume.apache.org/FlumeUserGuide.html
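To produce the Hive-style partition directories from your post, the same escape sequences can be embedded directly in the directory names; a sketch (agent/sink names a1/k1 are placeholders, and the escapes are resolved from the event's timestamp header, so a timestamp interceptor or useLocalTimeStamp is needed):

a1.sinks.k1.hdfs.path = /hive/warehouse/test/fact_my_service/year=%Y/month=%m/day=%d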
Can you please share some hints on how to get my data stored that way and how to make Impala understand the already stored data?
If you have a common shared metastore for Hive and Impala, you can create an external table and point it at the location:
CREATE EXTERNAL TABLE table_name (userid INT, movid STRING, age TINYINT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '<hive warehouse or any location of your data>';
Note - Make sure to perform
INVALIDATE METADATA;
Since we created the table outside of Impala, we have to refresh the Hive metastore view in Impala before querying.
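For example, using the hypothetical table_name from above in impala-shell (INVALIDATE METADATA for tables created outside Impala, REFRESH when only new files or partitions were added to an existing table):

-- after creating the table outside Impala (e.g. in Hive):
INVALIDATE METADATA table_name;
-- after new files or partitions are added to an existing table:
REFRESH table_name;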
Created 02-22-2017 01:19 AM
Thank you for your reply.
Right now I have a table in Impala with the desired structure. A manual lookup is working 🙂 Moreover, I have a script which creates a new partition on a daily basis. Today I got a new one:
/hive/warehouse/test/fact_my_service/year=2017/month=2/day=22
The above part is done. But the next part is more complex than I initially thought. Maybe it's better to share a real example with you. Below is one record of a CSV file; typically one CSV file contains circa 200 records. The client pulls a bunch of CSV files in GZIP to Flume every hour.
"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","2017-02-21 15:25:33","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
All I need is this:
1) Convert as many CSV files from a single day as possible into Parquet (a binary format, for performance) and store them in HDFS as described below.
2) According to the date contained in the CSV files, the corresponding partition in HDFS has to be chosen. The above sample has to be stored in the partition created just today. But if by chance the client pulled a CSV with an older date (due to a crash or MW), the CSV file has to be stored in the appropriate partition. I assume I am missing some convert and lookup script here.
3) Add an extra field to the current CSV structure. As a partition key I have to add one new field to the current CSV structure. This is required by Impala and Tableau for quick searches. So again, I need some hint on how to modify the existing data.
Thank you in advance.
Milan
Created on 02-22-2017 03:24 AM - edited 02-22-2017 03:25 AM
Let me share my thoughts. Please correct me if I have misunderstood your issue.
1. You can use the following clause when creating the table in Impala:
STORED AS PARQUET
2. "For example, with a school_records table partitioned on a year column, there is a separate data directory for each different year value, and all the data for that year is stored in a data file in that directory. A query that includes a WHERE condition such as YEAR=1966 or YEAR IN (1989, 1999) can examine only the data files from the appropriate directories." (quoted from the Cloudera Impala documentation; see the sketch after this list)
3. Would you consider writing a custom interceptor to add the field to the event header, or you could use the UUID interceptor for a unique id? As a second option, though I am not sure about it, you could pull the data from HDFS and run a Python script to add the new field.
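Tying points 1 to 3 together, here is a rough sketch; the table names (fact_my_service_text as a staging table over the raw CSV, fact_my_service as the final Parquet table) and the column names are hypothetical. Note that in Impala/Hive the partition key does not have to be added to the CSV data itself: the partition columns come from the directory layout and the INSERT statement, and can be used directly in WHERE clauses.

-- hypothetical final table, stored as Parquet and partitioned by date
CREATE TABLE fact_my_service (
  instance STRING,
  event_ts STRING
  -- ... remaining CSV columns ...
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;

-- convert one day of CSV data to Parquet; the partition key is supplied here,
-- so nothing has to be appended to the CSV files themselves
INSERT INTO fact_my_service PARTITION (year=2017, month=2, day=22)
SELECT instance, event_ts /* ... remaining columns ... */
FROM fact_my_service_text
WHERE event_ts LIKE '2017-02-22%';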
Created 02-22-2017 05:24 AM
Ad 1 and 3) Clear.
Ad 2) I didn't get that. Let me show you what I need:
I have 3 partitions:
root@hdp-node1:~ # hdfs dfs -ls /hive/warehouse/default/fact_xtend/year=2017/month=2
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=20
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=21
drwxrwxrwt - hive hdfs 0 2017-02-22 12:32 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=22
Now, when Flume is processing the records below from a single CSV file:
"admxix12","14877123338113","1","51","4","2017-02-20 18:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
"admxix12","14877123338113","1","51","4","2017-02-21 10:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
I need to have them stored like this:
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=20
"admxix12","14877123338113","1","51","4","2017-02-20 18:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=21
"admxix12","14877123338113","1","51","4","2017-02-21 10:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
drwxrwxrwt - hive hdfs 0 2017-02-22 12:32 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=22
"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
And indeed in binary form.
Created on 02-22-2017 07:47 PM - edited 02-22-2017 07:50 PM
Below are the prerequisites for your requirements.
1. We need a timestamp interceptor to inject a timestamp into every event header, if you don't have one already in your flume-conf.properties. For example:
tail1.sources.src1.interceptors = ic1
tail1.sources.src1.interceptors.ic1.type = timestamp
2. If it is a multi-tier Flume agent architecture, it is recommended to use hdfs.useLocalTimeStamp, which will use a timestamp generated by the Flume agent running the HDFS sink.
tail1.sinks.sink1.hdfs.useLocalTimeStamp = true
3. To make all the files generated in a month go into the same month folder, all we have to do is use the config below - just month and year:
tail1.sinks.sink1.hdfs.path = flume/collector1/%m-%Y
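Putting the fragments together at day granularity, with the Hive-style directory names from earlier in the thread (a sketch, reusing the tail1/src1/sink1 names above):

# timestamp interceptor stamps each event with the time Flume received it
tail1.sources.src1.interceptors = ic1
tail1.sources.src1.interceptors.ic1.type = timestamp
# write into Hive-style partition directories, resolved from the timestamp header
tail1.sinks.sink1.hdfs.path = /hive/warehouse/test/fact_my_service/year=%Y/month=%m/day=%d

Keep in mind that this routes events by the time Flume received them (or, with useLocalTimeStamp, by the sink's local time); if the routing must follow the date inside each CSV record instead, that date has to be extracted into the event header, for example with a regex_extractor interceptor.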
Created 02-24-2017 02:42 AM
Thanks again for the hint. Let me show you how I modified the configuration using serializers. Now it does what I want, so I am able to read the date from every event and store it in the proper partition.
The NEXT topic is how to make Flume read incoming *.GZIP files and store them uncompressed in HDFS. I tried to use the following, but without success:
xtend.sinks.mavenir.hdfs.codeC =gzip
xtend.sinks.mavenir.hdfs.fileType=CompressedStream
It stores the binary file into one concrete partition.
My existing code:
# Sources, channels, and sinks are defined per agent name, in this case 'xtend'.
xtend.sources = source1
xtend.channels = channel1
xtend.sinks = mavenir
# For each source, channel, and sink
xtend.sources.source1.type = spooldir
xtend.sources.source1.channels = channel1
xtend.sources.source1.spoolDir = /home/cloudera/runs
xtend.sources.source1.fileHeader = false
xtend.sources.source1.fileSuffix = .done
#xtend.sources.source1.deletePolicy = immediate
xtend.sources.source1.deletePolicy = never
xtend.sources.source1.consumeOrder=oldest
xtend.sources.source1.pollDelay=15000
xtend.sources.source1.decodeErrorPolicy=IGNORE
xtend.sources.source1.interceptors.i2.type = regex_filter
xtend.sources.source1.interceptors.i2.regex = ^ADM_INSTANCE.*
xtend.sources.source1.interceptors.i2.excludeEvents=true
xtend.sources.source1.interceptors.i3.type = regex_extractor
xtend.sources.source1.interceptors.i3.regex = ","([0-9]{4})-([0-9]{2})-([0-9]{2})
xtend.sources.source1.interceptors.i3.serializers.s1.name = myear
xtend.sources.source1.interceptors.i3.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers.s2.name = mmonth
xtend.sources.source1.interceptors.i3.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers.s3.name = mday
xtend.sources.source1.interceptors.i3.serializers.s3.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers = s1 s2 s3
xtend.sources.source1.interceptors = i2 i3
#channel config
xtend.channels.channel1.type = memory
xtend.channels.channel1.capacity = 1000
xtend.channels.channel1.transactionCapacity = 100
# sink config
xtend.sinks.mavenir.type = hdfs
xtend.sinks.mavenir.channel = channel1
xtend.sinks.mavenir.hdfs.fileType = DataStream
xtend.sinks.mavenir.hdfs.filePrefix = xtend-
xtend.sinks.mavenir.hdfs.batchSize = 1000
xtend.sinks.mavenir.hdfs.rollSize = 268435456
xtend.sinks.mavenir.hdfs.useLocalTimeStamp=false
xtend.sinks.mavenir.hdfs.path = hdfs://flume/events/year=%{myear}/month=%{mmonth}/day=%{mday}
# Write format can be text or writable
xtend.sinks.mavenir.hdfs.writeFormat=Text
# use a single csv file at a time
xtend.sinks.mavenir.hdfs.maxOpenFiles = 1
Created 02-24-2017 05:21 AM
Use an event deserializer.
You can use BlobDeserializer if you want to parse the whole file as one event,
or you can use LINE for one event per line of text input.
Refer to the link:
https://flume.apache.org/FlumeUserGuide.html#event-deserializers
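For the spooling directory source in your configuration, the deserializer is set on the source. A sketch (the property names below follow the user guide linked above; please double-check them against your Flume version):

# default: one event per line of text input
xtend.sources.source1.deserializer = LINE
xtend.sources.source1.deserializer.maxLineLength = 2048

# alternative: read the whole file as a single event (BLOB)
# xtend.sources.source1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
# xtend.sources.source1.deserializer.maxBlobLength = 100000000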