Created on 05-19-2015 09:26 AM - edited 09-16-2022 02:29 AM
I'm just starting with Flume and Morphlines.
I am using the Flume spooldir source to read a file of JSON records.
I need to expose a timestamp field from each record in the event header so that the Flume HDFS sink can partition by day.
For testing, I'm just using the Flume logger sink.
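For context on why the field has to reach the header: the HDFS sink resolves time escapes such as %Y, %m and %d in hdfs.path from the event's `timestamp` header (epoch milliseconds). The sketch below approximates that resolution in plain Java for a path ending in %Y/%m/%d. It assumes the sink resolves dates in UTC (the sink's default is the agent's local time zone, configurable with hdfs.timeZone), and the class name is hypothetical; it is not Flume's code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Illustrative sketch only: approximates how a "%Y/%m/%d" path escape is
// resolved from an event's "timestamp" header, assuming UTC.
public class DayPartitionSketch {

    // "yyyy/MM/dd" plays the role of the %Y/%m/%d escapes in hdfs.path.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    /** Returns the day directory for an event; throws if the header is missing. */
    public static String dayPath(Map<String, String> headers) {
        String ts = headers.get("timestamp");
        if (ts == null) {
            // Without the header the sink cannot resolve time escapes either.
            throw new IllegalArgumentException("event has no 'timestamp' header");
        }
        return DAY.format(Instant.ofEpochMilli(Long.parseLong(ts)));
    }

    public static void main(String[] args) {
        // 1424814048000 ms is 2015-02-24 21:40:48 UTC, so this prints 2015/02/24.
        System.out.println(dayPath(Map.of("timestamp", "1424814048000")));
    }
}
```

If the agent runs in a different zone, records near midnight can land in a neighboring day's directory, which is why the zone assumption matters.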
I'm trying to use a Morphline interceptor to extract the timestamp field 'occurrence_date' from the JSON, but it fails.
I've searched and found a few JSON examples, for instance:
http://blog.cloudera.com/blog/2014/04/how-to-process-data-using-morphlines-in-kite-sdk/
I don't see any difference from my setup, and I've tried many variations with no luck.
What am I doing wrong?
...
15/05/19 10:28:07 ERROR stdlib.LogErrorBuilder$LogError: record: [{_attachment_body=[{"received_by_source_utc_offset":"-300","latitude":"44.041803","occurrence_utc_offset":"-300","heading":"1.21","occurrence_date":"2015-02-24 13:40:48","speed_unit":"meters/sec","longitude":"-123.082105","speed":"3.981","external_device_id":"ER00E925","asset_name_desc":"12-423","received_by_source_date":"2015-02-24 13:43:00.789526","external_source":"Lytx","device_timezone":"EST "}], _attachment_mimetype=[json/java+memory]}]
15/05/19 10:28:07 ERROR source.SpoolDirectorySource: FATAL: Spool Directory source spool-1: { spoolDir: /root/data/spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.FlumeException: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor must non generate attachments that are not a byte[] or InputStream
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.toEvent(MorphlineInterceptor.java:196)
...
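As the message says, the interceptor only accepts a byte[] or InputStream in `_attachment_body` when it turns the record back into a Flume event, while the logged record shows `_attachment_body` holding a parsed JSON object (mimetype json/java+memory) after readJson. Below is a hypothetical, simplified model of that constraint in plain Java, not Flume's actual implementation; the class and method names are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical model of the rule in the error message: the record body
// handed back to Flume must be a byte[] or an InputStream (Java 9+).
public class AttachmentCheckSketch {

    public static byte[] toBodyBytes(Object attachment) {
        if (attachment instanceof byte[]) {
            return (byte[]) attachment; // raw bytes pass through unchanged
        }
        if (attachment instanceof InputStream) {
            try (InputStream in = (InputStream) attachment) {
                return in.readAllBytes(); // drain the stream into a byte[]
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        // Anything else, e.g. a parsed JSON tree left behind by readJson, is rejected.
        String type = attachment == null ? "null" : attachment.getClass().getName();
        throw new IllegalArgumentException(
                "attachment must be a byte[] or InputStream, got " + type);
    }

    public static void main(String[] args) {
        byte[] raw = "{\"speed\":\"3.981\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(toBodyBytes(raw).length);
        System.out.println(toBodyBytes(new ByteArrayInputStream(raw)).length);
        try {
            // A Map stands in for the JsonNode that readJson produces.
            toBodyBytes(Map.of("speed", "3.981"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```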
JSON record:
[{"received_by_source_utc_offset": "-300", "latitude": "44.041803", "occurrence_utc_offset": "-300", "heading": "1.21", "occurrence_date": "2015-02-24 13:40:48", "speed_unit": "meters/sec", "longitude": "-123.082105", "speed": "3.981", "external_device_id": "ER00E925", "asset_name_desc": "12-423", "received_by_source_date": "2015-02-24 13:43:00.789526", "external_source": "Lytx", "device_timezone": "EST "}]
Morphlines script:
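morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Parse the event body; afterwards _attachment_body holds a Jackson JsonNode
      {
        readJson {
          outputClass : com.fasterxml.jackson.databind.JsonNode
        }
      }
      # Debug: dump the whole record (logged at ERROR level)
      { logError { format : "record: {}", args : ["@{}"] } }
      # Copy the values at the given JSON paths into record fields
      {
        extractJsonPaths {
          flatten : false
          paths : {
            my_timestamp : "/occurrence_date"
            my_offset : "/received_by_source_utc_offset"
          }
        }
      }
    ]
  }
]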
Created 05-21-2015 09:54 AM
Thanks for the quick reply.
I have a solution that works.
I must use the Jackson JsonNode produced by 'readJson' for 'extractJsonPaths' to work.
However, my attempts to then convert '_attachment_body' back to a byte[] or InputStream were unsuccessful.
So instead, I first made a copy of '_attachment_body' and restored it after processing.
Surely there is a cleaner way to accomplish this?
1) Make a copy of '_attachment_body' as bodybak:
{
  addValuesIfAbsent {
    bodybak : "@{_attachment_body}"
  }
}
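A rough model of this step, treating a morphline record as a map from field name to a list of values. This is a simplification for illustration (Kite's real Record class is not used, and the names are hypothetical). addValuesIfAbsent appends each value unless the field already contains it; in this model bodybak ends up pointing at the same byte[] object as the original body, which is why restoring it later needs no re-encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a record is "field name -> list of values".
public class CopyBodySketch {

    /** Adds each value to the field unless an equal value is already present. */
    public static void addValuesIfAbsent(Map<String, List<Object>> record,
                                         String field, List<Object> values) {
        List<Object> target = record.computeIfAbsent(field, k -> new ArrayList<>());
        for (Object v : values) {
            if (!target.contains(v)) { // byte[] equality is identity here
                target.add(v);
            }
        }
    }

    public static void main(String[] args) {
        byte[] body = "{...}".getBytes(StandardCharsets.UTF_8);
        Map<String, List<Object>> record = new LinkedHashMap<>();
        record.put("_attachment_body", new ArrayList<>(List.of(body)));

        // Like bodybak : "@{_attachment_body}": copies the reference, not the bytes.
        addValuesIfAbsent(record, "bodybak", record.get("_attachment_body"));
        System.out.println(record.get("bodybak").get(0) == body); // true
    }
}
```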
2) Do the transformations. First, read the JSON:
{
  readJson {
    outputClass : com.fasterxml.jackson.databind.JsonNode
  }
}
Parse out the fields we want (occurrence_date and the offset):
{
  extractJsonPaths {
    flatten : true
    paths : {
      timestamp : "/occurrence_date"
      my_offset : "/received_by_source_utc_offset"
    }
  }
}
Convert the timestamp to Unix epoch milliseconds (I'll apply the offset eventually):
{
  convertTimestamp {
    field : timestamp
    inputFormats : ["yyyy-MM-dd' 'HH:mm:ss"]
    inputTimezone : America/Los_Angeles
    outputFormat : "unixTimeInMillis"
    outputTimezone : UTC
  }
}
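To sanity-check the numbers outside Flume, here is a plain java.time sketch of the same conversion. It is not Kite's implementation; it only applies the same input pattern and zone. The second method is one possible way to apply the record's own offset instead, assuming the *_utc_offset fields are minutes relative to UTC (so "-300" means UTC-5); that reading of the field is an assumption, and the class name is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class ConvertTimestampSketch {

    // Same pattern as the morphline's inputFormats entry.
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd' 'HH:mm:ss");

    /** Reads the local date-time in the given zone and returns epoch millis. */
    public static long toEpochMillis(String text, ZoneId zone) {
        return LocalDateTime.parse(text, INPUT).atZone(zone).toInstant().toEpochMilli();
    }

    /** Alternative: use an offset in minutes taken from the record, e.g. "-300". */
    public static long toEpochMillisWithOffset(String text, String offsetMinutes) {
        ZoneOffset offset = ZoneOffset.ofTotalSeconds(Integer.parseInt(offsetMinutes) * 60);
        return LocalDateTime.parse(text, INPUT).toInstant(offset).toEpochMilli();
    }

    public static void main(String[] args) {
        String occurrence = "2015-02-24 13:40:48";
        // 1424814048000 (PST, UTC-8, i.e. 21:40:48 UTC)
        System.out.println(toEpochMillis(occurrence, ZoneId.of("America/Los_Angeles")));
        // 1424803248000 (UTC-5, i.e. 18:40:48 UTC)
        System.out.println(toEpochMillisWithOffset(occurrence, "-300"));
    }
}
```

For this record the two interpretations differ by three hours, and the record itself carries device_timezone "EST " with offset -300, so it is worth confirming which zone occurrence_date is actually recorded in before relying on the day partitions.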
3) Restore '_attachment_body' from bodybak, then remove everything except what is needed in the header (timestamp):
{
  setValues {
    _attachment_body : "@{bodybak}"
    bodybak : []
    _attachment_mimetype : []
    my_offset : []
  }
}
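A sketch of what this last step leaves behind, again modeling the record as a map from field name to a list of values (hypothetical names; not Kite or Flume code). setValues first removes all existing values of a field and then adds the given ones, so an empty list clears the field. The headers method assumes, as the post implies, that the remaining non-body fields become event headers, here taking each field's first value as a string.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestoreBodySketch {

    /** Replaces a field's values; an empty list leaves the field absent. */
    public static void setValues(Map<String, List<Object>> record, String field, List<Object> values) {
        if (values.isEmpty()) {
            record.remove(field);
        } else {
            record.put(field, new ArrayList<>(values));
        }
    }

    /** Hypothetical header derivation: every non-body field, first value as a string. */
    public static Map<String, String> headers(Map<String, List<Object>> record) {
        Map<String, String> out = new LinkedHashMap<>();
        record.forEach((k, v) -> {
            if (!k.equals("_attachment_body") && !v.isEmpty()) {
                out.put(k, String.valueOf(v.get(0)));
            }
        });
        return out;
    }

    public static void main(String[] args) {
        byte[] original = "{...}".getBytes(StandardCharsets.UTF_8);
        Map<String, List<Object>> record = new LinkedHashMap<>();
        // A string stands in for the JsonNode that readJson left in the body.
        record.put("_attachment_body", new ArrayList<>(List.of("<parsed JSON tree>")));
        record.put("_attachment_mimetype", new ArrayList<>(List.of("json/java+memory")));
        record.put("bodybak", new ArrayList<>(List.of(original)));
        record.put("timestamp", new ArrayList<>(List.of(1424814048000L)));
        record.put("my_offset", new ArrayList<>(List.of("-300")));

        setValues(record, "_attachment_body", record.get("bodybak")); // restore raw bytes
        setValues(record, "bodybak", List.of());
        setValues(record, "_attachment_mimetype", List.of());
        setValues(record, "my_offset", List.of());

        System.out.println(record.get("_attachment_body").get(0) == original); // true
        System.out.println(headers(record)); // {timestamp=1424814048000}
    }
}
```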
Created 05-22-2015 12:26 AM
Created 05-20-2015 06:17 AM