1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
import psycopg2
from psycopg2 import Error
class PostgresHandler:
    """Convenience wrapper around a single psycopg2 connection.

    Row *values* are always sent to the server as bound parameters.
    Identifiers and SQL fragments (table names, column names, column
    definitions and the ``condition`` of :meth:`get_random_row`) are
    interpolated into the statement text verbatim, so they must only ever
    come from trusted code, never from user input.

    The handler can be used as a context manager; the connection is closed
    when the ``with`` block exits.
    """

    # SQLSTATE reported by PostgreSQL for a unique-constraint violation.
    _UNIQUE_VIOLATION = "23505"

    def __init__(
        self,
        username: str,
        password: str,
        host_name: str,
        port: int,
        database: str,
    ):
        """Open the connection.

        Args:
            username: Role used to authenticate.
            password: Password for ``username``.
            host_name: Server host name or address.
            port: Server port.
            database: Name of the database to connect to.

        Raises:
            psycopg2.Error: If the connection cannot be established.
        """
        db_params = {
            "dbname": database,
            "user": username,
            "password": password,
            "host": host_name,
            "port": port,
        }
        self._connection = psycopg2.connect(**db_params)
        print("Handler Success")

    def __enter__(self):
        """Return the handler itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection; exceptions from the block are not suppressed."""
        self.close_connection()

    def _rollback_quietly(self) -> None:
        """Roll back the open transaction, ignoring a failure to do so.

        Used on paths that re-raise the original database error: if the
        rollback itself fails (for example because the connection is gone),
        the original error is the informative one and must not be replaced.
        """
        try:
            self._connection.rollback()
        except Error:
            pass

    @staticmethod
    def _is_duplicate_key(err) -> bool:
        """Tell whether ``err`` reports a unique-constraint violation.

        The SQLSTATE code is checked first because the message text depends
        on the server's message locale; the text match is kept as a fallback
        for errors that carry no code.
        """
        if getattr(err, "pgcode", None) == PostgresHandler._UNIQUE_VIOLATION:
            return True
        return "duplicate key" in str(err).lower()

    def create_table(self, name: str, column: str) -> None:
        """Create table ``name`` if it does not exist yet and commit.

        Args:
            name: Table name (trusted, interpolated verbatim).
            column: Column definitions placed between the parentheses of
                ``CREATE TABLE`` (trusted, interpolated verbatim).

        Raises:
            psycopg2.Error: If the statement fails; the transaction is
                rolled back first so the connection stays usable.
        """
        try:
            with self._connection.cursor() as cursor:
                cursor.execute(f"CREATE TABLE IF NOT EXISTS {name} ({column})")
            self._connection.commit()
        except Error:
            self._rollback_quietly()
            raise

    def clear_table(self, name: str) -> None:
        """Delete every row of table ``name`` and commit.

        Args:
            name: Table name (trusted, interpolated verbatim).

        Raises:
            psycopg2.Error: If the statement fails; the transaction is
                rolled back first so the connection stays usable.
        """
        try:
            with self._connection.cursor() as cursor:
                cursor.execute(f"DELETE FROM {name}")
            self._connection.commit()
        except Error:
            self._rollback_quietly()
            raise

    def check_row_exists(self, table_name: str, column_name: str, value: str) -> bool:
        """Return True if at least one row has ``column_name`` equal to ``value``.

        Args:
            table_name: Table name (trusted, interpolated verbatim).
            column_name: Column name (trusted, interpolated verbatim).
            value: Value to look for; sent as a bound parameter.

        Raises:
            psycopg2.Error: If the query fails; the transaction is rolled
                back first so the connection stays usable.
        """
        # LIMIT 1: only existence matters, so do not make the server
        # produce every matching row.
        query = f"SELECT 1 FROM {table_name} WHERE {column_name} = %s LIMIT 1"
        try:
            with self._connection.cursor() as cursor:
                cursor.execute(query, (value,))
                return cursor.fetchone() is not None
        except Error:
            self._rollback_quietly()
            raise

    def insert_row(self, table_name, column, data) -> bool:
        """Insert one row and commit.

        Args:
            table_name: Table name (trusted, interpolated verbatim).
            column: Comma-separated column list (trusted, interpolated
                verbatim).
            data: Sequence of values, one per column; sent as bound
                parameters.

        Returns:
            True if the row was inserted, and also if the insert was
            rejected as a duplicate key (the row is treated as already
            present). False for any other database error.
        """
        try:
            with self._connection.cursor() as cursor:
                placeholders = ", ".join(["%s"] * len(data))
                query = f"INSERT INTO {table_name} ({column}) VALUES ({placeholders})"
                cursor.execute(query, data)
            self._connection.commit()
            print("Data Inserted:", data)
        except Error as err:
            self._connection.rollback()
            print("Error inserting data")
            print(err)
            if not self._is_duplicate_key(err):
                return False
        return True

    def get_rows(self, table_name: str, column: str, value: str):
        """Fetch every row whose ``column`` equals ``value``.

        Args:
            table_name: Table name (trusted, interpolated verbatim).
            column: Column name (trusted, interpolated verbatim).
            value: Value to match; sent as a bound parameter.

        Returns:
            The list of matching rows (possibly empty), or False if the
            query failed.
        """
        try:
            with self._connection.cursor() as cursor:
                query = f"SELECT * FROM {table_name} WHERE {column} = %s"
                cursor.execute(query, (value,))
                return cursor.fetchall()
        except Error as e:
            self._connection.rollback()
            print(f"Failed to fetch row from {table_name} WHERE {column} is {value}")
            print(e)
            return False

    def get_random_row(self, table_name: str, count: int, condition: str = None):
        """Fetch up to ``count`` rows in random order.

        Args:
            table_name: Table name (trusted, interpolated verbatim).
            count: Maximum number of rows to return.
            condition: Optional raw SQL predicate for the WHERE clause
                (trusted, interpolated verbatim). All rows qualify when
                omitted.

        Returns:
            The list of selected rows, or False if ``count`` is not an
            integer or the query failed.
        """
        if condition is None:
            condition = "1 = 1"
        # The statement is executed without bound parameters (so that a
        # literal '%' inside ``condition`` keeps working); forcing the limit
        # to an int keeps arbitrary text out of the statement.
        try:
            limit = int(count)
        except (TypeError, ValueError) as e:
            print(f"Failed to select random rows from {table_name}")
            print(e)
            return False
        try:
            with self._connection.cursor() as cursor:
                query = (
                    f"SELECT * FROM {table_name} WHERE {condition} "
                    f"ORDER BY RANDOM() LIMIT {limit}"
                )
                cursor.execute(query)
                return cursor.fetchall()
        except Error as e:
            self._connection.rollback()
            print(f"Failed to select random rows from {table_name}")
            print(e)
            return False

    def check_health(self) -> bool:
        """Run ``SELECT 1`` and return True if a row comes back.

        Raises:
            psycopg2.Error: If the query fails; a rollback is attempted
                first so a previously aborted transaction does not keep
                failing every later call.
        """
        try:
            with self._connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                return cursor.fetchone() is not None
        except Error:
            self._rollback_quietly()
            raise

    def delete_row(self, table_name: str, column: str, value: str) -> bool:
        """Delete every row whose ``column`` equals ``value`` and commit.

        Args:
            table_name: Table name (trusted, interpolated verbatim).
            column: Column name (trusted, interpolated verbatim).
            value: Value to match; sent as a bound parameter.

        Returns:
            True if the statement was committed, False if it failed.
        """
        try:
            with self._connection.cursor() as cursor:
                query = f"DELETE FROM {table_name} WHERE {column} = %s"
                cursor.execute(query, (value,))
            self._connection.commit()
            print("Data Deleted:", value)
        except Error as e:
            self._connection.rollback()
            print(f"Failed to delete row from {table_name} WHERE {column} is {value}")
            print(e)
            return False
        return True

    def get_distinct_col(self, table_name: str, column: str):
        """Return the distinct values stored in ``column``.

        Args:
            table_name: Table name (trusted, interpolated verbatim).
            column: Column name (trusted, interpolated verbatim).

        Returns:
            A list with one entry per distinct value, or False if the query
            failed.
        """
        try:
            with self._connection.cursor() as cursor:
                query = f"SELECT DISTINCT {column} FROM {table_name}"
                cursor.execute(query)
                result = cursor.fetchall()
            return [row[0] for row in result]
        except Error as e:
            self._connection.rollback()
            print(f"Failed to get unique values from {column} in {table_name}")
            print(e)
            return False

    def close_connection(self) -> None:
        """Close the underlying database connection."""
        self._connection.close()
|