Source code for hmclab.Distributions.LinearMatrix

"""Linear matrix distribution (quadratic equations) classes and associated methods.
"""
import warnings as _warnings
from typing import Union as _Union

import numpy as _numpy
import scipy as _scipy
import scipy.sparse as _sparse
import scipy.sparse.linalg as _sparse_linalg

from hmclab.Distributions import _AbstractDistribution
from hmclab.Helpers import RandomMatrices as _RandomMatrices


class LinearMatrix(_AbstractDistribution):
    """Likelihood model based on a linear forward model given as
    :math:`G \\mathbf{m} = \\mathbf{d}` coupled with Gaussian observational
    errors."""
    def __init__(
        self,
        G: _Union[_numpy.ndarray, _sparse.spmatrix],
        d: _numpy.ndarray,
        data_covariance: _Union[float, _numpy.ndarray, _sparse.spmatrix],
        dtype=None,
        **kwargs,
    ):
        # Set precision for inconsistently passed objects -----------------------
        if dtype is not None:
            # astype casts safely; assigning to G.dtype would reinterpret the
            # underlying buffer instead of converting the values
            G = G.astype(dtype)
            d = d.astype(G.dtype)
            if type(data_covariance) not in [float, _numpy.float32, _numpy.float64]:
                data_covariance = data_covariance.astype(G.dtype)

        # Four cases:
        # 1 - Dense G, scalar/vector covariance
        # 2 - Dense G, dense covariance
        # 3 - Sparse G, scalar/vector covariance
        # 4 - Sparse G, sparse covariance
        # Any other case needs to be manually programmed.

        # Get dimensionality
        self.dimensions = G.shape[1]

        # Check data vector ------------------------------------------------------
        if not (type(d) is _numpy.ndarray and d.shape == (d.size, 1)):
            raise ValueError(
                "Didn't understand the data vector object. Should be a "
                "NumPy column vector (ndarray: [datapoints, 1]). "
                f"Received: {type(d)}, shape {getattr(d, 'shape', 'unknown')}."
            )

        # Check forward model matrix ----------------------------------------------
        if type(G) is _numpy.ndarray and G.shape == (d.size, self.dimensions):
            # Dense G
            dense_matrix = True
        elif issubclass(type(G), _scipy.sparse.spmatrix) and G.shape == (
            d.size,
            self.dimensions,
        ):
            # Sparse G
            dense_matrix = False
        else:
            raise ValueError(
                "Didn't understand the forward model matrix object. "
                "Should either be a NumPy matrix "
                "(ndarray: [datapoints, dimensions]) or a SciPy spmatrix "
                "derived type (spmatrix: [datapoints, dimensions])."
            )

        # Check data covariance ----------------------------------------------------
        if type(data_covariance) is float or (
            # Scalar/vector covariance
            type(data_covariance) == _numpy.ndarray
            and data_covariance.shape == (d.size, 1)
        ):
            covariance_simple = True
        elif type(data_covariance) is _numpy.ndarray and data_covariance.shape == (
            d.size,
            d.size,
        ):
            # Full covariance
            covariance_simple = False
        else:
            # No idea what the user wants, or a very specific case.
            raise ValueError(
                "Didn't understand the data covariance object. "
                "Should either be a float, a NumPy column vector "
                "(ndarray: [datapoints, 1]) or a NumPy square matrix "
                "(ndarray: [datapoints, datapoints])."
            )

        # Delegate construction ------------------------------------------------------
        if covariance_simple and dense_matrix:
            self.Distribution = _LinearMatrix_dense_forward_simple_covariance(
                G, d, data_covariance, **kwargs
            )
        elif (not covariance_simple) and dense_matrix:
            self.Distribution = _LinearMatrix_dense_forward_dense_covariance(
                G, d, data_covariance, **kwargs
            )
        elif covariance_simple and (not dense_matrix):
            self.Distribution = _LinearMatrix_sparse_forward_simple_covariance(
                G, d, data_covariance, **kwargs
            )
        elif (not covariance_simple) and (not dense_matrix):
            self.Distribution = _LinearMatrix_sparse_forward_sparse_covariance(
                G, d, data_covariance, **kwargs
            )
    def misfit(self, coordinates: _numpy.ndarray) -> float:
        """Compute the misfit by delegating to the specialized subclass."""
        return self.Distribution.misfit(coordinates) + self.misfit_bounds(coordinates)

    def gradient(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        """Compute the misfit gradient by delegating to the specialized subclass."""
        return self.Distribution.gradient(coordinates)

    def generate(self, repeat=1, rng=_numpy.random.default_rng()) -> _numpy.ndarray:
        return self.Distribution.generate(repeat, rng=rng)

    def forward(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        return self.Distribution.G @ coordinates

    @staticmethod
    def create_default(
        dimensions: int, dtype=_numpy.dtype("float64")
    ) -> "LinearMatrix":
        G = _numpy.eye(dimensions, dtype=dtype)
        d = _numpy.ones(dimensions)[:, None]
        data_variance = 1.0
        return LinearMatrix(G, d, data_variance, dtype=dtype)
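
# A minimal usage sketch (added for illustration, not part of the original
# module): build the public LinearMatrix wrapper on a tiny dense problem and
# evaluate misfit and gradient. The helper name `_example_dense_usage` is
# hypothetical and not part of the hmclab API.
def _example_dense_usage():
    G = _numpy.array([[1.0, 0.0], [0.0, 2.0]])  # forward operator, shape (2, 2)
    d = _numpy.array([[1.0], [4.0]])  # data as a column vector, shape (2, 1)
    likelihood = LinearMatrix(G, d, data_covariance=1.0)

    m = _numpy.array([[1.0], [2.0]])  # satisfies G @ m == d exactly
    assert _numpy.isclose(likelihood.misfit(m), 0.0)  # zero residual
    assert _numpy.allclose(likelihood.gradient(m), 0.0)  # gradient vanishes
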
# 1 - Dense G, scalar/vector covariance
class _LinearMatrix_dense_forward_simple_covariance(_AbstractDistribution):
    def __init__(
        self,
        G: _numpy.ndarray,
        d: _numpy.ndarray,
        data_variance: _Union[float, _numpy.ndarray],
        # The name "variance" is justified, as these values are only used on
        # the diagonal of the covariance matrix
        dtype=_numpy.single,
        premultiplication: bool = None,
    ):
        self.dimensions = G.shape[1]
        self.G = G.astype(dtype)
        self.d = d.astype(dtype)
        if type(data_variance) == _numpy.ndarray:
            self.data_variance = data_variance.astype(dtype)
        else:
            # Plain Python floats have no float32 equivalent; keep the scalar
            self.data_variance = data_variance
        self.data_sigma = self.data_variance**0.5

        # Depending on whether the data or the model space dimension is
        # bigger, performance of the misfit and gradient algorithms differs.
        # If the data dimension is larger than the model dimension,
        # premultiplication might be faster.
        if premultiplication is not None:
            self.premultiplication = premultiplication
        else:
            self.premultiplication = self.G.shape[0] > self.G.shape[1]

        if self.premultiplication:
            if type(self.data_variance) == float:
                invcov = _numpy.eye(self.d.size) / self.data_variance
            else:
                invcov = _numpy.diag(1.0 / self.data_variance[:, 0])

            self.GtG: _numpy.ndarray = self.G.T @ invcov @ self.G
            self.Gtd0: _numpy.ndarray = G.T @ invcov @ self.d
            self.dtd: float = (self.d.T @ invcov @ self.d).item()

            # Free up unnecessary variables
            del self.G, self.d, self.data_variance, self.data_sigma
        else:
            self.Gt: _numpy.ndarray = G.T

    def misfit(self, coordinates: _numpy.ndarray) -> float:
        if self.premultiplication:
            return self.misfit_bounds(coordinates) + (
                0.5
                * (
                    coordinates.T @ (self.GtG @ coordinates - 2 * self.Gtd0)
                    + self.dtd
                ).item()
            )
        else:
            return (
                self.misfit_bounds(coordinates)
                + (
                    0.5
                    * _numpy.linalg.norm(
                        (self.G @ coordinates - self.d) / self.data_sigma
                    )
                    ** 2
                ).item()
            )

    def gradient(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        if self.premultiplication:
            return self.GtG @ coordinates - self.Gtd0
        else:
            return self.Gt @ ((self.G @ coordinates - self.d) / self.data_variance)

    def generate(self, repeat=1, rng=_numpy.random.default_rng()) -> _numpy.ndarray:
        raise NotImplementedError()

    @staticmethod
    def create_default(
        dimensions: int, dtype=_numpy.dtype("float64")
    ) -> "_LinearMatrix_dense_forward_simple_covariance":
        G = _numpy.eye(dimensions, dtype=dtype)
        d = _numpy.ones(dimensions)[:, None]
        data_variance = _numpy.ones((dimensions, 1))
        return _LinearMatrix_dense_forward_simple_covariance(
            G, d, data_variance, dtype=dtype
        )


# 2 - Dense G, dense covariance
class _LinearMatrix_dense_forward_dense_covariance(_AbstractDistribution):
    def __init__(
        self,
        G: _numpy.ndarray,
        d: _numpy.ndarray,
        data_covariance: _numpy.ndarray,
        dtype=_numpy.single,
        premultiplication: bool = None,
    ):
        self.dimensions = G.shape[1]
        self.G = G.astype(dtype)
        self.d = d.astype(dtype)
        self.data_covariance = data_covariance.astype(dtype)

        # Depending on whether the data or the model space dimension is
        # bigger, performance of the misfit and gradient algorithms differs.
        # If the data dimension is larger than the model dimension,
        # premultiplication might be faster.
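        # (Added explanatory note, not in the original source.) With residual
        # r = G m - d, the misfit 0.5 * r^T C^-1 r expands to
        #
        #   0.5 * ( m^T (G^T C^-1 G) m - 2 m^T (G^T C^-1 d) + d^T C^-1 d ),
        #
        # so GtG (shape [dimensions, dimensions]), Gtd0 and the scalar dtd can
        # be computed once, making every subsequent misfit and gradient
        # evaluation independent of the number of data points. This pays off
        # when G.shape[0] (data) exceeds G.shape[1] (model parameters), which
        # is exactly the default chosen below.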
        if premultiplication is not None:
            self.premultiplication = premultiplication
        else:
            self.premultiplication = self.G.shape[0] > self.G.shape[1]

        # Inverse of the data covariance, as it is needed both with and
        # without premultiplication
        self.invcov = _numpy.linalg.inv(self.data_covariance)

        if self.premultiplication:
            # Precompute factors
            self.GtG: _numpy.ndarray = self.G.T @ self.invcov @ self.G
            self.Gtd0: _numpy.ndarray = G.T @ self.invcov @ self.d
            self.dtd: float = (self.d.T @ self.invcov @ self.d).item()

            # Free up unnecessary variables
            del self.G, self.d, self.data_covariance
        else:
            self.Gt: _numpy.ndarray = self.G.T
            self.cholesky_upper_inv_covariance: _numpy.ndarray = (
                _numpy.linalg.cholesky(self.invcov).T
            )

    def misfit(self, coordinates: _numpy.ndarray) -> float:
        if self.premultiplication:
            return self.misfit_bounds(coordinates) + (
                0.5
                * (
                    coordinates.T @ (self.GtG @ coordinates - 2 * self.Gtd0)
                    + self.dtd
                ).item()
            )
        else:
            return (
                self.misfit_bounds(coordinates)
                + (
                    0.5
                    * _numpy.linalg.norm(
                        self.cholesky_upper_inv_covariance
                        @ (self.G @ coordinates - self.d)
                    )
                    ** 2
                ).item()
            )

    def gradient(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        if self.premultiplication:
            return self.GtG @ coordinates - self.Gtd0
        else:
            return self.Gt @ self.invcov @ (self.G @ coordinates - self.d)

    def generate(self, repeat=1, rng=_numpy.random.default_rng()) -> _numpy.ndarray:
        raise NotImplementedError()

    @staticmethod
    def create_default(
        dimensions: int, dtype=_numpy.dtype("float64")
    ) -> "_LinearMatrix_dense_forward_dense_covariance":
        G = _numpy.eye(dimensions, dtype=dtype)
        d = _numpy.ones(dimensions)[:, None]
        data_covariance = _RandomMatrices.random_correlation_matrix(dimensions)
        return _LinearMatrix_dense_forward_dense_covariance(
            G, d, data_covariance, dtype=dtype
        )


# 3 - Sparse G, scalar/vector covariance
class _LinearMatrix_sparse_forward_simple_covariance(_AbstractDistribution):
    def __init__(
        self,
        G: _scipy.sparse.spmatrix,
        d: _numpy.ndarray,
        data_variance: _Union[float, _numpy.ndarray],
        # The name "variance" is justified, as these values are only used on
        # the diagonal of the covariance matrix
        dtype=_numpy.single,
        premultiplication: bool = None,
        use_mkl: bool = False,
    ):
        self.dimensions = G.shape[1]
        self.G = _scipy.sparse.csr_matrix(G, dtype=dtype)
        self.d = d.astype(dtype)
        if type(data_variance) == _numpy.ndarray:
            self.data_variance = data_variance.astype(dtype)
        else:
            self.data_variance = data_variance
        self.data_sigma = self.data_variance**0.5
        self.use_mkl = use_mkl
        self.dtype = dtype

        # Depending on whether the data or the model space dimension is
        # bigger, performance of the misfit and gradient algorithms differs.
        # If the data dimension is larger than the model dimension,
        # premultiplication might be faster.
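        # (Added note, not in the original source.) For sparse operators the
        # premultiplication tradeoff carries an extra caveat: the product
        # G^T C^-1 G can have many more non-zeros than G itself (fill-in), so
        # precomputing GtG may cost considerable memory even when the data
        # dimension is large.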
        if premultiplication is not None:
            self.premultiplication = premultiplication
        else:
            self.premultiplication = self.G.shape[0] > self.G.shape[1]

        # Prepare both cases
        if self.premultiplication:
            # Compute the inverse covariance matrix for premultiplication
            if type(self.data_variance) == float:
                invcov = _scipy.sparse.eye(self.d.size).tocsr() / self.data_variance
            else:
                invcov = _scipy.sparse.diags(
                    1.0 / self.data_variance[:, 0], offsets=0
                ).tocsr()

            # Precompute relevant factors
            self.GtG: _scipy.sparse.spmatrix = self.G.T @ invcov @ self.G
            self.Gtd0: _scipy.sparse.spmatrix = G.T @ invcov @ self.d
            self.dtd: float = (self.d.T @ invcov @ self.d).item()

            # Free up unnecessary variables
            del self.G, self.d, self.data_variance, self.data_sigma
        else:
            self.Gt: _scipy.sparse.spmatrix = self.G.T.astype(dtype)

            # Import MKL
            if use_mkl:
                try:
                    # Fails with OSError if MKL is not found
                    from hmclab.Helpers.InterfaceMKL import sparse_gemv

                    # The MKL binding works only for CSR matrices
                    if type(G) != _sparse.csr_matrix:
                        self.G = _sparse.csr_matrix(G).astype(dtype)
                    self.use_mkl = True

                    # Bind the needed function
                    self.sparse_gemv = sparse_gemv
                    self.Gt = _scipy.sparse.csr_matrix(self.Gt, dtype=dtype)
                except OSError:
                    _warnings.warn(
                        "MKL not found, will evaluate matrix-vector products "
                        "using SciPy.",
                        Warning,
                    )
                    self.use_mkl = False
                except Exception as e:
                    _warnings.warn(f"Not using MKL because: {e}.", Warning)
                    self.use_mkl = False

    def misfit(self, coordinates: _numpy.ndarray) -> float:
        if self.premultiplication:
            return self.misfit_bounds(coordinates) + (
                0.5
                * (
                    coordinates.T @ (self.GtG @ coordinates - 2 * self.Gtd0)
                    + self.dtd
                ).item()
            )
        elif self.use_mkl:
            return self.misfit_bounds(coordinates) + (
                0.5
                * _numpy.linalg.norm(
                    (self.sparse_gemv(self.G, coordinates) - self.d)
                    / self.data_sigma
                )
                ** 2
            )
        else:
            return (
                self.misfit_bounds(coordinates)
                + (
                    0.5
                    * _numpy.linalg.norm(
                        (self.G @ coordinates - self.d) / self.data_sigma
                    )
                    ** 2
                ).item()
            )

    def gradient(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        if self.premultiplication:
            return self.GtG @ coordinates - self.Gtd0
        elif self.use_mkl:
            return self.sparse_gemv(
                self.Gt,
                (self.sparse_gemv(self.G, coordinates) - self.d)
                / self.data_variance,
            )
        else:
            return self.Gt @ ((self.G @ coordinates - self.d) / self.data_variance)

    def generate(self, repeat=1, rng=_numpy.random.default_rng()) -> _numpy.ndarray:
        raise NotImplementedError()

    @staticmethod
    def create_default(
        dimensions: int, use_mkl=False, dtype=_numpy.dtype("float32")
    ) -> "_LinearMatrix_sparse_forward_simple_covariance":
        G = _sparse.eye(dimensions, dtype=dtype)
        d = _numpy.ones(dimensions)[:, None]
        data_variance = _numpy.ones((dimensions, 1))
        return _LinearMatrix_sparse_forward_simple_covariance(
            G, d, data_variance, use_mkl=use_mkl, dtype=dtype
        )


# 4 - Sparse G, sparse covariance
class _LinearMatrix_sparse_forward_sparse_covariance(_AbstractDistribution):
    def __init__(
        self,
        G: _scipy.sparse.spmatrix,
        d: _numpy.ndarray,
        data_covariance: _scipy.sparse.spmatrix,
        dtype=_numpy.single,
    ):
        self.dimensions = G.shape[1]
        self.G = G.astype(dtype)
        self.d = d.astype(dtype)
        if not issubclass(type(data_covariance), _scipy.sparse.spmatrix):
            data_covariance = _scipy.sparse.csr_matrix(data_covariance, dtype=dtype)
        self.data_covariance = data_covariance
        self.Gt = self.G.T.tocsr()
        self.dt = d.T.astype(dtype)
        # factorized returns a solve function, so calling
        # self.factorized_covariance(r) computes data_covariance^-1 @ r
        self.factorized_covariance = _sparse_linalg.factorized(
            self.data_covariance.tocsc()
        )

    def misfit(self, coordinates: _numpy.ndarray) -> float:
        return self.misfit_bounds(coordinates) + (
            0.5
            * (
                (coordinates.T @ self.Gt - self.dt)
                @ self.factorized_covariance(
                    (self.G @ coordinates - self.d).astype(
                        self.data_covariance.dtype
                    )
                )
            ).item()
        )

    def gradient(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        return self.Gt @ self.factorized_covariance(
            (self.G @ coordinates - self.d).astype(self.data_covariance.dtype)
        )

    def generate(self, repeat=1, rng=_numpy.random.default_rng()) -> _numpy.ndarray:
        raise NotImplementedError()

    @staticmethod
    def create_default(
        dimensions: int, dtype=_numpy.dtype("float64")
    ) -> "_LinearMatrix_sparse_forward_sparse_covariance":
        G = _sparse.eye(dimensions, dtype=dtype)
        d = _numpy.ones(dimensions)[:, None]
        data_covariance = _sparse.eye(dimensions)
        return _LinearMatrix_sparse_forward_sparse_covariance(
            G, d, data_covariance, dtype=dtype
        )

    # The following two methods are essential for using this subclass in
    # parallel sampling. During the initialization of parallel sampling,
    # objects are duplicated by pickling. However, one attribute of this
    # class (factorized_covariance) is a function constructed from a matrix
    # factorization, which cannot be pickled. We therefore store the
    # underlying matrix when pickling and rebuild the factorization when
    # unpickling.
    def __getstate__(self):
        # Capture what is normally pickled
        state = self.__dict__.copy()

        # Replace the unpicklable solve function with the sparse matrix it
        # was factorized from
        state["factorized_covariance"] = self.data_covariance.tocsc()

        # What we return here will be stored in the pickle
        return state

    def __setstate__(self, newstate):
        # Re-create the factorized solver from the stored matrix
        newstate["factorized_covariance"] = _sparse_linalg.factorized(
            newstate["factorized_covariance"]
        )

        # Re-instate our __dict__ state from the pickled state
        self.__dict__.update(newstate)
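
# A minimal, hedged sketch (added for illustration, not part of the original
# module) of why __getstate__/__setstate__ above matter: the solve function
# returned by scipy.sparse.linalg.factorized is not picklable, so pickling
# stores the CSC covariance matrix instead and re-factorizes it on load. The
# helper name `_example_pickle_roundtrip` is hypothetical.
def _example_pickle_roundtrip():
    import pickle

    dist = _LinearMatrix_sparse_forward_sparse_covariance(
        _sparse.eye(3, format="csr"),
        _numpy.ones((3, 1)),
        _sparse.eye(3, format="csr"),
    )
    clone = pickle.loads(pickle.dumps(dist))  # exercises __getstate__/__setstate__

    m = _numpy.zeros((3, 1))
    assert _numpy.isclose(dist.misfit(m), clone.misfit(m))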