
Isolation between Flume Channels?

Expert Contributor

CDH5.2 installed with Cloudera Manager and Parcels

 

Are Flume Channels isolated from each other? It seems that when I have a problem with one channel, the other channel is affected.

 

I want to record and process Syslog data with Flume using two channel+sink pairs fed by a replicating channel selector, as follows (a rough configuration sketch is included below the list):

  1. Memory Channel + HDFSSink (hdfschannel+hdfssink) to write raw Syslog records to HDFS
  2. Optional File Channel + Avro Sink (avrochannel+avrosink) to send the Syslog records to Spark Streaming for further processing. Since the processing can be reproduced from the raw data, the Avro channel is optional.
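
Roughly, the intended configuration looks like the following sketch (the source type, host names, ports, and paths are placeholders for illustration, not my actual settings):

a1.sources = syslogsrc
a1.channels = hdfschannel avrochannel
a1.sinks = hdfssink avrosink

# Syslog source feeding both channels; the replicating selector copies every event into each channel
a1.sources.syslogsrc.type = syslogudp
a1.sources.syslogsrc.host = 0.0.0.0
a1.sources.syslogsrc.port = 5140
a1.sources.syslogsrc.channels = hdfschannel avrochannel
a1.sources.syslogsrc.selector.type = replicating

a1.channels.hdfschannel.type = memory
a1.channels.avrochannel.type = file

# Raw Syslog records to HDFS
a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.channel = hdfschannel
a1.sinks.hdfssink.hdfs.path = /flume/syslog/raw

# Copy of the records to Spark Streaming over Avro
a1.sinks.avrosink.type = avro
a1.sinks.avrosink.channel = avrochannel
a1.sinks.avrosink.hostname = spark-receiver-host
a1.sinks.avrosink.port = 4545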

When Spark Streaming is running, the above works well: data is handled correctly by both sinks.

 

However, when the Spark Streaming job hung or stopped, the avrochannel had network-related exceptions and ChannelFullException. This is understandable, because the events could not be sent. The problem was that the amount of raw data logged by hdfschannel+hdfssink dropped to around 1-2% of the normal volume.

 

Is this expected? I don't understand why an error in an optional channel affects the others.
(Note: the use of the File Channel is historical, but that does not seem to be the cause of this behaviour anyway.)

 

1 ACCEPTED SOLUTION

Expert Contributor

Replying to myself: I worked around this with a sink group and a Null Sink.

 

Relevant settings in flume.conf:

a1.sinks = hdfssink avrosink nullsink

a1.sinkgroups = avrosinkgroup
a1.sinkgroups.avrosinkgroup.sinks = avrosink nullsink
a1.sinkgroups.avrosinkgroup.processor.type = failover
a1.sinkgroups.avrosinkgroup.processor.priority.avrosink = 100
a1.sinkgroups.avrosinkgroup.processor.priority.nullsink = 10

a1.sinks.nullsink.type = null
a1.sinks.nullsink.channel = avrochannel
a1.sinks.nullsink.batchSize = 10000

 
The end result is that avrochannel uses the high-priority avrosink (priority=100) under normal conditions. If that sink fails, the processor fails over to the low-priority nullsink, which simply discards the events.

 

PS:

  1. Upgraded to CDH 5.5.1, which bundles Flume 1.6.
  2. This works with Spark Streaming's "Flume-style Push-based Approach" (sink type=avro), but not with the "Pull-based Approach using a Custom Sink" (sink type=org.apache.spark.streaming.flume.sink.SparkSink); the two sink definitions are sketched below. My guess is that the custom sink refuses to acknowledge failure because of its fault-tolerance guarantees. Reference: http://spark.apache.org/docs/latest/streaming-flume-integration.html
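
For context, the only difference between the two approaches on the Flume side is the sink definition. A minimal sketch (host names and ports are placeholders, not my actual settings):

# Push-based approach: plain Avro sink, Spark Streaming receives the events
# via FlumeUtils.createStream
a1.sinks.avrosink.type = avro
a1.sinks.avrosink.channel = avrochannel
a1.sinks.avrosink.hostname = spark-receiver-host
a1.sinks.avrosink.port = 4545

# Pull-based approach: custom Spark sink, Spark Streaming polls it
# via FlumeUtils.createPollingStream
a1.sinks.sparksink.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.sparksink.channel = avrochannel
a1.sinks.sparksink.hostname = flume-agent-host
a1.sinks.sparksink.port = 4545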

