"""
This module provides implementations of ensemble instrumental variable (IV) estimators using RandomForest models.
Classes:
EnsembleIV: Implements an ensemble learning IV method with adversarial and learner components.
EnsembleIVStar: Similar to EnsembleIV but with a different method for updating the test predictions.
EnsembleIVL2: An extension of EnsembleIV with L2 regularization and optional cross-validation for regularization parameter selection.
Functions:
_mysign: A helper function that returns 1 if the input is non-negative and -1 otherwise.
"""
# Licensed under the MIT License.
import numpy as np
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.base import clone
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
def _mysign(x):
return 2 * (x >= 0) - 1
class EnsembleIV:
    """
    Implements an ensemble learning IV method with adversarial and learner components.

    At every iteration the adversary's predictions on the instruments are turned
    into a weighted sign-classification problem for a new learner; the running
    average of the learners' signed outputs is subtracted from the outcome and
    the adversary is refit on that residual.

    Parameters:
        adversary (str or estimator): Adversary model. If 'auto', a default RandomForestRegressor is used.
        learner (str or estimator): Learner model. If 'auto', a default RandomForestClassifier is used.
        max_abs_value (float): Maximum absolute value for the predictions.
        n_iter (int): Number of iterations for the ensemble.
    """

    def __init__(self, adversary='auto', learner='auto',
                 max_abs_value=4, n_iter=100):
        self.adversary = adversary
        self.learner = learner
        self.max_abs_value = max_abs_value
        self.n_iter = n_iter

    def _check_input(self, Z, T, Y):
        """
        Normalizes the input shapes.

        Inputs are converted with ``np.asarray`` so that lists and other
        array-likes are accepted; 1-D ``Z``/``T`` become single-column
        matrices and ``Y`` is flattened to 1-D.
        """
        Z = np.asarray(Z)
        T = np.asarray(T)
        Y = np.asarray(Y)
        if T.ndim == 1:
            T = T.reshape(-1, 1)
        if Z.ndim == 1:
            Z = Z.reshape(-1, 1)
        return Z, T, Y.flatten()

    def _get_new_adversary(self):
        """Returns a fresh, unfitted adversary (default forest or a clone of the user's estimator)."""
        if isinstance(self.adversary, str) and self.adversary == 'auto':
            return RandomForestRegressor(n_estimators=40, max_depth=2,
                                         bootstrap=True, min_samples_leaf=40,
                                         min_impurity_decrease=0.001)
        return clone(self.adversary)

    def _get_new_learner(self):
        """Returns a fresh, unfitted learner (default forest or a clone of the user's estimator)."""
        if isinstance(self.learner, str) and self.learner == 'auto':
            return RandomForestClassifier(n_estimators=5, max_depth=2, criterion='gini',
                                          bootstrap=False, min_samples_leaf=40,
                                          min_impurity_decrease=0.001)
        return clone(self.learner)

    def _signed_prediction(self, learner, T):
        """
        Returns ``max_abs_value * sign(p * c - 1/2)`` for one fitted learner,
        where ``p`` is the last column of ``predict_proba(T)`` and ``c`` is the
        learner's last class label.
        """
        score = learner.predict_proba(T)[:, -1] * learner.classes_[-1] - 1 / 2
        return self.max_abs_value * _mysign(score)

    def fit(self, Z, T, Y):
        """
        Fits the ensemble IV model to the provided data.

        Parameters:
            Z (array-like): Instrumental variables.
            T (array-like): Treatment variables.
            Y (array-like): Outcome variables.
        Returns:
            self: Fitted ensemble IV model.
        """
        Z, T, Y = self._check_input(Z, T, Y)
        adversary = self._get_new_adversary().fit(Z, Y)
        # Two dummy rows are prepended to the treatments; they get labels
        # -1/+1 and zero weight below. Presumably this guarantees that the
        # classifier always sees both labels -- confirm for custom learners.
        # The augmented design does not depend on the iteration, so build it once.
        aug_T = np.vstack([np.zeros((2, T.shape[1])), T])
        learners = []
        h = 0
        for it in range(self.n_iter):
            test = adversary.predict(Z).flatten()
            # Classify the sign of the adversary's output, weighted by its magnitude.
            aug_label = np.concatenate(([-1, 1], _mysign(test)))
            aug_weights = np.concatenate(([0, 0], np.abs(test)))
            learner = self._get_new_learner().fit(
                aug_T, aug_label, sample_weight=aug_weights)
            learners.append(learner)
            # Running average of the learners' signed predictions.
            h = h * it / (it + 1)
            h += self._signed_prediction(learner, T) / (it + 1)
            adversary.fit(Z, Y - h)
        self.learners = learners
        return self

    def predict(self, T):
        """
        Predicts outcomes for new data using the fitted ensemble IV model.

        Parameters:
            T (array-like): Treatment variables. A 1-D input is treated as a
                single treatment column, as in ``fit``.
        Returns:
            array: Predicted outcomes (average over all fitted learners).
        """
        T = np.asarray(T)
        if T.ndim == 1:
            T = T.reshape(-1, 1)
        return np.mean([self._signed_prediction(learner, T)
                        for learner in self.learners], axis=0)
