Support Questions

Find answers, ask questions, and share your expertise

Nifi: how to remove session based on atribute value in nifi

avatar
Contributor

I want to remove session in case i get certain data from file i have code like this,but i got errors "flowfile has already marked for removal", what should i change to get rid of extra errors?

  1. In case of session rollback flowfile will dissapear in queues also?

    2.should i use rollback instead of remove()?

    NodeList childNodes = nodeGettingChanged.getChildNodes();for(int i =0; i != childNodes.getLength();++i){Node child = childNodes.item(i);if(!(child instanceofElement))continue;if(child.getNodeName().equals("runAs")){if(child.getFirstChild().getTextContent()=="false"){
    
                                    session.remove(flowFile1);File deleteExtraFile =newFile("C://Users//s.tkhilaishvili//Desktop//try2//nifi-1.3.0//1//conf.xml");booleandelete=deleteExtraFile.delete();}else{
                                    child.getFirstChild().setNodeValue("false");}}}

    Document finalXmlDocument = xmlDocument; session.write(flowFile1, new StreamCallback() {

    publicvoid process(InputStream inputStream,OutputStream outputStream)throwsIOException{TransformerFactory transformerFactory =TransformerFactory.newInstance();Transformer transformer =null;try{
                            transformer = transformerFactory.newTransformer();}catch(TransformerConfigurationException e){
                            e.printStackTrace();}DOMSource source =newDOMSource(finalXmlDocument);
                        ffStream.close();ByteArrayOutputStream bos =newByteArrayOutputStream();StreamResult result =newStreamResult(bos);try{
                            transformer.transform(source, result);}catch(TransformerException e){
                            e.printStackTrace();}byte[] array = bos.toByteArray();
                        outputStream.write(array);}});
    
                 session.remove(flowFile);
                session.transfer(flowFile1, REL_SUCCESS);}
2 REPLIES 2

avatar
Master Guru

It's hard to tell from the formatting of your code above, but if you are executing session.remove(flowFile1) then later trying to transfer it to REL_SUCCESS, you will get that error. You can either change the logic to put the remove/transfer calls into an if-else block, or keep a boolean variable indicating whether the flow file has already been removed, and transfer if it has not been removed. It looks like you already have an if-clause checking the firstChild for "false", perhaps you could put the transfer in an else-clause.

avatar
Explorer

@mburgess I have the same problem. The code was written in jython.

 

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
from java.lang import Object



# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
  def __init__(self, cadena):
    self.cadena = cadena

  def process(self, outputStream):
    outputStream.write(bytearray(self.cadena.encode('utf-8')))
# end class

flowFile = session.get()
if (flowFile != None):
    table = flowFile.getAttribute('table_name')
   
    if (table != 'database_1.dbo.dtproperties'):
        newFlowFile = session.create(flowFile)
        newFFC = ''
        newFlowFile = session.write(newFlowFile, PyOutputStreamCallback(newFFC))        
        session.transfer(newFlowFile, REL_SUCCESS)
   
   
    elif (table == 'database_1.dbo.dtproperties'):
        session.remove(flowFile)

    else:
        session.transfer(flowFile, REL_FAILURE)
 
I'm understanding that flow files are passing one by one, right? So if there is a flow file with an attribute ''database_1.dbo.dtproperties" should be eliminated without affecting other flow files. But I got "flowfile has already marked for removal"