Member since: 02-21-2017
Posts: 8
Kudos Received: 0
Solutions: 0
02-24-2017
02:42 AM
Thanks again for the hint. Below is how I modified the configuration to use serializers. Now it does what I want: I am able to read the date from every event and store the event in the proper partition.

The NEXT topic is how to make Flume read incoming *.GZIP files and store them uncompressed in HDFS. I tried the following settings, but without success:

xtend.sinks.mavenir.hdfs.codeC = gzip
xtend.sinks.mavenir.hdfs.fileType = CompressedStream

With these, it stores the binary file in one particular partition.

My existing code:

# Sources, channels, and sinks are defined per agent name, in this case 'xtend'.
xtend.sources = source1
xtend.channels = channel1
xtend.sinks = mavenir

# For each source, channel, and sink
xtend.sources.source1.type = spooldir
xtend.sources.source1.channels = channel1
xtend.sources.source1.spoolDir = /home/cloudera/runs
xtend.sources.source1.fileHeader = false
xtend.sources.source1.fileSuffix = .done
#xtend.sources.source1.deletePolicy = immediate
xtend.sources.source1.deletePolicy = never
xtend.sources.source1.consumeOrder = oldest
xtend.sources.source1.pollDelay = 15000
xtend.sources.source1.decodeErrorPolicy = IGNORE
xtend.sources.source1.interceptors.i2.type = regex_filter
xtend.sources.source1.interceptors.i2.regex = ^ADM_INSTANCE.*
xtend.sources.source1.interceptors.i2.excludeEvents = true
xtend.sources.source1.interceptors.i3.type = regex_extractor
xtend.sources.source1.interceptors.i3.regex = ","([0-9]{4})-([0-9]{2})-([0-9]{2})
xtend.sources.source1.interceptors.i3.serializers.s1.name = myear
xtend.sources.source1.interceptors.i3.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers.s2.name = mmonth
xtend.sources.source1.interceptors.i3.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers.s3.name = mday
xtend.sources.source1.interceptors.i3.serializers.s3.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
xtend.sources.source1.interceptors.i3.serializers = s1 s2 s3
xtend.sources.source1.interceptors = i2 i3

# channel config
xtend.channels.channel1.type = memory
xtend.channels.channel1.capacity = 1000
xtend.channels.channel1.transactionCapacity = 100

# sink config
xtend.sinks.mavenir.type = hdfs
xtend.sinks.mavenir.channel = channel1
xtend.sinks.mavenir.fileType = DataStream
xtend.sinks.mavenir.hdfs.filePrefix = xtend-
xtend.sinks.mavenir.hdfs.batchSize = 1000
xtend.sinks.mavenir.hdfs.rollSize = 268435456
xtend.sinks.mavenir.hdfs.useLocalTimeStamp = false
xtend.sinks.mavenir.hdfs.path = hdfs://flume/events/year=%{myear}/month=%{mmonth}/day=%{mday}
# Write format can be text or writable
xtend.sinks.mavenir.hdfs.writeFormat = Text
# use a single csv file at a time
xtend.sinks.mavenir.hdfs.maxOpenFiles = 1
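One possible workaround for the GZIP question, sketched here as an assumption rather than a confirmed fix: as far as I know the spooldir source reads files as plain text and does not decompress them, so the archives could be unpacked before they reach the spool directory. The function name and the staging approach below are hypothetical; the only thing taken from the post is the idea of a spool directory such as /home/cloudera/runs. The file is decompressed in a sibling staging directory and then moved in one step, because files in a spool directory should not change after they appear there.

```python
import gzip
import os
import shutil
import tempfile


def gunzip_into_spool(gz_path, spool_dir):
    """Decompress gz_path and move the plain file into spool_dir.

    Hypothetical helper: assumes the parent of spool_dir is writable and
    sits on the same filesystem, so that the final move is a rename.
    """
    name = os.path.basename(gz_path)
    if name.lower().endswith((".gz", ".gzip")):
        name = name.rsplit(".", 1)[0]  # sample.csv.gz -> sample.csv

    # Stage next to (not inside) the spool directory so that the Flume
    # source never sees a partially written file.
    parent = os.path.dirname(os.path.abspath(spool_dir))
    staging_dir = tempfile.mkdtemp(dir=parent)
    staged = os.path.join(staging_dir, name)
    try:
        with gzip.open(gz_path, "rb") as src, open(staged, "wb") as dst:
            shutil.copyfileobj(src, dst)
        target = os.path.join(spool_dir, name)
        os.replace(staged, target)  # single rename into the spool directory
    finally:
        shutil.rmtree(staging_dir, ignore_errors=True)
    return target
```

With something like this in the upload step, the sink could stay on an uncompressed file type, since the events arriving in Flume would already be plain CSV lines.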
02-22-2017
05:24 AM
Ad 1 and 3) Clear.
Ad 2) I didn't get it. Let me show you what I need. I have 3 partitions:

root@hdp-node1:~ # hdfs dfs -ls /hive/warehouse/default/fact_xtend/year=2017/month=2
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=20
drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=21
drwxrwxrwt - hive hdfs 0 2017-02-22 12:32 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=22

Now, when Flume processes the records below from a single CSV file:

"admxix12","14877123338113","1","51","4","2017-02-20 18:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
"admxix12","14877123338113","1","51","4","2017-02-21 10:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"
"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

I need to have them stored like this:

drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=20
"admxix12","14877123338113","1","51","4","2017-02-20 18:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

drwxrwxrwt - impala hdfs 0 2017-02-22 12:35 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=21
"admxix12","14877123338113","1","51","4","2017-02-21 10:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

drwxrwxrwt - hive hdfs 0 2017-02-22 12:32 /hive/warehouse/default/fact_xtend/year=2017/month=2/day=22
"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

And indeed in binary form.
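To make the intended routing concrete, here is a minimal Python sketch of the mapping from a record to its partition directory. It is only an illustration of the logic, not what Flume runs internally. The regular expression is the one from the regex_extractor interceptor in the Flume configuration, and the table directory is the one from the listing; the function names partition_dir and group_by_partition are made up for this example.

```python
import re
from collections import defaultdict

# Pattern from the regex_extractor interceptor: the first quoted field
# that starts with a YYYY-MM-DD date.
DATE_RE = re.compile(r'","([0-9]{4})-([0-9]{2})-([0-9]{2})')

# Table location as shown in the hdfs dfs -ls listing.
TABLE_DIR = "/hive/warehouse/default/fact_xtend"


def partition_dir(record):
    """Return the partition directory for one CSV record, or None if no date is found."""
    match = DATE_RE.search(record)
    if match is None:
        return None
    # int() drops leading zeros, so "02" becomes 2 as in the listing.
    year, month, day = (int(part) for part in match.groups())
    return f"{TABLE_DIR}/year={year}/month={month}/day={day}"


def group_by_partition(records):
    """Group CSV records by the partition directory they belong to."""
    groups = defaultdict(list)
    for record in records:
        target = partition_dir(record)
        if target is not None:
            groups[target].append(record)
    return dict(groups)
```

One detail the sketch makes visible: the captured month is the string "02", while the existing directories use month=2. The sketch converts to integers to match the listing; as far as I can tell, a pass-through serializer in Flume would keep the leading zero, so the sink path might come out as month=02 unless that is handled somewhere.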
02-22-2017
01:21 AM
Thank you. I have just replied to csguna with an additional request. If you have a clue, I would appreciate it. Milan
02-22-2017
01:19 AM
Thank you for your reply. Right now I have a table in Impala with the desired structure. A manual lookup is working 🙂 Moreover, I have a script which creates a new partition on a daily basis. Today I got a new one:

/hive/warehouse/test/fact_my_service/year=2017/month=2/day=22

The part above is done, but the next part is more complex than I initially thought. Maybe it is better to share a real example with you. Here is one record from a CSV file. Typically one CSV file contains circa 200 records. The client delivers a bunch of gzipped CSV files to Flume every hour.

"admxix12","14877123338113","1","51","4","2017-02-22 00:25:40","20985","6671366849","6671777157","2017-02-21 15:25:33","",0000000000,"","RESPONSE#6005","","","","334020","334050","033"

All I need is this:

1) Convert as many CSV files as possible from a single day into Parquet (a binary format, chosen for performance) and store them in HDFS as described below.

2) Choose the corresponding partition in HDFS according to the date in the CSV records. The sample above has to be stored in the partition created just today. But if by chance the client delivers a CSV file with an older date (due to a crash or a maintenance window), that file has to be stored in the appropriate older partition. I assume I am missing some conversion and lookup script here.

3) Add an extra field to the current CSV structure. As a partition key, I have to add one new field to the current CSV structure. This is required by Impala and Tableau for quick searches. So here again I need a hint on how to modify the existing data.

Thank you in advance. Milan
02-21-2017
01:25 AM
Hi everyone, I've been using Cloudera for just one week, so I am sorry for my ignorance. My aim is the following. There is a server producing CSV files every hour (more than 50 MB raw), stored as GZIP. I have an upload mechanism that places them in the Flume input directory. Now I need to store them in HDFS in the "partition" structure shown below. I have been told this is required so that Impala can read the data efficiently.

/hive/warehouse/test/fact_my_service/year=2017/month=2/day=21

Can you please share some hints on how to get my data stored that way, and how to make Impala recognize data that is already stored? Thank you, Milan
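On the second question, a common pattern is to register each partition directory with the table and then refresh the table metadata so Impala sees the new files. The sketch below only builds the statements as strings. The table name test.fact_my_service is an assumption inferred from the path, the partition columns year, month and day are assumed to be integers, and the helper names are hypothetical. The statements would still have to be run through impala-shell or another client.

```python
def add_partition_sql(table, year, month, day):
    """Build an Impala statement that registers one year/month/day partition.

    Assumes the table is partitioned by integer columns year, month, day.
    """
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (year={int(year)}, month={int(month)}, day={int(day)})"
    )


def refresh_sql(table):
    """Build the statement that makes Impala reload file metadata for the table."""
    return f"REFRESH {table}"


if __name__ == "__main__":
    # Table name is an assumption derived from /hive/warehouse/test/fact_my_service.
    print(add_partition_sql("test.fact_my_service", 2017, 2, 21))
    print(refresh_sql("test.fact_my_service"))
```

IF NOT EXISTS keeps the statement safe to repeat, which helps if the same day is processed more than once.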
Labels:
- Apache Flume
- Apache Hive
- Apache Impala
- HDFS