Member since: 09-29-2015
Posts: 871
Kudos Received: 721
Solutions: 255
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2671 | 12-03-2018 02:26 PM
 | 1754 | 10-16-2018 01:37 PM
 | 3150 | 10-03-2018 06:34 PM
 | 1883 | 09-05-2018 07:44 PM
 | 1486 | 09-05-2018 07:31 PM
03-21-2017
06:23 PM
If you are familiar with regular expressions, you could try using ExtractText to parse the timestamp from the syslog message, or maybe write a script and use ExecuteScript. I don't think Kafka offers any kind of timestamp. Your next best option would be to use UpdateAttribute right after ConsumeKafka to create "hadoop.dir", using expression language to get the current time, as in the sketch below. This approach should correctly bin data based on when you consumed it from Kafka, but it won't handle the case I talked about earlier where you got data from a previous hour.
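As a rough sketch of that UpdateAttribute property (the attribute name "hadoop.dir" and the date pattern are just examples, adjust them to your directory layout):

    hadoop.dir = ${now():format("/yyyy/MM/dd/HH")}

now() returns the current time and format() renders it with a Java SimpleDateFormat pattern, so the value would look something like "/2017/03/21/11".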
03-21-2017
04:05 PM
1 Kudo
You should be able to use the timestamp contained within the syslog messages. There is a ParseSyslog processor that should work if your syslog messages are in the standard RFC 5424 or RFC 3164 formats. It will produce an attribute on each flow file called syslog.timestamp. From there you would probably use UpdateAttribute (or some other processor) to take syslog.timestamp and create an attribute for the HDFS directory using the year, month, day, and hour, so that you have something like "/yyyy/mm/dd/hh"; let's say this attribute is called "hadoop.dir" (a rough sketch follows below).

In MergeContent you can set the "Correlation Attribute" to "hadoop.dir" so that you only ever merge together events that have the same directory, which means they came from the same hour. You should be able to set the Minimum Group Size to 10 KB and the Max Bin Age to something like 10 minutes, so you effectively merge every 10 KB or every 10 minutes, whichever comes first. Then in PutHDFS you would use ${hadoop.dir} as the directory, which lets you handle data that arrives much later.

Let's say you somehow receive one syslog event with an event time from 5 hours ago: the current time is 2017-03-21-11 and the event time is 2017-03-21-06, so your "hadoop.dir" attribute is "/2017/03/21/06". When that flow file hits MergeContent, a new bin is created for that value and sits there for 10 minutes since no other events are coming in for that hour; after 10 minutes it merges, and PutHDFS appends to the file in the folder from 5 hours ago.
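As a rough sketch of that UpdateAttribute property in expression language (the input pattern "MMM d HH:mm:ss" is an assumption that depends on how ParseSyslog renders syslog.timestamp for your messages, so verify it; also note that RFC 3164 timestamps carry no year, so you may need to add the current year yourself):

    hadoop.dir = ${syslog.timestamp:toDate("MMM d HH:mm:ss"):format("/yyyy/MM/dd/HH")}

toDate() parses the attribute into a date and format() writes it back out as the directory path, so the Directory property of PutHDFS can simply be set to ${hadoop.dir}.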
03-20-2017
02:56 PM
2 Kudos
I answered this question on stackoverflow: https://stackoverflow.com/questions/42902718/state-manager-not-persisting-retrieving-data
03-17-2017
06:53 PM
I think if you change the schema type to record it will work... It should take each entry in the JSON array and write it as a record in the Avro data file.
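For reference, a minimal Avro schema of type record might look like this (the record name and fields are hypothetical; substitute the fields that actually appear in your JSON entries):

    {
      "type": "record",
      "name": "Entry",
      "fields": [
        { "name": "id", "type": "string" },
        { "name": "value", "type": ["null", "string"], "default": null }
      ]
    }

With a record schema, each element of the incoming JSON array maps to one Avro record in the output data file.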
03-17-2017
03:53 PM
Can you use the "matches" function in expression language? It should allow any regular expression: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html#matches
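For example, in a RouteOnAttribute property (the attribute name and pattern here are just illustrations):

    ${filename:matches("error_.*")}

matches() returns true only when the entire attribute value matches the regular expression, which is usually what you want for routing.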
03-17-2017
02:35 PM
1 Kudo
Yes, it's in the latest code that hasn't been released yet, so it will be in the next version, which will likely be Apache NiFi 1.2.
03-17-2017
02:29 PM
In the latest code in master, there is an improvement to ListHDFS that adds a new property:

    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
        .name("File Filter")
        .description("Only files whose names match the given regular expression will be picked up")
        .required(true)
        .defaultValue("[^\\.].*")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .build();
Does this help?
03-15-2017
08:21 PM
Wouldn't you define a new network input in Splunk Enterprise? See: http://docs.splunk.com/Documentation/Splunk/latest/Data/Monitornetworkports. Part of defining that input is choosing UDP or TCP and specifying the source type and index, which is essentially configuring inputs.conf for Splunk Enterprise. NiFi is just writing bytes to a socket via UDP or TCP; it's not modifying the data in any way (unless you do so earlier in the flow). You mentioned inputs.conf on the universal forwarder, but isn't that for data coming into the universal forwarder, not for data sent from the forwarder to the indexer?
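For illustration, a minimal inputs.conf stanza on the Splunk Enterprise side might look like this (the port, sourcetype, and index values are placeholders):

    [udp://1514]
    sourcetype = syslog
    index = main

NiFi would then just point its UDP or TCP output at that host and port.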
03-15-2017
08:10 PM
I think you might be able to use the new Wait and Notify processors that should be in the upcoming 1.2 release of Apache NiFi:

1st processor -> Notify -> Funnel
2nd processor -> Notify -> Funnel
Funnel -> Wait (Signal Count == 2)

Not sure if that is exactly what you described, but it seems like it could work. https://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/
03-15-2017
08:02 PM
1 Kudo
PutSplunk sends data to Splunk over a TCP or UDP input; it's essentially a combination of PutTCP and PutUDP wrapped into a single processor. Wouldn't the source type and index be specified when you create the input in Splunk? As far as load balancing, you could possibly use the DistributeLoad processor to route to two different PutSplunk processors pointing at different hosts, or put a load balancer like HAProxy or NGINX in front of the indexers and point PutSplunk at the load balancer.
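As a rough sketch of the second option (the host names and port are placeholders), an HAProxy TCP configuration could look like:

    frontend splunk_in
        mode tcp
        bind *:1514
        default_backend splunk_indexers

    backend splunk_indexers
        mode tcp
        balance roundrobin
        server indexer1 indexer1.example.com:1514 check
        server indexer2 indexer2.example.com:1514 check

PutSplunk would then be pointed at the HAProxy host on port 1514 instead of at an individual indexer.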