
Merge Content for small content issue

Expert Contributor

This problem is theoretical at this point, but we are trying to design our system to prepare for all scenarios.

Currently, we have this flow: ConsumeKafka -> MergeContent (Minimum Group Size: 10 KB) -> UpdateAttribute (creates a filename based on the date, in the format xx.year-month-day-hour.log) -> PutHDFS (into directories of the structure year/month/day/hour, with the Conflict Resolution Strategy set to append).
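
For reference, the UpdateAttribute rule behind that filename is roughly something like this (illustrative only; the name is built from the current time):

filename = xx.${now():format('yyyy-MM-dd-HH')}.log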

This format allows us to take 10 KB chunks of data and append them into files that are divided by hour.

This is the theoretical issue: what happens if we have less than 10 KB of data in an hour, say 1 KB or even a single event per hour? In that case, events from multiple hours would end up in the same 10 KB chunk and be indexed under the wrong filenames. Has anyone encountered this situation (which I'd expect to be a common concern), and how did you overcome it?

One idea is to switch the MergeContent Minimum Group Size to 0 B so that it doesn't buffer files into chunks at all. That seems to resolve the issue of an hour having less than 10 KB of events, but it reveals a new problem: there is a delay of about 3 minutes from Syslog to Kafka to HDFS, so we are still not indexing the data correctly by hour. The hourly files actually run from xx:57:xx of the previous hour to xx:57:xx of the current hour.

On further examination, this method seems rather ad hoc. Is there a way in NiFi to read the timestamp attached to each event and route each event into a file that is correctly named for the time frame of that timestamp? If there's any delay in events, even a minute or less, events will land in the wrong filenames when files are created by the hour.

Any suggestions are very much appreciated.


5 REPLIES

Master Guru

You should be able to use the timestamp contained within the syslog messages. There is a ParseSyslog processor that should work if your syslog messages are in the standard RFC 5424 or RFC 3164 formats. It produces an attribute on each flow file called syslog.timestamp. From there you would probably use UpdateAttribute (or some other processor) to take syslog.timestamp and build an attribute for the HDFS directory from its year, month, day, and hour, so that you end up with something like "/yyyy/mm/dd/hh". Let's say this attribute is called "hadoop.dir".
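
For example, in UpdateAttribute you could add a rule along these lines (just a sketch; the exact date pattern depends on what ParseSyslog emits for your messages, here I'm assuming an RFC 5424-style ISO timestamp):

hadoop.dir = ${syslog.timestamp:toDate("yyyy-MM-dd'T'HH:mm:ss.SSSX"):format('/yyyy/MM/dd/HH')}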

In MergeContent you can set the "Correlation Attribute Name" to "hadoop.dir" so that you only ever merge events that have the same directory, which means they came from the same hour. You should be able to set the Minimum Group Size to 10 KB and the Max Bin Age to something like 10 minutes, so that a bin is merged at every 10 KB or every 10 minutes, whichever comes first.
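
As a rough sketch, the relevant MergeContent properties would look something like this (everything else left at the defaults):

Correlation Attribute Name: hadoop.dir
Minimum Group Size: 10 KB
Max Bin Age: 10 min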

Then in PutHDFS you would use ${hadoop.dir} as the directory, which also lets you handle data that arrives much later. Say you somehow receive one syslog event with an event time from 5 hours ago: the current time is 2017-03-21-11 and the event time is 2017-03-21-06, so the "hadoop.dir" attribute is "/2017/03/21/06". When it hits MergeContent, it gets a new bin for that value and sits there for 10 minutes, since no other events are coming in for that hour. After 10 minutes it is merged, and PutHDFS appends it to the file in the folder from 5 hours ago.
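
The PutHDFS side is then essentially just:

Directory: ${hadoop.dir}
Conflict Resolution Strategy: append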

Expert Contributor

Unfortunately (and I'm not sure why), our Syslog data does not conform to those standards, because we get the error shown in the screenshot. I'm not sure if this has something to do with the Syslog data going into Kafka and then our having to consume it from Kafka; maybe that is altering the format. Is there a way to get the timestamp from data in a Kafka topic?

[screenshot: 13852-screen-shot-2017-03-21-at-20924-pm.png]

Master Guru

If you are familiar with regular expressions, you could try using ExtractText to parse the timestamp out of the syslog message, or perhaps write a script and use ExecuteScript.

I don't think Kafka offers any kind of timestamp here. Your next best option would be to use UpdateAttribute right after ConsumeKafka to create "hadoop.dir", using Expression Language to get the current time. That approach would correctly bin data based on when you consumed it from Kafka, but it won't handle the case I described earlier where you receive data from a previous hour.
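
For example, something like this in UpdateAttribute (the pattern just mirrors the directory layout you already use):

hadoop.dir = ${now():format('/yyyy/MM/dd/HH')}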

Expert Contributor

Thanks Bryan. I'm really not sure why this data that came from Syslog into Kafka does not match the Syslog RFC standards... we didn't alter the data in Kafka at all.

This is an example of what an event looks like... it looks like standard Syslog data to me. I'm hesitant about using ExtractText, but I will if I have to.

Mar 21 18:10:37 xxxfw01 /kernel: KERN_ARP_ADDR_CHANGE: arp info overwritten for 10.10.1.000 from 00:a0:ad:0e:4f:a1 to 00:e1:ed:34:26:c5

If I were to extract the timestamp from the content using ExtractText and, say, assign it to a Timestamp attribute, could I then use that new Timestamp attribute to create the filename and directory instead of using the time it was consumed from Kafka (which is about 3 minutes behind the timestamps in the log files)?

Master Guru

Syslog messages start with a priority value enclosed in < >, so yours should have started with something like "<10> Mar 21 ...".

https://tools.ietf.org/html/rfc5424#section-6.2.1

https://www.ietf.org/rfc/rfc3164 (section 4.1.1)

Regarding ExtractText... yes, if you get the date out of the content and into an attribute, you should be able to use UpdateAttribute with Expression Language functions to parse the date into the directory and filename you want.
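
As a rough sketch against the sample event you posted (attribute names are just examples, and since that timestamp carries no year you would have to supply one, here the current year):

ExtractText (dynamic property):
event.timestamp = ^([A-Za-z]{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})

UpdateAttribute:
hadoop.dir = ${event.timestamp:append(' '):append(${now():format('yyyy')}):toDate('MMM d HH:mm:ss yyyy'):format('/yyyy/MM/dd/HH')}

The filename could be built the same way, just with format('yyyy-MM-dd-HH') instead of the directory pattern.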