Source code for mdevaluate.coordinates

from functools import partial, lru_cache, wraps
from copy import copy
from .logging import logger

import numpy as np
from scipy.spatial import cKDTree, KDTree

from .atoms import AtomSubset
from .pbc import whole, nojump, pbc_diff
from .utils import mask2indices, singledispatchmethod
from .checksum import checksum


[docs]class UnknownCoordinatesMode(Exception): pass
[docs]def rotate_axis(coords, axis): """ Rotate a set of coordinates to a given axis. """ axis = np.array(axis) / np.linalg.norm(axis) zaxis = np.array([0, 0, 1]) if (axis == zaxis).sum() == 3: return coords rotation_axis = np.cross(axis, zaxis) rotation_axis = rotation_axis / np.linalg.norm(rotation_axis) theta = np.arccos(axis @ zaxis / np.linalg.norm(axis)) # return theta/pi, rotation_axis ux, uy, uz = rotation_axis cross_matrix = np.array([ [0, -uz, uy], [uz, 0, -ux], [-uy, ux, 0] ]) rotation_matrix = np.cos(theta) * np.identity(len(axis)) \ + (1 - np.cos(theta)) * rotation_axis.reshape(-1, 1) @ rotation_axis.reshape(1, -1) \ + np.sin(theta) * cross_matrix if len(coords.shape) == 2: rotated = np.array([rotation_matrix @ xyz for xyz in coords]) else: rotated = rotation_matrix @ coords return rotated
[docs]def spherical_radius(frame, origin=None): """ Transform a frame of cartesian coordinates into the sperical radius. If origin=None the center of the box is taken as the coordinates origin. """ if origin is None: origin = frame.box.diagonal() / 2 return ((frame - origin)**2).sum(axis=-1)**0.5
[docs]def polar_coordinates(x, y): """Convert cartesian to polar coordinates.""" radius = (x**2 + y**2)**0.5 phi = np.arctan2(y, x) return radius, phi
[docs]def spherical_coordinates(x, y, z): """Convert cartesian to spherical coordinates.""" xy, phi = polar_coordinates(x, y) radius = (x**2 + y**2 + z**2)**0.5 theta = np.arccos(z / radius) return radius, phi, theta
[docs]def radial_selector(frame, coordinates, rmin, rmax): """ Return a selection of all atoms with radius in the interval [rmin, rmax]. """ crd = coordinates[frame.step] rad, _ = polar_coordinates(crd[:, 0], crd[:, 1]) selector = (rad >= rmin) & (rad <= rmax) return mask2indices(selector)
[docs]def spatial_selector(frame, transform, rmin, rmax): """ Select a subset of atoms which have a radius between rmin and rmax. Coordinates are filtered by the condition:: rmin <= transform(frame) <= rmax Args: frame: The coordinates of the actual trajectory transform: A function that transforms the coordinates of the frames into the one-dimensional spatial coordinate (e.g. radius). rmin: Minimum value of the radius rmax: Maximum value of the radius """ r = transform(frame) selector = (rmin <= r) & (rmax >= r) return mask2indices(selector)
[docs]class CoordinateFrame(np.ndarray): _known_modes = ('pbc', 'whole', 'nojump') @property def box(self): return np.array(self.coordinates.frames[self.step].box) @property def volume(self): return self.box.diagonal().cumprod()[-1] @property def time(self): return self.coordinates.frames[self.step].time @property def masses(self): return self.coordinates.atoms.masses[self.coordinates.atom_subset.selection] @property def charges(self): return self.coordinates.atoms.charges[self.coordinates.atom_subset.selection] @property def residue_ids(self): return self.coordinates.atom_subset.residue_ids @property def residue_names(self): return self.coordinates.atom_subset.residue_names @property def atom_names(self): return self.coordinates.atom_subset.atom_names @property def indices(self): return self.coordinates.atom_subset.indices @property def selection(self): return self.coordinates.atom_subset.selection @property def whole(self): frame = whole(self) frame.mode = 'whole' return frame @property def pbc(self): frame = self % self.box.diagonal() frame.mode = 'pbc' return frame @property def nojump(self): if self.mode != 'nojump': frame = nojump(self) frame.mode = 'nojump' return frame else: return self def __new__(subtype, shape, dtype=float, buffer=None, offset=0, strides=None, order=None, coordinates=None, step=None, box=None, mode=None): obj = np.ndarray.__new__(subtype, shape, dtype, buffer, offset, strides) obj.coordinates = coordinates obj.step = step obj.mode = mode return obj def __array_finalize__(self, obj): if obj is None: return self.coordinates = getattr(obj, 'coordinates', None) self.step = getattr(obj, 'step', None) self.mode = getattr(obj, 'mode', None) if hasattr(obj, 'reference'): self.reference = getattr(obj, 'reference')
[docs]class Coordinates: """ Coordinates represent trajectory data, which is used for evaluation functions. Atoms may be selected by specifing a atom_subset or a atom_filter. """
[docs] def get_mode(self, mode): if self.atom_subset is not None: return Coordinates(frames=self.frames, atom_subset=self.atom_subset, mode=mode)[self._slice] else: return Coordinates(frames=self.frames, atom_filter=self.atom_filter, mode=mode)[self._slice]
@property def pbc(self): return self.get_mode('pbc') @property def whole(self): return self.get_mode('whole') @property def nojump(self): return self.get_mode('nojump') @property def mode(self): return self._mode @mode.setter def mode(self, val): if val in CoordinateFrame._known_modes: logger.warn('Changing the Coordinates mode directly is deprecated. Use Coordinates.%s instead, which returns a copy.', val) self._mode = val else: raise UnknownCoordinatesMode('No such mode: {}'.format(val)) def __init__(self, frames, atom_filter=None, atom_subset: AtomSubset=None, mode=None): """ Args: frames: The trajectory reader atom_filter (opt.): A mask which selects a subset of the system atom_subset (opt.): A AtomSubset that selects a subset of the system mode (opt.): PBC mode of the Coordinates, can be pbc, whole or nojump. Note: The caching in Coordinates is deprecated, use the CachedReader or the function open from the reader module instead. """ self._mode = mode self.frames = frames self._slice = slice(None) assert atom_filter is None or atom_subset is None, "Cannot use both: subset and filter" if atom_filter is not None: self.atom_filter = atom_filter self.atom_subset = None elif atom_subset is not None: self.atom_filter = atom_subset.selection self.atom_subset = atom_subset self.atoms = atom_subset.atoms else: self.atom_filter = np.ones(shape=(len(frames[0].coordinates),), dtype=bool) self.atom_subset = None
[docs] def get_frame(self, fnr): """Returns the fnr-th frame.""" try: if self.atom_filter is not None: frame = self.frames[fnr].positions[self.atom_filter].view(CoordinateFrame) else: frame = self.frames.__getitem__(fnr).positions.view(CoordinateFrame) frame.coordinates = self frame.step = fnr if self.mode is not None: frame = getattr(frame, self.mode) except EOFError: raise IndexError return frame
[docs] def clear_cache(self): """Clears the frame cache, if it is enabled.""" if hasattr(self.get_frame, 'clear_cache'): self.get_frame.clear_cache()
def __iter__(self): for i in range(len(self))[self._slice]: yield self[i] @singledispatchmethod def __getitem__(self, item): return self.get_frame(item) @__getitem__.register(slice) def _(self, item): sliced = copy(self) sliced._slice = item return sliced def __len__(self): return len(self.frames) def __checksum__(self): return checksum(self.frames, self.atom_filter, self._slice, self.mode) def __repr__(self): return "Coordinates <{}>: {}".format(self.frames.filename, self.atom_subset) @wraps(AtomSubset.subset) def subset(self, **kwargs): return Coordinates(self.frames, atom_subset=self.atom_subset.subset(**kwargs), mode=self._mode) @property def description(self): return self.atom_subset.description @description.setter def description(self, desc): self.atom_subset.description = desc
[docs]class MeanCoordinates(Coordinates): def __init__(self, frames, atom_filter=None, mean=1): super().__init__(frames, atom_filter) self.mean = mean assert mean >= 1, "Mean must be positive" def __getitem__(self, item): frame = super().__getitem__(item) for i in range(item + 1, item + self.mean): frame += super().__getitem__(i) return frame / self.mean
[docs] def len(self): return len(super() - self.mean + 1)
[docs]class CoordinatesMap: def __init__(self, coordinates, function): self.coordinates = coordinates self.frames = self.coordinates.frames self.atom_subset = self.coordinates.atom_subset self.function = function if isinstance(function, partial): self._description = self.function.func.__name__ else: self._description = self.function.__name__ def __iter__(self): for frame in self.coordinates: step = frame.step frame = self.function(frame) if not isinstance(frame, CoordinateFrame): frame = frame.view(CoordinateFrame) frame.coordinates = self frame.step = step yield frame def __getitem__(self, item): if isinstance(item, slice): return self.__class__(self.coordinates[item], self.function) else: frame = self.function(self.coordinates.__getitem__(item)) if not isinstance(frame, CoordinateFrame): frame = frame.view(CoordinateFrame) frame.coordinates = self frame.step = item return frame def __len__(self): return len(self.coordinates.frames) def __checksum__(self): return checksum(self.coordinates, self.function) @wraps(Coordinates.subset) def subset(self, **kwargs): return CoordinatesMap(self.coordinates.subset(**kwargs), self.function) @property def description(self): return '{}_{}'.format(self._description, self.coordinates.description) @description.setter def description(self, desc): self._description = desc @property def nojump(self): return CoordinatesMap(self.coordinates.nojump, self.function) @property def whole(self): return CoordinatesMap(self.coordinates.whole, self.function) @property def pbc(self): return CoordinatesMap(self.coordinates.pbc, self.function)
[docs]class CoordinatesFilter: @property def atom_subset(self): pass def __init__(self, coordinates, atom_filter): self.coordinates = coordinates self.atom_filter = atom_filter def __getitem__(self, item): if isinstance(item, slice): sliced = copy(self) sliced.coordinates = self.coordinates[item] return sliced else: frame = self.coordinates[item] return frame[self.atom_filter]
[docs]class CoordinatesKDTree: """ A KDTree of coordinates frames. The KDtrees are cached by a :func:`functools.lru_cache`. Uses :class:`scipy.spatial.cKDTree` by default, since it's significantly faster. Make sure to use scipy 0.17 or later or switch to the normal KDTree, since cKDTree has a memory leak in earlier versions. """
[docs] def clear_cache(self): """Clear the LRU cache.""" self._get_tree_at_index.cache_clear()
@property def cache_info(self): """Return info about the state of the cache.""" return self._get_tree_at_index.cache_info() def _get_tree_at_index(self, index): frame = self.frames[index] return self.kdtree(frame[self.selector(frame)]) def __init__(self, frames, selector=None, boxsize=None, maxcache=128, ckdtree=True): """ Args: frames: Trajectory of the simulation, can be Coordinates object or reader selector: Selector function that selects a subset of each frame maxcache: Maxsize of the :func:`~functools.lru_cache` ckdtree: Use :class:`~scipy.spatial.cKDTree` or :class:`~scipy.spatial.KDTree` if False """ if selector is not None: self.selector = selector else: self.selector = lambda x: slice(None) self.frames = frames self.kdtree = cKDTree if ckdtree else KDTree if boxsize is not None: self.kdtree = partial(self.kdtree, boxsize=boxsize) self._get_tree_at_index = lru_cache(maxsize=maxcache)(self._get_tree_at_index) def __getitem__(self, index): return self._get_tree_at_index(index) def __checksum__(self): return checksum(self.selector, self.frames) def __eq__(self, other): return super().__eq__(other)
[docs]def map_coordinates(func): @wraps(func) def wrapped(coordinates, **kwargs): return CoordinatesMap(coordinates, partial(func, **kwargs)) return wrapped
[docs]@map_coordinates def centers_of_mass(c, *, masses=None): """ A- 1 B- 2 A- 1 C 3 A- B- A- C A- B- A- C Example: rd = XTCReader('t.xtc') coordinates = Coordinates(rd) com = centers_of_mass(coordinates, (1.0, 2.0, 1.0, 3.0)) """ # At first, regroup our array number_of_masses = len(masses) number_of_coordinates, number_of_dimensions = c.shape number_of_new_coordinates = number_of_coordinates // number_of_masses grouped_masses = c.reshape(number_of_new_coordinates, number_of_masses, number_of_dimensions) return np.average(grouped_masses, axis=1, weights=masses)
[docs]@map_coordinates def pore_coordinates(coordinates, origin, sym_axis='z'): """ Map coordinates of a pore simulation so the pore has cylindrical symmetry. Args: coordinates: Coordinates of the simulation origin: Origin of the pore which will be the coordinates origin after mapping sym_axis (opt.): Symmtery axis of the pore, may be a literal direction 'x', 'y' or 'z' or an array of shape (3,) """ if sym_axis in ('x', 'y', 'z'): rot_axis = np.zeros(shape=(3,)) rot_axis[['x', 'y', 'z'].index(sym_axis)] = 1 else: rot_axis = sym_axis return rotate_axis(coordinates - origin, rot_axis)
[docs]@map_coordinates def vectors(coordinates, atoms_a, atoms_b, normed=False, box=None): """ Compute the vectors between the atoms of two subsets. Args: coordinates: The Coordinates object the atoms will be taken from atoms_a: Mask or indices of the first atom subset atoms_b: Mask or indices of the second atom subset normed (opt.): If the vectors should be normed box (opt.): If not None, the vectors are calcualte with PBC The defintion of atoms_a/b can be any possible subript of a numpy array. They can, for example, be given as a masking array of bool values with the same length as the frames of the coordinates. Or they can be a list of indices selecting the atoms of these indices from each frame. It is possible to compute the mean of several atoms before calculating the vectors, by using a two-dimensional list of indices. The following code computes the vectors between atoms 0, 3, 6 and the mean coordinate of atoms 1, 4, 7 and 2, 5, 8:: >>> inds_a = [0, 3, 6] >>> inds_b = [[1, 4, 7], [2, 5, 8]] >>> vectors(coords, inds_a, inds_b) array([ coords[0] - (coords[1] + coords[2])/2, coords[3] - (coords[4] + coords[5])/2, coords[6] - (coords[7] + coords[8])/2, ]) """ coords_a = coordinates[atoms_a] if len(coords_a.shape) > 2: coords_a = coords_a.mean(axis=0) coords_b = coordinates[atoms_b] if len(coords_b.shape) > 2: coords_b = coords_b.mean(axis=0) vectors = pbc_diff(coords_a, coords_b, box=box) norm = np.linalg.norm(vectors, axis=-1).reshape(-1, 1) if normed else 1 vectors.reference = coords_a return vectors / norm