diff options
| -rw-r--r-- | .env.template | 22 | ||||
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | database.py | 169 | ||||
| -rw-r--r-- | rasis.py | 224 |
4 files changed, 357 insertions, 60 deletions
diff --git a/.env.template b/.env.template index f300eb4..5d5833d 100644 --- a/.env.template +++ b/.env.template @@ -1,3 +1,19 @@ -SHARKEY_KEY= -SHARKEY_INSTANCE= -HASH_FILE_PATH= +# Database configuration +DB_PATH=rasis.db + +# Rate limiting - maximum posts per hour +POSTS_PER_HOUR=3 + +# Start date filter (YYYY-MM-DD format) - only process posts after this date +# Leave empty to process all posts +START_DATE=2025-12-03 + +# Dry run mode - set to "true" to simulate posting without actually posting +DRY_RUN=false + +# Fediverse/Sharkey instance configuration +SHARKEY_INSTANCE=your-instance.com +SHARKEY_KEY=your-api-key-here + +# Legacy file path (not used with SQLite version) +# HASH_FILE_PATH=processed_hashes.txt diff --git a/.gitignore b/.gitignore --- a/.gitignore +++ b/.gitignore @@ -172,4 +172,4 @@ cython_debug/ # PyPI configuration file .pypirc -hashes.txt +*.db diff --git a/database.py b/database.py new file mode 100644 index 0000000..35d239f --- /dev/null +++ b/database.py @@ -0,0 +1,169 @@ +import sqlite3 +import json +from datetime import datetime, timedelta +from typing import List, Dict, Optional +import os + +class DatabaseManager: + def __init__(self, db_path: str = "rasis.db"): + self.db_path = db_path + self.init_database() + + def init_database(self): + """Initialize the database with required tables""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS processed_hashes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + hash TEXT UNIQUE NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS post_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + post_data TEXT NOT NULL, + content TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + posted_at TIMESTAMP NULL, + status TEXT DEFAULT 'pending' + ) + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS posting_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + queue_id INTEGER, + FOREIGN
KEY (queue_id) REFERENCES post_queue (id) + ) + """) + conn.commit() + + def is_hash_processed(self, hash_value: str) -> bool: + """Check if a news post hash has already been processed""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT 1 FROM processed_hashes WHERE hash = ?", (hash_value,)) + return cursor.fetchone() is not None + + def add_processed_hash(self, hash_value: str): + """Add a hash to the processed hashes table""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "INSERT OR IGNORE INTO processed_hashes (hash) VALUES (?)", + (hash_value,) + ) + conn.commit() + + def add_to_queue(self, post_data: Dict, content: str) -> int: + """Add a post to the queue and return the queue ID""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "INSERT INTO post_queue (post_data, content) VALUES (?, ?)", + (json.dumps(post_data), content) + ) + conn.commit() + return cursor.lastrowid + + def get_pending_posts(self, limit: Optional[int] = None) -> List[Dict]: + """Get pending posts from the queue""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + query = """ + SELECT id, post_data, content, created_at + FROM post_queue + WHERE status = 'pending' + ORDER BY created_at ASC + """ + if limit: + query += f" LIMIT {limit}" + + cursor.execute(query) + rows = cursor.fetchall() + + return [ + { + 'id': row[0], + 'post_data': json.loads(row[1]), + 'content': row[2], + 'created_at': row[3] + } + for row in rows + ] + + def mark_post_as_posted(self, queue_id: int): + """Mark a queued post as posted""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + now = datetime.now().isoformat() + cursor.execute( + "UPDATE post_queue SET status = 'posted', posted_at = ? 
WHERE id = ?", + (now, queue_id) + ) + cursor.execute( + "INSERT INTO posting_log (queue_id) VALUES (?)", + (queue_id,) + ) + conn.commit() + + def get_posts_in_last_hour(self) -> int: + """Get the number of posts made in the last hour""" + one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat() + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT COUNT(*) FROM posting_log WHERE posted_at >= ?", + (one_hour_ago,) + ) + return cursor.fetchone()[0] + + def can_post_more(self, max_per_hour: int) -> bool: + """Check if we can post more based on rate limit""" + return self.get_posts_in_last_hour() < max_per_hour + + def get_queue_stats(self) -> Dict: + """Get statistics about the queue""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Get pending count + cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'pending'") + pending = cursor.fetchone()[0] + + # Get posted count + cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'posted'") + posted = cursor.fetchone()[0] + + # Get posts in last hour + posts_last_hour = self.get_posts_in_last_hour() + + return { + 'pending': pending, + 'posted': posted, + 'posts_last_hour': posts_last_hour + } + + def cleanup_old_data(self, days_to_keep: int = 30): + """Clean up old data to keep database size manageable""" + cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat() + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Clean up old posted posts + cursor.execute( + "DELETE FROM post_queue WHERE status = 'posted' AND posted_at < ?", + (cutoff_date,) + ) + + # Clean up old posting logs + cursor.execute( + "DELETE FROM posting_log WHERE posted_at < ?", + (cutoff_date,) + ) + + conn.commit() diff --git a/rasis.py b/rasis.py --- a/rasis.py +++ b/rasis.py @@ -4,51 +4,66 @@ import hashlib import requests from dotenv import load_dotenv import os +import argparse +from datetime import datetime +from database import DatabaseManager load_dotenv()
+DB_PATH = os.environ.get("DB_PATH", "rasis.db") +START_DATE = os.environ.get("START_DATE", "") +POSTS_PER_HOUR = int(os.environ.get("POSTS_PER_HOUR", "3")) +DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true" -HASHED_NEWS_FILES = os.environ.get("HASH_FILE_PATH") +def is_post_after_start_date(post_date) -> bool: + """Check if post date is after the configured start date""" + if not START_DATE: + return True + try: + if isinstance(post_date, str): + post_timestamp = int(post_date) + else: + post_timestamp = int(post_date) + post_datetime = datetime.fromtimestamp(post_timestamp) + start_datetime = datetime.strptime(START_DATE, "%Y-%m-%d") + return post_datetime >= start_datetime + except (ValueError, TypeError): + return True -def generate_queued_posts() -> list: - def news_already_hashed(line: str) -> bool: - try: - with open(HASHED_NEWS_FILES, "r") as file: - return line in file.read() - except FileNotFoundError: - with open(HASHED_NEWS_FILES, "w") as file: - pass - return False - - def append_to_hash_file(line: str): - with open(HASHED_NEWS_FILES, "a") as file: - file.write(line + "\n") +def generate_queued_posts(db: DatabaseManager, dry_run: bool = False) -> list: + """Fetch new posts and add them to the queue""" url = "https://arcade-news.pinapelz.com/news.json" response = requests.get(url) - queue = [] + new_posts = [] if response.status_code == 200: data = response.json() news_posts = data["news_posts"] for post in news_posts: - hash = hashlib.sha256(f"{post['identifier'] + post['content'] + post['date']}".encode('utf-8')).hexdigest() - if news_already_hashed(hash): + if not is_post_after_start_date(post.get('timestamp', '')): + continue + post_hash = hashlib.sha256( + f"{post['identifier'] + post['content'] + post['date']}".encode('utf-8') + ).hexdigest() + if db.is_hash_processed(post_hash): + continue + content = generate_post_content(post) + if content is None: # skip if we do not handle the game continue - queue.append(post) - 
append_to_hash_file(hash) + if not dry_run: + db.add_to_queue(post, content) + db.add_processed_hash(post_hash) + else: + print(f"[DRY RUN] Would add to queue: {post['identifier']} - {post['date']}") + new_posts.append(post) + else: print(f"Failed to download JSON. Status code: {response.status_code}") - exit(1) - return queue + return new_posts + return new_posts def generate_post_content(post_data: dict) -> str: - """ - ๐ฐ GAME - DATE - [type] headline? - content? - ๐ [link] - ๐ผ [image?](link) - """ + """Generate post content from post data""" if "IIDX" in post_data["identifier"]: game = "beatmania IIDX" tags = "#iidx #beatmania #bemani" @@ -92,7 +107,7 @@ def generate_post_content(post_data: dict) -> str: tags = "#ongeki" elif "TAIKO" in post_data["identifier"]: game = "Taiko no Tatsujin" - tags = "#taikonotatsujin" + tags = "#taikonotatsufin" else: return None content = f"๐ฐ {game} - {post_data['date']}\n\n" @@ -100,47 +115,144 @@ def generate_post_content(post_data: dict) -> str: content = content + "The information below is written by AI / ไธ่จใฎๆ
ๅ ฑใฏAIใซใใฃใฆ็ๆใใใพใใใ\n\n" if post_data["type"] is not None: content = content + f"[{post_data['type']}] " - if post_data["headline"] is not None and post_data["headline"] != post_data["content"] : - content = content + f"[{post_data['headline']}]\n\n" + if post_data["headline"] is not None and post_data["headline"] != post_data["content"]: + content = content + f"{post_data['headline']}\n\n" if len(post_data["content"]) > 2500: truncated_content = post_data["content"][:2500] + "..." content = content + truncated_content + "\n\n" else: content = content + post_data["content"] + "\n\n" + if post_data["url"] is not None: content = content + f"๐ {post_data['url']}\n" + for i in range(len(post_data["images"])): content = content + f"๐ผ [Image{i+1}]({post_data['images'][i]['image']})\n" content = content + tags return content +def post_on_fedi(content: str, dry_run: bool = False) -> bool: + """Post content to Fediverse""" + if dry_run: + print("[DRY RUN] Would post to Fediverse:") + print("-" * 50) + print(content) + print("-" * 50) + return True + + try: + conn = http.client.HTTPSConnection(os.environ.get("SHARKEY_INSTANCE")) + + payload = { + "visibility": "public", + "text": content, + "localOnly": False, + "noExtractMentions": False, + "noExtractHashtags": False, + "noExtractEmojis": False + } + + headers = { + "Content-Type": "application/json", + "Authorization": "Bearer " + os.environ.get("SHARKEY_KEY") + } + + conn.request("POST", "/api/notes/create", json.dumps(payload), headers) + res = conn.getresponse() + data = res.read() + + # Check if post was successful + if res.status == 200: + return True + else: + print(f"Failed to post: {res.status} - {data.decode()}") + return False + except Exception as e: + print(f"Error posting to Fediverse: {e}") + return False + +def process_queue(db: DatabaseManager, dry_run: bool = False): + """Process the posting queue with rate limiting""" + if not db.can_post_more(POSTS_PER_HOUR): + posts_made = 
db.get_posts_in_last_hour() + print(f"Rate limit reached: {posts_made}/{POSTS_PER_HOUR} posts made in the last hour") + return + + posts_made = db.get_posts_in_last_hour() + available_slots = max(0, POSTS_PER_HOUR - posts_made) + + pending_posts = db.get_pending_posts(limit=available_slots) + + if not pending_posts: + print("No pending posts to process") + return + + print(f"Processing {len(pending_posts)} posts (rate limit: {posts_made + len(pending_posts)}/{POSTS_PER_HOUR})") + + for post in pending_posts: + post_id = post['id'] + content = post['content'] + post_data = post['post_data'] + + cleaned_content = content.encode("utf-8", "replace").decode("utf-8") + + game_name = post_data.get('identifier', 'Unknown') + print(f"Processing: {game_name} - {post_data.get('date', 'Unknown date')}") + + if post_on_fedi(cleaned_content, dry_run): + if not dry_run: + db.mark_post_as_posted(post_id) + print("✓ Posted successfully" if not dry_run else "✓ Would post successfully") + else: + print("✗ Failed to post") + +def show_status(db: DatabaseManager): + """Show current queue and rate limit status""" + stats = db.get_queue_stats() + + print(f""" +=== RASIS Status === +Pending posts: {stats['pending']} +Posted posts: {stats['posted']} +Posts in last hour: {stats['posts_last_hour']}/{POSTS_PER_HOUR} +Rate limit slots available: {max(0, POSTS_PER_HOUR - stats['posts_last_hour'])} +Database path: {DB_PATH} +""") + +def main(): + parser = argparse.ArgumentParser(description='RASIS - Arcade News Posting Bot') + parser.add_argument('--dry-run', action='store_true', help='Simulate posting without actually posting') + parser.add_argument('--status', action='store_true', help='Show current status and exit') + parser.add_argument('--process-only', action='store_true', help='Only process queue, don\'t fetch new posts') + parser.add_argument('--fetch-only', action='store_true', help='Only fetch new posts, don\'t process queue') + parser.add_argument('--cleanup', action='store_true',
help='Clean up old data from database') + + args = parser.parse_args() + dry_run = args.dry_run or DRY_RUN + + if dry_run: + print("🧪 DRY RUN MODE - No actual posting will occur") + + db = DatabaseManager(DB_PATH) -def post_on_fedi(content: str): - conn = http.client.HTTPSConnection(os.environ.get("SHARKEY_INSTANCE")) + if args.status: + show_status(db) + return - payload = { - "visibility": "public", - "text": content, - "localOnly": False, - "noExtractMentions": False, - "noExtractHashtags": False, - "noExtractEmojis": False - } + if args.cleanup: + print("Cleaning up old data...") + db.cleanup_old_data() + print("Cleanup complete") + return - headers = { - "Content-Type": "application/json", - "Authorization": "Bearer " + os.environ.get("SHARKEY_KEY") - } - conn.request("POST", "/api/notes/create", json.dumps(payload), headers) - res = conn.getresponse() - data = res.read() + if not args.process_only: + print("Fetching new posts...") + new_posts = generate_queued_posts(db, dry_run) + print(f"Added {len(new_posts)} new posts to queue") + if not args.fetch_only: + print("Processing queue...") + process_queue(db, dry_run) + show_status(db) if __name__ == "__main__": - queued_posts = generate_queued_posts() - for post in queued_posts: - content = generate_post_content(post) - if content is None: - continue - cleaned = content.encode("utf-8", "replace").decode("utf-8") - print(cleaned) - post_on_fedi(cleaned) + main() |
