Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant.

Flink: Files written to HDFS are stuck in .pending when using the Flink API

Hi,

I am doing a POC in which I am trying to write some data to HDFS using Flink. I can see the files being written, but they are stuck with a ".pending" suffix. Any help would be appreciated. Also, is there a way to have only one file written?

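import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.NonRollingBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(123, CheckpointingMode.AT_LEAST_ONCE, true);

DataStream<String> text = env.readTextFile("D:/names.txt");

// Identity map: passes each line through unchanged
DataStream<String> parsed = text.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return value;
    }
});

// Splits lines into words; note: this result is never used, so the sink below receives whole lines
parsed.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String s : value.split(" ")) {
            out.collect(s);
        }
    }
});

System.setProperty("HADOOP_USER_NAME", "hdfs");
RollingSink<String> sink = new RollingSink<String>("hdfs://MYMACHINE:8020/flink/test8");
sink.setBucketer(new NonRollingBucketer());
parsed.addSink(sink);

env.execute();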

1 ACCEPTED SOLUTION

Hi, unfinished buckets have the .pending extension. Once a bucket is closed (for example, with time-based bucketing, once its time window is over), the file is renamed. Since you are using the NonRollingBucketer, the files will never be closed. I would recommend using the DateTimeBucketer instead.
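The lifecycle described above can be illustrated with a small, self-contained Java model. It is not Flink's RollingSink code: the class BucketModel, the hourly() helper, the "part-0" file name and the "yyyy-MM-dd--HH" UTC pattern are all assumptions made for this sketch. It only shows why a bucketer that always returns the same bucket leaves its file pending, while a time-based bucketer closes (and renames) the previous bucket once a record for a later hour arrives.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Simplified model of the behaviour described above, NOT Flink's RollingSink code:
// each bucket has one file that keeps a ".pending" suffix until the bucket is closed,
// and a bucket is closed only when a record arrives for a different bucket.
class BucketModel {
    private static final String PENDING = ".pending";

    private final String basePath;
    private final LongFunction<String> bucketer; // maps a timestamp (epoch ms) to a bucket name
    private final List<String> files = new ArrayList<>();
    private String currentBucket; // null until the first record is written

    BucketModel(String basePath, LongFunction<String> bucketer) {
        this.basePath = basePath;
        this.bucketer = bucketer;
    }

    // Writes one record with the given timestamp; opens a new file if the bucket changed.
    void write(long timestampMillis) {
        String bucket = bucketer.apply(timestampMillis);
        if (!bucket.equals(currentBucket)) {
            closeCurrentBucket();
            currentBucket = bucket;
            String dir = bucket.isEmpty() ? basePath : basePath + "/" + bucket;
            files.add(dir + "/part-0" + PENDING); // hypothetical file name
        }
    }

    // Closing a bucket "renames" its file by dropping the ".pending" suffix.
    private void closeCurrentBucket() {
        if (currentBucket == null) {
            return;
        }
        int last = files.size() - 1;
        String name = files.get(last);
        files.set(last, name.substring(0, name.length() - PENDING.length()));
    }

    List<String> files() {
        return files;
    }

    // Hourly buckets using the pattern "yyyy-MM-dd--HH" in UTC (an assumption for this sketch).
    static LongFunction<String> hourly() {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);
        return ts -> fmt.format(Instant.ofEpochMilli(ts));
    }

    public static void main(String[] args) {
        long t0 = Instant.parse("2016-05-01T10:15:00Z").toEpochMilli();
        long t1 = Instant.parse("2016-05-01T11:05:00Z").toEpochMilli();

        // Non-rolling: every record lands in the same bucket, which is never closed.
        BucketModel nonRolling = new BucketModel("/flink/test8", ts -> "");
        nonRolling.write(t0);
        nonRolling.write(t1);
        System.out.println(nonRolling.files()); // [/flink/test8/part-0.pending]

        // Hourly: the 11:05 record closes the 10:00 bucket, so its file loses ".pending".
        BucketModel timed = new BucketModel("/flink/test8", hourly());
        timed.write(t0);
        timed.write(t1);
        // [/flink/test8/2016-05-01--10/part-0, /flink/test8/2016-05-01--11/part-0.pending]
        System.out.println(timed.files());
    }
}
```

In this model the newest bucket stays pending until a later bucket opens; the sketch does not model checkpoints or job shutdown.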

As a side note: I would also recommend increasing the checkpointing interval a bit. 123 milliseconds is very frequent, and the application doesn't look extremely latency-critical. A value like 2000 milliseconds is probably more appropriate.
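To put the two intervals side by side, a back-of-the-envelope calculation in plain Java (the CheckpointRate class is hypothetical) gives an upper bound on how many checkpoints could be triggered per minute. Actual counts can be lower, for example when a checkpoint takes longer than the interval to complete. In the question's code, the change amounts to passing 2000 instead of 123 as the first argument of env.enableCheckpointing.

```java
// Rough upper bound on checkpoint triggers per minute for a given interval.
// Real frequency can be lower, e.g. if a checkpoint takes longer than the interval.
class CheckpointRate {
    static long checkpointsPerMinute(long intervalMillis) {
        return 60_000L / intervalMillis; // integer division: whole checkpoints per minute
    }

    public static void main(String[] args) {
        System.out.println(checkpointsPerMinute(123));  // 487
        System.out.println(checkpointsPerMinute(2000)); // 30
    }
}
```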

2 REPLIES

Thanks Robert, it worked.