diff options
| -rw-r--r-- | .gitignore | 171 | ||||
| -rw-r--r-- | Procfile | 1 | ||||
| -rw-r--r-- | app.py | 151 | ||||
| -rw-r--r-- | data/channels.txt | 4 | ||||
| -rw-r--r-- | data/exclude_channel.txt | 3 | ||||
| -rw-r--r-- | decorators.py | 16 | ||||
| -rw-r--r-- | fileutil.py | 93 | ||||
| -rw-r--r-- | graph.py | 35 | ||||
| -rw-r--r-- | member_colors.py | 30 | ||||
| -rw-r--r-- | nijitrack.py | 175 | ||||
| -rw-r--r-- | requirements.txt | 40 | ||||
| -rw-r--r-- | sql/pg_handler.py | 169 | ||||
| -rw-r--r-- | sql_table_config.json | 12 | ||||
| -rw-r--r-- | webapi/holodex.py | 75 | ||||
| -rw-r--r-- | webapi/web_api.py | 29 | ||||
| -rw-r--r-- | webapi/youtube.py | 61 |
16 files changed, 1065 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1103b73 --- /dev/null +++ b/.gitignore @@ -0,0 +1,171 @@ +__pycache__ +config.json +.venv +.github +main.py +.idea +tables +stats +.vscode + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ +index.html
\ No newline at end of file diff --git a/Procfile b/Procfile new file mode 100644 index 0000000..362d8e0 --- /dev/null +++ b/Procfile @@ -0,0 +1 @@ +web: gunicorn --bind 0.0.0.0:$PORT app:app
\ No newline at end of file @@ -0,0 +1,151 @@ +""" +Flask app for serving the static files +""" +from flask import Flask, send_file, jsonify +from flask_cors import CORS +from sql.pg_handler import PostgresHandler +import fileutil as fs +import datetime +import pandas +from sklearn.linear_model import Ridge +import numpy as np +import os + +app = Flask(__name__) +CORS(app) + +# Optional setting to use any of the custom options below +START_DATE = "2023-04-01" # 2023 April 1st + +# Do not include datapoints before the START_DATE for any /api/subscribers/ endpoint +# For when you only want to serve actual data you collected at those specific endpoints +ALL_EXCLUDE_MANUAL_DATA = False + +# Do not include datapoints before the START_DATE for any /api/subscribers/<channel_id> endpoint +# For when you only want to serve actual data you collected at those specific endpoints +INDIVIDUAL_EXCLUDE_MANUAL_DATA = True + +def create_database_connection(): + """ + Creates a database connection using the environment variables + :param: auth_append: str = "" - If you want to use a different set of variables for persisitance of sessions + """ + hostname = "***REMOVED***" + user = "***REMOVED***" + password = "***REMOVED***" + database = "postgres" + return PostgresHandler(host_name=hostname, username=user, password=password, database=database, port=5432) + +@app.route("/") +def index(): + try: + return send_file("index.html") + except Exception as e: + return jsonify({"error": str(e)}) + +@app.route("/api/subscribers") +def api_subscribers(): + server = create_database_connection() + query = 'SELECT sd.*, h.* FROM subscriber_data sd INNER JOIN "24h_historical" h ON sd.channel_id = h.channel_id ORDER BY sd.subscriber_count DESC' + data = server.execute_query(query) + channel_data_list = [{"channel_name": row[3], "profile_pic": row[2], "subscribers": row[4], "sub_org": row[5], "video_count": row[6], "day_diff": int(row[4] - int(row[10]))} for row in data] + subscriber_data = {"timestamp": datetime.datetime.now(), "channel_data": channel_data_list} + return jsonify(subscriber_data) + +@app.route("/api/subscribers/<channel_name>") +def api_subscribers_channel(channel_name): + server = create_database_connection() + query = "SELECT * FROM subscriber_data_historical WHERE name = %s AND timestamp > %s ORDER BY TO_CHAR(timestamp, 'YYYY-MM-DD')" + data = server.execute_query(query, (channel_name, START_DATE)) + labels = [] + data_points = [] + seen_dates = set() + for row in data: + date_string = row[5].strftime("%Y-%m-%d") + if date_string in seen_dates: + continue + labels.append(date_string) + data_points.append(row[4]) + seen_dates.add(date_string) + return jsonify({"labels": labels, "datasets": data_points}) + +@app.route("/api/subscribers/<channel_name>/7d") +def api_subscribers_channel_7d(channel_name): + server = create_database_connection() + query = "SELECT * FROM subscriber_data_historical WHERE name = %s ORDER BY TO_CHAR(timestamp, 'YYYY-MM-DD')" + data = server.execute_query(query, (channel_name,)) + labels = [] + data_points = [] + seen_dates = set() + for row in data: + date_string = row[5].strftime("%Y-%m-%d") + if date_string in seen_dates: + continue + labels.append(date_string) + data_points.append(row[4]) + seen_dates.add(date_string) + return jsonify({"labels": labels[-7:], "datasets": data_points[-7:]}) + +@app.route("/api/channel/<channel_name>") +def get_channel_information(channel_name): + def find_next_milestone(subscriber_count): + if subscriber_count < 100000: + return ((subscriber_count // 10000) + 1) * 10000 + elif subscriber_count < 1000000: + return ((subscriber_count // 100000) + 1) * 100000 + else: + return ((subscriber_count // 1000000) + 1) * 1000000 + server = create_database_connection() + query = "SELECT * FROM subscriber_data WHERE name = %s" + data = server.execute_query(query, (channel_name,)) + channel_data = {"channel_id": data[0][1], "channel_name": data[0][3], "profile_pic": data[0][2], "subscribers": data[0][4], "sub_org": data[0][5], "video_count": data[0][6]} + historical_data = server.execute_query("SELECT * FROM subscriber_data_historical WHERE name = %s", (channel_name,)) + current_subscriber_count = data[0][4] + subscriber_points = [] + date_strings = [] + seen_dates = set() + for row in historical_data: + date_string = row[5].strftime("%Y-%m-%d") + if date_string in seen_dates: + continue + subscriber_points.append(row[4]) + date_strings.append(date_string) + seen_dates.add(date_string) + data = {"subscribers": subscriber_points, "dates": date_strings} + df = pandas.DataFrame(data=data) + df['dates'] = pandas.to_datetime(df['dates']) + df.set_index('dates', inplace=True) + df.sort_index(inplace=True) + three_months_ago = datetime.datetime.now() - datetime.timedelta(days=90) + df = df[df.index > three_months_ago] + try: + model = Ridge(alpha=100) + X = np.array(range(len(df))).reshape(-1, 1) + y = df['subscribers'] + model.fit(X, y) + next_milestone = find_next_milestone(current_subscriber_count) + days_until_next_milestone = (next_milestone - model.intercept_) / model.coef_ + days_until_next_milestone_scalar = int(days_until_next_milestone[0]) + next_milestone_date = (df.index[0] + pandas.Timedelta(days=days_until_next_milestone_scalar)).date() + time_until_next_milestone = (next_milestone_date - datetime.datetime.now().date()).days + if time_until_next_milestone < 0: + raise OverflowError + channel_data["next_milestone_date"] = str(next_milestone_date) + channel_data["days_until_next_milestone"] = str(time_until_next_milestone) + channel_data["next_milestone"] = str(next_milestone) + except OverflowError: + channel_data["next_milestone_date"] = "N/A" + channel_data["days_until_next_milestone"] = "N/A" + channel_data["next_milestone"] = "N/A" + return jsonify(channel_data) +@app.route("/api/announcement") +def api_announcement(): + announcement_data = {"message": "None", "show_message": False} + return jsonify(announcement_data) + +@app.errorhandler(404) +def not_found(error): + return jsonify(error=str(error)), 404 + +if __name__ == "__main__": + app.run(debug=True)
\ No newline at end of file diff --git a/data/channels.txt b/data/channels.txt new file mode 100644 index 0000000..2b86946 --- /dev/null +++ b/data/channels.txt @@ -0,0 +1,4 @@ +# DELETE THIS: [Add new line delineated channels here]. One each line +# DELETE THIS: ChannelName: +# DELETE THIS: [Add new line delineated channels here]. One each line with channel id and name seperated by a colon +# DELETE THIS: Channel_ID:Channel_Name diff --git a/data/exclude_channel.txt b/data/exclude_channel.txt new file mode 100644 index 0000000..18663d6 --- /dev/null +++ b/data/exclude_channel.txt @@ -0,0 +1,3 @@ +DELETE_THIS and write channel id to ignore on each line +i.e UC-1234567890 +UC-0987654321
\ No newline at end of file diff --git a/decorators.py b/decorators.py new file mode 100644 index 0000000..bc0b420 --- /dev/null +++ b/decorators.py @@ -0,0 +1,16 @@ +import time + + +def log(message: str): + def decorator(func): + def wrapper(*args, **kwargs): + print("TASK: " + message) + start = time.time() + result = func(*args, **kwargs) + end = time.time() + print(f"COMPLETE: {message} {round(end - start, 3)} seconds") + return result + + return wrapper + + return decorator diff --git a/fileutil.py b/fileutil.py new file mode 100644 index 0000000..325ee0f --- /dev/null +++ b/fileutil.py @@ -0,0 +1,93 @@ +import os.path +import urllib.request +import json +import time +import configparser + + +def _read_file(path: str, lines=True) -> list: + # reads a file and returns a list of lines + with open(path, "r", encoding="utf-8") as file: + if not lines: + return file.read() + return file.read().splitlines() + + +def get_excluded_channels(): + # gets excluded channels from exclude_channel.txt + if not os.path.exists(os.path.join("data", "exclude_channel.txt")): + open(os.path.join("data", "exclude_channel.txt"), "w").close() + excluded_channels = _read_file(os.path.join("data", "exclude_channel.txt")) + return excluded_channels + +def update_excluded_channels(channel_ids: list): + # add to exclude_channel.txt if not already there + excluded_channels = get_excluded_channels() + for channel_id in channel_ids: + if channel_id not in excluded_channels: + excluded_channels.append(channel_id) + with open(os.path.join("data", "exclude_channel.txt"), "w", encoding="utf-8") as file: + for channel_id in excluded_channels: + file.write(channel_id + "\n") + +def save_local_channels(data: list, path: str = "data"): + """ + Save the channel names and ids locally for when the API is down + """ + path = os.path.join(path, "channels.txt") + excluded_channels = get_excluded_channels() + if not os.path.exists(path): + os.makedirs(os.path.dirname(path), exist_ok=True) + open(path, "w").close() + with open(path, "w", encoding="utf-8") as file: + for channel in data: + if channel["id"] in excluded_channels: + continue + file.write(f"{channel['id']},{channel['english_name']}\n") + + +def get_local_channels(path: str = "data"): + """ + Get the channel names and ids locally for when the API is down + """ + path = os.path.join(path, "channels.txt") + if not os.path.exists(path): + raise Exception("Local channel data not found") + with open(path, "r", encoding="utf-8") as file: + rows = file.read().splitlines() + return [tuple(row.split(":")) for row in rows] + + +def update_data_files(url: str) -> None: + # Updates the local txt channel data stored in data folder + if not os.path.exists(os.path.join("data", "channels.txt")): + open(os.path.join("data", "channels.txt"), "w").close() + urllib.request.urlretrieve( + url + "channels.txt", os.path.join("data", "channels.txt") + ) + # downloaded txt file from url and write to channels.txt + + if not os.path.exists(os.path.join("data", "exclude_channel.txt")): + open(os.path.join("data", "exclude_channel.txt"), "w").close() + urllib.request.urlretrieve( + url + "exclude_channel.txt", os.path.join("data", "exclude_channel.txt") + ) + + +def load_config(ini_filepath: str) -> dict: + config_object = configparser.ConfigParser() + file = open(ini_filepath, "r") + config_object.read_file(file) + output_dict = {} + sections = config_object.sections() + for section in sections: + output_dict[section] = {} + for key in config_object[section]: + output_dict[section][key] = config_object[section][key] + return output_dict + +def load_json_file(json_file_path: str) -> dict: + with open(json_file_path, "r", encoding="utf-8") as file: + return json.load(file) + + diff --git a/graph.py b/graph.py new file mode 100644 index 0000000..f7d8716 --- /dev/null +++ b/graph.py @@ -0,0 +1,35 @@ +import plotly.graph_objects as go +import plotly.express as px +import pandas as pd +import warnings +from member_colors import member_color_map +import random + +def plot_subscriber_count_over_time(server, table_name, gtitle="Subscriber Count Over Time for Phase Connect Members", + overrideQuery=None, markers="lines", exclude_channels=[]): + warnings.filterwarnings('ignore') # Ignore pandas warning regarding pyodbc + query = f"SELECT name, subscriber_count, timestamp, channel_id FROM {table_name} ORDER by timestamp DESC" if overrideQuery is None else overrideQuery + df = pd.read_sql_query(query, server.get_connection()) + groups = df.groupby("name") + fig = go.Figure() + config = dict({'responsive': True, 'displaylogo': False, 'modeBarButtonsToAdd': ['pan2d', 'zoomIn2d', 'zoomOut2d']}) + + for channel, group in groups: + if len(exclude_channels) != 0 and group['channel_id'].iloc[0] in exclude_channels: + continue + color = None + color = member_color_map.get(channel, '#' + ''.join(random.choices('0123456789ABCDEF', k=6))) + + fig.add_trace(go.Scattergl( + x=group["timestamp"], y=group["subscriber_count"], name=channel, mode=markers, + showlegend=True, line=dict(color=color))) + + fig.update_layout( + title={'text': gtitle, 'x': 0.5, 'xanchor': 'center', + 'yanchor': 'top', 'font': {'family': 'Droid Sans', 'size': 30}}, + xaxis_title="Date", + yaxis_title="Subscribers", + legend=dict(font=dict(size=16), title=dict(text="Channels")), + height=950, + ) + return fig.to_html(config=config) diff --git a/member_colors.py b/member_colors.py new file mode 100644 index 0000000..29ca1a0 --- /dev/null +++ b/member_colors.py @@ -0,0 +1,30 @@ +member_color_map = { + 'Rinkou Ashelia': '#D985B3', + 'Utatane Nasa': '#C69E90', + 'Pipkin Pippa': '#E78CA3', + 'Maemi Tenma': '#A99FAC', + 'Hakushika Iori': '#60C1F1', + 'Fujikura Uruka': '#687199', + 'Shisui Michiru': '#3D2539', + 'Remilia Nephys': '#723838', + 'Chisaka Airi': '#966D7A', + 'Amanogawa Shiina': '#5F5675', + 'Himemiya Rie': '#D168A2', + 'Erina Makina': '#3D4E68', + 'Komachi Panko': '#CCA3A3', + 'Kaneko Lumi': '#C8B8A0 ', + 'Ember Amane': '#ADAEFF', + 'Dizzy Dokuro': '#223268', + 'Jelly Hoshiumi': '#A9D1E6', + 'Saya Sairroxs': '#723838', + 'Runie Ruse': '#FF96B7', + 'Muu Muyu': '#DFAFED', + 'Hikanari Hina': '#6A8FB1', + 'Eimi Isami': '#FF9400', + 'Ayase Yuu': '#292A33', + 'Kokoromo Memory': '#FFB1D5', + 'Kaminari Clara': '#91BCC6', + 'Kannagi Loki': '#982428', + 'Fuura Yuri': '#000000', + 'Gram Pico': '#FFBBD7' +} diff --git a/nijitrack.py b/nijitrack.py new file mode 100644 index 0000000..6733fbc --- /dev/null +++ b/nijitrack.py @@ -0,0 +1,175 @@ +from datetime import datetime + +import fileutil as fs +from sql.pg_handler import PostgresHandler +from webapi.holodex import HolodexAPI +from webapi.youtube import YouTubeAPI +from b2sdk.v2 import * +import graph +from decorators import * +import argparse +import os +import pytz +import dotenv + +dotenv.load_dotenv() + +DATA_SETTING = fs.load_json_file("sql_table_config.json") + +def create_database_connection(): + """ + Creates a database connection using the environment variables + :param: auth_append: str = "" - If you want to use a different set of variables for persisitance of sessions + """ + hostname = os.environ.get("POSTGRES_HOST") + user = os.environ.get("POSTGRES_USER") + password = os.environ.get("POSTGRES_PASSWORD") + database = os.environ.get("POSTGRES_DATABASE") + return PostgresHandler(host_name=hostname, username=user, password=password, database=database, port=5432) + +@log("Initializing Database") +def initialize_database(server: PostgresHandler): + server.create_table(name = DATA_SETTING["TABLE_LIVE"], column = DATA_SETTING["LIVE_COLUMNS"]) + server.create_table(name = DATA_SETTING["TABLE_HISTORICAL"], column = DATA_SETTING["HISTORICAL_COLUMNS"]) + server.create_table(name = DATA_SETTING["TABLE_DAILY"], column = DATA_SETTING["DAILY_COLUMNS"]) + + +@log("Inserting Live Data into Database") +def record_subscriber_data(data: list, force_refresh: bool = False): + def transform_sql_string(string: str) -> str: + return string.encode("ascii", "ignore").decode().replace("'", "''") + def record_diff_data(data_tuple: tuple, refresh_daily: bool): + if not server.check_row_exists(DATA_SETTING["TABLE_DAILY"], "channel_id", channel_id): + # data_tuple = (channel_id, pfp, channel_name, sub_count, time.strftime('%Y-%m-%d %H:%M:%S')) + server.insert_row(DATA_SETTING["TABLE_DAILY"], DATA_SETTING["DAILY_HEADER"], (data_tuple[0], data_tuple[3])) + server.insert_row(table_name = DATA_SETTING["TABLE_HISTORICAL"], column = DATA_SETTING["HISTORICAL_HEADER"], data=data_tuple) + return + elif refresh_daily: + server.update_row(DATA_SETTING["TABLE_DAILY"], "channel_id", channel_id, "sub_diff", sub_count) + server.insert_row(table_name = DATA_SETTING["TABLE_HISTORICAL"], column = DATA_SETTING["HISTORICAL_HEADER"], data=data_tuple) + + def check_diff_refresh(): + last_updated = server.get_most_recently_added_row_time(DATA_SETTING["TABLE_HISTORICAL"])[0] + if not last_updated: + print("Failed to get the most recently added row time.") + return False + last_updated = pytz.timezone('US/Pacific').localize(last_updated) + utc_now = datetime.now(pytz.timezone('UTC')) + pst_now = utc_now.astimezone(pytz.timezone('US/Pacific')) + time_diff = pst_now - last_updated + if time_diff.days >= 1: + return True + elif time_diff.days == 0 and time_diff.seconds >= 85800: + return True + else: + print("Skipping Daily Refresh. It has not been a day yet") + return False + exclude_channels = fs.get_excluded_channels() + if force_refresh: + refresh_daily = True + else: + refresh_daily = check_diff_refresh() + for channel in data: + channel_id = channel["id"] + if channel_id in exclude_channels: + continue + pfp = channel["photo"] + sub_count = channel["subscriber_count"] + channel_name = channel["english_name"] + sub_org = channel["group"] + video_count = channel["video_count"] + if channel_name is None: + channel_name = channel["name"] + if sub_org is None: + sub_org = "Unknown" + channel_name = transform_sql_string(channel_name) + utc_now = datetime.now(pytz.timezone('UTC')) + pst_now = utc_now.astimezone(pytz.timezone('US/Pacific')) + formatted_time = pst_now.strftime('%Y-%m-%d %H:%M:%S') + data_tuple = (channel_id, pfp, channel_name, sub_count, sub_org, video_count, formatted_time) + historical_data_tuple = (channel_id, pfp, channel_name, sub_count, formatted_time) + server.insert_row(table_name = DATA_SETTING["TABLE_LIVE"], column = DATA_SETTING["LIVE_HEADER"], data=data_tuple) + record_diff_data(historical_data_tuple, refresh_daily) + + +@log("Running Holodex Generation") +def holodex_generation(server: PostgresHandler, force_refresh: bool = False): + """ + Generates the data from the Holodex API + """ + holodex_organizations = DATA_SETTING["HOLODEX_ORGS"].split(",") + server.clear_table(DATA_SETTING["TABLE_LIVE"]) + server.reset_auto_increment(DATA_SETTING["TABLE_LIVE"]) + holodex = HolodexAPI("6f94494a-1002-4893-9c71-f2ec9a2e4dff", organization="Phase%20Connect") + for organization in holodex_organizations: + holodex.set_organization(organization) + subscriber_data = holodex.get_subscriber_data() + record_subscriber_data(subscriber_data, force_refresh) + return holodex.get_generated_channel_data(), holodex.get_inactive_channels() + +@log("Running YouTube Generation") +def youtube_generation(server: PostgresHandler): + """ + Generates the data from the YouTube API + """ + ytapi = YouTubeAPI(os.environ.get("YOUTUBE_API_KEY")) + server.clear_table(DATA_SETTING["TABLE_LIVE"]) + server.reset_auto_increment(DATA_SETTING["TABLE_LIVE"]) + data = ytapi.get_data_all_channels(fs.get_local_channels()) + record_subscriber_data(data) + return data + +def combine_excluded_channel_ids(inactive_channel_data: list, excluded_channels: list): + """ + Combines the local excluded channels with the inactive channels from the API + """ + channel_ids = [] + for inactive_channel in inactive_channel_data: + if inactive_channel in excluded_channels: + continue + channel_ids.append(inactive_channel) + return channel_ids + +def uploadFileToBucket(filepath: str) -> bool: + try: + info = InMemoryAccountInfo() + b2_api = B2Api(info) + application_key_id = os.environ.get("B2_APP_ID") + application_key = os.environ.get("B2_APP_KEY") + file_info = {'how': 'good-file'} + b2_api.authorize_account("production", application_key_id, application_key) + b2_file_name = "graph.html" + bucket = b2_api.get_bucket_by_name("vtuber-rabbit-hole-archive") + bucket.upload_local_file(local_file=filepath, file_name=b2_file_name, file_info=file_info) + return True + except Exception as e: + print("An error occured while attempting to upload to B2") + print(e) + return False; + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="NijiTrack - A Subscriber Tracker") + parser.add_argument('--mode', choices=['yt', 'holodex'], help='Specify the data source to use (yt or holodex)') + parser.add_argument('--b2', action='store_true', help="Upload graph html to Backblaze B2") + parser.add_argument('--ff', action='store_true', help="Force a full refresh of all data (override daily refresh)") + args = parser.parse_args() + server = create_database_connection() + initialize_database(server) + if args.mode == 'yt': + print("Using YouTube API") + channel_data = youtube_generation(server) + inactive_channels = fs.get_excluded_channels() + else: + if args.ff: + print("Forcing a full refresh") + channel_data, inactive_channels = holodex_generation(server, force_refresh=True) + else: + channel_data, inactive_channels = holodex_generation(server) + fs.update_excluded_channels(inactive_channels) + graph_html = graph.plot_subscriber_count_over_time(server, DATA_SETTING["TABLE_HISTORICAL"], exclude_channels=combine_excluded_channel_ids(inactive_channels, fs.get_excluded_channels())) + with open("index.html", "w", encoding="utf-8") as file: + file.write(graph_html) + if args.b2: + uploadFileToBucket("index.html") + else: + print("Skipping B2 Upload")
\ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..284a02b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,40 @@ +b2sdk==1.29.0 +blinker==1.7.0 +certifi==2023.11.17 +charset-normalizer==3.3.2 +click==8.1.7 +docopt==0.6.2 +Flask==3.0.0 +Flask-Cors==4.0.0 +greenlet==3.0.1 +gunicorn==21.2.0 +idna==3.6 +itsdangerous==2.1.2 +Jinja2==3.1.2 +joblib==1.3.2 +logfury==1.0.1 +MarkupSafe==2.1.3 +mysql-connector-python==8.2.0 +numpy==1.26.2 +packaging==23.2 +pandas==2.1.3 +patsy==0.5.3 +pip-review==1.3.0 +pipreqs==0.4.13 +plotly==5.18.0 +protobuf==4.21.12 +psycopg2-binary==2.9.9 +python-dateutil==2.8.2 +pytz==2023.3.post1 +requests==2.31.0 +scikit-learn==1.3.2 +scipy==1.11.4 +six==1.16.0 +tenacity==8.2.3 +threadpoolctl==3.2.0 +tqdm==4.66.1 +typing_extensions==4.8.0 +tzdata==2023.3 +urllib3==2.1.0 +Werkzeug==3.0.1 +yarg==0.1.9 diff --git a/sql/pg_handler.py b/sql/pg_handler.py new file mode 100644 index 0000000..74a9170 --- /dev/null +++ b/sql/pg_handler.py @@ -0,0 +1,169 @@ +import psycopg2 +from psycopg2 import Error + +class PostgresHandler: + def __init__(self, username: str, password: str, host_name: str, port: int, database: str): + db_params = { + "dbname": database, + "user": username, + "password": password, + "host": host_name, + "port": port + } + self._connection = psycopg2.connect(**db_params) + print("Handler Success") + + def get_connection(self): + return self._connection + + def create_table(self, name: str, column: str): + cursor = self._connection.cursor() + cursor.execute(f'CREATE TABLE IF NOT EXISTS "{name}" ({column})') + self._connection.commit() + cursor.close() + + def clear_table(self, name: str): + cursor = self._connection.cursor() + cursor.execute(f"DELETE FROM {name}") + self._connection.commit() + cursor.close() + + def check_row_exists(self, table_name: str, column_name: str, value: str): + cursor = self._connection.cursor() + query = f'SELECT 1 FROM "{table_name}" WHERE {column_name} = %s' + cursor.execute(query, (value,)) + result = cursor.fetchone() + cursor.close() + + if result is not None: + return True + else: + return False + + def insert_row(self, table_name, column, data): + try: + cursor = self._connection.cursor() + placeholders = ', '.join(['%s'] * len(data)) + query = f'INSERT INTO "{table_name}" ({column}) VALUES ({placeholders})' + cursor.execute(query, data) + self._connection.commit() + print("Data Inserted:", data) + except Error as err: + self._connection.rollback() + print("Error inserting data") + print(err) + if "duplicate key" not in str(err).lower(): + return False + return True + + def update_row(self, table_name: str, column: str, value: str, update_column: str, update_value: str): + try: + cursor = self._connection.cursor() + query = f'UPDATE "{table_name}" SET {update_column} = %s WHERE {column} = %s' + cursor.execute(query, (update_value, value)) + self._connection.commit() + print("Data Updated:", value, update_value) + except Error as e: + self._connection.rollback() + print(f"Failed to update row from {table_name} WHERE {column} is {value}") + print(e) + return False + return True + + def get_rows(self, table_name: str, column: str, value: str): + try: + cursor = self._connection.cursor() + query = f'SELECT * FROM "{table_name}" WHERE {column} = %s' + cursor.execute(query, (value,)) + result = cursor.fetchall() + return result + except Error as e: + self._connection.rollback() + print(f"Failed to fetch row from {table_name} WHERE {column} is {value}") + print(e) + return False + + def get_random_row(self, table_name: str, count: int, condition: str = None): + if condition is None: + condition = "1 = 1" + try: + cursor = self._connection.cursor() + query = f"SELECT * FROM {table_name} WHERE {condition} ORDER BY RANDOM() LIMIT {str(count)}" + cursor.execute(query) + result = cursor.fetchall() + return result + except Error as e: + self._connection.rollback() + print(f"Failed to select random rows from {table_name}") + print(e) + return False + + def check_health(self): + cursor = self._connection.cursor() + cursor.execute("SELECT 1") + result = cursor.fetchone() + cursor.close() + if result is not None: + return True + else: + return False + + def delete_row(self, table_name: str, column: str, value: str): + try: + cursor = self._connection.cursor() + query = f"DELETE FROM {table_name} WHERE {column} = %s" + cursor.execute(query, (value,)) + self._connection.commit() + print("Data Deleted:", value) + except Error as e: + self._connection.rollback() + print(f"Failed to delete row from {table_name} WHERE {column} is {value}") + print(e) + return False + return True + + def execute_query(self, query: str, data: tuple = None): + try: + cursor = self._connection.cursor() + if data is None: + cursor.execute(query) + else: + cursor.execute(query, data) + result = cursor.fetchall() + return result + except Error as e: + self._connection.rollback() + print(f"Failed to execute query: {query}") + print(e) + return False + + def reset_auto_increment(self, table_name: str): + try: + cursor = self._connection.cursor() + query = f"ALTER SEQUENCE {table_name}_id RESTART WITH 1" + cursor.execute(query) + self._connection.commit() + print("Auto Increment Reset") + except Error as e: + self._connection.rollback() + print(f"Failed to reset auto increment for {table_name}") + print(e) + return False + return True + + def get_most_recently_added_row_time(self, table_name: str): + try: + cursor = self._connection.cursor() + query = f"SELECT timestamp FROM {table_name} ORDER BY id DESC LIMIT 1" + cursor.execute(query) + result = cursor.fetchone() + return result + except Error as e: + self._connection.rollback() + print(f"Failed to get most recently added row from {table_name}") + print(e) + return False + + + def close_connection(self): + self._connection.close()
\ No newline at end of file diff --git a/sql_table_config.json b/sql_table_config.json new file mode 100644 index 0000000..a1abb14 --- /dev/null +++ b/sql_table_config.json @@ -0,0 +1,12 @@ +{ + "TABLE_LIVE": "subscriber_data", + "TABLE_HISTORICAL": "subscriber_data_historical", + "TABLE_DAILY": "24h_historical", + "LIVE_COLUMNS": "id SERIAL PRIMARY KEY, channel_id VARCHAR(255), profile_pic VARCHAR(255), name VARCHAR(255), subscriber_count INT, suborg VARCHAR(255), video_count INT, timestamp TIMESTAMP", + "LIVE_HEADER": "channel_id, profile_pic, name, subscriber_count, suborg, video_count, timestamp", + "DAILY_COLUMNS": "id SERIAL PRIMARY KEY, channel_id VARCHAR(255), sub_diff INT", + "DAILY_HEADER": "channel_id, sub_diff", + "HISTORICAL_COLUMNS": "id SERIAL PRIMARY KEY, channel_id VARCHAR(255), profile_pic VARCHAR(255), name VARCHAR(255), subscriber_count INT, timestamp TIMESTAMP", + "HISTORICAL_HEADER": "channel_id, profile_pic, name, subscriber_count, timestamp", + "HOLODEX_ORGS": "Phase%20Connect" +} diff --git a/webapi/holodex.py b/webapi/holodex.py new file mode 100644 index 0000000..5f81892 --- /dev/null +++ b/webapi/holodex.py @@ -0,0 +1,75 @@ +from webapi.web_api import WebAPI +from typing import Iterable + + +class HolodexAPI(WebAPI): + """ + Class for interacting with the Holodex API + """ + + def __init__(self,api_key: str = None,member_count: int = 300,organization: str = "Nijisanji"): + super().__init__(api_key=api_key, base_url="https://holodex.net/api/v2/") + self.member_count = member_count + self.organization = organization + self._inactive_channels = [] + self._channel_data = [] + + def get_subscriber_data(self) -> Iterable: + """ + Gets data for all channels in a particular organization + """ + members = self.member_count + data = [] + active_channels = [] + offset = 0 + while members > 0: + data += self._download_url( + f"channels?type=vtuber&offset={offset}&limit=100&org={self.organization}" + ) + members -= 100 + offset += 100 + for channel in data: + print("DEBUG: ", channel["id"]) + try: + channel["description"] = self.get_channel_description(channel["id"]) + if channel["inactive"]: + self._inactive_channels.append(channel["id"]) + continue + active_channels.append(channel) + except (KeyError, TypeError, ValueError): + print("DEBUG:","An error occured with parsing ", channel["id"], channel["name"]) + continue + self._channel_data = active_channels + return active_channels + + def get_view_count(self, channel_id: str) -> int: + """ + Gets the view count for a particular channel + """ + data = self._download_url(f"channels/{channel_id}") + return data["view_count"] + + def get_channel_description(self, channel_id: str) -> str: + """ + Gets the description for a particular channel + """ + data = self._download_url(f"channels/{channel_id}") + return data["description"] + + def set_organization(self, organization: str): + """ + Sets the organization for the API + """ + self.organization = organization + + def get_inactive_channels(self) -> list: + """ + Gets the list of inactive channels + """ + return self._inactive_channels + + def get_generated_channel_data(self) -> list: + """ + Gets the list of channel data + """ + return self._channel_data diff --git a/webapi/web_api.py b/webapi/web_api.py new file mode 100644 index 0000000..525994c --- /dev/null +++ b/webapi/web_api.py @@ -0,0 +1,29 @@ +import urllib.request +import json + + +class WebAPI: + """ + General class for interacting with web APIs + """ + + def __init__(self, api_key: str, base_url: str) -> None: + self.api_key = api_key + self.base_url = base_url + + def _download_url(self, query: str, header = 'X-APIKEY') -> dict: + """ + Downloads the URL and returns the result as a string + param: + query: str - the query to be appended to the base URL + """ + if self.api_key is None: + raise Exception("API key not set") + opener = urllib.request.build_opener() + opener.addheaders = [(header, self.api_key)] + urllib.request.install_opener(opener) + response = urllib.request.urlopen(self.base_url + query) + json_results = response.read() + r_obj = json.loads(json_results) + response.close() + return r_obj diff --git a/webapi/youtube.py b/webapi/youtube.py new file mode 100644 index 0000000..a25f5ba --- /dev/null +++ b/webapi/youtube.py @@ -0,0 +1,61 @@ +from webapi.web_api import WebAPI + + +class YouTubeAPI(WebAPI): + """ + Class for interacting with the YouTube API + """ + + def __init__(self, api_key: str = None): + self.api_key = api_key + self.base_url = "https://www.googleapis.com/youtube/v3/" + + def _search_matching_id(self, id: str, data: list) -> dict: + """ + Searches for a info matching a given ID + param: + id: str - the ID to search for + """ + for entry in data: + if entry['id'] == id: + return entry + return None + + def get_data_all_channels(self, channel_tuples: list) -> list: + data = [] + members = len(channel_tuples) + request_chunks = [channel_tuples[i:i + 50] for i in range(0, members, 50)] + for chunk in request_chunks: + channel_ids = [x[0] for x in chunk] + channel_names = [x[1] for x in chunk] + request_string = ",".join(channel_ids) + stats = self._download_url( + f"channels?part=statistics&id={request_string}&key={self.api_key}") + snippet = self._download_url( + f"channels?part=snippet&id={request_string}&key={self.api_key}") + stats_list = stats['items'] + snippet_list = snippet['items'] + for i in range(len(stats_list)): + try: + # group/sub_org is used to further divide channels into subsets (sorta like teams) + # can't think of a better match via YouTube API rn other than customUrl + data_entry = {'english_name': channel_names[i], 'id': channel_ids[i], + 'subscriber_count': + self._search_matching_id(channel_ids[i], stats_list)['statistics']['subscriberCount'], + 'view_count': + self._search_matching_id(channel_ids[i], stats_list)['statistics']['viewCount'], + 'photo': + self._search_matching_id(channel_ids[i], snippet_list)['snippet']['thumbnails']['default']['url'], + 'description': + self._search_matching_id(channel_ids[i], snippet_list)['snippet']['description'], + 'group': + self._search_matching_id(channel_ids[i], snippet_list)['snippet']['customUrl'], + 'video_count': + self._search_matching_id(channel_ids[i], stats_list)['statistics']['videoCount'] + } + data.append(data_entry) + except TypeError: + print("Error NoneType: " + str(channel_ids[i])) + except KeyError: + print("Error KeyError: " + str(channel_ids[i])) + return data |
