
NIFI custom processor return multipleflow files in recursion

Explorer
I have to do some custom preprocessing tasks on a huge data file (~200 GB). Currently, it works as follows:
  1. select * from table
  2. preprocess the results line by line
  3. return a single new flow file

So I decided to convert the above approach to the following:

  1. get the row count from the user (let's assume the user gives 1000)
  2. execute the select * query as a ResultSet
  3. read the results line by line (rs.next())
  4. when the line count reaches 1000, transfer the flow file and continue with the remaining lines

So my onTrigger approach is as below:

 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        logger = getLogger();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            final Long rowLimit = context.getProperty(ProcessorUtils.MAX_RECORD).evaluateAttributeExpressions(flowFile).asLong();
            Connection conn = DriverManager.getConnection(
                    // db connection properties
            );
            Statement stm = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            ResultSet rs = stm.executeQuery("sql query");
            Map<String, String> flowFileAttributes = flowFile.getAttributes();
            process(rs, session, flowFileAttributes, rowLimit);

            // signal the end of processing on the STATUS relationship
            FlowFile stateFlowFile = session.create();
            stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "end");
            stateFlowFile = session.putAttribute(stateFlowFile, "record_count", "0");
            session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // working line

        } catch (Exception e) {
            logger.warn(" conn " + e);
            session.transfer(flowFile, GPReaderProcessorUtils.FAILURE);
        }
    }

Recursive approach that terminates based on the line count:

        private void process(ResultSet rs, ProcessSession session, Map<String, String> flowFileAttributes, Long rowLimit) throws SQLException {
            try {
                logger.info("-> start processing with row limit = " + rowLimit);
                AtomicInteger mainI = new AtomicInteger(0);
                FlowFile flowFile = session.write(session.putAllAttributes(session.create(), flowFileAttributes), (OutputStream out) -> {
                    int i = 0;
                    Map<String, String> preProcessResults = null;
                    try {
                        while (i < rowLimit && rs.next()) {
                            // preprocessing happens here and populates preProcessResults
                            i++;
                            mainI.set(i);
                            out.write(preProcessResults.toString().getBytes(StandardCharsets.UTF_8));
                        }
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                    logger.info("gp-log -> " + i);
                });

                FlowFile stateFlowFile = session.create();
                stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "processing");
                stateFlowFile = session.putAttribute(stateFlowFile, "record_count", String.valueOf(mainI.get()));
                session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // state relationship

                session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); // preprocessed flow file is returned

                if (!rs.isAfterLast() && mainI.get() != 0 && !rs.isLast()) { // recursive call
                    logger.info("gp-log -> recursion call");
                    process(rs, session, flowFileAttributes, rowLimit);
                }

            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                session.transfer(session.putAllAttributes(session.create(), flowFileAttributes), GPReaderProcessorUtils.FAILURE);
            }
        }

Expected behaviour -> while processing, the completed rows are returned as flow files as each chunk finishes.

Current behaviour -> only after everything has finished are all the flow files (generated in the recursion) returned at once.

Please advise on this.


8 REPLIES


@D5ha I have had a similar need recently, and I learned that you use session.commit() after a session.transfer() to send a flow file from an inner loop. In a custom script, without the explicit commit, NiFi will assume and perform the commit itself at the end of execution, sending all the data in a single flow file.

 

session.transfer(flowFile, REL_SUCCESS);
session.commit();
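
For illustration, a minimal sketch of that pattern applied to a chunked ResultSet read could look like the following. The class name, the REL_SUCCESS relationship, the chunk loop, and the per-row preprocessing line are assumptions for the sketch, not the original processor's code:

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;

    class ChunkedTransferSketch {

        static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

        // Reads the ResultSet in chunks of rowLimit rows and releases each chunk immediately.
        void transferInChunks(ResultSet rs, ProcessSession session, long rowLimit) {
            AtomicBoolean moreRows = new AtomicBoolean(true);
            while (moreRows.get()) {
                AtomicLong written = new AtomicLong(0);
                FlowFile chunk = session.write(session.create(), (OutputStream out) -> {
                    try {
                        long count = 0;
                        while (count < rowLimit && rs.next()) {
                            // placeholder for the real per-row preprocessing
                            String line = rs.getString(1) + "\n";
                            out.write(line.getBytes(StandardCharsets.UTF_8));
                            count++;
                        }
                        written.set(count);
                        // fewer rows than the limit means the ResultSet is exhausted
                        moreRows.set(count == rowLimit);
                    } catch (SQLException e) {
                        throw new ProcessException("Failed while reading the result set", e);
                    }
                });
                if (written.get() == 0) {
                    session.remove(chunk); // nothing left to read; drop the empty FlowFile
                    break;
                }
                session.transfer(chunk, REL_SUCCESS);
                session.commit(); // release this chunk downstream now instead of at the end of onTrigger
            }
        }
    }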

 

Explorer

@steven-matison in my scenario there are 2 steps:

on each recursive call I create a flow file and write the query results into it;

at the end, I also need to create another new flow file and put 2 attributes on it;

and I need to transfer these 2 flow files to 2 relationships (status and success).

As per your answer, I put session.commit() at the end of every session.transfer() line, but now I'm getting a "relationship is not specified" error, although it worked well earlier.

The new development is as below:

 

            FlowFile stateFlowFile = session.create();
            stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "processing");
            stateFlowFile = session.putAttribute(stateFlowFile, "record_count", String.valueOf(mainI.get()));
            session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // state relationship
            session.commit();

            session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); // preprocessed flow file is returned
            session.commit();

 

 


You will need to complete the session.commit() call with the right details to fit your scenario.

Explorer

@steven-matison can you please explain more about this?


Can you share the relationship error?

I believe you need to complete the flowFile transfer and commit first, then do the other stateFlowFile transfer and commit.
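
As a rough sketch of that ordering, reusing the thread's GPReaderProcessorUtils.SUCCESS and GPReaderProcessorUtils.STATUS relationships (the helper method name and recordCount parameter are made up for illustration, and the method would live inside the existing processor class):

    // Sketch only: every FlowFile created in the session must be transferred or
    // removed before session.commit() is called, otherwise the commit fails with
    // a "transfer relationship not specified" error.
    private void transferAndCommit(ProcessSession session, FlowFile contentFlowFile, long recordCount) {
        // 1) finish the content FlowFile first: transfer it, then commit
        session.transfer(contentFlowFile, GPReaderProcessorUtils.SUCCESS);
        session.commit();

        // 2) only then create, populate, transfer, and commit the state FlowFile
        FlowFile stateFlowFile = session.create();
        stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "processing");
        stateFlowFile = session.putAttribute(stateFlowFile, "record_count", String.valueOf(recordCount));
        session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS);
        session.commit();
    }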

Explorer

@steven-matison thanks for your answer; it is working fine in my local environment,
but I have to deploy this into production, which is a 4-node cluster.

When running this processor in the cluster we get the below logs, and suddenly a node drops out of the cluster:

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx - primary node

yyy - the node on which the custom processor is running

 

Please guide me on this.

 


Are you sure the code is the same? It sounds like the clustered version is kicking off a job that never finishes (an endless loop?), or the task is creating some performance or connectivity issue.

 

Depending on your processor,  it may need to be flagged as Primary Only.
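
If you want to enforce that in the processor code rather than in the Scheduling tab, a minimal sketch would be the following, assuming a NiFi version that ships the @PrimaryNodeOnly annotation (the processor class name here is hypothetical); otherwise the same effect can be achieved by setting Execution to "Primary node" on the processor's Scheduling tab:

    import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
    import org.apache.nifi.annotation.behavior.TriggerSerially;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    // Restricts scheduling to the primary node so the heavy query is not started on every node.
    @PrimaryNodeOnly
    @TriggerSerially
    public class GPReaderProcessor extends AbstractProcessor {
        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // existing chunked-read logic goes here
        }
    }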

New Contributor

Yes, the code is the same. For small tables it works fine as well; here I need to query around ~200 GB of data.