Merge remote-tracking branch 'leeloo/master'

This commit is contained in:
Stefan Harmuth 2023-09-21 19:15:52 +02:00
commit 03a5b69f88
21 changed files with 2855 additions and 0 deletions

0
__init__.py Normal file
View File

56
btree_test.py Normal file
View File

@ -0,0 +1,56 @@
from heapq import heappop, heappush
from tools.trees import MinHeap, BinarySearchTree
from tools.stopwatch import StopWatch
s = StopWatch()
h = []
for x in range(100_000):
heappush(h, x)
print("Heappush:", s.elapsed())
s.reset()
while h:
heappop(h)
print("Heappop:", s.elapsed())
s = StopWatch()
h = MinHeap()
for x in range(100_000):
h.add(x)
print("MinHeap.add():", s.elapsed())
s.reset()
while not h.empty():
h.pop()
print("MinHeap.pop():", s.elapsed())
s = StopWatch()
b = set()
for x in range(1_000_000):
b.add(x)
print("set.add():", s.elapsed())
s.reset()
for x in range(1_000_000):
_ = x in b
print("x in set:", s.elapsed())
s = StopWatch()
b = BinarySearchTree()
for x in range(1_000_000):
b.add(x)
print("AVL.add():", s.elapsed())
s.reset()
for x in range(1_000_000):
_ = x in b
print("x in AVL:", s.elapsed())
print("DFS/BFS Test")
b = BinarySearchTree()
for x in range(20):
b.add(x)
b.print()
print("DFS:")
for x in b.iter_depth_first():
print(x)
print("BFS:")
for x in b.iter_breadth_first():
print(x)

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
bs4~=0.0.1
beautifulsoup4==4.11.1
fishhook~=0.2.5
pygame~=2.4.0
requests==2.28.1
setuptools==65.6.3

12
setup.py Normal file
View File

@ -0,0 +1,12 @@
from setuptools import setup
setup(
name='py-tools',
version='0.2',
packages=['tools'],
url='',
license='GPLv3',
author='Stefan Harmuth',
author_email='pennywise@drock.de',
description='Just some small tools to make life easier'
)

0
tools/__init__.py Normal file
View File

256
tools/aoc.py Normal file
View File

@ -0,0 +1,256 @@
import os
import re
import subprocess
import requests
import time
import webbrowser
from bs4 import BeautifulSoup
from tools.datafiles import JSONFile
from tools.stopwatch import StopWatch
from typing import Any, Callable, List, Tuple, Type, Union
from .tools import get_script_dir
BASE_PATH = get_script_dir()
INPUTS_PATH = os.path.join(BASE_PATH, 'inputs')
class AOCDay:
year: int
day: int
input: List[str] # our input is always a list of str/lines
inputs: List[List[Tuple[Any, str]]]
part_func: List[Callable]
def __init__(self, year: int, day: int):
self.day = day
self.year = year
self.part_func = [self.part1, self.part2]
self._current_test_file = None
self._current_test_solution = None
def part1(self) -> Any:
raise NotImplementedError()
def part2(self) -> Any:
raise NotImplementedError()
def run_part(self, part: int, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50):
case_count = 0
for solution, input_file in self.inputs[part]:
self._current_test_solution, self._current_test_file = solution, input_file
exec_time = None
answer = None
self._load_input(input_file)
if not measure_runtime or case_count < len(self.inputs[part]) - 1:
answer = self.part_func[part]()
else:
stopwatch = StopWatch()
for _ in range(timeit_number):
answer = self.part_func[part]()
stopwatch.stop()
exec_time = stopwatch.avg_string(timeit_number)
if solution is None:
print_solution(self.day, part + 1, answer, solution, case_count, exec_time)
if answer not in {u"", b"", None, b"None", u"None", 0, '0'}:
self._submit(part + 1, answer)
else:
if verbose or answer != solution:
print_solution(self.day, part + 1, answer, solution, case_count, exec_time)
if answer != solution:
return False
case_count += 1
if case_count == len(self.inputs[part]) and not verbose:
print_solution(self.day, part + 1, answer, exec_time=exec_time)
def run(self, parts: int = 3, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50):
if parts & 1:
self.run_part(0, verbose, measure_runtime, timeit_number)
if parts & 2:
self.run_part(1, verbose, measure_runtime, timeit_number)
def _load_input(self, filename):
file_path = os.path.join(INPUTS_PATH, filename)
if not os.path.exists(file_path):
self._download_input(file_path)
with open(os.path.join(INPUTS_PATH, filename)) as f:
self.input = f.read().splitlines()
def _download_input(self, filename: str):
# FIXME: implement wait time for current day before 06:00:00 ?
session_id = open(".session", "r").readlines()[0].strip()
response = requests.get(
"https://adventofcode.com/%d/day/%d/input" % (self.year, self.day),
cookies={'session': session_id}
)
if not response.ok:
print("FAILED to download input: (%s) %s" % (response.status_code, response.text))
return
with open(filename, "wb") as f:
f.write(response.content)
f.flush()
if os.path.exists(".git"):
subprocess.call(["git", "add", filename])
def _submit(self, part: int, answer: Any):
answer_cache = JSONFile("answer_cache.json", create=True)
str_day = str(self.day)
str_part = str(part)
if str_day not in answer_cache:
answer_cache[str_day] = {}
if str_part not in answer_cache[str_day]:
answer_cache[str_day][str_part] = {
'wrong': [],
'correct': None
}
if answer in answer_cache[str_day][str_part]['wrong']:
print("Already tried %s. It was WRONG." % answer)
return
if answer_cache[str_day][str_part]['correct'] is not None:
if answer == answer_cache[str_day][str_part]['correct']:
print("Already submitted %s. It was CORRECT." % answer)
return
else:
print("Already submitted an answer, but another one")
print("CORRECT was: %s" % answer_cache[str_day][str_part]['correct'])
print("Your answer: %s" % answer)
return
print("Submitting %s as answer for %d part %d" % (answer, self.day, part))
session_id = open(".session", "r").readlines()[0].strip()
response = requests.post(
"https://adventofcode.com/%d/day/%d/answer" % (self.year, self.day),
cookies={'session': session_id},
data={'level': part, 'answer': answer}
)
if not response.ok:
print("Failed to submit answer: (%s) %s" % (response.status_code, response.text))
soup = BeautifulSoup(response.text, "html.parser")
message = soup.article.text
if "That's the right answer" in message:
answer_cache[str_day][str_part]['correct'] = answer
print("That's correct!")
webbrowser.open("https://adventofcode.com/%d/day/%d#part2" % (self.year, self.day))
elif "That's not the right answer" in message:
answer_cache[str_day][str_part]['wrong'].append(answer)
print("That's WRONG!")
elif "You gave an answer too recently" in message:
# WAIT and retry
wait_pattern = r"You have (?:(\d+)m )?(\d+)s left to wait"
try:
[(minutes, seconds)] = re.findall(wait_pattern, message)
except ValueError:
print("wait_pattern unable to find wait_time in:")
print(message)
return
seconds = int(seconds)
if minutes:
seconds += int(minutes) * 60
print("TOO SOON. Waiting %d seconds until auto-retry." % seconds)
time.sleep(seconds)
self._submit(part, answer)
return
else:
print("I don't know what this means:")
print(message)
return
answer_cache.save()
def getInput(self, return_type: Type = None) -> Any:
if len(self.input) == 1:
if return_type:
return return_type(self.input[0])
else:
return self.input[0]
else:
if return_type:
return [return_type(i) for i in self.input]
else:
return self.input.copy()
def getMultiLineInputAsArray(self, return_type: Type = None, join_char: str = None) -> List:
"""
get input for day x as 2d array, split by empty lines
"""
lines = self.input.copy()
lines.append('')
return_array = []
line_array = []
for line in lines:
if not line:
if join_char:
return_array.append(join_char.join(line_array))
else:
return_array.append(line_array)
line_array = []
continue
if return_type:
line_array.append(return_type(line))
else:
line_array.append(line)
return return_array
def getInputAsArraySplit(self, split_char: str = ',', return_type: Union[Type, List[Type]] = None) -> List:
"""
get input for day x with the lines split by split_char
if input has only one line, returns a 1d array with the values
if input has multiple lines, returns a 2d array (a[line][values])
"""
if len(self.input) == 1:
return split_line(line=self.input[0], split_char=split_char, return_type=return_type)
else:
return_array = []
for line in self.input:
return_array.append(split_line(line=line, split_char=split_char, return_type=return_type))
return return_array
def print_solution(day: int, part: int, solution: Any, test: Any = None, test_case: int = 0, exec_time: str = None):
if test is not None:
print(
"%s (TEST day%d/part%d/case%d): got '%s'; expected '%s'"
% ("OK" if test == solution else "FAIL", day, part, test_case, solution, test)
)
else:
print(
"Solution to day %s, part %s: %s"
% (
day,
part,
solution,
)
)
if exec_time:
print("Day %s, Part %s - Average run time: %s" % (day, part, exec_time))
def split_line(line, split_char: str = ',', return_type: Union[Type, List[Type]] = None):
if split_char:
line = line.split(split_char)
if return_type is None:
return line
elif isinstance(return_type, list):
return [return_type[x](i) if len(return_type) > x else i for x, i in enumerate(line)]
else:
return [return_type(i) for i in line]

61
tools/aoc_ocr.py Normal file
View File

