Skip to content

Data modules

fowt_ml.datasets

This module contains functions to load and preprocess datasets.

Functions:

  • convert_mat_to_df

    Reads a matlab file and returns a pandas DataFrame.

  • get_data

    Returns a dataframe for the given data_id.

  • check_data

    Checks if the dataframe has the required columns and that their values are valid.

  • fix_column_names

    Fixes the column names to remove special characters.

convert_mat_to_df

convert_mat_to_df(mat_file: str, data_id: str) -> DataFrame

Reads a matlab file and returns a pandas DataFrame.

Parameters:

  • mat_file (str) –

    Path to a matlab file.

  • data_id (str) –

    ID of the data in the matlab file.

Returns:

  • DataFrame

    pd.DataFrame: DataFrame containing the data.

Source code in src/fowt_ml/datasets.py
def convert_mat_to_df(mat_file: str, data_id: str) -> pd.DataFrame:
    """Reads a matlab file and returns a pandas DataFrame.

    Args:
        mat_file (str): Path to a matlab file (HDF5-based, i.e. MATLAB v7.3).
        data_id (str): ID of the data in the matlab file.

    Returns:
        pd.DataFrame: DataFrame containing the data.

    Raises:
        ValueError: If ``data_id`` or the expected X/Y groups are missing.
    """
    # Context manager ensures the HDF5 handle is closed even when one of
    # the validation branches raises (the original leaked the open file).
    with h5py.File(mat_file, mode="r") as hdf:
        # validate the file
        if data_id not in hdf:
            raise ValueError(f"Experiment {data_id} not found in the file.")

        if "X" not in hdf[data_id] or "Y" not in hdf[data_id]:
            raise ValueError(f"Experiment {data_id} does not have X or Y data.")

        if "Data" not in hdf[data_id]["X"]:
            raise ValueError(f"Experiment {data_id} does not have X Data.")

        if "Name" not in hdf[data_id]["Y"] or "Data" not in hdf[data_id]["Y"]:
            raise ValueError(f"Experiment {data_id} does not have Y Name or Data.")

        data = {"time": np.array(hdf[data_id]["X"]["Data"][:]).flatten()}
        name_references = np.array(hdf[data_id]["Y"]["Name"][:]).flatten()
        data_references = np.array(hdf[data_id]["Y"]["Data"][:]).flatten()

        for index, (name_ref, data_ref) in enumerate(
            zip(name_references, data_references, strict=False)
        ):
            # Names are stored as arrays of character codes; decode to str.
            name = "".join([chr(item[0]) for item in hdf[name_ref]])
            if name in data:
                msg = (
                    f"Duplicate name {name} found in the data."
                    f" Renaming it to {name}_{index}."
                )
                logger.warning(msg)
                name = f"{name}_{index}"
            data[name] = np.array(hdf[data_ref]).flatten()
    return pd.DataFrame(data)

get_data

get_data(data_id: str, config: dict) -> DataFrame

Returns a dataframe for the given data_id.

Parameters:

  • data_id (str) –

    ID of the data in the configuration file.

  • config (dict) –

    Configuration dictionary. Example: {"data_id": {"path_file": "data.mat"}}.

Returns:

  • DataFrame

    pd.DataFrame: DataFrame for the given data_id.

Source code in src/fowt_ml/datasets.py
def get_data(data_id: str, config: dict) -> pd.DataFrame:
    """Returns a dataframe for the given data_id.

    Args:
        data_id (str): ID of the data in the configuration file.
        config (dict): Configuration dictionary.
            Example: {"data_id": {"path_file": "data.mat"}}.

    Returns:
        pd.DataFrame: DataFrame for the given data_id.
    """
    entry = config[data_id]
    frame = convert_mat_to_df(entry["path_file"], data_id)

    # Auxiliary values from the config fill in columns that the data
    # file itself does not provide.
    if "aux_data" in entry:
        for key, val in entry["aux_data"].items():
            if val is not None and key not in frame:
                frame[key] = val
                msg = (
                    f"{key} not found in the data file. "
                    f"But found in config file. "
                    f"Setting it to {val}."
                )
                logger.info(msg)
    return frame

check_data

check_data(df: DataFrame, col_names) -> DataFrame

Checks if the dataframe has the required columns and that their values are valid.

Parameters:

  • df (DataFrame) –

    DataFrame to check.

  • col_names (list) –

    List of required columns.

Returns:

  • DataFrame

    pd.DataFrame: DataFrame with valid required columns.

Source code in src/fowt_ml/datasets.py
def check_data(df: pd.DataFrame, col_names) -> pd.DataFrame:
    """Checks that the dataframe has the required columns and that values are valid.

    Note that every column of ``df`` is validated for NaNs and numeric
    dtype, not only the required ones.

    Args:
        df (pd.DataFrame): DataFrame to check.
        col_names (list): List of required columns.

    Returns:
        pd.DataFrame: The validated DataFrame (same object as ``df``).

    Raises:
        ValueError: If required columns are missing, any column contains
            NaN values, or any column is not numeric.
    """
    present = set(df.columns)
    missing_columns = [col for col in col_names if col not in present]
    if missing_columns:
        raise ValueError(f"Missing columns: {missing_columns}")

    # validate every column: no NaNs, numeric dtype
    for col in df.columns:
        series = df[col]
        if series.isnull().any():
            raise ValueError(f"Column {col} has NaN values.")
        if not np.issubdtype(series.dtype, np.number):
            raise ValueError(f"Column {col} is not numeric.")

    return df

fix_column_names

fix_column_names(df: DataFrame) -> DataFrame

Fixes the column names to remove special characters.

Parameters:

  • df (DataFrame) –

    DataFrame to fix.

Returns:

  • DataFrame

    pd.DataFrame: DataFrame with fixed column names.

