diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/btree_test.py b/btree_test.py new file mode 100644 index 0000000..97d5b77 --- /dev/null +++ b/btree_test.py @@ -0,0 +1,56 @@ +from heapq import heappop, heappush +from tools.trees import MinHeap, BinarySearchTree +from tools.stopwatch import StopWatch + + +s = StopWatch() +h = [] +for x in range(100_000): + heappush(h, x) +print("Heappush:", s.elapsed()) +s.reset() +while h: + heappop(h) +print("Heappop:", s.elapsed()) + +s = StopWatch() +h = MinHeap() +for x in range(100_000): + h.add(x) +print("MinHeap.add():", s.elapsed()) +s.reset() +while not h.empty(): + h.pop() +print("MinHeap.pop():", s.elapsed()) + +s = StopWatch() +b = set() +for x in range(1_000_000): + b.add(x) +print("set.add():", s.elapsed()) +s.reset() +for x in range(1_000_000): + _ = x in b +print("x in set:", s.elapsed()) + +s = StopWatch() +b = BinarySearchTree() +for x in range(1_000_000): + b.add(x) +print("AVL.add():", s.elapsed()) +s.reset() +for x in range(1_000_000): + _ = x in b +print("x in AVL:", s.elapsed()) + +print("DFS/BFS Test") +b = BinarySearchTree() +for x in range(20): + b.add(x) +b.print() +print("DFS:") +for x in b.iter_depth_first(): + print(x) +print("BFS:") +for x in b.iter_breadth_first(): + print(x) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2bd1a6d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +bs4~=0.0.1 +beautifulsoup4==4.11.1 +fishhook~=0.2.5 +pygame~=2.4.0 +requests==2.28.1 +setuptools==65.6.3 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e1bee02 --- /dev/null +++ b/setup.py @@ -0,0 +1,12 @@ +from setuptools import setup + +setup( + name='py-tools', + version='0.2', + packages=['tools'], + url='', + license='GPLv3', + author='Stefan Harmuth', + author_email='pennywise@drock.de', + description='Just some small tools to make life easier' +) diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/aoc.py b/tools/aoc.py new file mode 100644 index 0000000..31b07be --- /dev/null +++ b/tools/aoc.py @@ -0,0 +1,256 @@ +import os +import re +import subprocess + +import requests +import time +import webbrowser +from bs4 import BeautifulSoup +from tools.datafiles import JSONFile +from tools.stopwatch import StopWatch +from typing import Any, Callable, List, Tuple, Type, Union +from .tools import get_script_dir + +BASE_PATH = get_script_dir() +INPUTS_PATH = os.path.join(BASE_PATH, 'inputs') + + +class AOCDay: + year: int + day: int + input: List[str] # our input is always a list of str/lines + inputs: List[List[Tuple[Any, str]]] + part_func: List[Callable] + + def __init__(self, year: int, day: int): + self.day = day + self.year = year + self.part_func = [self.part1, self.part2] + self._current_test_file = None + self._current_test_solution = None + + def part1(self) -> Any: + raise NotImplementedError() + + def part2(self) -> Any: + raise NotImplementedError() + + def run_part(self, part: int, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50): + case_count = 0 + for solution, input_file in self.inputs[part]: + self._current_test_solution, self._current_test_file = solution, input_file + exec_time = None + answer = None + self._load_input(input_file) + + if not measure_runtime or case_count < len(self.inputs[part]) - 1: + answer = self.part_func[part]() + else: + stopwatch = StopWatch() + for _ in range(timeit_number): + answer = self.part_func[part]() + stopwatch.stop() + exec_time = stopwatch.avg_string(timeit_number) + + if solution is None: + print_solution(self.day, part + 1, answer, solution, case_count, exec_time) + if answer not in {u"", b"", None, b"None", u"None", 0, '0'}: + self._submit(part + 1, answer) + else: + if verbose or answer != solution: + print_solution(self.day, part + 1, answer, solution, case_count, exec_time) + + if answer != solution: + return False + + case_count += 1 + if case_count == len(self.inputs[part]) and not verbose: + print_solution(self.day, part + 1, answer, exec_time=exec_time) + + def run(self, parts: int = 3, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50): + if parts & 1: + self.run_part(0, verbose, measure_runtime, timeit_number) + if parts & 2: + self.run_part(1, verbose, measure_runtime, timeit_number) + + def _load_input(self, filename): + file_path = os.path.join(INPUTS_PATH, filename) + if not os.path.exists(file_path): + self._download_input(file_path) + + with open(os.path.join(INPUTS_PATH, filename)) as f: + self.input = f.read().splitlines() + + def _download_input(self, filename: str): + # FIXME: implement wait time for current day before 06:00:00 ? + session_id = open(".session", "r").readlines()[0].strip() + response = requests.get( + "https://adventofcode.com/%d/day/%d/input" % (self.year, self.day), + cookies={'session': session_id} + ) + if not response.ok: + print("FAILED to download input: (%s) %s" % (response.status_code, response.text)) + return + + with open(filename, "wb") as f: + f.write(response.content) + f.flush() + + if os.path.exists(".git"): + subprocess.call(["git", "add", filename]) + + def _submit(self, part: int, answer: Any): + answer_cache = JSONFile("answer_cache.json", create=True) + str_day = str(self.day) + str_part = str(part) + if str_day not in answer_cache: + answer_cache[str_day] = {} + + if str_part not in answer_cache[str_day]: + answer_cache[str_day][str_part] = { + 'wrong': [], + 'correct': None + } + + if answer in answer_cache[str_day][str_part]['wrong']: + print("Already tried %s. It was WRONG." % answer) + return + + if answer_cache[str_day][str_part]['correct'] is not None: + if answer == answer_cache[str_day][str_part]['correct']: + print("Already submitted %s. It was CORRECT." % answer) + return + else: + print("Already submitted an answer, but another one") + print("CORRECT was: %s" % answer_cache[str_day][str_part]['correct']) + print("Your answer: %s" % answer) + return + + print("Submitting %s as answer for %d part %d" % (answer, self.day, part)) + session_id = open(".session", "r").readlines()[0].strip() + response = requests.post( + "https://adventofcode.com/%d/day/%d/answer" % (self.year, self.day), + cookies={'session': session_id}, + data={'level': part, 'answer': answer} + ) + + if not response.ok: + print("Failed to submit answer: (%s) %s" % (response.status_code, response.text)) + + soup = BeautifulSoup(response.text, "html.parser") + message = soup.article.text + if "That's the right answer" in message: + answer_cache[str_day][str_part]['correct'] = answer + print("That's correct!") + webbrowser.open("https://adventofcode.com/%d/day/%d#part2" % (self.year, self.day)) + elif "That's not the right answer" in message: + answer_cache[str_day][str_part]['wrong'].append(answer) + print("That's WRONG!") + elif "You gave an answer too recently" in message: + # WAIT and retry + wait_pattern = r"You have (?:(\d+)m )?(\d+)s left to wait" + try: + [(minutes, seconds)] = re.findall(wait_pattern, message) + except ValueError: + print("wait_pattern unable to find wait_time in:") + print(message) + return + + seconds = int(seconds) + if minutes: + seconds += int(minutes) * 60 + + print("TOO SOON. Waiting %d seconds until auto-retry." % seconds) + time.sleep(seconds) + self._submit(part, answer) + return + else: + print("I don't know what this means:") + print(message) + return + + answer_cache.save() + + def getInput(self, return_type: Type = None) -> Any: + if len(self.input) == 1: + if return_type: + return return_type(self.input[0]) + else: + return self.input[0] + else: + if return_type: + return [return_type(i) for i in self.input] + else: + return self.input.copy() + + def getMultiLineInputAsArray(self, return_type: Type = None, join_char: str = None) -> List: + """ + get input for day x as 2d array, split by empty lines + """ + lines = self.input.copy() + lines.append('') + + return_array = [] + line_array = [] + for line in lines: + if not line: + if join_char: + return_array.append(join_char.join(line_array)) + else: + return_array.append(line_array) + line_array = [] + continue + + if return_type: + line_array.append(return_type(line)) + else: + line_array.append(line) + + return return_array + + def getInputAsArraySplit(self, split_char: str = ',', return_type: Union[Type, List[Type]] = None) -> List: + """ + get input for day x with the lines split by split_char + if input has only one line, returns a 1d array with the values + if input has multiple lines, returns a 2d array (a[line][values]) + """ + if len(self.input) == 1: + return split_line(line=self.input[0], split_char=split_char, return_type=return_type) + else: + return_array = [] + for line in self.input: + return_array.append(split_line(line=line, split_char=split_char, return_type=return_type)) + + return return_array + + +def print_solution(day: int, part: int, solution: Any, test: Any = None, test_case: int = 0, exec_time: str = None): + if test is not None: + print( + "%s (TEST day%d/part%d/case%d): got '%s'; expected '%s'" + % ("OK" if test == solution else "FAIL", day, part, test_case, solution, test) + ) + else: + print( + "Solution to day %s, part %s: %s" + % ( + day, + part, + solution, + ) + ) + + if exec_time: + print("Day %s, Part %s - Average run time: %s" % (day, part, exec_time)) + + +def split_line(line, split_char: str = ',', return_type: Union[Type, List[Type]] = None): + if split_char: + line = line.split(split_char) + + if return_type is None: + return line + elif isinstance(return_type, list): + return [return_type[x](i) if len(return_type) > x else i for x, i in enumerate(line)] + else: + return [return_type(i) for i in line] diff --git a/tools/aoc_ocr.py b/tools/aoc_ocr.py new file mode 100644 index 0000000..815e918 --- /dev/null +++ b/tools/aoc_ocr.py @@ -0,0 +1,61 @@ +# Copyright (c) 2020-present Benjamin Soyka +# Original and Licence at https://github.com/bsoyka/advent-of-code-ocr + +from collections.abc import Sequence + +ALPHABET_6 = { + ".##.\n#..#\n#..#\n####\n#..#\n#..#": "A", + "###.\n#..#\n###.\n#..#\n#..#\n###.": "B", + ".##.\n#..#\n#...\n#...\n#..#\n.##.": "C", + "####\n#...\n###.\n#...\n#...\n####": "E", + "####\n#...\n###.\n#...\n#...\n#...": "F", + ".##.\n#..#\n#...\n#.##\n#..#\n.###": "G", + "#..#\n#..#\n####\n#..#\n#..#\n#..#": "H", + ".###\n..#.\n..#.\n..#.\n..#.\n.###": "I", + "..##\n...#\n...#\n...#\n#..#\n.##.": "J", + "#..#\n#.#.\n##..\n#.#.\n#.#.\n#..#": "K", + "#...\n#...\n#...\n#...\n#...\n####": "L", + ".##.\n#..#\n#..#\n#..#\n#..#\n.##.": "O", + "###.\n#..#\n#..#\n###.\n#...\n#...": "P", + "###.\n#..#\n#..#\n###.\n#.#.\n#..#": "R", + ".###\n#...\n#...\n.##.\n...#\n###.": "S", + "#..#\n#..#\n#..#\n#..#\n#..#\n.##.": "U", + "#...\n#...\n.#.#\n..#.\n..#.\n..#.": "Y", + "####\n...#\n..#.\n.#..\n#...\n####": "Z", +} + + +def convert_6(input_text: str, *, fill_pixel: str = "#", empty_pixel: str = ".") -> str: + """Convert height 6 text to characters""" + input_text = input_text.replace(fill_pixel, "#").replace(empty_pixel, ".") + prepared_array = [list(line) for line in input_text.split("\n")] + return _convert_6(prepared_array) + + +def convert_array_6(array: Sequence[Sequence[str | int]], *, fill_pixel: str | int = "#", empty_pixel: str | int = ".") -> str: + """Convert a height 6 NumPy array or nested list to characters""" + prepared_array = [ + [ + "#" if pixel == fill_pixel else "." if pixel == empty_pixel else "" + for pixel in line + ] + for line in array + ] + return _convert_6(prepared_array) + + +def _convert_6(array: list[list[str]]) -> str: + """Convert a prepared height 6 array to characters""" + rows, cols = len(array), len(array[0]) + if any(len(row) != cols for row in array): + raise ValueError("all rows should have the same number of columns") + if rows != 6: + raise ValueError("incorrect number of rows (expected 6)") + + indices = [slice(start, start + 4) for start in range(0, cols, 5)] + result = [ + ALPHABET_6["\n".join("".join(row[index]) for row in array)] + for index in indices + ] + + return "".join(result) diff --git a/tools/coordinate.py b/tools/coordinate.py new file mode 100644 index 0000000..0871fe4 --- /dev/null +++ b/tools/coordinate.py @@ -0,0 +1,393 @@ +from __future__ import annotations +from dataclasses import dataclass +from enum import Enum +from math import gcd, sqrt, inf, atan2, degrees +from .math import round_half_up +from typing import Union, List, Optional + + +class DistanceAlgorithm(Enum): + MANHATTAN = 0 + EUCLIDEAN = 1 + PYTHAGOREAN = 1 + CHEBYSHEV = 2 + CHESSBOARD = 2 + + +@dataclass(frozen=True) +class Coordinate: + x: int + y: int + z: Optional[int] = None + + def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN, + includeDiagonals: bool = False) -> Union[int, float]: + """ + Get distance to target Coordinate + + :param target: + :param algorithm: Calculation Algorithm (s. DistanceAlgorithm) + :param includeDiagonals: in Manhattan Mode specify if diagonal + movements are allowed (counts as 1.4 in 2D, 1.7 in 3D) + :return: Distance to Target + """ + if algorithm == DistanceAlgorithm.EUCLIDEAN: + if self.z is None: + return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2) + else: + return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2 + abs(self.z - target.z) ** 2) + elif algorithm == DistanceAlgorithm.CHEBYSHEV: + if self.z is None: + return max(abs(target.x - self.x), abs(target.y - self.y)) + else: + return max(abs(target.x - self.x), abs(target.y - self.y), abs(target.z - self.z)) + elif algorithm == DistanceAlgorithm.MANHATTAN: + if not includeDiagonals: + if self.z is None: + return abs(self.x - target.x) + abs(self.y - target.y) + else: + return abs(self.x - target.x) + abs(self.y - target.y) + abs(self.z - target.z) + else: + dist = [abs(self.x - target.x), abs(self.y - target.y)] + if self.z is None: + o_dist = max(dist) - min(dist) + return o_dist + 1.4 * min(dist) + else: + dist.append(abs(self.z - target.z)) + d_steps = min(dist) + dist.remove(min(dist)) + dist = [x - d_steps for x in dist] + o_dist = max(dist) - min(dist) + return 1.7 * d_steps + o_dist + 1.4 * min(dist) + + def inBoundaries(self, minX: int, minY: int, maxX: int, maxY: int, minZ: int = -inf, maxZ: int = inf) -> bool: + if self.z is None: + return minX <= self.x <= maxX and minY <= self.y <= maxY + else: + return minX <= self.x <= maxX and minY <= self.y <= maxY and minZ <= self.z <= maxZ + + def getCircle(self, radius: int = 1, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN, + minX: int = -inf, minY: int = -inf, maxX: int = inf, maxY: int = inf, + minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]: + ret = [] + if self.z is None: # mode 2D + for x in range(self.x - radius * 2, self.x + radius * 2 + 1): + for y in range(self.y - radius * 2, self.y + radius * 2 + 1): + target = Coordinate(x, y) + if not target.inBoundaries(minX, minY, maxX, maxY): + continue + dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False)) + if dist == radius: + ret.append(target) + + else: + for x in range(self.x - radius * 2, self.x + radius * 2 + 1): + for y in range(self.y - radius * 2, self.y + radius * 2 + 1): + for z in range(self.z - radius * 2, self.z + radius * 2 + 1): + target = Coordinate(x, y) + if not target.inBoundaries(minX, minY, maxX, maxY, minZ, maxZ): + continue + dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False)) + if dist == radius: + ret.append(target) + + return ret + + def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf, + maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]: + """ + Get a list of neighbouring coordinates. + + :param includeDiagonal: include diagonal neighbours + :param minX: ignore all neighbours that would have an X value below this + :param minY: ignore all neighbours that would have an Y value below this + :param minZ: ignore all neighbours that would have an Z value below this + :param maxX: ignore all neighbours that would have an X value above this + :param maxY: ignore all neighbours that would have an Y value above this + :param maxZ: ignore all neighbours that would have an Z value above this + :return: list of Coordinate + """ + if self.z is None: + if includeDiagonal: + nb_list = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)] + else: + nb_list = [(-1, 0), (1, 0), (0, -1), (0, 1)] + + for dx, dy in nb_list: + if minX <= self.x + dx <= maxX and minY <= self.y + dy <= maxY: + yield self.__class__(self.x + dx, self.y + dy) + else: + if includeDiagonal: + nb_list = [(x, y, z) for x in [-1, 0, 1] for y in [-1, 0, 1] for z in [-1, 0, 1]] + nb_list.remove((0, 0, 0)) + else: + nb_list = [(-1, 0, 0), (0, -1, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 0, -1)] + + for dx, dy, dz in nb_list: + if minX <= self.x + dx <= maxX and minY <= self.y + dy <= maxY and minZ <= self.z + dz <= maxZ: + yield self.__class__(self.x + dx, self.y + dy, self.z + dz) + + def getAngleTo(self, target: Coordinate, normalized: bool = False) -> float: + """normalized returns an angle going clockwise with 0 starting in the 'north'""" + if self.z is not None: + raise NotImplementedError() # which angle?!?! + + dx = target.x - self.x + dy = target.y - self.y + if not normalized: + return degrees(atan2(dy, dx)) + else: + angle = degrees(atan2(dx, dy)) + if dx >= 0: + return 180.0 - angle + else: + return 180.0 + abs(angle) + + def getLineTo(self, target: Coordinate) -> List[Coordinate]: + diff = target - self + + if self.z is None: + steps = gcd(diff.x, diff.y) + step_x = diff.x // steps + step_y = diff.y // steps + return [self.__class__(self.x + step_x * i, self.y + step_y * i) for i in range(steps + 1)] + else: + steps = gcd(diff.x, diff.y, diff.z) + step_x = diff.x // steps + step_y = diff.y // steps + step_z = diff.z // steps + return [self.__class__(self.x + step_x * i, self.y + step_y * i, self.z + step_z * i) for i in range(steps + 1)] + + def reverse(self) -> Coordinate: + if self.z is None: + return self.__class__(-self.x, -self.y) + else: + return self.__class__(-self.x, -self.y, -self.z) + + def __add__(self, other: Coordinate) -> Coordinate: + if self.z is None: + return self.__class__(self.x + other.x, self.y + other.y) + else: + return self.__class__(self.x + other.x, self.y + other.y, self.z + other.z) + + def __sub__(self, other: Coordinate) -> Coordinate: + if self.z is None: + return self.__class__(self.x - other.x, self.y - other.y) + else: + return self.__class__(self.x - other.x, self.y - other.y, self.z - other.z) + + def __mul__(self, other: int) -> Coordinate: + if self.z is None: + return self.__class__(self.x * other, self.y * other) + else: + return self.__class__(self.x * other, self.y * other, self.z * other) + + def __floordiv__(self, other) -> Coordinate: + if self.z is None: + return self.__class__(self.x // other, self.y // other) + else: + return self.__class__(self.x // other, self.y // other, self.z // other) + + def __truediv__(self, other): + return self // other + + def __eq__(self, other): + if not isinstance(other, Coordinate): + return False + return self.x == other.x and self.y == other.y and self.z == other.z + + def __gt__(self, other): + if self.z is None: + return self.x > other.x and self.y > other.y + else: + return self.x > other.x and self.y > other.y and self.z > other.z + + def __ge__(self, other): + if self.z is None: + return self.x >= other.x and self.y >= other.y + else: + return self.x >= other.x and self.y >= other.y and self.z >= other.z + + def __lt__(self, other): + if self.z is None: + return self.x < other.x and self.y < other.y + else: + return self.x < other.x and self.y < other.y and self.z < other.z + + def __le__(self, other): + if self.z is None: + return self.x <= other.x and self.y <= other.y + else: + return self.x <= other.x and self.y <= other.y and self.z <= other.z + + def __str__(self): + if self.z is None: + return "(%d,%d)" % (self.x, self.y) + else: + return "(%d,%d,%d)" % (self.x, self.y, self.z) + + def __repr__(self): + if self.z is None: + return "%s(x=%d, y=%d)" % (self.__class__.__name__, self.x, self.y) + else: + return "%s(x=%d, y=%d, z=%d)" % (self.__class__.__name__, self.x, self.y, self.z) + + @classmethod + def generate(cls, from_x: int, to_x: int, from_y: int, to_y: int, + from_z: int = None, to_z: int = None) -> List[Coordinate]: + if from_z is None or to_z is None: + return [cls(x, y) for x in range(from_x, to_x + 1) for y in range(from_y, to_y + 1)] + else: + return [ + cls(x, y, z) + for x in range(from_x, to_x + 1) + for y in range(from_y, to_y + 1) + for z in range(from_z, to_z + 1) + ] + + +class HexCoordinate(Coordinate): + """ + https://www.redblobgames.com/grids/hexagons/#coordinates-cube + Treat as 3d Coordinate + +y -x +z + y x z + yxz + z x y + -z +x -y + """ + neighbour_vectors = { + 'ne': Coordinate(-1, 0, 1), + 'nw': Coordinate(-1, 1, 0), + 'e': Coordinate(0, -1, 1), + 'w': Coordinate(0, 1, -1), + 'sw': Coordinate(1, 0, -1), + 'se': Coordinate(1, -1, 0), + } + + def __init__(self, x: int, y: int, z: int): + assert (x + y + z) == 0 + super().__init__(x, y, z) + + def get_length(self) -> int: + return (abs(self.x) + abs(self.y) + abs(self.z)) // 2 + + def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN, + includeDiagonals: bool = True) -> Union[int, float]: + # includeDiagonals makes no sense in a hex grid, it's just here for signature reasons + if algorithm == DistanceAlgorithm.MANHATTAN: + return (self - target).get_length() + + def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf, + maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]: + # includeDiagonals makes no sense in a hex grid, it's just here for signature reasons + return [ + self + x for x in self.neighbour_vectors.values() + if minX <= (self + x).x <= maxX and minY <= (self + x).y <= maxY and minZ <= (self + x).z <= maxZ + ] + + +HexCoordinateR = HexCoordinate + + +class HexCoordinateF(HexCoordinate): + """ + https://www.redblobgames.com/grids/hexagons/#coordinates-cube + Treat as 3d Coordinate + +y -x + y x + -z z yxz z +z + x y + +x -y + """ + neighbour_vectors = { + 'ne': Coordinate(-1, 0, 1), + 'nw': Coordinate(0, 1, -1), + 'n': Coordinate(-1, 1, 0), + 's': Coordinate(1, -1, 0), + 'sw': Coordinate(1, 0, -1), + 'se': Coordinate(0, -1, 1), + } + + def __init__(self, x: int, y: int, z: int): + super().__init__(x, y, z) + + +class Shape: + def __init__(self, top_left: Coordinate, bottom_right: Coordinate): + """ + in 2D mode: top_left is the upper left corner and bottom_right the lower right + (top_left.x <= bottom_right.x and top_left.y <= bottom_right.y) + in 3D mode: same logic applied, just for 3D Coordinates + top_left is the upper left rear corner and bottom_right the lower right front + (top_left.x <= bottom_right.x and top_left.y <= bottom_right.y and top_left.z <= bottom_right.z) + """ + self.top_left = top_left + self.bottom_right = bottom_right + self.mode_3d = top_left.z is not None and bottom_right.z is not None + + def __len__(self): + if not self.mode_3d: + return (self.bottom_right.x - self.top_left.x + 1) * (self.bottom_right.y - self.top_left.y + 1) + else: + return ( + (self.bottom_right.x - self.top_left.x + 1) + * (self.bottom_right.y - self.top_left.y + 1) + * (self.bottom_right.z - self.top_left.z + 1) + ) + + def intersection(self, other: Shape) -> Union[Shape, None]: + """ + returns a Shape of the intersecting part, or None if the Shapes don't intersect + """ + if self.mode_3d != other.mode_3d: + raise ValueError("Cannot calculate intersection between 2d and 3d shape") + + if not self.mode_3d: + intersect_top_left = Coordinate( + self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x, + self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y, + ) + intersect_bottom_right = Coordinate( + self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x, + self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y, + ) + else: + intersect_top_left = Coordinate( + self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x, + self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y, + self.top_left.z if self.top_left.z > other.top_left.z else other.top_left.z, + ) + intersect_bottom_right = Coordinate( + self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x, + self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y, + self.bottom_right.z if self.bottom_right.z < other.bottom_right.z else other.bottom_right.z, + ) + + if intersect_top_left <= intersect_bottom_right: + return self.__class__(intersect_top_left, intersect_bottom_right) + + def __and__(self, other): + return self.intersection(other) + + def __rand__(self, other): + return self.intersection(other) + + def __str__(self): + return "%s(%s -> %s)" % (self.__class__.__name__, self.top_left, self.bottom_right) + + def __repr__(self): + return "%s(%s, %s)" % (self.__class__.__name__, self.top_left, self.bottom_right) + + +class Square(Shape): + def __init__(self, top_left, bottom_right): + super().__init__(top_left, bottom_right) + self.mode_3d = False + + +class Cube(Shape): + def __init__(self, top_left, bottom_right): + if top_left.z is None or bottom_right.z is None: + raise ValueError("Both Coordinates need to be 3D") + super().__init__(top_left, bottom_right) diff --git a/tools/daemon.py b/tools/daemon.py new file mode 100644 index 0000000..c75cb1b --- /dev/null +++ b/tools/daemon.py @@ -0,0 +1,154 @@ +# Shamelessly stolen from https://gist.github.com/josephernest/77fdb0012b72ebdf4c9d19d6256a1119 +# +# From "A simple unix/linux daemon in Python" by Sander Marechal +# See http://stackoverflow.com/a/473702/1422096 and +# http://web.archive.org/web/20131017130434/http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/ +# +# Modified to add quit() that allows to run some code before closing the daemon +# See http://stackoverflow.com/a/40423758/1422096 +# +# Modified for Python 3 +# (see also: http://web.archive.org/web/20131017130434/http://www.jejik.com/files/examples/daemon3x.py) +# +# Joseph Ernest, 20200507_1220 + +import atexit +import os +import sys +import time +from signal import SIGTERM, signal + +DEV_NULL = "/dev/null" + +class Daemon: + """ + A generic daemon class. + + Usage: subclass the Daemon class and override the run() method + """ + + def __init__(self, pidfile='_.pid', stdin=DEV_NULL, stdout=DEV_NULL, stderr=DEV_NULL): + self.stdin = stdin + self.stdout = stdout + self.stderr = stderr + self.pidfile = pidfile + + def daemonize(self): + """ + do the UNIX double-fork magic, see Stevens' "Advanced + Programming in the UNIX Environment" for details (ISBN 0201563177) + http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 + """ + try: + pid = os.fork() + if pid > 0: + # exit first parent + sys.exit(0) + except OSError as e: + sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # decouple from parent environment + os.setsid() + os.umask(0) + + # do second fork + try: + pid = os.fork() + if pid > 0: + # exit from second parent + sys.exit(0) + except OSError as e: + sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) + sys.exit(1) + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = open(os.devnull, 'r') + so = open(os.devnull, 'a+') + se = open(os.devnull, 'a+') + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + + atexit.register(self.onstop) + signal(SIGTERM, lambda signum, stack_frame: exit()) + + # write pidfile + pid = str(os.getpid()) + open(self.pidfile, 'w+').write("%s\n" % pid) + + def onstop(self): + self.quit() + os.remove(self.pidfile) + + def start(self): + """ + Start the daemon + """ + # Check for a pidfile to see if the daemon already runs + try: + pf = open(self.pidfile, 'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if pid: + message = "pidfile %s already exist. Daemon already running?\n" + sys.stderr.write(message % self.pidfile) + sys.exit(1) + + # Start the daemon + self.daemonize() + self.run() + + def stop(self): + """ + Stop the daemon + """ + # Get the pid from the pidfile + try: + pf = open(self.pidfile, 'r') + pid = int(pf.read().strip()) + pf.close() + except IOError: + pid = None + + if not pid: + message = "pidfile %s does not exist. Daemon not running?\n" + sys.stderr.write(message % self.pidfile) + return # not an error in a restart + + # Try killing the daemon process + try: + while 1: + os.kill(pid, SIGTERM) + time.sleep(0.1) + except OSError as err: + err = str(err) + if err.find("No such process") > 0: + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + else: + print(str(err)) + sys.exit(1) + + def restart(self): + """ + Restart the daemon + """ + self.stop() + self.start() + + def run(self): + """ + You should override this method when you subclass Daemon. It will be called after the process has been + daemonized by start() or restart(). + """ + + def quit(self): + """ + You should override this method when you subclass Daemon. It will be called before the process is stopped. + """ diff --git a/tools/datafiles.py b/tools/datafiles.py new file mode 100644 index 0000000..ba37546 --- /dev/null +++ b/tools/datafiles.py @@ -0,0 +1,57 @@ +import json +import os +import pickle + + +class DataFile(dict): + def __init__(self, filename: str, create: bool): + super().__init__() + self.filename = filename + + try: + os.stat(self.filename) + except OSError as e: + if not create: + raise e + else: + open(self.filename, "w").close() + + self.load() + + def load(self): + raise NotImplementedError() + + def save(self): + raise NotImplementedError() + + +class JSONFile(DataFile): + def __init__(self, filename: str, create: bool): + super().__init__(filename, create) + + def load(self): + with open(self.filename, "rt") as f: + c = f.read() + + if len(c) > 0: + self.update(json.loads(c)) + + def save(self): + with open(self.filename, "wt") as f: + f.write(json.dumps(self.copy(), indent=4)) + + +class PickleFile(DataFile): + def __init__(self, filename: str, create: bool) -> None: + super().__init__(filename, create) + + def load(self) -> None: + with open(self.filename, "rb") as f: + c = f.read() + + if len(c) > 0: + self.update(pickle.loads(c)) + + def save(self) -> None: + with open(self.filename, "wb") as f: + pickle.dump(self.copy(), f) diff --git a/tools/grid.py b/tools/grid.py new file mode 100644 index 0000000..7cc94a4 --- /dev/null +++ b/tools/grid.py @@ -0,0 +1,536 @@ +from __future__ import annotations +from collections import deque +from .aoc_ocr import convert_array_6 +from .coordinate import Coordinate, DistanceAlgorithm, Shape +from .types import Numeric +from enum import Enum +from heapq import heappop, heappush +from math import inf +from typing import Any, Dict, List, Union + +OFF = False +ON = True + + +class GridTransformation(Enum): + # Rotations always take the axis to rotate around as if it were the z-axis and then rotate clockwise + # Counter-Rotations likewise, just anti-clockwise + # 3D-only OPs have a number > 10 + ROTATE_Z = 3 + ROTATE_X = 11 + ROTATE_Y = 12 + COUNTER_ROTATE_X = 14 + COUNTER_ROTATE_Y = 15 + COUNTER_ROTATE_Z = 7 + FLIP_X = 4 + FLIP_Y = 5 + FLIP_Z = 13 + + # Handy aliases + FLIP_HORIZONTALLY = 5 + FLIP_VERTICALLY = 4 + ROTATE_RIGHT = 3 + ROTATE_LEFT = 7 + + +class Grid: + def __init__(self, default=False): + self.__default = default + self.__grid = {} + self.minX = None + self.minY = None + self.maxX = None + self.maxY = None + self.minZ = None + self.maxZ = None + self.mode3D = False + + def __trackBoundaries(self, pos: Coordinate): + if self.minX is None: + self.minX, self.maxX, self.minY, self.maxY = pos.x, pos.x, pos.y, pos.y + else: + self.minX = pos.x if pos.x < self.minX else self.minX + self.minY = pos.y if pos.y < self.minY else self.minY + self.maxX = pos.x if pos.x > self.maxX else self.maxX + self.maxY = pos.y if pos.y > self.maxY else self.maxY + + if self.mode3D: + if self.minZ is None: + self.minZ = self.maxZ = pos.z + else: + self.minZ = pos.z if pos.z < self.minZ else self.minZ + self.maxZ = pos.z if pos.z > self.maxZ else self.maxZ + + def recalcBoundaries(self) -> None: + self.minX, self.maxX, self.minY, self.maxY, self.minZ, self.maxZ = None, None, None, None, None, None + for c in self.__grid: + self.__trackBoundaries(c) + + def getBoundaries(self) -> (int, int, int, int, int, int): + if self.mode3D: + return self.minX, self.minY, self.maxX, self.maxY, self.minZ, self.maxZ + else: + return self.minX, self.minY, self.maxX, self.maxY, -inf, inf + + def rangeX(self, pad: int = 0, reverse=False): + if reverse: + return range(self.maxX + pad, self.minX - pad - 1, -1) + else: + return range(self.minX - pad, self.maxX + pad + 1) + + def rangeY(self, pad: int = 0, reverse=False): + if reverse: + return range(self.maxY + pad, self.minY - pad - 1, -1) + else: + return range(self.minY - pad, self.maxY + pad + 1) + + def rangeZ(self, pad: int = 0, reverse=False): + if not self.mode3D: + raise ValueError("rangeZ not available in 2D space") + if reverse: + return range(self.maxZ + pad, self.minZ - pad - 1, -1) + else: + return range(self.minZ - pad, self.maxZ + pad + 1) + + def toggle(self, pos: Coordinate): + if pos in self.__grid: + del self.__grid[pos] + else: + self.__trackBoundaries(pos) + self.__grid[pos] = not self.__default + + def toggleGrid(self): + for x in self.rangeX(): + for y in self.rangeY(): + if not self.mode3D: + self.toggle(Coordinate(x, y)) + else: + for z in self.rangeZ(): + self.toggle(Coordinate(x, y, z)) + + def set(self, pos: Coordinate, value: Any = True) -> Any: + if pos.z is not None: + self.mode3D = True + + if (value == self.__default) and pos in self.__grid: + del self.__grid[pos] + elif value != self.__default: + self.__trackBoundaries(pos) + self.__grid[pos] = value + + return value + + def move(self, pos: Coordinate, vec: Coordinate,): + target = pos + vec + self.set(target, self.get(pos)) + if pos in self.__grid: + del self.__grid[pos] + + def add(self, pos: Coordinate, value: Numeric = 1) -> Numeric: + return self.set(pos, self.get(pos) + value) + + def sub(self, pos: Coordinate, value: Numeric = 1) -> Numeric: + return self.set(pos, self.get(pos) - value) + + def mul(self, pos: Coordinate, value: Numeric = 1) -> Numeric: + return self.set(pos, self.get(pos) * value) + + def div(self, pos: Coordinate, value: Numeric = 1) -> Numeric: + return self.set(pos, self.get(pos) / value) + + def add_shape(self, shape: Shape, value: Numeric = 1) -> None: + for x in range(shape.top_left.x, shape.bottom_right.x + 1): + for y in range(shape.top_left.y, shape.bottom_right.y + 1): + if not shape.mode_3d: + pos = Coordinate(x, y) + self.set(pos, self.get(pos) + value) + else: + for z in range(shape.top_left.z, shape.bottom_right.z + 1): + pos = Coordinate(x, y, z) + self.set(pos, self.get(pos) + value) + + def get(self, pos: Coordinate) -> Any: + return self.__grid.get(pos, self.__default) + + def getOnCount(self) -> int: + return len(self.__grid) + + def count(self, value: Any) -> int: + return list(self.__grid.values()).count(value) + + def isSet(self, pos: Coordinate) -> bool: + return pos in self.__grid + + def getCorners(self) -> List[Coordinate]: + if not self.mode3D: + return [ + Coordinate(self.minX, self.minY), + Coordinate(self.minX, self.maxY), + Coordinate(self.maxX, self.minY), + Coordinate(self.maxX, self.maxY), + ] + else: + return [ + Coordinate(self.minX, self.minY, self.minZ), + Coordinate(self.minX, self.minY, self.maxZ), + Coordinate(self.minX, self.maxY, self.minZ), + Coordinate(self.minX, self.maxY, self.maxZ), + Coordinate(self.maxX, self.minY, self.minZ), + Coordinate(self.maxX, self.minY, self.maxZ), + Coordinate(self.maxX, self.maxY, self.minZ), + Coordinate(self.maxX, self.maxY, self.maxZ), + ] + + def isCorner(self, pos: Coordinate) -> bool: + return pos in self.getCorners() + + def isWithinBoundaries(self, pos: Coordinate) -> bool: + if self.mode3D: + return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY \ + and self.minZ <= pos.z <= self.maxZ + else: + return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY + + def getActiveCells(self, x: int = None, y: int = None, z: int = None) -> List[Coordinate]: + if x is not None or y is not None or z is not None: + return [ + c for c in self.__grid.keys() + if (c.x == x if x is not None else True) + and (c.y == y if y is not None else True) + and (c.z == z if z is not None else True) + ] + else: + return list(self.__grid.keys()) + + def getActiveRegion(self, start: Coordinate, includeDiagonal: bool = False, ignore: List[Coordinate] = None) \ + -> List[Coordinate]: + if not self.get(start): + return [] + if ignore is None: + ignore = [] + ignore.append(start) + for c in self.getNeighboursOf(start, includeDiagonal=includeDiagonal): + if c not in ignore: + ignore = self.getActiveRegion(c, includeDiagonal, ignore) + + return ignore + + def values(self): + return self.__grid.values() + + def getSum(self, includeNegative: bool = True) -> Numeric: + if not self.mode3D: + return sum( + self.get(Coordinate(x, y)) + for x in self.rangeX() + for y in self.rangeY() + if includeNegative or self.get(Coordinate(x, y)) >= 0 + ) + else: + return sum( + self.get(Coordinate(x, y, z)) + for x in self.rangeX() + for y in self.rangeY() + for z in self.rangeZ() + if includeNegative or self.get(Coordinate(x, y)) >= 0 + ) + + def getNeighboursOf(self, pos: Coordinate, includeDefault: bool = False, includeDiagonal: bool = True) \ + -> List[Coordinate]: + neighbours = pos.getNeighbours( + includeDiagonal=includeDiagonal, + minX=self.minX, minY=self.minY, minZ=self.minZ, + maxX=self.maxX, maxY=self.maxY, maxZ=self.maxZ + ) + for x in neighbours: + if includeDefault or x in self.__grid: + yield x + + def getNeighbourSum(self, pos: Coordinate, includeNegative: bool = True, includeDiagonal: bool = True) -> Numeric: + neighbour_sum = 0 + for neighbour in self.getNeighboursOf(pos, includeDefault=includeDiagonal): + if includeNegative or self.get(neighbour) > 0: + neighbour_sum += self.get(neighbour) + + return neighbour_sum + + def flip(self, c1: Coordinate, c2: Coordinate): + buf = self.get(c1) + self.set(c1, self.get(c2)) + self.set(c2, buf) + + def transform(self, mode: GridTransformation): + if mode.value > 10 and not self.mode3D: + raise ValueError("Operation not possible in 2D space", mode) + + coords = self.__grid + self.__grid = {} + if mode == GridTransformation.ROTATE_X: + shift_z = self.maxY + for c, v in coords.items(): + self.set(Coordinate(c.x, c.z, -c.y), v) + self.shift(shift_z=shift_z) + elif mode == GridTransformation.ROTATE_Y: + shift_x = self.maxX + for c, v in coords.items(): + self.set(Coordinate(-c.z, c.y, c.x), v) + self.shift(shift_x=shift_x) + elif mode == GridTransformation.ROTATE_Z: + shift_x = self.maxX + for c, v in coords.items(): + self.set(Coordinate(-c.y, c.x, c.z), v) + self.shift(shift_x=shift_x) + elif mode == GridTransformation.COUNTER_ROTATE_X: + shift_y = self.maxY + for c, v in coords.items(): + self.set(Coordinate(c.x, -c.z, c.y), v) + self.shift(shift_y=shift_y) + elif mode == GridTransformation.COUNTER_ROTATE_Y: + shift_z = self.maxZ + for c, v in coords.items(): + self.set(Coordinate(c.z, c.y, -c.x), v) + self.shift(shift_z=shift_z) + elif mode == GridTransformation.COUNTER_ROTATE_Z: + shift_y = self.maxY + for c, v in coords.items(): + self.set(Coordinate(c.y, -c.x, c.z), v) + self.shift(shift_y=shift_y) + elif mode == GridTransformation.FLIP_X: + shift_x = self.maxX + for c, v in coords.items(): + self.set(Coordinate(-c.x, c.y, c.z), v) + self.shift(shift_x=shift_x) + elif mode == GridTransformation.FLIP_Y: + shift_y = self.maxY + for c, v in coords.items(): + self.set(Coordinate(c.x, -c.y, c.z), v) + self.shift(shift_y=shift_y) + elif mode == GridTransformation.FLIP_Z: + shift_z = self.maxZ + for c, v in coords.items(): + self.set(Coordinate(c.x, c.y, -c.z), v) + self.shift(shift_z=shift_z) + else: + raise NotImplementedError(mode) + + self.recalcBoundaries() + + def shift(self, shift_x: int = 0, shift_y: int = 0, shift_z: int = 0): + self.minX, self.minY = self.minX + shift_x, self.minY + shift_y + self.maxX, self.maxY = self.maxX + shift_x, self.maxY + shift_y + if self.mode3D: + self.minZ, self.maxZ = self.minZ + shift_z, self.maxZ + shift_z + coords = self.__grid + self.__grid = {} + for c, v in coords.items(): + if self.mode3D: + nc = Coordinate(c.x + shift_x, c.y + shift_y, c.z + shift_z) + else: + nc = Coordinate(c.x + shift_x, c.y + shift_y) + self.set(nc, v) + + def shift_zero(self, recalc: bool = True): + # self.shift() to (0, 0, 0) being top, left, front + if recalc: + self.recalcBoundaries() + if self.mode3D: + self.shift(0 - self.minX, 0 - self.minY, 0 - self.minZ) + else: + self.shift(0 - self.minX, 0 - self.minY) + + def getPath_BFS(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None, + stop_at_first: Any = None) -> Union[None, List[Coordinate]]: + queue = deque() + came_from = {pos_from: None} + queue.append(pos_from) + if walls is None: + walls = [self.__default] + + while queue: + current = queue.popleft() + found_end = False + for c in self.getNeighboursOf(current, includeDiagonal=includeDiagonal, + includeDefault=self.__default not in walls): + if c in came_from and self.get(c) in walls: + continue + came_from[c] = current + if c == pos_to or (stop_at_first is not None and self.get(c) == stop_at_first): + pos_to = c + found_end = True + break + queue.append(c) + if found_end: + break + + if pos_to not in came_from: + return None + + ret = [] + while pos_to in came_from: + ret.insert(0, pos_to) + pos_to = came_from[pos_to] + + return ret + + def getPath(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None, + weighted: bool = False) -> Union[None, List[Coordinate]]: + f_costs = [] + if walls is None: + walls = [self.__default] + + openNodes: Dict[Coordinate, tuple] = {} + closedNodes: Dict[Coordinate, tuple] = {} + + openNodes[pos_from] = (0, pos_from.getDistanceTo(pos_to), None) + heappush(f_costs, (0, pos_from)) + + while f_costs: + _, currentCoord = heappop(f_costs) + if currentCoord not in openNodes: + continue + currentNode = openNodes[currentCoord] + + closedNodes[currentCoord] = currentNode + del openNodes[currentCoord] + if currentCoord == pos_to: + break + + for neighbour in self.getNeighboursOf(currentCoord, includeDefault=True, includeDiagonal=includeDiagonal): + if self.get(neighbour) in walls or neighbour in closedNodes: + continue + + if weighted: + neighbourDist = self.get(neighbour) + elif not includeDiagonal: + neighbourDist = 1 + else: + neighbourDist = currentCoord.getDistanceTo(neighbour, DistanceAlgorithm.MANHATTAN, includeDiagonal) + + targetDist = neighbour.getDistanceTo(pos_to) + f_cost = targetDist + neighbourDist + currentNode[1] + + if neighbour not in openNodes or f_cost < openNodes[neighbour][0]: + openNodes[neighbour] = (f_cost, currentNode[1] + neighbourDist, currentCoord) + heappush(f_costs, (f_cost, neighbour)) + + if pos_to not in closedNodes: + return None + else: + currentNode = closedNodes[pos_to] + pathCoords = [pos_to] + while currentNode[2]: + pathCoords.append(currentNode[2]) + currentNode = closedNodes[currentNode[2]] + + return pathCoords + + def sub_grid(self, from_x: int, from_y: int, to_x: int, to_y: int, from_z: int = None, to_z: int = None) -> 'Grid': + if self.mode3D and (from_z is None or to_z is None): + raise ValueError("sub_grid() on mode3d Grids requires from_z and to_z to be set") + count_x, count_y, count_z = 0, 0, 0 + new_grid = Grid(self.__default) + for x in range(from_x, to_x + 1): + for y in range(from_y, to_y + 1): + if not self.mode3D: + new_grid.set(Coordinate(count_x, count_y), self.get(Coordinate(x, y))) + else: + for z in range(from_z, to_z + 1): + new_grid.set(Coordinate(count_x, count_y, count_z), self.get(Coordinate(x, y, z))) + count_z += 1 + + count_z = 0 + count_y += 1 + count_y = 0 + count_x += 1 + + return new_grid + + def update(self, x: int, y: int, grid: Grid) -> None: + put_x, put_y = x, y + for get_x in grid.rangeX(): + for get_y in grid.rangeY(): + self.set(Coordinate(put_x, put_y), grid.get(Coordinate(get_x, get_y))) + put_y += 1 + put_y = y + put_x += 1 + + def print(self, spacer: str = "", true_char: str = '#', false_char: str = " ", translate: dict = None, mark: list = None, z_level: int = None, bool_mode: bool = False): + if translate is None: + translate = {} + + if true_char is not None and True not in translate: + translate[True] = true_char + if false_char is not None and False not in translate: + translate[False] = false_char + + for y in range(self.minY, self.maxY + 1): + for x in range(self.minX, self.maxX + 1): + pos = Coordinate(x, y, z_level) + + if mark and pos in mark: + print("X", end="") + elif bool_mode: + print(true_char if self.get(pos) else false_char, end="") + else: + value = self.get(pos) + if isinstance(value, list): + value = len(value) + + if isinstance(value, Enum): + value = value.value + + print(value if value not in translate else translate[value], end="") + print(spacer, end="") + + print() + + def get_aoc_ocr_string(self, x_shift: int = 0, y_shift: int = 0): + return convert_array_6( + [['#' if self.get(Coordinate(x + x_shift, y + y_shift)) else '.' for x in self.rangeX()] for y in + self.rangeY()]) + + def __str__(self, true_char: str = '#', false_char: str = "."): + return "/".join( + "".join( + true_char if self.get(Coordinate(x, y)) else false_char + for x in range(self.minX, self.maxX + 1) + ) + for y in range(self.minY, self.maxY + 1) + ) + + @classmethod + def from_str(cls, grid_string: str, default: Any = False, true_char: str = '#', true_value: Any = True, translate: dict = None, mode3d: bool = False) -> 'Grid': + if translate is None: + translate = {} + if true_char is not None and True not in translate.values() and true_char not in translate: + translate[true_char] = true_value if true_value is not None else True + + ret = cls(default=default) + for y, line in enumerate(grid_string.split("/")): + for x, c in enumerate(line): + if mode3d: + coord = Coordinate(x, y, 0) + else: + coord = Coordinate(x, y) + + if c in translate: + ret.set(coord, translate[c]) + else: + ret.set(coord, c) + + return ret + + def __eq__(self, other: Grid) -> bool: + if not isinstance(other, Grid): + return False + + other_active = set(other.getActiveCells()) + for c, v in self.__grid.items(): + if other.get(c) != v: + return False + other_active.remove(c) + + if other_active: + return False + + return True diff --git a/tools/int_seq.py b/tools/int_seq.py new file mode 100644 index 0000000..aedcd07 --- /dev/null +++ b/tools/int_seq.py @@ -0,0 +1,41 @@ +import math +from .tools import cache + + +def factorial(n: int) -> int: + """ + n! = 1 * 2 * 3 * 4 * ... * n + 1, 1, 2, 6, 24, 120, 720, ... + """ + return math.factorial(n) + + +def fibonacci(n: int) -> int: + """ + F(n) = F(n-1) + F(n-2) with F(0) = 0 and F(1) = 1 + 0, 1, 1, 2, 3, 5, 8, 13, 21, ... + """ + if n < 2: + return n + + l, r = 1, 1 + for _ in range(n - 2): + l, r = l + r, l + + return l + + +def triangular(n: int) -> int: + """ + a(n) = binomial(n+1,2) = n*(n+1)/2 = 0 + 1 + 2 + ... + n + 0, 1, 3, 6, 10, 15, ... + """ + return n * (n + 1) // 2 + + +def pentagonal(n: int) -> int: + """ + A pentagonal number is a figurate number that extends the concept of triangular and square numbers to the pentagon + 0, 1, 5, 12, 22, 35, ... + """ + return ((3 * n * n) - n) // 2 diff --git a/tools/irc.py b/tools/irc.py new file mode 100644 index 0000000..1e60459 --- /dev/null +++ b/tools/irc.py @@ -0,0 +1,410 @@ +from time import sleep + +from .schedule import Scheduler +from .simplesocket import ClientSocket +from .types import StrOrNone +from datetime import timedelta +from enum import Enum +from typing import Callable, Dict, List, Union + + +class ServerMessage(str, Enum): + RPL_WELCOME = "001" + RPL_YOURHOST = "002" + RPL_CREATED = "003" + RPL_MYINFO = "004" + RPL_ISUPPORT = "005" + RPL_BOUNCE = "010" + RPL_UNIQID = "042" + RPL_TRACELINK = "200" + RPL_TRACECONNECTING = "201" + RPL_TRACEHANDSHAKE = "202" + RPL_TRACEUNKNOWN = "203" + RPL_TRACEOPERATOR = "204" + RPL_TRACEUSER = "205" + RPL_TRACESERVER = "206" + RPL_TRACENEWTYPE = "208" + RPL_TRACECLASS = "209" + RPL_TRACERECONNECT = "210" + RPL_STATSLINKINFO = "211" + RPL_STATSCOMMANDS = "212" + RPL_STATSCLINE = "213" + RPL_STATSNLINE = "214" + RPL_STATSILINE = "215" + RPL_STATSKLINE = "216" + RPL_STATSYLINE = "218" + RPL_ENDOFSTATS = "219" + RPL_UMODEIS = "221" + RPL_SERVLIST = "234" + RPL_SERVLISTEND = "235" + RPL_STATSLLINE = "241" + RPL_STATSUPTIME = "242" + RPL_STATSOLINE = "243" + RPL_STATSHLINE = "244" + RPL_LUSERCLIENT = "251" + RPL_LUSEROP = "252" + RPL_LUSERUNKNOWN = "253" + RPL_LUSERCHANNELS = "254" + RPL_LUSERME = "255" + RPL_ADMINME = "256" + RPL_ADMINLOC1 = "257" + RPL_ADMINLOC2 = "258" + RPL_ADMINEMAIL = "259" + RPL_TRACELOG = "261" + RPL_TRACEEND = "262" + RPL_TRYAGAIN = "263" + RPL_NONE = "300" + RPL_AWAY = "301" + RPL_USERHOST = "302" + RPL_ISON = "303" + RPL_UNAWAY = "305" + RPL_NOWAWAY = "306" + RPL_WHOISUSER = "311" + RPL_WHOISSERVER = "312" + RPL_WHOISOPERATOR = "313" + RPL_WHOWASUSER = "314" + RPL_ENDOFWHO = "315" + RPL_WHOISIDLE = "317" + RPL_ENDOFWHOIS = "318" + RPL_WHOISCHANNELS = "319" + RPL_LISTSTART = "321" + RPL_LIST = "322" + RPL_LISTEND = "323" + RPL_CHANNELMODEIS = "324" + RPL_UNIQOPIS = "325" + RPL_NOTOPIC = "331" + RPL_TOPIC = "332" + RPL_TOPICBY = "333" + RPL_INVITING = "341" + RPL_SUMMONING = "342" + RPL_INVITELIST = "346" + RPL_ENDOFINVITELIST = "347" + RPL_EXCEPTLIST = "348" + RPL_ENDOFEXCEPTLIST = "349" + RPL_VERSION = "351" + RPL_WHOREPLY = "352" + RPL_NAMEREPLY = "353" + RPL_LINKS = "364" + RPL_ENDOFLINKS = "365" + RPL_ENDOFNAMES = "366" + RPL_BANLIST = "367" + RPL_ENDOFBANLIST = "368" + RPL_ENDOFWHOWAS = "369" + RPL_INFO = "371" + RPL_MOTD = "372" + RPL_ENDOFINFO = "374" + RPL_MOTDSTART = "375" + RPL_ENDOFMOTD = "376" + RPL_YOUREOPER = "381" + RPL_REHASHING = "382" + RPL_YOURESERVICE = "383" + RPL_TIME = "391" + RPL_USERSTART = "392" + RPL_USERS = "393" + RPL_ENDOFUSERS = "394" + RPL_NOUSERS = "395" + ERR_NOSUCHNICK = "401" + ERR_NOSUCHSERVER = "402" + ERR_NOSUCHCHANNEL = "403" + ERR_CANNOTSENDTOCHAN = "404" + ERR_TOOMANYCHANNELS = "405" + ERR_WASNOSUCHNICK = "406" + ERR_TOOMANYTARGETS = "407" + ERR_NOSUCHSERVICE = "408" + ERR_NOORIGIN = "409" + ERR_NORECIPIENT = "411" + ERR_NOTEXTTOSEND = "412" + ERR_NOTOPLEVEL = "413" + ERR_WILDTOPLEVEL = "414" + ERR_BANMASK = "415" + ERR_UNKNOWNCOMMAND = "421" + ERR_NOMOTD = "422" + ERR_NOADMININFO = "423" + ERR_FILEERROR = "424" + ERR_NONICKNAMEGIVEN = "431" + ERR_ERRONEUSNICKNAME = "432" + ERR_NICKNAMEINUSE = "433" + ERR_NICKCOLLISION = "436" + ERR_UNAVAILRESOURCE = "437" + ERR_USERNOTINCHANNEL = "441" + ERR_NOTOONCHANNEL = "442" + ERR_USERONCHANNEL = "443" + ERR_NOLOGIN = "444" + ERR_SUMMONDISABLED = "445" + ERR_USERSDISABLED = "446" + ERR_NOTREGISTERED = "451" + ERR_NEEDMOREPARAMS = "461" + ERR_ALREADYREGISTERED = "462" + ERR_NOPERMFORHOST = "463" + ERR_PASSWDMISMATH = "464" + ERR_YOUREBANNEDCREEP = "465" + ERR_YOUWILLBEBANNED = "466" + ERR_KEYSET = "467" + ERR_CHANNELISFULL = "471" + ERR_UNKNOWNMODE = "472" + ERR_INVITEONLYCHAN = "473" + ERR_BANNEDFROMCHAN = "474" + ERR_BADCHANNELKEY = "475" + ERR_BADCHANMASK = "476" + ERR_BASCHANMODES = "477" + ERR_BANLISTFULL = "478" + ERR_NOPRIVILEGES = "481" + ERR_CHANOPRIVSNEEDED = "482" + ERR_CANTKILLSERVER = "483" + ERR_RESTRICTED = "484" + ERR_UNIQOPPRIVSNEEDED = "485" + ERR_NOOPERHOST = "491" + ERR_UMODEUNKNOWNFLAG = "501" + ERR_USERSDONTMATCH = "502" + MSG_NICK = "NICK" + MSG_TOPIC = "TOPIC" + MSG_MODE = "MODE" + MSG_PRIVMSG = "PRIVMSG" + MSG_JOIN = "JOIN" + MSG_PART = "PART" + MSG_QUIT = "QUIT" + RAW = "RAW" + + +class User: + identifier: str + nickname: str + username: str + hostname: str + + def __init__(self, identifier: str): + self.identifier = identifier + if "@" not in self.identifier: + self.nickname = self.hostname = self.identifier + else: + identifier, self.hostname = self.identifier.split("@") + if "!" in identifier: + self.nickname, self.username = identifier.split("!") + else: + self.nickname = self.username = identifier + + def nick(self, new_nick: str): + self.identifier.replace("%s!" % self.nickname, "%s!" % new_nick) + self.nickname = new_nick + + +class Channel: + name: str + topic: str + userlist: Dict[str, User] + + def __init__(self, name: str): + self.name = name + self.topic = "" + self.userlist = {} + + def join(self, user: User): + if user.identifier not in self.userlist: + self.userlist[user.identifier] = user + + def quit(self, user: User): + if user.identifier in self.userlist: + del self.userlist[user.identifier] + + +class Client: + __function_register: Dict[str, List[Callable]] + __server_socket: ClientSocket + __server_caps: Dict[str, Union[str, int]] + __userlist: Dict[str, User] + __channellist: Dict[str, Channel] + __my_user: StrOrNone + + def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"): + self.__userlist = {} + self.__channellist = {} + self.__server_socket = ClientSocket(server, port) + self.__server_socket.sendline("USER %s ignore ignore :%s" % (username, realname)) + self.__server_socket.sendline("NICK %s" % nick) + self.__server_caps = { + 'MAXLEN': 255 + } + self.__function_register = { + ServerMessage.RPL_WELCOME: [self.on_rpl_welcome], + ServerMessage.RPL_TOPIC: [self.on_rpl_topic], + ServerMessage.RPL_ISUPPORT: [self.on_rpl_isupport], + ServerMessage.ERR_NICKNAMEINUSE: [self.on_err_nicknameinuse], + ServerMessage.MSG_JOIN: [self.on_join], + ServerMessage.MSG_PART: [self.on_part], + ServerMessage.MSG_QUIT: [self.on_quit], + ServerMessage.MSG_NICK: [self.on_nick], + ServerMessage.MSG_TOPIC: [self.on_topic], + } + self.receive() + + def receive(self): + while line := self.__server_socket.recvline(): + line = line.strip() + + if line.startswith("PING"): + self.__server_socket.sendline("PONG " + line.split()[1]) + continue + + try: + (msg_from, msg_type, msg_to, *msg) = line[1:].split() + except ValueError: + print("[E] Invalid message received:", line) + continue + + if len(msg) > 0 and msg[0].startswith(":"): + msg[0] = msg[0][1:] + message = " ".join(msg) + + if ServerMessage.RAW in self.__function_register: + for func in self.__function_register[ServerMessage.RAW]: + func(msg_from, msg_type, msg_to, message) + + if "!" in msg_from and msg_from not in self.__userlist: + self.__userlist[msg_from] = User(msg_from) + if self.__userlist[msg_from].nickname == self.__userlist[self.__my_user].nickname: + del self.__userlist[self.__my_user] + self.__my_user = msg_from + + if msg_type in self.__function_register: + for func in self.__function_register[msg_type]: + func(msg_from, msg_to, message) + + def subscribe(self, msg_type: str, func: Callable[..., None]): + if msg_type in self.__function_register: + self.__function_register[msg_type].append(func) + else: + self.__function_register[msg_type] = [func] + + def on_rpl_welcome(self, msg_from: str, msg_to: str, message: str): + self.__my_user = message.split()[-1] + self.__userlist[self.__my_user] = User(self.__my_user) + + def on_rpl_isupport(self, msg_from: str, msg_to: str, message: str): + for cap in message.split(): + if "=" not in cap: + self.__server_caps[cap] = True + else: + (a, b) = cap.split("=") + self.__server_caps[a] = b + + def on_rpl_topic(self, msg_from: str, msg_to: str, message: str): + channel, *topic = message.split() + if len(topic) > 0: + topic[0] = topic[0][1:] + + new_topic = " ".join(topic) + self.__channellist[channel].topic = new_topic + + def on_err_nicknameinuse(self, msg_from: str, msg_to: str, message: str): + if self.__my_user is None: + self.nick(message.split()[0] + "_") + + def on_nick(self, msg_from: str, msg_to: str, message: str): + self.__userlist[msg_from].nick(msg_to) + self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[msg_from] + del self.__userlist[msg_from] + + def on_join(self, msg_from: str, msg_to: str, message: str): + channel = msg_to[1:] + if msg_from == self.__my_user: + self.__channellist[channel] = Channel(channel) + # FIXME: get user list (NAMES just returns nicknames, not nick!user@host !!!) + + if msg_from not in self.__userlist: + self.__userlist[msg_from] = User(msg_from) + + self.__channellist[channel].join(self.__userlist[msg_from]) + + def on_topic(self, msg_from: str, msg_to: str, message: str): + self.__channellist[msg_to].topic = message + + def on_part(self, msg_from: str, msg_to: str, message: str): + self.__channellist[msg_to].quit(self.__userlist[msg_from]) + + def on_quit(self, msg_from: str, msg_to: str, message: str): + for c in self.__channellist: + self.__channellist[c].quit(self.__userlist[msg_from]) + + del self.__userlist[msg_from] + + def on_raw(self, msg_from: str, msg_type: str, msg_to: str, message: str): + print(msg_from, msg_type, msg_to, message) + + def nick(self, new_nick: str): + self.__server_socket.sendline("NICK %s" % new_nick) + + def join(self, channel: str): + self.__server_socket.sendline("JOIN %s" % channel) + self.receive() + + def part(self, channel: str): + self.__server_socket.sendline("PART %s" % channel) + self.receive() + + def privmsg(self, target: str, message: str): + self.__server_socket.sendline("PRIVMSG %s :%s" % (target, message)) + + def quit(self, message: str = "Elvis has left the building!"): + self.__server_socket.sendline("QUIT :%s" % message) + self.receive() + self.__server_socket.close() + + def getUser(self, user: str = None) -> Union[User, None]: + if user is None: + return self.__userlist[self.__my_user] + elif user in self.__userlist: + return self.__userlist[user] + else: + return None + + def getUserList(self) -> List[User]: + return list(self.__userlist.values()) + + def getChannel(self, channel: str) -> Union[Channel, None]: + if channel in self.__channellist: + return self.__channellist[channel] + else: + return None + + def getChannelList(self) -> List[Channel]: + return list(self.__channellist.values()) + + +class IrcBot(Client): + def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"): + super().__init__(server, port, nick, username, realname) + self._scheduler = Scheduler() + self._channel_commands = {} + self._privmsg_commands = {} + self.subscribe(ServerMessage.MSG_PRIVMSG, self.on_privmsg) + + def on(self, *args, **kwargs): + self.subscribe(*args, **kwargs) + + def schedule(self, name: str, every: timedelta, func: Callable): + self._scheduler.schedule(name, every, func) + + def register_channel_command(self, command: str, channel: str, func: Callable): + if channel not in self._channel_commands: + self._channel_commands[channel] = {} + + self._channel_commands[channel][command] = func + + def register_privmsg_command(self, command: str, func: Callable): + self._privmsg_commands[command] = func + + def on_privmsg(self, msg_from, msg_to, message): + if not message: + return + command = message.split()[0] + if msg_to in self._channel_commands and command in self._channel_commands[msg_to]: + self._channel_commands[msg_to][command](msg_from, " ".join(message.split()[1:])) + + if msg_to == self.getUser().nickname and command in self._privmsg_commands: + self._privmsg_commands[command](msg_from, " ".join(message.split()[1:])) + + def run(self): + while True: + self._scheduler.run_pending() + self.receive() + sleep(0.01) diff --git a/tools/lists.py b/tools/lists.py new file mode 100644 index 0000000..2f44b66 --- /dev/null +++ b/tools/lists.py @@ -0,0 +1,181 @@ +from dataclasses import dataclass +from typing import Any, Union + + +@dataclass +class Node: + value: Any + next: 'Node' = None + prev: 'Node' = None + + +class LinkedList: + _head: Union[Node, None] = None + _tail: Union[Node, None] = None + size: int = 0 + + def _get_head(self): + return self._head + + def _get_tail(self): + return self._tail + + def _set_head(self, node: Node): + node.next = self._head + self._head.prev = node + self._head = node + + def _set_tail(self, node: Node): + node.prev = self._tail + self._tail.next = node + self._tail = node + + head = property(_get_head, _set_head) + tail = property(_get_tail, _set_tail) + + def _append(self, obj: Any): + node = Node(obj) + if self._head is None: + self._head = node + + if self._tail is None: + self._tail = node + else: + self._tail.next = node + node.prev = self._tail + self._tail = node + + self.size += 1 + + def _insert(self, index: int, obj: Any): + i_node = Node(obj) + node = self._get_node(index) + + i_node.prev, i_node.next = node.prev, node + node.prev = i_node + if i_node.prev is not None: + i_node.prev.next = i_node + + if index == 0: + self._head = i_node + + self.size += 1 + + def _get_node(self, index: int) -> Node: + if index >= self.size or index < -self.size: + raise IndexError("index out of bounds") + + if index < 0: + index = self.size + index + + if index <= self.size // 2: + x = 0 + node = self._head + while x < index: + x += 1 + node = node.next + else: + x = self.size - 1 + node = self._tail + while x > index: + x -= 1 + node = node.prev + + return node + + def _get(self, index: int) -> Any: + return self._get_node(index).value + + def _pop(self, index: int = None) -> Any: + if self.size == 0: + raise IndexError("pop from empty list") + + if index is None: # pop from the tail + index = -1 + + node = self._get_node(index) + if node.prev is not None: + node.prev.next = node.next + else: + self._head = node.next + if node.next is not None: + node.next.prev = node.prev + else: + self._tail = node.prev + + ret = node.value + del node + self.size -= 1 + + return ret + + def append(self, obj: Any): + self._append(obj) + + def insert(self, index: int, obj: Any): + self._insert(index, obj) + + def get(self, index: int) -> Any: + return self._get(index) + + def pop(self, index: int = None) -> Any: + return self._pop(index) + + def __contains__(self, obj: Any) -> bool: + x = self._head + while x.value != obj and x.next is not None: + x = x.next + + return x.value == obj + + def __add__(self, other: 'LinkedList') -> 'LinkedList': + self._tail.next = other.head + other.head.prev = self._tail + self._tail = other.tail + self.size += other.size + return self + + def __getitem__(self, index: int): + return self._get(index) + + def __setitem__(self, index: int, obj: Any): + self._get_node(index).value = obj + + def __len__(self): + return self.size + + def __repr__(self): + x = self._head + if x is None: + v = "" + else: + v = str(x.value) + while x.next is not None: + x = x.next + v += ", " + str(x.value) + return "%s(%s)" % (self.__class__.__name__, v) + + def __str__(self): + return self.__repr__() + + +class Stack(LinkedList): + def push(self, obj: Any): + self._append(obj) + + def peek(self) -> Any: + return self._tail.value + + +class Queue(LinkedList): + def enqueue(self, obj: Any): + self._append(obj) + + def dequeue(self) -> Any: + return self._pop(0) + + def peek(self) -> Any: + return self._head.value + + push = put = enqueue + pop = get = dequeue diff --git a/tools/math.py b/tools/math.py new file mode 100644 index 0000000..5bf6a2c --- /dev/null +++ b/tools/math.py @@ -0,0 +1,18 @@ +import math +from decimal import Decimal, ROUND_HALF_UP +from .types import Numeric + + +def round_half_up(number: Numeric) -> int: + """ pythons round() rounds .5 to the *even* number; 0.5 == 0 """ + return int(Decimal(number).to_integral(ROUND_HALF_UP)) + + +def get_factors(num: int) -> set: + f = {num} + for x in range(1, int(math.sqrt(num)) + 1): + if num % x == 0: + f.add(x) + f.add(num // x) + + return f diff --git a/tools/schedule.py b/tools/schedule.py new file mode 100644 index 0000000..762653d --- /dev/null +++ b/tools/schedule.py @@ -0,0 +1,26 @@ +import datetime +from typing import Callable, List, Any + + +class Scheduler: + def __init__(self): + self.jobs = {} + + def schedule(self, name: str, every: datetime.timedelta, func: Callable[..., None], *args: List[Any]): + self.jobs[name] = { + 'call': func, + 'args': args, + 'timedelta': every, + 'runat': (datetime.datetime.utcnow() + every) + } + + def unschedule(self, name: str): + if name in self.jobs: + del self.jobs[name] + + def run_pending(self): + now = datetime.datetime.utcnow() + for job in self.jobs.values(): + if job['runat'] <= now: + job['runat'] += job['timedelta'] + job['call'](*job['args']) diff --git a/tools/simplesocket.py b/tools/simplesocket.py new file mode 100644 index 0000000..7b07948 --- /dev/null +++ b/tools/simplesocket.py @@ -0,0 +1,108 @@ +import errno +import socket +import threading +import time +from typing import Callable, Union + + +class Socket: + def __init__(self, address_family: socket.AddressFamily, socket_kind: socket.SocketKind): + self.socket = socket.socket(family=address_family, type=socket_kind) + self.__recv_buffer = b"" + + def send(self, buffer: Union[str, bytes]) -> int: + if isinstance(buffer, str): + buffer = buffer.encode("UTF-8") + + send_bytes = 0 + while send_bytes < len(buffer): + send_bytes += self.socket.send(buffer[send_bytes:]) + + return send_bytes + + def recv(self, maxlen: int = 4096, blocking: bool = True) -> bytes: + maxlen -= len(self.__recv_buffer) + try: + self.socket.setblocking(blocking) + ret = self.__recv_buffer + self.socket.recv(maxlen) + self.__recv_buffer = b"" + return ret + except socket.error as e: + err = e.args[0] + if err == errno.EAGAIN or err == errno.EWOULDBLOCK: + return self.__recv_buffer + else: + raise + + def sendline(self, line: str): + if not line.endswith("\n"): + line += "\n" + + self.send(line) + + def recvline(self, timeout: int = 0) -> Union[str, None]: + """ + Receive exactly one text line (delimiter: newline "\n" or "\r\n") from the socket. + + :param timeout: wait at most TIMEOUT seconds for a newline to appear in the buffer + :return: Either a str containing a line received or None if no newline was found + """ + start = time.time() + while b"\n" not in self.__recv_buffer: + self.__recv_buffer += self.recv(1024, blocking=False) + if time.time() - start <= timeout: + time.sleep(0.01) # release *some* resources + else: + break + + if b"\n" not in self.__recv_buffer: + return None + else: + line = self.__recv_buffer[:self.__recv_buffer.index(b"\n")] + self.__recv_buffer = self.__recv_buffer[self.__recv_buffer.index(b"\n") + 1:] + return line.decode("UTF-8") + + def close(self): + self.socket.close() + + +class ClientSocket(Socket): + def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET, + socket_kind: socket.SocketKind = socket.SOCK_STREAM): + super().__init__(address_family, socket_kind) + self.socket.connect((addr, port)) + self.laddr, self.lport = self.socket.getsockname() + self.raddr, self.rport = self.socket.getpeername() + + +class RemoteSocket(Socket): + def __init__(self, client_sock: socket.socket): + super().__init__(client_sock.family, client_sock.type) + self.socket = client_sock + self.laddr, self.lport = self.socket.getsockname() + self.raddr, self.rport = self.socket.getpeername() + + +class ServerSocket(Socket): + def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET, + socket_kind: socket.SocketKind = socket.SOCK_STREAM): + super().__init__(address_family, socket_kind) + self.socket.bind((addr, port)) + self.socket.listen(5) + self.laddr, self.lport = self.socket.getsockname() + self.raddr, self.rport = None, None # Transport endpoint is not connected. Surprisingly. + + def _connection_acceptor(self, target: Callable[..., None]): + while 1: + (client_socket, client_address) = self.socket.accept() + connection_handler_thread = threading.Thread(target=target, args=(RemoteSocket(client_socket), )) + connection_handler_thread.start() + + def accept(self, target: Callable[..., None], blocking: bool = True): + if blocking: + self._connection_acceptor(target) + return None + else: + connection_accept_thread = threading.Thread(target=self._connection_acceptor, kwargs={'target': target}) + connection_accept_thread.start() + return connection_accept_thread diff --git a/tools/stopwatch.py b/tools/stopwatch.py new file mode 100644 index 0000000..2d0ed6e --- /dev/null +++ b/tools/stopwatch.py @@ -0,0 +1,40 @@ +from time import perf_counter_ns +from .tools import human_readable_time_from_ns +from .types import IntOrNone + + +class StopWatch: + started: IntOrNone = None + stopped: IntOrNone = None + + def __init__(self, auto_start=True): + if auto_start: + self.start() + + def start(self): + self.started = perf_counter_ns() + self.stopped = None + + def stop(self) -> float: + self.stopped = perf_counter_ns() + return self.elapsed() + + reset = start + + def elapsed(self) -> int: + if self.stopped is None: + return perf_counter_ns() - self.started + else: + return self.stopped - self.started + + def elapsed_string(self) -> str: + return human_readable_time_from_ns(self.elapsed()) + + def avg_elapsed(self, divider: int) -> float: + return self.elapsed() / divider + + def avg_string(self, divider: int) -> str: + return human_readable_time_from_ns(int(self.avg_elapsed(divider))) + + def __str__(self): + return self.avg_string(1) diff --git a/tools/tools.py b/tools/tools.py new file mode 100644 index 0000000..1108544 --- /dev/null +++ b/tools/tools.py @@ -0,0 +1,197 @@ +import datetime +import inspect +import os.path +import sys +from fishhook import hook +from functools import wraps +from typing import Any + + +class Cache(dict): + def __init__(self): + super().__init__() + self.hits = 0 + self.misses = 0 + + def str_hits(self) -> str: + return "%d (%1.2f)" % (self.hits, self.hits / (self.misses + self.hits) * 100) + + def __contains__(self, item: Any) -> bool: + r = super().__contains__(item) + if r: + self.hits += 1 + else: + self.misses += 1 + + return r + + +class Dict(dict): + _initialized: bool = False + + def __init__(self, *args, **kwargs): + if "default" in kwargs: + self.__default = kwargs['default'] + del kwargs['default'] + else: + self.__default = None + + super(Dict, self).__init__(*args, **kwargs) + self.convert() + self._initialized = True + + def convert(self): + for k in self: + if isinstance(self[k], dict) and not isinstance(self[k], Dict): + self[k] = Dict(self[k]) + elif isinstance(self[k], list): + for i in range(len(self[k])): + if isinstance(self[k][i], dict) and not isinstance(self[k][i], Dict): + self[k][i] = Dict(self[k][i]) + + def update(self, other: dict, **kwargs): + super(Dict, self).update(other, **kwargs) + self.convert() + + def __getattr__(self, item): + try: + return self[item] + except KeyError: + if self.__default is None: + raise AttributeError(item) + else: + return self.__default + + def __setattr__(self, key, value): + if not self._initialized: + super(Dict, self).__setattr__(key, value) + else: + self[key] = value + + def __setitem__(self, key, value): + super(Dict, self).__setitem__(key, value) + self.convert() + + +def get_script_dir(follow_symlinks: bool = True) -> str: + """return path of the executed script""" + if getattr(sys, 'frozen', False): + path = os.path.abspath(sys.executable) + else: + if '__main__' in sys.modules and hasattr(sys.modules['__main__'], '__file__'): + path = sys.modules['__main__'].__file__ + else: + path = inspect.getabsfile(get_script_dir) + + if follow_symlinks: + path = os.path.realpath(path) + + return os.path.dirname(path) + + +def compare(a: Any, b: Any) -> int: + """compare to values, return -1 if a is smaller than b, 1 if a is greater than b, 0 is both are equal""" + return bool(a > b) - bool(a < b) + + +def minmax(*arr: Any) -> (Any, Any): + """return the min and max value of an array (or arbitrary amount of arguments)""" + if len(arr) == 1: + if isinstance(arr[0], list): + arr = arr[0] + else: + return arr[0], arr[0] + + arr = set(arr) + smallest = min(arr) + biggest = max(arr) + if smallest == biggest: + arr.remove(smallest) + biggest = max(arr) + + return smallest, biggest + + +def human_readable_time_from_delta(delta: datetime.timedelta) -> str: + time_str = "" + if delta.days > 0: + time_str += "%d day%s, " % (delta.days, "s" if delta.days > 1 else "") + + if delta.seconds > 3600: + time_str += "%02d:" % (delta.seconds // 3600) + else: + time_str += "00:" + + if delta.seconds % 3600 > 60: + time_str += "%02d:" % (delta.seconds % 3600 // 60) + else: + time_str += "00:" + + return time_str + "%02d" % (delta.seconds % 60) + + +def human_readable_time_from_ns(ns: int) -> str: + units = [ + (1000, 'ns'), + (1000, 'µs'), + (1000, 'ms'), + (60, 's'), + (60, 'm'), + (60, 'h'), + (24, 'd'), + ] + + time_parts = [] + for div, unit in units: + ns, p = ns // div, ns % div + time_parts.insert(0, "%d%s" % (p, unit)) + if ns == 0: + return ", ".join(time_parts) + + +def cache(func): + saved = {} + + @wraps(func) + def newfunc(*args): + if args in saved: + return saved[args] + + result = func(*args) + saved[args] = result + return result + + return newfunc + + +@hook(list) +def intersection(self, *args) -> list: + ret = set(self).intersection(*args) + return list(ret) + + +@hook(list) +def __and__(self, *args) -> list: + return self.intersection(*args) + + +@hook(str) +def intersection(self, *args) -> str: + ret = set(self).intersection(*args) + return "".join(list(ret)) + + +@hook(str) +def __and__(self, *args) -> str: + return self.intersection(*args) + + +@hook(int) +def sum_digits(self) -> int: + s = 0 + num = self + while num > 0: + s += num % 10 + num //= 10 + + return s \ No newline at end of file diff --git a/tools/trees.py b/tools/trees.py new file mode 100644 index 0000000..a067b74 --- /dev/null +++ b/tools/trees.py @@ -0,0 +1,296 @@ +from dataclasses import dataclass, field +from enum import Enum +from tools.lists import Queue, Stack +from typing import Any, List, Union + + +class Rotate(Enum): + LEFT = 0 + RIGHT = 1 + + +@dataclass +class TreeNode: + value: Any + parent: Union['TreeNode', None] = None + left: Union['TreeNode', None] = None + right: Union['TreeNode', None] = None + balance_factor: int = 0 + height: int = 0 + + def __str__(self): + return "TreeNode:(%s; bf: %d, d: %d, p: %s, l: %s, r: %s)" \ + % (self.value, self.balance_factor, self.height, + self.parent.value if self.parent else "None", + self.left.value if self.left else "None", + self.right.value if self.right else "None") + + def __repr__(self): + return str(self) + + +class TrieNode: + value: str + parent: Union['TrieNode', None] = None + children: List['TrieNode'] = field(default_factory=list) + + +def update_node(node: TreeNode): + left_depth = node.left.height if node.left is not None else -1 + right_depth = node.right.height if node.right is not None else -1 + node.height = 1 + (left_depth if left_depth > right_depth else right_depth) + node.balance_factor = right_depth - left_depth + + +class BinaryTree: + root: Union[TreeNode, None] = None + node_count: int = 0 + + def _insert(self, node: TreeNode, parent: TreeNode, obj: Any) -> TreeNode: + new_node = TreeNode(obj, parent) + if node is None: + return new_node + + found = False + while not found: + if obj < node.value: + if node.left is not None: + node = node.left + else: + node.left = new_node + found = True + elif obj > node.value: + if node.right is not None: + node = node.right + else: + node.right = new_node + found = True + else: + raise ValueError("obj already present in tree: %s" % obj) + + new_node.parent = node + return new_node + + def _remove(self, node: TreeNode): + if node.left is None and node.right is None: # leaf node + if node.parent is not None: + if node.parent.left == node: + node.parent.left = None + else: + node.parent.right = None + else: + self.root = None + elif node.left is not None and node.right is not None: # both subtrees present + d_node = node.left + while d_node.right is not None: + d_node = d_node.right + node.value = d_node.value + self.remove(node.value, d_node) + elif node.left is None: # only a subtree on the right + if node.parent is not None: + if node.parent.left == node: + node.parent.left = node.right + else: + node.parent.right = node.right + else: + self.root = node.right + node.right.parent = node.parent + else: # only a subtree on the left + if node.parent is not None: + if node.parent.left == node: + node.parent.left = node.left + else: + node.parent.right = node.left + else: + self.root = node.left + node.left.parent = node.parent + + self.node_count -= 1 + + def _get_node_by_value(self, obj: Any, root_node: TreeNode = None) -> TreeNode: + if self.root is None: + raise IndexError("get node from empty tree") + + if root_node is None: + root_node = self.root + + node = root_node + while node is not None: + if obj < node.value: + node = node.left + elif obj > node.value: + node = node.right + else: + return node + + raise ValueError("obj not in tree:", obj) + + def add(self, obj: Any): + if obj is None: + return + + new_node = self._insert(self.root, self.root, obj) + if self.root is None: + self.root = new_node + self.node_count += 1 + + def remove(self, obj: Any, root_node: TreeNode = None): + node = self._get_node_by_value(obj, root_node) + self._remove(node) + + def iter_depth_first(self): + stack = Stack() + stack.push(self.root) + while len(stack): + node = stack.pop() + if node.right is not None: + stack.push(node.right) + if node.left is not None: + stack.push(node.left) + yield node.value + + def iter_breadth_first(self): + queue = Queue() + queue.push(self.root) + while len(queue): + node = queue.pop() + if node.left is not None: + queue.push(node.left) + if node.right is not None: + queue.push(node.right) + yield node.value + + def print(self, node: TreeNode = None, level: int = 0): + if node is None: + if level == 0 and self.root is not None: + node = self.root + else: + return + + self.print(node.right, level + 1) + print(" " * 4 * level + '->', node) + self.print(node.left, level + 1) + + def __contains__(self, obj: Any) -> bool: + if self.root is None: + return False + + c_node = self.root + while c_node is not None: + if obj == c_node.value: + return True + elif obj < c_node.value: + c_node = c_node.left + else: + c_node = c_node.right + + return False + + def __len__(self) -> int: + return self.node_count + + +class BinarySearchTree(BinaryTree): + def _balance(self, node: TreeNode) -> TreeNode: + if node.balance_factor == -2: + if node.left.balance_factor <= 0: + return self.rotate(Rotate.RIGHT, node) + else: + return self.rotate(Rotate.RIGHT, self.rotate(Rotate.LEFT, node.left)) + elif node.balance_factor == 2: + if node.right.balance_factor >= 0: + return self.rotate(Rotate.LEFT, node) + else: + return self.rotate(Rotate.LEFT, self.rotate(Rotate.RIGHT, node.right)) + else: + return node + + def _insert(self, node: TreeNode, parent: TreeNode, obj: Any) -> TreeNode: + node = super()._insert(node, parent, obj) + if self.root is not None: + while node is not None: + update_node(node) + node = self._balance(node).parent + return node + + def remove(self, obj: Any, root_node: TreeNode = None): + if root_node is None: + root_node = self.root + + super().remove(obj, root_node) + update_node(root_node) + self._balance(root_node) + + def rotate(self, direction: Rotate, node: TreeNode = None) -> TreeNode: + if node is None: + node = self.root + + parent = node.parent + if direction == Rotate.LEFT: + pivot = node.right + node.right = pivot.left + if pivot.left is not None: + pivot.left.parent = node + pivot.left = node + else: + pivot = node.left + node.left = pivot.right + if pivot.right is not None: + pivot.right.parent = node + pivot.right = node + + node.parent = pivot + pivot.parent = parent + + if parent is not None: + if parent.left == node: + parent.left = pivot + else: + parent.right = pivot + + if node == self.root: + self.root = pivot + + update_node(node) + update_node(pivot) + + return pivot + + +class Heap(BinarySearchTree): + def empty(self): + return self.root is None + + def popMin(self): + if self.root is None: + raise IndexError("pop from empty heap") + + c_node = self.root + while c_node.left is not None: + c_node = c_node.left + + ret = c_node.value + self._remove(c_node) + return ret + + def popMax(self): + if self.root is None: + raise IndexError("pop from empty heap") + + c_node = self.root + while c_node.right is not None: + c_node = c_node.right + + ret = c_node.value + self._remove(c_node) + return ret + + +class MinHeap(Heap): + def pop(self): + return self.popMin() + + +class MaxHeap(Heap): + def pop(self): + return self.popMax() diff --git a/tools/types.py b/tools/types.py new file mode 100644 index 0000000..cc123d5 --- /dev/null +++ b/tools/types.py @@ -0,0 +1,7 @@ +from typing import Union + +Numeric = Union[int, float] +StrOrNone = Union[str, None] +IntOrNone = Union[int, None] +FloatOrNone = Union[float, None] +NumericOrNone = Union[Numeric, None]