diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-09-08 15:56:06 -0700 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-09-08 16:21:57 -0700 |
| commit | f155194356a8231b0c190d7276f203aca49ff028 (patch) | |
| tree | 20c4e7ecb633ce2b70ca65a7840f29901d10d2e6 /api/database.py | |
| parent | 6ad0c786a3e53f437e24ae7eb92e69648ca8751c (diff) | |
redirect root page to new demo
Diffstat (limited to 'api/database.py')
| -rw-r--r-- | api/database.py | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/api/database.py b/api/database.py new file mode 100644 index 0000000..14ac611 --- /dev/null +++ b/api/database.py @@ -0,0 +1,124 @@ +import psycopg2 +from psycopg2 import Error + +class PostgresHandler: + def __init__(self, username: str, password: str, host_name: str, port: int, database: str): + db_params = { + "dbname": database, + "user": username, + "password": password, + "host": host_name, + "port": port + } + self._connection = psycopg2.connect(**db_params) + print("Handler Success") + + + def create_table(self, name: str, column: str): + cursor = self._connection.cursor() + cursor.execute(f"CREATE TABLE IF NOT EXISTS {name} ({column})") + self._connection.commit() + cursor.close() + + def clear_table(self, name: str): + cursor = self._connection.cursor() + cursor.execute(f"DELETE FROM {name}") + self._connection.commit() + cursor.close() + + def check_row_exists(self, table_name: str, column_name: str, value: str): + cursor = self._connection.cursor() + query = f"SELECT 1 FROM {table_name} WHERE {column_name} = %s" + cursor.execute(query, (value,)) + result = cursor.fetchone() + cursor.close() + + if result is not None: + return True + else: + return False + + def insert_row(self, table_name, column, data): + try: + cursor = self._connection.cursor() + placeholders = ', '.join(['%s'] * len(data)) + query = f"INSERT INTO {table_name} ({column}) VALUES ({placeholders})" + cursor.execute(query, data) + self._connection.commit() + print("Data Inserted:", data) + except Error as err: + self._connection.rollback() + print("Error inserting data") + print(err) + if "duplicate key" not in str(err).lower(): + return False + return True + + def get_rows(self, table_name: str, column: str, value: str): + try: + cursor = self._connection.cursor() + query = f"SELECT * FROM {table_name} WHERE {column} = %s" + cursor.execute(query, (value,)) + result = cursor.fetchall() + return result + except Error as e: + self._connection.rollback() + print(f"Failed to fetch row from {table_name} WHERE {column} is {value}") + print(e) + return False + + def get_random_row(self, table_name: str, count: int, condition: str = None): + if condition is None: + condition = "1 = 1" + try: + cursor = self._connection.cursor() + query = f"SELECT * FROM {table_name} WHERE {condition} ORDER BY RANDOM() LIMIT {str(count)}" + cursor.execute(query) + result = cursor.fetchall() + return result + except Error as e: + self._connection.rollback() + print(f"Failed to select random rows from {table_name}") + print(e) + return False + + def check_health(self): + cursor = self._connection.cursor() + cursor.execute("SELECT 1") + result = cursor.fetchone() + cursor.close() + if result is not None: + return True + else: + return False + + def delete_row(self, table_name: str, column: str, value: str): + try: + cursor = self._connection.cursor() + query = f"DELETE FROM {table_name} WHERE {column} = %s" + cursor.execute(query, (value,)) + self._connection.commit() + print("Data Deleted:", value) + except Error as e: + self._connection.rollback() + print(f"Failed to delete row from {table_name} WHERE {column} is {value}") + print(e) + return False + return True + + def get_distinct_col(self, table_name: str, column: str): + try: + cursor = self._connection.cursor() + query = f"SELECT DISTINCT {column} FROM {table_name}" + cursor.execute(query) + result = cursor.fetchall() + cursor.close() + return [row[0] for row in result] + except Error as e: + self._connection.rollback() + print(f"Failed to get unique values from {column} in {table_name}") + print(e) + return False + + def close_connection(self): + self._connection.close() |
