aboutsummaryrefslogtreecommitdiffstats
path: root/database/remote_database.py
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-11-21 22:30:47 -0800
committerPinapelz <yukais@pinapelz.com>2025-11-21 23:27:30 -0800
commit22714994d2b7cfa238c8b2d54a3639cd6417e9b6 (patch)
treedc7036309fbb35376f452f44e33bfbe9c2e18c61 /database/remote_database.py
parenta7347217899fb7a3addcee58a9fbee4a0c07ff57 (diff)
scaffold implementation for remote sqlite db
Diffstat (limited to 'database/remote_database.py')
-rw-r--r--database/remote_database.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/database/remote_database.py b/database/remote_database.py
new file mode 100644
index 0000000..958d815
--- /dev/null
+++ b/database/remote_database.py
@@ -0,0 +1,159 @@
+from database.base_database import BaseDatabase
+import libsql
+
+
+class RemoteDatabase(BaseDatabase):
+ def __init__(self, url: str = None, auth_token: str = None, local_replica: str = None):
+ """
+ Initialize connection to Remote database using libsql (designed for Turso)
+ """
+ self._url = url
+ self._auth_token = auth_token
+ self._local_replica = local_replica or "new_db.db"
+
+ if not self._url:
+ raise ValueError("Database URL must be provided either as parameter or TURSO_DATABASE_URL environment variable")
+
+ if not self._auth_token:
+ raise ValueError("Auth token must be provided either as parameter or TURSO_AUTH_TOKEN environment variable")
+
+ self._conn = libsql.connect(
+ self._local_replica,
+ sync_url=self._url,
+ auth_token=self._auth_token
+ )
+
+ # Initial sync to get latest data
+ self._conn.sync()
+ self._initialize_db()
+
+ def _initialize_db(self):
+ """Initialize database schema"""
+ with open("schema.sql") as f:
+ # Execute schema creation
+ self._conn.executescript(f.read())
+ self._conn.commit()
+
+ def close(self):
+ """Close the database connection"""
+ if self._conn:
+ self._conn.close()
+
+ def add_new_wac_entry(self, key: str, is_news: bool, post_type: str):
+ """Add a new WAC entry to the database"""
+ news_var = 0 if is_news is False else 1
+ self._conn.execute(
+ "INSERT OR REPLACE INTO wacplus (id, isNews, type) VALUES (?, ?, ?)",
+ (key, news_var, post_type),
+ )
+ self._conn.commit()
+
+ def add_new_translation(
+ self,
+ key: str,
+ source_lang: str,
+ target_lang: str,
+ source_txt: str,
+ result_txt: str,
+ ):
+ """Add a new translation to the database"""
+ self._conn.execute(
+ "INSERT OR REPLACE INTO translation (id, source_lang, target_lang, source, result) VALUES (?, ?, ?, ?, ?)",
+ (key, source_lang, target_lang, source_txt, result_txt),
+ )
+ self._conn.commit()
+
+ def add_new_summary(self, key: str, headline: str, content: str):
+ """Add a new summary to the database"""
+ self._conn.execute(
+ "INSERT OR REPLACE INTO summarization (id, headline, content) VALUES (?, ?, ?)",
+ (key, headline, content),
+ )
+ self._conn.commit()
+
+ def add_news_entry(self, key: str, news_entry: dict):
+ """Add a new news entry to the database"""
+ is_ai_summary = 1 if news_entry.get("is_ai_summary", False) else 0
+ en_headline = news_entry.get("en_headline", None)
+ en_content = news_entry.get("en_content", None)
+ headline = news_entry.get("headline", None)
+ url = news_entry.get("url", None)
+
+ self._conn.execute(
+ "INSERT OR REPLACE INTO news (news_id, date, identifier, type, timestamp, headline, content, url, is_ai_summary, en_headline, en_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ (
+ key,
+ news_entry["date"],
+ news_entry["identifier"],
+ news_entry["type"],
+ news_entry["timestamp"],
+ headline,
+ news_entry["content"],
+ url,
+ is_ai_summary,
+ en_headline,
+ en_content,
+ ),
+ )
+
+ # Add associated images
+ for image_entry in news_entry["images"]:
+ if image_entry["image"].startswith("data:"):
+ continue
+ link_url = image_entry.get("link", None)
+ self._conn.execute(
+ "INSERT OR REPLACE INTO news_images (news_id, image_url, link_url) VALUES (?, ?, ?)",
+ (key, image_entry["image"], link_url),
+ )
+ self._conn.commit()
+
+ def get_summary(self, key: str):
+ """Get a summary by key"""
+ result = self._conn.execute(
+ "SELECT headline, content FROM summarization WHERE id = ?", (key,)
+ ).fetchone()
+ if result is None:
+ return None
+ return {"headline": result[0], "content": result[1]}
+
+ def get_translation(self, key: str):
+ """Get a translation by key"""
+ result = self._conn.execute("SELECT result FROM translation WHERE id = ?", (key,)).fetchone()
+ if result is None:
+ return None
+ return result[0]
+
+ def get_wac_entry(self, key: str):
+ """Get a WAC entry by key"""
+ result = self._conn.execute("SELECT isNews, type FROM wacplus WHERE id = ?", (key,)).fetchone()
+ if result is None:
+ return None
+ is_news = True if result[0] == 1 else False
+ return is_news, result[1]
+
+ def check_news_id_exists(self, key: str) -> bool:
+ """
+ Check if a news entry with the given ID exists in the database.
+
+ :param key: The ID of the news entry to check.
+ :return: True if the news entry exists, False otherwise.
+ """
+ result = self._conn.execute("SELECT 1 FROM news WHERE news_id = ?", (key,)).fetchone()
+ return result is not None
+
+ def sync(self):
+ """Sync local changes with the remote database (Turso specific)"""
+ self._conn.sync()
+
+ def get_stats(self):
+ """Get database statistics (useful for monitoring)"""
+ stats = {}
+
+ # Count records in each table
+ tables = ['news', 'news_images', 'summarization', 'translation', 'wacplus']
+ for table in tables:
+ result = self._conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
+ count = result[0] if result else 0
+ stats[f"{table}_count"] = count
+
+ return stats
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage