
Custom processor extended GenerateTableFetch


Contributor

Hi all,

I have a strange issue with a NoClassDefFoundError. I took the standard GenerateTableFetch processor and modified it for my needs, but I always get this error:

java.lang.NoClassDefFoundError: org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:138)
	at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:113)
	at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:699)
	at org.apache.nifi.NiFi.<init>(NiFi.java:160)
	at org.apache.nifi.NiFi.main(NiFi.java:267)
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 21 common frames omitted


I have read all the relevant answers here and on Google, but without success.

Files:

nifi-teg-bundle/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>


    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-bundles</artifactId>
        <version>1.2.0</version>
    </parent>


    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-teg-bundle</artifactId>
    <version>1.0</version>
    <packaging>pom</packaging>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-nar-maven-plugin</artifactId>
                <version>1.2.0</version>
                <extensions>true</extensions>
            </plugin>
          </plugins>
    </build>


    <modules>
        <module>nifi-teg-processors</module>
        <module>nifi-teg-nar</module>
    </modules>


</project>


nifi-teg-bundle/nifi-teg-processors/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>


    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-teg-bundle</artifactId>
        <version>1.0</version>
    </parent>


    <artifactId>nifi-teg-processors</artifactId>
    <packaging>jar</packaging>


    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-processors</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-dbcp-service-api</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-utils</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>


nifi-teg-bundle/nifi-teg-nar/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>


    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-teg-bundle</artifactId>
        <version>1.0</version>
    </parent>


    <artifactId>nifi-teg-nar</artifactId>
    <version>1.0</version>
    <packaging>nar</packaging>
    <properties>
        <maven.javadoc.skip>true</maven.javadoc.skip>
        <source.skip>true</source.skip>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-services-api-nar</artifactId>
            <type>nar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-processors</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-teg-processors</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>

nifi-teg-bundle/nifi-teg-processors/src/main/java/org/apache/nifi/processors/teg/MyProcessor.java

package org.apache.nifi.processors.teg;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.ListDatabaseTables;


import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;




@TriggerSerially
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
        + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
        + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
        + "processor is intended to be run on the Primary Node only.\n\n"
        + "This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n"
        + "  - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many "
        + "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n"
        + "  - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n"
        + "  - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields "
        + "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
        + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
        + "to fetch only those records that have max values greater than the retained values. This can be used for "
        + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
        + "per the State Management documentation")
@WritesAttributes({
        @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
        + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
        @WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
        @WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
        @WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
        @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
                    + "that has been returned since the processor started running."),
        @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
        @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
public class MyProcessor extends AbstractDatabaseFetchProcessor {


    public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
            .name("gen-table-fetch-partition-size")
            .displayName("Partition Size")
            .description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in "
                    + "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A "
                    + "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows "
                    + "in the table.")
            .defaultValue("10000")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();
    public static final PropertyDescriptor MSSQL_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
            .name("gen-table-fetch-isolation-level")
            .displayName("Isolation Level")
            .description("MSSQL Isolation level for transactions")
            .required(false)
            .expressionLanguageSupported(true)
            .build();
    
    public static final PropertyDescriptor RIGHT_BOUND_WHERE = new PropertyDescriptor.Builder()
             .name("gen-table-fetch-right-bounded")
             .displayName("Right Bounded")
             .description("Whether to include the new max value(s) as a right hand boundary in the where statement. "
                     + "If this is set to false duplicate data may be returned by consecutive executions of the "
                     + "processor if there are deletions or if the table is high volume.")
             .allowableValues("true", "false")
             .defaultValue("false")
             .required(true)
             .build();
    
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
                    + "If no incoming connection(s) are specified, this relationship is unused.")
            .build();


    public MyProcessor() {
        final Set<Relationship> r = new HashSet<>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(r);


        final List<PropertyDescriptor> pds = new ArrayList<>();
        pds.add(DBCP_SERVICE);
        pds.add(DB_TYPE);
        pds.add(TABLE_NAME);
        pds.add(COLUMN_NAMES);
        pds.add(RIGHT_BOUND_WHERE);
        pds.add(MAX_VALUE_COLUMN_NAMES);
        pds.add(QUERY_TIMEOUT);
        pds.add(PARTITION_SIZE);
        pds.add(MSSQL_ISOLATION_LEVEL);
        propDescriptors = Collections.unmodifiableList(pds);
    }


    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }


    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propDescriptors;
    }


    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return super.customValidate(validationContext);
    }


    @Override
    @OnScheduled
    public void setup(final ProcessContext context) {
        // Pre-fetch the column types if using a static table name and max-value columns
        if (!isDynamicTableName && !isDynamicMaxValues) {
            super.setup(context);
        }
    }


    @Override
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession();


        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection()) {
            fileToProcess = session.get();


            if (fileToProcess == null) {
                // Incoming connection with no flow file available, do no work (see capability description)
                return;
            }
        }


        final ComponentLog logger = getLogger();


        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        final String isolationLevel = context.getProperty(MSSQL_ISOLATION_LEVEL).evaluateAttributeExpressions(fileToProcess).getValue();
        final Boolean rightBounded = context.getProperty(RIGHT_BOUND_WHERE).asBoolean();


        final StateManager stateManager = context.getStateManager();
        final StateMap stateMap;


        try {
            stateMap = stateManager.getState(Scope.CLUSTER);
        } catch (final IOException ioe) {
            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
                    + "query until this is accomplished.", ioe);
            context.yield();
            return;
        }
        try {
            // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
            // set as the current state map (after the session has been committed)
            final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());


            // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
            // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
            // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
            // latest observed maximum values.
            String whereClause = null;
            List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                    ? new ArrayList<>(0)
                    : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
            List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());


            String columnsClause = null;
            List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
            maxValueSelectColumns.add("COUNT(*)");


            // For each maximum-value column, get a WHERE filter and a MAX(column) alias
            IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
                String colName = maxValueColumnNameList.get(index);
                maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                
                if (!StringUtils.isEmpty(maxValue)) {
                    Integer type = getColumnType(tableName, colName);
                    // Add a condition for the WHERE clause
                    maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
                }
            });


            whereClause = StringUtils.join(maxValueClauses, " AND ");
            columnsClause = StringUtils.join(maxValueSelectColumns, ", ");


            // Build a SELECT query with maximum-value columns (if present)
            final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
            long rowCount = 0;
            String isolationLevelQuery = "";


            try (final Connection con = dbcpService.getConnection();
                final Statement st = con.createStatement()) {


                final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                st.setQueryTimeout(queryTimeout); // timeout in seconds
                
                if (isolationLevel != null) {
                    isolationLevelQuery = String.format("SET TRANSACTION ISOLATION LEVEL %s;", isolationLevel);
                }


                logger.debug("Executing {}", new Object[]{selectQuery});
                final ResultSet resultSet = st.executeQuery(isolationLevelQuery.concat(selectQuery));


                if (resultSet.next()) {
                    // Total row count is in the first column
                    rowCount = resultSet.getLong(1);


                    // Update the state map with the newly-observed maximum values
                    ResultSetMetaData rsmd = resultSet.getMetaData();
                    for (int i = 2; i <= rsmd.getColumnCount(); i++) {
                        // Some JDBC drivers consider the column's name and label to be very different things.
                        // Since this column has been aliased, check the label first;
                        // if there is no label, fall back to the column name.
                        String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i))?rsmd.getColumnLabel(i):rsmd.getColumnName(i)).toLowerCase();
                        String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
                        String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
                        if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
                            // If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
                            // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
                            // maximum value is observed, it will be stored under the fully-qualified key from then on.
                            resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
                        }


                        int type = rsmd.getColumnType(i);
                        if (isDynamicTableName) {
                            // We haven't pre-populated the column type map if the table name is dynamic, so do it here
                            columnTypeMap.put(fullyQualifiedStateKey, type);
                        }
                        try {
                            String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName());
                            if (newMaxValue != null) {
                                statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                            }
                        } catch (ParseException | IOException pie) {
                            // Fail the whole thing here before we start creating flow files and such
                            throw new ProcessException(pie);
                        }


                    }
                } else {
                    // Something is very wrong here, one row (even if count is zero) should be returned
                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
                }
                
                // If Right Bounded, for each maximum-value column add a right-hand bounding condition to the WHERE clause
                if (rightBounded) {
                    IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
                        String colName = maxValueColumnNameList.get(index);
                        String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                        if (!StringUtils.isEmpty(maxValue)) {
                            Integer type = getColumnType(tableName, colName);


                            // Add a condition for the WHERE clause
                            maxValueClauses.add(colName + " <= " + getLiteralByType(type, maxValue, dbAdapter.getName()));
                        }
                    });


                    //Update WHERE list to include new right hand boundaries
                    whereClause = StringUtils.join(maxValueClauses, " AND ");
                }
                
                // A partition size of zero means a single FlowFile whose statement fetches all rows
                final long numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);


                // Generate SQL statements to read "pages" of data
                for (long i = 0; i < numberOfFetches; i++) {
                    final Long limit = partitionSize == 0 ? null : (long) partitionSize;
                    final Long offset = partitionSize == 0 ? null : i * partitionSize;
                    final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
                    final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
                    FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                    sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
                    if (columnNames != null) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
                    }
                    if (StringUtils.isNotBlank(whereClause)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
                    }
                    if (StringUtils.isNotBlank(maxColumnNames)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
                    }
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
                    if (partitionSize != 0) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
                    }
                    session.transfer(sqlFlowFile, REL_SUCCESS);
                }


                if (fileToProcess != null) {
                    session.remove(fileToProcess);
                }
            } catch (SQLException e) {
                if (fileToProcess != null) {
                    logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
                    fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
                    session.transfer(fileToProcess, REL_FAILURE);


                } else {
                    logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
                    throw new ProcessException(e);
                }
            }


            session.commit();
            try {
                // Update the state
                stateManager.setState(statePropertyMap, Scope.CLUSTER);
            } catch (IOException ioe) {
                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
                                + "Also, any generated SQL statements may be duplicated.",
                        new Object[]{this, ioe});
            }
        } catch (final ProcessException pe) {
            // Log the cause of the ProcessException if it is available
            Throwable t = (pe.getCause() == null ? pe : pe.getCause());
            logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
            session.rollback();
            context.yield();
        }
    }
    
    private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
        String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
        if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
            // If the table name is static and the fully-qualified key was not found, try just the column name
            maxValue = statePropertyMap.get(getStateKey(null, colName));
        }


        return maxValue;
    }


    private Integer getColumnType(String tableName, String colName) {
        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
        Integer type = columnTypeMap.get(fullyQualifiedStateKey);
        if (type == null && !isDynamicTableName) {
            // If the table name is static and the fully-qualified key was not found, try just the column name
            type = columnTypeMap.get(getStateKey(null, colName));
        }
        if (type == null) {
            // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
            throw new IllegalArgumentException("No column type found for: " + colName);
        }


        return type;
    }
}


I thought I had added all the necessary dependencies.

Can anybody please help me?

Thanks

1 ACCEPTED SOLUTION


Re: Custom processor extended GenerateTableFetch

The reason this isn't working is that AbstractDatabaseFetchProcessor lives in nifi-standard-nar, which is not on your custom NAR's classpath at runtime.

You could add a NAR dependency on nifi-standard-nar in your NAR pom (you already depend on nifi-standard-processors, but it is marked provided, so the class is neither bundled into your NAR nor visible at runtime):

<dependency>
  <groupId>org.apache.nifi</groupId>
  <artifactId>nifi-standard-nar</artifactId>
  <version>1.2.0</version>
  <type>nar</type>
</dependency>

Keep in mind that a NAR can declare only one NAR dependency, so this replaces your current nifi-standard-services-api-nar dependency; nifi-standard-nar itself depends on the standard services API NAR, so the DBCP service API remains available.
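
For concreteness, here is a minimal sketch of what the revised nifi-teg-nar/pom.xml dependencies could look like, assuming NiFi 1.2.0 and that the single allowed NAR dependency is switched from nifi-standard-services-api-nar to nifi-standard-nar:

<dependencies>
    <!-- The one allowed NAR dependency: makes nifi-standard-nar the parent classloader,
         so AbstractDatabaseFetchProcessor is visible at runtime. nifi-standard-nar itself
         depends on nifi-standard-services-api-nar, so the DBCP service API is still reachable. -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-standard-nar</artifactId>
        <version>1.2.0</version>
        <type>nar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-teg-processors</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- Keep nifi-standard-processors provided so its classes are not bundled a second time;
         they are supplied by the parent NAR at runtime. -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-standard-processors</artifactId>
        <version>1.2.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>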

However, the better approach would be to refactor things so that AbstractDatabaseFetchProcessor lives in a utility JAR that can be re-used by both the standard NAR and your NAR. There could be a nifi-db-utils module here:

https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-extension-utils

That would be a cleaner approach and follows the pattern used for other abstract processors.
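
Under that refactoring, the custom processors module would consume the shared classes as an ordinary JAR dependency instead of reaching into nifi-standard-processors. A minimal sketch of the nifi-teg-processors pom fragment, assuming a hypothetical nifi-db-utils artifact under nifi-extension-utils (that module does not exist yet; its name and version are illustrative):

<!-- Hypothetical artifact: sketches how nifi-teg-processors would depend on a
     shared utility JAR holding AbstractDatabaseFetchProcessor. -->
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-db-utils</artifactId>
    <version>1.2.0</version>
</dependency>

Because the class would then be bundled into your NAR as a plain JAR, no NAR dependency on nifi-standard-nar would be needed.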

3 REPLIES


Re: Custom processor extended GenerateTableFetch

Rising Star

@Ilya Li I agree with @Bryan Bende that the best approach is to refactor so that the shared classes move to a module under nifi-extension-utils. I did this mainly for ListAzureBlobStorage, since it used the AbstractListProcessor code. Take a look at https://github.com/apache/nifi/pull/1719, and in particular at the last four commits before the PR was merged to master, for an example of the refactoring.


Re: Custom processor extended GenerateTableFetch

Contributor
@Bryan Bende

Thank you very much. The NAR dependency approach works. My next step will be the refactoring.