"""
This module provides implementations of nested nonparametric instrumental variable (NPIV) estimators using ensemble RandomForest models.
Classes:
Ensemble2IV: Implements a nested ensemble learning IV method with two adversaries and two learners.
Ensemble2IVL2: An extension of Ensemble2IV with L2 regularization and optional cross-validation for regularization parameter selection.
Functions:
_mysign: A helper function that returns 2 if the input is non-negative and -1 otherwise.
"""
# Licensed under the MIT License.
import numpy as np
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.base import clone
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
def _mysign(x):
return 2 * (x >= 0) - 1
[docs]class Ensemble2IV:
"""
Implements a nested ensemble learning IV method with two adversaries and two learners.
Parameters:
adversary (str or estimator): Adversary model. If 'auto', a default RandomForestRegressor is used.
learnerg (str or estimator): Learner model for g. If 'auto', a default RandomForestClassifier is used.
learnerh (str or estimator): Learner model for h. If 'auto', a default RandomForestClassifier is used.
max_abs_value (float): Maximum absolute value for the predictions.
n_iter (int): Number of iterations for the ensemble.
n_burn_in (int): Number of burn-in iterations.
"""
def __init__(self, adversary='auto', learnerg='auto', learnerh='auto',
max_abs_value=4, n_iter=100, n_burn_in=10):
self.adversary = adversary
self.learnerg = learnerg
self.learnerh = learnerh
self.max_abs_value = max_abs_value
self.n_iter = n_iter
self.n_burn_in = n_burn_in
return
def _check_input(self, A, B, C, D, Y, W):
if len(A.shape) == 1:
A = A.reshape(-1, 1)
if len(B.shape) == 1:
B = B.reshape(-1, 1)
if len(C.shape) == 1:
C = C.reshape(-1, 1)
if len(D.shape) == 1:
D = D.reshape(-1, 1)
return A, B, C, D, Y.flatten(), W.flatten()
def _get_new_adversary(self):
return RandomForestRegressor(n_estimators=40, max_depth=2,
bootstrap=True, min_samples_leaf=40, min_impurity_decrease=0.001) if self.adversary == 'auto' else clone(self.adversary)
def _get_new_learnerg(self):
return RandomForestClassifier(n_estimators=5, max_depth=2, criterion='gini',
bootstrap=False, min_samples_leaf=40, min_impurity_decrease=0.001) if self.learnerg == 'auto' else clone(self.learnerg)
def _get_new_learnerh(self):
return RandomForestClassifier(n_estimators=5, max_depth=2, criterion='gini',
bootstrap=False, min_samples_leaf=40, min_impurity_decrease=0.001) if self.learnerh == 'auto' else clone(self.learnerh)
def fit(self, A, B, C, D, Y, W=None, subsetted=False, subset_ind1=None, subset_ind2=None):
"""
Fits the nested ensemble IV model to the provided data.
Parameters:
A (array-like): Instrumental variables for the first stage.
B (array-like): Instrumental variables for the second stage.
C (array-like): Treatment variables for the first stage.
D (array-like): Treatment variables for the second stage.
Y (array-like): Outcome variables.
W (array-like, optional): Weights for the observations.
subsetted (bool): If True, use subsets of data as indicated by subset_ind1 and subset_ind2.
subset_ind1 (array-like): Indices for the first subset.
subset_ind2 (array-like): Indices for the second subset.
Returns:
self: Fitted nested ensemble IV model.
"""
W = np.ones(Y.shape) if W is None else W
A, B, C, D, Y, W = self._check_input(A, B, C, D, Y, W)
if subsetted:
if subset_ind1 is None:
raise ValueError("subset_ind1 must be provided when subsetted is True")
if len(subset_ind1) != len(Y):
raise ValueError("subset_ind1 must have the same length as Y")
subset_ind1 = subset_ind1.flatten()
subset_ind2 = subset_ind2.flatten() if subset_ind2 is not None else 1 - subset_ind1
max_value = self.max_abs_value
adversary1 = self._get_new_adversary().fit(D, -Y.flatten())
adversary2 = self._get_new_adversary().fit(C, -Y.flatten())
learnersg = []
learnersh = []
h = 0
g = 0
for it in range(self.n_iter + self.n_burn_in):
v = -adversary2.predict(C).flatten()* W if not subsetted else -adversary2.predict(C).flatten()* W * subset_ind2
v_ = -adversary1.predict(D).flatten() - v if not subsetted else -adversary1.predict(D).flatten() * subset_ind1 - v
aug_A = np.vstack([np.zeros((2, A.shape[1])), A])
aug_B = np.vstack([np.zeros((2, B.shape[1])), B])
lbl_v = np.concatenate(([-1, 1], _mysign(v)))
lbl_v_ = np.concatenate(([-1, 1], _mysign(v_)))
wt_v = np.concatenate(([0, 0], np.abs(v)))
wt_v_ = np.concatenate(([0, 0], np.abs(v_)))
learnersg.append(self._get_new_learnerg().fit(
aug_A, lbl_v_, sample_weight=wt_v_))
learnersh.append(self._get_new_learnerh().fit(
aug_B, lbl_v, sample_weight=wt_v))
g = g * it / (it + 1)
h = h * it / (it + 1)
g += max_value * _mysign(learnersg[it].predict_proba(A)[
:, -1] * learnersg[it].classes_[-1] - 1 / 2) / (it + 1)
h += max_value * _mysign(learnersh[it].predict_proba(B)[
:, -1] * learnersh[it].classes_[-1] - 1 / 2) / (it + 1)
adversary2.fit(C, h - g*W)
adversary1.fit(D, g - Y)
self.learnersg = learnersg[self.n_burn_in:]
self.learnersh = learnersh[self.n_burn_in:]
return self
def predict(self, B, *args):
"""
Predicts outcomes for new data using the fitted nested ensemble IV model.
Parameters:
B (array-like): Instrumental variables for the second stage.
args (tuple): Optional second argument for instrumental variables of the first stage.
Returns:
array: Predicted outcomes for the second stage.
If a second argument is provided, returns a tuple with predictions for both stages.
"""
if len(args) == 0:
# Only B_test provided, return h prediction
return np.mean([self.max_abs_value * _mysign(l.predict_proba(B)
[:, -1] * l.classes_[-1] - 1 / 2) for l in self.learnersh], axis=0)
elif len(args) == 1:
# Two arguments provided, assume the second is A_test
A = args[0]
pred_h = np.mean([self.max_abs_value * _mysign(l.predict_proba(B)
[:, -1] * l.classes_[-1] - 1 / 2) for l in self.learnersh], axis=0)
pred_g = np.mean([self.max_abs_value * _mysign(l.predict_proba(A)
[:, -1] * l.classes_[-1] - 1 / 2) for l in self.learnersg], axis=0)
return pred_h, pred_g
else:
# More than one additional argument provided, raise an error
raise ValueError("predict expects at most two arguments, B_test and optionally A_test")
[docs]class Ensemble2IVL2:
"""
An extension of Ensemble2IV with L2 regularization and optional cross-validation to select the best regularization parameter.
Parameters:
adversary (str or estimator): Adversary model. If 'auto', a default RandomForestRegressor is used.
learnerg (str or estimator): Learner model for g. If 'auto', a default RandomForestRegressor is used.
learnerh (str or estimator): Learner model for h. If 'auto', a default RandomForestRegressor is used.
n_iter (int): Number of iterations for the ensemble.
n_burn_in (int): Number of burn-in iterations.
delta_scale (str or float): Scale factor for the critical radius delta. Default is 'auto'.
delta_exp (str or float): Exponent for the critical radius delta. Default is 'auto'.
CV (bool): Whether to perform cross-validation to select the best alpha value.
alpha_scales (str or list): Scales for alpha in cross-validation. Default is 'auto'.
n_alphas (int): Number of alpha values to test in cross-validation.
n_folds (int): Number of folds for cross-validation.
"""
def __init__(self, adversary='auto', learnerg='auto', learnerh='auto',
n_iter=100, n_burn_in=10, delta_scale='auto', delta_exp='auto', CV=False,
alpha_scales='auto', n_alphas=30, n_folds=5):
self.adversary = adversary
self.learnerg = learnerg
self.learnerh = learnerh
self.n_iter = n_iter
self.n_burn_in = n_burn_in
self.delta_scale = delta_scale
self.delta_exp = delta_exp
self.CV = CV
self.alpha_scales = alpha_scales
self.n_alphas = n_alphas
self.n_folds = n_folds
return
def _get_delta(self, n):
'''
Computes the critical radius delta based on the sample size.
Parameters:
n (int): Sample size.
Returns:
float: Critical radius delta.
'''
delta_scale = 5 if self.delta_scale == 'auto' else self.delta_scale
delta_exp = .4 if self.delta_exp == 'auto' else self.delta_exp
return delta_scale / (n**(delta_exp))
def _get_alpha_scales(self):
return ([c for c in np.geomspace(0.1, 1e4, self.n_alphas)]
if self.alpha_scales == 'auto' else self.alpha_scales)
def _check_input(self, A, B, C, D, Y, W):
if len(A.shape) == 1:
A = A.reshape(-1, 1)
if len(B.shape) == 1:
B = B.reshape(-1, 1)
if len(C.shape) == 1:
C = C.reshape(-1, 1)
if len(D.shape) == 1:
D = D.reshape(-1, 1)
return A, B, C, D, Y.flatten(), W.flatten()
def _get_new_adversary(self):
return RandomForestRegressor(n_estimators=40, max_depth=2,
bootstrap=True, min_samples_leaf=40, min_impurity_decrease=0.001) if self.adversary == 'auto' else clone(self.adversary)
def _get_new_learnerg(self):
return RandomForestRegressor(n_estimators=40, max_depth=2,
bootstrap=True, min_samples_leaf=40, min_impurity_decrease=0.001) if self.learnerg == 'auto' else clone(self.learnerg)
def _get_new_learnerh(self):
return RandomForestRegressor(n_estimators=40, max_depth=2,
bootstrap=True, min_samples_leaf=40, min_impurity_decrease=0.001) if self.learnerh == 'auto' else clone(self.learnerh)
def _cross_validate_alpha(self, A, B, C, D, Y, W):
"""
Performs cross-validation to select the best alpha value.
Parameters:
A (array-like): Instrumental variables for the first stage.
B (array-like): Instrumental variables for the second stage.
C (array-like): Treatment variables for the first stage.
D (array-like): Treatment variables for the second stage.
Y (array-like): Outcome variables.
W (array-like): Weights for the observations.
Returns:
float: Best alpha value.
"""
alpha_scales = self._get_alpha_scales()
best_alpha = None
best_score = float('inf')
kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=42)
for alpha in alpha_scales:
scores = []
for train_index, test_index in kf.split(Y):
A_train, A_test = A[train_index], A[test_index]
B_train, B_test = B[train_index], B[test_index]
C_train, C_test = C[train_index], C[test_index]
D_train, D_test = D[train_index], D[test_index]
Y_train, Y_test = Y[train_index], Y[test_index]
W_train, W_test = W[train_index], W[test_index]
self.fit(A_train, B_train, C_train, D_train, Y_train, W=W_train, alpha=alpha)
predictionB, predictionA = self.predict(B_test, A_test)
score = mean_squared_error(Y_test, predictionA) + mean_squared_error(predictionA*W_test, predictionB)
scores.append(score)
avg_score = np.mean(scores)
if avg_score < best_score:
best_score = avg_score
best_alpha = alpha
return best_alpha
def fit(self, A, B, C, D, Y, W=None, alpha=1.0, cross_validating=False, subsetted=False, subset_ind1=None, subset_ind2=None):
"""
Fits the nested ensemble IV model with L2 regularization to the provided data.
Parameters:
A (array-like): Instrumental variables for the first stage.
B (array-like): Instrumental variables for the second stage.
C (array-like): Treatment variables for the first stage.
D (array-like): Treatment variables for the second stage.
Y (array-like): Outcome variables.
W (array-like, optional): Weights for the observations.
alpha (float): Regularization parameter.
cross_validating (bool): Whether the function is called during cross-validation.
subsetted (bool): If True, use subsets of data as indicated by subset_ind1 and subset_ind2.
subset_ind1 (array-like): Indices for the first subset.
subset_ind2 (array-like): Indices for the second subset.
Returns:
self: Fitted nested ensemble IV model.
"""
W = np.ones(Y.shape) if W is None else W
if self.CV and not cross_validating:
alpha = self._cross_validate_alpha(A, B, C, D, Y, W)
A, B, C, D, Y, W = self._check_input(A, B, C, D, Y, W)
if subsetted:
if subset_ind1 is None:
raise ValueError("subset_ind1 must be provided when subsetted is True")
if len(subset_ind1) != len(Y):
raise ValueError("subset_ind1 must have the same length as Y")
subset_ind1 = subset_ind1.flatten()
subset_ind2 = subset_ind2.flatten() if subset_ind2 is not None else 1 - subset_ind1
n = Y.shape[0]
delta = self._get_delta(n)
adversary1 = self._get_new_adversary().fit(D, -Y.flatten())
adversary2 = self._get_new_adversary().fit(C, np.zeros(Y.shape))
learnersg = []
learnersh = []
h = 0
g = 0
for it in range(self.n_iter + self.n_burn_in):
if it < self.n_burn_in:
v = -adversary2.predict(C).flatten()* W if not subsetted else -adversary2.predict(C).flatten()*W * subset_ind2
v_ = -adversary1.predict(D).flatten() - v if not subsetted else -adversary1.predict(D).flatten() * subset_ind1 - v
else:
iter = it - self.n_burn_in
v = v * iter / (iter + 1)
v_ = v_ * iter / (iter + 1)
v += -adversary2.predict(C).flatten()* W / (alpha * delta ** 2) if not subsetted else -adversary2.predict(C).flatten()*W * subset_ind2 / (alpha * delta ** 2)
v_ += -adversary1.predict(D).flatten()/ (alpha * delta ** 2) - v if not subsetted else -adversary1.predict(D).flatten() * subset_ind1 / (alpha * delta ** 2) - v
learnersg.append(self._get_new_learnerg().fit(A, v_))
learnersh.append(self._get_new_learnerh().fit(B, v))
g = g * it / (it + 1)
h = h * it / (it + 1)
g += learnersg[it].predict(A).flatten() / (it + 1)
h += learnersh[it].predict(B).flatten() / (it + 1)
adversary2.fit(C, h - g*W)
adversary1.fit(D, g - Y)
self.learnersg = learnersg[self.n_burn_in:]
self.learnersh = learnersh[self.n_burn_in:]
return self
def predict(self, B, *args):
"""
Predicts outcomes for new data using the fitted nested ensemble IV model with L2 regularization.
Parameters:
B (array-like): Instrumental variables for the second stage.
args (tuple): Optional second argument for instrumental variables of the first stage.
Returns:
array: Predicted outcomes for the second stage.
If a second argument is provided, returns a tuple with predictions for both stages.
"""
if len(args) == 0:
# Only B_test provided, return h prediction
return np.mean([l.predict(B) for l in self.learnersh], axis=0)
elif len(args) == 1:
# Two arguments provided, assume the second is A_test
A = args[0]
return np.mean([l.predict(B) for l in self.learnersh], axis=0), np.mean([l.predict(A) for l in self.learnersg], axis=0)
else:
# More than one additional argument provided, raise an error
raise ValueError("predict expects at most two arguments, B_test and optionally A_test")