Skip to content

data_builder

optimus_dl.recipe.train.builders.data_builder

Data builder mixin for building data pipelines.

DataBuilder

Builder class for constructing training and evaluation data pipelines.

Manages the creation of DataPipeline objects, ensuring correct distributed sharding and iterator behavior (e.g., infinite loop for training, resettable for evaluation).

Parameters:

Name Type Description Default
cfg DataBuilderConfig

Builder configuration.

required
data_config DataConfig

Configuration for datasets and transforms.

required
Source code in optimus_dl/recipe/train/builders/data_builder.py
class DataBuilder:
    """Builder class for constructing training and evaluation data pipelines.

    Manages the creation of `DataPipeline` objects, ensuring correct distributed
    sharding and iterator behavior (e.g., infinite loop for training, resettable
    for evaluation).

    Args:
        cfg: Builder configuration.
        data_config: Configuration for datasets and transforms.
        data_seed: Base seed from which per-rank data seeds are derived.
        **kwargs: Extra arguments accepted for builder/registry compatibility;
            currently unused here.
    """

    def __init__(
        self, cfg: DataBuilderConfig, data_config: DataConfig, data_seed: int, **kwargs
    ):
        # Fix: `cfg` was previously accepted but silently discarded. Keep a
        # reference so subclasses and callers can inspect the configuration.
        self.cfg = cfg
        self.data_config = data_config
        self.data_seed = data_seed

    @staticmethod
    def _get_rank_seed(seed: int, rank: int, world_size: int) -> int:
        """Generate a unique, deterministic seed for each rank.

        Mixing both rank and world_size into the seed keeps data streams
        distinct across ranks and across runs with different world sizes.
        """
        rng = torch.Generator()
        rng.manual_seed(seed + world_size * 10000 + rank)
        return torch.randint(0, 2**32, (1,), generator=rng).item()

    def _inject_sharding_kwargs(self, collective: Collective, kwargs: dict) -> dict:
        """Inject rank, world_size, and per-rank seed into `kwargs` in place.

        Shared by the train and eval builders so both shard identically.
        """
        kwargs["rank"] = collective.dp_rank
        kwargs["world_size"] = collective.dp_world_size
        kwargs["seed"] = self._get_rank_seed(
            self.data_seed, collective.dp_rank, collective.dp_world_size
        )
        return kwargs

    def build_train_data(self, collective: Collective, **kwargs) -> DataPipeline | None:
        """Build the training data pipeline.

        Automatically injects rank and world_size for sharding. The resulting
        loader is configured to restart automatically on StopIteration, creating
        an infinite stream.

        Args:
            collective: Distributed collective for sharding info.
            **kwargs: Additional arguments passed to dataset builders.

        Returns:
            A DataPipeline containing the dataset and loader, or None when no
            training datasets are configured.
        """
        self._inject_sharding_kwargs(collective, kwargs)
        train_data = build_data_pipeline(
            self.data_config.train_datasets, profile_name="train", **kwargs
        )
        if train_data is None:
            return None
        # restart_on_stop_iteration=True turns the loader into an infinite
        # stream suitable for step-based training loops.
        dataloader = torchdata.nodes.Loader(
            root=train_data.dataloader,
            restart_on_stop_iteration=True,
        )
        return DataPipeline(
            datasets=train_data.datasets,
            dataloader=dataloader,
        )

    def build_eval_data(
        self, collective: Collective, **kwargs: Any
    ) -> dict[str, EvalDataPipeline | None]:
        """Build evaluation data pipelines.

        Constructs a dictionary of pipelines for multiple evaluation datasets.
        Uses `LoaderIterResettable` to allow repeated iteration over the same
        validation sets.

        Args:
            collective: Distributed collective.
            **kwargs: Additional arguments.

        Returns:
            Dictionary mapping dataset names to DataPipelines (None entries are
            preserved for datasets that produced no pipeline).
        """
        self._inject_sharding_kwargs(collective, kwargs)
        eval_data = build_data_pipeline_dict(
            self.data_config.eval_datasets, profile_name="eval", **kwargs
        )
        return {
            k: (
                EvalDataPipeline(
                    datasets=v.datasets,
                    # Resettable + no restart: each eval pass iterates the set
                    # exactly once, and the next pass starts from the beginning.
                    dataloader=LoaderIterResettable(
                        root=v.dataloader,
                        restart_on_stop_iteration=False,
                    ),
                    eval_freq=v.eval_freq,
                    eval_iterations=v.eval_iterations,
                )
                if v is not None
                else None
            )
            for k, v in eval_data.items()
        }

build_eval_data(collective, **kwargs)

Build evaluation data pipelines.

Constructs a dictionary of pipelines for multiple evaluation datasets. Uses LoaderIterResettable to allow repeated iteration over the same validation sets.

Parameters:

Name Type Description Default
collective Collective

Distributed collective.

required
**kwargs Any

Additional arguments.

{}

Returns:

Type Description
dict[str, EvalDataPipeline | None]

Dictionary mapping dataset names to DataPipelines.

