aboutsummaryrefslogtreecommitdiffstats
path: root/api/database.py
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-09-08 15:56:06 -0700
committerPinapelz <yukais@pinapelz.com>2025-09-08 16:21:57 -0700
commitf155194356a8231b0c190d7276f203aca49ff028 (patch)
tree20c4e7ecb633ce2b70ca65a7840f29901d10d2e6 /api/database.py
parent6ad0c786a3e53f437e24ae7eb92e69648ca8751c (diff)
redirect root page to new demo
Diffstat (limited to 'api/database.py')
-rw-r--r--api/database.py124
1 files changed, 124 insertions, 0 deletions
diff --git a/api/database.py b/api/database.py
new file mode 100644
index 0000000..14ac611
--- /dev/null
+++ b/api/database.py
@@ -0,0 +1,124 @@
+import psycopg2
+from psycopg2 import Error
+
+class PostgresHandler:
+ def __init__(self, username: str, password: str, host_name: str, port: int, database: str):
+ db_params = {
+ "dbname": database,
+ "user": username,
+ "password": password,
+ "host": host_name,
+ "port": port
+ }
+ self._connection = psycopg2.connect(**db_params)
+ print("Handler Success")
+
+
+ def create_table(self, name: str, column: str):
+ cursor = self._connection.cursor()
+ cursor.execute(f"CREATE TABLE IF NOT EXISTS {name} ({column})")
+ self._connection.commit()
+ cursor.close()
+
+ def clear_table(self, name: str):
+ cursor = self._connection.cursor()
+ cursor.execute(f"DELETE FROM {name}")
+ self._connection.commit()
+ cursor.close()
+
+ def check_row_exists(self, table_name: str, column_name: str, value: str):
+ cursor = self._connection.cursor()
+ query = f"SELECT 1 FROM {table_name} WHERE {column_name} = %s"
+ cursor.execute(query, (value,))
+ result = cursor.fetchone()
+ cursor.close()
+
+ if result is not None:
+ return True
+ else:
+ return False
+
+ def insert_row(self, table_name, column, data):
+ try:
+ cursor = self._connection.cursor()
+ placeholders = ', '.join(['%s'] * len(data))
+ query = f"INSERT INTO {table_name} ({column}) VALUES ({placeholders})"
+ cursor.execute(query, data)
+ self._connection.commit()
+ print("Data Inserted:", data)
+ except Error as err:
+ self._connection.rollback()
+ print("Error inserting data")
+ print(err)
+ if "duplicate key" not in str(err).lower():
+ return False
+ return True
+
+ def get_rows(self, table_name: str, column: str, value: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f"SELECT * FROM {table_name} WHERE {column} = %s"
+ cursor.execute(query, (value,))
+ result = cursor.fetchall()
+ return result
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to fetch row from {table_name} WHERE {column} is {value}")
+ print(e)
+ return False
+
+ def get_random_row(self, table_name: str, count: int, condition: str = None):
+ if condition is None:
+ condition = "1 = 1"
+ try:
+ cursor = self._connection.cursor()
+ query = f"SELECT * FROM {table_name} WHERE {condition} ORDER BY RANDOM() LIMIT {str(count)}"
+ cursor.execute(query)
+ result = cursor.fetchall()
+ return result
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to select random rows from {table_name}")
+ print(e)
+ return False
+
+ def check_health(self):
+ cursor = self._connection.cursor()
+ cursor.execute("SELECT 1")
+ result = cursor.fetchone()
+ cursor.close()
+ if result is not None:
+ return True
+ else:
+ return False
+
+ def delete_row(self, table_name: str, column: str, value: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f"DELETE FROM {table_name} WHERE {column} = %s"
+ cursor.execute(query, (value,))
+ self._connection.commit()
+ print("Data Deleted:", value)
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to delete row from {table_name} WHERE {column} is {value}")
+ print(e)
+ return False
+ return True
+
+ def get_distinct_col(self, table_name: str, column: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f"SELECT DISTINCT {column} FROM {table_name}"
+ cursor.execute(query)
+ result = cursor.fetchall()
+ cursor.close()
+ return [row[0] for row in result]
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to get unique values from {column} in {table_name}")
+ print(e)
+ return False
+
+ def close_connection(self):
+ self._connection.close()
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage