Spark RDD checkpoint on S3 fails intermittently with an exception

Issue

- Run a Spark job that saves an RDD checkpoint to S3.
- The job fails intermittently with the error below:

org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: xxx, num of partitions: 6]; Checkpoint RDD [ID: xxx, num of partitions: 5].

Environment

- AWS EMR Spark
- AWS S3

Resolution

- Save the RDD checkpoint to consistent storage such as HDFS (see the example below).
- Or, enable EMRFS Consistent View when creating the EMR cluster [1].
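
- For example, in PySpark the checkpoint directory can simply point at HDFS instead of S3. This is a minimal sketch; hdfs:///tmp/checkpoints/ is a placeholder for any HDFS location on the cluster.

# Point the checkpoint directory at HDFS instead of S3.
# "hdfs:///tmp/checkpoints/" is a placeholder path; any HDFS location works.
sc.setCheckpointDir("hdfs:///tmp/checkpoints/")

rdd = sc.range(1000, numSlices=10)
rdd.checkpoint()
rdd.count()  # an action triggers the actual checkpoint write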

Root Cause

- Normally, we can save an RDD checkpoint to persistent storage, as in the example below [2]:

import glob
import os
from urllib.parse import urlparse

sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")

rdd = sc.range(1000, numSlices=10)
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint()  # No checkpoint dir here yet

[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False

# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()  
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
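
- Continuing the example, listing inside the rdd-specific directory shows one file per partition (the output below is illustrative; the number of part files depends on the RDD's partitioning):

rdd_ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "rdd-1", "*")
[os.path.split(x)[-1] for x in glob.glob(rdd_ch_dir)]
## ['part-00000', 'part-00001', ...]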

- When checkpointing an RDD, Spark writes each partition to a file named part-xxxxx in the checkpoint directory. Right after the files are written, Spark reads the checkpoint directory back as a new ReliableCheckpointRDD and verifies that it has the same number of partitions as the original RDD (see the source excerpt below and the sketch that follows it).

  /**
   * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
   */

  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    val sc = originalRDD.sparkContext

    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)  //<<<----- Writing checkpoint file

    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val checkpointDurationMs =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
    logInfo(s"Checkpointing took $checkpointDurationMs ms.")

    val newRDD = new ReliableCheckpointRDD[T](   //<---- Reload the checkpoint directory as a new RDD; its partition count comes from the part files visible at this point
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        "Checkpoint RDD has a different number of partitions from original RDD. Original " +
          s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
          s"${newRDD.partitions.length}].")
    }
    newRDD
  }
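
- The same check can be reproduced by hand from the driver. Below is a minimal sketch that, like the Scala code above, compares the number of visible part files against the partition count of the checkpointed RDD. visible_checkpoint_partitions is a hypothetical helper, not a Spark API, and because it uses glob it only applies to the local checkpoint directory from the earlier example, not to S3.

import glob
import os
from urllib.parse import urlparse

def visible_checkpoint_partitions(sc, rdd_id):
    """Count the part-* files currently visible in an RDD's checkpoint directory."""
    base = urlparse(sc._jsc.getCheckpointDir().orElse("")).path
    return len(glob.glob(os.path.join(base, "rdd-%d" % rdd_id, "part-*")))

# After plus_one.count() above, both numbers should match; on an
# eventually consistent store they may briefly disagree.
visible_checkpoint_partitions(sc, plus_one.id())
plus_one.getNumPartitions()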

- The problem is that S3 is "Eventually Consistent": an object listing issued right after an upload may not yet include the newly written files. If some of the part-xxxxx files are not visible when Spark lists the checkpoint directory to build the new ReliableCheckpointRDD, that RDD ends up with fewer partitions than the original, and the SparkException above is thrown. The shell session below (and the sketch after it) illustrates the visibility window:

$ aws s3 cp somefile s3://feichashao-bucket/somefile
(a very short time later, say 1 ms)
$ aws s3 ls s3://feichashao-bucket/somefile  <--- File does not appear in the listing.
(some time later, say 5 ms)
$ aws s3 ls s3://feichashao-bucket/somefile  <--- Now the file is listed.
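
- The same behavior can be observed programmatically. Below is a minimal sketch using boto3; the bucket and key names are placeholders taken from the shell session above, and on a store with consistent listings the loop exits on the first attempt.

import time
import boto3

s3 = boto3.client("s3")
bucket, key = "feichashao-bucket", "somefile"  # placeholder names from the example above

s3.put_object(Bucket=bucket, Key=key, Body=b"some content")

# Poll the listing until the newly written object becomes visible.
for attempt in range(1, 11):
    listing = s3.list_objects_v2(Bucket=bucket, Prefix=key)
    if any(obj["Key"] == key for obj in listing.get("Contents", [])):
        print("object visible in listing after %d attempt(s)" % attempt)
        break
    time.sleep(0.1)
else:
    print("object still not listed after 10 attempts")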

[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html
[2] https://stackoverflow.com/questions/37777326/rdd-checkpoint-not-storing-any-data-in-checkpoint-directory