Member since 09-08-2016
27 Posts
23 Kudos Received
2 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 15690 | 11-22-2016 06:57 PM |
| | 8901 | 09-08-2016 09:32 PM |
01-31-2018
07:25 PM
I have a NiFi workflow that reads Avro messages from Kafka and writes them as Snappy-compressed Parquet files to S3. I'm noticing some behavior I did not expect:
* In PutParquet I'm specifying the directory as s3://mybucketname/events/${kafka.topic}/${event}/${event_timestamp:format("yyyy/MM/dd/HH")}, but it seems to be creating a directory with a blank name at the top level of my bucket before moving on to events/topic_name/etc.
* For all subsequent subdirectories, it seems to be creating a file in addition to the directory with the same name.
* It is creating block files at the top level of the bucket, e.g. "block_1903392314985430358".
* The files it creates don't have a .snappy or a .parquet extension.
Can anyone shed any light on what is happening here and how I can fix it? I'm using Apache NiFi 1.4.0.
My core-site.xml has:
<property>
<name>fs.defaultFS</name>
<value>s3://mybucketname</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>
Labels: Apache NiFi
07-27-2017
12:01 AM
@Harsh Choudhary Agreed. I came to the conclusion that the distributed map cache is too flaky to keep track of important things. We've seen it mysteriously fail several times and have since changed all our processes to use a database.
02-16-2017
06:57 PM
1 Kudo
Thanks @Matt Burgess! This is working much better for me. For future reference, there are Jython examples of how to do this in the unit tests of the NiFi project: https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/jython
02-16-2017
04:05 PM
2 Kudos
I'm seeing very poor performance in my Python ExecuteScript. Essentially, for each flow file, I'm querying the Distributed Map Cache (DMC) and joining that information with the info in the flow file. Two possible performance bottlenecks come to mind:
* Is ExecuteScript creating a new Jython environment for every flow file? Or is it spinning one up once for each concurrent task and reusing it?
* If only one Jython environment is created per concurrent thread, is it possible for me to connect to the DMC just once and then just query keys for all further executions? Is there a setup method hook or something like that?
Any help would be appreciated. Thanks!
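
To make the "connect once, then only query keys" idea concrete, here is a minimal sketch in plain Python. It is not NiFi code: `CacheClient` is a hypothetical stand-in for a cache connection, and whether module-level state like `_client` actually survives between ExecuteScript executions is exactly the open question above, so that is not assumed here.

```python
class CacheClient(object):
    """Hypothetical stand-in for a distributed map cache connection."""

    # Counts how many times a "connection" was opened, for illustration only.
    connections_opened = 0

    def __init__(self):
        CacheClient.connections_opened += 1
        # Made-up sample data; a real client would talk to a cache server.
        self._data = {"survey-1": "questions for survey 1"}

    def get(self, key):
        # Returns None when the key is not present.
        return self._data.get(key)


# Module-level handle: created lazily on first use, then reused.
_client = None


def get_client():
    global _client
    if _client is None:
        _client = CacheClient()
    return _client


def lookup(key):
    # Every lookup goes through the shared client instead of reconnecting.
    return get_client().get(key)
```

Calling `lookup` repeatedly opens the hypothetical connection only on the first call; later calls reuse it.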
Labels: Apache NiFi
02-14-2017
08:40 PM
Never mind, I see the 'Module Directory' property in ExecuteScript.
02-14-2017
08:29 PM
Ah, thanks @Bryan Bende. I remember seeing some sample code on how to connect to the DMC from within ExecuteScript. My script is in Python. Do you know offhand whether ExecuteScript/Jython includes libraries installed via pip? I'd like to write a library for interacting with the DMC in Python so that my actual join script isn't as complicated.
02-14-2017
06:31 PM
Thanks for the quick replies! This is very helpful. Yes, I'm storing a fairly large value in an attribute, but maybe you guys can suggest an alternate approach?
What I'm doing right now is processing survey results. Unfortunately, the survey question data and the responses are coming in as separate streams. I want to be able to join these two data sets while in my NiFi flow so I don't have to kick off a separate ETL. So, what I chose to do is store the question data in the distributed map cache and then, as each response comes in, query the cache by the survey ID and, assuming it is found, put the question data into an attribute. Then an ExecuteScript runs to join the flowfile content (the response) with the attribute value (the questions).
Is there another, more scalable way to do this?
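
For reference, the join itself boils down to something like the sketch below, written as plain Python rather than an actual ExecuteScript. A dict stands in for the distributed map cache, and the field names (`survey_id`, `questions`) and the JSON payloads are assumptions made up for illustration, not the real survey schema.

```python
import json


def join_response(response_json, question_cache):
    """Attach cached question data to one survey response.

    response_json: the response (flowfile content) as a JSON string.
    question_cache: mapping of survey id -> question data as a JSON string
                    (a plain dict here, standing in for the cache).
    Returns the joined record as a dict, or None if the survey id is not cached.
    """
    response = json.loads(response_json)
    questions_json = question_cache.get(response["survey_id"])
    if questions_json is None:
        # No question data for this survey yet; the caller decides what to do.
        return None
    joined = dict(response)
    joined["questions"] = json.loads(questions_json)
    return joined
```

Looking the questions up at join time like this keeps the large question payload out of the flowfile attributes.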
02-14-2017
05:29 PM
When the latter happens, my NiFi gets into a state where active threads are permanently stuck and I have to restart the server to recover.
02-14-2017
05:16 PM
I'm getting this error in my logs. Does anyone know what causes this or how to prevent it? The disk has plenty of space on it.
2017-02-14 17:04:04,687 ERROR [Timer-Driven Process Thread-2] o.a.n.p.s.FetchDistributedMapCache FetchDistributedMapCache[id=e69c1dbb-1011-1157-f7f1-321d05a0a0f7] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update: org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
2017-02-14 17:04:04,687 ERROR [Timer-Driven Process Thread-2] o.a.n.p.s.FetchDistributedMapCache
org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:369) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:305) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28) ~[nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.io.IOException: All Partitions have been blacklisted due to failures when attempting to update. If the Write-Ahead Log is able to perform a checkpoint, this issue may resolve itself. Otherwise, manual intervention will be required.
at org.wali.MinimalLockingWriteAheadLog.update(MinimalLockingWriteAheadLog.java:220) ~[nifi-write-ahead-log-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:210) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:178) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:363) ~[nifi-framework-core-1.1.1.jar:1.1.1]
... 13 common frames omitted
I'm also seeing this error, which may or may not be related (UTFDataFormatException: encoded string too long: 87941 bytes):
2017-02-14 17:08:44,567 ERROR [Timer-Driven Process Thread-7] o.a.n.p.s.FetchDistributedMapCache
org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:369) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:305) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28) ~[nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.io.IOException: Failed to write field 'Repository Record Update'
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:46) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecord(SchemaRecordWriter.java:35) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde.serializeRecord(SchemaRepositoryRecordSerde.java:95) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde.serializeEdit(SchemaRepositoryRecordSerde.java:67) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.SchemaRepositoryRecordSerde.serializeEdit(SchemaRepositoryRecordSerde.java:46) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.wali.MinimalLockingWriteAheadLog$Partition.update(MinimalLockingWriteAheadLog.java:957) ~[nifi-write-ahead-log-1.1.1.jar:1.1.1]
at org.wali.MinimalLockingWriteAheadLog.update(MinimalLockingWriteAheadLog.java:238) ~[nifi-write-ahead-log-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:210) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.WriteAheadFlowFileRepository.updateRepository(WriteAheadFlowFileRepository.java:178) ~[nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:363) ~[nifi-framework-core-1.1.1.jar:1.1.1]
... 13 common frames omitted
Caused by: java.io.IOException: Failed to write field 'Attributes'
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:46) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldValue(SchemaRecordWriter.java:131) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldRepetitionAndValue(SchemaRecordWriter.java:57) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:44) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
... 22 common frames omitted
Caused by: java.io.UTFDataFormatException: encoded string too long: 87941 bytes
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:364) ~[na:1.8.0_101]
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323) ~[na:1.8.0_101]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldValue(SchemaRecordWriter.java:108) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldRepetitionAndValue(SchemaRecordWriter.java:57) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldValue(SchemaRecordWriter.java:124) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeFieldRepetitionAndValue(SchemaRecordWriter.java:84) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
at org.apache.nifi.repository.schema.SchemaRecordWriter.writeRecordFields(SchemaRecordWriter.java:44) ~[nifi-schema-utils-1.1.1.jar:1.1.1]
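
The innermost cause in this second trace is `java.io.DataOutputStream.writeUTF`, which stores the string length in a two-byte prefix and therefore rejects anything longer than 65535 encoded bytes; an 87941-byte value is over that limit. Since the failing field is 'Attributes', an oversized attribute value looks like the likely trigger. Below is a minimal sketch, in plain Python 3 rather than NiFi code, of a pre-check on a value's encoded size. The function names are hypothetical, and the length calculation follows the modified UTF-8 rules documented for `writeUTF`.

```python
# writeUTF stores the byte length in an unsigned 16-bit prefix.
WRITE_UTF_MAX_BYTES = 65535


def modified_utf8_length(text):
    """Number of bytes Java's modified UTF-8 would use for `text`."""
    total = 0
    for ch in text:
        cp = ord(ch)
        if cp == 0:
            total += 2          # NUL is written as two bytes
        elif cp <= 0x7F:
            total += 1
        elif cp <= 0x7FF:
            total += 2
        elif cp <= 0xFFFF:
            total += 3
        else:
            total += 6          # surrogate pair, three bytes per surrogate
    return total


def fits_in_write_utf(text):
    """True if `text` is short enough for a single writeUTF call."""
    return modified_utf8_length(text) <= WRITE_UTF_MAX_BYTES
```

A check like this could be used to decide whether a cached value is small enough to carry as an attribute or should stay in the flowfile content instead.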
Labels: Apache NiFi
01-27-2017
05:37 PM
Bah! I just realized the error of my ways.
mvn --batch-mode release:prepare release:perform -Darguments="-Dmaven.deploy.skip=true -Dgpg.skip"