New Contributor
Posts: 2
Registered: ‎07-19-2016

[failed to process record] Using Flume + Morphline + Solr with HTTP Source for JSON input


Hey guys,

 

I've been trying to figure this out for about two weeks and haven't found a solution. Can someone help me, please? =D

 

Problem: I can't get JSON input consumed through Flume (with the HTTP Source) and indexed in Solr via Morphline.

 

I've followed the examples in the Cloudera tutorials Link, but couldn't make them work with the HTTP Source.

 

The configuration that I'm using:

 

 

 

#Flume
agent1.sources = http-source
agent1.channels = channel1
agent1.sinks = solrSink

agent1.sources.http-source.type = org.apache.flume.source.http.HTTPSource
agent1.sources.http-source.handler = org.apache.flume.source.http.JSONHandler
agent1.sources.http-source.port = 9049
agent1.sources.http-source.channels = channel1

agent1.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.solrSink.channel = channel1
agent1.sinks.solrSink.batchSize = 1000
agent1.sinks.solrSink.batchDurationMillis = 1000
agent1.sinks.solrSink.morphlineFile = /home/cloudera/Desktop/http-source/conf/morphline.conf
agent1.sinks.solrSink.morphlineId = iot_morphline
agent1.sinks.solrSink.threadCount = 1

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 20000
agent1.channels.channel1.transactionCapacity = 1000
#Morphline
SOLR_LOCATOR : {
	collection : iot_logs
	zkHost : "quickstart.cloudera:2181/solr"
}

morphlines: [
	{
		id : iot_morphline
		importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]
		commands : [
			{
				readJson {
					outputClass : java.util.Map
				}
			}
			{
				extractJsonPaths {
					flatten : true
					paths :  {
						nome : /nome
						senha : /senha
						_version_ : /_version_
					}
				}	
			}
			{
				generateSolrSequenceKey {
					baseIdField: id
					solrLocator : ${SOLR_LOCATOR}
				}
			}
			{
				sanitizeUnknownSolrFields {
					solrLocator : ${SOLR_LOCATOR}
				}
			}
			{ 	
				logDebug { 
					format : "solrcell output: {}", args : ["@{}"] 
				}
			}
			{
				loadSolr {
					solrLocator : ${SOLR_LOCATOR}
			  	}
			}
		]
	}
]

 

# Solr schema
<?xml version="1.0" encoding="UTF-8" ?>
<schema name="solr_iot_schema" version="1.5">
	<fields>
		<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" />
		<field name="nome" type="string" indexed="true" stored="true" multiValued="false"/>
		<field name="senha" type="string" indexed="true" stored="true" multiValued="false"/>
		<field name="_version_" type="long" indexed="true" stored="true" required="true" />
	</fields>
	<uniqueKey>id</uniqueKey>

...

</schema>

 

# Command
flume-ng agent --conf conf/ --conf-file conf/flume.conf --name agent1 -Xms2048m -Xmx2048m -Dflume.root.logger=DEBUG,console
# Getting this error
16/07/19 08:37:19 INFO morphline.MorphlineSink: Morphline Sink solrSink started.
16/07/19 08:37:22 WARN morphline.MorphlineHandlerImpl: Morphline /home/cloudera/Desktop/http-source/conf/morphline.conf@iot_morphline failed to process record: {_version_=[1.0], nome=[Jonathan Nobre], senha=[123]}
# HTTP POST
[{"headers": {"nome": "Jonathan Nobre", "senha": "123", "_version_": "1.0"}}]
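One thing worth noting about this payload: Flume's JSONHandler expects a JSON array of events, each shaped like {"headers": {...}, "body": "..."}, where the body is a string. The morphline readJson command parses the event *body* (the _attachment_body field the sink fills from the Flume event body), so a POST that carries the fields only as headers leaves the body empty and gives readJson nothing to parse. A minimal sketch of building a payload with the record serialized into the body (host and port here are just the ones from the config above, assumed to be reachable):

```python
import json

# Flume JSONHandler format: a JSON array of events, each with
# string-valued "headers" and a string "body". readJson parses the
# body, so the record is serialized into "body", not into headers.
record = {"nome": "Jonathan Nobre", "senha": "123", "_version_": "1.0"}
payload = json.dumps([{"headers": {}, "body": json.dumps(record)}])

print(payload)

# Posting it (only works against a running agent, so left commented):
# import urllib.request
# req = urllib.request.Request(
#     "http://localhost:9049", data=payload.encode("utf-8"),
#     headers={"Content-Type": "application/json"})
# urllib.request.urlopen(req)
```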

 

Regards. ^.^'

 

 

Cloudera Employee
Posts: 146
Registered: ‎08-21-2013

Re: [failed to process record] Using Flume + Morphline + Solr with HTTP Source for JSON input

The extractJsonPaths command expects a Jackson JSON object as input rather than a java.util.Map. Try removing the "outputClass : java.util.Map" option from the readJson command.
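With that option removed, the first two commands would look like this (a minimal sketch; by default readJson emits Jackson JsonNode objects, which is the input extractJsonPaths expects):

```
{
	readJson {}
}
{
	extractJsonPaths {
		flatten : true
		paths : {
			nome : /nome
			senha : /senha
		}
	}
}
```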

New Contributor
Posts: 2
Registered: ‎07-19-2016

Re: [failed to process record] Using Flume + Morphline + Solr with HTTP Source for JSON input

Thank you for the quick answer, but it didn't work.

 

I've just tried it without the outputClass option, and also with this new configuration; both result in the same error.

 

 

SOLR_LOCATOR : {
	collection : iot_logs
	zkHost : "quickstart.cloudera:2181/solr"
}

morphlines: [
	{
		id : iot_morphline
		importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]
		commands : [
			{
				readJson {
					outputClass : com.fasterxml.jackson.databind.JsonNode
				}
			}
			{
				extractJsonPaths {
					flatten : true
					paths :  {
						nome : /nome
						senha : /senha
						_version_ : /_version_
					}
				}	
			}
			{
				generateSolrSequenceKey {
					baseIdField: id
					solrLocator : ${SOLR_LOCATOR}
				}
			}
			{
				sanitizeUnknownSolrFields {
					solrLocator : ${SOLR_LOCATOR}
				}
			}
			{ 	
				logDebug { 
					format : "solrcell output: {}", args : ["@{}"] 
				}
			}
			{
				loadSolr {
					solrLocator : ${SOLR_LOCATOR}
			  	}
			}
		]
	}
]

 

New Contributor
Posts: 1
Registered: ‎10-05-2016

Re: [failed to process record] Using Flume + Morphline + Solr with HTTP Source for JSON input

You can use something like the configuration below.

I have tried and tested it with:

Solr Cloud 5.1.0

SolrJ 5.0.0

Kite SDK 1.1.0

It also works fine in schemaless mode now, since I updated the Kite SDK.

This is working fine with a Kafka source, any channel, and MorphlineSolrSink.

===============================================================

# Specify server locations in a SOLR_LOCATOR variable; used later in
# variable substitutions:
SOLR_LOCATOR : {
	# Name of solr collection
	collection : gettingstarted

	# ZooKeeper ensemble
	zkHost : "localhost:2181"
}

morphlines : [
	{
		# Name used to identify a morphline. E.g. used if there are multiple
		# morphlines in a morphline config file
		id : morphline1

		# Import all morphline commands in these java packages and their
		# subpackages. Other commands that may be present on the classpath are
		# not visible to this morphline.
		importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
		#importCommands : ["org.**", "com.**"]

		commands : [
			{ readJson {} }

			{
				extractJsonPaths {
					flatten : false
					paths : {
						"jobName" : /jobName,
						"componentType" : /componentType,
						"jobStatus" : /jobStatus,
						"jobKey" : /jobKey,
						"agentName" : /agentName,
						"id" : /id,
						"jobGroup" : /jobGroup,
						"endTime" : /endTime,
						"fileName" : /fileName,
						"filePath" : /filePath,
						"fileFlushTime" : /fileFlushTime,
						"fileReadTime" : /fileReadTime,
						"fileSize" : /fileSize
					}
				}
			}

			{ logError { format : "record: {}", args : ["@{}"] } }
			{ logDebug { format : "output record: {}", args : ["@{}"] } }

			{
				removeFields {
					blacklist : ["regex:_attachment_.*"]
				}
			}

			# log the record at INFO level to SLF4J
			{ logInfo { format : "output record: {}", args : ["@{}"] } }

			# load the record into a Solr server or MapReduce Reducer
			{
				loadSolr {
					solrLocator : ${SOLR_LOCATOR}
				}
			}
		]
	}
]
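For reference, extractJsonPaths resolves each configured path against the parsed JSON tree and copies the matched value into the named record field. The mapping above behaves roughly like this sketch (the sample record and its values are made up purely for illustration; real paths can also be nested, e.g. /a/b):

```python
import json

def extract_json_paths(record, paths):
    """Mimic morphline extractJsonPaths for simple slash-separated
    paths: walk the parsed JSON tree one segment at a time."""
    out = {}
    for field, path in paths.items():
        node = record
        for segment in path.strip("/").split("/"):
            if not isinstance(node, dict) or segment not in node:
                node = None
                break
            node = node[segment]
        if node is not None:
            out[field] = node
    return out

# Hypothetical event body matching a few of the paths configured above.
body = '{"jobName": "ingest", "jobStatus": "OK", "fileSize": 1024}'
extracted = extract_json_paths(json.loads(body),
                               {"jobName": "/jobName",
                                "jobStatus": "/jobStatus",
                                "fileSize": "/fileSize"})
print(extracted)
```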
