Member since: 04-11-2016
Posts: 174
Kudos Received: 29
Solutions: 6
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 3317 | 06-28-2017 12:24 PM
 | 2506 | 06-09-2017 07:20 AM
 | 6990 | 08-18-2016 11:39 AM
 | 5089 | 08-12-2016 09:05 AM
 | 5260 | 08-09-2016 09:24 AM
05-26-2017
02:02 PM
Yes, your guess is right! There was an upgrade from 1.1. The tip seems crucial; will try that ...
05-26-2017
09:04 AM
1 Kudo
NiFi 1.2.0, two nodes, kerberized. In the previous version, the custom processor executed properly. The process I follow for deployment:

1. Place the nar file in lib on both nodes (l4513t.sss.se.com and l4514t.sss.se.com, NiFi installations)
2. Restart one NiFi instance at a time

I am getting issues when I begin with one of the NiFi nodes (l4513t.sss.se.com). First, I got the following error when I placed the nar file in lib and tried to restart NiFi:

Failed to connect node to cluster because local flow is different than cluster flow.
org.apache.nifi.controller.UninheritableFlowException: Failed to connect node to cluster because local flow is different than cluster flow.
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:934)
at org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:515)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:790)
at org.apache.nifi.NiFi.<init>(NiFi.java:160)
at org.apache.nifi.NiFi.main(NiFi.java:267)
Caused by: org.apache.nifi.controller.UninheritableFlowException: Proposed configuration is not inheritable by the flow controller because of flow differences: Found difference in Flows:
Local Fingerprint: 91910-015b-1000-9677-68b017463306com.datalake.processors.SQLServerCDCProcessorNO_VALUEdefaultnifi-NiFiCDCPoC-narunversionedDatabase Connection Pooling Service=fe153649-5193-1a68-ffff-ffffc37686
Cluster Fingerprint: 91910-015b-1000-9677-68b017463306com.datalake.processors.SQLServerCDCProcessorNO_VALUEdefaultunknownunversionedDatabase Connection Pooling Service=fe153649-5193-1a68-ffff-ffffc37686c6containerD
at org.apache.nifi.controller.StandardFlowSynchronizer.sync(StandardFlowSynchronizer.java:259)
at org.apache.nifi.controller.FlowController.synchronize(FlowController.java:1544)
at org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO.load(StandardXMLFlowConfigurationDAO.java:84)
at org.apache.nifi.controller.StandardFlowService.loadFromBytes(StandardFlowService.java:720)
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:909)
... 4 common frames omitted
2017-05-26 09:05:06,199 INFO [main] o.a.n.c.c.node.NodeClusterCoordinator l4513t.sss.se.com:9443 requested disconnection from cluster due to org.apache.nifi.controller.UninheritableFlowException: Failed to connect node to cluster because local flow is different than cluster flow.
2017-05-26 09:05:06,199 INFO [main] o.a.n.c.c.node.NodeClusterCoordinator Status of l4513t.sss.se.com:9443 changed from NodeConnectionStatus[nodeId=l4513t.sss.se.com:9443, state=CONNECTING, updateId=368] to NodeConnectionStatus[nodeId=l4513t.sss.se.com:9443, state=DISCONNECTED, Disconnect Code=Node's Flow did not Match Cluster Flow, Disconnect Reason=org.apache.nifi.controller.UninheritableFlowException: Failed to connect node to cluster because local flow is different than cluster flow., updateId=368]
2017-05-26 09:05:06,395 ERROR [main] o.a.n.c.c.node.NodeClusterCoordinator Event Reported for l4513t.sss.se.com:9443 -- Node disconnected from cluster due to org.apache.nifi.controller.UninheritableFlowException: Failed to connect node to cluster because local flow is different than cluster flow.
2017-05-26 09:05:06,395 INFO [main] o.a.n.c.l.e.CuratorLeaderElectionManager Cannot unregister Leader Election Role 'Primary Node' becuase that role is not registered
2017-05-26 09:05:06,395 WARN [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down.
java.lang.IllegalStateException: Already closed or has not been started
at com.google.common.base.Preconditions.checkState(Preconditions.java:173)
at org.apache.curator.framework.recipes.leader.LeaderSelector.close(LeaderSelector.java:270)
at org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager.unregister(CuratorLeaderElectionManager.java:151)
at org.apache.nifi.controller.FlowController.setClustered(FlowController.java:3667)
at org.apache.nifi.controller.StandardFlowService.handleConnectionFailure(StandardFlowService.java:554)
at org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:518)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:790)
at org.apache.nifi.NiFi.<init>(NiFi.java:160)
at org.apache.nifi.NiFi.main(NiFi.java:267)

I flipped through several existing threads, deleted the flow.xml.gz from a node, and attempted a restart. Now I am getting the following:

2017-05-26 09:20:30,803 INFO [Process Cluster Protocol Request-1] o.a.n.c.c.node.NodeClusterCoordinator Status of l4513t.sss.se.com:9443 changed from null to NodeConnectionStatus[nodeId=l4513t.sss.se.com:9443, state=CONNECTING, updateId=370]
2017-05-26 09:20:30,811 INFO [Process Cluster Protocol Request-1] o.a.n.c.p.impl.SocketProtocolListener Finished processing request 25e40b0f-b0ab-4f92-9e7f-440abb116999 (type=NODE_STATUS_CHANGE, length=1071 bytes) from l4514t.sss.se.com in 117 millis
2017-05-26 09:20:30,914 INFO [main] o.a.n.c.c.node.NodeClusterCoordinator Resetting cluster node statuses from {l4513t.sss.se.com:9443=NodeConnectionStatus[nodeId=l4513t.sss.se.com:9443, state=CONNECTING, updateId=370]} to {l4514t.sss.se.com:9443=NodeConnectionStatus[nodeId=l4514t.sss.se.com:9443, state=CONNECTED, updateId=360], l4513t.sss.se.com:9443=NodeConnectionStatus[nodeId=l4513t.sss.se.com:9443, state=CONNECTING, updateId=370]}
2017-05-26 09:20:31,348 ERROR [main] o.a.nifi.controller.StandardFlowService Failed to load flow from cluster due to: org.apache.nifi.controller.MissingBundleException: Failed to connect node to cluster because cluster flow contains bundles that do not exist on the current node
org.apache.nifi.controller.MissingBundleException: Failed to connect node to cluster because cluster flow contains bundles that do not exist on the current node
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:936)
at org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:515)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:790)
at org.apache.nifi.NiFi.<init>(NiFi.java:160)
at org.apache.nifi.NiFi.main(NiFi.java:267)
Caused by: org.apache.nifi.controller.MissingBundleException: com.datalake.processors.SQLServerCDCProcessor from default:unknown:unversioned is not known to this NiFi instance.
at org.apache.nifi.controller.StandardFlowSynchronizer.checkBundleCompatibility(StandardFlowSynchronizer.java:445)
at org.apache.nifi.controller.StandardFlowSynchronizer.sync(StandardFlowSynchronizer.java:253)
at org.apache.nifi.controller.FlowController.synchronize(FlowController.java:1544)
at org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO.load(StandardXMLFlowConfigurationDAO.java:84)
at org.apache.nifi.controller.StandardFlowService.loadFromBytes(StandardFlowService.java:720)
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:909)
... 4 common frames omitted
Caused by: java.lang.IllegalStateException: com.datalake.processors.SQLServerCDCProcessor from default:unknown:unversioned is not known to this NiFi instance.
at org.apache.nifi.util.BundleUtils.findCompatibleBundle(BundleUtils.java:55)
at org.apache.nifi.util.BundleUtils.getBundle(BundleUtils.java:98)
at org.apache.nifi.controller.StandardFlowSynchronizer.checkBundleCompatibility(StandardFlowSynchronizer.java:443)
... 9 common frames omitted
2017-05-26 09:20:31,348 INFO [main] o.a.n.c.c.node.NodeClusterCoordinator l4513t.sss.se.com:9443 requested disconnection from cluster due to org.apache.nifi.controller.MissingBundleException: Failed to connect node to cluster because cluster flow contains bundles that do not exist on the current node
2017-05-26 09:20:31,348 INFO [main] o.a.n.c.c.node.NodeClusterCoordinator Status of l4513t.sss.se.com:9443 changed from NodeConnectionStatus[nodeId=l4513t.sss.se.com:9443, state=CONNECTING, updateId=370] to NodeConnectionStatus[nodeId=l4513t.sss.se.com:9443, state=DISCONNECTED, Disconnect Code=Node was missing bundle used by Cluster Flow, Disconnect Reason=org.apache.nifi.controller.MissingBundleException: Failed to connect node to cluster because cluster flow contains bundles that do not exist on the current node, updateId=370]
2017-05-26 09:20:31,448 INFO [main] o.a.n.c.c.node.NodeClusterCoordinator Event Reported for l4513t.sss.se.com:9443 -- Node disconnected from cluster due to org.apache.nifi.controller.MissingBundleException: Failed to connect node to cluster because cluster flow contains bundles that do not exist on the current node
2017-05-26 09:20:31,448 INFO [main] o.a.n.c.l.e.CuratorLeaderElectionManager Cannot unregister Leader Election Role 'Primary Node' becuase that role is not registered
2017-05-26 09:20:31,448 WARN [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down.
java.lang.IllegalStateException: Already closed or has not been started
at com.google.common.base.Preconditions.checkState(Preconditions.java:173)
at org.apache.curator.framework.recipes.leader.LeaderSelector.close(LeaderSelector.java:270)
at org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager.unregister(CuratorLeaderElectionManager.java:151)
at org.apache.nifi.controller.FlowController.setClustered(FlowController.java:3667)
at org.apache.nifi.controller.StandardFlowService.handleConnectionFailure(StandardFlowService.java:554)
at org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:518)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:790)
at org.apache.nifi.NiFi.<init>(NiFi.java:160)
at org.apache.nifi.NiFi.main(NiFi.java:267)
2017-05-26 09:20:31,450 INFO [Thread-1] org.apache.nifi.NiFi Initiating shutdown of Jetty web server...
2017-05-26 09:20:31,456 INFO [Thread-1] o.eclipse.jetty.server.AbstractConnector Stopped ServerConnector@73bd146c{SSL,[ssl, http/1.1]}{l4513t.sss.se.com:9443}
2017-05-26 09:20:31,456 INFO [Thread-1] org.eclipse.jetty.server.session Stopped scavenging
2017-05-26 09:20:31,457 DEBUG [Thread-1] org.apache.jasper.servlet.JspServlet JspServlet.destroy()

What is it that I am missing?
Labels:
Apache NiFi
03-20-2017
11:41 AM
NiFi 1.1.1. I am trying to persist a byte[] using the State Manager.

private byte[] lsnUsedDuringLastLoad;
@Override
public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException {
...
...
...
final StateManager stateManager = context.getStateManager();
try {
StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Map<String, String> newStateMapProperties = new HashMap<>();
newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
new String(lsnUsedDuringLastLoad));
logger.debug("Persisting stateMap : "
+ newStateMapProperties);
stateManager.replace(stateMap, newStateMapProperties,
Scope.CLUSTER);
} catch (IOException ioException) {
logger.error("Error while persisting the state to NiFi",
ioException);
throw new ProcessException(
"The state(LSN) couldn't be persisted", ioException);
}
...
...
...
}

I don't get any exception or even an error log entry; the processor continues to run. The following load code always returns a null value ("Retrieved the statemap : {}") for the persisted field:

try {
stateMap = stateManager.getState(Scope.CLUSTER);
stateMapProperties = new HashMap<>(stateMap.toMap());
logger.debug("Retrieved the statemap : "+stateMapProperties);
lastMaxLSN = (stateMapProperties
.get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties
.get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
: stateMapProperties.get(
ProcessorConstants.LAST_MAX_LSN).getBytes();
logger.debug("Attempted to load the previous lsn from NiFi state : "
+ lastMaxLSN);
} catch (IOException ioe) {
logger.error("Couldn't load the state map", ioe);
throw new ProcessException(ioe);
}
I am wondering whether ZK is at fault or whether I have missed something while using the State Map!
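For what it's worth, a minimal sketch of a persist path that would surface this failure mode, hedged on the NiFi 1.1.x state API: StateManager.replace() does not throw when it cannot swap the state, it returns false, and it always returns false when no state has ever been stored for the component (the StateMap version is -1). Checking the return value, or falling back to setState() for the first write, makes the silent no-op visible. The helper method below is hypothetical, not code from the post:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical helper, callable from onTrigger(); ProcessorConstants and
// lsnUsedDuringLastLoad are as in the original code.
private void persistLastLsn(final ProcessContext context, final byte[] lsn) {
    final StateManager stateManager = context.getStateManager();
    try {
        final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
        final Map<String, String> newState = new HashMap<>(stateMap.toMap());
        newState.put(ProcessorConstants.LAST_MAX_LSN, new String(lsn));

        if (stateMap.getVersion() == -1) {
            // Nothing has ever been stored for this component, so replace()
            // would return false; setState() performs the first write.
            stateManager.setState(newState, Scope.CLUSTER);
        } else if (!stateManager.replace(stateMap, newState, Scope.CLUSTER)) {
            // replace() reports failure via its boolean result, not an
            // exception -- consistent with "no exception, no log entry".
            getLogger().warn("LSN state not persisted; state was modified concurrently");
        }
    } catch (final IOException ioe) {
        throw new ProcessException("The state (LSN) couldn't be persisted", ioe);
    }
}

If getVersion() is still -1 after a write that appeared to succeed, the cluster state provider configuration (the ZooKeeper connect string in state-management.xml) would be the next thing to check.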
Labels:
Apache NiFi
03-13-2017
08:27 AM
I want to avoid adding the sqljdbc42.jar to the NiFi lib directory (that's what I am doing now to make it work).
03-09-2017
02:22 PM
I have created a controller service to connect to a test db. I have a custom processor that reads data from SQL Server; the mock tests, the build, and the deployment to NiFi succeed. The processor runs into an error; maybe the nar dependency scope is at fault, or ...? I am unsure. The poms for the processor and the nar projects are as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.datalake</groupId>
<artifactId>CDCNiFi</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-NiFiCDCPoC-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<!-- Third-party -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>6.1.0.jre8</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.7</version>
</dependency>
<!-- Testing & Cross-cutting concerns -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.datalake</groupId>
<artifactId>CDCNiFi</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-NiFiCDCPoC-nar</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>com.datalake</groupId>
<artifactId>nifi-NiFiCDCPoC-processors</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>6.1.0.jre8</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>

The issues: The SQL Server JDBC jar is, probably, not loaded at runtime; the app.log has the following error:

2017-03-09 15:04:06,074 ERROR [Timer-Driven Process Thread-1] c.s.d.processors.SQLServerCDCProcessor SQLServerCDCProcessor[id=ad9de403-015a-1000-2b40-7efbfdb049b1] Process or SQL exception in <configure logger template to pick the code location>
2017-03-09 15:04:06,080 ERROR [Timer-Driven Process Thread-1] c.s.d.processors.SQLServerCDCProcessor
org.apache.nifi.processor.exception.ProcessException: org.apache.commons.dbcp.SQLNestedException: Cannot load JDBC driver class 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
at org.apache.nifi.dbcp.DBCPConnectionPool.getConnection(DBCPConnectionPool.java:252) ~[na:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_71]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_71]
at org.apache.nifi.controller.service.StandardControllerServiceProvider$1.invoke(StandardControllerServiceProvider.java:177) ~[na:na]
at com.sun.proxy.$Proxy89.getConnection(Unknown Source) ~[na:na]
at com.datalake.processors.SQLServerCDCProcessor.onTrigger(SQLServerCDCProcessor.java:244) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_71]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: org.apache.commons.dbcp.SQLNestedException: Cannot load JDBC driver class 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
at org.apache.commons.dbcp.BasicDataSource.createConnectionFactory(BasicDataSource.java:1429) ~[na:na]
at org.apache.commons.dbcp.BasicDataSource.createDataSource(BasicDataSource.java:1371) ~[na:na]
at org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044) ~[na:na]
at org.apache.nifi.dbcp.DBCPConnectionPool.getConnection(DBCPConnectionPool.java:249) ~[na:na]
... 19 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_71]
at org.apache.nifi.nar.InstanceClassLoader.findClass(InstanceClassLoader.java:117) ~[nifi-nar-utils-1.1.1.jar:1.1.1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_71]
at org.apache.nifi.nar.InstanceClassLoader.loadClass(InstanceClassLoader.java:98) ~[nifi-nar-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.nar.InstanceClassLoader.loadClass(InstanceClassLoader.java:82) ~[nifi-nar-utils-1.1.1.jar:1.1.1]
at org.apache.commons.dbcp.BasicDataSource.createConnectionFactory(BasicDataSource.java:1420) ~[na:na]
... 22 common frames omitted

In the controller service, I do not see my custom processor under 'Referencing Components' (it says 'no referencing components'). Right now, I have hard-coded the 'Database Driver Location(s)', but in production I would like the controller service to pick that up from the classpath (somehow; is it possible?).
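One option, sketched below and hedged on the NiFi 1.x API: instead of dropping the jar into lib, a PropertyDescriptor flagged with dynamicallyModifiesClasspath(true) lets the user point a component at a driver jar or folder that NiFi then adds to that component's instance classloader; DBCPConnectionPool's own 'Database Driver Location(s)' property is built this way. The descriptor below is a hypothetical example, not code from the post:

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;

// Hypothetical descriptor: NiFi adds the listed jars/folders to this
// component's instance classloader at runtime, so the driver class is
// resolvable without touching the global lib directory.
public static final PropertyDescriptor DRIVER_LOCATIONS = new PropertyDescriptor.Builder()
        .name("database-driver-locations")
        .displayName("Database Driver Location(s)")
        .description("Comma-separated list of files/folders containing the JDBC driver jar")
        .required(false)
        .addValidator(StandardValidators.createListValidator(true, true,
                StandardValidators.createURLorFileValidator()))
        .dynamicallyModifiesClasspath(true)
        .build();

Note that bundling mssql-jdbc in the processor NAR would not help DBCPConnectionPool anyway: the controller service lives in a different NAR with its own classloader, which is also why the runtime scope in the nar pom has no visible effect here.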
Labels:
Apache NiFi
03-06-2017
04:09 PM
To create a custom processor, I followed the documentation. I made the necessary code changes in MyProcessor.java, and MyProcessorTest runs fine except when I try to use some 'optional' properties. Note: I tried all the builder methods like required(false), addValidator() etc. for the optional properties, in vain. Actually, a validator doesn't seem to make sense for an optional property ...

MyProcessor.java

@Tags({ "example" })
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
@Stateful(description = "After a db-level LSN is processed, the same should be persisted as the last processed LSN", scopes = { Scope.CLUSTER })
public class MyProcessor extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description(
"Successfully created FlowFile from SQL query result set.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure").description("SQL query execution failed. ???")
.build();
/* Start : Mandatory properties */
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description(
"The Controller Service that is used to obtain connection to database")
.required(true).identifiesControllerService(DBCPService.class)
.build();
public static final PropertyDescriptor CONTAINER_DB = new PropertyDescriptor.Builder()
.name("containerDB").displayName("Container Database")
.description("The name of the container database").required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
...
...more mandatory properties
...
/* End : Mandatory properties */
/*Start : Optional properties */
public static final PropertyDescriptor CDC_TS_FROM = new PropertyDescriptor.Builder()
.name("cdcTSFrom").displayName("Load CDC on or after")
.description("The CDC on or after this datetime will be fetched.")
.required(false).defaultValue(null).build();
public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
.name("schema").displayName("DB Schema")
.description("The schema which contains the xxxxxx")
.defaultValue(null).required(false).build();
/*End : Optional properties */
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(CONTAINER_DB);
descriptors.add(DBCP_SERVICE);
...
...
...
descriptors.add(CDC_TS_FROM);
descriptors.add(SCHEMA);
...
...
...
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_FAILURE);
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
// TODO : Check if the component lifecycle methods esp. onScheduled() and
// onShutDown() are required
@Override
public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException {
...
...
...
}
}

MyProcessorTest.java

public class MyProcessorTest {
private TestRunner testRunner;
private final String CONTAINER_DB = "test";
private final String DBCP_SERVICE = "test_dbcp";
...
...
...
private final String SCHEMA = "dbo";
private final String CDC_TS_FROM = "";
...
...
...
@Before
public void init() throws InitializationException {
testRunner = TestRunners.newTestRunner(MyProcessor.class);
final DBCPService dbcp = new DBCPServiceSQLServerImpl(...);
final Map<String, String> dbcpProperties = new HashMap<>();
testRunner.addControllerService(DBCP_SERVICE, dbcp, dbcpProperties);
testRunner.enableControllerService(dbcp);
testRunner.assertValid(dbcp);
testRunner.setProperty(MyProcessor.DBCP_SERVICE, DBCP_SERVICE);
testRunner.setProperty(MyProcessor.CONTAINER_DB, CONTAINER_DB);
...
...
...
testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM);
testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA);
...
...
...
}
@Test
public void testProcessor() {
testRunner.run();
}
/**
* Simple implementation only for MyProcessor processor testing.
*/
private class DBCPServiceSQLServerImpl extends AbstractControllerService
implements DBCPService {
private static final String SQL_SERVER_CONNECT_URL = "jdbc:sqlserver://%s;database=%s";
private String containerDB;
private String password;
private String userName;
private String dbHost;
public DBCPServiceSQLServerImpl(String containerDB, String password,
String userName, String dbHost) {
super();
this.containerDB = containerDB;
this.password = password;
this.userName = userName;
this.dbHost = dbHost;
}
@Override
public String getIdentifier() {
return DBCP_SERVICE;
}
@Override
public Connection getConnection() throws ProcessException {
try {
Connection connection = DriverManager.getConnection(String
.format(SQL_SERVER_CONNECT_URL, dbHost, containerDB),
userName, password);
return connection;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

Now if I comment out the optional properties in the test class:

//testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM);
//testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA);

the test completes normally, but if I enable any or all of the optional properties, say CDC_TS_FROM, then the test assertion fails, no matter what value I put for CDC_TS_FROM:

java.lang.AssertionError: Processor has 1 validation failures:
'cdcTSFrom' validated against '' is invalid because 'cdcTSFrom' is not a supported property
at org.junit.Assert.fail(Assert.java:88)
at org.apache.nifi.util.MockProcessContext.assertValid(MockProcessContext.java:251)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:161)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:152)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:147)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:142)
at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:137)
at processors.NiFiCDCPoC.sqlserver.MyProcessorTest.testProcessor(MyProcessorTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
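For reference, a hedged sketch of what has fixed similar reports, assuming NiFi 1.x validation behavior: a PropertyDescriptor built without any validator falls back to a default validator that rejects every non-empty value with exactly this "is not a supported property" message, even when the descriptor is returned from getSupportedPropertyDescriptors(). Attaching an explicit validator to each optional property should make the assertion pass:

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;

// Optional property with an explicit validator; without one, any non-empty
// value is reported as "not a supported property" during validation.
// Validator.VALID accepts anything; substitute a StandardValidators entry
// to actually constrain the input.
public static final PropertyDescriptor CDC_TS_FROM = new PropertyDescriptor.Builder()
        .name("cdcTSFrom").displayName("Load CDC on or after")
        .description("The CDC on or after this datetime will be fetched.")
        .required(false)
        .addValidator(Validator.VALID)
        .build();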
Labels:
Apache NiFi
02-17-2017
08:46 AM
HDP-2.5.3.0, NiFi 1.1.1. I am writing a custom processor in NiFi. There are several String and Timestamp fields that I need to store somewhere such that they are available on all/any nodes.

@Tags({ "example" })
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
public class MyProcessor extends AbstractProcessor {
.
.
.
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
/* Persist these, probably, in ZK */
private Timestamp lastRunAt;
private String startPoint;
.
.
.
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    /* Retrieve lastRunAt & startPoint and use */
    lastRunAt;
    startPoint;
.
.
.
}
Note that HDFS is NOT an option, as NiFi may run without any Hadoop installation in the picture. What are the options to do this? I was wondering if ZooKeeper can be used to store this data, since it's small in size and NiFi is backed by ZK. I tried to find ways to use the ZooKeeper API to persist these fields, in vain.
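A possible direction, hedged on NiFi's managed state API (present in 1.1.1): a processor does not need to talk to ZooKeeper directly, because the framework exposes a StateManager whose Scope.CLUSTER storage is backed by ZooKeeper when the node is clustered. Small key-value state such as lastRunAt and startPoint fits this well. The snippet below is a sketch of onTrigger() usage, not code from the post; the class would also carry @Stateful(scopes = Scope.CLUSTER, description = "..."):

import java.io.IOException;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;

// Inside onTrigger(): read the previous values, use them, then store new ones.
final StateManager stateManager = context.getStateManager();
try {
    final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
    final String lastRun = stateMap.get("lastRunAt");   // null on the first run
    final String start = stateMap.get("startPoint");

    // ... use lastRun/start, do the work ...

    final Map<String, String> newState = new HashMap<>(stateMap.toMap());
    newState.put("lastRunAt", new Timestamp(System.currentTimeMillis()).toString());
    newState.put("startPoint", startPoint);
    stateManager.setState(newState, Scope.CLUSTER);
} catch (final IOException ioe) {
    getLogger().error("Could not access cluster state", ioe);
}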
Labels:
Apache NiFi
01-04-2017
02:50 PM
1 Kudo
HDP-2.5.0.0 (2.5.0.0-1245) using Ambari 2.4.0.1. There are two variants of a managed ORC table: one takes a denormalized approach and the other is influenced by the traditional star-schema design. At this point in time, I don't have the source scripts of the tables, hence cannot share them for clarity. The tables are neither partitioned nor bucketed. Alarmed by the number of ORC files the original tables, viz. fact_rtc and fact_rtc_starschema, had, I did the following:

1. Created copies of those tables (CTAS)
2. Executed 'ALTER TABLE ... CONCATENATE;'

The table below contains the steps and the stats. I am puzzled about the following:

1. For fact_rtc_starschema, the number of files differs between the CTAS copy and the CONCATENATE on the original table. Why so?
2. For fact_rtc, irrespective of CTAS or CONCATENATE, the number of files is unchanged. Why so?
3. Certainly, the number of columns has an impact on the size, but do a larger size and many columns cause a large number of files? I am also unsure how far the 'small number of large files' argument should be taken: currently there are 4 data nodes, soon there will be 8, and so on. Does the argument still hold then?
4. I am confused about which table is behaving incorrectly, or whether both are correct.

The problem started with a query differing drastically in response time on the two types of tables. The reduction in the number of files has improved the performance but added more confusion, hence I will post that as a separate thread later.

Table Name | Statistics | Notes
---|---|---
fact_rtc | Number of columns: 249; Number of rows: 1775017302; Number of files: 1009; Average file size: 245 MB; Raw data size: 6.27 TB; Total size: 241.05 GB | Original table stats
fact_rtc_copy | Number of columns: 249; Number of rows: 1775017302; Number of files: 1009; Average file size: 245 MB; Raw data size: 6.27 TB; Total size: 241.05 GB | CTAS of fact_rtc. Note that the number of files is unchanged
fact_RTC_copy | Number of columns: 249; Number of rows: 1775017302; Number of files: 1009; Average file size: 245 MB; Raw data size: 6.27 TB; Total size: 241.05 GB | ALTER TABLE fact_RTC_copy CONCATENATE; Note that the number of files is unchanged
fact_rtc_starschema | Number of columns: 132; Number of rows: 1775017302; Number of files: 3732; File size range: 44 MB - 76 MB; Raw data size: 2.31 TB; Total size: 229.23 GB | Original table stats
fact_rtc_starschema_copy | Number of columns: 132; Number of rows: 1775017302; Number of files: 239; File size range: 950 MB - 1015 MB; Raw data size: 2.31 TB; Total size: 229.03 GB | CTAS of fact_rtc_starschema. Note that the number of files changed
fact_rtc_starschema | Number of columns: 132; Number of rows: 1775017302; Number of files: 864; File size range: 245 MB - 335 MB; Raw data size: 2.31 TB; Total size: 239.56 GB | ALTER TABLE fact_RTC_starschema CONCATENATE; Note that the number of files changed
Labels:
Apache Hive
01-03-2017
11:22 AM
The command that worked (most conservative, takes the least cluster resources):

/usr/hdp/current/spark2-client/bin/spark-submit --class samples.FuelModel --master yarn --deploy-mode cluster --executor-memory 4g --driver-memory 8g spark-assembly-1.0.jar

The time taken is too long (40+ minutes) to be acceptable. I tried increasing the number of cores and executors, but it didn't help much. I guess the overhead exceeds the performance benefit, since a small file (< 200 MB) is being processed on 2 nodes. I would be glad if you can provide any pointers.
01-03-2017
08:25 AM
bpreachuk, you are right, the data type is string. The source RDBMSs (both Oracle and SQL Server) handle this identically for both the IN and OR operators, but in Hive the difference surfaces only after a close look at the data. Any idea how to overcome this? Since Sqoop is just using the source data types, I can't think of a way ...