optimus_dl.recipe.train.builders

CriterionBuilder

Builder class responsible for creating the loss criterion instance.

Uses the criterion registry to instantiate the specified loss function (e.g., CrossEntropy) based on the training configuration.

Parameters:

    cfg (CriterionBuilderConfig): Builder configuration. Required.
    criterion_config (CriterionConfig): Configuration for the criterion itself. Required.

Source code in optimus_dl/recipe/train/builders/criterion_builder.py
class CriterionBuilder:
    """Builder class responsible for creating the loss criterion instance.

    Uses the `criterion` registry to instantiate the specified loss function
    (e.g., CrossEntropy) based on the training configuration.

    Args:
        cfg: Builder configuration.
        criterion_config: Configuration for the criterion itself.
    """

    def __init__(
        self, cfg: CriterionBuilderConfig, criterion_config: CriterionConfig, **kwargs
    ):
        self.criterion_config = criterion_config

    def build_criterion(self, **kwargs) -> BaseCriterion:
        """Instantiate and return the configured loss criterion."""
        criterion = build("criterion", self.criterion_config, **kwargs)
        assert isinstance(criterion, BaseCriterion)
        logger.info(f"Criterion \n{criterion}")
        return criterion

build_criterion(**kwargs)

Instantiate and return the configured loss criterion.

Source code in optimus_dl/recipe/train/builders/criterion_builder.py
def build_criterion(self, **kwargs) -> BaseCriterion:
    """Instantiate and return the configured loss criterion."""
    criterion = build("criterion", self.criterion_config, **kwargs)
    assert isinstance(criterion, BaseCriterion)
    logger.info(f"Criterion \n{criterion}")
    return criterion
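
A minimal usage sketch (the concrete config instances shown are placeholders, not the exact fields defined in this project; substitute whatever criterion is registered in your setup):

# Hypothetical usage: build a cross-entropy criterion via the builder.
builder = CriterionBuilder(
    cfg=CriterionBuilderConfig(),
    criterion_config=CriterionConfig(name="cross_entropy"),  # placeholder config
)
criterion = builder.build_criterion()  # returns a BaseCriterion instance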

DataBuilder

Builder class for constructing training and evaluation data pipelines.

Manages the creation of DataPipeline objects, ensuring correct distributed sharding and iterator behavior (e.g., infinite loop for training, resettable for evaluation).

Parameters:

    cfg (DataBuilderConfig): Builder configuration. Required.
    data_config (DataConfig): Configuration for datasets and transforms. Required.
    data_seed (int): Base seed used to derive per-rank data seeds. Required.

Source code in optimus_dl/recipe/train/builders/data_builder.py
class DataBuilder:
    """Builder class for constructing training and evaluation data pipelines.

    Manages the creation of `DataPipeline` objects, ensuring correct distributed
    sharding and iterator behavior (e.g., infinite loop for training, resettable
    for evaluation).

    Args:
        cfg: Builder configuration.
        data_config: Configuration for datasets and transforms.
    """

    def __init__(
        self, cfg: DataBuilderConfig, data_config: DataConfig, data_seed: int, **kwargs
    ):
        self.data_config = data_config
        self.data_seed = data_seed

    @staticmethod
    def _get_rank_seed(seed: int, rank: int, world_size: int) -> int:
        """
        Generate a unique seed for each rank based on the base seed.
        """
        rng = torch.Generator()
        rng.manual_seed(seed + world_size * 10000 + rank)
        return torch.randint(0, 2**32, (1,), generator=rng).item()

    def build_train_data(self, collective: Collective, **kwargs) -> DataPipeline | None:
        """Build the training data pipeline.

        Automatically injects rank and world_size for sharding. The resulting
        loader is configured to restart automatically on StopIteration, creating
        an infinite stream.

        Args:
            collective: Distributed collective for sharding info.
            **kwargs: Additional arguments passed to dataset builders.

        Returns:
            A DataPipeline containing the dataset and loader.
        """
        kwargs["rank"] = collective.dp_rank
        kwargs["world_size"] = collective.dp_world_size
        kwargs["seed"] = self._get_rank_seed(
            self.data_seed, collective.dp_rank, collective.dp_world_size
        )
        train_data = build_data_pipeline(
            self.data_config.train_datasets, profile_name="train", **kwargs
        )
        if train_data is None:
            return None
        dataloader = torchdata.nodes.Loader(
            root=train_data.dataloader,
            restart_on_stop_iteration=True,
        )
        return DataPipeline(
            datasets=train_data.datasets,
            dataloader=dataloader,
        )

    def build_eval_data(
        self, collective: Collective, **kwargs: Any
    ) -> dict[str, EvalDataPipeline | None]:
        """Build evaluation data pipelines.

        Constructs a dictionary of pipelines for multiple evaluation datasets.
        Uses `LoaderIterResettable` to allow repeated iteration over the same
        validation sets.

        Args:
            collective: Distributed collective.
            **kwargs: Additional arguments.

        Returns:
            Dictionary mapping dataset names to DataPipelines.
        """
        kwargs["rank"] = collective.dp_rank
        kwargs["world_size"] = collective.dp_world_size
        kwargs["seed"] = self._get_rank_seed(
            self.data_seed, collective.dp_rank, collective.dp_world_size
        )
        eval_data = build_data_pipeline_dict(
            self.data_config.eval_datasets, profile_name="eval", **kwargs
        )
        eval_data = {
            k: (
                EvalDataPipeline(
                    datasets=v.datasets,
                    dataloader=LoaderIterResettable(
                        root=v.dataloader,
                        restart_on_stop_iteration=False,
                    ),
                    eval_freq=v.eval_freq,
                    eval_iterations=v.eval_iterations,
                )
                if v is not None
                else None
            )
            for k, v in eval_data.items()
        }
        return eval_data
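
`_get_rank_seed` derives a deterministic but distinct seed per data-parallel rank from the base `data_seed`, so shuffling differs across ranks while staying reproducible across restarts. A standalone sketch of the same derivation:

import torch

def rank_seed(seed: int, rank: int, world_size: int) -> int:
    # Same derivation as DataBuilder._get_rank_seed: mix the base seed with the
    # world size and rank, then draw one 32-bit integer from a torch RNG.
    rng = torch.Generator()
    rng.manual_seed(seed + world_size * 10000 + rank)
    return int(torch.randint(0, 2**32, (1,), generator=rng).item())

# Each rank gets its own seed; the same (seed, rank, world_size) triple
# always yields the same value.
seeds = [rank_seed(42, rank, world_size=4) for rank in range(4)]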

build_eval_data(collective, **kwargs)

Build evaluation data pipelines.

Constructs a dictionary of pipelines for multiple evaluation datasets. Uses LoaderIterResettable to allow repeated iteration over the same validation sets.

Parameters:

    collective (Collective): Distributed collective. Required.
    **kwargs (Any): Additional arguments. Default: {}.

Returns:

    dict[str, EvalDataPipeline | None]: Dictionary mapping dataset names to DataPipelines.

Source code in optimus_dl/recipe/train/builders/data_builder.py
def build_eval_data(
    self, collective: Collective, **kwargs: Any
) -> dict[str, EvalDataPipeline | None]:
    """Build evaluation data pipelines.

    Constructs a dictionary of pipelines for multiple evaluation datasets.
    Uses `LoaderIterResettable` to allow repeated iteration over the same
    validation sets.

    Args:
        collective: Distributed collective.
        **kwargs: Additional arguments.

    Returns:
        Dictionary mapping dataset names to DataPipelines.
    """
    kwargs["rank"] = collective.dp_rank
    kwargs["world_size"] = collective.dp_world_size
    kwargs["seed"] = self._get_rank_seed(
        self.data_seed, collective.dp_rank, collective.dp_world_size
    )
    eval_data = build_data_pipeline_dict(
        self.data_config.eval_datasets, profile_name="eval", **kwargs
    )
    eval_data = {
        k: (
            EvalDataPipeline(
                datasets=v.datasets,
                dataloader=LoaderIterResettable(
                    root=v.dataloader,
                    restart_on_stop_iteration=False,
                ),
                eval_freq=v.eval_freq,
                eval_iterations=v.eval_iterations,
            )
            if v is not None
            else None
        )
        for k, v in eval_data.items()
    }
    return eval_data

build_train_data(collective, **kwargs)

Build the training data pipeline.

Automatically injects rank and world_size for sharding. The resulting loader is configured to restart automatically on StopIteration, creating an infinite stream.

Parameters:

    collective (Collective): Distributed collective for sharding info. Required.
    **kwargs: Additional arguments passed to dataset builders. Default: {}.

Returns:

    DataPipeline | None: A DataPipeline containing the dataset and loader, or None if no training data pipeline was built.

Source code in optimus_dl/recipe/train/builders/data_builder.py
def build_train_data(self, collective: Collective, **kwargs) -> DataPipeline | None:
    """Build the training data pipeline.

    Automatically injects rank and world_size for sharding. The resulting
    loader is configured to restart automatically on StopIteration, creating
    an infinite stream.

    Args:
        collective: Distributed collective for sharding info.
        **kwargs: Additional arguments passed to dataset builders.

    Returns:
        A DataPipeline containing the dataset and loader.
    """
    kwargs["rank"] = collective.dp_rank
    kwargs["world_size"] = collective.dp_world_size
    kwargs["seed"] = self._get_rank_seed(
        self.data_seed, collective.dp_rank, collective.dp_world_size
    )
    train_data = build_data_pipeline(
        self.data_config.train_datasets, profile_name="train", **kwargs
    )
    if train_data is None:
        return None
    dataloader = torchdata.nodes.Loader(
        root=train_data.dataloader,
        restart_on_stop_iteration=True,
    )
    return DataPipeline(
        datasets=train_data.datasets,
        dataloader=dataloader,
    )
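
A minimal sketch of wiring the builder into a training recipe. The config objects and the `Collective` instance are assumed to come from the surrounding recipe; the names below are placeholders:

# Hypothetical usage: build the train and eval pipelines for this rank.
data_builder = DataBuilder(
    cfg=DataBuilderConfig(),
    data_config=data_config,  # DataConfig with train_datasets / eval_datasets
    data_seed=42,
)
train_pipeline = data_builder.build_train_data(collective)
eval_pipelines = data_builder.build_eval_data(collective)

if train_pipeline is not None:
    # Infinite stream: the loader restarts automatically on StopIteration.
    train_iter = iter(train_pipeline.dataloader)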

ModelBuilder

Mixin for building models and applying transformations.

Encapsulates the logic for:

1. Instantiating a BaseModel from a configuration object.
2. Sequentially applying a list of ModelTransforms (e.g., FSDP, DDP, compile).
3. Logging model statistics (parameter count).

Parameters:

    cfg (ModelBuilderConfig): Builder configuration. Required.
    model_transforms (list[ModelTransformConfig] | None): List of configurations for transforms to apply. Default: None.

Source code in optimus_dl/recipe/mixins/model_builder.py
class ModelBuilder:
    """Mixin for building models and applying transformations.

    Encapsulates the logic for:
    1.  Instantiating a `BaseModel` from a configuration object.
    2.  Sequentially applying a list of `ModelTransforms` (e.g., FSDP, DDP, compile).
    3.  Logging model statistics (parameter count).

    Args:
        cfg: Builder configuration.
        model_transforms: List of configurations for transforms to apply.
    """

    def __init__(
        self,
        cfg: ModelBuilderConfig,
        model_transforms: list[ModelTransformConfig] | None = None,
        **kwargs: Any,
    ):
        self.model_transforms = model_transforms or []

    def build_model(
        self, model_config: ModelConfig | None, collective: Collective, **kwargs
    ) -> BaseModel:
        """Build the model and apply all configured transforms.

        Args:
            model_config: Configuration for the model architecture.
            collective: Distributed collective for transforms that need it.
            **kwargs: Additional arguments passed to model constructor and transforms.

        Returns:
            The fully constructed and transformed model.
        """
        if model_config is None:
            raise ValueError(
                "model_config is None. Use build_model_from_checkpoint for evaluation."
            )

        model = build_model(model_config, **kwargs)
        num_param_before = get_num_parameters(model)
        logger.info(f"Params num (before model transforms): {num_param_before:,}")
        log_averaged("model/num_params_before_transforms", num_param_before)
        assert isinstance(model, BaseModel)

        # Apply model transforms (including distributed setup)
        model = self._apply_model_transforms(
            model, collective=collective, device=collective.default_device, **kwargs
        )
        num_param_after = get_num_parameters(model)
        logger.info(f"Model \n{model}")
        logger.info(f"Params num (after model transforms): {num_param_after:,}")
        log_averaged("model/num_params_after_transforms", num_param_after)

        return model

    def _apply_model_transforms(self, model: BaseModel, **kwargs) -> BaseModel:
        """Iteratively apply all configured model transforms.

        Args:
            model: The base model.
            **kwargs: Context arguments (device, collective, etc.).

        Returns:
            The transformed model.
        """
        for transform_cfg in self.model_transforms:
            try:
                transform = build_model_transform(transform_cfg, **kwargs)
                if transform is not None:
                    logger.info(f"Applying model transform: {transform}")
                    model = transform.apply(model, **kwargs)
                else:
                    logger.warning(
                        f"Failed to build model transform from config: {transform_cfg}"
                    )
            except Exception as e:
                logger.error(f"Failed to apply model transform {transform_cfg}: {e}")
                raise

        return model

build_model(model_config, collective, **kwargs)

Build the model and apply all configured transforms.

Parameters:

    model_config (ModelConfig | None): Configuration for the model architecture. Required.
    collective (Collective): Distributed collective for transforms that need it. Required.
    **kwargs: Additional arguments passed to model constructor and transforms. Default: {}.

Returns:

    BaseModel: The fully constructed and transformed model.

Source code in optimus_dl/recipe/mixins/model_builder.py
def build_model(
    self, model_config: ModelConfig | None, collective: Collective, **kwargs
) -> BaseModel:
    """Build the model and apply all configured transforms.

    Args:
        model_config: Configuration for the model architecture.
        collective: Distributed collective for transforms that need it.
        **kwargs: Additional arguments passed to model constructor and transforms.

    Returns:
        The fully constructed and transformed model.
    """
    if model_config is None:
        raise ValueError(
            "model_config is None. Use build_model_from_checkpoint for evaluation."
        )

    model = build_model(model_config, **kwargs)
    num_param_before = get_num_parameters(model)
    logger.info(f"Params num (before model transforms): {num_param_before:,}")
    log_averaged("model/num_params_before_transforms", num_param_before)
    assert isinstance(model, BaseModel)

    # Apply model transforms (including distributed setup)
    model = self._apply_model_transforms(
        model, collective=collective, device=collective.default_device, **kwargs
    )
    num_param_after = get_num_parameters(model)
    logger.info(f"Model \n{model}")
    logger.info(f"Params num (after model transforms): {num_param_after:,}")
    log_averaged("model/num_params_after_transforms", num_param_after)

    return model
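
A hedged usage sketch. The transform configurations shown are placeholders; actual transform names and fields depend on what is registered in your project:

# Hypothetical usage: build a model and apply distributed/compile transforms.
model_builder = ModelBuilder(
    cfg=ModelBuilderConfig(),
    model_transforms=[fsdp_transform_cfg, compile_transform_cfg],  # placeholder configs
)
model = model_builder.build_model(model_config, collective)
# `model` is a BaseModel with every configured transform applied in order;
# each transform also receives collective.default_device as its target device.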

OptimizerBuilder

Builder class responsible for creating the optimizer.

Takes parameter groups from the model and instantiates the configured optimizer (e.g., AdamW). It also logs the total number of optimized parameters.

Parameters:

    cfg (OptimizerBuilderConfig): Builder configuration. Required.
    optimization_config (OptimizationConfig): Optimization settings including the optimizer config. Required.

Source code in optimus_dl/recipe/train/builders/optimizer_builder.py
class OptimizerBuilder:
    """Builder class responsible for creating the optimizer.

    Takes parameter groups from the model and instantiates the configured
    optimizer (e.g., AdamW). It also logs the total number of optimized
    parameters.

    Args:
        cfg: Builder configuration.
        optimization_config: Optimization settings including the optimizer config.
    """

    def __init__(
        self,
        cfg: OptimizerBuilderConfig,
        optimization_config: OptimizationConfig,
        **kwargs: Any,
    ):
        self.optimization_config = optimization_config

    def build_optimizer(self, params, **kwargs) -> Optimizer:
        """Build and validate the optimizer.

        Args:
            params: Iterable of parameters or dicts defining parameter groups
                (typically from `model.make_parameter_groups()`).
            **kwargs: Additional arguments passed to the optimizer constructor.

        Returns:
            Instantiated Optimizer.
        """
        optimizer = build(
            "optimizer", self.optimization_config.optimizer, params=params, **kwargs
        )
        assert isinstance(optimizer, Optimizer)
        logger.info(f"Optimizer \n{optimizer}")
        optimized_params = []
        for param_group in optimizer.param_groups:
            optimized_params.append(
                sum([p.numel() for p in param_group["params"] if p.requires_grad])
            )
        optimized_params_num = sum(optimized_params)
        logger.info(
            f"Optimized {optimized_params_num:,} parameters. Per group: {[f'{i:,}' for i in optimized_params]}"
        )
        log_averaged("optimized_params", optimized_params_num)

        return optimizer

build_optimizer(params, **kwargs)

Build and validate the optimizer.

Parameters:

    params: Iterable of parameters or dicts defining parameter groups (typically from model.make_parameter_groups()). Required.
    **kwargs: Additional arguments passed to the optimizer constructor. Default: {}.

Returns:

    Optimizer: Instantiated Optimizer.

Source code in optimus_dl/recipe/train/builders/optimizer_builder.py
def build_optimizer(self, params, **kwargs) -> Optimizer:
    """Build and validate the optimizer.

    Args:
        params: Iterable of parameters or dicts defining parameter groups
            (typically from `model.make_parameter_groups()`).
        **kwargs: Additional arguments passed to the optimizer constructor.

    Returns:
        Instantiated Optimizer.
    """
    optimizer = build(
        "optimizer", self.optimization_config.optimizer, params=params, **kwargs
    )
    assert isinstance(optimizer, Optimizer)
    logger.info(f"Optimizer \n{optimizer}")
    optimized_params = []
    for param_group in optimizer.param_groups:
        optimized_params.append(
            sum([p.numel() for p in param_group["params"] if p.requires_grad])
        )
    optimized_params_num = sum(optimized_params)
    logger.info(
        f"Optimized {optimized_params_num:,} parameters. Per group: {[f'{i:,}' for i in optimized_params]}"
    )
    log_averaged("optimized_params", optimized_params_num)

    return optimizer
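
A minimal sketch, assuming the model exposes make_parameter_groups() as mentioned in the docstring and that an optimizer (e.g., AdamW) is configured in optimization_config; the config instances are placeholders:

# Hypothetical usage: create the optimizer from the model's parameter groups.
optimizer_builder = OptimizerBuilder(
    cfg=OptimizerBuilderConfig(),
    optimization_config=optimization_config,  # OptimizationConfig with .optimizer set
)
optimizer = optimizer_builder.build_optimizer(model.make_parameter_groups())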

SchedulerBuilder

Builder class responsible for creating the learning rate scheduler.

Instantiates a scheduler (e.g., CosineAnnealing, WSD) and associates it with the optimizer. It ensures the scheduler is aware of the total training iterations.

Parameters:

    cfg (SchedulerBuilderConfig): Builder configuration. Required.
    lr_scheduler_config (RegistryConfig | None): Configuration for the scheduler itself (can be None). Required.
    optimization_config (OptimizationConfig): Optimization settings (needed for total iterations). Required.

Source code in optimus_dl/recipe/train/builders/scheduler_builder.py
class SchedulerBuilder:
    """Builder class responsible for creating the learning rate scheduler.

    Instantiates a scheduler (e.g., CosineAnnealing, WSD) and associates it
    with the optimizer. It ensures the scheduler is aware of the total training
    iterations.

    Args:
        cfg: Builder configuration.
        lr_scheduler_config: Configuration for the scheduler itself (can be None).
        optimization_config: Optimization settings (needed for total iterations).
    """

    def __init__(
        self,
        cfg: SchedulerBuilderConfig,
        lr_scheduler_config: RegistryConfig | None,
        optimization_config: OptimizationConfig,
        **kwargs: Any,
    ):
        self.lr_scheduler_config = lr_scheduler_config
        self.optimization_config = optimization_config

    def build_lr_scheduler(self, optimizer: Optimizer, **kwargs):
        """Build and validate the learning rate scheduler.

        Args:
            optimizer: The optimizer to schedule.
            **kwargs: Additional arguments.

        Returns:
            Instantiated LR Scheduler or None if not configured.
        """
        if self.lr_scheduler_config is None:
            return None
        lr_scheduler = build(
            "lr_scheduler",
            cfg=self.lr_scheduler_config,
            optimizer=optimizer,
            iterations=self.optimization_config.iterations,
            **kwargs,
        )
        if lr_scheduler is not None:
            logger.info(f"LR Scheduler \n{lr_scheduler}")
        return lr_scheduler

build_lr_scheduler(optimizer, **kwargs)

Build and validate the learning rate scheduler.

Parameters:

    optimizer (Optimizer): The optimizer to schedule. Required.
    **kwargs: Additional arguments. Default: {}.

Returns:

    Instantiated LR scheduler, or None if not configured.

Source code in optimus_dl/recipe/train/builders/scheduler_builder.py
def build_lr_scheduler(self, optimizer: Optimizer, **kwargs):
    """Build and validate the learning rate scheduler.

    Args:
        optimizer: The optimizer to schedule.
        **kwargs: Additional arguments.

    Returns:
        Instantiated LR Scheduler or None if not configured.
    """
    if self.lr_scheduler_config is None:
        return None
    lr_scheduler = build(
        "lr_scheduler",
        cfg=self.lr_scheduler_config,
        optimizer=optimizer,
        iterations=self.optimization_config.iterations,
        **kwargs,
    )
    if lr_scheduler is not None:
        logger.info(f"LR Scheduler \n{lr_scheduler}")
    return lr_scheduler
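
A short sketch of the typical call order, optimizer first and then the scheduler bound to it; the config objects are placeholders for whatever scheduler is registered:

# Hypothetical usage: the scheduler is built after the optimizer and reads the
# total iteration count from the optimization config.
scheduler_builder = SchedulerBuilder(
    cfg=SchedulerBuilderConfig(),
    lr_scheduler_config=lr_scheduler_config,  # RegistryConfig or None
    optimization_config=optimization_config,
)
lr_scheduler = scheduler_builder.build_lr_scheduler(optimizer)
if lr_scheduler is not None:
    lr_scheduler.step()  # advance the schedule once per training iteration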

Modules and Sub-packages