Source code for

# coding: utf-8
# Copyright (c) Pymatgen Development Team.
# Distributed under the terms of the MIT License.
"""Wrapper for netCDF readers."""
from __future__ import unicode_literals, division, print_function

import os.path
import warnings
import logging
from collections import OrderedDict

import numpy as np
from monty.dev import requires
from monty.collections import AttrDict
from monty.functools import lazy_property
from monty.string import marquee

from pymatgen.core.units import ArrayWithUnit
from pymatgen.core.xcfunc import XcFunc
from pymatgen.core.structure import Structure

logger = logging.getLogger(__name__)

# Module metadata.
__author__ = "Matteo Giantomassi"
__copyright__ = "Copyright 2013, The Materials Project"
__version__ = "0.1"
__maintainer__ = "Matteo Giantomassi"
# NOTE(review): the scrape dropped the (hyperlinked) address after "at";
# restored upstream's anti-spam obfuscated value -- confirm against the repo.
__email__ = "gmatteo at"
__status__ = "Development"
__date__ = "$Feb 21, 2013M$"

# Public API of this module.
# NOTE(review): the scraped source truncated the list; reconstructed from the
# public names defined in this file -- confirm against the original.
__all__ = [
    "as_ncreader",
    "as_etsfreader",
    "NetcdfReader",
    "ETSF_Reader",
    "NO_DEFAULT",
    "structure_from_ncdata",
]

# netCDF4 is an optional dependency: keep the module importable without it and
# warn with installation instructions. Classes that need it guard themselves
# with @requires(netCDF4 is not None, ...).
try:
    import netCDF4
except ImportError as exc:
    netCDF4 = None
    warnings.warn("""\
`import netCDF4` failed with the following error:

%s

Please install netcdf4 with `conda install netcdf4`
If the conda version does not work, uninstall it with `conda uninstall hdf4 hdf5 netcdf4`
and use `pip install netcdf4`""" % str(exc))

def _asreader(file, cls):
    closeit = False
    if not isinstance(file, cls):
        file, closeit = cls(file), True
    return file, closeit

def as_ncreader(file):
    """
    Convert file into a NetcdfReader instance.
    Returns reader, closeit where closeit is set to True
    if we have to close the file before leaving the procedure.
    """
    return _asreader(file, NetcdfReader)
def as_etsfreader(file):
    """Convert file into an ETSF_Reader instance. Returns (reader, closeit)."""
    return _asreader(file, ETSF_Reader)
class NetcdfReaderError(Exception):
    """Base error class for NetcdfReader."""
class NO_DEFAULT(object):
    """Sentinel signaling that read_value should raise an Error instead of returning a default."""
