diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-12-03 14:45:44 -0800 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-12-03 14:45:44 -0800 |
| commit | 0fd8a4dfa48651ba8d25b799e1a72e692b5e9e62 (patch) | |
| tree | 88d7e86fe7b3ce9e2e562fd329167dd8bc669ae3 /database.py | |
| parent | eb97f51353fc6a2e582bf70187a4203e79e276bf (diff) | |
switch to sqlite db, allow for post queueing
Diffstat (limited to 'database.py')
| -rw-r--r-- | database.py | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/database.py b/database.py new file mode 100644 index 0000000..35d239f --- /dev/null +++ b/database.py @@ -0,0 +1,169 @@ +import sqlite3 +import json +from datetime import datetime, timedelta +from typing import List, Dict, Optional +import os + +class DatabaseManager: + def __init__(self, db_path: str = "rasis.db"): + self.db_path = db_path + self.init_database() + + def init_database(self): + """Initialize the database with required tables""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS processed_hashes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + hash TEXT UNIQUE NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS post_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + post_data TEXT NOT NULL, + content TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + posted_at TIMESTAMP NULL, + status TEXT DEFAULT 'pending' + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS posting_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + queue_id INTEGER, + FOREIGN KEY (queue_id) REFERENCES post_queue (id) + ) + """) + conn.commit() + + def is_hash_processed(self, hash_value: str) -> bool: + """Check if a news post hash has already been processed""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT 1 FROM processed_hashes WHERE hash = ?", (hash_value,)) + return cursor.fetchone() is not None + + def add_processed_hash(self, hash_value: str): + """Add a hash to the processed hashes table""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "INSERT OR IGNORE INTO processed_hashes (hash) VALUES (?)", + (hash_value,) + ) + conn.commit() + + def add_to_queue(self, post_data: Dict, content: str) -> int: + """Add a post to the queue and return the queue ID""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "INSERT INTO post_queue (post_data, content) VALUES (?, ?)", + (json.dumps(post_data), content) + ) + conn.commit() + return cursor.lastrowid + + def get_pending_posts(self, limit: Optional[int] = None) -> List[Dict]: + """Get pending posts from the queue""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + query = """ + SELECT id, post_data, content, created_at + FROM post_queue + WHERE status = 'pending' + ORDER BY created_at ASC + """ + if limit: + query += f" LIMIT {limit}" + + cursor.execute(query) + rows = cursor.fetchall() + + return [ + { + 'id': row[0], + 'post_data': json.loads(row[1]), + 'content': row[2], + 'created_at': row[3] + } + for row in rows + ] + + def mark_post_as_posted(self, queue_id: int): + """Mark a queued post as posted""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + now = datetime.now().isoformat() + cursor.execute( + "UPDATE post_queue SET status = 'posted', posted_at = ? WHERE id = ?", + (now, queue_id) + ) + cursor.execute( + "INSERT INTO posting_log (queue_id) VALUES (?)", + (queue_id,) + ) + conn.commit() + + def get_posts_in_last_hour(self) -> int: + """Get the number of posts made in the last hour""" + one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat() + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT COUNT(*) FROM posting_log WHERE posted_at >= ?", + (one_hour_ago,) + ) + return cursor.fetchone()[0] + + def can_post_more(self, max_per_hour: int) -> bool: + """Check if we can post more based on rate limit""" + return self.get_posts_in_last_hour() < max_per_hour + + def get_queue_stats(self) -> Dict: + """Get statistics about the queue""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Get pending count + cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'pending'") + pending = cursor.fetchone()[0] + + # Get posted count + cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'posted'") + posted = cursor.fetchone()[0] + + # Get posts in last hour + posts_last_hour = self.get_posts_in_last_hour() + + return { + 'pending': pending, + 'posted': posted, + 'posts_last_hour': posts_last_hour + } + + def cleanup_old_data(self, days_to_keep: int = 30): + """Clean up old data to keep database size manageable""" + cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat() + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Clean up old posted posts + cursor.execute( + "DELETE FROM post_queue WHERE status = 'posted' AND posted_at < ?", + (cutoff_date,) + ) + + # Clean up old posting logs + cursor.execute( + "DELETE FROM posting_log WHERE posted_at < ?", + (cutoff_date,) + ) + + conn.commit() |
