Created 03-06-2017 04:09 PM
To create a custom processor, I followed the documentation.
I made the necessary code changes in the MyProcessor.java and the MyProcessorTest runs fine except when I try to use some 'optional' properties. Note : I tried all the builder methods like required(false), addValidator() etc. for the optional properties, in vain. Actually, a validator doesn't make sense for an optional property ...
MyProcessor.java
@Tags({ "example" }) @CapabilityDescription("Provide a description") @SeeAlso({}) @ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") }) @WritesAttributes({ @WritesAttribute(attribute = "", description = "") }) @Stateful(description = "After a db-level LSN is processed, the same should be persisted as the last processed LSN", scopes = { Scope.CLUSTER }) public class MyProcessor extends AbstractProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description( "Successfully created FlowFile from SQL query result set.") .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure").description("SQL query execution failed. ???") .build(); /* Start : Mandatory properties */ public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() .name("Database Connection Pooling Service") .description( "The Controller Service that is used to obtain connection to database") .required(true).identifiesControllerService(DBCPService.class) .build(); public static final PropertyDescriptor CONTAINER_DB = new PropertyDescriptor.Builder() .name("containerDB").displayName("Container Database") .description("The name of the container database").required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); ... ...more mandatory properties ... /* End : Mandatory properties */ /*Start : Optional properties */ public static final PropertyDescriptor CDC_TS_FROM = new PropertyDescriptor.Builder() .name("cdcTSFrom").displayName("Load CDC on or after") .description("The CDC on or after this datetime will be fetched.") .required(false).defaultValue(null).build(); public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() .name("schema").displayName("DB Schema") .description("The schema which contains the xxxxxx") .defaultValue(null).required(false).build(); /*End : Optional properties */ private List<PropertyDescriptor> descriptors; private Set<Relationship> relationships; @Override protected void init(final ProcessorInitializationContext context) { final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); descriptors.add(CONTAINER_DB); descriptors.add(DBCP_SERVICE); ... ... ... descriptors.add(CDC_TS_FROM); descriptors.add(SCHEMA); ... ... ... this.descriptors = Collections.unmodifiableList(descriptors); final Set<Relationship> relationships = new HashSet<Relationship>(); relationships.add(REL_FAILURE); relationships.add(REL_SUCCESS); this.relationships = Collections.unmodifiableSet(relationships); } @Override public Set<Relationship> getRelationships() { return this.relationships; } @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { return descriptors; } // TODO : Check if the component lifecycle methods esp. onScheduled() and // onShutDown() are required @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { ... ... ... } }
MyProcessorTest.java
public class MyProcessorTest { private TestRunner testRunner; private final String CONTAINER_DB = "test"; private final String DBCP_SERVICE = "test_dbcp"; ... ... ... private final String SCHEMA = "dbo"; private final String CDC_TS_FROM = ""; ... ... ... @Before public void init() throws InitializationException { testRunner = TestRunners.newTestRunner(MyProcessor.class); final DBCPService dbcp = new DBCPServiceSQLServerImpl(...); final Map<String, String> dbcpProperties = new HashMap<>(); testRunner = TestRunners.newTestRunner(MyProcessor.class); testRunner.addControllerService(DBCP_SERVICE, dbcp, dbcpProperties); testRunner.enableControllerService(dbcp); testRunner.assertValid(dbcp); testRunner.setProperty(MyProcessor.DBCP_SERVICE, DBCP_SERVICE); testRunner.setProperty(MyProcessor.CONTAINER_DB, CONTAINER_DB); ... ... ... testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM); testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA); ... ... ... } @Test public void testProcessor() { testRunner.run(); } /** * Simple implementation only for MyProcessor processor testing. */ private class DBCPServiceSQLServerImpl extends AbstractControllerService implements DBCPService { private static final String SQL_SERVER_CONNECT_URL = "jdbc:sqlserver://%s;database=%s"; private String containerDB; private String password; private String userName; private String dbHost; public DBCPServiceSQLServerImpl(String containerDB, String password, String userName, String dbHost) { super(); this.containerDB = containerDB; this.password = password; this.userName = userName; this.dbHost = dbHost; } @Override public String getIdentifier() { return DBCP_SERVICE; } @Override public Connection getConnection() throws ProcessException { try { Connection connection = DriverManager.getConnection(String .format(SQL_SERVER_CONNECT_URL, dbHost, containerDB), userName, password); return connection; } catch (final Exception e) { throw new ProcessException("getConnection failed: " + e); } } } }
Now if I comment the optional properties in the test class :
//testRunner.setProperty(MyProcessor.CDC_TS_FROM, CDC_TS_FROM); //testRunner.setProperty(MyProcessor.SCHEMA, SCHEMA);
, the test completes normally but if I enable any or all of the optional properties, say, CDC_TS_FROM, then I the test case assertion fails, no matter what value I put for CDC_TS_FROM :
java.lang.AssertionError: Processor has 1 validation failures: 'cdcTSFrom' validated against '' is invalid because 'cdcTSFrom' is not a supported property at org.junit.Assert.fail(Assert.java:88) at org.apache.nifi.util.MockProcessContext.assertValid(MockProcessContext.java:251) at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:161) at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:152) at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:147) at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:142) at org.apache.nifi.util.StandardProcessorTestRunner.run(StandardProcessorTestRunner.java:137) at processors.NiFiCDCPoC.sqlserver.MyProcessorTest.testProcessor(MyProcessorTest.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Created 03-06-2017 04:22 PM
All Property Descriptors (required or optional) must have a Validator set explicitly, otherwise it will return the error you are seeing. It appears you are not looking to perform validation, but you still must set a validator, so on your optional properties add the following to the builder:
.addValidator(Validator.VALID)
Created 03-06-2017 04:22 PM
All Property Descriptors (required or optional) must have a Validator set explicitly, otherwise it will return the error you are seeing. It appears you are not looking to perform validation, but you still must set a validator, so on your optional properties add the following to the builder:
.addValidator(Validator.VALID)