Created 06-29-2017 09:14 AM
Hi all,
I have strange issue with NoClassDefFoundError. I get standard processor GenerateTableFetch and change it for my needs. But I always get error
java.lang.NoClassDefFoundError: org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370) at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:138) at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:113) at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:699) at org.apache.nifi.NiFi.<init>(NiFi.java:160) at org.apache.nifi.NiFi.main(NiFi.java:267) Caused by: java.lang.ClassNotFoundException: org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 21 common frames omitted
I read all relevant answers here and in google, but without success.
Files:
nifi-teg-bundle/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.2.0</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-bundle</artifactId>
<version>1.0</version>
<packaging>pom</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>1.2.0</version>
<extensions>true</extensions>
</plugin>
</plugins>
</build>
<modules>
<module>nifi-teg-processors</module>
<module>nifi-teg-nar</module>
</modules>
</project>
nifi-teg-bundle/nifi-teg-processors/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-bundle</artifactId>
<version>1.0</version>
</parent>
<artifactId>nifi-teg-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-processors</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-utils</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
nifi-teg-bundle/nifi-teg-nar/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-bundle</artifactId>
<version>1.0</version>
</parent>
<artifactId>nifi-teg-nar</artifactId>
<version>1.0</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-processors</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-processors</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>
nifi-teg-bundle/nifi-teg-processors/src/main/java/org/apache/nifi/processors/teg/MyProcessor.java
package org.apache.nifi.processors.teg;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.ListDatabaseTables;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@TriggerSerially
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
+ "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
+ "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
+ "processor is intended to be run on the Primary Node only.\n\n"
+ "This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n"
+ " - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many "
+ "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n"
+ " - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n"
+ " - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields "
+ "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+ "to fetch only those records that have max values greater than the retained values. This can be used for "
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttributes({
@WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
+ "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
@WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
@WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
@WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
@WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
+ "that has been returned since the processor started running."),
@WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
public class MyProcessor extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
.name("gen-table-fetch-partition-size")
.displayName("Partition Size")
.description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in "
+ "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A "
+ "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows "
+ "in the table.")
.defaultValue("10000")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MSSQL_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
.name("gen-table-fetch-isolation-level")
.displayName("Isolation Level")
.description("MSSQL Isolation level for transactions")
.required(false)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor RIGHT_BOUND_WHERE = new PropertyDescriptor.Builder()
.name("gen-table-fetch-right-bounded")
.displayName("Right Bounded")
.description("Whether to include the new max value(s) as a right hand boundary in the where statement. "
+ "If this is set to false duplicate data may be returned by consecutive executions of the "
+ "processor if there are deletions or if the table is high volume.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
+ "If no incoming connection(s) are specified, this relationship is unused.")
.build();
public MyProcessor() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
r.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
pds.add(DB_TYPE);
pds.add(TABLE_NAME);
pds.add(COLUMN_NAMES);
pds.add(RIGHT_BOUND_WHERE);
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(PARTITION_SIZE);
pds.add(MSSQL_ISOLATION_LEVEL);
propDescriptors = Collections.unmodifiableList(pds);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return super.customValidate(validationContext);
}
@Override
@OnScheduled
public void setup(final ProcessContext context) {
// Pre-fetch the column types if using a static table name and max-value columns
if (!isDynamicTableName && !isDynamicMaxValues) {
super.setup(context);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
if (fileToProcess == null) {
// Incoming connection with no flow file available, do no work (see capability description)
return;
}
}
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final String isolationLevel = context.getProperty(MSSQL_ISOLATION_LEVEL).evaluateAttributeExpressions(fileToProcess).getValue();
final Boolean rightBounded = context.getProperty(RIGHT_BOUND_WHERE).asBoolean();
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
context.yield();
return;
}
try {
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
// Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
// executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
// specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
// latest observed maximum values.
String whereClause = null;
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? new ArrayList<>(0)
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());
String columnsClause = null;
List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
maxValueSelectColumns.add("COUNT(*)");
// For each maximum-value column, get a WHERE filter and a MAX(column) alias
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
// final String fullyQualifiedStateKey = getStateKey(tableName, colName);
// String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
// if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
// // If the table name is static and the fully-qualified key was not found, try just the column name
// maxValue = statePropertyMap.get(getStateKey(null, colName));
// }
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
// Integer type = columnTypeMap.get(fullyQualifiedStateKey);
// if (type == null && !isDynamicTableName) {
// // If the table name is static and the fully-qualified key was not found, try just the column name
// type = columnTypeMap.get(getStateKey(null, colName));
// }
// if (type == null) {
// // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
// throw new IllegalArgumentException("No column type found for: " + colName);
// }
Integer type = getColumnType(tableName, colName);
// Add a condition for the WHERE clause
maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});
whereClause = StringUtils.join(maxValueClauses, " AND ");
columnsClause = StringUtils.join(maxValueSelectColumns, ", ");
// Build a SELECT query with maximum-value columns (if present)
final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
long rowCount = 0;
String isolationLevelQuery = "";
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
if(isolationLevel!= null){
isolationLevelQuery = String.format("SET TRANSACTION ISOLATION LEVEL %s;", isolationLevel);
}
logger.debug("Executing {}", new Object[]{selectQuery});
ResultSet resultSet;
resultSet = st.executeQuery(isolationLevelQuery.concat(selectQuery));
if (resultSet.next()) {
// Total row count is in the first column
rowCount = resultSet.getLong(1);
// Update the state map with the newly-observed maximum values
ResultSetMetaData rsmd = resultSet.getMetaData();
for (int i = 2; i <= rsmd.getColumnCount(); i++) {
//Some JDBC drivers consider the columns name and label to be very different things.
// Since this column has been aliased lets check the label first,
// if there is no label we'll use the column name.
String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i))?rsmd.getColumnLabel(i):rsmd.getColumnName(i)).toLowerCase();
String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
// If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
}
int type = rsmd.getColumnType(i);
if (isDynamicTableName) {
// We haven't pre-populated the column type map if the table name is dynamic, so do it here
columnTypeMap.put(fullyQualifiedStateKey, type);
}
try {
String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName());
if (newMaxValue != null) {
statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
}
} catch (ParseException | IOException pie) {
// Fail the whole thing here before we start creating flow files and such
throw new ProcessException(pie);
}
}
} else {
// Something is very wrong here, one row (even if count is zero) should be returned
throw new SQLException("No rows returned from metadata query: " + selectQuery);
}
// If Right Bounded, for each maximum-value column get a right bounding WHERE condition
if(rightBounded){
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
Integer type = getColumnType(tableName, colName);
// Add a condition for the WHERE clause
maxValueClauses.add(colName + " <= " + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});
//Update WHERE list to include new right hand boundaries
whereClause = StringUtils.join(maxValueClauses, " AND ");
}
final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
// Generate SQL statements to read "pages" of data
for (long i = 0; i < numberOfFetches; i++) {
long limit = partitionSize == 0 ? null : partitionSize;
long offset = partitionSize == 0 ? null : i * partitionSize;
final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
if (columnNames != null) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
}
if (StringUtils.isNotBlank(whereClause)) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
}
if (StringUtils.isNotBlank(maxColumnNames)) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
}
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
if (partitionSize != 0) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
}
session.transfer(sqlFlowFile, REL_SUCCESS);
}
if (fileToProcess != null) {
session.remove(fileToProcess);
}
} catch (SQLException e) {
if (fileToProcess != null) {
logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
session.transfer(fileToProcess, REL_FAILURE);
} else {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
throw new ProcessException(e);
}
}
session.commit();
try {
// Update the state
stateManager.setState(statePropertyMap, Scope.CLUSTER);
} catch (IOException ioe) {
logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
+ "Also, any generated SQL statements may be duplicated.",
new Object[]{this, ioe});
}
} catch (final ProcessException pe) {
// Log the cause of the ProcessException if it is available
Throwable t = (pe.getCause() == null ? pe : pe.getCause());
logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
session.rollback();
context.yield();
}
}
private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName);
String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
maxValue = statePropertyMap.get(getStateKey(null, colName));
}
return maxValue;
}
private Integer getColumnType(String tableName, String colName) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName);
Integer type = columnTypeMap.get(fullyQualifiedStateKey);
if (type == null && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
type = columnTypeMap.get(getStateKey(null, colName));
}
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
throw new IllegalArgumentException("No column type found for: " + colName);
}
return type;
}
}
I thought what I added all possible dependencies.
Anybody please help me.
Thanks
Created 06-29-2017 05:06 PM
The reason this isn't working is because AbstractDatabaseFetchProcessor is in nifi-standard-nar which is not on the classpath of your custom NAR at runtime.
You could add a NAR dependency in your NAR pom on nifi-standard-nar (you already have the dependency but its currently marked provided):
<dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-standard-processors</artifactId> <version>1.2.0</version> <type>nar</type> </dependency>
However, the better approach here would be to refactor things such that AbstractDatabaseFetchProcessor lived inside of some utility JAR that could be re-used by standard NAR and your NAR. There could be a nifi-db-utils module here:
https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-extension-utils
That would be a cleaner approach and follow the pattern used for other abstract processors.
Created 06-29-2017 05:06 PM
The reason this isn't working is because AbstractDatabaseFetchProcessor is in nifi-standard-nar which is not on the classpath of your custom NAR at runtime.
You could add a NAR dependency in your NAR pom on nifi-standard-nar (you already have the dependency but its currently marked provided):
<dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-standard-processors</artifactId> <version>1.2.0</version> <type>nar</type> </dependency>
However, the better approach here would be to refactor things such that AbstractDatabaseFetchProcessor lived inside of some utility JAR that could be re-used by standard NAR and your NAR. There could be a nifi-db-utils module here:
https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-extension-utils
That would be a cleaner approach and follow the pattern used for other abstract processors.
Created 06-29-2017 06:41 PM
@Ilya Li I agree with @Bryan Bende that the best approach is to refactor things such that shared classes are moved to something under nifi-extension-utils. I did this mainly for ListAzureBlobStorage, since it used the AbstractListProcessor code. You can take a look at https://github.com/apache/nifi/pull/1719, and take a look at the last four commits before the PR was merged to master, for an example of the refactoring.
Created 07-02-2017 02:37 PM