diff options
| author | Pinapelz <yukais@pinapelz.com> | 2024-02-24 00:56:13 -0800 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2024-02-24 00:56:13 -0800 |
| commit | 6158616b53a41b8d9491515c38d071e602d83d5b (patch) | |
| tree | 7e5062cb71fdd6b910fa859cecd9739a414b0a89 /yt_livechat_stats.py | |
| parent | 5309609c7b00d33b064ad48d2185bf06cec503e6 (diff) | |
implement auto generated timestamp based on msg rate
Diffstat (limited to 'yt_livechat_stats.py')
| -rw-r--r-- | yt_livechat_stats.py | 147 |
1 files changed, 72 insertions, 75 deletions
diff --git a/yt_livechat_stats.py b/yt_livechat_stats.py index 1639121..6e6e172 100644 --- a/yt_livechat_stats.py +++ b/yt_livechat_stats.py @@ -1,31 +1,50 @@ import pytchat import time import argparse -from currencies import CurrencyConv import curses +from text_renderer import TextRenderer +from data_writer import DataWriter +import yt_dlp import os class StreamEndedError(Exception): pass -def main(stdscr, video_id: str, args: argparse.Namespace): - curses.curs_set(0) - height, width = stdscr.getmaxyx() - max_num_messages = -(height-4) if args.max_messages is None else args.max_messages - stdscr.nodelay(True) - chat_window = curses.newwin(height - 4, width, 8, 0) +def get_message_rate(message_count: int, last_refresh_time: time.time) -> tuple: + if time.time() - last_refresh_time < 1: + return "", None + time_elapsed = time.time() - last_refresh_time + last_refresh_time = time.time() + message_rate_text = f"Messages per second: {message_count/time_elapsed:.5f}" + message_data = message_rate_text, message_count/time_elapsed + return message_data + +def check_for_superchats(chat_data, data_writer: DataWriter): + if chat_data.amountValue: + data_writer.write(f"Superchat: {chat_data.amountValue} from {chat_data.author.name}\n") + data_writer.write(f"Message: {chat_data.message}\n\n") + +def get_video_info(video_id: str) -> dict: + with yt_dlp.YoutubeDL() as ydl: + info = ydl.extract_info(video_id, download=False) + return info + +def main(stdscr, video_id: str, args: argparse.Namespace): chat = pytchat.create(video_id=video_id) + text_renderer = TextRenderer(stdscr) + data_writer = DataWriter(f"{video_id}-analytics.txt") + video_info = get_video_info(video_id) + stream_start_unix_time = video_info["release_timestamp"] + + message_count = 0 - current_earnings_superchat_usd = 0.0 - membership_count = 0 - superchat_messages = [] - start_time = time.time() + on_cooldown = False + cooldown_start_time = time.time() start_time_const = time.time() - if args.earnings: - currency_converter = CurrencyConv() + last_refresh_time = time.time() try: while chat.is_alive(): @@ -33,76 +52,54 @@ def main(stdscr, video_id: str, args: argparse.Namespace): key = stdscr.getch() if key == ord('q'): raise Exception("User ended data collection") - if args.earnings and chat_data.amountValue != 0.0: - current_earnings_superchat_usd += currency_converter.convert(chat_data.amountValue, chat_data.currency, "USD") - superchat_messages.append((chat_data.author.name, chat_data.amountString, chat_data.message)) - - if args.earnings and "Welcome New Member!" in chat_data.message: - superchat_messages.append((chat_data.author.name, "Membership", chat_data.message)) - membership_count += 1 - - message_count += 1 + timestamp_since_start = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time_const)) + info_msg = "Now collecting data for " + video_id + " Elapsed time: " + timestamp_since_start + " seconds (Press q to quit)" + vid_info = f"Title: {video_info['title']} Channel: {video_info['uploader']} \nUnix Start Time: {stream_start_unix_time}" + text_renderer.render(info_msg, y_pos=0) + text_renderer.render(vid_info, y_pos=1) - elapsed_time = time.time() - start_time - if elapsed_time >= 1: - stdscr.erase() - messages_per_second = message_count / elapsed_time - if args.track_moments > 0 and messages_per_second > args.track_moments: - stdscr.addstr(1, 0, "Message rate exceeded " + str(args.track_moments) + " at " + str(time.time()), curses.A_BLINK) - if not os.path.exists(f"{video_id}-moments.txt"): - open(f"{video_id}-moments.txt", "w").close() - with open(f"{video_id}-moments.txt", "a") as f: - timestamp = chat_data.timestamp - if args.start_time > 0: - timestamp = timestamp - args.start_time - timestamp = time.strftime("%H:%M:%S", time.gmtime(timestamp)) - - f.write(f"Message rate exceeded {args.track_moments} per second at {chat_data.timestamp}\n") - messages_per_second_text = f"Messages per second: {messages_per_second:.2f}" - timestamp_since_start = time.strftime("%H:%M:%S", time.gmtime(time.time() - start_time_const)) - info_msg = "Now collecting data for " + video_id + " Elapsed time: " + timestamp_since_start + " seconds (Press q to quit)" - chat_window_seperator_txt = "-" * width - stdscr.addstr(0, 0, info_msg, curses.A_BOLD) - stdscr.addstr(2, 0, messages_per_second_text) - stdscr.addstr(6, 0, chat_window_seperator_txt) - - if args.earnings: - current_earnings_usd = current_earnings_superchat_usd + membership_count * args.membership - earnings_text = f"Current earnings (USD): {current_earnings_usd:.2f}" - membership_count_text = f"New/Renewing Members: {membership_count}" - stdscr.addstr(3, 0, earnings_text) - stdscr.addstr(4, 0, membership_count_text) - - stdscr.refresh() + if on_cooldown and time.time() - cooldown_start_time > args.cooldown: + on_cooldown = False + + # Message Rate Related Features + if time.time() - last_refresh_time > 1 and not on_cooldown: + message_rate_text, message_rate = get_message_rate(message_count, last_refresh_time) + text_renderer.render(message_rate_text, y_pos=4) + last_refresh_time = time.time() + if message_rate > args.threshold: + diff_time_seconds = time.time() - stream_start_unix_time + youtube_timestamp = time.strftime("%H:%M:%S", time.gmtime(diff_time_seconds)) + data_writer.write(f"Notable moment found at {youtube_timestamp} measured at {message_rate}\n") + text_renderer.render(f"Latest notable moment found at {youtube_timestamp} measured at {message_rate} - on cooldown until {time.time() + args.cooldown}\n", y_pos=5) + on_cooldown = True message_count = 0 - start_time = time.time() + else: + message_count += 1 - # Update chat window - chat_window.erase() - for i, message in enumerate(superchat_messages[max_num_messages:]): - chat_window.addstr(i, 0, f"{message[0]}: {message[1]} - {message[2]}") - chat_window.refresh() + # Superchat Related Features (TODO) + #if args.superchats: + # check_for_superchats(chat_data, data_writer) + + text_renderer.render(get_message_rate(message_count, last_refresh_time)[0], y_pos=2) + if args.show_chat: + text_renderer.log_message("[Chat]" + chat_data.author.name + ": " + chat_data.message) + text_renderer.refresh() + except Exception as e: - with open(f"{video_id}.txt", "w") as f: - f.write(f"Data collection ended due to: {str(e)}\n") - f.write(f"Messages per second: {messages_per_second:.2f}\n") - if args.earnings: - f.write(f"Current earnings (USD): {current_earnings_usd:.2f}\n") - f.write(f"New/Renewing Members: {membership_count}\n") - f.write("--- Logged Superchats ---\n") - for message in superchat_messages: - f.write(f"{message[0]}: {message[1]} - {message[2]}\n") - print("Saving data collected so far to " + f"{video_id}.txt") - print(e) + data_writer.write(f"Data collection ended at {time.strftime('%H:%M:%S', time.gmtime(time.time() - stream_start_unix_time))} due to {e}\n") if __name__ == "__main__": parser = argparse.ArgumentParser(description='YouTube Live Chat Message Rate Tracker') parser.add_argument('video_id', help='The ID of the YouTube video for the live chat') - parser.add_argument('--earnings', action='store_true', help='Show earnings through Superchats, Super Stickers, and Memberships') - parser.add_argument('--membership', type=float, default=4.99, help='The amount of money a membership is worth') - parser.add_argument('--track-moments', type=int, default=0, help='Log timestamps when the message rate exceeds this value') - parser.add_argument('--max-messages', type=int, default=None, help="The maximum number of messages to collect") - parser.add_argument('-start-time', type=int, default=0, help='The known UNIX timestamp of when the stream started (for calculating timestamps') + parser.add_argument('--show-chat', action='store_true', help='Show the live chat in the terminal window') + parser.add_argument('--superchats', action='store_true', help='Log superchat messages and amounts') + parser.add_argument('--threshold', type=int, default=5, help='Log timestamps when the message rate exceeds this value (msg/s)') + parser.add_argument('--keywords', type=str, help='Log timestamps when a message contains any of these keywords. Enter keywords separated by commas') + parser.add_argument('--cooldown', type=int, default=20, help='Minimum time that must pass before another notable moment is logged (in seconds)') args = parser.parse_args() + if os.path.exists(f"{args.video_id}-analytics.txt"): + if input(f"File {args.video_id}-analytics.txt already exists. Overwrite? (y/n): ").lower() != "y": + print("Exiting...") + exit() curses.wrapper(main, args.video_id, args) |
