Source code for hmclab.Distributions.SourceLocation

import math as _math
from typing import Union as _Union, Tuple as _Tuple

import numpy as _numpy
import matplotlib.pyplot as _plt

from hmclab.Distributions import _AbstractDistribution
from hmclab.Helpers.CustomExceptions import InvalidCaseError as _InvalidCaseError


class SourceLocation2D(_AbstractDistribution):
    """Earthquake source location in 2D using a single velocity for the
    subsurface."""

    name: str = "Earthquake source location in 2D"

    infer_velocity: bool = True
    """Boolean that determines whether or not the model velocity is also a free
    parameter."""

    receiver_array_x: _numpy.ndarray = None
    receiver_array_z: _numpy.ndarray = None

    number_of_events: int = None

    medium_velocity: float = None
    def __init__(
        self,
        receiver_array_x: _numpy.ndarray,
        receiver_array_z: _numpy.ndarray,
        observed_data: _numpy.ndarray,
        data_std: _Union[_numpy.ndarray, float],
        infer_velocity: bool = True,
        medium_velocity=None,
    ):
        receiver_array_x = receiver_array_x.copy()
        receiver_array_z = receiver_array_z.copy()
        observed_data = observed_data.copy()
        if type(data_std) == _numpy.ndarray:
            data_std = data_std.copy()

        # Stations ---------------------------------------------------------------------

        # Assert that the arrays are row vectors.
        try:
            assert receiver_array_x.shape == (
                1,
                receiver_array_x.size,
            ), f"Receivers should be row vectors, not {receiver_array_x.shape}"
            assert receiver_array_z.shape == (
                1,
                receiver_array_z.size,
            ), f"Receivers should be row vectors, not {receiver_array_z.shape}"
        except AssertionError:
            receiver_array_x.shape = (1, receiver_array_x.size)
            receiver_array_z.shape = (1, receiver_array_z.size)

        self.receiver_array_x = receiver_array_x
        self.receiver_array_z = receiver_array_z

        # Free parameters --------------------------------------------------------------

        # Determine if velocity is a free parameter
        self.infer_velocity = infer_velocity
        if not self.infer_velocity:
            assert medium_velocity is not None
            self.medium_velocity = medium_velocity

        # Observed data and error statistics -------------------------------------------

        # Get number of events from data and array size
        assert (
            observed_data.size % receiver_array_x.size == 0
        ), "Data does not match receiver array"
        self.number_of_events: int = int(observed_data.size / receiver_array_x.size)
        self.number_of_stations: int = self.receiver_array_z.size
        self.number_of_datums: int = self.number_of_events * self.number_of_stations

        try:
            assert observed_data.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the observed data, trying to transpose..."
        except AssertionError:
            observed_data = observed_data.T
            assert observed_data.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the observed data, not sure what to do."
        self.observed_data = observed_data

        if type(data_std) is float:
            data_std = _numpy.ones_like(observed_data) * data_std

        try:
            assert data_std.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the data uncertainty, trying to transpose..."
        except AssertionError:
            data_std = data_std.T
            assert data_std.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the data uncertainty, not sure what to do."
        self.data_std = data_std

        # Assert that the data and data error dispersion are the same shape
        assert observed_data.shape == data_std.shape

        # Set amount of free parameters.
        self.dimensions: int = self.number_of_events * 3 + int(self.infer_velocity)
    def misfit(self, coordinates: _numpy.ndarray) -> float:
        x, z, T, v = self.split_vector(coordinates, velocity=self.medium_velocity)

        distances = (
            (x - self.receiver_array_x) ** 2.0 + (z - self.receiver_array_z) ** 2.0
        ) ** 0.5

        return self.misfit_bounds(coordinates) + 0.5 * _numpy.nansum(
            ((self.observed_data - (T + distances / v)) / self.data_std) ** 2
        )

    def gradient(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        x, z, T, v = self.split_vector(coordinates, velocity=self.medium_velocity)

        dx = x - self.receiver_array_x
        dz = z - self.receiver_array_z
        d = (dx**2.0 + dz**2.0) ** 0.5

        # Data
        t_calc = T + d / v

        # Data gradients
        data_grad_x = dx / (v * d)
        data_grad_z = dz / (v * d)
        data_grad_v = -d / (v * v)
        data_grad_T = _numpy.ones_like(data_grad_x)

        # Misfit gradient
        misfit_grad = (t_calc - self.observed_data) / (self.data_std**2)

        # Applying chain rule; nansum ignores missing (NaN) picks, consistent with
        # misfit() and with the 3D implementation.
        gx = _numpy.nansum(misfit_grad * data_grad_x, axis=1)
        gz = _numpy.nansum(misfit_grad * data_grad_z, axis=1)
        gT = _numpy.nansum(misfit_grad * data_grad_T, axis=1)
        gv = _numpy.nansum(misfit_grad * data_grad_v)

        # Compiling into total gradient
        total_grad = _numpy.zeros_like(coordinates)
        if self.infer_velocity:
            total_grad[0:-1:3, 0] = gx
            total_grad[1:-1:3, 0] = gz
            total_grad[2:-1:3, 0] = gT
            total_grad[-1, 0] = gv
            return total_grad
        else:
            total_grad[0::3, 0] = gx
            total_grad[1::3, 0] = gz
            total_grad[2::3, 0] = gT
            return total_grad

    def generate(self, repeat=1, rng=_numpy.random.default_rng()) -> _numpy.ndarray:
        raise NotImplementedError(
            "Generating samples from this distribution is not implemented or "
            "supported."
        )

    @staticmethod
    def forward(x, z, T, v, receiver_array_x, receiver_array_z) -> _numpy.ndarray:
        # # Assert that the vectors are column-shaped
        # assert x.shape == (x.size, 1)
        # assert z.shape == (z.size, 1)
        # assert T.shape == (T.size, 1)

        # # Assert that the vectors are the same length
        # assert x.size == z.size and x.size == T.size
        # assert receiver_array_x.shape == receiver_array_z.shape
        # assert receiver_array_x.shape == (1, receiver_array_x.size)

        # Forward calculate the travel time by dividing distance (Pythagoras) by
        # speed, and adding origin time.
        #
        # This function relies heavily on matrix broadcasting in NumPy to handle
        # multiple stations. The arrays T, x and z are column vectors. By subtracting
        # the row vectors of the station coordinates, we create matrices for the
        # traveltimes, with the following structure (example data points are filled
        # in):
        #
        # Station:    1      2
        # Event 1: [5.3s,  9.0s]
        # Event 2: [2.7s,  7.4s]
        # Event 3: [7.1s,  8.2s]
        traveltimes: _numpy.ndarray = (
            T
            + ((x - receiver_array_x) ** 2.0 + (z - receiver_array_z) ** 2.0) ** 0.5
            / v
        )

        return traveltimes

    def forward_vector(self, m):
        x, z, T, v = self.split_vector(m, velocity=self.medium_velocity)

        traveltimes: _numpy.ndarray = (
            T
            + (
                (x - self.receiver_array_x) ** 2.0
                + (z - self.receiver_array_z) ** 2.0
            )
            ** 0.5
            / v
        )

        return traveltimes

    @staticmethod
    def forward_gradient(x, z, T, v, receiver_array_x, receiver_array_z) -> _Tuple:
        # Assert that the vectors are column-shaped
        # assert x.shape == (x.size, 1)
        # assert z.shape == (z.size, 1)
        # assert T.shape == (T.size, 1)

        # Assert that the vectors are the same length
        # assert x.size == z.size and x.size == T.size
        # assert receiver_array_x.shape == receiver_array_z.shape
        # assert receiver_array_x.shape == (1, receiver_array_x.size)

        # Gradients of the travel times with respect to location, origin time and
        # velocity. This function relies heavily on matrix broadcasting in NumPy to
        # handle multiple stations. The arrays T, x and z are column vectors; by
        # subtracting the row vectors of the station coordinates, we create gradient
        # matrices with one row per event and one column per station, matching the
        # traveltime matrix illustrated in forward().
        distances = (
            (x - receiver_array_x) ** 2.0 + (z - receiver_array_z) ** 2.0
        ) ** 0.5

        gradient_x = (x - receiver_array_x) / (v * distances)
        gradient_z = (z - receiver_array_z) / (v * distances)
        gradient_T = 0.0 * gradient_x + 1.0
        gradient_v = -distances / (v * v)

        return gradient_x, gradient_z, gradient_T, gradient_v

    def describe(self):
        if self.infer_velocity:
            print(
                "We are inferring the location of events AS WELL AS medium velocity."
            )
            print(
                "The parameters are ordered like: x1, z1, t1, x2, z2, t2, ... xn, "
                "zn, tn, v."
            )
        else:
            print("We are inferring the location of events GIVEN a medium velocity.")
            print(
                "The parameters are ordered like: x1, z1, t1, x2, z2, t2, ... xn, "
                "zn, tn."
            )
        print(
            f"We are placing {self.number_of_events} events using "
            f"{self.number_of_stations} stations, and a total of "
            f"{self.number_of_datums} observations"
        )

    @staticmethod
    def create_default(dimensions, seed=127):
        # Possible parameter numbers (3 per event, plus optionally 1 for velocity):
        # 3, 4, 6, 7, 9, 10, etc.
        events = _math.floor(dimensions / 3)
        medium_velocity = None

        if dimensions < 3:
            raise _InvalidCaseError()
        elif dimensions % 3 == 1:
            infer_velocity = True
        elif dimensions % 3 == 0:
            infer_velocity = False
        else:
            raise _InvalidCaseError()

        _numpy.random.seed(seed)

        # Create surface stations ------------------------------------------------------
        stations_x = _numpy.random.rand(1, 3) * 40 - 10
        stations_z = _numpy.zeros_like(stations_x)
        stations_z[0, -1] = 4.0

        # Create the true model and observations ---------------------------------------
        x = _numpy.random.rand(events, 1) * 20
        z = _numpy.random.rand(events, 1) * 10
        T = _numpy.random.rand(events, 1) * 10
        v = _numpy.random.rand(1, 1) * 3 + 1

        if not infer_velocity:
            medium_velocity = v

        fake_observed_data = SourceLocation2D.forward(
            x, z, T, v, stations_x, stations_z
        )

        # Create the likelihood ----------------------------------------------------------
        data_std = 1.0 * _numpy.random.randn(*fake_observed_data.shape)
        fake_observed_data += data_std * _numpy.random.randn(*data_std.shape)

        return SourceLocation2D(
            stations_x,
            stations_z,
            fake_observed_data,
            data_std,
            infer_velocity=infer_velocity,
            medium_velocity=medium_velocity,
        )

    def plot_data(self, figure=None):
        if figure is None:
            figure = _plt.figure(figsize=(6, 6))
        _plt.grid()
        for event in range(self.number_of_events):
            _plt.errorbar(
                self.receiver_array_x.T,
                self.observed_data[event, :],
                yerr=self.data_std[event, :],
                fmt="x",
                label=f"Event {event+1}",
            )
        _plt.ylim([0, _plt.gca().get_ylim()[1]])
        _plt.xlabel("Horizontal distance [km]")
        _plt.ylabel("Time [s]")

    @staticmethod
    def split_vector(model_vector, velocity=None) -> _Tuple:
        if velocity is None:
            assert model_vector.size % 3 == 1
            x = model_vector[0:-1:3]
            z = model_vector[1:-1:3]
            T = model_vector[2:-1:3]
            v = model_vector[-1]
            return x, z, T, v
        else:
            assert model_vector.size % 3 == 0
            x = model_vector[0::3]
            z = model_vector[1::3]
            T = model_vector[2::3]
            return x, z, T, velocity
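The snippet below is an illustrative usage sketch, not part of the module source. It builds the default synthetic 2D problem with SourceLocation2D.create_default and evaluates misfit and gradient at a trial model, whose layout (x1, z1, t1, x2, z2, t2, v) follows split_vector. It assumes hmclab and its dependencies are importable; the name trial_model and the numeric values are made up for illustration.

    import numpy

    from hmclab.Distributions.SourceLocation import SourceLocation2D

    # Two events plus a free velocity parameter: 2 * 3 + 1 = 7 dimensions.
    target = SourceLocation2D.create_default(dimensions=7)
    target.describe()

    # Column vector ordered as (x1, z1, t1, x2, z2, t2, v); values are arbitrary.
    trial_model = numpy.array([[10.0, 5.0, 1.0, 12.0, 6.0, 2.0, 2.5]]).T

    print("Misfit:  ", target.misfit(trial_model))
    print("Gradient:", target.gradient(trial_model).ravel())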
class SourceLocation3D(_AbstractDistribution):
    """Earthquake source location in 3D using a single velocity for the subsurface.

    Pretty close to a one-to-one copy of the 2D problem, but it was clearer to write
    the codes out separately."""

    name: str = "Earthquake source location in 3D"

    infer_velocity: bool = True
    """Boolean that determines whether or not the model velocity is also a free
    parameter."""

    receiver_array_x: _numpy.ndarray = None
    receiver_array_y: _numpy.ndarray = None
    receiver_array_z: _numpy.ndarray = None

    number_of_events: int = None

    medium_velocity: float = None
    def __init__(
        self,
        receiver_array_x: _numpy.ndarray,
        receiver_array_y: _numpy.ndarray,
        receiver_array_z: _numpy.ndarray,
        observed_data: _numpy.ndarray,
        data_std: _Union[_numpy.ndarray, float],
        infer_velocity: bool = True,
        medium_velocity=None,
    ):
        receiver_array_x = receiver_array_x.copy()
        receiver_array_y = receiver_array_y.copy()
        receiver_array_z = receiver_array_z.copy()
        observed_data = observed_data.copy()
        if type(data_std) == _numpy.ndarray:
            data_std = data_std.copy()

        # Stations ---------------------------------------------------------------------

        # Assert that the arrays are row vectors.
        try:
            assert receiver_array_x.shape == (
                1,
                receiver_array_x.size,
            ), f"Receivers should be row vectors, not {receiver_array_x.shape}"
            assert receiver_array_y.shape == (
                1,
                receiver_array_y.size,
            ), f"Receivers should be row vectors, not {receiver_array_y.shape}"
            assert receiver_array_z.shape == (
                1,
                receiver_array_z.size,
            ), f"Receivers should be row vectors, not {receiver_array_z.shape}"
        except AssertionError:
            receiver_array_x.shape = (1, receiver_array_x.size)
            receiver_array_y.shape = (1, receiver_array_y.size)
            receiver_array_z.shape = (1, receiver_array_z.size)

        self.receiver_array_x = receiver_array_x
        self.receiver_array_y = receiver_array_y
        self.receiver_array_z = receiver_array_z

        # Free parameters --------------------------------------------------------------

        # Determine if velocity is a free parameter
        self.infer_velocity = infer_velocity
        if not self.infer_velocity:
            assert medium_velocity is not None
            self.medium_velocity = medium_velocity

        # Observed data and error statistics -------------------------------------------

        # Get number of events from data and array size
        assert observed_data.size % receiver_array_x.size == 0
        self.number_of_events: int = int(observed_data.size / receiver_array_x.size)
        self.number_of_stations: int = self.receiver_array_z.size
        self.number_of_datums: int = self.number_of_events * self.number_of_stations

        try:
            assert observed_data.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the observed data, trying to transpose..."
        except AssertionError:
            observed_data = observed_data.T
            assert observed_data.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the observed data, not sure what to do."
        self.observed_data = observed_data

        if type(data_std) is float:
            data_std = _numpy.ones_like(observed_data) * data_std

        try:
            assert data_std.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the data uncertainty, trying to transpose..."
        except AssertionError:
            data_std = data_std.T
            assert data_std.shape == (
                self.number_of_events,
                self.number_of_stations,
            ), "Wrong shape for the data uncertainty, not sure what to do."
        self.data_std = data_std

        # Assert that the data and data error dispersion are the same shape
        assert observed_data.shape == data_std.shape

        # Set amount of free parameters.
        self.dimensions: int = self.number_of_events * 4 + int(self.infer_velocity)
    def misfit(self, coordinates: _numpy.ndarray) -> float:
        x, y, z, T, v = self.split_vector(coordinates, velocity=self.medium_velocity)

        distances = (
            (x - self.receiver_array_x) ** 2.0
            + (y - self.receiver_array_y) ** 2.0
            + (z - self.receiver_array_z) ** 2.0
        ) ** 0.5

        return self.misfit_bounds(coordinates) + 0.5 * _numpy.nansum(
            ((self.observed_data - (T + distances / v)) / self.data_std) ** 2
        )

    def gradient(self, coordinates: _numpy.ndarray) -> _numpy.ndarray:
        x, y, z, T, v = self.split_vector(coordinates, velocity=self.medium_velocity)

        dx = x - self.receiver_array_x
        dy = y - self.receiver_array_y
        dz = z - self.receiver_array_z
        d = (dx**2.0 + dy**2.0 + dz**2.0) ** 0.5

        # Data
        t_calc = T + d / v

        # Data gradients
        data_grad_x = dx / (v * d)
        data_grad_y = dy / (v * d)
        data_grad_z = dz / (v * d)
        data_grad_v = -d / (v * v)
        data_grad_T = _numpy.ones_like(data_grad_x)

        # Misfit gradient
        misfit_grad = (t_calc - self.observed_data) / (self.data_std**2)

        # Applying chain rule
        gx = _numpy.nansum(misfit_grad * data_grad_x, axis=1)
        gy = _numpy.nansum(misfit_grad * data_grad_y, axis=1)
        gz = _numpy.nansum(misfit_grad * data_grad_z, axis=1)
        gT = _numpy.nansum(misfit_grad * data_grad_T, axis=1)
        gv = _numpy.nansum(misfit_grad * data_grad_v)

        # Compiling into total gradient
        total_grad = _numpy.zeros_like(coordinates)
        if self.infer_velocity:
            total_grad[0:-1:4, 0] = gx
            total_grad[1:-1:4, 0] = gy
            total_grad[2:-1:4, 0] = gz
            total_grad[3:-1:4, 0] = gT
            total_grad[-1, 0] = gv
            return total_grad
        else:
            total_grad[0::4, 0] = gx
            total_grad[1::4, 0] = gy
            total_grad[2::4, 0] = gz
            total_grad[3::4, 0] = gT
            return total_grad

    def generate(self, repeat=1, rng=_numpy.random.default_rng()) -> _numpy.ndarray:
        raise NotImplementedError(
            "Generating samples from this distribution is not implemented or "
            "supported."
        )

    @staticmethod
    def forward(
        x, y, z, T, v, receiver_array_x, receiver_array_y, receiver_array_z
    ) -> _numpy.ndarray:
        traveltimes: _numpy.ndarray = (
            T
            + (
                (x - receiver_array_x) ** 2.0
                + (y - receiver_array_y) ** 2.0
                + (z - receiver_array_z) ** 2.0
            )
            ** 0.5
            / v
        )
        return traveltimes

    def forward_vector(self, m):
        x, y, z, T, v = self.split_vector(m, velocity=self.medium_velocity)

        traveltimes: _numpy.ndarray = (
            T
            + (
                (x - self.receiver_array_x) ** 2.0
                + (y - self.receiver_array_y) ** 2.0
                + (z - self.receiver_array_z) ** 2.0
            )
            ** 0.5
            / v
        )
        return traveltimes

    @staticmethod
    def forward_gradient(
        x, y, z, T, v, receiver_array_x, receiver_array_y, receiver_array_z
    ) -> _Tuple:
        distances = (
            (x - receiver_array_x) ** 2.0
            + (y - receiver_array_y) ** 2.0
            + (z - receiver_array_z) ** 2.0
        ) ** 0.5

        gradient_x = (x - receiver_array_x) / (v * distances)
        gradient_y = (y - receiver_array_y) / (v * distances)
        gradient_z = (z - receiver_array_z) / (v * distances)
        gradient_T = 0.0 * gradient_x + 1.0
        gradient_v = -distances / (v * v)

        return gradient_x, gradient_y, gradient_z, gradient_T, gradient_v

    def describe(self):
        if self.infer_velocity:
            print(
                "We are inferring the location of events AS WELL AS medium velocity."
            )
            print(
                "The parameters are ordered like: x1, y1, z1, t1, x2, y2, z2, t2, ..."
                " xn, yn, zn, tn, v."
            )
        else:
            print("We are inferring the location of events GIVEN a medium velocity.")
            print(
                "The parameters are ordered like: x1, y1, z1, t1, x2, y2, z2, t2, ..."
                " xn, yn, zn, tn."
            )
        print(
            f"We are placing {self.number_of_events} events using "
            f"{self.number_of_stations} stations, and a total of "
            f"{self.number_of_datums} observations"
        )

    @staticmethod
    def create_default(dimensions, seed=127):
        # Possible parameter numbers (4 per event, plus optionally 1 for velocity):
        # 4, 5, 8, 9, 12, 13, etc.
        events = _math.floor(dimensions / 4)
        medium_velocity = None

        if dimensions < 4:
            raise _InvalidCaseError()
        elif dimensions % 4 == 1:
            infer_velocity = True
        elif dimensions % 4 == 0:
            infer_velocity = False
        else:
            raise _InvalidCaseError()

        _numpy.random.seed(seed)

        # Create surface stations ------------------------------------------------------
        n_stations = 3
        stations_x = _numpy.random.rand(1, n_stations) * 40 - 10
        stations_y = _numpy.random.rand(1, n_stations) * 40 - 10
        stations_z = _numpy.zeros_like(stations_x)

        # Create the true model and observations ---------------------------------------
        x = _numpy.random.rand(events, 1) * 20
        y = _numpy.random.rand(events, 1) * 20
        z = _numpy.random.rand(events, 1) * 10
        T = _numpy.random.rand(events, 1) * 10
        v = _numpy.random.rand(1, 1) * 3 + 1

        if not infer_velocity:
            medium_velocity = v

        fake_observed_data = SourceLocation3D.forward(
            x, y, z, T, v, stations_x, stations_y, stations_z
        )

        # Create the likelihood ----------------------------------------------------------
        data_std = 1.0 * _numpy.random.randn(*fake_observed_data.shape)
        fake_observed_data += data_std * _numpy.random.randn(*data_std.shape)

        return SourceLocation3D(
            stations_x,
            stations_y,
            stations_z,
            fake_observed_data,
            data_std,
            infer_velocity=infer_velocity,
            medium_velocity=medium_velocity,
        )

    def plot_data(self, figure=None):
        if figure is None:
            figure = _plt.figure(figsize=(6, 6))
        _plt.grid()
        for event in range(self.number_of_events):
            _plt.errorbar(
                self.receiver_array_x.T,
                self.observed_data[event, :],
                yerr=self.data_std[event, :],
                fmt="x",
                label=f"Event {event+1}",
            )
        _plt.ylim([0, _plt.gca().get_ylim()[1]])
        _plt.xlabel("Horizontal distance [km]")
        _plt.ylabel("Time [s]")

    @staticmethod
    def split_vector(model_vector, velocity=None) -> _Tuple:
        if velocity is None:
            assert model_vector.size % 4 == 1
            x = model_vector[0:-1:4]
            y = model_vector[1:-1:4]
            z = model_vector[2:-1:4]
            T = model_vector[3:-1:4]
            v = model_vector[-1]
            return x, y, z, T, v
        else:
            assert model_vector.size % 4 == 0
            x = model_vector[0::4]
            y = model_vector[1::4]
            z = model_vector[2::4]
            T = model_vector[3::4]
            return x, y, z, T, velocity
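As a second illustrative sketch (again not part of the module source), the analytical gradient of SourceLocation3D can be cross-checked against a central finite difference of misfit. The helper finite_difference_gradient, the step size, and the trial values are made up for illustration; the check assumes no extra bounds are configured, so misfit_bounds contributes a constant.

    import numpy

    from hmclab.Distributions.SourceLocation import SourceLocation3D

    # Two events plus a free velocity parameter: 2 * 4 + 1 = 9 dimensions.
    target = SourceLocation3D.create_default(dimensions=9)

    # Column vector ordered as (x1, y1, z1, t1, x2, y2, z2, t2, v); values are arbitrary.
    m = numpy.array([[10.0, 8.0, 5.0, 1.0, 12.0, 3.0, 6.0, 2.0, 2.5]]).T


    def finite_difference_gradient(distribution, model, step=1e-6):
        # Central finite differences of distribution.misfit around `model`.
        grad = numpy.zeros_like(model)
        for i in range(model.size):
            m_plus, m_minus = model.copy(), model.copy()
            m_plus[i, 0] += step
            m_minus[i, 0] -= step
            grad[i, 0] = (
                distribution.misfit(m_plus) - distribution.misfit(m_minus)
            ) / (2.0 * step)
        return grad


    analytical = target.gradient(m)
    numerical = finite_difference_gradient(target, m)

    # The two should agree to several significant digits.
    print(numpy.abs(analytical - numerical).max())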