Source code in src/fowt_ml/datasets.py
def fix_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Fixes the column names to remove special characters.

    ``[`` and ``<`` become ``_``; ``]`` and ``>`` are removed.

    Args:
        df (pd.DataFrame): DataFrame to fix. Renamed in place.

    Returns:
        pd.DataFrame: The same DataFrame with fixed column names.
    """
    # Single-pass str.translate instead of four chained str.replace calls.
    table = str.maketrans({"[": "_", "]": "", "<": "_", ">": ""})
    df.rename(columns=lambda col: col.translate(table), inplace=True)
    return df

Config modules

fowt_ml.config

This module contains functions to read configuration files.

Classes:

Functions:

BaseConfig

Bases: BaseModel

Methods:

  • as_dict

    Return the config as a (nested) dict.

as_dict
as_dict(*, by_alias: bool = False) -> dict

Return the config as a (nested) dict.

Source code in src/fowt_ml/config.py
def as_dict(self, *, by_alias: bool = False) -> dict:
    """Return the config as a (nested) dict.

    Args:
        by_alias: Whether to key the output by field aliases; passed
            through to ``model_dump`` (pydantic BaseModel).

    Returns:
        dict: Nested dictionary representation of the config.
    """
    return self.model_dump(by_alias=by_alias)

MLConfig

Bases: BaseConfig

Methods:

validate_tts_kwargs classmethod
validate_tts_kwargs(v: dict[str, Any]) -> dict[str, Any]

Validate train_test_split kwargs.

Source code in src/fowt_ml/config.py
@pydantic.field_validator("train_test_split_kwargs")
@classmethod
def validate_tts_kwargs(cls, v: dict[str, Any]) -> dict[str, Any]:
    """Validate train_test_split kwargs.

    Raises:
        ValueError: If any key is not an accepted train_test_split kwarg.
    """
    allowed_tts_kwargs = get_allowed_kwargs(train_test_split)

    invalid = set(v) - allowed_tts_kwargs
    if invalid:
        raise ValueError(
            f"Invalid train_test_split kwargs: {invalid}. "
            f"Allowed: {sorted(allowed_tts_kwargs)}"
        )
    return v
validate_cv_kwargs classmethod
validate_cv_kwargs(v: dict[str, Any]) -> dict[str, Any]

Validate cross_validate kwargs.

Source code in src/fowt_ml/config.py
@pydantic.field_validator("cross_validation_kwargs")
@classmethod
def validate_cv_kwargs(cls, v: dict[str, Any]) -> dict[str, Any]:
    """Validate cross_validate kwargs.

    Raises:
        ValueError: If any key is not an accepted cross_validate kwarg.
    """
    allowed_cv_kwargs = get_allowed_kwargs(cross_validate)

    invalid = set(v) - allowed_cv_kwargs
    if invalid:
        raise ValueError(
            f"Invalid cross_validate kwargs: {invalid}. "
            f"Allowed: {sorted(allowed_cv_kwargs)}"
        )
    return v
validate_models classmethod
validate_models(v: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]

Validate model names and their kwargs.

Source code in src/fowt_ml/config.py
@pydantic.field_validator("model_names")
@classmethod
def validate_models(cls, v: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Validate model names and their kwargs.

    Raises:
        ValueError: If a model name is unsupported, or its kwargs are
            not accepted by the corresponding estimator.
    """
    # Map every supported estimator name to its estimator class.
    estimator_map = {}
    for model_class in (
        LinearModels,
        EnsembleModel,
        SparseGaussianModel,
        NeuralNetwork,
        XGBoost,
    ):
        estimator_map.update(model_class.ESTIMATOR_NAMES)

    recurrent_models = {"RNNRegressor", "LSTMRegressor", "GRURegressor"}

    for model_name, kwargs in v.items():
        if model_name not in estimator_map:
            raise ValueError(
                f"Model '{model_name}' not supported. "
                f"Available: {list(estimator_map.keys())}"
            )

        if model_name in recurrent_models:
            # skorch regressors accept both the factory kwargs and the
            # generic skorch NeuralNet kwargs.
            allowed_kwargs = get_allowed_kwargs(
                create_skorch_regressor
            ) | get_allowed_kwargs(skorch.net.NeuralNet)
        else:
            # Constructor signature (or get_params) of the model class.
            allowed_kwargs = get_allowed_kwargs(estimator_map[model_name])

        invalid = set(kwargs) - allowed_kwargs
        if invalid:
            raise ValueError(
                f"Invalid kwargs for model '{model_name}': {invalid}. "
                f"Allowed: {allowed_kwargs}"
            )
    return v

Config

Bases: BaseConfig

Base class for configuration files.

Methods:

  • from_yaml

    Read configs from a config.yaml file.

  • to_yaml

    Write configs to a yaml config_file.

from_yaml classmethod
from_yaml(config_file)

Read configs from a config.yaml file.

If key is not found in config.yaml, the default value is used.

Source code in src/fowt_ml/config.py
@classmethod
def from_yaml(cls, config_file):
    """Read configs from a config.yaml file.

    If key is not found in config.yaml, the default value is used.

    Raises:
        FileNotFoundError: If ``config_file`` does not exist.
        SyntaxError: If the file is not valid YAML.
    """
    path = Path(config_file)
    if not path.exists():
        raise FileNotFoundError(f"Config file {config_file} not found.")

    with path.open() as f:
        try:
            cfg = yaml.safe_load(f)
        except yaml.YAMLError as exc:
            raise SyntaxError(f"Error parsing config file {config_file}.") from exc
    return cls(**cfg)
to_yaml classmethod
to_yaml(config_file)

Write configs to a yaml config_file.

Source code in src/fowt_ml/config.py
@classmethod
def to_yaml(cls, config_file):
    """Write configs to a yaml config_file.

    Warns (via logger) before overwriting an existing file.
    """
    target = Path(config_file)
    if target.exists():
        logger.warning(f"Overwriting config file {config_file}.")

    cfg = _schema(cls)
    with target.open("w") as f:
        yaml.dump(cfg, f, sort_keys=False)

get_allowed_kwargs

get_allowed_kwargs(func_or_class)