@ -0,0 +1,61 @@
# Copyright (c) 2020-present Benjamin Soyka
# Original and Licence at https://github.com/bsoyka/advent-of-code-ocr
from collections.abc import Sequence
ALPHABET_6 = {
".##.\n#..#\n#..#\n####\n#..#\n#..#": "A",
"###.\n#..#\n###.\n#..#\n#..#\n###.": "B",
".##.\n#..#\n#...\n#...\n#..#\n.##.": "C",
"####\n#...\n###.\n#...\n#...\n####": "E",
"####\n#...\n###.\n#...\n#...\n#...": "F",
".##.\n#..#\n#...\n#.##\n#..#\n.###": "G",
"#..#\n#..#\n####\n#..#\n#..#\n#..#": "H",
".###\n..#.\n..#.\n..#.\n..#.\n.###": "I",
"..##\n...#\n...#\n...#\n#..#\n.##.": "J",
"#..#\n#.#.\n##..\n#.#.\n#.#.\n#..#": "K",
"#...\n#...\n#...\n#...\n#...\n####": "L",
".##.\n#..#\n#..#\n#..#\n#..#\n.##.": "O",
"###.\n#..#\n#..#\n###.\n#...\n#...": "P",
"###.\n#..#\n#..#\n###.\n#.#.\n#..#": "R",
".###\n#...\n#...\n.##.\n...#\n###.": "S",
"#..#\n#..#\n#..#\n#..#\n#..#\n.##.": "U",
"#...\n#...\n.#.#\n..#.\n..#.\n..#.": "Y",
"####\n...#\n..#.\n.#..\n#...\n####": "Z",
}
def convert_6(input_text: str, *, fill_pixel: str = "#", empty_pixel: str = ".") -> str:
"""Convert height 6 text to characters"""
input_text = input_text.replace(fill_pixel, "#").replace(empty_pixel, ".")
prepared_array = [list(line) for line in input_text.split("\n")]
return _convert_6(prepared_array)
def convert_array_6(array: Sequence[Sequence[str | int]], *, fill_pixel: str | int = "#", empty_pixel: str | int = ".") -> str:
"""Convert a height 6 NumPy array or nested list to characters"""
prepared_array = [
[
"#" if pixel == fill_pixel else "." if pixel == empty_pixel else ""
for pixel in line
]
for line in array
]
return _convert_6(prepared_array)
def _convert_6(array: list[list[str]]) -> str:
"""Convert a prepared height 6 array to characters"""
rows, cols = len(array), len(array[0])
if any(len(row) != cols for row in array):
raise ValueError("all rows should have the same number of columns")
if rows != 6:
raise ValueError("incorrect number of rows (expected 6)")
indices = [slice(start, start + 4) for start in range(0, cols, 5)]
result = [
ALPHABET_6["\n".join("".join(row[index]) for row in array)]
for index in indices
]
return "".join(result)

393
tools/coordinate.py Normal file
View File

@ -0,0 +1,393 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from math import gcd, sqrt, inf, atan2, degrees
from .math import round_half_up
from typing import Union, List, Optional
class DistanceAlgorithm(Enum):
MANHATTAN = 0
EUCLIDEAN = 1
PYTHAGOREAN = 1
CHEBYSHEV = 2
CHESSBOARD = 2
@dataclass(frozen=True)
class Coordinate:
x: int
y: int
z: Optional[int] = None
def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = False) -> Union[int, float]:
"""
Get distance to target Coordinate
:param target:
:param algorithm: Calculation Algorithm (s. DistanceAlgorithm)
:param includeDiagonals: in Manhattan Mode specify if diagonal
movements are allowed (counts as 1.4 in 2D, 1.7 in 3D)
:return: Distance to Target
"""
if algorithm == DistanceAlgorithm.EUCLIDEAN:
if self.z is None:
return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2)
else:
return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2 + abs(self.z - target.z) ** 2)
elif algorithm == DistanceAlgorithm.CHEBYSHEV:
if self.z is None:
return max(abs(target.x - self.x), abs(target.y - self.y))
else:
return max(abs(target.x - self.x), abs(target.y - self.y), abs(target.z - self.z))
elif algorithm == DistanceAlgorithm.MANHATTAN:
if not includeDiagonals:
if self.z is None:
return abs(self.x - target.x) + abs(self.y - target.y)
else:
return abs(self.x - target.x) + abs(self.y - target.y) + abs(self.z - target.z)
else:
dist = [abs(self.x - target.x), abs(self.y - target.y)]
if self.z is None:
o_dist = max(dist) - min(dist)
return o_dist + 1.4 * min(dist)
else:
dist.append(abs(self.z - target.z))
d_steps = min(dist)
dist.remove(min(dist))
dist = [x - d_steps for x in dist]
o_dist = max(dist) - min(dist)
return 1.7 * d_steps + o_dist + 1.4 * min(dist)
def inBoundaries(self, minX: int, minY: int, maxX: int, maxY: int, minZ: int = -inf, maxZ: int = inf) -> bool:
if self.z is None:
return minX <= self.x <= maxX and minY <= self.y <= maxY
else:
return minX <= self.x <= maxX and minY <= self.y <= maxY and minZ <= self.z <= maxZ
def getCircle(self, radius: int = 1, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
minX: int = -inf, minY: int = -inf, maxX: int = inf, maxY: int = inf,
minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]:
ret = []
if self.z is None: # mode 2D
for x in range(self.x - radius * 2, self.x + radius * 2 + 1):
for y in range(self.y - radius * 2, self.y + radius * 2 + 1):
target = Coordinate(x, y)
if not target.inBoundaries(minX, minY, maxX, maxY):
continue
dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False))
if dist == radius:
ret.append(target)
else:
for x in range(self.x - radius * 2, self.x + radius * 2 + 1):
for y in range(self.y - radius * 2, self.y + radius * 2 + 1):
for z in range(self.z - radius * 2, self.z + radius * 2 + 1):
target = Coordinate(x, y)
if not target.inBoundaries(minX, minY, maxX, maxY, minZ, maxZ):
continue
dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False))
if dist == radius:
ret.append(target)
return ret
def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf,
maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]:
"""
Get a list of neighbouring coordinates.
:param includeDiagonal: include diagonal neighbours
:param minX: ignore all neighbours that would have an X value below this
:param minY: ignore all neighbours that would have an Y value below this
:param minZ: ignore all neighbours that would have an Z value below this
:param maxX: ignore all neighbours that would have an X value above this
:param maxY: ignore all neighbours that would have an Y value above this
:param maxZ: ignore all neighbours that would have an Z value above this
:return: list of Coordinate
"""
if self.z is None:
if includeDiagonal:
nb_list = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
else:
nb_list = [(-1, 0), (1, 0), (0, -1), (0, 1)]
for dx, dy in nb_list:
if minX <= self.x + dx <= maxX and minY <= self.y + dy <= maxY:
yield self.__class__(self.x + dx, self.y + dy)
else:
if includeDiagonal:
nb_list = [(x, y, z) for x in [-1, 0, 1] for y in [-1, 0, 1] for z in [-1, 0, 1]]
nb_list.remove((0, 0, 0))
else:
nb_list = [(-1, 0, 0), (0, -1, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 0, -1)]
for dx, dy, dz in nb_list:
if minX <= self.x + dx <= maxX and minY <= self.y + dy <= maxY and minZ <= self.z + dz <= maxZ:
yield self.__class__(self.x + dx, self.y + dy, self.z + dz)
def getAngleTo(self, target: Coordinate, normalized: bool = False) -> float:
"""normalized returns an angle going clockwise with 0 starting in the 'north'"""
if self.z is not None:
raise NotImplementedError() # which angle?!?!
dx = target.x - self.x
dy = target.y - self.y
if not normalized:
return degrees(atan2(dy, dx))
else:
angle = degrees(atan2(dx, dy))
if dx >= 0:
return 180.0 - angle
else:
return 180.0 + abs(angle)
def getLineTo(self, target: Coordinate) -> List[Coordinate]:
diff = target - self
if self.z is None:
steps = gcd(diff.x, diff.y)
step_x = diff.x // steps
step_y = diff.y // steps
return [self.__class__(self.x + step_x * i, self.y + step_y * i) for i in range(steps + 1)]
else:
steps = gcd(diff.x, diff.y, diff.z)
step_x = diff.x // steps
step_y = diff.y // steps
step_z = diff.z // steps
return [self.__class__(self.x + step_x * i, self.y + step_y * i, self.z + step_z * i) for i in range(steps + 1)]
def reverse(self) -> Coordinate:
if self.z is None:
return self.__class__(-self.x, -self.y)
else:
return self.__class__(-self.x, -self.y, -self.z)
def __add__(self, other: Coordinate) -> Coordinate:
if self.z is None:
return self.__class__(self.x + other.x, self.y + other.y)
else:
return self.__class__(self.x + other.x, self.y + other.y, self.z + other.z)
def __sub__(self, other: Coordinate) -> Coordinate:
if self.z is None:
return self.__class__(self.x - other.x, self.y - other.y)
else:
return self.__class__(self.x - other.x, self.y - other.y, self.z - other.z)
def __mul__(self, other: int) -> Coordinate:
if self.z is None:
return self.__class__(self.x * other, self.y * other)
else:
return self.__class__(self.x * other, self.y * other, self.z * other)
def __floordiv__(self, other) -> Coordinate:
if self.z is None:
return self.__class__(self.x // other, self.y // other)
else:
return self.__class__(self.x // other, self.y // other, self.z // other)
def __truediv__(self, other):
return self // other
def __eq__(self, other):
if not isinstance(other, Coordinate):
return False
return self.x == other.x and self.y == other.y and self.z == other.z
def __gt__(self, other):
if self.z is None:
return self.x > other.x and self.y > other.y
else:
return self.x > other.x and self.y > other.y and self.z > other.z
def __ge__(self, other):
if self.z is None:
return self.x >= other.x and self.y >= other.y
else:
return self.x >= other.x and self.y >= other.y and self.z >= other.z
def __lt__(self, other):
if self.z is None:
return self.x < other.x and self.y < other.y
else:
return self.x < other.x and self.y < other.y and self.z < other.z
def __le__(self, other):
if self.z is None:
return self.x <= other.x and self.y <= other.y
else:
return self.x <= other.x and self.y <= other.y and self.z <= other.z
def __str__(self):
if self.z is None:
return "(%d,%d)" % (self.x, self.y)
else:
return "(%d,%d,%d)" % (self.x, self.y, self.z)
def __repr__(self):
if self.z is None:
return "%s(x=%d, y=%d)" % (self.__class__.__name__, self.x, self.y)
else:
return "%s(x=%d, y=%d, z=%d)" % (self.__class__.__name__, self.x, self.y, self.z)
@classmethod
def generate(cls, from_x: int, to_x: int, from_y: int, to_y: int,
from_z: int = None, to_z: int = None) -> List[Coordinate]:
if from_z is None or to_z is None:
return [cls(x, y) for x in range(from_x, to_x + 1) for y in range(from_y, to_y + 1)]
else:
return [
cls(x, y, z)
for x in range(from_x, to_x + 1)
for y in range(from_y, to_y + 1)
for z in range(from_z, to_z + 1)
]
class HexCoordinate(Coordinate):
"""
https://www.redblobgames.com/grids/hexagons/#coordinates-cube
Treat as 3d Coordinate
+y -x +z
y x z
yxz
z x y
-z +x -y
"""
neighbour_vectors = {
'ne': Coordinate(-1, 0, 1),
'nw': Coordinate(-1, 1, 0),
'e': Coordinate(0, -1, 1),
'w': Coordinate(0, 1, -1),
'sw': Coordinate(1, 0, -1),
'se': Coordinate(1, -1, 0),
}
def __init__(self, x: int, y: int, z: int):
assert (x + y + z) == 0
super().__init__(x, y, z)
def get_length(self) -> int:
return (abs(self.x) + abs(self.y) + abs(self.z)) // 2
def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = True) -> Union[int, float]:
# includeDiagonals makes no sense in a hex grid, it's just here for signature reasons
if algorithm == DistanceAlgorithm.MANHATTAN:
return (self - target).get_length()
def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf,
maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]:
# includeDiagonals makes no sense in a hex grid, it's just here for signature reasons
return [
self + x for x in self.neighbour_vectors.values()
if minX <= (self + x).x <= maxX and minY <= (self + x).y <= maxY and minZ <= (self + x).z <= maxZ
]
HexCoordinateR = HexCoordinate
class HexCoordinateF(HexCoordinate):
"""
https://www.redblobgames.com/grids/hexagons/#coordinates-cube
Treat as 3d Coordinate
+y -x
y x
-z z yxz z +z
x y
+x -y
"""
neighbour_vectors = {
'ne': Coordinate(-1, 0, 1),
'nw': Coordinate(0, 1, -1),
'n': Coordinate(-1, 1, 0),
's': Coordinate(1, -1, 0),
'sw': Coordinate(1, 0, -1),
'se': Coordinate(0, -1, 1),
}
def __init__(self, x: int, y: int, z: int):
super().__init__(x, y, z)
class Shape:
def __init__(self, top_left: Coordinate, bottom_right: Coordinate):
"""
in 2D mode: top_left is the upper left corner and bottom_right the lower right
(top_left.x <= bottom_right.x and top_left.y <= bottom_right.y)
in 3D mode: same logic applied, just for 3D Coordinates
top_left is the upper left rear corner and bottom_right the lower right front
(top_left.x <= bottom_right.x and top_left.y <= bottom_right.y and top_left.z <= bottom_right.z)
"""
self.top_left = top_left
self.bottom_right = bottom_right
self.mode_3d = top_left.z is not None and bottom_right.z is not None
def __len__(self):
if not self.mode_3d:
return (self.bottom_right.x - self.top_left.x + 1) * (self.bottom_right.y - self.top_left.y + 1)
else:
return (
(self.bottom_right.x - self.top_left.x + 1)
* (self.bottom_right.y - self.top_left.y + 1)
* (self.bottom_right.z - self.top_left.z + 1)
)
def intersection(self, other: Shape) -> Union[Shape, None]:
"""
returns a Shape of the intersecting part, or None if the Shapes don't intersect
"""
if self.mode_3d != other.mode_3d:
raise ValueError("Cannot calculate intersection between 2d and 3d shape")
if not self.mode_3d:
intersect_top_left = Coordinate(
self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x,
self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y,
)
intersect_bottom_right = Coordinate(
self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x,
self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y,
)
else:
intersect_top_left = Coordinate(
self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x,
self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y,
self.top_left.z if self.top_left.z > other.top_left.z else other.top_left.z,
)
intersect_bottom_right = Coordinate(
self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x,
self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y,
self.bottom_right.z if self.bottom_right.z < other.bottom_right.z else other.bottom_right.z,
)
if intersect_top_left <= intersect_bottom_right:
return self.__class__(intersect_top_left, intersect_bottom_right)
def __and__(self, other):
return self.intersection(other)
def __rand__(self, other):
return self.intersection(other)
def __str__(self):
return "%s(%s -> %s)" % (self.__class__.__name__, self.top_left, self.bottom_right)
def __repr__(self):
return "%s(%s, %s)" % (self.__class__.__name__, self.top_left, self.bottom_right)
class Square(Shape):
def __init__(self, top_left, bottom_right):
super().__init__(top_left, bottom_right)
self.mode_3d = False
class Cube(Shape):
def __init__(self, top_left, bottom_right):
if top_left.z is None or bottom_right.z is None:
raise ValueError("Both Coordinates need to be 3D")
super().__init__(top_left, bottom_right)

