about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- community/wacca_plus/wacca_plus.py 32
-rw-r--r-- database.py 36
-rw-r--r-- summarizer.py 30
-rw-r--r-- translate.py 64
4 files changed, 65 insertions, 97 deletions
diff --git a/community/wacca_plus/wacca_plus.py b/community/wacca_plus/wacca_plus.py
index 067df9f..0f34814 100644
--- a/community/wacca_plus/wacca_plus.py
+++ b/community/wacca_plus/wacca_plus.py
@@ -1,10 +1,11 @@
-import os
from datetime import datetime
+from dotenv import load_dotenv
+from database import Database
+import os
import time
import requests
import openai
import json
-from dotenv import load_dotenv
import base64
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")))
@@ -64,18 +65,6 @@ def check_is_announcement_image(img_url: str):
parsed_result = json.loads(tool_args)
return parsed_result["is_wacca_plus_related"], parsed_result["category"]
-def _load_cache():
- cache_file = "wac_result_cache.json"
- if not os.path.exists(cache_file):
- with open(cache_file, "w") as file:
- json.dump({}, file)
- with open(cache_file, "r") as file:
- return json.load(file)
-
-def _save_cache(cache: dict):
- cache_file = "wac_result_cache.json"
- with open(cache_file, "w") as file:
- json.dump(cache, file)
def _convert_image_to_base64(img_url: str):
response = requests.get(img_url)
@@ -89,7 +78,7 @@ def _convert_image_to_base64(img_url: str):
def parse_announcement_messages(message_json: dict):
news_posts = []
- cache = _load_cache()
+ database = Database()
for message in message_json:
type = None
message_content = message.get("content", "")
@@ -107,12 +96,14 @@ def parse_announcement_messages(message_json: dict):
image_urls = [] # save the images before they get encoded
for image in image_attachments:
image_urls.append(image["url"])
- if image["id"] in cache:
- is_related = cache[image["id"]][0]
- type = cache[image["id"]][1]
+ entry = database.get_wac_entry(image["id"])
+ if entry:
+ is_related = entry[0]
+ type = entry[1]
else:
is_related, type = check_is_announcement_image(image["url"])
- cache[image["id"]] = [is_related, type]
+ database.add_new_wac_entry(key=image["id"], is_news=is_related, post_type=type)
+
if not is_related:
continue
filtered_images.append({"image": _convert_image_to_base64(image["url"]), "url": None})
@@ -136,6 +127,5 @@ def parse_announcement_messages(message_json: dict):
"images": filtered_images,
'is_ai_summary': True
})
-
- _save_cache(cache)
+ database.close()
return news_posts
diff --git a/database.py b/database.py
index a4e5ac9..0e1da81 100644
--- a/database.py
+++ b/database.py
@@ -14,6 +14,11 @@ class Database:
self._cursor.executescript(f.read())
self._conn.commit()
+ def close(self):
+ """Close the database connection"""
+ if self._conn:
+ self._conn.close()
+
def _migrate_old_data(self):
"""
Migrates old summarization, tl and wac files into DB
@@ -66,3 +71,34 @@ class Database:
(key, headline, content)
)
self._conn.commit()
+
+ def get_summary(self, key: str):
+ self._cursor.execute(
+ "SELECT headline, content FROM summarization WHERE id = ?",
+ (key,)
+ )
+ result = self._cursor.fetchone()
+ if result is None:
+ return None
+ return {"headline": result[0], "content": result[1]}
+
+ def get_translation(self, key: str):
+ self._cursor.execute(
+ "SELECT result FROM translation WHERE id = ?",
+ (key,)
+ )
+ result = self._cursor.fetchone()
+ if result is None:
+ return None
+ return result[0]
+
+ def get_wac_entry(self, key: str):
+ self._cursor.execute(
+ "SELECT isNews, type FROM wacplus WHERE id = ?",
+ (key,)
+ )
+ result = self._cursor.fetchone()
+ if result is None:
+ return None
+ is_news = True if result[0] == 1 else False
+ return is_news, result[1]
diff --git a/summarizer.py b/summarizer.py
index 8fe86ae..25d1f8f 100644
--- a/summarizer.py
+++ b/summarizer.py
@@ -1,4 +1,5 @@
from dotenv import load_dotenv
+from database import Database
import openai
import json
import hashlib
@@ -11,21 +12,6 @@ def summarization_is_possible() -> bool:
return os.getenv("OPENAI_API_KEY")
-def _load_cache():
- cache_file = "summarization_cache.json"
- if not os.path.exists(cache_file):
- with open(cache_file, "w") as file:
- json.dump({}, file)
- with open(cache_file, "r") as file:
- return json.load(file)
-
-
-def _save_cache(cache: dict):
- cache_file = "summarization_cache.json"
- with open(cache_file, "w") as file:
- json.dump(cache, file)
-
-
def _make_cache_key(game: str, img_urls: list[str]) -> str:
normalized_game = game.strip().lower()
img_data = json.dumps(sorted(img_urls), separators=(",", ":"))
@@ -40,12 +26,11 @@ def generate_headline_and_content_from_images(img_urls: list[str], game: str, me
# Limit message content to 500 characters
if len(message_content) > MAX_CHAR_CONTENT_CONSIDERATION_LENGTH:
message_content = message_content[:MAX_CHAR_CONTENT_CONSIDERATION_LENGTH]
-
- cache = _load_cache()
+ database = Database()
cache_key = _make_cache_key(game, img_urls)
- if cache_key in cache:
- cached = cache[cache_key]
- return cached["headline"], cached["content"]
+ cache_entry = database.get_summary(cache_key)
+ if cache_entry:
+ return cache_entry["headline"], cache_entry["content"]
tools = [
{
"type": "function",
@@ -100,9 +85,10 @@ def generate_headline_and_content_from_images(img_urls: list[str], game: str, me
parsed_result = json.loads(tool_result)
headline = parsed_result["headline"]
content = parsed_result["content"]
- cache[cache_key] = {"headline": headline, "content": content}
- _save_cache(cache)
+ database.add_new_summary(cache_key, headline, content)
+ database.close()
except openai.OpenAIError as e:
print(f"[ERROR] Function call to OpenAI for summarization failed ERROR -> {e} ")
+ database.close()
return None, None
return headline, content
diff --git a/translate.py b/translate.py
index 877872a..31206a4 100644
--- a/translate.py
+++ b/translate.py
@@ -1,9 +1,9 @@
from dotenv import load_dotenv
+from database import Database
import requests
import constants
import re
import os
-import json
import hashlib
@@ -36,58 +36,15 @@ def _decode_links(raw_text: str, links: list) -> str:
raw_text = raw_text.replace(link[0], link[1])
return raw_text
-def _load_translation_cache() -> dict:
- cache_file = "tl_cache.json"
- tl_map = {}
- if os.path.exists(cache_file):
- try:
- with open(cache_file, "r", encoding="utf-8") as file:
- entries = json.load(file)
- for entry in entries:
- key = hashlib.sha256((entry["source_lang"] + entry["target_lang"] + entry["source_txt"]).encode('utf-8')).hexdigest()
- tl_map[key] = entry["result_txt"]
- return tl_map
- except (UnicodeDecodeError, json.JSONDecodeError, KeyError) as e:
- print(f"Translation cache corrupted ({e}), deleting and starting fresh...")
- os.remove(cache_file)
- with open(cache_file, "w", encoding="utf-8") as file:
- json.dump([], file, ensure_ascii=False, indent=4)
- return {}
- else:
- with open(cache_file, "w", encoding="utf-8") as file:
- json.dump([], file, ensure_ascii=False, indent=4)
- return {}
-
-def _add_to_translation_cache(source_lang: str, target_lang: str, source_txt: str, result_txt: str) -> None:
- cache_file = "tl_cache.json"
- cache_entry = {
- "source_lang": source_lang,
- "target_lang": target_lang,
- "source_txt": source_txt,
- "result_txt": result_txt
- }
- try:
- if os.path.exists(cache_file):
- with open(cache_file, "r", encoding="utf-8") as file:
- cache = json.load(file)
- else:
- cache = []
- cache.append(cache_entry)
- with open(cache_file, "w", encoding="utf-8") as file:
- json.dump(cache, file, ensure_ascii=False, indent=4)
- except (UnicodeDecodeError, json.JSONDecodeError) as e:
- print(f"Translation cache corrupted during write ({e}), starting fresh...")
- cache = [cache_entry]
- with open(cache_file, "w", encoding="utf-8") as file:
- json.dump(cache, file, ensure_ascii=False, indent=4)
-
-def request_google_translate(text: str, source: str="ja", target="en", translation_cache=None) -> tuple:
+def request_google_translate(text: str, source: str="ja", target="en") -> tuple:
"""
Translates input text and returns the translated text using Google Cloud Translation API.
"""
key = hashlib.sha256((source + target + text).encode('utf-8')).hexdigest()
- if translation_cache and key in translation_cache:
- return translation_cache[key]
+ database = Database()
+ tl_result = database.get_translation(key)
+ if tl_result:
+ return tl_result
API_KEY = os.getenv("GOOGLE_TRANSLATE_API_KEY")
encoded_text, restore_data = _encode_links(text)
url = "https://translation.googleapis.com/language/translate/v2?key="+API_KEY
@@ -100,8 +57,8 @@ def request_google_translate(text: str, source: str="ja", target="en", translati
response = requests.post(url, json=payload)
data = response.json()
translated_text = data["data"]["translations"][0]["translatedText"]
- translation_cache[key] = translated_text
- _add_to_translation_cache(source, target, text, translated_text)
+ database.add_new_translation(key=key, source_lang=source, target_lang=target, source_txt=text, result_txt=translated_text)
+ database.close()
return _decode_links(translated_text, restore_data)
def translation_possible() -> bool:
@@ -115,20 +72,19 @@ def add_translate_text_to_en(news_post: dict, overrides: list=[]) -> dict:
if not translation_possible():
return news_post
translated_posts = []
- translation_cache = _load_translation_cache()
for post in news_post:
headline = post.get("headline")
if headline:
for override in overrides:
headline = headline.replace(override[0], override[1])
- post["en_headline"] = request_google_translate(headline, translation_cache=translation_cache)
+ post["en_headline"] = request_google_translate(headline)
else:
post["en_headline"] = None
content = post.get("content")
if content:
for override in overrides:
content = content.replace(override[0], override[1])
- en_content = request_google_translate(content, translation_cache=translation_cache)
+ en_content = request_google_translate(content)
post["en_content"] = en_content
else:
post["en_content"] = None
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage