diff options
| -rw-r--r-- | vod_parser.py | 64 | ||||
| -rw-r--r-- | yt_livechat_stats.py | 92 |
2 files changed, 132 insertions, 24 deletions
diff --git a/vod_parser.py b/vod_parser.py new file mode 100644 index 0000000..6e02b61 --- /dev/null +++ b/vod_parser.py @@ -0,0 +1,64 @@ +import os +import json +from collections import defaultdict +from datetime import datetime + +LIVE_CHAT_RENDERERS = ["liveChatViewerEngagementMessageRenderer", "liveChatTextMessageRenderer"] + +class VodParser: + def __init__(self, video_id: str, data_writer, theshold: int = 1, cooldown: float = 5, keywords: list = [], start_unix_time: int = 0): + self.video_id = video_id + self._chat_file = f"{self.video_id}.live_chat.json" + self._threshold = theshold + self._cooldown = cooldown + self._keywords = keywords + self._start_unix_time = start_unix_time + self.data_writer = data_writer + + def download_live_chat(self): + if os.path.exists(f"{self.video_id}.live_chat.json"): + os.remove(f"{self.video_id}.live_chat.json") + os.system(f"yt-dlp {self.video_id} --skip-download -o '%(id)s' --write-sub --") + self._chat_file = f"{self.video_id}.live_chat.json" + + def clean_up(self): + os.remove(f"{self.video_id}.live_chat.json") + + + def parse_chat(self): + message_count_buffer = 0 + last_refresh_time = 0 + with open(self._chat_file, "r") as ndjson_file: + for line in ndjson_file: + chat_data = json.loads(line) + for renderer in LIVE_CHAT_RENDERERS: + try: + timestamp_usec = chat_data["replayChatItemAction"]["actions"][0]["addChatItemAction"]["item"][renderer]["timestampUsec"] + unix_timestamp = int(timestamp_usec) / 1000000 + + if last_refresh_time == 0 or unix_timestamp < last_refresh_time: + last_refresh_time = unix_timestamp + message_count_buffer += 1 + continue + + if unix_timestamp - last_refresh_time > self._cooldown: + message_rate = message_count_buffer / (unix_timestamp - last_refresh_time) + if message_rate > self._threshold: + print(f"Message rate exceeded threshold: {message_rate} messages per second") + seconds_since_start_of_stream = unix_timestamp - self._start_unix_time + youtube_timestamp = datetime.utcfromtimestamp(seconds_since_start_of_stream).strftime('%H:%M:%S') + print(f"Timestamp: {youtube_timestamp}") + self.data_writer.write(f"Notable moment found at {youtube_timestamp}\n") + + + message_count_buffer = 0 + last_refresh_time = unix_timestamp + message_count_buffer += 1 + + except KeyError: + continue + + +if __name__ == "__main__": + parser = VodParser("RsYA5heZSk8") + parser.parse_chat() diff --git a/yt_livechat_stats.py b/yt_livechat_stats.py index 6e6e172..9846b5b 100644 --- a/yt_livechat_stats.py +++ b/yt_livechat_stats.py @@ -4,6 +4,7 @@ import argparse import curses from text_renderer import TextRenderer from data_writer import DataWriter +from vod_parser import VodParser import yt_dlp import os @@ -11,6 +12,7 @@ import os class StreamEndedError(Exception): pass + def get_message_rate(message_count: int, last_refresh_time: time.time) -> tuple: if time.time() - last_refresh_time < 1: return "", None @@ -20,11 +22,14 @@ def get_message_rate(message_count: int, last_refresh_time: time.time) -> tuple: message_data = message_rate_text, message_count/time_elapsed return message_data + def check_for_superchats(chat_data, data_writer: DataWriter): if chat_data.amountValue: - data_writer.write(f"Superchat: {chat_data.amountValue} from {chat_data.author.name}\n") + data_writer.write( + f"Superchat: {chat_data.amountValue} from {chat_data.author.name}\n") data_writer.write(f"Message: {chat_data.message}\n\n") + def get_video_info(video_id: str) -> dict: with yt_dlp.YoutubeDL() as ydl: info = ydl.extract_info(video_id, download=False) @@ -37,9 +42,18 @@ def main(stdscr, video_id: str, args: argparse.Namespace): data_writer = DataWriter(f"{video_id}-analytics.txt") video_info = get_video_info(video_id) stream_start_unix_time = video_info["release_timestamp"] - - - + if not bool(video_info["is_live"]): + vod_parser = VodParser(video_id, data_writer, + start_unix_time=stream_start_unix_time, + theshold=args.threshold, + cooldown=args.cooldown, + keywords=args.keywords) + vod_parser.download_live_chat() + vod_parser.parse_chat() + vod_parser.clean_up() + exit() + if args.keywords: + keywords = [keyword.strip() for keyword in args.keywords.split(",")] message_count = 0 on_cooldown = False cooldown_start_time = time.time() @@ -52,51 +66,81 @@ def main(stdscr, video_id: str, args: argparse.Namespace): key = stdscr.getch() if key == ord('q'): raise Exception("User ended data collection") - timestamp_since_start = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time_const)) - info_msg = "Now collecting data for " + video_id + " Elapsed time: " + timestamp_since_start + " seconds (Press q to quit)" + timestamp_since_start = time.strftime( + "%H:%M:%S", time.gmtime(time.time() - start_time_const)) + info_msg = "Now collecting data for " + video_id + " Elapsed time: " + \ + timestamp_since_start + " seconds (Press q to quit)" vid_info = f"Title: {video_info['title']} Channel: {video_info['uploader']} \nUnix Start Time: {stream_start_unix_time}" text_renderer.render(info_msg, y_pos=0) text_renderer.render(vid_info, y_pos=1) if on_cooldown and time.time() - cooldown_start_time > args.cooldown: on_cooldown = False - + # Message Rate Related Features if time.time() - last_refresh_time > 1 and not on_cooldown: - message_rate_text, message_rate = get_message_rate(message_count, last_refresh_time) + message_rate_text, message_rate = get_message_rate( + message_count, last_refresh_time) text_renderer.render(message_rate_text, y_pos=4) last_refresh_time = time.time() if message_rate > args.threshold: diff_time_seconds = time.time() - stream_start_unix_time - youtube_timestamp = time.strftime("%H:%M:%S", time.gmtime(diff_time_seconds)) - data_writer.write(f"Notable moment found at {youtube_timestamp} measured at {message_rate}\n") - text_renderer.render(f"Latest notable moment found at {youtube_timestamp} measured at {message_rate} - on cooldown until {time.time() + args.cooldown}\n", y_pos=5) + youtube_timestamp = time.strftime( + "%H:%M:%S", time.gmtime(diff_time_seconds)) + data_writer.write( + f"Notable moment found at {youtube_timestamp} measured at {message_rate}\n") + text_renderer.render( + f"Latest notable moment found at {youtube_timestamp} measured at {message_rate} - on cooldown until {time.time() + args.cooldown}\n", y_pos=5) on_cooldown = True message_count = 0 else: message_count += 1 + # Keyword related features + if args.keywords: + for keyword in keywords: + if keyword.lower() in chat_data.message.lower(): + diff_time_seconds = time.time() - stream_start_unix_time + youtube_timestamp = time.strftime( + "%H:%M:%S", time.gmtime(diff_time_seconds)) + data_writer.write( + f"Keyword {keyword} found at {youtube_timestamp}\n") + text_renderer.render( + f"Keyword {keyword} found at {youtube_timestamp}\n", y_pos=6) + on_cooldown = True + cooldown_start_time = time.time() + # Superchat Related Features (TODO) - #if args.superchats: + # if args.superchats: # check_for_superchats(chat_data, data_writer) - - text_renderer.render(get_message_rate(message_count, last_refresh_time)[0], y_pos=2) + text_renderer.render(get_message_rate( + message_count, last_refresh_time)[0], y_pos=2) if args.show_chat: - text_renderer.log_message("[Chat]" + chat_data.author.name + ": " + chat_data.message) + text_renderer.log_message( + "[Chat]" + chat_data.author.name + ": " + chat_data.message) text_renderer.refresh() - + except Exception as e: - data_writer.write(f"Data collection ended at {time.strftime('%H:%M:%S', time.gmtime(time.time() - stream_start_unix_time))} due to {e}\n") + data_writer.write( + f"Data collection ended at {time.strftime('%H:%M:%S', time.gmtime(time.time() - stream_start_unix_time))} due to {e}\n") + if __name__ == "__main__": - parser = argparse.ArgumentParser(description='YouTube Live Chat Message Rate Tracker') - parser.add_argument('video_id', help='The ID of the YouTube video for the live chat') - parser.add_argument('--show-chat', action='store_true', help='Show the live chat in the terminal window') - parser.add_argument('--superchats', action='store_true', help='Log superchat messages and amounts') - parser.add_argument('--threshold', type=int, default=5, help='Log timestamps when the message rate exceeds this value (msg/s)') - parser.add_argument('--keywords', type=str, help='Log timestamps when a message contains any of these keywords. Enter keywords separated by commas') - parser.add_argument('--cooldown', type=int, default=20, help='Minimum time that must pass before another notable moment is logged (in seconds)') + parser = argparse.ArgumentParser( + description='YouTube Live Chat Message Rate Tracker') + parser.add_argument( + 'video_id', help='The ID of the YouTube video for the live chat') + parser.add_argument('--show-chat', action='store_true', + help='Show the live chat in the terminal window') + parser.add_argument('--superchats', action='store_true', + help='Log superchat messages and amounts') + parser.add_argument('--threshold', type=int, default=5, + help='Log timestamps when the message rate exceeds this value (msg/s)') + parser.add_argument('--keywords', type=str, + help='Log timestamps when a message contains any of these keywords. Enter keywords separated by commas') + parser.add_argument('--cooldown', type=int, default=20, + help='Minimum time that must pass before another notable moment is logged (in seconds)') args = parser.parse_args() if os.path.exists(f"{args.video_id}-analytics.txt"): if input(f"File {args.video_id}-analytics.txt already exists. Overwrite? (y/n): ").lower() != "y": |