class NetcdfReader(object):
    """
    Wraps and extends netCDF4.Dataset. Read only mode.

    Supports with statements.
    """
    # Exception type raised by all read_* methods.
    Error = NetcdfReaderError

    @requires(netCDF4 is not None, "netCDF4 must be installed to use this class")
    def __init__(self, path):
        """Open the Netcdf file specified by path (read mode)."""
        self.path = os.path.abspath(path)

        try:
            self.rootgrp = netCDF4.Dataset(self.path, mode="r")
        except Exception as exc:
            raise self.Error("In file %s: %s" % (self.path, str(exc)))

        self.ngroups = len(list(self.walk_tree()))

        # NOTE(review): the original kept a disabled sketch of a
        # path --> group mapping (self.path2group) here.  read_varnames,
        # _read_dimensions and _read_variables still reference
        # self.path2group for non-root paths and will raise AttributeError
        # if called with path != "/" -- behavior preserved as-is.

    def __enter__(self):
        """Activated when used in the with statement."""
        return self

    def __exit__(self, type, value, traceback):
        """Activated at the end of the with statement. It automatically closes the file."""
        self.rootgrp.close()

    def close(self):
        """Close the underlying netCDF dataset, logging (not raising) on failure."""
        try:
            self.rootgrp.close()
        except Exception as exc:
            logger.warning("Exception %s while trying to close %s" % (exc, self.path))

    def walk_tree(self, top=None):
        """
        Navigate all the groups in the file starting from top.
        If top is None, the root group is used.
        """
        if top is None:
            top = self.rootgrp

        values = top.groups.values()
        yield values
        for value in top.groups.values():
            for children in self.walk_tree(value):
                yield children

    def print_tree(self):
        """Print the netCDF group hierarchy to stdout."""
        for children in self.walk_tree():
            for child in children:
                print(child)

    def read_dimvalue(self, dimname, path="/", default=NO_DEFAULT):
        """
        Returns the value of a dimension.

        Args:
            dimname: Name of the variable
            path: path to the group.
            default: return `default` if `dimname` is not present and
                `default` is not `NO_DEFAULT` else raise self.Error.
        """
        try:
            dim = self._read_dimensions(dimname, path=path)[0]
            return len(dim)
        except self.Error:
            if default is NO_DEFAULT:
                raise
            return default

    def read_varnames(self, path="/"):
        """List of variable names stored in the group specified by path."""
        if path == "/":
            return self.rootgrp.variables.keys()
        else:
            group = self.path2group[path]
            return group.variables.keys()

    def read_value(self, varname, path="/", cmode=None, default=NO_DEFAULT):
        """
        Returns the values of variable with name varname in the group specified by path.

        Args:
            varname: Name of the variable
            path: path to the group.
            cmode: if cmode=="c", a complex ndarrays is constructed and returned
                (netcdf does not provide native support from complex datatype).
            default: returns default if varname is not present.
                self.Error is raised if default is NO_DEFAULT.

        Returns:
            numpy array if varname represents an array, scalar otherwise.
        """
        try:
            var = self.read_variable(varname, path=path)
        except self.Error:
            if default is NO_DEFAULT:
                raise
            return default

        if cmode is None:
            # scalar or array
            # getValue is not portable!
            try:
                return var.getValue()[0] if not var.shape else var[:]
            except IndexError:
                return var.getValue() if not var.shape else var[:]
        else:
            # Complex data is stored as a real array with a trailing axis of
            # length 2 holding (real, imag).
            assert var.shape[-1] == 2
            if cmode == "c":
                return var[..., 0] + 1j * var[..., 1]
            else:
                raise ValueError("Wrong value for cmode %s" % cmode)

    def read_variable(self, varname, path="/"):
        """Returns the variable with name varname in the group specified by path."""
        return self._read_variables(varname, path=path)[0]

    def _read_dimensions(self, *dimnames, **kwargs):
        # Fetch netCDF dimension objects; wrap KeyError in self.Error.
        path = kwargs.get("path", "/")
        try:
            if path == "/":
                return [self.rootgrp.dimensions[dname] for dname in dimnames]
            else:
                group = self.path2group[path]
                return [group.dimensions[dname] for dname in dimnames]
        except KeyError:
            raise self.Error("In file %s:\nError while reading dimensions: `%s` with kwargs: `%s`"
                             % (self.path, dimnames, kwargs))

    def _read_variables(self, *varnames, **kwargs):
        # Fetch netCDF variable objects; wrap KeyError in self.Error.
        path = kwargs.get("path", "/")
        try:
            if path == "/":
                return [self.rootgrp.variables[vname] for vname in varnames]
            else:
                group = self.path2group[path]
                return [group.variables[vname] for vname in varnames]
        except KeyError:
            raise self.Error("In file %s:\nError while reading variables: `%s` with kwargs `%s`."
                             % (self.path, varnames, kwargs))

    def read_keys(self, keys, dict_cls=AttrDict, path="/"):
        """
        Read a list of variables/dimensions from file. If a key is not present the corresponding
        entry in the output dictionary is set to None.
        """
        od = dict_cls()
        for k in keys:
            try:
                # Try to read a variable.
                od[k] = self.read_value(k, path=path)
            except self.Error:
                try:
                    # Try to read a dimension.
                    od[k] = self.read_dimvalue(k, path=path)
                except self.Error:
                    od[k] = None

        return od
