Issue
- Run a spark job and save RDD checkpoint to S3.
- Spark job failed intermittently with below error:
Environment
- AWS EMR Spark
- AWS S3
Resolution
- Save RDD checkpoint to consistent storage like HDFS.
- Or, enable EMRFS Consistent View during creation of EMR.
Root Cause
- Normally, we could save RDD checkpoint to a persistent storage, like below [2]:
import os
from urllib.parse import urlparse
sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")
rdd = sc.range(1000, 10)
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint() # No checkpoint dir here yet
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False
# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
- When saving an RDD checkpoint, Spark will save the partitions to files like part-xxxx in the checkpoint directory. Right after the files are saved, Spark will read back the RDD checkpoint file to return a new RDD object.
* Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
*/
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val checkpointStartTimeNs = System.nanoTime()
val sc = originalRDD.sparkContext
// Create the output path for the checkpoint
val checkpointDirPath = new Path(checkpointDir)
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
if (!fs.mkdirs(checkpointDirPath)) {
throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
}
// Save to file, and reload it as an RDD
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) //<<<----- Writing checkpoint file
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
val checkpointDurationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
logInfo(s"Checkpointing took $checkpointDurationMs ms.")
val newRDD = new ReliableCheckpointRDD[T]( //<---- Here it will reload the checkpoint file
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
throw new SparkException(
"Checkpoint RDD has a different number of partitions from original RDD. Original " +
s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
s"${newRDD.partitions.length}].")
}
newRDD
}
- The problem is, S3 is "Eventually Consistent", which means it is possible that you could not see the files you just uploaded.
(after very short time, say 1 ms)
$ aws s3 ls s3://feichashao-bucket/somefile <--- File does not exist.
(after some time, say 5 ms)
$ aws s3 ls s3://feichashao-bucket/somefile <--- Can see the file.
[1] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html
[2] https://stackoverflow.com/questions/37777326/rdd-checkpoint-not-storing-any-data-in-checkpoint-directory