Created 02-11-2022 07:15 AM
Hi
I want to merge 6 CSV files into 1
I use
ListHDFS >> FetchHDFS >> UpdateAttribute >> MergeContent >> QueryRecord >> ...
ListHDFS >> FetchHDFS >> UpdateAttribute is repeated once per file to merge (6 times),
because I have to give each file a fragment.index attribute and an alias (used later for the join query in QueryRecord).
The UpdateAttribute for one of the files:
Is there a way to avoid repeating the ListHDFS >> FetchHDFS >> UpdateAttribute processors for each file?
How can I reduce it to a single ListHDFS >> FetchHDFS >> UpdateAttribute and give each file a different fragment.index, which should be between 0 and 5 (6 being the max number of files)?
I tried nextInt() to assign a new fragment.index value, but its value keeps increasing across executions, so the indexes do not restart at 0 on later runs.
Thanks in advance.
Created 02-12-2022 05:30 PM
Are all the files similar, with the fragment indexes assigned in a round-robin fashion (1,2,3,4,5,6,1,2,3,...)?
Or do the different index numbers identify different types of files?
When you merge, can you merge as many files as possible or do they always need to be merged 6 by 6?
Can you give an example of how you are going to use the index in the QueryRecord processor?
Created 02-12-2022 06:36 PM
Hi @araujo thanks for your reply
This is an example:
I have six CSV files:
file1.csv, file2.csv, file3.csv, file4.csv have the same structure
file5.csv, file6.csv have a different structure, but they have some common columns that I will use in the QueryRecord.
In order to use MergeContent, I have to give a different fragment.index attribute to each file; it should be between 0 and 5 (as I have 6 files).
Before the MergeContent, I use ListHDFS >> FetchHDFS >> UpdateAttribute 6 times (once per file), which is not a good design, as I may have more than 6 files in the future. UpdateAttribute is where I assign the fragment.index attribute for each file.
My question is: is there a way to have ONE ListHDFS >> FetchHDFS >> UpdateAttribute flow that gets all the files and assigns a different fragment.index (between 0 and 5) to each file in a single UpdateAttribute processor?
For your question about the QueryRecord:
In the UpdateAttribute processor I give the "metric" attribute one value for the first 4 files and another value for the other two,
then in the QueryRecord I use this kind of query:
select file1.col1, file1.col2, file2.col3, file2.col4, file3.col5, file3.col6
from (
  select ID, col1, col2 from FLOWFILE where m = 'a'
) file1
left join (
  select ID, col3, col4 from FLOWFILE where m = 'b'
) file2 on file1.ID = file2.ID
left join (
  select ID, col5, col6 from FLOWFILE where m = 'c'
) file3 on file1.ID = file3.ID
Created 02-12-2022 07:40 PM
How do you differentiate the files in HDFS? Are they in different directories? Have different filenames?
Created on 02-12-2022 08:02 PM - edited 02-12-2022 08:22 PM
If the different types of files are in different directories in HDFS, for example, you can use Expression Language to set the values for fragment.index and metric, using a single ListHDFS -> FetchHDFS -> UpdateAttribute.
The expression below sets the value for metric according to the path where the file came from:
${path:equals("/tmp/input/dir1"):ifElse("a",
${path:equals("/tmp/input/dir2"):ifElse("b",
${path:equals("/tmp/input/dir3"):ifElse("c",
${path:equals("/tmp/input/dir4"):ifElse("d",
${path:equals("/tmp/input/dir5"):ifElse("e",
${path:equals("/tmp/input/dir6"):ifElse("f",
"other")})})})})})}
You can do the same for fragment.index.
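As a sketch, the analogous expression for fragment.index could look like the one below, assuming each directory holds exactly one of the files; the directory paths are the same placeholders as above and the "-1" fallback is an assumption:
${path:equals("/tmp/input/dir1"):ifElse("0",
${path:equals("/tmp/input/dir2"):ifElse("1",
${path:equals("/tmp/input/dir3"):ifElse("2",
${path:equals("/tmp/input/dir4"):ifElse("3",
${path:equals("/tmp/input/dir5"):ifElse("4",
${path:equals("/tmp/input/dir6"):ifElse("5",
"-1")})})})})})}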
Created 02-13-2022 02:07 PM
@yamaga , does the above help?
Created 02-14-2022 02:43 AM
Hi @araujo
Thanks a lot for your help.
That helped me to assign the metric attribute.
But it does not work for the fragment.index attribute, because I might have more than one file coming from the same directory, so I need to assign a different fragment.index to each one.
I also need to count the number of incoming files in order to assign the fragment.count attribute.
Created 02-14-2022 02:45 AM
I am thinking about a Groovy script, but I did not find how to loop over the flowfiles or how to get the count of the files.
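A minimal ExecuteScript (Groovy) sketch of that idea could look like the one below. It assumes all the files for one merge are already queued in front of the processor when it runs; the batch size (100) and the fragment.identifier value are placeholders, not required names:
// ExecuteScript (Groovy) sketch -- 'session' and 'REL_SUCCESS' are provided by ExecuteScript
// Assumption: every file for one merge is already queued when this runs,
// otherwise fragment.count will be wrong for that batch.
def flowFiles = session.get(100)               // pull all queued flowfiles (up to 100)
if (!flowFiles) return                         // nothing to do on this trigger

def count = flowFiles.size()
flowFiles.eachWithIndex { ff, i ->
    ff = session.putAttribute(ff, 'fragment.index', i.toString())      // 0 .. count-1
    ff = session.putAttribute(ff, 'fragment.count', count.toString())
    ff = session.putAttribute(ff, 'fragment.identifier', 'csv-merge')  // same id for all fragments (placeholder value)
    session.transfer(ff, REL_SUCCESS)
}
Note this only numbers whatever is in the queue at trigger time, so it works best if the upstream listing delivers the whole set of files together.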