Need help with Flume - Morphline - Solr pipeline, CDH5 & CM

Guru

Hi,

 

I wanted to build a pipeline that ingests log data via Flume into Solr. Sounds like nothing special...


But I get stuck at starting the Flume agent (with an exec source, 'tail -F ...'): its log tells me that it stops doing anything after "INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands", because this is the last log entry before the agent repeatedly gets restarted (every 30 sec.).

If I remove the SolrSink from my Flume config, the expected files are written to the HDFS sink, so the basic workflow is fine.

 

For my testing I tried to use the syslog example provided in the Cloudera Search User Guide (http://www.cloudera.com/content/cloudera-content/cloudera-docs/Search/latest/Cloudera-Search-User-Gu...

 

One strange thing: how do I configure the grok dictionaries in morphlines.conf while using Cloudera Manager for the configuration?

The configuration itself is clear (the text area in "Flume-NG Solr Sink"), but how do I reference the grok dictionaries? Just "dictionaryFiles : [grok-dictionaries]", or with some path prefix?

 

=========================

This is the log of the Flume agent (written while I am adding entries to the watched file; nothing gets processed):

2014-04-30 15:42:37,285 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
2014-04-30 15:44:16,448 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2014-04-30 15:44:16,493 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/var/run/cloudera-scm-agent/process/1027-flume-AGENT/flume.conf
2014-04-30 15:44:16,506 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,507 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,507 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,509 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,509 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,511 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: HDFS solrSink Agent: agent
2014-04-30 15:44:16,511 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,512 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,512 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,513 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,561 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
2014-04-30 15:44:16,562 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2014-04-30 15:44:16,580 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
2014-04-30 15:44:16,592 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel memoryChannel
2014-04-30 15:44:16,594 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source execSrc, type exec
2014-04-30 15:44:16,609 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: solrSink, type: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
2014-04-30 15:44:16,616 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
2014-04-30 15:44:17,477 INFO org.apache.flume.sink.hdfs.HDFSEventSink: Hadoop Security enabled: false
2014-04-30 15:44:17,481 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel memoryChannel connected to [execSrc, solrSink, HDFS]
2014-04-30 15:44:17,509 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{execsrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:execSrc,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@9a87fad counterGroup:{ name:null counters:{} } }, solrSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@15563bcf counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
2014-04-30 15:44:17,521 INFO org.apache.flume.node.Application: Starting Channel memoryChannel
2014-04-30 15:44:17,623 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
2014-04-30 15:44:17,623 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
2014-04-30 15:44:17,630 INFO org.apache.flume.node.Application: Starting Sink HDFS
2014-04-30 15:44:17,632 INFO org.apache.flume.node.Application: Starting Sink solrSink
2014-04-30 15:44:17,632 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
2014-04-30 15:44:17,633 INFO org.apache.flume.sink.solr.morphline.MorphlineSink: Starting Morphline Sink solrSink (MorphlineSolrSink) ...
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: solrSink: Successfully registered new MBean.
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: solrSink started
2014-04-30 15:44:17,634 INFO org.apache.flume.node.Application: Starting Source execSrc
2014-04-30 15:44:17,637 INFO org.apache.flume.source.ExecSource: Exec source starting with command:tail -F /tmp/spooldir/test.txt
2014-04-30 15:44:17,650 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: execSrc: Successfully registered new MBean.
2014-04-30 15:44:17,650 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: execSrc started
2014-04-30 15:44:17,687 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2014-04-30 15:44:17,877 INFO org.mortbay.log: jetty-6.1.26
2014-04-30 15:44:17,956 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2014-04-30 15:44:18,134 INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands
2014-04-30 15:45:00,994 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false

""

 

These log lines are written every ~30 sec.

 

===== flume config =====

agent.sources = execSrc
agent.channels = memoryChannel
agent.sinks = HDFS solrSink

agent.sources.execSrc.type = exec
agent.sources.execSrc.command = tail -F /tmp/spooldir/test.txt
agent.sources.execSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.execSrc.interceptors.uuidinterceptor.headerName = id
agent.sources.execSrc.interceptors.uuidinterceptor.preserveExisting = false
agent.sources.execSrc.interceptors.uuidinterceptor.prefix = myhostname
agent.sources.execSrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink.channel = memoryChannel
agent.sinks.solrSink.batchSize = 1000
agent.sinks.solrSink.batchDurationMillis = 1000
agent.sinks.solrSink.morphlineFile = morphlines.conf
agent.sinks.solrSink.morphlineId = morphline1

agent.sinks.HDFS.channel = memoryChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://hadoop-pg-6.cluster:8020/tmp/test4solr
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.batchSize = 2000
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 2000
agent.sinks.HDFS.hdfs.rollInterval = 30

 

===== morphline config =====

# Specify server locations in a SOLR_LOCATOR variable; used later in variable substitutions:
SOLR_LOCATOR : {
  collection : workshop
  # ZooKeeper ensemble
  zkHost : "$ZK_HOST"
  # The maximum number of documents to send to Solr per network batch (throughput knob)
  # batchSize : 100
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      { readLine { charset : UTF-8 } }

      {
        addCurrentTime {
          field : manual_timestamp
          preserveExisting : false
        }
      }

      {
        grok {
          dictionaryFiles : [grok-dictionaries]
          expressions : {
            message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
          }
        }
      }

      # convert timestamp field to native Solr timestamp format,
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : created_at
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
          outputTimezone : UTC
        }
      }

      # Recall that Solr throws an exception on any attempt to load a document that contains a
      # field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a SolrServer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

 

 

Additionally I wanted to ask: where does the logDebug output from the morphline get written to?

 

What do I need to modify to be able to ingest data into Solr?

 

Any help appreciated...


10 REPLIES

Super Collaborator
Try calling it dictionaryFiles : [grok-dictionary.conf] per https://www.cloudera.com/content/cloudera-content/cloudera-docs/CM4Ent/4.8.1/Cloudera-Manager-Managing-Clust...
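In context, that would look like this in the morphline's grok command (a sketch; it assumes Cloudera Manager deploys the grok dictionary text area under the file name grok-dictionary.conf next to morphlines.conf):

grok {
  # reference the dictionary by the file name CM deploys alongside morphlines.conf
  dictionaryFiles : [grok-dictionary.conf]
  expressions : {
    # your full syslog pattern goes here
    message : """%{GREEDYDATA:syslog_message}"""
  }
}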

Wolfgang.

Guru

Hi Wolfgang,

 

thanks for answering. I modified that entry accordingly, but nothing has changed in the workflow (no data processing happens, no output in HDFS or Solr).

The flume.log still stops at the same output as in the original post:

...

2014-05-01 12:10:37,893 INFO org.mortbay.log: jetty-6.1.26
2014-05-01 12:10:37,990 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2014-05-01 12:10:38,179 INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands

 

and it restarts periodically, about every 90 sec.

Additionally, the status of this agent in CM is "bad", because of "This role encountered 3 unexpected exit(s) in the previous 5 minute(s). Critical threshold: any."

 

Where can I get more log output about what is going on in the morphline pipeline? There is no log/debug output in the Flume log; where do the morphline debug messages go?

 

getting confused 😉, any help appreciated, Gerd

 

PS: if I disable the Solr morphline sink I receive data on HDFS, so the error must be somewhere in the morphline processing (presumably).

Super Collaborator
Weird. Which versions (Solr, CDH, Cloudera Manager) is this with?

To automatically print diagnostic information, such as the content of records as they pass through the morphline commands, consider enabling the TRACE log level, for example by adding the following line to your log4j.properties file (e.g. via Cloudera Manager), per http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#/logTrace:

log4j.logger.org.kitesdk.morphline=TRACE


Guru

I am using:

Cloudera Manager: Cloudera Express 5.0.0 (#215 built by jenkins on 20140331-1424 git: 50c701f3e920b1fcf524bf5fa061d65902cde804)

Hadoop stack: CDH5.0.0-1.cdh5.0.0.p0.47 (with the included Solr)

 

I configured TRACE in the corresponding "Advanced" section of the Flume instance, but nothing has changed. After restarting the agent I cannot see any additional output in /var/log/flume-ng/flume-cmf-flume-AGENT-hadoop-pg-7.cluster.log, and the data I insert into the watched file (via tail -F ...) isn't being processed at all.

 

The only thing that happens is that the agent seems to reset/restart periodically, and CM shows the agent in state 'BAD' due to a lot of unexpected exits.

 

hmmm?!?!



Guru

Hi Wolfgang,

 

I tried a different approach, starting with a blank Flume on a different cluster node and configuring exactly the same Flume/morphline settings. The behaviour is almost the same (data isn't being processed), but now I get an error message in the flume.log. Most probably you can interpret it much better 🙂 O.K., out of memory sounds obvious, but which setting does this affect?

 

2014-05-01 15:05:11,724 DEBUG org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Checking file:/var/run/cloudera-scm-agent/process/1089-flume-AGENT/flume.conf for changes
2014-05-01 15:05:37,382 ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
2014-05-01 15:05:44,224 DEBUG org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Checking file:/var/run/cloudera-scm-agent/process/1089-flume-AGENT/flume.conf for changes
2014-05-01 15:05:44,977 ERROR org.apache.flume.lifecycle.LifecycleSupervisor: Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2ed7c530 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.OutOfMemoryError: Java heap space
at java.util.zip.ZipCoder.toString(ZipCoder.java:49)
at java.util.zip.ZipFile.getZipEntry(ZipFile.java:531)
at java.util.zip.ZipFile.access$900(ZipFile.java:56)
at java.util.zip.ZipFile$1.nextElement(ZipFile.java:513)
at java.util.zip.ZipFile$1.nextElement(ZipFile.java:483)
at java.util.jar.JarFile$1.nextElement(JarFile.java:243)
at java.util.jar.JarFile$1.nextElement(JarFile.java:238)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanJar(ClassPath.java:343)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanFrom(ClassPath.java:288)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scan(ClassPath.java:276)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath.from(ClassPath.java:84)
at org.kitesdk.morphline.api.MorphlineContext.getTopLevelClasses(MorphlineContext.java:134)
at org.kitesdk.morphline.api.MorphlineContext.importCommandBuilders(MorphlineContext.java:77)
at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:43)
at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:97)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
2014-05-01 15:05:44,982 INFO org.apache.flume.sink.solr.morphline.MorphlineSink: Morphline Sink solrSink stopping...
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: solrSink stopped
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.start.time == 1398949449905
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.stop.time == 1398949544982
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.batch.complete == 0
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.batch.empty == 0
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.batch.underflow == 0
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.connection.closed.count == 0
2014-05-01 15:05:44,982 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.connection.creation.count == 0
2014-05-01 15:05:44,983 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.connection.failed.count == 0
2014-05-01 15:05:44,983 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.event.drain.attempt == 0
2014-05-01 15:05:44,983 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: solrSink. sink.event.drain.sucess == 0
2014-05-01 15:05:44,983 INFO org.apache.flume.sink.solr.morphline.MorphlineSink: Morphline Sink solrSink stopped. Metrics: SINK:solrSink{sink.connection.closed.count=0, sink.event.drain.attempt=0, sink.batch.underflow=0, sink.connection.failed.count=0, sink.connection.creation.count=0, sink.event.drain.sucess=0, sink.batch.empty=0, sink.batch.complete=0}, {}
2014-05-01 15:05:44,983 WARN org.apache.flume.lifecycle.LifecycleSupervisor: Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2ed7c530 counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies
2014-05-01 15:06:14,979 DEBUG org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Checking file:/var/run/cloudera-scm-agent/process/1089-flume-AGENT/flume.conf for changes

 

br, Gerd

Super Collaborator
OOM would explain it. The Flume default setting for JVM memory is very low. Try something like -Xmx512m -XX:MaxPermSize=256m.
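For example, a minimal sketch of where such options typically go (the values are illustrative; in a plain Flume install this belongs in flume-env.sh, while in Cloudera Manager the equivalent is the Flume agent's Java heap / Java options configuration):

# flume-env.sh -- illustrative values, tune to your workload
export JAVA_OPTS="-Xmx512m -XX:MaxPermSize=256m"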

Guru

Hi Wolfgang, again me 😉

 

Things are getting better; it seems like the increased memory setting saves my day.

Now I am getting the expected DEBUG output from the morphline, but the document doesn't make it into Solr because the required field "id" is missing. Yes, of course Solr wouldn't accept such a document, but I configured Flume to insert a header field called "id", filled by the UUIDInterceptor.

The DEBUG output is:

""

2014-05-01 17:04:46,280 DEBUG org.kitesdk.morphline.stdlib.LogDebugBuilder$LogDebug: 1 : [{message=[<168>Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22.]}]

""

This is the whole line of text read by readLine. The subsequent grok regex also parses correctly, but I have no idea how to access the header field "id" from the UUIDInterceptor.

 

How do I have to handle that additional field so that it gets put into the corresponding document being inserted into Solr?

How can I check the header fields of the incoming event at all?
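One way to inspect them might be a logDebug command at the very top of the morphline's commands list, before readLine (a sketch; it assumes the MorphlineSolrSink maps Flume event headers into record fields, with the event body in _attachment_body):

# sketch: dump the raw incoming record, headers included, before any parsing
{ logDebug { format : "input record: {}", args : ["@{}"] } }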

 

best, Gerd

Super Collaborator
Interceptors are executed prior to Sinks.

If the UUIDInterceptor does nothing, it's probably misconfigured or attached to the wrong source in flume.conf, or similar.
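For instance, a minimal sketch of what activating the interceptor looks like (names taken from the flume.conf in the original post; note that Flume only runs interceptors that are declared on the source via the interceptors key, and that declaration is missing from the config above):

# this list is what actually activates the interceptor on the source
agent.sources.execSrc.interceptors = uuidinterceptor
# the per-interceptor properties then take effect
agent.sources.execSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.execSrc.interceptors.uuidinterceptor.headerName = id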

Alternatively, consider replacing the UUIDInterceptor with a MorphlineInterceptor that uses the generateUUID command, or move the generateUUID command into the morphline config of the MorphlineSolrSink.
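A sketch of the latter option, added near the top of the commands list in morphlines.conf (parameter names per the generateUUID reference linked below; the values mirror the interceptor settings from the original post):

# generate a unique value into the "id" field, replacing the UUIDInterceptor
{ generateUUID { field : id, preserveExisting : false, prefix : myhostname } }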

Also see http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#/generateUUID

Wolfgang.

Guru

Hi Wolfgang,

 

finally I got it (almost) working; just some schema.xml settings left to adjust.

 

Many thanks for your great support; let's celebrate that at BerlinBuzz 😉

 

br, Gerd