Source code for ocean.maxsat._variables._feature

from collections.abc import Mapping

import numpy as np

from ...feature import Feature
from ...feature._keeper import FeatureKeeper
from ...typing import Key
from .._base import BaseModel, Var


class FeatureVar(Var, FeatureKeeper):
    """MaxSAT variable bundle associated with a single parsed feature.

    ``build`` registers the solver variables that encode the feature:

    * binary feature: a single variable ``_x``;
    * one-hot encoded feature: one variable per code (``_u``), constrained
      so that exactly one is true;
    * numeric feature with threshold encoding (every continuous feature,
      and discrete features when the model is in hard-voting mode): one
      variable per split threshold (``_th``) plus ordering clauses;
    * numeric feature without threshold encoding: one indicator per level
      or interval (``_mu``), constrained so that exactly one is true.

    Variables are the integer handles returned by ``model.add_var``.
    """

    X_VAR_NAME_FMT: str = "x[{name}]"
    TH_VAR_NAME_FMT: str = "{name}[{idx}]"

    # Solver handles; each is only assigned by ``build`` for the matching
    # feature kind.
    _x: int
    _u: Mapping[Key, int]
    _mu: Mapping[Key, int]
    _th: Mapping[Key, int]
    # Split thresholds backing ``_th``; assigned by ``_add_th``.
    _threshold_values: tuple[float, ...]
    _threshold_encoding: bool = False
    _hard_voting: bool = False

    def __init__(self, feature: Feature, name: str) -> None:
        """Initialise both bases with the variable name and the feature."""
        Var.__init__(self, name=name)
        FeatureKeeper.__init__(self, feature=feature)

    def build(self, model: BaseModel) -> None:
        """Create this feature's variables and constraints in ``model``.

        The encoding is chosen from the feature kind and from the model's
        ``_hard_voting`` attribute (treated as ``False`` when absent).
        """
        self._hard_voting = bool(getattr(model, "_hard_voting", False))
        self._threshold_encoding = self.is_continuous or (
            self.is_discrete and self._hard_voting
        )

        if self.is_binary:
            self._x = self._add_x(model)

        if self.is_numeric:
            if self._threshold_encoding:
                self._th = self._add_th(model)
                self._add_threshold_order(model)
                if self.is_discrete:
                    self._add_discrete_feasibility(model)
            else:
                self._mu = self._add_mu(model)
                model.add_exactly_one(list(self._mu.values()))

        if self.is_one_hot_encoded:
            self._u = self._add_u(model)

    def xget(self, code: Key | None = None, mu: Key | None = None) -> int:
        """Return the solver variable selected by ``code`` or ``mu``.

        Args:
            code: Category code; required for one-hot encoded features and
                rejected for every other kind.
            mu: Index of the threshold (threshold encoding) or of the
                level/interval indicator; required for numeric features
                and rejected for binary ones.

        Returns:
            The integer variable handle.

        Raises:
            ValueError: If both selectors are given, if a required selector
                is missing, if a selector is not supported by the feature
                kind, or if it is out of range.
        """
        if mu is not None and code is not None:
            msg = "Cannot get both 'mu' and 'code' at the same time"
            raise ValueError(msg)
        if self.is_one_hot_encoded:
            return self._xget_one_hot_encoded(code)
        if code is not None:
            msg = "Get by code is only supported for one-hot encoded features"
            raise ValueError(msg)
        if self.is_numeric:
            if self._threshold_encoding:
                return self._xget_numeric_threshold(mu)
            return self._xget_numeric(mu)
        if mu is not None:
            msg = "Get by 'mu' is only supported for numeric features"
            raise ValueError(msg)
        return self._x

    @property
    def has_threshold_encoding(self) -> bool:
        """Whether this is a numeric feature encoded with threshold variables."""
        return self._threshold_encoding and self.is_numeric

    @property
    def split_threshold_values(self) -> tuple[float, ...]:
        """Split thresholds of a numeric feature.

        Raises:
            ValueError: If the feature is not numeric.
        """
        if not self.is_numeric:
            msg = "Split thresholds are only available for numeric features."
            raise ValueError(msg)
        # ``_threshold_values`` is only assigned by ``_add_th``. For numeric
        # features built without threshold encoding (or not built yet) derive
        # the values on demand instead of failing with AttributeError.
        # NOTE(review): assumes ``thresholds`` is readable for every discrete
        # feature - confirm against FeatureKeeper.
        stored = getattr(self, "_threshold_values", None)
        if stored is None:
            return self._get_split_threshold_values()
        return stored

    def threshold_index(self, value: float) -> int:
        """Return the index of the split threshold matching ``value``.

        Matching uses ``np.isclose`` with its default tolerances. When
        several thresholds fall within tolerance, the closest one is
        returned (the first one on ties).

        Raises:
            ValueError: If the feature has no threshold encoding or no
                threshold matches ``value``.
        """
        if not self.has_threshold_encoding:
            msg = "Threshold indices are only available in hard-voting mode."
            raise ValueError(msg)
        values = np.asarray(self._threshold_values, dtype=np.float64)
        matches = np.flatnonzero(np.isclose(values, value))
        if matches.size == 0:
            msg = f"Threshold '{value}' not found for this feature."
            raise ValueError(msg)
        if matches.size == 1:
            return int(matches[0])
        # Nearby thresholds can all pass the tolerance test; prefer the
        # nearest so an exact hit is never shadowed by an earlier neighbour.
        nearest = np.argmin(np.abs(values[matches] - value))
        return int(matches[nearest])

    def _add_x(self, model: BaseModel) -> int:
        """Add the single variable of a binary feature.

        Raises:
            ValueError: If the feature is not binary.
        """
        if not self.is_binary:
            msg = "The '_add_x' method is only supported for binary features"
            raise ValueError(msg)
        name = self.X_VAR_NAME_FMT.format(name=self._name)
        return self._add_binary(model, name)

    def _add_u(self, model: BaseModel) -> Mapping[Key, int]:
        """Add one variable per code and require exactly one to be true."""
        # Use the name verbatim: running it through ``str.format`` would
        # misinterpret feature names that contain braces.
        name = self._name
        u = self._add_one_hot_encoded(model=model, name=name)
        model.add_exactly_one(list(u.values()))
        return u

    def _add_one_hot_encoded(
        self,
        model: BaseModel,
        name: str,
    ) -> Mapping[Key, int]:
        """Add one variable named ``{name}[{code}]`` per feature code."""
        return {
            code: model.add_var(
                name=f"{name}[{code}]",
            )
            for code in self.codes
        }

    def _add_mu(self, model: BaseModel) -> Mapping[Key, int]:
        """Add the indicator variables of a numeric feature.

        The mapping is keyed by position: one entry per level for discrete
        features, one entry per interval between consecutive levels
        otherwise.
        """
        # Name used verbatim (see ``_add_u``).
        name = self._name
        if self.is_discrete:
            # For discrete features: one mu variable per level (value)
            # mu[i] means value == levels[i]
            n_values = len(self.levels)
            return {
                lv: model.add_var(
                    name=f"{name}[{lv}]",
                )
                for lv in range(n_values)
            }
        # For continuous features: n-1 mu variables for n levels (intervals)
        # mu[i] means value in interval (levels[i], levels[i+1]]
        n_intervals = len(self.levels) - 1
        return {
            lv: model.add_var(
                name=f"{name}[{lv}]",
            )
            for lv in range(n_intervals)
        }

    def _get_split_threshold_values(self) -> tuple[float, ...]:
        """Return the split thresholds as a tuple of floats.

        Continuous features use their interior levels (first and last level
        dropped); other features use ``self.thresholds``.
        """
        minimum_levels: int = 2
        if self.is_continuous:
            # Two levels or fewer leave no interior split point.
            if len(self.levels) <= minimum_levels:
                return ()
            return tuple(map(float, self.levels[1:-1]))
        return tuple(map(float, self.thresholds))

    def _add_th(self, model: BaseModel) -> Mapping[Key, int]:
        """Store the split thresholds and add one variable per threshold."""
        self._threshold_values = self._get_split_threshold_values()
        # Name used verbatim (see ``_add_u``); it is only ever passed as a
        # value to ``TH_VAR_NAME_FMT``.
        name = self._name
        return {
            idx: model.add_var(
                name=self.TH_VAR_NAME_FMT.format(name=name, idx=idx),
            )
            for idx in range(len(self._threshold_values))
        }

    def _add_threshold_order(self, model: BaseModel) -> None:
        """Add hard clauses ``th[i] -> th[i + 1]`` for consecutive thresholds."""
        # NOTE(review): consistent with reading th[i] as "value <= threshold
        # i" - confirm against the code that consumes these variables.
        for idx in range(len(self._threshold_values) - 1):
            model.add_hard([-self._th[idx], self._th[idx + 1]])

    def _add_discrete_feasibility(self, model: BaseModel) -> None:
        """Rule out gaps between thresholds that contain no level.

        For each pair of consecutive thresholds ``(lower, upper)`` with no
        level in ``(lower, upper]``, add the hard clause
        ``th[i + 1] -> th[i]``.
        """
        levels = np.asarray(self.levels, dtype=np.float64)
        thresholds = np.asarray(self._threshold_values, dtype=np.float64)
        for idx in range(len(thresholds) - 1):
            lower = thresholds[idx]
            upper = thresholds[idx + 1]
            feasible = np.any((levels > lower) & (levels <= upper))
            if not feasible:
                model.add_hard([self._th[idx], -self._th[idx + 1]])

    @staticmethod
    def _add_binary(model: BaseModel, name: str) -> int:
        """Add a single variable called ``name`` to ``model``."""
        return model.add_var(name=name)

    def _xget_one_hot_encoded(self, code: Key | None) -> int:
        """Return the variable of ``code`` for a one-hot encoded feature.

        Raises:
            ValueError: If ``code`` is missing or not one of the codes.
        """
        if code is None:
            msg = "Code is required for one-hot encoded features get"
            raise ValueError(msg)
        if code not in self.codes:
            msg = f"Code '{code}' not found in the feature codes"
            raise ValueError(msg)
        return self._u[code]

    def _xget_numeric(self, mu: Key | None) -> int:
        """Return the indicator variable ``mu`` of a numeric feature.

        Raises:
            ValueError: If ``mu`` is missing or out of range.
        """
        if mu is None:
            msg = "mu is required to get numeric features"
            raise ValueError(msg)
        if self.is_discrete:
            # For discrete: mu[i] represents value levels[i]
            n_values = len(self.levels)
            if mu not in range(n_values):
                msg = f"mu '{mu}' not in values (0 to {n_values - 1})"
                raise ValueError(msg)
        else:
            # For continuous: mu[i] represents interval (levels[i], levels[i+1]]
            n_intervals = len(self.levels) - 1
            if mu not in range(n_intervals):
                msg = f"mu '{mu}' not in intervals (0 to {n_intervals - 1})"
                raise ValueError(msg)
        return self._mu[mu]

    def _xget_numeric_threshold(self, mu: Key | None) -> int:
        """Return the threshold variable at index ``mu``.

        Raises:
            ValueError: If ``mu`` is missing or out of range.
        """
        if mu is None:
            msg = "mu is required to get hard-voting numeric thresholds"
            raise ValueError(msg)
        n_thresholds = len(self._threshold_values)
        if mu not in range(n_thresholds):
            msg = f"mu '{mu}' not in thresholds (0 to {n_thresholds - 1})"
            raise ValueError(msg)
        return self._th[mu]