Member since: 09-08-2016
Posts: 27
Kudos Received: 23
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 9831 | 11-22-2016 06:57 PM
 | 6314 | 09-08-2016 09:32 PM
01-31-2018
07:25 PM
I have a NiFi workflow that reads Avro messages from Kafka and writes them as Snappy-compressed Parquet files to S3. I'm noticing some behavior I did not expect with this: in PutParquet I'm specifying the directory as s3://mybucketname/events/${kafka.topic}/${event}/${event_timestamp:format("yyyy/MM/dd/HH")}, but it seems to be creating a directory with a blank name at the top level of my bucket before moving on to events/topic_name/etc. For every subsequent subdirectory, it also creates a file with the same name as the directory. It is creating block files at the top level of the bucket, e.g. "block_1903392314985430358", and the files it creates have neither a .snappy nor a .parquet extension. Can anyone shed any light on what is happening here and how I can fix it? I'm using Apache NiFi 1.4.0. My core-site.xml has:
<property>
  <name>fs.defaultFS</name>
  <value>s3://mybucketname</value>
</property>
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>
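For what it's worth, "block_..." files at the bucket root are characteristic of the legacy block-based S3FileSystem configured here via fs.s3.impl, which stores data as blocks rather than as ordinary objects. A hedged sketch of the S3A alternative, assuming the hadoop-aws/S3A classes are on NiFi's classpath and credentials are supplied separately:
<!-- assumption: hadoop-aws jars are available to the processor -->
<property>
  <name>fs.defaultFS</name>
  <value>s3a://mybucketname</value>
</property>
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>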
Labels:
- Apache NiFi
07-27-2017
12:01 AM
@Harsh Choudhary Agreed. I came to the conclusion that the distributed map cache is too flaky to keep track of important things. We've seen it mysteriously fail several times and have since changed all our processes to use a database.
02-16-2017
06:57 PM
1 Kudo
Thanks @Matt Burgess! This is working much better for me. For future reference, there are Jython examples of how to do this in the unit tests of the NiFi project: https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython
02-16-2017
04:05 PM
2 Kudos
I'm seeing very poor performance in my python ExecuteScript. Essentially, for each flow file, I'm querying the Distributed Map Cache and joining that information with the info in the flow file. Two things come to mind as possible performance bottlenecks:
* Is ExecuteScript creating a new Jython environment for every flow file? Or is it spinning one up once per concurrent task and reusing it?
* If only one Jython environment is created per concurrent thread, is it possible for me to connect to the DMC just once and then just query keys on all further executions? Is there a setup method hook or something like that?
Any help would be appreciated. Thanks!
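On the second question, a pattern often suggested for ExecuteScript with Jython: if the same script engine is reused across invocations, module-level names persist, so the connection can be initialized lazily in a global. A minimal sketch; make_cache_client is a hypothetical helper, and session/REL_SUCCESS are provided by ExecuteScript:
# Probe for a client left over from a previous invocation of this engine;
# create it only on the first run. A sketch, not guaranteed behavior.
try:
    cache_client
except NameError:
    cache_client = make_cache_client()  # hypothetical: open the DMC connection once

flowFile = session.get()
if flowFile is not None:
    # reuse cache_client for lookups instead of reconnecting per flow file
    session.transfer(flowFile, REL_SUCCESS)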
Labels:
- Apache NiFi
02-14-2017
08:40 PM
Never mind, I see the 'Module Directory' property in ExecuteScript.
02-14-2017
08:29 PM
Ah, thanks @Bryan Bende. I remember seeing some sample code on how to connect to the DMC from within ExecuteScript. My script is in Python... do you know offhand whether ExecuteScript/Jython picks up libraries installed via pip? I'd like to write a library for interacting with the DMC in Python so that my actual join script isn't as complicated.
02-14-2017
06:31 PM
Thanks for the quick replies! This is very helpful. Yes, I'm storing a fairly large value in an attribute, but maybe you can suggest an alternate approach?
What I'm doing right now is processing survey results. Unfortunately, the survey question data and the responses come in as separate streams. I want to join these two data sets while in my NiFi flow so I don't have to kick off a separate ETL. So what I chose to do is store the question data in the distributed map cache, and then, as each response comes in, query the cache by the survey id and, assuming it is found, put the question data into an attribute. Then an ExecuteScript runs to join the flowfile content (the response) and the attribute value (the questions). Is there another, more scalable way to do this?
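For reference, the join step described here is commonly written as a StreamCallback in Jython. A minimal sketch, assuming JSON content and a hypothetical attribute name survey.questions; session and REL_SUCCESS are provided by ExecuteScript:
import json
from org.apache.nifi.processor.io import StreamCallback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets

class JoinCallback(StreamCallback):
    def __init__(self, questions):
        self.questions = questions
    def process(self, inputStream, outputStream):
        # read the response from the flowfile content and attach the questions
        response = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
        response['questions'] = self.questions
        outputStream.write(bytearray(json.dumps(response).encode('utf-8')))

flowFile = session.get()
if flowFile is not None:
    questions = json.loads(flowFile.getAttribute('survey.questions'))  # hypothetical attribute
    flowFile = session.write(flowFile, JoinCallback(questions))
    session.transfer(flowFile, REL_SUCCESS)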
02-14-2017
05:29 PM
When the latter happens, my NiFi gets into a state where active threads are permanently stuck and I have to restart the server to recover.
02-14-2017
05:16 PM
I'm getting this error in my logs. Does anyone know what causes this or how to prevent it? The disk has plenty of space on it.
2017-02-14 17:04:04,687 ERROR [Timer-Driven Process Thread-2] o.a.n.p.s.FetchDistributedMapCache FetchDistributedMapCache[id=e69c1dbb-1011-1157-f7f1-321d05a0a0f7] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update: org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
2017-02-14 17:04:04,687 ERROR [Timer-Driven Process Thread-2] o.a.n.p.s.FetchDistributedMapCache
org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:369) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:305) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28) ~[nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.io.IOException: All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required.
at org.wali.MinimalLockingWriteAheadLog.update(MinimalLockingWriteAheadLog.java:220) ~[nifi-write-ahead-log-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:210) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:178) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:363) ~[nifi-framework-core-1.1.1.jar:1.1.1]
... 13 common frames omitted
I'm also seeing this error, which may or may not be related (UTFDataFormatException: encoded string too long: 87941 bytes):
2017-02-14 17:08:44,567 ERROR [Timer-Driven Process Thread-7] o.a.n.p.s.FetchDistributedMapCache
org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:369) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:305) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28) ~[nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.io.IOException: Failed to write field 'Repository Record Update'
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:46) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecord(SchemaRecordWriter.java:35) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde.serializeRecord(SchemaRepositoryRecordSerde.java:95) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde.serializeEdit(SchemaRepositoryRecordSerde.java:67) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde.serializeEdit(SchemaRepositoryRecordSerde.java:46) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.wali.MinimalLockingWriteAheadLog$Partition.update(MinimalLockingWriteAheadLog.java:957) ~[nifi-write-ahead-log-1.1.1.jar:1.1.1]
at org.wali.MinimalLockingWriteAheadLog.update(MinimalLockingWriteAheadLog.java:238) ~[nifi-write-ahead-log-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:210) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:178) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:363) ~[nifi-framework-core-1.1.1.jar:1.1.1]
... 13 common frames omitted
Caused by: java.io.IOException: Failed to write field 'Attributes'
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:46) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldValue(SchemaRecordWriter.java:131) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldRepetitionAndValue(SchemaRecordWriter.java:57) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:44) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
... 22 common frames omitted
Caused by: java.io.UTFDataFormatException: encoded string too long: 87941 bytes
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:364) ~[na:1.8.0_101]
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323) ~[na:1.8.0_101]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldValue(SchemaRecordWriter.java:108) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldRepetitionAndValue(SchemaRecordWriter.java:57) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldValue(SchemaRecordWriter.java:124) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldRepetitionAndValue(SchemaRecordWriter.java:84) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:44) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
Labels:
- Apache NiFi
01-27-2017
05:37 PM
Bah! I just realized the error of my ways: the -Dgpg.skip flag has to go inside -Darguments so the release plugin passes it along to the forked Maven build. This works:
mvn --batch-mode release:prepare release:perform -Darguments="-Dmaven.deploy.skip=true -Dgpg.skip"
01-27-2017
05:30 PM
mvn --batch-mode release:prepare release:perform -Darguments="-Dmaven.deploy.skip=true" -Dgpg.skip
Nope. It still looks like it's trying to run it:
[INFO] [INFO] --- maven-gpg-plugin:1.5:sign (default) @ nifi-mycompany-bundle ---
[INFO] gpg: no default secret key: No secret key
[INFO] gpg: signing failed: No secret key
....[snip]
[INFO] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-gpg-plugin:1.5:sign (default) on project nifi-mycompany-bundle: Exit code: 2 -> [Help 1]
01-27-2017
04:41 PM
Hi, I created a Maven project using the nifi-processor-bundle-archetype that contains some custom processors for my company. The problem I'm having: when I use the Maven release plugin to create a release version and deploy it to our local repository, I get gpg errors. Is there any way to skip the gpg step? I don't need to sign my release for local use... Thanks!
Labels:
- Apache NiFi
12-27-2016
05:51 PM
No, not from what I was able to find. If you want to start all processors in a process group, you need to get the list of all processors in that group by calling GET /process-groups/{id}/processors and then, one by one, set them to RUNNING or STOPPED with PUT /processors/{id}. Hope this helps!
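A hedged Python sketch of that loop, assuming an unsecured NiFi at localhost; the base URL and group id are placeholders, and error handling is omitted:
import requests

BASE = 'http://localhost:8080/nifi-api'  # assumption: local, unsecured instance
GROUP_ID = 'your-process-group-id'       # placeholder

# List every processor in the group, then flip each one to RUNNING.
processors = requests.get('{0}/process-groups/{1}/processors'.format(BASE, GROUP_ID)).json()
for proc in processors['processors']:
    payload = {
        'revision': proc['revision'],  # echo the current revision for optimistic locking
        'component': {'id': proc['id'], 'state': 'RUNNING'},
        'id': proc['id'],
    }
    requests.put('{0}/processors/{1}'.format(BASE, proc['id']), json=payload)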
12-20-2016
05:46 PM
1 Kudo
@Gayathri Rajendran My processor update JSON looks like this:
{
  "status": {
    "runStatus": "RUNNING"
  },
  "component": {
    "state": "RUNNING",
    "id": "01571000-f7fb-1da4-7d70-b6be89100354"
  },
  "id": "01571000-f7fb-1da4-7d70-b6be89100354",
  "revision": {
    "version": 1,
    "clientId": "c4cd2dc5-a57f-4025-aa4b-7e29118ce795"
  }
}
You may need to add the separate "status" object in there. Also, I've never specified a parentGroupId, so I'm not sure whether that is necessary. Lastly, you may need to add -H 'Accept: application/json' to your curl command as well. Hope this helps!
11-29-2016
07:02 PM
1 Kudo
You will need to do a GET on that processor first. The revision/version is contained in that JSON, and you just pass it through in your PUT request. GET /processors/{id} returns JSON with something like this:
{
  "status": { ... },
  "component": { ... },
  "revision": {
    "version": 1
  }
}
So you would just take that whole revision element and add it to your PUT payload. clientId is any string that identifies your tool/app; in my example above, I was just generating a random UUID. Hope this helps!
11-23-2016
06:27 PM
1 Kudo
Sure! The endpoint for updating a processor is a PUT request against /processors/[processor_id], where you post JSON similar to what I have above. The runStatus should be either "STOPPED" or "RUNNING". You have to include the clientId or the version in the request, and the 'component' field is also required. I discovered all this by reading the code, since there really isn't a user guide for these endpoints. As part of my script, I get the list of processors from the name of the process group, so my flow is:
1) GET /flow/search-results?q=[process_group_name] to find the process group.
2) GET /process-groups/[process_group_id]/processors to get the list of processors in that group.
3) Loop through each of those results with the PUT request I mentioned above.
Hope this helps!
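A minimal Python sketch of a helper for step 1, assuming an unsecured NiFi at localhost and the 1.x search response shape (searchResultsDTO containing processGroupResults); both are assumptions to verify against your instance:
import requests

BASE = 'http://localhost:8080/nifi-api'  # assumption: local, unsecured instance

def find_process_group(name):
    # the search endpoint returns matches grouped by component type
    results = requests.get(BASE + '/flow/search-results', params={'q': name}).json()
    groups = results['searchResultsDTO']['processGroupResults']
    return groups[0]['id'] if groups else None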
11-22-2016
06:57 PM
4 Kudos
OK, I figured it out. Since I was taking the processor object returned from my GET call and just modifying a few fields, it thought I was trying to do an update on the object. You cannot do that while the processor is running, so that was the error message. I modified my request to contain only the minimum number of fields (I think) needed to stop the processor. I'm still unclear on whether I need to set status.runStatus and/or component.state to STOPPED to get what I want, as they both seem to indicate the same thing. Anyway, the request below works:
modified_processor = {
    'revision': {
        'version': processor["revision"]["version"],
        'clientId': str(uuid.uuid4())
    },
    'status': {
        'runStatus': status  # "STOPPED" or "RUNNING"
    },
    'component': {
        'id': processor['id'],
        'state': status
    },
    'id': processor['id']
}
11-22-2016
05:37 PM
2 Kudos
Hi, I'm trying to start and stop processors via the NiFi API, version 1.0.0. I'm getting status code 409 returned with the message "afb7dcf1-0157-1000-9450-ea0931a67e0f is not stopped." I have read previous articles about the optimistic locking, and I am supplying the version and client id, but I'm still getting this error. Any ideas? Here is a snippet of my Python code:
process_group = nifiapi.find_process_group(process_group_name)
if process_group is None:
    return None
processors = nifiapi.get_processors_by_pg(process_group["id"])
if processors is None:
    return None
for processor in processors['processors']:
    processor["revision"]["clientId"] = str(uuid.uuid4())
    processor["status"]["runStatus"] = "STOPPED"
    logging.debug("Updating processor: {}".format(json.dumps(processor)))
    nifiapi.update_processor(processor)  # this makes the PUT request
Labels:
- Apache NiFi
10-14-2016
02:54 PM
Thanks @Andy LoPresto. If I go the flow.xml.gz route, it looks like I have to shut down the currently running NiFi instance, copy the flow.xml.gz file to the conf directory, and then restart the server. Is that right? When I tried copying it over while the server was running, it didn't appear to pick up the changes. The variable registry page you linked didn't really have much content at all. Is that under development or still just being discussed?
10-13-2016
05:43 PM
3 Kudos
What are people doing for version control and automated deployment of NiFi workflows? What I'd like is to develop or modify a workflow in a dev environment, check it in to git, then import the workflow into the QA environment. After testing is complete, deploy it to the production NiFi instance(s). It would be best if this process:
1) can be automated
2) replaces the existing workflow without needing manual intervention, i.e. re-entering sensitive values like I have to do when importing templates.
Thoughts?
Labels:
- Apache NiFi
10-12-2016
10:08 PM
1 Kudo
Great! Thanks, I will play with this. Is there a way to know when the whole workflow is complete? The last step in my workflow writes the data to a file, but it doesn't always arrive all at once; some items may still be waiting in one of the queues. Suggestions?
10-12-2016
06:19 PM
2 Kudos
I have some external REST APIs that I have to query for data periodically using InvokeHTTP. I'd like to pass the date through which I last extracted data as a query arg, so I only retrieve the incremental changes. What are the best practices for doing this with NiFi? Should I:
* Use an external database table to update/query the last date?
* Is there a different built-in mechanism I can use to accomplish this?
Currently, I'm just using ${now():toNumber():minus(86400000):format('yyyy-MM-dd')} to get the previous day's date and passing that to the REST API, but this isn't a good approach: if my daily load fails one day, then the next day I will skip it.
Labels:
- Apache NiFi
09-22-2016
02:42 PM
Ah, thanks @Pierre Villard! Nice blog post, and good validation of what I did 🙂 I just discovered that my InvokeHTTP step had a longer schedule defined from earlier, when I was testing it by itself. When I changed it to 0 sec, it started paginating through the data like I expected. Thanks again!
09-22-2016
02:07 PM
Hi all, I'm trying to use InvokeHTTP to query a REST service that requires me to paginate through the results. Pagination is done through two URL parameters, limit and offset. The first time through, limit=100 and offset=0. If limit results are returned, I need to set offset to 100 and then query again.
I think I'm close to getting the flow working, but as you can see below, the request seems to sit in the queue after the "Set Recurring Pagination Parameters" step. Any ideas? Am I missing a step? Are there any example flows out there on how to do this?
For context: EvaluateJsonPath pulls out a field from the resulting JSON that says how many results were returned. RouteOnAttribute sets more_pages if num_results = limit. Set Recurring Pagination Parameters modifies the variable used in the URL of InvokeHTTP. Thanks in advance!
Labels:
- Apache NiFi
09-08-2016
09:32 PM
3 Kudos
Thanks for the answer. What you posted didn't work as-is, but I played with it and eventually got it. What I had to do was add an UpdateAttribute processor before my InvokeHTTP that sets an attribute called "Content-Type", then in InvokeHTTP set "Attributes to Send" to "Content-Type", and it worked. Thanks!
09-08-2016
08:39 PM
1 Kudo
The web service I'm calling requires the Content-Type header to be set to application/json, but it appears the InvokeHTTP processor only sets this header for POST or PUT requests. Is there any way to set this header for GET requests?
Labels:
- Apache NiFi