Source code for pibronic.vibronic.electronic_structure

# electronic_structure.py
#
# system imports
import itertools as it
# import threading
# import warnings
# import inspect
import shutil
# import socket
import signal
import enum
import time
# import math
import mmap
import sys
import os
from os.path import join
from os.path import isfile

# third party imports
import numpy as np

# local imports
# from ..data import vibronic_model_io as vIO
# from .. import constants
# from ..constants import hbar
from ..log_conf import log
from ..server import job_boss


# how we identify the different steps
[docs]@enum.unique class State(enum.Enum): # DROPMO = 0 OPT = enum.auto() NIP = enum.auto() VIB = enum.auto() IPEOMCC = enum.auto() PREPVIB = enum.auto() VIBRON = enum.auto() FINISHED = enum.auto() @classmethod def max(cls): return max([x.value for x in cls.__members__.values()]) @classmethod def min(cls): return min([x.value for x in cls.__members__.values()])
# the input filename templates that this script uses to run calculations default_file_in = { "parameter": "{:s}_params.txt", "zmat": "{:s}_zmat.txt", "DROPMO": "{:s}_hartree_fock_dropmo.in", "NIP": "{:s}_hartree_fock_nip.in", "geometry": "{:s}_opt.in", "vib": "{:s}_vib.in", "ipeomcc": "{:s}_eomcc_ip_{{:d}}.in", "prepVib": "{:s}_prepvib.in", "vibronIn": "{:s}_vibron.in", # "vibronIn": "cp.in", "vibronCp": "cp.auto", # "vibronCp": "{:s}_vibron.auto", } # the output filename templates that this script uses to run calculations default_file_out = { "DROPMO": "{:s}_hartree_fock_dropmo.out0", "NIP": "{:s}_hartree_fock_nip.out0", "geometry": "{:s}_opt.out0", "vib": "{:s}_vib.out0", "ipeomcc": "{:s}_eomcc_ip_{{:d}}.out0", "prepVib": "{:s}_prepvib.out0", "pntheff": "{:s}_prepvib.pntheff", "vibronIn": "{:s}_prepvib.vibron_input", "vibronCp": "{:s}_prepvib.vibronic_coupling", "vibron": "{:s}_vibron.out0", # "vibron": "cp.auto0", } # supported calculation methods basis_list = [ "TZ2P", "DZP", # "6-31+G", # "6-31++G**", ] theory_list = [ "MBPT(2)", "CCSD", "CCSD(T)", ] calculations_list = [ "IP", # "EA", # "EA", ] """ get the functions name """ # inspect.currentframe().f_code.co_name """ get the name of the function that called the current function """ # inspect.currentframe().f_back.f_code.co_name # ----------------------------------------------------------- # MEMORY MAPPED HELPER FUNCTIONS # -----------------------------------------------------------
[docs]def find_string_in_file(memmap_file, file_path, target_string): """wrapper that raises error if no substr can be found finds the first occurrence of a substring in memory mapped file""" location = memmap_file.find(target_string.encode(encoding="utf-8")) if location == -1: # couldn't find target string in file s = ("It seems \"{:s}\" was not present in the file\n\"{:s}\"\n" "Check that the previous calculation didn't fail" ) raise Exception(s.format(target_string, file_path)) return location
[docs]def rfind_string_in_file(memmap_file, file_path, target_string): """wrapper that raises error if no substr can be found finds the last occurrence of a substring in memory mapped file""" location = memmap_file.rfind(target_string.encode(encoding="utf-8")) if location == -1: # couldn't find target string in file s = ("It seems \"{:s}\" was not present in the file\n\"{:s}\"\n" "Check that the previous calculation didn't fail" ) raise Exception(s.format(target_string, file_path)) return location
[docs]def skip_back_n_lines(memmap_file, n, start_index): """gives the byte location n lines before the given byte location start_index""" for _ in it.repeat(None, n): temporary_start_index = memmap_file.rfind(b'\n', 0, start_index) if temporary_start_index < 0: break start_index = temporary_start_index return start_index
[docs]def skip_forward_n_lines(memmap_file, n, start_index): """gives the byte location n lines after the given byte location start_index""" for _ in it.repeat(None, n): temporary_start_index = memmap_file.find(b'\n', start_index + 1) if temporary_start_index == -1: break start_index = temporary_start_index return start_index
# ----------------------------------------------------------- # HELPER FUNCTIONS # ----------------------------------------------------------- number_dictionary = { 0: ("acetonitrile", 200), 1: ("ammonia", 201), 2: ("boron_trifluoride", 202), 3: ("formaldehyde", 203), 4: ("methane", 204), 5: ("formamide", 205), 6: ("formic_acid", 206), 7: ("hydrogen_peroxide", 207), 8: ("water", 208), 9: ("pyridine", 209), 10: ("furan", 210), 11: ("trichloroethylene", 211), 12: ("chloroethylene", 212), 13: ("acrolein", 213), 14: ("acrylonitrile", 214), 15: ("cis_12_dichloroethylene", 215), 16: ("trans_12_dichloroethylene", 216), 17: ("11_dichloroethylene", 217), # 18: ("", 218), # 19: ("", 219), # 20: ("", 220), # 21: ("", 221), } name_of_state_file = "execution_state.txt"
[docs]def pretty_print_job_status(path=None): """quick hack for scripting, path should look something like `/**/data_set_{:d}/electronic_structure/execution_state.txt` """ # use the default path if path is None: path = join("/work/ngraymon/pimc/data_set_{:d}/electronic_structure/", name_of_state_file) # assume that we we're provided with the root of the path if "data_set_" not in path: path = join(path, "data_set_{:d}/electronic_structure/", name_of_state_file) for model in number_dictionary.values(): path_file_state = path.format(model[1]) if isfile(path_file_state): with open(path_file_state, 'r') as file: lines = file.readlines() for line in reversed(lines): if line in ["", "\n"]: continue last_state = str(line.strip()) break log.info("({:}, {:}) last state was {:}".format(model[0], model[1], last_state)) else: s = "({:}, {:}) does not have an {:s} file" log.info(s.format(model[0], model[1], name_of_state_file)) return
[docs]def verify_file_exists(file_path): """raises FileNotFoundError exception if input is not a file, or does not exist""" if not isfile(file_path): raise FileNotFoundError("Cannot find {}".format(file_path)) return True
[docs]def pairwise(iterable): """returns an iterator over pairs of elements in the iterable""" a, b = it.tee(iterable) next(b, None) return zip(a, b)
[docs]def get_yes_no_from_user(message): """return the Boolean value of the users response the a yes/no question""" while True: try: yes_no = bool('y' in input(message).lower()) except ValueError: print("Sorry, I didn't understand that\n") continue else: break return yes_no
[docs]def get_integer_from_user(message): """return the integer value of the users response to a input prompt""" while True: try: user_integer = int(input(message)) assert user_integer >= 0 except ValueError: print("Sorry, I didn't understand that\n") continue except AssertionError: print("Please provide a non negative value\n") continue else: break return user_integer
[docs]def wait_on_results(job_id, job_type): """synchronizes with the submitted job and verifies that the job's output is successful""" log.flow("Waiting on job") # wait for the job to queue up time.sleep(5) job_boss.synchronize_with_job(job_id, job_type) # get the recorded state of the job result = job_boss.check_acct_state(job_id) # check if the job failed if "FAILED" in result: s = "{:s} script (JOBID={:d}) failed for some reason" raise Exception(s.format(job_type, job_id)) # needs to be completed elif not ("COMPLETED" in result): raise Exception("unknown result in {:s} parsing".format(job_type)) return
[docs]def verify_aces2_completed(path_root, file_path, job_id): """verifies that the aces2 output file has completed successfully""" final_string = b'The ACES2 program has completed successfully in' with open(file_path, "r+b") as source_file: with mmap.mmap(source_file.fileno(), 0, prot=mmap.PROT_READ) as memmap_file: # the target string should be in the last couple lines of the file # first we find the byte location of the beginning of the 10th last line ten_lines_back = skip_back_n_lines(memmap_file, 10, len(memmap_file) - 1) # then we search from there to the end of the file for our target string if memmap_file.find(final_string, ten_lines_back) == -1: # if we don't find it then the calculation was a failure s = ("Output for ACES2 jobid={:d} did not complete successfully!\n" "Check output file\n{:s}" ) log.error(s.format(job_id, file_path)) # verify that the slurm job didn't fail job_boss.check_slurm_output(path_root, job_id) s = ("Could not find an issue with slurm output in file\nslurm-{:d}.out\n" "It appears that slurm job didn't fail but ACES2 did?\n" "This appears to be a theory/numerical/symmetry issue.\n" ) # only the aces2 job failed? raise Exception(s.format(job_id)) return
[docs]def submit_job(parameter_dictionary): """wrapper for job_boss job submission""" command = "sbatch" command += (" --mem={memory_size:d}G" " --ntasks=1" " --job-name={file_name:s}" " --cpus-per-task={cpus:d}" " --workdir={work_dir:s}" # " --output={output:s}/%x.o%j" " --partition={partition:s}" " --export=" # "MK_SCRATCH_DIR={make_scratch:s}" # ",SCRATCH_COPY_IN={scratch_in:s}" # ",SCRATCH_COPY_OUT={scratch_out:s}" ",HEADNODE={hostname:s}" ",PREAMBLE={pre_amble:s}" ",JOB={job_name:s}" ",POSTAMBLE={post_amble:s}" ",PARENT_PID={parent_pid:d}" " /home/ngraymon/chem740/work_dir/submit_template" ) job_id, out, error = job_boss.submit_job(command, parameter_dictionary) # check for any errors if error.decode('utf-8') is not "": s = "Failed to execute script \n{:s}\n due to {:s}" raise Exception(s.format(command, error.decode('utf-8'))) return job_id
# -----------------------------------------------------------
[docs]def parse_input_parameters(file_path=None): return # does not work - decommission for now # Default is to assume parameter file is in the same directory # if file_path is None: # file_path = os.path.dirname(os.path.realpath(__file__)) # file_path = join(file_path, default_file_in["parameter"].format("default")) # where we store the data data_dictionary = { "calculation": None, "zmat path": None, "theory": None, "basis": None, } # # read in the file contents # try: # verify_file_exists(file_path) # with open(file_path, 'r') as source_file: # for line in source_file: # if not (line == "" or line.startswith("#")): # header = line.strip() # data = source_file.readline().strip() # if header in ["zmat path", "basis", "theory", "calculation"]: # data_dictionary[header] = data # else: # s = ("Header {:s} is not valid\n" # "Check that your {:s} has the correct formatting" # ) # raise ValueError(s.format(header, file_path)) # except ValueError as error_object: # raise error_object # # check the validity of the parameter file # assert data_dictionary["basis"] in basis_list, "invalid parameter provided for basis" # assert data_dictionary["theory"] in theory_list, "invalid parameter provided for theory" # assert data_dictionary["calculation"] in calculations_list, "invalid parameter provided for calculation" # assert isfile(data_dictionary["zmat path"]), "path to zmat file is not a valid path!" return data_dictionary
[docs]def hartree_fock_calculation(path_root, name, zmat, parameter_dictionary, hf_type): """creates and populates the input file and submission file for the job""" log.flow("Setting up {:s} calculation".format(hf_type)) memory_size = 10 # in GB's # file location file_name = default_file_in[hf_type].format(name) file_path = join(path_root, file_name) # create the input file template template = zmat.get() template += "*ACES2( BASIS={basis:s}," template += "\n\t" + "CALC=SCF," template += "\n\t" + "MEMORY_SIZE={:d}GB,".format(memory_size) # template += "\n\t" + "{DROPMO:s}," template += "\n\t" + ")" template += "\n" # populate the template using the input dictionary output = template.format_map(parameter_dictionary) with open(file_path, 'w') as dest_file: dest_file.write(output) # "/scratch/$USER/pimc/data_set_${DATA_SET_ID}/" # copy_out = "mv --force /scratch/$USER/pimc/data_set_${DATA_SET_ID}/" # path_output = "/scratch/$USER/pimc/data_set_{:d}/execution_output/" # nothing for now copy_in = "" copy_out = "" path_output = "" slurm_parameters = { "memory_size": memory_size+3, "file_name": file_name, "cpus": 1, "work_dir": path_root, "output": path_output, "partition": job_boss.partition, "make_scratch": 0, "scratch_in": copy_in, "scratch_out": copy_out, "pre_amble": "jobaces", "job_name": file_name.replace(".in", ""), "post_amble": "", "hostname": job_boss.hostname, "parent_pid": os.getpid(), } # run the job job_id = submit_job(slurm_parameters) wait_on_results(job_id, "SCF") file_name = default_file_out[hf_type].format(name) file_path = join(path_root, file_name) verify_aces2_completed(path_root, file_path, job_id)
[docs]def temp_file_path(root): return
def _estimate_dropmo(file_path): """read the output file from a scf calculation and attempt to choose a DROPMO value """ # access the file using memory map for efficiency with open(file_path, "r+b") as source_file: with mmap.mmap(source_file.fileno(), 0, prot=mmap.PROT_READ) as memmap_file: # find the beginning and ending of the important region target_begin = 'ORBITAL EIGENVALUES (ALPHA)' target_end = '+++++++++++++++++++++++++++++++++++++++++++' begin = find_string_in_file(memmap_file, file_path, target_begin) end = find_string_in_file(memmap_file, file_path, target_end) # go there memmap_file.seek(begin) # throw away the header lines past_headers = skip_forward_n_lines(memmap_file, 4, memmap_file.tell()) memmap_file.seek(past_headers) # read all the relevant data data_in_bytes = memmap_file.read(end - memmap_file.tell()) data_as_string = data_in_bytes.decode(encoding="utf-8") lines = data_as_string.strip().splitlines() # attempt to guess a DROPMO value try: # where we store the data list_ev = np.array([line.split(maxsplit=5)[3] for line in lines], dtype=float) log.debug("list_ev "+str(list_ev)) # the differences in each pair of eV's diffs = list(map(lambda e: (e[0] / e[1]), pairwise(list_ev))) log.debug("diffs "+str(diffs)) # biggest gap in eV's max_diff = max(diffs) log.debug("max_diff: "+str(max_diff)) # getguess_dropmo guess_dropmo = diffs.index(max_diff)+1 log.debug("guess_dropmo: "+str(guess_dropmo)) except Exception as error: guess_dropmo = -1 log.debug("We failed to guess a dropmo") pass string_orbital = "Here are the orbital energies\n" string_orbital += "-"*30 + "\n" string_orbital += "".join([str(ev)+"\n" for ev in list_ev]) string_orbital += "+"*30 # show the user the orbital energies print(string_orbital) # ask user if our guess of dropmo is appropriate if not guess_dropmo == -1: s = "We guessed that dropmo should be {:d} is that acceptable?\n" valid = get_yes_no_from_user(s.format(guess_dropmo)) else: print("We failed to guess a dropmo value") valid = False # if the dropmo value is bad then get a new value from the user while not valid: guess_dropmo = get_integer_from_user("Please provide an alternative dropmo value\n") s = "You provided a new dropmo value of {:d} is that acceptable?\n" valid = get_yes_no_from_user(s.format(guess_dropmo)) # if zero then don't put any typeword string_dropmo = "" if guess_dropmo == 1: string_dropmo = "DROPMO=1" if guess_dropmo > 1: # a range of orbitals is indicated by '1-X' string_dropmo = "DROPMO=1-{:d}".format(guess_dropmo) return string_dropmo def _extract_lines_for_estimating_NIP(path): """ does what it says """ with open(path, "r+b") as source_file: # access the file using memory map for efficiency with mmap.mmap(source_file.fileno(), 0, prot=mmap.PROT_READ) as memmap_file: # find the beginning and ending of the important region target_begin = 'ORBITAL EIGENVALUES (ALPHA)' target_end = '+++++++++++++++++++++++++++++++++++++++++++' begin = find_string_in_file(memmap_file, path, target_begin) end = find_string_in_file(memmap_file, path, target_end) # go there memmap_file.seek(begin) # throw away the header lines past_headers = skip_forward_n_lines(memmap_file, 4, memmap_file.tell()) memmap_file.seek(past_headers) # read all the relevant data data_in_bytes = memmap_file.read(end - memmap_file.tell()) data_as_string = data_in_bytes.decode(encoding="utf-8") lines = data_as_string.strip().splitlines() return lines def _guess_NIP_value(lines, guess_dropmo): """ attempt to guess a NIP value """ list_ev = [] try: # where we store the data list_ev = np.array([lin.split(maxsplit=5)[3] for lin in lines], dtype=float) log.debug("list_ev "+str(list_ev)) # the differences in each pair of eV's diffs = list(map(lambda e: (e[0] / e[1]), pairwise(list_ev))) log.debug("diffs "+str(diffs)) if guess_dropmo >= len(list_ev): raise Exception("Can't remove orbitals that don't exist") list_dropped = np.delete(list_ev, range(0, guess_dropmo+1)) log.debug("list_dropped "+str(list_dropped)) # the differences in each pair of eV's diffs2 = list(map(lambda e: (e[0] / e[1]), pairwise(list_dropped))) log.debug("diffs2 "+str(diffs2)) # biggest gap in eV's second_highest = max(diffs2) log.debug("second_highest: "+str(second_highest)) # the suggested nip value guess_nip = len(diffs) - diffs.index(second_highest) log.debug("guess_nip: "+str(guess_nip)) except Exception as error: guess_nip = -1 log.debug("We failed to guess a nip") return guess_nip, list_ev def _estimate_NIP(vibron, path): """ read the output file from a scf calculation and attempt to choose a NIP value""" lines = _extract_lines_for_estimating_NIP(path) # grab the guess_dropmo as a number string_dropmo = vibron.parameter_dictionary["DROPMO"] guess_dropmo = 0 if string_dropmo == "" else int(string_dropmo[-1]) print(guess_dropmo) guess_nip, list_ev = _guess_NIP_value(lines, guess_dropmo) string_orbital = "Here are the orbital energies\n" string_orbital += "-"*30 + "\n" string_orbital += "".join([str(ev)+"\n" for ev in list_ev]) string_orbital += "+"*30 # show the user the orbital energies print(string_orbital) # ask user if our guess of nip is appropriate if not guess_nip == -1: s = "We guessed that nip should be {:d} is that acceptable?\n" valid = get_yes_no_from_user(s.format(guess_nip)) else: print("We failed to guess a nip value") valid = False # if the nip value is bad then get a new value from the user while not valid: guess_nip = get_integer_from_user("Please provide an alternative nip value\n") s = "You provided a new nip value of {:d} is that acceptable?\n" valid = get_yes_no_from_user(s.format(guess_nip)) return guess_nip
[docs]def parse_hartree_fock_output(vibron, execution_state): """ extracts relevant data from output of job """ hf_type = str(State(execution_state).name) file_name = vibron.file_out[hf_type] file_path = join(vibron.path_root, file_name) verify_file_exists(file_path) # this could possibly be redesigned # first scf estimates the DROPMO parameter if hf_type is "DROPMO": # save the estimate in the vibron object vibron.parameter_dictionary["DROPMO"] = _estimate_dropmo(file_path) # second scf estimates the new DROPMO parameter # and checks which orbitals to include at the optimized geometry elif hf_type is "NIP": # vibron.parameter_dictionary["DROPMO"] = estimate_dropmo(file_path) vibron.parameter_dictionary["nip"] = _estimate_NIP(vibron, file_path) else: s = ("Tried to call parse_hartree_fock_output " "with hf_type argument of {:s} which is not implemented" ) raise NotImplementedError(s.format(hf_type)) return
[docs]def geometry_optimization(path_root, name, zmat, parameter_dictionary): """creates and populates the input file and submission file for the job""" log.flow("Setting up calculation") # in GB's memory_size = 20 # file location file_name = default_file_in["geometry"].format(name) file_path = join(path_root, file_name) # create the input file template template = zmat.get(opt_zmat=True) template += "*ACES2( BASIS={basis:s}," template += "\n\t" + "CALC={theory:s}," template += "\n\t" + "MEMORY_SIZE={:d}GB,".format(memory_size) # template += "\n\t" + "{DROPMO:s}," if zmat.kind == 'cartesian': template += "\n\t" + "GEOM_OPT=RIC," # template += "\n\t" + "GEOM_OPT=CART," template += "\n\t" + ")" template += "\n" # populate the template using the input dictionary output = template.format_map(parameter_dictionary) # write the input file with open(file_path, 'w') as dest_file: dest_file.write(output) slurm_parameters = { "memory_size": memory_size+3, "file_name": file_name, "cpus": 1, "work_dir": path_root, "output": "", "partition": job_boss.partition, "make_scratch": "", "scratch_in": "", "scratch_out": "", "pre_amble": "jobaces", "job_name": file_name.replace(".in", ""), "post_amble": "", "hostname": job_boss.hostname, "parent_pid": os.getpid(), } # run the job job_id = submit_job(slurm_parameters) wait_on_results(job_id, "geometry optimization") file_name = default_file_out["geometry"].format(name) file_path = join(path_root, file_name) verify_aces2_completed(path_root, file_path, job_id) return
[docs]def parse_opt_output(vibron): """extract substrings describing the optimized geometry from the output file generated by a geometry optimization calculation""" file_name = vibron.file_out["geometry"] file_path = join(vibron.path_root, file_name) verify_file_exists(file_path) def extract_internal(file_path): """assume that the substrings describing the geometry are in internal coordinates""" with open(file_path, "r+b") as source_file: # access the file using memory map for efficiency with mmap.mmap(source_file.fileno(), 0, prot=mmap.PROT_READ) as memmap_file: # find the beginning of the important region target_string = 'Summary of optimized internal coordinates' begin = find_string_in_file(memmap_file, file_path, target_string) # go there memmap_file.seek(begin) # throw away the headers memmap_file.readline() memmap_file.readline() lines = [] while True: line = memmap_file.readline().decode(encoding="utf-8") if line in ["\n", "Frequencies of the updated Hessian at convergence"]: break lines.append(line) new_internal_geometry = dict([map(str.strip, line.split('=')) for line in lines]) return new_internal_geometry def extract_cartesian(file_path): """assume that the substrings describing the geometry are in cartesian coordinates""" with open(file_path, "r+b") as source_file: # access the file using memory map for efficiency with mmap.mmap(source_file.fileno(), 0, prot=mmap.PROT_READ) as memmap_file: # find the beginning of the important region target_string = 'Summary of optimized Cartesian coordinates (Ang)' begin = find_string_in_file(memmap_file, file_path, target_string) # go there memmap_file.seek(begin) # throw away the headers memmap_file.readline() memmap_file.readline() lines = [] while True: line = memmap_file.readline().decode(encoding="utf-8") if line in ["\n", "Frequencies of the updated Hessian at convergence"]: break lines.append(line) # create list of updated geometry list_of_lines = [x.split() for x in lines] new_cartesian_geometry = "" for line in list_of_lines: for item in line: new_cartesian_geometry += str(item) + " " new_cartesian_geometry += "\n" new_cartesian_geometry += "\n" return new_cartesian_geometry # return optimized geometry if vibron.zmat.kind is "internal": vibron.zmat.set(extract_internal(file_path)) else: vibron.zmat.set(extract_cartesian(file_path)) return
[docs]def vibrational_frequency(path_root, name, zmat, parameter_dictionary): """creates and populates the input file and submission file for the job""" log.flow("Setting up calculation") # in GB's memory_size = 20 # file location file_name = default_file_in["vib"].format(name) file_path = join(path_root, file_name) # create the input file template template = zmat.get() template += "*ACES2( BASIS={basis:s}," template += "\n\t" + "CALC={theory:s}," template += "\n\t" + "MEMORY_SIZE={:d}GB,".format(memory_size) # template += "\n\t" + "{DROPMO:s}," template += "\n\t" + "VIB=FINDIF," template += "\n\t" + ")" template += "\n" # populate the template using the input dictionary output = template.format_map(parameter_dictionary) with open(file_path, 'w') as dest_file: dest_file.write(output) slurm_parameters = { "memory_size": memory_size+3, "file_name": file_name, "cpus": 1, "work_dir": path_root, "output": "", "partition": job_boss.partition, "make_scratch": "", "scratch_in": "", "scratch_out": "", "pre_amble": "jobaces", "job_name": file_name.replace(".in", ""), "post_amble": "fcm", "hostname": job_boss.hostname, "parent_pid": os.getpid(), } # run the job job_id = submit_job(slurm_parameters) wait_on_results(job_id, "vibrational frequency") file_name = default_file_out["vib"].format(name) file_path = join(path_root, file_name) verify_aces2_completed(path_root, file_path, job_id) # make sure the output files we expect to be present are present # verify_file_exists(join(path_root, "fcmint")) # verify_file_exists(join(path_root, file_name)) # verify_file_exists(join(path_root, file_name)) # verify_file_exists(join(path_root, file_name)) # verify_file_exists(join(path_root, file_name)) return
[docs]def ip_calculation(path_root, name, zmat, parameter_dictionary): """creates and populates the input file and submission file for the job""" log.flow("Setting up calculation") # in GB's memory_size = 10 # file location file_name = default_file_in["ipeomcc"].format(name) file_name = file_name.format(parameter_dictionary["nip"]) file_path = join(path_root, file_name) template = zmat.get() template += "*ACES2( BASIS={basis:s}," template += "\n\t" + "CALC={theory:s}," template += "\n\t" + "MEMORY_SIZE={:d}GB,".format(memory_size) # template += "\n\t" + "{DROPMO:s}," template += "\n\t" + "IP_CALC=IP_EOMCC," template += "\n\t" + ")" template += "\n" template += "\n" + "*mrcc_gen" template += "\n\t" + "nip={nip:d}" # template += "\n\t" + "ip_low=-25" # template += "\n\t" + "ea_high=0" template += "\n" + "*end" template += "\n" # populate the template using the input dictionary output = template.format_map(parameter_dictionary) with open(file_path, 'w') as dest_file: dest_file.write(output) slurm_parameters = { "memory_size": memory_size+3, "file_name": file_name, "cpus": 1, "work_dir": path_root, "output": "", "partition": job_boss.partition, "make_scratch": "", "scratch_in": "", "scratch_out": "", "pre_amble": "jobaces", "job_name": file_name.replace(".in", ""), "post_amble": "", "hostname": job_boss.hostname, "parent_pid": os.getpid(), } # run the job job_id = submit_job(slurm_parameters) wait_on_results(job_id, "IP_EOM") file_name = default_file_out["ipeomcc"].format(name) file_name = file_name.format(parameter_dictionary["nip"]) file_path = join(path_root, file_name) verify_aces2_completed(path_root, file_path, job_id) return
[docs]def parse_ip_output(vibron): """extracts relevant data from output of job""" file_name = vibron.file_out["ipeomcc"].format(vibron.parameter_dictionary["nip"]) file_path = join(vibron.path_root, file_name) verify_file_exists(file_path) def verify_percent_singles(file_path, cutoff): """read the output file from an ionization potential calculation extract the \% singles and verify they they are above a cutoff""" with open(file_path, "r+b") as source_file: # access the file using memory map for efficiency with mmap.mmap(source_file.fileno(), 0, prot=mmap.PROT_READ) as memmap_file: # find the beginning and ending of the important region target_begin = 'Summary of ionization-potential eom-cc calculation' begin = find_string_in_file(memmap_file, file_path, target_begin) # go there memmap_file.seek(begin) # throw away the header lines past_headers = skip_forward_n_lines(memmap_file, 5, memmap_file.tell()) memmap_file.seek(past_headers) # store the lines with the relevant information lines = [] while True: line = memmap_file.readline().decode(encoding="utf-8") if(line is "\n" or "--------" in line): break lines.append(line) # extract the relevant information percent_singles = np.array([x.split(maxsplit=5)[4] for x in lines], dtype=float) boolean_array = (percent_singles <= vibron.singles_cutoff).copy() # verify that all %singles are above our cutoff point if not boolean_array.any(): return True # otherwise record a warning s = "We found {:d} states with less than {:f}\% singles" log.warning(s.format(np.count_nonzero(boolean_array), vibron.singles_cutoff)) return False return verify_percent_singles(file_path, vibron.singles_cutoff)
[docs]def verify_ip_states(job): """run calculations to verify the range of states selected""" while True: job.ip_calc() if parse_ip_output(job): s = "We verified that {:d} states had \% singles above {:f}" log.info(s.format(job.parameter_dictionary["nip"], job.singles_cutoff)) if job.state is State.IPEOMCC: job.advance() # this is a problem # we only want to advance if we aren't already at prepVibron or vibron calc break s = "Reducing the nip from {:d} to {:d}" log.debug(s.format(job.parameter_dictionary["nip"], job.parameter_dictionary["nip"]-1)) job.parameter_dictionary["nip"] -= 1 if job.parameter_dictionary["nip"] == 0: s = "We failed to get \% singles above 85\% and ended up with a nip of 0" raise Exception(s) # Don't know how to handle this right now return
def _create_template(zmat, memory_size): """ wrapper for the template creation this is the most flexible way in case it needs to be changed in the future """ template = zmat.get() template += "*ACES2( BASIS={basis:s}," template += "\n\t" + "CALC={theory:s}," template += "\n\t" + "MEMORY_SIZE={:d}GB,".format(memory_size) # template += "\n\t" + "{DROPMO:s}," template += "\n\t" + "IP_CALC=IP_EOMCC," template += "\n\t" + "PREP_VIBRON=ON," template += "\n\t" + "GRID_VIBRON={gridVibron:d}," template += "\n\t" + "CC_CONV={ccConv:d}," template += "\n\t" + "SCF_CONV={scfConv:d}," template += "\n\t" + "ESTATE_TOL={estateTol:d}," template += "\n\t" + ")" template += "\n" template += "\n" + "*mrcc_gen" template += "\n\t" + "nip={nip:d}" template += "\n" + "*ip_eom" template += "\n\t" + "diabatize=on" template += "\n\t" + "heff_low=0" template += "\n\t" + "heff_high=200" template += "\n" + "*end" template += "\n" return
[docs]def prepVibron_calculation(path_root, name, zmat, parameter_dictionary): """creates and populates the input file and submission file for the job""" log.flow("Setting up calculation") # in GB's memory_size = 10 # file location file_name = default_file_in["prepVib"].format(name) file_path = join(path_root, file_name) template = _create_template(zmat, memory_size) # populate the template using the input dictionary output = template.format_map(parameter_dictionary) with open(file_path, 'w') as dest_file: dest_file.write(output) slurm_parameters = { "memory_size": memory_size+3, "file_name": file_name, "cpus": 2, "work_dir": path_root, "output": "", "partition": job_boss.partition, "make_scratch": "", "scratch_in": "", "scratch_out": "", "pre_amble": "jobaces", "job_name": file_name.replace(".in", ""), "post_amble": "vibron", "hostname": job_boss.hostname, "parent_pid": os.getpid(), } # run the job job_id = submit_job(slurm_parameters) wait_on_results(job_id, "preparing for vibron") job_boss.check_slurm_output(path_root, job_id) verify_file_exists(join(path_root, "fcmfinal")) verify_file_exists(join(path_root, default_file_out["pntheff"].format(name))) verify_file_exists(join(path_root, default_file_out["vibronCp"].format(name))) verify_file_exists(join(path_root, default_file_out["vibronIn"].format(name))) return
def _remove_extra_heff(vibron, file_name, target_string='HEFF_IP2'): """remove non relevant data from output files kind argument does nothing at the moment""" file_path = join(vibron.path_root, file_name) # access the file using memory map for efficiency with open(file_path, "r+b") as source_file: with mmap.mmap(source_file.fileno(), 0) as memmap_file: # store the original file size size_original = memmap_file.size() log.debug(size_original) log.debug(target_string) # find the first occurrence of the target_string start_in_bytes = find_string_in_file(memmap_file, file_path, target_string) log.debug(start_in_bytes) # get the byte location of the start of the line start_in_bytes = memmap_file.rfind(b'\n', 0, start_in_bytes) # we want to ignore that newline character however start_in_bytes += 1 log.debug(start_in_bytes) # get the EOF location in numBytes memmap_file.seek(0, os.SEEK_END) end_of_file_in_bytes = memmap_file.tell() log.debug(end_of_file_in_bytes) # the length of our source length_in_bytes = end_of_file_in_bytes - start_in_bytes log.debug(length_in_bytes) # copy the file, resize it and write the change to disk memmap_file.move(0, start_in_bytes, length_in_bytes) memmap_file.resize(length_in_bytes) memmap_file.flush() # # store the original file size # size_original = memmap_file.size() # # find the beginning and ending of the important region # destStart = find_string_in_file(memmap_file, file_path, target_string) # destEnd = rfind_string_in_file(memmap_file, file_path, target_string) # log.debug(destStart, destEnd) # # find the byte location of the end of the line # destStart = memmap_file.rfind(b'\n', 0, destStart) # destEnd = memmap_file.find(b'\n', destEnd) # log.debug(destStart, destEnd) # # if we can't find a newline character # # that means the substring was found # # in the first line of the file # if destStart == -1: # destStart = 0 # destEnd += 1 # log.debug(destStart, destEnd) # # find the start of the section to copy # start_in_bytes = memmap_file.find(b'HEFF_IP2', destEnd) # log.debug(start_in_bytes) # # get the location of the beginning of the line # start_in_bytes = memmap_file.rfind(b'\n', 0, start_in_bytes) # log.debug(start_in_bytes) # # get the EOF location in numBytes # memmap_file.seek(0, os.SEEK_END) # EOF = memmap_file.tell() # # check file sizes # destLength = destEnd - destStart # length_in_bytes = EOF - start_in_bytes # log.debug(memmap_file.size(), EOF) # log.debug(destStart, start_in_bytes, length_in_bytes) # log.debug(size_original - destLength) # memmap_file.move(destStart, start_in_bytes, length_in_bytes) # memmap_file.resize(size_original - destLength) # memmap_file.flush() return def _copy_output_file(vibron, source, destination): """ verifies source file exists before copying it to destination """ path_source = join(vibron.path_root, source) verify_file_exists(path_source) path_destination = join(vibron.path_root, destination) shutil.copyfile(path_source, path_destination) return def _add_nip_to_file(vibron, file_name): """ modify file to contain the nip """ nip = vibron.parameter_dictionary["nip"] file_path = join(vibron.path_root, file_name) with open(file_path, "r+b") as destFile: # access the file using memory map for efficiency with mmap.mmap(destFile.fileno(), 0) as memmap_file: # we assume that there is only one 'xxx' substring in the whole file startTarget = 'xxx' start = find_string_in_file(memmap_file, file_path, startTarget) # go there memmap_file.seek(start) s = "{:>6,d}" # attempt to write 3 bytes over the 'xxx' try: bytesWritten = memmap_file.write(s.format(nip).encode(('utf-8'))) # make sure we wrote the right amount assert bytesWritten == 6, "incorrect number of bytes written to file {:s}".format(file_name) memmap_file.flush() # write the changes from memory to disk except (Exception, ValueError) as e: s = "Failed to write the nip={:d} to {:s}" log.error(s.format(nip, file_path)) raise e # 1 return
[docs]def parse_prepVibron_output(vibron): """extracts relevant data from output of job""" src = vibron.file_out["pntheff"] dst = "PNTHEFF" _copy_output_file(vibron, src, dst) _remove_extra_heff(vibron, dst) # _remove_extra_heff(vibron, dst, 'HEFF_0') src = vibron.file_out["vibronCp"] dst = vibron.file_in["vibronCp"] _copy_output_file(vibron, src, dst) _remove_extra_heff(vibron, dst) # _remove_extra_heff(vibron, dst, 'Heff_0') src = vibron.file_out["vibronIn"] dst = vibron.file_in["vibronIn"] _copy_output_file(vibron, src, dst) _add_nip_to_file(vibron, dst) return
[docs]def vibron_calculation(path_root, name, zmat, parameter_dictionary): """creates and populates the input file and submission file for the job""" log.flow("Setting up calculation") # in GB's memory_size = 15 # file location file_name = default_file_in["vibronIn"].format(name) slurm_parameters = { "memory_size": memory_size+3, "file_name": file_name, "cpus": 1, "work_dir": path_root, "output": "", "partition": job_boss.partition, "make_scratch": "", "scratch_in": "", "scratch_out": "", "pre_amble": "jobvibron", "job_name": file_name.replace(".in", ""), "post_amble": "", "hostname": job_boss.hostname, "parent_pid": os.getpid(), } # run the job job_id = submit_job(slurm_parameters) wait_on_results(job_id, "VIBRON") job_boss.check_slurm_output(path_root, job_id) return
[docs]def parse_vibron_output(vibron): """extracts relevant data from output of job""" file_name = vibron.file_out["vibron"] file_path = join(vibron.path_root, file_name) verify_file_exists(file_path) final_string = b'All done in main_vibron' with open(file_path, "r+b") as source_file: with mmap.mmap(source_file.fileno(), 0, prot=mmap.PROT_READ) as memmap_file: # the target string should be in the last couple lines of the file # first we find the byte location of the beginning of the 10th last line ten_lines_back = skip_back_n_lines(memmap_file, 10, len(memmap_file) - 1) # then we search from there to the end of the file for our target string if memmap_file.find(final_string, ten_lines_back) == -1: # if we don't find it then the calculation was a failure s = "The vibron calculation failed to successfully complete"\ + "\nCheck output file {:s}" # log.error(s.format(file_path)) raise Exception(s.format(file_path)) else: log.info("vibron calculation successfully completed!") return
[docs]class ZmatClass: """handles all details related to zmat file which represents the geometry of the molecule can hold internal or cartesian coordinates""" kind = ["cartesian", "internal"][0] # string that holds the relational ZMAT info # in internal coordinates (without the bond/angle values) _internal = None # dictionary that holds the coordinate bond/angle values # for internal coordinates type of ZMAT _coord = None # string that holds the relational ZMAT info # in cartesian coordinates _cartesian = None def __init__(self, file_path): verify_file_exists(file_path) # parse file with open(file_path, 'r') as source_file: # make sure there is at least one newline at the end data = source_file.read() + "\n" # determine which coordinates are used in the ZMAT if "=" in data: self.kind = "internal" # process the input if self.kind is "internal": # split along the blank line b = data.split("\n\n") # this is the relational ZMAT info self._internal = b[0] + "\n" # create a dictionary that maps the angles/bonds to their values self._coord = dict([map(str.strip, x.split('=')) for x in b[1].splitlines()]) elif self.kind is "cartesian": self._cartesian = data + "\n" else: s = "Invalid ZMAT file located at {:s}" raise Exception(s.format(file_path)) return
[docs] def get(self, opt_zmat=False): """returns a string which represents the coordinates""" # creates a string from zmat and self.coord # that represents a valid ZMAT file if self.kind is "internal": completeZMAT = self._internal + "\n" # if we aren't optimizing then remove the asterisks if not opt_zmat: completeZMAT = completeZMAT.replace("*", "") # add the geometry values for key, value in self._coord.items(): completeZMAT += key+"="+value+"\n" # need a blank line coordinate list completeZMAT += "\n" return completeZMAT # no modification is needed for the cartesian version elif self.kind is "cartesian": return self._cartesian
[docs] def set(self, geometry_new): """updates geometry data""" # just update the dict values if self.kind is "internal": self._coord.update(geometry_new) # special treatment is necessary elif self.kind is "cartesian": geometry_new = [line for line in geometry_new.split("\n") if line != ''] geometry_old = [line for line in self._cartesian.split("\n") if line != ''] diff = len(geometry_old) - len(geometry_new) if diff == 0: self._cartesian = "\n".join(geometry_new) + "\n\n" elif diff > 0: for i in range(0, len(geometry_new)): geometry_old[i+diff] = geometry_new[i] self._cartesian = "\n".join(geometry_old) + "\n\n" else: s = "The new geometries have MORE? atoms than before???"\ + "Old geometry\n{:s}\nNew Geometry\n{:s}\n" raise Exception(s.format(self._cartesian, geometry_new)) return return
[docs]class VibronExecutionClass: """handles all details related to the execution""" # the molecule of interest name = None # ZmatClass object that contains the ZMAT information zmat = None # the directory in which we execute jobs path_root = None # parameters parameter_dictionary = { "calculation": None, "theory": None, "basis": None, "DROPMO": "", "NIP": None, "gridVibron": 3, # or 4 "ccConv": 10, "scfConv": 10, "estateTol": 10, } file_in = {} file_out = {} # our cut off for % singles singles_cutoff = 85.00 # our cut off for % triples triples_cutoff = 92.00 # State enum that corresponds to location in execution workflow state = None def __init__(self, name, root, state=None): self.name = name self.path_root = root self.file_in = default_file_in.copy() self.file_out = default_file_out.copy() self.path_state = join(self.path_root, name_of_state_file) # fill in names for key, val in self.file_in.items(): self.file_in[key] = val.format(self.name) for key, val in self.file_out.items(): self.file_out[key] = val.format(self.name) if state is not None: self.state = State(int(state)) return
[docs] def record_state(self): with open(self.path_state, 'a') as file_state: string_state = str(self.state.name)+"\n" file_state.write(string_state) return
[docs] def advance(self): """increase state counter and log it IF not already at the highest state""" # create state file if it doesn't exist if not isfile(self.path_state): s = "State file {:s} doesn't exist, creating a blank file" log.debug(s.format(self.path_state)) open(self.path_state, 'w').close() self.record_state() # advance or write down that we are complete if self.state == State.FINISHED: # we have finished! self.record_state() elif self.state.value < State.max(): old_state = self.state self.state = State(old_state.value + 1) s = "Advancing from state {:s} to state {:s}" log.debug(s.format(old_state.name, self.state.name)) else: raise Exception("This code should not be executed") return
[docs] def setup_zmat(self, file_path=None): """assumes that you formatted the ZMAT file correctly""" # Default is to assume parameter file is in /path_root/* if file_path is None: file_path = join(self.path_root, # "parameters", self.file_in["zmat"] ) self.zmat = ZmatClass(file_path) return
def _set_state_from_file(self): """ sets self.state to the value stored in the file located at self.path_state if the file is empty it defaults to the initial state State.min()""" log.debug("Attempting to set state using state file") if os.stat(self.path_state).st_size == 0: s = "State file {:s} is empty, defaulting to initial State" log.debug(s.format(self.path_state)) self.state = State(State.min()) return with open(self.path_state, 'r') as file_state: lines = file_state.readlines() for line in reversed(lines): # I believe there is a cleaner way to do this, should clean this up # low priority if line in ["", "\n"]: continue old_state = str(line.strip()) if old_state == State(State.max()): self.state = State(State.max()) elif old_state in State: self.state = State(State(old_state).value + 1) for member in State: if member.name == str(line.strip()): self.state = State(member.value + 1) break else: s = ("State file {:s} contained an invalid state: {:s}" "Defaulting to initial state" ) log.debug(s.format(line, self.path_state)) self.state = State(State.min()) break return
[docs] def parse_input_parameters(self, file_path=None): """loads data into the object""" # Default is to assume parameter file is in /path_root/* if file_path is None: file_path = join(self.path_root, # "parameters", self.file_in["parameter"] ) # attempt to read in file try: verify_file_exists(file_path) with open(file_path, 'r') as source_file: for line in source_file: # if line is not blank AND is not a comment if bool(line.strip()) ^ bool(line.startswith("#")): header = line.strip() data = next(source_file).strip() if header not in self.parameter_dictionary.keys(): s = "Header {:s} is not valid\n Check that your {:s} has the correct formatting" raise ValueError(s.format(header, file_path)) self.parameter_dictionary[header] = data except ValueError as error_object: raise error_object # check the validity of the parameter file assert self.parameter_dictionary["basis"] in basis_list, "invalid parameter provided for basis" assert self.parameter_dictionary["theory"] in theory_list, "invalid parameter provided for theory" assert self.parameter_dictionary["calculation"] in calculations_list, "invalid parameter provided for calculation" # create state file if it doesn't exist if not isfile(self.path_state): s = "State file {:s} doesn't exist, creating a blank file" log.debug(s.format(self.path_state)) open(self.path_state, 'w').close() # retrieve the previous state which was saved if self.state is None: self._set_state_from_file() return
[docs] def hf(self, matching_state): """wrapper method""" if self.state == matching_state: hartree_fock_calculation(self.path_root, self.name, self.zmat, self.parameter_dictionary, self.state.name) self.advance() return
[docs] def geom(self): """wrapper method""" if self.state == State.OPT: geometry_optimization(self.path_root, self.name, self.zmat, self.parameter_dictionary) self.advance() return
[docs] def vib(self): """wrapper method""" if self.state == State.VIB: vibrational_frequency(self.path_root, self.name, self.zmat, self.parameter_dictionary) self.advance() return
[docs] def ip_calc(self): """wrapper method""" if self.state == State.IPEOMCC: ip_calculation(self.path_root, self.name, self.zmat, self.parameter_dictionary) return
[docs] def prepVibron(self): """wrapper method""" if self.state == State.PREPVIB: prepVibron_calculation(self.path_root, self.name, self.zmat, self.parameter_dictionary) self.advance() return
[docs] def vibron_calc(self): """wrapper method""" if self.state == State.VIBRON: vibron_calculation(self.path_root, self.name, self.zmat, self.parameter_dictionary) self.advance() return
[docs] def calculate_vibronic_model(self): # prepare signal handler - this is used to synchronize with the submitted jobs signal.signal(signal.SIGUSR1, job_boss.SIGUSR1_handle) self.parse_input_parameters() self.setup_zmat() if self.state.value < State.max(): # self.hf(State.DROPMO) # parse_hartree_fock_output(self, State.DROPMO) self.geom() parse_opt_output(self) self.hf(State.NIP) parse_hartree_fock_output(self, State.NIP) self.vib() verify_ip_states(self) self.prepVibron() parse_prepVibron_output(self) self.vibron_calc() parse_vibron_output(self) self.advance() # important to do after parsing vibron_output log.info("We have reached the end of execution.") return
[docs]def test_one(molecule_name="ch2o", inital_job_state=0): """runs the job in the current directory stores results in folder named by the molecule""" # create execution directory path_root = os.path.dirname(os.path.realpath(__file__)) path_root = join(path_root, molecule_name) os.makedirs(path_root, exist_ok=True) job = VibronExecutionClass(molecule_name, path_root, inital_job_state) job.calculate_vibronic_model() return
# need a better naming - distinguishing scheme between # build_vibronic_model and calculate_vibronic_model
[docs]def calculate_vibronic_model_wrapper_one(data_set_num, molecule_name="ch2o", inital_job_state=0): """executes jobs using exists data_set_# file structure""" # create execution directory path_root = '/work/ngraymon/pimc/data_set_{:d}/electronic_structure/' os.makedirs(path_root, exist_ok=True) job = VibronExecutionClass(molecule_name, path_root, inital_job_state) job.calculate_vibronic_model() return
[docs]def main(): # we provide a specific named file if len(sys.argv) is 2: test_one(str(sys.argv[1])) # we provide a specific named file # and we provide a integer parameter # which indicates what stage of the calculation to start at elif len(sys.argv) is 3: test_one(str(sys.argv[1]), int(sys.argv[2])) else: test_one()
if (__name__ == "__main__"): main()