1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
import firebase_admin
from firebase_admin import credentials, messaging
from dotenv import load_dotenv
import requests
import os
load_dotenv()
def check_can_send_notifs():
required_env_vars = [
"KV_REST_API_URL",
"KV_REST_API_TOKEN",
"FIREBASE_SERVICE_ACCOUNT_JSON_PATH"
]
missing_vars = [var for var in required_env_vars if os.environ.get(var) is None]
if missing_vars:
return False
else:
return True
def get_tokens(topic):
upstash_rest_api_url = os.environ.get("KV_REST_API_URL")
upstash_token = os.environ.get("KV_REST_API_TOKEN")
url = f"{upstash_rest_api_url}/smembers/fcm-{topic}"
resp = requests.get(
url,
headers={"Authorization": f"Bearer {upstash_token}"}
)
if resp.status_code != 200:
print("Upstash error:", resp.text)
return []
data = resp.json()
return data.get("result", [])
def broadcast_to_topic(topic: str, title: str, body: str, image):
if not firebase_admin._apps:
cred = credentials.Certificate(os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON_PATH"))
firebase_admin.initialize_app(cred)
tokens = get_tokens(topic)
if not tokens:
print(f"No tokens found for topic '{topic}'")
return
multicast_message = messaging.MulticastMessage(
notification=messaging.Notification(
title=title,
body=body,
image=image
),
tokens=tokens
)
try:
response = messaging.send_each_for_multicast(multicast_message)
print(f"Successfully sent {response.success_count} messages out of {len(tokens)}")
if response.failure_count > 0:
print(f"Failed to send {response.failure_count} messages")
upstash_rest_api_url = os.environ.get("KV_REST_API_URL")
upstash_token = os.environ.get("KV_REST_API_TOKEN")
for idx, error in enumerate(response.responses):
if not error.success:
failed_token = tokens[idx]
print(f"Error for token {failed_token}: {error.exception}")
url = f"{upstash_rest_api_url}/srem/fcm-{topic}/{failed_token}"
resp = requests.post(
url,
headers={"Authorization": f"Bearer {upstash_token}"}
)
if resp.status_code != 200:
print(f"Failed to remove token {failed_token} from topic '{topic}': {resp.text}")
else:
print(f"Removed token {failed_token} from topic '{topic}'")
except Exception as e:
print(f"Error sending multicast message: {e}")
|