Source code for lineshape_tools.cli

"""Contains functionality for the command line interface."""

from __future__ import annotations

import logging
import sys
import warnings
from importlib.util import find_spec
from pathlib import Path
from typing import TYPE_CHECKING, Annotated

import ase.io
import numpy as np
from cyclopts import Parameter
from tqdm import tqdm

from lineshape_tools.constants import omega2eV
from lineshape_tools.lineshape import convert_A_to_L, gaussian, get_phonon_spec_func, get_Stot
from lineshape_tools.phonon import get_disp_vect, get_ipr, get_phonons
from lineshape_tools.plot import plot_spec_funcs

if TYPE_CHECKING:
    from ase.atoms import Atoms
    from mace.calculators import MACECalculator

logger = logging.getLogger(__name__)


[docs] def collect( files: Annotated[list[Path], Parameter(negative="")], output_file: Path = Path("./database.extxyz"), strategy: str = "none", read_index: str = ":", max_force: float = 2.0, min_force: float = -np.inf, dx_tol: float = 0.1, rtol: float = 1e-5, config_weight: float = 1.0, force_weighting: bool = False, ) -> None: """Collect and process data for fine-tuning. Collect files into an extxyz database that can be used for fine-tuning. An optional filtering strategy can be applied. This is potentially useful if relaxation data is being included in the dataset, as it can be noisey from having multiple closely spaced geometries close to the equilibrium geomtry. By default, configurations with too large forces will be thrown away to avoid potential anharmonic contributions to the PES. Args: files (list): list of paths to files that are parseable by ase.io. The files should contain atomic geometries, total energies, and forces at a minimum (for example, vasprun.xml). output_file (Path): optional path where data is written (output to stdout by default) strategy (str): optional specification of strategy to be used for filtering. Available options are 'none', 'qr', or 'dx'. read_index (str): pythonic index passed to ase.io.read to determine which structures are read from the input files (the same value is used for each file). The default ":" reads all of the structures, while ":3" would read the first three for example. max_force (float): remove structures where the maximum force acting on any atom is above the specified value (in eV/Å) min_force (float): remove structures where the maximum force acting on any atom is below the specified value (in eV/Å) dx_tol (float): tolerance (in Å) for how far atoms must move to accept configuration in 'dx' filtering strategy rtol (float): tolerance ratio for determining rank of displacement vectors in 'qr' strategy config_weight (float): set the configuration weight for training force_weighting (bool): store a config_weight that's inversely proportional to the max force that any atom feels in the configuration [min(0.02 / max_fpa, 1)]. Overwrites the value specified by config_weight. """ if strategy.lower() not in ("none", "qr", "dx"): raise ValueError("invalid strategy choice") if output_file.exists(): logger.info(f"{output_file} already exists, appending to it") total_read = 0 all_atoms = [] for fname in tqdm(files, desc="[*] reading files", disable=len(files) < 2): read_atoms = ase.io.read(fname, read_index) total_read += len(read_atoms) for atoms in tqdm(read_atoms, desc="[*] processing atoms", disable=len(read_atoms) < 10): forces = atoms.get_forces() if min_force < np.linalg.norm(forces, axis=1).max() < max_force: atoms.info["REF_energy"] = atoms.get_potential_energy() atoms.new_array("REF_forces", forces) atoms.calc = None atoms.info["config_weight"] = config_weight if force_weighting: atoms.info["config_weight"] = min( 0.02 / np.linalg.norm(forces, axis=1).max(), 1 ) if (cw := atoms.info["config_weight"]) < 0.02: logger.warning(f"small config weight found ({cw})") all_atoms.append(atoms) logger.info( f"read {total_read} configurations and discarded {total_read - len(all_atoms)} based" " on forces criteria" ) filtered_atoms = [] if strategy.lower() == "qr": from scipy.linalg import qr logger.warning("qr strategy assumes the last read structure is the equilibrium geometry") if np.linalg.norm(all_atoms[-1].arrays["REF_forces"], axis=1).max() > 0.02: warnings.warn("large forces found in last structure", stacklevel=0) sqrt_mass = np.repeat(np.sqrt(all_atoms[-1].get_masses()), 3) dq = np.zeros((sqrt_mass.shape[0], len(all_atoms) - 1), dtype=np.float64) for atoms_i, atoms in enumerate(tqdm(all_atoms[:-1], desc="[*] computing dq")): dx = get_disp_vect(atoms, all_atoms[-1]) dq[:, atoms_i] = sqrt_mass * dx logger.info("computing pivoted QR factorization") _, R, P = qr(dq, mode="economic", pivoting=True) rdiag = np.diag(R) rank = np.sum(np.abs(rdiag) > rtol * rdiag.max()) filtered_atoms = [all_atoms[-1]] + [all_atoms[atoms_i] for atoms_i in np.sort(P[:rank])] elif strategy.lower() == "dx": filtered_atoms.append(all_atoms[0]) for atoms in tqdm(all_atoms[1:], desc="[*] dx filtering"): dx = get_disp_vect(filtered_atoms[-1], atoms) if np.linalg.norm(dx) > dx_tol: filtered_atoms.append(atoms) else: filtered_atoms = all_atoms logger.info(f"filtered {len(all_atoms) - len(filtered_atoms)} configurations from dataset") with open(output_file, "a") as f: for atoms in tqdm(filtered_atoms, desc="[*] writing"): ase.io.write(f, atoms, format="extxyz")
[docs] def get_force_opt_modes( n: int, omega2: np.ndarray, U: np.ndarray, sqrt_mass: np.ndarray, F: float = 0.5, tol: float = 1e-6, seed: int = 897689932, start_with_min_spread: bool = False, save_plot: bool = False, ) -> tuple[np.ndarray, np.ndarray]: """Bin phonons by energy and find a vector in the subspace that minimizes the spread in forces. Args: n (int): number of modes to select omega2 (np.ndarray): frequencies squared of the modes (directly from np.linalg.eigh) U (np.ndarray): matrix where eigenvectors of modes are cols (directly from np.linalg.eigh) sqrt_mass (np.ndarray): sqrt of the vector of atomic masses F (float): target forces to optimize amplitudes for tol (float): convergence tolerance for scipy minimize call seed (int): seed value for random number generator start_with_min_spread (bool): determines if mode with smallest force spread is used as the starting point for optimization. Uses a random vector otherwise. save_plot (bool): save a plot for analyzing resulting modes Returns: modes (np.ndarray): generated modes as columns of the matrix mode_dqs (np.ndarray): optimized displacement amplitudes following above criteria """ inv_sqrt_mass = 1 / sqrt_mass # acoustic phonon filtering omega2[omega2 < (0.0005 / omega2eV) ** 2] = 0.0 opt_path, opt_info = np.einsum_path("i,ij,j,j->i", sqrt_mass, U, omega2, np.ones(U.shape[0])) logger.debug(opt_info) def loss(x, ind=None): if ind is not None: tx = np.zeros(U.shape[0]) tx[ind] = x else: tx = x Fx = np.einsum("i,ij,j,j->i", sqrt_mass, U, omega2, tx, optimize=opt_path) Fx = np.linalg.norm(Fx.reshape((-1, 3)), axis=1) return np.mean((Fx - Fx.mean()) ** 2) def constr_f(x): return np.sum(x**2) def constr_J(x): return 2 * x def constr_H(x, v): return np.diag(2 * v[0] * np.ones(x.shape[0])) def log_callback(intermediate_result): ir = intermediate_result logger.debug(f"{ir.nit:04d} | {ir.fun:.06e} | {ir.optimality:.06e}") from scipy.optimize import NonlinearConstraint, minimize norm_constr = NonlinearConstraint(constr_f, 1, 1, jac=constr_J, hess=constr_H) rng = np.random.default_rng(seed) modes = np.zeros((U.shape[0], n), dtype=np.float64) mode_dqs = np.zeros(n, dtype=np.float64) freqs = np.empty(n) subspace_inds = np.array_split(np.arange(3, U.shape[0]), n) for imode, ind in enumerate(tqdm(subspace_inds, desc="[*] optimizing modes")): subs_spread = np.empty(ind.shape[0]) for i, ind_i in enumerate(ind): Fx = np.linalg.norm((sqrt_mass * omega2[ind_i] * U[:, ind_i]).reshape((-1, 3)), axis=1) subs_spread[i] = np.mean((Fx - Fx.mean()) ** 2) if start_with_min_spread: imin = np.argmin(subs_spread) logger.info( f"starting from mode {ind[imin]} " f"with frequency {1000 * omega2eV * np.sqrt(omega2[ind[imin]]):.02f} meV " f"and spread {subs_spread[imin]:.06e}" ) x0 = np.zeros(ind.shape[0]) x0[imin] = 1.0 else: x0 = rng.random(ind.shape[0]) - 0.5 x0 /= np.linalg.norm(x0) logger.debug(" nit | loss | optimality") res = minimize( lambda x: loss(x, ind=ind), # noqa: B023 x0, tol=tol, method="trust-constr", constraints=[norm_constr], callback=log_callback, ) if not res.success: logger.warning(f"optimization failed - {res.message}") logger.debug(f"final spread {res.fun}, subspace spread {subs_spread}") if not np.all(res.fun < subs_spread): logger.warning("optimization failed to find a vector with smaller spread") modes[ind, imode] = res.x freqs[imode] = omega2eV * np.sqrt(modes[:, imode] @ np.diag(omega2) @ modes[:, imode]) logger.debug(f"found mode with frequency {1000 * freqs[imode]:.02f} meV") Fx = np.linalg.norm( np.einsum("i,ij,j,j->i", sqrt_mass, U, omega2, modes[:, imode]).reshape((-1, 3)), axis=1, ) mode_dqs[imode] = F / Fx.max() dx = np.linalg.norm( (inv_sqrt_mass * mode_dqs[imode] * (U @ modes[:, imode])).reshape((-1, 3)), axis=1 ) if dx.max() > 0.05: new_dq = (0.05 / dx.max()) * mode_dqs[imode] logger.warning( f"max displacement too large ({dx.max()} > 0.05), " f"resetting dq {mode_dqs[imode]} -> {new_dq}" ) mode_dqs[imode] = new_dq elif dx.max() < 0.005: new_dq = (0.005 / dx.max()) * mode_dqs[imode] logger.warning( f"max displacement too small ({dx.max()} < 0.005), " f"resetting dq {mode_dqs[imode]} -> {new_dq}" ) mode_dqs[imode] = new_dq if save_plot: w = np.linspace(0, 0.1, 1000) dos = np.zeros_like(w) for i in range(U.shape[0]): dos += gaussian(w - omega2eV * np.sqrt(omega2[i]), 0.001) dos /= U.shape[0] import matplotlib.pyplot as plt fig, ax = plt.subplots(figsize=(4, 3)) ax.plot(w, dos, color="k") ax.fill_between(w, dos, color="k", alpha=0.2, lw=0) for imode in range(n): pdos = np.zeros_like(w) for i in range(U.shape[0]): pdos += modes[i, imode] ** 2 * gaussian(w - omega2eV * np.sqrt(omega2[i]), 0.001) p = ax.plot(w, pdos / n, lw=1) ax.axvline(x=freqs[imode], color=p[0].get_color(), alpha=0.2) ax.set_xlabel("Energy [eV]") ax.set_ylabel("Density of States") plt.savefig("pdos.png", dpi=600, bbox_inches="tight") return U @ modes, mode_dqs
[docs] def get_random_phonons( n: int, omega2: np.ndarray, U: np.ndarray, sqrt_mass: np.ndarray, F: float = 0.5, seed: int = 897689932, ) -> tuple[np.ndarray, np.ndarray]: """Bin phonons by energy and select one randomly from each bin. The amplitude of each phonon is chosen to produce a max force per atom as close to F as possible. The max displacement on a given atom is kept within a reasonable range (0.005, 0.05). Args: n (int): number of modes to select omega2 (np.ndarray): frequencies squared of the modes (directly from np.linalg.eigh) U (np.ndarray): matrix where eigenvectors of modes are cols (directly from np.linalg.eigh) sqrt_mass (np.ndarray): sqrt of the vector of atomic masses F (float): target forces to optimize amplitudes for seed (int): seed value for random number generator Returns: modes (np.ndarray): generated modes as columns of the matrix mode_dqs (np.ndarray): optimized displacement amplitudes following above criteria """ inv_sqrt_mass = 1 / sqrt_mass # acoustic phonon filtering omega2[omega2 < (0.0005 / omega2eV) ** 2] = 0.0 modes = np.zeros((U.shape[0], n), dtype=np.float64) mode_dqs = np.empty(n, dtype=np.float64) rng = np.random.default_rng(seed) # starting from 3 to skip acoustic phonons subspace_inds = np.array_split(np.arange(3, U.shape[0]), n) for i, inds in enumerate(tqdm(subspace_inds, desc="[*] selecting random modes")): imode = inds[rng.integers(inds.shape[0])] modes[:, i] = U[:, imode] Fx = np.linalg.norm((sqrt_mass * omega2[imode] * U[:, imode]).reshape((-1, 3)), axis=1) mode_dqs[i] = F / Fx.max() dx = np.linalg.norm((mode_dqs[i] * inv_sqrt_mass * U[:, imode]).reshape((-1, 3)), axis=1) if dx.max() > 0.05: new_dq = (0.05 / dx.max()) * mode_dqs[i] logger.warning( f"max displacement too large ({dx.max()} > 0.05), " f"resetting dq {mode_dqs[i]} -> {new_dq}" ) mode_dqs[i] = new_dq elif dx.max() < 0.005: new_dq = (0.005 / dx.max()) * mode_dqs[i] logger.warning( f"max displacement too small ({dx.max()} < 0.005), " f"resetting dq {mode_dqs[i]} -> {new_dq}" ) mode_dqs[i] = new_dq return modes, mode_dqs
[docs] def gen_confs( struct_path: Path, num_conf: int, strategy: str = "rand", output_dir: Path = Path("./confs"), accepting_mode: Path | None = None, dynmat_file: Path | None = None, orthogonalize: bool = False, default_max_dx: float = 0.015, start_with_min_spread: bool = False, opt_tol: float = 1e-6, seed: int = 897689932, ) -> None: """Generate additional configurations to enhance fine-tuning dataset. Args: struct_path (Path): path to file containing structure that will be displaced num_conf (int): total number of additional configurations to generate strategy (str): strategy used to generate the additional configurations. Available options are 'rand', 'phon_rand', and 'phon_opt'. output_dir (Path): output directory where the new configurations will be written to accepting_mode (Path): path to file containing the structure that defines the accepting mode. For example, if struct_path refers to the ground-state equilibrium geometry, then accepting_mode should refer to the excited-state equilibrium geometry and vice versa. dynmat_file (Path): path to the .npz file containing the dynamical matrix presumably calculated using the "compute-dynmat" function. orthogonalize (bool): perform Gram-Schmidt orthogonalization at the last step default_max_dx (float): default value for max displacement of a given atom start_with_min_spread (bool): determines if mode with smallest force spread is used as the starting point for optimization in phon_opt strategy. Uses a random vector otherwise. opt_tol (float): convergence tolerance for the call to scipy minimize in the phon_opt strat seed (int): seed value for random number generator """ if strategy.lower() not in ("rand", "phon_rand", "phon_opt"): raise ValueError("invalid strategy choice") if output_dir.exists(): raise ValueError("output directory already exists") struct: Atoms = ase.io.read(struct_path) # type: ignore[assignment] sqrt_mass = np.repeat(np.sqrt(struct.get_masses()), 3) # makes written poscars ugly and adds unnecessary data if struct.has("momenta"): del struct.arrays["momenta"] imode = 0 modes = np.zeros((3 * len(struct), num_conf), dtype=np.float64) mode_dqs = np.zeros(num_conf, dtype=np.float64) if accepting_mode is not None: am_struct: Atoms = ase.io.read(accepting_mode) # type: ignore[assignment] dx = get_disp_vect(struct, am_struct) dq = sqrt_mass * dx logger.info(f"accepting mode dQ={np.linalg.norm(dq)} amu^0.5 Å") modes[:, 0] = dq / np.linalg.norm(dq) mode_dqs[0] = ( default_max_dx / np.linalg.norm((modes[:, 0] / sqrt_mass).reshape((-1, 3)), axis=1).max() ) imode += 1 if strategy.lower()[:4] == "phon": logger.info("working in phonon basis") if dynmat_file is None: raise ValueError("dynamical matrix file is needed to compute phonons") logger.info(f"reading dynamical matrix from {dynmat_file}") data = np.load(dynmat_file) H = data["H"] if not np.allclose(sqrt_mass, data["sqrt_mass"], rtol=1e-4): logger.debug(sqrt_mass, data["sqrt_mass"]) raise ValueError("sqrt_mass from struct is not compatible with dynamical matrix file") # diagonalize dynamical matrix omega2, U = np.linalg.eigh(H) if strategy.lower() == "phon_opt": logger.info("generating force-optimized phonons") modes[:, imode:], mode_dqs[imode:] = get_force_opt_modes( num_conf - imode, omega2, U, sqrt_mass, tol=opt_tol, start_with_min_spread=start_with_min_spread, seed=seed, ) else: logger.info("selecting phonons randomly") modes[:, imode:], mode_dqs[imode:] = get_random_phonons( num_conf - imode, omega2, U, sqrt_mass, seed=seed, ) else: logger.info("generating random modes") from scipy.linalg import qr rng = np.random.default_rng(seed) modes[:, imode:], _, _ = qr( rng.random((modes.shape[0], num_conf - imode)) - 0.5, mode="economic", pivoting=True ) for i in range(imode, num_conf): mode_dqs[i] = ( default_max_dx / np.linalg.norm((modes[:, i] / sqrt_mass).reshape((-1, 3)), axis=1).max() ) if orthogonalize: from scipy.linalg import qr logger.info("performing final Gram-Schmidt orthogonalization of all modes") modes, _ = qr(modes, mode="economic") logger.info("recomputing displacement amplitude after orthogonalization") for i in range(num_conf): mode_dqs[i] = ( default_max_dx / np.linalg.norm((modes[:, i] / sqrt_mass).reshape((-1, 3)), axis=1).max() ) # write to output directory for imode in tqdm(range(num_conf), desc="[*] writing structures"): dx = mode_dqs[imode] * modes[:, imode] / sqrt_mass logger.debug( f"mode {imode} - max displacement " f"{np.linalg.norm(dx.reshape((-1, 3)), axis=1).max():.06f} Å " f"{np.abs(dx).max():.06f} Å" ) atoms = struct.copy() atoms.positions += dx.reshape((-1, 3)) fname = output_dir / f"{imode}/POSCAR" fname.parent.mkdir(parents=True) ase.io.write(fname, atoms, format="vasp", direct=True) # store invoking command with open(output_dir / "cmd.txt", "w") as f: f.write(" ".join(sys.argv) + "\n")
[docs] def reestimate_e0s_linear_system( calculator: MACECalculator, database_atoms: list[Atoms], elements: list | None = None, initial_e0s: dict | None = None, ) -> dict: """Estimate atomic reference energies (E0s) by solving a linear system. Notes: Slightly adapted from code by Noam Bernstein based on private communications with Ilyes Batatia and Joe Hart. This functionality will eventually be removed once merged into MACE. Args: calculator (MACECalculator): Calculator object for the MACE model. database_atoms (list): List of ase Atoms objects with energy and atomic_numbers. elements (list): List of element atomic numbers to consider, default to set present in database_atoms. initial_e0s (dict): Dictionary mapping element atomic numbers to E0 values, default to values returned by foundation_model for isolated atom configs> Returns: Dictionary with re-estimated E0 values for each element """ # filter configs without energy database_atoms = [ atoms for atoms in database_atoms if atoms.info.get("REF_energy") is not None ] if len(database_atoms) == 0: raise ValueError("database does not contain REF_energy tag") if not elements: elements = np.unique([Z for atoms in database_atoms for Z in atoms.numbers]).tolist() if not initial_e0s: initial_e0s = {Z: 0.0 for Z in elements} try: for Z in elements: for i in range(len(calculator.models)): z_ind = calculator.z_table.z_to_index(Z) initial_e0s[Z] += float( calculator.models[i].atomic_energies_fn.atomic_energies[z_ind] ) except Exception as e: logger.warning(f"unexpected exception in getting initial E0s: {e}") logger.warning("falling back to explicit isolated atom calculations") from ase.atoms import Atoms for Z in elements: calculator.calculate( atoms=Atoms(numbers=[Z], cell=[20] * 3, pbc=[True] * 3), properties=["energy"] ) initial_e0s[Z] = calculator.results.get("energy") logger.info(f"using initial E0s: {initial_e0s}") # A matrix: each row contains atom counts for each element # b vector: each entry is the prediction error for a configuration A = np.zeros((len(database_atoms), len(elements))) b = np.zeros(len(database_atoms)) logger.info( f"solving linear system with {len(database_atoms)} equations and {len(elements)} unknowns" ) # - A[i,j] is the count of element j in configuration i # - b[i] is the error (true - predicted) for configuration i # - x[j] will be the energy correction for element j for i, atoms in enumerate(tqdm(database_atoms, desc="[*] foundation model predictions")): calculator.calculate(atoms=atoms.copy(), properties=["energy"]) b[i] = atoms.info["REF_energy"] - calculator.results.get("energy") # atom counts for each element for j, element in enumerate(elements): A[i, j] = np.sum(atoms.get_atomic_numbers() == element) # solve with least squares try: corrections, _, rank, s = np.linalg.lstsq(A, b, rcond=None) except np.linalg.LinAlgError as e: logger.warning(f"error using lstsq to solve the linear system: {e}") logger.warning("falling back to foundation model E0s") return initial_e0s.copy() if np.linalg.norm(corrections) > 1e6: logger.critical( f"abnormally large corrections found, rank determination may have failed: {s}" ) new_e0s = {} for i, element in enumerate(elements): new_e0s[element] = initial_e0s[element] + corrections[i] logger.debug( f"element {element}: foundation E0 = {initial_e0s[element]:.4f}, " f"correction = {corrections[i]:.4f}, new E0 = {new_e0s[element]:.4f}" ) # statistics about the fit b_after = b - A @ corrections mse_before, mse_after = np.mean(b**2), np.mean(b_after**2) improvement = (1 - mse_after / mse_before) * 100 logger.debug(f"mean squared error before correction: {mse_before:.4f} eV²") logger.debug(f"mean squared error after correction: {mse_after:.4f} eV²") logger.debug(f"improvement: {improvement:.1f}%") if rank < len(elements): logger.warning(f"system is rank deficient (rank {rank}/{len(elements)})") logger.warning( "some elements may be linearly dependent or not well represented in the dataset." ) return new_e0s
[docs] def gen_ft_config( out: Path | str = "./config.default", estimate_e0s: bool = False, device: str = "cuda", name: str = "fine-tuned", mace_model: str = "medium-omat-0", database: Path | str = "./database.extxyz", head: str = "default", ) -> None: """Generate a configuration file for mace_run_train. Args: out (Path): path where the mace_run_train configuration file is written to. estimate_e0s (bool): estimate the E0s for training of the foundation model. device (str): device string passed to MACE to determine where calculation is performed. name (str): name of the model. mace_model (str): pre-trained MACE model that is used, can be a local path database (Path): path to the training dataset file (likely generated with :func:`collect`) head (str): which head from the model to use for prediction """ e0s: str | dict if Path(out).exists(): raise ValueError(f"Output path {out} already exists!") if not estimate_e0s: logger.warning(f'Please update the "E0s" tag in {out} prior to running mace_run_train.') e0s = "TO_BE_REPLACED" else: from mace.calculators import mace_mp calc = mace_mp( model=mace_model, dispersion=False, default_dtype="float64", device=device, enable_cueq=(device == "cuda" and find_spec("cuequivariance") is not None), head=head, ) logger.info("running E0 estimation") db_atoms: list[Atoms] = ase.io.read(database, ":") # type: ignore[assignment] e0s = reestimate_e0s_linear_system(calc, db_atoms) with open(out, "w") as f: f.write( "\n".join( [ f"name: {name}", f"foundation_model: {mace_model}", f"train_file: {database}", f"valid_file: {database}", f'E0s: "{e0s}"', "multiheads_finetuning: false", "energy_weight: 1", "forces_weight: 10", "stress_weight: 0", "ema: true", "ema_decay: 0.999", "lr: 0.001", "max_num_epochs: 500", "default_dtype: float64", "batch_size: 1", "valid_batch_size: 1", ] ) + "\n" )
[docs] def parse_force_constants_file(fname: Path | str) -> np.ndarray: """Parse a force constants file from phonopy.""" with open(fname) as f: H = np.zeros([int(x) for x in next(f).split()] + [3, 3], dtype=np.float64) for _ in range(H.shape[0] * H.shape[1]): i, j = [int(x) - 1 for x in next(f).split()] H[i, j, :, :] = np.fromstring( " ".join([next(f).strip() for _ in range(3)]), sep=" " ).reshape((3, 3)) H = H.swapaxes(1, 2).reshape((3 * H.shape[0], 3 * H.shape[0])) return H
[docs] def convert_from_phonopy( fname: Path | str, atoms_file: Path | str, save_file: Path | str = "dynmat.npz" ) -> None: """Convert phonopy FORCE_CONSTANTS to dynmat.npz. FORCE_CONSTANTS is written by phonopy when specifying the "--writefc" tag. Args: fname (Path): path to FORCE_CONSTANTS file. atoms_file (Path): path to file containing the equilibrium structure that was used to evaluate the force constants. (Only needed to extract sqrt masses.) save_file (Path): path where dynamical matrix is saved to (should end in .npz) """ logger.info(f"reading force constants from {fname}") H = parse_force_constants_file(fname) atoms: Atoms = ase.io.read(atoms_file) # type: ignore[assignment] sqrt_mass = np.repeat(np.sqrt(atoms.get_masses()), 3) inv_sqrt_mass = 1 / sqrt_mass H = np.einsum("i,ij,j->ij", inv_sqrt_mass, H, inv_sqrt_mass) logger.info(f"saving dynamical matrix to {save_file}") np.savez_compressed(save_file, H=H, sqrt_mass=sqrt_mass, cmd=" ".join(sys.argv))
[docs] def compute_dynmat( input_struct: Path, save_file: Path = Path("./dynmat.npz"), mace_model: str = "medium-omat-0", device: str = "cuda", head: str = "default", relax_struct: bool = True, analytical_hessian: bool = True, relax_algo: str = "LBFGSLineSearch", fmax: float = 0.001, ) -> None: """Calculate the dynamical matrix using MACE. Args: input_struct (Path): structure about which to compute the dynamical matrix save_file (Path): path where dynamical matrix is saved to (should end in .npz) mace_model (str): pre-trained MACE model that is used, can be a local path device (str): device string passed to MACE to determine where calculation is performed head (str): which head from the model to use for prediction relax_struct (bool): determines if an atomic relaxation is performed prior to computing the Hessian matrix. This is recommended if the model does not predict the same equilibrium structure as your explicit DFT calculation, which is generally the case unless good fine tuning has been performed. analytical_hessian (bool): determines if the Hessian is computed analytically or numerically using finite differences relax_algo (str): name of algorithm from ase.optimize that is used for atomic relaxation. fmax (float): force convergence criteria for atomic relaxation in eV/Ä """ from mace.calculators import mace_mp atoms: Atoms = ase.io.read(input_struct) # type: ignore[assignment] # noqa: F823 atoms.calc = mace_mp( model=mace_model, dispersion=False, default_dtype="float64", device=device, enable_cueq=(device == "cuda" and find_spec("cuequivariance") is not None), head=head, ) if relax_struct: import ase.optimize as ase_optim optim = getattr(ase_optim, relax_algo) optim(atoms).run(fmax=fmax) if np.linalg.norm(atoms.get_forces(), axis=1).max() > 0.02: warnings.warn("large forces found", stacklevel=0) if analytical_hessian: H = atoms.calc.get_hessian(atoms).reshape((3 * len(atoms), 3 * len(atoms))) else: from ase.atoms import Atoms from phonopy import Phonopy from phonopy.structure.atoms import PhonopyAtoms phonopy_atoms = PhonopyAtoms( symbols=atoms.get_chemical_symbols(), cell=atoms.get_cell(), scaled_positions=atoms.get_scaled_positions(), ) phonopy = Phonopy(phonopy_atoms, supercell_matrix=np.eye(3), log_level=2) phonopy.generate_displacements(distance=0.02) forces = [] for phonopy_atoms in tqdm( phonopy.supercells_with_displacements, desc="[*] computing forces" ): atoms_dx = Atoms( symbols=phonopy_atoms.symbols, scaled_positions=phonopy_atoms.scaled_positions, cell=phonopy_atoms.cell, pbc=True, ) atoms.calc.calculate(atoms=atoms_dx, properties=["forces"]) forces.append(atoms.calc.results["forces"]) phonopy.forces = np.array(forces) phonopy.produce_force_constants() phonopy.symmetrize_force_constants() if phonopy.force_constants is None: raise RuntimeError("phonopy failed to produce force constants") H = phonopy.force_constants.swapaxes(1, 2).reshape((3 * len(atoms), 3 * len(atoms))) if not np.allclose(H, H.T): warnings.warn("Hessian matrix is not symmetric", stacklevel=0) sqrt_mass = np.repeat(np.sqrt(atoms.get_masses()), 3) inv_sqrt_mass = 1 / sqrt_mass H = np.einsum("i,ij,j->ij", inv_sqrt_mass, H, inv_sqrt_mass) np.savez_compressed(save_file, H=H, sqrt_mass=sqrt_mass, cmd=" ".join(sys.argv))
[docs] def compute_lineshape( ground: Path, excited: Path, dynmat_file: Path, emission: Annotated[bool, Parameter(name="--luminescence", negative="--absorption")] = True, dE: float | None = None, gamma_zpl: float = 0.001, sigma_zpl: float = 0.0, sigma_psb: tuple[float, float] = (0.005, 0.001), gamma_psb: tuple[float, float] | None = None, omega_mult: float = 5.0, norm: str = "area", T: Annotated[float, Parameter(name=["--T", "-T"])] = 0.0, plot: str | None = None, ) -> None: """Calculate the spectral density/function and lineshape for a given dynamical matrix. Args: ground (Path): path to structure containing the ground state equilibrium geometry. excited (Path): path to structure containing the excited state equilibrium geometry. dynmat_file (Path): path to dynamical matrix file produced by :func:`compute_dynmat` or by phonopy and converted with :func:`convert_from_phonopy`. emission (bool): write luminescence (True) or absorption (False) spectrum. dE (float): zero-phonon line energy in eV, inferred from ground/excited if not provided. gamma_zpl (float): Lorentzian broadening in the ZPL to capture homogeneous broadening. sigma_zpl (float): Gaussian broadening in the ZPL to capture inhomogeneous broadening. sigma_psb (float, float): Gaussian broadening used to broaden the partial Huang-Rhys factors. The broadening factor is linearly interpolated from sigma_psb[0] at zero frequency to sigma_psb[1] at the highest (non-LVM) frequency. gamma_psb (float, float): Turns on Lorentzian broadening of local vibrational modes identified by their inverse participation ratio. gamma_psb[0] is ipr_cut and gamma_psb[1] is gamma_lvm. See :class:`Broadening`. omega_mult (float): number of factors of maximum phonon frequency from ZPL to plot. norm (str): normalization of luminescence (area or max). T (float): Temperature in kelvin. plot (str): if provide, specifies the type of plot to be generated and save in the current working directory. Can be "subplot", "inset", "dos", "S", or "L". See :func:`plot_spec_funcs` for more info. """ initial, final = (excited, ground) if emission else (ground, excited) ini_atoms: Atoms = ase.io.read(initial) # type: ignore[assignment] fin_atoms: Atoms = ase.io.read(final) # type:ignore[assignment] if dE is None: try: dE = np.abs(fin_atoms.get_potential_energy() - ini_atoms.get_potential_energy()) except RuntimeError: logger.critical("input files do not contain total energy, so dE cannot be determined") raise logger.info(f"energy difference not provided, found dE = {dE} from input structures") sqrt_mass = np.repeat(np.sqrt(fin_atoms.get_masses()), 3) dx = get_disp_vect(fin_atoms, ini_atoms) dq = sqrt_mass * dx logger.info(f"dQ={np.linalg.norm(dq):.06f} amu^{{1/2}} Å") logger.info("reading dynmat matrix") data = np.load(dynmat_file) H = data["H"] # probably not needed, but doesn't hurt if not np.allclose(sqrt_mass, data["sqrt_mass"]): warnings.warn( "sqrt_mass in dynmat file is not compatible with ground/excited conf", stacklevel=0, ) logger.info("diagonalizing") omega, U = get_phonons(H) dq_k = U.T @ dq logger.info(f"total Huang-Rhys factor Stot={get_Stot(dq_k, omega):.04f}") if gamma_psb is not None: logger.info("computing inverse participation ratios") ipr, ipr_cut, gamma_lvm = get_ipr(U), gamma_psb[0], gamma_psb[1] else: ipr, ipr_cut, gamma_lvm = None, None, None logger.info("computing spectral functions") w, dos, S, A = get_phonon_spec_func( dq_k, omega, sigma_psb=sigma_psb, gamma_zpl=gamma_zpl, sigma_zpl=sigma_zpl, gamma_lvm=gamma_lvm, ipr=ipr, ipr_cut=ipr_cut, T=T, ) tw, L = convert_A_to_L(w, A, dE, emission=emission, norm=norm) logger.info("saving results to .txt files") np.savetxt("spec_funcs.txt", np.array((w, S, A)).T) np.savetxt("lineshape.txt", np.array((tw, L)).T) if plot is not None: import matplotlib.pyplot as plt plot_spec_funcs( (w, dos, S, tw, L), None, dE, emission=emission, omega_mult=omega_mult, omega_max=(omega2eV * omega.max()), plot_type=plot, ) plt.savefig("lineshape.png", dpi=600, bbox_inches="tight")
[docs] def analyze_dynmat(dynmat: Path, structure: Path) -> None: """Produce analysis plots of the dynamical matrix.""" H = np.load(dynmat)["H"] import matplotlib.pyplot as plt fig, ax = plt.subplots(figsize=(4, 3)) p = ax.imshow(np.log(np.abs(H)), vmin=-6) plt.colorbar(p) plt.savefig("H.png", dpi=600, bbox_inches="tight") atoms: Atoms = ase.io.read(structure) # type: ignore[assignment] d = atoms.get_all_distances(mic=True) fig, ax = plt.subplots(figsize=(4, 3)) for i in range(3): for j in range(3): ax.plot(d.flatten(), np.abs(H[i::3, j::3].flatten()), ".", ms=1) ax.set_yscale("log") ax.set_xlabel(r"$\vert {\bf R}_I - {\bf R}_J \vert$ [${\rm \AA}$]") ax.set_ylabel(r"$\vert \Phi_{I\alpha,J\beta} \vert$ [eV/amu$^{1/2}$ ${\rm \AA}$]") plt.savefig("radial_H.png", dpi=600, bbox_inches="tight")