Spark RDD checkpoint on S3 fails intermittently with an exception

Issue

- A Spark job saves an RDD checkpoint to S3.
- The Spark job fails intermittently with the following error:

org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: xxx, num of partitions: 6]; Checkpoint RDD [ID: xxx, num of partitions: 5].

Environment

- AWS EMR Spark
- AWS S3

Resolution

- Save RDD checkpoints to strongly consistent storage such as HDFS.
- Alternatively, enable EMRFS consistent view when creating the EMR cluster.
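
For the second option, consistent view is switched on through EMR's `emrfs-site` configuration classification, whose `fs.s3.consistent` property enables it. A minimal sketch, assuming the cluster is created with the AWS CLI: the script only prints the configuration list as JSON. The function name is illustrative, and other consistent-view settings (retries, the metadata table) are left at their defaults.

```python
import json


def emrfs_consistent_view_config():
    """Build an EMR configuration list that turns on EMRFS consistent view."""
    return [
        {
            # EMRFS settings belong to the emrfs-site classification.
            "Classification": "emrfs-site",
            # EMR expects configuration property values as strings.
            "Properties": {"fs.s3.consistent": "true"},
        }
    ]


if __name__ == "__main__":
    # Print the JSON; redirect it to a file for `aws emr create-cluster`.
    print(json.dumps(emrfs_consistent_view_config(), indent=2))
```

Redirecting the output to a file, for example `emrfs-consistent.json` (a hypothetical name), lets it be passed to `aws emr create-cluster` with `--configurations file://emrfs-consistent.json`.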

Root Cause

- Normally, an RDD checkpoint is written to persistent storage, as in the example below [2]:

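import glob
import os
from urllib.parse import urlparse

# Assumes a PySpark shell, where the SparkContext `sc` already exists.
# Local example path: glob below can only inspect a local file system.
sc.setCheckpointDir("/tmp/checkpoints")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")

rdd = sc.range(1000, numSlices=10)  # 1,000 elements in 10 partitions
plus_one = rdd.map(lambda x: x + 1)
plus_one.checkpoint()  # Only marks the RDD; no checkpoint dir here yet

[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False

# After an action such as count() runs, you'll see the RDD-specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True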

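- When checkpointing an RDD, Spark writes each partition to its own file (part-00000, part-00001, ...) in the RDD's checkpoint directory. Immediately afterwards, it reads the checkpoint directory back to create a new ReliableCheckpointRDD and checks that this new RDD has the same number of partitions as the original. The relevant code is in ReliableCheckpointRDD.writeRDDToCheckpointDirectory: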

  /**
   * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
   */
  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    val sc = originalRDD.sparkContext

    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)  // <--- Writes one part file per partition

    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val checkpointDurationMs =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
    logInfo(s"Checkpointing took $checkpointDurationMs ms.")

    val newRDD = new ReliableCheckpointRDD[T](  // <--- Reloads the checkpoint by listing checkpointDirPath
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        "Checkpoint RDD has a different number of partitions from original RDD. Original " +
          s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
          s"${newRDD.partitions.length}].")
    }
    newRDD
  }
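
The read-back step is where the partition count comes from: ReliableCheckpointRDD lists the checkpoint directory and creates one partition per `part-` file it finds, sorted by index. The sketch below is a simplified Python model of that logic over a plain list of file names, not Spark's code. `checkpoint_file_name`, `partitions_from_listing` and `check_partition_count` are illustrative names; the `part-%05d` pattern follows Spark's checkpoint file naming, and the "Invalid checkpoint file" branch models Spark also rejecting a gap in the numbering.

```python
def checkpoint_file_name(index):
    """Part-file name for a partition index: part-00000, part-00001, ..."""
    return "part-%05d" % index


def partitions_from_listing(names):
    """Derive the checkpoint partition count from a directory listing.

    Simplified model: only names starting with "part-" count, sorted by index.
    """
    parts = sorted(
        (n for n in names if n.startswith("part-")),
        key=lambda n: int(n[len("part-"):]),
    )
    # A gap in the numbering means some part file is missing from the listing.
    for i, name in enumerate(parts):
        if name != checkpoint_file_name(i):
            raise RuntimeError(f"Invalid checkpoint file: {name}")
    return len(parts)


def check_partition_count(original_partitions, names):
    """Fail like the Scala check above when the partition counts differ."""
    checkpoint_partitions = partitions_from_listing(names)
    if checkpoint_partitions != original_partitions:
        raise RuntimeError(
            "Checkpoint RDD has a different number of partitions from original RDD. "
            f"Original: {original_partitions}; checkpoint: {checkpoint_partitions}"
        )
    return checkpoint_partitions


written = [checkpoint_file_name(i) for i in range(6)]
print(check_partition_count(6, written))  # 6
try:
    # The listing missed the last part file, as in the error message above.
    check_partition_count(6, written[:5])
except RuntimeError as err:
    print(err)
```

If the listing misses the highest-numbered file, the count silently drops from 6 to 5 and only the final comparison catches it, which matches the error in this article.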

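- The problem is that S3 is eventually consistent: a file that has just been uploaded may not show up in a listing for a short time. ReliableCheckpointRDD determines its partitions by listing the part files in the checkpoint directory, so if that listing runs before every part file is visible, it finds fewer partitions than were written (5 instead of 6 in the error above) and the partition-count check throws the exception. For example: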

$ aws s3 cp somefile s3://feichashao-bucket/somefile
# (a very short time later, say 1 ms)
$ aws s3 ls s3://feichashao-bucket/somefile    # <--- file is not listed yet
# (some time later, say 5 ms)
$ aws s3 ls s3://feichashao-bucket/somefile    # <--- file is now listed
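
The timing problem can be modelled in a few lines of Python. This is a toy sketch, not S3's real behaviour: `EventuallyConsistentStore` and the `checkpoint/rdd-1/` keys are hypothetical, the delay is a fixed number of simulated time units, and the most recent writes are the ones hidden from a listing. Real S3 listings gave no such ordering guarantee.

```python
class EventuallyConsistentStore:
    """Toy object store: a PUT is stored at once, but a LIST only returns keys
    written at least `visibility_delay` time units ago."""

    def __init__(self, visibility_delay):
        self.visibility_delay = visibility_delay
        self.now = 0  # simulated clock keeps the example deterministic
        self._written_at = {}

    def put(self, key):
        self._written_at[key] = self.now

    def list(self, prefix):
        return sorted(
            key
            for key, written in self._written_at.items()
            if key.startswith(prefix) and self.now - written >= self.visibility_delay
        )

    def advance(self, dt):
        self.now += dt


store = EventuallyConsistentStore(visibility_delay=2)
# Six tasks each write one part file, one time unit apart.
for i in range(6):
    store.put("checkpoint/rdd-1/part-%05d" % i)
    store.advance(1)

# The driver lists the directory as soon as the write job finishes...
print(len(store.list("checkpoint/rdd-1/")))  # 5
# ...while a listing made a little later sees every file.
store.advance(2)
print(len(store.list("checkpoint/rdd-1/")))  # 6
```

Six part files are written, but the immediate listing returns five, the same 6 vs 5 mismatch reported in the error. A listing made slightly later returns all six, which is why the failure is intermittent.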