1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import time
import fileutil as fs
from sql.sql_handler import SQLHandler
from webapi.holodex import HolodexAPI
from webapi.youtube import YouTubeAPI
import graph
from decorators import *
import argparse
# Application settings read from config.ini at import time.
# Top-level keys used in this file: "TABLES" (live / historical / daily),
# "API" (holodex / youtube) and "SQL" (host / user / password / database).
CONFIG = fs.load_config("config.ini")
# Table settings read from sql_table_config.json at import time.
# Keys used in this file: LIVE_COLUMNS, HISTORICAL_COLUMNS, DAILY_COLUMNS,
# LIVE_HEADER, HISTORICAL_HEADER, DAILY_HEADER and HOLODEX_ORGS.
DATA_SETTING = fs.load_json_file("sql_table_config.json")
@log("Initializing Database")
def initialize_database(server: SQLHandler):
    """
    Asks ``server`` to create the live, historical and daily tables.

    Table names come from CONFIG["TABLES"] and column definitions from
    DATA_SETTING. One ``create_table`` call is issued per table, in the
    order live, historical, daily.
    NOTE(review): whether ``create_table`` tolerates an already existing
    table is decided by SQLHandler - confirm there.
    """
    # (key in CONFIG["TABLES"], key in DATA_SETTING) for each table to create.
    table_specs = (
        ("live", "LIVE_COLUMNS"),
        ("historical", "HISTORICAL_COLUMNS"),
        ("daily", "DAILY_COLUMNS"),
    )
    for table_key, columns_key in table_specs:
        server.create_table(name=CONFIG["TABLES"][table_key], column=DATA_SETTING[columns_key])
@log("Inserting Live Data into Database")
def record_subscriber_data(data: list):
    """
    Writes one snapshot of channel data into the live, historical and daily tables.

    ``data`` is an iterable of channel mappings that must provide the keys
    "id", "photo", "subscriber_count", "english_name", "name", "group" and
    "video_count". Channels whose id is in ``fs.get_excluded_channels()``
    are skipped.

    NOTE: this function uses the module-level ``server`` object, which is
    only created in the ``__main__`` block of this file.
    """
    def transform_sql_string(string: str) -> str:
        # Drops every non-ASCII character and doubles single quotes.
        # NOTE(review): presumably needed because SQLHandler builds its SQL
        # as plain strings - confirm; the ASCII step is lossy for names
        # written in other scripts.
        return string.encode("ascii", "ignore").decode().replace("'", "''")

    def record_diff_data(data_tuple: tuple, refresh_daily: bool):
        # data_tuple = (channel_id, pfp, channel_name, sub_count, timestamp)
        # Read the values from the argument itself instead of relying on the
        # loop variables of the enclosing function.
        row_channel_id = data_tuple[0]
        row_sub_count = data_tuple[3]
        daily_table = CONFIG["TABLES"]["daily"]
        if not server.check_row_exists(daily_table, "channel_id", row_channel_id):
            # First time this channel is seen: create its daily row.
            server.insert_row(daily_table, DATA_SETTING["DAILY_HEADER"], (row_channel_id, row_sub_count))
        elif refresh_daily:
            # Overwrite the stored "sub_diff" value with the current count.
            server.update_row(daily_table, "channel_id", row_channel_id, "sub_diff", row_sub_count)
        # Every processed channel gets exactly one historical row per call.
        server.insert_row(name=CONFIG["TABLES"]["historical"], column=DATA_SETTING["HISTORICAL_HEADER"], data=data_tuple)

    exclude_channels = fs.get_excluded_channels()
    refresh_daily = fs.check_diff_refresh()
    for channel in data:
        channel_id = channel["id"]
        if channel_id in exclude_channels:
            continue
        pfp = channel["photo"]
        sub_count = channel["subscriber_count"]
        channel_name = channel["english_name"]
        sub_org = channel["group"]
        video_count = channel["video_count"]
        if channel_name is None:
            # No English name available: fall back to the regular name.
            channel_name = channel["name"]
        if sub_org is None:
            sub_org = "Unknown"
        channel_name = transform_sql_string(channel_name)
        # Take the timestamp once so the live and historical rows of the same
        # sample can never differ by a second.
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        data_tuple = (channel_id, pfp, channel_name, sub_count, sub_org, video_count, timestamp)
        historical_data_tuple = (channel_id, pfp, channel_name, sub_count, timestamp)
        server.insert_row(name=CONFIG["TABLES"]["live"], column=DATA_SETTING["LIVE_HEADER"], data=data_tuple)
        record_diff_data(historical_data_tuple, refresh_daily)
