Source code for rkhsiv

"""
This module provides implementations of RKHS Instrumental Variable (IV) estimators.

Classes:
    _BaseRKHSIV: Base class for RKHS IV methods.
    RKHSIV: RKHS IV estimator.
    RKHSIVCV: RKHS IV estimator with cross-validation.
    RKHSIVL2: RKHS IV estimator with L2 regularization.
    RKHSIVL2CV: RKHS IV estimator with L2 regularization and cross-validation.
    ApproxRKHSIV: Approximate RKHS IV estimator using kernel approximations.
    ApproxRKHSIVCV: Approximate RKHS IV estimator with cross-validation using kernel approximations.
    ApproxRKHSIVL2: Approximate RKHS IV estimator with L2 regularization.
    ApproxRKHSIVL2CV: Approximate RKHS IV estimator with L2 regularization and cross-validation.
"""

# Licensed under the MIT License.

from sklearn.metrics.pairwise import pairwise_kernels, euclidean_distances
from sklearn.model_selection import KFold
from sklearn.kernel_approximation import Nystroem, RBFSampler
import numpy as np


def _check_auto(param):
    return (isinstance(param, str) and (param == 'auto'))


def _to_column_vector(y):
    arr = np.asarray(y)
    if arr.ndim == 1:
        return arr.reshape(-1, 1)
    if arr.ndim == 2 and arr.shape[1] == 1:
        return arr
    raise ValueError(
        "`Y` must be a 1D array or a 2D column vector with shape (n, 1). "
        f"Got shape={arr.shape!r}."
    )


def _to_scalar(x):
    arr = np.asarray(x)
    if arr.size != 1:
        raise ValueError(
            "Expected scalar quadratic form, got array with "
            f"shape={arr.shape!r} and size={arr.size}."
        )
    return float(arr.reshape(-1)[0])


def _sqrt_psd_matrix(K):
    """
    Numerically stable real square-root for symmetric PSD matrices.

    Kernel matrices can be theoretically PSD but have tiny negative/complex
    artifacts in floating point arithmetic. This routine symmetrizes ``K``,
    clips negative eigenvalues to zero, and returns a real square-root.
    """
    K_sym = 0.5 * (K + K.T)
    evals, evecs = np.linalg.eigh(K_sym)
    evals = np.clip(evals, 0.0, None)
    return (evecs * np.sqrt(evals)) @ evecs.T


class _BaseRKHSIV:
    """
    Base class for RKHS IV methods.

    This class provides common functionality for RKHS IV estimators.

    Parameters:
        kernel (str or callable): Kernel function or string identifier.
        gamma (str or float): Length scale for the kernel.
        degree (int): Degree for polynomial kernels.
        coef0 (float): Zero coefficient for polynomial kernels.
        delta_scale (str or float): Scale of the critical radius.
        delta_exp (str or float): Exponent of the critical radius.
        alpha_scale (str or float): Scale of the regularization parameter.
        kernel_params (dict): Additional parameters for the kernel.
    """

    def __init__(self, *args, **kwargs):
        return

    def _get_delta(self, n):
        """
        Compute the critical radius.

        Parameters:
            n (int): Number of samples.

        Returns:
            float: Critical radius.
        """
        delta_scale = 5 if _check_auto(self.delta_scale) else self.delta_scale
        delta_exp = .4 if _check_auto(self.delta_exp) else self.delta_exp
        return delta_scale / (n**(delta_exp))

    def _get_alpha_scale(self):
        return 60 if _check_auto(self.alpha_scale) else self.alpha_scale

    def _get_alpha_scales(self):
        return ([c for c in np.geomspace(0.1, 1e4, self.n_alphas)]
                if _check_auto(self.alpha_scales) else self.alpha_scales)

    def _get_alpha(self, delta, alpha_scale):
        return alpha_scale * (delta**4)

    def _get_kernel(self, X, Y=None):
        if callable(self.kernel):
            params = self.kernel_params or {}
        else:
            if _check_auto(self.gamma):
                pairwise_dists = euclidean_distances(X, X)
                median_dist = np.median(pairwise_dists)
                gamma = 1.0 / (2 * median_dist)
            else:
                gamma = self.gamma
            params = {"gamma": gamma,
                      "degree": self.degree,
                      "coef0": self.coef0}
        return pairwise_kernels(X, Y, metric=self.kernel,
                                filter_params=True, **params)


