aboutsummaryrefslogtreecommitdiffstats
path: root/notifications.py
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-10-03 14:35:53 -0700
committerPinapelz <yukais@pinapelz.com>2025-10-03 14:35:53 -0700
commitf79ddc9f9825c32c74bcb29422a54203c81a00dd (patch)
treee04f92c334dd197facf3e15cd0d8ed6e1ae1f69c /notifications.py
parentc2fb5403dd16f0b43f6d1f43e6f1fd42b9ec7862 (diff)
initial fcm cloud messaging trigger in generate script
Diffstat (limited to 'notifications.py')
-rw-r--r--notifications.py65
1 files changed, 65 insertions, 0 deletions
diff --git a/notifications.py b/notifications.py
new file mode 100644
index 0000000..f5c7e77
--- /dev/null
+++ b/notifications.py
@@ -0,0 +1,65 @@
+import firebase_admin
+from firebase_admin import credentials, messaging
+from dotenv import load_dotenv
+import requests
+import os
+
+load_dotenv()
+
+def check_can_send_notifs():
+ required_env_vars = [
+ "KV_REST_API_URL",
+ "KV_REST_API_TOKEN",
+ "FIREBASE_SERVICE_ACCOUNT_JSON_PATH"
+ ]
+ missing_vars = [var for var in required_env_vars if os.environ.get(var) is None]
+ if missing_vars:
+ return False
+ else:
+ return True
+
+def get_tokens(topic):
+ upstash_rest_api_url = os.environ.get("KV_REST_API_URL")
+ upstash_token = os.environ.get("KV_REST_API_TOKEN")
+ url = f"{upstash_rest_api_url}/smembers/fcm-{topic}"
+ resp = requests.get(
+ url,
+ headers={"Authorization": f"Bearer {upstash_token}"}
+ )
+ if resp.status_code != 200:
+ print("Upstash error:", resp.text)
+ return []
+ data = resp.json()
+ return data.get("result", [])
+
+def broadcast_to_topic(topic: str, title: str, body: str, image):
+ if not firebase_admin._apps:
+ cred = credentials.Certificate(os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON_PATH"))
+ firebase_admin.initialize_app(cred)
+
+ tokens = get_tokens(topic)
+ if not tokens:
+ print(f"No tokens found for topic '{topic}'")
+ return
+
+ for token in tokens:
+ message = messaging.Message(
+ notification=messaging.Notification(
+ title=title,
+ body=body,
+ image=image
+ ),
+ token=token
+ )
+ try:
+ response = messaging.send(message)
+ print(f"Sent to {token}: {response}")
+ except Exception as e:
+ print(f"Error sending to {token}: {e}")
+
+if __name__ == "__main__":
+ topic = "sdvx"
+ title = "New sdvx Update!"
+ body = "Check out the latest DDR content now!"
+
+ broadcast_to_topic(topic, title, body)
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage