From 6d084b98235bf1799e17fc17aad7ab9894621915 Mon Sep 17 00:00:00 2001 From: Pinapelz Date: Sat, 13 Apr 2024 01:20:53 -0700 Subject: change backend API to a submodule --- backend/sql/pg_handler.py | 169 -------------------------------------------- backend/sql/sql_handler.py | 171 --------------------------------------------- 2 files changed, 340 deletions(-) delete mode 100644 backend/sql/pg_handler.py delete mode 100644 backend/sql/sql_handler.py (limited to 'backend/sql') diff --git a/backend/sql/pg_handler.py b/backend/sql/pg_handler.py deleted file mode 100644 index 74a9170..0000000 --- a/backend/sql/pg_handler.py +++ /dev/null @@ -1,169 +0,0 @@ -import psycopg2 -from psycopg2 import Error - -class PostgresHandler: - def __init__(self, username: str, password: str, host_name: str, port: int, database: str): - db_params = { - "dbname": database, - "user": username, - "password": password, - "host": host_name, - "port": port - } - self._connection = psycopg2.connect(**db_params) - print("Handler Success") - - def get_connection(self): - return self._connection - - def create_table(self, name: str, column: str): - cursor = self._connection.cursor() - cursor.execute(f'CREATE TABLE IF NOT EXISTS "{name}" ({column})') - self._connection.commit() - cursor.close() - - def clear_table(self, name: str): - cursor = self._connection.cursor() - cursor.execute(f"DELETE FROM {name}") - self._connection.commit() - cursor.close() - - def check_row_exists(self, table_name: str, column_name: str, value: str): - cursor = self._connection.cursor() - query = f'SELECT 1 FROM "{table_name}" WHERE {column_name} = %s' - cursor.execute(query, (value,)) - result = cursor.fetchone() - cursor.close() - - if result is not None: - return True - else: - return False - - def insert_row(self, table_name, column, data): - try: - cursor = self._connection.cursor() - placeholders = ', '.join(['%s'] * len(data)) - query = f'INSERT INTO "{table_name}" ({column}) VALUES ({placeholders})' - cursor.execute(query, data) - self._connection.commit() - print("Data Inserted:", data) - except Error as err: - self._connection.rollback() - print("Error inserting data") - print(err) - if "duplicate key" not in str(err).lower(): - return False - return True - - def update_row(self, table_name: str, column: str, value: str, update_column: str, update_value: str): - try: - cursor = self._connection.cursor() - query = f'UPDATE "{table_name}" SET {update_column} = %s WHERE {column} = %s' - cursor.execute(query, (update_value, value)) - self._connection.commit() - print("Data Updated:", value, update_value) - except Error as e: - self._connection.rollback() - print(f"Failed to update row from {table_name} WHERE {column} is {value}") - print(e) - return False - return True - - def get_rows(self, table_name: str, column: str, value: str): - try: - cursor = self._connection.cursor() - query = f'SELECT * FROM "{table_name}" WHERE {column} = %s' - cursor.execute(query, (value,)) - result = cursor.fetchall() - return result - except Error as e: - self._connection.rollback() - print(f"Failed to fetch row from {table_name} WHERE {column} is {value}") - print(e) - return False - - def get_random_row(self, table_name: str, count: int, condition: str = None): - if condition is None: - condition = "1 = 1" - try: - cursor = self._connection.cursor() - query = f"SELECT * FROM {table_name} WHERE {condition} ORDER BY RANDOM() LIMIT {str(count)}" - cursor.execute(query) - result = cursor.fetchall() - return result - except Error as e: - self._connection.rollback() - print(f"Failed to select random rows from {table_name}") - print(e) - return False - - def check_health(self): - cursor = self._connection.cursor() - cursor.execute("SELECT 1") - result = cursor.fetchone() - cursor.close() - if result is not None: - return True - else: - return False - - def delete_row(self, table_name: str, column: str, value: str): - try: - cursor = self._connection.cursor() - query = f"DELETE FROM {table_name} WHERE {column} = %s" - cursor.execute(query, (value,)) - self._connection.commit() - print("Data Deleted:", value) - except Error as e: - self._connection.rollback() - print(f"Failed to delete row from {table_name} WHERE {column} is {value}") - print(e) - return False - return True - - def execute_query(self, query: str, data: tuple = None): - try: - cursor = self._connection.cursor() - if data is None: - cursor.execute(query) - else: - cursor.execute(query, data) - result = cursor.fetchall() - return result - except Error as e: - self._connection.rollback() - print(f"Failed to execute query: {query}") - print(e) - return False - - def reset_auto_increment(self, table_name: str): - try: - cursor = self._connection.cursor() - query = f"ALTER SEQUENCE {table_name}_id RESTART WITH 1" - cursor.execute(query) - self._connection.commit() - print("Auto Increment Reset") - except Error as e: - self._connection.rollback() - print(f"Failed to reset auto increment for {table_name}") - print(e) - return False - return True - - def get_most_recently_added_row_time(self, table_name: str): - try: - cursor = self._connection.cursor() - query = f"SELECT timestamp FROM {table_name} ORDER BY id DESC LIMIT 1" - cursor.execute(query) - result = cursor.fetchone() - return result - except Error as e: - self._connection.rollback() - print(f"Failed to get most recently added row from {table_name}") - print(e) - return False - - - def close_connection(self): - self._connection.close() \ No newline at end of file diff --git a/backend/sql/sql_handler.py b/backend/sql/sql_handler.py deleted file mode 100644 index 82f071d..0000000 --- a/backend/sql/sql_handler.py +++ /dev/null @@ -1,171 +0,0 @@ -import mysql.connector -from mysql.connector import Error, errorcode - - -class SQLHandler: - def __init__(self, host_name: str, user_name: str, user_password: str, database_name: str): - self.host_name = host_name - self.username = user_name - self.password = user_password - self.database_name = database_name - self.connection = self._create_server_connection( - host_name, user_name, user_password) - self._load_database(database_name) - - def _create_server_connection(self, host_name: str, user_name: str, user_password: str) -> mysql.connector: - connection = None - try: - connection = mysql.connector.connect(host=host_name, user=user_name, passwd=user_password) - print("MySQL Database connection successful") - except Error as err: - print(f"Error: '{err}'") - return connection - - def get_connection(self): - return self.connection - - def _create_database(self, cursor: str, database_name: str): - try: - cursor.execute( - f"CREATE DATABASE {database_name} DEFAULT CHARACTER SET 'utf8'") - except Error as err: - print(f"Failed creating database: {err}") - exit(1) - - def _load_database(self, database_name: str): - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(f"USE {database_name}") - print(f"Database {database_name} loaded successfully") - except Error as err: - print(f"Database {database_name} does not exist") - if err.errno == errorcode.ER_BAD_DB_ERROR: - self._create_database(cursor, database_name) - print(f"Database {database_name} created successfully") - self.connection.database = database_name - else: - print(err) - exit(1) - - def create_table(self, name: str, column: str): - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(f"CREATE TABLE {name} ({column})") - print(f"Table {name} created successfully") - except Error as err: - print(err) - - def insert_row(self, name: str, column: str, data: tuple): - cursor = self.connection.cursor(buffered=True) - try: - placeholders = ', '.join(['%s'] * len(data)) - query = f"INSERT INTO {name} ({column}) VALUES ({placeholders})" - cursor.execute(query, data) - self.connection.commit() - print("Data Inserted:", data, "into", name) - except Error as err: - print("Error inserting data") - print(err) - - - def clear_table(self, name: str): - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(f"DELETE FROM {name}") - self.connection.commit() - print("Table cleared successfully") - except Error as err: - print("Error clearing table") - print(err) - - def reset_auto_increment(self, name: str): - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(f"ALTER TABLE {name} AUTO_INCREMENT = 1") - self.connection.commit() - print("Table reset successfully") - except Error as err: - print("Error resetting table") - print(err) - - def copy_rows_to_new_table(self, name: str, new_name: str, column: str): - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute( - f"INSERT INTO {new_name} ({column}) SELECT {column} FROM {name}") - cursor.execute( - f"ALTER TABLE {new_name} MODIFY COLUMN id INT AUTO_INCREMENT") - self.connection.commit() - print("Rows copied successfully") - except Error as err: - print("Error copying rows") - print(err) - - def drop_table(self, name: str): - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(f"DROP TABLE {name}") - self.connection.commit() - print("Table dropped successfully") - except Error as err: - print("Error dropping table") - print(err) - - def check_row_exists(self, name: str, column_name: str, value: str): - """ - Checks if a row exists in a table - """ - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(f"SELECT * FROM {name} WHERE {column_name} = '{value}'") - result = cursor.fetchone() - if result: - return True - else: - return False - except Error as err: - print("Error checking row") - print(err) - - def update_row(self, name: str, column_name: str, search_val: str, replace_col:str, new_value: str): - """ - Updates a row in a table - """ - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(f"UPDATE {name} SET {replace_col} = '{new_value}' WHERE {column_name} = '{search_val}'") - self.connection.commit() - print("Row updated successfully") - except Error as err: - print("Error updating row") - print(err) - - def execute_query(self, query: str, data: tuple = None): - cursor = self.connection.cursor(buffered=True) - if data: - try: - cursor.execute(query, data) - result = cursor.fetchall() - return result - except Error as err: - print("Error executing query") - print(err) - return None - try: - cursor.execute(query) - result = cursor.fetchall() - return result - except Error as err: - print("Error executing query") - print(err) - - def get_query_result(self, query: str): - cursor = self.connection.cursor(buffered=True) - try: - cursor.execute(query) - result = cursor.fetchall() - return result - except Error as err: - print("Error executing query") - print(err) - \ No newline at end of file -- cgit v1.2.3