Spooldir & HBase Custom Sink: prevent files from being marked ".completed"


New Contributor

Dear all,

I have a question concerning the SpoolDir source.

Is there a way to prevent the SpoolDir source from marking files as ".completed" when an error occurs in the sink?

My scenario:

I have a SpoolDir source and a custom HBase sink serializer (which implements HbaseEventSerializer).

My sink type is org.apache.flume.sink.hbase.HBaseSink.

Thank you very much in advance,

Fabian


Re: Spooldir & HBase Custom Sink: prevent files from being marked ".completed"

Master Guru
Can you share your agent configuration? Do you use a reliable channel between the source and sink?

Re: Spooldir & HBase Custom Sink: prevent files from being marked ".completed"

New Contributor

I was not aware of such a thing as a "reliable channel".

I will see what I can find out about it, thanks.

Edit:

By "reliable channel", are you referring to a file channel instead of a memory channel?

Using a memory channel should be fine here, since the channel type has no bearing on, for example, my custom sink serializer encountering an error and throwing a FlumeException.

So the sink-error scenario is the one I am more interested in.
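
For context, here is how the pieces interact as I understand Flume's design (worth checking against the Flume User Guide for your version). The spooling directory source puts events into the channel in its own transactions and renames a file, appending fileSuffix (".COMPLETED" by default), once all of that file's events have been committed to the channel. The sink takes events from the channel in separate transactions; if the serializer throws, HBaseSink rolls back its transaction and the events stay in the channel to be retried. So the rename does not wait for HBase, and the channel type decides whether those pending events survive an agent restart (a memory channel loses them). The fragment below only lists the source-side properties involved, set to what I believe are their defaults; as far as I know none of them makes the rename depend on the sink's outcome.

```properties
# Source-side settings of the spooling directory source s1.
# Values are the defaults as I understand them, so adding these lines
# should not change the agent's behaviour.

# Suffix appended to a file once all of its events are committed to the channel
agent.sources.s1.fileSuffix = .COMPLETED

# "never" keeps the renamed file; "immediate" deletes it instead of renaming
agent.sources.s1.deletePolicy = never

# Maximum number of events the source puts into the channel per transaction
agent.sources.s1.batchSize = 100
```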

This is my agent configuration:

# Name the components on this agent
agent.sources = s1
agent.sinks = k1
agent.channels = c1

# Describe/configure the source
agent.sources.s1.type = spooldir
agent.sources.s1.spoolDir = /var/csvs/source
agent.sources.s1.fileHeader = true

# Describe the sink
agent.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.k1.table = customsink
agent.sinks.k1.columnFamily = cf
agent.sinks.k1.serializer = custom.SplittingSerializer
agent.sinks.k1.serializer.delimiter = ;
agent.sinks.k1.serializer.timezonediff = 1

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.s1.channels = c1
agent.sinks.k1.channel = c1
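
If "reliable channel" in the reply above means the file channel (my reading; the Flume User Guide presents it as the durable alternative to the memory channel), the c1 block of this configuration could be swapped for something like the sketch below. The checkpointDir and dataDirs paths are hypothetical placeholders, and capacity and transactionCapacity simply carry over the memory channel's values. With a file channel, events the sink fails to write should remain on disk across agent restarts, although the spooldir source would still rename the file once its events have reached the channel.

```properties
# Hypothetical replacement for the memory channel c1: a file channel that
# persists events to local disk. Both directory paths are placeholders.
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /var/flume/checkpoint
agent.channels.c1.dataDirs = /var/flume/data

# Same sizing as the memory channel in the original configuration
agent.channels.c1.capacity = 100000
agent.channels.c1.transactionCapacity = 100
```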