"""Mass matrix classes and associated methods.
The classes in this module describe the metric used in HMC algorithms. Changing the
metric alters the shape of trajectories the HMC algorithm generates, thereby impacting
convergence performance.
All of the classes inherit from :class:`._AbstractMassMatrix`; a base class outlining
required methods and their signatures (required in- and outputs).
.. note::
The mass matrix is vitally important for the performance of HMC algorithms. A
tutorial on the tuning parameters of HMC can be found at
:ref:`/notebooks/tutorials/1 - Tuning Hamiltonian Monte Carlo.ipynb`.
"""
import warnings as _warnings
from abc import ABC as _ABC
from abc import abstractmethod as _abstractmethod
from scipy.linalg import cho_factor as _cho_factor, cho_solve as _cho_solve
import numpy as _numpy
from hmclab.Helpers.CustomExceptions import AbstractMethodError as _AbstractMethodError
from hmclab.Helpers.CustomExceptions import Assertions
class _AbstractMassMatrix(_ABC):
    """Abstract base class for mass matrices.

    Defines all required methods for derived classes.
    """

    # Human-readable identifier, overridden by every concrete subclass.
    name: str = "mass matrix abstract base class"
    # Dimensionality of the momentum vectors; -1 means "not yet set".
    dimensions: int = -1
    # Shared default RNG; subclasses replace it when a generator is passed
    # to their constructor.
    rng: _numpy.random.Generator = _numpy.random.default_rng()

    def full_name(self) -> str:
        """Return a human-readable name for the mass matrix."""
        raise _AbstractMethodError()

    @_abstractmethod
    def kinetic_energy(self, momentum: _numpy.ndarray) -> float:
        """Compute the kinetic energy for a given momentum.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).
        """
        raise _AbstractMethodError()

    @_abstractmethod
    def kinetic_energy_gradient(self, momentum: _numpy.ndarray) -> _numpy.ndarray:
        """Compute the gradient of the kinetic energy with respect to a given
        momentum.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).
        """
        raise _AbstractMethodError()

    @_abstractmethod
    def generate_momentum(self) -> _numpy.ndarray:
        """Draw a random momentum vector distributed according to the metric."""
        raise _AbstractMethodError()

    @staticmethod
    def create_default(dimensions: int) -> "_AbstractMassMatrix":
        """Create an instance with sensible defaults for the given dimension."""
        raise _AbstractMethodError()
class Unit(_AbstractMassMatrix):
    """The unit mass matrix.

    This mass matrix or metric does not perform any scaling on the target distribution.
    It is the default setting for the HMC algorithms and is optimal when all parameters
    of your target distribution are expected to have the same variance and no
    trade-offs.
    """

    def __init__(self, dimensions: int = -1, rng: _numpy.random.Generator = None):
        """Construct a unit mass matrix of the given dimension.

        Parameters
        ----------
        dimensions : int
            Dimensionality of the momentum vectors.
        rng : numpy.random.Generator, optional
            Random number generator; the class-level default is kept if None.
        """
        self.name = "unit mass matrix"
        self.dimensions = dimensions
        if rng is not None:
            self.rng = rng

    def kinetic_energy(self, momentum: _numpy.ndarray) -> float:
        """Return the kinetic energy 0.5 * p^T p.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).

        Raises
        ------
        ValueError
            If the momentum does not have shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return 0.5 * (momentum.T @ momentum).item(0)

    def kinetic_energy_gradient(self, momentum: _numpy.ndarray) -> _numpy.ndarray:
        """Return the kinetic energy gradient, which for the unit metric is the
        momentum itself.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).

        Raises
        ------
        ValueError
            If the momentum does not have shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return momentum

    def generate_momentum(self) -> _numpy.ndarray:
        """Draw a standard-normal momentum vector of shape (dimensions, 1)."""
        return self.rng.normal(size=(self.dimensions, 1))

    @property
    def matrix(self) -> _numpy.ndarray:
        """Dense representation of this metric: the identity matrix."""
        return _numpy.eye(self.dimensions)

    @staticmethod
    def create_default(dimensions: int, rng: _numpy.random.Generator = None) -> "Unit":
        """Create a unit mass matrix for the given dimension."""
        return Unit(dimensions, rng)