class EnsembleIVStar:
    """
    Similar to EnsembleIV but with a different method for updating the test predictions using a linear combination approach.

    Instead of refitting the adversary from scratch on the residual, the test
    function is updated as ``gamma * previous_test + adversary_fit`` for the
    best ``gamma`` on a small grid (see ``_update_test``).

    Parameters:
        adversary (str or estimator): Adversary model. If 'auto', a default RandomForestRegressor is used.
        learner (str or estimator): Learner model. If 'auto', a default RandomForestClassifier is used.
        max_abs_value (float): Maximum absolute value for the predictions.
        n_iter (int): Number of iterations for the ensemble.
    """

    def __init__(self, adversary='auto', learner='auto',
                 max_abs_value=4, n_iter=100):
        self.adversary = adversary
        self.learner = learner
        self.max_abs_value = max_abs_value
        self.n_iter = n_iter

    def _check_input(self, Z, T, Y):
        """
        Normalizes the input shapes.

        Inputs are converted with ``np.asarray`` so that lists and other
        array-likes are accepted; 1-D ``Z``/``T`` become single-column
        matrices and ``Y`` is flattened to 1-D.
        """
        Z = np.asarray(Z)
        T = np.asarray(T)
        Y = np.asarray(Y)
        if T.ndim == 1:
            T = T.reshape(-1, 1)
        if Z.ndim == 1:
            Z = Z.reshape(-1, 1)
        return Z, T, Y.flatten()

    def _get_new_adversary(self):
        """Returns a fresh, unfitted adversary (default forest or a clone of the user's estimator)."""
        if isinstance(self.adversary, str) and self.adversary == 'auto':
            return RandomForestRegressor(n_estimators=5, max_depth=2,
                                         bootstrap=False, min_samples_leaf=40,
                                         min_impurity_decrease=0.0001)
        return clone(self.adversary)

    def _get_new_learner(self):
        """Returns a fresh, unfitted learner (default forest or a clone of the user's estimator)."""
        if isinstance(self.learner, str) and self.learner == 'auto':
            return RandomForestClassifier(n_estimators=5, max_depth=2, criterion='gini',
                                          bootstrap=False, min_samples_leaf=40,
                                          min_impurity_decrease=0.001)
        return clone(self.learner)

    def _signed_prediction(self, learner, T):
        """
        Returns ``max_abs_value * sign(p * c - 1/2)`` for one fitted learner,
        where ``p`` is the last column of ``predict_proba(T)`` and ``c`` is the
        learner's last class label.
        """
        score = learner.predict_proba(T)[:, -1] * learner.classes_[-1] - 1 / 2
        return self.max_abs_value * _mysign(score)

    def _update_test(self, Z, Y, pred_old, adv):
        """
        Updates the test function by a shrunken-previous-plus-new-fit step.

        For each ``gamma`` in ``np.linspace(.1, .9, 5)`` the adversary is refit
        on ``Y - gamma * pred_old`` and the candidate
        ``gamma * pred_old + adv.predict(Z)`` is kept if its in-sample mean
        squared error is no worse than the best seen so far (starting from the
        error of ``pred_old`` itself).

        Parameters:
            Z (array): Instruments.
            Y (array): Target the test function should track.
            pred_old (array): Previous test-function values on ``Z``.
            adv (estimator): Adversary; refit in place for every ``gamma``, so
                on return it holds the fit for the last ``gamma`` tried.
        Returns:
            array: Updated test-function values (a copy of ``pred_old`` if no
            candidate qualified).
        """
        best_loss = np.mean((Y - pred_old) ** 2)
        pred_new = pred_old.copy()
        for gamma in np.linspace(.1, .9, 5):
            shrunk = gamma * pred_old
            adv.fit(Z, Y - shrunk)
            pred = adv.predict(Z).flatten()
            loss = np.mean((Y - shrunk - pred) ** 2)
            if loss <= best_loss:
                pred_new = shrunk + pred
                best_loss = loss
        return pred_new

    def fit(self, Z, T, Y):
        """
        Fits the ensemble IV model to the provided data.

        Parameters:
            Z (array-like): Instrumental variables.
            T (array-like): Treatment variables.
            Y (array-like): Outcome variables.
        Returns:
            self: Fitted ensemble IV model.
        """
        Z, T, Y = self._check_input(Z, T, Y)
        adversary = self._get_new_adversary()
        test = np.zeros(Z.shape[0])
        # Two dummy rows are prepended to the treatments; they get labels
        # -1/+1 and zero weight below. Presumably this guarantees that the
        # classifier always sees both labels -- confirm for custom learners.
        # The augmented design does not depend on the iteration, so build it once.
        aug_T = np.vstack([np.zeros((2, T.shape[1])), T])
        h = 0
        learners = []
        for it in range(self.n_iter):
            test = self._update_test(Z, Y - h, test, adversary)
            # Classify the sign of the test function, weighted by its magnitude.
            aug_label = np.concatenate(([-1, 1], _mysign(test)))
            aug_weights = np.concatenate(([0, 0], np.abs(test)))
            learner = self._get_new_learner().fit(
                aug_T, aug_label, sample_weight=aug_weights)
            learners.append(learner)
            # Running average of the learners' signed predictions.
            h = h * it / (it + 1)
            h += self._signed_prediction(learner, T) / (it + 1)
        self.learners = learners
        return self

    def predict(self, T):
        """
        Predicts outcomes for new data using the fitted ensemble IV model.

        Parameters:
            T (array-like): Treatment variables. A 1-D input is treated as a
                single treatment column, as in ``fit``.
        Returns:
            array: Predicted outcomes (average over all fitted learners).
        """
        T = np.asarray(T)
        if T.ndim == 1:
            T = T.reshape(-1, 1)
        return np.mean([self._signed_prediction(learner, T)
                        for learner in self.learners], axis=0)