[docs]class ETSF_Reader(NetcdfReader): """ This object reads data from a file written according to the ETSF-IO specifications. We assume that the netcdf file contains at least the crystallographic section. """
[docs] @lazy_property def chemical_symbols(self): """Chemical symbols char [number of atom species][symbol length].""" charr = self.read_value("chemical_symbols") symbols = [] for v in charr: s = "".join(c.decode("utf-8") for c in v) symbols.append(s.strip()) return symbols
[docs] def typeidx_from_symbol(self, symbol): """Returns the type index from the chemical symbol. Note python convention.""" return self.chemical_symbols.index(symbol)
[docs] def read_structure(self, cls=Structure): """Returns the crystalline structure.""" if self.ngroups != 1: raise NotImplementedError("In file %s: ngroups != 1" % self.path) return structure_from_ncdata(self, cls=cls)
[docs] def read_abinit_xcfunc(self): """ Read ixc from an Abinit file. Return :class:`XcFunc` object. """ ixc = int(self.read_value("ixc")) return XcFunc.from_abinit_ixc(ixc)
[docs] def read_abinit_hdr(self): """ Read the variables associated to the Abinit header. Return :class:`AbinitHeader` """ d = {} for hvar in _HDR_VARIABLES.values(): ncname = hvar.etsf_name if hvar.etsf_name is not None else if ncname in self.rootgrp.variables: d[] = self.read_value(ncname) elif ncname in self.rootgrp.dimensions: d[] = self.read_dimvalue(ncname) else: raise ValueError("Cannot find `%s` in `%s`" % (ncname, self.path)) # Convert scalars to (well) scalars. if hasattr(d[], "shape") and not d[].shape: d[] = np.asscalar(d[]) if in ("title", "md5_pseudos", "codvsn"): # Convert array of numpy bytes to list of strings if == "codvsn": d[] = "".join(bs.decode("utf-8").strip() for bs in d[]) else: d[] = ["".join(bs.decode("utf-8") for bs in astr).strip() for astr in d[]] return AbinitHeader(d)
def structure_from_ncdata(ncdata, site_properties=None, cls=Structure):
    """
    Reads and returns a pymatgen structure from a NetCDF file
    containing crystallographic data in the ETSF-IO format.

    Args:
        ncdata: filename or NetcdfReader instance.
        site_properties: Dictionary with site properties.
        cls: The Structure class to instanciate.
    """
    ncdata, closeit = as_ncreader(ncdata)

    # TODO check whether atomic units are used
    lattice = ArrayWithUnit(ncdata.read_value("primitive_vectors"), "bohr").to("ang")

    red_coords = ncdata.read_value("reduced_atom_positions")
    natom = len(red_coords)

    znucl_type = ncdata.read_value("atomic_numbers")

    # type_atom[0:natom] --> index Between 1 and number of atom species
    type_atom = ncdata.read_value("atom_species")

    # Fortran to C index and float --> int conversion.
    species = natom * [None]
    for atom in range(natom):
        type_idx = type_atom[atom] - 1
        species[atom] = int(znucl_type[type_idx])

    d = {}
    if site_properties is not None:
        for prop in site_properties:
            # BUG FIX: the original wrote d[property] (the builtin), so every
            # site property was stored under the same bogus key.
            d[prop] = ncdata.read_value(prop)

    structure = cls(lattice, species, red_coords, site_properties=d)

    # Quick and dirty hack.
    # I need an abipy structure since I need to_abivars and other methods.
    try:
        from abipy.core.structure import Structure as AbipyStructure
        structure.__class__ = AbipyStructure
    except ImportError:
        pass

    if closeit:
        ncdata.close()

    return structure
