"""
Collection of utility functions.
"""
import functools
from types import FunctionType
import numpy as np
import numba
import pandas as pd
from .functions import kww, kww_1e
from scipy.ndimage import uniform_filter1d
from scipy.interpolate import interp1d
from scipy.optimize import curve_fit
from .logging import logger
def five_point_stencil(xdata, ydata):
"""
Calculate the derivative dy/dx with a five point stencil.
This algorith is only valid for equally distributed x values.
Args:
xdata: x values of the data points
ydata: y values of the data points
Returns:
Values where the derivative was estimated and the value of the derivative at these points.
This algorithm is only valid for values on a regular grid, for unevenly distributed
data it is only an approximation, albeit a quite good one.
See: https://en.wikipedia.org/wiki/Five-point_stencil
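    Example (illustrative; the stencil is exact for low-order polynomials):
        >>> x = np.linspace(0, 1, 11)
        >>> x_mid, dy = five_point_stencil(x, x**2)
        >>> np.allclose(dy, 2 * x_mid)
        True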
"""
return xdata[2:-2], (
(-ydata[4:] + 8 * ydata[3:-1] - 8 * ydata[1:-3] + ydata[:-4]) /
(3 * (xdata[4:] - xdata[:-4]))
)
def mask2indices(mask):
"""
Return the selected indices of an array mask.
If the mask is two-dimensional, the indices will be calculated for the second axis.
Example:
>>> mask2indices([True, False, True, False])
array([0, 2])
>>> mask2indices([[True, True, False], [True, False, True]])
array([[0, 1], [0, 2]])
"""
    mask = np.array(mask)
    # np.where returns a tuple of index arrays, so take its first element
    # to get the plain arrays shown in the docstring examples
    if mask.ndim == 1:
        indices = np.where(mask)[0]
    else:
        indices = np.array([np.where(m)[0] for m in mask])
return indices
def superpose(x1, y1, x2, y2, N=100, damping=1.0):
    """
    Superpose two curves (x1, y1) and (x2, y2) into a single dataset.
    In the overlapping x range the two curves are blended with a logarithmic
    weight function, whose shape is controlled by damping.
    Note that the parameter N is currently unused.
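    Example (illustrative):
        >>> x1 = np.logspace(-1, 2, 50)
        >>> x2 = np.logspace(1, 4, 50)
        >>> x, y = superpose(x1, np.exp(-0.1 * x1), x2, np.exp(-0.1 * x2))
    """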
    if x2[0] == 0:
        # drop a leading zero, which is invalid on a logarithmic scale
        x2 = x2[1:]
        y2 = y2[1:]
reg1 = x1 < x2[0]
reg2 = x2 > x1[-1]
x_ol = np.logspace(
np.log10(max(x1[~reg1][0], x2[~reg2][0]) + 0.001),
np.log10(min(x1[~reg1][-1], x2[~reg2][-1]) - 0.001),
        (sum(~reg1) + sum(~reg2)) // 2  # the number of samples must be an integer
)
    def w(x):
        # logarithmic crossover weight: 1 at the lower edge of the overlap, 0 at the upper edge
A = x_ol.min()
B = x_ol.max()
return (np.log10(B / x) / np.log10(B / A))**damping
xdata = np.concatenate((x1[reg1], x_ol, x2[reg2]))
y1_interp = interp1d(x1[~reg1], y1[~reg1])
y2_interp = interp1d(x2[~reg2], y2[~reg2])
ydata = np.concatenate((
y1[x1 < x2.min()],
w(x_ol) * y1_interp(x_ol) + (1 - w(x_ol)) * y2_interp(x_ol),
y2[x2 > x1.max()]
))
return xdata, ydata
def runningmean(data, nav):
"""
    Compute the running mean of a 1-dimensional array.
Args:
data: Input data of shape (N, )
nav: Number of points over which the data will be averaged
Returns:
Array of shape (N-(nav-1), )
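    Example:
        >>> runningmean(np.arange(5.0), 2)
        array([0.5, 1.5, 2.5, 3.5])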
"""
return np.convolve(data, np.ones((nav,)) / nav, mode='valid')
def moving_average(A, n=3):
"""
Compute the running mean of an array.
Uses the second axis if it is of higher dimensionality.
Args:
        A: Input data of shape (N, )
n: Number of points over which the data will be averaged
Returns:
Array of shape (N-(n-1), )
Supports 2D-Arrays.
Slower than runningmean for small n but faster for large n.
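    Example:
        >>> moving_average(np.arange(5.0), 3)
        array([1., 2., 3.])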
"""
    k1 = n // 2
    k2 = (n - 1) // 2
    if k2 == 0:
        if A.ndim > 1:
            return uniform_filter1d(A, n)[:, k1:]
        return uniform_filter1d(A, n)[k1:]
    if A.ndim > 1:
        return uniform_filter1d(A, n)[:, k1:-k2]
    return uniform_filter1d(A, n)[k1:-k2]
def coherent_sum(func, coord_a, coord_b):
"""
Perform a coherent sum over two arrays :math:`A, B`.
.. math::
        \\sum_i\\sum_j f(A_i, B_j)
For numpy arrays this is equal to::
N, d = x.shape
M, d = y.shape
        coherent_sum(f, x, y) == f(x.reshape(N, 1, d), y.reshape(1, M, d)).sum()
Args:
        func: The function is called for each pair of items from the two arrays and should return a scalar value.
coord_a, coord_b: The two arrays.
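    Example (illustrative; func must be compilable by numba in nopython mode):
        >>> def dist_sq(a, b):
        ...     return ((a - b)**2).sum()
        >>> pos = np.random.rand(10, 3)
        >>> s = coherent_sum(dist_sq, pos, pos)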
"""
    if isinstance(func, FunctionType):
        # compile plain Python functions with numba, so they can be called from the jitted loop
        func = numba.jit(func, nopython=True, cache=True)
@numba.jit(nopython=True)
def cohsum(coord_a, coord_b):
res = 0
for i in range(len(coord_a)):
for j in range(len(coord_b)):
res += func(coord_a[i], coord_b[j])
return res
return cohsum(coord_a, coord_b)
def coherent_histogram(func, coord_a, coord_b, bins, distinct=False):
"""
    Compute a coherent histogram over two arrays, analogous to coherent_sum.
    For numpy arrays this is equal to::
N, d = x.shape
M, d = y.shape
bins = np.arange(1, 5, 0.1)
        coherent_histogram(f, x, y, bins) == histogram(f(x.reshape(N, 1, d), y.reshape(1, M, d)), bins=bins)
Args:
        func: The function is called for each pair of items from the two arrays and should return a scalar value.
coord_a, coord_b: The two arrays.
        bins: The bins used for the histogram; they must be regularly spaced on a linear scale.
        distinct: If True, the diagonal terms (i == j) are excluded from the histogram.
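    Example (illustrative; func must be compilable by numba in nopython mode):
        >>> def dist(a, b):
        ...     return np.sqrt(((a - b)**2).sum())
        >>> pos = np.random.rand(20, 3)
        >>> bins = np.linspace(0.0, 2.0, 21)
        >>> hist = coherent_histogram(dist, pos, pos, bins, distinct=True)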
"""
if isinstance(func, FunctionType):
func = numba.jit(func, nopython=True, cache=True)
assert np.isclose(np.diff(bins).mean(), np.diff(bins)).all(), 'A regular distribution of bins is required.'
hmin = bins[0]
hmax = bins[-1]
N = len(bins) - 1
dh = (hmax - hmin) / N
@numba.jit(nopython=True)
def cohsum(coord_a, coord_b):
res = np.zeros((N,))
for i in range(len(coord_a)):
for j in range(len(coord_b)):
if not (distinct and i == j):
h = func(coord_a[i], coord_b[j])
if hmin <= h < hmax:
res[int((h - hmin) / dh)] += 1
return res
return cohsum(coord_a, coord_b)
def Sq_from_gr(r, gr, q, ρ):
r"""
    Compute the static structure factor as Fourier transform of the pair correlation function. [Yarnell]_
.. math::
        S(q) - 1 = \frac{4\pi \rho}{q}\int\limits_0^\infty (g(r) - 1)\, r \sin(qr) dr
Args:
r: Radii of the pair correlation function
gr: Values of the pair correlation function
        q: Array of q values
ρ: Average number density
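    Example (illustrative sanity check; for the ideal-gas case g(r) = 1 the integrand vanishes and S(q) = 1):
        >>> r = np.linspace(0.01, 10.0, 1000)
        >>> q = np.linspace(1.0, 10.0, 10)
        >>> Sq = Sq_from_gr(r, np.ones_like(r), q, 1.0)
        >>> np.allclose(Sq, 1.0)
        True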
.. [Yarnell]
Yarnell, J. L., Katz, M. J., Wenzel, R. G., & Koenig, S. H. (1973). Physical Review A, 7(6), 2130–2144.
        https://doi.org/10.1103/PhysRevA.7.2130
"""
ydata = ((gr - 1) * r).reshape(-1, 1) * np.sin(r.reshape(-1, 1) * q.reshape(1, -1))
return np.trapz(x=r, y=ydata, axis=0) * (4 * np.pi * ρ / q) + 1
def Fqt_from_Grt(data, q):
"""
    Calculate the ISF from the van Hove function for a given q value by Fourier transform.
.. math::
F_q(t) = \\int\\limits_0^\\infty dr \\; G(r, t) \\frac{\\sin(qr)}{qr}
Args:
data:
Input data can be a pandas dataframe with columns 'r', 'time' and 'G'
or an array of shape (N, 3), of tuples (r, t, G).
q: Value of q.
Returns:
If input data was a dataframe the result will be returned as one too, else two arrays
will be returned, which will contain times and values of Fq(t) respectively.
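    Example (illustrative, with a synthetic van Hove function on two time slices):
        >>> times = np.repeat([0.0, 1.0], 50)
        >>> r = np.tile(np.linspace(0.1, 5.0, 50), 2)
        >>> t, fqt = Fqt_from_Grt(np.array([r, times, np.exp(-r**2)]).T, q=2.0)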
"""
if isinstance(data, pd.DataFrame):
df = data.copy()
else:
df = pd.DataFrame(data, columns=['r', 'time', 'G'])
df['isf'] = df['G'] * np.sinc(q / np.pi * df['r'])
isf = df.groupby('time')['isf'].sum()
if isinstance(data, pd.DataFrame):
return pd.DataFrame({'time': isf.index, 'isf': isf.values, 'q': q})
else:
return isf.index, isf.values
@numba.jit(nopython=True)
def norm(vec):
    """Calculate the euclidean norm of a vector."""
    return (vec**2).sum()**0.5
def singledispatchmethod(func):
"""A decorator to define a genric instance method, analogue to functools.singledispatch."""
dispatcher = functools.singledispatch(func)
def wrapper(*args, **kw):
return dispatcher.dispatch(args[1].__class__)(*args, **kw)
wrapper.register = dispatcher.register
functools.update_wrapper(wrapper, func)
return wrapper
def histogram(data, bins):
"""Compute the histogram of the given data. Uses numpy.bincount function, if possible."""
dbins = np.diff(bins)
dx = dbins.mean()
if bins.min() == 0 and dbins.std() < 1e-6:
logger.debug("Using numpy.bincount for histogramm compuation.")
hist = np.bincount((data // dx).astype(int), minlength=len(dbins))[:len(dbins)]
else:
hist = np.histogram(data, bins=bins)[0]
return hist, runningmean(bins, 2)
def quick1etau(t, C, n=7):
"""
    Estimate the time at which a correlation function that decays from 1 to 0 reaches 1/e.
    If successful, tau is returned from a fine interpolation with a KWW fit.
    The data is reduced to points around 1/e to remove short and long times from the KWW fit.
    Args:
        t: The time values
        C: The values of the correlation function C(t)
        n: The minimum number of points around 1/e required
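    Example (illustrative, with a synthetic stretched-exponential decay):
        >>> t = np.logspace(-2, 2, 100)
        >>> C = np.exp(-(t / 5.0)**0.8)
        >>> tau = quick1etau(t, C)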
"""
# first rough estimate, the closest time. This is returned if the interpolation fails!
tau_est = t[np.argmin(np.fabs(C-np.exp(-1)))]
# reduce the data to points around 1/e
k = 0.1
mask = (C < np.exp(-1)+k) & (C > np.exp(-1)-k)
while np.sum(mask) < n:
k += 0.01
mask = (C < np.exp(-1)+k) & (C > np.exp(-1)-k)
if k + np.exp(-1) > 1.0:
break
    # if enough points are found, try a curve fit; otherwise, and in case of failure, keep the rough estimate
if np.sum(mask) >= n:
try:
with np.errstate(invalid='ignore'):
fit, _ = curve_fit(kww, t[mask], C[mask], p0=[0.9, tau_est, 0.9], maxfev=100000)
tau_est = kww_1e(*fit)
        except Exception:
pass
return tau_est