import numpy as np
from .coordinates import rotate_axis, polar_coordinates, spherical_coordinates
from .atoms import next_neighbors
from .autosave import autosave_data
from .utils import runningmean
from .pbc import pbc_diff, pbc_points
from .logging import logger
from scipy import spatial
[docs]@autosave_data(nargs=2, kwargs_keys=('coordinates_b',), version='time_average-1')
def time_average(function, coordinates, coordinates_b=None, pool=None):
"""
Compute the time average of a function.
Args:
function:
The function that will be averaged, it has to accept exactly one argument
which is the current atom set
coordinates: The coordinates object of the simulation
pool (multiprocessing.Pool, opt.):
A multiprocessing pool which will be used for cocurrent calculation of the
averaged function
"""
if pool is not None:
_map = pool.imap
else:
_map = map
number_of_averages = 0
result = 0
if coordinates_b is not None:
if coordinates._slice != coordinates_b._slice:
logger.warning("Different slice for coordinates and coordinates_b.")
coordinate_iter = (iter(coordinates), iter(coordinates_b))
else:
coordinate_iter = (iter(coordinates),)
evaluated = _map(function, *coordinate_iter)
for ev in evaluated:
number_of_averages += 1
result += ev
if number_of_averages % 100 == 0:
logger.debug('time_average: %d', number_of_averages)
return result / number_of_averages
[docs]def time_histogram(function, coordinates, bins, hist_range, pool=None):
coordinate_iter = iter(coordinates)
if pool is not None:
_map = pool.imap
else:
_map = map
evaluated = _map(function, coordinate_iter)
results = []
hist_results = []
for num, ev in enumerate(evaluated):
results.append(ev)
if num % 100 == 0 and num > 0:
print(num)
r = np.array(results).T
for i, row in enumerate(r):
histo, _ = np.histogram(row, bins=bins, range=hist_range)
if len(hist_results) <= i:
hist_results.append(histo)
else:
hist_results[i] += histo
results = []
return hist_results
[docs]def rdf(atoms_a, atoms_b=None, bins=None, box=None, kind=None, chunksize=50000, returnx=False, **kwargs):
r"""
Compute the radial pair distribution of one or two sets of atoms.
.. math::
g_{AB}(r) = \frac{1}{\langle \rho_B\rangle N_A}\sum\limits_{i\in A}^{N_A}
\sum\limits_{j\in B}^{N_B}\frac{\delta(r_{ij} -r)}{4\pi r^2}
For use with :func:`time_average`, define bins through the use of :func:`~functools.partial`,
the atom sets are passed to :func:`time_average`, if a second set of atoms should be used
specify it as ``coordinates_b`` and it will be passed to this function.
Args:
atoms_a: First set of atoms, used internally
atoms_b (opt.): Second set of atoms, used internally
bins: Bins of the radial distribution function
box (opt.): Simulations box, if not specified this is taken from ``atoms_a.box``
kind (opt.): Can be 'inter', 'intra' or None (default).
chunksize (opt.):
For large systems (N > 1000) the distaces have to be computed in chunks so the arrays
fit into memory, this parameter controlls the size of these chunks. It should be
as large as possible, depending on the available memory.
returnx (opt.): If True the x ordinate of the histogram is returned.
"""
assert bins is not None, 'Bins of the pair distribution have to be defined.'
assert kind in ['intra', 'inter', None], 'Argument kind must be one of the following: intra, inter, None.'
if box is None:
box = atoms_a.box.diagonal()
if atoms_b is None:
atoms_b = atoms_a
nr_of_atoms = len(atoms_a)
indices = np.triu_indices(nr_of_atoms, k=1)
else:
nr_a, dim = atoms_a.shape
nr_b, dim = atoms_b.shape
indices = np.array([(i, j) for i in range(nr_a) for j in range(nr_b)]).T
# compute the histogram in chunks for large systems
hist = 0
nr_of_samples = 0
for chunk in range(0, len(indices[0]), chunksize):
sl = slice(chunk, chunk + chunksize)
diff = pbc_diff(atoms_a[indices[0][sl]], atoms_b[indices[1][sl]], box)
dist = (diff**2).sum(axis=1)**0.5
if kind == 'intra':
mask = atoms_a.residue_ids[indices[0][sl]] == atoms_b.residue_ids[indices[1][sl]]
dist = dist[mask]
elif kind == 'inter':
mask = atoms_a.residue_ids[indices[0][sl]] != atoms_b.residue_ids[indices[1][sl]]
dist = dist[mask]
nr_of_samples += len(dist)
hist += np.histogram(dist, bins)[0]
volume = 4 / 3 * np.pi * (bins[1:]**3 - bins[:-1]**3)
density = nr_of_samples / np.prod(box)
res = hist / volume / density
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
[docs]def pbc_tree_rdf(atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs):
if box is None:
box = atoms_a.box.diagonal()
all_coords = pbc_points(pbc_diff(atoms_b,box=box), box, thickness=np.amax(bins)+0.1, center=0)
to_tree = spatial.cKDTree(all_coords)
dist = to_tree.query(pbc_diff(atoms_a,box=box),k=len(atoms_b), distance_upper_bound=np.amax(bins)+0.1)[0].flatten()
dist = dist[dist < np.inf]
hist = np.histogram(dist, bins)[0]
volume = 4/3*np.pi*(bins[1:]**3-bins[:-1]**3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b)-exclude)
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
[docs]def pbc_spm_rdf(atoms_a, atoms_b=None, bins=None, box=None, exclude=0, returnx=False, **kwargs):
if box is None:
box = atoms_a.box.diagonal()
all_coords = pbc_points(pbc_diff(atoms_b,box=box), box, thickness=np.amax(bins)+0.1, center=0)
to_tree = spatial.cKDTree(all_coords)
if all_coords.nbytes/1024**3 * len(atoms_a) < 2:
from_tree = spatial.cKDTree(pbc_diff(atoms_a,box=box))
dist = to_tree.sparse_distance_matrix(from_tree, max_distance=np.amax(bins)+0.1, output_type='ndarray')
dist = np.asarray(dist.tolist())[:,2]
hist = np.histogram(dist, bins)[0]
else:
chunksize = int(2 * len(atoms_a) / (all_coords.nbytes/1024**3 * len(atoms_a)))
hist = 0
for chunk in range(0, len(atoms_a), chunksize):
sl = slice(chunk, chunk + chunksize)
from_tree = spatial.cKDTree(pbc_diff(atoms_a[sl],box=box))
dist = to_tree.sparse_distance_matrix(from_tree, max_distance=np.amax(bins)+0.1, output_type='ndarray')
dist = np.asarray(dist.tolist())[:,2]
hist += np.histogram(dist, bins)[0]
volume = 4/3*np.pi*(bins[1:]**3-bins[:-1]**3)
res = (hist) * np.prod(box) / volume / len(atoms_a) / (len(atoms_b)-exclude)
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
[docs]@autosave_data(nargs=2, kwargs_keys=('to_coords','times'))
def fast_averaged_rdf(from_coords, bins, to_coords=None, times=10, exclude=0, **kwargs):
if to_coords is None:
to_coords = from_coords
exclude = 1
# first find timings for the different rdf functions
import time
# only consider sparse matrix for this condition
if (len(from_coords[0])*len(to_coords[0]) <= 3000 * 2000 ) & (len(from_coords[0])/len(to_coords[0]) > 5 ):
funcs = [rdf, pbc_tree_rdf, pbc_spm_rdf]
else:
funcs = [rdf, pbc_tree_rdf]
timings = []
for f in funcs:
start = time.time()
f(from_coords[0], atoms_b=to_coords[0], bins=bins, box=np.diag(from_coords[0].box))
end = time.time()
timings.append(end-start)
timings = np.array(timings)
timings[0] = 2*timings[0] # statistics for the other functions is twice as good per frame
logger.debug('rdf function timings: ' + str(timings))
rdffunc = funcs[np.argmin(timings)]
logger.debug('rdf function used: ' + str(rdffunc))
if rdffunc == rdf:
times = times*2 # duplicate times for same statistics
frames = np.array(range(0, len(from_coords), int(len(from_coords)/times)))[:times]
out = np.zeros(len(bins)-1)
for j, i in enumerate(frames):
logger.debug('multi_radial_pair_distribution: %d/%d', j, len(frames))
out += rdffunc(from_coords[i], to_coords[i], bins, box=np.diag(from_coords[i].box), exclude=exclude)
return out/len(frames)
[docs]def distance_distribution(atoms, bins):
connection_vectors = atoms[:-1, :] - atoms[1:, :]
connection_lengths = (connection_vectors**2).sum(axis=1)**.5
return np.histogram(connection_lengths, bins)[0]
[docs]def tetrahedral_order(atoms, reference_atoms=None):
if reference_atoms is None:
reference_atoms = atoms
indices = next_neighbors(reference_atoms, query_atoms=atoms, number_of_neighbors=4)
neighbors = reference_atoms[indices]
neighbors_1, neighbors_2, neighbors_3, neighbors_4 = \
neighbors[:, 0, :], neighbors[:, 1, :], neighbors[:, 2, :], neighbors[:, 3, :]
# Connection vectors
neighbors_1 -= atoms
neighbors_2 -= atoms
neighbors_3 -= atoms
neighbors_4 -= atoms
# Normed Connection vectors
neighbors_1 /= np.linalg.norm(neighbors_1, axis=-1).reshape(-1, 1)
neighbors_2 /= np.linalg.norm(neighbors_2, axis=-1).reshape(-1, 1)
neighbors_3 /= np.linalg.norm(neighbors_3, axis=-1).reshape(-1, 1)
neighbors_4 /= np.linalg.norm(neighbors_4, axis=-1).reshape(-1, 1)
a_1_2 = ((neighbors_1 * neighbors_2).sum(axis=1) + 1 / 3)**2
a_1_3 = ((neighbors_1 * neighbors_3).sum(axis=1) + 1 / 3)**2
a_1_4 = ((neighbors_1 * neighbors_4).sum(axis=1) + 1 / 3)**2
a_2_3 = ((neighbors_2 * neighbors_3).sum(axis=1) + 1 / 3)**2
a_2_4 = ((neighbors_2 * neighbors_4).sum(axis=1) + 1 / 3)**2
a_3_4 = ((neighbors_3 * neighbors_4).sum(axis=1) + 1 / 3)**2
q = 1 - 3 / 8 * (a_1_2 + a_1_3 + a_1_4 + a_2_3 + a_2_4 + a_3_4)
return q
[docs]def tetrahedral_order_distribution(atoms, reference_atoms=None, bins=None):
assert bins is not None, 'Bin edges of the distribution have to be specified.'
Q = tetrahedral_order(atoms, reference_atoms=reference_atoms)
return np.histogram(Q, bins=bins)[0]
[docs]def radial_density(atoms, bins, symmetry_axis=(0, 0, 1), origin=(0, 0, 0), height=1, returnx=False):
"""
Calculate the radial density distribution.
This function is meant to be used with time_average.
Args:
atoms:
Set of coordinates.
bins:
Bin specification that is passed to numpy.histogram. This needs to be
a list of bin edges if the function is used within time_average.
symmetry_axis (opt.):
Vector of the symmetry axis, around which the radial density is calculated,
default is z-axis.
origin (opt.):
Origin of the rotational symmetry, e.g. center of the pore.
height (opt.):
Height of the pore, necessary for correct normalization of the density.
returnx (opt.):
If True, the x ordinate of the distribution is returned.
"""
cartesian = rotate_axis(atoms - origin, symmetry_axis)
radius, _ = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
hist = np.histogram(radius, bins=bins)[0]
volume = np.pi * (bins[1:]**2 - bins[:-1]**2) * height
res = hist / volume
if returnx:
return np.vstack((runningmean(bins, 2), res))
else:
return res
[docs]def shell_density(atoms, shell_radius, bins, shell_thickness=0.5,
symmetry_axis=(0, 0, 1), origin=(0, 0, 0)):
"""
Compute the density distribution on a cylindrical shell.
Args:
atoms: The coordinates of the atoms
shell_radius: Inner radius of the shell
bins: Histogramm bins, this has to be a two-dimensional list of bins: [angle, z]
shell_thickness (opt.): Thicknes of the shell, default is 0.5
symmetry_axis (opt.): The symmtery axis of the pore, the coordinates will be
rotated such that this axis is the z-axis
origin (opt.): Origin of the pore, the coordinates will be moved such that this
is the new origin.
Returns:
Two-dimensional density distribution of the atoms in the defined shell.
"""
cartesian = rotate_axis(atoms-origin, symmetry_axis)
radius, theta = polar_coordinates(cartesian[:, 0], cartesian[:, 1])
shell_indices = (shell_radius <= radius) & (radius <= shell_radius + shell_thickness)
hist = np.histogram2d(theta[shell_indices], cartesian[shell_indices, 2], bins)[0]
return hist
[docs]def spatial_density(atoms, bins, weights=None):
"""
Compute the spatial density distribution.
"""
density, _ = np.histogramdd(atoms, bins=bins, weights=weights)
return density
[docs]def mixing_ratio_distribution(atoms_a, atoms_b, bins_ratio, bins_density,
weights_a=None, weights_b=None, weights_ratio=None):
"""
Compute the distribution of the mixing ratio of two sets of atoms.
"""
density_a, _ = time_average
density_b, _ = np.histogramdd(atoms_b, bins=bins_density, weights=weights_b)
mixing_ratio = density_a/(density_a + density_b)
good_inds = (density_a != 0) & (density_b != 0)
hist, _ = np.histogram(mixing_ratio[good_inds], bins=bins_ratio, weights=weights_ratio)
return hist
[docs]def next_neighbor_distribution(atoms, reference=None, number_of_neighbors=4, bins=None, normed=True):
"""
Compute the distribution of next neighbors with the same residue name.
"""
assert bins is not None, 'Bins have to be specified.'
if reference is None:
reference = atoms
nn = next_neighbors(reference, query_atoms=atoms, number_of_neighbors=number_of_neighbors)
resname_nn = reference.residue_names[nn]
count_nn = (resname_nn == atoms.residue_names.reshape(-1, 1)).sum(axis=1)
return np.histogram(count_nn, bins=bins, normed=normed)[0]