
Spark textFileStream leaves some files behind

Explorer

We are using Java and Spark 1.4 to process a streaming set of text files with a 60-second batch duration, moving each file into an archive directory after it is processed. The code mostly works - it picks up and processes the text files - but it does not always grab every file in the source directory. The files it misses are never picked up in later batches and just sit in the source directory.

// Attempt to make the file stream remember files for longer (set here on the Hadoop Configuration)
jsc.sparkContext().hadoopConfiguration().set("spark.streaming.fileStream.minRememberDuration", "6000");

// Kerberos login
UserGroupInformation.setConfiguration(jsc.sparkContext().hadoopConfiguration());
UserGroupInformation.loginUserFromKeytab(cli.getOptionValue(KERBEROS_USER), cli.getOptionValue(KERBEROS_KEYTAB));

// Ship the keytab to the executors
jsc.sparkContext().addFile(cli.getOptionValue(KERBEROS_KEYTAB));

// Records are delimited by the ETX control character (\u0003) rather than newlines
jsc.sparkContext().hadoopConfiguration().set("textinputformat.record.delimiter", "\u0003");

// Sanity check that HBase is reachable
HBaseAdmin.checkHBaseAvailable(jsc.sparkContext().hadoopConfiguration());
LOG.info("HBase is running!");

// Monitor the input directory for new text files
JavaDStream<String> inputStream = jsc.textFileStream(cli.getOptionValue(INPUT_DIRECTORY));
// Alternative attempt: fileStream with a no-op filter and newFilesOnly = false
//JavaPairInputDStream<LongWritable, Text> inputFStream = jsc.fileStream(cli.getOptionValue(INPUT_DIRECTORY), LongWritable.class, Text.class, TextInputFormat.class, new NoFilter(), false);
//JavaDStream<String> inputStream = inputFStream.map(new MapTextPairToString());
JavaDStream<Record> records = inputStream.flatMap(new CacheFileStreamFlatMap()).persist();

I've tried setting spark.streaming.fileStream.minRememberDuration to 6000, and I've also tried switching from textFileStream to fileStream (with the corresponding code changes, shown commented out above). How can I get Spark to process all of the files in the directory so these zombie files don't hang around?

2 Replies

Re: Spark textFileStream leaves some files behind

Explorer

As a workaround, I'm currently implementing a custom Receiver that reads in the files and then moves each processed file into an archive directory.

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessingFileReceiver extends Receiver<String> {

	private static final String NEWLINE = "\n";
	private static final long serialVersionUID = 471108203753777978L;
	private static final Logger LOG = LoggerFactory.getLogger(ProcessingFileReceiver.class);
	// Records are terminated by the ETX control character (\u0003)
	private static final int RECORD_DELIMITER_CHARACTER_NUMBER = 3;

	private String sourceDirectory;
	private String archiveDirectory;
	private FileSystem fs;
	private Path sourcePath;
	private CompressionCodecFactory compressionCodecs = null;
	private LineReader in;

	public ProcessingFileReceiver(StorageLevel storageLevel, String sourceDirectory, String archiveDirectory) {
		super(storageLevel);
		this.sourceDirectory = sourceDirectory;
		this.archiveDirectory = archiveDirectory;
	}

	public Configuration getConfiguration() {
		return new Configuration();
	}

	@Override
	public void onStart() {
		sourcePath = new Path(sourceDirectory);
		try {
			fs = sourcePath.getFileSystem(getConfiguration());

			// Poll the source directory on a background thread so onStart() returns immediately
			new Thread() {
				@Override
				public void run() {
					try {
						receive();
					} catch (FileNotFoundException e) {
						LOG.error("FileNotFound", e);
					} catch (IOException e) {
						LOG.error("IOException", e);
					} catch (InterruptedException e) {
						LOG.error("InterruptedException", e);
					}
				}
			}.start();
		} catch (IOException e) {
			LOG.error("IOException", e);
		}
	}

	protected void receive() throws FileNotFoundException, IOException, InterruptedException {
		while (!isStopped()) {
			FileStatus[] files = fs.listStatus(sourcePath);
			if (LOG.isDebugEnabled())
				LOG.debug(files.length + " files in the source directory.");
			for (FileStatus file : files) {
				readFileIntoStream(file);
				// Archive the file once its records have been stored
				moveFile(file);
			}
			if (files.length == 0) {
				Thread.sleep(1000);
			}
		}
	}

	private void readFileIntoStream(FileStatus file) throws IOException {
		FSDataInputStream fileIn = fs.open(file.getPath());
		compressionCodecs = new CompressionCodecFactory(getConfiguration());
		final CompressionCodec codec = compressionCodecs.getCodec(file.getPath());
		// getCodec() returns null for uncompressed files, so fall back to the raw stream
		if (codec != null) {
			in = new LineReader(codec.createInputStream(fileIn), getConfiguration());
		} else {
			in = new LineReader(fileIn, getConfiguration());
		}
		while (!isStopped()) {
			Text value = readRecord();
			if (value.getLength() > 0) {
				store(value.toString());
			} else {
				break;
			}
		}
		if (LOG.isDebugEnabled())
			LOG.debug("Closing " + file.getPath());
		// Closing the LineReader also closes the underlying input stream
		in.close();
	}

	private void moveFile(FileStatus file) throws IOException {
		Path destinationParent = new Path(archiveDirectory);
		Path destination = new Path(destinationParent, FilenameUtils.getName(file.getPath().toString()));
		if (LOG.isDebugEnabled())
			LOG.debug("Moving " + file.getPath() + " to " + destination);
		fs.rename(file.getPath(), destination);
	}

	private Text readRecord() throws IOException {
		// Accumulate lines until one ends with the ETX record delimiter;
		// returns an empty Text at end of file
		StringBuilder sb = new StringBuilder();
		Text value = new Text();
		while (!isStopped()) {
			int count = in.readLine(value);
			if (count == 0) {
				break;
			}
			sb.append(value.toString()).append(NEWLINE);
			if (value.getLength() > 0 && value.getBytes()[value.getLength() - 1] == RECORD_DELIMITER_CHARACTER_NUMBER) {
				value = new Text(sb.toString());
				break;
			}
		}
		return value;
	}

	@Override
	public void onStop() {
		// No-op; the receive loop checks isStopped() and exits on its own
	}
}
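
In case it helps, here is a minimal sketch of how this receiver might be plugged into the streaming job. The app name, directory paths, and storage level below are placeholders, not values from an actual deployment:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch only - app name, paths, and storage level are placeholders
SparkConf conf = new SparkConf().setAppName("ProcessingFileReceiverJob");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(60));

// receiverStream() runs the receiver on an executor and yields its stored records
JavaDStream<String> records = jsc.receiverStream(
		new ProcessingFileReceiver(StorageLevels.MEMORY_AND_DISK_SER_2,
				"/data/incoming",    // source directory the receiver polls
				"/data/archive"));   // processed files are renamed into here

records.print();
jsc.start();
jsc.awaitTermination();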

Re: Spark textFileStream leaves some files behind

New Contributor

You can do it like this:

jsc.sparkContext().hadoopConfiguration().set("spark.streaming.fileStream.minRememberDuration", "12000");

The value of minRememberDuration must be at least twice the batch duration.
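
As far as I know, this property is read from the Spark configuration, so one way to apply it is on the SparkConf before the streaming context is created. A minimal sketch, assuming the 60-second batch duration from the original post (app name and input path are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch only - app name and input path are placeholders
SparkConf conf = new SparkConf()
		.setAppName("FileStreamJob")
		// at least 2x the 60-second batch duration
		.set("spark.streaming.fileStream.minRememberDuration", "120s");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(60));
JavaDStream<String> inputStream = jsc.textFileStream("/data/incoming");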

You can read this article: http://www.w2bc.com/Article/81149
