diff options
| -rw-r--r-- | data_writer.py | 10 | ||||
| -rw-r--r-- | requirements.txt | 23 | ||||
| -rw-r--r-- | text_renderer.py | 41 | ||||
| -rw-r--r-- | yt_livechat_stats.py | 147 |
4 files changed, 146 insertions, 75 deletions
diff --git a/data_writer.py b/data_writer.py new file mode 100644 index 0000000..e74cb89 --- /dev/null +++ b/data_writer.py @@ -0,0 +1,10 @@ +import os +class DataWriter: + def __init__(self, filepath: str): + self.filepath = filepath + with open(self.filepath, "w") as f: + f.write("") + + def write(self, data: str): + with open(self.filepath, "a") as f: + f.write(data)
\ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..27bc818 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,23 @@ +anyio==4.2.0 +Brotli==1.1.0 +certifi==2023.11.17 +charset-normalizer==3.3.2 +colorama==0.4.6 +h11==0.14.0 +h2==4.1.0 +hpack==4.0.0 +httpcore==1.0.2 +httpx==0.26.0 +hyperframe==6.0.1 +idna==3.6 +markdown-it-py==3.0.0 +mdurl==0.1.2 +mutagen==1.47.0 +pycryptodomex==3.20.0 +Pygments==2.17.2 +pytchat==0.5.5 +requests==2.31.0 +sniffio==1.3.0 +urllib3==2.1.0 +websockets==12.0 +yt-dlp==2023.12.30 diff --git a/text_renderer.py b/text_renderer.py new file mode 100644 index 0000000..2d6c8a8 --- /dev/null +++ b/text_renderer.py @@ -0,0 +1,41 @@ +import curses + +class TextRenderer: + def __init__(self, stdscr): + curses.curs_set(0) + self.stdscr = stdscr + self.height, self.width = stdscr.getmaxyx() + self.log_window = curses.newwin(self.height - 4, self.width, 8, 0) + self.log_window.scrollok(True) + self.log_window.idlok(True) + self.log_window.refresh() + self.stdscr.nodelay(True) + self._next_regular_window_line = 0 # The next line to write to in the regular window + self._next_log_window_line = 0 # The next line to write to in the log window + + def clear_line(self, y_pos): + self.stdscr.addstr(0, y_pos, " " * self.width) + + def render(self, message, y_padding=0, y_pos=None): + y_position = self._next_regular_window_line + y_padding + if y_pos is not None: + y_position = y_pos + y_padding + self.stdscr.addstr(y_position, 0, message) # Corrected this line + if y_pos is None: + self._next_regular_window_line += 1 + + def log_message(self, message, y_padding=0): + y_position = self._next_log_window_line + y_padding + if y_position >= self.height - 4: + self.log_window.scroll(1) + y_position = self.height - 5 + self.log_window.addstr(y_position, 0, message) + self._next_log_window_line += 1 + self.log_window.refresh() + + def draw_horizontal_separator(self, y_pos): + self.stdscr.addstr(y_pos, 0, "-" * self.width) + + def refresh(self): + self.stdscr.refresh() + self.log_window.refresh()
\ No newline at end of file diff --git a/yt_livechat_stats.py b/yt_livechat_stats.py index 1639121..6e6e172 100644 --- a/yt_livechat_stats.py +++ b/yt_livechat_stats.py @@ -1,31 +1,50 @@ import pytchat import time import argparse -from currencies import CurrencyConv import curses +from text_renderer import TextRenderer +from data_writer import DataWriter +import yt_dlp import os class StreamEndedError(Exception): pass -def main(stdscr, video_id: str, args: argparse.Namespace): - curses.curs_set(0) - height, width = stdscr.getmaxyx() - max_num_messages = -(height-4) if args.max_messages is None else args.max_messages - stdscr.nodelay(True) - chat_window = curses.newwin(height - 4, width, 8, 0) +def get_message_rate(message_count: int, last_refresh_time: time.time) -> tuple: + if time.time() - last_refresh_time < 1: + return "", None + time_elapsed = time.time() - last_refresh_time + last_refresh_time = time.time() + message_rate_text = f"Messages per second: {message_count/time_elapsed:.5f}" + message_data = message_rate_text, message_count/time_elapsed + return message_data + +def check_for_superchats(chat_data, data_writer: DataWriter): + if chat_data.amountValue: + data_writer.write(f"Superchat: {chat_data.amountValue} from {chat_data.author.name}\n") + data_writer.write(f"Message: {chat_data.message}\n\n") + +def get_video_info(video_id: str) -> dict: + with yt_dlp.YoutubeDL() as ydl: + info = ydl.extract_info(video_id, download=False) + return info + +def main(stdscr, video_id: str, args: argparse.Namespace): chat = pytchat.create(video_id=video_id) + text_renderer = TextRenderer(stdscr) + data_writer = DataWriter(f"{video_id}-analytics.txt") + video_info = get_video_info(video_id) + stream_start_unix_time = video_info["release_timestamp"] + + message_count = 0 - current_earnings_superchat_usd = 0.0 - membership_count = 0 - superchat_messages = [] - start_time = time.time() + on_cooldown = False + cooldown_start_time = time.time() start_time_const = time.time() - if args.earnings: - currency_converter = CurrencyConv() + last_refresh_time = time.time() try: while chat.is_alive(): @@ -33,76 +52,54 @@ def main(stdscr, video_id: str, args: argparse.Namespace): key = stdscr.getch() if key == ord('q'): raise Exception("User ended data collection") - if args.earnings and chat_data.amountValue != 0.0: - current_earnings_superchat_usd += currency_converter.convert(chat_data.amountValue, chat_data.currency, "USD") - superchat_messages.append((chat_data.author.name, chat_data.amountString, chat_data.message)) - - if args.earnings and "Welcome New Member!" in chat_data.message: - superchat_messages.append((chat_data.author.name, "Membership", chat_data.message)) - membership_count += 1 - - message_count += 1 + timestamp_since_start = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time_const)) + info_msg = "Now collecting data for " + video_id + " Elapsed time: " + timestamp_since_start + " seconds (Press q to quit)" + vid_info = f"Title: {video_info['title']} Channel: {video_info['uploader']} \nUnix Start Time: {stream_start_unix_time}" + text_renderer.render(info_msg, y_pos=0) + text_renderer.render(vid_info, y_pos=1) - elapsed_time = time.time() - start_time - if elapsed_time >= 1: - stdscr.erase() - messages_per_second = message_count / elapsed_time - if args.track_moments > 0 and messages_per_second > args.track_moments: - stdscr.addstr(1, 0, "Message rate exceeded " + str(args.track_moments) + " at " + str(time.time()), curses.A_BLINK) - if not os.path.exists(f"{video_id}-moments.txt"): - open(f"{video_id}-moments.txt", "w").close() - with open(f"{video_id}-moments.txt", "a") as f: - timestamp = chat_data.timestamp - if args.start_time > 0: - timestamp = timestamp - args.start_time - timestamp = time.strftime("%H:%M:%S", time.gmtime(timestamp)) - - f.write(f"Message rate exceeded {args.track_moments} per second at {chat_data.timestamp}\n") - messages_per_second_text = f"Messages per second: {messages_per_second:.2f}" - timestamp_since_start = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time_const)) - info_msg = "Now collecting data for " + video_id + " Elapsed time: " + timestamp_since_start + " seconds (Press q to quit)" - chat_window_seperator_txt = "-" * width - stdscr.addstr(0, 0, info_msg, curses.A_BOLD) - stdscr.addstr(2, 0, messages_per_second_text) - stdscr.addstr(6, 0, chat_window_seperator_txt) - - if args.earnings: - current_earnings_usd = current_earnings_superchat_usd + membership_count * args.membership - earnings_text = f"Current earnings (USD): {current_earnings_usd:.2f}" - membership_count_text = f"New/Renewing Members: {membership_count}" - stdscr.addstr(3, 0, earnings_text) - stdscr.addstr(4, 0, membership_count_text) - - stdscr.refresh() + if on_cooldown and time.time() - cooldown_start_time > args.cooldown: + on_cooldown = False + + # Message Rate Related Features + if time.time() - last_refresh_time > 1 and not on_cooldown: + message_rate_text, message_rate = get_message_rate(message_count, last_refresh_time) + text_renderer.render(message_rate_text, y_pos=4) + last_refresh_time = time.time() + if message_rate > args.threshold: + diff_time_seconds = time.time() - stream_start_unix_time + youtube_timestamp = time.strftime("%H:%M:%S", time.gmtime(diff_time_seconds)) + data_writer.write(f"Notable moment found at {youtube_timestamp} measured at {message_rate}\n") + text_renderer.render(f"Latest notable moment found at {youtube_timestamp} measured at {message_rate} - on cooldown until {time.time() + args.cooldown}\n", y_pos=5) + on_cooldown = True message_count = 0 - start_time = time.time() + else: + message_count += 1 - # Update chat window - chat_window.erase() - for i, message in enumerate(superchat_messages[max_num_messages:]): - chat_window.addstr(i, 0, f"{message[0]}: {message[1]} - {message[2]}") - chat_window.refresh() + # Superchat Related Features (TODO) + #if args.superchats: + # check_for_superchats(chat_data, data_writer) + + text_renderer.render(get_message_rate(message_count, last_refresh_time)[0], y_pos=2) + if args.show_chat: + text_renderer.log_message("[Chat]" + chat_data.author.name + ": " + chat_data.message) + text_renderer.refresh() + except Exception as e: - with open(f"{video_id}.txt", "w") as f: - f.write(f"Data collection ended due to: {str(e)}\n") - f.write(f"Messages per second: {messages_per_second:.2f}\n") - if args.earnings: - f.write(f"Current earnings (USD): {current_earnings_usd:.2f}\n") - f.write(f"New/Renewing Members: {membership_count}\n") - f.write("--- Logged Superchats ---\n") - for message in superchat_messages: - f.write(f"{message[0]}: {message[1]} - {message[2]}\n") - print("Saving data collected so far to " + f"{video_id}.txt") - print(e) + data_writer.write(f"Data collection ended at {time.strftime('%H:%M:%S', time.gmtime(time.time() - stream_start_unix_time))} due to {e}\n") if __name__ == "__main__": parser = argparse.ArgumentParser(description='YouTube Live Chat Message Rate Tracker') parser.add_argument('video_id', help='The ID of the YouTube video for the live chat') - parser.add_argument('--earnings', action='store_true', help='Show earnings through Superchats, Super Stickers, and Memberships') - parser.add_argument('--membership', type=float, default=4.99, help='The amount of money a membership is worth') - parser.add_argument('--track-moments', type=int, default=0, help='Log timestamps when the message rate exceeds this value') - parser.add_argument('--max-messages', type=int, default=None, help="The maximum number of messages to collect") - parser.add_argument('-start-time', type=int, default=0, help='The known UNIX timestamp of when the stream started (for calculating timestamps') + parser.add_argument('--show-chat', action='store_true', help='Show the live chat in the terminal window') + parser.add_argument('--superchats', action='store_true', help='Log superchat messages and amounts') + parser.add_argument('--threshold', type=int, default=5, help='Log timestamps when the message rate exceeds this value (msg/s)') + parser.add_argument('--keywords', type=str, help='Log timestamps when a message contains any of these keywords. Enter keywords separated by commas') + parser.add_argument('--cooldown', type=int, default=20, help='Minimum time that must pass before another notable moment is logged (in seconds)') args = parser.parse_args() + if os.path.exists(f"{args.video_id}-analytics.txt"): + if input(f"File {args.video_id}-analytics.txt already exists. Overwrite? (y/n): ").lower() != "y": + print("Exiting...") + exit() curses.wrapper(main, args.video_id, args) |