Return valid keyword args for a function or class constructor.

Source code in src/fowt_ml/config.py
def get_allowed_kwargs(func_or_class):
    """Return valid keyword args for a function or class constructor.

    For classes exposing ``get_params`` (sklearn-style estimators,
    incl. XGBoost) the parameter names come from a default instance;
    otherwise the signature is inspected. ``self`` and the catch-all
    ``kwargs`` name are excluded.
    """
    if inspect.isclass(func_or_class):
        if hasattr(func_or_class, "get_params"):
            try:
                return set(func_or_class().get_params().keys())
            except Exception:
                # estimator not default-constructible: fall back to
                # constructor signature inspection below
                pass
        target = func_or_class.__init__
    else:
        target = func_or_class
    params = inspect.signature(target).parameters
    return {name for name in params if name not in ("self", "kwargs")}

get_config_file

get_config_file()

Get the config file path.

Source code in src/fowt_ml/config.py
def get_config_file():
    """Get the config file path.

    Resolution order:
      1. The ``CONFIG_PATH`` environment variable, if set.
      2. A single ``*.yml`` file inside ``~/.config/fowt_ml``.

    Returns:
        str | Path: Path to the config file.

    Raises:
        ValueError: If multiple yml files are found in the config directory.
        FileNotFoundError: If no config file can be located (previously an
            empty config directory crashed with IndexError).
    """
    config_path = Path.home() / ".config" / "fowt_ml"

    # Environment variable takes precedence; read it once, not twice.
    env_path = os.environ.get("CONFIG_PATH")
    if env_path:
        return env_path

    if config_path.exists():
        # sorted() for deterministic behavior; glob results are already
        # anchored at config_path, so no extra join is needed.
        yml_files = sorted(config_path.glob("*.yml"))
        if len(yml_files) > 1:
            raise ValueError(
                f"Multiple config files found in {config_path}. Please specify one."
            )
        if yml_files:
            return yml_files[0]

    raise FileNotFoundError(
        f"Config file not found. Please specify one in {config_path}"
        " or as an environment variable `CONFIG_PATH`."
    )

ML pipelines

fowt_ml.pipeline

Classes:

Pipeline

Pipeline(config: str | Config)

Parameters:

  • config (str | Config) –

    Path to the configuration file or a Config object.

  • kwargs

    Additional keyword arguments to override the configuration file.

Returns:

  • None

    None

Methods:

  • get_data

    Returns the dataset for the given data_id.

  • train_test_split

    Splits the data into training and testing sets.

  • get_models

    Returns the models for the given model names.

  • setup

    Set up the machine learning experiment.

  • compare_models

    Compares the models and returns the best model.

Source code in src/fowt_ml/pipeline.py
def __init__(self, config: str | Config) -> None:
    """Initializes the machine learning pipeline.

    Args:
        config (str | Config): Path to the configuration file or a Config object.

    Returns:
        None
    """
    if not isinstance(config, Config):
        config = Config.from_yaml(config)

    # NOTE(review): Config supports subscripting here — presumably it
    # defines __getitem__; confirm against the Config class.
    ml_setup = config["ml_setup"]
    self.predictors_labels = ml_setup["predictors"]
    self.target_labels = ml_setup["targets"]
    self.model_names = ml_setup["model_names"]
    self.metric_names = ml_setup["metric_names"]
    self.train_test_split_kwargs = ml_setup["train_test_split_kwargs"]
    self.cross_validation_kwargs = ml_setup["cross_validation_kwargs"]
    self.scale_data = ml_setup["scale_data"]
    self.save_grid_scores = ml_setup["save_grid_scores"]
    self.save_best_model = ml_setup["save_best_model"]
    self.log_experiment = ml_setup["log_experiment"]

    self.work_dir = Path(config["session_setup"]["work_dir"])
    self.data_config = config["data"]
get_data
get_data(data_id: str) -> DataFrame

Returns the dataset for the given data_id.

Parameters:

  • data_id (str) –

    ID of the data in the configuration file.

Returns:

  • DataFrame

    pd.DataFrame: DataFrame for the given data_id, set in the

  • DataFrame

    configuration file.

Source code in src/fowt_ml/pipeline.py
def get_data(self, data_id: str) -> pd.DataFrame:
    """Returns the dataset for the given data_id.

    Delegates to the module-level ``get_data`` with this pipeline's
    data configuration.

    Args:
        data_id (str): ID of the data in the configuration file.

    Returns:
        pd.DataFrame: DataFrame for the given data_id, as defined in the
        configuration file.
    """
    frame = get_data(data_id, self.data_config)
    return frame
train_test_split
train_test_split(**kwargs)

Splits the data into training and testing sets.

The data should be set in self.data before calling this method. kwargs are passed to sklearn.model_selection.train_test_split.

Source code in src/fowt_ml/pipeline.py
def train_test_split(self, **kwargs):
    """Splits the data into training and testing sets.

    The data should be set in self.data before calling this method.
    kwargs are passed to sklearn.model_selection.train_test_split.

    Raises:
        ValueError: If X_data/Y_data have not been prepared by setup().
    """
    has_data = hasattr(self, "X_data") and hasattr(self, "Y_data")
    if not has_data:
        raise ValueError("Data not found. Run setup before splitting.")

    return train_test_split(self.X_data, self.Y_data, **kwargs)
get_models
get_models()

Returns the models for the given model names.

Returns:

  • dict

    Dictionary of models.

Source code in src/fowt_ml/pipeline.py
def get_models(self):
    """Returns the models for the given model names.

    Each configured model name is matched against the ESTIMATOR_NAMES of
    the known model classes; the first matching class constructs it.

    Returns:
        dict: Dictionary of model instances, keyed by model name.

    Raises:
        ValueError: If a model name is not supported by any model class.
    """
    models = {}
    model_classes = [
        LinearModels,
        EnsembleModel,
        SparseGaussianModel,
        NeuralNetwork,
        XGBoost,
    ]
    # fixed misspelled local: kwrags -> kwargs
    for model_name, kwargs in self.model_names.items():
        for model_class in model_classes:
            if model_name in model_class.ESTIMATOR_NAMES:
                models[model_name] = model_class(model_name, **kwargs)
                break
        else:
            raise ValueError(f"Model {model_name} not supported.")
    return models