class _H(object): __slots__ = ["name", "doc", "etsf_name"] def __init__(self, name, doc, etsf_name=None):, self.doc, self.etsf_name = name, doc, etsf_name _HDR_VARIABLES = ( # Scalars _H("bantot", "total number of bands (sum of nband on all kpts and spins)"), _H("date", "starting date"), _H("headform", "format of the header"), _H("intxc", "input variable"), _H("ixc", "input variable"), _H("mband", "maxval(hdr%nband)", etsf_name="max_number_of_states"), _H("natom", "input variable", etsf_name="number_of_atoms"), _H("nkpt", "input variable", etsf_name="number_of_kpoints"), _H("npsp", "input variable"), _H("nspden", "input variable", etsf_name="number_of_components"), _H("nspinor", "input variable", etsf_name="number_of_spinor_components"), _H("nsppol", "input variable", etsf_name="number_of_spins"), _H("nsym", "input variable", etsf_name="number_of_symmetry_operations"), _H("ntypat", "input variable", etsf_name="number_of_atom_species"), _H("occopt", "input variable"), _H("pertcase", "the index of the perturbation, 0 if GS calculation"), _H("usepaw", "input variable (0=norm-conserving psps, 1=paw)"), _H("usewvl", "input variable (0=plane-waves, 1=wavelets)"), _H("kptopt", "input variable (defines symmetries used for k-point sampling)"), _H("pawcpxocc", "input variable"), _H("nshiftk_orig", "original number of shifts given in input (changed in inkpts, the actual value is nshiftk)"), _H("nshiftk", "number of shifts after inkpts."), _H("icoulomb", "input variable."), _H("ecut", "input variable", etsf_name="kinetic_energy_cutoff"), _H("ecutdg", "input variable (ecut for NC psps, pawecutdg for paw)"), _H("ecutsm", "input variable"), _H("ecut_eff", "ecut*dilatmx**2 (dilatmx is an input variable)"), _H("etot", "EVOLVING variable"), _H("fermie", "EVOLVING variable", etsf_name="fermi_energy"), _H("residm", "EVOLVING variable"), _H("stmbias", "input variable"), _H("tphysel", "input variable"), _H("tsmear", "input variable"), _H("nelect", "number of electrons (computed from 
pseudos and charge)"), _H("charge", "input variable"), # Arrays _H("qptn", "qptn(3) the wavevector, in case of a perturbation"), #_H("rprimd", "rprimd(3,3) EVOLVING variables", etsf_name="primitive_vectors"), #_H(ngfft, "ngfft(3) input variable", number_of_grid_points_vector1" #_H("nwvlarr", "nwvlarr(2) the number of wavelets for each resolution.", etsf_name="number_of_wavelets"), _H("kptrlatt_orig", "kptrlatt_orig(3,3) Original kptrlatt"), _H("kptrlatt", "kptrlatt(3,3) kptrlatt after inkpts."), _H("istwfk", "input variable istwfk(nkpt)"), _H("lmn_size", "lmn_size(npsp) from psps"), _H("nband", "input variable nband(nkpt*nsppol)", etsf_name="number_of_states"), _H("npwarr", "npwarr(nkpt) array holding npw for each k point", etsf_name="number_of_coefficients"), _H("pspcod", "pscod(npsp) from psps"), _H("pspdat", "psdat(npsp) from psps"), _H("pspso", "pspso(npsp) from psps"), _H("pspxc", "pspxc(npsp) from psps"), _H("so_psp", "input variable so_psp(npsp)"), _H("symafm", "input variable symafm(nsym)"), #_H(symrel="input variable symrel(3,3,nsym)", etsf_name="reduced_symmetry_matrices"), _H("typat", "input variable typat(natom)", etsf_name="atom_species"), _H("kptns", "input variable kptns(nkpt, 3)", etsf_name="reduced_coordinates_of_kpoints"), _H("occ", "EVOLVING variable occ(mband, nkpt, nsppol)", etsf_name="occupations"), _H("tnons", "input variable tnons(nsym, 3)", etsf_name="reduced_symmetry_translations"), _H("wtk", "weight of kpoints wtk(nkpt)", etsf_name="kpoint_weights"), _H("shiftk_orig", "original shifts given in input (changed in inkpts)."), _H("shiftk", "shiftk(3,nshiftk), shiftks after inkpts"), _H("amu", "amu(ntypat) ! EVOLVING variable"), #_H("xred", "EVOLVING variable xred(3,natom)", etsf_name="reduced_atom_positions"), _H("zionpsp", "zionpsp(npsp) from psps"), _H("znuclpsp", "znuclpsp(npsp) from psps. 
Note the difference between (znucl|znucltypat) and znuclpsp"), _H("znucltypat", "znucltypat(ntypat) from alchemy", etsf_name="atomic_numbers"), _H("codvsn", "version of the code"), _H("title", "title(npsp) from psps"), _H("md5_pseudos", "md5pseudos(npsp), md5 checksums associated to pseudos (read from file)"), #_H(type(pawrhoij_type), allocatable :: pawrhoij(:) ! EVOLVING variable, only for paw ) _HDR_VARIABLES = OrderedDict([(, h) for h in _HDR_VARIABLES]) class AbinitHeader(AttrDict): """Stores the values reported in the Abinit header.""" #def __init__(self, *args, **kwargs): # super(AbinitHeader, self).__init__(*args, **kwargs) # for k, v in self.items(): # v.__doc__ = _HDR_VARIABLES[k].doc def __str__(self): return self.to_string() def to_string(self, verbose=0, title=None, **kwargs): """ String representation. kwargs are passed to `pprint.pformat`. Args: verbose: Verbosity level title: Title string. """ from pprint import pformat s = pformat(self, **kwargs) if title is not None: return "\n".join([marquee(title, mark="="), s]) return s