154
tools/daemon.py Normal file
View File

@ -0,0 +1,154 @@
# Shamelessly stolen from https://gist.github.com/josephernest/77fdb0012b72ebdf4c9d19d6256a1119
#
# From "A simple unix/linux daemon in Python" by Sander Marechal
# See http://stackoverflow.com/a/473702/1422096 and
# http://web.archive.org/web/20131017130434/http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
#
# Modified to add quit() that allows to run some code before closing the daemon
# See http://stackoverflow.com/a/40423758/1422096
#
# Modified for Python 3
# (see also: http://web.archive.org/web/20131017130434/http://www.jejik.com/files/examples/daemon3x.py)
#
# Joseph Ernest, 20200507_1220
import atexit
import os
import sys
import time
from signal import SIGTERM, signal
DEV_NULL = "/dev/null"
class Daemon:
"""
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile='_.pid', stdin=DEV_NULL, stdout=DEV_NULL, stderr=DEV_NULL):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError as e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError as e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, 'r')
so = open(os.devnull, 'a+')
se = open(os.devnull, 'a+')
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
atexit.register(self.onstop)
signal(SIGTERM, lambda signum, stack_frame: exit())
# write pidfile
pid = str(os.getpid())
open(self.pidfile, 'w+').write("%s\n" % pid)
def onstop(self):
self.quit()
os.remove(self.pidfile)
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = open(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid:
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = open(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, SIGTERM)
time.sleep(0.1)
except OSError as err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print(str(err))
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
"""
You should override this method when you subclass Daemon. It will be called after the process has been
daemonized by start() or restart().
"""
def quit(self):
"""
You should override this method when you subclass Daemon. It will be called before the process is stopped.
"""

57
tools/datafiles.py Normal file
View File

@ -0,0 +1,57 @@
import json
import os
import pickle
class DataFile(dict):
def __init__(self, filename: str, create: bool):
super().__init__()
self.filename = filename
try:
os.stat(self.filename)
except OSError as e:
if not create:
raise e
else:
open(self.filename, "w").close()
self.load()
def load(self):
raise NotImplementedError()
def save(self):
raise NotImplementedError()
class JSONFile(DataFile):
def __init__(self, filename: str, create: bool):
super().__init__(filename, create)
def load(self):
with open(self.filename, "rt") as f:
c = f.read()
if len(c) > 0:
self.update(json.loads(c))
def save(self):
with open(self.filename, "wt") as f:
f.write(json.dumps(self.copy(), indent=4))
class PickleFile(DataFile):
def __init__(self, filename: str, create: bool) -> None:
super().__init__(filename, create)
def load(self) -> None:
with open(self.filename, "rb") as f:
c = f.read()
if len(c) > 0:
self.update(pickle.loads(c))
def save(self) -> None:
with open(self.filename, "wb") as f:
pickle.dump(self.copy(), f)

536
tools/grid.py Normal file
View File

@ -0,0 +1,536 @@
from __future__ import annotations
from collections import deque
from .aoc_ocr import convert_array_6
from .coordinate import Coordinate, DistanceAlgorithm, Shape
from .types import Numeric
from enum import Enum
from heapq import heappop, heappush
from math import inf
from typing import Any, Dict, List, Union
OFF = False
ON = True
class GridTransformation(Enum):
# Rotations always take the axis to rotate around as if it were the z-axis and then rotate clockwise
# Counter-Rotations likewise, just anti-clockwise
# 3D-only OPs have a number > 10
ROTATE_Z = 3
ROTATE_X = 11
ROTATE_Y = 12
COUNTER_ROTATE_X = 14
COUNTER_ROTATE_Y = 15
COUNTER_ROTATE_Z = 7
FLIP_X = 4
FLIP_Y = 5
FLIP_Z = 13
# Handy aliases
FLIP_HORIZONTALLY = 5
FLIP_VERTICALLY = 4
ROTATE_RIGHT = 3
ROTATE_LEFT = 7
class Grid:
def __init__(self, default=False):
self.__default = default
self.__grid = {}
self.minX = None
self.minY = None
self.maxX = None
self.maxY = None
self.minZ = None
self.maxZ = None
self.mode3D = False
def __trackBoundaries(self, pos: Coordinate):
if self.minX is None:
self.minX, self.maxX, self.minY, self.maxY = pos.x, pos.x, pos.y, pos.y
else:
self.minX = pos.x if pos.x < self.minX else self.minX
self.minY = pos.y if pos.y < self.minY else self.minY
self.maxX = pos.x if pos.x > self.maxX else self.maxX
self.maxY = pos.y if pos.y > self.maxY else self.maxY
if self.mode3D:
if self.minZ is None:
self.minZ = self.maxZ = pos.z
else:
self.minZ = pos.z if pos.z < self.minZ else self.minZ
self.maxZ = pos.z if pos.z > self.maxZ else self.maxZ
def recalcBoundaries(self) -> None:
self.minX, self.maxX, self.minY, self.maxY, self.minZ, self.maxZ = None, None, None, None, None, None
for c in self.__grid:
self.__trackBoundaries(c)
def getBoundaries(self) -> (int, int, int, int, int, int):
if self.mode3D:
return self.minX, self.minY, self.maxX, self.maxY, self.minZ, self.maxZ
else:
return self.minX, self.minY, self.maxX, self.maxY, -inf, inf
def rangeX(self, pad: int = 0, reverse=False):
if reverse:
return range(self.maxX + pad, self.minX - pad - 1, -1)
else:
return range(self.minX - pad, self.maxX + pad + 1)
def rangeY(self, pad: int = 0, reverse=False):
if reverse:
return range(self.maxY + pad, self.minY - pad - 1, -1)
else:
return range(self.minY - pad, self.maxY + pad + 1)
def rangeZ(self, pad: int = 0, reverse=False):
if not self.mode3D:
raise ValueError("rangeZ not available in 2D space")
if reverse:
return range(self.maxZ + pad, self.minZ - pad - 1, -1)
else:
return range(self.minZ - pad, self.maxZ + pad + 1)
def toggle(self, pos: Coordinate):
if pos in self.__grid:
del self.__grid[pos]
else:
self.__trackBoundaries(pos)
self.__grid[pos] = not self.__default
def toggleGrid(self):
for x in self.rangeX():
for y in self.rangeY():
if not self.mode3D:
self.toggle(Coordinate(x, y))
else:
for z in self.rangeZ():
self.toggle(Coordinate(x, y, z))
def set(self, pos: Coordinate, value: Any = True) -> Any:
if pos.z is not None:
self.mode3D = True
if (value == self.__default) and pos in self.__grid:
del self.__grid[pos]
elif value != self.__default:
self.__trackBoundaries(pos)
self.__grid[pos] = value
return value
def move(self, pos: Coordinate, vec: Coordinate,):
target = pos + vec
self.set(target, self.get(pos))
if pos in self.__grid:
del self.__grid[pos]
def add(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
return self.set(pos, self.get(pos) + value)
def sub(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
return self.set(pos, self.get(pos) - value)
def mul(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
return self.set(pos, self.get(pos) * value)
def div(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
return self.set(pos, self.get(pos) / value)
def add_shape(self, shape: Shape, value: Numeric = 1) -> None:
for x in range(shape.top_left.x, shape.bottom_right.x + 1):
for y in range(shape.top_left.y, shape.bottom_right.y + 1):
if not shape.mode_3d:
pos = Coordinate(x, y)
self.set(pos, self.get(pos) + value)
else:
for z in range(shape.top_left.z, shape.bottom_right.z + 1):
pos = Coordinate(x, y, z)
self.set(pos, self.get(pos) + value)
def get(self, pos: Coordinate) -> Any:
return self.__grid.get(pos, self.__default)
def getOnCount(self) -> int:
return len(self.__grid)
def count(self, value: Any) -> int:
return list(self.__grid.values()).count(value)
def isSet(self, pos: Coordinate) -> bool:
return pos in self.__grid
def getCorners(self) -> List[Coordinate]:
if not self.mode3D:
return [
Coordinate(self.minX, self.minY),
Coordinate(self.minX, self.maxY),
Coordinate(self.maxX, self.minY),
Coordinate(self.maxX, self.maxY),
]
else:
return [
Coordinate(self.minX, self.minY, self.minZ),
Coordinate(self.minX, self.minY, self.maxZ),
Coordinate(self.minX, self.maxY, self.minZ),
Coordinate(self.minX, self.maxY, self.maxZ),
Coordinate(self.maxX, self.minY, self.minZ),
Coordinate(self.maxX, self.minY, self.maxZ),
Coordinate(self.maxX, self.maxY, self.minZ),
Coordinate(self.maxX, self.maxY, self.maxZ),
]
def isCorner(self, pos: Coordinate) -> bool:
return pos in self.getCorners()
def isWithinBoundaries(self, pos: Coordinate) -> bool:
if self.mode3D:
return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY \
and self.minZ <= pos.z <= self.maxZ
else:
return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY
def getActiveCells(self, x: int = None, y: int = None, z: int = None) -> List[Coordinate]:
if x is not None or y is not None or z is not None:
return [
c for c in self.__grid.keys()
if (c.x == x if x is not None else True)
and (c.y == y if y is not None else True)
and (c.z == z if z is not None else True)
]
else:
return list(self.__grid.keys())
def getActiveRegion(self, start: Coordinate, includeDiagonal: bool = False, ignore: List[Coordinate] = None) \
-> List[Coordinate]:
if not self.get(start):
return []
if ignore is None:
ignore = []
ignore.append(start)
for c in self.getNeighboursOf(start, includeDiagonal=includeDiagonal):
if c not in ignore:
ignore = self.getActiveRegion(c, includeDiagonal, ignore)
return ignore
def values(self):
return self.__grid.values()
def getSum(self, includeNegative: bool = True) -> Numeric:
if not self.mode3D:
return sum(
self.get(Coordinate(x, y))
for x in self.rangeX()
for y in self.rangeY()
if includeNegative or self.get(Coordinate(x, y)) >= 0
)
else:
return sum(
self.get(Coordinate(x, y, z))
for x in self.rangeX()
for y in self.rangeY()
for z in self.rangeZ()
if includeNegative or self.get(Coordinate(x, y)) >= 0
)
def getNeighboursOf(self, pos: Coordinate, includeDefault: bool = False, includeDiagonal: bool = True) \
-> List[Coordinate]:
neighbours = pos.getNeighbours(
includeDiagonal=includeDiagonal,
minX=self.minX, minY=self.minY, minZ=self.minZ,
maxX=self.maxX, maxY=self.maxY, maxZ=self.maxZ
)
for x in neighbours:
if includeDefault or x in self.__grid:
yield x
def getNeighbourSum(self, pos: Coordinate, includeNegative: bool = True, includeDiagonal: bool = True) -> Numeric:
neighbour_sum = 0
for neighbour in self.getNeighboursOf(pos, includeDefault=includeDiagonal):
if includeNegative or self.get(neighbour) > 0:
neighbour_sum += self.get(neighbour)
return neighbour_sum
def flip(self, c1: Coordinate, c2: Coordinate):
buf = self.get(c1)
self.set(c1, self.get(c2))
self.set(c2, buf)
def transform(self, mode: GridTransformation):
if mode.value > 10 and not self.mode3D:
raise ValueError("Operation not possible in 2D space", mode)
coords = self.__grid
self.__grid = {}
if mode == GridTransformation.ROTATE_X:
shift_z = self.maxY
for c, v in coords.items():
self.set(Coordinate(c.x, c.z, -c.y), v)
self.shift(shift_z=shift_z)
elif mode == GridTransformation.ROTATE_Y:
shift_x = self.maxX
for c, v in coords.items():
self.set(Coordinate(-c.z, c.y, c.x), v)
self.shift(shift_x=shift_x)
elif mode == GridTransformation.ROTATE_Z:
shift_x = self.maxX
for c, v in coords.items():
self.set(Coordinate(-c.y, c.x, c.z), v)
self.shift(shift_x=shift_x)
elif mode == GridTransformation.COUNTER_ROTATE_X:
shift_y = self.maxY
for c, v in coords.items():
self.set(Coordinate(c.x, -c.z, c.y), v)
self.shift(shift_y=shift_y)
elif mode == GridTransformation.COUNTER_ROTATE_Y:
shift_z = self.maxZ
for c, v in coords.items():
self.set(Coordinate(c.z, c.y, -c.x), v)
self.shift(shift_z=shift_z)
elif mode == GridTransformation.COUNTER_ROTATE_Z:
shift_y = self.maxY
for c, v in coords.items():
self.set(Coordinate(c.y, -c.x, c.z), v)
self.shift(shift_y=shift_y)
elif mode == GridTransformation.FLIP_X:
shift_x = self.maxX
for c, v in coords.items():
self.set(Coordinate(-c.x, c.y, c.z), v)
self.shift(shift_x=shift_x)
elif mode == GridTransformation.FLIP_Y:
shift_y = self.maxY
for c, v in coords.items():
self.set(Coordinate(c.x, -c.y, c.z), v)
self.shift(shift_y=shift_y)
elif mode == GridTransformation.FLIP_Z:
shift_z = self.maxZ
for c, v in coords.items():
self.set(Coordinate(c.x, c.y, -c.z), v)
self.shift(shift_z=shift_z)
else:
raise NotImplementedError(mode)
self.recalcBoundaries()
def shift(self, shift_x: int = 0, shift_y: int = 0, shift_z: int = 0):
self.minX, self.minY = self.minX + shift_x, self.minY + shift_y
self.maxX, self.maxY = self.maxX + shift_x, self.maxY + shift_y
if self.mode3D:
self.minZ, self.maxZ = self.minZ + shift_z, self.maxZ + shift_z
coords = self.__grid
self.__grid = {}
for c, v in coords.items():
if self.mode3D:
nc = Coordinate(c.x + shift_x, c.y + shift_y, c.z + shift_z)
else:
nc = Coordinate(c.x + shift_x, c.y + shift_y)
self.set(nc, v)
def shift_zero(self, recalc: bool = True):
# self.shift() to (0, 0, 0) being top, left, front
if recalc:
self.recalcBoundaries()
if self.mode3D:
self.shift(0 - self.minX, 0 - self.minY, 0 - self.minZ)
else:
self.shift(0 - self.minX, 0 - self.minY)
def getPath_BFS(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None,
stop_at_first: Any = None) -> Union[None, List[Coordinate]]:
queue = deque()
came_from = {pos_from: None}
queue.append(pos_from)
if walls is None:
walls = [self.__default]
while queue:
current = queue.popleft()
found_end = False
for c in self.getNeighboursOf(current, includeDiagonal=includeDiagonal,
includeDefault=self.__default not in walls):
if c in came_from and self.get(c) in walls:
continue
came_from[c] = current
if c == pos_to or (stop_at_first is not None and self.get(c) == stop_at_first):
pos_to = c
found_end = True
break
queue.append(c)
if found_end:
break
if pos_to not in came_from:
return None
ret = []
while pos_to in came_from:
ret.insert(0, pos_to)
pos_to = came_from[pos_to]
return ret
def getPath(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None,
weighted: bool = False) -> Union[None, List[Coordinate]]:
f_costs = []
if walls is None:
walls = [self.__default]
openNodes: Dict[Coordinate, tuple] = {}
closedNodes: Dict[Coordinate, tuple] = {}
openNodes[pos_from] = (0, pos_from.getDistanceTo(pos_to), None)
heappush(f_costs, (0, pos_from))
while f_costs:
_, currentCoord = heappop(f_costs)
if currentCoord not in openNodes:
continue
currentNode = openNodes[currentCoord]
closedNodes[currentCoord] = currentNode
del openNodes[currentCoord]
if currentCoord == pos_to:
break
for neighbour in self.getNeighboursOf(currentCoord, includeDefault=True, includeDiagonal=includeDiagonal):
if self.get(neighbour) in walls or neighbour in closedNodes:
continue
if weighted:
neighbourDist = self.get(neighbour)
elif not includeDiagonal:
neighbourDist = 1
else:
neighbourDist = currentCoord.getDistanceTo(neighbour, DistanceAlgorithm.MANHATTAN, includeDiagonal)
targetDist = neighbour.getDistanceTo(pos_to)
f_cost = targetDist + neighbourDist + currentNode[1]
if neighbour not in openNodes or f_cost < openNodes[neighbour][0]:
openNodes[neighbour] = (f_cost, currentNode[1] + neighbourDist, currentCoord)
heappush(f_costs, (f_cost, neighbour))
if pos_to not in closedNodes:
return None
else:
currentNode = closedNodes[pos_to]
pathCoords = [pos_to]
while currentNode[2]:
pathCoords.append(currentNode[2])
currentNode = closedNodes[currentNode[2]]
return pathCoords
def sub_grid(self, from_x: int, from_y: int, to_x: int, to_y: int, from_z: int = None, to_z: int = None) -> 'Grid':
if self.mode3D and (from_z is None or to_z is None):
raise ValueError("sub_grid() on mode3d Grids requires from_z and to_z to be set")
count_x, count_y, count_z = 0, 0, 0
new_grid = Grid(self.__default)
for x in range(from_x, to_x + 1):
for y in range(from_y, to_y + 1):
if not self.mode3D:
new_grid.set(Coordinate(count_x, count_y), self.get(Coordinate(x, y)))
else:
for z in range(from_z, to_z + 1):
new_grid.set(Coordinate(count_x, count_y, count_z), self.get(Coordinate(x, y, z)))
count_z += 1
count_z = 0
count_y += 1
count_y = 0
count_x += 1
return new_grid
def update(self, x: int, y: int, grid: Grid) -> None:
put_x, put_y = x, y
for get_x in grid.rangeX():
for get_y in grid.rangeY():
self.set(Coordinate(put_x, put_y), grid.get(Coordinate(get_x, get_y)))
put_y += 1
put_y = y
put_x += 1
def print(self, spacer: str = "", true_char: str = '#', false_char: str = " ", translate: dict = None, mark: list = None, z_level: int = None, bool_mode: bool = False):
if translate is None:
translate = {}
if true_char is not None and True not in translate:
translate[True] = true_char
if false_char is not None and False not in translate:
translate[False] = false_char
for y in range(self.minY, self.maxY + 1):
for x in range(self.minX, self.maxX + 1):
pos = Coordinate(x, y, z_level)
if mark and pos in mark:
print("X", end="")
elif bool_mode:
print(true_char if self.get(pos) else false_char, end="")
else:
value = self.get(pos)
if isinstance(value, list):
value = len(value)
if isinstance(value, Enum):
value = value.value
print(value if value not in translate else translate[value], end="")
print(spacer, end="")
print()
def get_aoc_ocr_string(self, x_shift: int = 0, y_shift: int = 0):
return convert_array_6(
[['#' if self.get(Coordinate(x + x_shift, y + y_shift)) else '.' for x in self.rangeX()] for y in
self.rangeY()])
def __str__(self, true_char: str = '#', false_char: str = "."):
return "/".join(
"".join(
true_char if self.get(Coordinate(x, y)) else false_char
for x in range(self.minX, self.maxX + 1)
)
for y in range(self.minY, self.maxY + 1)
)
@classmethod
def from_str(cls, grid_string: str, default: Any = False, true_char: str = '#', true_value: Any = True, translate: dict = None, mode3d: bool = False) -> 'Grid':
if translate is None:
translate = {}
if true_char is not None and True not in translate.values() and true_char not in translate:
translate[true_char] = true_value if true_value is not None else True
ret = cls(default=default)
for y, line in enumerate(grid_string.split("/")):
for x, c in enumerate(line):
if mode3d:
coord = Coordinate(x, y, 0)
else:
coord = Coordinate(x, y)
if c in translate:
ret.set(coord, translate[c])
else:
ret.set(coord, c)
return ret
def __eq__(self, other: Grid) -> bool:
if not isinstance(other, Grid):
return False
other_active = set(other.getActiveCells())
for c, v in self.__grid.items():
if other.get(c) != v:
return False
other_active.remove(c)
if other_active:
return False
return True

41
tools/int_seq.py Normal file
View File

@ -0,0 +1,41 @@
import math
from .tools import cache
def factorial(n: int) -> int:
"""
n! = 1 * 2 * 3 * 4 * ... * n
1, 1, 2, 6, 24, 120, 720, ...
"""
return math.factorial(n)
def fibonacci(n: int) -> int:
"""
F(n) = F(n-1) + F(n-2) with F(0) = 0 and F(1) = 1
0, 1, 1, 2, 3, 5, 8, 13, 21, ...
"""
if n < 2:
return n
l, r = 1, 1
for _ in range(n - 2):
l, r = l + r, l
return l
def triangular(n: int) -> int:
"""
a(n) = binomial(n+1,2) = n*(n+1)/2 = 0 + 1 + 2 + ... + n
0, 1, 3, 6, 10, 15, ...
"""
return n * (n + 1) // 2
def pentagonal(n: int) -> int:
"""
A pentagonal number is a figurate number that extends the concept of triangular and square numbers to the pentagon
0, 1, 5, 12, 22, 35, ...
"""
return ((3 * n * n) - n) // 2

410
tools/irc.py Normal file
View File

@ -0,0 +1,410 @@
from time import sleep
from .schedule import Scheduler
from .simplesocket import ClientSocket
from .types import StrOrNone
from datetime import timedelta
from enum import Enum
from typing import Callable, Dict, List, Union
class ServerMessage(str, Enum):
RPL_WELCOME = "001"
RPL_YOURHOST = "002"
RPL_CREATED = "003"
RPL_MYINFO = "004"
RPL_ISUPPORT = "005"
RPL_BOUNCE = "010"
RPL_UNIQID = "042"
RPL_TRACELINK = "200"
RPL_TRACECONNECTING = "201"
RPL_TRACEHANDSHAKE = "202"
RPL_TRACEUNKNOWN = "203"
RPL_TRACEOPERATOR = "204"
RPL_TRACEUSER = "205"
RPL_TRACESERVER = "206"
RPL_TRACENEWTYPE = "208"
RPL_TRACECLASS = "209"
RPL_TRACERECONNECT = "210"
RPL_STATSLINKINFO = "211"
RPL_STATSCOMMANDS = "212"
RPL_STATSCLINE = "213"
RPL_STATSNLINE = "214"
RPL_STATSILINE = "215"
RPL_STATSKLINE = "216"
RPL_STATSYLINE = "218"
RPL_ENDOFSTATS = "219"
RPL_UMODEIS = "221"
RPL_SERVLIST = "234"
RPL_SERVLISTEND = "235"
RPL_STATSLLINE = "241"
RPL_STATSUPTIME = "242"
RPL_STATSOLINE = "243"
RPL_STATSHLINE = "244"
RPL_LUSERCLIENT = "251"
RPL_LUSEROP = "252"
RPL_LUSERUNKNOWN = "253"
RPL_LUSERCHANNELS = "254"
RPL_LUSERME = "255"
RPL_ADMINME = "256"
RPL_ADMINLOC1 = "257"
RPL_ADMINLOC2 = "258"
RPL_ADMINEMAIL = "259"
RPL_TRACELOG = "261"
RPL_TRACEEND = "262"
RPL_TRYAGAIN = "263"
RPL_NONE = "300"
RPL_AWAY = "301"
RPL_USERHOST = "302"
RPL_ISON = "303"
RPL_UNAWAY = "305"
RPL_NOWAWAY = "306"
RPL_WHOISUSER = "311"
RPL_WHOISSERVER = "312"
RPL_WHOISOPERATOR = "313"
RPL_WHOWASUSER = "314"
RPL_ENDOFWHO = "315"
RPL_WHOISIDLE = "317"
RPL_ENDOFWHOIS = "318"
RPL_WHOISCHANNELS = "319"
RPL_LISTSTART = "321"
RPL_LIST = "322"
RPL_LISTEND = "323"
RPL_CHANNELMODEIS = "324"
RPL_UNIQOPIS = "325"
RPL_NOTOPIC = "331"
RPL_TOPIC = "332"
RPL_TOPICBY = "333"
RPL_INVITING = "341"
RPL_SUMMONING = "342"
RPL_INVITELIST = "346"
RPL_ENDOFINVITELIST = "347"
RPL_EXCEPTLIST = "348"
RPL_ENDOFEXCEPTLIST = "349"
RPL_VERSION = "351"
RPL_WHOREPLY = "352"
RPL_NAMEREPLY = "353"
RPL_LINKS = "364"
RPL_ENDOFLINKS = "365"
RPL_ENDOFNAMES = "366"
RPL_BANLIST = "367"
RPL_ENDOFBANLIST = "368"
RPL_ENDOFWHOWAS = "369"
RPL_INFO = "371"
RPL_MOTD = "372"
RPL_ENDOFINFO = "374"
RPL_MOTDSTART = "375"
RPL_ENDOFMOTD = "376"
RPL_YOUREOPER = "381"
RPL_REHASHING = "382"
RPL_YOURESERVICE = "383"
RPL_TIME = "391"
RPL_USERSTART = "392"
RPL_USERS = "393"
RPL_ENDOFUSERS = "394"
RPL_NOUSERS = "395"
ERR_NOSUCHNICK = "401"
ERR_NOSUCHSERVER = "402"
ERR_NOSUCHCHANNEL = "403"
ERR_CANNOTSENDTOCHAN = "404"
ERR_TOOMANYCHANNELS = "405"
ERR_WASNOSUCHNICK = "406"
ERR_TOOMANYTARGETS = "407"
ERR_NOSUCHSERVICE = "408"
ERR_NOORIGIN = "409"
ERR_NORECIPIENT = "411"
ERR_NOTEXTTOSEND = "412"
ERR_NOTOPLEVEL = "413"
ERR_WILDTOPLEVEL = "414"
ERR_BANMASK = "415"
ERR_UNKNOWNCOMMAND = "421"
ERR_NOMOTD = "422"
ERR_NOADMININFO = "423"
ERR_FILEERROR = "424"
ERR_NONICKNAMEGIVEN = "431"
ERR_ERRONEUSNICKNAME = "432"
ERR_NICKNAMEINUSE = "433"
ERR_NICKCOLLISION = "436"
ERR_UNAVAILRESOURCE = "437"
ERR_USERNOTINCHANNEL = "441"
ERR_NOTOONCHANNEL = "442"
ERR_USERONCHANNEL = "443"
ERR_NOLOGIN = "444"
ERR_SUMMONDISABLED = "445"
ERR_USERSDISABLED = "446"
ERR_NOTREGISTERED = "451"
ERR_NEEDMOREPARAMS = "461"
ERR_ALREADYREGISTERED = "462"
ERR_NOPERMFORHOST = "463"
ERR_PASSWDMISMATH = "464"
ERR_YOUREBANNEDCREEP = "465"
ERR_YOUWILLBEBANNED = "466"
ERR_KEYSET = "467"
ERR_CHANNELISFULL = "471"
ERR_UNKNOWNMODE = "472"
ERR_INVITEONLYCHAN = "473"
ERR_BANNEDFROMCHAN = "474"
ERR_BADCHANNELKEY = "475"
ERR_BADCHANMASK = "476"
ERR_BASCHANMODES = "477"
ERR_BANLISTFULL = "478"
ERR_NOPRIVILEGES = "481"
ERR_CHANOPRIVSNEEDED = "482"
ERR_CANTKILLSERVER = "483"
ERR_RESTRICTED = "484"
ERR_UNIQOPPRIVSNEEDED = "485"
ERR_NOOPERHOST = "491"
ERR_UMODEUNKNOWNFLAG = "501"
ERR_USERSDONTMATCH = "502"
MSG_NICK = "NICK"
MSG_TOPIC = "TOPIC"
MSG_MODE = "MODE"
MSG_PRIVMSG = "PRIVMSG"
MSG_JOIN = "JOIN"
MSG_PART = "PART"
MSG_QUIT = "QUIT"
RAW = "RAW"
class User:
identifier: str
nickname: str
username: str
hostname: str
def __init__(self, identifier: str):
self.identifier = identifier
if "@" not in self.identifier:
self.nickname = self.hostname = self.identifier
else:
identifier, self.hostname = self.identifier.split("@")
if "!" in identifier:
self.nickname, self.username = identifier.split("!")
else:
self.nickname = self.username = identifier
def nick(self, new_nick: str):
self.identifier.replace("%s!" % self.nickname, "%s!" % new_nick)
self.nickname = new_nick
class Channel:
name: str
topic: str
userlist: Dict[str, User]
def __init__(self, name: str):
self.name = name
self.topic = ""
self.userlist = {}
def join(self, user: User):
if user.identifier not in self.userlist:
self.userlist[user.identifier] = user
def quit(self, user: User):
if user.identifier in self.userlist:
del self.userlist[user.identifier]
class Client:
__function_register: Dict[str, List[Callable]]
__server_socket: ClientSocket
__server_caps: Dict[str, Union[str, int]]
__userlist: Dict[str, User]
__channellist: Dict[str, Channel]
__my_user: StrOrNone
def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"):
self.__userlist = {}
self.__channellist = {}
self.__server_socket = ClientSocket(server, port)
self.__server_socket.sendline("USER %s ignore ignore :%s" % (username, realname))
self.__server_socket.sendline("NICK %s" % nick)
self.__server_caps = {
'MAXLEN': 255
}
self.__function_register = {
ServerMessage.RPL_WELCOME: [self.on_rpl_welcome],
ServerMessage.RPL_TOPIC: [self.on_rpl_topic],
ServerMessage.RPL_ISUPPORT: [self.on_rpl_isupport],
ServerMessage.ERR_NICKNAMEINUSE: [self.on_err_nicknameinuse],
ServerMessage.MSG_JOIN: [self.on_join],
ServerMessage.MSG_PART: [self.on_part],
ServerMessage.MSG_QUIT: [self.on_quit],
ServerMessage.MSG_NICK: [self.on_nick],
ServerMessage.MSG_TOPIC: [self.on_topic],
}
self.receive()
def receive(self):
while line := self.__server_socket.recvline():
line = line.strip()
if line.startswith("PING"):
self.__server_socket.sendline("PONG " + line.split()[1])
continue
try:
(msg_from, msg_type, msg_to, *msg) = line[1:].split()
except ValueError:
print("[E] Invalid message received:", line)
continue
if len(msg) > 0 and msg[0].startswith(":"):
msg[0] = msg[0][1:]
message = " ".join(msg)
if ServerMessage.RAW in self.__function_register:
for func in self.__function_register[ServerMessage.RAW]:
func(msg_from, msg_type, msg_to, message)
if "!" in msg_from and msg_from not in self.__userlist:
self.__userlist[msg_from] = User(msg_from)
if self.__userlist[msg_from].nickname == self.__userlist[self.__my_user].nickname:
del self.__userlist[self.__my_user]
self.__my_user = msg_from
if msg_type in self.__function_register:
for func in self.__function_register[msg_type]:
func(msg_from, msg_to, message)
def subscribe(self, msg_type: str, func: Callable[..., None]):
if msg_type in self.__function_register:
self.__function_register[msg_type].append(func)
else:
self.__function_register[msg_type] = [func]
def on_rpl_welcome(self, msg_from: str, msg_to: str, message: str):
self.__my_user = message.split()[-1]
self.__userlist[self.__my_user] = User(self.__my_user)
def on_rpl_isupport(self, msg_from: str, msg_to: str, message: str):
for cap in message.split():
if "=" not in cap:
self.__server_caps[cap] = True
else:
(a, b) = cap.split("=")
self.__server_caps[a] = b
def on_rpl_topic(self, msg_from: str, msg_to: str, message: str):
channel, *topic = message.split()
if len(topic) > 0:
topic[0] = topic[0][1:]
new_topic = " ".join(topic)
self.__channellist[channel].topic = new_topic
def on_err_nicknameinuse(self, msg_from: str, msg_to: str, message: str):
if self.__my_user is None:
self.nick(message.split()[0] + "_")
def on_nick(self, msg_from: str, msg_to: str, message: str):
self.__userlist[msg_from].nick(msg_to)
self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[msg_from]
del self.__userlist[msg_from]
def on_join(self, msg_from: str, msg_to: str, message: str):
channel = msg_to[1:]
if msg_from == self.__my_user:
self.__channellist[channel] = Channel(channel)
# FIXME: get user list (NAMES just returns nicknames, not nick!user@host !!!)
if msg_from not in self.__userlist:
self.__userlist[msg_from] = User(msg_from)
self.__channellist[channel].join(self.__userlist[msg_from])
def on_topic(self, msg_from: str, msg_to: str, message: str):
self.__channellist[msg_to].topic = message
def on_part(self, msg_from: str, msg_to: str, message: str):
self.__channellist[msg_to].quit(self.__userlist[msg_from])
def on_quit(self, msg_from: str, msg_to: str, message: str):
for c in self.__channellist:
self.__channellist[c].quit(self.__userlist[msg_from])
del self.__userlist[msg_from]
def on_raw(self, msg_from: str, msg_type: str, msg_to: str, message: str):
print(msg_from, msg_type, msg_to, message)
def nick(self, new_nick: str):
self.__server_socket.sendline("NICK %s" % new_nick)
def join(self, channel: str):
self.__server_socket.sendline("JOIN %s" % channel)
self.receive()
def part(self, channel: str):
self.__server_socket.sendline("PART %s" % channel)
self.receive()
def privmsg(self, target: str, message: str):
self.__server_socket.sendline("PRIVMSG %s :%s" % (target, message))
def quit(self, message: str = "Elvis has left the building!"):
self.__server_socket.sendline("QUIT :%s" % message)
self.receive()
self.__server_socket.close()
def getUser(self, user: str = None) -> Union[User, None]:
if user is None:
return self.__userlist[self.__my_user]
elif user in self.__userlist:
return self.__userlist[user]
else:
return None
def getUserList(self) -> List[User]:
return list(self.__userlist.values())
def getChannel(self, channel: str) -> Union[Channel, None]:
if channel in self.__channellist:
return self.__channellist[channel]
else:
return None
def getChannelList(self) -> List[Channel]:
return list(self.__channellist.values())
class IrcBot(Client):
def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"):
super().__init__(server, port, nick, username, realname)
self._scheduler = Scheduler()
self._channel_commands = {}
self._privmsg_commands = {}
self.subscribe(ServerMessage.MSG_PRIVMSG, self.on_privmsg)
def on(self, *args, **kwargs):
self.subscribe(*args, **kwargs)
def schedule(self, name: str, every: timedelta, func: Callable):
self._scheduler.schedule(name, every, func)
def register_channel_command(self, command: str, channel: str, func: Callable):
if channel not in self._channel_commands:
self._channel_commands[channel] = {}
self._channel_commands[channel][command] = func
def register_privmsg_command(self, command: str, func: Callable):
self._privmsg_commands[command] = func
def on_privmsg(self, msg_from, msg_to, message):
if not message:
return
command = message.split()[0]
if msg_to in self._channel_commands and command in self._channel_commands[msg_to]:
self._channel_commands[msg_to][command](msg_from, " ".join(message.split()[1:]))
if msg_to == self.getUser().nickname and command in self._privmsg_commands:
self._privmsg_commands[command](msg_from, " ".join(message.split()[1:]))
def run(self):
while True:
self._scheduler.run_pending()
self.receive()
sleep(0.01)

181
tools/lists.py Normal file
View File

@ -0,0 +1,181 @@
from dataclasses import dataclass
from typing import Any, Union
@dataclass
class Node:
value: Any
next: 'Node' = None
prev: 'Node' = None
class LinkedList:
_head: Union[Node, None] = None
_tail: Union[Node, None] = None
size: int = 0
def _get_head(self):
return self._head
def _get_tail(self):
return self._tail
def _set_head(self, node: Node):
node.next = self._head
self._head.prev = node
self._head = node
def _set_tail(self, node: Node):
node.prev = self._tail
self._tail.next = node
self._tail = node
head = property(_get_head, _set_head)
tail = property(_get_tail, _set_tail)
def _append(self, obj: Any):
node = Node(obj)
if self._head is None:
self._head = node
if self._tail is None:
self._tail = node
else:
self._tail.next = node
node.prev = self._tail
self._tail = node
self.size += 1
def _insert(self, index: int, obj: Any):
i_node = Node(obj)
node = self._get_node(index)
i_node.prev, i_node.next = node.prev, node
node.prev = i_node
if i_node.prev is not None:
i_node.prev.next = i_node
if index == 0:
self._head = i_node
self.size += 1
def _get_node(self, index: int) -> Node:
if index >= self.size or index < -self.size:
raise IndexError("index out of bounds")
if index < 0:
index = self.size + index
if index <= self.size // 2:
x = 0
node = self._head
while x < index:
x += 1
node = node.next
else:
x = self.size - 1
node = self._tail
while x > index:
x -= 1
node = node.prev
return node
def _get(self, index: int) -> Any:
return self._get_node(index).value
def _pop(self, index: int = None) -> Any:
if self.size == 0:
raise IndexError("pop from empty list")
if index is None: # pop from the tail
index = -1
node = self._get_node(index)
if node.prev is not None:
node.prev.next = node.next
else:
self._head = node.next
if node.next is not None:
node.next.prev = node.prev
else:
self._tail = node.prev
ret = node.value
del node
self.size -= 1
return ret
def append(self, obj: Any):
self._append(obj)
def insert(self, index: int, obj: Any):
self._insert(index, obj)
def get(self, index: int) -> Any:
return self._get(index)
def pop(self, index: int = None) -> Any:
return self._pop(index)
def __contains__(self, obj: Any) -> bool:
x = self._head
while x.value != obj and x.next is not None:
x = x.next
return x.value == obj
def __add__(self, other: 'LinkedList') -> 'LinkedList':
self._tail.next = other.head
other.head.prev = self._tail
self._tail = other.tail
self.size += other.size
return self
def __getitem__(self, index: int):
return self._get(index)
def __setitem__(self, index: int, obj: Any):
self._get_node(index).value = obj
def __len__(self):
return self.size
def __repr__(self):
x = self._head
if x is None:
v = ""
else:
v = str(x.value)
while x.next is not None:
x = x.next
v += ", " + str(x.value)
return "%s(%s)" % (self.__class__.__name__, v)
def __str__(self):
return self.__repr__()
class Stack(LinkedList):
def push(self, obj: Any):
self._append(obj)
def peek(self) -> Any:
return self._tail.value
class Queue(LinkedList):
def enqueue(self, obj: Any):
self._append(obj)
def dequeue(self) -> Any:
return self._pop(0)
def peek(self) -> Any:
return self._head.value
push = put = enqueue
pop = get = dequeue

18
tools/math.py Normal file
View File

@ -0,0 +1,18 @@
import math
from decimal import Decimal, ROUND_HALF_UP
from .types import Numeric
def round_half_up(number: Numeric) -> int:
""" pythons round() rounds .5 to the *even* number; 0.5 == 0 """
return int(Decimal(number).to_integral(ROUND_HALF_UP))
def get_factors(num: int) -> set:
f = {num}
for x in range(1, int(math.sqrt(num)) + 1):
if num % x == 0:
f.add(x)
f.add(num // x)
return f

26
tools/schedule.py Normal file
View File

@ -0,0 +1,26 @@
import datetime
from typing import Callable, List, Any
class Scheduler:
def __init__(self):
self.jobs = {}
def schedule(self, name: str, every: datetime.timedelta, func: Callable[..., None], *args: List[Any]):
self.jobs[name] = {
'call': func,
'args': args,
'timedelta': every,
'runat': (datetime.datetime.utcnow() + every)
}
def unschedule(self, name: str):
if name in self.jobs:
del self.jobs[name]
def run_pending(self):
now = datetime.datetime.utcnow()
for job in self.jobs.values():
if job['runat'] <= now:
job['runat'] += job['timedelta']
job['call'](*job['args'])

108
tools/simplesocket.py Normal file
View File

@ -0,0 +1,108 @@
import errno
import socket
import threading
import time
from typing import Callable, Union
class Socket:
def __init__(self, address_family: socket.AddressFamily, socket_kind: socket.SocketKind):
self.socket = socket.socket(family=address_family, type=socket_kind)
self.__recv_buffer = b""
def send(self, buffer: Union[str, bytes]) -> int:
if isinstance(buffer, str):
buffer = buffer.encode("UTF-8")
send_bytes = 0
while send_bytes < len(buffer):
send_bytes += self.socket.send(buffer[send_bytes:])
return send_bytes
def recv(self, maxlen: int = 4096, blocking: bool = True) -> bytes:
maxlen -= len(self.__recv_buffer)
try:
self.socket.setblocking(blocking)
ret = self.__recv_buffer + self.socket.recv(maxlen)
self.__recv_buffer = b""
return ret
except socket.error as e:
err = e.args[0]
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
return self.__recv_buffer
else:
raise
def sendline(self, line: str):
if not line.endswith("\n"):
line += "\n"
self.send(line)
def recvline(self, timeout: int = 0) -> Union[str, None]:
"""
Receive exactly one text line (delimiter: newline "\n" or "\r\n") from the socket.
:param timeout: wait at most TIMEOUT seconds for a newline to appear in the buffer
:return: Either a str containing a line received or None if no newline was found
"""
start = time.time()
while b"\n" not in self.__recv_buffer:
self.__recv_buffer += self.recv(1024, blocking=False)
if time.time() - start <= timeout:
time.sleep(0.01) # release *some* resources
else:
break
if b"\n" not in self.__recv_buffer:
return None
else:
line = self.__recv_buffer[:self.__recv_buffer.index(b"\n")]
self.__recv_buffer = self.__recv_buffer[self.__recv_buffer.index(b"\n") + 1:]
return line.decode("UTF-8")
def close(self):
self.socket.close()
class ClientSocket(Socket):
def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM):
super().__init__(address_family, socket_kind)
self.socket.connect((addr, port))
self.laddr, self.lport = self.socket.getsockname()
self.raddr, self.rport = self.socket.getpeername()
class RemoteSocket(Socket):
def __init__(self, client_sock: socket.socket):
super().__init__(client_sock.family, client_sock.type)
self.socket = client_sock
self.laddr, self.lport = self.socket.getsockname()
self.raddr, self.rport = self.socket.getpeername()
class ServerSocket(Socket):
def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM):
super().__init__(address_family, socket_kind)
self.socket.bind((addr, port))
self.socket.listen(5)
self.laddr, self.lport = self.socket.getsockname()
self.raddr, self.rport = None, None # Transport endpoint is not connected. Surprisingly.
def _connection_acceptor(self, target: Callable[..., None]):
while 1:
(client_socket, client_address) = self.socket.accept()
connection_handler_thread = threading.Thread(target=target, args=(RemoteSocket(client_socket), ))
connection_handler_thread.start()
def accept(self, target: Callable[..., None], blocking: bool = True):
if blocking:
self._connection_acceptor(target)
return None
else:
connection_accept_thread = threading.Thread(target=self._connection_acceptor, kwargs={'target': target})
connection_accept_thread.start()
return connection_accept_thread

40
tools/stopwatch.py Normal file
View File

@ -0,0 +1,40 @@
from time import perf_counter_ns
from .tools import human_readable_time_from_ns
from .types import IntOrNone
class StopWatch:
started: IntOrNone = None
stopped: IntOrNone = None
def __init__(self, auto_start=True):
if auto_start:
self.start()
def start(self):
self.started = perf_counter_ns()
self.stopped = None
def stop(self) -> float:
self.stopped = perf_counter_ns()
return self.elapsed()
reset = start
def elapsed(self) -> int:
if self.stopped is None:
return perf_counter_ns() - self.started
else:
return self.stopped - self.started
def elapsed_string(self) -> str:
return human_readable_time_from_ns(self.elapsed())
def avg_elapsed(self, divider: int) -> float:
return self.elapsed() / divider
def avg_string(self, divider: int) -> str:
return human_readable_time_from_ns(int(self.avg_elapsed(divider)))
def __str__(self):
return self.avg_string(1)

197
tools/tools.py Normal file
View File

@ -0,0 +1,197 @@
import datetime
import inspect
import os.path
import sys
from fishhook import hook
from functools import wraps
from typing import Any
class Cache(dict):
def __init__(self):
super().__init__()
self.hits = 0
self.misses = 0
def str_hits(self) -> str:
return "%d (%1.2f)" % (self.hits, self.hits / (self.misses + self.hits) * 100)
def __contains__(self, item: Any) -> bool:
r = super().__contains__(item)
if r:
self.hits += 1
else:
self.misses += 1
return r
class Dict(dict):
_initialized: bool = False
def __init__(self, *args, **kwargs):
if "default" in kwargs:
self.__default = kwargs['default']
del kwargs['default']
else:
self.__default = None
super(Dict, self).__init__(*args, **kwargs)
self.convert()
self._initialized = True
def convert(self):
for k in self:
if isinstance(self[k], dict) and not isinstance(self[k], Dict):
self[k] = Dict(self[k])
elif isinstance(self[k], list):
for i in range(len(self[k])):
if isinstance(self[k][i], dict) and not isinstance(self[k][i], Dict):
self[k][i] = Dict(self[k][i])
def update(self, other: dict, **kwargs):
super(Dict, self).update(other, **kwargs)
self.convert()
def __getattr__(self, item):
try:
return self[item]
except KeyError:
if self.__default is None:
raise AttributeError(item)
else:
return self.__default
def __setattr__(self, key, value):
if not self._initialized:
super(Dict, self).__setattr__(key, value)
else:
self[key] = value
def __setitem__(self, key, value):
super(Dict, self).__setitem__(key, value)
self.convert()
def get_script_dir(follow_symlinks: bool = True) -> str:
"""return path of the executed script"""
if getattr(sys, 'frozen', False):
path = os.path.abspath(sys.executable)
else:
if '__main__' in sys.modules and hasattr(sys.modules['__main__'], '__file__'):
path = sys.modules['__main__'].__file__
else:
path = inspect.getabsfile(get_script_dir)
if follow_symlinks:
path = os.path.realpath(path)
return os.path.dirname(path)
def compare(a: Any, b: Any) -> int:
"""compare to values, return -1 if a is smaller than b, 1 if a is greater than b, 0 is both are equal"""
return bool(a > b) - bool(a < b)
def minmax(*arr: Any) -> (Any, Any):
"""return the min and max value of an array (or arbitrary amount of arguments)"""
if len(arr) == 1:
if isinstance(arr[0], list):
arr = arr[0]
else:
return arr[0], arr[0]
arr = set(arr)
smallest = min(arr)
biggest = max(arr)
if smallest == biggest:
arr.remove(smallest)
biggest = max(arr)
return smallest, biggest
def human_readable_time_from_delta(delta: datetime.timedelta) -> str:
time_str = ""
if delta.days > 0:
time_str += "%d day%s, " % (delta.days, "s" if delta.days > 1 else "")
if delta.seconds > 3600:
time_str += "%02d:" % (delta.seconds // 3600)
else:
time_str += "00:"
if delta.seconds % 3600 > 60:
time_str += "%02d:" % (delta.seconds % 3600 // 60)
else:
time_str += "00:"
return time_str + "%02d" % (delta.seconds % 60)
def human_readable_time_from_ns(ns: int) -> str:
units = [
(1000, 'ns'),
(1000, 'µs'),
(1000, 'ms'),
(60, 's'),
(60, 'm'),
(60, 'h'),
(24, 'd'),
]
time_parts = []
for div, unit in units:
ns, p = ns // div, ns % div
time_parts.insert(0, "%d%s" % (p, unit))
if ns == 0:
return ", ".join(time_parts)
def cache(func):
saved = {}
@wraps(func)
def newfunc(*args):
if args in saved:
return saved[args]
result = func(*args)
saved[args] = result
return result
return newfunc
@hook(list)
def intersection(self, *args) -> list:
ret = set(self).intersection(*args)
return list(ret)
@hook(list)
def __and__(self, *args) -> list:
return self.intersection(*args)
@hook(str)
def intersection(self, *args) -> str:
ret = set(self).intersection(*args)
return "".join(list(ret))
@hook(str)
def __and__(self, *args) -> str:
return self.intersection(*args)
@hook(int)
def sum_digits(self) -> int:
s = 0
num = self
while num > 0:
s += num % 10
num //= 10
return s

296
tools/trees.py Normal file
View File

@ -0,0 +1,296 @@
from dataclasses import dataclass, field
from enum import Enum
from tools.lists import Queue, Stack
from typing import Any, List, Union
class Rotate(Enum):
LEFT = 0
RIGHT = 1
@dataclass
class TreeNode:
value: Any
parent: Union['TreeNode', None] = None
left: Union['TreeNode', None] = None
right: Union['TreeNode', None] = None
balance_factor: int = 0
height: int = 0
def __str__(self):
return "TreeNode:(%s; bf: %d, d: %d, p: %s, l: %s, r: %s)" \
% (self.value, self.balance_factor, self.height,
self.parent.value if self.parent else "None",
self.left.value if self.left else "None",
self.right.value if self.right else "None")
def __repr__(self):
return str(self)
class TrieNode:
value: str
parent: Union['TrieNode', None] = None
children: List['TrieNode'] = field(default_factory=list)
def update_node(node: TreeNode):
left_depth = node.left.height if node.left is not None else -1
right_depth = node.right.height if node.right is not None else -1
node.height = 1 + (left_depth if left_depth > right_depth else right_depth)
node.balance_factor = right_depth - left_depth
class BinaryTree:
root: Union[TreeNode, None] = None
node_count: int = 0
def _insert(self, node: TreeNode, parent: TreeNode, obj: Any) -> TreeNode:
new_node = TreeNode(obj, parent)
if node is None:
return new_node
found = False
while not found:
if obj < node.value:
if node.left is not None:
node = node.left
else:
node.left = new_node
found = True
elif obj > node.value:
if node.right is not None:
node = node.right
else:
node.right = new_node
found = True
else:
raise ValueError("obj already present in tree: %s" % obj)
new_node.parent = node
return new_node
def _remove(self, node: TreeNode):
if node.left is None and node.right is None: # leaf node
if node.parent is not None:
if node.parent.left == node:
node.parent.left = None
else:
node.parent.right = None
else:
self.root = None
elif node.left is not None and node.right is not None: # both subtrees present
d_node = node.left
while d_node.right is not None:
d_node = d_node.right
node.value = d_node.value
self.remove(node.value, d_node)
elif node.left is None: # only a subtree on the right
if node.parent is not None:
if node.parent.left == node:
node.parent.left = node.right
else:
node.parent.right = node.right
else:
self.root = node.right
node.right.parent = node.parent
else: # only a subtree on the left
if node.parent is not None:
if node.parent.left == node:
node.parent.left = node.left
else:
node.parent.right = node.left
else:
self.root = node.left
node.left.parent = node.parent
self.node_count -= 1
def _get_node_by_value(self, obj: Any, root_node: TreeNode = None) -> TreeNode:
if self.root is None:
raise IndexError("get node from empty tree")
if root_node is None:
root_node = self.root
node = root_node
while node is not None:
if obj < node.value:
node = node.left
elif obj > node.value:
node = node.right
else:
return node
raise ValueError("obj not in tree:", obj)
def add(self, obj: Any):
if obj is None:
return
new_node = self._insert(self.root, self.root, obj)
if self.root is None:
self.root = new_node
self.node_count += 1
def remove(self, obj: Any, root_node: TreeNode = None):
node = self._get_node_by_value(obj, root_node)
self._remove(node)
def iter_depth_first(self):
stack = Stack()
stack.push(self.root)
while len(stack):
node = stack.pop()
if node.right is not None:
stack.push(node.right)
if node.left is not None:
stack.push(node.left)
yield node.value
def iter_breadth_first(self):
queue = Queue()
queue.push(self.root)
while len(queue):
node = queue.pop()
if node.left is not None:
queue.push(node.left)
if node.right is not None:
queue.push(node.right)
yield node.value
def print(self, node: TreeNode = None, level: int = 0):
if node is None:
if level == 0 and self.root is not None:
node = self.root
else:
return
self.print(node.right, level + 1)
print(" " * 4 * level + '->', node)
self.print(node.left, level + 1)
def __contains__(self, obj: Any) -> bool:
if self.root is None:
return False
c_node = self.root
while c_node is not None:
if obj == c_node.value:
return True
elif obj < c_node.value:
c_node = c_node.left
else:
c_node = c_node.right
return False
def __len__(self) -> int:
return self.node_count
class BinarySearchTree(BinaryTree):
def _balance(self, node: TreeNode) -> TreeNode:
if node.balance_factor == -2:
if node.left.balance_factor <= 0:
return self.rotate(Rotate.RIGHT, node)
else:
return self.rotate(Rotate.RIGHT, self.rotate(Rotate.LEFT, node.left))
elif node.balance_factor == 2:
if node.right.balance_factor >= 0:
return self.rotate(Rotate.LEFT, node)
else:
return self.rotate(Rotate.LEFT, self.rotate(Rotate.RIGHT, node.right))
else:
return node
def _insert(self, node: TreeNode, parent: TreeNode, obj: Any) -> TreeNode:
node = super()._insert(node, parent, obj)
if self.root is not None:
while node is not None:
update_node(node)
node = self._balance(node).parent
return node
def remove(self, obj: Any, root_node: TreeNode = None):
if root_node is None:
root_node = self.root
super().remove(obj, root_node)
update_node(root_node)
self._balance(root_node)
def rotate(self, direction: Rotate, node: TreeNode = None) -> TreeNode:
if node is None:
node = self.root
parent = node.parent
if direction == Rotate.LEFT:
pivot = node.right
node.right = pivot.left
if pivot.left is not None:
pivot.left.parent = node
pivot.left = node
else:
pivot = node.left
node.left = pivot.right
if pivot.right is not None:
pivot.right.parent = node
pivot.right = node
node.parent = pivot
pivot.parent = parent
if parent is not None:
if parent.left == node:
parent.left = pivot
else:
parent.right = pivot
if node == self.root:
self.root = pivot
update_node(node)
update_node(pivot)
return pivot
class Heap(BinarySearchTree):
def empty(self):
return self.root is None
def popMin(self):
if self.root is None:
raise IndexError("pop from empty heap")
c_node = self.root
while c_node.left is not None:
c_node = c_node.left
ret = c_node.value
self._remove(c_node)
return ret
def popMax(self):
if self.root is None:
raise IndexError("pop from empty heap")
c_node = self.root
while c_node.right is not None:
c_node = c_node.right
ret = c_node.value
self._remove(c_node)
return ret
class MinHeap(Heap):
def pop(self):
return self.popMin()
class MaxHeap(Heap):
def pop(self):
return self.popMax()

7
tools/types.py Normal file
View File

@ -0,0 +1,7 @@
from typing import Union
Numeric = Union[int, float]
StrOrNone = Union[str, None]
IntOrNone = Union[int, None]
FloatOrNone = Union[float, None]
NumericOrNone = Union[Numeric, None]