JSON to HDFS via Morphlines: empty strings in resulting HDFS files

Hello,

I'm running Flume 1.6 on Cloudera CDH 5.5.
I've attached my Flume configuration (events.conf) and my Morphlines configuration (morph_full.conf) below.

My test JSON file has 1987 records, one record per line.
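It may be worth confirming that the file really has one complete JSON object per line. If the file is fed into Kafka line by line (an assumption), a record wrapped over several lines would arrive as fragments that do not parse as JSON. Here is a small Python sketch, independent of Flume and Morphlines, that counts the lines that parse. The sample lines are illustrative:

```python
import json

def check_ndjson(lines):
    """Parse every non-blank line as a standalone JSON object.

    Returns (number_of_valid_records, [(line_number, error_message), ...]).
    """
    valid = 0
    errors = []
    for lineno, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            obj = json.loads(line)
        except json.JSONDecodeError as exc:
            errors.append((lineno, str(exc)))
            continue
        if isinstance(obj, dict):
            valid += 1
        else:
            errors.append((lineno, "parsed, but not a JSON object"))
    return valid, errors


# Illustrative input: one complete record, then the first line of a record
# that was wrapped over several lines (it cannot be parsed on its own).
sample = [
    '{"name": "mraid_site_adv_view", "ts": 1448272943343, "props": {"percent": 25}}',
    '{"comp_id": 23738849,',
]
count, problems = check_ndjson(sample)
print(count, "valid record(s)")
for lineno, message in problems:
    print(f"line {lineno}: {message}")
```

For the real file, you can pass an open file object to `check_ndjson` (for example, one opened with `encoding="utf-8"`). A clean file should report 1987 valid records and no problem lines.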

My flume-env.sh:
export JAVA_HOME=/usr/lib/jvm/java-8-oracle/
export JAVA_OPTS="-Xms100m -Xmx20480m -Dcom.sun.management.jmxremote"

I start Flume like this:
./flume-ng agent -n test1 -c ../conf/ -f ../conf/events.conf

Flume ingests the JSON from a KafkaSource.
The records look like this:

{"comp_id": 23738849,
"uid": "1422949550611.14",
"lib": "smarttv",
"subsite_id": 11,
"ts": 1448275864520,
"props": {"g_source": "direct", "$brand": "LG", "from": "content", "firmware": "05.00.15", "abt": 1, "$os": "NetCast", "$manufacturer": "LG", "$os_version": "03.00.000", "block_id": "content_similar", "svod_active": 0, "authorizeduser": 0, "ssl": 1, "m_avod_hd": 1, "compilation_id": 10036, "tls": 1, "remote": "dpad", "$device_model": "39LB650V-ZE", "monetization_avod": 1, "$app_version": 2783, "m_avod_sd": 1, "page": "content"},
"name": "page_view999666"}
{"comp_id": 47375083,
"uid": "1401075317140.64",
"lib": "smarttv",
"subsite_id": 10,
"ts": 1449127613202,
"props": {"tls": 0, "$manufacturer": "Samsung", "$url": "http://movie.comp.com/897/", "$os_version": "4.626", "$device_model": "12_ECHOP", "g_source": "direct", "firmware": "T-INFOLINK2012-1019", "authorizeduser": 0, "ssl": 0, "abt": 3, "remote": "dpad", "$app_version": 2759, "$os": "SmartHub", "$brand": "Samsung"},
"name": "application_open20151202",
"abTests": {"ab_9991":2}}

My goal is to transform the JSON into CSV files in HDFS.
The problem is that in HDFS I get one of two results. Either the files contain only empty rows (a file is several KB, but there is nothing in it), or some rows are empty and some aren't. As far as I can see, there are no errors or exceptions in the log.
For some reason, _attachment_body disappears from the output (see the log).
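One hypothesis would explain both symptoms. It is an assumption about how Morphlines expands `@{...}` references, not something the log confirms. The idea is that a template is expanded once per combination of the referenced fields' values. If any referenced field has no values (for example, because the record has no `$browser` key), the template produces no value and `_attachment_body` is never set.

The Python sketch below simulates only that rule. `expand_template` is a hypothetical helper, not a Kite API. Using the fields of the first sample record, the five-field template yields one line, while adding `@{browser}` yields nothing:

```python
import itertools
import re

FIELD_REF = re.compile(r"@\{(\w+)\}")

def expand_template(template, record):
    """Simulate cross-product expansion of @{field} references.

    `record` maps field names to lists of values. One output string is
    produced per combination of values; if any referenced field has no
    values, the product is empty and no output string is produced.
    """
    names = list(dict.fromkeys(FIELD_REF.findall(template)))  # unique, in order
    value_lists = [record.get(name, []) for name in names]
    results = []
    for combo in itertools.product(*value_lists):
        binding = dict(zip(names, combo))
        results.append(FIELD_REF.sub(lambda m: str(binding[m.group(1)]), template))
    return results


# Fields extracted from the first sample record: it has no "$browser" key,
# so "browser" ends up with no values.
record = {
    "name": ["page_view999666"],
    "subsite_id": ["11"],
    "user_id": ["1422949550611.14"],
    "ts": ["1448275864520"],
    "lib": ["smarttv"],
}
short = expand_template("@{name}, @{subsite_id}, @{user_id}, @{ts}, @{lib}", record)
full = expand_template("@{name}, @{subsite_id}, @{user_id}, @{ts}, @{lib}, @{browser}", record)
print(short)  # ['page_view999666, 11, 1422949550611.14, 1448275864520, smarttv']
print(full)   # []
```

If Morphlines really behaves this way, it would be consistent with the observation that keeping only fields present in every record gives complete output.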

I started excluding fields (see morph.conf). For example, if I keep only the name, subsite_id, user_id, ts and lib fields, the HDFS output is complete, with no empty rows.

Is there anything wrong with my configs? Any syntax errors?
Is there a limit on the size of _attachment_body? Are there special invisible characters (for example, '\n') that break _attachment_body?
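On the invisible-characters question: a JSON parser decodes an escaped `\n` inside a string into a real line break. If such a value is pasted into a comma-joined line, it would split the row in two. Commas or quotes inside values would similarly shift columns.

A small Python sketch can scan the source records for such values. The record in the example is made up, and `$comment` is not a real field:

```python
import json

# Characters that break a naive comma-joined CSV line:
# line breaks split the row, commas and quotes shift or confuse columns.
SUSPICIOUS = {"\n": "newline", "\r": "carriage return", ",": "comma", '"': "double quote"}

def find_suspicious(value, path=""):
    """Recursively report (json_path, problem) for string values with CSV-breaking characters."""
    hits = []
    if isinstance(value, dict):
        for key, child in value.items():
            hits.extend(find_suspicious(child, f"{path}/{key}"))
    elif isinstance(value, list):
        for index, child in enumerate(value):
            hits.extend(find_suspicious(child, f"{path}/{index}"))
    elif isinstance(value, str):
        for ch, label in SUSPICIOUS.items():
            if ch in value:
                hits.append((path, label))
    return hits


# Made-up record for illustration; "$comment" is not a field from the real data.
line = '{"name": "page_view", "props": {"$url": "http://movie.comp.com/897/", "$comment": "a,b\\nc"}}'
for path, problem in find_suspicious(json.loads(line)):
    print(path, problem)
```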

 

I also have JSON records like these
{"name": "cpa_view", "lib": "cpa", "subsite_id": "1", "ts": 1448272943165, "props": {"partner": 5, "r_complete": 1, "transaction_key": "kjyWRoXlZ6ZvYag9", "$content_id": 103300}, "uid": "cpa_user"}
{"name": "cpa_view", "lib": "cpa", "subsite_id": "1", "ts": 1448272943147, "props": {"partner": 4, "r_complete": 1, "transaction_key": "bze9xKkQ3pbx8rN7"}, "uid": "cpa_user"}
{"name": "mraid_site_adv_view", "ts": 1448272943343, "props": {"percent": 25}}

from the same KafkaSource.
In other words, not all fields are mandatory, and some of the fields in morph_full.conf may be missing. How should I handle this in Morphlines? Is there a specific flag for setValues {} or extractJsonPaths {}?
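Here is the intended behaviour as a plain Python sketch. It is not a Morphlines configuration and says nothing about which Morphlines flag, if any, would do this. Every column is always written, and a missing path becomes an empty value instead of removing the whole line. The column list is a hypothetical subset of the paths in morph_full.conf:

```python
import csv
import io

# Hypothetical subset of the columns in morph_full.conf: (column, JSON path as keys).
COLUMNS = [
    ("name", ("name",)),
    ("subsite_id", ("subsite_id",)),
    ("user_id", ("uid",)),
    ("ts", ("ts",)),
    ("lib", ("lib",)),
    ("content_id", ("props", "$content_id")),
]

def lookup(obj, keys):
    """Follow `keys` into nested dicts; return None if any step is missing."""
    for key in keys:
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    return obj

def to_csv_line(record, default=""):
    """Always emit every column; a missing path becomes `default`."""
    row = []
    for _column, keys in COLUMNS:
        value = lookup(record, keys)
        row.append(default if value is None else value)
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerow(row)  # quotes values containing commas/quotes
    return buf.getvalue()


print(to_csv_line({"name": "mraid_site_adv_view", "ts": 1448272943343, "props": {"percent": 25}}), end="")
# mraid_site_adv_view,,,1448272943343,,
```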


Thank you!

 

events.conf:

test1.sources = source1
test1.channels = channel1
test1.sinks = sink1

test1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
test1.sources.source1.zookeeperConnect = localhost:2181
test1.sources.source1.topic = events1
test1.sources.source1.groupId = test20151123
test1.sources.source1.channels = channel1
test1.sources.source1.batchSize = 10000
test1.sources.source1.batchDurationMillis = 1000

test1.sources.source1.interceptors = mi1
test1.sources.source1.interceptors.mi1.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
test1.sources.source1.interceptors.mi1.morphlineFile = /usr/local/apache-flume-1.6.0-bin/conf/morph.conf
test1.sources.source1.interceptors.mi1.morphlineId = morphline1

 
test1.channels.channel1.type = memory
test1.channels.channel1.capacity = 100000
test1.channels.channel1.transactionCapacity = 100000
 
test1.sinks.sink1.channel = channel1

test1.sinks.sink1.type = hdfs
test1.sinks.sink1.hdfs.path = hdfs://localhost:8020/user/root/%{topic}/%Y-%m-%d
test1.sinks.sink1.hdfs.rollInterval = 0
test1.sinks.sink1.hdfs.rollSize = 10000
test1.sinks.sink1.hdfs.rollCount = 0
test1.sinks.sink1.hdfs.fileType = DataStream
test1.sinks.sink1.hdfs.batchSize = 10000
test1.sinks.sink1.hdfs.writeFormat = Text
test1.sinks.sink1.hdfs.minBlockReplicas = 1
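One Flume constraint worth ruling out in events.conf: a source or sink `batchSize` larger than the channel's `transactionCapacity` can make the channel transaction fail. Here both batch sizes (10000) are below the memory channel's transactionCapacity (100000), so this should be fine. The Python sketch below is just a quick way to re-check it if the numbers change. It handles only simple `key = value` lines and is not Flume's own parser:

```python
def parse_properties(text):
    """Parse simple `key = value` lines; no continuation lines or escapes."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props

def check_batch_sizes(props, agent):
    """List source/sink batchSize values larger than their channel's transactionCapacity."""
    warnings = []
    for key, value in props.items():
        parts = key.split(".")
        if parts[0] != agent or parts[-1] != "batchSize" or len(parts) < 4:
            continue
        kind, name = parts[1], parts[2]
        if kind == "sources":
            channels = props.get(f"{agent}.sources.{name}.channels", "").split()
        elif kind == "sinks":
            channels = props.get(f"{agent}.sinks.{name}.channel", "").split()
        else:
            continue
        for channel in channels:
            capacity = props.get(f"{agent}.channels.{channel}.transactionCapacity")
            # Only explicit settings are checked; channel defaults are not modelled here.
            if capacity is not None and int(value) > int(capacity):
                warnings.append(f"{key}={value} exceeds {channel} transactionCapacity={capacity}")
    return warnings


# Excerpt of events.conf from above.
events_conf = """
test1.sources.source1.channels = channel1
test1.sources.source1.batchSize = 10000
test1.channels.channel1.transactionCapacity = 100000
test1.sinks.sink1.channel = channel1
test1.sinks.sink1.hdfs.batchSize = 10000
"""
print(check_batch_sizes(parse_properties(events_conf), "test1"))  # []
```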

 

morph.conf:

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [

       {readJson {}}

       {
         extractJsonPaths {
           flatten : true
           paths : {
              name : /name
              subsite_id : /subsite_id
              user_id : /uid
              comp_id : /comp_id
              ts : /ts
              lib : /lib

              browser : /props/"$browser"
              browser_version : /props/"$browser_version"
              flash_version : /props/"$flash_version"
              os : /props/"$os"
              os_version : /props/"$os_version"
              screen : /props/"$screen"
              referrer_domain : /props/"$referrer_domain"
              url : /props/"$url"
              carrier : /props/"$carrier"
              manufacturer : /props/"$manufacturer"
              device_model : /props/"$device_model"
              brand : /props/"$brand"
              app_version : /props/"$app_version"
              
              watch_id : /props/watch_id
              watch_id_hash : /props/watch_id_hash
              
              content_id : /props/content_id
              content_id : /props/"$content_id"
                            
              compilation_id : /props/compilation_id
              compilation_id : /props/"$compilation_id"
              
              g_source : /props/g_source
              g_source : /props/"$g_source"
              
              g_campaign : /props/g_campaign
              g_campaign : /props/"$g_campaign"
              
              g_medium : /props/g_medium
              g_medium : /props/"$g_medium"
              
              g_term : /props/g_term
              g_term : /props/"$g_term"
              
              g_content : /props/g_content
              g_content : /props/"$g_content"
              
              payment_id : /props/payment_id
              payment_method : /props/payment_method
              monetization_avod : /props/monetization_avod
              monetization_est : /props/monetization_est
              monetization_tvod : /props/monetization_tvod
              monetization_svod : /props/monetization_svod
              purchase_id : /props/purchase_id
              credit_id : /props/credit_id
              duration : /props/duration
              adv_video_id : /props/adv_video_id
              
              mnc : /props/"$mnc"
              mcc : /props/"$mcc"
              radio : /props/"$radio"
              abt : /props/abt
           }
         }
       }
       { logInfo { format : "(2) : {}", args : ["@{}"] } }

       { setValues {
            _attachment_body : "@{name}, @{subsite_id}, @{user_id}, @{ts}, @{lib}, @{browser}"
       } }

       { logInfo { format : "(3) : {}", args : ["@{}"] } }

       { toByteArray { field : _attachment_body } }
       { logInfo { format : "output record: {}", args : ["@{}"] } }
    ]
 }
]
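The `paths` block above assigns several keys twice (content_id, compilation_id, g_source, g_campaign, g_medium, g_term, g_content), presumably as a fallback between the plain and `$`-prefixed names. HOCON does not treat repeated keys as alternatives. When the two values are not both objects, the later one overrides the earlier, so only the `$`-prefixed path is likely to be used.

A naive line-based Python sketch (not a HOCON parser) can list such keys:

```python
import re

# Matches lines such as:  content_id : /props/content_id
PATH_LINE = re.compile(r"^\s*(\w+)\s*:\s*/")

def repeated_path_keys(conf_text):
    """Return keys assigned a JSON path more than once, in first-seen order."""
    counts = {}
    for line in conf_text.splitlines():
        match = PATH_LINE.match(line)
        if match:
            key = match.group(1)
            counts[key] = counts.get(key, 0) + 1
    return [key for key, n in counts.items() if n > 1]


# Excerpt of the paths block in morph.conf.
paths_block = '''
   name : /name
   content_id : /props/content_id
   content_id : /props/"$content_id"
   g_source : /props/g_source
   g_source : /props/"$g_source"
'''
print(repeated_path_keys(paths_block))  # ['content_id', 'g_source']
```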

 

morph_full.conf:

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [

       {readJson {}}

       {
         extractJsonPaths {
           flatten : true
           paths : {
              name : /name
              subsite_id : /subsite_id
              user_id : /uid
              comp_id : /comp_id
              ts : /ts
              lib : /lib

              browser : /props/"$browser"
              browser_version : /props/"$browser_version"
              flash_version : /props/"$flash_version"
              os : /props/"$os"
              os_version : /props/"$os_version"
              screen : /props/"$screen"
              referrer_domain : /props/"$referrer_domain"
              url : /props/"$url"
              carrier : /props/"$carrier"
              manufacturer : /props/"$manufacturer"
              device_model : /props/"$device_model"
              brand : /props/"$brand"
              app_version : /props/"$app_version"
              
              watch_id : /props/watch_id
              watch_id_hash : /props/watch_id_hash
              
              content_id : /props/content_id
              content_id : /props/"$content_id"
                            
              compilation_id : /props/compilation_id
              compilation_id : /props/"$compilation_id"
              
              g_source : /props/g_source
              g_source : /props/"$g_source"
              
              g_campaign : /props/g_campaign
              g_campaign : /props/"$g_campaign"
              
              g_medium : /props/g_medium
              g_medium : /props/"$g_medium"
              
              g_term : /props/g_term
              g_term : /props/"$g_term"
              
              g_content : /props/g_content
              g_content : /props/"$g_content"
              
              payment_id : /props/payment_id
              payment_method : /props/payment_method
              monetization_avod : /props/monetization_avod
              monetization_est : /props/monetization_est
              monetization_tvod : /props/monetization_tvod
              monetization_svod : /props/monetization_svod
              purchase_id : /props/purchase_id
              credit_id : /props/credit_id
              duration : /props/duration
              adv_video_id : /props/adv_video_id
              
              mnc : /props/"$mnc"
              mcc : /props/"$mcc"
              radio : /props/"$radio"
              abt : /props/abt
           }
         }
       }
       { logInfo { format : "(2) : {}", args : ["@{}"] } }

       { setValues {
            _attachment_body : "@{name},@{subsite_id},@{user_id},@{ts},@{lib},@{browser},@{browser_version},@{flash_version},@{os},@{os_version},@{screen},@{referrer_domain},@{url},@{carrier},@{manufacturer},@{device_model},@{brand},@{app_version},@{watch_id},@{watch_id_hash},@{content_id},@{content_id},@{compilation_id},@{compilation_id},@{g_source},@{g_source},@{g_campaign}
            @{comp_id},@{g_campaign},@{g_medium},@{g_medium},@{g_term},@{g_term},@{g_content},@{g_content},@{payment_id},@{payment_method},@{monetization_avod},@{monetization_est},@{monetization_tvod},@{monetization_svod},@{purchase_id},@{credit_id},@{duration},@{adv_video_id},@{mnc},@{mcc},@{radio},@{abt}"
       } }

       { logInfo { format : "(3) : {}", args : ["@{}"] } }

       { toByteArray { field : _attachment_body } }
       { logInfo { format : "output record: {}", args : ["@{}"] } }
    ]
 }
]
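Two things in the `_attachment_body` template of morph_full.conf seem worth checking:

- The value continues onto a second line after `@{g_campaign}`, with no comma at the break. If that line break is really in the file and not just how the post is rendered, it is an unescaped newline inside a quoted string. HOCON (like JSON) does not allow that; as far as I know, the multi-line form is a triple-quoted string.
- Several references appear twice (`@{content_id}`, `@{compilation_id}`, `@{g_source}`, `@{g_campaign}`, `@{g_medium}`, `@{g_term}`, `@{g_content}`).

Note also that events.conf points `morphlineFile` at morph.conf, so morph_full.conf is only loaded if that setting is changed. A small Python check on the template text (`check_template` is a hypothetical helper):

```python
import re
from collections import Counter

FIELD_REF = re.compile(r"@\{(\w+)\}")

def check_template(template):
    """Report raw line breaks and repeated @{field} references in a template."""
    problems = []
    if "\n" in template or "\r" in template:
        problems.append("raw line break inside the template")
    counts = Counter(FIELD_REF.findall(template))
    repeated = sorted(name for name, n in counts.items() if n > 1)
    if repeated:
        problems.append("repeated references: " + ", ".join(repeated))
    return problems


# Shortened version of the _attachment_body value in morph_full.conf,
# keeping the line break after @{g_campaign}.
template = "@{name},@{content_id},@{content_id},@{g_campaign}\n@{comp_id},@{g_campaign}"
for problem in check_template(template):
    print(problem)
```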

 

flume.log: https://yadi.sk/d/-51tkj1hkzy3f

 

HDFS files: https://yadi.sk/d/s1_spnlXkzy4c