Storm HDFS Bolt question (Trident api)

Super Collaborator

Hi,

I need to know whether the file in HDFS that Storm is currently writing to is recognizable as an 'in flight' file. Flume, for instance, marks in-flight files as <filename>.tmp (or something like that). How does Storm do this?

Maybe somebody just knows this offhand; I hope so, so that I don't have to build a test setup myself.

Edit: the final goal is to have a batch-oriented process pick up only completed/closed files.

1 ACCEPTED SOLUTION

Super Collaborator

With the help of the remarks by @Aaron Dossett I found a solution to this.

Knowing that Storm does not mark the HDFS file currently being written to, and that .addRotationAction is not robust enough in extreme cases, I turned to a low-level solution.

HDFS can report the files on a path that are open for write:

hdfs fsck <storm_hdfs_state_output_path> -files -openforwrite

Alternatively, you can list only the files on a path that are NOT open (fsck skips files that are open for write unless -openforwrite is given):

hdfs fsck <storm_hdfs_state_output_path> -files

The output is quite verbose, but you can use sed or awk to extract the closed/completed files from it.

(The Java HDFS API has similar hooks; this is just a CLI-level solution.)
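
As a rough illustration of the sed/awk step, here is a minimal Java sketch that filters fsck output down to closed files. It assumes each per-file line looks like "<path> <size> bytes, ..." and that files open for write carry the token OPENFORWRITE. The class name FsckClosedFiles, the helper closedFiles and the sample lines are made up for this example, so check the pattern against the output of your own Hadoop version before relying on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FsckClosedFiles {

    // Assumed shape of a per-file line: "<path> <size> bytes, ..."
    // Directory lines ("<path> <dir>") and summary lines do not match.
    private static final Pattern FILE_LINE = Pattern.compile("^(/.*?) \\d+ bytes, ");

    /** Returns the paths of all file entries that are not flagged OPENFORWRITE. */
    static List<String> closedFiles(List<String> fsckLines) {
        List<String> closed = new ArrayList<>();
        for (String line : fsckLines) {
            Matcher m = FILE_LINE.matcher(line);
            if (!m.find()) {
                continue; // header, directory or summary line
            }
            if (line.contains("OPENFORWRITE")) {
                continue; // still being written, leave it alone
            }
            closed.add(m.group(1));
        }
        return closed;
    }

    public static void main(String[] args) {
        // Illustrative lines only, not captured from a real cluster.
        List<String> sample = Arrays.asList(
                "/storm/out <dir>",
                "/storm/out/part-0-0.txt 5242880 bytes, 1 block(s):  OK",
                "/storm/out/part-0-1.txt 1024 bytes, 1 block(s), OPENFORWRITE:  OK",
                "Status: HEALTHY");
        closedFiles(sample).forEach(System.out::println);
    }
}
```

In practice the lines would come from running hdfs fsck <storm_hdfs_state_output_path> -files -openforwrite and reading its standard output, rather than from a hard-coded list.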

10 REPLIES

Master Mentor

HDFS spout assumes that files visible in the monitored directory are not actively being updated. Only after a file is completely written should it be made visible to the spout. Following are two approaches for ensuring this:

  • Write the file to another directory. When the write operation is finished, move the file to the monitored directory.
  • Create the file in the monitored directory with an '.ignore' suffix; HDFS spout ignores files with an '.ignore' suffix. When the write operation is finished, rename the file to omit the suffix.

When the spout is actively consuming a file, it renames the file with an .inprogress suffix. After consuming all contents in the file, the file is moved to a configurable done directory and the .inprogress suffix is dropped. @Jasper

http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.3/bk_storm-component-guide/content/storm-inges...

Super Collaborator

@Artem Ervits

Thanks Artem, but I actually meant the exact opposite: Storm writing to HDFS, not reading from it.

Master Mentor

I can't find an exact reference for it, but to handle the finished file you can implement an easy solution as follows.

Both the HDFS bolt and Trident State implementation allow you to register any number of RotationActions. What RotationActions do is provide a hook to allow you to perform some action right after a file is rotated. For example, moving a file to a different location or renaming it.

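// Needs java.io.IOException, org.apache.hadoop.fs.{FileSystem, Path}, org.slf4j.{Logger, LoggerFactory} and storm-hdfs's RotationAction.
public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
    private String destination;

    public MoveFileAction withDestination(String destDir){
        destination = destDir;
        return this;
    }

    // Invoked right after a file is rotated: move it into the destination directory.
    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        fileSystem.rename(filePath, destPath);
    }
}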

If you are using Trident and sequence files you can do something like this:

        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310")
                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));

Rising Star

This is right, but I am VERY wary about using `RotationAction`s because if a worker crashes and restarts the action will never be performed. For that reason I would never use a `RotationAction` in production.

Super Collaborator

@Aaron Dossett

Hey Aaron, were you using the storm-core HDFSBolt or the trident api? The Trident one should guarantee the action in the face of failures and crashes. Can you elaborate a bit?

Rising Star

Hi Jasper -- No, you can't tell just by the filename. When I had to solve that same problem I ended up using the HDFS API to test whether or not the file was open. This wasn't too painful since the application I wrote was already using the Java API.
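
A minimal sketch of that kind of check, assuming the batch job already has a list of candidate paths. The class ClosedFileSelector and the method selectClosed are hypothetical names, and the open/closed test is passed in as a predicate so the example runs without a cluster. In a real job the predicate could wrap DistributedFileSystem.isFileClosed(Path); that choice is an assumption, since the post does not say which HDFS call was used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class ClosedFileSelector {

    /** Keeps only the paths that the supplied check reports as closed. */
    static List<String> selectClosed(List<String> candidates, Predicate<String> isClosed) {
        List<String> closed = new ArrayList<>();
        for (String path : candidates) {
            if (isClosed.test(path)) {
                closed.add(path);
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        // Stand-in for the NameNode's view: pretend this one file is still being written.
        Set<String> openForWrite = new HashSet<>(Arrays.asList("/storm/out/part-0-1.txt"));

        List<String> candidates = Arrays.asList(
                "/storm/out/part-0-0.txt",
                "/storm/out/part-0-1.txt");

        // With Hadoop on the classpath, the predicate could instead call
        // DistributedFileSystem.isFileClosed(new Path(p)) and handle its IOException.
        List<String> ready = selectClosed(candidates, p -> !openForWrite.contains(p));
        System.out.println(ready);
    }
}
```

Keeping the check behind a predicate also makes the selection logic easy to unit test without HDFS.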

Cloudera Employee

@Jasper right now the HDFS bolt does not mark the current file as "in-progress". A reasonable solution is to use a RotationAction to move the rotated files to a different directory, but it's possible that if the worker crashes in the middle of a rotation, the file will not be moved to the destination (this applies to both core and Trident).

Super Collaborator

@Arun Mahadevan @Aaron Dossett @Sriharsha Chintalapani

I am kind of confused right now. So let me rephrase what I got so far in my own words:

Whereas Trident can have strong exactly-once semantics for persisting stream aggregates and tuples making it to any HDFS file, the action of rotating the file itself is not protected by these same strong guarantees?

Or is the rotation protected by exactly-once but not the .addRotationAction attached to it?

It is just not clear in the documentation: https://github.com/apache/storm/tree/master/external/storm-hdfs#hdfs-bolt-support-for-trident-api

Suppose the file rotation is exactly-once; then it could work to set the sync policy to the exact same size limit as the size-based rotation policy. That way the files would only become visible to HDFS clients (synced) once that size limit is reached.

Rising Star

@Jasper I think it's correct to say that actions added by .addRotationAction are AT MOST ONCE, even for Trident.

I'm not sure using a sync policy would work either; I believe those guarantee a minimum sync frequency, but additional syncs are still possible (I could be wrong, I haven't looked at that code in a while).

Said another way, you want to treat the unit of work as an HDFS file, but in Storm itself the unit of work is a tuple. Thus it will be hard to meet your goal in a clean way.