diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-10-02 19:22:55 -0700 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-10-02 20:39:34 -0700 |
| commit | 7461f283233b571a3012c07df55974563318a64d (patch) | |
| tree | 96d0d34936ceb6f30bacd0065679cc6772a20759 | |
| parent | c69189dbf844842049ca8a511803da89b5d1d9e9 (diff) | |
implement rss feed generation
| -rw-r--r-- | constants.py | 2 |
| -rw-r--r-- | feed.py | 92 |
| -rw-r--r-- | generate.py | 44 |
3 files changed, 127 insertions, 11 deletions
diff --git a/constants.py b/constants.py
index 5c510d2..30452cc 100644
--- a/constants.py
+++ b/constants.py
@@ -43,6 +43,8 @@ WACCA_PLUS_MAGIC_STRING="1206017527864369262"
 MUSECA_PLUS_NEWS_SITE="https://museca.plus/"
 RB_DELUXE_PLUS_NEWS="https://dxplus.chilundui.com/"
 
+RSS_FEED_URL="https://arcade-news.pinapelz.com"
+
 class CHUNITHM_VERSION(Enum):
     LUMINOUS_PLUS = 1
     VERSE = 2
diff --git a/feed.py b/feed.py
new file mode 100644
--- /dev/null
+++ b/feed.py
@@ -0,0 +1,92 @@
+import os
+import json
+import requests
+import mimetypes
+import xml.etree.ElementTree as ET
+from datetime import datetime, timezone
+from constants import RSS_FEED_URL
+
+def _wrap_cdata(text: str) -> str:
+    if text is None:
+        return ""
+    return f"<![CDATA[{text}]]>"
+
+def build_rss_from_news_feed(title: str, description: str, json_file_path: str, output_path: str, limit: int = 12):
+    """
+    Build RSS from an existing JSON file containing news_posts.
+    Reads the JSON, extracts posts, and generates a valid RSS XML.
+    """
+    if not os.path.exists(json_file_path):
+        raise FileNotFoundError(f"JSON file not found: {json_file_path}")
+
+    with open(json_file_path, "r", encoding="utf-8") as f:
+        data = json.load(f)
+
+    news_feeds = data.get("news_posts", [])[:limit]
+
+    file_name = os.path.basename(output_path)
+    url_to_feed = f"{RSS_FEED_URL}/{file_name}"
+
+    rss = ET.Element("rss", {
+        "version": "2.0",
+        "xmlns:atom": "http://www.w3.org/2005/Atom"
+    })
+    channel = ET.SubElement(rss, "channel")
+    ET.SubElement(channel, "title").text = title
+    ET.SubElement(channel, "description").text = description
+    ET.SubElement(channel, "link").text = url_to_feed
+    ET.SubElement(channel, "{http://www.w3.org/2005/Atom}link", {
+        "href": url_to_feed,
+        "rel": "self",
+        "type": "application/rss+xml"
+    })
+
+    for post in news_feeds:
+        item = ET.SubElement(channel, "item")
+        # Title
+        post_title = post.get("headline") or post.get("en_headline") or post.get("content", "")[:50]
+        ET.SubElement(item, "title").text = post_title
+        # Link
+        ET.SubElement(item, "link").text = post.get("url") or "https://arcade.moekyun.me"
+        # Description (combine JP + EN if available)
+        jp_content = post.get("content", "")
+        en_headline = post.get("en_headline")
+        en_content = post.get("en_content")
+        desc_parts = []
+        if jp_content:
+            desc_parts.append(jp_content.strip().replace("\n", "<br/>"))
+
+        if en_headline or en_content:
+            desc_parts.append("<hr/><b>English Translation</b><br/>")
+            if en_headline:
+                desc_parts.append(f"<i>{en_headline.strip()}</i><br/>")
+            if en_content:
+                desc_parts.append(en_content.strip().replace("\n", "<br/>"))
+
+        desc_combined = "\n".join(desc_parts)
+        ET.SubElement(item, "description").text = _wrap_cdata(desc_combined)
+
+        # pubDate
+        if "timestamp" in post and post["timestamp"]:
+            pub_date = datetime.fromtimestamp(post["timestamp"], timezone.utc).strftime("%a, %d %b %Y %H:%M:%S +0000")
+            ET.SubElement(item, "pubDate").text = pub_date
+
+        # First image enclosure (if any)
+        images = post.get("images", [])
+        if images:
+            image_url = images[0].get("image")
+            if image_url:
+                mime = mimetypes.guess_type(image_url)[0] or "application/octet-stream"
+                length = "0"
+                try:
+                    r = requests.head(image_url, timeout=5, allow_redirects=True)
+                    if "Content-Length" in r.headers:
+                        length = r.headers["Content-Length"]
+                except Exception:
+                    pass
+                ET.SubElement(item, "enclosure", url=image_url, type=mime, length=length)
+
+    # Write out
+    tree = ET.ElementTree(rss)
+    ET.indent(tree, space=" ")
+    tree.write(output_path, encoding="utf-8", xml_declaration=True)
diff --git a/generate.py b/generate.py
index 3611100..1fe11ce 100644
--- a/generate.py
+++ b/generate.py
@@ -11,14 +11,19 @@ import os
 from dotenv import load_dotenv
 from datetime import datetime, timedelta
 from database import Database
