Member since
02-15-2017
12
Posts
1
Kudos Received
0
Solutions
02-05-2018
11:28 PM
Thanks @matt burgess for getting back on this. Would I be able to use nifi-script-tester for testing the InvokeScriptedProcessor (running ECMAScript)?
... View more
02-05-2018
06:34 PM
There is no need for an external Java program. You can just write a script (ECMAScript or any other) in the ExecuteScript processor to flatten your incoming JSON, at which point you can relay the updated flowfile to the next processor.
... View more
02-05-2018
06:18 PM
I need help with a NiFi (1.4.0) mystery. I have a flowfile (config JSON, below) consumed by the ExecuteScript processor running the ECMAScript listed below. In short, the idea is to emit the Atom feed URLs at a set frequency, for the next InvokeHTTP processor to connect and pull data for processing. The script works fine at setting the schedule and triggering; however, the flowfile (containing the URL) generated by the script vanishes. There is no error, nor any trace of the file that is created each time the function is triggered. Any help in getting the outgoing flowfiles to the next processor in the flow (REL_SUCCESS) is much appreciated. Thank you. Input flowfile: {
"dataSource":[{
"name": "My Atom Feed - #1",
"sourceURL": "http://xyz.com/1",
"protocol": "http",
"format": "xml",
"feed": "atom",
"recordType": "workrecord",
"frequencyInSeconds": "2",
"batchSize": "100",
"maxRetries": "0"
},
{
"name": "My Atom Feed - #2",
"sourceURL": "http://xyz.com/2",
"protocol": "http",
"format": "xml",
"feed": "atom",
"recordType": "client",
"frequencyInSeconds": "10",
"batchSize": "100",
"maxRetries": "1"
},
{
"name": "My Atom Feed - #3",
"sourceURL": "http://xyz.com/3",
"protocol": "http",
"format": "xml",
"feed": "atom",
"recordType": "service",
"frequencyInSeconds": "20",
"batchSize": "100",
"maxRetries": "2"
}]
}
ECMAScript:
/**
 * Timer polyfill for the Nashorn script engine.
 *
 * Installs setTimeout / clearTimeout / setInterval / clearInterval on
 * `context`, all backed by one java.util.Timer. A Phaser tracks the pending
 * tasks; once the last pending task finishes or is cancelled, the Timer is
 * cancelled and the Phaser terminated.
 *
 * NOTE: because the Timer is cancelled as soon as no task is pending, a
 * setTimeout call made after that point will be rejected by the Timer.
 *
 * @param {Object} context - object that receives the four timer functions
 *                           (the script's global `this`).
 */