class EnsembleIVL2:
    """
    An extension of EnsembleIV with L2 regularization and optional cross-validation to select the best regularization parameter.

    Parameters:
        adversary (str or estimator): Adversary model. If 'auto', a default RandomForestRegressor is used.
        learner (str or estimator): Learner model. If 'auto', a default RandomForestRegressor is used.
        n_iter (int): Number of iterations for the ensemble.
        delta_scale (str or float): Scale factor for the critical radius delta. Default is 'auto'.
        delta_exp (str or float): Exponent for the critical radius delta. Default is 'auto'.
        CV (bool): Whether to perform cross-validation to select the best alpha value.
        alpha_scales (str or list): Scales for alpha in cross-validation. Default is 'auto'.
        n_alphas (int): Number of alpha values to test in cross-validation.
        n_folds (int): Number of folds for cross-validation.

    Attributes set by ``fit``:
        learners (list): Fitted learners, one per iteration.
        alpha_ (float): Regularization parameter used by the most recent fit.
    """

    def __init__(self, adversary='auto', learner='auto',
                 n_iter=100, delta_scale='auto', delta_exp='auto', CV=False,
                 alpha_scales='auto', n_alphas=30, n_folds=5):
        self.adversary = adversary
        self.learner = learner
        self.n_iter = n_iter
        self.delta_scale = delta_scale
        self.delta_exp = delta_exp
        self.CV = CV
        self.alpha_scales = alpha_scales
        self.n_alphas = n_alphas
        self.n_folds = n_folds

    @staticmethod
    def _is_auto(value):
        """Returns True only for the literal string 'auto' (safe for arrays and estimators)."""
        return isinstance(value, str) and value == 'auto'

    def _get_delta(self, n):
        """
        Computes the critical radius delta based on the sample size.

        Parameters:
            n (int): Sample size.
        Returns:
            float: Critical radius delta, ``delta_scale / n ** delta_exp``
            (defaults: scale 5, exponent 0.4).
        """
        delta_scale = 5 if self._is_auto(self.delta_scale) else self.delta_scale
        delta_exp = .4 if self._is_auto(self.delta_exp) else self.delta_exp
        return delta_scale / (n ** delta_exp)

    def _get_alpha_scales(self):
        """
        Returns the candidate alpha values for cross-validation.

        With 'auto', ``n_alphas`` values geometrically spaced between 0.1 and
        1e4 are used; otherwise the user-supplied sequence is returned as is.
        """
        if self._is_auto(self.alpha_scales):
            return list(np.geomspace(0.1, 1e4, self.n_alphas))
        return self.alpha_scales

    def _check_input(self, Z, T, Y):
        """
        Normalizes the input shapes.

        Inputs are converted with ``np.asarray`` so that lists and other
        array-likes are accepted; 1-D ``Z``/``T`` become single-column
        matrices and ``Y`` is flattened to 1-D.
        """
        Z = np.asarray(Z)
        T = np.asarray(T)
        Y = np.asarray(Y)
        if T.ndim == 1:
            T = T.reshape(-1, 1)
        if Z.ndim == 1:
            Z = Z.reshape(-1, 1)
        return Z, T, Y.flatten()

    def _get_new_adversary(self):
        """Returns a fresh, unfitted adversary (default forest or a clone of the user's estimator)."""
        if self._is_auto(self.adversary):
            return RandomForestRegressor(n_estimators=40, max_depth=2,
                                         bootstrap=True, min_samples_leaf=40,
                                         min_impurity_decrease=0.001)
        return clone(self.adversary)

    def _get_new_learner(self):
        """Returns a fresh, unfitted learner (default forest or a clone of the user's estimator)."""
        if self._is_auto(self.learner):
            return RandomForestRegressor(n_estimators=40, max_depth=2,
                                         bootstrap=True, min_samples_leaf=40,
                                         min_impurity_decrease=0.001)
        return clone(self.learner)

    def _cross_validate_alpha(self, Z, T, Y):
        """
        Performs cross-validation to select the best alpha value.

        Each candidate alpha is scored by the mean, over the folds, of the
        squared error between held-out outcomes and ``predict`` on the
        held-out treatments. Fitting on the folds overwrites ``self.learners``;
        the caller is expected to refit on the full data afterwards.

        Parameters:
            Z (array-like): Instrumental variables.
            T (array-like): Treatment variables.
            Y (array-like): Outcome variables.
        Returns:
            float: Best alpha value.
        Raises:
            ValueError: If no candidate alpha produced a finite score
                (for example when ``alpha_scales`` is empty).
        """
        # Normalize first so that fold indexing works for any array-like and
        # 1-D treatments are 2-D by the time they reach predict().
        Z, T, Y = self._check_input(Z, T, Y)
        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=42)
        # The seed is fixed, so the folds are identical for every alpha;
        # compute them once instead of once per candidate.
        folds = list(kf.split(Z))
        best_alpha = None
        best_score = float('inf')
        for alpha in self._get_alpha_scales():
            scores = []
            for train_index, test_index in folds:
                # cross_validating=True stops fit() from re-entering this
                # method, which would otherwise recurse without end.
                self.fit(Z[train_index], T[train_index], Y[train_index],
                         alpha=alpha, cross_validating=True)
                predictions = self.predict(T[test_index])
                scores.append(mean_squared_error(Y[test_index], predictions))
            avg_score = np.mean(scores)
            if avg_score < best_score:
                best_score = avg_score
                best_alpha = alpha
        if best_alpha is None:
            raise ValueError(
                "Cross-validation could not select an alpha: no candidate "
                "in alpha_scales produced a finite score.")
        return best_alpha

    def fit(self, Z, T, Y, alpha=1.0, cross_validating=False):
        """
        Fits the ensemble IV model with L2 regularization to the provided data.

        Parameters:
            Z (array-like): Instrumental variables.
            T (array-like): Treatment variables.
            Y (array-like): Outcome variables.
            alpha (float): Regularization parameter. Ignored (replaced by the
                cross-validated value) when ``CV`` is True and
                ``cross_validating`` is False.
            cross_validating (bool): Whether the function is called during cross-validation.
        Returns:
            self: Fitted ensemble IV model.
        """
        Z, T, Y = self._check_input(Z, T, Y)
        if self.CV and not cross_validating:
            alpha = self._cross_validate_alpha(Z, T, Y)
        self.alpha_ = alpha
        n = Y.shape[0]
        delta = self._get_delta(n)
        scale = alpha * delta ** 2
        # Only the most recent adversary is ever used, so keep just that one.
        adversary = self._get_new_adversary().fit(Z, Y)
        f = 0
        h = 0
        learners = []
        for it in range(self.n_iter):
            # Running average of the (scaled) adversary outputs.
            f = f * it / (it + 1)
            f += adversary.predict(Z).flatten() / (scale * (it + 1))
            learner = self._get_new_learner().fit(T, f)
            learners.append(learner)
            # Running average of the learners' predictions.
            h = h * it / (it + 1)
            h += learner.predict(T).flatten() / (it + 1)
            adversary = self._get_new_adversary().fit(Z, Y - h)
        self.learners = learners
        return self

    def predict(self, T):
        """
        Predicts outcomes for new data using the fitted ensemble IV model with L2 regularization.

        Parameters:
            T (array-like): Treatment variables. A 1-D input is treated as a
                single treatment column, as in ``fit``.
        Returns:
            array: Predicted outcomes (average over all fitted learners).
        """
        T = np.asarray(T)
        if T.ndim == 1:
            T = T.reshape(-1, 1)
        return np.mean([learner.predict(T) for learner in self.learners], axis=0)