Source code for hmclab.Samplers

"""Sampler classes and associated methods.

The classes in this module provide different sampling algorithms to appraise
distributions. All of them are designed to work with minimal configuration: you can
run a sampling method with only a target distribution and a filename to write your
samples to. However, the true power of any algorithm only shows when the user injects
their expertise through tuning parameters.

Sampling can be initialised from an instance of a sampler:

.. code-block:: python

    from hmclab import HMC

    HMC_instance = HMC()

    # Sampling using the instance method
    HMC_instance.sample("samples.h5", distribution)
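
Tuning parameters are passed to the same call. The following is an illustrative
sketch using two of HMC's documented tuning parameters; ``distribution`` stands for
any instance of an :class:`._AbstractDistribution` subclass:

.. code-block:: python

    HMC_instance.sample(
        "samples.h5",
        distribution,
        stepsize=0.1,
        amount_of_steps=10,
    )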

All of the classes inherit from :class:`._AbstractSampler`, a base class outlining the
required methods and their signatures (required inputs and outputs).


"""
import warnings as _warnings
from abc import ABC as _ABC
from abc import abstractmethod as _abstractmethod
from time import time as _time
from datetime import datetime as _datetime
from typing import Union as _Union
from typing import List as _List
from typing import Dict as _Dict
from multiprocess import (
    Process as _Process,
    Pipe as _Pipe,
    Value as _Value,
    Queue as _Queue,
)
import copy as _copy
import h5py as _h5py
from matplotlib import pyplot as _plt
import numpy as _numpy
import tqdm.auto as _tqdm_au

from hmclab.Distributions import _AbstractDistribution
from hmclab.MassMatrices import Unit as _Unit
from hmclab.MassMatrices import _AbstractMassMatrix
from hmclab.Helpers.Timers import AccumulatingTimer as _AccumulatingTimer
from hmclab.Helpers.CustomExceptions import InvalidCaseError

import ipywidgets as _widgets
from IPython.display import display as _display

from hmclab.Helpers.CustomExceptions import AbstractMethodError as _AbstractMethodError

dev_assertion_message = (
    "Something went wrong internally, please report this to the developers."
)


class H5FileOpenedError(FileExistsError):
    """An internal exception that helps us keep track of H5 files."""

    pass


class _AbstractSampler(_ABC):
    """Abstract base class for Markov chain Monte Carlo samplers."""

    # Essential attributes -------------------------------------------------------------

    name: str = "Monte Carlo sampler abstract base class"
    """The name of the sampler."""

    rng = None

    dimensions: int = None
    """An integer representing the dimensionality in which the MCMC sampler works."""

    distribution: _AbstractDistribution = None
    """The _AbstractDistribution-derived object on which the sampler works."""

    samples_hdf5_filename: str = None
    """A string containing the path+filename of the HDF5 file to which samples will be
    stored."""

    samples_hdf5_filehandle = None
    """A file handle of the HDF5 file to which samples will be stored."""

    samples_hdf5_dataset = None
    """A handle to the HDF5 dataset within the file to which samples will be
    stored."""

    ram_buffer_size: int = None
    """A positive integer indicating the size of the RAM buffer in amount of
    samples."""

    ram_buffer: _numpy.ndarray = None
    """A NumPy ndarray containing the samples that are as of yet not written to
    disk."""

    current_model: _numpy.ndarray = None
    """A NumPy array containing the model at the current state of the Markov chain."""

    proposed_model: _numpy.ndarray = None
    """A NumPy array containing the model at the proposed state of the Markov
    chain."""

    current_x: float = None
    r"""A float containing :math:`\chi = -\log\left(p\right)` (i.e. the misfit,
    negative log probability) of the distribution at the current state of the Markov
    chain."""

    proposed_x: float = None
    r"""A float containing :math:`\chi = -\log\left(p\right)` (i.e. the misfit,
    negative log probability) of the distribution at the proposed state of the Markov
    chain."""

    # Statistics attributes ------------------------------------------------------------

    accepted_proposals: int = None
    """A positive integer representing the amount of accepted proposals."""

    amount_of_writes: int = None
    """A positive integer representing the amount of times the sampler has written to
    disk."""

    progressbar_refresh_rate: float = 0.25
    """A float representing how many seconds lie between updates of the progress bar
    statistics (acceptance rate etc.)."""

    max_time: float = None
    """A float representing the maximum time in seconds that sampling is allowed to
    take before it is automatically terminated. The value None is used for unlimited
    time."""

    times_started: int = 0
    """An integer representing how often this sampler object has started sampling."""

    proposals: int = None
    """An integer representing the amount of requested proposals for a run."""

    online_thinning: int = None
    """An integer representing the interval between stored samples. Defaults to 1,
    i.e. all samples are stored."""

    current_proposal: int = 0
    """An integer representing the current proposal index (before thinning) of the
    Markov chain."""

    current_proposal_after_thinning: int = 0
    """An integer representing the current proposal index after thinning of the Markov
    chain."""

    accepted_proposals = 0
    """An integer representing the amount of accepted proposals."""

    disable_progressbar: bool = False
    """A bool describing whether or not to disable the TQDM progressbar."""

    # distribution = type("NoDistribution", (object,), {"name": None})
    # """The distribution """
    # TODO: assess why this was in the class

    # Diagnostics attributes -----------------------------------------------------------

    end_time: _datetime = None
    """Time (and date) at which sampling was terminated / finished."""

    start_time: _datetime = None
    """Time (and date) at which sampling was started."""

    diagnostic_mode: bool = True
    """Boolean describing whether functions need to be profiled for performance."""

    # TODO: figure out how to do _List[fn]
    functions_to_diagnose: _List = []
    """Array of functions to be profiled, typically contains functions only from the
    abstract base class."""

    sampler_specific_functions_to_diagnose: _List = []
    """Array of sampler specific functions to be profiled."""

    main_thread_keyboard_interrupt = None  # TODO: figure out what this does

    # Parallel sampling attributes -----------------------------------------------------

    parallel: bool = False
    """A boolean describing if this sampler works in an array of multiple samplers."""

    sampler_index: int = 0
    """An integer describing the index of the parallel sampler in the array of
    samplers. Essential for two-way communication."""

    # TODO: add type hint
    exchange_schedule = None
    """A pre-communicated exchange schedule determining when and which samplers try to
    exchange states."""

    # TODO: add type hint
    pipe_matrix = None
    """A matrix of two-way communicators between samplers. Indexed using
    `sampler_index`."""

    exchange_interval: int = None
    """Integer describing how many samples lie between the attempted swaps of states
    between samplers."""

    def __str__(self) -> str:
        """Method for converting a sampler object to string, handy in outputs. Works
        for derived classes."""
        return f"An instance of the {self.name} sampler object."

    def _widget_data(self) -> _Dict:
        """Method for returning post-sampling summary data to be displayed in e.g.
        Jupyter widgets."""

        # Run details (panel 1) --------------------------------------------------------
        run_details = {}

        proposed_samples = self.current_proposal if self.current_proposal > 0 else None
        acceptance_rate = self.accepted_proposals / (self.current_proposal + 1)

        if not (self.start_time is None or self.end_time is None):
            runtime = (self.end_time - self.start_time).total_seconds()
            run_details["local start time (not timezone aware)"] = self.start_time
            run_details["runtime (seconds)"] = runtime
            run_details["proposals per second"] = proposed_samples / runtime

        written_samples = (
            self.current_proposal_after_thinning + 1
            if self.current_proposal_after_thinning > 0
            else None
        )

        run_details["acceptance rate"] = acceptance_rate
        run_details["output file"] = self.samples_hdf5_filename
        run_details["proposals made (excluding first position)"] = proposed_samples
        run_details["samples written (after online thinning)"] = written_samples
        run_details["amount of writes"] = self.amount_of_writes
        run_details["dimensions"] = self.dimensions
        run_details["distribution"] = str(
            (
                self.distribution.name
                if (self.distribution.name is not None)
                else self.distribution
            )
            if self.distribution is not None
            else None
        )

        # Tuning settings (panel 2) ----------------------------------------------------
        settings = {}
        settings["proposals"] = self.proposals
        settings["online thinning (store every ...-th sample)"] = self.online_thinning

        # Combine with algorithm specific settings
        settings = {**settings, **self._tuning_settings()}

        # Algorithm (panel 3) ----------------------------------------------------------
        algorithm = {}
        algorithm["algorithm used"] = self.name
        algorithm["diagnostic mode"] = self.diagnostic_mode
        algorithm["ram buffer size"] = self.ram_buffer_size
        algorithm["this object has started sampling ... times"] = self.times_started

        return {
            "Details of last run": run_details,
            "Tuning settings": settings,
            "Algorithm": algorithm,
        }

    def print_results(self) -> None:
        """Print Jupyter widget from `_repr_html_()` to stdout."""
        print(self._repr_html_())

    def _repr_html_(
        self, nested: bool = False, widget_data: _Dict = None
    ) -> _Union[None, _widgets.Tab]:
        """Create a Jupyter widget with the sampling results and statistics."""

        default_layout = _widgets.Layout(padding="10px")

        # Helper function to make a Tab from a Python dictionary
        def dictionary_to_widget(dictionary):
            left_column = _widgets.VBox(
                [_widgets.Label(str(key)) for key in dictionary.keys()],
                layout=default_layout,
            )
            right_column = _widgets.VBox(
                [_widgets.Label(str(value)) for value in dictionary.values()],
                layout=default_layout,
            )
            return _widgets.HBox([left_column, right_column], layout=default_layout)

        # Obtain sampling data
        if widget_data is None:
            widget_data = self._widget_data()

        # Create tab object
        tab = _widgets.Tab()

        # Populate children with sampling data
        tab.children = [
            dictionary_to_widget(panel_data) for panel_data in widget_data.values()
        ]
        panel_headings = [key for key in widget_data.keys()]
        for i in range(len(tab.children)):
            tab.set_title(i, panel_headings[i])

        # Return results, or print and return nothing.
        if nested:
            # This is used when multiple samplers ran in parallel, and the tabs of each
            # individual sampler still need to be combined.
            return tab
        else:
            _display(tab)
            return ""

    def __init__(
        self,
        seed: int = None,
    ) -> None:
        """Constructor that sets the random number generator for the sampler. Pass a
        seed to make a Markov chain reproducible (given that all other settings are
        re-used).

        Parameters
        ----------
        seed : int
            Seed for the random number generator. If None, a non-deterministic seed
            is drawn from the operating system.
        """
        if seed is None:
            self.rng = _numpy.random.default_rng()
        else:
            self.rng = _numpy.random.default_rng(seed)
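
    # Illustrative note (editorial): constructing a sampler as e.g. ``HMC(seed=42)``
    # and re-running it with identical settings yields an identical Markov chain,
    # since all randomness flows through ``self.rng``.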

    def _init_sampler(
        self,
        samples_hdf5_filename: str,
        distribution: _AbstractDistribution,
        initial_model: _numpy.ndarray,
        proposals: int,
        online_thinning: int,
        ram_buffer_size: int,
        overwrite_existing_file: bool,
        max_time: float,
        disable_progressbar: bool = False,
        diagnostic_mode: bool = False,
        **kwargs,
    ):
        """A method that is called every time any Markov chain sampler object is
        constructed.

        Args:
            samples_hdf5_filename (str): Path of the HDF5 file to write samples to.
            distribution (_AbstractDistribution): The target distribution to sample.
            initial_model (numpy.ndarray): Starting model of the Markov chain.
            proposals (int): Amount of proposals the algorithm should make.
            online_thinning (int): Interval between stored samples.
            ram_buffer_size (int): Amount of samples kept in RAM before writing.
            overwrite_existing_file (bool): Whether to silently overwrite an existing
                samples file.
            max_time (float): Maximum sampling time in seconds, or None for unlimited
                time.
        """
        assert type(samples_hdf5_filename) == str, (
            f"First argument should be a string containing the path of the file to "
            f"which to write samples. It was an object of type "
            f"{type(samples_hdf5_filename)}."
        )

        # Parse the distribution -------------------------------------------------------

        # Store the distribution
        assert issubclass(type(distribution), _AbstractDistribution), (
            "The passed target distribution should be a derived class of "
            "_AbstractDistribution."
        )
        self.distribution = distribution

        # Extract dimensionality from the distribution
        assert type(distribution.dimensions) == int and distribution.dimensions > 0, (
            "The passed target distribution should have an integer dimension larger "
            "than zero."
        )
        self.dimensions = distribution.dimensions

        # Set up proposals -------------------------------------------------------------

        # Assert that proposals is a positive integer
        assert type(proposals) == int and proposals > 0, (
            "The amount of proposals requested (`proposals`) should be an integer "
            "number larger than zero."
        )
        self.proposals = proposals

        # Assert that online_thinning is a positive integer
        assert type(online_thinning) == int and online_thinning > 0, (
            "The amount of online thinning (`online_thinning`) should be an integer "
            "number larger than zero."
        )
        self.online_thinning = online_thinning

        # Assert that we would write out the last sample, preventing wasteful
        # computations
        assert self.proposals % self.online_thinning == 0, (
            "The amount of proposals (`proposals`) needs to be a multiple of the "
            "online thinning (`online_thinning`) number, to prevent sample wastage."
        )
        self.proposals_after_thinning = int(self.proposals / self.online_thinning)

        # Set up the sample RAM buffer -------------------------------------------------
        if ram_buffer_size is not None:
            # Assert that ram_buffer_size is a positive integer
            assert type(ram_buffer_size) == int and ram_buffer_size > 0, (
                "The ram buffer size (`ram_buffer_size`) needs to be an integer larger "
                "than zero."
            )
            # Set the ram buffer size
            self.ram_buffer_size = ram_buffer_size
        else:
            # This is all automated stuff. You can force any size by setting it
            # manually.

            # Detailed explanation: we strive for approximately 1 gigabyte in memory
            # before we write to disk, by default. The amount of floats that are in
            # memory is calculated as follows: (dimensions + 1) * ram_buffer_size.
            # The plus one comes from the requirement to also store the misfit.
            # 1 gigabyte is approximately 1e8 64-bit floats (actually 1.25e8).
            # Additionally, there is a cap at 10000 samples.
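            # Worked example (editorial, not from the original comments): for
            # dimensions = 50_000 we get floor(1e8 / 5e4) = 2000 < 10000, so the
            # buffer starts at 2000 samples and is decremented below until it divides
            # `proposals_after_thinning` evenly.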
            ram_buffer_size = min(int(_numpy.floor(1e8 / self.dimensions)), 10000)

            # Reduce this number until it fits in the amount of proposals
            while self.proposals_after_thinning % ram_buffer_size != 0:
                ram_buffer_size = ram_buffer_size - 1

            # Now, this number might be larger than the actual amount of samples, so we
            # take the minimum of this and the amount of proposals to write as the
            # actual ram size.
            self.ram_buffer_size = min(ram_buffer_size, self.proposals_after_thinning)

        assert type(self.ram_buffer_size) == int, dev_assertion_message

        shape = (self.dimensions + 1, self.ram_buffer_size)
        self.ram_buffer = _numpy.empty(shape, dtype=_numpy.float64)

        # Set up the samples file ------------------------------------------------------

        # Parse the filename
        assert (
            type(samples_hdf5_filename) == str
        ), "The samples filename needs to be a string."
        if samples_hdf5_filename[-3:] != ".h5":
            samples_hdf5_filename += ".h5"
        self.samples_hdf5_filename = samples_hdf5_filename

        # Open the HDF5 file
        self._open_samples_hdf5(
            self.samples_hdf5_filename,
            self.proposals_after_thinning,
            overwrite=overwrite_existing_file,
        )

        # Set up the initial model and preallocate other necessary arrays ---------------
        if initial_model is None:
            initial_model = _numpy.zeros((self.dimensions, 1))
        else:
            initial_model = _numpy.array(initial_model)
            initial_model.shape = (initial_model.size, 1)

        assert initial_model.shape == (self.dimensions, 1), (
            f"The initial model (`initial_model`) dimension is incompatible with the "
            f"target distribution. Supplied model shape: {initial_model.shape}. "
            f"Required shape: {(self.dimensions, 1)}."
        )

        self.current_model = initial_model.astype(_numpy.float64)
        self.current_x = distribution.misfit(self.current_model)

        assert not _numpy.isnan(
            self.current_x
        ), "Initial position in model space gives NaN probability"
        assert not _numpy.isinf(
            self.current_x
        ), "Initial position in model space gives inf/-inf probability"

        self.proposed_model = _numpy.empty_like(self.current_model)
        self.proposed_x = _numpy.nan

        # Set up accepted_proposals for acceptance rate ----------------------------------
        self.accepted_proposals = 0
        self.amount_of_writes = 0

        # Set up time limit --------------------------------------------------------------
        if max_time is not None:
            max_time = float(max_time)
            assert (
                max_time > 0.0
            ), "The maximal runtime (`max_time`) should be a float larger than zero."
            self.max_time = max_time

        self.disable_progressbar = disable_progressbar

        # Prepare diagnostic mode if needed ------------------------------------------
        self.diagnostic_mode = diagnostic_mode
        if self.diagnostic_mode:
            self._propose = _AccumulatingTimer(self._propose)
            self._evaluate_acceptance = _AccumulatingTimer(self._evaluate_acceptance)
            self._sample_to_ram = _AccumulatingTimer(self._sample_to_ram)
            self._samples_to_disk = _AccumulatingTimer(self._samples_to_disk)
            self._update_progressbar = _AccumulatingTimer(self._update_progressbar)

            self.functions_to_diagnose = [
                self._propose,
                self._evaluate_acceptance,
                self._sample_to_ram,
                self._samples_to_disk,
                self._update_progressbar,
            ]

        # Do sampler specific operations ----------------------------------------------

        # Set up specifics for each algorithm
        self._init_sampler_specific(**kwargs)

        # Write out the tuning settings
        self._write_tuning_settings()

        # Create attributes before sampling, such that SWMR works
        self.samples_hdf5_dataset.attrs["write_index"] = -1
        self.samples_hdf5_dataset.attrs["last_written_sample"] = -1
        self.samples_hdf5_dataset.attrs["proposals"] = self.proposals
        self.samples_hdf5_dataset.attrs["acceptance_rate"] = 0
        self.samples_hdf5_dataset.attrs["online_thinning"] = self.online_thinning
        self.samples_hdf5_dataset.attrs["start_time"] = _datetime.now().strftime(
            "%d-%b-%Y (%H:%M:%S.%f)"
        )
        self.samples_hdf5_dataset.attrs["sampler"] = self.name

    def _close_sampler(self):
        self.samples_hdf5_dataset.attrs["acceptance_rate"] = self.accepted_proposals / (
            self.current_proposal + 1
        )

        # Manually check for ram_buffer_size = 1 + failed written sample
        if self.ram_buffer_size == 1 and _numpy.all(
            self.samples_hdf5_dataset[:, -1] == 0.0
        ):
            self.samples_hdf5_dataset[:, -1] = self.ram_buffer[:, 0]

        self.samples_hdf5_dataset.attrs["end_time"] = _datetime.now().strftime(
            "%d-%b-%Y (%H:%M:%S.%f)"
        )
        self.samples_hdf5_dataset.attrs["runtime"] = str(
            self.end_time - self.start_time
        )
        self.samples_hdf5_dataset.attrs["runtime_seconds"] = (
            self.end_time - self.start_time
        ).total_seconds()

        self._close_sampler_specific()
        self.samples_hdf5_filehandle.close()
        self.samples_hdf5_filehandle = None
        self.samples_hdf5_dataset = None

        if self.diagnostic_mode:
            # This block shows the percentage of time spent in each part of the
            # sampler.
            percentage_time_spent = {}
            print("Detailed statistics:")
            total_time = (self.end_time - self.start_time).total_seconds()
            print(f"Total runtime: {total_time:.2f} seconds")

            for fn in self.functions_to_diagnose:
                percentage_time_spent[fn.function.__name__] = (
                    100 * fn.time_spent / total_time
                )
            print()
            print("General sampler components:")
            print("{:<30} {:<30}".format("Function", "percentage of time"))
            for name, percentage in percentage_time_spent.items():
                print("{:<30} {:<30.2f}".format(name, percentage))

            percentage_time_spent = {}
            for fn in self.sampler_specific_functions_to_diagnose:
                percentage_time_spent[fn.function.__name__] = (
                    100 * fn.time_spent / total_time
                )
            print()
            print(f"{self.name} specific components:")
            print("{:<30} {:<30}".format("Function", "percentage of time"))
            for name, percentage in percentage_time_spent.items():
                print("{:<30} {:<30.2f}".format(name, percentage))

    def _sample_loop(self):
        """The actual sampling code."""

        # This avoids some weird stdout bugs on OSX. Does nothing but fiddle with
        # stdout.
        print(" ", end="", flush=True)

        # Create progressbar -----------------------------------------------------------
        try:
            self.proposals_iterator = _tqdm_au.trange(
                self.proposals,
                desc=f"Tot. acc rate: {0:.2f}. Progress",
                leave=True,
                dynamic_ncols=True,
                position=self.sampler_index,  # only relevant for parallel sampling
                disable=self.disable_progressbar,
            )
        except Exception:
            self.proposals_iterator = _tqdm_au.trange(
                self.proposals,
                desc=f"Tot. acc rate: {0:.2f}. Progress",
                leave=True,
                position=self.sampler_index,  # only relevant for parallel sampling
                disable=self.disable_progressbar,
            )

        # Start time for updating progressbar
        self.last_update_time = _time()

        # Capture NumPy exceptions -------------------------------------------------------
        class Log(object):
            messages = []

            def write(self, msg):
                self.messages.append(msg)

        self.log = Log()
        _numpy.seterrcall(self.log)
        _numpy.seterr(all="log")  # seterr to known value

        # Run the Markov process ---------------------------------------------------------
        self.start_time = _datetime.now()
        self.times_started += 1
        try:
            # If the sampler is given a maximum time, start timer now
            scheduled_termination_time = 0
            if self.max_time is not None:
                scheduled_termination_time = _time() + self.max_time

            # Iterate through amount of proposals
            for self.current_proposal in self.proposals_iterator:

                # Propose a new sample.
                # The underlying method changes when different algorithms are selected.
                self._propose()

                # Evaluate acceptance criterion.
                # The underlying method changes when different algorithms are selected.
                self._evaluate_acceptance()

                # Parallel communication section -------------
                if self.parallel and (
                    self.exchange_interval is not None
                ):  # pragma: no cover

                    # Check if chain is in the schedule for current iteration
                    if self.current_proposal % self.exchange_interval == 0 and (
                        self.sampler_index
                        in self.exchange_schedule[
                            int(self.current_proposal / self.exchange_interval), :
                        ]
                    ):
                        # Find where in schedule
                        position_in_schedule = _numpy.where(
                            self.sampler_index
                            == self.exchange_schedule[
                                int(self.current_proposal / self.exchange_interval), :
                            ]
                        )[0][0]

                        # Determine if master (master evaluates swap)
                        if position_in_schedule % 2 == 0:
                            master = False
                        else:
                            master = True

                        # Find counterpart
                        exchange_chain = self.exchange_schedule[
                            int(self.current_proposal / self.exchange_interval),
                            position_in_schedule + (-1 if master else +1),
                        ]

                        # Find correct pipes
                        left_pipe, right_pipe = self.pipe_matrix.retrieve_pipes(
                            self.sampler_index, exchange_chain
                        )
                        if self.sampler_index > exchange_chain:
                            pipe = left_pipe
                        else:
                            pipe = right_pipe

                        # Master sends model, then waits for misfit and other chains'
                        # model + misfit
                        if master:
                            (exchange_model,) = pipe.recv()
                            pipe.send([self.current_model])

                            # Compute misfit of received model
                            exchange_x = self.distribution.misfit(exchange_model)

                            # Compute misfit improvement
                            misfit_improvement = self.current_x - exchange_x

                            # Receive improvement of misfit of the counterpart
                            (counterpart_improvement,) = pipe.recv()

                            # Evaluate acceptance rate of exchange, based on both
                            # improvements.
                            if _numpy.exp(
                                misfit_improvement + counterpart_improvement
                            ) > self.rng.uniform(0, 1):
                                # If accepted, switch models
                                pipe.send([self.current_model])
                                self.current_model = exchange_model.copy()
                            else:
                                # If not accepted, send the exchanged model back to the
                                # other chain, effectively not switching.
                                pipe.send([exchange_model])
                        else:
                            pipe.send([self.current_model])
                            (exchange_model,) = pipe.recv()
                            exchange_x = self.distribution.misfit(exchange_model)
                            misfit_improvement = self.current_x - exchange_x
                            pipe.send([misfit_improvement])
                            (self.current_model,) = pipe.recv()

                # --------------------------------------------

                # If we are on a thinning number ... (i.e. one of the non-discarded
                # samples)
                if self.current_proposal % self.online_thinning == 0:
                    # Write sample to ram array
                    self._sample_to_ram()

                    # Calculate the index this sample has after thinning
                    after_thinning = int(self.current_proposal / self.online_thinning)

                    # Check if this number is at the end of the buffer ...
                    if (
                        after_thinning % self.ram_buffer_size
                    ) == self.ram_buffer_size - 1:
                        # If so, write samples to disk
                        self._samples_to_disk()

                # Update the progressbar
                self._update_progressbar()

                # Check elapsed time
                if self.max_time is not None and scheduled_termination_time < _time():
                    # Raise TimeoutError if we're over time
                    raise TimeoutError

        except KeyboardInterrupt:  # Catch SIGINT ------------------------------------
            # Assume current proposal couldn't be finished, so ignore it.
            self.current_proposal -= 1
        except TimeoutError:  # Catch the time limit ---------------------------------
            pass
        except Exception as e:
            # Any other exception we don't know how to handle. Cleanup of the
            # progressbar, buffers and file handles happens in the finally block
            # below, before the exception propagates.
            raise e
        finally:
            # Write out the last samples not on a full buffer --------------------------
            self.proposals_iterator.close()
            self.proposals_iterator = None
            self._samples_to_disk()
            self.end_time = _datetime.now()
            self._close_sampler()

    def _open_samples_hdf5(
        self,
        name: str,
        length: int,
        dtype: str = "f8",
        overwrite: bool = False,
    ) -> None:
        # Try to create file
        try:
            if overwrite:  # honor overwrite flag
                flag = "w"
            else:
                flag = "w-"
            # Create file, fail if exists and flag == w-
            self.samples_hdf5_filehandle = _h5py.File(name, flag, libver="latest")
        except OSError as e:
            # Catch error on file creation, most likely because the file already exists
            if (
                not str(e)
                == f"Unable to create file (unable to open file: name = '{name}', "
                f"errno = 17, error message = 'File exists', flags = 15, o_flags = c2)"
            ):
                raise H5FileOpenedError(
                    "This file is already opened as HDF5 file. If you "
                    "want to write to it, close the filehandle."
                )
            else:
                raise e  # pragma: no cover

        # Update the filename in the sampler object for later retrieval
        self.samples_hdf5_filename = name

        # Create dataset
        self.samples_hdf5_dataset = self.samples_hdf5_filehandle.create_dataset(
            "samples_0",
            (self.dimensions + 1, 1),
            maxshape=(self.dimensions + 1, length),  # one extra row for the misfit
            dtype=dtype,
            chunks=True,
        )
        self.samples_hdf5_dataset.set_fill_value = _numpy.nan

        # Set the current index of samples to the start of the file
        self.samples_hdf5_dataset.attrs["write_index"] = -1
        self.samples_hdf5_dataset.attrs["last_written_sample"] = -1

    def _update_progressbar(self):
        if (
            self.current_proposal == 0
            or (_time() - self.last_update_time) > self.progressbar_refresh_rate
            or self.current_proposal == self.proposals - 1
        ):
            self.last_update_time = _time()

            # Calculate acceptance rate
            acceptance_rate = self.accepted_proposals / (self.current_proposal + 1)

            if self.parallel:
                self.proposals_iterator.set_description(
                    f"Sampler {self.sampler_index}, tot. acc rate: "
                    f"{acceptance_rate:.2f}. Progress",
                    refresh=False,
                )
            else:
                self.proposals_iterator.set_description(
                    f"Tot. acc rate: {acceptance_rate:.2f}. Progress",
                    refresh=False,
                )

    def _sample_to_ram(self):
        # Calculate proposal number after thinning
        self.current_proposal_after_thinning = int(
            self.current_proposal / self.online_thinning
        )

        # Assert that it's an integer (if we only write on end of buffer)
        assert self.current_proposal % self.online_thinning == 0, dev_assertion_message

        # Calculate index for the RAM array
        index_in_ram = self.current_proposal_after_thinning % self.ram_buffer_size

        # Place samples in RAM
        self.ram_buffer[:-1, index_in_ram] = self.current_model[:, 0]

        # Place misfit in RAM
        self.ram_buffer[-1, index_in_ram] = self.current_x

    def _samples_to_disk(self):
        # Calculate proposal number after thinning
        self.current_proposal_after_thinning = int(
            _numpy.floor(self.current_proposal / self.online_thinning)
        )

        # This should always be the case if we write during or at the end of sampling,
        # but not after a KeyboardInterrupt. However, samples are definitely only
        # written to RAM on a whole number.
        # assert self.current_proposal % self.online_thinning == 0

        # Calculate start/end indices
        robust_start = (
            self.current_proposal_after_thinning
            - self.current_proposal_after_thinning % self.ram_buffer_size
        )
        end = self.current_proposal_after_thinning + 1

        # If there is something to write
        if (
            self.samples_hdf5_dataset.attrs["write_index"] == robust_start
            or self.samples_hdf5_dataset.attrs["write_index"] == -1
        ):
            # Some sanity checks on the indices
            assert (
                robust_start >= 0
                and robust_start <= self.proposals_after_thinning + 1
            ), dev_assertion_message
            assert (
                end >= 0 and end <= self.proposals_after_thinning + 1
            ), dev_assertion_message
            assert end >= robust_start, dev_assertion_message

            # Update the amount of writes
            self.amount_of_writes += 1

            # Update the size in the h5 file
            size_before = _numpy.copy(self.samples_hdf5_dataset.shape)
            self.samples_hdf5_dataset.resize(
                (self.dimensions + 1, self.current_proposal_after_thinning + 1)
            )
            size_after = _numpy.copy(self.samples_hdf5_dataset.shape)

            # Write nan's to the resized array, to be sure that if the actual writing
            # fails we can recover
            self.samples_hdf5_dataset[:, size_before[1] - size_after[1]] = _numpy.nan

            # Write samples to disk
            self.samples_hdf5_dataset[:, robust_start:end] = self.ram_buffer[
                :, : end - robust_start
            ]
            self.ram_buffer.fill(_numpy.nan)

            # Reset the markers in the HDF5 file
            self.samples_hdf5_dataset.attrs["write_index"] = end
            self.samples_hdf5_dataset.attrs["last_written_sample"] = end - 1

            self.samples_hdf5_dataset.flush()

    @_abstractmethod
    def _propose(self):
        raise _AbstractMethodError()

    @_abstractmethod
    def _evaluate_acceptance(self):
        """This abstract method evaluates the acceptance criterion in the MCMC
        algorithm. Pass or fail, it updates the object's attributes accordingly,
        modifying current_model and current_x as needed."""
        raise _AbstractMethodError()

    @_abstractmethod
    def _write_tuning_settings(self):
        """An abstract method that writes all the relevant tuning settings of the
        algorithm to the HDF5 file."""
        raise _AbstractMethodError()

    @_abstractmethod
    def _init_sampler_specific(self):
        """An abstract method that sets up all required attributes and methods for the
        algorithm."""
        raise _AbstractMethodError()

    @_abstractmethod
    def _close_sampler_specific(self):
        """An abstract method that does any post-sampling operations for the
        algorithm."""
        raise _AbstractMethodError()

    def get_diagnostics(self):
        if not self.diagnostic_mode:
            raise InvalidCaseError(
                "Can't return diagnostics if sampler is not in diagnostic mode"
            )
        return self.functions_to_diagnose + self.sampler_specific_functions_to_diagnose

    def load_results(self, burn_in: int = 0) -> _numpy.array:
        """Load the samples written by this sampler from disk, discarding the first
        `burn_in` samples."""
        assert burn_in >= 0

        from hmclab.Samples import Samples as _Samples

        with _Samples(self.samples_hdf5_filename, burn_in=burn_in) as samples:
            samples_numpy = samples.numpy

        return samples_numpy
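

# Editorial note, derived from the code below rather than the original comments: both
# concrete samplers implement `_evaluate_acceptance` with the Metropolis rule. A
# proposal is accepted when exp(chi_current - chi_proposed) > u with u ~ Uniform(0, 1),
# where chi is the misfit (negative log probability); HMC applies the same rule to the
# total energy h = chi + k instead of the misfit alone.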


class RWMH(_AbstractSampler):
    """The Random Walk Metropolis-Hastings sampler."""

    stepsize: _Union[float, _numpy.ndarray] = 1.0
    """A parameter describing the standard deviation of a multivariate normal (MVN)
    used as the proposal distribution for Random Walk Metropolis-Hastings. Using a
    _numpy.ndarray column vector (shape dimensions × 1) will give every dimension a
    unique step length. Correlations in the MVN are not yet implemented. Has a strong
    influence on acceptance rate. **An essential tuning parameter.**"""

    autotuning: bool = False
    """A boolean indicating if the stepsize is tuned towards a specific acceptance
    rate using diminishing adaptive parameters."""

    learning_rate: float = 0.75
    """A float tuning the rate of decrease at which a new step size is adapted,
    according to rate = (iteration_number) ** (-learning_rate). Needs to be larger
    than 0.5 and equal to or smaller than 1.0."""

    target_acceptance_rate: float = 0.65
    """A float representing the optimal acceptance rate used for autotuning, if
    autotuning is True."""

    acceptance_rates: _numpy.ndarray = None
    """A NumPy ndarray containing all past acceptance rates. Collected if autotuning
    is True."""

    stepsizes: _numpy.ndarray = None
    """A NumPy ndarray containing all past step lengths for the RWMH algorithm.
    Collected if autotuning is True."""

    minimal_stepsize: float = 1e-18
    """Minimal step length, which is chosen if the step size becomes zero or negative
    during autotuning."""

    _stepsize_non_scalar_part = 1.0

    name = "Random Walk Metropolis Hastings"

    def sample(
        self,
        samples_hdf5_filename: str,
        distribution: _AbstractDistribution,
        stepsize: _Union[float, _numpy.ndarray] = 1.0,
        initial_model: _numpy.ndarray = None,
        proposals: int = 100,
        online_thinning: int = 1,
        diagnostic_mode: bool = False,
        ram_buffer_size: int = None,
        overwrite_existing_file: bool = False,
        max_time: float = None,
        autotuning: bool = False,
        target_acceptance_rate: float = 0.65,
        learning_rate: float = 0.75,
        queue=None,
        disable_progressbar=False,
    ):
        """Sampling using the Metropolis-Hastings algorithm.

        Parameters
        ----------
        samples_hdf5_filename: str
            String containing the (path+)filename of the HDF5 file to contain the
            samples. **A required parameter.**
        distribution: _AbstractDistribution
            The distribution to sample. Should be an instance of a subclass of
            _AbstractDistribution. **A required parameter.**
        stepsize: _Union[float, _numpy.ndarray]
            A parameter describing the standard deviation of a multivariate normal
            (MVN) used as the proposal distribution for Random Walk
            Metropolis-Hastings. Using a _numpy.ndarray column vector (shape
            dimensions × 1) will give every dimension a unique step length.
            Correlations in the MVN are not yet implemented. Has a strong influence
            on acceptance rate. **An essential tuning parameter.**
        initial_model: _numpy.ndarray
            A NumPy column vector (shape dimensions × 1) containing the starting model
            of the Markov chain. This model will not be written out as a sample.
        proposals: int
            An integer representing the amount of proposals the algorithm should make.
        online_thinning: int
            An integer representing the degree of online thinning, i.e. the interval
            between storing samples.
        diagnostic_mode: bool
            A boolean describing if subroutines of sampling should be timed. Useful
            for finding slow parts of the algorithm. Will add overhead to each
            function.
        ram_buffer_size: int
            An integer representing how many samples should be kept in RAM before
            writing to storage.
        overwrite_existing_file: bool
            A boolean describing whether or not to silently overwrite existing files.
            Use with caution.
        max_time: float
            A float representing the maximum time in seconds that sampling is allowed
            to take before it is automatically terminated. The value None is used for
            unlimited time.
        autotuning: bool
            A boolean describing whether or not the stepsize is automatically
            adjusted. Uses a diminishing adaptive scheme to satisfy detailed balance.
        target_acceptance_rate: float
            A float between 0.0 and 1.0 that is the target acceptance rate of
            autotuning. The algorithm will try to achieve this acceptance rate.
        learning_rate: float
            A float larger than 0.5 but smaller than or equal to 1.0, describing how
            aggressively the stepsize is updated. Lower is more aggressive.

        Raises
        ------
        AssertionError
            For any unspecified invalid entry.
        ValueError
            For any invalid value of algorithm settings.
        TypeError
            For any invalid value of algorithm settings.
        """
        # We put the creation of the sampler entirely in a try/except block, so we can
        # actually close the hdf5 file if something goes wrong.
        try:
            self._init_sampler(
                samples_hdf5_filename=samples_hdf5_filename,
                distribution=distribution,
                stepsize=stepsize,
                initial_model=initial_model,
                proposals=proposals,
                diagnostic_mode=diagnostic_mode,
                online_thinning=online_thinning,
                ram_buffer_size=ram_buffer_size,
                overwrite_existing_file=overwrite_existing_file,
                max_time=max_time,
                autotuning=autotuning,
                target_acceptance_rate=target_acceptance_rate,
                learning_rate=learning_rate,
                disable_progressbar=disable_progressbar,
            )
        except Exception as e:
            if self.samples_hdf5_filehandle is not None:
                self.samples_hdf5_filehandle.close()
                self.samples_hdf5_filehandle = None
                self.samples_hdf5_dataset = None
            raise e

        self._sample_loop()

        if self.parallel:
            queue.put({f"{self.sampler_index}": self._widget_data()})
            queue.close()

        return self

    def _init_sampler_specific(self, **kwargs):
        # Parse all possible kwargs
        for key in [
            "stepsize",
            "autotuning",
            "target_acceptance_rate",
            "learning_rate",
        ]:
            setattr(self, key, kwargs[key])
            kwargs.pop(key)

        # Autotuning -----------------------------------------------------------------
        if self.autotuning:
            assert self.learning_rate > 0.5 and self.learning_rate <= 1.0, (
                f"The learning rate should be larger than 0.5 and smaller than or "
                f"equal to 1.0, otherwise the Markov chain does not converge. Chosen: "
                f"{self.learning_rate}"
            )
            if type(self.stepsize) == _numpy.ndarray:
                self._stepsize_non_scalar_part = self.stepsize
                self.stepsize = 1.0
            assert type(self.stepsize) == float, (
                "Autotuning RWMH is only implemented for scalar stepsizes. If you need "
                "it for non-scalar steps, write us an email."
            )
            self.acceptance_rates = _numpy.empty((self.proposals, 1))
            self.stepsizes = _numpy.empty((self.proposals, 1))

        if len(kwargs) != 0:
            raise TypeError(
                f"Unidentified argument(s) not applicable to sampler: {kwargs}"
            )

        # Assert that the step length is either a float bigger than zero, or a full
        # matrix / diagonal
        try:
            self.stepsize = float(self.stepsize)
            assert self.stepsize > 0.0, (
                "RW-MH step length should be a positive float or a numpy.ndarray. The "
                "passed argument is a float equal to or smaller than zero."
            )
        except TypeError:
            self.stepsize = _numpy.asarray(self.stepsize)
            assert type(self.stepsize) == _numpy.ndarray, (
                "RW-MH step length should be a numpy.ndarray of shape (dimensions, 1) "
                "or a positive float. The passed argument is neither."
            )
            assert self.stepsize.shape == (self.dimensions, 1), (
                "RW-MH step length should be a numpy.ndarray of shape (dimensions, 1) "
                "or a positive float. The passed argument is an ndarray of the wrong "
                "shape."
            )

        if self.diagnostic_mode:
            self.distribution.misfit = _AccumulatingTimer(self.distribution.misfit)
            self.distribution.gradient = _AccumulatingTimer(self.distribution.gradient)
            self.distribution.corrector = _AccumulatingTimer(
                self.distribution.corrector
            )

            self.sampler_specific_functions_to_diagnose = [
                self.distribution.misfit,
                self.distribution.gradient,
                self.distribution.corrector,
            ]

            if self.autotuning:
                self.autotune = _AccumulatingTimer(self.autotune)
                self.sampler_specific_functions_to_diagnose.append(self.autotune)

    def _close_sampler_specific(self):
        if self.autotuning:
            self.acceptance_rates = self.acceptance_rates[: self.current_proposal]
            self.stepsizes = self.stepsizes[: self.current_proposal]

            # Also write these to the hdf5 dataset
            self.samples_hdf5_dataset.attrs["acceptance_rates"] = self.acceptance_rates
            self.samples_hdf5_dataset.attrs["stepsizes"] = self.stepsizes

    def _write_tuning_settings(self):
        if type(self.stepsize) == float:
            self.samples_hdf5_dataset.attrs["stepsize"] = self.stepsize
        else:
            self.samples_hdf5_dataset.attrs[
                "stepsize"
            ] = self.stepsize.__class__.__name__

    def _tuning_settings(self):
        settings = {
            "step size"
            if not self.autotuning
            else "final step size after tuning": str(self.stepsize)
        }

        if self.autotuning:
            settings = {
                **settings,
                **{
                    "autotuning": self.autotuning,
                    "learning_rate": self.learning_rate,
                    "target_acceptance_rate": self.target_acceptance_rate,
                    "minimal_stepsize": self.minimal_stepsize,
                },
            }

        return settings

    def autotune(self, acceptance_rate):
        # Write out parameters
        self.acceptance_rates[self.current_proposal] = acceptance_rate
        self.stepsizes[self.current_proposal] = self.stepsize

        # Compute weight according to diminishing scheme, see also:
        # The No-U-Turn Sampler: Adaptively Setting Path Lengths in Hamiltonian
        # Monte Carlo, Hoffman & Gelman, 2014, equation (5).
        schedule_weight = (self.current_proposal + 1) ** (-self.learning_rate)

        if _numpy.isnan(acceptance_rate):
            acceptance_rate = 0

        # Update stepsize
        self.stepsize -= schedule_weight * (
            self.target_acceptance_rate - min(acceptance_rate, 1)
        )
        if self.stepsize <= 0:
            if self.diagnostic_mode:
                _warnings.warn(
                    "The step size of the algorithm went below zero. You possibly "
                    "started the algorithm in a region with extremely strong "
                    "gradients. The sampler will now default to a minimum step size "
                    f"of {self.minimal_stepsize}. If this doesn't work, and if "
                    "choosing a different initial model does not make this warning "
                    "go away, try setting a smaller minimal step size and initial "
                    "step size value."
                )
            self.stepsize = max(self.stepsize, self.minimal_stepsize)

    def _propose(self):
        # Propose a new model according to the MH Random Walk algorithm with a Gaussian
        # proposal distribution
        self.proposed_model = (
            self.current_model
            + self.stepsize
            * self._stepsize_non_scalar_part
            * self.rng.normal(size=(self.dimensions, 1))
        )
        assert self.proposed_model.shape == (self.dimensions, 1), dev_assertion_message

    def _evaluate_acceptance(self):
        # Compute new misfit
        self.proposed_x = self.distribution.misfit(self.proposed_model)

        acceptance_rate = _numpy.exp(self.current_x - self.proposed_x)
        if self.autotuning:
            self.autotune(acceptance_rate)

        # Evaluate MH acceptance rate
        if acceptance_rate > self.rng.uniform(0, 1):
            self.current_model = _numpy.copy(self.proposed_model)
            self.current_x = self.proposed_x
            self.accepted_proposals += 1

    def plot_stepsizes(self):
        import matplotlib.pyplot as _plt

        _plt.semilogy(self.stepsizes)
        _plt.xlabel("iteration")
        _plt.ylabel("stepsize")
        return _plt.gca()

    def plot_acceptance_rate(self):
        import matplotlib.pyplot as _plt

        _plt.semilogy(self.acceptance_rates)
        _plt.xlabel("iteration")
        _plt.ylabel("acceptance rate")
        return _plt.gca()
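

# Example usage of RWMH (an editorial sketch, not part of the original source;
# `distribution` stands for any instance of an _AbstractDistribution subclass):
#
#     sampler = RWMH()
#     sampler.sample(
#         "samples_rwmh.h5",
#         distribution,
#         stepsize=1.0,
#         proposals=10000,
#         autotuning=True,  # adapt the step size towards ~65% acceptance
#     )
#     samples = sampler.load_results(burn_in=1000)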


class HMC(_AbstractSampler):
    """The Hamiltonian Monte Carlo sampler."""

    stepsize: float = 0.1
    """A positive float representing the time step to be used in solving Hamilton's
    equations. Has a strong influence on acceptance rate. **An essential tuning
    parameter.**"""

    amount_of_steps: int = 10
    """A positive integer representing the amount of integration steps to be used in
    solving Hamilton's equations. Has a medium influence on acceptance rate. **An
    essential tuning parameter.**"""

    mass_matrix: _AbstractMassMatrix = None
    """An object representing the artificial masses assigned to each parameter. Needs
    to be a subtype of _AbstractMassMatrix. Has a strong influence on convergence
    rate. **An essential tuning parameter.**"""

    current_momentum: _numpy.ndarray = None
    """A NumPy ndarray (shape dimensions × 1) containing the momentum at the current
    state of the Markov chain. Indicates the direction in which the model will
    initially move along its numerical trajectory. Will be resampled from the mass
    matrix for each proposal."""

    current_k: float = _numpy.nan
    """A float representing the kinetic energy associated with the current state.
    Typically follows the ChiSquared[dimensions] distribution."""

    current_h: float = _numpy.nan
    """A float representing the total energy associated with the current state."""

    proposed_momentum: _numpy.ndarray = None
    """A NumPy ndarray (shape dimensions × 1) containing the momentum at the proposed
    state of the Markov chain. Indicates the direction in which the model was moving
    at the end of its numerical trajectory. Will be computed deterministically from
    the current_momentum, i.e. the state the Markov chain started in."""

    proposed_k: float = _numpy.nan
    """A float representing the kinetic energy associated with the proposed state."""

    proposed_h: float = _numpy.nan
    """A float representing the total energy associated with the proposed state."""

    name = "Hamiltonian Monte Carlo"
    """Sampler name."""

    autotuning: bool = False
    """A boolean indicating if the stepsize is tuned towards a specific acceptance
    rate using diminishing adaptive parameters."""

    learning_rate: float = 0.75
    """A float tuning the rate of decrease at which a new step size is adapted,
    according to rate = (iteration_number) ** (-learning_rate). Needs to be larger
    than 0.5 and equal to or smaller than 1.0."""

    target_acceptance_rate: float = 0.65
    """A float representing the optimal acceptance rate used for autotuning, if
    autotuning is True."""

    acceptance_rates: _numpy.ndarray = None
    """A NumPy ndarray containing all past acceptance rates. Collected if autotuning
    is True."""

    randomize_stepsize: bool = True
    """Boolean describing whether or not to randomize the stepsize slightly for every
    trajectory, by drawing it from Uniform[0.5, 1.5] * stepsize."""

    stepsizes: _numpy.ndarray = None
    """A NumPy ndarray containing all past stepsizes for the HMC algorithm. Collected
    if autotuning is True."""

    minimal_stepsize: float = 1e-18
    """Minimal stepsize, which is chosen if the stepsize becomes zero or negative
    during autotuning."""

    integrator = None

    def sample(
        self,
        samples_hdf5_filename: str,
        distribution: _AbstractDistribution,
        stepsize: float = 0.1,
        randomize_stepsize: bool = None,
        amount_of_steps: int = 10,
        mass_matrix: _AbstractMassMatrix = None,
        integrator: str = "lf",
        initial_model: _numpy.ndarray = None,
        proposals: int = 100,
        online_thinning: int = 1,
        diagnostic_mode: bool = False,
        ram_buffer_size: int = None,
        overwrite_existing_file: bool = False,
        max_time: float = None,
        autotuning: bool = False,
        target_acceptance_rate: float = 0.65,
        learning_rate: float = 0.75,
        queue=None,
        disable_progressbar=False,
    ):
        """Sampling using the Hamiltonian Monte Carlo algorithm.

        Parameters
        ----------
        samples_hdf5_filename: str
            String containing the (path+)filename of the HDF5 file to contain the
            samples. **A required parameter.**
        distribution: _AbstractDistribution
            The distribution to sample. Should be an instance of a subclass of
            _AbstractDistribution. **A required parameter.**
        stepsize: float
            A positive float representing the time step to be used in solving
            Hamilton's equations. Has a strong influence on acceptance rate. **An
            essential tuning parameter.**
        amount_of_steps: int
            A positive integer representing the amount of integration steps to be
            used in solving Hamilton's equations. Has a medium influence on acceptance
            rate. **An essential tuning parameter.**
        mass_matrix: _AbstractMassMatrix
            An object representing the artificial masses assigned to each parameter.
            Needs to be a subtype of _AbstractMassMatrix. Has a strong influence on
            convergence rate. When passing None, it defaults to the unit mass matrix.
            **An essential tuning parameter.**
        integrator: str
            String containing "lf", "3s" or "4s" for a leapfrog, 3-stage or 4-stage
            symplectic integrator respectively, to be used during the trajectory
            calculation.
        initial_model: _numpy.ndarray
            A NumPy column vector (shape dimensions × 1) containing the starting model
            of the Markov chain. This model will not be written out as a sample.
        proposals: int
            An integer representing the amount of proposals the algorithm should make.
        online_thinning: int
            An integer representing the degree of online thinning, i.e. the interval
            between storing samples.
        randomize_stepsize: bool
            A boolean enabling the randomization of the stepsize, which helps to avoid
            MCMC resonance.
        diagnostic_mode: bool
            A boolean describing if subroutines of sampling should be timed. Useful
            for finding slow parts of the algorithm. Will add overhead to each
            function.
        ram_buffer_size: int
            An integer representing how many samples should be kept in RAM before
            writing to storage.
        overwrite_existing_file: bool
            A boolean describing whether or not to silently overwrite existing files.
            Use with caution.
        max_time: float
            A float representing the maximum time in seconds that sampling is allowed
            to take before it is automatically terminated. The value None is used for
            unlimited time.
        autotuning: bool
            A boolean describing whether or not the stepsize is automatically
            adjusted. Uses a diminishing adaptive scheme to satisfy detailed balance.
        target_acceptance_rate: float
            A float between 0.0 and 1.0 that is the target acceptance rate of
            autotuning. The algorithm will try to achieve this acceptance rate.
        learning_rate: float
            A float larger than 0.5 but smaller than or equal to 1.0, describing how
            aggressively the stepsize is updated. Lower is more aggressive.

        Raises
        ------
        AssertionError
            For any unspecified invalid entry.
        ValueError
            For any invalid value of algorithm settings.
        TypeError
            For any invalid value of algorithm settings.
""" # We put the creation of the sampler entirely in a try/catch block, so we can # actually close the hdf5 file if something goes wrong. try: self._init_sampler( samples_hdf5_filename=samples_hdf5_filename, distribution=distribution, stepsize=stepsize, randomize_stepsize=randomize_stepsize, amount_of_steps=amount_of_steps, mass_matrix=mass_matrix, integrator=integrator, initial_model=initial_model, autotuning=autotuning, target_acceptance_rate=target_acceptance_rate, learning_rate=learning_rate, proposals=proposals, diagnostic_mode=diagnostic_mode, online_thinning=online_thinning, ram_buffer_size=ram_buffer_size, overwrite_existing_file=overwrite_existing_file, max_time=max_time, disable_progressbar=disable_progressbar, ) except Exception as e: if self.samples_hdf5_filehandle is not None: self.samples_hdf5_filehandle.close() self.samples_hdf5_filehandle = None self.samples_hdf5_dataset = None if type(e) is FileExistsError: if ( str(e) == "Skipping sampling due to samples file existing. Code " "execution continues." ): return raise e self._sample_loop() if self.parallel: queue.put({f"{self.sampler_index}": self._widget_data()}) return self

    def _init_sampler_specific(self, **kwargs):
        # Parse all possible kwargs
        for key in (
            "stepsize",
            "randomize_stepsize",
            "amount_of_steps",
            "mass_matrix",
            "integrator",
            "autotuning",
            "target_acceptance_rate",
            "learning_rate",
        ):
            setattr(self, key, kwargs[key])
            kwargs.pop(key)

        if len(kwargs) != 0:
            raise TypeError(
                f"Unidentified argument(s) not applicable to sampler: {kwargs}"
            )

        # Autotuning -----------------------------------------------------------------
        if self.autotuning:
            assert self.learning_rate > 0.5 and self.learning_rate <= 1.0, (
                f"The learning rate should be larger than 0.5 and smaller than or "
                f"equal to 1.0, otherwise the Markov chain does not converge. Chosen: "
                f"{self.learning_rate}"
            )
            self.acceptance_rates = _numpy.empty((self.proposals, 1))
            self.stepsizes = _numpy.empty((self.proposals, 1))

        # Step length ----------------------------------------------------------------

        # Assert that the step length for Hamilton's equations is a float bigger than
        # zero
        self.stepsize = float(self.stepsize)
        assert self.stepsize > 0.0, "Stepsize should be a float larger than zero."

        # Step amount ----------------------------------------------------------------

        # Assert that the number of steps for Hamilton's equations is a positive
        # integer
        assert type(self.amount_of_steps) == int, (
            "The amount of steps (amount_of_steps) the HMC integrator should make "
            "should be an integer."
        )
        assert self.amount_of_steps > 0, (
            "The amount of steps (amount_of_steps) the HMC integrator should make "
            "should be larger than zero."
        )

        # Mass matrix ----------------------------------------------------------------

        # Set the mass matrix if it is not yet set, using the default: a unit mass
        if self.mass_matrix is None:
            self.mass_matrix = _Unit(self.dimensions)
        self.mass_matrix.rng = self.rng

        # Assert that the mass matrix is the right type and dimension
        assert isinstance(self.mass_matrix, _AbstractMassMatrix), (
            "The passed mass matrix (mass_matrix) should be a class derived from "
            "_AbstractMassMatrix."
        )
        assert self.mass_matrix.dimensions == self.dimensions, (
            f"The passed mass matrix (mass_matrix) should have dimensions equal to "
            f"the target distribution. Passed: {self.mass_matrix.dimensions}, "
            f"required: {self.dimensions}."
        )

        # Integrator -----------------------------------------------------------------
        self.integrator = str(self.integrator)
        if self.integrator not in self.available_integrators:
            raise ValueError(
                f"Unknown integrator used. Choices are: {self.available_integrators}"
            )

        if self.diagnostic_mode:
            self.distribution.misfit = _AccumulatingTimer(self.distribution.misfit)
            self.distribution.gradient = _AccumulatingTimer(self.distribution.gradient)
            self.mass_matrix.generate_momentum = _AccumulatingTimer(
                self.mass_matrix.generate_momentum
            )
            self.mass_matrix.kinetic_energy = _AccumulatingTimer(
                self.mass_matrix.kinetic_energy
            )
            self.mass_matrix.kinetic_energy_gradient = _AccumulatingTimer(
                self.mass_matrix.kinetic_energy_gradient
            )
            self.distribution.corrector = _AccumulatingTimer(
                self.distribution.corrector
            )
            self.integrators[self.integrator] = _AccumulatingTimer(
                self.integrators[self.integrator]
            )

            self.sampler_specific_functions_to_diagnose = [
                self.distribution.misfit,
                self.distribution.gradient,
                self.integrators[self.integrator],
                self.mass_matrix.generate_momentum,
                self.mass_matrix.kinetic_energy,
                self.mass_matrix.kinetic_energy_gradient,
                self.distribution.corrector,
            ]

            if self.autotuning:
                self.autotune = _AccumulatingTimer(self.autotune)
                self.sampler_specific_functions_to_diagnose.append(self.autotune)

    def _close_sampler_specific(self):
        if self.autotuning:
            self.acceptance_rates = self.acceptance_rates[: self.current_proposal]
            self.stepsizes = self.stepsizes[: self.current_proposal]

            # Also write these to the hdf5 dataset
            self.samples_hdf5_dataset.attrs["acceptance_rates"] = self.acceptance_rates
            self.samples_hdf5_dataset.attrs["stepsizes"] = self.stepsizes

    def _write_tuning_settings(self):
        self.samples_hdf5_dataset.attrs["stepsize"] = self.stepsize
        self.samples_hdf5_dataset.attrs["amount_of_steps"] = self.amount_of_steps
        self.samples_hdf5_dataset.attrs["mass_matrix"] = self.mass_matrix.name
        self.samples_hdf5_dataset.attrs["integrator"] = self.integrators_full_names[
            self.integrator
        ]

    def _tuning_settings(self):
        settings = {
            "step size"
            if not self.autotuning
            else "final step size after tuning": str(self.stepsize),
            "amount of steps": self.amount_of_steps,
            "mass matrix type": self.mass_matrix.name
            if self.mass_matrix is not None
            else None,
            "integrator": self.integrators_full_names[self.integrator]
            if self.integrator is not None
            else None,
        }

        if self.autotuning:
            settings = {
                **settings,
                **{
                    "autotuning": self.autotuning,
                    "learning_rate": self.learning_rate,
                    "target_acceptance_rate": self.target_acceptance_rate,
                    "minimal_stepsize": self.minimal_stepsize,
                },
            }

        return settings

    def _propose(self):
        # Generate a momentum sample
        self.current_momentum = self.mass_matrix.generate_momentum()

        # Propagate the sample
        self.integrators[self.integrator](self)

    def _evaluate_acceptance(self):
        self.current_x = self.distribution.misfit(self.current_model)
        self.current_k = self.mass_matrix.kinetic_energy(self.current_momentum)
        self.current_h = self.current_x + self.current_k

        self.proposed_x = self.distribution.misfit(self.proposed_model)
        self.proposed_k = self.mass_matrix.kinetic_energy(self.proposed_momentum)
        self.proposed_h = self.proposed_x + self.proposed_k

        acceptance_rate = _numpy.exp(self.current_h - self.proposed_h)
        if self.autotuning:
            self.autotune(acceptance_rate)

        if acceptance_rate > self.rng.uniform(0, 1):
            self.current_model = _numpy.copy(self.proposed_model)
            self.current_x = self.proposed_x
            self.accepted_proposals += 1

    def autotune(self, acceptance_rate):
        # Write out parameters
        self.acceptance_rates[self.current_proposal] = acceptance_rate
        self.stepsizes[self.current_proposal] = self.stepsize

        # Compute weight according to diminishing scheme, see also:
        # The No-U-Turn Sampler: Adaptively Setting Path Lengths in Hamiltonian
        # Monte Carlo, Hoffman & Gelman, 2014, equation (5).
        schedule_weight = (self.current_proposal + 1) ** (-self.learning_rate)

        if _numpy.isnan(acceptance_rate):
            acceptance_rate = 0

        # Update stepsize
        proposed_stepsize = self.stepsize - schedule_weight * (
            self.target_acceptance_rate - min(acceptance_rate, 1)
        )

        if proposed_stepsize <= 0:
            if self.diagnostic_mode:
                _warnings.warn(
                    "The stepsize of the algorithm went below zero. You possibly "
                    "started the algorithm in a region with extremely strong "
                    "gradients. The sampler will now default to a minimum stepsize "
                    f"of {self.minimal_stepsize}. If this doesn't work, and if "
                    "choosing a different initial model does not make this warning "
                    "go away, try setting a smaller minimal stepsize and initial "
                    "stepsize value."
                )
            proposed_stepsize = self.minimal_stepsize

        # Limit the stepsize update to one order of magnitude per proposal
        if (
            _numpy.abs(_numpy.log10(proposed_stepsize) - _numpy.log10(self.stepsize))
            > 1
        ):
            if proposed_stepsize > self.stepsize:
                self.stepsize *= 10
            else:
                self.stepsize *= 0.1
        else:
            self.stepsize = proposed_stepsize

    def _propagate_leapfrog(
        self,
    ):
        # Make sure not to alter a view but a copy of arrays ---------------------------
        position = self.current_model.copy()
        momentum = self.current_momentum.copy()

        if self.randomize_stepsize:
            local_stepsize = self.rng.uniform(0.5, 1.5) * self.stepsize
        else:
            local_stepsize = self.stepsize

        # Leapfrog integration ---------------------------------------------------------
        position += (
            0.5 * local_stepsize * self.mass_matrix.kinetic_energy_gradient(momentum)
        )
        self.distribution.corrector(position, momentum)

        # verbose_integration
        verbose_integration = False
        if verbose_integration:
            integration_iterator = _tqdm_au.trange(
                self.amount_of_steps - 1,
                position=2,
                leave=False,
                desc="Leapfrog integration",
            )
        else:
            integration_iterator = range(self.amount_of_steps - 1)

        # Integration loop
        for i in integration_iterator:
            # Momentum step
            momentum -= local_stepsize * self.distribution.gradient(position)

            # Position step
            position += local_stepsize * self.mass_matrix.kinetic_energy_gradient(
                momentum
            )

            # Correct bounds
            self.distribution.corrector(position, momentum)

        # Full momentum and half position step after loop --------------------------------

        # Momentum step
        momentum -= local_stepsize * self.distribution.gradient(position)

        # Position step
        position += (
            0.5 * local_stepsize * self.mass_matrix.kinetic_energy_gradient(momentum)
        )
        self.distribution.corrector(position, momentum)

        self.proposed_model = position.copy()
        self.proposed_momentum = momentum.copy()

    def _propagate_4_stage_simplified(self):
        # Schema: (a1, b1, a2, b2, a3, b2, a2, b1, a1)
        a1 = 0.071353913450279725904
        a2 = 0.268548791161230105820
        a3 = 1.0 - 2.0 * a1 - 2.0 * a2
        b1 = 0.191667800000000000000
        b2 = 1.0 / 2.0 - b1

        if self.randomize_stepsize:
            local_stepsize = self.rng.uniform(0.5, 1.5) * self.stepsize
        else:
            local_stepsize = self.stepsize

        a1 *= local_stepsize
        a2 *= local_stepsize
        a3 *= local_stepsize
        b1 *= local_stepsize
        b2 *= local_stepsize

        # Make sure not to alter a view but a copy of the passed arrays.

    def _propagate_leapfrog(self):
        # Make sure not to alter a view but a copy of arrays --------------------
        position = self.current_model.copy()
        momentum = self.current_momentum.copy()

        if self.randomize_stepsize:
            local_stepsize = self.rng.uniform(0.5, 1.5) * self.stepsize
        else:
            local_stepsize = self.stepsize

        # Leapfrog integration --------------------------------------------------
        position += (
            0.5 * local_stepsize * self.mass_matrix.kinetic_energy_gradient(momentum)
        )
        self.distribution.corrector(position, momentum)

        # verbose_integration
        verbose_integration = False
        if verbose_integration:
            integration_iterator = _tqdm_au.trange(
                self.amount_of_steps - 1,
                position=2,
                leave=False,
                desc="Leapfrog integration",
            )
        else:
            integration_iterator = range(self.amount_of_steps - 1)

        # Integration loop
        for i in integration_iterator:
            # Momentum step
            momentum -= local_stepsize * self.distribution.gradient(position)

            # Position step
            position += local_stepsize * self.mass_matrix.kinetic_energy_gradient(
                momentum
            )

            # Correct bounds
            self.distribution.corrector(position, momentum)

        # Full momentum and half position step after the loop -------------------

        # Momentum step
        momentum -= local_stepsize * self.distribution.gradient(position)

        # Position step
        position += (
            0.5 * local_stepsize * self.mass_matrix.kinetic_energy_gradient(momentum)
        )
        self.distribution.corrector(position, momentum)

        self.proposed_model = position.copy()
        self.proposed_momentum = momentum.copy()

    def _propagate_4_stage_simplified(self):
        # Schema: (a1, b1, a2, b2, a3, b2, a2, b1, a1)
        a1 = 0.071353913450279725904
        a2 = 0.268548791161230105820
        a3 = 1.0 - 2.0 * a1 - 2.0 * a2
        b1 = 0.191667800000000000000
        b2 = 1.0 / 2.0 - b1

        if self.randomize_stepsize:
            local_stepsize = self.rng.uniform(0.5, 1.5) * self.stepsize
        else:
            local_stepsize = self.stepsize

        a1 *= local_stepsize
        a2 *= local_stepsize
        a3 *= local_stepsize
        b1 *= local_stepsize
        b2 *= local_stepsize

        # Make sure not to alter a view but a copy of the passed arrays.
        position = self.current_model.copy()
        momentum = self.current_momentum.copy()

        # Four-stage integration ------------------------------------------------
        for i in range(self.amount_of_steps):
            # A1
            position += a1 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)
            # B1
            potential_gradient = self.distribution.gradient(position)
            momentum -= b1 * potential_gradient
            # A2
            position += a2 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)
            # B2
            potential_gradient = self.distribution.gradient(position)
            momentum -= b2 * potential_gradient
            # A3
            position += a3 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)
            # B2
            potential_gradient = self.distribution.gradient(position)
            momentum -= b2 * potential_gradient
            # A2
            position += a2 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)
            # B1
            potential_gradient = self.distribution.gradient(position)
            momentum -= b1 * potential_gradient
            # A1
            position += a1 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)

        # For the update
        self.proposed_model = _numpy.copy(position)
        self.proposed_momentum = _numpy.copy(momentum)

    def _propagate_3_stage_simplified(self):
        # Schema: (a1, b1, a2, b2, a2, b1, a1)
        a1 = 0.11888010966548
        a2 = 1.0 / 2.0 - a1
        b1 = 0.29619504261126
        b2 = 1.0 - 2.0 * b1

        if self.randomize_stepsize:
            local_stepsize = self.rng.uniform(0.5, 1.5) * self.stepsize
        else:
            local_stepsize = self.stepsize

        a1 *= local_stepsize
        a2 *= local_stepsize
        b1 *= local_stepsize
        b2 *= local_stepsize

        # Make sure not to alter a view but a copy of the passed arrays.
        position = self.current_model.copy()
        momentum = self.current_momentum.copy()

        # Three-stage integration -----------------------------------------------
        for i in range(self.amount_of_steps):
            # A1
            position += a1 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)
            # B1
            potential_gradient = self.distribution.gradient(position)
            momentum -= b1 * potential_gradient
            # A2
            position += a2 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)
            # B2
            potential_gradient = self.distribution.gradient(position)
            momentum -= b2 * potential_gradient
            # A2
            position += a2 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)
            # B1
            potential_gradient = self.distribution.gradient(position)
            momentum -= b1 * potential_gradient
            # A1
            position += a1 * self.mass_matrix.kinetic_energy_gradient(momentum)
            self.distribution.corrector(position, momentum)

        self.proposed_model = position.copy()
        self.proposed_momentum = momentum.copy()

    integrators = {
        "lf": _propagate_leapfrog,
        "3s": _propagate_3_stage_simplified,
        "4s": _propagate_4_stage_simplified,
    }

    integrators_full_names = {
        "lf": "leapfrog integrator",
        "3s": "three stage integrator",
        "4s": "four stage integrator",
    }

    available_integrators = integrators.keys()

    def plot_stepsizes(self):
        import matplotlib.pyplot as _plt

        _plt.semilogy(self.stepsizes)
        _plt.xlabel("iteration")
        _plt.ylabel("stepsize")
        return _plt.gca()

    def plot_acceptance_rate(self):
        import matplotlib.pyplot as _plt

        _plt.semilogy(self.acceptance_rates)
        _plt.xlabel("iteration")
        _plt.ylabel("acceptance rate")
        return _plt.gca()
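
# A minimal usage sketch (hedged: the ``Normal`` distribution and the exact
# tuning keywords accepted by ``sample`` are assumptions here, not guaranteed
# by this class): run an autotuned HMC chain with the three-stage integrator,
# then inspect the adaptation with the two plotting helpers defined above.
#
#     from hmclab import HMC
#     from hmclab.Distributions import Normal
#     import numpy
#
#     posterior = Normal(numpy.zeros((2, 1)), numpy.eye(2))
#     sampler = HMC()
#     sampler.sample(
#         "samples.h5",
#         posterior,
#         proposals=1000,
#         autotuning=True,
#         integrator="3s",
#         overwrite_existing_file=True,
#     )
#
#     sampler.plot_stepsizes()        # stepsize per proposal (semilog-y)
#     sampler.plot_acceptance_rate()  # acceptance rate per proposal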


class PipeMatrix:
    def __init__(self, n_endpoints):
        self.n_endpoints = n_endpoints

        self.left_pipes = [
            [None for i in range(n_endpoints)] for i in range(n_endpoints)
        ]
        self.right_pipes = [
            [None for i in range(n_endpoints)] for i in range(n_endpoints)
        ]

        for point1 in range(self.n_endpoints):
            for point2 in range(point1):
                left, right = _Pipe(True)
                self.left_pipes[point1][point2] = left
                self.right_pipes[point1][point2] = right

            # The following pipe is to communicate with the main process. Left
            # is used to send from main, right to receive at the fork. It is
            # used to send interrupts.
            left, right = _Pipe(True)
            self.left_pipes[point1][point1] = left
            self.right_pipes[point1][point1] = right

    def pipes_to_subprocess(self):
        return [self.left_pipes[point][point] for point in range(self.n_endpoints)]

    def pipes_from_main(self):
        return [self.right_pipes[point][point] for point in range(self.n_endpoints)]

    def retrieve_pipes(self, point1, point2):
        assert point1 != point2

        left_point = max(point1, point2)
        right_point = min(point1, point2)

        return (
            self.left_pipes[left_point][right_point],
            self.right_pipes[left_point][right_point],
        )

    def close(self):
        for point1 in range(self.n_endpoints):
            for point2 in range(point1):
                self.left_pipes[point1][point2].close()
                self.right_pipes[point1][point2].close()
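
# A minimal sketch (illustration only) of how the pipe matrix connects chains:
# chains i and j share a single duplex pipe, retrieved symmetrically from
# either side when exchanging states.
#
#     matrix = PipeMatrix(4)
#     left, right = matrix.retrieve_pipes(0, 3)  # same pair as for (3, 0)
#     left.send("current_model")
#     assert right.recv() == "current_model"
#     matrix.close()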


class MyProc(_Process):
    pass


class ParallelSampleSMP:
    # SMP stands for Shared Memory Parallelism, i.e. single-machine parallel
    # sampling. Initialization makes a deepcopy of the samplers, but NOT of
    # the posteriors.

    def __init__(self, seed=None):
        if seed is None:
            self.rng = _numpy.random.default_rng()
        else:
            self.rng = _numpy.random.default_rng(seed)

    def sample(
        self,
        samplers: _List[_AbstractSampler],
        filenames: _List[str],
        posteriors: _List[_AbstractDistribution],
        overwrite_existing_files=False,
        proposals: int = 100,
        exchange: bool = True,
        exchange_interval: int = 1,
        initial_model: _Union[_numpy.ndarray, _List[_numpy.ndarray]] = None,
        kwargs: _Union[_Dict, _List[_Dict]] = None,
    ):
        assert overwrite_existing_files, (
            "You have to manually enable overwriting samples. This is for "
            "safety. The existing-file dialog doesn't work in the parallel "
            "case. Set `overwrite_existing_files=True`."
        )

        number_of_chains = len(samplers)

        # Save passed parameters and check their shapes. ------------------------
        self.samplers = _copy.deepcopy(samplers)

        assert len(filenames) == number_of_chains, (
            f"The number of supplied filenames ({len(filenames)}) "
            f"is not equal to the amount of chains ({number_of_chains}). "
            f"Supply {number_of_chains} filenames."
        )

        assert len(posteriors) == number_of_chains, (
            f"The number of supplied posteriors ({len(posteriors)}) "
            f"is not equal to the amount of chains ({number_of_chains}). "
            f"Supply {number_of_chains} posteriors."
        )

        if type(initial_model) == list:
            assert len(initial_model) == number_of_chains, (
                f"The number of supplied initial models ({len(initial_model)}) "
                f"is not equal to the amount of chains ({number_of_chains}). "
                f"Supply either 1 or {number_of_chains} models."
            )

        if type(kwargs) == list:
            assert len(kwargs) == number_of_chains, (
                f"The number of supplied kwargs dictionaries ({len(kwargs)}) "
                f"is not equal to the amount of chains ({number_of_chains}). "
                f"Supply either 1 or {number_of_chains} kwargs. Note that even "
                f"if you don't want to pass arguments to one of the chains, an "
                f"empty dictionary still has to be included."
            )

        # Exchange and parallel communication details ---------------------------

        # All Markov chain processes in an array
        ps = []

        # If we need to exchange samples, create a schedule and a communication
        # matrix.
        if exchange:
            # Every separate chain can at most switch 1 time per proposal. The
            # total is rounded down, because every chain needs a partner chain.
            exchanges_per_proposal = int(_numpy.floor(number_of_chains / 2))

            exchange_schedule = _numpy.vstack(
                [
                    self.rng.choice(
                        number_of_chains, exchanges_per_proposal * 2, replace=False
                    )
                    for i in range(int(proposals / exchange_interval))
                ]
            )

            # Communication object
            pipe_matrix = PipeMatrix(number_of_chains)
        else:
            exchange_schedule = None
            pipe_matrix = None
            exchange_interval = None

        self.exchange_schedule = exchange_schedule

        # A queue to which the final sampling details are passed at the end of
        # sampling
        self.queue = _Queue(maxsize=number_of_chains)

        main_thread_keyboard_interrupt = _Value("f", 0)

        # Settings within the samplers ------------------------------------------

        # Do modifications to the samplers that we don't want to do in the
        # constructor or sampling function. The reasoning here is that it is
        # better to avoid obfuscating the non-parallel code.
        for i, sampler in enumerate(self.samplers):
            sampler.parallel = True
            sampler.exchange_interval = exchange_interval
            sampler.sampler_index = i
            sampler.exchange_schedule = exchange_schedule
            sampler.pipe_matrix = pipe_matrix
            sampler.main_thread_keyboard_interrupt = main_thread_keyboard_interrupt

        # Start parallel sampling
        try:
            # These arguments need to be passed to all chains
            fixed_kwargs = {
                "overwrite_existing_file": True,
                "proposals": proposals,
                "queue": self.queue,
            }

            for i_chain, (sampler, filename, posterior) in enumerate(
                zip(self.samplers, filenames, posteriors)
            ):
                if type(initial_model) == list:
                    chain_initial_model = initial_model[i_chain]
                else:
                    chain_initial_model = initial_model

                if type(kwargs) == list:
                    chain_kwargs = kwargs[i_chain]
                elif kwargs is None:
                    chain_kwargs = {}
                else:
                    chain_kwargs = kwargs

                total_kwargs = {
                    **{"initial_model": chain_initial_model},
                    **chain_kwargs,
                    **fixed_kwargs,
                }

                ps.append(
                    MyProc(
                        target=sampler.sample,
                        args=(filename, posterior),
                        kwargs=total_kwargs,
                    )
                )

            print(f"Starting {number_of_chains} Markov chains...")

            for p in ps:
                p.start()

            for p in ps:
                p.join()

        except KeyboardInterrupt:
            # A keyboard interrupt is also sent automatically to the
            # subprocesses, so the only thing left to do is wait on them.
            for p in ps:
                p.join()
        except Exception as e:
            if exchange:
                pipe_matrix.close()
            raise e
        finally:
            self.sampler_widget_data = []
            while not self.queue.empty():
                self.sampler_widget_data.append(self.queue.get())

            # Beat it into the correct format
            data = {}
            for idata in self.sampler_widget_data:
                data = {**data, **idata}
            self.sampler_widget_data = data

            if exchange:
                pipe_matrix.close()

        return self

    def _repr_html_(self):
        tab = _widgets.Tab()

        if self.sampler_widget_data:
            tab.children = [
                sampler._repr_html_(
                    nested=True, widget_data=self.sampler_widget_data[f"{i}"]
                )
                for i, sampler in enumerate(self.samplers)
            ]
            for i in range(len(tab.children)):
                tab.set_title(i, f"Sampler {i}")

        _display(tab)
        return ""

    def print_results(self):
        print(self._repr_html_())
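
# A minimal usage sketch (hedged: the ``Normal`` distribution and the HMC
# defaults are assumptions here, not prescribed by this class): run two
# exchanging chains on a single machine.
#
#     from hmclab import HMC
#     from hmclab.Distributions import Normal
#     import numpy
#
#     posteriors = [Normal(numpy.zeros((2, 1)), numpy.eye(2)) for _ in range(2)]
#     samplers = [HMC() for _ in range(2)]
#
#     ParallelSampleSMP().sample(
#         samplers,
#         ["chain_0.h5", "chain_1.h5"],
#         posteriors,
#         proposals=1000,
#         exchange=True,
#         overwrite_existing_files=True,
#     )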


class _AbstractVisualSampler(_AbstractSampler):
    """This class acts as a superclass to all visual variations on the
    algorithms.

    We directly 'hijack' three methods; the constructor, _init_sampler and
    _close_sampler, to create and close the plots. The beauty is that these
    can still call the original methods from _AbstractSampler.

    We also need to define some general functions (i.e. functions used by all
    visual samplers) that override subclass methods. Since this is not
    possible from the superclass, we provide that function (the plot update)
    itself, to be linked in the subclass.
    """

    misfits_to_plot: _numpy.ndarray
    """Array of stored misfits that are plotted."""

    samples_to_plot: _numpy.ndarray
    """Array of stored samples that are plotted."""

    plot_update_interval: int = 100
    """Update interval (in number of samples) of the plots. Might strongly
    influence algorithm performance."""

    animate_proposals: bool = False
    """Whether to animate the proposals themselves. Animation is controlled by
    the subclass."""

    leave_proposal_animation: bool = False
    """Whether to leave the animation of the proposals on the plot."""

    animation_domain = None
    """Array describing the extents of the animation domain for the samples,
    in [xmin, xmax, ymin, ymax]. If not supplied, the domain is dynamically
    extended."""

    dims_to_plot = [0, 1]
    """Which dimensions to animate samples for."""

    background_image = None
    """Array storing a background image to plot behind the samples."""

    def __init__(
        self,
        plot_update_interval=None,
        dims_to_plot=None,
        animate_proposals=None,
        leave_proposal_animation=None,
        animation_domain=None,
        background=None,
        seed=None,
    ):
        # Parse parameters
        if plot_update_interval is not None:
            self.plot_update_interval = plot_update_interval

        if dims_to_plot is not None:
            self.dims_to_plot = dims_to_plot

        if animate_proposals is not None:
            self.animate_proposals = animate_proposals

        # If we are animating proposals, we take a negligible performance hit
        # to draw every sample, hence we set the interval to 1.
        if self.animate_proposals:
            self.plot_update_interval = 1

        if leave_proposal_animation is not None:
            self.leave_proposal_animation = leave_proposal_animation

        if animation_domain is not None:
            self.animation_domain = animation_domain

        if background is not None:
            self.x1s, self.x2s, self.background_image = background

        # Call the original constructor
        super().__init__(seed=seed)

    def _init_sampler(
        self,
        samples_hdf5_filename: str,
        distribution: _AbstractDistribution,
        initial_model: _numpy.ndarray,
        proposals: int,
        online_thinning: int,
        ram_buffer_size: int,
        overwrite_existing_file: bool,
        max_time: int,
        disable_progressbar: bool = False,
        diagnostic_mode: bool = False,
        **kwargs,
    ):
        dimensions = distribution.dimensions

        for dim in self.dims_to_plot:
            assert (
                dim < dimensions
            ), "You requested to animate a dimension which is not in the distribution."

        assert self.plot_update_interval > 0

        if self.animation_domain is not None:
            assert self.animation_domain[1] > self.animation_domain[0]
            assert self.animation_domain[3] > self.animation_domain[2]

        # Create arrays to store animation parameters
        self.misfits_to_plot = _numpy.empty((proposals, 1))
        self.samples_to_plot = _numpy.empty((proposals, 2))

        # Create collection of plots
        self.plots = {}
        self.plots["global_misfit"] = {}
        self.plots["samples"] = {}

        # Create figure
        self.plots["figure"] = _plt.figure(figsize=(10, 5))
        _plt.show(block=False)

        # Subplot 1; misfits over time
        ar = 2
        if self.animation_domain is not None:
            ar = 1 + (self.animation_domain[1] - self.animation_domain[0]) / (
                self.animation_domain[3] - self.animation_domain[2]
            )

        a0, a1 = self.plots["figure"].subplots(
            1, 2, gridspec_kw={"width_ratios": [1, ar]}
        )

        self.plots["global_misfit"]["axis"] = a0  # _plt.subplot(121)
        self.plots["global_misfit"]["title"] = _plt.title("Misfit over time")
        self.plots["global_misfit"]["axis"].set_xlim([0, proposals])
        self.plots["global_misfit"]["axis"].set_xlabel("sample index")
        self.plots["global_misfit"]["axis"].set_ylabel("Unnormalized -log(p)")
        self.plots["global_misfit"]["scatterplot"] = None

        # Subplot 2; samples over time
        self.plots["samples"]["axis"] = a1  # _plt.subplot(122)
        self.plots["samples"]["axis"].set_aspect(1)
        self.plots["samples"]["title"] = _plt.title("2d samples over time")
        self.plots["samples"]["axis"].set_xlabel(
            f"Model dimension {self.dims_to_plot[0]}"
        )
        self.plots["samples"]["axis"].set_ylabel(
            f"Model dimension {self.dims_to_plot[1]}"
        )
        self.plots["samples"]["scatterplot"] = None

        if self.background_image is not None:
            self.plots["samples"]["axis"].contour(
                self.x1s,
                self.x2s,
                _numpy.exp(-self.background_image),
                levels=20,
                alpha=0.5,
                zorder=0,
            )

        if self.animation_domain is not None:
            self.plots["samples"]["axis"].set_xlim(
                [self.animation_domain[0], self.animation_domain[1]]
            )
            self.plots["samples"]["axis"].set_ylim(
                [self.animation_domain[2], self.animation_domain[3]]
            )

        self.plots["samples"]["legend"] = None

        # Run original function
        return super()._init_sampler(
            samples_hdf5_filename,
            distribution,
            initial_model,
            proposals,
            online_thinning,
            ram_buffer_size,
            overwrite_existing_file,
            max_time,
            disable_progressbar=disable_progressbar,
            diagnostic_mode=diagnostic_mode,
            **kwargs,
        )
self.plots["global_misfit"]["axis"].set_xlabel("sample index") self.plots["global_misfit"]["axis"].set_ylabel("Unnormalized -log(p)") self.plots["global_misfit"]["scatterplot"] = None # Subplot 2; samples over time self.plots["samples"]["axis"] = a1 # _plt.subplot(122) self.plots["samples"]["axis"].set_aspect(1) self.plots["samples"]["title"] = _plt.title("2d samples over time") self.plots["samples"]["axis"].set_xlabel( f"Model dimension {self.dims_to_plot[0]}" ) self.plots["samples"]["axis"].set_ylabel( f"Model dimension {self.dims_to_plot[1]}" ) self.plots["samples"]["scatterplot"] = None if self.background_image is not None: self.plots["samples"]["axis"].contour( self.x1s, self.x2s, _numpy.exp(-self.background_image), levels=20, alpha=0.5, zorder=0, ) if self.animation_domain is not None: self.plots["samples"]["axis"].set_xlim( [self.animation_domain[0], self.animation_domain[1]] ) self.plots["samples"]["axis"].set_ylim( [self.animation_domain[2], self.animation_domain[3]] ) self.plots["samples"]["legend"] = None # Run original function return super()._init_sampler( samples_hdf5_filename, distribution, initial_model, proposals, online_thinning, ram_buffer_size, overwrite_existing_file, max_time, disable_progressbar=disable_progressbar, diagnostic_mode=diagnostic_mode, **kwargs, ) def _update_plots_after_acceptance(self, force=False): if len(_plt.get_fignums()) == 0: # If the figure is closed by the user, we skip the plotting return # Load current state self.misfits_to_plot[self.current_proposal] = self.current_x self.samples_to_plot[self.current_proposal, :] = self.current_model[ self.dims_to_plot ].flatten() # Beat the data into a nice shape index, misfit = ( _numpy.arange(self.misfits_to_plot[: self.current_proposal + 1].size)[ :, None ], self.misfits_to_plot[: self.current_proposal + 1], ) samples_x, samples_y = ( self.samples_to_plot[: self.current_proposal + 1, 0, None], self.samples_to_plot[: self.current_proposal + 1, 1, None], ) if self.plots["global_misfit"]["scatterplot"] is None: # Create plots if they are not there yet ... self.plots["global_misfit"]["scatterplot"] = self.plots["global_misfit"][ "axis" ].scatter(index, misfit, s=10, c="r") self.plots["samples"]["scatterplot"] = self.plots["samples"][ "axis" ].scatter(samples_x, samples_y, s=30, label="samples") if self.plots["samples"]["legend"] is None: self.plots["samples"]["legend"] = self.plots["samples"]["axis"].legend() else: # ... 


class RWMH_visual(_AbstractVisualSampler, RWMH):
    """Visual version of Random Walk Metropolis-Hastings."""

    def _evaluate_acceptance(self):
        """Animate every new sample after criterion evaluation."""
        return_value = super()._evaluate_acceptance()
        self._update_plots_after_acceptance()
        return return_value


class HMC_visual(_AbstractVisualSampler, HMC):
    """Visual version of Hamiltonian Monte Carlo."""

    def _evaluate_acceptance(self):
        """Animate every new sample after criterion evaluation."""
        return_value = super()._evaluate_acceptance()
        self._update_plots_after_acceptance()
        return return_value

    def _propagate_leapfrog_visual(self):
        """Animate the leapfrog integration."""

        if not self.animate_proposals or len(_plt.get_fignums()) == 0:
            # If the figure is closed by the user, we skip the plotting
            return super()._propagate_leapfrog()

        position = self.current_model.copy()
        momentum = self.current_momentum.copy()

        # These are the positions stored for animating the trajectory
        positions_x = _numpy.array([])
        positions_y = _numpy.array([])

        positions_x = _numpy.append(positions_x, position[self.dims_to_plot[0]])
        positions_y = _numpy.append(positions_y, position[self.dims_to_plot[1]])

        self.plots["samples"]["scatterplot_proposal"] = self.plots["samples"][
            "axis"
        ].plot(
            positions_x, positions_y, "r", alpha=0.5, label="trajectories", zorder=0
        )
        line = self.plots["samples"]["scatterplot_proposal"].pop(0)

        self.plots["figure"].canvas.draw()
        _plt.pause(0.00001)

        if self.randomize_stepsize:
            local_stepsize = self.rng.uniform(0.5, 1.5) * self.stepsize
        else:
            local_stepsize = self.stepsize

        # Leapfrog integration --------------------------------------------------
        position += (
            0.5 * local_stepsize * self.mass_matrix.kinetic_energy_gradient(momentum)
        )

        line.set_xdata(
            _numpy.append(line.get_xdata().flatten(), position[self.dims_to_plot[0]])
        )
        line.set_ydata(
            _numpy.append(line.get_ydata().flatten(), position[self.dims_to_plot[1]])
        )

        positions_x = _numpy.array([])
        positions_y = _numpy.array([])

        positions_x = _numpy.append(positions_x, position[self.dims_to_plot[0]])
        positions_y = _numpy.append(positions_y, position[self.dims_to_plot[1]])

        self.plots["samples"]["scatterplot_proposal_grads"] = self.plots["samples"][
            "axis"
        ].plot(
            positions_x,
            positions_y,
            "r",
            marker=".",
            ls="",
            alpha=1,
            markersize=3,
            label="computed gradients",
            zorder=0,
        )
gradients", zorder=0, ) line_grads = self.plots["samples"]["scatterplot_proposal_grads"].pop(0) line_grads.set_xdata( _numpy.append( line_grads.get_xdata().flatten(), position[self.dims_to_plot[0]] ) ) line_grads.set_ydata( _numpy.append( line_grads.get_ydata().flatten(), position[self.dims_to_plot[1]] ) ) self.plots["figure"].canvas.draw() _plt.pause(0.00001) self.distribution.corrector(position, momentum) # verbose_integration verbose_integration = False if verbose_integration: integration_iterator = _tqdm_au.trange( self.amount_of_steps - 1, position=2, leave=False, desc="Leapfrog integration", ) else: integration_iterator = range(self.amount_of_steps - 1) # Integration loop for i in integration_iterator: # Momentum step momentum -= local_stepsize * self.distribution.gradient(position) # Position step position += local_stepsize * self.mass_matrix.kinetic_energy_gradient( momentum ) # Correct bounds self.distribution.corrector(position, momentum) line.set_xdata( _numpy.append( line.get_xdata().flatten(), position[self.dims_to_plot[0]] ) ) line.set_ydata( _numpy.append( line.get_ydata().flatten(), position[self.dims_to_plot[1]] ) ) line_grads.set_xdata( _numpy.append( line_grads.get_xdata().flatten(), position[self.dims_to_plot[0]] ) ) line_grads.set_ydata( _numpy.append( line_grads.get_ydata().flatten(), position[self.dims_to_plot[1]] ) ) self.plots["figure"].canvas.draw() _plt.pause(0.00001) # Full momentum and half step position after loop ------------------------------ # Momentum step momentum -= local_stepsize * self.distribution.gradient(position) # Position step position += ( 0.5 * local_stepsize * self.mass_matrix.kinetic_energy_gradient(momentum) ) self.distribution.corrector(position, momentum) line.set_xdata( _numpy.append(line.get_xdata().flatten(), position[self.dims_to_plot[0]]) ) line.set_ydata( _numpy.append(line.get_ydata().flatten(), position[self.dims_to_plot[1]]) ) self.plots["figure"].canvas.draw() _plt.pause(0.00001) self.proposed_model = position.copy() self.proposed_momentum = momentum.copy() if not self.leave_proposal_animation: line.remove() line_grads.remove() integrators = { "lf": _propagate_leapfrog_visual, } integrators_full_names = { "lf": "leapfrog integrator", } available_integrators = integrators.keys()