1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
import json
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Dict, List, Optional


class DatabaseManager:
    """SQLite persistence for processed-post hashes, the post queue and the posting log.

    Every public method opens its own short-lived connection to ``db_path``
    (via ``_connect``), so an instance keeps no connection open between calls.

    Timestamp columns are stored as TEXT in two different formats:
      * ``post_queue.posted_at`` is written by ``mark_post_as_posted`` as
        local-time ``datetime.isoformat()`` text.
      * ``posting_log.posted_at`` and the ``created_at`` columns come from
        SQLite's ``CURRENT_TIMESTAMP`` default (UTC, ``'YYYY-MM-DD HH:MM:SS'``).
    Range filters must compare each column against a cutoff in the same zone
    and format, otherwise the text comparison is not chronological.
    """

    def __init__(self, db_path: str = "rasis.db"):
        """Store the database path and create the tables if they are missing.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()

    @contextmanager
    def _connect(self):
        """Yield a connection that commits on success, rolls back on error, and is always closed."""
        conn = sqlite3.connect(self.db_path)
        try:
            # ``with conn`` only manages the transaction; it does NOT close the
            # connection, hence the explicit close() in ``finally``.
            with conn:
                yield conn
        finally:
            conn.close()

    def init_database(self):
        """Create the processed_hashes, post_queue and posting_log tables if they do not exist."""
        with self._connect() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS processed_hashes (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    hash TEXT UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS post_queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    post_data TEXT NOT NULL,
                    content TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    posted_at TIMESTAMP NULL,
                    status TEXT DEFAULT 'pending'
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS posting_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    queue_id INTEGER,
                    FOREIGN KEY (queue_id) REFERENCES post_queue (id)
                )
            """)

    def is_hash_processed(self, hash_value: str) -> bool:
        """Return True if ``hash_value`` is already recorded in processed_hashes."""
        with self._connect() as conn:
            row = conn.execute(
                "SELECT 1 FROM processed_hashes WHERE hash = ?", (hash_value,)
            ).fetchone()
        return row is not None

    def add_processed_hash(self, hash_value: str):
        """Record ``hash_value`` as processed; inserting an existing hash is a no-op."""
        with self._connect() as conn:
            # The UNIQUE constraint plus OR IGNORE makes repeated calls harmless.
            conn.execute(
                "INSERT OR IGNORE INTO processed_hashes (hash) VALUES (?)",
                (hash_value,),
            )

    def add_to_queue(self, post_data: Dict, content: str) -> int:
        """Queue a post with status 'pending'.

        Args:
            post_data: JSON-serialisable data stored alongside the post.
            content: Text content of the post.

        Returns:
            The rowid of the new post_queue row.
        """
        with self._connect() as conn:
            cursor = conn.execute(
                "INSERT INTO post_queue (post_data, content) VALUES (?, ?)",
                (json.dumps(post_data), content),
            )
            queue_id = cursor.lastrowid
        return queue_id

    def get_pending_posts(self, limit: Optional[int] = None) -> List[Dict]:
        """Return pending posts, oldest first.

        Args:
            limit: Maximum number of posts to return, or None for no limit.
                ``0`` returns an empty list.

        Returns:
            A list of dicts with keys 'id', 'post_data' (decoded JSON),
            'content' and 'created_at'.
        """
        # created_at has one-second resolution, so id breaks ties to keep
        # insertion (FIFO) order deterministic.
        query = """
            SELECT id, post_data, content, created_at
            FROM post_queue
            WHERE status = 'pending'
            ORDER BY created_at ASC, id ASC
        """
        params: tuple = ()
        if limit is not None:
            # Bind LIMIT as a parameter instead of interpolating it into the SQL.
            query += " LIMIT ?"
            params = (limit,)
        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()
        return [
            {
                'id': row_id,
                'post_data': json.loads(raw_data),
                'content': content,
                'created_at': created_at,
            }
            for row_id, raw_data, content, created_at in rows
        ]

    def mark_post_as_posted(self, queue_id: int):
        """Mark queue row ``queue_id`` as posted and append an entry to posting_log.

        Both writes happen in a single transaction.
        """
        now = datetime.now().isoformat()
        with self._connect() as conn:
            conn.execute(
                "UPDATE post_queue SET status = 'posted', posted_at = ? WHERE id = ?",
                (now, queue_id),
            )
            # posting_log.posted_at is left to its CURRENT_TIMESTAMP (UTC) default.
            conn.execute(
                "INSERT INTO posting_log (queue_id) VALUES (?)",
                (queue_id,),
            )

    def get_posts_in_last_hour(self) -> int:
        """Return how many posting_log entries were written within the last hour."""
        with self._connect() as conn:
            # posting_log.posted_at holds CURRENT_TIMESTAMP text (UTC,
            # 'YYYY-MM-DD HH:MM:SS'). datetime('now', '-1 hour') produces the
            # same zone and format, so the text comparison is chronological.
            # A Python local-time isoformat() cutoff would not be.
            count = conn.execute(
                "SELECT COUNT(*) FROM posting_log "
                "WHERE posted_at >= datetime('now', '-1 hour')"
            ).fetchone()[0]
        return count

    def can_post_more(self, max_per_hour: int) -> bool:
        """Return True while fewer than ``max_per_hour`` posts were logged in the last hour."""
        return self.get_posts_in_last_hour() < max_per_hour

    def get_queue_stats(self) -> Dict:
        """Return counts of pending posts, posted posts, and posts logged in the last hour."""
        with self._connect() as conn:
            pending = conn.execute(
                "SELECT COUNT(*) FROM post_queue WHERE status = 'pending'"
            ).fetchone()[0]
            posted = conn.execute(
                "SELECT COUNT(*) FROM post_queue WHERE status = 'posted'"
            ).fetchone()[0]
        # Queried after the connection above is closed, so two connections
        # are never open at once.
        posts_last_hour = self.get_posts_in_last_hour()
        return {
            'pending': pending,
            'posted': posted,
            'posts_last_hour': posts_last_hour,
        }

    def cleanup_old_data(self, days_to_keep: int = 30):
        """Delete posted queue rows and posting_log entries older than ``days_to_keep`` days."""
        # post_queue.posted_at is written as local-time isoformat() text, so
        # compare it against a cutoff in that same format.
        local_cutoff = (datetime.now() - timedelta(days=days_to_keep)).isoformat()
        with self._connect() as conn:
            conn.execute(
                "DELETE FROM post_queue WHERE status = 'posted' AND posted_at < ?",
                (local_cutoff,),
            )
            # posting_log.posted_at is CURRENT_TIMESTAMP (UTC) text, so the
            # cutoff is computed by SQLite in that format. The modifier is
            # e.g. '-30 days'.
            conn.execute(
                "DELETE FROM posting_log WHERE posted_at < datetime('now', ?)",
                (f"{-days_to_keep} days",),
            )
|