Created on 08-29-2016 01:23 PM - edited 08-19-2019 03:14 AM
We have a source file with pipe-delimited rows, and we need to fetch specific columns from the flow file. We are using a regular expression in a ReplaceText processor to extract the columns. The flow we are using is as shown:
ListFile -> FetchFile -> RouteText -> ReplaceText -> PutFile
The source file we are using has some 21 columns and around 100,000 records; the file size is around 25 MB. As soon as I start the processor, the records get queued before the ReplaceText processor and the job runs indefinitely. In fact, even after stopping the job we are unable to empty the queue, or even delete any processor for that matter.
The ReplaceText processor is configured as shown below:
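In short, the relevant settings are along these lines (the full expressions are elided here):
Search Value: (.+)\|(.+)\|... (one (.+) group per column)
Replacement Value: $1,$2,... (only the columns we need)
Evaluation Mode: Line-by-Line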
I have increased the Maximum Buffer Size to 10 MB (1 MB default), but it is still of no use. Considering there are only 100,000 records in the file (25 MB), this should not take so long. Is there anything wrong with the configuration or the way we are using the flow? Any inputs would be very helpful.
The system we are using has 16 GB RAM and 4 cores.
Regards,
Indranil Roy
Created 08-29-2016 01:38 PM
Hi @INDRANIL ROY,
This should not be an issue, and I am not really sure why you are seeing this behavior. However, here are two suggestions you may want to try:
- Increase the number of threads for your ReplaceText processor (the 'Concurrent Tasks' setting in the Scheduling tab).
- I think your regular expression may be expensive to evaluate. I am guessing that your regex matches your whole input line and you just want to extract some columns. If this is the case, you could try using '^' and '$' to anchor the start and end of the line, respectively; example: ^(.+)\|(.+)$ (see the quick sketch at the end of this message).
Just to help me understand the situation (because the current configuration should not cause the issue you are reporting):
- What is the NiFi version you are using?
- Is the whole file coming in the ReplaceText processor? Or do you have one flow file per input line coming in the processor?
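To illustrate the anchoring idea, here is a quick Groovy sketch (the 4-column line and the column choice are just for illustration, not your data):

import java.util.regex.Pattern

// Anchored pattern: one capture group per column, pipes escaped
def pattern = Pattern.compile('^(.+)\\|(.+)\\|(.+)\\|(.+)$')
def matcher = pattern.matcher('a|b|c|d')
if (matcher.matches()) {
    // Keep, say, the first and last columns
    println matcher.group(1) + ',' + matcher.group(4)  // prints: a,d
}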
Created 08-29-2016 03:42 PM
We are using NiFi 0.6.0.
To answer your second question: a subset of the whole file comes to the ReplaceText processor, based on the condition in the RouteText processor. Say we have 100 records and some 50 records satisfy the condition; then those 50 records will come to the processor.
The regular expression we are using is
(.+)\|(.+)\|(.+)....
Where (.+) is repeated n times, based on the number of columns in the flow file.
So, as per your observation, we should be using
^(.+)\|(.+)\|(.+)....$
Any other suggestions to improve the performance?
Created 08-30-2016 08:55 AM
@Pierre Villard, my thoughts exactly. I am really doubtful about the regex processing there. So far I have browsed through the community and mailing-list archives and learned that compilation of the regular expression in a ReplaceText processor (or any other processor) occurs every time a flow file is pulled from the queue, and since the Evaluation Mode here is set to "Line-by-Line", the regex is compiled first, checked against each line, and then replaced with the selected $1, $2, ... expression in the Replacement Value. This could go unnoticed with a small file of 10-100 records and 5-10 columns, but when the file has 21 columns and 1000+ records, I do believe the processing can hit the performance wall really hard, hence the discernible slowdown we can see at the RouteText -> ReplaceText stage.
Correct me if I'm wrong, but isn't this a more ETL-specific operation (by that I mean the selective column splitting with regex) than a dataflow-specific operation? I mean, it's not entirely wrong to presume that NiFi, being a dataflow-management tool, will behave like this if ETL-heavy operations are forced into it. Are there other, more efficient ways to do this in NiFi, or is it a good idea to delegate the splitting task to an external script using the ExecuteScript processor? Please hit us with some insights.
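For what it's worth, a rough Groovy micro-benchmark sketch along these lines (untested, numbers will vary by machine) should show how much the greedy (.+) groups cost compared to delimiter-aware ([^|]+) groups, since (.+) can swallow '|' characters and force backtracking:

import java.util.regex.Pattern

// Build a sample 21-column pipe-delimited line
def line = (1..21).collect { "col$it" }.join("|")

// Greedy groups: each (.+) can cross '|' characters, forcing heavy backtracking
def greedy = Pattern.compile('^' + (['(.+)'] * 21).join('\\|') + '$')
// Negated character class: ([^|]+) can never cross a delimiter, so no backtracking
def tight  = Pattern.compile('^' + (['([^|]+)'] * 21).join('\\|') + '$')

[greedy: greedy, tight: tight].each { name, p ->
    def start = System.nanoTime()
    100000.times { p.matcher(line).matches() }
    println "$name: ${(System.nanoTime() - start) / 1e6} ms for 100000 lines"
}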
Created 08-30-2016 09:19 AM
Your comments are correct. I was not expecting such a performance degradation, but you are probably right that 21 columns are not helping. I will try to perform some tests on my side to check whether performance can be improved.
But clearly, I do agree with you: in such a case, I believe the ExecuteScript processor would be a better fit to solve the issue. It's really easy to write a few lines of Groovy (for example) to perform what you are looking for. Let me know if you need any help with this.
Created 08-30-2016 10:08 AM
Yes please, we do need help with how to do it in Groovy, because sooner or later we will have to implement this in the most performance-friendly way possible. We are looking at files that come in at GBs or more, and there is no way we can live with a disappointingly slow dataflow like the current one. Any help is appreciated.
Created 08-30-2016 12:24 PM
OK, so I gave it a quick try. If any Groovy gurus have feedback, don't hesitate.
I have simulated the following dataflow (template: split-execute-script.xml): GenerateFlowFile -> ReplaceText -> ExecuteScript -> PutFile
GenerateFlowFile and ReplaceText are just used to generate flow files matching your requirements. The ExecuteScript processor has the following body:
import java.nio.charset.*

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    // Stream the content line by line so the whole file is never held in memory
    inputStream.eachLine { line, count ->
        def columns = line.split("\\|")
        // Keep only the desired columns (here the 1st, 2nd, 8th and 9th), comma-separated
        outputStream.write((columns[0] + "," + columns[1] + "," + columns[7] + "," + columns[8] + "\n").getBytes(StandardCharsets.UTF_8))
    }
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
There may be a better version of this code, but it does the job. I'll let you try it with your data just to confirm, but I think this will meet your performance expectations.
If dealing with huge files, you may want to first split your data into small chunks and then merge it back together, in order to leverage data balancing (in a cluster configuration) and multithreading.
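For example, something along these lines (a sketch only; the split size is an arbitrary starting point to tune):
ListFile -> FetchFile -> SplitText (Line Split Count = 10000) -> RouteText -> ExecuteScript -> MergeContent -> PutFile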
Let me know if you have any question.
Created 12-30-2016 09:21 PM
In addition to @Pierre Villard's answer (which nicely gets the job done with ExecuteScript, I have a similar example here), since you are looking to do row-level operations (i.e. select columns from each row), you could use SplitText to split the large file into individual lines, then your ReplaceText above, then MergeContent to put the whole thing back together. I'm not sure which approach is faster per se; it would be an interesting exercise to try both.