Skip to content

multiprocess

optimus_dl.core.multiprocess

finalize_process(timeout_seconds=5, max_retries=10)

Finalizes the process by ensuring all child processes are terminated to prevent hanging. It proceeds in three steps: (1) it first attempts a polite termination of child processes; (2) if any child processes are still alive after the timeout, it forcefully kills them; (3) it schedules a watchdog to force exit the process after a delay as a safety net.

Parameters:

Name Type Description Default
timeout_seconds int

Time to wait for child processes to terminate before force killing.

5
max_retries

Maximum number of retries to check for alive child processes.

10
Source code in optimus_dl/core/multiprocess.py
def finalize_process(timeout_seconds: int = 5, max_retries: int = 10) -> None:
    """
    Finalizes the process by ensuring all child processes are terminated to prevent hanging.
     - Shuts down Joblib workers and finishes WandB / MLflow runs.
     - Attempts a polite termination (``terminate()``) of all child processes.
     - If any child processes are still alive after the timeout, it forcefully kills them.
     - Repeats the scan until no children remain or ``max_retries`` is reached.
     - Schedules a watchdog to force exit the process after a delay as a safety net.

    Args:
        timeout_seconds: Time to wait for child processes to terminate before force killing.
        max_retries: Maximum number of retries to check for alive child processes.
    """
    force_terminate_joblib()
    finish_wandb()
    finish_mlflow()

    retries = 0
    while retries < max_retries:
        retries += 1
        # NOTE(review): presumably run so that objects still holding worker
        # handles are released before scanning for children -- confirm.
        gc.collect()
        try:
            current_process = psutil.Process(os.getpid())
            children = current_process.children(recursive=True)
        except Exception:
            logger.warning(
                f"Failed to get child processes for PID {os.getpid()}", exc_info=True
            )
            break

        if not children:
            # Nothing left to terminate; no need to run the kill phases.
            break

        logger.info(
            f"Found {len(children)} child processes to terminate (retry {retries}/{max_retries})"
        )

        # Phase 1: Polite termination
        for child in children:
            try:
                child.terminate()
            except psutil.NoSuchProcess:
                # The child exited between the scan and the signal: already done.
                continue
            except Exception:
                logger.warning(
                    f"Failed to terminate child process: {child.pid}", exc_info=True
                )
                continue

        # Phase 2: Force kill if they ignore SIGTERM
        _, alive = psutil.wait_procs(children, timeout=timeout_seconds)
        for child in alive:
            try:
                logger.warning(f"Force killing unresponsive child process: {child.pid}")
                child.kill()
            except psutil.NoSuchProcess:
                # The child exited on its own after the wait timed out.
                continue
            except Exception:
                logger.warning(
                    f"Failed to force kill child process: {child.pid}", exc_info=True
                )
                continue

    # Scheduled unconditionally, even if the child scan failed or retries ran out.
    _schedule_watchdog()

finish_mlflow()

Ensures that all MLflow processes are terminated to prevent hanging.

Source code in optimus_dl/core/multiprocess.py
def finish_mlflow():
    """
    Ends the MLflow run on a best-effort basis so it cannot keep the process hanging.

    If MLflow cannot be imported, a warning is logged and nothing else happens.
    Any exception raised while ending the run is logged as a warning and
    swallowed; this function never propagates it to the caller.
    """
    try:
        import mlflow
    except ImportError:
        logger.warning("MLflow is not installed, cannot finish MLflow run.")
    else:
        try:
            mlflow.end_run()
            logger.info("Finished MLflow run to ensure all processes are terminated.")
        except Exception as exc:
            # Best-effort cleanup: report the failure and carry on.
            logger.warning(f"Failed to finish MLflow run: {exc}", exc_info=True)

finish_wandb()

Ensures that all WandB processes are terminated to prevent hanging.

Source code in optimus_dl/core/multiprocess.py
def finish_wandb():
    """
    Ensures that all WandB processes are terminated to prevent hanging.

    This is best-effort cleanup: if WandB is not installed a warning is logged
    and the function returns; if finishing the active run raises, the error is
    logged as a warning and swallowed so that the caller's remaining shutdown
    steps still run.
    """
    try:
        import wandb
    except ImportError:
        logger.warning("WandB is not installed, cannot finish WandB run.")
        return

    try:
        # Only finish when a run is actually active.
        if wandb.run is not None:
            logger.info("Finishing WandB run to ensure all processes are terminated.")
            wandb.finish()
    except Exception as e:
        # A failure here must not abort process finalization.
        logger.warning(f"Failed to finish WandB run: {e}", exc_info=True)

force_terminate_joblib()

Forcefully terminates Joblib workers without allowing them to respawn.

Source code in optimus_dl/core/multiprocess.py
def force_terminate_joblib():
    """
    Forcefully terminates Joblib workers without allowing them to respawn.

    This is best-effort cleanup: if Joblib (and its bundled loky) cannot be
    imported a warning is logged and the function returns; if no reusable
    executor is present nothing is done; if shutting the executor down raises,
    the error is logged as a warning and swallowed so that the caller's
    remaining shutdown steps still run.
    """
    try:
        import joblib.externals.loky.reusable_executor as loky_reusable_executor
    except ImportError:
        logger.warning(
            "Joblib is not installed, cannot force terminate Joblib workers."
        )
        return

    # `_executor` is a private module attribute of loky; read it defensively so
    # a loky version without it does not break shutdown.
    executor = getattr(loky_reusable_executor, "_executor", None)
    if executor is None:
        return

    logger.info("Shutting down Joblib reusable executor to prevent worker respawn.")
    logger.warning(
        "Loky workers may not be terminated completely. This may lead to the process hanging if they do not exit on their own. It is recommended not to use joblib."
    )
    try:
        # wait=False: do not block on the workers; kill_workers=True: kill them.
        executor.shutdown(wait=False, kill_workers=True)
    except Exception as e:
        # A failure here must not abort process finalization.
        logger.warning(
            f"Failed to shut down Joblib reusable executor: {e}", exc_info=True
        )