# =========================================================
# Helper builders
# =========================================================
def get_learner(n_t: int) -> nn.Module:
"""Small MLP for the learner network."""
return nn.Sequential(
nn.Dropout(p=p), nn.Linear(n_t, n_hidden), nn.LeakyReLU(),
nn.Dropout(p=p), nn.Linear(n_hidden, 1)
)
def get_adversary(n_z: int) -> nn.Module:
"""Small MLP for the adversary network."""
return nn.Sequential(
nn.Dropout(p=p), nn.Linear(n_z, n_hidden), nn.LeakyReLU(),
nn.Dropout(p=p), nn.Linear(n_hidden, 1)
)
def make_test_grid(X: np.ndarray, var_idx: int = 0, n: int = 1000,
q_low: float = 5.0, q_high: float = 95.0) -> np.ndarray:
"""
Build a grid by sweeping one column between quantiles, fixing others at their median.
"""
grid = np.tile(np.median(X, axis=0, keepdims=True), (n, 1))
grid[:, var_idx] = np.linspace(np.percentile(X[:, var_idx], q_low),
np.percentile(X[:, var_idx], q_high), n)
return grid[np.argsort(grid[:, var_idx])]
# =========================================================
# Data generation
# =========================================================
# Function dictionary (for reference):
# {'abs': 0, '2dpoly': 1, 'sigmoid': 2, 'sin': 3, 'frequent_sin': 4, 'abs_sqrt': 5,
# 'step': 6, '3dpoly': 7, 'linear': 8, 'rand_pw': 9, 'abspos': 10, 'sqrpos': 11,
# 'band': 12, 'invband': 13, 'steplinear': 14, 'pwlinear': 15, 'exponential': 16}
fn_number = 0
tau_fn = dgps.get_tau_fn(fn_number)
# A, D are first stage (endog + instruments); B, C are second stage; Y is outcome
A, D, B, C, Y, tau_fn = dgps.get_data(3000, 10, 10, tau_fn, 2)
# Build test grids for plotting
B_test = make_test_grid(B, var_idx=0, n=1000, q_low=5, q_high=95)
A_test = make_test_grid(A, var_idx=0, n=1000, q_low=5, q_high=95)
true_fn_B = tau_fn(B_test) # true structural function in terms of B
true_fn_A = A_test[:, 0] # for first stage visualization (if desired)
# =========================================================
# Estimation routines
# =========================================================
def nested_npivfit_sequential(
A: np.ndarray, B: np.ndarray, C: np.ndarray, D: np.ndarray, Y: np.ndarray,
B_test: np.ndarray, A_test: np.ndarray,
model1: AGMM, model2: AGMM,
fitargs: dict | None = None
):
"""
Two-step nested NPIV (sequential):
1) Fit bridge for g: A <- D (predict g(A))
2) Fit bridge for h: B <- C (target = g(A))
Returns predictions on B_test and A_test.
"""
fitargs = fitargs or {}
# To torch on DEVICE once
A_t = torch.as_tensor(A, dtype=torch.float32, device=DEVICE)
B_t = torch.as_tensor(B, dtype=torch.float32, device=DEVICE)
C_t = torch.as_tensor(C, dtype=torch.float32, device=DEVICE)
D_t = torch.as_tensor(D, dtype=torch.float32, device=DEVICE)
Y_t = torch.as_tensor(Y, dtype=torch.float32, device=DEVICE)
Btest_t = torch.as_tensor(B_test, dtype=torch.float32, device=DEVICE)
Atest_t = torch.as_tensor(A_test, dtype=torch.float32, device=DEVICE)
# First stage: g(A) using instruments D
model1.fit(D_t, A_t, Y_t, device=DEVICE, **fitargs)
# predict returns numpy on CPU; convert back to Tensor on DEVICE for stage 2
g_hat_np = model1.predict(A_t)
g_hat_t = torch.as_tensor(g_hat_np, dtype=torch.float32, device=DEVICE)
# Second stage: h(B) using instruments C, target = g_hat(A)
model2.fit(C_t, B_t, g_hat_t, device=DEVICE, **fitargs)
h_hat_Btest = model2.predict(Btest_t) # numpy
g_hat_Atest = model1.predict(Atest_t) # numpy
# Return 1D arrays for quick plotting
return h_hat_Btest.ravel(), g_hat_Atest.ravel()
def nested_npivfit_simultaneous(
A: np.ndarray, B: np.ndarray, C: np.ndarray, D: np.ndarray, Y: np.ndarray,
B_test: np.ndarray, A_test: np.ndarray,
model: AGMM2L2,
n_epochs: int = 350
):
"""
Joint (simultaneous) nested NPIV training via AGMM2L2.
Returns predictions h(B_test), g(A_test) as 1D arrays.
"""
# To torch on DEVICE once
A_t = torch.as_tensor(A, dtype=torch.float32, device=DEVICE)
B_t = torch.as_tensor(B, dtype=torch.float32, device=DEVICE)
C_t = torch.as_tensor(C, dtype=torch.float32, device=DEVICE)
D_t = torch.as_tensor(D, dtype=torch.float32, device=DEVICE)
Y_t = torch.as_tensor(Y, dtype=torch.float32, device=DEVICE)
Btest_t = torch.as_tensor(B_test, dtype=torch.float32, device=DEVICE)
Atest_t = torch.as_tensor(A_test, dtype=torch.float32, device=DEVICE)
model.fit(A_t, B_t, C_t, D_t, Y_t, n_epochs=n_epochs, device=DEVICE)
h_hat_Btest, g_hat_Atest = model.predict(Btest_t, Atest_t) # numpy outputs
return h_hat_Btest.ravel(), g_hat_Atest.ravel()