
Apache NiFi: merge two different CSV files and produce combined JSON output

New Contributor

Hi All,

I am a novice to NiFi and want to accomplish the following task with it.

Problem statement:

I have two CSV files: 1) Inventory and 2) Rate.

Both files have a common UNIT_ID column (the inventory ID).

Input Inventory File

UNIT_ID|NAME|TOTAL
1111|ABC|90
1112|XYZ|50

Input Rate File

UNIT_ID|TYPE|RATE
1111|ONE|900
1111|TWO|5800
1112|RTY|500

Output Json

[
  {
    "UNIT_ID": "1111",
    "NAME": "ABC",
    "TOTAL": "90",
    "RATES": [
      { "TYPE": "ONE", "RATE": "900" },
      { "TYPE": "TWO", "RATE": "5800" }
    ]
  },
  {
    "UNIT_ID": "1112",
    "NAME": "XYZ",
    "TOTAL": "50",
    "RATES": [
      { "TYPE": "RTY", "RATE": "500" }
    ]
  }
]

Please help me find a way to do this. I am able to convert CSV to JSON, but I am stuck on how to join the two files by UNIT_ID.

Thanks.

2 Replies

Super Collaborator

NiFi doesn't do joins in-stream. To accomplish this, I'd recommend processing the first CSV file and storing it in HBase with the UNIT_ID as the row key. You can then process the second CSV file, building a flow that uses that ID to query HBase and retrieve the reference data from the first CSV.
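For clarity, here is a minimal Python sketch (outside NiFi) of the join logic that flow would implement, with an in-memory dict standing in for the HBase table; the file names inventory.csv and rate.csv and the pipe delimiter are assumptions taken from the sample data above.

import csv
import json
from collections import defaultdict

# Load the reference file into a map keyed by UNIT_ID
# (an in-memory stand-in for the HBase table keyed by id).
with open("inventory.csv", newline="") as f:
    inventory = {row["UNIT_ID"]: row for row in csv.DictReader(f, delimiter="|")}

# Stream the second file and group its rows by the same key,
# mirroring the per-row lookup the flow would make against HBase.
rates = defaultdict(list)
with open("rate.csv", newline="") as f:
    for row in csv.DictReader(f, delimiter="|"):
        rates[row["UNIT_ID"]].append({"TYPE": row["TYPE"], "RATE": row["RATE"]})

# Emit one object per inventory unit with its rates nested under RATES.
combined = [{**unit, "RATES": rates.get(unit_id, [])}
            for unit_id, unit in inventory.items()]
print(json.dumps(combined, indent=2))

Run against the sample files above, this prints the nested JSON shown in the question.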

If you'd like to do windowed joins, take a look at Streaming Analytics Manager in the HDF platform.


@Chintan Visani

I believe it's possible. Let me explain my use case first. My objective was to take 4 CSVs as input and join all of them on a unique column named 'CustomerID' that is present in every CSV.

I did not join the CSVs directly; rather, I put each of them into a database table and later joined them using the ExecuteSQL processor.

1. GetFile -> UpdateAttribute -> PutDatabaseRecord (once for each of the CSVs; in my case, 4 CSVs put into 4 different tables)

2. ExecuteSQL (write a SQL query to join all the required tables) -> ConvertRecord -> PutFile (this gives you the join of all the CSVs; a stand-alone sketch of these steps follows the list)

3. Optionally, GetFile -> UpdateAttribute -> PutDatabaseRecord again (if you want to put the aggregated/joined CSV back into the DB)
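As a self-contained illustration of steps 1 and 2, here is a rough Python sketch; SQLite and the table names are stand-ins I've assumed, and it uses the two-file sample from the original question rather than my four-CSV case.

import csv
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inventory (unit_id TEXT, name TEXT, total TEXT)")
conn.execute("CREATE TABLE rate (unit_id TEXT, type TEXT, rate TEXT)")

# Step 1 equivalent: put each CSV into its own table
# (what PutDatabaseRecord does in the NiFi flow).
with open("inventory.csv", newline="") as f:
    conn.executemany("INSERT INTO inventory VALUES (?, ?, ?)",
                     [(r["UNIT_ID"], r["NAME"], r["TOTAL"])
                      for r in csv.DictReader(f, delimiter="|")])
with open("rate.csv", newline="") as f:
    conn.executemany("INSERT INTO rate VALUES (?, ?, ?)",
                     [(r["UNIT_ID"], r["TYPE"], r["RATE"])
                      for r in csv.DictReader(f, delimiter="|")])

# Step 2 equivalent: a single query joins the tables on the shared key
# (the kind of query you would give ExecuteSQL).
rows = conn.execute("""
    SELECT i.unit_id, i.name, i.total, r.type, r.rate
    FROM inventory i
    JOIN rate r ON r.unit_id = i.unit_id
    ORDER BY i.unit_id
""").fetchall()

# Fold the flat join result into the nested JSON shape from the question.
combined = {}
for unit_id, name, total, rtype, rate in rows:
    unit = combined.setdefault(unit_id, {"UNIT_ID": unit_id, "NAME": name,
                                         "TOTAL": total, "RATES": []})
    unit["RATES"].append({"TYPE": rtype, "RATE": rate})
print(json.dumps(list(combined.values()), indent=2))

In the real flow the nesting step would be handled after ExecuteSQL (for example in ConvertRecord or a transform of your choice); the loop above just shows what that grouping has to do.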

Hope it helps.

Thanks and regards,

Spandan.