diff options
| -rw-r--r-- | database.py | 164 | ||||
| -rw-r--r-- | rasis.py | 208 |
2 files changed, 127 insertions, 245 deletions
diff --git a/database.py b/database.py index 35d239f..8ab2ab0 100644 --- a/database.py +++ b/database.py @@ -1,8 +1,6 @@ import sqlite3 -import json from datetime import datetime, timedelta -from typing import List, Dict, Optional -import os +from typing import Optional class DatabaseManager: def __init__(self, db_path: str = "rasis.db"): @@ -10,160 +8,78 @@ class DatabaseManager: self.init_database() def init_database(self): - """Initialize the database with required tables""" + """Initialize the database with a simple posted_posts table""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" - CREATE TABLE IF NOT EXISTS processed_hashes ( + CREATE TABLE IF NOT EXISTS posted_posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, - hash TEXT UNIQUE NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + archive_hash TEXT UNIQUE NOT NULL, + posted_at TIMESTAMP NOT NULL ) """) cursor.execute(""" - CREATE TABLE IF NOT EXISTS post_queue ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - post_data TEXT NOT NULL, - content TEXT NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - posted_at TIMESTAMP NULL, - status TEXT DEFAULT 'pending' - ) - """) - cursor.execute(""" - CREATE TABLE IF NOT EXISTS posting_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - queue_id INTEGER, - FOREIGN KEY (queue_id) REFERENCES post_queue (id) - ) + CREATE INDEX IF NOT EXISTS idx_posted_at + ON posted_posts(posted_at) """) conn.commit() - def is_hash_processed(self, hash_value: str) -> bool: - """Check if a news post hash has already been processed""" + def is_posted(self, archive_hash: str) -> bool: + """Check if we've already posted this hash""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - cursor.execute("SELECT 1 FROM processed_hashes WHERE hash = ?", (hash_value,)) + cursor.execute("SELECT 1 FROM posted_posts WHERE archive_hash = ?", (archive_hash,)) return cursor.fetchone() is not None - def add_processed_hash(self, hash_value: str): - """Add a hash to the processed hashes table""" - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute( - "INSERT OR IGNORE INTO processed_hashes (hash) VALUES (?)", - (hash_value,) - ) - conn.commit() - - def add_to_queue(self, post_data: Dict, content: str) -> int: - """Add a post to the queue and return the queue ID""" - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute( - "INSERT INTO post_queue (post_data, content) VALUES (?, ?)", - (json.dumps(post_data), content) - ) - conn.commit() - return cursor.lastrowid - - def get_pending_posts(self, limit: Optional[int] = None) -> List[Dict]: - """Get pending posts from the queue""" - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - query = """ - SELECT id, post_data, content, created_at - FROM post_queue - WHERE status = 'pending' - ORDER BY created_at ASC - """ - if limit: - query += f" LIMIT {limit}" - - cursor.execute(query) - rows = cursor.fetchall() - - return [ - { - 'id': row[0], - 'post_data': json.loads(row[1]), - 'content': row[2], - 'created_at': row[3] - } - for row in rows - ] - - def mark_post_as_posted(self, queue_id: int): - """Mark a queued post as posted""" + def mark_as_posted(self, archive_hash: str): + """Mark a post as posted""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - now = datetime.now().isoformat() - cursor.execute( - "UPDATE post_queue SET status = 'posted', posted_at = ? WHERE id = ?", - (now, queue_id) - ) cursor.execute( - "INSERT INTO posting_log (queue_id) VALUES (?)", - (queue_id,) + "INSERT OR IGNORE INTO posted_posts (archive_hash, posted_at) VALUES (?, ?)", + (archive_hash, datetime.now().isoformat()) ) conn.commit() - def get_posts_in_last_hour(self) -> int: - """Get the number of posts made in the last hour""" - one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat() - + def get_posts_count_last_hour(self) -> int: + """How many posts did we make in the last hour?""" + one_hour_ago = datetime.now() - timedelta(hours=1) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute( - "SELECT COUNT(*) FROM posting_log WHERE posted_at >= ?", - (one_hour_ago,) + "SELECT COUNT(*) FROM posted_posts WHERE posted_at >= ?", + (one_hour_ago.isoformat(),) ) return cursor.fetchone()[0] def can_post_more(self, max_per_hour: int) -> bool: - """Check if we can post more based on rate limit""" - return self.get_posts_in_last_hour() < max_per_hour + """Can we post more within the rate limit?""" + return self.get_posts_count_last_hour() < max_per_hour - def get_queue_stats(self) -> Dict: - """Get statistics about the queue""" + def get_next_post_time(self, max_per_hour: int) -> Optional[datetime]: + """Get the time when the next post can be made""" + if self.can_post_more(max_per_hour): + return None # Can post now + one_hour_ago = datetime.now() - timedelta(hours=1) with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() - - # Get pending count - cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'pending'") - pending = cursor.fetchone()[0] - - # Get posted count - cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'posted'") - posted = cursor.fetchone()[0] - - # Get posts in last hour - posts_last_hour = self.get_posts_in_last_hour() - - return { - 'pending': pending, - 'posted': posted, - 'posts_last_hour': posts_last_hour - } - - def cleanup_old_data(self, days_to_keep: int = 30): - """Clean up old data to keep database size manageable""" - cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat() - - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - # Clean up old posted posts cursor.execute( - "DELETE FROM post_queue WHERE status = 'posted' AND posted_at < ?", - (cutoff_date,) + "SELECT posted_at FROM posted_posts WHERE posted_at >= ? ORDER BY posted_at ASC LIMIT 1", + (one_hour_ago.isoformat(),) ) + result = cursor.fetchone() + if result: + oldest_post_time = datetime.fromisoformat(result[0]) + return oldest_post_time + timedelta(hours=1) + return None - # Clean up old posting logs + def cleanup_old_data(self, days_to_keep: int = 90): + """Optional: Clean up very old entries to keep DB small""" + cutoff_date = datetime.now() - timedelta(days=days_to_keep) + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() cursor.execute( - "DELETE FROM posting_log WHERE posted_at < ?", - (cutoff_date,) + "DELETE FROM posted_posts WHERE posted_at < ?", + (cutoff_date.isoformat(),) ) - conn.commit() @@ -1,6 +1,5 @@ import http.client import json -import hashlib import requests from dotenv import load_dotenv import os @@ -20,48 +19,13 @@ def is_post_after_start_date(post_date) -> bool: if not START_DATE: return True try: - if isinstance(post_date, str): - post_timestamp = int(post_date) - else: - post_timestamp = int(post_date) + post_timestamp = int(post_date) if isinstance(post_date, str) else int(post_date) post_datetime = datetime.fromtimestamp(post_timestamp) start_datetime = datetime.strptime(START_DATE, "%Y-%m-%d") return post_datetime >= start_datetime except (ValueError, TypeError): return True -def generate_queued_posts(db: DatabaseManager, dry_run: bool = False) -> list: - """Fetch new posts and add them to the queue""" - url = "https://arcade-news.pinapelz.com/news.json" - response = requests.get(url) - new_posts = [] - if response.status_code == 200: - data = response.json() - news_posts = data["news_posts"] - for post in news_posts: - if not is_post_after_start_date(post.get('timestamp', '')): - continue - post_hash = hashlib.sha256( - f"{post['identifier'] + post['content'] + post['date']}".encode('utf-8') - ).hexdigest() - if db.is_hash_processed(post_hash): - continue - content = generate_post_content(post) - if content is None: # skip if we do not handle the game - continue - if not dry_run: - db.add_to_queue(post, content) - db.add_processed_hash(post_hash) - else: - print(f"[DRY RUN] Would add to queue: {post['identifier']} - {post['date']}") - new_posts.append(post) - - else: - print(f"Failed to download JSON. Status code: {response.status_code}") - return new_posts - - return new_posts - def generate_post_content(post_data: dict) -> str: """Generate post content from post data""" if "IIDX" in post_data["identifier"]: @@ -110,39 +74,36 @@ def generate_post_content(post_data: dict) -> str: tags = "#taikonotatsufin" else: return None + content = f"๐ฐ {game} - {post_data['date']}\n\n" + if post_data["is_ai_summary"]: - content = content + "The information below is written by AI / ไธ่จใฎๆ
ๅ ฑใฏAIใซใใฃใฆ็ๆใใใพใใใ\n\n" + content += "The information below is written by AI / ไธ่จใฎๆ
ๅ ฑใฏAIใซใใฃใฆ็ๆใใใพใใใ\n\n" + if post_data["type"] is not None: - content = content + f"[{post_data['type']}] " + content += f"[{post_data['type']}] " + if post_data["headline"] is not None and post_data["headline"] != post_data["content"]: - content = content + f"{post_data['headline']}\n\n" + content += f"{post_data['headline']}\n\n" + if len(post_data["content"]) > 2500: truncated_content = post_data["content"][:2500] + "..." - content = content + truncated_content + "\n\n" + content += truncated_content + "\n\n" else: - content = content + post_data["content"] + "\n\n" + content += post_data["content"] + "\n\n" if post_data["url"] is not None: - content = content + f"๐ {post_data['url']}\n" + content += f"๐ {post_data['url']}\n" - for i in range(len(post_data["images"])): - content = content + f"๐ผ [Image{i+1}]({post_data['images'][i]['image']})\n" - content = content + tags + content += f"[๐ MORE DETAILS HERE](https://ac.moekyun.me/news?post={post_data['archive_hash']})\n" + content += tags return content def post_on_fedi(content: str, dry_run: bool = False) -> bool: """Post content to Fediverse""" - if dry_run: - print("[DRY RUN] Would post to Fediverse:") - print("-" * 50) - print(content) - print("-" * 50) - return True - + print(f"[DRY RUN] Would post:\n{'-' * 60}\n{content}\n{'-' * 60}") try: conn = http.client.HTTPSConnection(os.environ.get("SHARKEY_INSTANCE")) - payload = { "visibility": "public", "text": content, @@ -151,17 +112,14 @@ def post_on_fedi(content: str, dry_run: bool = False) -> bool: "noExtractHashtags": False, "noExtractEmojis": False } - headers = { "Content-Type": "application/json", "Authorization": "Bearer " + os.environ.get("SHARKEY_KEY") } - conn.request("POST", "/api/notes/create", json.dumps(payload), headers) res = conn.getresponse() data = res.read() - # Check if post was successful if res.status == 200: return True else: @@ -171,88 +129,96 @@ def post_on_fedi(content: str, dry_run: bool = False) -> bool: print(f"Error posting to Fediverse: {e}") return False -def process_queue(db: DatabaseManager, dry_run: bool = False): - """Process the posting queue with rate limiting""" - if not db.can_post_more(POSTS_PER_HOUR): - posts_made = db.get_posts_in_last_hour() - print(f"Rate limit reached: {posts_made}/{POSTS_PER_HOUR} posts made in the last hour") - return +def main(): + parser = argparse.ArgumentParser(description='RASIS - Simple Arcade News Bot') + parser.add_argument('--dry-run', action='store_true', help='Simulate without posting') + parser.add_argument('--status', action='store_true', help='Show current status') + parser.add_argument('--cleanup', action='store_true', help='Clean up old data (90+ days)') + + args = parser.parse_args() + dry_run = args.dry_run or DRY_RUN + + db = DatabaseManager(DB_PATH) - posts_made = db.get_posts_in_last_hour() - available_slots = max(0, POSTS_PER_HOUR - posts_made) + if args.status: + posts_in_hour = db.get_posts_count_last_hour() + print(f"Posts in last hour: {posts_in_hour}/{POSTS_PER_HOUR}") + print(f"Can post more: {db.can_post_more(POSTS_PER_HOUR)}") - pending_posts = db.get_pending_posts(limit=available_slots) + next_post_time = db.get_next_post_time(POSTS_PER_HOUR) + if next_post_time: + time_until = next_post_time - datetime.now() + minutes = int(time_until.total_seconds() / 60) + print(f"Next post can be made at: {next_post_time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Time until next post: {minutes} minutes") + else: + print("Can post now!") + return - if not pending_posts: - print("No pending posts to process") + if args.cleanup: + print("Cleaning up old data...") + db.cleanup_old_data() return - print(f"Processing {len(pending_posts)} posts (rate limit: {posts_made + len(pending_posts)}/{POSTS_PER_HOUR})") + if not db.can_post_more(POSTS_PER_HOUR): + posts_made = db.get_posts_count_last_hour() + print(f"Rate limit reached: {posts_made}/{POSTS_PER_HOUR} posts in the last hour") + return - for post in pending_posts: - post_id = post['id'] - content = post['content'] - post_data = post['post_data'] + print(f"Starting to process posts (rate limit: {POSTS_PER_HOUR}/hour)") + url = "https://arcade-news.pinapelz.com/news.json" + response = requests.get(url) + if response.status_code != 200: + print(f"Failed to download JSON. Status code: {response.status_code}") + return - cleaned_content = content.encode("utf-8", "replace").decode("utf-8") + data = response.json() + news_posts = data["news_posts"] - game_name = post_data.get('identifier', 'Unknown') - print(f"Processing: {game_name} - {post_data.get('date', 'Unknown date')}") + posts_to_make = [] + for post in news_posts: + if not is_post_after_start_date(post.get('timestamp', '')): + continue - if post_on_fedi(cleaned_content, dry_run): - if not dry_run: - db.mark_post_as_posted(post_id) - print("โ Posted successfully" if not dry_run else "โ Would post successfully") - else: - print("โ Failed to post") + archive_hash = post["archive_hash"] + if db.is_posted(archive_hash): + continue -def show_status(db: DatabaseManager): - """Show current queue and rate limit status""" - stats = db.get_queue_stats() + content = generate_post_content(post) + if content is None: + continue - print(f""" -=== RASIS Status === -Pending posts: {stats['pending']} -Posted posts: {stats['posted']} -Posts in last hour: {stats['posts_last_hour']}/{POSTS_PER_HOUR} -Rate limit slots available: {max(0, POSTS_PER_HOUR - stats['posts_last_hour'])} -Database path: {DB_PATH} -""") + posts_to_make.append((post, content, archive_hash)) -def main(): - parser = argparse.ArgumentParser(description='RASIS - Arcade News Posting Bot') - parser.add_argument('--dry-run', action='store_true', help='Simulate posting without actually posting') - parser.add_argument('--status', action='store_true', help='Show current status and exit') - parser.add_argument('--process-only', action='store_true', help='Only process queue, don\'t fetch new posts') - parser.add_argument('--fetch-only', action='store_true', help='Only fetch new posts, don\'t process queue') - parser.add_argument('--cleanup', action='store_true', help='Clean up old data from database') + if not posts_to_make: + print("No new posts to make") + return - args = parser.parse_args() - dry_run = args.dry_run or DRY_RUN + print(f"Found {len(posts_to_make)} new posts to potentially make") - if dry_run: - print("๐งช DRY RUN MODE - No actual posting will occur") + posts_made_this_run = 0 + for post_data, content, archive_hash in posts_to_make: + if not db.can_post_more(POSTS_PER_HOUR): + current_count = db.get_posts_count_last_hour() + print(f"Rate limit reached: {current_count}/{POSTS_PER_HOUR}. Stopping.") + break - db = DatabaseManager(DB_PATH) + game = post_data.get('identifier', 'Unknown') + date = post_data.get('date', 'Unknown date') - if args.status: - show_status(db) - return + print(f"Posting: {game} - {date}") - if args.cleanup: - print("Cleaning up old data...") - db.cleanup_old_data() - print("Cleanup complete") - return + if post_on_fedi(content, dry_run): + if not dry_run: + db.mark_as_posted(archive_hash) + posts_made_this_run += 1 + print(f" โ Posted successfully ({posts_made_this_run} posted this run)") + else: + print(" โ Failed to post") + break - if not args.process_only: - print("Fetching new posts...") - new_posts = generate_queued_posts(db, dry_run) - print(f"Added {len(new_posts)} new posts to queue") - if not args.fetch_only: - print("Processing queue...") - process_queue(db, dry_run) - show_status(db) + posts_in_hour = db.get_posts_count_last_hour() + print(f"\nStatus: {posts_in_hour}/{POSTS_PER_HOUR} posts made in last hour") if __name__ == "__main__": main() |