setup
setup(data: DataFrame | str) -> Any

Set up the machine learning experiment.

  • find the data
  • train test split
  • setup the models for comparison

Parameters:

  • data (DataFrame) –

    DataFrame containing the data.

Returns:

  • Any

    Experiment object or similar.

Source code in src/fowt_ml/pipeline.py
def setup(self, data: pd.DataFrame | str) -> Any:
    """Set up the machine learning experiment.

    - find the data
    - train test split
    - setup the models for comparison

    Args:
        data (pd.DataFrame | str): DataFrame containing the data, or a
            data_id to load via the configuration file.

    Returns:
        Experiment object or similar.
    """
    if isinstance(data, str):
        data = self.get_data(data)

    # validate presence of predictor/target columns and their values
    required = self.predictors_labels + self.target_labels
    data = check_data(data, required)

    # float32 numpy arrays keep the different ML libraries consistent
    self.X_data = np.asarray(data.loc[:, self.predictors_labels], dtype=np.float32)
    self.Y_data = np.asarray(data.loc[:, self.target_labels], dtype=np.float32)

    splits = self.train_test_split(**self.train_test_split_kwargs)
    self.X_train, self.X_test, self.Y_train, self.Y_test = splits

    # instantiate configured models
    self.model_instances = self.get_models()

    # create work directory
    self.work_dir.mkdir(parents=True, exist_ok=True)

    # setup mlflow if logging is enabled
    if self.log_experiment:
        self._setup_mlflow()
compare_models
compare_models(sort: str = 'r2', cross_validation: bool = False) -> Any

Compares the models and returns the best model.

"model_fit_time" is in seconds.

Parameters:

  • sort (str, default: 'r2' ) –

    Metric to sort the models by. Defaults to "r2".

  • cross_validation (bool, default: False ) –

    Whether to use cross-validation

Returns:

  • tuple ( Any ) –

    (dict of fitted models, pd.DataFrame of grid scores sorted by sort)

Source code in src/fowt_ml/pipeline.py
def compare_models(self, sort: str = "r2", cross_validation: bool = False) -> Any:
    """Compares the models and returns the best model.

    "model_fit_time" is in seconds.

    Args:
        sort (str, optional): Metric to sort the models by. Defaults to "r2".
        cross_validation (bool, optional): Whether to use cross-validation
            for comparison. Defaults to False.

    Returns:
        tuple: (dict of fitted models, pd.DataFrame of grid scores sorted by `sort`)

    Raises:
        ValueError: If `sort` is not one of the collected metrics.
    """
    self.fitted_models = {}
    self.scores = {}
    for name in self.model_names:
        fitted, model_scores = self._run_model(name, cross_validation)
        self.fitted_models[name] = fitted
        self.scores[name] = model_scores

    # one row per model, one column per metric
    grid_scores = pd.DataFrame(self.scores).T

    if sort not in grid_scores.columns:
        raise ValueError(
            f"Sort '{sort}' not in metrics {grid_scores.columns.tolist()}"
            " provided. Choose one of the metrics to sort the models."
        )

    # timing metrics: smaller is better, so sort ascending
    ascending = sort in {"model_fit_time", "model_predict_time"}
    self.grid_scores_sorted = grid_scores.sort_values(by=sort, ascending=ascending)

    self._log_model()
    self._save_grid_scores()
    self._save_best_model()

    return self.fitted_models, self.grid_scores_sorted

Model modules

fowt_ml.base

This is the base class for all models in the fowt_ml package.

Classes:

BaseModel

BaseModel(estimator: str | BaseEstimator, **kwargs: dict[str, Any])

Base class for all models.

Methods:

Source code in src/fowt_ml/base.py
def __init__(
    self, estimator: str | BaseEstimator, **kwargs: dict[str, Any]
) -> None:
    """Initialize the class with the estimator.

    Args:
        estimator: A key of ``ESTIMATOR_NAMES`` or an estimator instance.
        **kwargs: Forwarded as estimator parameters.
    """
    if not isinstance(estimator, str):
        self.estimator = estimator.set_params(**kwargs)
        return
    if estimator not in self.ESTIMATOR_NAMES:
        raise ValueError(f"Available estimators: {self.ESTIMATOR_NAMES.keys()}")
    self.estimator = self.ESTIMATOR_NAMES[estimator](**kwargs)
calculate_score
calculate_score(x_train: ArrayLike, x_test: ArrayLike, y_train: ArrayLike, y_test: ArrayLike, scoring: str | Iterable) -> float | dict[str, float]

Calculate the score for the model using test data.

First, the model is fitted to the training data, and the time taken to fit the model is recorded. Then, the model is scored using the provided scoring method(s) on the test data.

In multi-output regression, by default, 'uniform_average' is used, which specifies a uniformly weighted mean over outputs. see https://scikit-learn.org/stable/modules/model_evaluation.html#regression-metrics

For an overview of the scoring parameters, see: https://scikit-learn.org/stable/modules/model_evaluation.html#string-name-scorers

Parameters:

  • x_train (ArrayLike) –

    training data for features

  • x_test (ArrayLike) –

    test data for features

  • y_train (ArrayLike) –

    training data for targets

  • y_test (ArrayLike) –

    test data for targets

  • scoring (str | Iterable) –

    scoring method(s) to use.

Returns:

  • float | dict[str, float]

    float | dict[str, float]: the calculated score(s)

