import psycopg2 from psycopg2 import Error class PostgresHandler: def __init__(self, username: str, password: str, host_name: str, port: int, database: str): db_params = { "dbname": database, "user": username, "password": password, "host": host_name, "port": port } self._connection = psycopg2.connect(**db_params) print("Handler Success") def create_table(self, name: str, column: str): cursor = self._connection.cursor() cursor.execute(f"CREATE TABLE IF NOT EXISTS {name} ({column})") self._connection.commit() cursor.close() def clear_table_by_date(self, name: str): cursor = self._connection.cursor() cursor.execute(f"DELETE FROM {name}") self._connection.commit() cursor.close() def clear_old_sessions(self, table_name: str): """Delete sessions older than 15 minutes""" try: cursor = self._connection.cursor() query = f"DELETE FROM {table_name} WHERE created_at <= NOW() - INTERVAL '15 minutes'" cursor.execute(query) deleted_count = cursor.rowcount self._connection.commit() cursor.close() print(f"Deleted {deleted_count} old sessions") return deleted_count except Error as e: self._connection.rollback() print(f"Failed to delete old sessions from {table_name}") print(e) return False def check_row_exists(self, table_name: str, column_name: str, value: str): cursor = self._connection.cursor() query = f"SELECT 1 FROM {table_name} WHERE {column_name} = %s" cursor.execute(query, (value,)) result = cursor.fetchone() cursor.close() if result is not None: return True else: return False def insert_row(self, table_name, column, data): try: cursor = self._connection.cursor() placeholders = ', '.join(['%s'] * len(data)) query = f"INSERT INTO {table_name} ({column}) VALUES ({placeholders})" cursor.execute(query, data) self._connection.commit() print("Data Inserted:", data) except Error as err: self._connection.rollback() print("Error inserting data") print(err) if "duplicate key" not in str(err).lower(): return False return True def get_rows(self, table_name: str, column: str, value: str): try: cursor = self._connection.cursor() query = f"SELECT * FROM {table_name} WHERE {column} = %s" cursor.execute(query, (value,)) result = cursor.fetchall() return result except Error as e: self._connection.rollback() print(f"Failed to fetch row from {table_name} WHERE {column} is {value}") print(e) return False def get_random_rows(self, table_name: str, count: int, condition: str = None, unique_only: bool = False, exclude_ids: list = None): if condition is None: condition = "1 = 1" # Add exclusion condition if exclude_ids is provided if exclude_ids and len(exclude_ids) > 0: id_list = ','.join(str(id) for id in exclude_ids) condition = f"{condition} AND id NOT IN ({id_list})" try: cursor = self._connection.cursor() if unique_only: # Use subquery to handle DISTINCT with ORDER BY RANDOM() query = f"SELECT * FROM (SELECT DISTINCT * FROM {table_name} WHERE {condition}) AS distinct_rows ORDER BY RANDOM() LIMIT {str(count)}" else: query = f"SELECT * FROM {table_name} WHERE {condition} ORDER BY RANDOM() LIMIT {str(count)}" cursor.execute(query) result = cursor.fetchall() return result except Error as e: self._connection.rollback() print(f"Failed to select random rows from {table_name}") print(e) return False def check_health(self): cursor = self._connection.cursor() cursor.execute("SELECT 1") result = cursor.fetchone() cursor.close() if result is not None: return True else: return False def delete_row(self, table_name: str, column: str, value: str): try: cursor = self._connection.cursor() query = f"DELETE FROM {table_name} WHERE {column} = %s" cursor.execute(query, (value,)) self._connection.commit() print("Data Deleted:", value) except Error as e: self._connection.rollback() print(f"Failed to delete row from {table_name} WHERE {column} is {value}") print(e) return False return True def get_distinct_col(self, table_name: str, column: str): try: cursor = self._connection.cursor() query = f"SELECT DISTINCT {column} FROM {table_name}" cursor.execute(query) result = cursor.fetchall() cursor.close() return [row[0] for row in result] except Error as e: self._connection.rollback() print(f"Failed to get unique values from {column} in {table_name}") print(e) return False def close_connection(self): self._connection.close()