Spark RDD checkpoint on S3 fails intermittently with an exception

Issue

– Run a Spark job that saves an RDD checkpoint to S3.
– The job fails intermittently with the error below:
[cc lang="text"]
org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: xxx, num of partitions: 6]; Checkpoint RDD [ID: xxx, num of partitions: 5].
[/cc]
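
For reference, a minimal PySpark sketch of the kind of job that can hit this; the bucket name is hypothetical, and any S3 checkpoint location is susceptible:
[cc lang="python"]
# Hypothetical bucket; any S3 checkpoint location can hit this.
sc.setCheckpointDir("s3://some-bucket/checkpoints/")

rdd = sc.parallelize(range(1000), 6)  # 6 partitions, as in the error above
rdd.checkpoint()
rdd.count()  # first action materializes the checkpoint and may throw
[/cc]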

Environment

– Spark on AWS EMR
– AWS S3

Resolution

– Save RDD checkpoints to consistent storage such as HDFS (see the sketch after this list).
– Or, enable EMRFS Consistent View when creating the EMR cluster [1].
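
For the first option, pointing the checkpoint directory at HDFS is a one-line change; the path below is hypothetical:
[cc lang="python"]
# HDFS gives a strongly consistent directory listing, so the
# read-back after the write always sees every part file.
sc.setCheckpointDir("hdfs:///tmp/checkpoints/")
[/cc]
The second option instead keeps S3 listings consistent through the DynamoDB-backed metadata store that EMRFS Consistent View maintains [1].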

Root Cause

– Normally, we can save an RDD checkpoint to persistent storage, as in the example below (adapted from [2]):
[cc lang="python"]
import glob
import os
from urllib.parse import urlparse

sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")

rdd = sc.range(1000, numSlices=10)  # 1000 elements in 10 partitions
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint() # No checkpoint dir here yet

[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False

# After count() is executed you'll see an RDD-specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
[/cc]
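
Continuing the session above, you can also peek inside the per-RDD checkpoint directory; "rdd-1" matches the output above, but the ID varies by session:
[cc lang="python"]
# List the per-partition files Spark wrote for the checkpointed RDD.
rdd_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "rdd-1", "*")
sorted(os.path.split(x)[-1] for x in glob.glob(rdd_dir))
## ['part-00000', 'part-00001', ...]
[/cc]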

– When saving an RDD checkpoint, Spark writes each partition to a file named like part-xxxx in the checkpoint directory. Right after the files are written, Spark reads the checkpoint directory back to construct a new RDD object and verifies that the partition counts match:
[cc lang="scala"]
/**
 * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
 */
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val checkpointStartTimeNs = System.nanoTime()

  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) // <<<----- Writing checkpoint files

  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }

  val checkpointDurationMs =
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
  logInfo(s"Checkpointing took $checkpointDurationMs ms.")

  val newRDD = new ReliableCheckpointRDD[T]( // <<<----- Here it will reload the checkpoint files
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      "Checkpoint RDD has a different number of partitions from original RDD. Original " +
        s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
        s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
        s"${newRDD.partitions.length}].")
  }
  newRDD
}
[/cc]

– The problem is that S3 is "Eventually Consistent": a file that was just uploaded may not be visible to an immediate list request. If some part files are not yet visible when the new ReliableCheckpointRDD lists the checkpoint directory, it sees fewer partitions than the original RDD, and the check above throws the SparkException.
[cc lang="text"]
$ aws s3 cp somefile s3://feichashao-bucket/somefile

(after a very short time, say 1 ms)
$ aws s3 ls s3://feichashao-bucket/somefile   <--- File does not exist.

(after some time, say 5 ms)
$ aws s3 ls s3://feichashao-bucket/somefile   <--- Can see the file.
[/cc]

[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html
[2] https://stackoverflow.com/questions/37777326/rdd-checkpoint-not-storing-any-data-in-checkpoint-directory