Source code in src/fowt_ml/base.py
def calculate_score(
    self,
    x_train: ArrayLike,
    x_test: ArrayLike,
    y_train: ArrayLike,
    y_test: ArrayLike,
    scoring: str | Iterable,
) -> float | dict[str, float]:
    """Calculate the score for the model using test data.

    The model is first fitted to the training data (the fit time is
    recorded), then scored with the requested metric(s) on the `test`
    data.

    In multi-output regression, by default, 'uniform_average' is used,
    which specifies a uniformly weighted mean over outputs. see
    https://scikit-learn.org/stable/modules/model_evaluation.html#regression-metrics

    For scoring parameters overview:
    https://scikit-learn.org/stable/modules/model_evaluation.html#string-name-scorers

    Args:
        x_train (ArrayLike): training data for features
        x_test (ArrayLike): test data for features
        y_train (ArrayLike): training data for targets
        y_test (ArrayLike): test data for targets
        scoring (str | Iterable): scoring method(s) to use.

    Returns:
        float | dict[str, float]: the calculated score(s)
    """  # noqa: E501
    model_fit_time = _measure_fit_time(self.estimator, x_train, y_train)

    requested = [scoring] if isinstance(scoring, str) else list(scoring)
    timing_keys = {"model_fit_time", "model_predict_time"}

    # sklearn scorers only handle the non-timing metrics
    sklearn_metrics = [s for s in requested if s not in timing_keys]

    scores = {}
    if sklearn_metrics:
        scorer = check_scoring(self.estimator, scoring=sklearn_metrics)
        scores = scorer(self.estimator, x_test, y_test)

    # append the custom timing metrics when requested
    if "model_fit_time" in requested:
        scores["model_fit_time"] = model_fit_time
    if "model_predict_time" in requested:
        scores["model_predict_time"] = _measure_predict_latency(
            self.estimator, x_test
        )
    return scores
cross_validate
cross_validate(x_train: ArrayLike, y_train: ArrayLike, scoring: str | Iterable, **kwargs: Any) -> dict[str, Any]

Perform cross-validation on the model.

Parameters:

  • x_train (ArrayLike) –

    features data

  • y_train (ArrayLike) –

    target data

  • scoring (str | Iterable) –

    scoring method(s) to use.

  • **kwargs (Any, default: {} ) –

    additional keyword arguments to pass to cross_validate

Returns:

  • dict[str, Any]

    dict[str, Any]: dictionary containing cross-validation results

Source code in src/fowt_ml/base.py
def cross_validate(
    self,
    x_train: ArrayLike,
    y_train: ArrayLike,
    scoring: str | Iterable,
    **kwargs: Any,
) -> dict[str, Any]:
    """Perform cross-validation on the model.

    Args:
        x_train (ArrayLike): features data
        y_train (ArrayLike): target data
        scoring (str | Iterable): scoring method(s) to use.
        **kwargs: additional keyword arguments to pass to `cross_validate`

    Returns:
        dict[str, Any]: dictionary containing cross-validation results
    """
    requested = [scoring] if isinstance(scoring, str) else list(scoring)
    timing_keys = {"model_fit_time", "model_predict_time"}

    # sklearn handles the plain metric names; timing keys are custom
    metric_names = [s for s in requested if s not in timing_keys]
    scorers = {name: name for name in metric_names}
    if "model_predict_time" in requested:
        scorers["model_predict_time"] = _measure_predict_latency

    cv_results = cross_validate(
        self.estimator,
        x_train,
        y_train,
        scoring=scorers or None,
        return_train_score=False,
        **kwargs,
    )

    # keep only test metrics (renamed) plus the optional fit time
    want_fit_time = "model_fit_time" in requested
    results = {}
    for key, values in cv_results.items():
        if key.startswith("test_"):
            results[key.replace("test_", "")] = values
        elif want_fit_time and key == "fit_time":
            results["model_fit_time"] = np.round(values, 3)

    return results
use_scaled_data
use_scaled_data()

Wrap the estimator to use scaled data for both X and y.

Source code in src/fowt_ml/base.py
def use_scaled_data(self):
    """Wrap the estimator to use scaled data for both X and y.

    Idempotent: an already wrapped estimator is left untouched.
    """
    if isinstance(self.estimator, TransformedTargetRegressor):
        return self  # already wrapped

    # X scaling: pipeline of scaler + current estimator
    x_pipeline = Pipeline(
        [("scaler", StandardScaler()), ("model", self.estimator)]
    )

    # y scaling: TransformedTargetRegressor around the pipeline
    self.estimator = TransformedTargetRegressor(
        regressor=x_pipeline,
        transformer=StandardScaler(),
    )
    return self

fowt_ml.linear_models

Module to handle linear models.

Classes:

  • LinearModels

    Class to handle linear models and metrics for comparison.

LinearModels

LinearModels(estimator: str | BaseEstimator, **kwargs: dict[str, Any])

Bases: BaseModel

Class to handle linear models and metrics for comparison.

Source code in src/fowt_ml/base.py
def __init__(
    self, estimator: str | BaseEstimator, **kwargs: dict[str, Any]
) -> None:
    """Initialize the class with the estimator.

    Args:
        estimator: A key of ``ESTIMATOR_NAMES`` or an estimator instance.
        **kwargs: Forwarded as estimator parameters.
    """
    if not isinstance(estimator, str):
        self.estimator = estimator.set_params(**kwargs)
        return
    if estimator not in self.ESTIMATOR_NAMES:
        raise ValueError(f"Available estimators: {self.ESTIMATOR_NAMES.keys()}")
    self.estimator = self.ESTIMATOR_NAMES[estimator](**kwargs)

fowt_ml.ensemble

Class to handle random forest models and metrics for comparison.

Classes:

  • EnsembleModel

    Class to handle random forest models and metrics for comparison.

EnsembleModel

EnsembleModel(estimator: str | BaseEstimator, **kwargs: dict[str, Any])

Bases: BaseModel

Class to handle random forest models and metrics for comparison.

Methods:

  • oob_score

    Fit and estimate generalization score from out-of-bag samples.

