diff --git a/aoc_bot.py b/aoc_bot.py index 27de4fb..9e1a872 100644 --- a/aoc_bot.py +++ b/aoc_bot.py @@ -1,15 +1,191 @@ #!/usr/bin/env python3 +from __future__ import annotations import argparse import os.path -import time - import requests import sys -from datetime import datetime, timedelta -from time import sleep +from datetime import datetime, timedelta, timezone from tools.datafiles import JSONFile from tools.irc import IrcBot, ServerMessage -from tools.tools import human_readable_time_from_delta + + +def human_readable_time_from_delta(delta: timedelta, shortened: bool = False) -> str: + if shortened: + if delta.days > 0: + return ">24h" + + return "%02d:%02d:%02d" % (delta.seconds // 3600, delta.seconds % 3600 // 60, delta.seconds % 60) + else: + time_str = "" + if delta.days > 0: + time_str += "%d day%s, " % (delta.days, "s" if delta.days > 1 else "") + + if delta.seconds > 3600: + time_str += "%d hours, " % (delta.seconds // 3600) + else: + time_str += "" + + if delta.seconds % 3600 > 60: + time_str += "%d minutes, " % (delta.seconds % 3600 // 60) + else: + time_str += "" + + return time_str + "%d seconds" % (delta.seconds % 60) + + +class LeaderboardCache: + def __init__(self, cache_file_name: str, aoc_token: str, aoc_group_id: int, aoc_user_agent: str): + self.__cache = JSONFile(cache_file_name, create=True) + self.__aoc_token = aoc_token + self.__aoc_group_id = aoc_group_id + self.__aoc_user_agent = aoc_user_agent + self.__last_fetch_year = 2015 + self.__stats = {} + self.update_stats() + + def fetch_leaderboard(self, year: int) -> dict: + return requests.get( + "https://adventofcode.com/%d/leaderboard/private/view/%s.json" % (year, self.__aoc_group_id), + headers={"User-Agent": self.__aoc_user_agent}, + cookies={"session": self.__aoc_token}, + ).json() + + def update_stats(self, year: str | list[str] = None): + if year is None: + years = list(self.__cache.keys()) + elif isinstance(year, str): + years = [year] + elif isinstance(year, list): + years = year + else: + years = [str(year)] + + for year in years: + if year not in self.__cache: + print("Asked to analyze unknown year %s" % year) + continue + + year_keys = {} + year_stats = {"year": {}, "days": {}, "member_name_len": 0} + for member_id, member_data in self.__cache[year]["members"].items(): + member_key = (int(member_data["local_score"]), member_data["name"]) + year_keys[member_id] = member_key + year_stats["year"][member_key] = [" "] * 25 + year_stats["member_name_len"] = max(year_stats["member_name_len"], len(member_data["name"])) + + member_count = len(self.__cache[year]["members"]) + for day in map(str, range(1, 26)): + start_of_day = datetime(int(year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc) + p1_times = [] + p2_times = [] + year_stats["days"][day] = {} + for member, member_data in self.__cache[year]["members"].items(): + if day not in member_data["completion_day_level"]: + continue + + member_day_data = member_data["completion_day_level"][day] + + if "1" in member_day_data: + p1_times.append(member_day_data["1"]["get_star_ts"]) + if "2" in member_day_data: + p2_times.append(member_day_data["2"]["get_star_ts"]) + + p1_times = list(sorted(p1_times)) + p2_times = list(sorted(p2_times)) + for member, member_data in self.__cache[year]["members"].items(): + if day not in member_data["completion_day_level"]: + continue + + member_day_data = member_data["completion_day_level"][day] + day_score = 0 + p1_time = datetime.fromtimestamp(member_day_data["1"]["get_star_ts"], timezone.utc) + p2_time = ( + datetime.fromtimestamp(member_day_data["2"]["get_star_ts"], timezone.utc) + if "2" in member_day_data + else None + ) + + day_score += member_count - p1_times.index(member_day_data["1"]["get_star_ts"]) + year_stats["year"][year_keys[member]][int(day) - 1] = "+" + + if p2_time is not None: + day_score += member_count - p2_times.index(member_day_data["2"]["get_star_ts"]) + year_stats["year"][year_keys[member]][int(day) - 1] = "*" + + year_stats["days"][day][(day_score, member_data["name"])] = { + "1": p1_time - start_of_day, + "2": None if p2_time is None else p2_time - start_of_day, + } + + self.__stats[year] = year_stats + + def update(self) -> dict: + now = datetime.now(timezone.utc) + if now.month == 12: # on season + fetch_year = now.year + else: + avail_years = list(range(2015, now.year)) + fetch_year_index = (avail_years.index(self.__last_fetch_year) + 1) % len(avail_years) + fetch_year = avail_years[fetch_year_index] + + try: + new_leaderboard = self.fetch_leaderboard(fetch_year) + self.__last_fetch_year = fetch_year + except Exception as e: + print("Updating leaderboard failed: %s" % e) + return {} # didn't work this time? Well, we'll just try again in 15min ... + + fetch_year = str(fetch_year) + if fetch_year not in self.__cache: + self.__cache[fetch_year] = {} + + if "last_update" not in self.__cache[fetch_year]: + last_fetch_ts = now + else: + last_fetch_ts = datetime.fromtimestamp(self.__cache[fetch_year]["last_update"], tz=timezone.utc) + + self.__cache[fetch_year] = new_leaderboard + self.__cache[fetch_year]["last_update"] = now.timestamp() + self.__cache.save() + self.update_stats(fetch_year) + + new_stars = {} + for member, member_data in new_leaderboard["members"].items(): + if datetime.fromtimestamp(member_data["last_star_ts"], timezone.utc) < last_fetch_ts: + continue + new_stars[member_data["name"]] = {} + for day, day_data in member_data["completion_day_level"].items(): + day_start = datetime(int(fetch_year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc) + for part in ["1", "2"]: + if part not in day_data: + continue + get_star_ts = datetime.fromtimestamp(day_data[part]["get_star_ts"], timezone.utc) + if get_star_ts >= last_fetch_ts: + if fetch_year != str(now.year): + new_stars[member_data["name"]]["y%sd%sp%s" % (fetch_year, day, part)] = ( + get_star_ts - day_start + ) + else: + new_stars[member_data["name"]]["d%sp%s" % (day, part)] = get_star_ts - day_start + + return new_stars + + def has_year(self, year: str) -> bool: + return year in self.__stats + + def get_years(self) -> list[str]: + return list(sorted(self.__stats.keys())) + + def get_year_items(self, year: str) -> (str, int, str): + for (score, member_name), solve_string in sorted(self.__stats[year]["year"].items(), reverse=True): + yield member_name, score, solve_string + + def get_day_items(self, year: str, day: str) -> (str, int, timedelta, timedelta): + for (score, member_name), times in sorted(self.__stats[year]["days"][day].items(), reverse=True): + yield member_name, score, times["1"], times["2"] + + def get_year_member_name_len(self, year: str) -> int: + return self.__stats[year]["member_name_len"] class AOCBot: @@ -22,15 +198,21 @@ class AOCBot: self.__irc_real_name = None self.__aoc_group_id = None self.__aoc_session_file = None - self.__aoc_session_id = None self.__aoc_user_agent = None + self.__cache = None self.__cache_file = None - self.__cache_data = None self.__irc_bot = None + self.__irc_command_spam = {} + self.__irc_command_cooldown = {} self._load_config(config_file) def _load_config(self, config_file: str): - config = JSONFile(config_file, False) + try: + config = JSONFile(config_file, False) + except FileNotFoundError: + print("Config file not found: %s" % config_file) + sys.exit(1) + try: self.__irc_server = config["IRC"]["server"] self.__irc_port = config["IRC"]["port"] @@ -50,181 +232,165 @@ class AOCBot: print("CONFIGURATION ERROR: %s" % e) sys.exit(1) - def fetch_leaderboard(self, year: int = datetime.now().year) -> dict: - return requests.get( - "https://adventofcode.com/%d/leaderboard/private/view/%s.json" % (year, self.__aoc_group_id), - headers={"User-Agent": self.__aoc_user_agent}, - cookies={"session": self.__aoc_session_id}, - ).json() + def check_spam(self, msg_from: str, command: str) -> bool: + now = datetime.now(timezone.utc) + if msg_from not in self.__irc_command_spam: + self.__irc_command_spam[msg_from] = now - timedelta(minutes=10) + if command not in self.__irc_command_cooldown: + self.__irc_command_cooldown[command] = now - timedelta(minutes=10) + + min_time = now - timedelta(seconds=3) + if self.__irc_command_spam[msg_from] >= min_time or self.__irc_command_cooldown[command] >= min_time: + return False + + self.__irc_command_spam[msg_from] = now + self.__irc_command_cooldown[command] = now + return True def command_info(self, msg_from: str, message: str): - self.__irc_bot.privmsg( - msg_from, - "I am %s => %s" % (self.__irc_bot.getUser().nickname, self.__irc_bot.getUser().username), - ) - self.__irc_bot.privmsg(msg_from, "I am currently in the following channels:") - for c in self.__irc_bot.getChannelList(): - self.__irc_bot.privmsg(msg_from, "%s => %s" % (c.name, c.topic)) + if not self.check_spam(msg_from, "info"): + return + # recorded years + # last update (per year?) + self.__irc_bot.privmsg(self.__irc_channel, "Recorded Years: %s" % ", ".join(self.__cache.get_years())) - def command_today(self, msg_from: str, message: str): + def command_help(self, msg_from: str, message: str): + if not self.check_spam(msg_from, "help"): + return + + self.__irc_bot.privmsg(self.__irc_channel, "Available commands:") + self.__irc_bot.privmsg(self.__irc_channel, "!info - Show some stats") + self.__irc_bot.privmsg(self.__irc_channel, "!day [day [year]] - Show the leaderboard for today/specified day") + self.__irc_bot.privmsg(self.__irc_channel, "!year [year] - Show the leaderboard for this/specified year") + + def command_day(self, msg_from: str, message: str): + if not self.check_spam(msg_from, "day"): + return + + now = datetime.now(timezone.utc) if not message: - day = str(datetime.now().day) - else: - day = message.split(" ")[0] - try: - if not (1 <= int(day) <= 25): - self.__irc_bot.privmsg(self.__irc_channel, "Invalid day: " + day) - return - except ValueError: - self.__irc_bot.privmsg(self.__irc_channel, "Invalid day: " + day) + if now.month != 12: + self.__irc_bot.privmsg(self.__irc_channel, "*looks at calendar* Which day?") return - day_start = datetime.today().replace(hour=6, minute=0, second=0, day=int(day)) - today_list = [] - for member, member_data in self.__cache_data.items(): - if not member.startswith("__") and "days" in member_data and day in member_data["days"]: - today_list.append(member) + year = str(now.year) + day = str(now.day) + else: + message = message.split() + if len(message) == 2: + day = message[0] + year = message[1] + elif len(message) == 1: + day = message[0] + if now.month != 12: + year = str(now.year - 1) + else: + year = str(now.year) + else: + self.__irc_bot.privmsg(self.__irc_channel, "Invalid Parameters. See !help") + return + if not self.__cache.has_year(year): + self.__irc_bot.privmsg(self.__irc_channel, "unknown year: %s" % year) + return + + try: + d = int(day) + if not 1 <= d <= 25: + self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day) + return + except ValueError: + self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day) + return + + max_name_len = max(17, self.__cache.get_year_member_name_len(year)) self.__irc_bot.privmsg( - self.__irc_channel, - "Day %s's leaderboard (last updated: %s):" % (day, self.__cache_data["__last_update__"]), + self.__irc_channel, "Day %2s, Year %4s" % (day, year) + (" " * (max_name_len - 17)) + " p1 p2" ) - for i, member in enumerate( - sorted( - today_list, - key=lambda x: self.__cache_data[x]["days"][day]["score"], - reverse=True, - ) - ): - if i > 3: - sleep(1) # don't flood - - if "1" in self.__cache_data[member]["days"][day]: - p1_time = "in " + human_readable_time_from_delta( - datetime.fromisoformat(self.__cache_data[member]["days"][day]["1"]) - day_start - ) - else: - p1_time = "*not yet solved*" - - if "2" in self.__cache_data[member]["days"][day]: - p2_time = "in " + human_readable_time_from_delta( - datetime.fromisoformat(self.__cache_data[member]["days"][day]["2"]) - day_start - ) - else: - p2_time = "not yet solved" + format_string = "%-" + str(max_name_len) + "s (% 3s) %8s %8s" + for member_name, score, p1_time, p2_time in self.__cache.get_day_items(year=year, day=day): + if score == 0: + break self.__irc_bot.privmsg( self.__irc_channel, - "%d) %s (Scores: total: %d, today: %d) p1 %s, p2 %s" + format_string % ( - i + 1, - self.__cache_data[member]["name"], - self.__cache_data[member]["score"], - self.__cache_data[member]["days"][day]["score"], - p1_time, - p2_time, + member_name, + score, + human_readable_time_from_delta(p1_time, shortened=True), + human_readable_time_from_delta(p2_time, shortened=True) if p2_time is not None else "Not yet.", ), ) - def command_quit(self, msg_from: str, message: str): - if msg_from.startswith("stha!") or msg_from.startswith("Pennywise!"): - self.__irc_bot.privmsg(msg_from, "Oh, ok ... bye :'(") - self.__irc_bot.quit() - sys.exit(0) + def command_year(self, msg_from: str, message: str): + if not self.check_spam(msg_from, "year"): + return + + if message: + year = message + else: + now = datetime.now(timezone.utc) + if now.month == 12: + year = str(now.year) + else: + year = str(now.year - 1) + + if not self.__cache.has_year(year): + self.__irc_bot.privmsg(self.__irc_channel, "Unknown year: %s" % year) + return + + max_name_len = self.__cache.get_year_member_name_len(year) + self.__irc_bot.privmsg( + self.__irc_channel, "Year %4s" % year + (" " * (max_name_len - 9)) + " 1111111111222222" + ) + self.__irc_bot.privmsg(self.__irc_channel, (" " * max_name_len) + " 1234567890123456789012345") + format_string = "%-" + str(max_name_len) + "s (% 3s) %s" + + for member_name, score, solve_string in self.__cache.get_year_items(year): + if score == 0: + break + self.__irc_bot.privmsg(self.__irc_channel, format_string % (member_name, score, "".join(solve_string))) + + def update_leaderboard(self): + new_stars = self.__cache.update() + self.__last_update = datetime.now(timezone.utc) + if not new_stars: + return + + self.__irc_bot.privmsg(self.__irc_channel, "New Stars found:") + for member, member_data in sorted(new_stars.items()): + line = ( + member + + ": " + + ", ".join("%s (%s)" % (k, human_readable_time_from_delta(v)) for k, v in sorted(member_data.items())) + ) + self.__irc_bot.privmsg(self.__irc_channel, line) def on_raw(self, msg_from: str, msg_type: str, msg_to: str, message: str): print("[%s] <%s> (%s) -> <%s>: %s" % (datetime.now().strftime("%H:%M:%S"), msg_from, msg_type, msg_to, message)) - def calc_scores(self): - member_count = len([x for x in self.__cache_data.keys() if not x.startswith("__")]) - for day in map(str, range(1, 26)): - p1_times = [] - p2_times = [] - for member, member_data in self.__cache_data.items(): - if member.startswith("__") or day not in member_data["days"]: - continue - - self.__cache_data[member]["days"][day]["score"] = 0 - if "1" in member_data["days"][day]: - p1_times.append(member_data["days"][day]["1"]) - if "2" in member_data["days"][day]: - p2_times.append(member_data["days"][day]["2"]) - - for member, member_data in self.__cache_data.items(): - if member.startswith("__") or day not in member_data["days"]: - continue - - if "1" in member_data["days"][day] and member_data["days"][day]["1"] in p1_times: - score = member_count - sorted(p1_times).index(member_data["days"][day]["1"]) - self.__cache_data[member]["days"][day]["score"] += score - - if "2" in member_data["days"][day] and member_data["days"][day]["2"] in p2_times: - score = member_count - sorted(p2_times).index(member_data["days"][day]["2"]) - self.__cache_data[member]["days"][day]["score"] += score - - def update_leaderboard(self): - now = datetime.now() - aoc_year = now.year if now.month == 12 else now.year - 1 - - try: - new_leaderboard = self.fetch_leaderboard(aoc_year) - except Exception as e: - print("Updating leaderboard failed: %s" % e) - return # didn't work this time? Well, we'll just try again in 15min ... - - new_stars = {} - for member, member_data in new_leaderboard["members"].items(): - if member not in self.__cache_data: - self.__cache_data[member] = { - "name": member_data["name"], - "days": {}, - } - - self.__cache_data[member]["global_score"] = int(member_data["global_score"]) - self.__cache_data[member]["score"] = int(member_data["local_score"]) - self.__cache_data[member]["stars"] = int(member_data["stars"]) - for day in member_data["completion_day_level"]: - day_start = datetime(aoc_year, 12, int(day), 6, 0, 0) - if day not in self.__cache_data[member]["days"]: - self.__cache_data[member]["days"][day] = {} - - for part in member_data["completion_day_level"][day]: - if part not in self.__cache_data[member]["days"][day]: - completion_time = datetime.fromtimestamp( - member_data["completion_day_level"][day][part]["get_star_ts"] - ) - if member_data["name"] not in new_stars: - new_stars[member_data["name"]] = {} - - finishing_time = human_readable_time_from_delta(completion_time - day_start) - new_stars[member_data["name"]]["d" + day + "p" + part] = finishing_time - self.__cache_data[member]["days"][day][part] = completion_time.isoformat() - - if len(new_stars) > 0: - self.__irc_bot.privmsg(self.__irc_channel, "New Stars found:") - for member, parts in new_stars.items(): - line = member + ": " - line += ", ".join( - "%s (%s)" % (part, new_stars[member][part]) for part in sorted(new_stars[member].keys()) - ) - - self.__irc_bot.privmsg(self.__irc_channel, line) - - self.__cache_data["__last_update__"] = datetime.now().isoformat() - self.__cache_data.save() - self.calc_scores() + def flush_output(self): + sys.stdout.flush() + sys.stderr.flush() def start(self): - print("[AoCBot] Loading Cache File") - self.__cache_data = JSONFile(self.__cache_file, create=True) - print("[AoCBot] Loading Session ID") try: - self.__aoc_session_id = open(self.__aoc_session_file, "r").readlines()[0].strip() + aoc_session_id = open(self.__aoc_session_file, "r").readlines()[0].strip() except FileNotFoundError: print("[AoCBot] Session ID not found: %s" % self.__aoc_session_file) sys.exit(1) + print("[AoCBot] Loading Cache File") + self.__cache = LeaderboardCache( + cache_file_name=self.__cache_file, + aoc_token=aoc_session_id, + aoc_group_id=self.__aoc_group_id, + aoc_user_agent=self.__aoc_user_agent, + ) + self.__cache_data = JSONFile(self.__cache_file, create=True) + print("[AoCBot] Starting IrcBot") self.__irc_bot = IrcBot( server=self.__irc_server, @@ -240,14 +406,15 @@ class AOCBot: print("[AoCBot] Scheduling Leaderboard Update for Group ID %s" % self.__aoc_group_id) self.__irc_bot.schedule("update_leaderboard", timedelta(minutes=15), self.update_leaderboard) + if os.getenv("KUBERNETES_PORT", None) is not None: + self.__irc_bot.schedule("flush_output", timedelta(minutes=1), self.flush_output) print("[AoCBot] Registering Commands") self.__irc_bot.on(ServerMessage.RAW, self.on_raw) self.__irc_bot.register_channel_command("!info", self.__irc_channel, self.command_info) - self.__irc_bot.register_channel_command("!today", self.__irc_channel, self.command_today) - self.__irc_bot.register_channel_command("!day", self.__irc_channel, self.command_today) - self.__irc_bot.register_privmsg_command("info", self.command_info) - self.__irc_bot.register_privmsg_command("quit", self.command_quit) + self.__irc_bot.register_channel_command("!help", self.__irc_channel, self.command_help) + self.__irc_bot.register_channel_command("!day", self.__irc_channel, self.command_day) + self.__irc_bot.register_channel_command("!year", self.__irc_channel, self.command_year) print("[AoCBot] Starting Main Loop") self.__irc_bot.run() @@ -264,10 +431,5 @@ if __name__ == "__main__": ) args = parser.parse_args() - config_file = args.config_file - if not os.path.exists(config_file): - print("Config File not found: %s" % config_file) - sys.exit(1) - - aoc_bot = AOCBot(config_file) + aoc_bot = AOCBot(args.config_file) aoc_bot.start()