Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

'readJson' gives errors in Morphline

Solved Go to solution

'readJson' gives errors in Morphline

New Contributor

Just starting with Flume and Morphlines.

I am using Flume spooldir source to read a file of JSON records.

Need to expose a timestamp field from each record into the header so that Flume hdfs sink can partition by day.

For testing I'm just using Flume logger sink.

Trying to use a Morphline interceptor to extract the timestamp field 'occurrence_date' from the JSON, but it fails.

I've searched found a few JSON examples,

http://blog.cloudera.com/blog/2014/04/how-to-process-data-using-morphlines-in-kite-sdk/

I don't see a difference, have tried many many variations, no luck.

 

What am I doing wrong?

 

...

15/05/19 10:28:07 ERROR stdlib.LogErrorBuilder$LogError: record: [{_attachment_body=[{"received_by_source_utc_offset":"-300","latitude":"44.041803","occurrence_utc_offset":"-300","heading":"1.21","occurrence_date":"2015-02-24 13:40:48","speed_unit":"meters/sec","longitude":"-123.082105","speed":"3.981","external_device_id":"ER00E925","asset_name_desc":"12-423","received_by_source_date":"2015-02-24 13:43:00.789526","external_source":"Lytx","device_timezone":"EST "}], _attachment_mimetype=[json/java+memory]}]
15/05/19 10:28:07 ERROR source.SpoolDirectorySource: FATAL: Spool Directory source spool-1: { spoolDir: /root/data/spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.FlumeException: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor must non generate attachments that are not a byte[] or InputStream
        at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.toEvent(MorphlineInterceptor.java:196)

...

 

JSON record:

 

[{"received_by_source_utc_offset": "-300", "latitude": "44.041803", "occurrence_utc_offset": "-300", "heading": "1.21", "occurrence_date": "2
015-02-24 13:40:48", "speed_unit": "meters/sec", "longitude": "-123.082105", "speed": "3.981", "external_device_id": "ER00E925", "asset_name_
desc": "12-423", "received_by_source_date": "2015-02-24 13:43:00.789526", "external_source": "Lytx", "device_timezone": "EST "}]

 

Morphlines script:

 

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      { readJson {
            outputClass : com.fasterxml.jackson.databind.JsonNode
        }
      }
     
      { logError { format : "record: {}", args : ["@{}"] } }

      { extractJsonPaths {
          flatten : false
          paths : {
            my_timestamp : "/occurrence_date"
            my_offset : "/received_by_source_utc_offset"
          }
        }
      }
    ]
  }
]

2 ACCEPTED SOLUTIONS

Accepted Solutions

Re: 'readJson' gives errors in Morphline

New Contributor

Thanks for the quick reply.

I have a solution that works.

Must use the Jackson JSON from 'readJson' in order for 'extractJsonPaths' to work.

But my attempts to then coerce the '_attachment_body' back to byte[] or InputStream were unsuccessful.

So instead I first made a copy of '_attachment_body' and then restored it after processing.

Surely there is a cleaner way to accomplish this?

 

1) make a copy of '_attachement_body' as bodybak
            { addValuesIfAbsent {
              bodybak : "@{_attachment_body}"
              }
            }
2) do the transformations, read JSON
            {
                readJson {
                    outputClass : com.fasterxml.jackson.databind.JsonNode
                }
            }
   parse out fields we want, grab occurrence_date, offset
            {
              extractJsonPaths {
                flatten : true
                paths : {
                  timestamp : "/occurrence_date"
                  my_offset : "/received_by_source_utc_offset"
                }
              }
            }
   convert the timestamp to unixmilli (I'll apply offset eventually)
            {
              convertTimestamp {
                field : timestamp
                inputFormats : ["yyyy-MM-dd' 'HH:mm:ss"]
                inputTimezone : America/Los_Angeles
                outputFormat : "unixTimeInMillis"
                outputTimezone : UTC
              }
            }
     
3) restore '_attachement_body' from bodybak, then remove everything except what is needed in header (timestamp)
            { setValues {
               _attachment_body : "@{bodybak}"
               bodybak : []
              _attachment_mimetype : []
               my_offset : []
              }
            }

Re: 'readJson' gives errors in Morphline

Expert Contributor
You found the recommended solution already :-)

3 REPLIES 3
Highlighted

Re: 'readJson' gives errors in Morphline

Expert Contributor
The MorphlineInterceptor expects a byte[] or java.io.InputStream in the _attachment_body field of the morphline output record. This will become the body of the flume output event. In your case the _attachment_body field instead contains a jackson JsonNode object - hence it complains.

Re: 'readJson' gives errors in Morphline

New Contributor

Thanks for the quick reply.

I have a solution that works.

Must use the Jackson JSON from 'readJson' in order for 'extractJsonPaths' to work.

But my attempts to then coerce the '_attachment_body' back to byte[] or InputStream were unsuccessful.

So instead I first made a copy of '_attachment_body' and then restored it after processing.

Surely there is a cleaner way to accomplish this?

 

1) make a copy of '_attachement_body' as bodybak
            { addValuesIfAbsent {
              bodybak : "@{_attachment_body}"
              }
            }
2) do the transformations, read JSON
            {
                readJson {
                    outputClass : com.fasterxml.jackson.databind.JsonNode
                }
            }
   parse out fields we want, grab occurrence_date, offset
            {
              extractJsonPaths {
                flatten : true
                paths : {
                  timestamp : "/occurrence_date"
                  my_offset : "/received_by_source_utc_offset"
                }
              }
            }
   convert the timestamp to unixmilli (I'll apply offset eventually)
            {
              convertTimestamp {
                field : timestamp
                inputFormats : ["yyyy-MM-dd' 'HH:mm:ss"]
                inputTimezone : America/Los_Angeles
                outputFormat : "unixTimeInMillis"
                outputTimezone : UTC
              }
            }
     
3) restore '_attachement_body' from bodybak, then remove everything except what is needed in header (timestamp)
            { setValues {
               _attachment_body : "@{bodybak}"
               bodybak : []
              _attachment_mimetype : []
               my_offset : []
              }
            }

Re: 'readJson' gives errors in Morphline

Expert Contributor
You found the recommended solution already :-)

Don't have an account?
Coming from Hortonworks? Activate your account here