Source code in src/fowt_ml/base.py
def __init__(
    self, estimator: str | BaseEstimator, **kwargs: dict[str, Any]
) -> None:
    """Initialize the class with the estimator.

    Args:
        estimator: A key of ``ESTIMATOR_NAMES`` or an estimator instance.
        **kwargs: Forwarded as estimator parameters.
    """
    if not isinstance(estimator, str):
        self.estimator = estimator.set_params(**kwargs)
        return
    if estimator not in self.ESTIMATOR_NAMES:
        raise ValueError(f"Available estimators: {self.ESTIMATOR_NAMES.keys()}")
    self.estimator = self.ESTIMATOR_NAMES[estimator](**kwargs)
oob_score
oob_score(x: ArrayLike, y: ArrayLike, scoring: str) -> float

Fit and estimate generalization score from out-of-bag samples.

Source code in src/fowt_ml/ensemble.py
def oob_score(self, x: ArrayLike, y: ArrayLike, scoring: str) -> float:
    """Fit and estimate generalization score from out-of-bag samples.

    Args:
        x (ArrayLike): features data.
        y (ArrayLike): target data.
        scoring (str): sklearn scorer name used for the OOB estimate.

    Returns:
        float: the estimator's ``oob_score_`` after fitting.
    """
    scorer = get_scorer(scoring)

    def score_func(y, y_pred, **kwargs):
        # apply the scorer's sign so the returned value is its score
        return scorer._sign * scorer._score_func(y, y_pred, **kwargs)

    if not (self.estimator.bootstrap and self.estimator.oob_score):
        # Fixed: the warning previously interpolated the function object
        # itself (via a redundant `oob_score = score_func` alias), which
        # printed an unreadable repr. Name the scoring instead.
        warnings.warn(
            f"Setting `bootstrap=True` and `oob_score` to a `{scoring}` scorer."
        )
        self.estimator.set_params(bootstrap=True, oob_score=score_func)
    self.estimator.fit(x, y)
    return self.estimator.oob_score_

fowt_ml.neural_network

Module to handle Neural Network models.

Classes:

Functions:

  • create_skorch_regressor

    Create a skorch NeuralNetRegressor with a specified RNN model.

  • RNNRegressor

    Create a skorch NeuralNetRegressor with a standard RNN model.

  • LSTMRegressor

    Create a skorch NeuralNetRegressor with an LSTM model.

  • GRURegressor

    Create a skorch NeuralNetRegressor with a GRU model.

GenericRNNModule

GenericRNNModule(rnn_model, input_size, hidden_size, output_size, num_layers=1)

Bases: Module

Methods:

  • forward

    Forward pass of the RNN module.

Source code in src/fowt_ml/neural_network.py
def __init__(self, rnn_model, input_size, hidden_size, output_size, num_layers=1):
    """Build a recurrent layer followed by a linear regression head.

    Args:
        rnn_model: Recurrent layer class (e.g. torch.nn.RNN, LSTM, GRU).
        input_size: Number of input features per time step.
        hidden_size: Hidden state size of the recurrent layer.
        output_size: Number of regression outputs per time step.
        num_layers: Number of stacked recurrent layers. Defaults to 1.
    """
    super().__init__()
    # batch_first=True: inputs are laid out as (batch, seq, feature)
    self.rnn = rnn_model(
        input_size=input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        batch_first=True,
    )
    # maps each time step's hidden state to the output dimension
    self.fc = torch.nn.Linear(hidden_size, output_size)
forward
forward(x)

Forward pass of the RNN module.

Source code in src/fowt_ml/neural_network.py
def forward(self, x):
    """Forward pass of the RNN module.

    A 2-D input is treated as a single unbatched sequence; its batch
    dimension is added for the RNN and squeezed away again on output.
    """
    if x.dim() == 2:
        x = x.unsqueeze(0)  # promote to a batch of one
    rnn_out, _ = self.rnn(x)

    predictions = self.fc(rnn_out)  # regression on all time steps
    # drop the batch dim for batch-of-one outputs
    return predictions.squeeze(0) if predictions.shape[0] == 1 else predictions

NeuralNetwork

NeuralNetwork(estimator: str | BaseEstimator, **kwargs: dict[str, Any])

Bases: BaseModel

Class to handle Neural Network models and metrics for comparison.

Source code in src/fowt_ml/base.py
def __init__(
    self, estimator: str | BaseEstimator, **kwargs: dict[str, Any]
) -> None:
    """Initialize the class with the estimator.

    Args:
        estimator: A key of ``ESTIMATOR_NAMES`` or an estimator instance.
        **kwargs: Forwarded as estimator parameters.
    """
    if not isinstance(estimator, str):
        self.estimator = estimator.set_params(**kwargs)
        return
    if estimator not in self.ESTIMATOR_NAMES:
        raise ValueError(f"Available estimators: {self.ESTIMATOR_NAMES.keys()}")
    self.estimator = self.ESTIMATOR_NAMES[estimator](**kwargs)

create_skorch_regressor

create_skorch_regressor(rnn_model, input_size, hidden_size, output_size, num_layers=1, **kwargs)

Create a skorch NeuralNetRegressor with a specified RNN model.

Source code in src/fowt_ml/neural_network.py
def create_skorch_regressor(
    rnn_model,
    input_size,
    hidden_size,
    output_size,
    num_layers=1,
    **kwargs,
):
    """Create a skorch NeuralNetRegressor with a specified RNN model.

    Any extra kwargs are forwarded to NeuralNetRegressor and override
    the defaults below.
    """
    params = {
        "module": GenericRNNModule,
        "module__rnn_model": rnn_model,
        "module__input_size": input_size,
        "module__hidden_size": hidden_size,
        "module__output_size": output_size,
        "module__num_layers": num_layers,
        "verbose": 0,
    }
    params.update(kwargs)
    return skorch.regressor.NeuralNetRegressor(**params)

RNNRegressor

RNNRegressor(**args)

Create a skorch NeuralNetRegressor with a standard RNN model.

