about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- .env.template 22
-rw-r--r-- .gitignore 2
-rw-r--r-- database.py 169
-rw-r--r-- rasis.py 224
4 files changed, 357 insertions, 60 deletions
diff --git a/.env.template b/.env.template
index f300eb4..5d5833d 100644
--- a/.env.template
+++ b/.env.template
@@ -1,3 +1,19 @@
-SHARKEY_KEY=
-SHARKEY_INSTANCE=
-HASH_FILE_PATH=
+# Database configuration
+DB_PATH=rasis.db
+
+# Rate limiting - maximum posts per hour
+POSTS_PER_HOUR=3
+
+# Start date filter (YYYY-MM-DD format) - only process posts on or after this date
+# Leave empty to process all posts
+START_DATE=2025-12-03
+
+# Dry run mode - set to "true" to simulate posting without actually posting
+DRY_RUN=false
+
+# Fediverse/Sharkey instance configuration
+SHARKEY_INSTANCE=your-instance.com
+SHARKEY_KEY=your-api-key-here
+
+# Legacy file path (not used with SQLite version)
+# HASH_FILE_PATH=processed_hashes.txt
diff --git a/.gitignore b/.gitignore
index 4182b08..e5df151 100644
--- a/.gitignore
+++ b/.gitignore
@@ -172,4 +172,4 @@ cython_debug/
# PyPI configuration file
.pypirc
-hashes.txt
+*.db
diff --git a/database.py b/database.py
new file mode 100644
index 0000000..35d239f
--- /dev/null
+++ b/database.py
@@ -0,0 +1,169 @@
+import sqlite3
+import json
+from datetime import datetime, timedelta
+from typing import List, Dict, Optional
+import os
+
+class DatabaseManager:
+ def __init__(self, db_path: str = "rasis.db"):
+ self.db_path = db_path
+ self.init_database()
+
+ def init_database(self):
+ """Initialize the database with required tables"""
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS processed_hashes (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ hash TEXT UNIQUE NOT NULL,
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ )
+ """)
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS post_queue (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ post_data TEXT NOT NULL,
+ content TEXT NOT NULL,
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ posted_at TIMESTAMP NULL,
+ status TEXT DEFAULT 'pending'
+ )
+ """)
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS posting_log (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ queue_id INTEGER,
+ FOREIGN KEY (queue_id) REFERENCES post_queue (id)
+ )
+ """)
+ conn.commit()
+
+ def is_hash_processed(self, hash_value: str) -> bool:
+ """Check if a news post hash has already been processed"""
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute("SELECT 1 FROM processed_hashes WHERE hash = ?", (hash_value,))
+ return cursor.fetchone() is not None
+
+ def add_processed_hash(self, hash_value: str):
+ """Add a hash to the processed hashes table"""
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "INSERT OR IGNORE INTO processed_hashes (hash) VALUES (?)",
+ (hash_value,)
+ )
+ conn.commit()
+
+ def add_to_queue(self, post_data: Dict, content: str) -> int:
+ """Add a post to the queue and return the queue ID"""
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "INSERT INTO post_queue (post_data, content) VALUES (?, ?)",
+ (json.dumps(post_data), content)
+ )
+ conn.commit()
+ return cursor.lastrowid
+
+ def get_pending_posts(self, limit: Optional[int] = None) -> List[Dict]:
+ """Get pending posts from the queue"""
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ query = """
+ SELECT id, post_data, content, created_at
+ FROM post_queue
+ WHERE status = 'pending'
+ ORDER BY created_at ASC
+ """
+ if limit:
+ query += f" LIMIT {limit}"
+
+ cursor.execute(query)
+ rows = cursor.fetchall()
+
+ return [
+ {
+ 'id': row[0],
+ 'post_data': json.loads(row[1]),
+ 'content': row[2],
+ 'created_at': row[3]
+ }
+ for row in rows
+ ]
+
+ def mark_post_as_posted(self, queue_id: int):
+ """Mark a queued post as posted"""
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ now = datetime.now().isoformat()
+ cursor.execute(
+ "UPDATE post_queue SET status = 'posted', posted_at = ? WHERE id = ?",
+ (now, queue_id)
+ )
+ cursor.execute(
+ "INSERT INTO posting_log (queue_id) VALUES (?)",
+ (queue_id,)
+ )
+ conn.commit()
+
+ def get_posts_in_last_hour(self) -> int:
+ """Get the number of posts made in the last hour"""
+ one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat()
+
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+ cursor.execute(
+ "SELECT COUNT(*) FROM posting_log WHERE posted_at >= ?",
+ (one_hour_ago,)
+ )
+ return cursor.fetchone()[0]
+
+ def can_post_more(self, max_per_hour: int) -> bool:
+ """Check if we can post more based on rate limit"""
+ return self.get_posts_in_last_hour() < max_per_hour
+
+ def get_queue_stats(self) -> Dict:
+ """Get statistics about the queue"""
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+
+ # Get pending count
+ cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'pending'")
+ pending = cursor.fetchone()[0]
+
+ # Get posted count
+ cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'posted'")
+ posted = cursor.fetchone()[0]
+
+ # Get posts in last hour
+ posts_last_hour = self.get_posts_in_last_hour()
+
+ return {
+ 'pending': pending,
+ 'posted': posted,
+ 'posts_last_hour': posts_last_hour
+ }
+
+ def cleanup_old_data(self, days_to_keep: int = 30):
+ """Clean up old data to keep database size manageable"""
+ cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat()
+
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+
+ # Clean up old posted posts
+ cursor.execute(
+ "DELETE FROM post_queue WHERE status = 'posted' AND posted_at < ?",
+ (cutoff_date,)
+ )
+
+ # Clean up old posting logs
+ cursor.execute(
+ "DELETE FROM posting_log WHERE posted_at < ?",
+ (cutoff_date,)
+ )
+
+ conn.commit()
diff --git a/rasis.py b/rasis.py
index 2f68596..4223718 100644
--- a/rasis.py
+++ b/rasis.py
@@ -4,51 +4,66 @@ import hashlib
import requests
from dotenv import load_dotenv
import os
+import argparse
+from datetime import datetime
+from database import DatabaseManager
load_dotenv()
+DB_PATH = os.environ.get("DB_PATH", "rasis.db")
+START_DATE = os.environ.get("START_DATE", "")
+POSTS_PER_HOUR = int(os.environ.get("POSTS_PER_HOUR", "3"))
+DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true"
-HASHED_NEWS_FILES = os.environ.get("HASH_FILE_PATH")
+def is_post_after_start_date(post_date) -> bool:
+ """Check if post date is after the configured start date"""
+ if not START_DATE:
+ return True
+ try:
+ if isinstance(post_date, str):
+ post_timestamp = int(post_date)
+ else:
+ post_timestamp = int(post_date)
+ post_datetime = datetime.fromtimestamp(post_timestamp)
+ start_datetime = datetime.strptime(START_DATE, "%Y-%m-%d")
+ return post_datetime >= start_datetime
+ except (ValueError, TypeError):
+ return True
-def generate_queued_posts() -> list:
- def news_already_hashed(line: str) -> bool:
- try:
- with open(HASHED_NEWS_FILES, "r") as file:
- return line in file.read()
- except FileNotFoundError:
- with open(HASHED_NEWS_FILES, "w") as file:
- pass
- return False
-
- def append_to_hash_file(line: str):
- with open(HASHED_NEWS_FILES, "a") as file:
- file.write(line + "\n")
+def generate_queued_posts(db: DatabaseManager, dry_run: bool = False) -> list:
+ """Fetch new posts and add them to the queue"""
url = "https://arcade-news.pinapelz.com/news.json"
response = requests.get(url)
- queue = []
+ new_posts = []
if response.status_code == 200:
data = response.json()
news_posts = data["news_posts"]
for post in news_posts:
- hash = hashlib.sha256(f"{post['identifier'] + post['content'] + post['date']}".encode('utf-8')).hexdigest()
- if news_already_hashed(hash):
+ if not is_post_after_start_date(post.get('timestamp', '')):
+ continue
+ post_hash = hashlib.sha256(
+ f"{post['identifier'] + post['content'] + post['date']}".encode('utf-8')
+ ).hexdigest()
+ if db.is_hash_processed(post_hash):
+ continue
+ content = generate_post_content(post)
+ if content is None: # skip if we do not handle the game
continue
- queue.append(post)
- append_to_hash_file(hash)
+ if not dry_run:
+ db.add_to_queue(post, content)
+ db.add_processed_hash(post_hash)
+ else:
+ print(f"[DRY RUN] Would add to queue: {post['identifier']} - {post['date']}")
+ new_posts.append(post)
+
else:
print(f"Failed to download JSON. Status code: {response.status_code}")
- exit(1)
- return queue
+ return new_posts
+ return new_posts
def generate_post_content(post_data: dict) -> str:
- """
- 📰 GAME - DATE
- [type] headline?
- content?
- 🔗 [link]
- 🖼 [image?](link)
- """
+ """Generate post content from post data"""
if "IIDX" in post_data["identifier"]:
game = "beatmania IIDX"
tags = "#iidx #beatmania #bemani"
@@ -92,7 +107,7 @@ def generate_post_content(post_data: dict) -> str:
tags = "#ongeki"
elif "TAIKO" in post_data["identifier"]:
game = "Taiko no Tatsujin"
- tags = "#taikonotatsujin"
+ tags = "#taikonotatsujin"
else:
return None
content = f"📰 {game} - {post_data['date']}\n\n"
@@ -100,47 +115,144 @@ def generate_post_content(post_data: dict) -> str:
content = content + "The information below is written by AI / ไธŠ่จ˜ใฎๆƒ…ๅ ฑใฏAIใซใ‚ˆใฃใฆ็”Ÿๆˆใ•ใ‚Œใพใ—ใŸใ€‚\n\n"
if post_data["type"] is not None:
content = content + f"[{post_data['type']}] "
- if post_data["headline"] is not None and post_data["headline"] != post_data["content"] :
- content = content + f"[{post_data['headline']}]\n\n"
+ if post_data["headline"] is not None and post_data["headline"] != post_data["content"]:
+ content = content + f"{post_data['headline']}\n\n"
if len(post_data["content"]) > 2500:
truncated_content = post_data["content"][:2500] + "..."
content = content + truncated_content + "\n\n"
else:
content = content + post_data["content"] + "\n\n"
+
if post_data["url"] is not None:
content = content + f"🔗 {post_data['url']}\n"
+
for i in range(len(post_data["images"])):
content = content + f"🖼 [Image{i+1}]({post_data['images'][i]['image']})\n"
content = content + tags
return content
+def post_on_fedi(content: str, dry_run: bool = False) -> bool:
+ """Post content to Fediverse"""
+ if dry_run:
+ print("[DRY RUN] Would post to Fediverse:")
+ print("-" * 50)
+ print(content)
+ print("-" * 50)
+ return True
+
+ try:
+ conn = http.client.HTTPSConnection(os.environ.get("SHARKEY_INSTANCE"))
+
+ payload = {
+ "visibility": "public",
+ "text": content,
+ "localOnly": False,
+ "noExtractMentions": False,
+ "noExtractHashtags": False,
+ "noExtractEmojis": False
+ }
+
+ headers = {
+ "Content-Type": "application/json",
+ "Authorization": "Bearer " + os.environ.get("SHARKEY_KEY")
+ }
+
+ conn.request("POST", "/api/notes/create", json.dumps(payload), headers)
+ res = conn.getresponse()
+ data = res.read()
+
+ # Check if post was successful
+ if res.status == 200:
+ return True
+ else:
+ print(f"Failed to post: {res.status} - {data.decode()}")
+ return False
+ except Exception as e:
+ print(f"Error posting to Fediverse: {e}")
+ return False
+
+def process_queue(db: DatabaseManager, dry_run: bool = False):
+ """Process the posting queue with rate limiting"""
+ if not db.can_post_more(POSTS_PER_HOUR):
+ posts_made = db.get_posts_in_last_hour()
+ print(f"Rate limit reached: {posts_made}/{POSTS_PER_HOUR} posts made in the last hour")
+ return
+
+ posts_made = db.get_posts_in_last_hour()
+ available_slots = max(0, POSTS_PER_HOUR - posts_made)
+
+ pending_posts = db.get_pending_posts(limit=available_slots)
+
+ if not pending_posts:
+ print("No pending posts to process")
+ return
+
+ print(f"Processing {len(pending_posts)} posts (rate limit: {posts_made + len(pending_posts)}/{POSTS_PER_HOUR})")
+
+ for post in pending_posts:
+ post_id = post['id']
+ content = post['content']
+ post_data = post['post_data']
+
+ cleaned_content = content.encode("utf-8", "replace").decode("utf-8")
+
+ game_name = post_data.get('identifier', 'Unknown')
+ print(f"Processing: {game_name} - {post_data.get('date', 'Unknown date')}")
+
+ if post_on_fedi(cleaned_content, dry_run):
+ if not dry_run:
+ db.mark_post_as_posted(post_id)
+ print("✓ Posted successfully" if not dry_run else "✓ Would post successfully")
+ else:
+ print("✗ Failed to post")
+
+def show_status(db: DatabaseManager):
+ """Show current queue and rate limit status"""
+ stats = db.get_queue_stats()
+
+ print(f"""
+=== RASIS Status ===
+Pending posts: {stats['pending']}
+Posted posts: {stats['posted']}
+Posts in last hour: {stats['posts_last_hour']}/{POSTS_PER_HOUR}
+Rate limit slots available: {max(0, POSTS_PER_HOUR - stats['posts_last_hour'])}
+Database path: {DB_PATH}
+""")
+
+def main():
+ parser = argparse.ArgumentParser(description='RASIS - Arcade News Posting Bot')
+ parser.add_argument('--dry-run', action='store_true', help='Simulate posting without actually posting')
+ parser.add_argument('--status', action='store_true', help='Show current status and exit')
+ parser.add_argument('--process-only', action='store_true', help='Only process queue, don\'t fetch new posts')
+ parser.add_argument('--fetch-only', action='store_true', help='Only fetch new posts, don\'t process queue')
+ parser.add_argument('--cleanup', action='store_true', help='Clean up old data from database')
+
+ args = parser.parse_args()
+ dry_run = args.dry_run or DRY_RUN
+
+ if dry_run:
+ print("🧪 DRY RUN MODE - No actual posting will occur")
+
+ db = DatabaseManager(DB_PATH)
-def post_on_fedi(content: str):
- conn = http.client.HTTPSConnection(os.environ.get("SHARKEY_INSTANCE"))
+ if args.status:
+ show_status(db)
+ return
- payload = {
- "visibility": "public",
- "text": content,
- "localOnly": False,
- "noExtractMentions": False,
- "noExtractHashtags": False,
- "noExtractEmojis": False
- }
+ if args.cleanup:
+ print("Cleaning up old data...")
+ db.cleanup_old_data()
+ print("Cleanup complete")
+ return
- headers = {
- "Content-Type": "application/json",
- "Authorization": "Bearer " + os.environ.get("SHARKEY_KEY")
- }
- conn.request("POST", "/api/notes/create", json.dumps(payload), headers)
- res = conn.getresponse()
- data = res.read()
+ if not args.process_only:
+ print("Fetching new posts...")
+ new_posts = generate_queued_posts(db, dry_run)
+ print(f"Added {len(new_posts)} new posts to queue")
+ if not args.fetch_only:
+ print("Processing queue...")
+ process_queue(db, dry_run)
+ show_status(db)
if __name__ == "__main__":
- queued_posts = generate_queued_posts()
- for post in queued_posts:
- content = generate_post_content(post)
- if content is None:
- continue
- cleaned = content.encode("utf-8", "replace").decode("utf-8")
- print(cleaned)
- post_on_fedi(cleaned)
+ main()
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage