#!/usr/bin/env python3 from __future__ import annotations import argparse import os.path from collections import defaultdict import requests import sys from datetime import datetime, timedelta, timezone from tools.datafiles import JSONFile from tools.irc import IrcBot, ServerMessage def human_readable_time_from_delta(delta: timedelta, shortened: bool = False) -> str: if shortened: if delta.days > 0: return ">24h" return "%02d:%02d:%02d" % (delta.seconds // 3600, delta.seconds % 3600 // 60, delta.seconds % 60) else: time_str = "" if delta.days > 0: time_str += "%d day%s, " % (delta.days, "s" if delta.days > 1 else "") if delta.seconds > 3600: time_str += "%d hours, " % (delta.seconds // 3600) else: time_str += "" if delta.seconds % 3600 > 60: time_str += "%d minutes, " % (delta.seconds % 3600 // 60) else: time_str += "" return time_str + "%d seconds" % (delta.seconds % 60) class LeaderboardCache: def __init__(self, cache_file_name: str, aoc_token: str, aoc_group_id: int, aoc_user_agent: str): self.__cache = JSONFile(cache_file_name, create=True) self.__aoc_token = aoc_token self.__aoc_group_id = aoc_group_id self.__aoc_user_agent = aoc_user_agent self.__last_fetch_year = 2015 self.__stats = {} self.__stars = {} self.update_stats() def fetch_leaderboard(self, year: int) -> dict: return requests.get( "https://adventofcode.com/%d/leaderboard/private/view/%s.json" % (year, self.__aoc_group_id), headers={"User-Agent": self.__aoc_user_agent}, cookies={"session": self.__aoc_token}, ).json() def update_stats(self, year: str | list[str] = None): if year is None: years = list(self.__cache.keys()) elif isinstance(year, str): years = [year] elif isinstance(year, list): years = year else: years = [str(year)] for year in years: if year not in self.__cache: print("Asked to analyze unknown year %s" % year) continue year_keys = {} year_stats = {"year": {}, "days": {}, "stars": {}, "member_name_len": 0} for member_id, member_data in self.__cache[year]["members"].items(): member_key = (int(member_data["local_score"]), member_data["name"]) year_keys[member_id] = member_key year_stats["year"][member_key] = [" "] * 25 year_stats["stars"][member_data["name"]] = member_data["stars"] year_stats["member_name_len"] = max(year_stats["member_name_len"], len(member_data["name"])) member_count = len(self.__cache[year]["members"]) for day in map(str, range(1, 26)): start_of_day = datetime(int(year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc) p1_times = [] p2_times = [] year_stats["days"][day] = {} for member, member_data in self.__cache[year]["members"].items(): if day not in member_data["completion_day_level"]: continue member_day_data = member_data["completion_day_level"][day] if "1" in member_day_data: p1_times.append(member_day_data["1"]["get_star_ts"]) if "2" in member_day_data: p2_times.append(member_day_data["2"]["get_star_ts"]) p1_times = list(sorted(p1_times)) p2_times = list(sorted(p2_times)) for member, member_data in self.__cache[year]["members"].items(): if day not in member_data["completion_day_level"]: continue member_day_data = member_data["completion_day_level"][day] day_score = 0 p1_time = datetime.fromtimestamp(member_day_data["1"]["get_star_ts"], timezone.utc) p2_time = ( datetime.fromtimestamp(member_day_data["2"]["get_star_ts"], timezone.utc) if "2" in member_day_data else None ) day_score += member_count - p1_times.index(member_day_data["1"]["get_star_ts"]) year_stats["year"][year_keys[member]][int(day) - 1] = "+" if p2_time is not None: day_score += member_count - p2_times.index(member_day_data["2"]["get_star_ts"]) year_stats["year"][year_keys[member]][int(day) - 1] = "*" year_stats["days"][day][(day_score, member_data["name"])] = { "1": p1_time - start_of_day, "2": None if p2_time is None else p2_time - start_of_day, } self.__stats[year] = year_stats star_stats = defaultdict(int) for year, year_stats in self.__stats.items(): for member, stars in year_stats["stars"].items(): star_stats[member] += stars self.__stars = star_stats def update(self) -> dict: now = datetime.now(timezone.utc) if now.month == 12: # on season fetch_year = now.year else: avail_years = list(range(2015, now.year)) fetch_year_index = (avail_years.index(self.__last_fetch_year) + 1) % len(avail_years) fetch_year = avail_years[fetch_year_index] try: new_leaderboard = self.fetch_leaderboard(fetch_year) self.__last_fetch_year = fetch_year except Exception as e: print("Updating leaderboard failed: %s" % e) return {} # didn't work this time? Well, we'll just try again in 15min ... fetch_year = str(fetch_year) if fetch_year not in self.__cache: self.__cache[fetch_year] = {} if "last_update" not in self.__cache[fetch_year]: last_fetch_ts = now else: last_fetch_ts = datetime.fromtimestamp(self.__cache[fetch_year]["last_update"], tz=timezone.utc) self.__cache[fetch_year] = new_leaderboard self.__cache[fetch_year]["last_update"] = now.timestamp() self.__cache.save() self.update_stats(fetch_year) new_stars = {} for member, member_data in new_leaderboard["members"].items(): if datetime.fromtimestamp(member_data["last_star_ts"], timezone.utc) < last_fetch_ts: continue new_stars[member_data["name"]] = {} for day, day_data in member_data["completion_day_level"].items(): day_start = datetime(int(fetch_year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc) for part in ["1", "2"]: if part not in day_data: continue get_star_ts = datetime.fromtimestamp(day_data[part]["get_star_ts"], timezone.utc) if get_star_ts >= last_fetch_ts: if fetch_year != str(now.year): new_stars[member_data["name"]]["y%sd%sp%s" % (fetch_year, day, part)] = ( get_star_ts - day_start ) else: new_stars[member_data["name"]]["d%sp%s" % (day, part)] = get_star_ts - day_start return new_stars def has_year(self, year: str) -> bool: return year in self.__stats def get_years(self) -> list[str]: return list(sorted(self.__stats.keys())) def get_year_stars(self, year: str): for member, stars in sorted(self.__stats[year]["stars"].items(), key=lambda x: x[1], reverse=True): yield member, stars def get_all_stars(self): for member, stars in sorted(self.__stars.items(), key=lambda x: x[1], reverse=True): yield member, stars def get_year_items(self, year: str) -> (str, int, str): for (score, member_name), solve_string in sorted(self.__stats[year]["year"].items(), reverse=True): yield member_name, score, solve_string def get_day_items(self, year: str, day: str) -> (str, int, timedelta, timedelta): for (score, member_name), times in sorted(self.__stats[year]["days"][day].items(), reverse=True): yield member_name, score, times["1"], times["2"] def get_year_member_name_len(self, year: str) -> int: return self.__stats[year]["member_name_len"] def get_all_member_name_len(self) -> int: return max(self.__stats[x]["member_name_len"] for x in self.__stats) class AOCBot: def __init__(self, config_file: str = "config.json"): self.__irc_server = None self.__irc_port = None self.__irc_channel = None self.__irc_nick = None self.__irc_user = None self.__irc_real_name = None self.__aoc_group_id = None self.__aoc_session_file = None self.__aoc_user_agent = None self.__cache = None self.__cache_file = None self.__irc_bot = None self.__irc_command_spam = {} self.__irc_command_cooldown = {} self._load_config(config_file) def _load_config(self, config_file: str): try: config = JSONFile(config_file, False) except FileNotFoundError: print("Config file not found: %s" % config_file) sys.exit(1) try: self.__irc_server = config["IRC"]["server"] self.__irc_port = config["IRC"]["port"] self.__irc_channel = config["IRC"]["channel"] self.__irc_nick = config["IRC"]["nick"] self.__irc_user = config["IRC"]["user"] self.__irc_real_name = config["IRC"]["name"] self.__aoc_group_id = config["AOC"]["group_id"] self.__aoc_session_file = config["AOC"]["session_file"] self.__aoc_user_agent = config["AOC"]["user_agent"] self.__cache_file = config["AOC"]["cache_file"] if "sasl_password" in config["IRC"]: self.__sasl_password = config["IRC"]["sasl_password"] else: self.__sasl_password = None except AttributeError as e: print("CONFIGURATION ERROR: %s" % e) sys.exit(1) def check_spam(self, msg_from: str, command: str) -> bool: now = datetime.now(timezone.utc) if msg_from not in self.__irc_command_spam: self.__irc_command_spam[msg_from] = now - timedelta(minutes=10) if command not in self.__irc_command_cooldown: self.__irc_command_cooldown[command] = now - timedelta(minutes=10) min_time = now - timedelta(seconds=3) if self.__irc_command_spam[msg_from] >= min_time or self.__irc_command_cooldown[command] >= min_time: return False self.__irc_command_spam[msg_from] = now self.__irc_command_cooldown[command] = now return True def command_info(self, msg_from: str, message: str): if not self.check_spam(msg_from, "info"): return # recorded years # last update (per year?) self.__irc_bot.privmsg(self.__irc_channel, "Recorded Years: %s" % ", ".join(self.__cache.get_years())) def command_help(self, msg_from: str, message: str): if not self.check_spam(msg_from, "help"): return self.__irc_bot.privmsg(self.__irc_channel, "Available commands:") self.__irc_bot.privmsg(self.__irc_channel, "!info - Show some stats") self.__irc_bot.privmsg(self.__irc_channel, "!day [day [year]] - Show the leaderboard for today/specified day") self.__irc_bot.privmsg(self.__irc_channel, "!year [year] - Show the leaderboard for this/specified year") def command_day(self, msg_from: str, message: str): if not self.check_spam(msg_from, "day"): return now = datetime.now(timezone.utc) if not message: if now.month != 12: self.__irc_bot.privmsg(self.__irc_channel, "*looks at calendar* Which day?") return year = str(now.year) day = str(now.day) else: message = message.split() if len(message) == 2: day = message[0] year = message[1] elif len(message) == 1: day = message[0] if now.month != 12: year = str(now.year - 1) else: year = str(now.year) else: self.__irc_bot.privmsg(self.__irc_channel, "Invalid Parameters. See !help") return if not self.__cache.has_year(year): self.__irc_bot.privmsg(self.__irc_channel, "unknown year: %s" % year) return try: d = int(day) if not 1 <= d <= 25: self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day) return except ValueError: self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day) return max_name_len = max(17, self.__cache.get_year_member_name_len(year)) self.__irc_bot.privmsg( self.__irc_channel, "Day %2s, Year %4s" % (day, year) + (" " * (max_name_len - 17)) + " p1 p2" ) format_string = "%-" + str(max_name_len) + "s (% 3s) %8s %8s" for member_name, score, p1_time, p2_time in self.__cache.get_day_items(year=year, day=day): if score == 0: break self.__irc_bot.privmsg( self.__irc_channel, format_string % ( member_name, score, human_readable_time_from_delta(p1_time, shortened=True), human_readable_time_from_delta(p2_time, shortened=True) if p2_time is not None else "Not yet.", ), ) def command_year(self, msg_from: str, message: str): if not self.check_spam(msg_from, "year"): return if message: year = message else: now = datetime.now(timezone.utc) if now.month == 12: year = str(now.year) else: year = str(now.year - 1) if not self.__cache.has_year(year): self.__irc_bot.privmsg(self.__irc_channel, "Unknown year: %s" % year) return max_name_len = self.__cache.get_year_member_name_len(year) self.__irc_bot.privmsg( self.__irc_channel, "Year %4s" % year + (" " * (max_name_len - 9)) + " 1111111111222222" ) self.__irc_bot.privmsg(self.__irc_channel, (" " * max_name_len) + " 1234567890123456789012345") format_string = "%-" + str(max_name_len) + "s (% 3s) %s" for member_name, score, solve_string in self.__cache.get_year_items(year): if score == 0: break self.__irc_bot.privmsg(self.__irc_channel, format_string % (member_name, score, "".join(solve_string))) def command_stars(self, msg_from: str, message: str): if not self.check_spam(msg_from, "stars"): return if not message: max_name_len = self.__cache.get_all_member_name_len() star_gen = self.__cache.get_all_stars() else: year = message if not self.__cache.has_year(year): self.__irc_bot.privmsg(self.__irc_channel, "Unknown year: %s" % year) return max_name_len = self.__cache.get_year_member_name_len(year) star_gen = self.__cache.get_year_stars(year) for count, (member, stars) in enumerate(star_gen): if stars == 0: break line = "%2d. %-" + str(max_name_len) + "s %4d Stars" self.__irc_bot.privmsg(self.__irc_channel, line % (count + 1, member, stars)) def update_leaderboard(self): new_stars = self.__cache.update() self.__last_update = datetime.now(timezone.utc) if not new_stars: return self.__irc_bot.privmsg(self.__irc_channel, "New Stars found:") for member, member_data in sorted(new_stars.items()): line = ( member + ": " + ", ".join("%s (%s)" % (k, human_readable_time_from_delta(v)) for k, v in sorted(member_data.items())) ) self.__irc_bot.privmsg(self.__irc_channel, line) def on_raw(self, msg_from: str, msg_type: str, msg_to: str, message: str): print("[%s] <%s> (%s) -> <%s>: %s" % (datetime.now().strftime("%H:%M:%S"), msg_from, msg_type, msg_to, message)) def on_quit(self, msg_from: str, msg_type: str, msg_to: str, message: str): if msg_from == self.__irc_bot.getUser().identifier: sys.exit(0) def flush_output(self): sys.stdout.flush() sys.stderr.flush() def start(self): print("[AoCBot] Loading Session ID") try: aoc_session_id = open(self.__aoc_session_file, "r").readlines()[0].strip() except FileNotFoundError: print("[AoCBot] Session ID not found: %s" % self.__aoc_session_file) sys.exit(1) print("[AoCBot] Loading Cache File") self.__cache = LeaderboardCache( cache_file_name=self.__cache_file, aoc_token=aoc_session_id, aoc_group_id=self.__aoc_group_id, aoc_user_agent=self.__aoc_user_agent, ) self.__cache_data = JSONFile(self.__cache_file, create=True) print("[AoCBot] Starting IrcBot") self.__irc_bot = IrcBot( server=self.__irc_server, port=self.__irc_port, nick=self.__irc_nick, username=self.__irc_user, realname=self.__irc_real_name, sasl_password=self.__sasl_password, ) print("[AoCBot] Joining Channel %s" % self.__irc_channel) self.__irc_bot.join(self.__irc_channel) print("[AoCBot] Scheduling Leaderboard Update for Group ID %s" % self.__aoc_group_id) self.__irc_bot.schedule("update_leaderboard", timedelta(minutes=15), self.update_leaderboard) if os.getenv("KUBERNETES_PORT", None) is not None: self.__irc_bot.schedule("flush_output", timedelta(minutes=1), self.flush_output) print("[AoCBot] Registering Commands") self.__irc_bot.on(ServerMessage.RAW, self.on_raw) self.__irc_bot.on(ServerMessage.MSG_QUIT, self.on_quit) self.__irc_bot.register_channel_command("!info", self.__irc_channel, self.command_info) self.__irc_bot.register_channel_command("!help", self.__irc_channel, self.command_help) self.__irc_bot.register_channel_command("!day", self.__irc_channel, self.command_day) self.__irc_bot.register_channel_command("!year", self.__irc_channel, self.command_year) self.__irc_bot.register_channel_command("!stars", self.__irc_channel, self.command_stars) print("[AoCBot] Starting Main Loop") self.__irc_bot.run() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "-c", "--config-file", help="config file to use (default: config.json)", default="config.json", required=False, ) args = parser.parse_args() aoc_bot = AOCBot(args.config_file) aoc_bot.start()