
'readJson' gives errors in Morphline

Explorer

I'm just getting started with Flume and Morphlines.

I am using a Flume spooldir source to read a file of JSON records.

I need to copy a timestamp field from each record into the event headers so that the Flume HDFS sink can partition by day.
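The HDFS sink resolves time escapes such as %Y-%m-%d in hdfs.path from the event's 'timestamp' header, which holds milliseconds since the epoch. Here is a minimal Java sketch of that header-to-day mapping, assuming UTC as the partitioning zone; the class and method names are hypothetical, and the zone the real sink uses comes from its own configuration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps a Flume 'timestamp' header value (epoch millis as a string)
// to a day-partition name, similar in spirit to resolving %Y-%m-%d in an HDFS path.
class DayPartition {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    static String forHeader(String timestampHeader, ZoneId zone) {
        long millis = Long.parseLong(timestampHeader);
        // Interpret the instant in the chosen zone, then keep only the calendar date.
        return DAY.format(Instant.ofEpochMilli(millis).atZone(zone));
    }

    public static void main(String[] args) {
        // 1424814048000 is 2015-02-24T21:40:48Z
        System.out.println(forHeader("1424814048000", ZoneId.of("UTC")));
    }
}
```

The zone matters near midnight: the same header value can land in different day directories depending on which zone the partitioning uses.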

For testing, I'm just using the Flume logger sink.

I'm trying to use a Morphline interceptor to extract the timestamp field 'occurrence_date' from the JSON, but it fails.

I've searched and found a few JSON examples, such as:

http://blog.cloudera.com/blog/2014/04/how-to-process-data-using-morphlines-in-kite-sdk/

I don't see any difference from my setup, and I've tried many, many variations with no luck.

What am I doing wrong?

...

15/05/19 10:28:07 ERROR stdlib.LogErrorBuilder$LogError: record: [{_attachment_body=[{"received_by_source_utc_offset":"-300","latitude":"44.041803","occurrence_utc_offset":"-300","heading":"1.21","occurrence_date":"2015-02-24 13:40:48","speed_unit":"meters/sec","longitude":"-123.082105","speed":"3.981","external_device_id":"ER00E925","asset_name_desc":"12-423","received_by_source_date":"2015-02-24 13:43:00.789526","external_source":"Lytx","device_timezone":"EST "}], _attachment_mimetype=[json/java+memory]}]
15/05/19 10:28:07 ERROR source.SpoolDirectorySource: FATAL: Spool Directory source spool-1: { spoolDir: /root/data/spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.FlumeException: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor must non generate attachments that are not a byte[] or InputStream
        at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.toEvent(MorphlineInterceptor.java:196)

...

JSON record:

[{"received_by_source_utc_offset": "-300", "latitude": "44.041803", "occurrence_utc_offset": "-300", "heading": "1.21", "occurrence_date": "2015-02-24 13:40:48", "speed_unit": "meters/sec", "longitude": "-123.082105", "speed": "3.981", "external_device_id": "ER00E925", "asset_name_desc": "12-423", "received_by_source_date": "2015-02-24 13:43:00.789526", "external_source": "Lytx", "device_timezone": "EST "}]

Morphlines script:

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      {
        readJson {
          outputClass : com.fasterxml.jackson.databind.JsonNode
        }
      }

      { logError { format : "record: {}", args : ["@{}"] } }

      {
        extractJsonPaths {
          flatten : false
          paths : {
            my_timestamp : "/occurrence_date"
            my_offset : "/received_by_source_utc_offset"
          }
        }
      }
    ]
  }
]

3 REPLIES

Super Collaborator
The MorphlineInterceptor expects a byte[] or java.io.InputStream in the _attachment_body field of the morphline output record; this becomes the body of the Flume output event. In your case, the _attachment_body field instead contains a Jackson JsonNode object, which is why it complains.
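To make that contract concrete, here is a simplified Java model using plain JDK types in place of Kite's Record and Flume's Event. It is a sketch of the behavior described above, not Flume's actual MorphlineInterceptor code; the class name, the Map-based record, and the rule that every other field contributes its first value as a header are assumptions for illustration.

```java
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of turning a morphline output record into a Flume event.
// A record maps field names to value lists (like Kite's Record); this is not Flume's code.
class InterceptorModel {
    static final String BODY = "_attachment_body";

    // Returns the header map; throws if the body is not a byte[] or InputStream.
    static Map<String, String> toHeaders(Map<String, List<Object>> record) {
        List<Object> bodies = record.get(BODY);
        if (bodies != null && !bodies.isEmpty()) {
            Object body = bodies.get(0);
            if (!(body instanceof byte[]) && !(body instanceof InputStream)) {
                // The situation from the question: readJson left a JsonNode in the body.
                String type = (body == null) ? "null" : body.getClass().getName();
                throw new IllegalStateException(BODY + " must be a byte[] or InputStream, got " + type);
            }
        }
        Map<String, String> headers = new HashMap<>();
        for (Map.Entry<String, List<Object>> field : record.entrySet()) {
            // In this model, every other non-empty field becomes a header (first value only).
            if (!field.getKey().equals(BODY) && !field.getValue().isEmpty()) {
                headers.put(field.getKey(), field.getValue().get(0).toString());
            }
        }
        return headers;
    }
}
```

Under this model, leftover fields such as bodybak, _attachment_mimetype, or my_offset would also turn into headers, which is one reason to clear them before the record leaves the morphline.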

Explorer

Thanks for the quick reply.

I have a solution that works.

I have to use the Jackson JsonNode produced by 'readJson' for 'extractJsonPaths' to work.

However, my attempts to then convert '_attachment_body' back to a byte[] or InputStream were unsuccessful.

So instead, I first made a copy of '_attachment_body' and then restored it after processing.

Surely there is a cleaner way to accomplish this?

1) Make a copy of '_attachment_body' as bodybak:
            { addValuesIfAbsent {
              bodybak : "@{_attachment_body}"
              }
            }
2) Do the transformations. First, read the JSON:
            {
                readJson {
                    outputClass : com.fasterxml.jackson.databind.JsonNode
                }
            }
   Parse out the fields we want (occurrence_date and the offset):
            {
              extractJsonPaths {
                flatten : true
                paths : {
                  timestamp : "/occurrence_date"
                  my_offset : "/received_by_source_utc_offset"
                }
              }
            }
   Convert the timestamp to Unix time in milliseconds (I'll apply the offset eventually):
            {
              convertTimestamp {
                field : timestamp
                inputFormats : ["yyyy-MM-dd' 'HH:mm:ss"]
                inputTimezone : America/Los_Angeles
                outputFormat : "unixTimeInMillis"
                outputTimezone : UTC
              }
            }
     
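3) Restore '_attachment_body' from bodybak, then remove everything except what is needed in the header (timestamp):
            { setValues {
                _attachment_body : "@{bodybak}"
              }
            }
            # Clear the helper fields in a separate command, so the restore above
            # cannot be affected by the order in which setValues handles its entries.
            { setValues {
                bodybak : []
                _attachment_mimetype : []
                my_offset : []
              }
            }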
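For checking the header value you expect, the convertTimestamp step in 2) can be reproduced with java.time. This is a minimal sketch assuming the same input pattern and the America/Los_Angeles input zone as the morphline; it is not Kite's implementation, and the class name is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical equivalent of the morphline's convertTimestamp step:
// parse "yyyy-MM-dd' 'HH:mm:ss" as America/Los_Angeles wall-clock time, emit epoch millis.
class OccurrenceDate {
    private static final DateTimeFormatter INPUT = DateTimeFormatter.ofPattern("yyyy-MM-dd' 'HH:mm:ss");
    private static final ZoneId INPUT_ZONE = ZoneId.of("America/Los_Angeles");

    static long toUnixMillis(String occurrenceDate) {
        LocalDateTime local = LocalDateTime.parse(occurrenceDate, INPUT);
        // Attaching the zone resolves the wall-clock time to an instant (DST-aware).
        return local.atZone(INPUT_ZONE).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toUnixMillis("2015-02-24 13:40:48")); // 1424814048000
    }
}
```

The sample record carries an occurrence_utc_offset of -300 and a device_timezone of 'EST ', which suggests UTC-5 (assuming the offset is in minutes); parsing the same wall-clock time in that zone would give an instant three hours earlier than the Los Angeles result.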

Super Collaborator
You found the recommended solution already 🙂