#!/usr/bin/env python3 from __future__ import annotations import argparse import os.path import requests import sys from datetime import datetime, timedelta, timezone from tools.datafiles import JSONFile from tools.irc import IrcBot, ServerMessage def human_readable_time_from_delta(delta: timedelta, shortened: bool = False) -> str: if shortened: if delta.days > 0: return ">24h" return "%02d:%02d:%02d" % (delta.seconds // 3600, delta.seconds % 3600 // 60, delta.seconds % 60) else: time_str = "" if delta.days > 0: time_str += "%d day%s, " % (delta.days, "s" if delta.days > 1 else "") if delta.seconds > 3600: time_str += "%d hours, " % (delta.seconds // 3600) else: time_str += "" if delta.seconds % 3600 > 60: time_str += "%d minutes, " % (delta.seconds % 3600 // 60) else: time_str += "" return time_str + "%d seconds" % (delta.seconds % 60) class LeaderboardCache: def __init__(self, cache_file_name: str, aoc_token: str, aoc_group_id: int, aoc_user_agent: str): self.__cache = JSONFile(cache_file_name, create=True) self.__aoc_token = aoc_token self.__aoc_group_id = aoc_group_id self.__aoc_user_agent = aoc_user_agent self.__last_fetch_year = 2015 self.__stats = {} self.update_stats() def fetch_leaderboard(self, year: int) -> dict: return requests.get( "https://adventofcode.com/%d/leaderboard/private/view/%s.json" % (year, self.__aoc_group_id), headers={"User-Agent": self.__aoc_user_agent}, cookies={"session": self.__aoc_token}, ).json() def update_stats(self, year: str | list[str] = None): if year is None: years = list(self.__cache.keys()) elif isinstance(year, str): years = [year] elif isinstance(year, list): years = year else: years = [str(year)] for year in years: if year not in self.__cache: print("Asked to analyze unknown year %s" % year) continue year_keys = {} year_stats = {"year": {}, "days": {}, "member_name_len": 0} for member_id, member_data in self.__cache[year]["members"].items(): member_key = (int(member_data["local_score"]), member_data["name"]) year_keys[member_id] = member_key year_stats["year"][member_key] = [" "] * 25 year_stats["member_name_len"] = max(year_stats["member_name_len"], len(member_data["name"])) member_count = len(self.__cache[year]["members"]) for day in map(str, range(1, 26)): start_of_day = datetime(int(year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc) p1_times = [] p2_times = [] year_stats["days"][day] = {} for member, member_data in self.__cache[year]["members"].items(): if day not in member_data["completion_day_level"]: continue member_day_data = member_data["completion_day_level"][day] if "1" in member_day_data: p1_times.append(member_day_data["1"]["get_star_ts"]) if "2" in member_day_data: p2_times.append(member_day_data["2"]["get_star_ts"]) p1_times = list(sorted(p1_times)) p2_times = list(sorted(p2_times)) for member, member_data in self.__cache[year]["members"].items(): if day not in member_data["completion_day_level"]: continue member_day_data = member_data["completion_day_level"][day] day_score = 0 p1_time = datetime.fromtimestamp(member_day_data["1"]["get_star_ts"], timezone.utc) p2_time = ( datetime.fromtimestamp(member_day_data["2"]["get_star_ts"], timezone.utc) if "2" in member_day_data else None ) day_score += member_count - p1_times.index(member_day_data["1"]["get_star_ts"]) year_stats["year"][year_keys[member]][int(day) - 1] = "+" if p2_time is not None: day_score += member_count - p2_times.index(member_day_data["2"]["get_star_ts"]) year_stats["year"][year_keys[member]][int(day) - 1] = "*" year_stats["days"][day][(day_score, member_data["name"])] = { "1": p1_time - start_of_day, "2": None if p2_time is None else p2_time - start_of_day, } self.__stats[year] = year_stats def update(self) -> dict: now = datetime.now(timezone.utc) if now.month == 12: # on season fetch_year = now.year else: avail_years = list(range(2015, now.year)) fetch_year_index = (avail_years.index(self.__last_fetch_year) + 1) % len(avail_years) fetch_year = avail_years[fetch_year_index] try: new_leaderboard = self.fetch_leaderboard(fetch_year) self.__last_fetch_year = fetch_year except Exception as e: print("Updating leaderboard failed: %s" % e) return {} # didn't work this time? Well, we'll just try again in 15min ... fetch_year = str(fetch_year) if fetch_year not in self.__cache: self.__cache[fetch_year] = {} if "last_update" not in self.__cache[fetch_year]: last_fetch_ts = now else: last_fetch_ts = datetime.fromtimestamp(self.__cache[fetch_year]["last_update"], tz=timezone.utc) self.__cache[fetch_year] = new_leaderboard self.__cache[fetch_year]["last_update"] = now.timestamp() self.__cache.save() self.update_stats(fetch_year) new_stars = {} for member, member_data in new_leaderboard["members"].items(): if datetime.fromtimestamp(member_data["last_star_ts"], timezone.utc) < last_fetch_ts: continue new_stars[member_data["name"]] = {} for day, day_data in member_data["completion_day_level"].items(): day_start = datetime(int(fetch_year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc) for part in ["1", "2"]: if part not in day_data: continue get_star_ts = datetime.fromtimestamp(day_data[part]["get_star_ts"], timezone.utc) if get_star_ts >= last_fetch_ts: if fetch_year != str(now.year): new_stars[member_data["name"]]["y%sd%sp%s" % (fetch_year, day, part)] = ( get_star_ts - day_start ) else: new_stars[member_data["name"]]["d%sp%s" % (day, part)] = get_star_ts - day_start return new_stars def has_year(self, year: str) -> bool: return year in self.__stats def get_years(self) -> list[str]: return list(sorted(self.__stats.keys())) def get_year_items(self, year: str) -> (str, int, str): for (score, member_name), solve_string in sorted(self.__stats[year]["year"].items(), reverse=True): yield member_name, score, solve_string def get_day_items(self, year: str, day: str) -> (str, int, timedelta, timedelta): for (score, member_name), times in sorted(self.__stats[year]["days"][day].items(), reverse=True): yield member_name, score, times["1"], times["2"] def get_year_member_name_len(self, year: str) -> int: return self.__stats[year]["member_name_len"] class AOCBot: def __init__(self, config_file: str = "config.json"): self.__irc_server = None self.__irc_port = None self.__irc_channel = None self.__irc_nick = None self.__irc_user = None self.__irc_real_name = None self.__aoc_group_id = None self.__aoc_session_file = None self.__aoc_user_agent = None self.__cache = None self.__cache_file = None self.__irc_bot = None self.__irc_command_spam = {} self.__irc_command_cooldown = {} self._load_config(config_file) def _load_config(self, config_file: str): try: config = JSONFile(config_file, False) except FileNotFoundError: print("Config file not found: %s" % config_file) sys.exit(1) try: self.__irc_server = config["IRC"]["server"] self.__irc_port = config["IRC"]["port"] self.__irc_channel = config["IRC"]["channel"] self.__irc_nick = config["IRC"]["nick"] self.__irc_user = config["IRC"]["user"] self.__irc_real_name = config["IRC"]["name"] self.__aoc_group_id = config["AOC"]["group_id"] self.__aoc_session_file = config["AOC"]["session_file"] self.__aoc_user_agent = config["AOC"]["user_agent"] self.__cache_file = config["AOC"]["cache_file"] if "sasl_password" in config["IRC"]: self.__sasl_password = config["IRC"]["sasl_password"] else: self.__sasl_password = None except AttributeError as e: print("CONFIGURATION ERROR: %s" % e) sys.exit(1) def check_spam(self, msg_from: str, command: str) -> bool: now = datetime.now(timezone.utc) if msg_from not in self.__irc_command_spam: self.__irc_command_spam[msg_from] = now - timedelta(minutes=10) if command not in self.__irc_command_cooldown: self.__irc_command_cooldown[command] = now - timedelta(minutes=10) min_time = now - timedelta(seconds=3) if self.__irc_command_spam[msg_from] >= min_time or self.__irc_command_cooldown[command] >= min_time: return False self.__irc_command_spam[msg_from] = now self.__irc_command_cooldown[command] = now return True def command_info(self, msg_from: str, message: str): if not self.check_spam(msg_from, "info"): return # recorded years # last update (per year?) self.__irc_bot.privmsg(self.__irc_channel, "Recorded Years: %s" % ", ".join(self.__cache.get_years())) def command_help(self, msg_from: str, message: str): if not self.check_spam(msg_from, "help"): return self.__irc_bot.privmsg(self.__irc_channel, "Available commands:") self.__irc_bot.privmsg(self.__irc_channel, "!info - Show some stats") self.__irc_bot.privmsg(self.__irc_channel, "!day [day [year]] - Show the leaderboard for today/specified day") self.__irc_bot.privmsg(self.__irc_channel, "!year [year] - Show the leaderboard for this/specified year") def command_day(self, msg_from: str, message: str): if not self.check_spam(msg_from, "day"): return now = datetime.now(timezone.utc) if not message: if now.month != 12: self.__irc_bot.privmsg(self.__irc_channel, "*looks at calendar* Which day?") return year = str(now.year) day = str(now.day) else: message = message.split() if len(message) == 2: day = message[0] year = message[1] elif len(message) == 1: day = message[0] if now.month != 12: year = str(now.year - 1) else: year = str(now.year) else: self.__irc_bot.privmsg(self.__irc_channel, "Invalid Parameters. See !help") return if not self.__cache.has_year(year): self.__irc_bot.privmsg(self.__irc_channel, "unknown year: %s" % year) return try: d = int(day) if not 1 <= d <= 25: self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day) return except ValueError: self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day) return max_name_len = max(17, self.__cache.get_year_member_name_len(year)) self.__irc_bot.privmsg( self.__irc_channel, "Day %2s, Year %4s" % (day, year) + (" " * (max_name_len - 17)) + " p1 p2" ) format_string = "%-" + str(max_name_len) + "s (% 3s) %8s %8s" for member_name, score, p1_time, p2_time in self.__cache.get_day_items(year=year, day=day): if score == 0: break self.__irc_bot.privmsg( self.__irc_channel, format_string % ( member_name, score, human_readable_time_from_delta(p1_time, shortened=True), human_readable_time_from_delta(p2_time, shortened=True) if p2_time is not None else "Not yet.", ), ) def command_year(self, msg_from: str, message: str): if not self.check_spam(msg_from, "year"): return if message: year = message else: now = datetime.now(timezone.utc) if now.month == 12: year = str(now.year) else: year = str(now.year - 1) if not self.__cache.has_year(year): self.__irc_bot.privmsg(self.__irc_channel, "Unknown year: %s" % year) return max_name_len = self.__cache.get_year_member_name_len(year) self.__irc_bot.privmsg( self.__irc_channel, "Year %4s" % year + (" " * (max_name_len - 9)) + " 1111111111222222" ) self.__irc_bot.privmsg(self.__irc_channel, (" " * max_name_len) + " 1234567890123456789012345") format_string = "%-" + str(max_name_len) + "s (% 3s) %s" for member_name, score, solve_string in self.__cache.get_year_items(year): if score == 0: break self.__irc_bot.privmsg(self.__irc_channel, format_string % (member_name, score, "".join(solve_string))) def update_leaderboard(self): new_stars = self.__cache.update() self.__last_update = datetime.now(timezone.utc) if not new_stars: return self.__irc_bot.privmsg(self.__irc_channel, "New Stars found:") for member, member_data in sorted(new_stars.items()): line = ( member + ": " + ", ".join("%s (%s)" % (k, human_readable_time_from_delta(v)) for k, v in sorted(member_data.items())) ) self.__irc_bot.privmsg(self.__irc_channel, line) def on_raw(self, msg_from: str, msg_type: str, msg_to: str, message: str): print("[%s] <%s> (%s) -> <%s>: %s" % (datetime.now().strftime("%H:%M:%S"), msg_from, msg_type, msg_to, message)) def flush_output(self): sys.stdout.flush() sys.stderr.flush() def start(self): print("[AoCBot] Loading Session ID") try: aoc_session_id = open(self.__aoc_session_file, "r").readlines()[0].strip() except FileNotFoundError: print("[AoCBot] Session ID not found: %s" % self.__aoc_session_file) sys.exit(1) print("[AoCBot] Loading Cache File") self.__cache = LeaderboardCache( cache_file_name=self.__cache_file, aoc_token=aoc_session_id, aoc_group_id=self.__aoc_group_id, aoc_user_agent=self.__aoc_user_agent, ) self.__cache_data = JSONFile(self.__cache_file, create=True) print("[AoCBot] Starting IrcBot") self.__irc_bot = IrcBot( server=self.__irc_server, port=self.__irc_port, nick=self.__irc_nick, username=self.__irc_user, realname=self.__irc_real_name, sasl_password=self.__sasl_password, ) print("[AoCBot] Joining Channel %s" % self.__irc_channel) self.__irc_bot.join(self.__irc_channel) print("[AoCBot] Scheduling Leaderboard Update for Group ID %s" % self.__aoc_group_id) self.__irc_bot.schedule("update_leaderboard", timedelta(minutes=15), self.update_leaderboard) if os.getenv("KUBERNETES_PORT", None) is not None: self.__irc_bot.schedule("flush_output", timedelta(minutes=1), self.flush_output) print("[AoCBot] Registering Commands") self.__irc_bot.on(ServerMessage.RAW, self.on_raw) self.__irc_bot.register_channel_command("!info", self.__irc_channel, self.command_info) self.__irc_bot.register_channel_command("!help", self.__irc_channel, self.command_help) self.__irc_bot.register_channel_command("!day", self.__irc_channel, self.command_day) self.__irc_bot.register_channel_command("!year", self.__irc_channel, self.command_year) print("[AoCBot] Starting Main Loop") self.__irc_bot.run() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "-c", "--config-file", help="config file to use (default: config.json)", default="config.json", required=False, ) args = parser.parse_args() aoc_bot = AOCBot(args.config_file) aoc_bot.start()