Kafka - Flume - Solr Morphline

Hi,

 

I'm looking for specific information on how to sink data to Solr from a Kafka source using Flume. I tried sinking to HDFS, which works, but the same setup does not work when sinking to Solr. I've configured both the Flume configuration file and the Morphlines file (both through CM). Any information on defining schema.xml and its relationship to the Morphlines file would also be highly appreciated.

Thanks in advance!

 

Configuration File

 

tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sinks = solrSink

 

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.topic = errlog
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 2000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.zookeeperConnect = <IP>
tier1.sources.source1.kafka.consumer.group.id = flume

 

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 50000
tier1.channels.channel1.transactionCapacity = 5000

 

##Works <start>

 

 

#tier1.sinks.sink1.type = hdfs
#tier1.sinks.sink1.hdfs.fileType = DataStream
#tier1.sinks.sink1.hdfs.writeFormat = Text
#tier1.sinks.sink1.hdfs.kerberosPrincipal = $KERBEROS_PRINCIPAL
#tier1.sinks.sink1.hdfs.kerberosKeytab = $KERBEROS_KEYTAB
#tier1.sinks.sink1.channel = channel1
#tier1.sinks.sink1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
#tier1.sinks.sink1.hdfs.filePrefix = events-
#tier1.sinks.sink1.hdfs.round = true
#tier1.sinks.sink1.hdfs.roundValue = 10
#tier1.sinks.sink1.hdfs.roundUnit = minute

 

##Works <end>

 

##Doesn't Work <start>

 

tier1.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.solrSink.channel = channel1
tier1.sinks.solrSink.batchSize = 100
tier1.sinks.solrSink.batchDurationMillis = 1000
tier1.sinks.solrSink.morphlineFile = morphlines.conf
tier1.sinks.solrSink.morphlineId = morphline1

 

##Doesn't Work <end>

 

 

Morphlines File

 

SOLR_LOCATOR : {
  collection : collection1

  zkHost : "$ZK_HOST"
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]

    commands : [
      {
        readLine {
          charset : UTF-8
        }
      }

      {
        grok {
          dictionaryFiles : [target/test-classes/grok-dictionaries]
          expressions : {
            message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
          }
        }
      }

      {
        sanitizeUnknownSolrFields {
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      { logDebug { format : "output record: {}", args : ["@{}"] } }

      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
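
On the schema.xml side, my current understanding (an assumption on my part, not something I have confirmed on this cluster) is that the field names produced by readLine and the grok expression above need matching field definitions in the collection's schema.xml, since sanitizeUnknownSolrFields drops any record field the schema does not know about. A minimal sketch of what I think that fragment would look like; the field names come from the grok expression, but the field types and the indexed/stored settings are guesses:

```xml
<!-- Fragment only: field definitions that would sit inside the existing
     schema.xml of collection1. Field types, uniqueKey and the rest of the
     schema are omitted here. -->

<!-- Raw line written by readLine -->
<field name="message"          type="text_general" indexed="true" stored="true"/>

<!-- Fields captured by the grok expression; the types are assumptions -->
<field name="syslog_pri"       type="string"       indexed="true" stored="true"/>
<field name="syslog_timestamp" type="string"       indexed="true" stored="true"/>
<field name="syslog_hostname"  type="string"       indexed="true" stored="true"/>
<field name="syslog_program"   type="string"       indexed="true" stored="true"/>
<field name="syslog_pid"       type="string"       indexed="true" stored="true"/>
<field name="syslog_message"   type="text_general" indexed="true" stored="true"/>
```

If that understanding is right, any grok capture without a field of the same name in schema.xml would be removed before loadSolr runs.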