"""Fetch the daily payload from the API, cut a random audio clip from the
referenced YouTube video, and upload the clip to R2 object storage."""
import os
import boto3
import json
import requests
import random
from dotenv import load_dotenv
from datetime import datetime, timezone
import yt_dlp
import time
# Load variables from a .env file (when one is present) into the process
# environment before the settings below are read.
load_dotenv()

# R2 object-storage credentials and destination bucket.
ACCOUNT_ID = os.environ.get("R2_ACCOUNT_ID")
ACCESS_KEY = os.environ.get("R2_ACCESS_KEY")
SECRET_KEY = os.environ.get("R2_SECRET_KEY")
BUCKET = os.environ.get("R2_BUCKET")

# Base URL of the API that serves the daily payload.
API_URL = os.environ.get("API_URL")

# Shared secret that is combined with the UTC date to form the XOR key.
OBFUSCATION_KEY = os.environ.get("OBFUSCATION_KEY")
def xor_buffer(data: bytes, key: bytes) -> bytes:
    """XOR every byte of *data* with *key*, repeating the key as needed.

    The transformation is its own inverse: applying it twice with the same
    key yields the original bytes.

    Args:
        data: Bytes to transform.
        key: Key bytes, cycled to cover the full length of *data*.

    Returns:
        A new ``bytes`` object with the same length as *data*.

    Raises:
        ValueError: If *key* is empty while *data* is not.
    """
    if not data:
        return b""
    if not key:
        # Without this guard the modulo below would fail with an opaque
        # ZeroDivisionError.
        raise ValueError("key must not be empty")
    key_len = len(key)
    return bytes(b ^ key[i % key_len] for i, b in enumerate(data))
def get_obfuscation_key() -> bytes:
    """Build today's XOR key from the configured secret and the UTC date.

    Returns:
        The UTF-8 encoding of ``OBFUSCATION_KEY`` immediately followed by
        the current UTC date formatted as ``YYYY-MM-DD``.
    """
    utc_now = datetime.now(timezone.utc)
    day_stamp = utc_now.strftime("%Y-%m-%d")
    # NOTE: if OBFUSCATION_KEY is unset (None) this concatenation raises
    # TypeError.
    keyed = OBFUSCATION_KEY + day_stamp
    return keyed.encode("utf-8")
def decode_data(hex_data: str):
    """Decode a hex-encoded, XOR-obfuscated JSON document.

    Args:
        hex_data: Hexadecimal text of the obfuscated payload.

    Returns:
        The object produced by parsing the de-obfuscated UTF-8 JSON text.
    """
    # Hex -> raw bytes, then undo the XOR with today's key.
    plain_bytes = xor_buffer(bytes.fromhex(hex_data), get_obfuscation_key())
    json_text = plain_bytes.decode("utf-8")
    return json.loads(json_text)
def fetch_daily(timeout: float = 30.0) -> dict:
    """Fetch today's payload from ``{API_URL}/today``.

    Args:
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.get``; without it a stalled server would
            block the script indefinitely.

    Returns:
        The parsed JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within *timeout*.
    """
    response = requests.get(f"{API_URL}/today", timeout=timeout)
    response.raise_for_status()
    return response.json()
