<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question Re: Custom processor extended GenerateTableFetch in Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228259#M63903</link>
    <description>&lt;P&gt;&lt;A rel="user" href="https://community.cloudera.com/users/16453/ilyal.html" nodeid="16453"&gt;@Ilya Li&lt;/A&gt;, I agree with &lt;A rel="user" href="https://community.cloudera.com/users/363/bbende.html" nodeid="363"&gt;@Bryan Bende&lt;/A&gt; that the best approach is to refactor so that the shared classes are moved to a module under nifi-extension-utils. I did this mainly for ListAzureBlobStorage, since it used the AbstractListProcessor code. For an example of the refactoring, see &lt;A href="https://github.com/apache/nifi/pull/1719" target="_blank"&gt;https://github.com/apache/nifi/pull/1719&lt;/A&gt;, in particular the last four commits before the PR was merged to master.&lt;/P&gt;</description>
    <pubDate>Fri, 30 Jun 2017 01:41:08 GMT</pubDate>
    <dc:creator>jts</dc:creator>
    <dc:date>2017-06-30T01:41:08Z</dc:date>
    <item>
      <title>Custom processor extended GenerateTableFetch</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228257#M63901</link>
      <description>&lt;P&gt;Hi all,&lt;/P&gt;&lt;P&gt;I have a strange issue with NoClassDefFoundError.
I took the standard GenerateTableFetch processor and modified it for my needs, but I always get this error:&lt;/P&gt;&lt;PRE&gt;java.lang.NoClassDefFoundError: org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:138)
	at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:113)
	at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:699)
	at org.apache.nifi.NiFi.&amp;lt;init&amp;gt;(NiFi.java:160)
	at org.apache.nifi.NiFi.main(NiFi.java:267)
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 21 common frames omitted


&lt;/PRE&gt;&lt;P&gt;I have read all the relevant answers here and on Google, but without success.&lt;/P&gt;&lt;P&gt;Files:&lt;/P&gt;&lt;P&gt;nifi-teg-bundle/pom.xml&lt;/P&gt;&lt;PRE&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;
&amp;lt;project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"&amp;gt;
    &amp;lt;modelVersion&amp;gt;4.0.0&amp;lt;/modelVersion&amp;gt;


    &amp;lt;parent&amp;gt;
        &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
        &amp;lt;artifactId&amp;gt;nifi-nar-bundles&amp;lt;/artifactId&amp;gt;
        &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
    &amp;lt;/parent&amp;gt;


    &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;nifi-teg-bundle&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;1.0&amp;lt;/version&amp;gt;
    &amp;lt;packaging&amp;gt;pom&amp;lt;/packaging&amp;gt;


    &amp;lt;build&amp;gt;
        &amp;lt;plugins&amp;gt;
            &amp;lt;plugin&amp;gt;
                &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
                &amp;lt;artifactId&amp;gt;nifi-nar-maven-plugin&amp;lt;/artifactId&amp;gt;
                &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
                &amp;lt;extensions&amp;gt;true&amp;lt;/extensions&amp;gt;
            &amp;lt;/plugin&amp;gt;
          &amp;lt;/plugins&amp;gt;
    &amp;lt;/build&amp;gt;


    &amp;lt;modules&amp;gt;
        &amp;lt;module&amp;gt;nifi-teg-processors&amp;lt;/module&amp;gt;
        &amp;lt;module&amp;gt;nifi-teg-nar&amp;lt;/module&amp;gt;
    &amp;lt;/modules&amp;gt;


&amp;lt;/project&amp;gt;


&lt;/PRE&gt;&lt;P&gt;nifi-teg-bundle/nifi-teg-processors/pom.xml&lt;/P&gt;&lt;PRE&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;
&amp;lt;project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"&amp;gt;
    &amp;lt;modelVersion&amp;gt;4.0.0&amp;lt;/modelVersion&amp;gt;


    &amp;lt;parent&amp;gt;
        &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
        &amp;lt;artifactId&amp;gt;nifi-teg-bundle&amp;lt;/artifactId&amp;gt;
        &amp;lt;version&amp;gt;1.0&amp;lt;/version&amp;gt;
    &amp;lt;/parent&amp;gt;


    &amp;lt;artifactId&amp;gt;nifi-teg-processors&amp;lt;/artifactId&amp;gt;
    &amp;lt;packaging&amp;gt;jar&amp;lt;/packaging&amp;gt;


    &amp;lt;dependencies&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-api&amp;lt;/artifactId&amp;gt;
            &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
            &amp;lt;scope&amp;gt;provided&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-utils&amp;lt;/artifactId&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-processor-utils&amp;lt;/artifactId&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-standard-processors&amp;lt;/artifactId&amp;gt;
            &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
            &amp;lt;scope&amp;gt;provided&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-dbcp-service-api&amp;lt;/artifactId&amp;gt;
            &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
            &amp;lt;scope&amp;gt;provided&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-standard-utils&amp;lt;/artifactId&amp;gt;
            &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
            &amp;lt;scope&amp;gt;provided&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-mock&amp;lt;/artifactId&amp;gt;
            &amp;lt;scope&amp;gt;test&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.slf4j&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;slf4j-simple&amp;lt;/artifactId&amp;gt;
            &amp;lt;scope&amp;gt;test&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;junit&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;junit&amp;lt;/artifactId&amp;gt;
            &amp;lt;scope&amp;gt;test&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
    &amp;lt;/dependencies&amp;gt;
&amp;lt;/project&amp;gt;


&lt;/PRE&gt;&lt;P&gt;nifi-teg-bundle/nifi-teg-nar/pom.xml&lt;/P&gt;&lt;PRE&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;
&amp;lt;project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"&amp;gt;
    &amp;lt;modelVersion&amp;gt;4.0.0&amp;lt;/modelVersion&amp;gt;


    &amp;lt;parent&amp;gt;
        &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
        &amp;lt;artifactId&amp;gt;nifi-teg-bundle&amp;lt;/artifactId&amp;gt;
        &amp;lt;version&amp;gt;1.0&amp;lt;/version&amp;gt;
    &amp;lt;/parent&amp;gt;


    &amp;lt;artifactId&amp;gt;nifi-teg-nar&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;1.0&amp;lt;/version&amp;gt;
    &amp;lt;packaging&amp;gt;nar&amp;lt;/packaging&amp;gt;
    &amp;lt;properties&amp;gt;
        &amp;lt;maven.javadoc.skip&amp;gt;true&amp;lt;/maven.javadoc.skip&amp;gt;
        &amp;lt;source.skip&amp;gt;true&amp;lt;/source.skip&amp;gt;
    &amp;lt;/properties&amp;gt;


    &amp;lt;dependencies&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-standard-services-api-nar&amp;lt;/artifactId&amp;gt;
            &amp;lt;type&amp;gt;nar&amp;lt;/type&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-standard-processors&amp;lt;/artifactId&amp;gt;
            &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
            &amp;lt;scope&amp;gt;provided&amp;lt;/scope&amp;gt;
        &amp;lt;/dependency&amp;gt;
        &amp;lt;dependency&amp;gt;
            &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
            &amp;lt;artifactId&amp;gt;nifi-teg-processors&amp;lt;/artifactId&amp;gt;
            &amp;lt;version&amp;gt;1.0&amp;lt;/version&amp;gt;
        &amp;lt;/dependency&amp;gt;
    &amp;lt;/dependencies&amp;gt;
&amp;lt;/project&amp;gt;
&lt;/PRE&gt;&lt;P&gt;nifi-teg-bundle/nifi-teg-processors/src/main/java/org/apache/nifi/processors/teg/MyProcessor.java&lt;/P&gt;&lt;PRE&gt;package org.apache.nifi.processors.teg;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.ListDatabaseTables;


import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;




@TriggerSerially
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
        + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
        + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
        + "processor is intended to be run on the Primary Node only.\n\n"
        + "This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n"
        + "  - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many "
        + "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n"
        + "  - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n"
        + "  - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields "
        + "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
        + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
        + "to fetch only those records that have max values greater than the retained values. This can be used for "
        + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
        + "per the State Management documentation")
@WritesAttributes({
        @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
        + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
        @WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
        @WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
        @WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
        @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
                    + "that has been returned since the processor started running."),
        @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
        @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
public class MyProcessor extends AbstractDatabaseFetchProcessor {


    public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
            .name("gen-table-fetch-partition-size")
            .displayName("Partition Size")
            .description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in "
                    + "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A "
                    + "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows "
                    + "in the table.")
            .defaultValue("10000")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();
    public static final PropertyDescriptor MSSQL_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
            .name("gen-table-fetch-isolation-level")
            .displayName("Isolation Level")
            .description("MSSQL Isolation level for transactions")
            .required(false)
            .expressionLanguageSupported(true)
            .build();
    
    public static final PropertyDescriptor RIGHT_BOUND_WHERE = new PropertyDescriptor.Builder()
             .name("gen-table-fetch-right-bounded")
             .displayName("Right Bounded")
             .description("Whether to include the new max value(s) as a right hand boundary in the where statement. "
                     + "If this is set to false duplicate data may be returned by consecutive executions of the "
                     + "processor if there are deletions or if the table is high volume.")
             .allowableValues("true", "false")
             .defaultValue("false")
             .required(true)
             .build();
    
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
                    + "If no incoming connection(s) are specified, this relationship is unused.")
            .build();


    public MyProcessor() {
        final Set&amp;lt;Relationship&amp;gt; r = new HashSet&amp;lt;&amp;gt;();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(r);


        final List&amp;lt;PropertyDescriptor&amp;gt; pds = new ArrayList&amp;lt;&amp;gt;();
        pds.add(DBCP_SERVICE);
        pds.add(DB_TYPE);
        pds.add(TABLE_NAME);
        pds.add(COLUMN_NAMES);
        pds.add(RIGHT_BOUND_WHERE);
        pds.add(MAX_VALUE_COLUMN_NAMES);
        pds.add(QUERY_TIMEOUT);
        pds.add(PARTITION_SIZE);
        pds.add(MSSQL_ISOLATION_LEVEL);
        propDescriptors = Collections.unmodifiableList(pds);
    }


    @Override
    public Set&amp;lt;Relationship&amp;gt; getRelationships() {
        return relationships;
    }


    @Override
    protected List&amp;lt;PropertyDescriptor&amp;gt; getSupportedPropertyDescriptors() {
        return propDescriptors;
    }


    @Override
    protected Collection&amp;lt;ValidationResult&amp;gt; customValidate(ValidationContext validationContext) {
        return super.customValidate(validationContext);
    }


    @Override
    @OnScheduled
    public void setup(final ProcessContext context) {
        // Pre-fetch the column types if using a static table name and max-value columns
        if (!isDynamicTableName &amp;amp;&amp;amp; !isDynamicMaxValues) {
            super.setup(context);
        }
    }


    @Override
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession();


        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection()) {
            fileToProcess = session.get();


            if (fileToProcess == null) {
                // Incoming connection with no flow file available, do no work (see capability description)
                return;
            }
        }


        final ComponentLog logger = getLogger();


        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        final String isolationLevel = context.getProperty(MSSQL_ISOLATION_LEVEL).evaluateAttributeExpressions(fileToProcess).getValue();
        final Boolean rightBounded = context.getProperty(RIGHT_BOUND_WHERE).asBoolean();


        final StateManager stateManager = context.getStateManager();
        final StateMap stateMap;


        try {
            stateMap = stateManager.getState(Scope.CLUSTER);
        } catch (final IOException ioe) {
            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
                    + "query until this is accomplished.", ioe);
            context.yield();
            return;
        }
        try {
            // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
            // set as the current state map (after the session has been committed)
            final Map&amp;lt;String, String&amp;gt; statePropertyMap = new HashMap&amp;lt;&amp;gt;(stateMap.toMap());


            // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(&amp;lt;column&amp;gt;) aliases. The
            // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
            // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
            // latest observed maximum values.
            String whereClause = null;
            List&amp;lt;String&amp;gt; maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                    ? new ArrayList&amp;lt;&amp;gt;(0)
                    : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
            List&amp;lt;String&amp;gt; maxValueClauses = new ArrayList&amp;lt;&amp;gt;(maxValueColumnNameList.size());


            String columnsClause = null;
            List&amp;lt;String&amp;gt; maxValueSelectColumns = new ArrayList&amp;lt;&amp;gt;(maxValueColumnNameList.size() + 1);
            maxValueSelectColumns.add("COUNT(*)");


            // For each maximum-value column, get a WHERE filter and a MAX(column) alias
            IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -&amp;gt; {
                String colName = maxValueColumnNameList.get(index);
                maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
//                final String fullyQualifiedStateKey = getStateKey(tableName, colName);
//                String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
//                if (StringUtils.isEmpty(maxValue) &amp;amp;&amp;amp; !isDynamicTableName) {
//                    // If the table name is static and the fully-qualified key was not found, try just the column name
//                    maxValue = statePropertyMap.get(getStateKey(null, colName));
//                }
                String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                
                if (!StringUtils.isEmpty(maxValue)) {
//                    Integer type = columnTypeMap.get(fullyQualifiedStateKey);
//                    if (type == null &amp;amp;&amp;amp; !isDynamicTableName) {
//                        // If the table name is static and the fully-qualified key was not found, try just the column name
//                        type = columnTypeMap.get(getStateKey(null, colName));
//                    }
//                    if (type == null) {
//                        // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
//                        throw new IllegalArgumentException("No column type found for: " + colName);
//                    }
                    Integer type = getColumnType(tableName, colName);
                    // Add a condition for the WHERE clause
                    maxValueClauses.add(colName + (index == 0 ? " &amp;gt; " : " &amp;gt;= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
                }
            });


            whereClause = StringUtils.join(maxValueClauses, " AND ");
            columnsClause = StringUtils.join(maxValueSelectColumns, ", ");


            // Build a SELECT query with maximum-value columns (if present)
            final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
            long rowCount = 0;
            String isolationLevelQuery = "";


            try (final Connection con = dbcpService.getConnection();
                final Statement st = con.createStatement()) {


                final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                st.setQueryTimeout(queryTimeout); // timeout in seconds
                
                if (isolationLevel != null) {
                    isolationLevelQuery = String.format("SET TRANSACTION ISOLATION LEVEL %s;", isolationLevel);
                }


                logger.debug("Executing {}", new Object[]{selectQuery});
                ResultSet resultSet;
                
                resultSet = st.executeQuery(isolationLevelQuery.concat(selectQuery));


                if (resultSet.next()) {
                    // Total row count is in the first column
                    rowCount = resultSet.getLong(1);


                    // Update the state map with the newly-observed maximum values
                    ResultSetMetaData rsmd = resultSet.getMetaData();
                    for (int i = 2; i &amp;lt;= rsmd.getColumnCount(); i++) {
                        // Some JDBC drivers treat a column's name and its label as different things.
                        // Since this column has been aliased, check the label first;
                        // if there is no label, fall back to the column name.
                        String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
                        String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
                        String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
                        if (StringUtils.isEmpty(resultColumnCurrentMax) &amp;amp;&amp;amp; !isDynamicTableName) {
                            // If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
                            // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
                            // maximum value is observed, it will be stored under the fully-qualified key from then on.
                            resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
                        }


                        int type = rsmd.getColumnType(i);
                        if (isDynamicTableName) {
                            // We haven't pre-populated the column type map if the table name is dynamic, so do it here
                            columnTypeMap.put(fullyQualifiedStateKey, type);
                        }
                        try {
                            String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName());
                            if (newMaxValue != null) {
                                statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                            }
                        } catch (ParseException | IOException pie) {
                            // Fail the whole thing here before we start creating flow files and such
                            throw new ProcessException(pie);
                        }


                    }
                } else {
                    // Something is very wrong here, one row (even if count is zero) should be returned
                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
                }
                
                // If Right Bounded, for each maximum-value column get a right bounding WHERE condition
                if (rightBounded) {
                    IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -&amp;gt; {
                        String colName = maxValueColumnNameList.get(index);


                        maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                        String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                        if (!StringUtils.isEmpty(maxValue)) {
                            Integer type = getColumnType(tableName, colName);


                            // Add a condition for the WHERE clause
                            maxValueClauses.add(colName + " &amp;lt;= " + getLiteralByType(type, maxValue, dbAdapter.getName()));
                        }
                    });


                    //Update WHERE list to include new right hand boundaries
                    whereClause = StringUtils.join(maxValueClauses, " AND ");
                }
                
                final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);


                // Generate SQL statements to read "pages" of data
                for (long i = 0; i &amp;lt; numberOfFetches; i++) {
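                    // Use boxed Long so that "no limit/offset" can be null; a primitive long would throw a NullPointerException on unboxing when partitionSize is 0
                    Long limit = partitionSize == 0 ? null : (long) partitionSize;
                    Long offset = partitionSize == 0 ? null : i * partitionSize;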
                    final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
                    final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
                    FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                    sqlFlowFile = session.write(sqlFlowFile, out -&amp;gt; out.write(query.getBytes()));
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
                    if (columnNames != null) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
                    }
                    if (StringUtils.isNotBlank(whereClause)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
                    }
                    if (StringUtils.isNotBlank(maxColumnNames)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
                    }
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
                    if (partitionSize != 0) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
                    }
                    session.transfer(sqlFlowFile, REL_SUCCESS);
                }


                if (fileToProcess != null) {
                    session.remove(fileToProcess);
                }
            } catch (SQLException e) {
                if (fileToProcess != null) {
                    logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
                    fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
                    session.transfer(fileToProcess, REL_FAILURE);


                } else {
                    logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
                    throw new ProcessException(e);
                }
            }


            session.commit();
            try {
                // Update the state
                stateManager.setState(statePropertyMap, Scope.CLUSTER);
            } catch (IOException ioe) {
                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
                                + "Also, any generated SQL statements may be duplicated.",
                        new Object[]{this, ioe});
            }
        } catch (final ProcessException pe) {
            // Log the cause of the ProcessException if it is available
            Throwable t = (pe.getCause() == null ? pe : pe.getCause());
            logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
            session.rollback();
            context.yield();
        }
    }
    
    private String getColumnStateMaxValue(String tableName, Map&amp;lt;String, String&amp;gt; statePropertyMap, String colName) {
        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
        String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
        if (StringUtils.isEmpty(maxValue) &amp;amp;&amp;amp; !isDynamicTableName) {
            // If the table name is static and the fully-qualified key was not found, try just the column name
            maxValue = statePropertyMap.get(getStateKey(null, colName));
        }


        return maxValue;
    }


    private Integer getColumnType(String tableName, String colName) {
        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
        Integer type = columnTypeMap.get(fullyQualifiedStateKey);
        if (type == null &amp;amp;&amp;amp; !isDynamicTableName) {
            // If the table name is static and the fully-qualified key was not found, try just the column name
            type = columnTypeMap.get(getStateKey(null, colName));
        }
        if (type == null) {
            // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
            throw new IllegalArgumentException("No column type found for: " + colName);
        }


        return type;
    }
}


&lt;/PRE&gt;&lt;P&gt;
I thought I had added all the required dependencies.&lt;/P&gt;&lt;P&gt;Can anybody please help me?&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;</description>
      <pubDate>Thu, 29 Jun 2017 16:14:15 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228257#M63901</guid>
      <dc:creator>ilyal</dc:creator>
      <dc:date>2017-06-29T16:14:15Z</dc:date>
    </item>
    <item>
      <title>Re: Custom processor extended GenerateTableFetch</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228258#M63902</link>
      <description>&lt;P&gt;This isn't working because AbstractDatabaseFetchProcessor is in nifi-standard-nar, which is not on the classpath of your custom NAR at runtime.&lt;/P&gt;&lt;P&gt;You could add a NAR dependency on nifi-standard-nar in your NAR's pom (you already have the dependency, but it's currently marked provided):&lt;/P&gt;&lt;PRE&gt;&amp;lt;dependency&amp;gt;
  &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
  &amp;lt;artifactId&amp;gt;nifi-standard-nar&amp;lt;/artifactId&amp;gt;
  &amp;lt;version&amp;gt;1.2.0&amp;lt;/version&amp;gt;
  &amp;lt;type&amp;gt;nar&amp;lt;/type&amp;gt;
&amp;lt;/dependency&amp;gt;&lt;/PRE&gt;&lt;P&gt;However, the better approach would be to refactor things so that AbstractDatabaseFetchProcessor lives in a utility JAR that can be reused by both the standard NAR and your NAR. There could be a nifi-db-utils module here:&lt;/P&gt;&lt;P&gt;&lt;A href="https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-extension-utils" target="_blank"&gt;https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-extension-utils&lt;/A&gt;&lt;/P&gt;&lt;P&gt;That would be a cleaner approach and would follow the pattern used for other abstract processors.&lt;/P&gt;</description>
      <pubDate>Fri, 30 Jun 2017 00:06:26 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228258#M63902</guid>
      <dc:creator>bbende</dc:creator>
      <dc:date>2017-06-30T00:06:26Z</dc:date>
    </item>
    <item>
      <title>Re: Custom processor extended GenerateTableFetch</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228259#M63903</link>
      <description>&lt;P&gt;&lt;A rel="user" href="https://community.cloudera.com/users/16453/ilyal.html" nodeid="16453"&gt;@Ilya Li&lt;/A&gt; I agree with &lt;A rel="user" href="https://community.cloudera.com/users/363/bbende.html" nodeid="363"&gt;@Bryan Bende&lt;/A&gt; that the best approach is to refactor things so that shared classes are moved to a module under nifi-extension-utils. I did this mainly for ListAzureBlobStorage, since it used the AbstractListProcessor code. For an example of the refactoring, take a look at the last four commits in &lt;A href="https://github.com/apache/nifi/pull/1719" target="_blank"&gt;https://github.com/apache/nifi/pull/1719&lt;/A&gt; before the PR was merged to master.&lt;/P&gt;</description>
      <pubDate>Fri, 30 Jun 2017 01:41:08 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228259#M63903</guid>
      <dc:creator>jts</dc:creator>
      <dc:date>2017-06-30T01:41:08Z</dc:date>
    </item>
    <item>
      <title>Re: Custom processor extended GenerateTableFetch</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228260#M63904</link>
      <description>&lt;A rel="user" href="https://community.cloudera.com/users/363/bbende.html" nodeid="363"&gt;@Bryan Bende&lt;/A&gt;&lt;P&gt; Thank you very much. This type works. My next step will be refactoring.&lt;/P&gt;</description>
      <pubDate>Sun, 02 Jul 2017 21:37:49 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Custom-processor-extended-GenerateTableFetch/m-p/228260#M63904</guid>
      <dc:creator>ilyal</dc:creator>
      <dc:date>2017-07-02T21:37:49Z</dc:date>
    </item>
  </channel>
</rss>

