
Flume, morphlines, Kite, Solr

Hi,

 

I want to handle an XML payload with Flume and use morphlines to put the parsed data into Solr.

I use RabbitMQ to push incremental data from Oracle to Solr.

 

Example payload from RabbitMQ that I want to push to Solr:

 

<WER RECORD_ID="142281634" SYSTEM="WERS">
    <WER_ID>100000730</WER_ID>
    <WER_NO>660300070001</WER_NO>
    <WER_RECV_DATE>1966-01-20</WER_RECV_DATE>
    <WER_COUNTRY>062</WER_COUNTRY>
</WER>
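
To make the goal concrete, here is a minimal Python sketch (outside Flume, for illustration only) of the flat record this payload could become before it is sent to Solr. The function name flatten_wer and the choice to map root attributes and child elements to same-named fields are assumptions, not something the morphline does by itself.

```python
import xml.etree.ElementTree as ET

# The example payload from the post, unchanged.
PAYLOAD = """<WER RECORD_ID="142281634" SYSTEM="WERS">
    <WER_ID>100000730</WER_ID>
    <WER_NO>660300070001</WER_NO>
    <WER_RECV_DATE>1966-01-20</WER_RECV_DATE>
    <WER_COUNTRY>062</WER_COUNTRY>
</WER>"""


def flatten_wer(xml_text):
    """Hypothetical helper: turn one <WER> element into a flat field dict."""
    root = ET.fromstring(xml_text)
    # Attributes of the root element (RECORD_ID, SYSTEM) become fields.
    record = dict(root.attrib)
    # Each direct child element becomes a field named after its tag.
    for child in root:
        record[child.tag] = (child.text or "").strip()
    return record


record = flatten_wer(PAYLOAD)
for name, value in record.items():
    print(name, "=", value)
```

Values stay strings here, so leading zeros such as in WER_COUNTRY are preserved; any type conversion would depend on the Solr schema.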

 

This is my morphline config:


morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**", "org.kitesdk.**"]

    commands : [
      {
        xquery {
          fragments : [
            {
              fragmentPath : "/"
              queryString : "/WER"
            }
          ]
        }
      }

      { generateUUID { field : id } }

      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

 

With that config I get this error:

 

ERROR morphline.MorphlineSink: Morphline Sink k1: Unable to process event from channel c1. Exception follows.
org.kitesdk.morphline.api.MorphlineRuntimeException: org.apache.solr.client.solrj.retry.RetriesExhaustedException: Performed 0 retries across 1 client requests. Gave up because the last 0 retries across 0 seconds for the current request failed with this reason: 'Exception is non-retryable per RetryPolicyFactory {org.apache.solr.client.solrj.retry.DefaultRetryPolicyFactory@6c669355}' and these root causes:
        at org.kitesdk.morphline.solr.LoadSolrBuilder$LoadSolr.doNotify(LoadSolrBuilder.java:131)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:137)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:155)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:137)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:155)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:137)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:155)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:137)
        at org.kitesdk.morphline.base.Connector.notify(Connector.java:57)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:155)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:137)
        at org.kitesdk.morphline.base.AbstractCommand.doNotify(AbstractCommand.java:155)
        at org.kitesdk.morphline.base.AbstractCommand.notify(AbstractCommand.java:137)
        at org.kitesdk.morphline.base.Notifications.notify(Notifications.java:96)
        at org.kitesdk.morphline.base.Notifications.notifyCommitTransaction(Notifications.java:61)
        at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.commitTransaction(MorphlineHandlerImpl.java:148)
        at org.apache.flume.sink.solr.morphline.MorphlineSink.process(MorphlineSink.java:160)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.solr.client.solrj.retry.RetriesExhaustedException: Performed 0 retries across 1 client requests. Gave up because the last 0 retries across 0 seconds for the current request failed with this reason: 'Exception is non-retryable per RetryPolicyFactory {org.apache.solr.client.solrj.retry.DefaultRetryPolicyFactory@6c669355}' and these root causes:
        at org.apache.solr.client.solrj.retry.RetryingSolrServer.request(RetryingSolrServer.java:206)
        at org.apache.solr.client.solrj.request.AbstractUpdateRequest.process(AbstractUpdateRequest.java:124)
        at org.apache.solr.client.solrj.SolrServer.add(SolrServer.java:68)
        at org.apache.solr.client.solrj.SolrServer.add(SolrServer.java:54)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.sendLoads(SolrServerDocumentLoader.java:143)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.sendBatch(SolrServerDocumentLoader.java:134)
        at org.kitesdk.morphline.solr.SolrServerDocumentLoader.commitTransaction(SolrServerDocumentLoader.java:96)
        at org.kitesdk.morphline.solr.LoadSolrBuilder$LoadSolr.doNotify(LoadSolrBuilder.java:129)
        ... 22 more
Caused by: org.apache.solr.client.solrj.impl.CloudSolrServer$RouteException: Expected mime type application/octet-stream but got text/html. <html><head><title>Apache Tomcat/6.0.44 - Error report</title><style><!--H1 {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;font-size:22px;} H2 {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;font-size:16px;} H3 {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;font-size:14px;} BODY {font-family:Tahoma,Arial,sans-serif;color:black;background-color:white;} B {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;} P {font-family:Tahoma,Arial,sans-serif;background:white;color:black;font-size:12px;}A {color : black;}A.name {color : black;}HR {color : #525D76;}--></style> </head><body><h1>HTTP Status 401 - Authentication required</h1><HR size="1" noshade="noshade"><p><b>type</b> Status report</p><p><b>message</b> <u>Authentication required</u></p><p><b>description</b> <u>This request requires HTTP authentication.</u></p><HR size="1" noshade="noshade"><h3>Apache Tomcat/6.0.44</h3></body></html>
        at org.apache.solr.client.solrj.impl.CloudSolrServer.directUpdate(CloudSolrServer.java:381)
        at org.apache.solr.client.solrj.impl.CloudSolrServer.request(CloudSolrServer.java:557)
        at org.apache.solr.client.solrj.retry.RetryingSolrServer.request(RetryingSolrServer.java:167)
        ... 29 more
Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected mime type application/octet-stream but got text/html. <html><head><title>Apache Tomcat/6.0.44 - Error report</title><style><!--H1 {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;font-size:22px;} H2 {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;font-size:16px;} H3 {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;font-size:14px;} BODY {font-family:Tahoma,Arial,sans-serif;color:black;background-color:white;} B {font-family:Tahoma,Arial,sans-serif;color:white;background-color:#525D76;} P {font-family:Tahoma,Arial,sans-serif;background:white;color:black;font-size:12px;}A {color : black;}A.name {color : black;}HR {color : #525D76;}--></style> </head><body><h1>HTTP Status 401 - Authentication required</h1><HR size="1" noshade="noshade"><p><b>type</b> Status report</p><p><b>message</b> <u>Authentication required</u></p><p><b>description</b> <u>This request requires HTTP authentication.</u></p><HR size="1" noshade="noshade"><h3>Apache Tomcat/6.0.44</h3></body></html>
        at org.apache.solr.client.solrj.impl.HttpSolrServer.executeMethod(HttpSolrServer.java:580)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.request(HttpSolrServer.java:228)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.request(HttpSolrServer.java:224)
        at org.apache.solr.client.solrj.impl.LBHttpSolrServer.doRequest(LBHttpSolrServer.java:345)
        at org.apache.solr.client.solrj.impl.LBHttpSolrServer.request(LBHttpSolrServer.java:306)
        at org.apache.solr.client.solrj.impl.CloudSolrServer$1.call(CloudSolrServer.java:359)
        at org.apache.solr.client.solrj.impl.CloudSolrServer$1.call(CloudSolrServer.java:356)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
17/05/31 13:53:16 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.solr.morphline.MorphlineSink.process(MorphlineSink.java:190)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)

 

I also tried this configuration:

 


morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**", "org.kitesdk.**"]

    commands : [
      {
        # Parse input attachment and emit a record for each input line
        readLine {
          charset : UTF-8
        }
      }

      {
        grok {
          dictionaryFiles : [dir/kite/kite-morphlines/kite-morphlines-core/src/test/resources/grok-dictionaries]
          expressions : {
            message : """^<%{REC_ID:RECORD_ID} %{SYS_ID:SYSTEM_ID}>"""
          }
        }
      }

      { generateUUID { field : id } }

      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

 

In that case I get these warnings:

 

17/05/31 14:12:34 WARN morphline.MorphlineHandlerImpl: Morphline dir/morphlineFlume.conf@null failed to process record: {_attachment_body=[[B@35c4fbe4], timestamp=[1496232752755]}
17/05/31 14:14:35 WARN morphline.MorphlineHandlerImpl: Morphline dir/morphlineFlume.conf@null failed to process record: {_attachment_body=[[B@57f54b43], timestamp=[1496232872756]}
17/05/31 14:18:36 WARN morphline.MorphlineHandlerImpl: Morphline dir/morphlineFlume.conf@null failed to process record: {_attachment_body=[[B@4a1647fd], timestamp=[1496233112963]}
17/05/31 14:22:37 WARN morphline.MorphlineHandlerImpl: Morphline dir/morphlineFlume.conf@null failed to process record: {_attachment_body=[[B@4cbeb17a], timestamp=[1496233352752]}
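
One possible reading of these warnings, which the logs themselves do not confirm: readLine emits one record per input line, so a single-line pattern is tested against each of the six payload lines separately. The Python sketch below imitates that outside Flume. The definitions of REC_ID and SYS_ID are not shown in this post, so the regex is a hypothetical stand-in, and the assumption that a non-matching line makes the record fail is exactly that, an assumption.

```python
import re

# The example payload from the post, unchanged.
PAYLOAD = """<WER RECORD_ID="142281634" SYSTEM="WERS">
    <WER_ID>100000730</WER_ID>
    <WER_NO>660300070001</WER_NO>
    <WER_RECV_DATE>1966-01-20</WER_RECV_DATE>
    <WER_COUNTRY>062</WER_COUNTRY>
</WER>"""

# Hypothetical stand-in for ^<%{REC_ID:RECORD_ID} %{SYS_ID:SYSTEM_ID}>,
# treating both dictionary patterns as plain word characters (assumption).
LINE_PATTERN = re.compile(r"^<(?P<RECORD_ID>\w+) (?P<SYSTEM_ID>\w+)>")


def match_per_line(text):
    """Test the pattern against every line on its own, as a line reader would."""
    return [(line, LINE_PATTERN.search(line) is not None)
            for line in text.splitlines()]


results = match_per_line(PAYLOAD)
for line, matched in results:
    print(matched, repr(line))
```

With this stand-in pattern none of the six lines match: the first line has attributes with quotes and equals signs, the child lines start with whitespace, and the last line is a closing tag. The real dictionary patterns may behave differently.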

 

Any help would be appreciated.

Is there perhaps a better way to get a data payload from an MQ service into Solr using Flume?

 
