Source code for mdevaluate.reader

"""
Module that provides different readers for trajectory files.

It also provides a common interface layer between the file IO packages,
namely pygmx and mdanalysis, and mdevaluate.
"""
from .checksum import checksum
from .logging import logger
from . import atoms

from functools import lru_cache
from collections import namedtuple
import os
from os import path
from array import array
from zipfile import BadZipFile
import builtins
import warnings

import numpy as np
from scipy import sparse
from dask import delayed, __version__ as DASK_VERSION


try:
    import pygmx
    from pygmx.errors import InvalidMagicException, InvalidIndexException, FileTypeError
    PYGMX_AVAILABLE = True
except ImportError:
    PYGMX_AVAILABLE = False

try:
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', category=UserWarning)
        import MDAnalysis as mdanalysis
    MADANALYSIS_AVAILABLE = True
except ImportError:
    MADANALYSIS_AVAILABLE = False

assert PYGMX_AVAILABLE or MADANALYSIS_AVAILABLE, \
    'Could not import any file IO package; make sure to install either pygmx or mdanalysis.'


class NojumpError(Exception):
    pass


class NoReaderAvailabelError(Exception):
    pass
def open_with_mdanalysis(topology, trajectory, cached=False):
    """Open a topology and trajectory with mdanalysis."""
    uni = mdanalysis.Universe(topology, trajectory, convert_units=False)
    if cached is not False:
        if cached is True:
            maxsize = 128
        else:
            maxsize = cached
        reader = CachedReader(uni.trajectory, maxsize)
    else:
        reader = BaseReader(uni.trajectory)
    reader.universe = uni
    atms = atoms.Atoms(
        np.stack((uni.atoms.resids, uni.atoms.resnames, uni.atoms.names), axis=1),
        charges=uni.atoms.charges, masses=uni.atoms.masses
    ).subset()
    return atms, reader
def open_with_pygmx(topology, trajectory, cached=False, reindex=False,
                    ignore_index_timestamps=False, index_file=None):
    """Open a topology and trajectory with pygmx."""
    try:
        rd = pygmx.open(trajectory, ignore_index_timestamps=ignore_index_timestamps)
    except InvalidMagicException:
        raise InvalidIndexException('This is not a valid index file: {}'.format(trajectory))
    except InvalidIndexException:
        if reindex:
            try:
                os.remove(pygmx.index_filename_for_xtc(trajectory))
            except FileNotFoundError:
                pass
            rd = pygmx.open(trajectory)
        else:
            raise InvalidIndexException('Index file is invalid, use reindex=True to regenerate.')
    if cached is not False:
        if isinstance(cached, bool):
            maxsize = 128
        else:
            maxsize = cached
        reader = CachedReader(rd, maxsize)
    else:
        reader = BaseReader(rd)
    if topology.endswith('.tpr'):
        atms = atoms.from_tprfile(topology, index_file=index_file)
    elif topology.endswith('.gro'):
        atms = atoms.from_grofile(topology, index_file=index_file)
    return atms, reader
def open(topology, trajectory, cached=False, index_file=None, reindex=False,
         ignore_index_timestamps=False):
    """
    Open a topology and trajectory with the appropriate reader.

    Args:
        topology (str): Topology file (a tpr or gro file for pygmx, any format
            supported by mdanalysis otherwise).
        trajectory (str): Trajectory file; the reader is chosen according to
            the file extension.
        cached (opt.): If the reader should be cached with lru_cache. If this
            is True, the maxsize of the cache is 128, otherwise the argument
            is passed on as maxsize. Use cached=None to get an unbounded cache.
        index_file (opt.): Index file that is passed on to the atom loader
            (pygmx only).
        reindex (opt.): Regenerate the index of the xtc-file (pygmx only).
        ignore_index_timestamps (opt.): Ignore the timestamps of the xtc index
            file (pygmx only).
    """
    if PYGMX_AVAILABLE and trajectory.endswith('.xtc') and topology.endswith(('.tpr', '.gro')):
        return open_with_pygmx(topology, trajectory, cached=cached, reindex=reindex,
                               ignore_index_timestamps=ignore_index_timestamps,
                               index_file=index_file)
    elif MADANALYSIS_AVAILABLE:
        return open_with_mdanalysis(topology, trajectory, cached=cached)
    else:
        raise NoReaderAvailabelError('No reader package found, install pygmx or mdanalysis.')
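A typical call could look like this (a sketch with hypothetical file names; with pygmx installed, an xtc/tpr pair is handled by open_with_pygmx, otherwise mdanalysis is used as a fallback):

atms, reader = open('topol.tpr', 'traj.xtc', cached=True)  # placeholder file names
frame = reader[0]    # repeated access to the same frame is served from the lru cache
n_frames = len(reader)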
def is_writeable(fname):
    """Test if a directory is actually writeable, by writing a temporary file."""
    fdir = os.path.dirname(fname)
    ftmp = os.path.join(fdir, str(np.random.randint(999999999)))
    while os.path.exists(ftmp):
        ftmp = os.path.join(fdir, str(np.random.randint(999999999)))
    if os.access(fdir, os.W_OK):
        try:
            with builtins.open(ftmp, 'w'):
                pass
            os.remove(ftmp)
            return True
        except PermissionError:
            pass
    return False
def nojump_filename(reader):
    directory, fname = path.split(reader.filename)
    nojump = path.join(directory, '.{}.nojump.npz'.format(fname))
    if os.path.exists(nojump) or is_writeable(directory):
        return nojump
    else:
        # fall back to a mirror of the directory tree below ~/.mdevaluate/nojump
        nojump = os.path.join(
            os.environ['HOME'], '.mdevaluate/nojump',
            directory.lstrip('/'),
            '.{}.nojump.npz'.format(fname)
        )
        logger.info('Saving nojump to {}, since original location is not writeable.'.format(nojump))
        os.makedirs(os.path.dirname(nojump), exist_ok=True)
        return nojump
CSR_ATTRS = ('data', 'indices', 'indptr')
NOJUMP_MAGIC = 2016
def parse_jumps(trajectory):
    prev = trajectory[0].whole
    box = prev.box.diagonal()
    SparseData = namedtuple('SparseData', ['data', 'row', 'col'])
    jump_data = (
        SparseData(data=array('b'), row=array('l'), col=array('l')),
        SparseData(data=array('b'), row=array('l'), col=array('l')),
        SparseData(data=array('b'), row=array('l'), col=array('l'))
    )
    for i, curr in enumerate(trajectory):
        if i % 500 == 0:
            logger.debug('Parse jumps Step: %d', i)
        delta = ((curr - prev) / box).round().astype(np.int8)
        prev = curr
        for d in range(3):
            col, = np.where(delta[:, d] != 0)
            jump_data[d].col.extend(col)
            jump_data[d].row.extend([i] * len(col))
            jump_data[d].data.extend(delta[col, d])
    return jump_data
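As a worked example of the delta computation: with a box length of 3.0 nm in one direction, an atom whose coordinate moves from 2.9 to 0.1 between two frames gives (0.1 - 2.9) / 3.0 ≈ -0.93, which rounds to -1, i.e. one wrap across the upper box boundary, stored as a single sparse entry. Subtracting -1 · 3.0 nm later restores the continuous position 3.1 nm.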
def generate_nojump_matrixes(trajectory):
    """Create the matrices of pbc jumps for a trajectory."""
    logger.info('Generating nojump matrices for: {}'.format(trajectory))
    jump_data = parse_jumps(trajectory)
    N = len(trajectory)
    M = len(trajectory[0])
    trajectory.frames.nojump_matrixes = tuple(
        sparse.csr_matrix((np.array(m.data), (m.row, m.col)), shape=(N, M))
        for m in jump_data
    )
    save_nojump_matrixes(trajectory.frames)
def save_nojump_matrixes(reader, matrixes=None):
    if matrixes is None:
        matrixes = reader.nojump_matrixes
    data = {'checksum': checksum(NOJUMP_MAGIC, checksum(reader))}
    for d, mat in enumerate(matrixes):
        # all three matrices share the same shape, so one entry is sufficient
        data['shape'] = mat.shape
        for attr in CSR_ATTRS:
            data['{}_{}'.format(attr, d)] = getattr(mat, attr)
    np.savez(nojump_filename(reader), **data)
def load_nojump_matrixes(reader):
    zipname = nojump_filename(reader)
    try:
        data = np.load(zipname)
    except (AttributeError, BadZipFile, OSError):
        # npz-files can be corrupted, probably a bug for big arrays saved with savez_compressed?
        logger.info('Removing zip-File: %s', zipname)
        os.remove(zipname)
        return
    try:
        if data['checksum'] == checksum(NOJUMP_MAGIC, checksum(reader)):
            reader.nojump_matrixes = tuple(
                sparse.csr_matrix(
                    tuple(data['{}_{}'.format(attr, d)] for attr in CSR_ATTRS),
                    shape=data['shape']
                )
                for d in range(3)
            )
            logger.info('Loaded nojump matrices: {}'.format(zipname))
        else:
            logger.info('Invalid nojump data: {}'.format(zipname))
    except KeyError:
        logger.info('Removing zip-File: %s', zipname)
        os.remove(zipname)
        return
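Conceptually, entry (i, j) of the d-th matrix holds the signed number of box crossings of atom j between frames i-1 and i, so summing the rows up to a frame gives the total shift to undo. A minimal sketch of this use, with a hypothetical helper name and an (M, 3) coordinate array (not part of this module):

def nojump_coordinates(reader, frame_index, coordinates, box_lengths):
    """Hypothetical helper: undo all pbc jumps up to ``frame_index``."""
    crossings = np.vstack([
        np.asarray(m[:frame_index + 1].sum(axis=0)).ravel()  # one row sum per direction
        for m in reader.nojump_matrixes
    ]).T  # shape (M, 3): signed box crossings per atom
    return coordinates - crossings * box_lengths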
class BaseReader:
    """Base class for trajectory readers."""

    @property
    def filename(self):
        return self.rd.filename

    @property
    def nojump_matrixes(self):
        if self._nojump_matrixes is None:
            raise NojumpError('Nojump Data not available: {}'.format(self.filename))
        return self._nojump_matrixes

    @nojump_matrixes.setter
    def nojump_matrixes(self, mats):
        self._nojump_matrixes = mats

    def __init__(self, rd):
        """
        Args:
            rd: Reader object of the file IO package (pygmx or mdanalysis)
                that does the actual reading.
        """
        self.rd = rd
        self._nojump_matrixes = None
        if path.exists(nojump_filename(self)):
            load_nojump_matrixes(self)

    def __getitem__(self, item):
        return self.rd[item]

    def __len__(self):
        return len(self.rd)

    def __checksum__(self):
        if hasattr(self.rd, 'cache'):
            # this is a pygmx reader
            return checksum(self.filename, str(self.rd.cache))
        elif hasattr(self.rd, '_xdr'):
            # this is an mdanalysis reader
            cache = array('L', self.rd._xdr.offsets.tobytes())
            return checksum(self.filename, str(cache))
class CachedReader(BaseReader):
    """A reader that has a least-recently-used cache for frames."""

    @property
    def cache_info(self):
        """Get information about the lru cache."""
        return self._get_item.cache_info()

    def clear_cache(self):
        """Clear the cache of the frames."""
        self._get_item.cache_clear()

    def __init__(self, rd, maxsize):
        """
        Args:
            rd: Reader object of the file IO package that does the actual reading.
            maxsize: Maximum size of the lru_cache or None for infinite cache.
        """
        super().__init__(rd)
        self._get_item = lru_cache(maxsize=maxsize)(self._get_item)

    def _get_item(self, item):
        """Buffer function for lru_cache, since __getitem__ can not be cached."""
        return super().__getitem__(item)

    def __getitem__(self, item):
        return self._get_item(item)
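For example (a sketch, with rd standing in for an already opened low-level reader):

reader = CachedReader(rd, maxsize=64)
reader[10]                 # first access reads from file and fills the cache
reader[10]                 # second access is served from the lru cache
print(reader.cache_info)   # e.g. CacheInfo(hits=1, misses=1, maxsize=64, currsize=1)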
if PYGMX_AVAILABLE:
    # the traverse keyword of dask.delayed is only available in newer dask versions
    if DASK_VERSION >= '0.15.0':
        read_xtcframe_delayed = delayed(pure=True, traverse=False)(pygmx.read_xtcframe)
    else:
        read_xtcframe_delayed = delayed(pure=True)(pygmx.read_xtcframe)
class DelayedReader(BaseReader):
    """A reader that returns delayed frames, which are only read when computed."""

    @property
    def filename(self):
        if self.rd is not None:
            return self.rd.filename
        else:
            return self._filename

    def __init__(self, filename, reindex=False, ignore_index_timestamps=False):
        # BaseReader expects an already opened reader, so open the file with pygmx first
        rd = pygmx.open(filename, ignore_index_timestamps=ignore_index_timestamps)
        super().__init__(rd)
        self.natoms = len(self.rd[0].coordinates)
        self.cache = self.rd.cache
        self._filename = self.rd.filename
        self.rd = None

    def __len__(self):
        return len(self.cache)

    def _get_item(self, frame):
        return read_xtcframe_delayed(self.filename, self.cache[frame], self.natoms)

    def __getitem__(self, frame):
        return self._get_item(frame)
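Since DelayedReader returns dask.delayed frames, no file IO happens until compute is called. A usage sketch, assuming pygmx is installed and with 'traj.xtc' as a placeholder file name:

import dask

reader = DelayedReader('traj.xtc')
frames = [reader[i] for i in range(0, len(reader), 100)]  # builds the task graph only
results = dask.compute(*frames)                           # frames are actually read here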
class EnergyReader:
    """A reader for Gromacs energy files."""

    def __init__(self, edrfile):
        """
        Args:
            edrfile: Filename of the energy file.
        """
        edr = pygmx.open(edrfile)
        self.time, data = edr.read()
        self.types, self.units = zip(*edr.types)
        self.data = data.T

    def __getitem__(self, type):
        """Get the time series of an energy type."""
        if type in self.types:
            return self.data[self.types.index(type)]
        else:
            raise KeyError('Energy type {} not found in energy file.'.format(type))
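Usage sketch, with placeholder names ('ener.edr' as the file, 'Potential' as a typical Gromacs energy type):

energy = EnergyReader('ener.edr')
time = energy.time               # time axis of the energy file
potential = energy['Potential']  # raises KeyError if the type is not present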