Source code for

# coding: utf-8
# Copyright (c) Pymatgen Development Team.
# Distributed under the terms of the MIT License.
"""Wrapper for netCDF readers."""

import os.path
import warnings
import numpy as np

from collections import OrderedDict
from import requires
from monty.collections import AttrDict
from monty.functools import lazy_property
from monty.string import marquee
from pymatgen.core.units import ArrayWithUnit
from pymatgen.core.xcfunc import XcFunc
from pymatgen.core.structure import Structure

import logging

logger = logging.getLogger(__name__)

__author__ = "Matteo Giantomassi"
__copyright__ = "Copyright 2013, The Materials Project"
__version__ = "0.1"
__maintainer__ = "Matteo Giantomassi"
__email__ = "gmatteo at"
__status__ = "Development"
__date__ = "$Feb 21, 2013M$"

__all__ = [

    import netCDF4
except ImportError as exc:
    netCDF4 = None
`import netCDF4` failed with the following error:


Please install netcdf4 with `conda install netcdf4`
If the conda version does not work, uninstall it with `conda uninstall hdf4 hdf5 netcdf4`
and use `pip install netcdf4`""" % str(exc))

def _asreader(file, cls):
    closeit = False
    if not isinstance(file, cls):
        file, closeit = cls(file), True
    return file, closeit

[docs]def as_ncreader(file): """ Convert file into a NetcdfReader instance. Returns reader, closeit where closeit is set to True if we have to close the file before leaving the procedure. """ return _asreader(file, NetcdfReader)
[docs]def as_etsfreader(file): """Return an ETSF_Reader. Accepts filename or ETSF_Reader.""" return _asreader(file, ETSF_Reader)
class NetcdfReaderError(Exception): """Base error class for NetcdfReader"""
[docs]class NO_DEFAULT: """Signal that read_value should raise an Error"""
[docs]class NetcdfReader: """ Wraps and extends netCDF4.Dataset. Read only mode. Supports with statements. Additional documentation available at: """ Error = NetcdfReaderError @requires(netCDF4 is not None, "netCDF4 must be installed to use this class") def __init__(self, path): """Open the Netcdf file specified by path (read mode).""" self.path = os.path.abspath(path) try: self.rootgrp = netCDF4.Dataset(self.path, mode="r") except Exception as exc: raise self.Error("In file %s: %s" % (self.path, str(exc))) self.ngroups = len(list(self.walk_tree())) # Always return non-masked numpy arrays. # Slicing a ncvar returns a MaskedArrray and this is really annoying # because it can lead to unexpected behaviour in e.g. calls to np.matmul! # See also self.rootgrp.set_auto_mask(False) def __enter__(self): """Activated when used in the with statement.""" return self def __exit__(self, type, value, traceback): """Activated at the end of the with statement. It automatically closes the file.""" self.rootgrp.close()
[docs] def close(self): """Close the file.""" try: self.rootgrp.close() except Exception as exc: logger.warning("Exception %s while trying to close %s" % (exc, self.path))
[docs] def walk_tree(self, top=None): """ Navigate all the groups in the file starting from top. If top is None, the root group is used. """ if top is None: top = self.rootgrp values = top.groups.values() yield values for value in top.groups.values(): for children in self.walk_tree(value): yield children
[docs] def print_tree(self): """Print all the groups in the file.""" for children in self.walk_tree(): for child in children: print(child)
[docs] def read_dimvalue(self, dimname, path="/", default=NO_DEFAULT): """ Returns the value of a dimension. Args: dimname: Name of the variable path: path to the group. default: return `default` if `dimname` is not present and `default` is not `NO_DEFAULT` else raise self.Error. """ try: dim = self._read_dimensions(dimname, path=path)[0] return len(dim) except self.Error: if default is NO_DEFAULT: raise return default
[docs] def read_varnames(self, path="/"): """List of variable names stored in the group specified by path.""" if path == "/": return self.rootgrp.variables.keys() else: group = self.path2group[path] return group.variables.keys()
[docs] def read_value(self, varname, path="/", cmode=None, default=NO_DEFAULT): """ Returns the values of variable with name varname in the group specified by path. Args: varname: Name of the variable path: path to the group. cmode: if cmode=="c", a complex ndarrays is constructed and returned (netcdf does not provide native support from complex datatype). default: returns default if varname is not present. self.Error is raised if default is default is NO_DEFAULT Returns: numpy array if varname represents an array, scalar otherwise. """ try: var = self.read_variable(varname, path=path) except self.Error: if default is NO_DEFAULT: raise return default if cmode is None: # scalar or array # getValue is not portable! try: return var.getValue()[0] if not var.shape else var[:] except IndexError: return var.getValue() if not var.shape else var[:] else: assert var.shape[-1] == 2 if cmode == "c": return var[..., 0] + 1j * var[..., 1] else: raise ValueError("Wrong value for cmode %s" % cmode)
[docs] def read_variable(self, varname, path="/"): """Returns the variable with name varname in the group specified by path.""" return self._read_variables(varname, path=path)[0]
def _read_dimensions(self, *dimnames, **kwargs): path = kwargs.get("path", "/") try: if path == "/": return [self.rootgrp.dimensions[dname] for dname in dimnames] else: group = self.path2group[path] return [group.dimensions[dname] for dname in dimnames] except KeyError: raise self.Error("In file %s:\nError while reading dimensions: `%s` with kwargs: `%s`" % (self.path, dimnames, kwargs)) def _read_variables(self, *varnames, **kwargs): path = kwargs.get("path", "/") try: if path == "/": return [self.rootgrp.variables[vname] for vname in varnames] else: group = self.path2group[path] return [group.variables[vname] for vname in varnames] except KeyError: raise self.Error("In file %s:\nError while reading variables: `%s` with kwargs `%s`." % (self.path, varnames, kwargs))
[docs] def read_keys(self, keys, dict_cls=AttrDict, path="/"): """ Read a list of variables/dimensions from file. If a key is not present the corresponding entry in the output dictionary is set to None. """ od = dict_cls() for k in keys: try: # Try to read a variable. od[k] = self.read_value(k, path=path) except self.Error: try: # Try to read a dimension. od[k] = self.read_dimvalue(k, path=path) except self.Error: od[k] = None return od
[docs]class ETSF_Reader(NetcdfReader): """ This object reads data from a file written according to the ETSF-IO specifications. We assume that the netcdf file contains at least the crystallographic section. """
[docs] @lazy_property def chemical_symbols(self): """Chemical symbols char [number of atom species][symbol length].""" charr = self.read_value("chemical_symbols") symbols = [] for v in charr: s = "".join(c.decode("utf-8") for c in v) symbols.append(s.strip()) return symbols
[docs] def typeidx_from_symbol(self, symbol): """Returns the type index from the chemical symbol. Note python convention.""" return self.chemical_symbols.index(symbol)
[docs] def read_structure(self, cls=Structure): """Returns the crystalline structure.""" if self.ngroups != 1: raise NotImplementedError("In file %s: ngroups != 1" % self.path) return structure_from_ncdata(self, cls=cls)
[docs] def read_abinit_xcfunc(self): """ Read ixc from an Abinit file. Return :class:`XcFunc` object. """ ixc = int(self.read_value("ixc")) return XcFunc.from_abinit_ixc(ixc)
[docs] def read_abinit_hdr(self): """ Read the variables associated to the Abinit header. Return :class:`AbinitHeader` """ d = {} for hvar in _HDR_VARIABLES.values(): ncname = hvar.etsf_name if hvar.etsf_name is not None else if ncname in self.rootgrp.variables: d[] = self.read_value(ncname) elif ncname in self.rootgrp.dimensions: d[] = self.read_dimvalue(ncname) else: raise ValueError("Cannot find `%s` in `%s`" % (ncname, self.path)) # Convert scalars to (well) scalars. if hasattr(d[], "shape") and not d[].shape: d[] = np.asarray(d[]).item() if in ("title", "md5_pseudos", "codvsn"): # Convert array of numpy bytes to list of strings if == "codvsn": d[] = "".join(bs.decode("utf-8").strip() for bs in d[]) else: d[] = ["".join(bs.decode("utf-8") for bs in astr).strip() for astr in d[]] return AbinitHeader(d)
[docs]def structure_from_ncdata(ncdata, site_properties=None, cls=Structure): """ Reads and returns a pymatgen structure from a NetCDF file containing crystallographic data in the ETSF-IO format. Args: ncdata: filename or NetcdfReader instance. site_properties: Dictionary with site properties. cls: The Structure class to instanciate. """ ncdata, closeit = as_ncreader(ncdata) # TODO check whether atomic units are used lattice = ArrayWithUnit(ncdata.read_value("primitive_vectors"), "bohr").to("ang") red_coords = ncdata.read_value("reduced_atom_positions") natom = len(red_coords) znucl_type = ncdata.read_value("atomic_numbers") # type_atom[0:natom] --> index Between 1 and number of atom species type_atom = ncdata.read_value("atom_species") # Fortran to C index and float --> int conversion. species = natom * [None] for atom in range(natom): type_idx = type_atom[atom] - 1 species[atom] = int(znucl_type[type_idx]) d = {} if site_properties is not None: for prop in site_properties: d[property] = ncdata.read_value(prop) structure = cls(lattice, species, red_coords, site_properties=d) # Quick and dirty hack. # I need an abipy structure since I need to_abivars and other methods. try: from abipy.core.structure import Structure as AbipyStructure structure.__class__ = AbipyStructure except ImportError: pass if closeit: ncdata.close() return structure
class _H: __slots__ = ["name", "doc", "etsf_name"] def __init__(self, name, doc, etsf_name=None):, self.doc, self.etsf_name = name, doc, etsf_name _HDR_VARIABLES = ( # Scalars _H("bantot", "total number of bands (sum of nband on all kpts and spins)"), _H("date", "starting date"), _H("headform", "format of the header"), _H("intxc", "input variable"), _H("ixc", "input variable"), _H("mband", "maxval(hdr%nband)", etsf_name="max_number_of_states"), _H("natom", "input variable", etsf_name="number_of_atoms"), _H("nkpt", "input variable", etsf_name="number_of_kpoints"), _H("npsp", "input variable"), _H("nspden", "input variable", etsf_name="number_of_components"), _H("nspinor", "input variable", etsf_name="number_of_spinor_components"), _H("nsppol", "input variable", etsf_name="number_of_spins"), _H("nsym", "input variable", etsf_name="number_of_symmetry_operations"), _H("ntypat", "input variable", etsf_name="number_of_atom_species"), _H("occopt", "input variable"), _H("pertcase", "the index of the perturbation, 0 if GS calculation"), _H("usepaw", "input variable (0=norm-conserving psps, 1=paw)"), _H("usewvl", "input variable (0=plane-waves, 1=wavelets)"), _H("kptopt", "input variable (defines symmetries used for k-point sampling)"), _H("pawcpxocc", "input variable"), _H("nshiftk_orig", "original number of shifts given in input (changed in inkpts, the actual value is nshiftk)"), _H("nshiftk", "number of shifts after inkpts."), _H("icoulomb", "input variable."), _H("ecut", "input variable", etsf_name="kinetic_energy_cutoff"), _H("ecutdg", "input variable (ecut for NC psps, pawecutdg for paw)"), _H("ecutsm", "input variable"), _H("ecut_eff", "ecut*dilatmx**2 (dilatmx is an input variable)"), _H("etot", "EVOLVING variable"), _H("fermie", "EVOLVING variable", etsf_name="fermi_energy"), _H("residm", "EVOLVING variable"), _H("stmbias", "input variable"), _H("tphysel", "input variable"), _H("tsmear", "input variable"), _H("nelect", "number of electrons (computed from pseudos and charge)"), _H("charge", "input variable"), # Arrays _H("qptn", "qptn(3) the wavevector, in case of a perturbation"), # _H("rprimd", "rprimd(3,3) EVOLVING variables", etsf_name="primitive_vectors"), # _H(ngfft, "ngfft(3) input variable", number_of_grid_points_vector1" # _H("nwvlarr", "nwvlarr(2) the number of wavelets for each resolution.", etsf_name="number_of_wavelets"), _H("kptrlatt_orig", "kptrlatt_orig(3,3) Original kptrlatt"), _H("kptrlatt", "kptrlatt(3,3) kptrlatt after inkpts."), _H("istwfk", "input variable istwfk(nkpt)"), _H("lmn_size", "lmn_size(npsp) from psps"), _H("nband", "input variable nband(nkpt*nsppol)", etsf_name="number_of_states"), _H("npwarr", "npwarr(nkpt) array holding npw for each k point", etsf_name="number_of_coefficients"), _H("pspcod", "pscod(npsp) from psps"), _H("pspdat", "psdat(npsp) from psps"), _H("pspso", "pspso(npsp) from psps"), _H("pspxc", "pspxc(npsp) from psps"), _H("so_psp", "input variable so_psp(npsp)"), _H("symafm", "input variable symafm(nsym)"), # _H(symrel="input variable symrel(3,3,nsym)", etsf_name="reduced_symmetry_matrices"), _H("typat", "input variable typat(natom)", etsf_name="atom_species"), _H("kptns", "input variable kptns(nkpt, 3)", etsf_name="reduced_coordinates_of_kpoints"), _H("occ", "EVOLVING variable occ(mband, nkpt, nsppol)", etsf_name="occupations"), _H("tnons", "input variable tnons(nsym, 3)", etsf_name="reduced_symmetry_translations"), _H("wtk", "weight of kpoints wtk(nkpt)", etsf_name="kpoint_weights"), _H("shiftk_orig", "original shifts given in input (changed in inkpts)."), _H("shiftk", "shiftk(3,nshiftk), shiftks after inkpts"), _H("amu", "amu(ntypat) ! EVOLVING variable"), # _H("xred", "EVOLVING variable xred(3,natom)", etsf_name="reduced_atom_positions"), _H("zionpsp", "zionpsp(npsp) from psps"), _H("znuclpsp", "znuclpsp(npsp) from psps. Note the difference between (znucl|znucltypat) and znuclpsp"), _H("znucltypat", "znucltypat(ntypat) from alchemy", etsf_name="atomic_numbers"), _H("codvsn", "version of the code"), _H("title", "title(npsp) from psps"), _H("md5_pseudos", "md5pseudos(npsp), md5 checksums associated to pseudos (read from file)"), # _H(type(pawrhoij_type), allocatable :: pawrhoij(:) ! EVOLVING variable, only for paw ) _HDR_VARIABLES = OrderedDict([(, h) for h in _HDR_VARIABLES]) class AbinitHeader(AttrDict): """Stores the values reported in the Abinit header.""" # def __init__(self, *args, **kwargs): # super().__init__(*args, **kwargs) # for k, v in self.items(): # v.__doc__ = _HDR_VARIABLES[k].doc def __str__(self): return self.to_string() def to_string(self, verbose=0, title=None, **kwargs): """ String representation. kwargs are passed to `pprint.pformat`. Args: verbose: Verbosity level title: Title string. """ from pprint import pformat s = pformat(self, **kwargs) if title is not None: return "\n".join([marquee(title, mark="="), s]) return s