Source code for pymatgen.cli.pmg_analyze

# coding: utf-8
# Copyright (c) Pymatgen Development Team.
# Distributed under the terms of the MIT License.


"""
Implementation for `pmg analyze` CLI.
"""

import os
import re
import logging
import multiprocessing

from tabulate import tabulate

from pymatgen.io.vasp import Outcar
from pymatgen.apps.borg.hive import SimpleVaspToComputedEntryDrone, \
    VaspToComputedEntryDrone
from pymatgen.apps.borg.queen import BorgQueen

__author__ = "Shyue Ping Ong"
__copyright__ = "Copyright 2012, The Materials Project"
__version__ = "4.0"
__maintainer__ = "Shyue Ping Ong"
__email__ = "ongsp@ucsd.edu"
__date__ = "Aug 13 2016"

# Name of the file in which get_energies caches assimilated results. It is a
# relative path, so it is looked up in (and written to) the current working
# directory, not the directory being analysed.
SAVE_FILE = "vasp_data.gz"


def get_energies(rootdir, reanalyze, verbose, quick, sort, fmt):
    """
    Print a table of energies for all VASP runs found under a directory.

    Results are cached in SAVE_FILE; when that file already exists and
    ``reanalyze`` is false, the cached data is loaded instead of walking
    ``rootdir`` again.

    Args:
        rootdir (str): Root directory to assimilate (searched recursively).
        reanalyze (bool): Whether to ignore saved results and reanalyze.
        verbose (bool): If true, configure logging at INFO level.
        quick (bool): Whether to perform a quick analysis (using OSZICAR
            instead of vasprun.xml). In quick mode the volume change column
            is reported as "NA".
        sort (str): "energy_per_atom" or "filename" to sort the results in
            ascending order by that quantity; any other value leaves the
            order returned by the queen untouched.
        fmt (str): tablefmt passed to tabulate.

    Returns:
        int: Always 0.
    """
    if verbose:
        logging.basicConfig(
            level=logging.INFO,
            format="%(relativeCreated)d msecs : %(message)s")

    if quick:
        drone = SimpleVaspToComputedEntryDrone(inc_structure=True)
    else:
        drone = VaspToComputedEntryDrone(
            inc_structure=True, data=["filename", "initial_structure"])

    ncpus = multiprocessing.cpu_count()
    logging.info("Detected {} cpus".format(ncpus))
    queen = BorgQueen(drone, number_of_drones=ncpus)

    use_saved = os.path.exists(SAVE_FILE) and not reanalyze
    if use_saved:
        msg = "Using previously assimilated data from {}.".format(SAVE_FILE) \
            + " Use -r to force re-analysis."
        queen.load_data(SAVE_FILE)
    else:
        # Only fan out to worker processes when there is more than one CPU.
        assimilate = (queen.parallel_assimilate if ncpus > 1
                      else queen.serial_assimilate)
        assimilate(rootdir)
        msg = "Analysis results saved to {} for faster ".format(SAVE_FILE) + \
            "subsequent loading."
        queen.save_data(SAVE_FILE)

    entries = queen.get_data()
    if sort == "energy_per_atom":
        entries = sorted(entries, key=lambda entry: entry.energy_per_atom)
    elif sort == "filename":
        entries = sorted(entries, key=lambda entry: entry.data["filename"])

    def _as_row(entry):
        # One table row: directory, formula, energy, energy/atom, % vol change.
        if quick:
            vol_change = "NA"
        else:
            ratio = entry.structure.volume / \
                entry.data["initial_structure"].volume - 1
            vol_change = "{:.2f}".format(ratio * 100)
        return (entry.data["filename"].replace("./", ""),
                re.sub(r"\s+", "", entry.composition.formula),
                "{:.5f}".format(entry.energy),
                "{:.5f}".format(entry.energy_per_atom),
                vol_change)

    rows = [_as_row(entry) for entry in entries]

    if not rows:
        print("No valid vasp run found.")
        # Nothing useful was assimilated, so do not keep the cache file.
        os.unlink(SAVE_FILE)
        return 0

    headers = ("Directory", "Formula", "Energy", "E/Atom", "% vol chg")
    print(tabulate(rows, headers=headers, tablefmt=fmt))
    print("")
    print(msg)
    return 0
