
NiFi custom processor returning multiple flow files in recursion

Explorer
I have to do some custom preprocessing on a huge data file (~200 GB). Currently it works as follows:
  1. select * from table
  2. preprocess line by line
  3. return a single new flow file

So I decided to change the approach to the following:

  1. get the row count from the user (let's assume the user gives 1000)
  2. execute the select * query as a ResultSet
  3. read the results row by row (rs.next())
  4. when the row count reaches 1000, transfer the flow file and continue with the remaining rows

So my approach is the onTrigger below:

 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        logger = getLogger();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            final Long rowLimit = context.getProperty(ProcessorUtils.MAX_RECORD).evaluateAttributeExpressions(flowFile).asLong();
            Connection conn = DriverManager.getConnection(
                    // db connection properties
            );
            // forward-only, read-only cursor so the driver can stream rows
            Statement stm = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            ResultSet rs = stm.executeQuery("sql query");
            Map<String, String> flowFileAttributes = flowFile.getAttributes();
            process(rs, session, flowFileAttributes, rowLimit);

            // emit a final status flow file once all batches are done;
            // putAttribute returns a new FlowFile, so the result must be reassigned
            FlowFile stateFlowFile = session.create();
            stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "end");
            stateFlowFile = session.putAttribute(stateFlowFile, "record_count", "0");
            session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // working line

        } catch (Exception e) {
            logger.warn("conn " + e);
            session.transfer(flowFile, GPReaderProcessorUtils.FAILURE);
        }
    }

Recursive approach, terminating on the row count:

        private void process(ResultSet rs, ProcessSession session, Map<String, String> flowFileAttributes, Long rowLimit) throws SQLException {
            try {
                logger.info("-> start processing with row limit = " + rowLimit);
                AtomicInteger mainI = new AtomicInteger(0);
                FlowFile flowFile = session.write(
                        session.putAllAttributes(session.create(), flowFileAttributes),
                        (OutputStream out) -> {
                            int i = 0;
                            Map<String, String> preProcessResults = null; // populated by the preprocessing step
                            try {
                                while (i < rowLimit && rs.next()) {
                                    // preprocessing happens here
                                    i++;
                                    mainI.set(i);
                                    out.write(preProcessResults.toString().getBytes(StandardCharsets.UTF_8));
                                }
                            } catch (SQLException e) {
                                throw new IOException(e); // surface the failure instead of swallowing it
                            }
                            logger.info("gp-log -> " + i);
                        });

                // status flow file for this batch; putAttribute returns a new FlowFile
                FlowFile stateFlowFile = session.create();
                stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "processing");
                stateFlowFile = session.putAttribute(stateFlowFile, "record_count", String.valueOf(mainI.get()));
                session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // state relationship

                session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); // preprocessed flow file returned

                // note: isLast()/isAfterLast() may be unsupported on TYPE_FORWARD_ONLY result sets
                if (!rs.isAfterLast() && mainI.get() != 0 && !rs.isLast()) { // recursion call
                    logger.info("gp-log -> recursion call");
                    process(rs, session, flowFileAttributes, rowLimit);
                }
            } catch (Exception e) {
                logger.error("processing failed", e);
                session.transfer(session.putAllAttributes(session.create(), flowFileAttributes), GPReaderProcessorUtils.FAILURE);
            }
        }

Expected behaviour -> while processing, each batch of completed rows is returned as a flow file.

Current behaviour -> all flow files (generated in the recursion) are returned at once, only after everything finishes.

Please advise on this.

8 REPLIES


You will need to complete the session.commit() call with the right details to fit your scenario.

Explorer

@steven-matison can you please explain more about this?

Can you share the relationship error?

I believe you need to complete the flowFile transfer and commit first, then do the other stateFlowFile transfer and commit.
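The pattern being suggested, emitting each full batch before reading the next, can be sketched outside the NiFi API with plain Java (the names below are illustrative, not the actual processor code). In the real processor, the point where a full batch is emitted is where session.transfer() and session.commit() would be called, so downstream processors see each batch as soon as it is complete:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchDemo {
    // Split an iterator into batches of at most `limit` items -- the same
    // shape as reading rs.next() in a loop until the row limit is hit.
    static List<List<Integer>> batches(Iterator<Integer> rows, int limit) {
        List<List<Integer>> out = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        while (rows.hasNext()) {
            current.add(rows.next());
            if (current.size() == limit) {
                // a full batch: this is where the processor would
                // transfer the flow file and commit the session
                out.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // final partial batch
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            rows.add(i);
        }
        List<List<Integer>> b = batches(rows.iterator(), 1000);
        System.out.println(b.size());        // number of batches
        System.out.println(b.get(2).size()); // size of the final batch
    }
}
```

With 2500 rows and a limit of 1000, this produces three batches (1000, 1000, 500). A plain loop like this also avoids the recursion in the original process() method, which only adds stack depth without changing the work done.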

Explorer

@steven-matison thanks for your answer. It works fine in my local environment, but I have to deploy this to production, which is a 4-node cluster.

When running this processor in the cluster we get the logs below, and a node suddenly drops out of the cluster:

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx - primary node

yyy - the node the custom processor is running on

Please guide me on this.

Are you sure the code is the same? It sounds like the clustered version is kicking off a job that never finishes (an endless loop?), or the task is creating a performance or connectivity issue.

Depending on your processor, it may need to be flagged to run on the Primary Node only.

New Contributor

Yes, the code is the same; it works fine for small tables too. Here I need to query around ~200 GB of data.