class Diagonal(_AbstractMassMatrix):
    """The diagonal mass matrix.

    This mass matrix or metric only performs scaling on each dimension separately.
    It is optimal when all parameters of your target distribution are expected to be
    independent (not have trade-offs) but still have varying scales of dispersion /
    variance.
    """

    def __init__(self, diagonal: _numpy.ndarray, rng: _numpy.random.Generator = None):
        """Construct a diagonal mass matrix.

        Parameters
        ----------
        diagonal : numpy.ndarray
            The diagonal entries of the mass matrix; any shape with
            ``diagonal.size`` elements is accepted.
        rng : numpy.random.Generator, optional
            Random number generator; the class-level default is kept if None.
        """
        self.name = "diagonal mass matrix"
        # Reshape to a column vector WITHOUT mutating the caller's array:
        # the original in-place ``diagonal.shape = ...`` assignment altered
        # the shape of the passed-in ndarray (asarray can return the same
        # object) and failed on non-contiguous input.
        diagonal = _numpy.asarray(diagonal).reshape((-1, 1))
        self.dimensions = diagonal.size
        self.diagonal = diagonal
        # Precompute the elementwise inverse used in every energy evaluation.
        self.inverse_diagonal = 1.0 / self.diagonal
        if rng is not None:
            self.rng = rng

    def kinetic_energy(self, momentum: _numpy.ndarray) -> float:
        """Return the kinetic energy 0.5 * p^T M^{-1} p.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).

        Raises
        ------
        ValueError
            If the momentum does not have shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return 0.5 * _numpy.vdot(momentum, self.inverse_diagonal * momentum)

    def kinetic_energy_gradient(self, momentum: _numpy.ndarray) -> _numpy.ndarray:
        """Return the kinetic energy gradient M^{-1} p.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).

        Raises
        ------
        ValueError
            If the momentum does not have shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return self.inverse_diagonal * momentum

    def generate_momentum(self) -> _numpy.ndarray:
        """Draw a momentum sample p ~ N(0, M) via sqrt(diagonal) * z."""
        return _numpy.sqrt(self.diagonal) * self.rng.normal(size=(self.dimensions, 1))

    @property
    def matrix(self) -> _numpy.ndarray:
        """Dense representation of this metric as a diagonal matrix."""
        return _numpy.diagflat(self.diagonal)

    @staticmethod
    def create_default(
        dimensions: int, rng: _numpy.random.Generator = None
    ) -> "Diagonal":
        """Create a diagonal mass matrix with unit entries (equivalent to Unit)."""
        diagonal = _numpy.ones((dimensions, 1))
        return Diagonal(diagonal, rng=rng)
