Member since
03-02-2015
12-05-2016
12:18 PM
Hello Tristan,

Thanks a lot for your response. As you said, the issue was the batchSize configuration of the Kafka sink. Given that I only expected a couple of messages per second, a batchSize of 10 was not needed; setting batchSize to 1 solved the "latency" I was seeing. I guess that if the messages arrived at a rate of thousands per second, a larger batchSize could be much more efficient. In the end it was more of a PEBKAC problem than Flume's problem, haha! 😛

Just to let you know, I'm using a somewhat "older" distribution (CDH 5.5), so I don't have the newer performance improvements you linked me; however, the problem went away after changing the batchSize configuration as I said before. We are planning to upgrade our distribution in the coming months, so I hope to use the newer performance enhancements soon!

Again, thanks a lot for your help and have a nice day!

Rafa
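For anyone landing here later, the fix amounts to a one-line change in the sink configuration from the original question (broker list and topic are the ones from that post):

```properties
a3.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a3.sinks.k1.brokerList = sbmdeqpc02:9092,sbmdeqpc03:9092,sbmdeqpc04:9092
a3.sinks.k1.topic = aud-50
# Ship each event immediately instead of waiting to accumulate 10
a3.sinks.k1.batchSize = 1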
12-05-2016
07:00 AM
Good morning everyone!

I've been trying to use Flume's Kafka sink to send some transactional information to another system that consumes the Kafka queue. The problem is not Flume's throughput (that I know of): every message sent to Flume is consumed and handed to the Kafka sink, but the message does not appear in the Kafka queue for the next ~3 seconds. It takes too much time for the message to become visible in the Kafka queue. I think it might be a Kafka sink configuration issue, but I'm not sure.

My Flume setup is like this:
- Memory channel
- Custom source (the source pulls data from a database and sends the information through the channel)
- Kafka sink

I start counting the time to reach the Kafka queue from the moment the source sends the message to the channel. This agent does not have to handle many messages (around 1-2 messages per second), but I'm concerned about the time it takes them to reach the Kafka queue.

This is my Kafka sink configuration:

a3.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a3.sinks.k1.brokerList = sbmdeqpc02:9092,sbmdeqpc03:9092,sbmdeqpc04:9092
a3.sinks.k1.topic = aud-50
a3.sinks.k1.batchSize = 10

I've tried changing the batchSize configuration, but it doesn't seem to change the latency.

This is the topic description for the topic/queue:

Topic:aud-50 PartitionCount:3 ReplicationFactor:1 Configs:retention.ms=86400000
Topic: aud-50 Partition: 0 Leader: 183 Replicas: 183 Isr: 183
Topic: aud-50 Partition: 1 Leader: 181 Replicas: 181 Isr: 181
Topic: aud-50 Partition: 2 Leader: 182 Replicas: 182 Isr: 182

Has anyone seen this issue of a Kafka sink taking too long to put messages on the queue? Any help is welcome.

Thanks for your help. Kind regards,
Rafa
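As a back-of-the-envelope sketch of why a large batchSize hurts visibility latency at low throughput: if the sink waits to accumulate a full batch, the first event of each batch sits in the channel until the rest arrive. This is a simplified model, not Flume's actual sink loop (which also ships partial batches when the channel drains and has a poll/backoff cycle):

```python
def worst_case_wait(arrival_interval_s: float, batch_size: int) -> float:
    """Seconds the first event of a batch waits before the batch ships,
    assuming the sink only ships once batch_size events have arrived
    (a deliberate simplification of the real sink behavior)."""
    return (batch_size - 1) * arrival_interval_s

# ~2 messages/s with batchSize=10: the first event waits ~4.5 s
print(worst_case_wait(0.5, 10))  # 4.5
# batchSize=1: every event ships immediately
print(worst_case_wait(0.5, 1))   # 0.0
```

Under this model, the observed delay of a few seconds at 1-2 messages/s is exactly what a batchSize of 10 predicts, and batchSize=1 removes it.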
Labels:
- Apache Flume
12-22-2015
01:29 PM
Hello pdvorak!

Thank you very much. I don't know why I was trying to pass the memory parameters as a -Dproperty value (brain freeze, I guess). Using your suggestion it worked perfectly!

Thanks again! Kind regards,
Rafa
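For anyone with the same brain freeze: heap flags like -Xms/-Xmx are JVM options, not system properties, so they don't belong inside a -Dname=value pair. One documented way to pass them is the JAVA_OPTS environment variable that the flume-ng script (or flume-env.sh) picks up; the heap sizes below are illustrative, not a recommendation:

```shell
# Illustrative values; flume-ng appends JAVA_OPTS to the java command line
export JAVA_OPTS="-Xms1024m -Xmx2048m"
flume-ng agent -n a1 -f flume_config.conf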
12-22-2015
07:55 AM
Good morning everyone,

I've been trying to get the syslog->Solr example specified here working. It is a fairly simple example using:
- syslog source
- memory channel
- MorphlineSolr sink

The Flume configuration file I used is:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = xxxx
a1.sources.r1.host = xxx.xxx.xxx.xxx
#sink
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile = /route/to/the/morphline.conf
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#a1.channels.c1.transactionCapacity = 10000
#connect
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The morphline file I used was the same one provided in the example above:

morphlines : [
{
id : morphline1
importCommands : ["com.cloudera.**", "org.kitesdk.**", "org.apache.solr.**"]
commands : [
{
readLine {
charset : UTF-8
}
}
{
grok {
dictionaryFiles : [/route/to/the/morphline/grok-dictionaries]
expressions : {
message : """<%{POSINT:priority}>%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:msg}"""
}
}
}
{
convertTimestamp {
field : timestamp
inputFormats : [ "yyyy-MM-dd'T'HH:mm:ss'Z'", "MMM d HH:mm:ss" ]
inputTimezone : America/Bogota
outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
outputTimezone : UTC
}
}
{
sanitizeUnknownSolrFields {
# Location from which to fetch Solr schema
solrLocator: {
collection: syslogs
zkHost: "zkEnsembleAddresses"
}
}
}
# log the record at INFO level to SLF4J
{ logInfo { format : "output record: {}", args : ["@{}"] } }
{
loadSolr {
solrLocator : {
collection: syslogs
zkHost: "zkEnsembleAddresses"
}
}
}
]
}
]

The morphlines file specifies the following chain of commands:
- read the line
- grok the messages with an expression
- convert the timestamp
- sanitize unknown Solr fields
- put the data in Solr

However, when I try to start the Flume agent it always throws the error:

java.lang.OutOfMemoryError: GC overhead limit exceeded

It always shows the same error right after the message:

INFO api.MorphlineContext: Importing commands

Flume never gets to start the Solr sink. It seems that there is not enough memory in Flume to start the sink. So I modified the /etc/flume-ng/conf/flume-env.sh file and uncommented the JAVA_OPTS line:

export JAVA_OPTS="-Xms2048m -Xmx204800m -Dcom.sun.management.jmxremote"

Basically I was giving Java 2 GB of starting heap space (and a maximum limit of 200 GB; the machines in the cluster have a lot of memory). The error is still the same :(. Then I modified the command line used to start the Flume agent, trying to increase the Java memory:

flume-ng agent -n a1 -f flume_config.conf -Dproperty="-Xms1024m -Xmx=204800m"

And the error still keeps appearing. I don't really know if I'm passing the memory options correctly, or in the right places, but this problem is making me (more) bald! Any pointers would be very much appreciated.

Thanks for your support,
Rafa

PS: just in case, this is the stack trace of the error:

15/12/22 10:53:51 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/12/22 10:53:51 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:flume_config_e.conf
15/12/22 10:53:51 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/12/22 10:53:51 INFO conf.FlumeConfiguration: Processing:k1
15/12/22 10:53:51 INFO conf.FlumeConfiguration: Processing:k1
15/12/22 10:53:51 INFO conf.FlumeConfiguration: Processing:k1
15/12/22 10:53:51 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/12/22 10:53:51 INFO node.AbstractConfigurationProvider: Creating channels
15/12/22 10:53:51 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/12/22 10:53:51 INFO node.AbstractConfigurationProvider: Created channel c1
15/12/22 10:53:51 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
15/12/22 10:53:51 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
15/12/22 10:53:51 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/12/22 10:53:51 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3927ce5e counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/12/22 10:53:51 INFO node.Application: Starting Channel c1
15/12/22 10:53:51 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/12/22 10:53:51 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/12/22 10:53:51 INFO node.Application: Starting Sink k1
15/12/22 10:53:51 INFO morphline.MorphlineSink: Starting Morphline Sink k1 (MorphlineSolrSink) ...
15/12/22 10:53:51 INFO node.Application: Starting Source r1
15/12/22 10:53:51 INFO source.ExecSource: Exec source starting with command:tail -f /var/logs/flume-ng/flume-cmf-flume-AGENT-sbmdeqpc01.ambientesbc.lab.log
15/12/22 10:53:51 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
15/12/22 10:53:51 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
15/12/22 10:53:51 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
15/12/22 10:53:51 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
15/12/22 10:53:51 INFO source.ExecSource: Command [tail -f /var/logs/flume-ng/flume-cmf-flume-AGENT-sbmdeqpc01.ambientesbc.lab.log] exited with 1
15/12/22 10:53:51 INFO api.MorphlineContext: Importing commands
15/12/22 10:53:55 ERROR lifecycle.LifecycleSupervisor: Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3927ce5e counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.String.replace(String.java:2021)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath.getClassName(ClassPath.java:403)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$ClassInfo.<init>(ClassPath.java:193)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$ResourceInfo.of(ClassPath.java:141)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanJar(ClassPath.java:345)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scanFrom(ClassPath.java:286)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath$Scanner.scan(ClassPath.java:274)
at org.kitesdk.morphline.shaded.com.google.common.reflect.ClassPath.from(ClassPath.java:82)
at org.kitesdk.morphline.api.MorphlineContext.getTopLevelClasses(MorphlineContext.java:149)
at org.kitesdk.morphline.api.MorphlineContext.importCommandBuilders(MorphlineContext.java:91)
at org.kitesdk.morphline.stdlib.Pipe.<init>(Pipe.java:43)
at org.kitesdk.morphline.stdlib.PipeBuilder.build(PipeBuilder.java:40)
at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:126)
at org.kitesdk.morphline.base.Compiler.compile(Compiler.java:55)
at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.configure(MorphlineHandlerImpl.java:101)
at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:97)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/12/22 10:53:55 INFO morphline.MorphlineSink: Morphline Sink k1 stopping...
15/12/22 10:53:55 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 stopped
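Unrelated to the memory error, the grok expression in the morphline above can be sanity-checked outside Flume. Here is a rough Python approximation, with simplified stand-ins for the SYSLOGTIMESTAMP, SYSLOGHOST, DATA, and GREEDYDATA patterns (the real grok dictionaries are more permissive than these regexes):

```python
import re

# Rough stand-in for:
# <%{POSINT:priority}>%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname}
#   %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:msg}
SYSLOG_RE = re.compile(
    r"<(?P<priority>\d+)>"
    r"(?P<timestamp>\w{3} +\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<hostname>\S+) "
    r"(?P<program>[^\[:]+)(?:\[(?P<pid>\d+)\])?: "
    r"(?P<msg>.*)"
)

# Hypothetical sample line in RFC 3164 style
line = "<34>Dec 22 10:53:51 sbmdeqpc01 sshd[4242]: Accepted password for rafa"
m = SYSLOG_RE.match(line)
print(m.group("priority"), m.group("program"), m.group("pid"))  # 34 sshd 4242
```

This kind of offline check confirms the expression itself is well formed; the OutOfMemoryError above happens earlier, during command import, before any record is ever grokked.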
Labels:
- Apache Flume