<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: NiFi custom processor return multiple flow files in recursion in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356173#M237209</link>
    <description>&lt;P&gt;You will need to complete the session.commit() call with the right details to fit your scenario.&lt;/P&gt;</description>
    <pubDate>Tue, 25 Oct 2022 20:28:28 GMT</pubDate>
    <dc:creator>steven-matison</dc:creator>
    <dc:date>2022-10-25T20:28:28Z</dc:date>
    <item>
      <title>NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356130#M237203</link>
      <description>&lt;DIV class="votecell post-layout--left"&gt;&lt;DIV class="js-voting-container d-flex jc-center fd-column ai-stretch gs4 fc-black-200"&gt;&lt;SPAN&gt;I have to do some custom preprocessing tasks on a huge data file (~200GB). currently, its works as below way.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class="postcell post-layout--right"&gt;&lt;DIV class="s-prose js-post-body"&gt;&lt;OL&gt;&lt;LI&gt;select * from table&lt;/LI&gt;&lt;LI&gt;preprocessing line by line&lt;/LI&gt;&lt;LI&gt;return a new single flow file&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;so I decided to convert the above approach to the below way.&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;get the row count from the user (let's assume the user gives 1000)&lt;/LI&gt;&lt;LI&gt;execute select * query as resultSet&lt;/LI&gt;&lt;LI&gt;read the results line by line (rs.next())&lt;/LI&gt;&lt;LI&gt;when the line count reaches 1000 return the flow file and continues to other lines&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;So my approach is as below&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;onTrigger&lt;/STRONG&gt;&lt;/P&gt;&lt;PRE&gt; &lt;SPAN class="hljs-keyword"&gt;public&lt;/SPAN&gt; &lt;SPAN class="hljs-keyword"&gt;void&lt;/SPAN&gt; &lt;SPAN class="hljs-title function_"&gt;onTrigger&lt;/SPAN&gt;&lt;SPAN class="hljs-params"&gt;(&lt;SPAN class="hljs-keyword"&gt;final&lt;/SPAN&gt; ProcessContext context, &lt;SPAN class="hljs-keyword"&gt;final&lt;/SPAN&gt; ProcessSession session)&lt;/SPAN&gt; &lt;SPAN class="hljs-keyword"&gt;throws&lt;/SPAN&gt; ProcessException {
        logger = getLogger();
        &lt;SPAN class="hljs-type"&gt;FlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;flowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; session.get();
        &lt;SPAN class="hljs-keyword"&gt;if&lt;/SPAN&gt; (flowFile == &lt;SPAN class="hljs-literal"&gt;null&lt;/SPAN&gt;) {
            &lt;SPAN class="hljs-keyword"&gt;return&lt;/SPAN&gt;;
        }
        &lt;SPAN class="hljs-keyword"&gt;try&lt;/SPAN&gt; {
            &lt;SPAN class="hljs-keyword"&gt;final&lt;/SPAN&gt; &lt;SPAN class="hljs-type"&gt;Long&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;rowLimit&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; context.getProperty(ProcessorUtils.MAX_RECORD).evaluateAttributeExpressions(flowFile).asLong(); 
            &lt;SPAN class="hljs-type"&gt;Connection&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;conn&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; DriverManager.getConnection(
                    &lt;SPAN class="hljs-comment"&gt;// db connection properties&lt;/SPAN&gt;
            );
            &lt;SPAN class="hljs-type"&gt;Statement&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;stm&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            &lt;SPAN class="hljs-type"&gt;ResultSet&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;rs&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; stm.executeQuery(&lt;SPAN class="hljs-string"&gt;"sql query"&lt;/SPAN&gt;);
            Map&amp;lt;String, String&amp;gt; flowFileAttributes = flowFile.getAttributes();
            process(
                    rs,
                    session,
                    flowFileAttributes,
                    rowLimit
            );
    
            &lt;SPAN class="hljs-type"&gt;FlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;stateFlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; session.create();
            session.putAttribute(stateFlowFile, &lt;SPAN class="hljs-string"&gt;"processing_status"&lt;/SPAN&gt;, &lt;SPAN class="hljs-string"&gt;"end"&lt;/SPAN&gt;);
            session.putAttribute(stateFlowFile, &lt;SPAN class="hljs-string"&gt;"record_count"&lt;/SPAN&gt;, &lt;SPAN class="hljs-string"&gt;"0"&lt;/SPAN&gt;);
            session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); &lt;SPAN class="hljs-comment"&gt;// working line&lt;/SPAN&gt;

        } &lt;SPAN class="hljs-keyword"&gt;catch&lt;/SPAN&gt; (Exception e) {
            logger.warn(&lt;SPAN class="hljs-string"&gt;" conn "&lt;/SPAN&gt; + e);
            session.transfer(flowFile, GPReaderProcessorUtils.FAILURE);
        }
    }&lt;/PRE&gt;&lt;P&gt;&lt;STRONG&gt;Recursive approach (terminating based on the row count)&lt;/STRONG&gt;&lt;/P&gt;&lt;PRE&gt;        &lt;SPAN class="hljs-keyword"&gt;private&lt;/SPAN&gt; &lt;SPAN class="hljs-keyword"&gt;void&lt;/SPAN&gt; &lt;SPAN class="hljs-title function_"&gt;process&lt;/SPAN&gt;&lt;SPAN class="hljs-params"&gt;(ResultSet rs, ProcessSession session, Map&amp;lt;String, String&amp;gt; flowFileAttributes, Long rowLimit)&lt;/SPAN&gt; &lt;SPAN class="hljs-keyword"&gt;throws&lt;/SPAN&gt; SQLException {
           &lt;SPAN class="hljs-keyword"&gt;try&lt;/SPAN&gt;{
                logger.info(&lt;SPAN class="hljs-string"&gt;"-&amp;gt;  start processing with row limit = "&lt;/SPAN&gt; + rowLimit);
                &lt;SPAN class="hljs-type"&gt;AtomicInteger&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;mainI&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; &lt;SPAN class="hljs-keyword"&gt;new&lt;/SPAN&gt; &lt;SPAN class="hljs-title class_"&gt;AtomicInteger&lt;/SPAN&gt;(&lt;SPAN class="hljs-number"&gt;0&lt;/SPAN&gt;);
                &lt;SPAN class="hljs-type"&gt;FlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;flowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; 
                session.write(session.putAllAttributes(session.create(), flowFileAttributes), (OutputStream out) -&amp;gt; {
                &lt;SPAN class="hljs-type"&gt;int&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;i&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; &lt;SPAN class="hljs-number"&gt;0&lt;/SPAN&gt;;
                Map&amp;lt;String, String&amp;gt; preProcessResults = &lt;SPAN class="hljs-literal"&gt;null&lt;/SPAN&gt;;
                &lt;SPAN class="hljs-keyword"&gt;try&lt;/SPAN&gt; {
                     &lt;SPAN class="hljs-type"&gt;String&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;res&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; &lt;SPAN class="hljs-string"&gt;""&lt;/SPAN&gt;;
                     &lt;SPAN class="hljs-keyword"&gt;while&lt;/SPAN&gt; (i &amp;lt; rowLimit &amp;amp;&amp;amp; rs.next()) {
                           &lt;SPAN class="hljs-comment"&gt;//preprocessing happens here&lt;/SPAN&gt;
                            i++;
                            mainI.set(i);
                            out.write(preProcessResults.toString().getBytes(StandardCharsets.UTF_8));
                       }
                    }&lt;SPAN class="hljs-keyword"&gt;catch&lt;/SPAN&gt; (SQLException e) {
                        e.printStackTrace();
                    }
                logger.info(&lt;SPAN class="hljs-string"&gt;"gp-log -&amp;gt;"&lt;/SPAN&gt;+ (String.valueOf(i)));
                out.close();
            });


            &lt;SPAN class="hljs-type"&gt;FlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;stateFlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; session.create();
            session.putAttribute(stateFlowFile, &lt;SPAN class="hljs-string"&gt;"processing_status"&lt;/SPAN&gt;, &lt;SPAN class="hljs-string"&gt;"processing"&lt;/SPAN&gt;);
            session.putAttribute(stateFlowFile, &lt;SPAN class="hljs-string"&gt;"record_count"&lt;/SPAN&gt;, mainI.toString());
            session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); &lt;SPAN class="hljs-comment"&gt;// state relationship&lt;/SPAN&gt;


            session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); &lt;SPAN class="hljs-comment"&gt;// return the preprocessed flow file&lt;/SPAN&gt;

            &lt;SPAN class="hljs-keyword"&gt;if&lt;/SPAN&gt;(!rs.isAfterLast() &amp;amp;&amp;amp; mainI != &lt;SPAN class="hljs-number"&gt;0&lt;/SPAN&gt;  &amp;amp;&amp;amp; !rs.isLast()){ &lt;SPAN class="hljs-comment"&gt;// recurrsion call&lt;/SPAN&gt;
                logger.info(&lt;SPAN class="hljs-string"&gt;"gp-log -&amp;gt; recursion call"&lt;/SPAN&gt; );
                process(rs, session, flowFileAttributes, rowLimit);
            }

        }&lt;SPAN class="hljs-keyword"&gt;catch&lt;/SPAN&gt; (Exception e){
            logger.info(e.getMessage());
            logger.error(e.getMessage());
      session.transfer(session.putAllAttributes(session.create(),flowFileAttributes), GPReaderProcessorUtils.FAILURE);
        }


    }&lt;/PRE&gt;&lt;P&gt;Expected behaviour -&amp;gt; while processing is still running, return each completed batch of rows as a flow file.&lt;/P&gt;&lt;P&gt;Current behaviour -&amp;gt; all the flow files generated in the recursion are returned at once, only after processing finishes.&lt;/P&gt;&lt;P&gt;Please advise on this.&lt;/P&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Tue, 25 Oct 2022 15:34:07 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356130#M237203</guid>
      <dc:creator>D5ha</dc:creator>
      <dc:date>2022-10-25T15:34:07Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356147#M237205</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/101194"&gt;@D5ha&lt;/a&gt;&amp;nbsp;I have had a recent similar need and I learned that you use session.commit() after a session.transfer to send a flowfile in an inner loop.&amp;nbsp; In a custom script, without the commit specifically, nifi will assume and do the commit sending all the data in a single end execution flowfile.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;session.transfer(flowFile, REL_SUCCESS)&lt;BR /&gt;session.commit()&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 16:30:53 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356147#M237205</guid>
      <dc:creator>steven-matison</dc:creator>
      <dc:date>2022-10-25T16:30:53Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356172#M237208</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/95503"&gt;@steven-matison&lt;/a&gt;&amp;nbsp;in my scenario there are 2 steps&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;each recursion call I create a flow file and query results write into that flow file&lt;/P&gt;&lt;P&gt;in the end, I need to create another new flow file also and put 2 attributes&lt;/P&gt;&lt;P&gt;and I need to transfer these 2 files into 2 relationships (status and success),&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;as per your answer, I put &lt;STRONG&gt;session.commit&lt;/STRONG&gt; at the end of every &lt;STRONG&gt;session.transfer&lt;/STRONG&gt; line. but now I'm getting an error called relationship is not specified but earlier it works well.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;new development is as below&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;            &lt;SPAN class="hljs-type"&gt;FlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-variable"&gt;stateFlowFile&lt;/SPAN&gt; &lt;SPAN class="hljs-operator"&gt;=&lt;/SPAN&gt; session.create();
            session.putAttribute(stateFlowFile, &lt;SPAN class="hljs-string"&gt;"processing_status"&lt;/SPAN&gt;, &lt;SPAN class="hljs-string"&gt;"processing"&lt;/SPAN&gt;);
            session.putAttribute(stateFlowFile, &lt;SPAN class="hljs-string"&gt;"record_count"&lt;/SPAN&gt;, mainI.toString());
            session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); &lt;SPAN class="hljs-comment"&gt;// state relationship&lt;/SPAN&gt;
            session.commit()

            session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); &lt;SPAN class="hljs-comment"&gt;// preprocessed flow files returns&lt;BR /&gt;            session.commit()&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 20:16:21 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356172#M237208</guid>
      <dc:creator>D5ha</dc:creator>
      <dc:date>2022-10-25T20:16:21Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356173#M237209</link>
      <description>&lt;P&gt;You will need to complete the session.commit() call with the right details to fit your scenario.&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 20:28:28 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356173#M237209</guid>
      <dc:creator>steven-matison</dc:creator>
      <dc:date>2022-10-25T20:28:28Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356174#M237210</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/95503"&gt;@steven-matison&lt;/a&gt;&amp;nbsp;can you please explain more about this&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 20:37:07 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356174#M237210</guid>
      <dc:creator>D5ha</dc:creator>
      <dc:date>2022-10-25T20:37:07Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356176#M237211</link>
      <description>&lt;P&gt;Can you share the relationship error?&lt;/P&gt;&lt;P&gt;I believe you need to complete the flowFile transfer and commit first, then do the stateFlowFile transfer and commit.&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 20:46:30 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356176#M237211</guid>
      <dc:creator>steven-matison</dc:creator>
      <dc:date>2022-10-25T20:46:30Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356304#M237253</link>
      <description>&lt;P&gt;&lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/95503"&gt;@steven-matison&lt;/a&gt;&amp;nbsp;thanks for your answer it is working fine with my local environment,&lt;BR /&gt;but I have to deploy this into production and it is 4 node cluster,&lt;/P&gt;&lt;P&gt;when running this processor in the cluster we are getting below logs and suddenly node going out of the cluster&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.