class Full(_AbstractMassMatrix):
    """The full mass matrix.

    This mass matrix or metric performs scaling on all dimensions, as well as rotations.
    It is ideal when it is known a-priori that parameters might be heavily correlated.
    """

    def __init__(
        self,
        full: _numpy.ndarray,
        rng: _numpy.random.Generator = None,
        do_hermitian_check=True,
    ):
        """Construct a full mass matrix.

        Parameters
        ----------
        full : numpy.ndarray
            Symmetric positive-definite matrix of shape (dimensions, dimensions).
        rng : numpy.random.Generator, optional
            Random number generator; the class-level default is kept if None.
        do_hermitian_check : bool
            If True, assert that the supplied matrix is symmetric.
        """
        # Fixed copy-paste bug: the name previously read "diagonal mass matrix".
        self.name = "full mass matrix"
        self.mass_matrix = _numpy.asarray(full)
        if do_hermitian_check:
            assert _numpy.allclose(self.mass_matrix, self.mass_matrix.T)
        # Cholesky factorization M = L L^T; cho_factor leaves unspecified
        # entries in the unused triangle, so zero them out to make the factor
        # directly usable for momentum generation.
        self.cholesky, self.cholesky_lower = _cho_factor(self.mass_matrix, lower=True)
        self.cholesky = _numpy.tril(self.cholesky)
        self.dimensions = self.mass_matrix.shape[0]
        if rng is not None:
            self.rng = rng

    def kinetic_energy(self, momentum: _numpy.ndarray) -> float:
        """Return the kinetic energy 0.5 * p^T M^{-1} p using the Cholesky factor.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).

        Raises
        ------
        ValueError
            If the momentum does not have shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return 0.5 * _numpy.vdot(
            momentum, _cho_solve((self.cholesky, self.cholesky_lower), momentum)
        )

    def kinetic_energy_gradient(self, momentum: _numpy.ndarray) -> _numpy.ndarray:
        """Return the kinetic energy gradient M^{-1} p via a Cholesky solve.

        Parameters
        ----------
        momentum : numpy.ndarray
            Momentum column vector of shape (dimensions, 1).

        Raises
        ------
        ValueError
            If the momentum does not have shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return _cho_solve((self.cholesky, self.cholesky_lower), momentum)

    def generate_momentum(self, repeat=1) -> _numpy.ndarray:
        """Draw `repeat` momentum samples p ~ N(0, M) as columns, via p = L z."""
        return self.cholesky @ self.rng.normal(size=(self.dimensions, repeat))

    @property
    def matrix(self) -> _numpy.ndarray:
        """Dense representation of this metric: the full matrix itself."""
        return self.mass_matrix

    @staticmethod
    def create_default(dimensions: int, rng: _numpy.random.Generator = None) -> "Full":
        """Create a default full mass matrix: tridiagonal with 0.1 off-diagonals."""
        mass_matrix = (
            _numpy.eye(dimensions)
            + 0.1 * _numpy.eye(dimensions, k=-1)
            + 0.1 * _numpy.eye(dimensions, k=1)
        )
        return Full(mass_matrix, rng=rng)
