Created 01-12-2017 04:06 PM
Hi,
I need to know whether the file in HDFS that Storm is currently writing to is recognizable as an 'in flight' file. For instance, Flume marks in-flight files like <filename>.tmp (or something like that). How does Storm do this?
Maybe somebody knows this offhand; I hope I don't have to build a test setup myself.
Edit: the final goal is to have a batch-oriented process pick up only completed/closed files.
Created 01-25-2017 12:02 PM
With the help of the remarks by @Aaron Dossett I found a solution to this.
Knowing that Storm does not mark the HDFS file currently being written to, and that .addRotationAction is not robust enough in extreme cases, I turned to a low-level solution.
HDFS can report the files on a path that are open for write:
hdfs fsck <storm_hdfs_state_output_path> -files -openforwrite
Alternatively, you can list only the files on a path that are not open:
hdfs fsck <storm_hdfs_state_output_path> -files
The output is quite verbose, but you can use sed or awk to extract the closed/completed files from it.
(The Java HDFS API has similar hooks; this is just a CLI-level solution.)
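For anyone who would rather do the filtering in Java than in sed or awk, here is a minimal sketch of picking the closed files out of the fsck output. It assumes per-file lines of the form "<path> <n> bytes, ..." and that files open for write carry an OPENFORWRITE marker on that line; the exact layout can differ between Hadoop versions, so check it against your own output first. The names FsckClosedFiles and closedFiles are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: pick closed files out of `hdfs fsck <path> -files -openforwrite` output. */
final class FsckClosedFiles {

    // Assumption: fsck puts this marker on the per-file line of files open for write.
    private static final String OPEN_MARKER = "OPENFORWRITE";

    /** Returns the paths of files that were not flagged as open for write. */
    static List<String> closedFiles(List<String> fsckLines) {
        List<String> closed = new ArrayList<>();
        for (String line : fsckLines) {
            // Assumption: per-file lines look like "<path> <n> bytes, ...".
            int bytesIdx = line.lastIndexOf(" bytes,");
            if (!line.startsWith("/") || bytesIdx < 0) {
                continue; // summary lines, directory lines, blank lines
            }
            if (line.contains(OPEN_MARKER)) {
                continue; // still being written, leave it alone
            }
            // The path is everything before the size field.
            int sizeStart = line.lastIndexOf(' ', bytesIdx - 1);
            if (sizeStart <= 0) {
                continue; // line does not have the expected shape
            }
            closed.add(line.substring(0, sizeStart));
        }
        return closed;
    }
}
```

Feed it the lines printed by the -openforwrite variant of the command; the returned paths are the ones a batch job could safely pick up at that moment.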
Created 01-12-2017 04:18 PM
HDFS spout assumes that files visible in the monitored directory are not actively being updated. Only after a file is completely written should it be made visible to the spout. Following are two approaches for ensuring this:
When the spout is actively consuming a file, it renames the file with an .inprogress suffix. After consuming all contents in the file, the file is moved to a configurable done directory and the .inprogress suffix is dropped. @Jasper
Created 01-12-2017 04:34 PM
Thanks Artem, but I actually meant the exact opposite: Storm writing to HDFS, not reading from it.
Created 01-12-2017 05:39 PM
I can't find an exact reference to it, but to handle the finished file you can implement an easy solution as follows.
Both the HDFS bolt and Trident State implementation allow you to register any number of RotationActions. What RotationActions do is provide a hook to allow you to perform some action right after a file is rotated. For example, moving a file to a different location or renaming it.
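public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
    private String destination;

    public MoveFileAction withDestination(String destDir) {
        destination = destDir;
        return this;
    }

    // Called right after a file has been rotated.
    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        if (!fileSystem.rename(filePath, destPath)) {
            LOG.warn("Could not move file {} to {}", filePath, destPath);
        }
    }
}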
If you are using Trident and sequence files you can do something like this:
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
    .withFileNameFormat(fileNameFormat)
    .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
    .withRotationPolicy(rotationPolicy)
    .withFsUrl("hdfs://localhost:54310")
    .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
Created 01-12-2017 10:16 PM
This is right, but I am VERY wary about using `RotationAction`s because if a worker crashes and restarts the action will never be performed. For that reason I would never use a `RotationAction` in production.
Created 01-16-2017 10:05 PM
Hey Aaron, were you using the storm-core HDFSBolt or the trident api? The Trident one should guarantee the action in the face of failures and crashes. Can you elaborate a bit?
Created 01-12-2017 10:14 PM
Hi Jasper -- No, you can't tell just by the filename. When I had to solve that same problem I ended up using the HDFS API to test whether or not the file was open. This wasn't too painful since the application I wrote was already using the Java API.
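A rough sketch of that approach, with the HDFS call kept behind a small hook so the selection logic can be run and tested without a cluster. ClosedFileSelector and ClosedCheck are hypothetical names, not part of Storm or Hadoop. On a real cluster the hook would presumably wrap DistributedFileSystem.isFileClosed(Path), as noted in the comment; treat that wiring as an assumption to verify against your Hadoop version.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Sketch: keep only the files that an "is it closed?" check reports as closed. */
final class ClosedFileSelector {

    /**
     * Hypothetical hook. Against a real cluster this could be wired up as
     *   path -> dfs.isFileClosed(new org.apache.hadoop.fs.Path(path))
     * where dfs is a DistributedFileSystem (assumption, not tested here).
     */
    interface ClosedCheck {
        boolean isClosed(String path) throws IOException;
    }

    /** Returns the candidates that are confirmed closed, in their original order. */
    static List<String> selectClosed(List<String> candidates, ClosedCheck check) {
        List<String> ready = new ArrayList<>();
        for (String path : candidates) {
            try {
                if (check.isClosed(path)) {
                    ready.add(path);
                }
            } catch (IOException e) {
                // State unknown: skip the file now and let the next batch run retry it.
            }
        }
        return ready;
    }
}
```

Skipping a file when the check fails errs on the side of never handing a half-written file to the batch process.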
Created 01-17-2017 06:31 AM
@Jasper right now the HDFS bolt does not mark the current file as "in-progress". A reasonable solution is to use a RotationAction to move the rotated files to a different directory, but it's possible that if the worker crashes in the middle of a rotation, the file may not be moved to the destination (this applies to both core and Trident).
Created 01-17-2017 12:03 PM
@Arun Mahadevan @Aaron Dossett @Sriharsha Chintalapani
I am kind of confused right now. So let me rephrase what I got so far in my own words:
Whereas Trident can have strong exactly-once semantics for persisting stream aggregates and tuples making it to any HDFS file, the action of rotating the file itself is not protected by these same strong guarantees?
Or is the rotation protected by exactly-once but not the .addRotationAction attached to it?
It is just not clear in the documentation: https://github.com/apache/storm/tree/master/external/storm-hdfs#hdfs-bolt-support-for-trident-api
Suppose the file rotation is exactly-once; then it could work to set the sync policy to the exact same size limit as the size-based rotation policy. That way the files would only become visible to HDFS clients (synced) when that size limit is met.
Created 01-17-2017 08:44 PM
@Jasper I think it's correct to say that actions added by .addRotationAction are AT MOST ONCE, even for Trident.
I'm not sure using a sync policy would work either. I believe those guarantee a minimum sync frequency, but additional syncs are still possible (I could be wrong; I haven't looked at that code in a while).
Said another way, you want to treat the unit of work as an HDFS file, but in Storm itself the unit of work is a tuple. Thus it will be hard to meet your goal in a clean way.