Source code in src/fowt_ml/neural_network.py
def RNNRegressor(**args):  # noqa: N802
    """Create a skorch NeuralNetRegressor with a standard RNN model.

    Args:
        **args: Forwarded to ``create_skorch_regressor``.
    """
    return create_skorch_regressor(torch.nn.RNN, **args)

LSTMRegressor

LSTMRegressor(**args)

Create a skorch NeuralNetRegressor with an LSTM model.

Source code in src/fowt_ml/neural_network.py
def LSTMRegressor(**args):  # noqa: N802
    """Create a skorch NeuralNetRegressor with an LSTM model.

    Args:
        **args: Forwarded to ``create_skorch_regressor``.
    """
    return create_skorch_regressor(torch.nn.LSTM, **args)

GRURegressor

GRURegressor(**args)

Create a skorch NeuralNetRegressor with a GRU model.

Source code in src/fowt_ml/neural_network.py
def GRURegressor(**args):  # noqa: N802
    """Create a skorch NeuralNetRegressor with a GRU model."""
    gru_cls = torch.nn.GRU
    return create_skorch_regressor(gru_cls, **args)

fowt_ml.gaussian_process

Module for sparse Gaussian processes for multi-output regression problems.

Classes:

MultitaskGPModelApproximate

MultitaskGPModelApproximate(inducing_points, num_latents, num_tasks)

Bases: ApproximateGP

Multitask GP model with approximate inference.

This module models similarities/correlation in the outputs simultaneously. Each output dimension (task) is a linear combination of some latent functions. Based on the example https://docs.gpytorch.ai/en/stable/examples/04_Variational_and_Approximate_GPs/SVGP_Multitask_GP_Regression.html#Types-of-Variational-Multitask-Models

Methods:

  • forward

    Forward pass of the model.

Source code in src/fowt_ml/gaussian_process.py
def __init__(self, inducing_points, num_latents, num_tasks):
    """Build the approximate multitask GP.

    Args:
        inducing_points: Initial inducing point locations; converted to a
            float32 tensor on DEVICE. Their number sets the size of the
            variational distribution.
        num_latents: Number of latent GP functions; used as the batch shape
            of the variational distribution, kernel, and mean.
        num_tasks: Number of output dimensions (tasks) mixed from the
            latents by the LMC strategy.
    """
    # convert inducing points to tensor
    inducing_points = _to_tensor(inducing_points, dtype="float32", device=DEVICE)

    # Variational distribution + strategy: posterior for latent GPs
    # CholeskyVariationalDistribution: modeling a full covariance (not
    # diagonal), so it can capture dependencies between inducing points
    variational_distribution = gpytorch.variational.CholeskyVariationalDistribution(
        num_inducing_points=inducing_points.size(0),
        batch_shape=torch.Size([num_latents]),
    )

    # Check inducing points shape before passing to VariationalStrategy
    inducing_points = _check_inducing_points(inducing_points, num_latents)

    # model correlations across tasks (or outputs)
    # NOTE: `self` is handed to VariationalStrategy before super().__init__
    # runs; gpytorch's ApproximateGP expects this construction order, so
    # do not reorder these statements.
    variational_strategy = gpytorch.variational.LMCVariationalStrategy(
        gpytorch.variational.VariationalStrategy(
            self,
            inducing_points,
            variational_distribution,
            learn_inducing_locations=True,
        ),
        num_tasks=num_tasks,
        num_latents=num_latents,
        latent_dim=-1,
    )
    super().__init__(variational_strategy)

    # covariance module: kernel: Prior information about latents
    self.covar = gpytorch.kernels.ScaleKernel(
        gpytorch.kernels.RBFKernel(batch_shape=torch.Size([num_latents])),
        batch_shape=torch.Size([num_latents]),
    )
    # Mean module
    self.mean = gpytorch.means.ConstantMean(
        batch_shape=torch.Size([num_latents]),
    )

    # task-level noise model, moved to the same device as the data
    self.likelihood = gpytorch.likelihoods.MultitaskGaussianLikelihood(
        num_tasks=num_tasks
    ).to(DEVICE)
forward
forward(x)

Forward pass of the model.

Source code in src/fowt_ml/gaussian_process.py
def forward(self, x):
    """Forward pass of the model.

    Evaluates the (batched) mean and covariance modules at ``x`` and wraps
    them in a multivariate normal distribution.
    """
    return gpytorch.distributions.MultivariateNormal(
        self.mean(x), self.covar(x)
    )

SklearnGPRegressor

SklearnGPRegressor(num_inducing, num_latents, num_epochs=10, batch_size=1024, learning_rate=0.01)

Bases: RegressorMixin, BaseEstimator

Sklearn Wrapper for MultitaskGPModelApproximate.

Methods:

  • fit

    Fit the model to the training data.

  • predict

    Make predictions using the trained model.

  • score

    Return the R^2 score of the prediction.

Source code in src/fowt_ml/gaussian_process.py
def __init__(
    self,
    num_inducing,
    num_latents,
    num_epochs=10,
    batch_size=1024,
    learning_rate=0.01,
):
    """Store the hyperparameters; no computation happens until ``fit``."""
    hyperparams = {
        "num_inducing": num_inducing,
        "num_latents": num_latents,
        "num_epochs": num_epochs,
        "batch_size": batch_size,
        "learning_rate": learning_rate,
    }
    for name, value in hyperparams.items():
        setattr(self, name, value)
fit
fit(x_train: ArrayLike, y_train: ArrayLike) -> SklearnGPRegressor

Fit the model to the training data.