class LBFGS(_AbstractMassMatrix):
    """The experimental adaptive LBFGS mass matrix.

    .. warning::

        This mass matrix is not guaranteed to produce valid Markov chains.
    """

    def __init__(
        self,
        dimensions: int,
        number_of_vectors: int = 10,
        starting_position: _numpy.ndarray = None,
        starting_gradient: _numpy.ndarray = None,
        max_determinant_change: float = 0.1,
        update_interval: int = 1,
        rng: _numpy.random.Generator = None,
    ):
        """Constructor for LBFGS-style mass matrices.

        Parameters
        ----------
        dimensions : int
            Dimensionality of the momentum vectors.
        number_of_vectors : int
            Maximum number of rank-one update pairs kept in the history.
        starting_position, starting_gradient : numpy.ndarray, optional
            Column vectors of shape (dimensions, 1). Both default to all
            ones when either is omitted.
        max_determinant_change : float
            Threshold limiting how much a single update may change the
            determinant of the metric (see update()).
        update_interval : int
            NOTE(review): stored but only referenced by commented-out logic
            in update(); currently has no effect.
        rng : numpy.random.Generator, optional
            Random number generator; the class-level default is kept if None.
        """
        if starting_position is None or starting_gradient is None:
            starting_gradient = _numpy.ones((dimensions, 1))
            starting_position = _numpy.ones((dimensions, 1))
        self.name = "LBFGS-style mass matrix"
        self.dimensions = dimensions
        self.number_of_vectors = number_of_vectors
        self.currently_stored_gradients = 0
        self.current_position = starting_position
        self.current_gradient = starting_gradient
        self.update_attempt = 0
        self.update_interval = update_interval
        # History buffers: one column per stored update. s/y hold position and
        # gradient differences; u/v hold the derived rank-one factors applied
        # by S/ST/Sinv/SinvT; vTu caches 1 + v·u per pair.
        self.s = _numpy.empty((dimensions, number_of_vectors))
        self.y = _numpy.empty((dimensions, number_of_vectors))
        self.u = _numpy.empty((dimensions, number_of_vectors))
        self.v = _numpy.empty((dimensions, number_of_vectors))
        self.vTu = _numpy.empty(number_of_vectors)
        self.max_determinant_change = max_determinant_change
        if rng is not None:
            self.rng = rng

    def kinetic_energy(self, momentum: _numpy.ndarray) -> float:
        """Return the kinetic energy 0.5 * p^T Hinv(p) under the current metric.

        Raises ValueError if momentum is not of shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return 0.5 * _numpy.vdot(momentum, self.Hinv(momentum))

    def kinetic_energy_gradient(self, momentum: _numpy.ndarray) -> _numpy.ndarray:
        """Return the kinetic energy gradient Hinv(p) under the current metric.

        Raises ValueError if momentum is not of shape (dimensions, 1).
        """
        if momentum.shape != (self.dimensions, 1):
            raise ValueError()
        return self.Hinv(momentum)

    def generate_momentum(self) -> _numpy.ndarray:
        """Draw a momentum sample by applying S to a standard-normal vector."""
        return self.S(self.rng.normal(size=(self.dimensions, 1)))

    def update(self, m, g):
        """Incorporate a new position/gradient pair (m, g) into the metric.

        Computes the position difference s and gradient difference y against
        the previously stored state, derives a rank-one (u, v) factor pair from
        them, optionally damps v so the determinant change stays within
        max_determinant_change, and appends the pair to the (rolling) history.
        Skipped entirely when the curvature 1/(s·y) is non-positive, NaN or
        infinite.
        """
        self.update_attempt += 1
        # if (
        #     self.currently_stored_gradients == self.number_of_vectors
        #     or self.update_attempt % self.update_interval
        # ):
        #     # Do nothing if we shouldn't do anything
        #     return
        # Calculate separation and gradient change
        s_update = m - self.current_position
        y_update = g - self.current_gradient
        assert s_update.shape == (self.dimensions, 1), Assertions.v_shape
        assert y_update.shape == (self.dimensions, 1), Assertions.v_shape
        # Inverse curvature along the step; must be positive for a valid
        # quasi-Newton update.
        rho = 1.0 / _numpy.vdot(s_update, y_update)
        # Do nothing unless rho is positive.
        if rho > 0 and not _numpy.isnan(rho) and not _numpy.isinf(rho):
            self.current_position = m
            self.current_gradient = g
            Hinv_y = self.Hinv(y_update)
            # Scalar coefficients of the rank-one factorization of the
            # BFGS-style update (notation follows an LBFGS-sampling scheme —
            # see the class warning; this is experimental).
            gamma2 = rho**2 * _numpy.vdot(y_update, y_update) + rho
            beta = gamma2 * _numpy.vdot(s_update, self.H(s_update))
            theta = _numpy.sqrt(rho / (beta * gamma2))
            a = _numpy.sqrt(gamma2) * s_update
            b = (rho / _numpy.sqrt(gamma2)) * Hinv_y
            u_update = a
            v_update = -self.H(b + theta * a)
            assert u_update.shape == (self.dimensions, 1), Assertions.v_shape
            assert v_update.shape == (self.dimensions, 1), Assertions.v_shape
            # Each stored pair scales the determinant by (1 + u·v)^-2; damp v
            # if this single update would change it more than allowed.
            sigma_threshold = (1.0 / (1.0 + _numpy.vdot(u_update, v_update))) ** 2
            if sigma_threshold < self.max_determinant_change:
                r = (1.0 - self.max_determinant_change) / (
                    self.max_determinant_change * _numpy.vdot(u_update, v_update)
                )
                v_update = r * v_update
            if self.currently_stored_gradients < self.number_of_vectors:
                # History not full yet: grow by one slot.
                self.currently_stored_gradients += 1
            else:
                # History full: discard the oldest pair by rolling all buffers
                # left by one column; the new pair overwrites the last column.
                # self.s[:, 1:-1] = self.s[:, 2:]
                # self.y[:, 1:-1] = self.y[:, 2:]
                # self.u[:, 1:-1] = self.u[:, 2:]
                # self.v[:, 1:-1] = self.v[:, 2:]
                # self.vTu[1:-1] = self.vTu[2:]
                # Next two blocks are equivalent
                # 1
                # self.s[:, :-1] = self.s[:, 1:]
                # self.y[:, :-1] = self.y[:, 1:]
                # self.u[:, :-1] = self.u[:, 1:]
                # self.v[:, :-1] = self.v[:, 1:]
                # self.vTu[:-1] = self.vTu[1:]
                # 2
                self.s = _numpy.roll(self.s, -1, axis=1)
                self.y = _numpy.roll(self.y, -1, axis=1)
                self.u = _numpy.roll(self.u, -1, axis=1)
                self.v = _numpy.roll(self.v, -1, axis=1)
                self.vTu = _numpy.roll(self.vTu, -1, axis=0)
            assert s_update.shape == (self.dimensions, 1), Assertions.v_shape
            assert y_update.shape == (self.dimensions, 1), Assertions.v_shape
            assert u_update.shape == (self.dimensions, 1), Assertions.v_shape
            assert v_update.shape == (self.dimensions, 1), Assertions.v_shape
            # Write the new pair into the last occupied column.
            self.s[:, self.currently_stored_gradients - 1] = s_update.flatten()
            self.y[:, self.currently_stored_gradients - 1] = y_update.flatten()
            self.u[:, self.currently_stored_gradients - 1] = u_update.flatten()
            self.v[:, self.currently_stored_gradients - 1] = v_update.flatten()
            self.vTu[self.currently_stored_gradients - 1] = 1.0 + _numpy.vdot(
                v_update, u_update
            )
        else:
            print(f"Not updating. Rho: {rho}")

    def S(self, h):
        """Apply the stored rank-one factors to h in forward (oldest-first) order:
        h <- h - v_i (u_i . h) / vTu_i for each stored pair i."""
        for i in range(self.currently_stored_gradients):
            h = (
                h
                - self.v[:, i, None] * _numpy.vdot(self.u[:, i, None], h) / self.vTu[i]
            )
            assert h.shape == (self.dimensions, 1), Assertions.v_shape
        return h

    def ST(self, h):
        """Apply the transpose of S: same factors with u and v swapped, applied
        in reverse (newest-first) order."""
        for i in range(self.currently_stored_gradients - 1, -1, -1):
            h = (
                h
                - self.u[:, i, None] * _numpy.vdot(self.v[:, i, None], h) / self.vTu[i]
            )
            assert h.shape == (self.dimensions, 1), Assertions.v_shape
        return h

    def Sinv(self, h):
        """Apply the inverse of S: h <- h + v_i (u_i . h), in reverse order.

        NOTE(review): relies on the Sherman-Morrison form of each rank-one
        factor; verify against S() if the vTu caching is ever changed.
        """
        for i in range(self.currently_stored_gradients - 1, -1, -1):
            h = h + self.v[:, i, None] * _numpy.vdot(self.u[:, i, None], h)
            assert h.shape == (self.dimensions, 1), Assertions.v_shape
        return h

    def SinvT(self, h):
        """Apply the transpose of Sinv: h <- h + u_i (v_i . h), in forward order."""
        for i in range(self.currently_stored_gradients):
            h = h + self.u[:, i, None] * _numpy.vdot(self.v[:, i, None], h)
            assert h.shape == (self.dimensions, 1), Assertions.v_shape
        return h

    def H(self, h):
        """Apply the composite operator S(ST(h)), i.e. S S^T acting on h."""
        h = self.ST(h)
        assert h.shape == (self.dimensions, 1), Assertions.v_shape
        return self.S(h)

    def Hinv(self, h):
        """Apply the inverse of H: SinvT(Sinv(h)), i.e. (S S^T)^-1 acting on h."""
        h = self.Sinv(h)
        assert h.shape == (self.dimensions, 1), Assertions.v_shape
        return self.SinvT(h)

    def logdet(self):
        """Return the accumulated log-determinant contribution of the stored
        factors: sum of log(alpha_i^2) with alpha_i = 1 / (1 + u_i . v_i)."""
        logdet = 0.0
        for i in range(self.currently_stored_gradients):
            alpha = 1.0 / (1.0 + _numpy.dot(self.u[:, i], self.v[:, i]))
            logdet += _numpy.log(alpha**2)
        return logdet

    @property
    def matrix(self):
        # NOTE(review): returns an UNINITIALIZED array — this is a placeholder,
        # not the dense representation of the metric. Do not rely on its values.
        matrix = _numpy.empty((self.dimensions, self.dimensions))
        return matrix

    @staticmethod
    def create_default(dimensions: int, rng: _numpy.random.Generator = None) -> "LBFGS":
        """Create an LBFGS mass matrix with default settings for the dimension."""
        return LBFGS(dimensions, rng=rng)