07-19-2016 08:41 AM - edited 07-19-2016 08:44 AM
Hey guys,
I've been trying to figure this out for about two weeks and haven't found any solution. Can someone help me, please? =D
Problem: I can't get JSON input consumed through Flume (with an HTTP Source) and indexed into Solr via a morphline.
I've followed the examples in the Cloudera tutorials (Link),
but couldn't make them work with the HTTP Source.
The configuration that I'm using:
#Flume
#Flume
agent1.sources = http-source
agent1.channels = channel1
agent1.sinks = solrSink

agent1.sources.http-source.type = org.apache.flume.source.http.HTTPSource
agent1.sources.http-source.handler = org.apache.flume.source.http.JSONHandler
agent1.sources.http-source.port = 9049
agent1.sources.http-source.channels = memory-channel

agent1.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.solrSink.channel = memoryChannel
agent1.sinks.solrSink.batchSize = 1000
agent1.sinks.solrSink.batchDurationMillis = 1000
agent1.sinks.solrSink.morphlineFile = /home/cloudera/Desktop/http-source/conf/morphline.conf
agent1.sinks.solrSink.morphlineId = iot_morphline
agent1.sinks.solrSink.threadCount = 1

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 20000
agent1.channels.channel1.transactionCapacity = 1000

agent1.sources.http-source.channels = channel1
agent1.sinks.solrSink.channel = channel1
#Morphline
SOLR_LOCATOR : {
  collection : iot_logs
  zkHost : "quickstart.cloudera:2181/solr"
}
morphlines : [
  {
    id : iot_morphline
    importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]
    commands : [
      { readJson { outputClass : java.util.Map } }
      { extractJsonPaths {
          flatten : true
          paths : {
            nome : /nome
            senha : /senha
            _version_ : /_version_
          }
        }
      }
      { generateSolrSequenceKey { baseIdField : id, solrLocator : ${SOLR_LOCATOR} } }
      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }
      { logDebug { format : "solrcell output: {}", args : ["@{}"] } }
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]
# Solr schema
<?xml version="1.0" encoding="UTF-8" ?>
<schema name="solr_iot_schema" version="1.5">
  <fields>
    <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
    <field name="nome" type="string" indexed="true" stored="true" multiValued="false"/>
    <field name="senha" type="string" indexed="true" stored="true" multiValued="false"/>
    <field name="_version_" type="long" indexed="true" stored="true" required="true" />
  </fields>
  <uniqueKey>id</uniqueKey>
...
</schema>
# Command
flume-ng agent --conf conf/ --conf-file conf/flume.conf -Xms2048m -Xmx204800m --name agent1 -Dflume.root.logger=DEBUG,INFO,TRACE,console
# Getting this error
16/07/19 08:37:19 INFO morphline.MorphlineSink: Morphline Sink solrSink started.
16/07/19 08:37:22 WARN morphline.MorphlineHandlerImpl: Morphline /home/cloudera/Desktop/http-source/conf/morphline.conf@iot_morphline failed to process record: {_version_=[1.0], nome=[Jonathan Nobre], senha=[123]}
# HTTP POST
[{ headers: { nome: Jonathan Nobre, senha: 123, _version_ : 1.0 } }]
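For what it's worth, Flume's JSONHandler expects a JSON array of events, each with an optional string-to-string "headers" map and a "body" string, and the morphline readJson command parses the event *body*, not the headers. A minimal sketch of a payload that gives readJson something to parse, using the field names and port from the post above (the localhost host and the empty headers map are assumptions):

```python
import json

# JSONHandler expects a JSON array of events; each event may carry
# string-to-string "headers" and a "body" string. readJson parses the
# body, so the JSON document must be serialized into "body" as a string,
# not spread across the headers map.
doc = {"nome": "Jonathan Nobre", "senha": "123", "_version_": 1.0}
payload = json.dumps([{"headers": {}, "body": json.dumps(doc)}])
print(payload)

# Could then be posted with something like:
#   curl -X POST -H 'Content-Type: application/json' \
#        -d "$PAYLOAD" http://localhost:9049
```

Note the double serialization: the body is itself a JSON string embedded inside the outer event array.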
Regards. ^.^'
07-19-2016 09:38 AM
Thank you for the quick answer, but it didn't work.
I've just tried without the outputClass and with this new configuration; both result in the same error.
SOLR_LOCATOR : {
  collection : iot_logs
  zkHost : "quickstart.cloudera:2181/solr"
}
morphlines : [
  {
    id : iot_morphline
    importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]
    commands : [
      { readJson { outputClass : com.fasterxml.jackson.databind.JsonNode } }
      { extractJsonPaths {
          flatten : true
          paths : {
            nome : /nome
            senha : /senha
            _version_ : /_version_
          }
        }
      }
      { generateSolrSequenceKey { baseIdField : id, solrLocator : ${SOLR_LOCATOR} } }
      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }
      { logDebug { format : "solrcell output: {}", args : ["@{}"] } }
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]
10-05-2016 02:31 AM
You can use something like below.
I have tried and tested the same.
SolrCloud 5.1.0
SolrJ 5.0.0
Kite SDK 1.1.0
It is working fine in schemaless mode as well now, since I updated the Kite SDK.
This is working fine with a Kafka source, any channel, and a MorphlineSolrSink.
===============================================================
# Specify server locations in a SOLR_LOCATOR variable; used later in
# variable substitutions:
SOLR_LOCATOR : {
# Name of solr collection
collection : gettingstarted
# ZooKeeper ensemble
zkHost : "localhost:2181"
}
morphlines : [
{
# Name used to identify a morphline. E.g. used if there are multiple
# morphlines in a morphline config file
id : morphline1
# Import all morphline commands in these java packages and their
# subpackages. Other commands that may be present on the classpath are
# not visible to this morphline.
importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
#importCommands : ["org.**", "com.**"]
commands : [
{ readJson: {} }
{ extractJsonPaths { flatten: false, paths: {
"jobName" : /jobName,
"componentType" : /componentType,
"jobStatus" : /jobStatus,
"jobKey" : /jobKey,
"agentName" : /agentName,
"id" : /id,
"jobGroup" : /jobGroup,
"endTime" : /endTime,
"fileName" : /fileName,
"filePath" : /filePath,
"fileFlushTime" : /fileFlushTime,
"fileReadTime" : /fileReadTime,
"fileSize" : /fileSize,
} } }
{ logError { format : "record: {}", args : ["@{}"] } }
{ logDebug { format : "output record: {}", args : ["@{}"] } }
{ removeFields {
blacklist : ["regex:_attachment_.*"]
}
}
# log the record at INFO level to SLF4J
{ logInfo { format : "output record: {}", args : ["@{}"] } }
# load the record into a Solr server or MapReduce Reducer
{
loadSolr {
solrLocator : ${SOLR_LOCATOR}
}
}
]
}
]
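For anyone unsure what the extractJsonPaths step above produces: each "field : /path" entry looks up the value at that path in the parsed JSON tree and copies it into a record field named after the key. A rough Python emulation of those lookups (a hypothetical helper for illustration, not Kite code; the sample event values are made up):

```python
# Rough emulation of extractJsonPaths: each "field : /path" entry copies
# the value found at that slash-separated JSON path into a record field
# named after the key. Missing paths are simply skipped.
def extract_json_paths(doc, paths):
    record = {}
    for field, path in paths.items():
        node = doc
        for part in path.strip("/").split("/"):
            if not isinstance(node, dict) or part not in node:
                node = None
                break
            node = node[part]
        if node is not None:
            record[field] = node
    return record

# Made-up sample event using a few of the field names from the config above.
event = {"jobName": "nightly-load", "jobStatus": "SUCCESS", "fileSize": 1024}
paths = {"jobName": "/jobName", "jobStatus": "/jobStatus",
         "fileSize": "/fileSize", "endTime": "/endTime"}
print(extract_json_paths(event, paths))
```

The real command also supports flattening nested arrays when flatten is true; the sketch only covers the flat, single-document case used in this config.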