Member since
08-11-2017
136
Posts
1
Kudos Received
0
Solutions
12-07-2022
02:40 PM
@mburgess I have the same problem. The code was written in jython. from org.apache.commons.io import IOUtils from java.nio.charset import StandardCharsets from org.apache.nifi.processor.io import OutputStreamCallback from java.lang import Object # Define a subclass of OutputStreamCallback for use in session.write() class PyOutputStreamCallback(OutputStreamCallback): def __init__(self, cadena): self.cadena = cadena def process(self, outputStream): outputStream.write(bytearray(self.cadena.encode('utf-8'))) # end class flowFile = session.get() if (flowFile != None): table = flowFile.getAttribute('table_name') if (table != 'database_1.dbo.dtproperties'): newFlowFile = session.create(flowFile) newFFC = '' newFlowFile = session.write(newFlowFile, PyOutputStreamCallback(newFFC)) session.transfer(newFlowFile, REL_SUCCESS) elif (table == 'database_1.dbo.dtproperties'): session.remove(flowFile) else: session.transfer(flowFile, REL_FAILURE) I'm understanding that flow files are passing one by one, right? So if there is a flow file with an attribute ''database_1.dbo.dtproperties" should be eliminated without affecting other flow files. But I got "flowfile has already marked for removal"
... View more
01-28-2020
05:46 AM
@mbaid You should consider opening a new thread as this one is quite old. A new thread would also allow you to provide further details that will be unique to your situation.
... View more
11-14-2018
02:46 PM
I think you may have misunderstood me? Below an existing "Answer" in this existing HCC thread you will see a "Add comment" link you can click on to respond to that existing answer. Just as I have done here. I noticed you started an entirely new question in HCC with your response above about the truststore. Thank you, Matt
... View more
12-21-2017
08:20 PM
1 Kudo
@sally sally Check the back pressure object threshold value in the connection feeding the MergeContent processor to make sure it has been changed to a value that will allow enough files to queue up.
... View more
01-16-2019
02:23 PM
@Jose Paul - A bin would be eligible for merge with only 1 FlowFile in it since you set minEntries to 1. - When the Processor get scheduled to execute (based on configured run schedule and scheduling strategy), It will look at one of possible many incoming connections and look at only the queued FlowFile at that exact moment in time. It will then bin those FlowFiles based on configuration. So it multiple FlowFiles happen to exist in that connection with sam filename attribute value, they will be placed in same bin. At completion of of placing those FlowFiles in bins, the bins are evaluated if they are eligible to be merged. In your case since minEntries is 1 all bins with 1 or more FlowFiles would be merged. - If you run schedule is set to run as fast as possible (Timer Driven with run schedule of 0 sec), it may be reading the inbound connection so fast that it only contains 1 or just a few FlowFiles per execution. - The other scenario is an inbound connection with over 500 queued FlowFiles at time of execution. If we assume there are more than 500 FlowFiles with unique values assigned to the filename attribute, each would end up be placed in new bin (correlation attribute config). As soon as bin 500 has a FlowFile assigned to it and MergeContent tries to bin unique filename number 501, it has no available bins left so it forces the merging of the oldest bin to free a bin. - Thank you, Matt
... View more
11-10-2017
07:04 PM
1 Kudo
@sally sally yeah, you can do that by using replace text processor with search value property as <details>\s*([\s\S]+.*)\n+\s+<\/details> //capture every thing enclosed in details tag as capture group 1 then in replacement value <details>
${filename}
$1
</details> you can customize the replacement value as per your needs. Replace Text processor Configs:- input:- <?xml version="1.0" encoding="UTF-8"?>
<service>
<Person>
<details>
<start>2017-10-22</start>
<id>*******</id>
<makeVersion>1</makeVersion>
<patch>patch</patch>
<parameter>1</parameter>
</details>
</Person>
</service> output:- <?xml version="1.0" encoding="UTF-8"?>
<service>
<Person>
<details>
1497701925152409
<start>2017-10-22</start>
<id>*******</id>
<makeVersion>1</makeVersion>
<patch>patch</patch>
<parameter>1</parameter>
</details>
</Person>
</service
... View more
10-30-2017
04:24 PM
thank you
... View more
03-28-2019
04:46 AM
@Shu how can i use this for multiple file base on one file name example :- input path contains 3 files and one is .done.cvs emp.csv dept.csv account.csv date.done.csv if the input path contains the .done.csv then only my file should route in nifi flow . else it should not be route .
... View more
10-25-2017
07:10 PM
If you are waiting for X number of flow files to be received, you can use something like this (assuming you want 10 flow files): def flowfileList = session.get(10)
if(flowfileList.size() < 10) {
session.rollback()
return
}
// If you get here, you have 10 flowfiles in flowfileList
... View more
10-25-2017
12:41 PM
@sally sally If the server NiFi is running on shuts down, NiFi when restarted, will come back up in the last known state. So, the state for the ListHDFS processor will be recovered and it will resume the listing from the last known timestamp.
... View more