xxx.xx.xx:xxxx- warning - Response time from yyy.yy.yy:yyyy was slow for each last 3 requests made.
&lt;/LI-CODE&gt;&lt;P&gt;xxx - primary node&lt;/P&gt;&lt;P&gt;yyy - node on which the custom processor is running&lt;/P&gt;&lt;P&gt;Please guide me on this.&lt;/P&gt;</description>
      <pubDate>Thu, 27 Oct 2022 13:13:50 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356304#M237253</guid>
      <dc:creator>D5ha</dc:creator>
      <dc:date>2022-10-27T13:13:50Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356387#M237280</link>
      <description>&lt;P&gt;Are you sure the code is the same? It sounds like, in the cluster, the job is kicked off but never finishes (an endless loop?), or the task is causing a performance or connectivity issue.&lt;/P&gt;&lt;P&gt;Depending on your processor, it may need to be configured to run on the Primary node only.&lt;/P&gt;</description>
      <pubDate>Fri, 28 Oct 2022 12:16:02 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356387#M237280</guid>
      <dc:creator>steven-matison</dc:creator>
      <dc:date>2022-10-28T12:16:02Z</dc:date>
    </item>
    <item>
      <title>Re: NiFi custom processor return multiple flow files in recursion</title>
      <link>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356395#M237287</link>
      <description>&lt;P&gt;Yes, the code is the same, and it also works fine for small tables. Here I need to query roughly 200GB of data.&lt;/P&gt;</description>
      <pubDate>Fri, 28 Oct 2022 13:56:49 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/NIFI-custom-processor-return-multipleflow-files-in-recursion/m-p/356395#M237287</guid>
      <dc:creator>dulanga</dc:creator>
      <dc:date>2022-10-28T13:56:49Z</dc:date>
    </item>
  </channel>
</rss>

