Source code for GLE_analysisEM._gle_estimator

"""
This is the main estimator module
"""
import numpy as np
import pandas as pd

import warnings
from time import time

from sklearn.base import BaseEstimator, DensityMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.utils import check_random_state, check_array
from sklearn.exceptions import ConvergenceWarning

from .random_matrix import generateRandomDefPosMat
from ._euler_model import EulerForceVisibleModel

from ._gle_basis_projection import GLE_BasisTransform

# If the Fortran module is not available, fall back to the pure-Python implementation
try:
    from ._filter_smoother import filtersmoother
except ImportError as err:
    print(err)
    warnings.warn("Python fallback will been used for filtersmoother module. Consider compiling the fortran module")
    from ._kalman_python import filtersmoother

import multiprocessing

model_class = {"euler": EulerForceVisibleModel}


def sufficient_stats(traj, dim_x):
    """
    Given a sample trajectory, compute the averaged values of the sufficient statistics.
    Data are stacked as (xv_plus_proj, xv_proj, v, bk).
    """

    xval = traj[:-1, 2 * dim_x : 3 * dim_x]
    dx = traj[:-1, :dim_x] - traj[:-1, dim_x : 2 * dim_x]
    bk = traj[:-1, 3 * dim_x :]
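    # Each statistic below is a time-averaged outer product over the trajectory,
    # e.g. xx[i, j] = mean_t xval[t, i] * xval[t, j]; the broadcasting used here is
    # equivalent to np.einsum("ti,tj->ij", xval, xval) / xval.shape[0].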
    xx = np.mean(xval[:, :, np.newaxis] * xval[:, np.newaxis, :], axis=0)
    xdx = np.mean(xval[:, :, np.newaxis] * dx[:, np.newaxis, :], axis=0)
    dxdx = np.mean(dx[:, :, np.newaxis] * dx[:, np.newaxis, :], axis=0)
    bkx = np.mean(bk[:, :, np.newaxis] * xval[:, np.newaxis, :], axis=0)
    bkdx = np.mean(bk[:, :, np.newaxis] * dx[:, np.newaxis, :], axis=0)
    bkbk = np.mean(bk[:, :, np.newaxis] * bk[:, np.newaxis, :], axis=0)

    return pd.Series({"dxdx": dxdx, "xdx": xdx, "xx": xx, "bkx": bkx, "bkdx": bkdx, "bkbk": bkbk, "µ_0": 0, "Σ_0": 1, "hS": 0})


def sufficient_stats_hidden(muh, Sigh, traj, old_stats, dim_x, dim_h, dim_force, model="aboba"):
    """
    Compute the sufficient statistics averaged over the hidden variable distribution.
    Data are stacked as (xv_plus_proj, xv_proj, v, bk).
    """
    # print("Suff_stats")
    xx = np.zeros((dim_x + dim_h, dim_x + dim_h))
    xx[:dim_x, :dim_x] = old_stats["xx"]
    xdx = np.zeros_like(xx)
    xdx[:dim_x, :dim_x] = old_stats["xdx"]
    dxdx = np.zeros_like(xx)
    dxdx[:dim_x, :dim_x] = old_stats["dxdx"]
    bkx = np.zeros((dim_force, dim_x + dim_h))
    bkx[:, :dim_x] = old_stats["bkx"]
    bkdx = np.zeros_like(bkx)
    bkdx[:, :dim_x] = old_stats["bkdx"]

    xval = traj[:-1, 2 * dim_x : 3 * dim_x]
    dx = traj[:-1, :dim_x] - traj[:-1, dim_x : 2 * dim_x]
    bk = traj[:-1, 3 * dim_x :]

    dh = muh[:-1, :dim_h] - muh[:-1, dim_h:]

    Sigh_tptp = np.mean(Sigh[:-1, :dim_h, :dim_h], axis=0)
    Sigh_ttp = np.mean(Sigh[:-1, dim_h:, :dim_h], axis=0)
    Sigh_tpt = np.mean(Sigh[:-1, :dim_h, dim_h:], axis=0)
    Sigh_tt = np.mean(Sigh[:-1, dim_h:, dim_h:], axis=0)

    muh_tptp = np.mean(muh[:-1, :dim_h, np.newaxis] * muh[:-1, np.newaxis, :dim_h], axis=0)
    muh_ttp = np.mean(muh[:-1, dim_h:, np.newaxis] * muh[:-1, np.newaxis, :dim_h], axis=0)
    muh_tpt = np.mean(muh[:-1, :dim_h, np.newaxis] * muh[:-1, np.newaxis, dim_h:], axis=0)
    muh_tt = np.mean(muh[:-1, dim_h:, np.newaxis] * muh[:-1, np.newaxis, dim_h:], axis=0)

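    # For a Gaussian posterior over the hidden variables, E[h h^T] = Cov(h) + E[h] E[h]^T,
    # so each hidden block below combines the smoothed covariances (Sigh_*) with the
    # outer products of the smoothed means (muh_*).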
    xx[dim_x:, dim_x:] = Sigh_tt + muh_tt
    xx[dim_x:, :dim_x] = np.mean(muh[:-1, dim_h:, np.newaxis] * xval[:, np.newaxis, :], axis=0)

    xdx[dim_x:, dim_x:] = Sigh_ttp + muh_ttp - Sigh_tt - muh_tt
    xdx[dim_x:, :dim_x] = np.mean(muh[:-1, dim_h:, np.newaxis] * dx[:, np.newaxis, :], axis=0)
    xdx[:dim_x, dim_x:] = np.mean(xval[:, :, np.newaxis] * dh[:, np.newaxis, :], axis=0)

    dxdx[dim_x:, dim_x:] = Sigh_tptp + muh_tptp - Sigh_ttp - Sigh_tpt - muh_ttp - muh_tpt + Sigh_tt + muh_tt
    dxdx[dim_x:, :dim_x] = np.mean(dh[:, :, np.newaxis] * dx[:, np.newaxis, :], axis=0)

    bkx[:, dim_x:] = np.mean(bk[:, :, np.newaxis] * muh[:-1, np.newaxis, dim_h:], axis=0)
    bkdx[:, dim_x:] = np.mean(bk[:, :, np.newaxis] * dh[:, np.newaxis, :], axis=0)

    xx[:dim_x, dim_x:] = xx[dim_x:, :dim_x].T
    dxdx[:dim_x, dim_x:] = dxdx[dim_x:, :dim_x].T

    detd = np.linalg.det(Sigh[:-1, :, :])
    dets = np.linalg.det(Sigh[:-1, dim_h:, dim_h:])
    hSdouble = 0.5 * np.log(detd[detd > 0.0]).mean()
    hSsimple = 0.5 * np.log(dets[dets > 0.0]).mean()
    # TODO take care of initial value that is missing
    return pd.Series({"dxdx": dxdx, "xdx": xdx, "xx": xx, "bkx": bkx, "bkdx": bkdx, "bkbk": old_stats["bkbk"], "µ_0": muh[0, dim_h:], "Σ_0": Sigh[0, dim_h:, dim_h:], "hS": 0.5 * dim_h * (1 + np.log(2 * np.pi)) + hSdouble - hSsimple})


def e_step_worker_pool(est, traj, datas_visible, N):
    muh, Sigh = est._e_step(traj)  # Compute hidden variable distribution
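    # Each worker returns its trajectory's sufficient statistics already divided by the
    # total number of trajectories N, so the caller can simply sum the pool results.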
    return sufficient_stats_hidden(muh, Sigh, traj, datas_visible, est.dim_x, est.dim_h, est.dim_coeffs_force) / N


class GLE_Estimator(DensityMixin, BaseEstimator):
    """A GLE estimator based on the Expectation-Maximization algorithm.

    Parameters
    ----------
    dim_x : int, default=1
        The number of visible dimensions.

    dim_h : int, default=1
        The number of hidden dimensions.

    tol : float, defaults to 1e-5.
        The convergence threshold. EM iterations will stop when the lower bound average gain is below this threshold.

    max_iter : int, default=100
        The maximum number of EM iterations.

    OptimizeForce : bool, default=True
        Whether to optimize the force coefficients; set to False if the force or the potential has been determined externally.

    OptimizeDiffusion : bool, default=True
        Whether to optimize the diffusion coefficients.

    init_params : {'user', 'random', 'markov'}, defaults to 'random'.
        The method used to initialize the fitting coefficients.
        Must be one of::

            'user' : coefficients are initialized at values provided by the user
            'random' : coefficients are initialized randomly.
            'markov' : coefficients are initialized with a Markovian estimation of the visible part

    model : {'euler'}, default to 'euler'.
        Choice of the time-discretized model to be fitted. For now only the Euler model is implemented.

    basis : a scikit-learn Transformer class, default to linear basis.
        Transformer used to evaluate the basis functions.

    A_init, C_init, force_init, mu_init, sig_init : array, optional
        The user-provided initial coefficients, defaults to None.
        If None, coefficients are initialized using the `init_params` method.

    n_init : int, defaults to 1.
        The number of initializations to perform. The best results are kept.

    no_stop : bool, default to False
        Do not stop the iterations even if the algorithm has converged.

    warm_start : bool, default to False.
        If 'warm_start' is True, the solution of the last fitting is used as
        initialization for the next call of fit(). This can speed up
        convergence when fit is called several times on similar problems.

    random_state : int, RandomState instance or None, optional (default=None)
        Controls the random seed given to the method chosen to initialize the
        parameters (see `init_params`). In addition, it controls the generation
        of random samples from the fitted distribution (see the method `sample`).
        Pass an int for reproducible output across multiple function calls.
        See :term:`Glossary <random_state>`.

    verbose : int, default to 0.
        Enable verbose output. If 1 then it prints the current initialization
        and each iteration step. If greater than 1 then it also prints the log
        probability and the time needed for each step.

    verbose_interval : int, default to 10.
        Number of iterations done before the next print.

    multiprocessing : int, default to 1
        Number of processes to use for the E step.
    """
    def __init__(
        self,
        dim_x=1,
        dim_h=1,
        tol=1e-5,
        max_iter=100,
        OptimizeForce=True,
        OptimizeDiffusion=True,
        init_params="random",
        model="euler",
        basis=GLE_BasisTransform(basis_type="linear"),
        A_init=None,
        C_init=None,
        force_init=None,
        mu_init=None,
        sig_init=None,
        n_init=1,
        random_state=None,
        warm_start=False,
        no_stop=False,
        verbose=0,
        verbose_interval=10,
        multiprocessing=1,
    ):
        self.dim_x = dim_x
        self.dim_h = dim_h
        self.OptimizeForce = OptimizeForce
        self.OptimizeDiffusion = OptimizeDiffusion
        self.model = model
        self.basis = basis
        self.A_init = A_init
        self.C_init = C_init
        self.force_init = force_init
        self.mu_init = mu_init
        self.sig_init = sig_init
        self.tol = tol
        self.max_iter = max_iter
        self.n_init = n_init
        self.warm_start = warm_start
        self.no_stop = no_stop
        self.init_params = init_params
        self.random_state = random_state
        self.verbose = verbose
        self.verbose_interval = verbose_interval
        self.multiprocessing = multiprocessing

    def _more_tags(self):
        return {"X_types": "2darray"}

    def set_init_coeffs(self, coeffs):
        """Set the initial values of the coefficients via a dict.

        Parameters
        ----------
        coeffs : dict
            Contains the wanted values of the initial coefficients;
            if a key is absent the coefficient is set to None.
        """
        keys_coeffs = ["A", "C", "force", "µ_0", "Σ_0"]
        init_coeffs = [None] * len(keys_coeffs)
        for n, key in enumerate(keys_coeffs):
            if key in coeffs:
                init_coeffs[n] = coeffs[key]
        self.A_init, self.C_init, self.force_init, self.mu_init, self.sig_init = init_coeffs

    def _check_initial_parameters(self):
        """Check values of the basic parameters."""
        if self.dt <= 0.0:
            raise ValueError("Invalid value for 'dt': %d " "Timestep should be positive" % self.dt)
        if self.dim_h < 0:
            raise ValueError("Invalid value for 'dim_h': %d " "Estimator requires non-negative hidden dimension" % self.dim_h)
        if self.tol < 0.0:
            raise ValueError("Invalid value for 'tol': %.5f " "Tolerance used by the EM must be non-negative" % self.tol)
        if self.max_iter < 1:
            raise ValueError("Invalid value for 'max_iter': %d " "Estimation requires at least one iteration" % self.max_iter)

        self.model = self.model.casefold()
        if self.model not in model_class.keys():
            raise ValueError("Model {} not implemented".format(self.model))
        self.model_class = model_class[self.model](self.dim_x)

        if self.init_params == "user":
            if self.n_init != 1:
                self.n_init = 1
                warnings.warn("The number of initializations has been set to 1 as the coefficients are user initialized.")
            if self.A_init is None:
                raise ValueError("No initial value for A is provided and init_params is set to user defined")
            if self.force_init is None:
                raise ValueError("No initial value for the force is provided and init_params is set to user defined")
            # if self.mu_init is None or self.sig_init is None:
            #     raise ValueError("No initial values for initial conditions are provided and init_params is set to user defined")

        if self.A_init is not None:
            if np.asarray(self.A_init).shape != (self.dim_x + self.dim_h, self.dim_x + self.dim_h):
                raise ValueError("Wrong dimensions for A_init")
            if self.C_init is None:
                self.C_init = np.identity(self.dim_x + self.dim_h)
            expA, SST = self.model_class._convert_user_coefficients(np.asarray(self.A_init), np.asarray(self.C_init), self.dt)
            if not np.all(np.linalg.eigvals(SST) > 0):
                raise ValueError("Provided user values do not lead to a positive definite diffusion matrix")

        if self.C_init is not None:
            if np.asarray(self.C_init).shape != (self.dim_x + self.dim_h, self.dim_x + self.dim_h):
                raise ValueError("Wrong dimensions for C_init")

        if self.mu_init is not None:
            if np.asarray(self.mu_init).shape != (self.dim_h,):
                raise ValueError("Provided user values for initial mean of hidden variables have wrong shape, provided {}, wanted {}".format(np.asarray(self.mu_init).shape, (self.dim_h,)))

        if self.sig_init is not None:
            if np.asarray(self.sig_init).shape != (self.dim_h, self.dim_h):
                raise ValueError("Provided user values for initial variance of hidden variables have wrong shape, provided {}, wanted {}".format(np.asarray(self.sig_init).shape, (self.dim_h, self.dim_h)))
            if not np.all(np.linalg.eigvals(self.sig_init) >= 0):
                raise ValueError("Provided user values for initial variance of hidden variables are not a positive definite matrix")

        # We initialize the coefficients to dummy values to ensure existence of the variables
        if not hasattr(self, "friction_coeffs"):
            self.friction_coeffs = np.identity(self.dim_x + self.dim_h)
        if not hasattr(self, "diffusion_coeffs"):
            self.diffusion_coeffs = np.identity(self.dim_x + self.dim_h)

    def _initialize_parameters(self, random_state, traj_len=50):
        """Initialize the model parameters.

        Parameters
        ----------
        random_state : RandomState
            A random number generator instance that controls the random seed
            used for the method chosen to initialize the parameters.
        """
        self.random_state = check_random_state(random_state)
        if self.init_params == "random" or self.init_params == "markov":
            # We ask the typical time scales to be consistent with the minimum and maximum timescale of the trajectory
            A = generateRandomDefPosMat(dim_x=self.dim_x, dim_h=self.dim_h, rng=self.random_state, max_ev=(1.0 / 50) / self.dt, min_re_ev=(0.5 / traj_len) / self.dt)
            if self.C_init is None:
                # temp_mat = generateRandomDefPosMat(self.dim_h + self.dim_x, random_state)
                # C = temp_mat + temp_mat.T
                C = np.identity(self.dim_x + self.dim_h)
            else:
                C = np.asarray(self.C_init)
            (self.friction_coeffs, self.diffusion_coeffs) = self.model_class._convert_user_coefficients(A, C, self.dt)
        elif self.init_params == "user":
            (self.friction_coeffs, self.diffusion_coeffs) = self.model_class._convert_user_coefficients(np.asarray(self.A_init), np.asarray(self.C_init), self.dt)
            self.force_coeffs = np.asarray(self.force_init).reshape(self.dim_x, -1)
        else:
            raise ValueError("Unimplemented initialization method '%s'" % self.init_params)

        if not self.OptimizeDiffusion and self.A_init is not None and self.C_init is not None:
            _, self.diffusion_coeffs = self.model_class._convert_user_coefficients(np.asarray(self.A_init), np.asarray(self.C_init), self.dt)

        if not hasattr(self.basis, "fitted_"):  # Set up the basis if needed
            dummytraj = np.zeros((1, self.dim_x))
            self.basis.fit(dummytraj)
        self.dim_coeffs_force = self.basis.nb_basis_elt_

        if self.force_init is not None:
            self.force_coeffs = np.asarray(self.force_init).reshape(self.dim_x, -1)
        else:
            self.force_coeffs = -self.random_state.random(size=(self.dim_x, self.dim_coeffs_force))  # -np.ones((self.dim_x, self.dim_coeffs_force))

        # Initial conditions for hidden variables, either user provided or chosen from the stationary state probability of the hidden variables
        if self.mu_init is not None:
            self.mu0 = np.asarray(self.mu_init)
        else:
            self.mu0 = np.zeros((self.dim_h))
        if self.sig_init is not None:
            self.sig0 = np.asarray(self.sig_init)
        elif self.C_init is not None:
            self.sig0 = self.C_init[self.dim_x :, self.dim_x :]
        else:
            self.sig0 = np.identity(self.dim_h)

        self.initialized_ = True

    def fit(self, X, y=None, idx_trajs=[]):
        """Estimate model parameters with the EM algorithm.

        The method iterates between E-step and M-step for at most ``max_iter``
        times until the change of the likelihood lower bound is less than
        ``tol``; otherwise a ``ConvergenceWarning`` is raised.
        Upon consecutive calls, training starts where it left off.

        Parameters
        ----------
        X : array-like, shape (n_samples, dim_x)
            List of positions data.

        idx_trajs : array, default []
            Location of the splits if multiple trajectories are provided.

        Returns
        -------
        self
        """
        X = check_array(X, ensure_min_samples=4, ensure_min_features=self.dim_x)
        self.dt = X[1, 0] - X[0, 0]
        self._check_initial_parameters()

        Xproc, idx_trajs = self.model_class.preprocessingTraj(self.basis, X, idx_trajs=idx_trajs)
        traj_list = np.split(Xproc, idx_trajs)
        _min_traj_len = np.min([trj.shape[0] for trj in traj_list])
        # print(traj_list)

        # If we enable warm_start, we will have a unique initialisation
        do_init = not (self.warm_start and hasattr(self, "converged_"))
        n_init = self.n_init if do_init else 1

        max_lower_bound = -np.infty
        self.converged_ = False

        self.random_state = check_random_state(self.random_state)

        self.logL = np.empty((n_init, self.max_iter))
        self.logL[:] = np.nan

        # Initial evaluation of the sufficient statistics for observables
        datas_visible = 0.0
        for traj in traj_list:
            datas_visible += sufficient_stats(traj, self.dim_x) / len(traj_list)

        self.coeffs_list_all = []  # For reference
        best_coeffs = None
        best_n_iter = -1
        best_n_init = -1

        for init in range(n_init):
            coeff_list_init = []
            if do_init:
                self._initialize_parameters(self.random_state, traj_len=_min_traj_len)
                if self.init_params == "markov":
                    self._m_step_markov(datas_visible)  # Initialize the visible coefficients from the Markovian approximation
            self._print_verbose_msg_init_beg(init)
            lower_bound = -np.infty if do_init else self.lower_bound_
            lower_bound_m_step = -np.infty
            # Algorithm loop
            for n_iter in range(1, self.max_iter + 1):
                prev_lower_bound = lower_bound

                new_stat = self._e_step_stats(traj_list, datas_visible)
                # new_stat_rs = self._rescale_hidden(new_stat)
                # new_stat_rs = new_stat
                lower_bound = self.loglikelihood(new_stat)
                if self.verbose >= 2:
                    if lower_bound - lower_bound_m_step < 0:
                        print("Delta ll after E step:", lower_bound - lower_bound_m_step)
                curr_coeffs = self.get_coefficients()
                curr_coeffs["ll"] = lower_bound
                coeff_list_init.append(curr_coeffs)

                self._m_step(new_stat)
                lower_bound_m_step = self.loglikelihood(new_stat)
                if self.verbose >= 2 and lower_bound_m_step - lower_bound < 0:
                    print("Delta ll after M step:", lower_bound_m_step - lower_bound)

                if np.isnan(lower_bound_m_step) or not self._check_finiteness():  # If we have NaN values we simply end the iterations
                    warnings.warn("Initialization %d has NaN values. Ends iteration" % (init), ConvergenceWarning)
                    if self.verbose >= 2:
                        print("Friction:\n{} \n Diffusion:\n{} \n Force :\n{} \n µ0 :\n{} \n Σ0:\n{} \n".format(self.friction_coeffs, self.diffusion_coeffs, self.force_coeffs, self.mu0, self.sig0))
                        print("ll: {}".format(lower_bound))
                    break

                self.logL[init, n_iter - 1] = lower_bound
                change = lower_bound - prev_lower_bound
                self._print_verbose_msg_iter_end(n_iter, change, lower_bound)

                if lower_bound > max_lower_bound:
                    max_lower_bound = lower_bound
                    best_coeffs = self.get_coefficients()
                    best_n_iter = n_iter
                    best_n_init = init

                if abs(change) < self.tol:
                    self.converged_ = True
                    if not self.no_stop:
                        break

            self._print_verbose_msg_init_end(lower_bound, n_iter)
            self.coeffs_list_all.append(coeff_list_init)

        if not self.converged_:
            warnings.warn("Initialization %d did not converge. " "Try different init parameters, " "or increase max_iter, tol " "or check for degenerate data." % (init + 1), ConvergenceWarning)

        if best_coeffs is not None:
            self.set_coefficients(best_coeffs, with_basis=False)  # Don't set basis coefficients
        self.n_iter_ = best_n_iter
        self.n_best_init_ = best_n_init
        self.lower_bound_ = max_lower_bound

        self._print_verbose_msg_fit_end(max_lower_bound, best_n_init, best_n_iter)
        return self

    def _e_step(self, traj):
        """E step.

        Parameters
        ----------
        traj : array-like, shape (n_timestep, dim_x)
            One trajectory

        Returns
        -------
        muh : array-like, shape (n_timestep, 2*dim_h)
            Mean values of the pair of the hidden variables
        Sigh : array-like, shape (n_timestep, 2*dim_h, 2*dim_h)
            Covariances of the pair of the hidden variables
        """
        Xtplus, mutilde, R = self.model_class.compute_expectation_estep(traj, self.friction_coeffs, self.force_coeffs, self.dim_h, self.dt)
        return filtersmoother(Xtplus, mutilde, R, self.diffusion_coeffs, self.mu0, self.sig0)

    def _e_step_stats(self, traj_list, datas_visible):
        new_stat = 0.0
        if self.multiprocessing > 1:  # If we ask for more than one process
            with multiprocessing.Pool(processes=self.multiprocessing) as pool:
                proc = [pool.apply_async(e_step_worker_pool, args=(self, traj, datas_visible, len(traj_list))) for traj in traj_list]
                for p in proc:
                    ret = p.get()  # will block
                    new_stat += ret
        else:
            for traj in traj_list:
                muh, Sigh = self._e_step(traj)  # Compute hidden variable distribution
                new_stat += sufficient_stats_hidden(muh, Sigh, traj, datas_visible, self.dim_x, self.dim_h, self.dim_coeffs_force) / len(traj_list)
        return new_stat

    def _m_step(self, sufficient_stat):
        """M step.

        .. todo:: Select the dimension of the fitted parameters from the sufficient stats (to deal with Markovian initialization)
        """
        friction, force, diffusion = self.model_class.m_step(self.friction_coeffs, self.diffusion_coeffs, self.force_coeffs, sufficient_stat, self.dim_h, self.dt, self.OptimizeDiffusion, self.OptimizeForce)
        self.friction_coeffs = friction
        if self.OptimizeForce:
            self.force_coeffs = force
        self.mu0 = sufficient_stat["µ_0"]
        # self.sig0 = sufficient_stat["Σ_0"]
        # A, C = self.model_class._convert_local_coefficients(self.friction_coeffs, self.diffusion_coeffs, self.dt)
        # self.sig0 = C[self.dim_x :, self.dim_x :]
        if self.OptimizeDiffusion:
            self.diffusion_coeffs = diffusion

    def _em_step(self, vect_coeff, traj_list, datas_visible):
        """Wrapper of the E and M steps for the gradient descent algorithm."""
        self.unvectorization_coefficient(vect_coeff)
        new_stat = self._e_step_stats(traj_list, datas_visible)
        return self.loglikelihood(new_stat)

    def _check_finiteness(self):
        """Check that all quantities are finite."""
        return np.isfinite(np.sum(self.friction_coeffs)) and np.isfinite(np.sum(self.diffusion_coeffs)) and np.isfinite(np.sum(self.force_coeffs)) and np.isfinite(np.sum(self.mu0)) and np.isfinite(np.sum(self.sig0))

    def _m_step_markov(self, sufficient_stat_vis):
        """Compute a coefficient estimate via the Markovian approximation to provide an initialization."""
        A_full, C_full = self.model_class._convert_local_coefficients(self.friction_coeffs, self.diffusion_coeffs, self.dt)
        friction, force, diffusion = self.model_class.m_step(self.friction_coeffs, self.diffusion_coeffs, self.force_coeffs, sufficient_stat_vis, 0, self.dt, self.OptimizeDiffusion, self.OptimizeForce)
        A, C = self.model_class._convert_local_coefficients(friction, diffusion, self.dt)
        A_full[: self.dim_x, : self.dim_x] = A
        if self.OptimizeForce:
            self.force_coeffs = force
        if self.OptimizeDiffusion:
            C_full[: self.dim_x, : self.dim_x] = C
        (self.friction_coeffs, self.diffusion_coeffs) = self.model_class._convert_user_coefficients(A_full, C_full, self.dt)

    def loglikelihood(self, suff_datas, dim_h=None):
        """Return the current value of the negative log-likelihood."""
        if dim_h is None:
            dim_h = self.dim_h
        ll = self.model_class.loglikelihood(suff_datas, self.friction_coeffs, self.diffusion_coeffs, self.force_coeffs, dim_h, self.dt)
        if dim_h > 0 and not np.isnan(suff_datas["hS"]):
            return ll + suff_datas["hS"]
        else:
            if dim_h > 0:
                warnings.warn("NaN value in hidden entropy")
            return ll

    def score(self, X, y=None, idx_trajs=[], Xh=None):
        """Compute the per-sample average log-likelihood of the given data X.

        Parameters
        ----------
        X : array-like, shape (n_samples, dim_x)
            List of positions data.

        idx_trajs : array, default []
            Location of the splits if multiple trajectories are provided.

        Returns
        -------
        log_likelihood : float
            Log likelihood of the generalized Langevin model given X.
        """
        check_is_fitted(self, "initialized_")
        X = check_array(X, ensure_min_samples=4, ensure_min_features=self.dim_x)
        self.dt = X[1, 0] - X[0, 0]
        Xproc, idx_trajs = self.model_class.preprocessingTraj(self.basis, X, idx_trajs=idx_trajs)
        traj_list = np.split(Xproc, idx_trajs)

        # Initial evaluation of the sufficient statistics for observables
        new_stat = 0.0
        if Xh is None:
            datas_visible = 0.0
            for traj in traj_list:
                datas_visible += sufficient_stats(traj, self.dim_x) / len(traj_list)
            new_stat = self._e_step_stats(traj_list, datas_visible)
            # muh, Sigh = self._e_step(traj)  # Compute hidden variable distribution
            # new_stat += sufficient_stats_hidden(muh, Sigh, traj, datas, self.dim_x, self.dim_h, self.dim_coeffs_force) / len(traj_list)
        else:
            traj_list_h = np.split(Xh, idx_trajs)
            for n, traj in enumerate(traj_list):
                datas_visible = sufficient_stats(traj, self.dim_x)
                zero_sig = np.zeros((len(traj), 2 * self.dim_h, 2 * self.dim_h))
                muh = np.hstack((np.roll(traj_list_h[n], -1, axis=0), traj_list_h[n]))
                new_stat += sufficient_stats_hidden(muh, zero_sig, traj, datas_visible, self.dim_x, self.dim_h, self.dim_coeffs_force) / len(traj_list)
        lower_bound = self.loglikelihood(new_stat)
        return lower_bound

    def predict(self, X, idx_trajs=[]):
        """Predict the hidden variables for the data samples in X using the trained model.

        Parameters
        ----------
        X : array-like, shape (n_samples, dim_x)
            List of positions data.

        idx_trajs : array, default []
            Location of the splits if multiple trajectories are provided.

        Returns
        -------
        muh : array, shape (n_samples, dim_h)
            Average value of the hidden variables given the current estimation of the parameters and the provided trajectories.
        """
        check_is_fitted(self, "converged_")
        X = check_array(X, ensure_min_samples=4, ensure_min_features=self.dim_x)
        Xproc, idx_trajs = self.model_class.preprocessingTraj(self.basis, X, idx_trajs=idx_trajs)
        traj_list = np.split(Xproc, idx_trajs)
        muh_out = None
        for traj in traj_list:
            muh, Sigh = self._e_step(traj)  # Compute hidden variable distribution
            if muh_out is None:
                muh_out = muh[:, self.dim_h :]
            else:
                # Stack trajectories along the time axis to keep a (n_samples, dim_h) output
                muh_out = np.vstack((muh_out, muh[:, self.dim_h :]))
        return muh_out

    def sample(self, n_samples=50, n_trajs=1, x0=None, v0=None, dt=5e-3, burnout=0, rng=None):
        """Generate random samples from the fitted GLE model.

        Use the provided basis to compute the force term. The basis should be
        fitted first; if not, it will be fitted using a dummy trajectory.

        Parameters
        ----------
        n_samples : int, default=50
            Number of timesteps per trajectory to generate.

        n_trajs : int, default=1
            Number of trajectories to generate.

        x0, v0 : array-like, optional
            Initial values of the trajectory.

        dt : float, default=5e-3
            Timestep to use for the integration.

        burnout : int, default to 0
            Remove [burnout] steps from the start of the trajectories.

        rng : random number generator, optional
            Provide, if wanted, a random number generator. See the numpy documentation about RNG.

        Returns
        -------
        X : array, shape (n_samples, n_features)
            Randomly generated trajectory.
        idx : array
            Index of the trajectories.
        y : array, shape (n_samples,)
            Hidden variables values.
        """
        if rng is None:
            self.random_state = check_random_state(self.random_state)
        else:
            self.random_state = check_random_state(rng)
        self.dt = dt
        self._check_initial_parameters()
        if not (self.warm_start or hasattr(self, "converged_")):
            self._initialize_parameters(self.random_state, traj_len=n_samples)
        if x0 is None:
            x0 = np.zeros((self.dim_x))
        if v0 is None:
            v0 = np.zeros((self.dim_x))
        X = None
        idx_trajs = []
        X_h = None

        if self.multiprocessing > 1:  # If we ask for more than one process
            child_seeds = np.random.SeedSequence(self.random_state.get_state()[1]).spawn(n_trajs)
            with multiprocessing.Pool(processes=self.multiprocessing) as pool:
                proc = [
                    pool.apply_async(
                        self.model_class.generator,
                        args=(),
                        kwds={
                            "nsteps": n_samples,
                            "rng": np.random.default_rng(child_seeds[n]),
                            "dt": self.dt,
                            "dim_h": self.dim_h,
                            "x0": x0,
                            "v0": v0,
                            "friction": self.friction_coeffs,
                            "SST": self.diffusion_coeffs,
                            "force_coeffs": self.force_coeffs,
                            "muh0": self.mu0,
                            "sigh0": self.sig0,
                            "basis": self.basis,
                        },
                    )
                    for n in range(n_trajs)
                ]
                for p in proc:
                    txv, h = p.get()  # will block
                    if X is None:
                        X = txv[burnout:, :]
                    else:
                        idx_trajs.append(len(X))
                        X = np.vstack((X, txv[burnout:, :]))
                    if X_h is None:
                        X_h = h[burnout:, :]
                    else:
                        X_h = np.vstack((X_h, h[burnout:, :]))
        else:
            for n in range(n_trajs):
                txv, h = self.model_class.generator(nsteps=n_samples, dt=self.dt, dim_h=self.dim_h, x0=x0, v0=v0, friction=self.friction_coeffs, SST=self.diffusion_coeffs, force_coeffs=self.force_coeffs, muh0=self.mu0, sigh0=self.sig0, basis=self.basis, rng=self.random_state)
                if X is None:
                    X = txv[burnout:, :]
                else:
                    idx_trajs.append(len(X))
                    X = np.vstack((X, txv[burnout:, :]))
                if X_h is None:
                    X_h = h[burnout:, :]
                else:
                    X_h = np.vstack((X_h, h[burnout:, :]))
        return X, idx_trajs, X_h

    def get_coefficients(self):
        """Return the actual values of the fitted coefficients."""
        A, C = self.model_class._convert_local_coefficients(self.friction_coeffs, self.diffusion_coeffs, self.dt)
        return {"A": A, "C": C, "force": self.force_coeffs, "µ_0": self.mu0, "Σ_0": self.sig0, "SST": self.diffusion_coeffs, "dt": self.dt, "basis": self.basis.get_coefficients()}

    def set_coefficients(self, coeffs, with_basis=True):
        """Set the values of the coefficients.

        Parameters
        ----------
        coeffs : dict
            Contains the values of the coefficients to set.
        """
        (self.friction_coeffs, self.diffusion_coeffs) = self.model_class._convert_user_coefficients(np.asarray(coeffs["A"]), np.asarray(coeffs["C"]), self.dt)
        (self.force_coeffs, self.mu0, self.sig0) = (coeffs["force"], coeffs["µ_0"], coeffs["Σ_0"])
        if with_basis:
            self.basis.set_coefficients(coeffs["basis"])

    def _n_parameters(self):
        """Return the number of free parameters in the model."""
        return 2 * (self.dim_x + self.dim_h) ** 2 + self.dim_h + self.dim_h ** 2

    def bic(self, X):
        """Bayesian information criterion for the current model on the input X.

        Parameters
        ----------
        X : array of shape (n_samples, n_features)

        Returns
        -------
        bic : float
            The lower the better.
        """
        return -2 * self.score(X) * X.shape[0] + self._n_parameters() * np.log(X.shape[0])

    def aic(self, X):
        """Akaike information criterion for the current model on the input X.

        Parameters
        ----------
        X : array of shape (n_samples, n_features)

        Returns
        -------
        aic : float
            The lower the better.
        """
        return -2 * self.score(X) * X.shape[0] + 2 * self._n_parameters()

    def _print_verbose_msg_init_beg(self, n_init):
        """Print verbose message at the beginning of an initialization."""
        if self.verbose == 1:
            print("Initialization %d" % n_init)
        elif self.verbose >= 2:
            print("Initialization %d" % n_init)
            self._init_prev_time = time()
            self._iter_prev_time = self._init_prev_time
        if self.verbose >= 3:
            print("----------------Current parameters values------------------")
            print(self.get_coefficients())

    def _print_verbose_msg_iter_end(self, n_iter, diff_ll, log_likelihood):
        """Print verbose message at the end of an EM iteration."""
        if n_iter % self.verbose_interval == 0:
            if self.verbose == 1:
                print("***Iteration EM*** : {} / {} --- Current loglikelihood {}".format(n_iter, self.max_iter, log_likelihood))
            elif self.verbose >= 2:
                cur_time = time()
                print("***Iteration EM*** :%d / %d\t time lapse %.5fs\t Current loglikelihood %.5f loglikelihood change %.5f" % (n_iter, self.max_iter, cur_time - self._iter_prev_time, log_likelihood, diff_ll))
                self._iter_prev_time = cur_time
            if self.verbose >= 3:
                print("----------------Current parameters values------------------")
                print(self.get_coefficients())

    def _print_verbose_msg_init_end(self, ll, best_iter):
        """Print verbose message at the end of an initialization."""
        if self.verbose == 1:
            print("Initialization converged: %s at step %i \t ll %.5f" % (self.converged_, best_iter, ll))
        elif self.verbose >= 2:
            print("Initialization converged: %s at step %i \t time lapse %.5fs\t ll %.5f" % (self.converged_, best_iter, time() - self._init_prev_time, ll))
            print("----------------Current parameters values------------------")
            print(self.get_coefficients())

    def _print_verbose_msg_fit_end(self, ll, best_init, best_iter):
        """Print verbose message at the end of the fit."""
        if self.verbose == 1:
            print("Fit converged: %s Init: %s at step %i \t ll %.5f" % (self.converged_, best_init, best_iter, ll))
        elif self.verbose >= 2:
            print("Fit converged: %s Init: %s at step %i \t time lapse %.5fs\t ll %.5f" % (self.converged_, best_init, best_iter, time() - self._init_prev_time, ll))
            print("----------------Fitted parameters values------------------")
            print(self.get_coefficients())