+from feed import build_rss_from_news_feed
 
 load_dotenv()
 
 OUTPUT_DIR = "news"
 ARCHIVE_NEWS = True
+GENERATE_RSS_FEEDS = True
+
 
 def compute_json_hash(data):
-    return hashlib.sha256(json.dumps(data, sort_keys=True).encode('utf-8')).hexdigest()
+    return hashlib.sha256(
+        json.dumps(data, sort_keys=True).encode("utf-8")
+    ).hexdigest()
 
 
 def save_news_to_db(news_feed: list):
@@ -29,6 +34,7 @@ def save_news_to_db(news_feed: list):
         database.add_news_entry(key, entry)
     database.close()
 
+
 def create_merged_feed(*news_lists, limit=constants.DAYS_LIMIT):
     """
     Generator-based memory-efficient merging of multiple news feeds.
@@ -40,9 +46,9 @@ def create_merged_feed(*news_lists, limit=constants.DAYS_LIMIT):
         for news_list in news_lists
         if news_list
         for item in news_list
-        if datetime.fromtimestamp(item['timestamp']) >= cutoff
+        if datetime.fromtimestamp(item["timestamp"]) >= cutoff
     )
-    return sorted(recent_items, key=lambda x: x['timestamp'], reverse=True)
+    return sorted(recent_items, key=lambda x: x["timestamp"], reverse=True)
 
 
 def attach_news_meta_data(news_data: list):
@@ -52,11 +58,11 @@ def attach_news_meta_data(news_data: list):
     """
     return {
         "fetch_time": int(datetime.now().timestamp()),
-        "news_posts": news_data
+        "news_posts": news_data,
     }
 
 
-def log_output(message: str, type: str="DEBUG"):
+def log_output(message: str, type: str = "DEBUG"):
     """
     Prints a log line output with a timestamp
     """
@@ -64,7 +70,7 @@ def log_output(message: str, type: str="DEBUG"):
     print(f"[{timestamp}] [{type}]: {message}")
 
 
-def generate_news_file(filename, url, version=None):
+def generate_news_file(filename, url, version=None, formatted_name: str = None):
     log_output(f"Fetching {filename.upper()} News Data", "NEWS")
     news_data = None
     try:
@@ -74,16 +80,32 @@ def generate_news_file(filename, url, version=None):
         print("[ERROR] Wasn't able to fetch news. Skipping...")
 
     path = f"{OUTPUT_DIR}/{filename}.json"
+
     if news_data:
         log_output(f"Success. Got {filename.upper()} News Data. Saving to file...", "NEWS")
-        with open(path, 'w') as f:
-            json.dump(attach_news_meta_data(news_data), f, indent=2)
+        with open(path, "w", encoding="utf-8") as f:
+            json.dump(attach_news_meta_data(news_data), f, indent=2, ensure_ascii=False)
+
+        if GENERATE_RSS_FEEDS:
+            rss_file_path = f"{OUTPUT_DIR}/{filename}.xml"
+            log_output(f"Generating RSS Feed: {rss_file_path}")
+            if not formatted_name:
+                formatted_name = filename
+            title = f"{formatted_name} News RSS Feed"
+            description = f"The latest information for {formatted_name} from official sources"
+            build_rss_from_news_feed(title, description, path, rss_file_path)
+
     elif os.path.exists(path):
-        log_output(f"Failed. Couldn't fetch {filename.upper()} data. Using previously scraped data", "NEWS")
-        with open(path, 'r') as json_file:
-            news_data = json.load(json_file)['news_posts']
+        log_output(
+            f"Failed. Couldn't fetch {filename.upper()} data. Using previously scraped data",
+            "NEWS",
+        )
+        with open(path, "r", encoding="utf-8") as json_file:
+            news_data = json.load(json_file)["news_posts"]
+
     else:
         log_output(f"Failed. Couldn't fetch {filename.upper()} data. Skipping...", "NEWS")
+
     return news_data
 
 
