'readJson' gives errors in Morphline

Explorer

I'm just starting with Flume and Morphlines.

I am using a Flume spooldir source to read a file of JSON records.

I need to expose a timestamp field from each record in the event header so that the Flume HDFS sink can partition by day.

For testing, I'm just using the Flume logger sink.

I'm trying to use a Morphline interceptor to extract the timestamp field 'occurrence_date' from the JSON, but it fails.

I've searched and found a few JSON examples, such as:

http://blog.cloudera.com/blog/2014/04/how-to-process-data-using-morphlines-in-kite-sdk/

I don't see how my script differs from them. I have tried many variations with no luck.

What am I doing wrong?

 

...

15/05/19 10:28:07 ERROR stdlib.LogErrorBuilder$LogError: record: [{_attachment_body=[{"received_by_source_utc_offset":"-300","latitude":"44.041803","occurrence_utc_offset":"-300","heading":"1.21","occurrence_date":"2015-02-24 13:40:48","speed_unit":"meters/sec","longitude":"-123.082105","speed":"3.981","external_device_id":"ER00E925","asset_name_desc":"12-423","received_by_source_date":"2015-02-24 13:43:00.789526","external_source":"Lytx","device_timezone":"EST "}], _attachment_mimetype=[json/java+memory]}]
15/05/19 10:28:07 ERROR source.SpoolDirectorySource: FATAL: Spool Directory source spool-1: { spoolDir: /root/data/spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.FlumeException: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor must non generate attachments that are not a byte[] or InputStream
        at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.toEvent(MorphlineInterceptor.java:196)

...

 

JSON record:

 

[{"received_by_source_utc_offset": "-300", "latitude": "44.041803", "occurrence_utc_offset": "-300", "heading": "1.21", "occurrence_date": "2015-02-24 13:40:48", "speed_unit": "meters/sec", "longitude": "-123.082105", "speed": "3.981", "external_device_id": "ER00E925", "asset_name_desc": "12-423", "received_by_source_date": "2015-02-24 13:43:00.789526", "external_source": "Lytx", "device_timezone": "EST "}]

 

Morphlines script:

 

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      { readJson {
          outputClass : com.fasterxml.jackson.databind.JsonNode
        }
      }

      { logError { format : "record: {}", args : ["@{}"] } }

      { extractJsonPaths {
          flatten : false
          paths : {
            my_timestamp : "/occurrence_date"
            my_offset : "/received_by_source_utc_offset"
          }
        }
      }
    ]
  }
]

3 REPLIES

Super Collaborator
The MorphlineInterceptor expects a byte[] or java.io.InputStream in the _attachment_body field of the morphline output record, because that field becomes the body of the Flume output event. In your case, the _attachment_body field instead contains a Jackson JsonNode object (the output of readJson), hence the complaint.
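
One way to observe this, as a rough sketch: log the record on both sides of readJson. The logInfo commands and their format strings are only illustrative additions; the readJson command is the one from the question.

```
commands : [
  # Before parsing: _attachment_body still holds the raw Flume event body.
  { logInfo { format : "before readJson: {}", args : ["@{}"] } }

  { readJson {
      outputClass : com.fasterxml.jackson.databind.JsonNode
    }
  }

  # After parsing: _attachment_body holds a Jackson JsonNode and
  # _attachment_mimetype is json/java+memory, as in the logError
  # output in the question. The interceptor rejects this body type.
  { logInfo { format : "after readJson: {}", args : ["@{}"] } }
]
```

If the morphline ends while the record is in the second state, the interceptor throws the FlumeException shown in the question.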

Explorer

Thanks for the quick reply.

I have a solution that works.

'extractJsonPaths' only works on the Jackson JSON object produced by 'readJson', so 'readJson' has to stay.

However, my attempts to then coerce '_attachment_body' back to a byte[] or InputStream were unsuccessful.

So instead I first make a copy of '_attachment_body' and then restore it after processing.

Surely there is a cleaner way to accomplish this?

 

1) Make a copy of '_attachment_body' as 'bodybak':
            { addValuesIfAbsent {
                bodybak : "@{_attachment_body}"
              }
            }
2) Do the transformations. First, read the JSON:
            { readJson {
                outputClass : com.fasterxml.jackson.databind.JsonNode
              }
            }
   Parse out the fields we want (occurrence_date and the offset):
            { extractJsonPaths {
                flatten : true
                paths : {
                  timestamp : "/occurrence_date"
                  my_offset : "/received_by_source_utc_offset"
                }
              }
            }
   Convert the timestamp to Unix time in milliseconds (I'll apply the offset eventually):
            { convertTimestamp {
                field : timestamp
                inputFormats : ["yyyy-MM-dd' 'HH:mm:ss"]
                inputTimezone : America/Los_Angeles
                outputFormat : "unixTimeInMillis"
                outputTimezone : UTC
              }
            }

3) Restore '_attachment_body' from 'bodybak', then remove everything except what is needed in the header (timestamp):
            { setValues {
                _attachment_body : "@{bodybak}"
                bodybak : []
                _attachment_mimetype : []
                my_offset : []
              }
            }
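
Put together, the three steps might look like the following complete morphline file. This is an untested sketch: the wrapper (the 'morphline1' id and the importCommands line) is copied from the script in the question, and the commands are the ones from the steps in this post, in the same order.

```
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # 1) Keep a reference to the original event body before readJson replaces it.
      { addValuesIfAbsent { bodybak : "@{_attachment_body}" } }

      # 2) Parse the body into a Jackson JsonNode and pull out the fields of interest.
      { readJson { outputClass : com.fasterxml.jackson.databind.JsonNode } }

      { extractJsonPaths {
          flatten : true
          paths : {
            timestamp : "/occurrence_date"
            my_offset : "/received_by_source_utc_offset"
          }
        }
      }

      # Convert e.g. "2015-02-24 13:40:48" to Unix time in milliseconds.
      { convertTimestamp {
          field : timestamp
          inputFormats : ["yyyy-MM-dd' 'HH:mm:ss"]
          inputTimezone : America/Los_Angeles
          outputFormat : "unixTimeInMillis"
          outputTimezone : UTC
        }
      }

      # 3) Restore the original body and clear the helper fields,
      #    leaving 'timestamp' as the only extra field on the record.
      { setValues {
          _attachment_body : "@{bodybak}"
          bodybak : []
          _attachment_mimetype : []
          my_offset : []
        }
      }
    ]
  }
]
```

The remaining 'timestamp' field should end up as an event header, which is the header the Flume HDFS sink reads when resolving date escapes such as %Y-%m-%d in its path.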

Super Collaborator
You found the recommended solution already 🙂