aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-12-03 15:33:14 -0800
committerPinapelz <yukais@pinapelz.com>2025-12-03 16:10:48 -0800
commitcd5836ded746ba67159ec9014b75c3dc07ddc48c (patch)
tree346f5f66d8036437d79702311f9d82aec10a174f
parent0fd8a4dfa48651ba8d25b799e1a72e692b5e9e62 (diff)
change format to use 573-updates middleware as permalink
-rw-r--r--database.py164
-rw-r--r--rasis.py208
2 files changed, 127 insertions, 245 deletions
diff --git a/database.py b/database.py
index 35d239f..8ab2ab0 100644
--- a/database.py
+++ b/database.py
@@ -1,8 +1,6 @@
import sqlite3
-import json
from datetime import datetime, timedelta
-from typing import List, Dict, Optional
-import os
+from typing import Optional
class DatabaseManager:
def __init__(self, db_path: str = "rasis.db"):
@@ -10,160 +8,78 @@ class DatabaseManager:
self.init_database()
def init_database(self):
- """Initialize the database with required tables"""
+ """Initialize the database with a simple posted_posts table"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
- CREATE TABLE IF NOT EXISTS processed_hashes (
+ CREATE TABLE IF NOT EXISTS posted_posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
- hash TEXT UNIQUE NOT NULL,
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ archive_hash TEXT UNIQUE NOT NULL,
+ posted_at TIMESTAMP NOT NULL
)
""")
cursor.execute("""
- CREATE TABLE IF NOT EXISTS post_queue (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- post_data TEXT NOT NULL,
- content TEXT NOT NULL,
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- posted_at TIMESTAMP NULL,
- status TEXT DEFAULT 'pending'
- )
- """)
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS posting_log (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- posted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- queue_id INTEGER,
- FOREIGN KEY (queue_id) REFERENCES post_queue (id)
- )
+ CREATE INDEX IF NOT EXISTS idx_posted_at
+ ON posted_posts(posted_at)
""")
conn.commit()
- def is_hash_processed(self, hash_value: str) -> bool:
- """Check if a news post hash has already been processed"""
+ def is_posted(self, archive_hash: str) -> bool:
+ """Check if we've already posted this hash"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
- cursor.execute("SELECT 1 FROM processed_hashes WHERE hash = ?", (hash_value,))
+ cursor.execute("SELECT 1 FROM posted_posts WHERE archive_hash = ?", (archive_hash,))
return cursor.fetchone() is not None
- def add_processed_hash(self, hash_value: str):
- """Add a hash to the processed hashes table"""
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
- cursor.execute(
- "INSERT OR IGNORE INTO processed_hashes (hash) VALUES (?)",
- (hash_value,)
- )
- conn.commit()
-
- def add_to_queue(self, post_data: Dict, content: str) -> int:
- """Add a post to the queue and return the queue ID"""
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
- cursor.execute(
- "INSERT INTO post_queue (post_data, content) VALUES (?, ?)",
- (json.dumps(post_data), content)
- )
- conn.commit()
- return cursor.lastrowid
-
- def get_pending_posts(self, limit: Optional[int] = None) -> List[Dict]:
- """Get pending posts from the queue"""
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
- query = """
- SELECT id, post_data, content, created_at
- FROM post_queue
- WHERE status = 'pending'
- ORDER BY created_at ASC
- """
- if limit:
- query += f" LIMIT {limit}"
-
- cursor.execute(query)
- rows = cursor.fetchall()
-
- return [
- {
- 'id': row[0],
- 'post_data': json.loads(row[1]),
- 'content': row[2],
- 'created_at': row[3]
- }
- for row in rows
- ]
-
- def mark_post_as_posted(self, queue_id: int):
- """Mark a queued post as posted"""
+ def mark_as_posted(self, archive_hash: str):
+ """Mark a post as posted"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
- now = datetime.now().isoformat()
- cursor.execute(
- "UPDATE post_queue SET status = 'posted', posted_at = ? WHERE id = ?",
- (now, queue_id)
- )
cursor.execute(
- "INSERT INTO posting_log (queue_id) VALUES (?)",
- (queue_id,)
+ "INSERT OR IGNORE INTO posted_posts (archive_hash, posted_at) VALUES (?, ?)",
+ (archive_hash, datetime.now().isoformat())
)
conn.commit()
- def get_posts_in_last_hour(self) -> int:
- """Get the number of posts made in the last hour"""
- one_hour_ago = (datetime.now() - timedelta(hours=1)).isoformat()
-
+ def get_posts_count_last_hour(self) -> int:
+ """How many posts did we make in the last hour?"""
+ one_hour_ago = datetime.now() - timedelta(hours=1)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
- "SELECT COUNT(*) FROM posting_log WHERE posted_at >= ?",
- (one_hour_ago,)
+ "SELECT COUNT(*) FROM posted_posts WHERE posted_at >= ?",
+ (one_hour_ago.isoformat(),)
)
return cursor.fetchone()[0]
def can_post_more(self, max_per_hour: int) -> bool:
- """Check if we can post more based on rate limit"""
- return self.get_posts_in_last_hour() < max_per_hour
+ """Can we post more within the rate limit?"""
+ return self.get_posts_count_last_hour() < max_per_hour
- def get_queue_stats(self) -> Dict:
- """Get statistics about the queue"""
+ def get_next_post_time(self, max_per_hour: int) -> Optional[datetime]:
+ """Get the time when the next post can be made"""
+ if self.can_post_more(max_per_hour):
+ return None # Can post now
+ one_hour_ago = datetime.now() - timedelta(hours=1)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
-
- # Get pending count
- cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'pending'")
- pending = cursor.fetchone()[0]
-
- # Get posted count
- cursor.execute("SELECT COUNT(*) FROM post_queue WHERE status = 'posted'")
- posted = cursor.fetchone()[0]
-
- # Get posts in last hour
- posts_last_hour = self.get_posts_in_last_hour()
-
- return {
- 'pending': pending,
- 'posted': posted,
- 'posts_last_hour': posts_last_hour
- }
-
- def cleanup_old_data(self, days_to_keep: int = 30):
- """Clean up old data to keep database size manageable"""
- cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat()
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- # Clean up old posted posts
cursor.execute(
- "DELETE FROM post_queue WHERE status = 'posted' AND posted_at < ?",
- (cutoff_date,)
+ "SELECT posted_at FROM posted_posts WHERE posted_at >= ? ORDER BY posted_at ASC LIMIT 1",
+ (one_hour_ago.isoformat(),)
)
+ result = cursor.fetchone()
+ if result:
+ oldest_post_time = datetime.fromisoformat(result[0])
+ return oldest_post_time + timedelta(hours=1)
+ return None
- # Clean up old posting logs
+ def cleanup_old_data(self, days_to_keep: int = 90):
+ """Optional: Clean up very old entries to keep DB small"""
+ cutoff_date = datetime.now() - timedelta(days=days_to_keep)
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
cursor.execute(
- "DELETE FROM posting_log WHERE posted_at < ?",
- (cutoff_date,)
+ "DELETE FROM posted_posts WHERE posted_at < ?",
+ (cutoff_date.isoformat(),)
)
-
conn.commit()
diff --git a/rasis.py b/rasis.py
index 4223718..3ccb421 100644
--- a/rasis.py
+++ b/rasis.py
@@ -1,6 +1,5 @@
import http.client
import json
-import hashlib
import requests
from dotenv import load_dotenv
import os
@@ -20,48 +19,13 @@ def is_post_after_start_date(post_date) -> bool:
if not START_DATE:
return True
try:
- if isinstance(post_date, str):
- post_timestamp = int(post_date)
- else:
- post_timestamp = int(post_date)
+ post_timestamp = int(post_date) if isinstance(post_date, str) else int(post_date)
post_datetime = datetime.fromtimestamp(post_timestamp)
start_datetime = datetime.strptime(START_DATE, "%Y-%m-%d")
return post_datetime >= start_datetime
except (ValueError, TypeError):
return True
-def generate_queued_posts(db: DatabaseManager, dry_run: bool = False) -> list:
- """Fetch new posts and add them to the queue"""
- url = "https://arcade-news.pinapelz.com/news.json"
- response = requests.get(url)
- new_posts = []
- if response.status_code == 200:
- data = response.json()
- news_posts = data["news_posts"]
- for post in news_posts:
- if not is_post_after_start_date(post.get('timestamp', '')):
- continue
- post_hash = hashlib.sha256(
- f"{post['identifier'] + post['content'] + post['date']}".encode('utf-8')
- ).hexdigest()
- if db.is_hash_processed(post_hash):
- continue
- content = generate_post_content(post)
- if content is None: # skip if we do not handle the game
- continue
- if not dry_run:
- db.add_to_queue(post, content)
- db.add_processed_hash(post_hash)
- else:
- print(f"[DRY RUN] Would add to queue: {post['identifier']} - {post['date']}")
- new_posts.append(post)
-
- else:
- print(f"Failed to download JSON. Status code: {response.status_code}")
- return new_posts
-
- return new_posts
-
def generate_post_content(post_data: dict) -> str:
"""Generate post content from post data"""
if "IIDX" in post_data["identifier"]:
@@ -110,39 +74,36 @@ def generate_post_content(post_data: dict) -> str:
tags = "#taikonotatsufin"
else:
return None
+
content = f"๐Ÿ“ฐ {game} - {post_data['date']}\n\n"
+
if post_data["is_ai_summary"]:
- content = content + "The information below is written by AI / ไธŠ่จ˜ใฎๆƒ…ๅ ฑใฏAIใซใ‚ˆใฃใฆ็”Ÿๆˆใ•ใ‚Œใพใ—ใŸใ€‚\n\n"
+ content += "The information below is written by AI / ไธŠ่จ˜ใฎๆƒ…ๅ ฑใฏAIใซใ‚ˆใฃใฆ็”Ÿๆˆใ•ใ‚Œใพใ—ใŸใ€‚\n\n"
+
if post_data["type"] is not None:
- content = content + f"[{post_data['type']}] "
+ content += f"[{post_data['type']}] "
+
if post_data["headline"] is not None and post_data["headline"] != post_data["content"]:
- content = content + f"{post_data['headline']}\n\n"
+ content += f"{post_data['headline']}\n\n"
+
if len(post_data["content"]) > 2500:
truncated_content = post_data["content"][:2500] + "..."
- content = content + truncated_content + "\n\n"
+ content += truncated_content + "\n\n"
else:
- content = content + post_data["content"] + "\n\n"
+ content += post_data["content"] + "\n\n"
if post_data["url"] is not None:
- content = content + f"๐Ÿ”— {post_data['url']}\n"
+ content += f"๐Ÿ”— {post_data['url']}\n"
- for i in range(len(post_data["images"])):
- content = content + f"๐Ÿ–ผ [Image{i+1}]({post_data['images'][i]['image']})\n"
- content = content + tags
+ content += f"[๐Ÿ”— MORE DETAILS HERE](https://ac.moekyun.me/news?post={post_data['archive_hash']})\n"
+ content += tags
return content
def post_on_fedi(content: str, dry_run: bool = False) -> bool:
"""Post content to Fediverse"""
- if dry_run:
- print("[DRY RUN] Would post to Fediverse:")
- print("-" * 50)
- print(content)
- print("-" * 50)
- return True
-
+ print(f"[DRY RUN] Would post:\n{'-' * 60}\n{content}\n{'-' * 60}")
try:
conn = http.client.HTTPSConnection(os.environ.get("SHARKEY_INSTANCE"))
-
payload = {
"visibility": "public",
"text": content,
@@ -151,17 +112,14 @@ def post_on_fedi(content: str, dry_run: bool = False) -> bool:
"noExtractHashtags": False,
"noExtractEmojis": False
}
-
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + os.environ.get("SHARKEY_KEY")
}
-
conn.request("POST", "/api/notes/create", json.dumps(payload), headers)
res = conn.getresponse()
data = res.read()
- # Check if post was successful
if res.status == 200:
return True
else:
@@ -171,88 +129,96 @@ def post_on_fedi(content: str, dry_run: bool = False) -> bool:
print(f"Error posting to Fediverse: {e}")
return False
-def process_queue(db: DatabaseManager, dry_run: bool = False):
- """Process the posting queue with rate limiting"""
- if not db.can_post_more(POSTS_PER_HOUR):
- posts_made = db.get_posts_in_last_hour()
- print(f"Rate limit reached: {posts_made}/{POSTS_PER_HOUR} posts made in the last hour")
- return
+def main():
+ parser = argparse.ArgumentParser(description='RASIS - Simple Arcade News Bot')
+ parser.add_argument('--dry-run', action='store_true', help='Simulate without posting')
+ parser.add_argument('--status', action='store_true', help='Show current status')
+ parser.add_argument('--cleanup', action='store_true', help='Clean up old data (90+ days)')
+
+ args = parser.parse_args()
+ dry_run = args.dry_run or DRY_RUN
+
+ db = DatabaseManager(DB_PATH)
- posts_made = db.get_posts_in_last_hour()
- available_slots = max(0, POSTS_PER_HOUR - posts_made)
+ if args.status:
+ posts_in_hour = db.get_posts_count_last_hour()
+ print(f"Posts in last hour: {posts_in_hour}/{POSTS_PER_HOUR}")
+ print(f"Can post more: {db.can_post_more(POSTS_PER_HOUR)}")
- pending_posts = db.get_pending_posts(limit=available_slots)
+ next_post_time = db.get_next_post_time(POSTS_PER_HOUR)
+ if next_post_time:
+ time_until = next_post_time - datetime.now()
+ minutes = int(time_until.total_seconds() / 60)
+ print(f"Next post can be made at: {next_post_time.strftime('%Y-%m-%d %H:%M:%S')}")
+ print(f"Time until next post: {minutes} minutes")
+ else:
+ print("Can post now!")
+ return
- if not pending_posts:
- print("No pending posts to process")
+ if args.cleanup:
+ print("Cleaning up old data...")
+ db.cleanup_old_data()
return
- print(f"Processing {len(pending_posts)} posts (rate limit: {posts_made + len(pending_posts)}/{POSTS_PER_HOUR})")
+ if not db.can_post_more(POSTS_PER_HOUR):
+ posts_made = db.get_posts_count_last_hour()
+ print(f"Rate limit reached: {posts_made}/{POSTS_PER_HOUR} posts in the last hour")
+ return
- for post in pending_posts:
- post_id = post['id']
- content = post['content']
- post_data = post['post_data']
+ print(f"Starting to process posts (rate limit: {POSTS_PER_HOUR}/hour)")
+ url = "https://arcade-news.pinapelz.com/news.json"
+ response = requests.get(url)
+ if response.status_code != 200:
+ print(f"Failed to download JSON. Status code: {response.status_code}")
+ return
- cleaned_content = content.encode("utf-8", "replace").decode("utf-8")
+ data = response.json()
+ news_posts = data["news_posts"]
- game_name = post_data.get('identifier', 'Unknown')
- print(f"Processing: {game_name} - {post_data.get('date', 'Unknown date')}")
+ posts_to_make = []
+ for post in news_posts:
+ if not is_post_after_start_date(post.get('timestamp', '')):
+ continue
- if post_on_fedi(cleaned_content, dry_run):
- if not dry_run:
- db.mark_post_as_posted(post_id)
- print("โœ“ Posted successfully" if not dry_run else "โœ“ Would post successfully")
- else:
- print("โœ— Failed to post")
+ archive_hash = post["archive_hash"]
+ if db.is_posted(archive_hash):
+ continue
-def show_status(db: DatabaseManager):
- """Show current queue and rate limit status"""
- stats = db.get_queue_stats()
+ content = generate_post_content(post)
+ if content is None:
+ continue
- print(f"""
-=== RASIS Status ===
-Pending posts: {stats['pending']}
-Posted posts: {stats['posted']}
-Posts in last hour: {stats['posts_last_hour']}/{POSTS_PER_HOUR}
-Rate limit slots available: {max(0, POSTS_PER_HOUR - stats['posts_last_hour'])}
-Database path: {DB_PATH}
-""")
+ posts_to_make.append((post, content, archive_hash))
-def main():
- parser = argparse.ArgumentParser(description='RASIS - Arcade News Posting Bot')
- parser.add_argument('--dry-run', action='store_true', help='Simulate posting without actually posting')
- parser.add_argument('--status', action='store_true', help='Show current status and exit')
- parser.add_argument('--process-only', action='store_true', help='Only process queue, don\'t fetch new posts')
- parser.add_argument('--fetch-only', action='store_true', help='Only fetch new posts, don\'t process queue')
- parser.add_argument('--cleanup', action='store_true', help='Clean up old data from database')
+ if not posts_to_make:
+ print("No new posts to make")
+ return
- args = parser.parse_args()
- dry_run = args.dry_run or DRY_RUN
+ print(f"Found {len(posts_to_make)} new posts to potentially make")
- if dry_run:
- print("๐Ÿงช DRY RUN MODE - No actual posting will occur")
+ posts_made_this_run = 0
+ for post_data, content, archive_hash in posts_to_make:
+ if not db.can_post_more(POSTS_PER_HOUR):
+ current_count = db.get_posts_count_last_hour()
+ print(f"Rate limit reached: {current_count}/{POSTS_PER_HOUR}. Stopping.")
+ break
- db = DatabaseManager(DB_PATH)
+ game = post_data.get('identifier', 'Unknown')
+ date = post_data.get('date', 'Unknown date')
- if args.status:
- show_status(db)
- return
+ print(f"Posting: {game} - {date}")
- if args.cleanup:
- print("Cleaning up old data...")
- db.cleanup_old_data()
- print("Cleanup complete")
- return
+ if post_on_fedi(content, dry_run):
+ if not dry_run:
+ db.mark_as_posted(archive_hash)
+ posts_made_this_run += 1
+ print(f" โœ“ Posted successfully ({posts_made_this_run} posted this run)")
+ else:
+ print(" โœ— Failed to post")
+ break
- if not args.process_only:
- print("Fetching new posts...")
- new_posts = generate_queued_posts(db, dry_run)
- print(f"Added {len(new_posts)} new posts to queue")
- if not args.fetch_only:
- print("Processing queue...")
- process_queue(db, dry_run)
- show_status(db)
+ posts_in_hour = db.get_posts_count_last_hour()
+ print(f"\nStatus: {posts_in_hour}/{POSTS_PER_HOUR} posts made in last hour")
if __name__ == "__main__":
main()
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage