about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Pinapelz <yukais@pinapelz.com> 2024-02-24 13:21:18 -0800
committer Pinapelz <yukais@pinapelz.com> 2024-02-24 13:21:18 -0800
commit 0b2a82d3f472e047b826377446ab35128477d0fb (patch)
tree d79241aaaaec24692f0e04c3135bb3a652859665
parent a6b631ab9e805b3345a692c7814fdb91c0931e91 (diff)
feat: parse past streams via live_chat json
-rw-r--r-- vod_parser.py 64
-rw-r--r-- yt_livechat_stats.py 92
2 files changed, 132 insertions, 24 deletions
diff --git a/vod_parser.py b/vod_parser.py
new file mode 100644
index 0000000..6e02b61
--- /dev/null
+++ b/vod_parser.py
@@ -0,0 +1,64 @@
+import os
+import json
+from collections import defaultdict
+from datetime import datetime
+
+LIVE_CHAT_RENDERERS = ["liveChatViewerEngagementMessageRenderer", "liveChatTextMessageRenderer"]
+
+class VodParser:
+ def __init__(self, video_id: str, data_writer, theshold: int = 1, cooldown: float = 5, keywords: list = [], start_unix_time: int = 0):
+ self.video_id = video_id
+ self._chat_file = f"{self.video_id}.live_chat.json"
+ self._threshold = theshold
+ self._cooldown = cooldown
+ self._keywords = keywords
+ self._start_unix_time = start_unix_time
+ self.data_writer = data_writer
+
+ def download_live_chat(self):
+ if os.path.exists(f"{self.video_id}.live_chat.json"):
+ os.remove(f"{self.video_id}.live_chat.json")
+ os.system(f"yt-dlp {self.video_id} --skip-download -o '%(id)s' --write-sub --")
+ self._chat_file = f"{self.video_id}.live_chat.json"
+
+ def clean_up(self):
+ os.remove(f"{self.video_id}.live_chat.json")
+
+
+ def parse_chat(self):
+ message_count_buffer = 0
+ last_refresh_time = 0
+ with open(self._chat_file, "r") as ndjson_file:
+ for line in ndjson_file:
+ chat_data = json.loads(line)
+ for renderer in LIVE_CHAT_RENDERERS:
+ try:
+ timestamp_usec = chat_data["replayChatItemAction"]["actions"][0]["addChatItemAction"]["item"][renderer]["timestampUsec"]
+ unix_timestamp = int(timestamp_usec) / 1000000
+
+ if last_refresh_time == 0 or unix_timestamp < last_refresh_time:
+ last_refresh_time = unix_timestamp
+ message_count_buffer += 1
+ continue
+
+ if unix_timestamp - last_refresh_time > self._cooldown:
+ message_rate = message_count_buffer / (unix_timestamp - last_refresh_time)
+ if message_rate > self._threshold:
+ print(f"Message rate exceeded threshold: {message_rate} messages per second")
+ seconds_since_start_of_stream = unix_timestamp - self._start_unix_time
+ youtube_timestamp = datetime.utcfromtimestamp(seconds_since_start_of_stream).strftime('%H:%M:%S')
+ print(f"Timestamp: {youtube_timestamp}")
+ self.data_writer.write(f"Notable moment found at {youtube_timestamp}\n")
+
+
+ message_count_buffer = 0
+ last_refresh_time = unix_timestamp
+ message_count_buffer += 1
+
+ except KeyError:
+ continue
+
+
+if __name__ == "__main__":
+ parser = VodParser("RsYA5heZSk8")
+ parser.parse_chat()
diff --git a/yt_livechat_stats.py b/yt_livechat_stats.py
index 6e6e172..9846b5b 100644
--- a/yt_livechat_stats.py
+++ b/yt_livechat_stats.py
@@ -4,6 +4,7 @@ import argparse
import curses
from text_renderer import TextRenderer
from data_writer import DataWriter
+from vod_parser import VodParser
import yt_dlp
import os
@@ -11,6 +12,7 @@ import os
class StreamEndedError(Exception):
pass
+
def get_message_rate(message_count: int, last_refresh_time: time.time) -> tuple:
if time.time() - last_refresh_time < 1:
return "", None
@@ -20,11 +22,14 @@ def get_message_rate(message_count: int, last_refresh_time: time.time) -> tuple:
message_data = message_rate_text, message_count/time_elapsed
return message_data
+
def check_for_superchats(chat_data, data_writer: DataWriter):
if chat_data.amountValue:
- data_writer.write(f"Superchat: {chat_data.amountValue} from {chat_data.author.name}\n")
+ data_writer.write(
+ f"Superchat: {chat_data.amountValue} from {chat_data.author.name}\n")
data_writer.write(f"Message: {chat_data.message}\n\n")
+
def get_video_info(video_id: str) -> dict:
with yt_dlp.YoutubeDL() as ydl:
info = ydl.extract_info(video_id, download=False)
@@ -37,9 +42,18 @@ def main(stdscr, video_id: str, args: argparse.Namespace):
data_writer = DataWriter(f"{video_id}-analytics.txt")
video_info = get_video_info(video_id)
stream_start_unix_time = video_info["release_timestamp"]
-
-
-
+ if not bool(video_info["is_live"]):
+ vod_parser = VodParser(video_id, data_writer,
+ start_unix_time=stream_start_unix_time,
+ theshold=args.threshold,
+ cooldown=args.cooldown,
+ keywords=args.keywords)
+ vod_parser.download_live_chat()
+ vod_parser.parse_chat()
+ vod_parser.clean_up()
+ exit()
+ if args.keywords:
+ keywords = [keyword.strip() for keyword in args.keywords.split(",")]
message_count = 0
on_cooldown = False
cooldown_start_time = time.time()
@@ -52,51 +66,81 @@ def main(stdscr, video_id: str, args: argparse.Namespace):
key = stdscr.getch()
if key == ord('q'):
raise Exception("User ended data collection")
- timestamp_since_start = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time_const))
- info_msg = "Now collecting data for " + video_id + " Elapsed time: " + timestamp_since_start + " seconds (Press q to quit)"
+ timestamp_since_start = time.strftime(
+ "%H:%M:%S", time.gmtime(time.time() - start_time_const))
+ info_msg = "Now collecting data for " + video_id + " Elapsed time: " + \
+ timestamp_since_start + " seconds (Press q to quit)"
vid_info = f"Title: {video_info['title']} Channel: {video_info['uploader']} \nUnix Start Time: {stream_start_unix_time}"
text_renderer.render(info_msg, y_pos=0)
text_renderer.render(vid_info, y_pos=1)
if on_cooldown and time.time() - cooldown_start_time > args.cooldown:
on_cooldown = False
-
+
# Message Rate Related Features
if time.time() - last_refresh_time > 1 and not on_cooldown:
- message_rate_text, message_rate = get_message_rate(message_count, last_refresh_time)
+ message_rate_text, message_rate = get_message_rate(
+ message_count, last_refresh_time)
text_renderer.render(message_rate_text, y_pos=4)
last_refresh_time = time.time()
if message_rate > args.threshold:
diff_time_seconds = time.time() - stream_start_unix_time
- youtube_timestamp = time.strftime("%H:%M:%S", time.gmtime(diff_time_seconds))
- data_writer.write(f"Notable moment found at {youtube_timestamp} measured at {message_rate}\n")
- text_renderer.render(f"Latest notable moment found at {youtube_timestamp} measured at {message_rate} - on cooldown until {time.time() + args.cooldown}\n", y_pos=5)
+ youtube_timestamp = time.strftime(
+ "%H:%M:%S", time.gmtime(diff_time_seconds))
+ data_writer.write(
+ f"Notable moment found at {youtube_timestamp} measured at {message_rate}\n")
+ text_renderer.render(
+ f"Latest notable moment found at {youtube_timestamp} measured at {message_rate} - on cooldown until {time.time() + args.cooldown}\n", y_pos=5)
on_cooldown = True
message_count = 0
else:
message_count += 1
+ # Keyword related features
+ if args.keywords:
+ for keyword in keywords:
+ if keyword.lower() in chat_data.message.lower():
+ diff_time_seconds = time.time() - stream_start_unix_time
+ youtube_timestamp = time.strftime(
+ "%H:%M:%S", time.gmtime(diff_time_seconds))
+ data_writer.write(
+ f"Keyword {keyword} found at {youtube_timestamp}\n")
+ text_renderer.render(
+ f"Keyword {keyword} found at {youtube_timestamp}\n", y_pos=6)
+ on_cooldown = True
+ cooldown_start_time = time.time()
+
# Superchat Related Features (TODO)
- #if args.superchats:
+ # if args.superchats:
# check_for_superchats(chat_data, data_writer)
-
- text_renderer.render(get_message_rate(message_count, last_refresh_time)[0], y_pos=2)
+ text_renderer.render(get_message_rate(
+ message_count, last_refresh_time)[0], y_pos=2)
if args.show_chat:
- text_renderer.log_message("[Chat]" + chat_data.author.name + ": " + chat_data.message)
+ text_renderer.log_message(
+ "[Chat]" + chat_data.author.name + ": " + chat_data.message)
text_renderer.refresh()
-
+
except Exception as e:
- data_writer.write(f"Data collection ended at {time.strftime('%H:%M:%S', time.gmtime(time.time() - stream_start_unix_time))} due to {e}\n")
+ data_writer.write(
+ f"Data collection ended at {time.strftime('%H:%M:%S', time.gmtime(time.time() - stream_start_unix_time))} due to {e}\n")
+
if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='YouTube Live Chat Message Rate Tracker')
- parser.add_argument('video_id', help='The ID of the YouTube video for the live chat')
- parser.add_argument('--show-chat', action='store_true', help='Show the live chat in the terminal window')
- parser.add_argument('--superchats', action='store_true', help='Log superchat messages and amounts')
- parser.add_argument('--threshold', type=int, default=5, help='Log timestamps when the message rate exceeds this value (msg/s)')
- parser.add_argument('--keywords', type=str, help='Log timestamps when a message contains any of these keywords. Enter keywords separated by commas')
- parser.add_argument('--cooldown', type=int, default=20, help='Minimum time that must pass before another notable moment is logged (in seconds)')
+ parser = argparse.ArgumentParser(
+ description='YouTube Live Chat Message Rate Tracker')
+ parser.add_argument(
+ 'video_id', help='The ID of the YouTube video for the live chat')
+ parser.add_argument('--show-chat', action='store_true',
+ help='Show the live chat in the terminal window')
+ parser.add_argument('--superchats', action='store_true',
+ help='Log superchat messages and amounts')
+ parser.add_argument('--threshold', type=int, default=5,
+ help='Log timestamps when the message rate exceeds this value (msg/s)')
+ parser.add_argument('--keywords', type=str,
+ help='Log timestamps when a message contains any of these keywords. Enter keywords separated by commas')
+ parser.add_argument('--cooldown', type=int, default=20,
+ help='Minimum time that must pass before another notable moment is logged (in seconds)')
args = parser.parse_args()
if os.path.exists(f"{args.video_id}-analytics.txt"):
if input(f"File {args.video_id}-analytics.txt already exists. Overwrite? (y/n): ").lower() != "y":
Send patches to yukais@pinapelz.com and include the subject prefix [PATCH repo_name].
pinapelz.com
homepage