(function(context) {
    'use strict';
    log.info("Starting....");
    var Timer = Java.type('java.util.Timer');
    var Phaser = Java.type('java.util.concurrent.Phaser');
    var timer = new Timer('amberIngestScheduler', false);
    var phaser = new Phaser();
    // Number of scheduled tasks that have neither run nor been cancelled.
    var timeoutStack = 0;

    function pushTimeout() {
        timeoutStack++;
    }

    // Drops one pending task; shuts the scheduler down when none remain.
    function popTimeout() {
        timeoutStack--;
        if (timeoutStack > 0) {
            return;
        }
        timer.cancel();
        phaser.forceTermination();
    }

    var onTaskFinished = function() {
        phaser.arriveAndDeregister();
    };

    /**
     * Schedules `fn` to run once after `millis` milliseconds.
     *
     * @param {Function} fn - callback, invoked with `context` as `this`.
     * @param {number} millis - delay in milliseconds.
     * @param {...*} [args] - extra arguments forwarded to `fn`.
     * @returns {Function} cancel handle; safe to call more than once and
     *                     after the task has already run.
     */
    context.setTimeout = function(fn, millis /* [, args...] */) {
        var args = Array.prototype.slice.call(arguments, 2);
        // True once this task has either completed or been cancelled, so the
        // phaser/counter bookkeeping below runs exactly once per task.
        var settled = false;

        function settle() {
            if (settled) {
                return;
            }
            settled = true;
            onTaskFinished();
            popTimeout();
        }

        phaser.register();
        timer.schedule(function() {
            if (settled) {
                // Cancelled before it fired; bookkeeping was already done.
                return;
            }
            try {
                fn.apply(context, args);
            } catch (e) {
                // Report through the component log rather than stdout so the
                // failure is visible to the operator.
                log.error('Scheduled callback failed: ' + e);
            } finally {
                settle();
            }
        }, millis);
        pushTimeout();
        return function() {
            settle();
        };
    };

    /** Cancels a timeout using the handle returned by setTimeout. */
    context.clearTimeout = function(cancel) {
        cancel();
    };

    /**
     * Runs `fn` repeatedly, `delay` milliseconds apart, by chaining timeouts.
     *
     * @param {Function} fn - callback, invoked with `context` as `this`.
     * @param {number} delay - period in milliseconds.
     * @param {...*} [args] - extra arguments forwarded to `fn`.
     * @returns {Function} cancel handle that stops further runs.
     */
    context.setInterval = function(fn, delay /* [, args...] */) {
        var args = Array.prototype.slice.call(arguments, 2);
        var cancel = null;
        var loop = function() {
            // Re-arm before running fn so a throwing callback cannot break
            // the chain and the pending-task count never drops to zero.
            cancel = context.setTimeout(loop, delay);
            fn.apply(context, args);
        };
        cancel = context.setTimeout(loop, delay);
        return function() {
            // `cancel` always refers to the most recently armed timeout.
            cancel();
        };
    };

    /** Cancels an interval using the handle returned by setInterval. */
    context.clearInterval = function(cancel) {
        cancel();
    };
})(this);
// Script entry point: reads the JSON config from the incoming flow file and
// schedules one repeating publishFlowFile call per entry in conf.dataSource.
// The parent flow file is removed on success and routed to REL_FAILURE when
// the config cannot be loaded.
var parentFlowFile = session.get();
if (parentFlowFile !== null) {
    var InputStreamCallback = Java.type("org.apache.nifi.processor.io.InputStreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
    var isError = false;
    var inFlowFileJSON = null;
    session.read(parentFlowFile, new InputStreamCallback(function (inputStream) {
        inFlowFileJSON = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    }));
    var timer = null;
    // Cancel handles returned by setInterval, one per scheduled data source.
    var timerHandles = [];
    try {
        var conf = JSON.parse(inFlowFileJSON);
        for (var i = 0; i < conf.dataSource.length; i++) {
            var periodMillis = parseInt(conf.dataSource[i].frequencyInSeconds, 10) * 1000;
            // Reject a missing, non-numeric or non-positive frequency instead
            // of handing a NaN/zero period to the timer.
            if (!isFinite(periodMillis) || periodMillis <= 0) {
                throw new Error('Invalid frequencyInSeconds for dataSource[' + i + ']: ' +
                    conf.dataSource[i].frequencyInSeconds);
            }
            timer = setInterval(publishFlowFile, periodMillis, conf, i, session);
            // push mutates the array; concat returned a new array that was discarded.
            timerHandles.push(timer);
        }
    } catch (error) {
        log.error('Issue loading the config for datapipe, ' + error);
        isError = true;
        // Do not leave a partially scheduled set of publishers running for a
        // config that is being routed to failure.
        for (var h = 0; h < timerHandles.length; h++) {
            try {
                clearInterval(timerHandles[h]);
            } catch (cancelError) {
                log.error('Could not cancel scheduled publisher ' + h + ', ' + cancelError);
            }
        }
        timerHandles = [];
    }
    if (isError) {
        log.info("Moving the parent flowfile to the Failure path....");
        session.transfer(parentFlowFile, REL_FAILURE);
    } else {
        log.info("Removing the parent flowfile....");
        session.remove(parentFlowFile);
    }
}
/**
 * Creates a flow file whose content is the JSON-encoded sourceURL of one
 * configured data source and sends it to REL_SUCCESS.
 *
 * Intended to be invoked from the scheduler (setInterval), i.e. outside the
 * script's own evaluation, so the work is committed explicitly here.
 * NOTE(review): this assumes nothing else commits the session on behalf of
 * timer callbacks and that the session may be reused after the processor's
 * own commit -- confirm against the NiFi ProcessSession contract.
 *
 * @param {Object} conf - parsed config; conf.dataSource[i].sourceURL is read.
 * @param {number} i - index of the data source within conf.dataSource.
 * @param {Object} session - NiFi ProcessSession used to create, write,
 *                           transfer and commit the flow file.
 */
function publishFlowFile(conf, i, session) {
    log.info(session);
    try {
        log.info("Sending file...." + i);
        var OutputStreamCallback = Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
        // Resolved locally so the function does not depend on a variable that
        // another part of the script declares only when a flow file arrived.
        var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
        var outFlowFile = session.create();
        outFlowFile = session.write(outFlowFile, new OutputStreamCallback(function (outputStream) {
            // JSON.stringify keeps the surrounding double quotes in the content.
            outputStream.write(JSON.stringify(conf.dataSource[i].sourceURL).getBytes(StandardCharsets.UTF_8));
        }));
        log.info(outFlowFile);
        session.transfer(outFlowFile, REL_SUCCESS);
        // Without an explicit commit the transferred flow file never leaves
        // the session and is silently lost.
        session.commit();
    } catch (err) {
        // Log at error level so a failed publish is visible to the operator.
        log.error('Failed to publish flow file for data source ' + i + ', ' + err);
        try {
            // Discard the partially built flow file instead of leaving it
            // pending in the session.
            session.rollback();
        } catch (rollbackErr) {
            log.error('Rollback after failed publish also failed, ' + rollbackErr);
        }
    }
}
... View more
Labels:
- Labels:
-
Apache NiFi
06-23-2017
07:14 PM
Thanks Shashank, logs are now rolling over on both time and size triggers. Awesome!
... View more
06-23-2017
06:16 PM
Thanks Bryan, it helped a lot.
... View more
06-23-2017
01:22 AM
I have a custom controller service with a few properties. Here is a sample of one of them: public static final PropertyDescriptor CONFIG_DELIMITER = new PropertyDescriptor.Builder()
.name("Field delimiter to be used")
.description("Field delimiter used to separate the fields in the cache data file...")
.defaultValue(",")
.expressionLanguageSupported(true)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); I have expressionLanguageSupported set to true for them, with the idea of resolving them in the onConfigured method of the service. @OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
configDelim = context.getProperty(CONFIG_DELIMITER).evaluateAttributeExpressions().getValue();
..........
}
I have created a custom properties file, custom.properties, with the following content: nifi.iSOS.MyService.delim=, I have set up the properties in my test as: @Test
public void testService() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final MyService service = new MyService();
runner.addControllerService("MyService", service);
runner.setProperty(service, LookupService.CONFIG_DELIMITER , "${nifi.iSOS.MyService.delim}");
..........
} I need help with 2 things:
How do I pass the custom properties file to the test for MyService? Is context.getProperty(CONFIG_DELIMITER).evaluateAttributeExpressions().getValue(); the correct way to evaluate the value of the custom property in the controller service? Big thank you for all the help...
... View more
Labels:
- Labels:
-
Apache NiFi
06-23-2017
12:48 AM
1 Kudo
I have NiFi 1.2.0 installed on the Windows 2012 R2 server, it creates the standard logs files as under:
nifi-bootstrap.log nifi-app.log nifi-user.log The issue is that these files are not rolling over daily or based on size. Can someone help? I have attached the logback.xml file. Also, are changes to this logback.xml applied dynamically, or does NiFi need to be restarted for them to take effect?
... View more
Labels:
- Labels:
-
Apache NiFi
06-20-2017
12:13 AM
Matt, how would the properties set in custom properties file be accessed within the custom controller service's onConfigured method? Thanks.
... View more
03-01-2017
05:16 PM
Your explanation helped me identify the core issue. You are correct in assuming I was calling the service implementation rather than the interface. Once I made the appropriate changes in my code and unit tested them, I was able to drop it into NiFi and have the integration testing done. I was able to configure the custom service by selecting it in the drop-down on the custom Lookup processor in the NiFi Web UI. THANK YOU for all the help!!!!
... View more
02-28-2017
10:48 PM
I have built a custom controller service and a custom processor. I am able to build and run tests in IntelliJ on my laptop. Once the Maven build completed successfully, I copied all 3 nar files and dropped them in the lib folder of NiFi. In the NiFi web interface, I can see my custom service on the Controller Services page. I am able to configure it and enable it. The NiFi logs confirm the service is online. Now, when I drop the custom processor, I cannot see the service to choose in the drop-down, even though the service is running. I verified the dependencies in the pom for all nar modules.
nifi-customservice-api-nar nifi-customservice-nar nifi-LookupProcessor-nar Following is the error in the NiFi app log: java.lang.IllegalArgumentException: ControllerServices may be referenced only via their interfaces; class org.pwc.nifi.customservice.LookupService is not an interface
at org.apache.nifi.attribute.expression.language.StandardPropertyValue.asControllerService(StandardPropertyValue.java:172) ~[nifi-expression-language-1.1.1.jar:1.1.1]
at org.pwc.nifi.customservice.LookupProcessor.onTrigger(LookupProcessor.java:79) ~[na:na]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_91]
at java.util.concurrent.FutureTask.runAndReset(Unknown Source) [na:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source) [na:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [na:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_91]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_91] I would appreciate if someone can help with :
Resolving missing service in drop down when configuring the processor in properties. Resolving error "java.lang.IllegalArgumentException: ControllerServices may be referenced only via their interfaces" Thank you.
... View more
Labels:
- Labels:
-
Apache NiFi
02-15-2017
11:44 PM
Koji Kawamura,
Did you get a chance to work on the offset reset feature? Having the ability to reset the offset to a target value is necessary in case the pipeline needs to be reprocessed.
... View more