need help with flume - morphline - solr pipeline, CDH5 & CM

Guru

Hi,

 

I want to build a pipeline that ingests log data via Flume into Solr, which sounds like nothing special...


But I get stuck when starting the Flume agent (with an exec source running 'tail -F ...'): its log suggests that it stops doing anything after "INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands", since this is the last log entry before the agent gets restarted repeatedly (every ~30 sec).

If I remove the SolrSink from my Flume config, the expected files are written to the HDFS sink, so the basic workflow is fine.

 

For my testing I tried to use the syslog example provided in the Search User Guide (http://www.cloudera.com/content/cloudera-content/cloudera-docs/Search/latest/Cloudera-Search-User-Gu...).

 

One thing that puzzles me: how do you configure the grok dictionaries in morphlines.conf when Cloudera Manager is used for the configuration?

The configuration itself is clear (the text area in "Flume-NG Solr Sink"), but how do I reference the grok dictionaries? Just "dictionaryFiles : [grok-dictionaries]", or with some path prefix?
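
In other words, I am unsure which of these variants the sink expects (the absolute path below is only an illustration, modeled on the process directory visible in the log further down):

# bare name, resolved relative to the agent's process/config directory?
dictionaryFiles : [grok-dictionaries]

# or a full path into the Cloudera Manager process directory?
dictionaryFiles : ["/var/run/cloudera-scm-agent/process/<nnn>-flume-AGENT/grok-dictionaries"]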

 

=========================

This is the log of the Flume agent (captured while I am writing entries to the watched file; nothing gets processed):

""

2014-04-30 15:42:37,285 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
2014-04-30 15:44:16,448 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2014-04-30 15:44:16,493 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/var/run/cloudera-scm-agent/process/1027-flume-AGENT/flume.conf
2014-04-30 15:44:16,506 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,507 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,507 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,509 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,509 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,511 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: HDFS solrSink Agent: agent
2014-04-30 15:44:16,511 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,512 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,512 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,513 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,561 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
2014-04-30 15:44:16,562 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2014-04-30 15:44:16,580 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
2014-04-30 15:44:16,592 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel memoryChannel
2014-04-30 15:44:16,594 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source execSrc, type exec
2014-04-30 15:44:16,609 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: solrSink, type: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
2014-04-30 15:44:16,616 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
2014-04-30 15:44:17,477 INFO org.apache.flume.sink.hdfs.HDFSEventSink: Hadoop Security enabled: false
2014-04-30 15:44:17,481 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel memoryChannel connected to [execSrc, solrSink, HDFS]
2014-04-30 15:44:17,509 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{execsrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:execSrc,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@9a87fad counterGroup:{ name:null counters:{} } }, solrSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@15563bcf counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
2014-04-30 15:44:17,521 INFO org.apache.flume.node.Application: Starting Channel memoryChannel
2014-04-30 15:44:17,623 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
2014-04-30 15:44:17,623 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
2014-04-30 15:44:17,630 INFO org.apache.flume.node.Application: Starting Sink HDFS
2014-04-30 15:44:17,632 INFO org.apache.flume.node.Application: Starting Sink solrSink
2014-04-30 15:44:17,632 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
2014-04-30 15:44:17,633 INFO org.apache.flume.sink.solr.morphline.MorphlineSink: Starting Morphline Sink solrSink (MorphlineSolrSink) ...
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: solrSink: Successfully registered new MBean.
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: solrSink started
2014-04-30 15:44:17,634 INFO org.apache.flume.node.Application: Starting Source execSrc
2014-04-30 15:44:17,637 INFO org.apache.flume.source.ExecSource: Exec source starting with command:tail -F /tmp/spooldir/test.txt
2014-04-30 15:44:17,650 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: execSrc: Successfully registered new MBean.
2014-04-30 15:44:17,650 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: execSrc started
2014-04-30 15:44:17,687 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2014-04-30 15:44:17,877 INFO org.mortbay.log: jetty-6.1.26
2014-04-30 15:44:17,956 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2014-04-30 15:44:18,134 INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands
2014-04-30 15:45:00,994 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false

""

 

These log lines are written every ~30 sec.

 

===== flume config =====

agent.sources = execSrc
agent.channels = memoryChannel
agent.sinks = HDFS solrSink

agent.sources.execSrc.type = exec
agent.sources.execSrc.command = tail -F /tmp/spooldir/test.txt
agent.sources.execSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.execSrc.interceptors.uuidinterceptor.headerName = id
agent.sources.execSrc.interceptors.uuidinterceptor.preserveExisting = false
agent.sources.execSrc.interceptors.uuidinterceptor.prefix = myhostname
agent.sources.execSrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink.channel = memoryChannel
agent.sinks.solrSink.batchSize = 1000
agent.sinks.solrSink.batchDurationMillis = 1000
agent.sinks.solrSink.morphlineFile = morphlines.conf
agent.sinks.solrSink.morphlineId = morphline1

agent.sinks.HDFS.channel = memoryChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://hadoop-pg-6.cluster:8020/tmp/test4solr
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.batchSize = 2000
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 2000
agent.sinks.HDFS.hdfs.rollInterval = 30

 

===== morphline config =====

# Specify server locations in a SOLR_LOCATOR variable; used later in variable substitutions:

SOLR_LOCATOR : {
  collection : workshop
  # ZooKeeper ensemble
  zkHost : "$ZK_HOST"
  # The maximum number of documents to send to Solr per network batch (throughput knob)
  # batchSize : 100
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      { readLine { charset : UTF-8 } }

      {
        addCurrentTime {
          field : manual_timestamp
          preserveExisting : false
        }
      }

      {
        grok {
          dictionaryFiles : [grok-dictionaries]
          expressions : {
            message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
          }
        }
      }

      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : created_at
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
          outputTimezone : UTC
        }
      }

      # Recall that Solr throws an exception on any attempt to load a document that contains a
      # field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a SolrServer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

 

 

Additionally, I wanted to ask: where does the logDebug output from the morphline get written?

 

What do I need to modify to be able to ingest data into Solr?

 

Any help appreciated.

1 ACCEPTED SOLUTION

Super Collaborator
Interceptors are executed prior to Sinks.

If the UUIDInterceptor does nothing, it's probably misconfigured or not attached to the source correctly in flume.conf, or similar.
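
One easy thing to miss: Flume only activates interceptors that are listed in the source's interceptor chain. A sketch of the declaration that appears to be missing from the flume.conf posted above:

# declare the interceptor chain on the source; without this line the
# uuidinterceptor.* properties are never picked up
agent.sources.execSrc.interceptors = uuidinterceptor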

Alternatively, consider replacing the UUIDInterceptor with a MorphlineInterceptor that uses the generateUUID command, or move the generateUUID command into the morphline config of the MorphlineSolrSink.
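
A minimal sketch of both alternatives, assuming the MorphlineInterceptor shipped with the Flume Solr sink and the generateUUID command from kite-morphlines (names and placement are illustrative, not a tested drop-in):

# alternative 1: MorphlineInterceptor on the source instead of UUIDInterceptor
agent.sources.execSrc.interceptors = morphlineinterceptor
agent.sources.execSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.execSrc.interceptors.morphlineinterceptor.morphlineFile = morphlines.conf
agent.sources.execSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

# alternative 2: generate the id inside the sink's morphline, e.g. right after readLine
commands : [
  { readLine { charset : UTF-8 } }
  # assigns a UUID to the record's "id" field, replacing the UUIDInterceptor
  { generateUUID { field : id } }
  ...
]

Either way each record carries a unique id field before loadSolr runs.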

Also see http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#/generateUUID

Wolfgang.


10 REPLIES

Super Collaborator
Great! Looking forward to meeting up @ Berlin Buzzwords.

Wolfgang.