Flink: Files written to HDFS are stuck in .pending when using the Flink API

Hi,

I am doing a POC in which I am trying to write some data to HDFS using Flink. I can see that the files are being written, but they are stuck with a ".pending" suffix. Any help will be appreciated. Also, is there a way to have only one file written?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 123 ms in at-least-once mode.
env.enableCheckpointing(123, CheckpointingMode.AT_LEAST_ONCE, true);

DataStream<String> text = env.readTextFile("D:/names.txt");

// Identity map: passes each line through unchanged.
DataStream<String> parsed = text.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return value;
    }
});

// Splits each line into words. The result is not assigned,
// so the sink below still receives the unsplit lines from `parsed`.
parsed.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
});

// Sets the Hadoop user name to "hdfs".
System.setProperty("HADOOP_USER_NAME", "hdfs");

RollingSink<String> sink = new RollingSink<String>("hdfs://MYMACHINE:8020/flink/test8");
sink.setBucketer(new NonRollingBucketer());
parsed.addSink(sink);

env.execute();

1 ACCEPTED SOLUTION

Hi, unfinished buckets have the .pending extension. Once a bucket is closed (for example, with time-based bucketing, once the time period is over), the file is renamed. Since you are using the NonRollingBucketer, the files will never be closed. I would recommend using the DateTimeBucketer instead.
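To illustrate what time-based bucketing means here, the following is a minimal plain-Java sketch, not the connector's actual code. It assumes the date pattern "yyyy-MM-dd--HH" (which I believe is the DateTimeBucketer default) and fixes the time zone to UTC so the output is deterministic. The class and method names (TimeBucketSketch, bucketFor, startsNewBucket) are hypothetical and exist only for this example.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical sketch of time-based bucketing; it does not use Flink itself.
final class TimeBucketSketch {

    // Assumed date pattern: one bucket directory per hour.
    static final String FORMAT = "yyyy-MM-dd--HH";

    // Maps a timestamp to a bucket directory under the base path.
    static String bucketFor(String basePath, long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat(FORMAT);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed for reproducibility
        return basePath + "/" + fmt.format(new Date(epochMillis));
    }

    // True when the two timestamps fall into different buckets,
    // i.e. the older bucket's time period is over.
    static boolean startsNewBucket(String basePath, long previousMillis, long currentMillis) {
        return !bucketFor(basePath, previousMillis).equals(bucketFor(basePath, currentMillis));
    }

    public static void main(String[] args) {
        String base = "/flink/test8";
        System.out.println(bucketFor(base, 0L));                          // /flink/test8/1970-01-01--00
        System.out.println(bucketFor(base, 3_600_000L));                  // /flink/test8/1970-01-01--01
        System.out.println(startsNewBucket(base, 3_599_999L, 3_600_000L)); // true
    }
}
```

In the job from the question, the corresponding change would be to replace the NonRollingBucketer in the setBucketer call with a DateTimeBucketer, so that buckets have an end time at which they can be closed.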

As a side note, I would recommend increasing the checkpointing interval a bit. 123 milliseconds is very frequent, and the application does not look extremely latency-critical. A value like 2000 milliseconds is probably more appropriate.
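To put the two intervals in perspective, here is a small arithmetic sketch in plain Java. It only counts how many checkpoints would be triggered per minute at a given interval, assuming one trigger per interval. The class and method names (CheckpointRateSketch, checkpointsPerMinute) are hypothetical.

```java
// Hypothetical helper: rough number of checkpoint triggers per minute.
final class CheckpointRateSketch {

    static long checkpointsPerMinute(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return 60_000L / intervalMillis; // integer division, rounds down
    }

    public static void main(String[] args) {
        System.out.println(checkpointsPerMinute(123L));   // 487
        System.out.println(checkpointsPerMinute(2_000L)); // 30
    }
}
```

At 123 milliseconds the job would trigger roughly 487 checkpoints per minute, compared with 30 at 2000 milliseconds.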


Thanks Robert, it worked.