Member since: 10-18-2024
Posts: 12
Kudos Received: 3
Solutions: 0
05-20-2025
10:54 AM
Thanks a lot! That solved it. In my case ControlRate is enough, but I'll take the other options you suggested into account.
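For anyone finding this later, a minimal ControlRate configuration sketch for this case, placed between ListSFTP and FetchSFTP (the exact values are assumptions, not tested settings): it releases at most 500 FlowFiles per time window, no matter how many the listing pushed into the upstream queue.

ControlRate:
- Rate Control Criteria: flowfile count
- Maximum Rate: 500
- Time Duration: 1 min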
05-20-2025
08:10 AM
My NiFi pipeline fetches and processes files from an SFTP server. When the number of files to fetch does not exceed 500, everything works well. However, sometimes the number of files the pipeline has to list at once exceeds 2,000,000, which critically impacts performance. I'm trying to find a way to throttle the file stream so that no more than 500 files are processed at any given time.

First Attempt: GetSFTP Processor
First, I looked into the GetSFTP processor. It has a convenient property called "Remote Poll Batch Size". However, this deprecated processor has several disadvantages, the most significant being that it does not maintain state. Because of this, when I set "Delete Original" to false, it fetches the same files indefinitely. Setting "Delete Original" to true is not desirable, because my pipeline's guaranteed-delivery requirements mandate that files be deleted only after successful processing, not at the start of the pipeline.

Second Attempt: Queue Back Pressure
Next, I explored back pressure settings on the queue between ListSFTP and FetchSFTP. For example, I set the back pressure object threshold to 500 on that queue. However, as I understand it:
- When ListSFTP lists 2 million files, it pushes all of them into the queue.
- ListSFTP can only add further files to the queue once FetchSFTP has consumed enough files to bring the count below the back pressure limit (e.g., after 1,999,501 files are consumed).
This behavior is not what I expected.

Expected Behavior:
I wanted one of the following:
- ListSFTP lists no more than 500 files at a time, or
- the queue between ListSFTP and FetchSFTP holds no more than 500 files, or
- FetchSFTP fetches no more than 500 files at a time.

Question: Is there a way to solve this problem without creating a custom processor or using Groovy scripts?
Labels:
Apache NiFi
11-24-2024
08:54 AM
1 Kudo
Hello! The input FlowFile has a column "date_time" of type timestamp-millis. The task is to update each value of the column by subtracting 3 hours from the datetime value; in effect, I need to shift the datetime values from timezone GMT+3 to timezone GMT+0. For example, the value "2024-11-24 19:43:17" should become "2024-11-24 16:43:17".

I tried to use UpdateRecord with the following expression:

format(toNumber((/date_time):minus(3600)), "yyyy-MM-dd HH:mm:ss")

As expected, the transformation failed with the exception "Unrecognized token 'minus'...": the "minus" operation applies to attribute values, and "date_time" is not an attribute.

As an alternative, I considered the following approach:
1. SplitText to split one FlowFile into N FlowFiles, where N is the number of records.
2. UpdateAttribute to add a new attribute "dateTime" to each of the N FlowFiles: dateTime <- /date_time
3. UpdateRecord for each of the N FlowFiles with the expression: /date_time <- ${dateTime:toNumber():minus(3600):format("yyyy-MM-dd HH:mm:ss")}
4. Merge the N FlowFiles back into one FlowFile.

But this approach seems too complicated. Is there a more straightforward way? ))
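For reference, a single UpdateRecord can apparently do this without splitting, assuming Replacement Value Strategy is set to "Literal Value" so that Expression Language can reference each field via field.value; note that 3 hours on a timestamp-millis column is 10800000 milliseconds (a sketch, not a verified configuration):

UpdateRecord:
- Replacement Value Strategy: Literal Value
- /date_time: ${field.value:toNumber():minus(10800000):format("yyyy-MM-dd HH:mm:ss")}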
Labels:
Apache NiFi
11-20-2024
07:08 AM
Also, I tried to solve this problem using ExecuteScript with the following Groovy:

import org.w3c.dom.Document
import org.w3c.dom.Element
import groovy.xml.XmlSlurper
import groovy.json.JsonBuilder

def xmlContent = '''<root>
    <nested1>
        <nested2>
            <nested2>value</nested2>
        </nested2>
    </nested1>
</root>'''

def xml = new XmlSlurper().parseText(xmlContent)
def nested2Value = xml.root.nested1.nested2
def json = [
    nested2: nested2Value
]
def json2 = [
    nested3: nested2Value
]
def builder = new JsonBuilder(json2)
def xmlOutput = builder.toPrettyString()

It returns:

{ "nested3": [ ] }

That's not what I expected, because "value" is missing.
11-19-2024
07:38 AM
Hello! The XML file has the following structure:

<root>
    <nested1>
        <nested2>
            <nested1>value</nested1>
        </nested2>
    </nested1>
</root>

EvaluateXPath processor, type_rec property:

name(/*/*[1])

And I got the following error:

EvaluateXPath[id=ce4e0c05-169f-387f-520d-77e2e664bacc] Input parsing failed FlowFile[filename=69de5f2c-a887-4042-8734-beca03e3d27b]: org.apache.nifi.xml.processing.ProcessingException: Parsing failed
Caused by: org.xml.sax.SAXParseException; lineNumber: 6; columnNumber: 5; The element type "nested2" must be terminated by the matching end-tag "</nested2>".

As I understand it, the parser expects the closing tag at level 4 to be </nested1>, because it met an opening tag with the same name at level 2 (<nested1>), while at level 4 the file has a new tag with the same name. I tried to use the ReplaceText processor to replace the <nested2> <nested1> tag combination with a new value, for example <nested2> <nested1_new>, i.e. to rename "nested1" at level 4 to "nested1_new". But without result: nothing is replaced. Are there other ways to solve this problem? Thanks.
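In case it helps with the renaming attempt, a hedged ReplaceText configuration sketch (the regex and property values are assumptions, not a confirmed fix): if Evaluation Mode is left at Line-by-Line, a pattern spanning two lines can never match, so Entire text mode with a multi-line regex would be needed:

ReplaceText:
- Replacement Strategy: Regex Replace
- Evaluation Mode: Entire text
- Search Value: (?s)<nested2>(\s*)<nested1>(.*?)</nested1>
- Replacement Value: <nested2>$1<nested1_new>$2</nested1_new>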
Labels:
Apache NiFi
11-19-2024
04:41 AM
1 Kudo
Yes, CSV Format is Custom Format
11-19-2024
01:31 AM
1 Kudo
Hello! The source CSV file is:

123456TextValue1
654321TextValue2

where 123456 and TextValue1 are different values separated by a binary delimiter (\u0001). Similarly, 654321 and TextValue2 are separated by the same delimiter, which is not visible in the Web UI.

I use ConvertRecord to change the delimiter from "\u0001" to ";".

The RecordReader is a CSVReader with the following properties:
- Schema Access Strategy: Use 'Schema Text' Property
- Schema Text: #{test_schema}
- Value Separator: \u0001
- Treat First Line as Header: false
- Ignore CSV Header Column Names: true

The RecordWriter is a CSVRecordSetWriter:
- Schema Access Strategy: Use 'Schema Text' Property
- Schema Text: #{test_schema}
- Value Separator: ;
- Include Header Line: true

test_schema is:

{
  "type": "record",
  "name": "test_schema",
  "fields": [
    { "name": "FIELD_1", "type": ["int","null"], "description": "FIELD_1" },
    { "name": "FIELD_2", "type": ["string","null"], "description": "FIELD_2" }
  ]
}

The expected output is:

FIELD_1;FIELD_2
123456;TextValue1
654321;TextValue2

But I got the following error:

ERROR ConvertRecord[id=01931001-0d7e-1e43-146d-1a380e6d43b7] Failed to process FlowFile[filename=7365b509-7100-4bc2-a070-4cc8ce8377b9]; will route to failure: org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
Caused by: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record
Caused by: java.lang.NumberFormatException: For input string: "123456TextValue1"
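Judging by the NumberFormatException, the literal text "\u0001" in Value Separator is apparently not being interpreted as the control character, so the whole line lands in the int field FIELD_1. One hedged workaround (an assumption, not a confirmed fix) is to normalize the delimiter with a ReplaceText processor before ConvertRecord, since Java regex does understand the \u0001 escape, and then let the CSVReader use ";" as its Value Separator:

ReplaceText (before ConvertRecord):
- Replacement Strategy: Regex Replace
- Evaluation Mode: Line-by-Line
- Search Value: \u0001
- Replacement Value: ;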
Labels:
Apache NiFi
10-23-2024
09:55 AM
Thank you, the problem was indeed in the "/" ))
10-23-2024
05:45 AM
Hello! The general task is to transform XML to CSV, and for this I created the following pipeline. The source XML content is:

<Email E-mail="AR-BIK@GMAIL.COM"><Data GRN="3214600023849" Date="2024-07-28"></Data></Email>

(The EvaluateXPath and ReplaceText parameters were shown in screenshots.)

The attribute grn is successfully produced with the expected value, but the value "3214600023849" is not written to the CSV output: only the ";" delimiter appears. The encoding is UTF-8. What am I doing wrong?
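For anyone reproducing this, a configuration sketch of the two steps (the XPath and property values are assumptions based on the sample XML, not the poster's exact screenshots); note the leading "/" that the follow-up reply above points to:

EvaluateXPath:
- Destination: flowfile-attribute
- grn: /Email/Data/@GRN

ReplaceText:
- Replacement Strategy: Always Replace
- Replacement Value: ${grn};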
Labels:
Apache NiFi
10-20-2024
01:45 AM
@drewski7 wrote: @AndreyDE Is one flowfile going into the SplitText processor and outputting 10000 flowfiles?

Yes, one flow file.

@drewski7 wrote: How big is the flowfile going into the SplitText processor?

About 30 KB.

@drewski7 wrote: Or is the source of the pipeline recursively getting all objects in your S3 bucket?

Yes, it searches all objects recursively.