Source code for alex.ml.bn.test_factor

#!/usr/bin/env python
# -*- coding: utf8 -*-

# pylint: disable=W0212,C0111,C0103

import unittest
import copy
import operator

import numpy as np

if __name__ == '__main__':
    import autopath
from alex.ml.bn.factor import Factor, to_log, from_log, logsubexp


[docs]class TestFactor(unittest.TestCase):
[docs] def test_apply_op_different(self): f = Factor(['A'], {'A': ['a1', 'a2']}, {('a1',): 0.8, ('a2',): 0.2}, logarithmetic=False) g = Factor(['A', 'B'], {'A': ['a1', 'a2'], 'B': ['b1', 'b2']}, { ('a1', 'b1'): 0.5, ('a1', 'b2'): 0.6, ('a2', 'b1'): 0.5, ('a2', 'b2'): 0.4, }, logarithmetic=False) h = f._apply_op_different(g, np.multiply) self.assertAlmostEqual(h[('a1', 'b1')], 0.4) self.assertAlmostEqual(h[('a2', 'b1')], 0.1) h = f._apply_op_different(g, np.add) self.assertAlmostEqual(h[('a1', 'b1')], 1.3) self.assertAlmostEqual(h[('a2', 'b1')], 0.7) h = f._apply_op_different(g, np.divide) self.assertAlmostEqual(h[('a1', 'b1')], 1.6) self.assertAlmostEqual(h[('a2', 'b1')], 0.4) h = f._apply_op_different(g, np.subtract) self.assertAlmostEqual(h[('a1', 'b1')], 0.3) self.assertAlmostEqual(h[('a2', 'b1')], -0.3) h = f._apply_op_different(g, np.power) self.assertAlmostEqual(h[('a1', 'b1')], 0.894427191) self.assertAlmostEqual(h[('a2', 'b1')], 0.447213595)
[docs] def test_apply_op_same(self): f = Factor(['A'], {'A': ['a1', 'a2']}, {('a1',): 0.8, ('a2',): 0.2}, logarithmetic=False) g = Factor(['A'], {'A': ['a1', 'a2']}, { ('a1',): 0.5, ('a2',): 0.5, }, logarithmetic=False) h = f._apply_op_same(g, np.multiply) self.assertAlmostEqual(h[('a1',)], 0.4) self.assertAlmostEqual(h[('a2',)], 0.1) h = f._apply_op_same(g, np.add) self.assertAlmostEqual(h[('a1',)], 1.3) self.assertAlmostEqual(h[('a2',)], 0.7) h = f._apply_op_same(g, np.divide) self.assertAlmostEqual(h[('a1',)], 1.6) self.assertAlmostEqual(h[('a2',)], 0.4) h = f._apply_op_same(g, np.subtract) self.assertAlmostEqual(h[('a1',)], 0.3) self.assertAlmostEqual(h[('a2',)], -0.3) h = f._apply_op_same(g, np.power) self.assertAlmostEqual(h[('a1',)], 0.894427191) self.assertAlmostEqual(h[('a2',)], 0.447213595)
[docs] def test_apply_op_scalar(self): f = Factor(['A'], {'A': ['a1', 'a2']}, {('a1',): 0.8, ('a2',): 0.2}, logarithmetic=False) g = 0.5 h = f._apply_op_scalar(g, np.multiply) self.assertAlmostEqual(h[('a1',)], 0.4) self.assertAlmostEqual(h[('a2',)], 0.1) h = f._apply_op_scalar(g, np.add) self.assertAlmostEqual(h[('a1',)], 1.3) self.assertAlmostEqual(h[('a2',)], 0.7) h = f._apply_op_scalar(g, np.divide) self.assertAlmostEqual(h[('a1',)], 1.6) self.assertAlmostEqual(h[('a2',)], 0.4) h = f._apply_op_scalar(g, np.subtract) self.assertAlmostEqual(h[('a1',)], 0.3) self.assertAlmostEqual(h[('a2',)], -0.3) h = f._apply_op_scalar(g, np.power) self.assertAlmostEqual(h[('a1',)], 0.894427191) self.assertAlmostEqual(h[('a2',)], 0.447213595)
[docs] def test_setitem(self): f = Factor( ['X'], { 'X': [0, 1], }, { (0,): 0.8, (1,): 0.2, } ) self.assertAlmostEqual(f[(0,)], 0.8) f[(0,)] = 0.6 self.assertAlmostEqual(f[(0,)], 0.6)
[docs] def test_strides(self): factor = Factor( ['A', 'B', 'C'], { "A": [0, 1], "B": [0, 1, 2], "C": [0, 1], }, { # (A, B, C): P(A, B, C) (0, 0, 0): 0.01, (0, 0, 1): 0.02, (0, 1, 0): 0.03, (0, 1, 1): 0.04, (0, 2, 0): 0.05, (0, 2, 1): 0.05, (1, 0, 0): 0.06, (1, 0, 1): 0.07, (1, 1, 0): 0.08, (1, 1, 1): 0.09, (1, 2, 0): 0.1, (1, 2, 1): 0.4, }) self.assertEquals(factor.strides, {"A": 6, "B": 2, "C": 1})
[docs] def test_observations(self): f = Factor( ['X'], { 'X': [0, 1], }, { (0,): 0.8, (1,): 0.2, } ) self.assertAlmostEqual(f[(0,)], 0.8) f.observed({(1,): 0.2}) self.assertAlmostEqual(f[(0,)], 0) self.assertAlmostEqual(f[(1,)], 0.2) f.observed(None) self.assertAlmostEqual(f[(0,)], 0.8) self.assertAlmostEqual(f[(1,)], 0.2)
[docs] def test_get_index_from_assignment(self): factor = Factor( ['A', 'B', 'C'], { "A": [0, 1], "B": [0, 1, 2], "C": [0, 1], }, { # (A, B, C): P(A, B, C) (0, 0, 0): 0.01, (0, 0, 1): 0.02, (0, 1, 0): 0.03, (0, 1, 1): 0.04, (0, 2, 0): 0.05, (0, 2, 1): 0.05, (1, 0, 0): 0.06, (1, 0, 1): 0.07, (1, 1, 0): 0.08, (1, 1, 1): 0.09, (1, 2, 0): 0.1, (1, 2, 1): 0.4, }) self.assertEquals(factor._get_index_from_assignment((0, 2, 1)), 5) self.assertEquals(factor._get_index_from_assignment((1, 0, 1)), 7)
[docs] def test_get_assignment_from_index(self): factor = Factor( ['A', 'B', 'C'], { "A": [0, 1], "B": [0, 1, 2], "C": [0, 1], }, { # (A, B, C): P(A, B, C) (0, 0, 0): 0.01, (0, 0, 1): 0.02, (0, 1, 0): 0.03, (0, 1, 1): 0.04, (0, 2, 0): 0.05, (0, 2, 1): 0.05, (1, 0, 0): 0.06, (1, 0, 1): 0.07, (1, 1, 0): 0.08, (1, 1, 1): 0.09, (1, 2, 0): 0.1, (1, 2, 1): 0.4, }) self.assertEquals( factor._get_assignment_from_index(3), (0, 1, 1)) self.assertEquals( factor._get_assignment_from_index(10), (1, 2, 0))
[docs] def test_marginalize(self): factor = Factor( ['A', 'B', 'C'], { "A": [0, 1], "B": [0, 1, 2], "C": [0, 1], }, { # (A, B, C): P(A, B, C) (0, 0, 0): 0.01, (0, 0, 1): 0.02, (0, 1, 0): 0.03, (0, 1, 1): 0.04, (0, 2, 0): 0.05, (0, 2, 1): 0.05, (1, 0, 0): 0.06, (1, 0, 1): 0.07, (1, 1, 0): 0.08, (1, 1, 1): 0.09, (1, 2, 0): 0.1, (1, 2, 1): 0.4, }) factor_a = factor.marginalize(["A"]) self.assertAlmostEqual(factor_a[(0,)], 0.2) self.assertAlmostEqual(factor_a[(1,)], 0.8) factor_ac = factor.marginalize(["A", "C"]) self.assertAlmostEqual(factor_ac[(0, 0)], 0.09)
[docs] def test_multiplication(self): f1 = Factor( ['A', 'B'], { 'A': ['a', 'b'], 'B': [0, 1, 2] }, { ('a', 0): 0.1, ('a', 1): 0.15, ('a', 2): 0.2, ('b', 0): 0.25, ('b', 1): 0.25, ('b', 2): 0.05, }) f2 = Factor( ['B', 'C'], { 'B': [0, 1, 2], 'C': [0, 1] }, { (0, 0): 0.1, (0, 1): 0.15, (1, 0): 0.2, (1, 1): 0.25, (2, 0): 0.25, (2, 1): 0.05, }) f3 = f1 * f2 self.assertEqual(f3.variables, ['A', 'B', 'C']) self.assertAlmostEqual(f3[('a', 0, 0)], f1[('a', 0)] * f2[(0, 0)]) self.assertAlmostEqual(f3[('b', 1, 0)], f1[('b', 1)] * f2[(1, 0)]) self.assertAlmostEqual(f3[('b', 2, 1)], f1[('b', 2)] * f2[(2, 1)])
[docs] def test_multiplication_different_values(self): f1 = Factor( ['A'], { 'A': ['a1', 'a2'] }, { ('a1',): 0.8, ('a2',): 0.2 } ) f2 = Factor( ['B'], { 'B': ['b1', 'b2'] }, { ('b1',): 0.8, ('b2',): 0.2 } ) f3 = f1 * f2 self.assertEqual(f3.variables, ['A', 'B']) self.assertAlmostEqual(f3[('a1', 'b1')], 0.64)
[docs] def test_fast_mul(self): f1 = Factor( ['A', 'B'], { 'A': ['a', 'b'], 'B': [0, 1, 2] }, { ('a', 0): 0.1, ('a', 1): 0.15, ('a', 2): 0.2, ('b', 0): 0.25, ('b', 1): 0.25, ('b', 2): 0.05, }) f2 = f1 * f1 self.assertAlmostEqual(f2[('a', 2)], 0.04) self.assertAlmostEqual(f2[('b', 1)], 0.0625)
[docs] def test_fast_mul_correct(self): f1 = Factor( ['B'], { 'B': [0, 1, 2] }, { (0,): 0.2, (1,): 0.25, (2,): 0.55, }) f2 = Factor( ['B'], { 'B': [0, 1] }, { (0,): 0.7, (1,): 0.3, }) self.assertRaises(ValueError, lambda: f2 * f1)
[docs] def test_division(self): factor = Factor( ['A', 'B', 'C'], { "A": [0, 1], "B": [0, 1, 2], "C": [0, 1], }, { # (A, B, C): P(A, B, C) (0, 0, 0): 0.01, (0, 0, 1): 0.02, (0, 1, 0): 0.03, (0, 1, 1): 0.04, (0, 2, 0): 0.05, (0, 2, 1): 0.05, (1, 0, 0): 0.06, (1, 0, 1): 0.07, (1, 1, 0): 0.08, (1, 1, 1): 0.09, (1, 2, 0): 0.1, (1, 2, 1): 0.4, }) f2 = Factor( ['B', 'C'], { 'B': [0, 1, 2], 'C': [0, 1] }, { (0, 0): 0.1, (0, 1): 0.15, (1, 0): 0.2, (1, 1): 0.25, (2, 0): 0.25, (2, 1): 0.05, }) f3 = factor / f2 self.assertAlmostEqual(f3[(0, 0, 0)], 0.1) self.assertAlmostEqual(f3[(1, 0, 1)], 7 / 15.0) self.assertAlmostEqual(f3[(1, 2, 0)], 0.4)
[docs] def test_fast_div(self): f2 = Factor( ['B', 'C'], { 'B': [0, 1, 2], 'C': [0, 1] }, { (0, 0): 0.1, (0, 1): 0.15, (1, 0): 0.2, (1, 1): 0.25, (2, 0): 0.25, (2, 1): 0.05, }) f3 = f2 / f2 self.assertAlmostEqual(f3[0, 0], 1) self.assertAlmostEqual(f3[1, 1], 1)
[docs] def test_mul_div(self): factor = Factor( ['A', 'B', 'C'], { "A": [0, 1], "B": [0, 1, 2], "C": [0, 1], }, { # (A, B, C): P(A, B, C) (0, 0, 0): 0.01, (0, 0, 1): 0.02, (0, 1, 0): 0.03, (0, 1, 1): 0.04, (0, 2, 0): 0.05, (0, 2, 1): 0.05, (1, 0, 0): 0.06, (1, 0, 1): 0.07, (1, 1, 0): 0.08, (1, 1, 1): 0.09, (1, 2, 0): 0.1, (1, 2, 1): 0.4, }) f2 = Factor( ['B', 'C'], { 'B': [0, 1, 2], 'C': [0, 1] }, { (0, 0): 0.1, (0, 1): 0.15, (1, 0): 0.2, (1, 1): 0.25, (2, 0): 0.25, (2, 1): 0.05, }) f3 = factor * f2 f4 = f3 / f2 for i in range(f4.factor_length): assignment = f4._get_assignment_from_index(i) self.assertAlmostEqual(f4[assignment], factor[assignment])
[docs] def test_parents_normalize(self): f = Factor( ['A', 'B'], { 'A': ['a1', 'a2'], 'B': ['b1', 'b2'] }, { ('a1', 'b1'): 1, ('a2', 'b1'): 1, ('a1', 'b2'): 1, ('a2', 'b2'): 1 }) f.normalize(parents=['B']) self.assertAlmostEqual(f[('a1', 'b1')], 0.5)
[docs] def test_power(self): f = Factor( ['A', 'B'], { 'A': ['a1', 'a2'], 'B': ['b1', 'b2'] }, { ('a1', 'b1'): 0.3, ('a1', 'b2'): 0.2, ('a2', 'b1'): 0.7, ('a2', 'b2'): 0.8, }) f1 = f ** 2 self.assertAlmostEqual(f[('a1', 'b1')], 0.3) self.assertAlmostEqual(f1[('a1', 'b1')], 0.09)
[docs] def test_rename(self): f = Factor( ['A', 'B'], { 'A': ['a1', 'a2'], 'B': ['b1', 'b2'] }, { ('a1', 'b1'): 0.3, ('a1', 'b2'): 0.2, ('a2', 'b1'): 0.7, ('a2', 'b2'): 0.8, }) f.rename_variables({'A': 'Z'}) self.assertAlmostEqual(f[('a1', 'b1')], 0.3) f1 = Factor( ['Z'], { 'Z': ['a1', 'a2'], }, { ('a1',): 0.5, ('a2',): 0.5, }) f2 = f * f1 self.assertAlmostEqual(f2[('b1', 'a1')], 0.15) self.assertAlmostEqual(f2[('b1', 'a2')], 0.35)
[docs] def test_add(self): X0 = Factor( ['X0'], { 'X0': ['x0_0', 'x0_1'] }, { ('x0_0',): 1, ('x0_1',): 0, }) f = X0 + 1 self.assertAlmostEqual(f[('x0_0',)], 2) self.assertAlmostEqual(f[('x0_1',)], 1) f = X0 + X0 self.assertAlmostEqual(f[('x0_0',)], 2) self.assertAlmostEqual(f[('x0_1',)], 0)
[docs] def test_logsubexp(self): a1 = to_log(np.array([1, 2, 3])) a2 = to_log(np.array([1, 1, 2])) result = from_log(logsubexp(a1, a2)) self.assertAlmostEqual(result[0], 0) self.assertAlmostEqual(result[1], 1) self.assertAlmostEqual(result[2], 1) result = from_log(logsubexp(a1, to_log(1))) self.assertAlmostEqual(result[0], 0) self.assertAlmostEqual(result[1], 1) self.assertAlmostEqual(result[2], 2)
[docs] def test_sum_other(self): alpha = Factor( ['X0', 'X1'], { 'X0': ['x0_0', 'x0_1'], 'X1': ['x1_0', 'x1_1'], }, { ('x0_0', 'x1_0'): 8, ('x0_0', 'x1_1'): 1, ('x0_1', 'x1_0'): 2, ('x0_1', 'x1_1'): 1, } ) f = alpha.sum_other() self.assertAlmostEqual(f[('x0_0', 'x1_0')], 4) self.assertAlmostEqual(f[('x0_0', 'x1_1')], 11, places=5) self.assertAlmostEqual(f[('x0_1', 'x1_0')], 10) self.assertAlmostEqual(f[('x0_1', 'x1_1')], 11, places=5)
[docs] def test_expected_value_squared(self): alpha = Factor( ['X0', 'X1'], { 'X0': ['x0_0', 'x0_1'], 'X1': ['x1_0', 'x1_1'], }, { ('x0_0', 'x1_0'): 8, ('x0_0', 'x1_1'): 1, ('x0_1', 'x1_0'): 2, ('x0_1', 'x1_1'): 1, } ) sum_of_alphas = alpha.marginalize(['X1']) expected_value_squared = alpha * (alpha + 1) / (sum_of_alphas * (sum_of_alphas + 1)) self.assertAlmostEqual(expected_value_squared[('x0_0', 'x1_0')], 72.0/110) self.assertAlmostEqual(expected_value_squared[('x0_0', 'x1_1')], 1.0/3) self.assertAlmostEqual(expected_value_squared[('x0_1', 'x1_0')], 6.0/110) self.assertAlmostEqual(expected_value_squared[('x0_1', 'x1_1')], 1.0/3)
[docs] def test_alphas(self): alpha = Factor( ['X0', 'X1', 'X2', 'X3'], { 'X0': ['x0_0', 'x0_1'], 'X1': ['x1_0', 'x1_1'], 'X2': ['x2_0', 'x2_1'], 'X3': ['x3_0', 'x3_1'], }, { ('x0_0', 'x1_0', 'x2_0', 'x3_0'): 1, ('x0_0', 'x1_0', 'x2_0', 'x3_1'): 1, ('x0_0', 'x1_0', 'x2_1', 'x3_0'): 1, ('x0_0', 'x1_0', 'x2_1', 'x3_1'): 2, ('x0_0', 'x1_1', 'x2_0', 'x3_0'): 5, ('x0_0', 'x1_1', 'x2_0', 'x3_1'): 1, ('x0_0', 'x1_1', 'x2_1', 'x3_0'): 3, ('x0_0', 'x1_1', 'x2_1', 'x3_1'): 1, ('x0_1', 'x1_0', 'x2_0', 'x3_0'): 1, ('x0_1', 'x1_0', 'x2_0', 'x3_1'): 1, ('x0_1', 'x1_0', 'x2_1', 'x3_0'): 1, ('x0_1', 'x1_0', 'x2_1', 'x3_1'): 1, ('x0_1', 'x1_1', 'x2_0', 'x3_0'): 1, ('x0_1', 'x1_1', 'x2_0', 'x3_1'): 1, ('x0_1', 'x1_1', 'x2_1', 'x3_0'): 1, ('x0_1', 'x1_1', 'x2_1', 'x3_1'): 1, }) X0 = Factor( ['X0'], { 'X0': ['x0_0', 'x0_1'] }, { ('x0_0',): 1, ('x0_1',): 0, }) X1 = Factor( ['X1'], { 'X1': ['x1_0', 'x1_1'] }, { ('x1_0',): 0, ('x1_1',): 1, }) X2 = Factor( ['X2'], { 'X2': ['x2_0', 'x2_1'] }, { ('x2_0',): 1, ('x2_1',): 0, }) X3 = Factor( ['X3'], { 'X3': ['x3_0', 'x3_1'] }, { ('x3_0',): 1, ('x3_1',): 0, }) # Compute message to X1. cavity = X0 * X2 * X3 self.assertAlmostEqual(cavity[('x0_0', 'x2_0', 'x3_0')], 1.0) sum_of_alphas = alpha.marginalize(['X1', 'X2', 'X3']) self.assertAlmostEqual(sum_of_alphas[('x1_1', 'x2_0', 'x3_0')], 6) expected_value = alpha / sum_of_alphas self.assertAlmostEqual(expected_value[('x0_0', 'x1_1', 'x2_1', 'x3_0')], 0.75) factor = cavity * expected_value self.assertAlmostEqual(factor[('x0_0', 'x1_1', 'x2_0', 'x3_0')], 5.0/6) msg = factor.marginalize(['X1']) self.assertAlmostEqual(msg[('x1_0',)], 0.5) # Compute message to X0. cavity = X1 * X2 * X3 self.assertAlmostEqual(cavity[('x1_1', 'x2_0', 'x3_0')], 1.0) self.assertAlmostEqual(cavity[('x1_0', 'x2_0', 'x3_0')], 0.0) sum_of_alphas = alpha.marginalize(['X1', 'X2', 'X3']) expected_value = alpha / sum_of_alphas self.assertAlmostEqual(expected_value[('x0_0', 'x1_1', 'x2_0', 'x3_1')], 0.5) factor = cavity * expected_value msg = factor.marginalize(['X0']) self.assertAlmostEqual(msg[('x0_0',)], 5.0/6) # Compute w_0. sum_of_alphas = alpha.marginalize(['X1', 'X2', 'X3']) expected_value = alpha / sum_of_alphas belief = X0 * X1 * X2 * X3 factor = belief * expected_value msg = factor.marginalize(['X1', 'X2', 'X3']) w0 = msg.sum_other() self.assertAlmostEqual(w0[('x1_0', 'x2_0', 'x3_0')], 5.0/6) # Compute w_k. # When we want to compute w_k for each j, we can just compute the belief. # Each j is denoted by an assignment of parents, which in this case is # the assignment of X1, X2, X3, and each k is a value of a child, in # this case it's the value of X0. # For given j and k, we can get the value of w_jk, by getting one row # from w_k. w_k = X0 * X1 * X2 * X3 self.assertAlmostEqual(w_k[('x0_0', 'x1_0', 'x2_0', 'x3_0')], 0) self.assertAlmostEqual(w_k[('x0_0', 'x1_1', 'x2_0', 'x3_0')], 1) # Compute expected value of p(theta) sum_of_alphas = alpha.marginalize(['X1', 'X2', 'X3']) expected_value_0 = alpha / sum_of_alphas sum_of_alphas_plus_1 = alpha.marginalize(['X1', 'X2', 'X3']) sum_of_alphas_plus_1 += 1 expected_values = [w0 * expected_value_0] self.assertAlmostEqual(expected_values[0][('x0_0', 'x1_0', 'x2_0', 'x3_0')], 5.0/12) self.assertAlmostEqual(expected_values[0][('x0_1', 'x1_0', 'x2_0', 'x3_0')], 5.0/12) for k in X0.variable_values['X0']: new_alpha = copy.deepcopy(alpha) for i, item in enumerate(new_alpha): (assignment, value) = item if assignment[0] == k: new_alpha[assignment] += 1 expected_value_k = new_alpha / sum_of_alphas_plus_1 expected_values.append(w_k * expected_value * expected_value_k) expected_value_sum = reduce(operator.add, expected_values)
if __name__ == '__main__': unittest.main()