Member since 05-19-2015
05-21-2015 09:54 AM
Thanks for the quick reply. I have a solution that works. I must use the Jackson JsonNode produced by 'readJson' for 'extractJsonPaths' to work, but my attempts to then coerce '_attachment_body' back to a byte[] or InputStream were unsuccessful. So instead I first made a copy of '_attachment_body' and then restored it after processing. Surely there is a cleaner way to accomplish this?

1) Make a copy of '_attachment_body' as bodybak:

    { addValuesIfAbsent { bodybak : "@{_attachment_body}" } }

2) Do the transformations. Read the JSON:

    { readJson { outputClass : com.fasterxml.jackson.databind.JsonNode } }

Parse out the fields we want (occurrence_date and the offset):

    {
      extractJsonPaths {
        flatten : true
        paths : {
          timestamp : "/occurrence_date"
          my_offset : "/received_by_source_utc_offset"
        }
      }
    }

Convert the timestamp to Unix time in milliseconds (I'll apply the offset eventually):

    {
      convertTimestamp {
        field : timestamp
        inputFormats : ["yyyy-MM-dd' 'HH:mm:ss"]
        inputTimezone : America/Los_Angeles
        outputFormat : "unixTimeInMillis"
        outputTimezone : UTC
      }
    }

3) Restore '_attachment_body' from bodybak, then remove everything except what is needed in the header (timestamp):

    {
      setValues {
        _attachment_body : "@{bodybak}"
        bodybak : []
        _attachment_mimetype : []
        my_offset : []
      }
    }
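The convertTimestamp step can be cross-checked outside Flume. The sketch below is plain Java (java.time only, not Kite) that parses occurrence_date with the same pattern, interprets it in America/Los_Angeles, and returns Unix time in milliseconds; the class and method names are hypothetical. It also shows one possible way to apply the offset later, assuming the offset field holds minutes relative to UTC ("-300" meaning UTC-05:00). Note that the sample record reports a -300 offset and device_timezone "EST ", so for the same record the two variants give instants three hours apart.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampConversion {
    // Same pattern as the morphline's inputFormats entry.
    static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd' 'HH:mm:ss");

    // Roughly mirrors convertTimestamp: read the local time in the given zone
    // and return epoch milliseconds (which are UTC by definition).
    static long toUnixMillis(String occurrenceDate, ZoneId zone) {
        return LocalDateTime.parse(occurrenceDate, INPUT)
                .atZone(zone)
                .toInstant()
                .toEpochMilli();
    }

    // Hypothetical "apply the offset" variant. ASSUMPTION: the offset field is
    // a UTC offset in minutes, e.g. "-300" for UTC-05:00.
    static long toUnixMillisWithOffset(String occurrenceDate, String offsetMinutes) {
        ZoneOffset offset =
                ZoneOffset.ofTotalSeconds(Integer.parseInt(offsetMinutes) * 60);
        return toUnixMillis(occurrenceDate, offset);
    }

    public static void main(String[] args) {
        String occurrenceDate = "2015-02-24 13:40:48";
        // Interpreted as Pacific time (PST, UTC-08:00 in February)
        System.out.println(toUnixMillis(occurrenceDate, ZoneId.of("America/Los_Angeles"))); // 1424814048000
        // Interpreted with the record's own -300 minute offset
        System.out.println(toUnixMillisWithOffset(occurrenceDate, "-300")); // 1424803248000
    }
}
```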
05-19-2015 09:26 AM
Just starting with Flume and Morphlines. I am using the Flume spooldir source to read a file of JSON records, and I need to expose a timestamp field from each record in the event header so that the Flume HDFS sink can partition by day. For testing I'm just using the Flume logger sink. I'm trying to use a Morphline interceptor to extract the timestamp field 'occurrence_date' from the JSON, but it fails. I've searched and found a few JSON examples, e.g. http://blog.cloudera.com/blog/2014/04/how-to-process-data-using-morphlines-in-kite-sdk/, but I don't see a difference from what I'm doing, and I have tried many, many variations with no luck. What am I doing wrong?

    ...
    15/05/19 10:28:07 ERROR stdlib.LogErrorBuilder$LogError: record: [{_attachment_body=[{"received_by_source_utc_offset":"-300","latitude":"44.041803","occurrence_utc_offset":"-300","heading":"1.21","occurrence_date":"2015-02-24 13:40:48","speed_unit":"meters/sec","longitude":"-123.082105","speed":"3.981","external_device_id":"ER00E925","asset_name_desc":"12-423","received_by_source_date":"2015-02-24 13:43:00.789526","external_source":"Lytx","device_timezone":"EST "}], _attachment_mimetype=[json/java+memory]}]
    15/05/19 10:28:07 ERROR source.SpoolDirectorySource: FATAL: Spool Directory source spool-1: { spoolDir: /root/data/spool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
    org.apache.flume.FlumeException: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor must non generate attachments that are not a byte[] or InputStream
        at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.toEvent(MorphlineInterceptor.java:196)
    ...
JSON record:

    [{
      "received_by_source_utc_offset": "-300",
      "latitude": "44.041803",
      "occurrence_utc_offset": "-300",
      "heading": "1.21",
      "occurrence_date": "2015-02-24 13:40:48",
      "speed_unit": "meters/sec",
      "longitude": "-123.082105",
      "speed": "3.981",
      "external_device_id": "ER00E925",
      "asset_name_desc": "12-423",
      "received_by_source_date": "2015-02-24 13:43:00.789526",
      "external_source": "Lytx",
      "device_timezone": "EST "
    }]

Morphlines script:

    morphlines : [
      {
        id : morphline1
        importCommands : ["org.kitesdk.**"]
        commands : [
          { readJson { outputClass : com.fasterxml.jackson.databind.JsonNode } }
          { logError { format : "record: {}", args : ["@{}"] } }
          {
            extractJsonPaths {
              flatten : false
              paths : {
                my_timestamp : "/occurrence_date"
                my_offset : "/received_by_source_utc_offset"
              }
            }
          }
        ]
      }
    ]
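For the day partitioning itself, the Flume HDFS sink resolves time escape sequences such as %Y, %m and %d in hdfs.path from the event's "timestamp" header (epoch milliseconds) unless hdfs.useLocalTimeStamp is set to true, and it resolves them in the agent's local time zone unless hdfs.timeZone is set. The Java sketch below only approximates that mapping and is not Flume's own code: the helper name dayPath and the yyyy/MM/dd layout (standing in for a path such as /data/%Y/%m/%d) are assumptions for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DayPartition {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy/MM/dd");

    // Hypothetical helper: turns a "timestamp" header value (epoch millis, as
    // produced by convertTimestamp with outputFormat unixTimeInMillis) into the
    // day directory that a path like /data/%Y/%m/%d would roughly resolve to.
    static String dayPath(String timestampHeader, ZoneId zone) {
        Instant instant = Instant.ofEpochMilli(Long.parseLong(timestampHeader));
        return DAY.withZone(zone).format(instant);
    }

    public static void main(String[] args) {
        String header = "1424814048000"; // 2015-02-24 13:40:48 in America/Los_Angeles
        System.out.println(dayPath(header, ZoneId.of("UTC")));                 // 2015/02/24
        System.out.println(dayPath(header, ZoneId.of("America/Los_Angeles"))); // 2015/02/24
        System.out.println(dayPath(header, ZoneId.of("Asia/Tokyo")));          // 2015/02/25
    }
}
```

The last line shows why the zone matters: the same event lands in a different day directory when the path is resolved in a zone far enough east of UTC.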
Labels: Apache Flume, Apache Solr, HDFS