Member since: 06-06-2016
Posts: 38
Kudos Received: 14
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2665 | 11-14-2016 09:08 AM
 | 1329 | 10-31-2016 09:20 AM
04-03-2017
12:18 PM
PutSlack was such a good addition! Be careful ingesting nifi-app.log though! I've tried this before and it quickly spirals out of control as each read of the log also generates log entries which then get picked up and generate more log entries.
01-25-2017
01:43 PM
1 Kudo
# Ruby Scripting in NiFi

Today I was asked about an issue that I didn't know how to solve using NiFi. On the surface it sounded simple: just map an attribute to another value. The attribute to map was 'port', and based on the port number an attribute is added so the system is easier to identify downstream. E.g. port 10003 maps to syslog:local, port 10004 to syslog:db1, etc. After a bit of digging I found a few different options to solve this.

## Many UpdateAttribute Processors

The first is to create a new UpdateAttribute processor for each incoming stream. This labels (places an attribute on) all files that come in from that listener. It looked like this:

![Multiple UpdateAttribute Processors]({{ site.baseurl }}/images/multiple_attribute_updates.png)

This looks a bit confusing and tedious but is very precise and arguably easier to read, especially when we label the processors. It also has the added advantage of not having to change the port in more than one place. If, for example, the local logs coming in over port 10002 need to change to port 10003, then we just make that change in the ListenSyslog processor and the rest remains unchanged.

## One UpdateAttribute Using the Advanced

The advanced option allowed me to keep all the configuration in one place, easily mapping port numbers to tags. The two disadvantages I ran into were:

1. A fairly tedious process to get each mapping. It involved:
   * Create new rule
   * Add name
   * Search for existing rule to import
   * Change port number and associated label
2. Must now change the port in two different places if it were to change

It would look like:

![Single UpdateAttribute with Advanced Features]({{ site.baseurl }}/images/single-updateattribute-with-advanced-features.png)

## ExecuteScript Processor

This again allows me to keep all the configuration in one place and makes it much easier to make changes. I created a processor that stores the mappings in a hash and adds the correct attribute appropriately. It looks like so:

![ExecuteScript Processor for Mapping]({{ site.baseurl }}/images/executescript-processor-for-mapping.png)

From the UI perspective it looks very similar to the single UpdateAttribute solution. This requires the addition of the script:

{% highlight ruby %}
# Keys are strings because flow file attribute values are strings;
# integer keys would never match and every file would get the default.
map = {
  "10" => "system1",
  "9" => "system2",
  "8" => "system3",
}
map.default = "unknown"

flowFile = session.get()
return unless flowFile

# Look up the label for this file's port and attach it as the 'system' attribute.
label = map[flowFile.getAttribute("port")]
flowFile = session.putAttribute(flowFile, "system", label)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
{% endhighlight %}

It is more complex because it requires understanding a scripting language, and it also doesn't remove the requirement of changing the port number in more than one place. The script can add more complexity if it becomes necessary to reference it as a file rather than using the 'Script Body' option in the processor. The main advantage is that it makes it easier to change the mapping - just copy and paste one of the lines of the hash and make changes all in one place. Given NiFi's goal of minimising the need for data flow managers to know how to code, it's unlikely this is the best approach.

# Conclusion

The first option is quite foreign to programmers, who feel like it isn't generic. This is understandable given that it does feel a bit like copy and paste. I would say it is the most NiFi way of achieving the mapping, as it is the solution which is most self-describing and resistant to change.
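
For the ExecuteScript option above, the lookup itself can be tried outside NiFi. This is a minimal sketch, assuming the 'port' attribute arrives as a string (flow file attribute values are strings). `PORT_LABELS` and `label_for` are hypothetical names used only for illustration; the ports and labels are the ones from the example at the top of the post.

```ruby
# Hypothetical mapping table, not part of NiFi: port (as a string) => label.
PORT_LABELS = {
  "10003" => "syslog:local",
  "10004" => "syslog:db1",
}.freeze

# Hypothetical helper: returns the label for a port, or a default when the
# port is missing or unmapped. to_s lets integers and nil be passed safely.
def label_for(port, labels = PORT_LABELS, default = "unknown")
  labels.fetch(port.to_s, default)
end

puts label_for("10003")
puts label_for(10004)
puts label_for(nil)
```

Normalising the key with `to_s` means the lookup does not silently fall through to the default just because the port was supplied as a number.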
01-18-2017
03:57 PM
Hi @Andy Liang,

In addition to @Pierre Villard's answer, there are three aspects of data processing joined up here:

Streaming - Simple Event Processing
This is what NiFi is very good at. All the information needed to do the processing is contained in the event. For example:
- Log processing: If the log contains an error, then separate it from the flow and send an email alert
- Transformation: Our legacy system uses XML but we want to use AVRO. Convert each XML event to AVRO

Streaming - Complex Event Processing
This is what Storm is good at, as covered by Pierre.

Batch
This is where MR/Hive/Spark (not Spark Streaming) come in. Land the data on HDFS and then it can be processed and/or explored.
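
To make the "simple event processing" case concrete, here is a rough sketch in plain Ruby (not NiFi code). The `route` method and the sample log lines are made up for illustration; the point is only that each event is decided on using nothing but its own content.

```ruby
# Hypothetical router: decides what to do with one log line in isolation.
# No state is carried over from earlier events.
def route(log_line)
  log_line.include?("ERROR") ? :alert : :continue
end

# Made-up sample events.
events = ["INFO started", "ERROR disk full", "INFO done"]

# Group the events by the decision taken for each one.
routed = events.group_by { |line| route(line) }

puts routed[:alert].inspect
puts routed[:continue].inspect
```

Complex event processing differs in that the decision would depend on other events as well, for example several errors within a time window.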
01-17-2017
04:40 PM
Thanks @Matt, that's a big help! It aligns with my understanding, although I didn't know about the attributes. I currently have:
- 3.91GB of heap space allocated with 97% usage
- 6k/170MB flow files in just two queues
- No files seem to have large attributes (not checked all - just a sample)
- 0 active threads

With respect to point 4, this would only come into effect when the specific processor is running, correct? If all relevant split/merge processors were stopped then this shouldn't have an effect. I can only imagine it's a leak somewhere; I can't see any other reason why the heap would have grown to that size. If I were to turn off all processors and empty all queues and the memory still didn't drop, would this indicate a leak?
01-17-2017
12:43 PM
1 Kudo
My Dev NiFi instance is stuck (no active threads - nothing happening). I can see two errors in the log:

Cannot update repository because all partitions are unusable at this time. Writing to the repository would cause corruption. This most often happens as a result of the repository running out of disk space or the JMV running out of memory.

and

Unable to merge /dfs1/nifi/data/provenance_repository/journals/4289216.journal.15 with other Journal Files due to java.io.FileNotFoundException: Unable to locate file /dfs1/nifi/data/provenance_repository/journals/4289216.journal.15

As the first error suggests, I looked at the disks and memory. The disks are fine (>30% free) but it looks like the JVM is running out of memory, as the heap usage is currently (and consistently) 97%+. Also, the machine still has 8g free. Are there legitimate reasons that NiFi might run out of memory, or does this look more like a memory leak? There are lots of custom processors running but I don't have access to the code. Are there resources about Java memory management in a NiFi-specific context? Just trying to narrow down what might have caused this. NiFi version is 0.6.
Labels: Apache NiFi
01-12-2017
12:35 AM
I have a custom NiFi processor that worked great until I tried to use a DistributedMapCache. I tried to include it like this:

import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
...
public class ListBox extends AbstractListProcessor {
    ...
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
                    + "begins pulling data, it won't duplicate all of the work that has been done.")
            .required(false)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build();

But then when I run mvn clean install and copy the nar over, I get the following error:

java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider org.hortonworks.processors.boxconnector.ListBox could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232) ~[na:1.8.0_91]
at java.util.ServiceLoader.access$100(ServiceLoader.java:185) ~[na:1.8.0_91]
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384) ~[na:1.8.0_91]
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) ~[na:1.8.0_91]
at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[na:1.8.0_91]
at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:116) ~[nifi-nar-utils-1.1.0.jar:1.1.0]
at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:97) ~[nifi-nar-utils-1.1.0.jar:1.1.0]
at org.apache.nifi.NiFi.<init>(NiFi.java:139) ~[nifi-runtime-1.1.0.jar:1.1.0]
at org.apache.nifi.NiFi.main(NiFi.java:262) ~[nifi-runtime-1.1.0.jar:1.1.0]
Caused by: java.lang.NoClassDefFoundError: org/apache/nifi/distributed/cache/client/DistributedMapCacheClient

I also have the dependency configured in my pom.xml file:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-distributed-cache-client-service-api</artifactId>
</dependency>

If I copy over the distributed map cache nar before bundling, it works fine. Is there somewhere else I have to list the dependency to get it bundled into the nar?
Labels: Apache NiFi
01-03-2017
12:37 PM
@Pierre Villard Thanks! I was hoping I had missed something in the API docs
01-03-2017
11:30 AM
Trying to work out the message consumption rate per time slice is fairly difficult with the current format where the stats are presented in a sliding 5 min window but updated every minute. Is it possible to get stats on a per minute or per second basis?
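
To illustrate why this is awkward, here is a rough Ruby sketch of the arithmetic needed to back out per-minute counts from a sliding total. It rests on assumptions that may not hold for NiFi's actual stats: that the figure is an exact sum of five one-minute buckets, that it is sampled exactly once per minute, and that sampling starts with the first minute of activity. `per_minute_counts` and the sample numbers are hypothetical.

```ruby
# Hypothetical helper: recovers per-minute counts from sliding-window totals.
# Each new total equals the previous total, plus the newest minute, minus the
# minute that just fell out of the window, so:
#   newest = total - previous_total + dropped
def per_minute_counts(window_totals, window = 5)
  counts = []
  window_totals.each_with_index do |total, i|
    previous_total = i.zero? ? 0 : window_totals[i - 1]
    # Nothing drops out until the window has filled up.
    dropped = i >= window ? counts[i - window] : 0
    counts << total - previous_total + dropped
  end
  counts
end

# Made-up sliding totals, sampled once per minute.
puts per_minute_counts([3, 8, 10, 14, 15, 18, 13]).inspect
```

Every recovered value depends on the ones before it, so a single missed sample or a misaligned window throws off all later values, which is why stats reported directly per minute or per second would be much easier to work with.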
Labels: Apache NiFi
11-22-2016
12:13 PM
I have ConsumeJMS reading from a Tibco queue on NiFi version 0.6.1. NIFI-1628 only refers to the SSL integration. However, use the latest version if possible, as it has improvements that are worth having.
11-14-2016
09:08 AM
2 Kudos
Hi @m mary, Looks like you're out of disk space. Can you check your disks have space? Regards, Seb