Source code for abcunit_backend.database_handler

import psycopg2
import os
from .base_handler import BaseHandler


[docs]class DataBaseHandler(BaseHandler): _max_id_length = 255 _max_result_length = 255 def __init__(self, table_name='results'): """ :param table_name: (str) Optional string name of the table logs will be insert into (default is 'results') """ self.connection_info = os.environ.get("ABCUNIT_DB_SETTINGS") if not self.connection_info: raise KeyError('Please create environment variable ABCUNIT_DB_SETTINGS' 'in for format of "dbname=<db_name> user=<user_name>' 'host=<host_name> password=<password>"') self._test_connection() self.table_name = table_name self._create_table() def _test_connection(self): try: conn = psycopg2.connect(self.connection_info) except psycopg2.Error as err: print(err) raise ValueError('ABCUNIT_DB_SETTINGS string is incorrect. Should be' 'in for format of "dbname=<db_name> user=<user_name>' 'host=<host_name> password=<password>"') conn.close() def _create_table(self): """ Creates a table called <self.table_name> with primary key id varchar(255) and result varchar(255), if one does not already exist """ with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(f'CREATE TABLE IF NOT EXISTS {self.table_name}' f'(id varchar({self._max_id_length}) PRIMARY KEY, ' f'result varchar({self._max_result_length}) NOT NULL);') conn.commit() def _delete_table(self): """ Drops the database table """ with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(f"DROP TABLE {self.table_name};") conn.commit()
[docs] def get_result(self, identifier): """ Selects the result of the job with the identifier parsed and returns it :param identifier: (str) Identifier of the job result :return: (str) Result of job """ query = f"SELECT result FROM {self.table_name} " \ f"WHERE id='{identifier}';" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) if cur.rowcount > 0: return cur.fetchone()[0] return None
[docs] def get_all_results(self): """ :return: (dict) Dictionary of all job identifiers mapped to their respective results """ query = f"SELECT * FROM {self.table_name}" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) result_dict = {} for (name, result) in cur: result_dict[name] = result return result_dict
[docs] def get_successful_runs(self): """ :return: (str list) Returns a list of the identifiers of all successful runs """ query = f"SELECT id FROM {self.table_name} " \ "WHERE result='success';" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) return [name[0] for name in cur]
[docs] def get_failed_runs(self): """ :return: (dict) Dictionary of error types mapped to lists of job identifiers which result in them """ query = f"SELECT id, result FROM {self.table_name} " \ "WHERE result<>'success';" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) failures = {} for (name, result) in cur: failures.setdefault(result, []) failures[result].append(name) return failures
[docs] def delete_result(self, identifier): """ Deletes entry specified by the given identifier from the database :param identifier: (str) Identifier of the job """ query = f"DELETE FROM {self.table_name} " \ f"WHERE id='{identifier}';" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) conn.commit()
[docs] def delete_all_results(self): """ Deletes all entries from the table """ with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {self.table_name};") conn.commit()
[docs] def ran_successfully(self, identifier): """ Returns true / false on whether the result with this identifier is successful :param identifier: (str) Identifier of the job result :return: (bool) Boolean on if job ran successfully """ query = f"SELECT result FROM {self.table_name} " \ f"WHERE id='{identifier}';" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) result = cur.fetchone() if result is not None: return result[0] == 'success' return False
[docs] def count_results(self): """ :return: (int) Number of results in the table """ with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(f"SELECT COUNT(*) FROM {self.table_name};") return cur.fetchone()[0]
[docs] def count_successes(self): """ :return: (int) Number of successfull results in the table """ query = f"SELECT COUNT(*) FROM {self.table_name} " \ "WHERE result='success';" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) return cur.fetchone()[0]
[docs] def count_failures(self): """ :return: (int) Number of failed results in the table """ query = f"SELECT COUNT(*) FROM {self.table_name} " \ "WHERE result<>'success';" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) return cur.fetchone()[0]
[docs] def insert_success(self, identifier): """ Inserts an entry into the table with a given identifier and the result 'success' :param identifier: (str) Identifier of the job """ if self.get_result(identifier): self.delete_result(identifier) query = f"INSERT INTO {self.table_name} " \ f"VALUES ('{identifier}', 'success');" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) conn.commit()
[docs] def insert_failure(self, identifier, error_type='failure'): """ Inserts an entry into the table with a given identifier and erroneous result :param identifier: (str) Identifier of the job :param error_type: (str) Result of the job """ if self.get_result(identifier): self.delete_result(identifier) error_type = error_type[:self._max_result_length] query = f"INSERT INTO {self.table_name} " \ f"VALUES ('{identifier}', '{error_type}');" with psycopg2.connect(self.connection_info) as conn: with conn.cursor() as cur: cur.execute(query) conn.commit()