@log("Running Holodex Generation")
def holodex_generation(server: SQLHandler):
    """
    Rebuilds the live table from the Holodex API.

    The live table is cleared and its auto-increment reset, then the
    subscriber data of every organization listed (comma separated) in
    DATA_SETTING["HOLODEX_ORGS"] is fetched and recorded.

    Returns a tuple ``(generated_channel_data, inactive_channels)`` as
    reported by the HolodexAPI object.
    """
    organizations = DATA_SETTING["HOLODEX_ORGS"].split(",")

    live_table = CONFIG["TABLES"]["live"]
    server.clear_table(live_table)
    server.reset_auto_increment(live_table)

    # The organization given here is replaced inside the loop before any
    # subscriber data is requested.
    api = HolodexAPI(CONFIG["API"]["holodex"], organization="Phase%20Connect")
    for org_name in organizations:
        api.set_organization(org_name)
        record_subscriber_data(api.get_subscriber_data())

    generated_channels = api.get_generated_channel_data()
    inactive_channels = api.get_inactive_channels()
    return generated_channels, inactive_channels
@log("Running YouTube Generation")
def youtube_generation(server: SQLHandler):
    """
    Rebuilds the live table from the YouTube API.

    The live table is cleared and its auto-increment reset, then the data
    of all channels returned by ``fs.get_local_channels()`` is fetched and
    recorded.

    Returns the channel data obtained from the YouTubeAPI object.
    """
    api = YouTubeAPI(CONFIG["API"]["youtube"])

    live_table = CONFIG["TABLES"]["live"]
    server.clear_table(live_table)
    server.reset_auto_increment(live_table)

    local_channels = fs.get_local_channels()
    channel_data = api.get_data_all_channels(local_channels)
    record_subscriber_data(channel_data)
    return channel_data
def combine_excluded_channel_ids(inactive_channel_data: list, excluded_channels: list):
    """
    Combines the local excluded channels with the inactive channels from the API

    The result starts with every entry of ``excluded_channels`` (in order,
    duplicates removed) followed by the entries of ``inactive_channel_data``
    that are not already present. Neither input list is modified. ``None``
    is treated as an empty list for either argument.
    """
    combined = []
    # Plain list membership is used on purpose so that entries do not have
    # to be hashable; both lists are expected to be small.
    for source in (excluded_channels or [], inactive_channel_data or []):
        for channel in source:
            if channel not in combined:
                combined.append(channel)
    return combined
if __name__ == "__main__":
    # Command line: an optional --mode selects the data source; any value
    # other than 'yt' (including no value at all) uses Holodex.
    parser = argparse.ArgumentParser(description="NijiTrack - A Subscriber Tracker")
    parser.add_argument(
        '--mode',
        choices=['yt', 'holodex'],
        help='Specify the data source to use (yt or holodex)',
    )
    args = parser.parse_args()

    # ``server`` must keep this exact module-level name:
    # record_subscriber_data() looks it up as a global.
    sql_config = CONFIG["SQL"]
    server = SQLHandler(
        sql_config["host"],
        sql_config["user"],
        sql_config["password"],
        sql_config["database"],
    )
    initialize_database(server)

    if args.mode != 'yt':
        channel_data, inactive_channels = holodex_generation(server)
        # NOTE(review): assumed to run only for the Holodex source - confirm.
        fs.update_excluded_channels(inactive_channels)
    else:
        print("Using YouTube API")
        channel_data = youtube_generation(server)
        inactive_channels = fs.get_excluded_channels()

    # Render the subscriber graph and write it out as the site's index page.
    historical_table = CONFIG["TABLES"]["historical"]
    excluded_for_graph = combine_excluded_channel_ids(inactive_channels, fs.get_excluded_channels())
    graph_html = graph.plot_subscriber_count_over_time(
        server,
        historical_table,
        exclude_channels=excluded_for_graph,
    )
    with open("index.html", "w", encoding="utf-8") as output_file:
        output_file.write(graph_html)
|