Flume spooldir writes smaller files

Hello,

Below is my flume agent configuration:

#Define sources, channel and sink
tier1.sources=source1 source1-sub1 source1-sub2 source1-sub3 source1-sub4
tier1.channels=channel1
tier1.sinks=sink1

#Define the sources as spooldir type
tier1.sources.source1.type=spooldir
tier1.sources.source1-sub1.type=spooldir
tier1.sources.source1-sub2.type=spooldir
tier1.sources.source1-sub3.type=spooldir
tier1.sources.source1-sub4.type=spooldir

tier1.sources.source1.spoolDir=/home/dd2500-50/debug
tier1.sources.source1-sub1.spoolDir=/home/dd2500-50/debug/cifs
tier1.sources.source1-sub2.spoolDir=/home/dd2500-50/debug/ost
tier1.sources.source1-sub3.spoolDir=/home/dd2500-50/debug/sm
tier1.sources.source1-sub4.spoolDir=/home/dd2500-50/debug/platform

tier1.sources.source1.deletePolicy=immediate
tier1.sources.source1.deserializer=LINE
tier1.sources.source1-sub1.deletePolicy=immediate
tier1.sources.source1-sub1.deserializer=LINE
tier1.sources.source1-sub2.deletePolicy=immediate
tier1.sources.source1-sub2.deserializer=LINE
tier1.sources.source1-sub3.deletePolicy=immediate
tier1.sources.source1-sub3.deserializer=LINE
tier1.sources.source1-sub4.deletePolicy=immediate
tier1.sources.source1-sub4.deserializer=LINE

#Define the channel as filechannel
tier1.channels.channel1.type=file
tier1.channels.channel1.capacity=100000
tier1.channels.channel1.transactionCapacity=100000
tier1.channels.channel1.checkpointDir=/flumefilechannel/chkpointdir
tier1.channels.channel1.dataDirs=/flumefilechannel/datadir

#Bind the sources, channel and sink
tier1.sources.source1.channels=channel1
tier1.sources.source1-sub1.channels=channel1
tier1.sources.source1-sub2.channels=channel1
tier1.sources.source1-sub3.channels=channel1
tier1.sources.source1-sub4.channels=channel1
tier1.sinks.sink1.channel=channel1

#Define sink as HDFS sink
tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.hdfs.path=hdfs://qaia-nn1.qaia.local.chaos.local/flume/dd2500-50
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.rollSize=209715200
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.batchSize=10000
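
For reference, the roll settings above should only close an HDFS file once it reaches about 200 MB: hdfs.rollSize is in bytes (209715200 = 200 MiB), and hdfs.rollInterval=0 and hdfs.rollCount=0 disable time- and event-count-based rolling. The HDFS sink also has an hdfs.idleTimeout property (not set above, so its default of 0 applies) that closes files after a period of inactivity; a minimal sketch of what setting it would look like:

#Close a file that has received no events for 60 seconds (default 0 = never)
tier1.sinks.sink1.hdfs.idleTimeout=60

With idleTimeout left at 0, the only thing that should close a file before it reaches 200 MB is the sink itself shutting down, for example when the agent restarts.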

 

The directories used as spoolDir (/home/dd2500-50/debug, etc.) contain plain log files, logrotated log files (logfile.1, logfile.2, etc.) and gzip-compressed log files (logfile.1.gz, etc.). The folders and files were copied from an NFS mount point to the local disk that holds the spoolDir directories (/home/dd2500-50/debug, etc.).
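
Since the LINE deserializer reads every file in the spool directory as text (UTF-8 by default), binary files such as the .gz logs cannot be decoded. The spooling directory source does have an ignorePattern property for skipping files by name; a minimal sketch for one source (the regex is an assumption to adapt to the actual filenames, and it would have to be repeated for each source):

#Skip gzip-compressed files; [.] matches a literal dot without properties-file escaping
tier1.sources.source1.ignorePattern=^.*[.]gz$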

 

Issue 1:

Very small files (a few KB or even just bytes) are being created in HDFS, even though the sink's rollSize is 200 MB.

Issue 2:

Not all files are getting copied to HDFS.


Below is the error message from the Flume log:

2016-07-12 14:45:13,429 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source source1-sub3: { spoolDir: /home/dd2500-50/debug/sm }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
    at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
    at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:282)
    at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
    at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
    at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
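
The MalformedInputException above is thrown when the LINE deserializer hits bytes that are not valid in its input charset (UTF-8 by default), which is exactly what happens when it tries to read a gzip-compressed file as text. Besides excluding such files, the spooling directory source exposes inputCharset and decodeErrorPolicy; the values below are a hedged sketch rather than a confirmed fix, and each source would need its own line:

#Replace undecodable bytes with U+FFFD instead of failing (IGNORE drops them silently)
tier1.sources.source1-sub3.decodeErrorPolicy=REPLACE
#Or, if the text logs are actually in another encoding:
#tier1.sources.source1-sub3.inputCharset=ISO-8859-1

The default decodeErrorPolicy is FAIL, which is why the source thread dies with the FATAL message above instead of skipping past the bad bytes.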
