import glob
import os
import functools
from .base_handler import BaseHandler
[docs]class FileSystemHandler(BaseHandler):
def __init__(self, base_log_dir, n_facets, sep):
"""
:param base_log_dir: (str) Path to top level directory for logs
:param n_facets: (int) Number of facets used to describe each unit result
:param sep: (str) Separator used for a result identifier
"""
self.n_facets = n_facets
self.sep = sep
self.success_dir = os.path.join(base_log_dir, 'success')
self.failure_dir = os.path.join(base_log_dir, 'failure')
[docs] def validate(func):
"""
Decorator to check if an identifier is of the correct format
"""
@functools.wraps(func)
def validate_identifier(*args, **kwargs):
identifier = args[1] # Assumes identifier is second argument
if os.sep in identifier:
raise ValueError
return func(*args, **kwargs)
return validate_identifier
def _interpret_error_types(self):
"""
:return: (set) Set of unique return types which aren't 'success'
"""
error_types = [list_name for list_name in os.listdir(self.failure_dir) if os.path.isdir(os.path.join(self.failure_dir, list_name))]
return set(error_types)
def _path_to_identifier(self, path):
"""
Given a full path to a result returns its identifier
:param path: (str) Path to result file
:return: (str) Job identifier
"""
# Getting the last n_facets number of items in the path and joining
# them to create the relative identifier
path_arr = path.split(os.sep)
identifier = self.sep.join(path_arr[-self.n_facets:])
return identifier
def _identifier_to_path(self, identifier, result):
"""
Given an identifier and a result, return a full path to its result file
:param identifier: (str) Identifier of the job result
:param result: (str) Result of the job
:return: (str) Path to result file
"""
id_path = identifier.replace(self.sep, os.sep)
if result == 'success':
return os.path.join(self.success_dir, id_path)
else:
return os.path.join(self.failure_dir, result, id_path)
[docs] @validate
def get_result(self, identifier):
"""
Returns the value of a result given its identifier
:param identifier: (str) Identifier of the job
:return: (str) Result of job
"""
path = self._identifier_to_path(identifier, 'success')
if os.path.exists(path):
return 'success'
error_types = self._interpret_error_types()
for error in error_types:
path = self._identifier_to_path(identifier, error)
if os.path.exists(path):
return error
return None
[docs] def get_all_results(self):
"""
:return: (dict) Dictionary of all job identifiers mapped to their respective results
"""
results = {}
for identifier in self.get_successful_runs():
results[identifier] = 'success'
error_dict = self.get_failed_runs()
for (error_type, identifiers) in error_dict.items():
for identifier in identifiers:
results[identifier] = error_type
return results
[docs] def get_successful_runs(self):
"""
:return: (str list) Returns a list of the identifiers of all successful runs
"""
glob_pattern = os.path.join(self.success_dir, os.sep.join(['*' for _ in range(self.n_facets)]))
files = glob.glob(glob_pattern)
return [self._path_to_identifier(fname) for fname in files]
[docs] def get_failed_runs(self):
"""
:return: (dict) Dictionary of error types mapped to lists of job identifiers which result in them
"""
failures = {}
error_types = self._interpret_error_types()
for error_type in error_types:
glob_pattern = os.path.join(self.failure_dir, error_type, os.sep.join(['*' for _ in range(self.n_facets)]))
files = glob.glob(glob_pattern)
failures[error_type] = [self._path_to_identifier(fname) for fname in files]
return failures
[docs] @validate
def delete_result(self, identifier):
"""
Deletes result file from the file system given its identifier
:param identifier: (str) Identifier of the job
"""
path = self._identifier_to_path(identifier, 'success')
if os.path.exists(path):
os.unlink(path)
error_types = self._interpret_error_types()
for error in error_types:
path = self._identifier_to_path(identifier, error)
if os.path.exists(path):
os.unlink(path)
[docs] def delete_all_results(self):
"""
Deletes all result files in the file system under the
base_log_dir
"""
success_pattern = os.path.join(self.success_dir, os.sep.join(['*' for _ in range(self.n_facets)]))
# failure pattern +1 * to account for failure type dir
failure_pattern = os.path.join(self.failure_dir, os.sep.join(['*' for _ in range(self.n_facets + 1)]))
success_files = glob.glob(success_pattern)
failure_files = glob.glob(failure_pattern)
for success_file in success_files:
os.unlink(success_file)
for failure_file in failure_files:
os.unlink(failure_file)
[docs] @validate
def ran_successfully(self, identifier):
"""
Returns true / false on whether the result with this
identifier is successful
:param identifier: (str) Identifier of the job result
:return: (bool) Boolean on if job ran successfully
"""
path = self._identifier_to_path(identifier, 'success')
return os.path.exists(path)
[docs] def count_results(self):
"""
:return: (int) Number of results in the table
"""
return len(self.get_all_results())
[docs] def count_successes(self):
"""
:return: (int) Number of successful result files
"""
return len(self.get_successful_runs())
[docs] def count_failures(self):
"""
:return: (int) Number of failure result files
"""
size = 0
error_dict = self.get_failed_runs()
for error in error_dict.keys():
size += len(error_dict[error])
return size
[docs] @validate
def insert_success(self, identifier):
"""
Creates a successful result file with the identifier passed
:param identifier: (str) Identifier of the job
"""
path = self._identifier_to_path(identifier, 'success')
dr = os.path.dirname(path)
if not os.path.isdir(dr):
os.makedirs(dr)
with open(path, 'w') as writer:
writer.write(f'{identifier} ran successfully' )
[docs] @validate
def insert_failure(self, identifier, error_type):
"""
Creates a failure result file using the identifier
and error type parsed
:param identifier: (str) Identifier of the job
:param error_type: (str) Result of the job
"""
path = self._identifier_to_path(identifier, error_type)
dr = os.path.dirname(path)
if not os.path.isdir(dr):
os.makedirs(dr)
with open(path, 'w') as writer:
writer.write(f'{error_type} has occurred for {identifier}')