Source code for mdevaluate.correlation

import numpy as np
import numba
from scipy.special import legendre
from itertools import chain
import dask.array as darray

from .autosave import autosave_data
from .utils import filon_fourier_transformation, coherent_sum, histogram
from .pbc import pbc_diff
from .logging import logger

def set_has_counter(func):
    func.has_counter = True
    return func

def log_indices(first, last, num=100):
    ls = np.logspace(0, np.log10(last - first + 1), num=num)
    return np.unique(np.int_(ls) - 1 + first)

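# Usage sketch (illustrative, not executed at import): log_indices yields
# unique, sorted frame indices with roughly logarithmic spacing, so short-time
# dynamics stay well resolved without evaluating every late frame.
#
#     idx = log_indices(0, 10000, num=10)
#     assert idx[0] == 0                  # always starts at ``first``
#     assert np.all(np.diff(idx) > 0)     # sorted and unique via np.unique
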
def correlation(function, frames):
    iterator = iter(frames)
    start_frame = next(iterator)
    return map(lambda f: function(start_frame, f),
               chain([start_frame], iterator))

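# Usage sketch (illustrative): ``correlation`` pairs the first frame with
# every frame, itself included, so the series always starts with
# function(f0, f0).
#
#     frames = [np.zeros((4, 3)), np.ones((4, 3))]
#     values = list(correlation(msd, frames))   # msd is defined below
#     assert values[0] == 0.0                   # a frame correlated with itself
#     assert np.isclose(values[1], 3.0)         # every coordinate shifted by 1
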
def subensemble_correlation(selector_function, correlation_function=correlation):

    def c(function, frames):
        iterator = iter(frames)
        start_frame = next(iterator)
        selector = selector_function(start_frame)
        subensemble = map(lambda f: f[selector], chain([start_frame], iterator))
        return correlation_function(function, subensemble)

    return c

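# Usage sketch (illustrative): the selector is evaluated once on the first
# frame and the resulting subset is then tracked through all later frames.
# The mask below is arbitrary and only serves as an example.
#
#     def left_half(frame):
#         return frame[:, 0] < 0.5          # boolean mask from the start frame
#
#     sub_corr = subensemble_correlation(left_half)
#     values = list(sub_corr(msd, frames))  # frames: iterable of (N, 3) arrays
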
def multi_subensemble_correlation(selector_function):
    """
    selector_function has to expect a frame and to return either valid
    indices (as with subensemble_correlation) or a multidimensional array
    whose entries are valid indices

    e.g. slice(10, 100, 2)

    e.g. [1, 2, 3, 4, 5]

    e.g. [[[0, 1], [2], [3]], [[4], [5], [6]]] -> shape: 2, 3 with lists
    of indices of varying length

    e.g. [slice(1653), slice(1653, None, 3)]

    e.g. [np.ones(len_of_frames, bool)]

    In general, using slices is the most efficient. If the selections are
    small subsets of a frame or when many subsets are empty, using indices
    will be more efficient than using masks.
    """
    @set_has_counter
    def cmulti(function, frames):
        iterator = iter(frames)
        start_frame = next(iterator)
        selectors = np.asarray(selector_function(start_frame))
        sel_shape = selectors.shape
        if sel_shape[-1] == 0:
            selectors = np.asarray(selectors, int)
        if selectors.dtype != object:
            sel_shape = sel_shape[:-1]
        f_values = np.zeros(sel_shape + function(start_frame, start_frame).shape)
        count = np.zeros(sel_shape, dtype=int)
        is_first_frame_loop = True

        def cc(act_frame):
            nonlocal is_first_frame_loop
            for index in np.ndindex(sel_shape):
                sel = selectors[index]
                sf_sel = start_frame[sel]
                if is_first_frame_loop:
                    count[index] = len(sf_sel)
                f_values[index] = (function(sf_sel, act_frame[sel])
                                   if count[index] != 0 else 0)
            is_first_frame_loop = False
            return np.asarray(f_values.copy())

        return map(cc, chain([start_frame], iterator)), count
    return cmulti

@autosave_data(nargs=2, kwargs_keys=(
    'index_distribution', 'correlation', 'segments', 'window', 'skip', 'average'
), version='shifted_correlation-1')
def shifted_correlation(function, frames, index_distribution=log_indices,
                        correlation=correlation, segments=10,
                        window=0.5, skip=None, average=False):
    """
    Calculate the time series for a correlation function.

    The times at which the correlation is calculated are determined
    automatically by the function given as ``index_distribution``.
    The default is a logarithmic distribution.

    Args:
        function: The function that should be correlated
        frames: The coordinates of the simulation data
        index_distribution (opt.):
            A function that returns the indices for which the timeseries
            will be calculated
        correlation (function, opt.): The correlation function
        segments (int, opt.):
            The number of segments the time window will be shifted
        window (float, opt.):
            The fraction of the simulation the time series will cover
        skip (float, opt.):
            The fraction of the trajectory that will be skipped at the
            beginning; if this is None the start index of the frames slice
            will be used, which defaults to 0.
        average (bool, opt.): If True, returns the averaged correlation function

    Returns:
        tuple:
            An array of length N that contains the times at which the
            correlation was calculated and a numpy array of shape
            (segments, N) that holds the (non-averaged) correlation data.

            If the correlation function has been marked with
            ``has_counter == True``, the number of counts is added to the
            output tuple; if the average is returned, it will be weighted.

    Example:
        Calculating the mean square displacement of a coordinates object
        named ``coords``:

        >>> indices, data = shifted_correlation(msd, coords)
    """
    if skip is None:
        try:
            skip = frames._slice.start / len(frames)
        except (TypeError, AttributeError):
            skip = 0
    assert window + skip < 1

    start_frames = np.unique(np.linspace(
        len(frames) * skip, len(frames) * (1 - window),
        num=segments, endpoint=False, dtype=int
    ))
    num_frames = int(len(frames) * window)
    idx = index_distribution(0, num_frames)

    def correlate(start_frame):
        shifted_idx = idx + start_frame
        return correlation(function, map(frames.__getitem__, shifted_idx))

    times = np.array([frames[i].time for i in idx]) - frames[0].time

    if getattr(correlation, "has_counter", False):
        if average:
            for i, start_frame in enumerate(start_frames):
                act_result, act_count = correlate(start_frame)
                act_result = np.array(list(act_result))
                act_count = np.array(act_count)
                if i == 0:
                    count = act_count
                    cdim = act_count.ndim
                    rdim = act_result.ndim
                    # build a broadcasting index that appends new axes to the
                    # counts until they line up with the result's dimensions
                    bt = (np.newaxis,)
                    for j in range(rdim - 1):
                        if j >= cdim:
                            bt += (np.newaxis,)
                        else:
                            bt += (slice(None),)
                    result = act_result * act_count[bt]
                else:
                    result += act_result * act_count[bt]
                    count += act_count
            np.divide(result, count[bt], out=result, where=count[bt] != 0)
            result = np.moveaxis(result, 0, cdim)
            count = count / len(start_frames)
            output = times, result, count
        else:
            count = []
            result = []
            for i, start_frame in enumerate(start_frames):
                act_result, act_count = correlate(start_frame)
                result.append(list(act_result))
                count.append(act_count)
            count = np.asarray(count)
            cdim = count.ndim
            result = np.asarray(result)
            result = np.moveaxis(result, 1, cdim)
            output = times, result, count
    else:
        result = 0 if average else []
        for i, start_frame in enumerate(start_frames):
            if average:
                result += np.array(list(correlate(start_frame)))
            else:
                result.append(list(correlate(start_frame)))
        result = np.array(result)
        if average:
            result = result / len(start_frames)
        output = times, result
    return output

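# Usage sketch (illustrative): the typical call pattern combines
# ``shifted_correlation`` with one of the correlation functions below via
# functools.partial; ``coords`` stands for a coordinates object of this
# package and is assumed here, not defined.
#
#     from functools import partial
#     times, data = shifted_correlation(
#         partial(isf, q=22.77), coords,
#         segments=10, window=0.5, average=True,
#     )
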
def msd(start, frame):
    """ Mean square displacement """
    vec = start - frame
    return (vec ** 2).sum(axis=1).mean()

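# Minimal sketch: shifting every particle by the same vector of length 5
# gives a mean squared displacement of exactly 25.
#
#     start = np.zeros((10, 3))
#     frame = start + np.array([3.0, 4.0, 0.0])
#     assert np.isclose(msd(start, frame), 25.0)
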
def isf(start, frame, q, box=None):
    """
    Incoherent intermediate scattering function. To specify q, use
    water_isf = functools.partial(isf, q=22.77)  # q has the value 22.77 nm^-1

    :param q: length of scattering vector
    """
    vec = start - frame
    distance = (vec ** 2).sum(axis=1) ** .5
    return np.sinc(distance * q / np.pi).mean()

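# Minimal sketch: at zero displacement the incoherent scattering function is
# exactly one, since sinc(0) == 1; it decays once displacements become
# comparable to 1/q.
#
#     import functools
#     water_isf = functools.partial(isf, q=22.77)   # q in nm^-1, as above
#     start = np.random.rand(100, 3)
#     assert np.isclose(water_isf(start, start), 1.0)
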
def rotational_autocorrelation(onset, frame, order=2):
    """
    Compute the rotational autocorrelation of the Legendre polynomial for
    the given vectors.

    Args:
        onset, frame: CoordinateFrames of vectors
        order (opt.): Order of the Legendre polynomial

    Returns:
        Scalar value of the correlation function.
    """
    scalar_prod = (onset * frame).sum(axis=-1)
    poly = legendre(order)
    return poly(scalar_prod).mean()

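# Minimal sketch: for normalized vectors correlated with themselves the
# result is exactly one, because P_l(1) == 1 for every Legendre polynomial.
#
#     vectors = np.random.randn(50, 3)
#     vectors /= np.linalg.norm(vectors, axis=-1, keepdims=True)
#     assert np.isclose(rotational_autocorrelation(vectors, vectors, order=2), 1.0)
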
def van_hove_self(start, end, bins):
    r"""
    Compute the self part of the Van Hove autocorrelation function.

    .. math::
        G(r, t) = \sum_i \delta(|\vec r_i(0) - \vec r_i(t)| - r)
    """
    vec = start - end
    delta_r = ((vec)**2).sum(axis=-1)**.5
    return 1 / len(start) * histogram(delta_r, bins)[0]

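# Minimal sketch (assuming ``histogram`` from .utils returns counts per bin,
# like np.histogram): the normalized self part sums to the fraction of
# particles whose displacement falls inside the binned range, i.e. at most 1.
#
#     bins = np.linspace(0, 2.0, 101)
#     g_self = van_hove_self(start, end, bins)   # start, end: (N, 3) frames
#     assert g_self.sum() <= 1.0 + 1e-12
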
def van_hove_distinct(onset, frame, bins, box=None, use_dask=True, comp=False, bincount=True):
    r"""
    Compute the distinct part of the Van Hove autocorrelation function.

    .. math::
        G(r, t) = \sum_{i, j} \delta(|\vec r_i(0) - \vec r_j(t)| - r)
    """
    if box is None:
        box = onset.box.diagonal()
    dimension = len(box)
    N = len(onset)
    if use_dask:
        onset = darray.from_array(onset, chunks=(500, dimension)).reshape(1, N, dimension)
        frame = darray.from_array(frame, chunks=(500, dimension)).reshape(N, 1, dimension)
        dist = ((pbc_diff(onset, frame, box)**2).sum(axis=-1)**0.5).ravel()
        if np.diff(bins).std() < 1e-6:
            # equidistant bins: a bincount is cheaper than a full histogram
            dx = bins[1] - bins[0]
            hist = darray.bincount((dist // dx).astype(int), minlength=(len(bins) - 1))
        else:
            hist = darray.histogram(dist, bins=bins)[0]
        return hist.compute() / N
    else:
        if comp:
            dx = bins[1] - bins[0]
            minlength = len(bins) - 1

            def f(x):
                d = (pbc_diff(x, frame, box)**2).sum(axis=-1)**0.5
                return np.bincount((d // dx).astype(int), minlength=minlength)[:minlength]

            hist = sum(f(x) for x in onset)
        else:
            dist = (pbc_diff(onset.reshape(1, -1, 3),
                             frame.reshape(-1, 1, 3), box)**2).sum(axis=-1)**0.5
            hist = histogram(dist, bins=bins)[0]
        return hist / N

def overlap(onset, frame, crds_tree, radius):
    """
    Compute the overlap with a reference configuration defined in a
    CoordinatesTree.

    Args:
        onset: Initial frame, this is only used to get the frame index
        frame: The current configuration
        crds_tree: A CoordinatesTree of the reference configurations
        radius: The cutoff radius for the overlap

    This function is intended to be used with :func:`shifted_correlation`.
    As usual, the first two arguments are used internally and the remaining
    ones should be defined with :func:`functools.partial`.

    If the overlap of a subset of the system should be calculated, this has
    to be defined through a selection of the reference configurations in the
    CoordinatesTree.

    Example:
        >>> shifted_correlation(
        ...     partial(overlap, crds_tree=CoordinatesTree(traj), radius=0.11),
        ...     traj
        ... )
    """
    tree = crds_tree[onset.step]
    return (tree.query(frame)[0] <= radius).sum() / tree.n

def susceptibility(time, correlation, **kwargs):
    """
    Calculate the susceptibility of a correlation function.

    Args:
        time: Timesteps of the correlation data
        correlation: Value of the correlation function
        **kwargs (opt.):
            Additional keyword arguments will be passed to
            :func:`filon_fourier_transformation`.
    """
    frequencies, fourier = filon_fourier_transformation(time, correlation,
                                                        imag=False, **kwargs)
    return frequencies, frequencies * fourier

def coherent_scattering_function(onset, frame, q):
    """
    Calculate the coherent scattering function.
    """
    box = onset.box.diagonal()
    dimension = len(box)

    @numba.jit(nopython=True)
    def scfunc(x, y):
        sqdist = 0
        for i in range(dimension):
            d = x[i] - y[i]
            if d > box[i] / 2:
                d -= box[i]
            if d < -box[i] / 2:
                d += box[i]
            sqdist += d**2
        x = sqdist**0.5 * q
        if x == 0:
            return 1.0
        else:
            return np.sin(x) / x

    return coherent_sum(scfunc, onset.pbc, frame.pbc) / len(onset)

def non_gaussian(onset, frame):
    r"""
    Calculate the non-Gaussian parameter:

    .. math::
        \alpha_2(t) = \frac{3}{5}
        \frac{\langle r_i^4(t)\rangle}{\langle r_i^2(t)\rangle^2} - 1
    """
    r_2 = ((frame - onset)**2).sum(axis=-1)
    return 3 / 5 * (r_2**2).mean() / r_2.mean()**2 - 1

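# Minimal sketch: for displacements drawn from an isotropic Gaussian,
# <r^4> = (5/3) <r^2>^2 in three dimensions, so alpha_2 fluctuates around
# zero; the tolerance below is loose on purpose.
#
#     rng = np.random.default_rng(0)
#     onset = np.zeros((100000, 3))
#     frame = rng.normal(size=(100000, 3))
#     assert abs(non_gaussian(onset, frame)) < 0.05
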