-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_demo_node.py
110 lines (92 loc) · 3.03 KB
/
test_demo_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import numpy as np
import pymc as pm
import pytensor.tensor as pt
import demo_node
from pytensor_federated.wrapper_ops import LogpGradOp
class TestLinearModel:
    """Tests for the gradient output of ``demo_node.LinearModelBlackbox``."""

    def test_grad(self):
        """The blackbox returns (logp, gradient); check the gradient values."""
        lm = demo_node.LinearModelBlackbox(
            data_x=[-1, 0, 1],
            data_y=[1, 1, 1],
            sigma=1,
        )
        # Perfect fit (intercept=1, slope=0): gradient w.r.t. both parameters is zero.
        np.testing.assert_array_equal(
            lm(1, 0)[1],
            [0, 0],
        )
        # Intercept too high: gradient pulls the intercept back down; slope unchanged.
        np.testing.assert_almost_equal(
            lm(1.1, 0)[1],
            [-0.3, 0],
        )
def test_linear_model_equivalence():
    """A hand-built PyTensor logp graph and the blackbox ``LogpGradOp`` must
    agree exactly — both on the log-probability value and on its gradient."""
    x = np.array([1, 2, 3], dtype="float64")
    y = np.array([2, 3, 1], dtype="float64")
    sigma = 0.2
    # Build a log-probability graph of a linear model directly with PyMC/PyTensor.
    intercept = pt.scalar()
    slope = pt.scalar()
    mu = intercept + slope * x
    pred = pm.Normal.dist(mu, sigma)
    logp_factors = pm.logprob.conditional_logp({pred: pt.as_tensor(y)}, sum=True)
    L_model = pt.sum([pt.sum(factor) for factor in logp_factors.values()])
    # Build the same log-probability using the blackbox Op.
    # LogpGradOp returns (logp, *gradients); only the logp output is compared here.
    lmb = demo_node.LinearModelBlackbox(x, y, sigma)
    lmbop = LogpGradOp(lmb)
    L_federated = lmbop(intercept, slope)[0]
    # Compare both logp values at the same test point.
    test_point = {
        intercept: 1.2,
        slope: 0.8,
    }
    np.testing.assert_array_equal(
        L_model.eval(test_point),
        L_federated.eval(test_point),
    )
    # And now the gradient w.r.t. each free parameter.
    dL_model = pt.grad(L_model, [intercept, slope])
    dL_federated = pt.grad(L_federated, [intercept, slope])
    for dM, dF in zip(dL_model, dL_federated):
        np.testing.assert_array_equal(
            dM.eval(test_point),
            dF.eval(test_point),
        )
def test_linear_model_logp_dlogp_findmap():
    """A standard PyMC model and the blackbox-Op-plus-Potential model must
    agree on logp, dlogp, and the MAP estimate."""
    x = np.array([1, 2, 3])
    y = np.array([2, 3, 1])
    sigma = 0.2
    # Build a standard linear model.
    with pm.Model() as pmodel_ref:
        intercept = pm.Normal("intercept")
        slope = pm.Normal("slope")
        mu = intercept + slope * x
        pm.Normal("L", mu, sigma, observed=y)
        P1 = pm.compile_fn(pmodel_ref.logp())
        dP1 = pm.compile_fn(pmodel_ref.dlogp())
        map_1 = pm.find_MAP()
    # Build the same model using the blackbox Op and a Potential.
    lmb = demo_node.LinearModelBlackbox(x, y, sigma)
    lmbop = LogpGradOp(lmb)
    with pm.Model() as pmodel_bb:
        intercept_bb = pm.Normal("intercept")
        slope_bb = pm.Normal("slope")
        # LogpGradOp returns (logp, *gradients); only the logp enters the Potential.
        logp, *_ = lmbop(intercept_bb, slope_bb)
        pm.Potential("L", logp)
        P2 = pm.compile_fn(pmodel_bb.logp())
        dP2 = pm.compile_fn(pmodel_bb.dlogp())
        map_2 = pm.find_MAP()
    # Compare the models' logp values at the same point.
    test_point = dict(intercept=0.5, slope=1.2)
    assert P1(test_point) == P2(test_point)
    # And their gradients, element by element.
    grad1 = dP1(test_point)
    grad2 = dP2(test_point)
    for dp1, dp2 in zip(grad1, grad2):
        assert dp1 == dp2
    # And also the MAP estimates (almost_equal: the two optimizations may
    # differ by floating-point noise).
    for vname in ["intercept", "slope"]:
        np.testing.assert_almost_equal(map_1[vname], map_2[vname])