How to split a CSV file into multiple files based on attribute names in NiFi

Hello,

I have a CSV file with multiple attributes, with these header names:

Source File: Name, Age, Sex, Country, City, Postal Code

I want to split this CSV by attribute name into 3 separate CSV files:

File1: Name, Age, Country

File2: Name, Country, City

File3: Country, City, Postal Code

Can someone please help me with how I can do this in NiFi?

1 ACCEPTED SOLUTION

Master Guru

@Yogesh Singh

If you are using NiFi 1.2+, you can use three ConvertRecord processors in parallel to read your CSV file and create three different CsvRecordSetWriters, each with your required columns; the processors will then give you three different files.

Flow:-

ConvertRecord Processor:-

  1. It reads the source file that has Name,Age,Sex,Country,City,PostalCode and writes a file with Name, Age, Country.
  2. I have attached an .xml template in which I implemented one ConvertRecord processor that reads the CSV file and outputs only the Name,City columns.
  3. You can save the XML file, upload it to your NiFi instance and adjust the CsvRecordSetWriter to your requirements.
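As a rough illustration outside NiFi, the Python sketch below mimics what the three parallel ConvertRecord processors do: the CSV is read once, and three writers each keep only their own columns. The column lists come from the question (using the PostalCode spelling from the example header); the `split_csv` helper and the in-memory strings are hypothetical and are not part of NiFi.

```python
import csv
import io

# Column subsets from the question; each one plays the role of one CsvRecordSetWriter schema.
WRITER_COLUMNS = {
    "File1": ["Name", "Age", "Country"],
    "File2": ["Name", "Country", "City"],
    "File3": ["Country", "City", "PostalCode"],
}

def split_csv(text, writer_columns):
    """Read the CSV once and write one projected CSV string per writer (hypothetical helper)."""
    records = list(csv.DictReader(io.StringIO(text)))
    outputs = {}
    for name, columns in writer_columns.items():
        buf = io.StringIO()
        # extrasaction="ignore" silently drops columns that are not in this writer's list
        writer = csv.DictWriter(buf, fieldnames=columns, extrasaction="ignore", lineterminator="\n")
        writer.writeheader()
        writer.writerows(records)
        outputs[name] = buf.getvalue()
    return outputs

source = "Name,Age,Sex,Country,City,PostalCode\nhcc,21,M,US,OR,32811\nHDP,22,F,US,OR,32746\n"
for name, content in split_csv(source, WRITER_COLUMNS).items():
    print(name)
    print(content)
```

The same reader output feeds all three writers, which is the point of running the ConvertRecord processors in parallel from one upstream connection.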

Example:-

Input Csv File:-

Name,Age,Sex,Country,City,PostalCode
hcc,21,M,US,OR,32811
HDP,22,F,US,OR,32746

Output Csv file:-

Name,City
hcc,OR
HDP,OR

So in ConvertRecord I set up a CsvReader, which reads the incoming flowfile, and a CsvRecordSetWriter, which outputs only the columns defined in the schema of that controller service.
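To make the "only the columns in the writer's schema" behaviour concrete, here is a small Python sketch (not NiFi code) that takes the output columns from an Avro record schema, in the style used for the writer's schema, and projects the example input. The `WRITER_SCHEMA` text and the `project` helper are illustrative assumptions.

```python
import csv
import io
import json

# Writer schema in the style used with AvroSchemaRegistry / CsvRecordSetWriter (illustrative).
WRITER_SCHEMA = """
{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Name", "type": ["null", "string"] },
    { "name": "City", "type": ["null", "string"] }
  ]
}
"""

def project(csv_text, schema_text):
    """Keep only the fields named in the record schema, in schema order."""
    columns = [field["name"] for field in json.loads(schema_text)["fields"]]
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    writer.writerows(csv.DictReader(io.StringIO(csv_text)))
    return out.getvalue()

source = "Name,Age,Sex,Country,City,PostalCode\nhcc,21,M,US,OR,32811\nHDP,22,F,US,OR,32746\n"
print(project(source, WRITER_SCHEMA))
```

Changing the field list in the schema text is all it takes to produce a different output file, which mirrors editing the CsvRecordSetWriter schema for each of the three processors.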

Xml file:-

convertrecord.xml

ConvertRecord example:-

https://community.hortonworks.com/articles/115311/convert-csv-to-json-avro-xml-using-convertrecord-p...

https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

Flow:-

[Image: 58410-flow.png, screenshot of the ConvertRecord flow]

(or)

If you are using a NiFi version prior to 1.2, then you need to use:

1. ExecuteStreamCommand processor // delete the header of the file, using tail as the Command Path and -n;+2 as the Command Arguments; connect the output stream relationship to the next processor.
2. SplitText // split the content of the CSV file with a line count of 1; connect the splits relationship to the next processor.
3. ExtractText // add new properties with regexes that extract each value into an attribute named after its header; connect success to all three parallel ReplaceText processors.
4. Three ReplaceText processors in parallel // each one specifies the attributes you want in its file, so each processor produces the required file.
5. MergeContent (optional) // merge the small flowfiles into one file before storing it in a directory.
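The pre-1.2 chain above can be simulated in plain Python to see what each step contributes. This is only a sketch: the regex, the attribute names and the output templates are assumptions. In NiFi itself, the ReplaceText Replacement Value would reference the extracted attributes with Expression Language, for example ${Name},${Age},${Country}.

```python
import re

# Hypothetical stand-in for the ExtractText properties: one named group per column.
LINE_REGEX = re.compile(
    r"^(?P<Name>[^,]*),(?P<Age>[^,]*),(?P<Sex>[^,]*),"
    r"(?P<Country>[^,]*),(?P<City>[^,]*),(?P<PostalCode>[^,]*)$"
)

# Hypothetical stand-ins for the three parallel ReplaceText Replacement Values.
TEMPLATES = {
    "File1": "{Name},{Age},{Country}",
    "File2": "{Name},{Country},{City}",
    "File3": "{Country},{City},{PostalCode}",
}

def run_pipeline(text):
    body = text.splitlines()[1:]               # ExecuteStreamCommand: tail -n +2 drops the header
    outputs = {name: [] for name in TEMPLATES}
    for line in body:                          # SplitText: one line per flowfile
        match = LINE_REGEX.match(line)         # ExtractText: one attribute per column
        if match is None:
            continue                           # this sketch simply skips lines that do not match
        attrs = match.groupdict()
        for name, template in TEMPLATES.items():
            outputs[name].append(template.format(**attrs))  # ReplaceText
    # MergeContent (optional): join the one-line pieces back into one file per output
    return {name: "".join(row + "\n" for row in rows) for name, rows in outputs.items()}

source = "Name,Age,Sex,Country,City,PostalCode\nhcc,21,M,US,OR,32811\nHDP,22,F,US,OR,32746\n"
for name, content in run_pipeline(source).items():
    print(name, repr(content))
```

Note that the merged outputs have no header row, because the first step removed it.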

Extract text Example:-

https://community.hortonworks.com/questions/160868/how-to-extracttext-from-flow-file-using-nifi-proc...

Replace text Example:-

https://community.hortonworks.com/questions/158910/parsing-json-formatted-csv-file-using-nifi.html

MergeContent Example:-

https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.ht...

Let us know if you have any issues!

@Shu

Hi,

It didn't work on my CSV files, as they are very large, with almost 40 fields. I'm having an issue converting to Avro. I see that in your XML the Avro schema has been hard-coded. Is it possible to auto-generate it from the CSV file I have?

Master Guru
@Yogesh Singh

Make sure the data types of all 40 fields match in AvroSchemaRegistry, and if any of the fields can contain null values, then in AvroSchemaRegistry the field type needs to allow null (a union with "null"), optionally with a default value of null.

Example:-

{ "name": "Name", "type": "string"} //with this Avro schema, the Name field won't allow null values
{ "name": "Name", "type": ["null", "string"]} //with this Avro schema, the Name field allows null values

{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Name", "type": ["null", "string"] },
    { "name": "Age", "type": ["null", "int"] },
    { "name": "Sex", "type": ["null", "string"] },
    { "name": "Country", "type": ["null", "string"] },
    { "name": "City", "type": ["null", "string"] },
    { "name": "PostalCode", "type": ["null", "string"] }
  ]
}

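It's recommended to also add default values for the fields in the Avro schema (for a nullable field, "default": null, which requires "null" to be the first type in the union).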
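Regarding auto-generating the schema from the CSV header, one possible approach outside NiFi is a small script like the Python sketch below. It assumes every column starts out as a nullable string with "default": null, and it replaces characters that are not legal in Avro names (such as spaces and parentheses) with underscores. The helper names are hypothetical; fields that are really ints or dates still have to be adjusted by hand, and the CSV header must use the same sanitized names (the later input in this thread uses underscores instead of spaces). The naive split on commas also assumes the header has no quoted commas.

```python
import json
import re

def sanitize(name):
    """Turn a CSV header into a legal Avro name: [A-Za-z_][A-Za-z0-9_]*."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name.strip())
    if not cleaned or cleaned[0].isdigit():
        cleaned = "_" + cleaned
    return cleaned

def schema_from_header(header_line, record_name="balances", namespace="nifi"):
    """Build a record schema in which every column is a nullable string with default null."""
    fields = [
        {"name": sanitize(col), "type": ["null", "string"], "default": None}
        for col in header_line.split(",")
    ]
    return {"namespace": namespace, "name": record_name, "type": "record", "fields": fields}

header = "Source Code,Source Product ID,Birth Date,Location_Name(City),State(State)"
print(json.dumps(schema_from_header(header), indent=2))
```

Python's None is serialized by json.dumps as null, so the printed schema can be pasted into AvroSchemaRegistry as a starting point.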

Make sure everything is set up correctly. If you are still facing issues, share more information, such as logs, the schema registry configuration and sample input records, so that the issue is easier to understand.

@Shu

Thanks for the reply. My source file has the columns below:

Source Code,Source Product ID,Source Party ID,Govt Issued Tax ID,Prefix Name,First Name,Middle Name,Last Name,Suffix Name,Full Legal Name,NickName,Birth Date,Birth Place Name,Gender,Smoker Indicator,Risk Class,Marriage Status Code,Agreement Name,Status Code,Payment Method,Agreement_Original_Inception_Date,Payment_Frequency,Payment_Frequency_Desc,Agreement_Covered_Amount,Agreement_Number,Agreement_Party_Role,Effective_Date,Expiration_Date,Agreement_Premium_Amount,Line_Of_Business_Identifier,Licensed_Product_Name,Product_Description,Product_Code,Address_Line1,Address_Line2,Region,Location_Code,Location_Name(City),Location_Number,State(State),State_code,Country,Coverage_Part_Code,Coverage_Type_identifier,Coverage_name,Coverage_Description,Coverage_Group_Identifier,Coverage_Code,Coverage_Term_Period,Coverage_Gender,Coverage_Class,Agreement_Type_Code,Party_Type_Code,County_Name,Coverage_Type_Code

I think I'm having issues because of the data types in Avro. My goal is ultimately to load all these files into a Hive table. Can you please help here, especially with the date type?

Master Guru

@Yogesh Singh

It's better to open a new question for this date type issue so that it is more visible to all community users.

Here is what I tried:

Input:-

Timestamp,DATE,Age,Sex,Country,City,PostalCode
2017-01-10 12:34:56.33,2018-10-12,21,M,US,OR,32811
2017-01-10 12:34:56.33,2018-10-13,22,F,US,OR,32746

Output expected:-

Timestamp,City,DATE
2017-01-10 12:34:33.33,OR,2018-10-12
2017-01-10 12:34:33.33,OR,2018-10-13

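My Timestamp field format is yyyy-MM-dd HH:mm:ss.ss and my DATE field format is yyyy-MM-dd, so in the ConvertRecord processor's CsvReader I specified these formats, as shown in the screenshot below. (Note: in these Java-style patterns, lowercase ss means seconds, so ss.ss parses the seconds twice; that is why 12:34:56.33 comes out as 12:34:33.33 in the output. A pattern such as yyyy-MM-dd HH:mm:ss.SS would keep the original seconds.)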

[Image: 58414-cr-csv-reader.png, CsvReader configuration with the date and timestamp formats]

In AvroSchemaRegistry I changed the schema to:

{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "DATE", "type": { "type": "int", "logicalType": "date" } },
    { "name": "Age", "type": ["null", "int"] },
    { "name": "Sex", "type": ["null", "string"] },
    { "name": "Country", "type": ["null", "string"] },
    { "name": "City", "type": ["null", "string"] },
    { "name": "PostalCode", "type": ["null", "string"] }
  ]
}

Because the Timestamp and DATE fields are defined with logical types, the date and timestamp formats also need to be specified in the CsvRecordSetWriter controller service:

[Image: 58415-cr-csv-set-writer.png, CsvRecordSetWriter configuration with the date and timestamp formats]

CsvRecordSetWriter will then write the Timestamp and DATE fields in those formats.

The CsvRecordSetWriter Schema Text property is:

{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "City", "type": ["null", "string"] },
    { "name": "DATE", "type": { "type": "int", "logicalType": "date" } }
  ]
}

Once all of these configurations are set up correctly, we get the expected output:

Timestamp,City,DATE
2017-01-10 12:34:33.33,OR,2018-10-12
2017-01-10 12:34:33.33,OR,2018-10-13
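For reference, the Avro logical types used above store plain numbers underneath: "date" is an int counting days since 1970-01-01, and "timestamp-millis" is a long counting milliseconds since the Unix epoch. The Python sketch below shows that encoding for the example values. Python's strptime patterns stand in for the Java-style formats configured in NiFi, and the sketch assumes the timestamp text is UTC; NiFi's actual result can depend on its timezone configuration.

```python
from datetime import date, datetime, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_TS = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_avro_date(text, fmt="%Y-%m-%d"):
    """Avro 'date' logical type: int number of days since 1970-01-01."""
    return (datetime.strptime(text, fmt).date() - EPOCH_DATE).days

def to_avro_timestamp_millis(text, fmt="%Y-%m-%d %H:%M:%S.%f"):
    """Avro 'timestamp-millis': long milliseconds since the epoch (text assumed to be UTC here)."""
    dt = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    delta = dt - EPOCH_TS
    # integer arithmetic avoids float rounding from total_seconds()
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000

print(to_avro_date("2018-10-12"))                        # 17816
print(to_avro_timestamp_millis("2017-01-10 12:34:56.33"))  # 1484051696330
```

Because the stored value is just a number, the reader and writer formats only matter at the text boundaries: the reader's pattern must match the input text, and the writer's pattern decides how the number is printed again.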

@Shu

I'm attaching the source file I have. Below is the AvroSchemaRegistry schema:

{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
	{ "name": "Source_Code", "type": "string" },
	{ "name": "Source_Product_ID", "type": "int" },
	{ "name": "Source_Party_ID", "type": "int" },
	{ "name": "Govt_Issued_Tax_ID", "type": "int" },
	{ "name": "Prefix_Name", "type": "string" },
	{ "name": "First_Name", "type": "string" },
	{ "name": "Middle_Name", "type": "string" },
	{ "name": "Last_Name", "type": "string" },
	{ "name": "Suffix_Name", "type": "string" },
	{ "name": "Full_Legal_Name", "type": "string" },
	{ "name": "NickName", "type": "string" },
	{ "name": "Birth_Date", "type": "int", "logicalType":"date" },
	{ "name": "Birth_Place_Name", "type": "string" },
	{ "name": "Gender", "type": "string" },
	{ "name": "Smoker_Indicator", "type": "string" },
	{ "name": "Risk_Class", "type": "string" },
	{ "name": "Marriage_Status_Code", "type": "string" },
	{ "name": "Agreement_Name", "type": "string" },
	{ "name": "Status_Code", "type": "string" },
	{ "name": "Payment_Method", "type": "string" },
	{ "name": "Agreement_Original_Inception_Date", "type": "int", "logicalType":"date" },
	{ "name": "Payment_Frequency", "type": "string" },
	{ "name": "Payment_Frequency_Desc", "type": "string" },
	{ "name": "Agreement_Covered_Amount", "type": "int" },
	{ "name": "Agreement_Number", "type": "string" },
	{ "name": "Agreement_Party_Role", "type": "string" },
	{ "name": "Effective_Date", "type": "int", "logicalType":"date" },
	{ "name": "Expiration_Date", "type": "int", "logicalType":"date" }
 ]
}

Below is the schema from CsvRecordSetWriter:

{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
	{ "name": "Source_Code", "type": "string" },
	{ "name": "Source_Product_ID", "type": "int" },
	{ "name": "Source_Party_ID", "type": "int" },
	{ "name": "Govt_Issued_Tax_ID", "type": "int" },
	{ "name": "Prefix_Name", "type": "string" },
	{ "name": "First_Name", "type": "string" },
	{ "name": "Middle_Name", "type": "string" },
	{ "name": "Last_Name", "type": "string" },
	{ "name": "Suffix_Name", "type": "string" },
	{ "name": "Full_Legal_Name", "type": "string" },
	{ "name": "NickName", "type": "string" },
	{ "name": "Birth_Date", "type": "int", "logicalType":"date" }
  ]
 }

I have set the date format to MM/dd/yy. I am attaching the error too. Is there something I am doing wrong?

P.S.: I changed the data in Govt_Issued_Tax_ID to make it an integer and still get the same error. I also changed the date format to match yours and still get an error on the date.

Master Guru
@Yogesh Singh

The source file you shared above does not contain any values in the 2018-10-12 (yyyy-MM-dd) format. The NumberFormatException occurs because the dates in the file do not match the date format you specified (MM/dd/yy) in CsvReader and CsvRecordSetWriter.
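The general rule (the configured pattern must match the text actually in the file) can be illustrated with Python's strptime as a stand-in. This is an analogy, not NiFi's parser: NiFi uses Java-style patterns, and Java's parsing rules differ in details. Here M/d/yyyy corresponds roughly to %m/%d/%Y, and `parse_or_error` is a hypothetical helper.

```python
from datetime import datetime

def parse_or_error(text, fmt):
    """Return the parsed date, or an error string when the text and the format disagree."""
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError as exc:
        return f"error: {exc}"

value = "8/5/2013"  # Birth_Date as it appears in the shared source file
print(parse_or_error(value, "%Y-%m-%d"))  # pattern of the earlier 2018-10-12 example: prints an error
print(parse_or_error(value, "%m/%d/%Y"))  # month/day/four-digit year: prints 2013-08-05
```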

I tried it with the source file you provided, and it works as expected:

Input:-

Source_Code,Source_Product_ID,Source_Party_ID,Govt_Issued_Tax_ID,Prefix_Name,First_Name,Middle_Name,Last_Name,Suffix_Name,Full_Legal_Name,NickName,Birth_Date,Birth_Place_Name,Gender,Smoker_Indicator,Risk_Class,Marriage_Status_Code,Agreement_Name,Status_Code,Payment_Method,Agreement_Original_Inception_Date,Payment_Frequency,Payment_Frequency_Desc,Agreement_Covered_Amount,Agreement_Number,Agreement_Party_Role,Effective_Date,Expiration_Date
FAST,1,1,000-23-1017,Mrs,First2310,F,Last31017,Dr,First2310 F Last31017,First2310 F Last31017,8/5/2013,USA,F,Y,P,N,Policy,Open,Cash,8/5/2013,M,Monthly,300000,231017,Insured,8/5/2013,8/6/2013
FAST,1,2,000-26-6065,Mrs,First2660,F,Last66065,Phd,First2660 F Last66065,First2660 F Last66065,8/6/2013,USA,F,Y,P,N,Policy,Open,Card,8/6/2013,M,Monthly,600000,266065,Insured,8/5/2013,8/6/2013
FAST,1,3,000-35-8819,Mrs,First3588,F,Last58819,Dr,First3588 F Last58819,First3588 F Last58819,8/7/2013,USA,F,Y,P,Y,Policy,Open,Check,8/7/2013,M,Monthly,150000,358819,Insured,8/5/2013,8/6/2013
FAST,1,4,000-39-7033,Mr,First3970,M,Last97033,Phd,First3970 M Last97033,First3970 M Last97033,8/8/2013,USA,M,Y,P,N,Policy,Open,Cash,8/8/2013,A,Annual,3000000,397033,Insured,8/5/2013,8/6/2013

Output:-

Source_Code,Source_Product_ID,Source_Party_ID,Govt_Issued_Tax_ID,Prefix_Name,First_Name,Middle_Name,Last_Name,Suffix_Name,Full_Legal_Name,NickName,Birth_Date
FAST,1,1,000-23-1017,Mrs,First2310,F,Last31017,Dr,First2310 F Last31017,First2310 F Last31017,8/5/2013
FAST,1,2,000-26-6065,Mrs,First2660,F,Last66065,Phd,First2660 F Last66065,First2660 F Last66065,8/6/2013
FAST,1,3,000-35-8819,Mrs,First3588,F,Last58819,Dr,First3588 F Last58819,First3588 F Last58819,8/7/2013
FAST,1,4,000-39-7033,Mr,First3970,M,Last97033,Phd,First3970 M Last97033,First3970 M Last97033,8/8/2013

Here is the attached XML, which contains your source data and the new CsvReader, AvroSchemaRegistry and CsvRecordSetWriter.

Use the XML file below, and make sure your date (or timestamp) formats match the formats you specify in CsvReader and CsvRecordSetWriter, as I mentioned in my comment above.

hcc-convertrecord-support-167066.xml
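One more thing worth checking in the schemas posted above: Birth_Date and the other date fields put "logicalType" next to "type" at the field level. Per the Avro specification, the logical type belongs inside the type itself, as in the Timestamp/DATE schema earlier in this thread ({"type": "int", "logicalType": "date"}); an attribute at the field level is not treated as a logical type, so such a field is read as a plain int. The Python sketch below is a hypothetical helper (not a NiFi or Avro tool) that moves a misplaced field-level logicalType into a primitive type.

```python
import json

def fix_logical_types(schema):
    """Move a field-level "logicalType" into the field's type object (returns a new dict)."""
    fixed = json.loads(json.dumps(schema))  # deep copy via a JSON round-trip
    for field in fixed["fields"]:
        logical = field.pop("logicalType", None)
        if logical is not None and isinstance(field["type"], str):
            field["type"] = {"type": field["type"], "logicalType": logical}
        elif logical is not None:
            field["logicalType"] = logical  # unions/complex types are left for manual review
    return fixed

broken = {
    "namespace": "nifi", "name": "balances", "type": "record",
    "fields": [
        {"name": "NickName", "type": "string"},
        {"name": "Birth_Date", "type": "int", "logicalType": "date"},
    ],
}
print(json.dumps(fix_logical_types(broken)["fields"][1]))
# {"name": "Birth_Date", "type": {"type": "int", "logicalType": "date"}}
```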

@Shu

Thanks, this worked. Do you know if I can automate this flow to load the external output file, in Avro format, into a Hive table?