def get_magnetizations(mydir, ion_list):
    """
    Get magnetization info from OUTCARs.

    Walks ``mydir`` recursively, parses every file whose name starts with
    "OUTCAR" and prints a table with one row per file containing the "tot"
    magnetization entry of each selected ion. Files that cannot be processed
    are skipped with a logged warning.

    Args:
        mydir (str): Directory name
        ion_list (List): List of ions to obtain magnetization information for,
            given as indices into the per-ion magnetization list of each
            OUTCAR. If None or empty, all ions of each file are reported.

    Returns:
        int: Always 0.
    """
    data = []
    max_row = 0
    for parent, _subdirs, files in os.walk(mydir):
        for f in files:
            # Plain prefix test: the former regex r"OUTCAR*" was a glob
            # written as a regex and matched any name starting with "OUTCA".
            if not f.startswith("OUTCAR"):
                continue
            fullpath = os.path.join(parent, f)
            try:
                outcar = Outcar(fullpath)
                mags = [m["tot"] for m in outcar.magnetization]
                all_ions = ion_list if ion_list else list(range(len(mags)))
                # Drop only a literal leading "./". str.lstrip("./") strips
                # every leading "." and "/" character and so corrupted paths
                # such as "../run/OUTCAR" or "/abs/run/OUTCAR".
                shown = fullpath[2:] if fullpath.startswith("./") else fullpath
                row = [shown]
                row.extend(str(mags[ion]) for ion in all_ions)
            except Exception as ex:
                # Best effort: one bad file must not abort the whole scan,
                # but it should not vanish without a trace either.
                logging.getLogger(__name__).warning(
                    "Skipping %s: %s", fullpath, ex)
                continue
            data.append(row)
            max_row = max(max_row, len(all_ions))

    # Pad short rows so every row has a cell for each header column.
    for d in data:
        if len(d) < max_row + 1:
            d.extend([""] * (max_row + 1 - len(d)))

    # Label columns with the ion indices actually shown; with an explicit
    # ion_list these are the requested indices rather than 0..n-1.
    labels = list(ion_list)[:max_row] if ion_list else range(max_row)
    headers = ["Filename"]
    headers.extend(str(i) for i in labels)
    print(tabulate(data, headers))
    return 0
def analyze(args):
    """
    Master function controlling which analysis to call.

    Energies are analysed when ``args.get_energies`` is set, and also by
    default when neither ``args.get_energies`` nor ``args.ion_list`` is given.
    Otherwise, when ``args.ion_list`` is given, magnetizations are analysed.

    Args:
        args: Parsed arguments object accessed by attribute (e.g. an
            argparse.Namespace). Reads ``get_energies``, ``ion_list``,
            ``directories``, ``reanalyze``, ``verbose``, ``quick``, ``sort``
            and ``format``. ``ion_list[0]`` is either "All", a single index
            such as "5", or an inclusive range such as "2-6".

    Returns:
        int: The return value of the analysis that ran, or -1 if none ran
        (for example when ``args.directories`` is empty).

    Raises:
        ValueError: If the ion specification is not "All", an integer, or
            two integers separated by a single "-".
    """
    default_energies = not (args.get_energies or args.ion_list)

    if args.get_energies or default_energies:
        for d in args.directories:
            # NOTE: returns after the first directory, so any further
            # directories are ignored. Kept as is because get_energies
            # caches in a file relative to the working directory, and a
            # second directory would read the first one's cache.
            return get_energies(d, args.reanalyze, args.verbose, args.quick,
                                args.sort, args.format)

    if args.ion_list:
        spec = args.ion_list[0]
        if spec == "All":
            ion_list = None
        else:
            # Accept both "start-end" and a bare "index"; the latter used to
            # fail while unpacking a one-element list into (start, end).
            start_str, dash, end_str = spec.partition("-")
            start = int(start_str)
            end = int(end_str) if dash else start
            ion_list = list(range(start, end + 1))
        for d in args.directories:
            # NOTE: as above, only the first directory is processed.
            return get_magnetizations(d, ion_list)

    return -1