Hi,
I want to build a pipeline that ingests log data into Solr via Flume; sounds like nothing special...
But I get stuck at starting the Flume agent (with an exec source running 'tail -F ...'): its log tells me that it stops doing anything after "INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands", because this is the last log entry before the agent repeatedly gets restarted (every ~30 sec.).
If I remove the SolrSink from my Flume config, the expected files are written to the HDFS sink, so the basic workflow is fine.
For my testing I tried to use the syslog example provided in the Cloudera Search User Guide (http://www.cloudera.com/content/cloudera-content/cloudera-docs/Search/latest/Cloudera-Search-User-Gu...).
One strange thing: how do I configure the grok dictionaries in morphlines.conf while using Cloudera Manager for the configuration?
The configuration itself is clear (the text area in "Flume-NG Solr Sink"), but how do I reference the grok dictionaries? Just "dictionaryFiles : [grok-dictionaries]", or with some path prefix?
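To make it concrete, these are the two variants I have been experimenting with in the grok command (both are guesses on my side; I don't know which one Cloudera Manager actually expects):

{
  grok {
    # variant a: bare relative file name, assuming CM materializes the dictionary
    # in the agent's process directory next to morphlines.conf
    dictionaryFiles : [grok-dictionaries]

    # variant b: skip the file entirely and inline the patterns; as far as I can
    # tell the kite-morphlines grok command also accepts a dictionaryString
    # dictionaryString : """MY_PROG \b[\w._/%-]+\b"""

    # trimmed-down expression just for illustration
    expressions : {
      message : """%{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}"""
    }
  }
}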
=========================
This is the log of the Flume agent (while I am writing entries to the watched file, nothing gets processed):
""
2014-04-30 15:42:37,285 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
2014-04-30 15:44:16,448 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
2014-04-30 15:44:16,493 INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/var/run/cloudera-scm-agent/process/1027-flume-AGENT/flume.conf
2014-04-30 15:44:16,506 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,507 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,507 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,508 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,509 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,509 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,510 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,511 INFO org.apache.flume.conf.FlumeConfiguration: Added sinks: HDFS solrSink Agent: agent
2014-04-30 15:44:16,511 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,512 INFO org.apache.flume.conf.FlumeConfiguration: Processing:solrSink
2014-04-30 15:44:16,512 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,513 INFO org.apache.flume.conf.FlumeConfiguration: Processing:HDFS
2014-04-30 15:44:16,561 INFO org.apache.flume.conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
2014-04-30 15:44:16,562 INFO org.apache.flume.node.AbstractConfigurationProvider: Creating channels
2014-04-30 15:44:16,580 INFO org.apache.flume.channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
2014-04-30 15:44:16,592 INFO org.apache.flume.node.AbstractConfigurationProvider: Created channel memoryChannel
2014-04-30 15:44:16,594 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source execSrc, type exec
2014-04-30 15:44:16,609 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: solrSink, type: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
2014-04-30 15:44:16,616 INFO org.apache.flume.sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
2014-04-30 15:44:17,477 INFO org.apache.flume.sink.hdfs.HDFSEventSink: Hadoop Security enabled: false
2014-04-30 15:44:17,481 INFO org.apache.flume.node.AbstractConfigurationProvider: Channel memoryChannel connected to [execSrc, solrSink, HDFS]
2014-04-30 15:44:17,509 INFO org.apache.flume.node.Application: Starting new configuration:{ sourceRunners:{execsrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:execSrc,state:IDLE} }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@9a87fad counterGroup:{ name:null counters:{} } }, solrSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@15563bcf counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
2014-04-30 15:44:17,521 INFO org.apache.flume.node.Application: Starting Channel memoryChannel
2014-04-30 15:44:17,623 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
2014-04-30 15:44:17,623 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
2014-04-30 15:44:17,630 INFO org.apache.flume.node.Application: Starting Sink HDFS
2014-04-30 15:44:17,632 INFO org.apache.flume.node.Application: Starting Sink solrSink
2014-04-30 15:44:17,632 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started
2014-04-30 15:44:17,633 INFO org.apache.flume.sink.solr.morphline.MorphlineSink: Starting Morphline Sink solrSink (MorphlineSolrSink) ...
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: solrSink: Successfully registered new MBean.
2014-04-30 15:44:17,633 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: solrSink started
2014-04-30 15:44:17,634 INFO org.apache.flume.node.Application: Starting Source execSrc
2014-04-30 15:44:17,637 INFO org.apache.flume.source.ExecSource: Exec source starting with command:tail -F /tmp/spooldir/test.txt
2014-04-30 15:44:17,650 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: execSrc: Successfully registered new MBean.
2014-04-30 15:44:17,650 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: execSrc started
2014-04-30 15:44:17,687 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
2014-04-30 15:44:17,877 INFO org.mortbay.log: jetty-6.1.26
2014-04-30 15:44:17,956 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
2014-04-30 15:44:18,134 INFO org.kitesdk.morphline.api.MorphlineContext: Importing commands
2014-04-30 15:45:00,994 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
""
These log lines are written every ~30 sec.
=====flume config====
agent.sources = execSrc
agent.channels = memoryChannel
agent.sinks = HDFS solrSink

# exec source tailing the test file
agent.sources.execSrc.type = exec
agent.sources.execSrc.command = tail -F /tmp/spooldir/test.txt
# declare the interceptor chain before configuring it; without this line Flume ignores the interceptor settings below
agent.sources.execSrc.interceptors = uuidinterceptor
agent.sources.execSrc.interceptors.uuidinterceptor.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.execSrc.interceptors.uuidinterceptor.headerName = id
agent.sources.execSrc.interceptors.uuidinterceptor.preserveExisting = false
agent.sources.execSrc.interceptors.uuidinterceptor.prefix = myhostname
agent.sources.execSrc.channels = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

# morphline-based Solr sink
agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink.channel = memoryChannel
agent.sinks.solrSink.batchSize = 1000
agent.sinks.solrSink.batchDurationMillis = 1000
agent.sinks.solrSink.morphlineFile = morphlines.conf
agent.sinks.solrSink.morphlineId = morphline1

# HDFS sink for verifying the base flow
agent.sinks.HDFS.channel = memoryChannel
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://hadoop-pg-6.cluster:8020/tmp/test4solr
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.batchSize = 2000
agent.sinks.HDFS.hdfs.rollSize = 0
agent.sinks.HDFS.hdfs.rollCount = 2000
agent.sinks.HDFS.hdfs.rollInterval = 30
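Side note on paths: I left morphlineFile as a bare relative name because Cloudera Manager seems to drop the generated morphlines.conf into the agent's process directory (the same uncertainty as with the grok dictionaries above). On a manually managed agent I would expect an absolute path instead, something like this (the location is just my guess):

agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphlines.conf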
======morphline config=======
# Specify server locations in a SOLR_LOCATOR variable; used later in variable substitutions:
SOLR_LOCATOR : {
  collection : workshop
  # ZooKeeper ensemble
  zkHost : "$ZK_HOST"
  # The maximum number of documents to send to Solr per network batch (throughput knob)
  # batchSize : 100
}

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      { readLine { charset : UTF-8 } }

      {
        addCurrentTime {
          field : manual_timestamp
          preserveExisting : false
        }
      }

      {
        grok {
          dictionaryFiles : [grok-dictionaries]
          expressions : {
            message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
          }
        }
      }

      # convert timestamp field to native Solr timestamp format,
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : created_at
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
          outputTimezone : UTC
        }
      }

      # Recall that Solr throws an exception on any attempt to load a document that contains a
      # field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a SolrServer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
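For reference, this is the kind of test line I append to the watched file, a made-up syslog entry that should match the grok expression above:

echo '<13>Apr 30 15:44:00 hadoop-pg-6 sshd[1234]: test message for the solr pipeline' >> /tmp/spooldir/test.txt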
Additionally, I wanted to ask: where does the logDebug output from the morphline get written to?
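My assumption (unverified) is that logDebug simply writes through SLF4J/log4j, so its output should show up in the regular Flume agent log once the morphline loggers run at DEBUG or TRACE level, e.g. via a line like this in the agent's log4j.properties (or the corresponding logging safety valve in Cloudera Manager):

log4j.logger.org.kitesdk.morphline=TRACE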
What do I need to modify to be able to ingest data into Solr?
Any help appreciated...