Source code in optimus_dl/recipe/train/builders/data_builder.py
def build_eval_data(
    self, collective: Collective, **kwargs: Any
) -> dict[str, EvalDataPipeline | None]:
    """Build evaluation data pipelines.

    Constructs a dictionary of pipelines for multiple evaluation datasets.
    Uses `LoaderIterResettable` to allow repeated iteration over the same
    validation sets.

    Args:
        collective: Distributed collective.
        **kwargs: Additional arguments.

    Returns:
        Dictionary mapping dataset names to DataPipelines.
    """
    # Inject distributed sharding info so each rank reads a distinct shard,
    # with a per-rank seed derived from the base data seed.
    kwargs["rank"] = collective.dp_rank
    kwargs["world_size"] = collective.dp_world_size
    kwargs["seed"] = self._get_rank_seed(
        self.data_seed, collective.dp_rank, collective.dp_world_size
    )
    eval_data = build_data_pipeline_dict(
        self.data_config.eval_datasets, profile_name="eval", **kwargs
    )
    # Wrap each non-None pipeline's loader so it can be iterated repeatedly;
    # restart_on_stop_iteration=False means one full pass per iteration.
    # None entries are preserved as-is.
    eval_data = {
        k: (
            EvalDataPipeline(
                datasets=v.datasets,
                dataloader=LoaderIterResettable(
                    root=v.dataloader,
                    restart_on_stop_iteration=False,
                ),
                eval_freq=v.eval_freq,
                eval_iterations=v.eval_iterations,
            )
            if v is not None
            else None
        )
        for k, v in eval_data.items()
    }
    return eval_data

build_train_data(collective, **kwargs)

Build the training data pipeline.

Automatically injects rank and world_size for sharding. The resulting loader is configured to restart automatically on StopIteration, creating an infinite stream.

Parameters:

Name Type Description Default
collective Collective

Distributed collective for sharding info.

required
**kwargs

Additional arguments passed to dataset builders.

{}

Returns:

Type Description
DataPipeline | None

A DataPipeline containing the dataset and loader.

Source code in optimus_dl/recipe/train/builders/data_builder.py
def build_train_data(self, collective: Collective, **kwargs) -> DataPipeline | None:
    """Build the training data pipeline.

    Automatically injects rank and world_size for sharding. The resulting
    loader is configured to restart automatically on StopIteration, creating
    an infinite stream.

    Args:
        collective: Distributed collective for sharding info.
        **kwargs: Additional arguments passed to dataset builders.

    Returns:
        A DataPipeline containing the dataset and loader.
    """
    # Inject distributed sharding info so each rank reads a distinct shard,
    # with a per-rank seed derived from the base data seed.
    kwargs["rank"] = collective.dp_rank
    kwargs["world_size"] = collective.dp_world_size
    kwargs["seed"] = self._get_rank_seed(
        self.data_seed, collective.dp_rank, collective.dp_world_size
    )
    train_data = build_data_pipeline(
        self.data_config.train_datasets, profile_name="train", **kwargs
    )
    if train_data is None:
        return None
    # restart_on_stop_iteration=True makes the loader restart after exhausting
    # the dataset, yielding an infinite stream for step-based training.
    dataloader = torchdata.nodes.Loader(
        root=train_data.dataloader,
        restart_on_stop_iteration=True,
    )
    return DataPipeline(
        datasets=train_data.datasets,
        dataloader=dataloader,
    )

DataBuilderConfig dataclass

Bases: RegistryConfig

Configuration for DataBuilder.

Source code in optimus_dl/recipe/train/builders/data_builder.py
@dataclass
class DataBuilderConfig(RegistryConfig):
    """Configuration options for :class:`DataBuilder`.

    Currently carries no fields of its own beyond those inherited from
    ``RegistryConfig``.
    """

LoaderIterResettable

Bases: Loader

A Loader that automatically resets its iterator on __iter__.

This is essential for evaluation loops where the dataloader is re-used multiple times.

Source code in optimus_dl/recipe/train/builders/data_builder.py
class LoaderIterResettable(torchdata.nodes.Loader):
    """A Loader that automatically resets its iterator on `__iter__`.

    This is essential for evaluation loops where the dataloader is re-used
    multiple times.

    Args:
        root: The root node the loader iterates over.
        restart_on_stop_iteration: Whether the loader restarts after
            StopIteration (passed through to the base Loader).
    """

    def __init__(self, root, restart_on_stop_iteration: bool = True):
        super().__init__(root=root, restart_on_stop_iteration=restart_on_stop_iteration)

    def __iter__(self):
        """Reset the iterator state and return a new iterator."""
        # Fix: renamed the local from `iter`, which shadowed the builtin.
        # Resetting ensures every new `for` loop starts from the beginning.
        iterator = super().__iter__()
        iterator.reset()
        return iterator

__iter__()

Reset the iterator state and return a new iterator.

Source code in optimus_dl/recipe/train/builders/data_builder.py
def __iter__(self):
    """Reset the iterator state and return a new iterator."""
    # Reset before handing out the iterator so each `for` loop over this
    # loader starts from the beginning of the dataset.
    # NOTE(review): the local name `iter` shadows the builtin — consider
    # renaming to `iterator`.
    iter = super().__iter__()
    iter.reset()
    return iter