def download_random_segment_mp3(
    youtube_id: str,
    output_file="today.mp3",
    segment_seconds: int = 17,
) -> str:
    """Download a random fixed-length audio segment of a YouTube video as MP3.

    The video's metadata is probed first to learn its duration, a random
    start offset is chosen so the segment fits inside the video, and only
    that range is downloaded and converted to MP3.

    Args:
        youtube_id: The video id used in ``watch?v=<id>``.
        output_file: Destination path. Its extension is ignored; the result
            is always written next to it with an ``.mp3`` suffix.
        segment_seconds: Length of the clip in seconds.

    Returns:
        Path of the MP3 file that was written (``<stem>.mp3``).

    Raises:
        ValueError: If *segment_seconds* is not positive.
    """
    if segment_seconds <= 0:
        raise ValueError("segment_seconds must be positive")

    url = f"https://www.youtube.com/watch?v={youtube_id}"

    # Metadata-only pass: nothing is downloaded here.
    with yt_dlp.YoutubeDL({"quiet": True}) as probe:
        info = probe.extract_info(url, download=False)

    # The duration may be absent, None, or a non-integer number; fall back to
    # 60 seconds when unknown and coerce to int so randint accepts it.
    raw_duration = info.get("duration", 60)
    duration = 60 if raw_duration is None else int(raw_duration)

    latest_start = duration - segment_seconds
    start = random.randint(0, latest_start) if latest_start > 0 else 0

    # Derive the output template from the requested path so the file that is
    # written matches the path that is returned. "%" must be doubled to be
    # taken literally inside a yt-dlp output template.
    stem, _ext = os.path.splitext(output_file)
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": stem.replace("%", "%%") + ".%(ext)s",
        "quiet": True,
        "download_ranges": lambda _info, _ydl: [
            {"start_time": start, "end_time": start + segment_seconds}
        ],
        "force_keyframes_at_cuts": True,
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

    # The audio post-processor always emits an .mp3 file.
    return f"{stem}.mp3"
def upload_to_r2(file_path: str, object_key: str):
    """Upload a local file to the configured R2 bucket through the S3 API.

    Args:
        file_path: Path of the local file to upload.
        object_key: Key under which the object is stored in ``BUCKET``.
    """
    client_settings = {
        "endpoint_url": f"https://{ACCOUNT_ID}.r2.cloudflarestorage.com",
        "aws_access_key_id": ACCESS_KEY,
        "aws_secret_access_key": SECRET_KEY,
        "region_name": "auto",
    }
    r2_client = boto3.client("s3", **client_settings)
    r2_client.upload_file(Filename=file_path, Bucket=BUCKET, Key=object_key)
def write_json(file_path, data):
    """Serialise *data* to *file_path* as indented UTF-8 JSON.

    An existing file at *file_path* is overwritten. Non-ASCII characters are
    written as-is rather than escaped.

    Args:
        file_path: Destination path.
        data: Any JSON-serialisable object.
    """
    dump_options = {"indent": 4, "ensure_ascii": False}
    with open(file_path, mode="w", encoding="utf-8") as out:
        json.dump(data, out, **dump_options)
def read_json(file_path, default=None):
    """Load JSON from *file_path*, resetting the file when it is unusable.

    When the file does not exist or does not contain valid JSON, *default*
    is written to *file_path* and returned instead.

    Args:
        file_path: Path of the JSON file.
        default: Value to store and return on failure; an empty dict when
            omitted.

    Returns:
        The parsed file content, or *default*.
    """
    fallback = {} if default is None else default

    if os.path.exists(file_path):
        try:
            with open(file_path, "r", encoding="utf-8") as src:
                return json.load(src)
        except json.JSONDecodeError:
            # Corrupt content: fall through and overwrite it with the default.
            pass

    with open(file_path, "w", encoding="utf-8") as dst:
        json.dump(fallback, dst, indent=4, ensure_ascii=False)
    return fallback
def main():
    """Wait for a new daily payload, then build and publish today's clip.

    Steps:
        1. Poll the API every 5 seconds until its payload differs from the
           one stored in ``save.json`` by the previous run.
        2. Decode the payload and download a random segment of the
           referenced video as MP3.
        3. Upload the clip to ``kheardle/<UTC date>.mp3``.
        4. Store the payload in ``save.json`` so the next run can tell old
           data from new.
    """
    # Nothing in this loop writes save.json, so reading it once is enough.
    last_processed = read_json("save.json")

    attempt = 0
    daily_data = fetch_daily()
    # Each iteration fetches exactly once, and the payload that finally
    # passes this comparison is the one that is decoded and saved below.
    while daily_data == last_processed:
        attempt += 1
        print(f"Server still returning old data, waiting... {attempt} ")
        time.sleep(5)
        daily_data = fetch_daily()

    data = decode_data(daily_data["data"])
    print(data)

    clip_path = download_random_segment_mp3(data["youtubeId"])
    date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    upload_to_r2(clip_path, f"kheardle/{date}.mp3")

    # Written last: if any step above raises, save.json keeps the old
    # payload and the next run processes this one again.
    write_json("save.json", daily_data)


if __name__ == "__main__":
    main()