Skip to content

Commit

Permalink
RDD.doCheckpoint and checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceklaskowski committed Dec 31, 2023
1 parent 21d8785 commit 073a6c2
Showing 1 changed file with 35 additions and 13 deletions.
48 changes: 35 additions & 13 deletions docs/rdd/RDD.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ getResourceProfile(): ResourceProfile

* `DAGScheduler` is requested for the [ShuffleDependencies and ResourceProfiles of an RDD](../scheduler/DAGScheduler.md#getShuffleDependenciesAndResourceProfiles)

## <span id="preferredLocations"> Preferred Locations (Placement Preferences of Partition)
## Preferred Locations (Placement Preferences of Partition) { #preferredLocations }

```scala
preferredLocations(
Expand All @@ -155,7 +155,7 @@ preferredLocations(

* `DAGScheduler` is requested for [preferred locations](../scheduler/DAGScheduler.md#getPreferredLocs)

## <span id="partitions"> Partitions
## Partitions

```scala
partitions: Array[Partition]
Expand Down Expand Up @@ -185,7 +185,7 @@ Otherwise, when this `RDD` is not checkpointed, `partitions` [getPartitions](#ge
* `SparkContext` is requested to [run a job](../SparkContext.md#runJob)
* _others_

## <span id="dependencies"> dependencies
## dependencies

```scala
dependencies: Seq[Dependency[_]]
Expand All @@ -205,12 +205,18 @@ Otherwise, when this `RDD` is not checkpointed, `dependencies` [getDependencies]
!!! note
`getDependencies` is an abstract method that custom `RDD`s are required to provide.

## <span id="checkpoint"> Reliable Checkpointing
## Reliable Checkpointing { #checkpoint }

```scala
checkpoint(): Unit
```

!!! note "Public API"
`checkpoint` is part of the public API.

??? warning "Procedure"
`checkpoint` is a procedure (returns `Unit`) so _what happens inside stays inside_ (paraphrasing the [former advertising slogan of Las Vegas, Nevada](https://idioms.thefreedictionary.com/what+happens+in+Vegas+stays+in+Vegas)).

`checkpoint` creates a new [ReliableRDDCheckpointData](ReliableRDDCheckpointData.md) (with this `RDD`) and saves it in [checkpointData](#checkpointData) registry.

`checkpoint` does nothing when the [checkpointData](#checkpointData) registry has already been defined.
Expand All @@ -221,24 +227,30 @@ checkpoint(): Unit
Checkpoint directory has not been set in the SparkContext
```

## <span id="checkpointData"> RDDCheckpointData
## RDDCheckpointData { #checkpointData }

```scala
checkpointData: Option[RDDCheckpointData[T]]
```

`RDD` defines `checkpointData` internal registry for a [RDDCheckpointData[T]](RDDCheckpointData.md) (of `T` type of this `RDD`).

The `checkpointData` registry is undefined (`None`) when `RDD` is [created](#creating-instance) and can be the following values:
The `checkpointData` registry is undefined (`None`) initially when this `RDD` is [created](#creating-instance) and can hold a value after the following `RDD` API operators:

* [ReliableRDDCheckpointData](ReliableRDDCheckpointData.md) in [checkpoint](#checkpoint)
* [LocalRDDCheckpointData](LocalRDDCheckpointData.md) in [localCheckpoint](#localCheckpoint)
RDD Operator | RDDCheckpointData
-------------|------------------
[RDD.checkpoint](#checkpoint) | [ReliableRDDCheckpointData](ReliableRDDCheckpointData.md)
[RDD.localCheckpoint](#localCheckpoint) | [LocalRDDCheckpointData](LocalRDDCheckpointData.md)

Used when:
`checkpointData` is used when:

* [isCheckpointedAndMaterialized](#isCheckpointedAndMaterialized)
* [isLocallyCheckpointed](#isLocallyCheckpointed)
* [isReliablyCheckpointed](#isReliablyCheckpointed)
* [getCheckpointFile](#getCheckpointFile)
* [doCheckpoint](#doCheckpoint)

### <span id="checkpointRDD"><span id="CheckpointRDD"> CheckpointRDD
### <span id="CheckpointRDD"> CheckpointRDD { #checkpointRDD }

```scala
checkpointRDD: Option[CheckpointRDD[T]]
Expand All @@ -265,20 +277,30 @@ doCheckpoint(): Unit

`doCheckpoint` is triggered when `Dataset.checkpoint` operator ([Spark SQL]({{ book.spark_sql }}/Dataset/#checkpoint)) is executed (with `eager` flag on) which will likely trigger one or more Spark jobs on the underlying RDD anyway.

`doCheckpoint` executes in [checkpoint](RDDOperationScope.md#withScope) scope.
??? warning "Procedure"
`doCheckpoint` is a procedure (returns `Unit`) so _what happens inside stays inside_ (paraphrasing the [former advertising slogan of Las Vegas, Nevada](https://idioms.thefreedictionary.com/what+happens+in+Vegas+stays+in+Vegas)).

??? note "Does nothing unless checkpointData is defined"
My understanding is that `doCheckpoint` does nothing (_noop_) unless the [RDDCheckpointData](#checkpointData) is defined.

`doCheckpoint` executes all the following in [checkpoint](RDDOperationScope.md#withScope) scope.

`doCheckpoint` turns the [doCheckpointCalled](#doCheckpointCalled) flag on (to prevent multiple executions).

`doCheckpoint` branches off based on whether a [RDDCheckpointData](#checkpointData) is defined or not:

1. With the `RDDCheckpointData` defined, `doCheckpoint` checks out the [checkpointAllMarkedAncestors](#checkpointAllMarkedAncestors) flag and if enabled, `doCheckpoint` requests the [Dependencies](#dependencies) for the [RDD](Dependency.md#rdd) that are in turn requested to [doCheckpoint](#doCheckpoint) themselves. Otherwise, `doCheckpoint` requests the [RDDCheckpointData](#checkpointData) to [checkpoint](RDDCheckpointData.md#checkpoint).

1. With the [RDDCheckpointData](#checkpointData) undefined, `doCheckpoint` requests the [Dependencies](#dependencies) for the [RDD](Dependency.md#rdd) that are in turn requested to [doCheckpoint](#doCheckpoint) themselves.
1. With the [RDDCheckpointData](#checkpointData) undefined, `doCheckpoint` requests the [Dependencies](#dependencies) (of this RDD) for their [RDD](Dependency.md#rdd)s that are in turn requested to [doCheckpoint](#doCheckpoint) themselves (recursively).

In other words, With the `RDDCheckpointData` defined, requesting [doCheckpointing](#doCheckpoint) of the [Dependencies](#dependencies) is guarded by [checkpointAllMarkedAncestors](#checkpointAllMarkedAncestors) flag.
!!! note
With the `RDDCheckpointData` defined, requesting [doCheckpoint](#doCheckpoint) of the [Dependencies](#dependencies) is guarded by [checkpointAllMarkedAncestors](#checkpointAllMarkedAncestors) flag.

`doCheckpoint` skips execution if [called earlier](#doCheckpointCalled).

!!! note "CheckpointRDD"
[CheckpointRDD](CheckpointRDD.md) is not checkpoint again (and does nothing when requested to do so).

---

`doCheckpoint` is used when:
Expand Down

0 comments on commit 073a6c2

Please sign in to comment.