
In the Cloudera Data Platform (CDP), Flink offers two mechanisms for fault tolerance: checkpoints and savepoints [1]. Checkpoints are created automatically when enabled and are used to restart jobs automatically after a failure. Savepoints, by contrast, are triggered manually, are always stored externally, and are used to start a "new" job from a previous internal state. Savepoints are what you use for operations such as an upgrade [2].

CHECKPOINT ERROR RECOVERY

For checkpoints, there are two levels of error recovery in Flink jobs: one on the YARN side and one on the Flink side.
On the YARN side, the relevant property is yarn.maximum-failed-containers. The YARN ApplicationMaster tracks how many containers have failed. Once the limit of 100 (the default) is reached, the YARN application fails, regardless of any Flink job parameters.
On the Flink side, if checkpointing is activated and no restart strategy has been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts. If checkpointing is enabled and maxNumberRestartAttempts=5 is set, the fixed-delay restart strategy [3] restarts the job up to 5 times. This limit is enforced by Flink: once the 5 restart attempts are exhausted, the job fails.
[3] Fixed Delay Restart Strategy
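
The fixed-delay behavior described above can be illustrated with a small, self-contained Java model. This is only a sketch of the counting logic under the assumption that every run of the job fails. It does not use Flink's classes, and the class and method names (FixedDelayRestartSketch, canRestart, onFailure) are hypothetical.

```java
import java.time.Duration;

/** Illustrative model of a fixed-delay restart policy; NOT Flink's implementation. */
class FixedDelayRestartSketch {
    private final int maxAttempts;   // hypothetical stand-in for the restart-attempt limit (5 in the text)
    private final Duration delay;    // fixed wait before each restart
    private int restartsUsed = 0;    // restarts granted so far

    FixedDelayRestartSketch(int maxAttempts, Duration delay) {
        this.maxAttempts = maxAttempts;
        this.delay = delay;
    }

    /** True while another restart is still permitted. */
    boolean canRestart() {
        return restartsUsed < maxAttempts;
    }

    /** Records one failure and returns the wait before the restart; throws once the limit is exhausted. */
    Duration onFailure() {
        if (!canRestart()) {
            throw new IllegalStateException("restart attempts exhausted; job fails");
        }
        restartsUsed++;
        return delay;
    }

    int restartsUsed() {
        return restartsUsed;
    }

    public static void main(String[] args) {
        FixedDelayRestartSketch strategy =
                new FixedDelayRestartSketch(5, Duration.ofSeconds(10));
        int failures = 0;
        // Assume every run fails, so each loop iteration consumes one restart.
        while (strategy.canRestart()) {
            failures++;
            Duration wait = strategy.onFailure();
            System.out.println("failure " + failures + ": restarting after "
                    + wait.getSeconds() + "s");
        }
        System.out.println("failure " + (failures + 1) + ": limit reached, job fails");
    }
}
```

With a limit of 5, the model grants five restarts and treats the sixth failure as final, which matches the "up to 5 times" behavior described for the Flink side. The YARN-side container limit is independent of this counter.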
 
Finally, to enable checkpoints and configure the restart strategy at the cluster level, add the following properties under CM > Flink Service > Configuration > Flink Service Advanced Configuration Snippet (Safety Valve) for flink-conf/flink-conf.yaml:

  • execution.checkpointing.interval=<value>
  • restart-strategy=exponential-delay or restart-strategy=fixed-delay
  • the configuration options for the chosen strategy, for example those listed under Exponential Delay Restart Strategy
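
As a rough illustration, the properties above might look like the following fragment. It is written in the key: value form that flink-conf.yaml itself uses. All values are illustrative assumptions rather than recommendations, and the key names follow the Flink 1.x configuration reference, so check them against the Flink version shipped with your CDP release.

```yaml
# Illustrative values only; tune them for your workload.
execution.checkpointing.interval: 60s

# Option A: exponential-delay restart strategy
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10s
restart-strategy.exponential-delay.max-backoff: 2min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10min
restart-strategy.exponential-delay.jitter-factor: 0.1

# Option B: fixed-delay restart strategy (use instead of Option A)
# restart-strategy: fixed-delay
# restart-strategy.fixed-delay.attempts: 5
# restart-strategy.fixed-delay.delay: 10s
```

Only one restart strategy applies at a time, which is why Option B is shown commented out.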