Created 01-25-2018 06:12 PM
Hello,
I have a CSV file with multiple attributes and a header row:
Source file: Name, Age, Sex, Country, City, Postal Code
I want to split this CSV by attribute name into 3 separate CSV files:
File 1: Name, Age, Country
File 2: Name, Country, City
File 3: Country, City, Postal Code
Can someone please help me with how to do this in NiFi?
Created on 01-25-2018 10:06 PM - edited 08-17-2019 11:17 PM
If you are using NiFi 1.2+, you can use three ConvertRecord processors in parallel. Each one reads your CSV file and writes it through its own CsvRecordSetWriter that contains only your required columns, so the three processors give you 3 different files.
Flow:-
ConvertRecord Processor:-
Example:-
Input Csv File:-
Name,Age,Sex,Country,City,PostalCode
hcc,21,M,US,OR,32811
HDP,22,F,US,OR,32746
Output Csv file:-
Name,City
hcc,OR
HDP,OR
So in ConvertRecord I have set up a CsvReader, which reads the incoming flowfile, and a CsvRecordSetWriter, which outputs only the required columns that we have specified in the controller service.
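To make the reader/writer idea concrete outside NiFi, here is a minimal Python sketch of the same column projection. It is not NiFi code and not part of the attached flow; the function name project_columns is hypothetical and only illustrates what "write only the required columns" means for the example above.

```python
import csv
import io


def project_columns(csv_text, columns):
    """Return csv_text reduced to the given columns, in the given order."""
    reader = csv.DictReader(io.StringIO(csv_text))  # plays the role of the CSV reader
    out = io.StringIO()
    # extrasaction="ignore" drops every column that is not listed in `columns`,
    # which mirrors a writer schema that names only the required fields.
    writer = csv.DictWriter(
        out, fieldnames=columns, extrasaction="ignore", lineterminator="\n"
    )
    writer.writeheader()
    for row in reader:
        writer.writerow(row)
    return out.getvalue()


source = (
    "Name,Age,Sex,Country,City,PostalCode\n"
    "hcc,21,M,US,OR,32811\n"
    "HDP,22,F,US,OR,32746\n"
)

# One call per output file, like one ConvertRecord processor per file.
print(project_columns(source, ["Name", "City"]))
```

Calling the function three times with three different column lists corresponds to the three parallel processors, each with its own writer.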
Xml file:-
ConvertRecord example:-
https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
Flow:-
(or)
If you are using a NiFi version prior to 1.2, then you need to use:
1. ExecuteStreamCommand // deletes the header of the file, using tail as the Command Path and -n;+2 as the Command Arguments; connect the output stream relationship to the next processor.
2. SplitText // splits the content of the CSV file with a line count of one; connect the splits relationship to the next processor.
3. ExtractText // add new properties with matching regex values that extract each value into an attribute named after its header; connect success to all three parallel ReplaceText processors.
4. Three ReplaceText processors in parallel // specify the attribute names that you want in each file; each processor then produces the required file.
5. MergeContent (optional) // merges the small flowfiles into one file before storing it in a directory.
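The steps above can be sketched in plain Python to show the data flow. This is only an illustration, not NiFi configuration: the regex, the field names, and the function names are assumptions, and the pattern assumes that values contain no embedded commas or quotes.

```python
import re

# Assumed column order of the source file from the question.
FIELD_NAMES = ["Name", "Age", "Sex", "Country", "City", "PostalCode"]
# One capture group per column, similar in spirit to ExtractText regex properties.
LINE_PATTERN = re.compile(r"^([^,]*),([^,]*),([^,]*),([^,]*),([^,]*),([^,]*)$")


def split_without_header(csv_text):
    """Steps 1 and 2: drop the header line (like tail -n +2), then one piece per line."""
    return [line for line in csv_text.splitlines()[1:] if line]


def extract_fields(line):
    """Step 3: pull each value out of the line into a named attribute."""
    match = LINE_PATTERN.match(line)
    if match is None:
        raise ValueError("line does not have the expected 6 columns: %r" % line)
    return dict(zip(FIELD_NAMES, match.groups()))


def rebuild_line(attributes, wanted):
    """Step 4: write a new line that contains only the wanted attributes."""
    return ",".join(attributes[name] for name in wanted)


def run_flow(csv_text, wanted):
    """Step 5: merge the rebuilt lines back into one piece of content."""
    lines = split_without_header(csv_text)
    return "\n".join(rebuild_line(extract_fields(line), wanted) for line in lines)


source = (
    "Name,Age,Sex,Country,City,PostalCode\n"
    "hcc,21,M,US,OR,32811\n"
    "HDP,22,F,US,OR,32746\n"
)
print(run_flow(source, ["Country", "City", "PostalCode"]))
```

Note that this sketch, like the flow it mirrors, drops the header, so the merged output contains data lines only.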
Extract text Example:-
Replace text Example:-
https://community.hortonworks.com/questions/158910/parsing-json-formatted-csv-file-using-nifi.html
MergeContent Example:-
Let us know if you are having any issues..!!
Created 01-26-2018 01:03 AM
Hi,
It didn't work on my CSV file, as it is very large with almost 40 fields. I am having an issue converting to Avro. I see that in your XML the Avro schema has been hard-coded. Is it possible to auto-generate it from the CSV file I have?
Created 01-26-2018 01:36 AM
Make sure the datatypes of your 40 fields match the ones defined in AvroSchemaRegistry. If there are null values for any of the fields, then in AvroSchemaRegistry we need to allow null for those fields by adding "null" to the field type.
Example:-
{ "name": "Name", "type": "string"} //with this Avro schema, the Name field won't allow null values
{ "name": "Name", "type": ["null", "string"]} //with this Avro schema, the Name field allows null values
{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Name", "type": ["null", "string"]},
    { "name": "Age" , "type": ["null", "int"]},
    { "name": "Sex" , "type": ["null", "string"]},
    { "name": "Country" , "type": ["null", "string"]},
    { "name": "City" , "type": ["null", "string"]},
    { "name": "PostalCode" , "type": ["null", "string"]}
  ]
}
It's recommended to add default values for the fields in the Avro schema.
Make sure everything is set up correctly. If you are still facing issues, then share more info such as logs, the schema registry, and sample input records, which would make the issue easier to understand.
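For a file with around 40 fields, typing the schema by hand is error-prone. As a rough starting point, a small script outside NiFi can generate a schema skeleton from the header line. The sketch below is an assumption-laden helper, not a NiFi feature: the function names are hypothetical, every field is typed as a nullable string with a null default, and characters that Avro does not allow in names are replaced with underscores. The CSV header would have to use the same sanitized names for the fields to match, and types such as int or date still need to be adjusted by hand afterwards.

```python
import csv
import io
import json
import re


def avro_name(raw):
    """Turn a CSV header into a legal Avro name ([A-Za-z_][A-Za-z0-9_]*)."""
    name = re.sub(r"[^A-Za-z0-9_]", "_", raw.strip())
    if not name or name[0].isdigit():
        name = "_" + name
    return name


def avro_schema_from_header(header_line, record_name="balances", namespace="nifi"):
    """Build a schema skeleton in which every column is a nullable string."""
    columns = next(csv.reader(io.StringIO(header_line)))
    fields = [
        {"name": avro_name(column), "type": ["null", "string"], "default": None}
        for column in columns
    ]
    schema = {
        "namespace": namespace,
        "name": record_name,
        "type": "record",
        "fields": fields,
    }
    return json.dumps(schema, indent=2)


print(avro_schema_from_header("Name,Age,Sex,Country,City,Postal Code"))
```

The generated text can then be pasted into the schema registry and refined field by field.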
Created 01-26-2018 01:49 AM
Thanks for the reply. My source file has the columns below:
Source Code,Source Product ID,Source Party ID,Govt Issued Tax ID,Prefix Name,First Name,Middle Name,Last Name,Suffix Name,Full Legal Name,NickName,Birth Date,Birth Place Name,Gender,Smoker Indicator,Risk Class,Marriage Status Code,Agreement Name,Status Code,Payment Method,Agreement_Original_Inception_Date,Payment_Frequency,Payment_Frequency_Desc,Agreement_Covered_Amount,Agreement_Number,Agreement_Party_Role,Effective_Date,Expiration_Date,Agreement_Premium_Amount,Line_Of_Business_Identifier,Licensed_Product_Name,Product_Description,Product_Code,Address_Line1,Address_Line2,Region,Location_Code,Location_Name(City),Location_Number,State(State),State_code,Country,Coverage_Part_Code,Coverage_Type_identifier,Coverage_name,Coverage_Description,Coverage_Group_Identifier,Coverage_Code,Coverage_Term_Period,Coverage_Gender,Coverage_Class,Agreement_Type_Code,Party_Type_Code,County_Name,Coverage_Type_Code
I think I am having an issue because of the data types in Avro. My goal is ultimately to load all these files into a Hive table. Can you please help here, especially with the date type?
Created on 01-26-2018 03:02 AM - edited 08-17-2019 11:17 PM
It's better to open a new question for this date type issue so that it is more visible to all community users.
Here is what I tried:
Input:-
Timestamp,DATE,Age,Sex,Country,City,PostalCode
2017-01-10 12:34:56.33,2018-10-12,21,M,US,OR,32811
2017-01-10 12:34:56.33,2018-10-13,22,F,US,OR,32746
Output expected:-
Timestamp,City,DATE
2017-01-10 12:34:33.33,OR,2018-10-12
2017-01-10 12:34:33.33,OR,2018-10-13
My Timestamp field format is yyyy-MM-dd HH:mm:ss.ss and my DATE field format is yyyy-MM-dd, so in the ConvertRecord processor I specified these formats, as shown in the screenshot below.
In AvroSchemaRegistry I have changed the schema to:
{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Timestamp", "type": { "type":"long", "logicalType":"timestamp-millis"} },
    { "name": "DATE" , "type": {"type":"int", "logicalType":"date"} },
    { "name": "Age" , "type": ["null", "int"]},
    { "name": "Sex" , "type": ["null", "string"]},
    { "name": "Country" , "type": ["null", "string"]},
    { "name": "City" , "type": ["null", "string"]},
    { "name": "PostalCode" , "type": ["null", "string"]}
  ]
}
As I have defined the Timestamp and DATE fields with logicalTypes, in the CsvRecordSetWriter controller service we also need to specify the Date and Timestamp formats.
So now CsvRecordSetWriter is going to write the Timestamp and DATE fields in those formats.
The Schema Text property is:
{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Timestamp", "type": { "type":"long", "logicalType":"timestamp-millis"} },
    { "name": "City", "type": ["null","string"]},
    { "name": "DATE" , "type": { "type":"int", "logicalType":"date"} }
  ]
}
Once all these configs are set up correctly, we get the output that we expected:
Timestamp,City,DATE
2017-01-10 12:34:33.33,OR,2018-10-12
2017-01-10 12:34:33.33,OR,2018-10-13
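It may help to see why the schema declares these fields as int and long. In Avro, the date logical type is stored as the number of days since 1970-01-01, and timestamp-millis as the number of milliseconds since the epoch, so the text in the CSV has to be parsed with a matching format first. The Python sketch below only illustrates that encoding. It is not what NiFi runs internally; the function names are hypothetical, the strptime patterns are Python equivalents I am assuming for the formats above, and timestamps are treated as UTC for simplicity.

```python
from datetime import date, datetime

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DATETIME = datetime(1970, 1, 1)


def to_avro_date(text, fmt="%Y-%m-%d"):
    """Parse a date string and return days since 1970-01-01 (Avro 'date')."""
    parsed = datetime.strptime(text, fmt).date()
    return (parsed - EPOCH_DATE).days


def to_avro_timestamp_millis(text, fmt="%Y-%m-%d %H:%M:%S.%f"):
    """Parse a timestamp string (assumed UTC) and return epoch milliseconds."""
    delta = datetime.strptime(text, fmt) - EPOCH_DATETIME
    # Integer arithmetic avoids floating point rounding on large values.
    return delta.days * 86_400_000 + delta.seconds * 1000 + delta.microseconds // 1000


print(to_avro_date("2018-10-12"))
print(to_avro_timestamp_millis("2017-01-10 12:34:56.33"))
```

If the text does not match the pattern, strptime raises ValueError, which is the Python analogue of the parse errors you get when the configured format and the data disagree.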
Created 01-26-2018 04:32 PM
Attaching the source file I have. Below is the schema in AvroSchemaRegistry:
{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Source_Code", "type": "string" },
    { "name": "Source_Product_ID", "type": "int" },
    { "name": "Source_Party_ID", "type": "int" },
    { "name": "Govt_Issued_Tax_ID", "type": "int" },
    { "name": "Prefix_Name", "type": "string" },
    { "name": "First_Name", "type": "string" },
    { "name": "Middle_Name", "type": "string" },
    { "name": "Last_Name", "type": "string" },
    { "name": "Suffix_Name", "type": "string" },
    { "name": "Full_Legal_Name", "type": "string" },
    { "name": "NickName", "type": "string" },
    { "name": "Birth_Date", "type": "int", "logicalType":"date" },
    { "name": "Birth_Place_Name", "type": "string" },
    { "name": "Gender", "type": "string" },
    { "name": "Smoker_Indicator", "type": "string" },
    { "name": "Risk_Class", "type": "string" },
    { "name": "Marriage_Status_Code", "type": "string" },
    { "name": "Agreement_Name", "type": "string" },
    { "name": "Status_Code", "type": "string" },
    { "name": "Payment_Method", "type": "string" },
    { "name": "Agreement_Original_Inception_Date", "type": "int", "logicalType":"date" },
    { "name": "Payment_Frequency", "type": "string" },
    { "name": "Payment_Frequency_Desc", "type": "string" },
    { "name": "Agreement_Covered_Amount", "type": "int" },
    { "name": "Agreement_Number", "type": "string" },
    { "name": "Agreement_Party_Role", "type": "string" },
    { "name": "Effective_Date", "type": "int", "logicalType":"date" },
    { "name": "Expiration_Date", "type": "int", "logicalType":"date" }
  ]
}
Below is the schema from CsvRecordSetWriter:
{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Source_Code", "type": "string" },
    { "name": "Source_Product_ID", "type": "int" },
    { "name": "Source_Party_ID", "type": "int" },
    { "name": "Govt_Issued_Tax_ID", "type": "int" },
    { "name": "Prefix_Name", "type": "string" },
    { "name": "First_Name", "type": "string" },
    { "name": "Middle_Name", "type": "string" },
    { "name": "Last_Name", "type": "string" },
    { "name": "Suffix_Name", "type": "string" },
    { "name": "Full_Legal_Name", "type": "string" },
    { "name": "NickName", "type": "string" },
    { "name": "Birth_Date", "type": "int", "logicalType":"date" }
  ]
}
I have given the date format as MM/dd/yy. I am attaching the error too. Is there something that I am doing wrong?
P.S.: I changed the data in Govt_Issued_Tax_ID to make it an integer and still got the same error. I also changed the date format to match yours and still got an error with the date.
Created 01-26-2018 06:08 PM
@Yogesh Singh
The source file that you shared above does not have any value like 2018-10-12. The NumberFormatException occurs because the date format in the data does not match the date format that you have specified (MM/dd/yy) in CsvReader and in CsvRecordSetWriter.
I have tried with the source file you provided, and it works as expected.
Input:-
Source_Code,Source_Product_ID,Source_Party_ID,Govt_Issued_Tax_ID,Prefix_Name,First_Name,Middle_Name,Last_Name,Suffix_Name,Full_Legal_Name,NickName,Birth_Date,Birth_Place_Name,Gender,Smoker_Indicator,Risk_Class,Marriage_Status_Code,Agreement_Name,Status_Code,Payment_Method,Agreement_Original_Inception_Date,Payment_Frequency,Payment_Frequency_Desc,Agreement_Covered_Amount,Agreement_Number,Agreement_Party_Role,Effective_Date,Expiration_Date
FAST,1,1,000-23-1017,Mrs,First2310,F,Last31017,Dr,First2310 F Last31017,First2310 F Last31017,8/5/2013,USA,F,Y,P,N,Policy,Open,Cash,8/5/2013,M,Monthly,300000,231017,Insured,8/5/2013,8/6/2013
FAST,1,2,000-26-6065,Mrs,First2660,F,Last66065,Phd,First2660 F Last66065,First2660 F Last66065,8/6/2013,USA,F,Y,P,N,Policy,Open,Card,8/6/2013,M,Monthly,600000,266065,Insured,8/5/2013,8/6/2013
FAST,1,3,000-35-8819,Mrs,First3588,F,Last58819,Dr,First3588 F Last58819,First3588 F Last58819,8/7/2013,USA,F,Y,P,Y,Policy,Open,Check,8/7/2013,M,Monthly,150000,358819,Insured,8/5/2013,8/6/2013
FAST,1,4,000-39-7033,Mr,First3970,M,Last97033,Phd,First3970 M Last97033,First3970 M Last97033,8/8/2013,USA,M,Y,P,N,Policy,Open,Cash,8/8/2013,A,Annual,3000000,397033,Insured,8/5/2013,8/6/2013
Output:-
Source_Code,Source_Product_ID,Source_Party_ID,Govt_Issued_Tax_ID,Prefix_Name,First_Name,Middle_Name,Last_Name,Suffix_Name,Full_Legal_Name,NickName,Birth_Date
FAST,1,1,000-23-1017,Mrs,First2310,F,Last31017,Dr,First2310 F Last31017,First2310 F Last31017,8/5/2013
FAST,1,2,000-26-6065,Mrs,First2660,F,Last66065,Phd,First2660 F Last66065,First2660 F Last66065,8/6/2013
FAST,1,3,000-35-8819,Mrs,First3588,F,Last58819,Dr,First3588 F Last58819,First3588 F Last58819,8/7/2013
FAST,1,4,000-39-7033,Mr,First3970,M,Last97033,Phd,First3970 M Last97033,First3970 M Last97033,8/8/2013
Attached is the XML that contains your source data and all the new CsvReader, AvroSchemaRegistry, and CsvRecordSetWriter services.
Use the XML file below, and make sure your date or timestamp formats match the formats that you specify in CsvReader and CsvRecordSetWriter, as I mentioned in the comment above.
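Before changing the flow, it can be quicker to check outside NiFi which rows fail to parse with a given date format. The Python sketch below does that for one column. The function name is hypothetical, and the Python patterns are only rough analogues of the Java-style formats configured in NiFi (for example, %m/%d/%Y stands in for a month/day/four-digit-year pattern), so treat it as a diagnostic aid rather than an exact reproduction of NiFi's parsing.

```python
import csv
import io
from datetime import datetime


def rows_with_unparseable_dates(csv_text, column, fmt):
    """Return (line_number, value) for every row whose date does not match fmt."""
    bad = []
    reader = csv.DictReader(io.StringIO(csv_text))
    # Data starts on line 2 because line 1 is the header.
    for line_number, row in enumerate(reader, start=2):
        try:
            datetime.strptime(row[column], fmt)
        except ValueError:
            bad.append((line_number, row[column]))
    return bad


# Shortened sample in the shape of the attached source file.
sample = (
    "Source_Code,NickName,Birth_Date\n"
    "FAST,First2310 F Last31017,8/5/2013\n"
    "FAST,First2660 F Last66065,8/6/2013\n"
)

# A month/day/year pattern matches the data, so nothing is reported.
print(rows_with_unparseable_dates(sample, "Birth_Date", "%m/%d/%Y"))
# A year-month-day pattern does not match, so every row is reported.
print(rows_with_unparseable_dates(sample, "Birth_Date", "%Y-%m-%d"))
```

An empty result for the format you plan to configure is a good sign that the reader and writer settings agree with the data.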
Created 01-27-2018 08:28 PM
Thanks. This worked. Do you know if I can automate this flow to load the output file, in Avro format, into a Hive table?