Source code for pibronic.data.postprocessing
"""
Handles all file processing after calculations are run
Provides functions for collating files for statistical analaysis (such as jackknife) and plotting
"""
# system imports
# import multiprocessing as mp
# import itertools as it
import collections
# import subprocess
# import socket
import json
import glob
# import sys
import os
from os.path import join
# third party imports
import numpy as np
# from numpy import newaxis as NEW
# from numpy import float64 as F64
# local imports
# from ..data import vibronic_model_io as vIO
# from .. import constants
# from ..constants import hbar
from ..log_conf import log
from ..data import file_name
# from ..data import file_structure
# from ..server import job_boss
# TODO - should these single file functions exist? is there enough need to justify their use?
# def retrive_a_pimc_file(files):
#     """verify that a specific pimc file exists and retrieves the path to it"""
#     globPath = FS.path_rho_results + file_name.jackknife(P="*", T="*", X="*")
#     return glob.glob(globPath)


# def retrive_a_sos_coupled_file(files):
#     """verify that a specific sos(coupled) file exists and retrieves the path to it"""
#     globPath = FS.path_vib_params + file_name.sos("*")
#     return glob.glob(globPath)


# def retrive_a_sos_sampling_file(files):
#     """verify that a specific sos(sampling) file exists and retrieves the path to it"""
#     globPath = FS.path_rho_params + file_name.sos("*")
#     return glob.glob(globPath)


def retrive_pimc_file_list(FS):
    """return a list of the full path to each pimc file that might be used"""
    globPath = FS.path_rho_results + file_name.pimc(P="*", T="*", J="*")
    return glob.glob(globPath)


def retrive_jackknife_file_list(FS):
    """return a list of the full path to each jackknife file that might be used"""
    globPath = FS.path_rho_results + file_name.jackknife(P="*", T="*", X="*")
    return glob.glob(globPath)


def retrive_sos_coupled_file_list(FS, B="*"):
    """return a list of the full path to each sos(coupled) file that might be used"""
    globPath = FS.path_vib_params + file_name.sos(B)
    return glob.glob(globPath)


def retrive_sos_sampling_file_list(FS, B="*"):
    """return a list of the full path to each sos(sampling) file that might be used"""
    globPath = FS.path_rho_params + file_name.sos(B)
    return glob.glob(globPath)


def retrive_all_file_paths(FS):
    """return four lists of the full paths to each data file that might be used"""
    list_pimc = retrive_pimc_file_list(FS)
    list_jackknife = retrive_jackknife_file_list(FS)
    list_sos_vib = retrive_sos_coupled_file_list(FS)
    list_sos_rho = retrive_sos_sampling_file_list(FS)
    return list_pimc, list_jackknife, list_sos_vib, list_sos_rho
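
# A minimal usage sketch of the retrieval functions (illustrative only; assumes FS is an
# already-constructed FileStructure object pointing at a completed calculation):
#   list_pimc, list_jackknife, list_sos_vib, list_sos_rho = retrive_all_file_paths(FS)
#   print("found {:d} pimc files".format(len(list_pimc)))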


# note that the way the split()s are coded for all the following extract_ functions
# will pose problems if the naming scheme for the files is changed
# it might be good to link them to the file_name module in the future


def extract_bead_value_from_trotter_file_path(file_path):
    """ does what it says, returns an int """
    return int(file_path.split("trotter_P")[1].split("_B")[0])


def extract_trotter_paramater_list(list_trotter):
    """ takes a list of file-paths to results from trotter calculations and returns a dictionary
    with integer keys representing possible basis sizes, whose corresponding values are lists of
    all possible bead values for that given basis size """
    trotterDict = collections.defaultdict(list)
    for path in list_trotter:
        basis_size = int(path.split("_B")[1].split(".json")[0])
        bead_value = extract_bead_value_from_trotter_file_path(path)
        trotterDict[basis_size].append(bead_value)
    return trotterDict
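
# Example (hypothetical file names, assuming the "trotter_P{P}_B{B}.json" scheme
# implied by the splits above):
#   extract_bead_value_from_trotter_file_path("/path/to/trotter_P12_B80.json")  # -> 12
#   extract_trotter_paramater_list(["/path/to/trotter_P12_B80.json",
#                                   "/path/to/trotter_P16_B80.json"])
#   # -> {80: [12, 16]} (as a defaultdict)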


def extract_basis_value_from_sos_file_path(file_path):
    """ does what it says, returns an int """
    return int(file_path.split("_B")[1].split(".json")[0])


def extract_sos_basis_paramater_list(list_vib, list_rho):
    """ takes a list of file-paths to results from sos calculations (both coupled and rho)
    and returns a sorted list of all the unique basis sizes common to both """
    cL = map(extract_basis_value_from_sos_file_path, list_vib)
    sL = map(extract_basis_value_from_sos_file_path, list_rho)
    list_sos = list(set(cL) & set(sL))
    list_sos.sort()
    return list_sos


def extract_bead_value_from_pimc_file_path(file_path):
    """ does what it says, returns an int """
    return int(file_path.split("P")[1].split("_T")[0])


def extract_bead_paramater_list(list_pimc):
    """ takes a list of file-paths to results from pimc calculations
    and returns a sorted list of all the unique bead values """
    pL = map(extract_bead_value_from_pimc_file_path, list_pimc)
    list_bead = list(set(pL))  # the use of the set object removes all duplicate elements
    list_bead.sort()
    return list_bead


def extract_temperature_value_from_pimc_file_path(file_path):
    """ does what it says, returns a float """
    return float(file_path.split("_T")[1].split("_J")[0])


def extract_temperature_paramater_list(list_pimc):
    """ takes a list of file-paths to results from pimc calculations
    and returns a sorted list of all the unique temperature values (as floats) """
    # this alternative would specifically select temperatures from thermo files only,
    # instead of from all output files generally; it is not clear which is better
    # tempL = map(lambda path: int(path.split("_T")[1].split("_thermo")[0]), list_pimc)
    tempL = map(extract_temperature_value_from_pimc_file_path, list_pimc)
    list_temperature = list(set(tempL))  # the use of the set object removes all duplicate elements
    list_temperature.sort()
    return list_temperature


def extract_job_value_from_pimc_file_path(file_path):
    """ does what it says, returns an int """
    return int(file_path.split("_J")[1].split("_data_")[0])


def extract_parameter_lists(list_pimc, list_vib, list_rho):
    """ just assume that we directly use the extract_trotter_paramater_list() function for now """
    bL = extract_sos_basis_paramater_list(list_vib, list_rho)
    pL = extract_bead_paramater_list(list_pimc)
    tL = extract_temperature_paramater_list(list_pimc)
    return pL, tL, bL


def extract_bead_value_from_thermo_file_path(file_path):
    """ does what it says, returns an int """
    return int(file_path.split("P")[1].split("_T")[0])


def extract_temperature_value_from_thermo_file_path(file_path):
    """ does what it says, returns a float """
    return float(file_path.split("_T")[1].split("_X")[0])


def extract_sample_value_from_thermo_file_path(file_path):
    """ does what it says, returns an int """
    return int(file_path.split("_X")[1].split("_thermo")[0])


def prune_results_using_hashes(FS, list_pimc):
    """ takes a list of file paths (strings) to different results and a FileStructure object
    returns the subset of the input list where each file path exists and has a
    'valid' hash, i.e. the same as in the FileStructure object """
    output_list = []
    for file_path in list_pimc:
        if file_path.endswith('.npz'):
            with np.load(file_path, mmap_mode='r') as file:
                if ('hash_vib' not in file) or ('hash_rho' not in file):
                    continue
                if file['hash_vib'] == FS.hash_vib and file['hash_rho'] == FS.hash_rho:
                    output_list.append(file_path)
        elif file_path.endswith('thermo'):
            with open(file_path, 'r') as file:
                data = json.loads(file.read())
                if ('hash_vib' not in data) or ('hash_rho' not in data):
                    continue
                if data['hash_vib'] == FS.hash_vib and data['hash_rho'] == FS.hash_rho:
                    output_list.append(file_path)
        else:
            raise Exception("cannot identify the file type of path {:s}".format(file_path))
    return output_list
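
# A minimal sketch of combining retrieval with pruning (illustrative only;
# FS is assumed to be a FileStructure whose hash_vib/hash_rho members are up to date):
#   list_pimc = retrive_pimc_file_list(FS)
#   list_pimc = prune_results_using_hashes(FS, list_pimc)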


def retrive_file_paths_for_jackknife(FS):
    """return three lists of the full paths to each data file that might be used"""
    list_pimc = retrive_pimc_file_list(FS)
    list_sos_vib = retrive_sos_coupled_file_list(FS)
    list_sos_rho = retrive_sos_sampling_file_list(FS)
    return list_pimc, list_sos_vib, list_sos_rho


# I believe this function is decommissioned for the time being
def extract_pimc_parameters(list_pimc, list_coupled, list_sampling):
    """make a list of all parameters whose dependencies are satisfied
    note that this function is tightly tied to the file name
    """
    # note that the way these splits are coded will pose problems
    # if the naming scheme for sos is changed in the future
    value_dict = {"pimc_beads": 0, "basis_fxns": 0, "temperatures": 0}
    cL = map(lambda path: int(path.split("_B")[1].split(".json")[0]), list_coupled)
    sL = map(lambda path: int(path.split("_B")[1].split(".json")[0]), list_sampling)
    # parse file paths to find shared numbers of basis functions
    value_dict["basis_fxns"] = list(set(cL) & set(sL))
    value_dict["basis_fxns"].sort()
    log.debug(value_dict["basis_fxns"])
    # parse file paths to find shared temperature values
    # for now we will leave this partially undeveloped
    tempL = map(lambda path: float(path.split("_T")[1].split("_X")[0]), list_pimc)
    # TODO - need to add a check of the temperatures inside the sos file
    value_dict["temperatures"] = list(set(tempL))
    value_dict["temperatures"].sort()
    log.debug(value_dict["temperatures"])
    # parse file paths to find shared sample values
    xL = map(lambda path: int(path.split("_X")[1].split("_thermo")[0]), list_pimc)
    value_dict["samples"] = list(set(xL))
    value_dict["samples"].sort()
    log.debug(value_dict["samples"])
    # parse file paths to find pimc bead values
    pL = map(lambda path: int(path.split("/P")[1].split("_T")[0]), list_pimc)
    value_dict["pimc_beads"] = sorted(set(pL))
    log.debug(value_dict["pimc_beads"])
    return value_dict


def extract_jackknife_parameters(list_pimc, list_coupled, list_sampling):
    """make a list of all parameters whose dependencies are satisfied
    note that this function is tightly tied to the file name
    """
    # note that the way these splits are coded will pose problems
    # if the naming scheme for sos is changed in the future
    value_dict = {"pimc_beads": 0, "basis_fxns": 0, "temperatures": 0}
    cL = map(lambda path: int(path.split("_B")[1].split(".json")[0]), list_coupled)
    sL = map(lambda path: int(path.split("_B")[1].split(".json")[0]), list_sampling)
    # parse file paths to find shared numbers of basis functions
    value_dict["basis_fxns"] = list(set(cL) & set(sL))
    value_dict["basis_fxns"].sort()
    log.debug(value_dict["basis_fxns"])
    # parse file paths to find shared temperature values
    # for now we will leave this partially undeveloped
    tempL = map(lambda path: float(path.split("_T")[1].split("_J")[0]), list_pimc)
    # TODO - need to add a check of the temperatures inside the sos file
    value_dict["temperatures"] = list(set(tempL))
    value_dict["temperatures"].sort()
    log.debug(value_dict["temperatures"])
    # # parse file paths to find shared sample values
    # xL = map(lambda path: int(path.split("_X")[1].split("_P")[0]), list_pimc)
    # value_dict["samples"] = list(set(xL))
    # value_dict["samples"].sort()
    # log.debug(value_dict["samples"])
    # parse file paths to find pimc bead values
    pL = map(lambda path: int(path.split("results/P")[1].split("_T")[0]), list_pimc)
    value_dict["pimc_beads"] = sorted(set(pL))
    log.debug(value_dict["pimc_beads"])
    return value_dict
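
# For illustration, the returned dictionary has the following shape
# (the values shown here are hypothetical):
#   value_dict = {
#       "basis_fxns": [20, 40, 80],       # basis sizes present in BOTH sos file lists
#       "temperatures": [250.0, 300.0],   # unique temperatures parsed from the pimc paths
#       "pimc_beads": [12, 16, 24],       # unique bead values, sorted
#   }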


def load_pimc_data(FS, P, T, pimc_results):
    """ load data from all files with the same P and T

    this usage of FS.template_pimc.format(P=P, T=T, J="*") raises a good question about the design!
    possible ways to implement:
        - some combination of partial
        - write a function for each template_* member of file_structure to replace .format()
        - write a function for file_name
    currently solved by:
        self.template_pimc = self.path_rho_results + file_name.pimc(J="{J:s}")
    """
    path_data_points = FS.template_pimc.format(P=P, T=T, J="*")
    list_of_files = glob.glob(path_data_points)
    pimc_results.load_multiple_results(list_of_files)
    # TODO - add support for passing in the optional desired_number_of_samples parameter
    # to the load_multiple_results method
    return
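
# One of the alternatives mentioned in the docstring above, sketched with functools.partial
# (an illustration of the design option, not the implemented solution):
#   from functools import partial
#   make_pimc_glob = partial(FS.template_pimc.format, J="*")
#   path_data_points = make_pimc_glob(P=P, T=T)  # equivalent to the .format() call above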


def load_analytic_data(FS, T, analytic):
    """ load data from analytic_results.txt with the same T into the dictionary analytic """
    path = FS.path_analytic_rho
    try:
        assert os.path.isfile(path), f"This file doesn't exist:\n{path:s}"
        with open(path, "r") as file:
            in_dict = json.loads(file.read())
        print(path, in_dict)
        # make sure the analytic data is up to date!!
        assert in_dict["hash_vib"] == FS.hash_vib, "wrong vib hash"
        assert in_dict["hash_rho"] == FS.hash_rho, "wrong rho hash"
        # TODO - should make a class function in a new module that handles analytic stuff ??
        temperature = f"{T:.2f}"
        assert temperature in in_dict.keys(), \
            "no analytical results for temperature {:s} in file {:s}".format(temperature, path)
        analytic["Z"] = in_dict[temperature]["Z_sampling"]
        analytic["E"] = in_dict[temperature]["E_sampling"]
        analytic["Cv"] = in_dict[temperature]["Cv_sampling"]
        analytic["alpha_plus"] = analytic["Z"] / in_dict[temperature]["Z_sampling+beta"]
        analytic["alpha_minus"] = analytic["Z"] / in_dict[temperature]["Z_sampling-beta"]
    except OSError as err:
        print("Skipped data from {:s} at temperature {:.2f}".format(path, T))
        # we cannot proceed at the moment since stats.py needs this file
        raise err
    return
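
# Based on the keys accessed above, the analytic results file is assumed to be a json
# object of the following shape (values elided):
#   {
#       "hash_vib": "...",
#       "hash_rho": "...",
#       "300.00": {"Z_sampling": ..., "E_sampling": ..., "Cv_sampling": ...,
#                  "Z_sampling+beta": ..., "Z_sampling-beta": ...},
#   }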


def load_rho_sos_data(FS, P, B, T, rho_args):
    """ load data from the sos(sampling) file with basis size B at temperature T
    into the dictionary rho_args """
    path = FS.template_sos_rho.format(B=B)
    try:
        assert os.path.isfile(path), f"This file doesn't exist:\n{path:s}"
        with open(path, "r") as file:
            rho_dict = json.loads(file.read())
        # TODO - should make a class function in a new module that handles sos stuff ??
        input_temp_index = rho_dict["temperature"].index(T)
        # make sure the temperature matches
        assert T == rho_dict["temperature"][input_temp_index], "different temperatures"
        rho_args["Z"] = rho_dict["Z_sampling"][input_temp_index]
        rho_args["E"] = rho_dict["E_sampling"][input_temp_index]
        rho_args["Cv"] = rho_dict["Cv_sampling"][input_temp_index]
        rho_args["alpha_plus"] = rho_args["Z"] / rho_dict["Z_sampling+beta"][input_temp_index]
        rho_args["alpha_minus"] = rho_args["Z"] / rho_dict["Z_sampling-beta"][input_temp_index]
    except OSError:
        # skip if we cannot obtain all the necessary data
        print("Skipped {:s}".format(path))
        return
    return
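
# In contrast to the analytic file, the sos file read above is assumed to store parallel
# lists indexed by temperature, e.g. (values elided):
#   {"temperature": [250.0, 300.0, ...], "Z_sampling": [...], "E_sampling": [...],
#    "Cv_sampling": [...], "Z_sampling+beta": [...], "Z_sampling-beta": [...]}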