Created 06-29-2017 09:14 AM
Hi all,
I have a strange issue with NoClassDefFoundError. I took the standard GenerateTableFetch processor and modified it for my needs, but I always get this error:
java.lang.NoClassDefFoundError: org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:138)
    at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:113)
    at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:699)
    at org.apache.nifi.NiFi.<init>(NiFi.java:160)
    at org.apache.nifi.NiFi.main(NiFi.java:267)
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 21 common frames omitted
I have read all the relevant answers here and on Google, but without success.
Files:
nifi-teg-bundle/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-bundles</artifactId>
        <version>1.2.0</version>
    </parent>

    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-teg-bundle</artifactId>
    <version>1.0</version>
    <packaging>pom</packaging>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-nar-maven-plugin</artifactId>
                <version>1.2.0</version>
                <extensions>true</extensions>
            </plugin>
        </plugins>
    </build>

    <modules>
        <module>nifi-teg-processors</module>
        <module>nifi-teg-nar</module>
    </modules>
</project>
nifi-teg-bundle/nifi-teg-processors/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-teg-bundle</artifactId>
        <version>1.0</version>
    </parent>

    <artifactId>nifi-teg-processors</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-processors</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-dbcp-service-api</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-utils</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
nifi-teg-bundle/nifi-teg-nar/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-teg-bundle</artifactId>
        <version>1.0</version>
    </parent>

    <artifactId>nifi-teg-nar</artifactId>
    <version>1.0</version>
    <packaging>nar</packaging>

    <properties>
        <maven.javadoc.skip>true</maven.javadoc.skip>
        <source.skip>true</source.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-services-api-nar</artifactId>
            <type>nar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-processors</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-teg-processors</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>
nifi-teg-bundle/nifi-teg-processors/src/main/java/org/apache/nifi/processors/teg/MyProcessor.java
package org.apache.nifi.processors.teg;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.ListDatabaseTables;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

@TriggerSerially
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
        + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
        + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
        + "processor is intended to be run on the Primary Node only.\n\n"
        + "This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n"
        + " - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many "
        + "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n"
        + " - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n"
        + " - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields "
        + "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
        + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
        + "to fetch only those records that have max values greater than the retained values. This can be used for "
        + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
        + "per the State Management documentation")
@WritesAttributes({
        @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
                + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
        @WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
        @WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
        @WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
        @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
                + "that has been returned since the processor started running."),
        @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
        @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
public class MyProcessor extends AbstractDatabaseFetchProcessor {

    public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
            .name("gen-table-fetch-partition-size")
            .displayName("Partition Size")
            .description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in "
                    + "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A "
                    + "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows "
                    + "in the table.")
            .defaultValue("10000")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    public static final PropertyDescriptor MSSQL_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
            .name("gen-table-fetch-isolation-level")
            .displayName("Isolation Level")
            .description("MSSQL Isolation level for transactions")
            .required(false)
            .expressionLanguageSupported(true)
            .build();

    public static final PropertyDescriptor RIGHT_BOUND_WHERE = new PropertyDescriptor.Builder()
            .name("gen-table-fetch-right-bounded")
            .displayName("Right Bounded")
            .description("Whether to include the new max value(s) as a right hand boundary in the where statement. "
                    + "If this is set to false duplicate data may be returned by consecutive executions of the "
                    + "processor if there are deletions or if the table is high volume.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
                    + "If no incoming connection(s) are specified, this relationship is unused.")
            .build();

    public MyProcessor() {
        final Set<Relationship> r = new HashSet<>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(r);

        final List<PropertyDescriptor> pds = new ArrayList<>();
        pds.add(DBCP_SERVICE);
        pds.add(DB_TYPE);
        pds.add(TABLE_NAME);
        pds.add(COLUMN_NAMES);
        pds.add(RIGHT_BOUND_WHERE);
        pds.add(MAX_VALUE_COLUMN_NAMES);
        pds.add(QUERY_TIMEOUT);
        pds.add(PARTITION_SIZE);
        pds.add(MSSQL_ISOLATION_LEVEL);
        propDescriptors = Collections.unmodifiableList(pds);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propDescriptors;
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        return super.customValidate(validationContext);
    }

    @Override
    @OnScheduled
    public void setup(final ProcessContext context) {
        // Pre-fetch the column types if using a static table name and max-value columns
        if (!isDynamicTableName && !isDynamicMaxValues) {
            super.setup(context);
        }
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession();

        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection()) {
            fileToProcess = session.get();

            if (fileToProcess == null) {
                // Incoming connection with no flow file available, do no work (see capability description)
                return;
            }
        }

        final ComponentLog logger = getLogger();

        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
        final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        final String isolationLevel = context.getProperty(MSSQL_ISOLATION_LEVEL).evaluateAttributeExpressions(fileToProcess).getValue();
        final Boolean rightBounded = context.getProperty(RIGHT_BOUND_WHERE).asBoolean();

        final StateManager stateManager = context.getStateManager();
        final StateMap stateMap;

        try {
            stateMap = stateManager.getState(Scope.CLUSTER);
        } catch (final IOException ioe) {
            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
                    + "query until this is accomplished.", ioe);
            context.yield();
            return;
        }

        try {
            // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
            // set as the current state map (after the session has been committed)
            final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());

            // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
            // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
            // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
            // latest observed maximum values.
            String whereClause = null;
            List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                    ? new ArrayList<>(0)
                    : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
            List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());

            String columnsClause = null;
            List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
            maxValueSelectColumns.add("COUNT(*)");

            // For each maximum-value column, get a WHERE filter and a MAX(column) alias
            IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
                String colName = maxValueColumnNameList.get(index);
                maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                // final String fullyQualifiedStateKey = getStateKey(tableName, colName);
                // String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
                // if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
                //     // If the table name is static and the fully-qualified key was not found, try just the column name
                //     maxValue = statePropertyMap.get(getStateKey(null, colName));
                // }
                String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                if (!StringUtils.isEmpty(maxValue)) {
                    // Integer type = columnTypeMap.get(fullyQualifiedStateKey);
                    // if (type == null && !isDynamicTableName) {
                    //     // If the table name is static and the fully-qualified key was not found, try just the column name
                    //     type = columnTypeMap.get(getStateKey(null, colName));
                    // }
                    // if (type == null) {
                    //     // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
                    //     throw new IllegalArgumentException("No column type found for: " + colName);
                    // }
                    Integer type = getColumnType(tableName, colName);
                    // Add a condition for the WHERE clause
                    maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
                }
            });

            whereClause = StringUtils.join(maxValueClauses, " AND ");
            columnsClause = StringUtils.join(maxValueSelectColumns, ", ");

            // Build a SELECT query with maximum-value columns (if present)
            final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
            long rowCount = 0;
            String isolationLevelQuery = "";

            try (final Connection con = dbcpService.getConnection();
                 final Statement st = con.createStatement()) {

                final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
                st.setQueryTimeout(queryTimeout); // timeout in seconds

                if (isolationLevel != null) {
                    isolationLevelQuery = String.format("SET TRANSACTION ISOLATION LEVEL %s;", isolationLevel);
                }

                logger.debug("Executing {}", new Object[]{selectQuery});
                ResultSet resultSet;

                resultSet = st.executeQuery(isolationLevelQuery.concat(selectQuery));

                if (resultSet.next()) {
                    // Total row count is in the first column
                    rowCount = resultSet.getLong(1);

                    // Update the state map with the newly-observed maximum values
                    ResultSetMetaData rsmd = resultSet.getMetaData();
                    for (int i = 2; i <= rsmd.getColumnCount(); i++) {
                        // Some JDBC drivers consider the columns name and label to be very different things.
                        // Since this column has been aliased lets check the label first,
                        // if there is no label we'll use the column name.
                        String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
                        String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
                        String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
                        if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
                            // If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
                            // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
                            // maximum value is observed, it will be stored under the fully-qualified key from then on.
                            resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
                        }

                        int type = rsmd.getColumnType(i);
                        if (isDynamicTableName) {
                            // We haven't pre-populated the column type map if the table name is dynamic, so do it here
                            columnTypeMap.put(fullyQualifiedStateKey, type);
                        }
                        try {
                            String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName());
                            if (newMaxValue != null) {
                                statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                            }
                        } catch (ParseException | IOException pie) {
                            // Fail the whole thing here before we start creating flow files and such
                            throw new ProcessException(pie);
                        }
                    }
                } else {
                    // Something is very wrong here, one row (even if count is zero) should be returned
                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
                }

                // If Right Bounded, for each maximum-value column get a right bounding WHERE condition
                if (rightBounded) {
                    IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
                        String colName = maxValueColumnNameList.get(index);
                        maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
                        String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                        if (!StringUtils.isEmpty(maxValue)) {
                            Integer type = getColumnType(tableName, colName);
                            // Add a condition for the WHERE clause
                            maxValueClauses.add(colName + " <= " + getLiteralByType(type, maxValue, dbAdapter.getName()));
                        }
                    });

                    // Update WHERE list to include new right hand boundaries
                    whereClause = StringUtils.join(maxValueClauses, " AND ");
                }

                final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);

                // Generate SQL statements to read "pages" of data
                for (long i = 0; i < numberOfFetches; i++) {
                    // Boxed types: null means no LIMIT/OFFSET when the partition size is zero
                    // (a primitive long here would throw a NullPointerException on unboxing)
                    Long limit = partitionSize == 0 ? null : (long) partitionSize;
                    Long offset = partitionSize == 0 ? null : i * partitionSize;
                    final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
                    final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
                    FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                    sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
                    if (columnNames != null) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
                    }
                    if (StringUtils.isNotBlank(whereClause)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
                    }
                    if (StringUtils.isNotBlank(maxColumnNames)) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
                    }
                    sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
                    if (partitionSize != 0) {
                        sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
                    }
                    session.transfer(sqlFlowFile, REL_SUCCESS);
                }

                if (fileToProcess != null) {
                    session.remove(fileToProcess);
                }
            } catch (SQLException e) {
                if (fileToProcess != null) {
                    logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
                    fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
                    session.transfer(fileToProcess, REL_FAILURE);
                } else {
                    logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
                    throw new ProcessException(e);
                }
            }

            session.commit();
            try {
                // Update the state
                stateManager.setState(statePropertyMap, Scope.CLUSTER);
            } catch (IOException ioe) {
                logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
                        + "Also, any generated SQL statements may be duplicated.", new Object[]{this, ioe});
            }
        } catch (final ProcessException pe) {
            // Log the cause of the ProcessException if it is available
            Throwable t = (pe.getCause() == null ? pe : pe.getCause());
            logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
            session.rollback();
            context.yield();
        }
    }

    private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
        String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
        if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
            // If the table name is static and the fully-qualified key was not found, try just the column name
            maxValue = statePropertyMap.get(getStateKey(null, colName));
        }

        return maxValue;
    }

    private Integer getColumnType(String tableName, String colName) {
        final String fullyQualifiedStateKey = getStateKey(tableName, colName);
        Integer type = columnTypeMap.get(fullyQualifiedStateKey);
        if (type == null && !isDynamicTableName) {
            // If the table name is static and the fully-qualified key was not found, try just the column name
            type = columnTypeMap.get(getStateKey(null, colName));
        }
        if (type == null) {
            // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
            throw new IllegalArgumentException("No column type found for: " + colName);
        }

        return type;
    }
}
I thought that I had added all the possible dependencies.
Can anybody please help me?
Thanks
Created 06-29-2017 05:06 PM
The reason this isn't working is that AbstractDatabaseFetchProcessor is in nifi-standard-nar, which is not on the classpath of your custom NAR at runtime.
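You could add a NAR dependency on nifi-standard-nar in your NAR pom (you already depend on the nifi-standard-processors JAR, but it is currently marked provided, so it is not bundled into your NAR). Because a NAR can declare only one NAR dependency, this would replace your existing dependency on nifi-standard-services-api-nar, which nifi-standard-nar already has as its own parent:
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-nar</artifactId>
    <version>1.2.0</version>
    <type>nar</type>
</dependency>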
However, the better approach here would be to refactor things so that AbstractDatabaseFetchProcessor lives in a utility JAR that can be reused by both the standard NAR and your NAR. There could be a nifi-db-utils module here:
https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-extension-utils
That would be a cleaner approach and follow the pattern used for other abstract processors.
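The class-loading behaviour behind this answer can be illustrated with plain Java, with no NiFi involved. This is only a minimal sketch under stated assumptions: NarClassLoaderSketch, SharedBase and canLoad are hypothetical names invented for the illustration, and two URLClassLoader instances stand in for NAR class loaders. It shows that a loader whose parent chain does not contain a class cannot resolve it, while a loader whose parent owns the class can, which is roughly what a NAR dependency arranges.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class NarClassLoaderSketch {

    /** Hypothetical stand-in for a base class that ships inside another bundle. */
    public abstract static class SharedBase { }

    /** Returns true if the given loader (or one of its parents) can resolve the class. */
    public static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = SharedBase.class.getName();
        ClassLoader owner = SharedBase.class.getClassLoader();

        try (
            // No URLs and a null parent: only JDK classes are visible, similar to a
            // bundle that neither contains the class nor has a parent that does.
            URLClassLoader isolated = new URLClassLoader(new URL[0], null);
            // Same empty loader, but its parent is the loader that owns SharedBase,
            // similar to declaring a dependency on the bundle that provides the class.
            URLClassLoader child = new URLClassLoader(new URL[0], owner)
        ) {
            System.out.println("isolated loader can load base class: " + canLoad(isolated, name));
            System.out.println("child loader can load base class:    " + canLoad(child, name));
        }
    }
}
```

Run with a plain JDK, the first line should report false and the second true. The provided scope in the POM only makes the class visible to the compiler; at runtime what matters is whether the class is inside the NAR or reachable through its parent class loader.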
Created 06-29-2017 06:41 PM
@Ilya Li I agree with @Bryan Bende that the best approach is to refactor things so that shared classes are moved to something under nifi-extension-utils. I did this mainly for ListAzureBlobStorage, since it used the AbstractListProcessor code. For an example of the refactoring, take a look at https://github.com/apache/nifi/pull/1719, in particular the last four commits before the PR was merged to master.
Created 07-02-2017 02:37 PM