aocbot/aoc_bot.py

486 lines
19 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os.path
from collections import defaultdict
import requests
import sys
from datetime import datetime, timedelta, timezone
from tools.datafiles import JSONFile
from tools.irc import IrcBot, ServerMessage
def human_readable_time_from_delta(delta: timedelta, shortened: bool = False) -> str:
if shortened:
if delta.days > 0:
return ">24h"
return "%02d:%02d:%02d" % (delta.seconds // 3600, delta.seconds % 3600 // 60, delta.seconds % 60)
else:
time_str = ""
if delta.days > 0:
time_str += "%d day%s, " % (delta.days, "s" if delta.days > 1 else "")
if delta.seconds > 3600:
time_str += "%d hours, " % (delta.seconds // 3600)
else:
time_str += ""
if delta.seconds % 3600 > 60:
time_str += "%d minutes, " % (delta.seconds % 3600 // 60)
else:
time_str += ""
return time_str + "%d seconds" % (delta.seconds % 60)
class LeaderboardCache:
def __init__(self, cache_file_name: str, aoc_token: str, aoc_group_id: int, aoc_user_agent: str):
self.__cache = JSONFile(cache_file_name, create=True)
self.__aoc_token = aoc_token
self.__aoc_group_id = aoc_group_id
self.__aoc_user_agent = aoc_user_agent
self.__last_fetch_year = 2015
self.__stats = {}
self.__stars = {}
self.update_stats()
def fetch_leaderboard(self, year: int) -> dict:
return requests.get(
"https://adventofcode.com/%d/leaderboard/private/view/%s.json" % (year, self.__aoc_group_id),
headers={"User-Agent": self.__aoc_user_agent},
cookies={"session": self.__aoc_token},
).json()
def update_stats(self, year: str | list[str] = None):
if year is None:
years = list(self.__cache.keys())
elif isinstance(year, str):
years = [year]
elif isinstance(year, list):
years = year
else:
years = [str(year)]
for year in years:
if year not in self.__cache:
print("Asked to analyze unknown year %s" % year)
continue
year_keys = {}
year_stats = {"year": {}, "days": {}, "stars": {}, "member_name_len": 0}
for member_id, member_data in self.__cache[year]["members"].items():
member_key = (int(member_data["local_score"]), member_data["name"])
year_keys[member_id] = member_key
year_stats["year"][member_key] = [" "] * 25
year_stats["stars"][member_data["name"]] = member_data["stars"]
year_stats["member_name_len"] = max(year_stats["member_name_len"], len(member_data["name"]))
member_count = len(self.__cache[year]["members"])
for day in map(str, range(1, 26)):
start_of_day = datetime(int(year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc)
p1_times = []
p2_times = []
year_stats["days"][day] = {}
for member, member_data in self.__cache[year]["members"].items():
if day not in member_data["completion_day_level"]:
continue
member_day_data = member_data["completion_day_level"][day]
if "1" in member_day_data:
p1_times.append(member_day_data["1"]["get_star_ts"])
if "2" in member_day_data:
p2_times.append(member_day_data["2"]["get_star_ts"])
p1_times = list(sorted(p1_times))
p2_times = list(sorted(p2_times))
for member, member_data in self.__cache[year]["members"].items():
if day not in member_data["completion_day_level"]:
continue
member_day_data = member_data["completion_day_level"][day]
day_score = 0
p1_time = datetime.fromtimestamp(member_day_data["1"]["get_star_ts"], timezone.utc)
p2_time = (
datetime.fromtimestamp(member_day_data["2"]["get_star_ts"], timezone.utc)
if "2" in member_day_data
else None
)
day_score += member_count - p1_times.index(member_day_data["1"]["get_star_ts"])
year_stats["year"][year_keys[member]][int(day) - 1] = "+"
if p2_time is not None:
day_score += member_count - p2_times.index(member_day_data["2"]["get_star_ts"])
year_stats["year"][year_keys[member]][int(day) - 1] = "*"
year_stats["days"][day][(day_score, member_data["name"])] = {
"1": p1_time - start_of_day,
"2": None if p2_time is None else p2_time - start_of_day,
}
self.__stats[year] = year_stats
star_stats = defaultdict(int)
for year, year_stats in self.__stats.items():
for member, stars in year_stats["stars"].items():
star_stats[member] += stars
self.__stars = star_stats
def update(self) -> dict:
now = datetime.now(timezone.utc)
if now.month == 12: # on season
fetch_year = now.year
else:
avail_years = list(range(2015, now.year))
fetch_year_index = (avail_years.index(self.__last_fetch_year) + 1) % len(avail_years)
fetch_year = avail_years[fetch_year_index]
try:
new_leaderboard = self.fetch_leaderboard(fetch_year)
self.__last_fetch_year = fetch_year
except Exception as e:
print("Updating leaderboard failed: %s" % e)
return {} # didn't work this time? Well, we'll just try again in 15min ...
fetch_year = str(fetch_year)
if fetch_year not in self.__cache:
self.__cache[fetch_year] = {}
if "last_update" not in self.__cache[fetch_year]:
last_fetch_ts = now
else:
last_fetch_ts = datetime.fromtimestamp(self.__cache[fetch_year]["last_update"], tz=timezone.utc)
self.__cache[fetch_year] = new_leaderboard
self.__cache[fetch_year]["last_update"] = now.timestamp()
self.__cache.save()
self.update_stats(fetch_year)
new_stars = {}
for member, member_data in new_leaderboard["members"].items():
if datetime.fromtimestamp(member_data["last_star_ts"], timezone.utc) < last_fetch_ts:
continue
new_stars[member_data["name"]] = {}
for day, day_data in member_data["completion_day_level"].items():
day_start = datetime(int(fetch_year), 12, int(day), 5, 0, 0, tzinfo=timezone.utc)
for part in ["1", "2"]:
if part not in day_data:
continue
get_star_ts = datetime.fromtimestamp(day_data[part]["get_star_ts"], timezone.utc)
if get_star_ts >= last_fetch_ts:
if fetch_year != str(now.year):
new_stars[member_data["name"]]["y%sd%sp%s" % (fetch_year, day, part)] = (
get_star_ts - day_start
)
else:
new_stars[member_data["name"]]["d%sp%s" % (day, part)] = get_star_ts - day_start
return new_stars
def has_year(self, year: str) -> bool:
return year in self.__stats
def get_years(self) -> list[str]:
return list(sorted(self.__stats.keys()))
def get_year_stars(self, year: str):
for member, stars in sorted(self.__stats[year]["stars"].items(), key=lambda x: x[1], reverse=True):
yield member, stars
def get_all_stars(self):
for member, stars in sorted(self.__stars.items(), key=lambda x: x[1], reverse=True):
yield member, stars
def get_year_items(self, year: str) -> (str, int, str):
for (score, member_name), solve_string in sorted(self.__stats[year]["year"].items(), reverse=True):
yield member_name, score, solve_string
def get_day_items(self, year: str, day: str) -> (str, int, timedelta, timedelta):
for (score, member_name), times in sorted(self.__stats[year]["days"][day].items(), reverse=True):
yield member_name, score, times["1"], times["2"]
def get_year_member_name_len(self, year: str) -> int:
return self.__stats[year]["member_name_len"]
def get_all_member_name_len(self) -> int:
return max(self.__stats[x]["member_name_len"] for x in self.__stats)
class AOCBot:
def __init__(self, config_file: str = "config.json"):
self.__irc_server = None
self.__irc_port = None
self.__irc_channel = None
self.__irc_nick = None
self.__irc_user = None
self.__irc_real_name = None
self.__aoc_group_id = None
self.__aoc_session_file = None
self.__aoc_user_agent = None
self.__cache = None
self.__cache_file = None
self.__irc_bot = None
self.__irc_command_spam = {}
self.__irc_command_cooldown = {}
self._load_config(config_file)
def _load_config(self, config_file: str):
try:
config = JSONFile(config_file, False)
except FileNotFoundError:
print("Config file not found: %s" % config_file)
sys.exit(1)
try:
self.__irc_server = config["IRC"]["server"]
self.__irc_port = config["IRC"]["port"]
self.__irc_channel = config["IRC"]["channel"]
self.__irc_nick = config["IRC"]["nick"]
self.__irc_user = config["IRC"]["user"]
self.__irc_real_name = config["IRC"]["name"]
self.__aoc_group_id = config["AOC"]["group_id"]
self.__aoc_session_file = config["AOC"]["session_file"]
self.__aoc_user_agent = config["AOC"]["user_agent"]
self.__cache_file = config["AOC"]["cache_file"]
if "sasl_password" in config["IRC"]:
self.__sasl_password = config["IRC"]["sasl_password"]
else:
self.__sasl_password = None
except AttributeError as e:
print("CONFIGURATION ERROR: %s" % e)
sys.exit(1)
def check_spam(self, msg_from: str, command: str) -> bool:
now = datetime.now(timezone.utc)
if msg_from not in self.__irc_command_spam:
self.__irc_command_spam[msg_from] = now - timedelta(minutes=10)
if command not in self.__irc_command_cooldown:
self.__irc_command_cooldown[command] = now - timedelta(minutes=10)
min_time = now - timedelta(seconds=3)
if self.__irc_command_spam[msg_from] >= min_time or self.__irc_command_cooldown[command] >= min_time:
return False
self.__irc_command_spam[msg_from] = now
self.__irc_command_cooldown[command] = now
return True
def command_info(self, msg_from: str, message: str):
if not self.check_spam(msg_from, "info"):
return
# recorded years
# last update (per year?)
self.__irc_bot.privmsg(self.__irc_channel, "Recorded Years: %s" % ", ".join(self.__cache.get_years()))
def command_help(self, msg_from: str, message: str):
if not self.check_spam(msg_from, "help"):
return
self.__irc_bot.privmsg(self.__irc_channel, "Available commands:")
self.__irc_bot.privmsg(self.__irc_channel, "!info - Show some stats")
self.__irc_bot.privmsg(self.__irc_channel, "!day [day [year]] - Show the leaderboard for today/specified day")
self.__irc_bot.privmsg(self.__irc_channel, "!year [year] - Show the leaderboard for this/specified year")
def command_day(self, msg_from: str, message: str):
if not self.check_spam(msg_from, "day"):
return
now = datetime.now(timezone.utc)
if not message:
if now.month != 12:
self.__irc_bot.privmsg(self.__irc_channel, "*looks at calendar* Which day?")
return
year = str(now.year)
day = str(now.day)
else:
message = message.split()
if len(message) == 2:
day = message[0]
year = message[1]
elif len(message) == 1:
day = message[0]
if now.month != 12:
year = str(now.year - 1)
else:
year = str(now.year)
else:
self.__irc_bot.privmsg(self.__irc_channel, "Invalid Parameters. See !help")
return
if not self.__cache.has_year(year):
self.__irc_bot.privmsg(self.__irc_channel, "unknown year: %s" % year)
return
try:
d = int(day)
if not 1 <= d <= 25:
self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day)
return
except ValueError:
self.__irc_bot.privmsg(self.__irc_channel, "Invalid Day: %s" % day)
return
max_name_len = max(17, self.__cache.get_year_member_name_len(year))
self.__irc_bot.privmsg(
self.__irc_channel, "Day %2s, Year %4s" % (day, year) + (" " * (max_name_len - 17)) + " p1 p2"
)
format_string = "%-" + str(max_name_len) + "s (% 3s) %8s %8s"
for member_name, score, p1_time, p2_time in self.__cache.get_day_items(year=year, day=day):
if score == 0:
break
self.__irc_bot.privmsg(
self.__irc_channel,
format_string
% (
member_name,
score,
human_readable_time_from_delta(p1_time, shortened=True),
human_readable_time_from_delta(p2_time, shortened=True) if p2_time is not None else "Not yet.",
),
)
def command_year(self, msg_from: str, message: str):
if not self.check_spam(msg_from, "year"):
return
if message:
year = message
else:
now = datetime.now(timezone.utc)
if now.month == 12:
year = str(now.year)
else:
year = str(now.year - 1)
if not self.__cache.has_year(year):
self.__irc_bot.privmsg(self.__irc_channel, "Unknown year: %s" % year)
return
max_name_len = self.__cache.get_year_member_name_len(year)
self.__irc_bot.privmsg(
self.__irc_channel, "Year %4s" % year + (" " * (max_name_len - 9)) + " 1111111111222222"
)
self.__irc_bot.privmsg(self.__irc_channel, (" " * max_name_len) + " 1234567890123456789012345")
format_string = "%-" + str(max_name_len) + "s (% 3s) %s"
for member_name, score, solve_string in self.__cache.get_year_items(year):
if score == 0:
break
self.__irc_bot.privmsg(self.__irc_channel, format_string % (member_name, score, "".join(solve_string)))
def command_stars(self, msg_from: str, message: str):
if not self.check_spam(msg_from, "stars"):
return
if not message:
max_name_len = self.__cache.get_all_member_name_len()
star_gen = self.__cache.get_all_stars()
else:
year = message
if not self.__cache.has_year(year):
self.__irc_bot.privmsg(self.__irc_channel, "Unknown year: %s" % year)
return
max_name_len = self.__cache.get_year_member_name_len(year)
star_gen = self.__cache.get_year_stars(year)
for count, (member, stars) in enumerate(star_gen):
if stars == 0:
break
line = "%2d. %-" + str(max_name_len) + "s %4d Stars"
self.__irc_bot.privmsg(self.__irc_channel, line % (count + 1, member, stars))
def update_leaderboard(self):
new_stars = self.__cache.update()
self.__last_update = datetime.now(timezone.utc)
if not new_stars:
return
self.__irc_bot.privmsg(self.__irc_channel, "New Stars found:")
for member, member_data in sorted(new_stars.items()):
line = (
member
+ ": "
+ ", ".join("%s (%s)" % (k, human_readable_time_from_delta(v)) for k, v in sorted(member_data.items()))
)
self.__irc_bot.privmsg(self.__irc_channel, line)
def on_raw(self, msg_from: str, msg_type: str, msg_to: str, message: str):
print("[%s] <%s> (%s) -> <%s>: %s" % (datetime.now().strftime("%H:%M:%S"), msg_from, msg_type, msg_to, message))
def on_quit(self, msg_from: str, msg_type: str, msg_to: str, message: str):
if msg_from == self.__irc_bot.getUser().identifier:
sys.exit(0)
def flush_output(self):
sys.stdout.flush()
sys.stderr.flush()
def start(self):
print("[AoCBot] Loading Session ID")
try:
aoc_session_id = open(self.__aoc_session_file, "r").readlines()[0].strip()
except FileNotFoundError:
print("[AoCBot] Session ID not found: %s" % self.__aoc_session_file)
sys.exit(1)
print("[AoCBot] Loading Cache File")
self.__cache = LeaderboardCache(
cache_file_name=self.__cache_file,
aoc_token=aoc_session_id,
aoc_group_id=self.__aoc_group_id,
aoc_user_agent=self.__aoc_user_agent,
)
self.__cache_data = JSONFile(self.__cache_file, create=True)
print("[AoCBot] Starting IrcBot")
self.__irc_bot = IrcBot(
server=self.__irc_server,
port=self.__irc_port,
nick=self.__irc_nick,
username=self.__irc_user,
realname=self.__irc_real_name,
sasl_password=self.__sasl_password,
)
print("[AoCBot] Joining Channel %s" % self.__irc_channel)
self.__irc_bot.join(self.__irc_channel)
print("[AoCBot] Scheduling Leaderboard Update for Group ID %s" % self.__aoc_group_id)
self.__irc_bot.schedule("update_leaderboard", timedelta(minutes=15), self.update_leaderboard)
if os.getenv("KUBERNETES_PORT", None) is not None:
self.__irc_bot.schedule("flush_output", timedelta(minutes=1), self.flush_output)
print("[AoCBot] Registering Commands")
self.__irc_bot.on(ServerMessage.RAW, self.on_raw)
self.__irc_bot.on(ServerMessage.MSG_QUIT, self.on_quit)
self.__irc_bot.register_channel_command("!info", self.__irc_channel, self.command_info)
self.__irc_bot.register_channel_command("!help", self.__irc_channel, self.command_help)
self.__irc_bot.register_channel_command("!day", self.__irc_channel, self.command_day)
self.__irc_bot.register_channel_command("!year", self.__irc_channel, self.command_year)
self.__irc_bot.register_channel_command("!stars", self.__irc_channel, self.command_stars)
print("[AoCBot] Starting Main Loop")
self.__irc_bot.run()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--config-file",
help="config file to use (default: config.json)",
default="config.json",
required=False,
)
args = parser.parse_args()
aoc_bot = AOCBot(args.config_file)
aoc_bot.start()