summaryrefslogtreecommitdiffstats
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/pg_handler.py169
1 files changed, 169 insertions, 0 deletions
diff --git a/sql/pg_handler.py b/sql/pg_handler.py
new file mode 100644
index 0000000..74a9170
--- /dev/null
+++ b/sql/pg_handler.py
@@ -0,0 +1,169 @@
+import psycopg2
+from psycopg2 import Error
+
+class PostgresHandler:
+ def __init__(self, username: str, password: str, host_name: str, port: int, database: str):
+ db_params = {
+ "dbname": database,
+ "user": username,
+ "password": password,
+ "host": host_name,
+ "port": port
+ }
+ self._connection = psycopg2.connect(**db_params)
+ print("Handler Success")
+
+ def get_connection(self):
+ return self._connection
+
+ def create_table(self, name: str, column: str):
+ cursor = self._connection.cursor()
+ cursor.execute(f'CREATE TABLE IF NOT EXISTS "{name}" ({column})')
+ self._connection.commit()
+ cursor.close()
+
+ def clear_table(self, name: str):
+ cursor = self._connection.cursor()
+ cursor.execute(f"DELETE FROM {name}")
+ self._connection.commit()
+ cursor.close()
+
+ def check_row_exists(self, table_name: str, column_name: str, value: str):
+ cursor = self._connection.cursor()
+ query = f'SELECT 1 FROM "{table_name}" WHERE {column_name} = %s'
+ cursor.execute(query, (value,))
+ result = cursor.fetchone()
+ cursor.close()
+
+ if result is not None:
+ return True
+ else:
+ return False
+
+ def insert_row(self, table_name, column, data):
+ try:
+ cursor = self._connection.cursor()
+ placeholders = ', '.join(['%s'] * len(data))
+ query = f'INSERT INTO "{table_name}" ({column}) VALUES ({placeholders})'
+ cursor.execute(query, data)
+ self._connection.commit()
+ print("Data Inserted:", data)
+ except Error as err:
+ self._connection.rollback()
+ print("Error inserting data")
+ print(err)
+ if "duplicate key" not in str(err).lower():
+ return False
+ return True
+
+ def update_row(self, table_name: str, column: str, value: str, update_column: str, update_value: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f'UPDATE "{table_name}" SET {update_column} = %s WHERE {column} = %s'
+ cursor.execute(query, (update_value, value))
+ self._connection.commit()
+ print("Data Updated:", value, update_value)
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to update row from {table_name} WHERE {column} is {value}")
+ print(e)
+ return False
+ return True
+
+ def get_rows(self, table_name: str, column: str, value: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f'SELECT * FROM "{table_name}" WHERE {column} = %s'
+ cursor.execute(query, (value,))
+ result = cursor.fetchall()
+ return result
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to fetch row from {table_name} WHERE {column} is {value}")
+ print(e)
+ return False
+
+ def get_random_row(self, table_name: str, count: int, condition: str = None):
+ if condition is None:
+ condition = "1 = 1"
+ try:
+ cursor = self._connection.cursor()
+ query = f"SELECT * FROM {table_name} WHERE {condition} ORDER BY RANDOM() LIMIT {str(count)}"
+ cursor.execute(query)
+ result = cursor.fetchall()
+ return result
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to select random rows from {table_name}")
+ print(e)
+ return False
+
+ def check_health(self):
+ cursor = self._connection.cursor()
+ cursor.execute("SELECT 1")
+ result = cursor.fetchone()
+ cursor.close()
+ if result is not None:
+ return True
+ else:
+ return False
+
+ def delete_row(self, table_name: str, column: str, value: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f"DELETE FROM {table_name} WHERE {column} = %s"
+ cursor.execute(query, (value,))
+ self._connection.commit()
+ print("Data Deleted:", value)
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to delete row from {table_name} WHERE {column} is {value}")
+ print(e)
+ return False
+ return True
+
+ def execute_query(self, query: str, data: tuple = None):
+ try:
+ cursor = self._connection.cursor()
+ if data is None:
+ cursor.execute(query)
+ else:
+ cursor.execute(query, data)
+ result = cursor.fetchall()
+ return result
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to execute query: {query}")
+ print(e)
+ return False
+
+ def reset_auto_increment(self, table_name: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f"ALTER SEQUENCE {table_name}_id RESTART WITH 1"
+ cursor.execute(query)
+ self._connection.commit()
+ print("Auto Increment Reset")
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to reset auto increment for {table_name}")
+ print(e)
+ return False
+ return True
+
+ def get_most_recently_added_row_time(self, table_name: str):
+ try:
+ cursor = self._connection.cursor()
+ query = f"SELECT timestamp FROM {table_name} ORDER BY id DESC LIMIT 1"
+ cursor.execute(query)
+ result = cursor.fetchone()
+ return result
+ except Error as e:
+ self._connection.rollback()
+ print(f"Failed to get most recently added row from {table_name}")
+ print(e)
+ return False
+
+
+ def close_connection(self):
+ self._connection.close() \ No newline at end of file
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage