Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Nifi - MergeContent - Multiple CSV files - counter

avatar
Contributor

Hi

I want to merge 6 CSV files into 1

I use

ListHDFS >> FechHDFS >> UpdateAttribute >> MergeContent >> QueryRecord >> ...

 

ListHDFS >> FechHDFS >> UpdateAttribute is repeated as the number of files to merge ( 6 times)

because I shoud to give for each file the fragment.index parameter and an allias ( used later for the join query in QueryRecord )

yamaga_0-1644591989998.png

 

The UpdateAttribute for one of the files:

yamaga_1-1644592040948.png

 

 

Is there a way to avoid multiple processors to get the files ListHDFS >> FechHDFS >>  UpdateAttribute 

How to reduce is into one ListHDFS >> FechHDFS >> UpdateAttribute and give a different fragment.index for each different file which shloud be between 0 and 6 (max number of files) ?

 

I tried NextInt() to attribute a new fragment.index value but it is incremental, not suitable for multiple executions.

 

Thanks in advance.

7 REPLIES 7

avatar
Super Guru

Are all the files similar and you assign the fragment indexes in a round robin fashion? (1,2,3,4,5,6,1,2,3,...)

Or do the different index numbers identify different types of files?

When you merge, can you merge as many files as possible or do they always need to be merged 6 by 6?

Can you give an example of how you are going to use the index in the QueryRecord processor?

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

avatar
Contributor

Hi @araujo thanks for your reply

 

This an example:

 

I have six csv files:.

file1.csv, file2.csv, file3.csv, file4.csv have the same structure

file5.csv, file6.csv have a different structure but the have some common columns that I will use in the QueryRecord

 

In order to use The MergeContent, I should give a different.index attribute to each filename, it should be between 0 and 5 (as I have 6 files ).

 

Before the MergeContent, I use ListHDFS >> FechHDFS >> UpdateAttribute 6 times (for each file) which is not a good design as I can have more than 6 files in the future, UpdateAttribute is where I assign the frangment.index attribute for each file.

 

My question is, is there a way to have ONE ListHDFS >> FechHDFS >> UpdateAttribute that get all files and assign a different frangment.index for each file (between 0 and 5) in one UpdateAttribute processor

 

For your question about the QueryRecord:

I give a "metric"parameter for the 4 first files and another to two others in UpdateAttribute processor

then in the QueryRecord I use this kind of query:

 

select file1.col1, file1.col2, file2.col3, file2.col4,file3.col5,file3.col6
 from (
   select ID, file1.col1, file1.col2 where m = 'a'
 ) file1
 left join (
   select ID, file2.col3, file2.col4 from FLOWFILE where m = 'b'
 ) file2 on file1.ID_ART = file2.ID_ART
 left join (
   select ID, file3.col5,file3.col6 from FLOWFILE where m = 'c'
) file3 on file1.ID = file3.ID

avatar
Super Guru

How do you differentiate the files in HDFS? Are they in different directories? Have different filenames?

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

avatar
Super Guru

If the different types of files are in different directories in HDFS, for example, you can use Expression Language to set the values for fragment.index and metric, using a single ListHDFS -> FetchHDFS -> UpdateAttribute.

 

The expression below sets the value for metric according to the path where the file came from:

 

${path:equals("/tmp/input/dir1"):ifElse("a",
${path:equals("/tmp/input/dir2"):ifElse("b",
${path:equals("/tmp/input/dir3"):ifElse("c",
${path:equals("/tmp/input/dir4"):ifElse("d",
${path:equals("/tmp/input/dir5"):ifElse("e",
${path:equals("/tmp/input/dir6"):ifElse("f",
"other")})})})})})}

 

You can do the same for fragment.index.

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

avatar
Super Guru

@yamaga , does the above help?

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

avatar
Contributor

Hi @araujo 

Thanks a lot for you implication.

 

That helped me to assign the metric attribute.

But not for fragment.index attribute because I might have more than one file coming from the same directory so I should assign different fragment.index for each one.

I also need to count the number of incoming files in order to assign the fragment.count attribute.

 

 

avatar
Contributor

I think about Groovy script but did not find how to loop each flowfile or how to get the count of the files