Source code in src/fowt_ml/gaussian_process.py
def fit(self, x_train: ArrayLike, y_train: ArrayLike) -> "SklearnGPRegressor":
    """Fit the model to the training data.

    Args:
        x_train: Training features of shape (n_samples, n_features).
        y_train: Training targets; a 1-D target is treated as a single task.

    Returns:
        SklearnGPRegressor: The fitted estimator (``self``).
    """
    # Check that X and y have correct shape
    x_train, y_train = check_X_y(x_train, y_train, multi_output=True)

    x_train = _to_tensor(x_train, dtype="float32", device=DEVICE)
    y_train = _to_tensor(y_train, dtype="float32", device=DEVICE)

    # add some sklearn variables
    self.X_ = x_train
    self.y_ = y_train
    self.n_features_in_ = x_train.shape[1]

    # initialize model: a single output becomes one task of the multitask GP
    if y_train.ndim == 1:
        y_train = y_train.unsqueeze(1)

    # inducing points: a random subset of the training inputs
    inducing_points = x_train[torch.randperm(x_train.size(0))[: self.num_inducing]]

    self.module_ = MultitaskGPModelApproximate(
        inducing_points=inducing_points,
        num_latents=self.num_latents,
        num_tasks=y_train.size(1),
    ).to(DEVICE)

    self.likelihood_ = self.module_.likelihood

    # Train the model
    self.module_.train()
    self.likelihood_.train()

    optimizer = torch.optim.Adam(self.module_.parameters(), lr=self.learning_rate)
    # marginal log likelihood (mll)
    mll = gpytorch.mlls.VariationalELBO(
        self.likelihood_, self.module_, num_data=x_train.size(0)
    )

    # TODO optimize the loops
    for epoch in range(self.num_epochs):
        total_loss = 0.0
        if self.batch_size:  # Use batching if batch_size is set
            batches = DataLoader(
                TensorDataset(x_train, y_train),
                batch_size=self.batch_size,
            )
        else:  # Treat entire dataset as one batch
            batches = [(x_train, y_train)]
        for x_batch, y_batch in batches:
            optimizer.zero_grad()
            output = self.module_(x_batch)
            loss = -mll(output, y_batch)
            loss.backward()
            optimizer.step()
            # accumulate a plain float: summing the loss *tensors* would keep
            # every batch's tensor (and its autograd references) alive for
            # the whole epoch, and would crash on .item() if no batch ran
            total_loss += loss.item()

        # normalize the loss per data point and output dimension because it
        # gives a better idea of the loss in log
        ave_loss = total_loss / (x_train.size(0) * y_train.size(1))
        logger.info(f"Epoch {epoch + 1}/{self.num_epochs} - Loss: {ave_loss:.3f}")

    self.is_fitted_ = True
    return self
predict
predict(x_array: ArrayLike) -> ArrayLike

Make predictions using the trained model.

Source code in src/fowt_ml/gaussian_process.py
def predict(self, x_array: ArrayLike) -> ArrayLike:
    """Make predictions using the trained model."""
    # Check if the model has been fitted
    check_is_fitted(self, ["is_fitted_", "module_", "likelihood_"])

    # Check that X has correct shape
    x_array = check_array(x_array)

    # Check number of features
    if self.n_features_in_ != x_array.shape[1]:
        raise ValueError(
            f"Expected {self.n_features_in_} features, "
            f"but got {x_array.shape[1]} features."
        )
    x_array = _to_tensor(x_array, dtype="float32", device=DEVICE)

    self.module_.eval()
    self.likelihood_.eval()

    # Use batching if batch_size is set, otherwise the whole array at once
    if self.batch_size:
        loader = DataLoader(
            TensorDataset(x_array),
            batch_size=self.batch_size,
        )
    else:
        loader = [(x_array,)]

    chunks = []
    with torch.no_grad():
        for (batch,) in loader:
            posterior = self.likelihood_(self.module_(batch))
            chunks.append(posterior.mean.cpu())

    # sklearn multioutput regressor expects float64
    # see check_multioutput_regressor
    return torch.cat(chunks, dim=0).numpy().astype(np.float64)
score
score(x, y)

Return the R^2 score of the prediction.

Source code in src/fowt_ml/gaussian_process.py
def score(self, x, y):
    """Return the R^2 score of the prediction."""
    return r2_score(y, self.predict(x))

SparseGaussianModel

SparseGaussianModel(estimator: str | BaseEstimator, **kwargs: dict[str, Any])

Bases: BaseModel

Class to handle sparse Gaussian process regression.

Source code in src/fowt_ml/base.py
def __init__(
    self, estimator: str | BaseEstimator, **kwargs: dict[str, Any]
) -> None:
    """Initialize the class with the estimator.

    A string is looked up in ``ESTIMATOR_NAMES`` and instantiated with
    ``kwargs``; an estimator instance gets ``kwargs`` applied via
    ``set_params``.
    """
    if not isinstance(estimator, str):
        # already an estimator instance: just apply the keyword overrides
        self.estimator = estimator.set_params(**kwargs)
        return
    if estimator not in self.ESTIMATOR_NAMES:
        raise ValueError(f"Available estimators: {self.ESTIMATOR_NAMES.keys()}")
    self.estimator = self.ESTIMATOR_NAMES[estimator](**kwargs)

fowt_ml.xgboost

The module for XGBoost model training and evaluation.

Classes:

  • XGBoost

    Class to handle linear models and metrics for comparison.

XGBoost

XGBoost(estimator: str | BaseEstimator, **kwargs: dict[str, Any])

Bases: BaseModel

Class to handle linear models and metrics for comparison.

Source code in src/fowt_ml/base.py
def __init__(
    self, estimator: str | BaseEstimator, **kwargs: dict[str, Any]
) -> None:
    """Initialize the class with the estimator.

    A string is looked up in ``ESTIMATOR_NAMES`` and instantiated with
    ``kwargs``; an estimator instance gets ``kwargs`` applied via
    ``set_params``.
    """
    if not isinstance(estimator, str):
        # already an estimator instance: just apply the keyword overrides
        self.estimator = estimator.set_params(**kwargs)
        return
    if estimator not in self.ESTIMATOR_NAMES:
        raise ValueError(f"Available estimators: {self.ESTIMATOR_NAMES.keys()}")
    self.estimator = self.ESTIMATOR_NAMES[estimator](**kwargs)