[docs]class RKHSIV(_BaseRKHSIV): """ RKHS IV estimator. This class implements an RKHS IV estimator. Parameters: kernel (str or callable): Kernel function or string identifier. gamma (str or float): Length scale for the kernel. degree (int): Degree for polynomial kernels. coef0 (float): Zero coefficient for polynomial kernels. delta_scale (str or float): Scale of the critical radius. delta_exp (str or float): Exponent of the critical radius. alpha_scale (str or float): Scale of the regularization parameter. kernel_params (dict): Additional parameters for the kernel. """ def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1, delta_scale='auto', delta_exp='auto', alpha_scale='auto', kernel_params=None): self.kernel = kernel self.degree = degree self.coef0 = coef0 self.gamma = gamma self.kernel_params = kernel_params self.delta_scale = delta_scale self.delta_exp = delta_exp self.alpha_scale = alpha_scale def fit(self, Z, T, Y): """ Fit the RKHS IV estimator. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] delta = self._get_delta(n) alpha = self._get_alpha(delta, self._get_alpha_scale()) Kh = self._get_kernel(T) Kf = self._get_kernel(Z) RootKf = _sqrt_psd_matrix(Kf) M = RootKf @ np.linalg.inv( Kf / (2 * n * delta**2) + np.eye(n) / 2) @ RootKf self.T = T.copy() self.a = np.linalg.pinv(Kh @ M @ Kh + alpha * Kh) @ Kh @ M @ Y return self def predict(self, T_test): """ Predict outcomes for new treatments. Parameters: T_test (array-like): New treatments. Returns: array-like: Predicted outcomes. """ return self._get_kernel(T_test, Y=self.T) @ self.a def score(self, Z, T, Y, delta='auto'): """ Compute the score of the fitted estimator. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. delta (str or float): Critical radius. Returns: float: Score. """ Y = _to_column_vector(Y) n = Y.shape[0] delta = self._get_delta(n) Kf = self._get_kernel(Z) RootKf = _sqrt_psd_matrix(Kf) M = RootKf @ np.linalg.inv( Kf / (2 * n * delta**2) + np.eye(n) / 2) @ RootKf Y_pred = self.predict(T) return _to_scalar((Y - Y_pred).T @ M @ (Y - Y_pred)) / n**2
[docs]class RKHSIVCV(RKHSIV): """ RKHS IV estimator with cross-validation. This class implements an RKHS IV estimator with cross-validation. Parameters: kernel (str or callable): Kernel function or string identifier. gamma (str or float): Length scale for the kernel. degree (int): Degree for polynomial kernels. coef0 (float): Zero coefficient for polynomial kernels. delta_scale (str or float): Scale of the critical radius. delta_exp (str or float): Exponent of the critical radius. alpha_scales (str or array-like): Scale of the regularization parameter. n_alphas (int): Number of alpha scales to try. cv (int): Number of folds for cross-validation. kernel_params (dict): Additional parameters for the kernel. """ def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None, delta_scale='auto', delta_exp='auto', alpha_scales='auto', n_alphas=30, cv=6): self.kernel = kernel self.degree = degree self.coef0 = coef0 self.gamma = gamma self.kernel_params = kernel_params self.delta_scale = delta_scale self.delta_exp = delta_exp self.alpha_scales = alpha_scales self.n_alphas = n_alphas self.cv = cv def fit(self, Z, T, Y): """ Fit the RKHS IV estimator with cross-validation. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] Kh = self._get_kernel(T) Kf = self._get_kernel(Z) RootKf = _sqrt_psd_matrix(Kf) alpha_scales = self._get_alpha_scales() n_train = n * (self.cv - 1) / self.cv n_test = n / self.cv delta_train = self._get_delta(n_train) delta_test = self._get_delta(n_test) delta = self._get_delta(n) scores = [] for it, (train, test) in enumerate(KFold(n_splits=self.cv).split(Z)): M_train = RootKf[np.ix_(train, train)] @ np.linalg.inv( Kf[np.ix_(train, train)] / (2 * n_train * (delta_train**2)) + np.eye(len(train)) / 2) @ RootKf[np.ix_(train, train)] M_test = RootKf[np.ix_(test, test)] @ np.linalg.inv( Kf[np.ix_(test, test)] / (2 * n_test * (delta_test**2)) + np.eye(len(test)) / 2) @ RootKf[np.ix_(test, test)] Kh_train = Kh[np.ix_(train, train)] KMK_train = Kh_train @ M_train @ Kh_train B_train = Kh_train @ M_train @ Y[train] scores.append([]) for alpha_scale in alpha_scales: alpha = self._get_alpha(delta_train, alpha_scale) a = np.linalg.pinv(KMK_train + alpha * Kh_train) @ B_train res = Y[test] - Kh[np.ix_(test, train)] @ a scores[it].append(_to_scalar(res.T @ M_test @ res) / (res.shape[0]**2)) self.alpha_scales = alpha_scales self.avg_scores = np.mean(np.array(scores), axis=0) self.best_alpha_scale = alpha_scales[np.argmin(self.avg_scores)] delta = self._get_delta(n) self.best_alpha = self._get_alpha(delta, self.best_alpha_scale) M = RootKf @ np.linalg.inv( Kf / (2 * n * delta**2) + np.eye(n) / 2) @ RootKf self.T = T.copy() self.a = np.linalg.pinv( Kh @ M @ Kh + self.best_alpha * Kh) @ Kh @ M @ Y return self
[docs]class RKHSIVL2(_BaseRKHSIV): """ RKHS IV estimator with L2 regularization. This class implements an RKHS IV estimator with L2 regularization. Parameters: kernel (str or callable): Kernel function or string identifier. gamma (str or float): Length scale for the kernel. degree (int): Degree for polynomial kernels. coef0 (float): Zero coefficient for polynomial kernels. delta_scale (str or float): Scale of the critical radius. delta_exp (str or float): Exponent of the critical radius. kernel_params (dict): Additional parameters for the kernel. """ def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1, delta_scale='auto', delta_exp='auto', kernel_params=None): self.kernel = kernel self.degree = degree self.coef0 = coef0 self.gamma = gamma self.kernel_params = kernel_params self.delta_scale = delta_scale self.delta_exp = delta_exp def fit(self, Z, T, Y): """ Fit the RKHS IV estimator with L2 regularization. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] delta = self._get_delta(n) alpha = delta**4 Kh = self._get_kernel(T) Kf = self._get_kernel(Z) M = np.linalg.pinv(Kf) @ Kf self.T = T.copy() self.a = np.linalg.pinv(Kh @ M @ Kh + alpha * Kh @ Kh) @ Kh @ M @ Y return self def predict(self, T_test): """ Predict outcomes for new treatments. Parameters: T_test (array-like): New treatments. Returns: array-like: Predicted outcomes. """ return self._get_kernel(T_test, Y=self.T) @ self.a
[docs]class RKHSIVL2CV(RKHSIVL2): """ RKHS IV estimator with L2 regularization and cross-validation. This class implements an RKHS IV estimator with L2 regularization and cross-validation. Parameters: kernel (str or callable): Kernel function or string identifier. gamma (str or float): Length scale for the kernel. degree (int): Degree for polynomial kernels. coef0 (float): Zero coefficient for polynomial kernels. delta_scale (str or float): Scale of the critical radius. delta_exp (str or float): Exponent of the critical radius. alpha_scales (str or array-like): Scale of the regularization parameter. n_alphas (int): Number of alpha scales to try. cv (int): Number of folds for cross-validation. kernel_params (dict): Additional parameters for the kernel. """ def __init__(self, kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None, delta_scale='auto', delta_exp='auto', alpha_scales='auto', n_alphas=30, cv=6): self.kernel = kernel self.degree = degree self.coef0 = coef0 self.gamma = gamma self.kernel_params = kernel_params self.delta_scale = delta_scale self.delta_exp = delta_exp self.alpha_scales = alpha_scales self.n_alphas = n_alphas self.cv = cv def fit(self, Z, T, Y): """ Fit the RKHS IV estimator with L2 regularization and cross-validation. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] Kh = self._get_kernel(T) Kf = self._get_kernel(Z) alpha_scales = self._get_alpha_scales() n_train = n * (self.cv - 1) / self.cv n_test = n / self.cv delta_train = self._get_delta(n_train) delta = self._get_delta(n) scores = [] for it, (train, test) in enumerate(KFold(n_splits=self.cv).split(Z)): M_train = np.linalg.pinv(Kf[np.ix_(train, train)]) @ Kf[np.ix_(train, train)] M_test = np.linalg.pinv(Kf[np.ix_(test, test)]) @ Kf[np.ix_(test, test)] Kh_train = Kh[np.ix_(train, train)] KMK_train = Kh_train @ M_train @ Kh_train B_train = Kh_train @ M_train @ Y[train] scores.append([]) for alpha_scale in alpha_scales: alpha = alpha_scale * delta_train**4 a = np.linalg.pinv(KMK_train + alpha * Kh_train @ Kh_train) @ B_train res = Y[test] - Kh[np.ix_(test, train)] @ a scores[it].append(_to_scalar(res.T @ M_test @ res) / (res.shape[0]**2)) self.alpha_scales = alpha_scales self.avg_scores = np.mean(np.array(scores), axis=0) self.best_alpha_scale = alpha_scales[np.argmin(self.avg_scores)] delta = self._get_delta(n) self.best_alpha = self.best_alpha_scale * delta**4 M = np.linalg.pinv(Kf) @ Kf self.T = T.copy() self.a = np.linalg.pinv( Kh @ M @ Kh + self.best_alpha * Kh @ Kh) @ Kh @ M @ Y return self
[docs]class ApproxRKHSIV(_BaseRKHSIV): """ Approximate RKHS IV estimator using kernel approximations. This class implements an approximate RKHS IV estimator using kernel approximations. Parameters: kernel_approx (str): Kernel approximation method ('nystrom' or 'rbfsampler'). n_components (int or float): Number of approximation components. If integer-like and >= 1, treated as a fixed component count. If float in (0, 1], treated as the sample fraction with a floor of 10. kernel (str or callable): Kernel function or string identifier. gamma (str or float): Length scale for the kernel. degree (int): Degree for polynomial kernels. coef0 (float): Zero coefficient for polynomial kernels. delta_scale (str or float): Scale of the critical radius. delta_exp (str or float): Exponent of the critical radius. alpha_scale (str or float): Scale of the regularization parameter. kernel_params (dict): Additional parameters for the kernel. """ def __init__(self, kernel_approx='nystrom', n_components=10, kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None, delta_scale='auto', delta_exp='auto', alpha_scale='auto'): self.kernel_approx = kernel_approx self.n_components = n_components self.kernel = kernel self.degree = degree self.coef0 = coef0 self.gamma = gamma self.kernel_params = kernel_params self.delta_scale = delta_scale self.delta_exp = delta_exp self.alpha_scale = alpha_scale def _resolve_n_components(self, n_samples=None): """ Resolve the effective approximation dimension from ``self.n_components``. Supports two input modes: - integer-like >= 1: fixed component count - float in (0, 1]: fraction of sample size (rounded, with floor 10) The resolved count is always capped by ``n_samples`` when provided. """ try: value = float(self.n_components) except Exception as exc: raise ValueError("`n_components` must be numeric.") from exc if value <= 0: raise ValueError("`n_components` must be > 0.") if value <= 1: if n_samples is None: raise ValueError("Fractional `n_components` requires `n_samples`.") n_samples_i = int(n_samples) if n_samples_i <= 0: raise ValueError("`n_samples` must be a positive integer.") resolved = max(10, int(round(n_samples_i * value))) elif value.is_integer(): resolved = int(value) else: raise ValueError( "`n_components` must be integer-like >= 1 or a fraction in (0, 1]." ) if n_samples is not None: resolved = min(resolved, int(n_samples)) return max(1, resolved) def _get_new_approx_instance(self, n_samples=None): """ Create a new kernel approximation instance. Parameters: n_samples (int, optional): Sample count used to resolve/cap components. Returns: object: Kernel approximation instance. """ if (self.kernel_approx == 'rbfsampler') and (self.kernel == 'rbf'): n_components = self._resolve_n_components(n_samples=n_samples) return RBFSampler(gamma=self.gamma, n_components=n_components, random_state=1) elif self.kernel_approx == 'nystrom': n_components = self._resolve_n_components(n_samples=n_samples) return Nystroem(kernel=self.kernel, gamma=self.gamma, coef0=self.coef0, degree=self.degree, kernel_params=self.kernel_params, random_state=1, n_components=n_components) else: raise AttributeError("Invalid kernel approximator") def fit(self, Z, T, Y): """ Fit the approximate RKHS IV estimator. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] delta = self._get_delta(n) alpha = self._get_alpha(delta, self._get_alpha_scale()) self.featZ = self._get_new_approx_instance(n_samples=n) RootKf = self.featZ.fit_transform(Z) self.featT = self._get_new_approx_instance(n_samples=n) RootKh = self.featT.fit_transform(T) n_feat_f = RootKf.shape[1] n_feat_h = RootKh.shape[1] Q = np.linalg.pinv(RootKf.T @ RootKf / (2 * n * delta**2) + np.eye(n_feat_f) / 2) A = RootKh.T @ RootKf W = (A @ Q @ A.T + alpha * np.eye(n_feat_h)) B = A @ Q @ RootKf.T @ Y self.a = np.linalg.pinv(W) @ B self.fitted_delta = delta return self def predict(self, T): """ Predict outcomes for new treatments. Parameters: T (array-like): New treatments. Returns: array-like: Predicted outcomes. """ return self.featT.transform(T) @ self.a def score(self, Z, T, Y, delta='auto'): """ Compute the score of the fitted estimator. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. delta (str or float): Critical radius. Returns: float: Score. """ Y = _to_column_vector(Y) n = Y.shape[0] delta = self._get_delta(n) featZ = self._get_new_approx_instance(n_samples=n) RootKf = featZ.fit_transform(Z) RootKh = self.featT.fit_transform(T) n_feat_f = RootKf.shape[1] Q = np.linalg.pinv(RootKf.T @ RootKf / (2 * n * delta**2) + np.eye(n_feat_f) / 2) Y_pred = self.predict(T) res = RootKf.T @ (Y - Y_pred) return _to_scalar(res.T @ Q @ res) / n**2
[docs]class ApproxRKHSIVCV(ApproxRKHSIV): """ Approximate RKHS IV estimator with cross-validation using kernel approximations. This class implements an approximate RKHS IV estimator with cross-validation using kernel approximations. Parameters: kernel_approx (str): Kernel approximation method ('nystrom' or 'rbfsampler'). n_components (int or float): Number of approximation components. If integer-like and >= 1, treated as a fixed component count. If float in (0, 1], treated as the sample fraction with a floor of 10. kernel (str or callable): Kernel function or string identifier. gamma (str or float): Length scale for the kernel. degree (int): Degree for polynomial kernels. coef0 (float): Zero coefficient for polynomial kernels. delta_scale (str or float): Scale of the critical radius. delta_exp (str or float): Exponent of the critical radius. alpha_scales (str or array-like): Scale of the regularization parameter. n_alphas (int): Number of alpha scales to try. cv (int): Number of folds for cross-validation. kernel_params (dict): Additional parameters for the kernel. """ def __init__(self, kernel_approx='nystrom', n_components=10, kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None, delta_scale='auto', delta_exp='auto', alpha_scales='auto', n_alphas=30, cv=6): self.kernel_approx = kernel_approx self.n_components = n_components self.kernel = kernel self.degree = degree self.coef0 = coef0 self.gamma = gamma self.kernel_params = kernel_params self.delta_scale = delta_scale self.delta_exp = delta_exp self.alpha_scales = alpha_scales self.n_alphas = n_alphas self.cv = cv def fit(self, Z, T, Y): """ Fit the approximate RKHS IV estimator with cross-validation. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] self.featZ = self._get_new_approx_instance(n_samples=n) RootKf = self.featZ.fit_transform(Z) self.featT = self._get_new_approx_instance(n_samples=n) RootKh = self.featT.fit_transform(T) alpha_scales = self._get_alpha_scales() n_train = n * (self.cv - 1) / self.cv n_test = n / self.cv delta_train = self._get_delta(n_train) delta_test = self._get_delta(n_test) delta = self._get_delta(n) scores = [] for it, (train, test) in enumerate(KFold(n_splits=self.cv).split(Z)): RootKf_train, RootKf_test = RootKf[train], RootKf[test] RootKh_train, RootKh_test = RootKh[train], RootKh[test] n_feat_f_train = RootKf_train.shape[1] n_feat_f_test = RootKf_test.shape[1] n_feat_h_train = RootKh_train.shape[1] Q_train = np.linalg.pinv( RootKf_train.T @ RootKf_train / (2 * n_train * (delta_train**2)) + np.eye(n_feat_f_train) / 2) Q_test = np.linalg.pinv( RootKf_test.T @ RootKf_test / (2 * n_test * (delta_test**2)) + np.eye(n_feat_f_test) / 2) A_train = RootKh_train.T @ RootKf_train AQA_train = A_train @ Q_train @ A_train.T B_train = A_train @ Q_train @ RootKf_train.T @ Y[train] scores.append([]) for alpha_scale in alpha_scales: alpha = self._get_alpha(delta_train, alpha_scale) a = np.linalg.pinv(AQA_train + alpha * np.eye(n_feat_h_train)) @ B_train res = RootKf_test.T @ (Y[test] - RootKh_test @ a) scores[it].append(_to_scalar(res.T @ Q_test @ res) / (len(test)**2)) self.alpha_scales = alpha_scales self.avg_scores = np.mean(np.array(scores), axis=0) self.best_alpha_scale = alpha_scales[np.argmin(self.avg_scores)] delta = self._get_delta(n) self.best_alpha = self._get_alpha(delta, self.best_alpha_scale) n_feat_f = RootKf.shape[1] n_feat_h = RootKh.shape[1] Q = np.linalg.pinv(RootKf.T @ RootKf / (2 * n * delta**2) + np.eye(n_feat_f) / 2) A = RootKh.T @ RootKf W = (A @ Q @ A.T + self.best_alpha * np.eye(n_feat_h)) B = A @ Q @ RootKf.T @ Y self.a = np.linalg.pinv(W) @ B self.fitted_delta = delta return self
[docs]class ApproxRKHSIVL2(ApproxRKHSIV): """ Approximate RKHS IV estimator with L2 regularization. This class mirrors ``RKHSIVL2`` using finite kernel feature approximations. """ def fit(self, Z, T, Y): """ Fit the approximate RKHS IV L2 estimator. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] delta = self._get_delta(n) alpha = delta**4 self.featZ = self._get_new_approx_instance(n_samples=n) RootKf = self.featZ.fit_transform(Z) self.featT = self._get_new_approx_instance(n_samples=n) RootKh = self.featT.fit_transform(T) self.RootKh_train_ = RootKh Kf = RootKf @ RootKf.T Kh = RootKh @ RootKh.T M = np.linalg.pinv(Kf) @ Kf self.a = np.linalg.pinv(Kh @ M @ Kh + alpha * Kh @ Kh) @ Kh @ M @ Y self.fitted_delta = delta return self def predict(self, T): """ Predict outcomes for new treatments. Parameters: T (array-like): New treatments. Returns: array-like: Predicted outcomes. """ RootKh_test = self.featT.transform(T) return RootKh_test @ self.RootKh_train_.T @ self.a def score(self, Z, T, Y, delta='auto'): """ Compute the L2 score of the fitted estimator. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. delta (str or float): Kept for API compatibility. Returns: float: Score. """ _ = delta Y = _to_column_vector(Y) n = Y.shape[0] featZ = self._get_new_approx_instance(n_samples=n) RootKf = featZ.fit_transform(Z) Kf = RootKf @ RootKf.T M = np.linalg.pinv(Kf) @ Kf Y_pred = self.predict(T) return _to_scalar((Y - Y_pred).T @ M @ (Y - Y_pred)) / n**2
[docs]class ApproxRKHSIVL2CV(ApproxRKHSIVL2): """ Approximate RKHS IV L2 estimator with cross-validation. """ def __init__(self, kernel_approx='nystrom', n_components=10, kernel='rbf', gamma=2, degree=3, coef0=1, kernel_params=None, delta_scale='auto', delta_exp='auto', alpha_scales='auto', n_alphas=30, cv=6): self.kernel_approx = kernel_approx self.n_components = n_components self.kernel = kernel self.degree = degree self.coef0 = coef0 self.gamma = gamma self.kernel_params = kernel_params self.delta_scale = delta_scale self.delta_exp = delta_exp self.alpha_scales = alpha_scales self.n_alphas = n_alphas self.cv = cv def fit(self, Z, T, Y): """ Fit the approximate RKHS IV L2 estimator with cross-validation. Parameters: Z (array-like): Instrumental variables. T (array-like): Treatments. Y (array-like): Outcomes. Returns: self: Fitted estimator. """ Y = _to_column_vector(Y) n = Y.shape[0] self.featZ = self._get_new_approx_instance(n_samples=n) RootKf = self.featZ.fit_transform(Z) self.featT = self._get_new_approx_instance(n_samples=n) RootKh = self.featT.fit_transform(T) alpha_scales = self._get_alpha_scales() n_train = n * (self.cv - 1) / self.cv n_test = n / self.cv delta_train = self._get_delta(n_train) delta = self._get_delta(n) scores = [] for it, (train, test) in enumerate(KFold(n_splits=self.cv).split(Z)): RootKf_train, RootKf_test = RootKf[train], RootKf[test] RootKh_train, RootKh_test = RootKh[train], RootKh[test] Kf_train = RootKf_train @ RootKf_train.T Kf_test = RootKf_test @ RootKf_test.T Kh_train = RootKh_train @ RootKh_train.T Kh_test_train = RootKh_test @ RootKh_train.T M_train = np.linalg.pinv(Kf_train) @ Kf_train M_test = np.linalg.pinv(Kf_test) @ Kf_test KMK_train = Kh_train @ M_train @ Kh_train B_train = Kh_train @ M_train @ Y[train] scores.append([]) for alpha_scale in alpha_scales: alpha = float(alpha_scale) * (delta_train**4) a = np.linalg.pinv(KMK_train + alpha * Kh_train @ Kh_train) @ B_train res = Y[test] - Kh_test_train @ a scores[it].append(_to_scalar(res.T @ M_test @ res) / (res.shape[0]**2)) self.alpha_scales = alpha_scales self.avg_scores = np.mean(np.array(scores), axis=0) self.best_alpha_scale = alpha_scales[np.argmin(self.avg_scores)] self.best_alpha = self.best_alpha_scale * (delta**4) Kf = RootKf @ RootKf.T Kh = RootKh @ RootKh.T M = np.linalg.pinv(Kf) @ Kf self.a = np.linalg.pinv( Kh @ M @ Kh + self.best_alpha * Kh @ Kh ) @ Kh @ M @ Y self.RootKh_train_ = RootKh self.fitted_delta = delta return self