Skip to content

checkpoint

optimus_dl.recipe.pretokenize.checkpoint

Manages saving and loading of data preparation checkpoints.

CheckpointManager

Handles the loading and saving of checkpoints to ensure atomicity.

Source code in optimus_dl/recipe/pretokenize/checkpoint.py
class CheckpointManager:
    """Loads, saves and removes the pretokenization checkpoint on disk.

    The checkpoint is a pickled ``CheckpointState`` stored at
    ``<output_dir>/checkpoint.pkl``. Saves are written to
    ``<output_dir>/checkpoint.tmp`` first and then renamed over the
    checkpoint. An interrupted save therefore never leaves a partially
    written ``checkpoint.pkl`` behind.
    """

    def __init__(self, output_dir: Path):
        """Initialise the manager.

        Args:
            output_dir: Directory that holds the checkpoint files. It is not
                created here; presumably the caller creates it (TODO confirm).
        """
        self.checkpoint_path = output_dir / "checkpoint.pkl"
        # The temporary file lives in the same directory as the checkpoint,
        # so the final rename stays on one filesystem. os.replace is only
        # atomic under that condition.
        self.tmp_path = output_dir / "checkpoint.tmp"

    def save(self, state: CheckpointState):
        """Save the current processing state to disk atomically.

        The state is pickled into the temporary file, flushed and fsync'ed,
        and then renamed over the checkpoint with ``Path.replace``
        (``os.replace``). The replace overwrites an existing checkpoint on
        every platform and is atomic on POSIX. Readers therefore see either
        the previous checkpoint or the complete new one.

        Args:
            state: The checkpoint state object to save.

        Raises:
            Any error from opening, pickling, writing or renaming propagates
            to the caller after the temporary file has been removed.
        """
        import os  # local import: only needed for fsync

        try:
            with open(self.tmp_path, "wb") as f:
                pickle.dump(state, f)
                f.flush()
                # Push the bytes to stable storage before the rename. Without
                # this, a crash could leave a renamed-but-empty checkpoint.
                os.fsync(f.fileno())
            # Unlike shutil.move, os.replace never falls back to a
            # non-atomic copy+delete when the destination already exists.
            self.tmp_path.replace(self.checkpoint_path)
        except BaseException:
            # Best-effort cleanup of the partial temp file. Never let a
            # cleanup failure mask the original error.
            try:
                self.tmp_path.unlink(missing_ok=True)
            except OSError:
                pass
            raise
        logger.debug(f"Saved checkpoint to {self.checkpoint_path}")

    def load(self) -> CheckpointState | None:
        """Load the processing state from disk if a checkpoint exists.

        Loading is best effort. A file that cannot be read or unpickled, or
        that does not contain a ``CheckpointState``, is logged as a warning
        and ignored.

        Returns:
            The loaded CheckpointState, or None if no valid checkpoint is found.
        """
        if self.checkpoint_path.exists():
            logger.info(f"Loading checkpoint from {self.checkpoint_path}")
            try:
                # NOTE: unpickling can execute arbitrary code. Only load
                # checkpoint files written by this tool.
                with open(self.checkpoint_path, "rb") as f:
                    state = pickle.load(f)
                if isinstance(state, CheckpointState):
                    return state
                logger.warning("Checkpoint file is invalid or outdated, ignoring.")
                return None
            except Exception as e:
                # Deliberately broad: any failure means "no usable checkpoint".
                logger.warning(
                    f"Failed to load checkpoint: {e}. Starting from scratch."
                )
                return None
        return None

    def clean(self):
        """Remove the checkpoint file and any leftover temporary file.

        Missing files are ignored. A debug message is logged only when the
        checkpoint file itself was removed.
        """
        try:
            self.checkpoint_path.unlink()
        except FileNotFoundError:
            pass
        else:
            logger.debug("Removed checkpoint file.")
        # A process killed between writing and renaming leaves a stale
        # temporary file behind; drop it too.
        self.tmp_path.unlink(missing_ok=True)

clean()

Removes the checkpoint file if it exists.

Source code in optimus_dl/recipe/pretokenize/checkpoint.py
def clean(self):
    """Remove the checkpoint file and any leftover temporary file.

    Missing files are ignored. A debug message is logged only when the
    checkpoint file itself was removed.
    """
    try:
        self.checkpoint_path.unlink()
    except FileNotFoundError:
        pass
    else:
        logger.debug("Removed checkpoint file.")
    # A process killed between writing and renaming leaves a stale
    # temporary file behind; drop it too.
    self.tmp_path.unlink(missing_ok=True)

load()

Loads the processing state from disk if a checkpoint exists.

Returns:

| Type | Description |
| --- | --- |
| `CheckpointState \| None` | The loaded CheckpointState, or None if no valid checkpoint is found. |

Source code in optimus_dl/recipe/pretokenize/checkpoint.py
def load(self) -> CheckpointState | None:
    """Read a previously saved checkpoint, if one is present.

    Any failure to open or unpickle the file is logged as a warning and
    treated as "no checkpoint". The same applies when the unpickled object
    is not a ``CheckpointState``.

    Returns:
        The stored CheckpointState, or None when there is no usable checkpoint.
    """
    path = self.checkpoint_path
    if not path.exists():
        return None

    logger.info(f"Loading checkpoint from {path}")
    try:
        # NOTE: unpickling can execute arbitrary code. Only load checkpoint
        # files written by this tool.
        with open(path, "rb") as handle:
            loaded = pickle.load(handle)
        if not isinstance(loaded, CheckpointState):
            logger.warning("Checkpoint file is invalid or outdated, ignoring.")
            loaded = None
    except Exception as err:
        logger.warning(
            f"Failed to load checkpoint: {err}. Starting from scratch."
        )
        loaded = None
    return loaded

save(state)

Saves the current processing state to disk atomically.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `state` | `CheckpointState` | The checkpoint state object to save. | *required* |
Source code in optimus_dl/recipe/pretokenize/checkpoint.py
def save(self, state: CheckpointState):
    """Save the current processing state to disk atomically.

    The state is pickled into the temporary file, flushed and fsync'ed, and
    then renamed over the checkpoint with ``Path.replace`` (``os.replace``).
    The replace overwrites an existing checkpoint on every platform and is
    atomic on POSIX. Readers therefore see either the previous checkpoint or
    the complete new one.

    Args:
        state: The checkpoint state object to save.

    Raises:
        Any error from opening, pickling, writing or renaming propagates to
        the caller after the temporary file has been removed.
    """
    import os  # local import: only needed for fsync

    try:
        with open(self.tmp_path, "wb") as f:
            pickle.dump(state, f)
            f.flush()
            # Push the bytes to stable storage before the rename. Without
            # this, a crash could leave a renamed-but-empty checkpoint.
            os.fsync(f.fileno())
        # Unlike shutil.move, os.replace never falls back to a non-atomic
        # copy+delete when the destination already exists.
        self.tmp_path.replace(self.checkpoint_path)
    except BaseException:
        # Best-effort cleanup of the partial temp file. Never let a cleanup
        # failure mask the original error.
        try:
            self.tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
    logger.debug(f"Saved checkpoint to {self.checkpoint_path}")

CheckpointState dataclass

Represents the state to be saved in a checkpoint.

This provides a clear structure for what is being saved and loaded.

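Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `rng_state` | `dict[str, Any]` | Random number generator state (from `random.getstate()`). |

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `processor_state` | `dict[str, Any]` | State dictionary from the TokenProcessor. | *required* |
| `sharder_state` | `dict[str, Any]` | State dictionary from the Sharder. | *required* |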
Source code in optimus_dl/recipe/pretokenize/checkpoint.py
@dataclass
class CheckpointState:
    """Represents the state to be saved in a checkpoint.

    This provides a clear structure for what is being saved and loaded.
    ``CheckpointManager.save`` pickles instances to disk.
    ``CheckpointManager.load`` accepts an unpickled object only if it is an
    instance of this class.

    Attributes:
        processor_state: State dictionary from the TokenProcessor.
        sharder_state: State dictionary from the Sharder.
        rng_state: Random number generator state (from `random.getstate()`).
    """

    # NOTE(review): the docstring lists ``rng_state``, but no such field is
    # declared in the visible definition. Confirm it is declared further down,
    # or drop it from the docstring. Also, ``random.getstate()`` returns a
    # tuple, not a dict; verify the intended type.
    processor_state: dict[str, Any]  # state dict from the TokenProcessor
    sharder_state: dict[str, Any]  # state dict from the Sharder