code reformatting and cleanup
Some checks failed
Publish to PyPI / Publish to PyPI (push) Failing after 5s

This commit is contained in:
Stefan Harmuth 2023-11-11 15:37:42 +01:00
parent bfcd27336d
commit 99c4ef2ce6
17 changed files with 589 additions and 245 deletions

View File

@ -1,8 +1,3 @@
# py-tools # shs-tools
An assortment of helper functions, primarily developed for use in [Advent Of Code](https://adventofcode.com/) An assortment of helper functions, primarily developed for use in [Advent Of Code](https://adventofcode.com/)
## Installation
`pip install shs-tools`

View File

@ -13,7 +13,7 @@ authors = [
] ]
description = "An assortment of little helper functions" description = "An assortment of little helper functions"
readme = "README.md" readme = "README.md"
requires-python = ">=3.6" requires-python = ">=3.8"
classifiers = [ classifiers = [
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Development Status :: 4 - Beta", "Development Status :: 4 - Beta",

View File

@ -1,18 +1,18 @@
from __future__ import annotations
import os import os
import re import re
import subprocess import subprocess
import requests import requests
import time import time
import webbrowser import webbrowser
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from src.tools.datafiles import JSONFile from .datafiles import JSONFile
from src.tools.stopwatch import StopWatch from .stopwatch import StopWatch
from typing import Any, Callable, List, Tuple, Type, Union from typing import Any, Callable, List, Tuple, Type
from .tools import get_script_dir from .tools import get_script_dir
BASE_PATH = get_script_dir() BASE_PATH = get_script_dir()
INPUTS_PATH = os.path.join(BASE_PATH, 'inputs') INPUTS_PATH = os.path.join(BASE_PATH, "inputs")
class AOCDay: class AOCDay:
@ -35,7 +35,13 @@ class AOCDay:
def part2(self) -> Any: def part2(self) -> Any:
raise NotImplementedError() raise NotImplementedError()
def run_part(self, part: int, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50): def run_part(
self,
part: int,
verbose: bool = False,
measure_runtime: bool = False,
timeit_number: int = 50,
):
case_count = 0 case_count = 0
for solution, input_file in self.inputs[part]: for solution, input_file in self.inputs[part]:
self._current_test_solution, self._current_test_file = solution, input_file self._current_test_solution, self._current_test_file = solution, input_file
@ -53,12 +59,16 @@ class AOCDay:
exec_time = stopwatch.avg_string(timeit_number) exec_time = stopwatch.avg_string(timeit_number)
if solution is None: if solution is None:
print_solution(self.day, part + 1, answer, solution, case_count, exec_time) print_solution(
if answer not in {u"", b"", None, b"None", u"None", 0, '0'}: self.day, part + 1, answer, solution, case_count, exec_time
)
if answer not in {"", b"", None, b"None", "None", 0, "0"}:
self._submit(part + 1, answer) self._submit(part + 1, answer)
else: else:
if verbose or answer != solution: if verbose or answer != solution:
print_solution(self.day, part + 1, answer, solution, case_count, exec_time) print_solution(
self.day, part + 1, answer, solution, case_count, exec_time
)
if answer != solution: if answer != solution:
return False return False
@ -67,7 +77,13 @@ class AOCDay:
if case_count == len(self.inputs[part]) and not verbose: if case_count == len(self.inputs[part]) and not verbose:
print_solution(self.day, part + 1, answer, exec_time=exec_time) print_solution(self.day, part + 1, answer, exec_time=exec_time)
def run(self, parts: int = 3, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50): def run(
self,
parts: int = 3,
verbose: bool = False,
measure_runtime: bool = False,
timeit_number: int = 50,
):
if parts & 1: if parts & 1:
self.run_part(0, verbose, measure_runtime, timeit_number) self.run_part(0, verbose, measure_runtime, timeit_number)
if parts & 2: if parts & 2:
@ -86,10 +102,13 @@ class AOCDay:
session_id = open(".session", "r").readlines()[0].strip() session_id = open(".session", "r").readlines()[0].strip()
response = requests.get( response = requests.get(
"https://adventofcode.com/%d/day/%d/input" % (self.year, self.day), "https://adventofcode.com/%d/day/%d/input" % (self.year, self.day),
cookies={'session': session_id} cookies={"session": session_id},
) )
if not response.ok: if not response.ok:
print("FAILED to download input: (%s) %s" % (response.status_code, response.text)) print(
"FAILED to download input: (%s) %s"
% (response.status_code, response.text)
)
return return
with open(filename, "wb") as f: with open(filename, "wb") as f:
@ -107,22 +126,19 @@ class AOCDay:
answer_cache[str_day] = {} answer_cache[str_day] = {}
if str_part not in answer_cache[str_day]: if str_part not in answer_cache[str_day]:
answer_cache[str_day][str_part] = { answer_cache[str_day][str_part] = {"wrong": [], "correct": None}
'wrong': [],
'correct': None
}
if answer in answer_cache[str_day][str_part]['wrong']: if answer in answer_cache[str_day][str_part]["wrong"]:
print("Already tried %s. It was WRONG." % answer) print("Already tried %s. It was WRONG." % answer)
return return
if answer_cache[str_day][str_part]['correct'] is not None: if answer_cache[str_day][str_part]["correct"] is not None:
if answer == answer_cache[str_day][str_part]['correct']: if answer == answer_cache[str_day][str_part]["correct"]:
print("Already submitted %s. It was CORRECT." % answer) print("Already submitted %s. It was CORRECT." % answer)
return return
else: else:
print("Already submitted an answer, but another one") print("Already submitted an answer, but another one")
print("CORRECT was: %s" % answer_cache[str_day][str_part]['correct']) print("CORRECT was: %s" % answer_cache[str_day][str_part]["correct"])
print("Your answer: %s" % answer) print("Your answer: %s" % answer)
return return
@ -130,21 +146,26 @@ class AOCDay:
session_id = open(".session", "r").readlines()[0].strip() session_id = open(".session", "r").readlines()[0].strip()
response = requests.post( response = requests.post(
"https://adventofcode.com/%d/day/%d/answer" % (self.year, self.day), "https://adventofcode.com/%d/day/%d/answer" % (self.year, self.day),
cookies={'session': session_id}, cookies={"session": session_id},
data={'level': part, 'answer': answer} data={"level": part, "answer": answer},
) )
if not response.ok: if not response.ok:
print("Failed to submit answer: (%s) %s" % (response.status_code, response.text)) print(
"Failed to submit answer: (%s) %s"
% (response.status_code, response.text)
)
soup = BeautifulSoup(response.text, "html.parser") soup = BeautifulSoup(response.text, "html.parser")
message = soup.article.text message = soup.article.text
if "That's the right answer" in message: if "That's the right answer" in message:
answer_cache[str_day][str_part]['correct'] = answer answer_cache[str_day][str_part]["correct"] = answer
print("That's correct!") print("That's correct!")
webbrowser.open("https://adventofcode.com/%d/day/%d#part2" % (self.year, self.day)) webbrowser.open(
"https://adventofcode.com/%d/day/%d#part2" % (self.year, self.day)
)
elif "That's not the right answer" in message: elif "That's not the right answer" in message:
answer_cache[str_day][str_part]['wrong'].append(answer) answer_cache[str_day][str_part]["wrong"].append(answer)
print("That's WRONG!") print("That's WRONG!")
elif "You gave an answer too recently" in message: elif "You gave an answer too recently" in message:
# WAIT and retry # WAIT and retry
@ -183,12 +204,14 @@ class AOCDay:
else: else:
return self.input.copy() return self.input.copy()
def getMultiLineInputAsArray(self, return_type: Type = None, join_char: str = None) -> List: def getMultiLineInputAsArray(
self, return_type: Type = None, join_char: str = None
) -> List:
""" """
get input for day x as 2d array, split by empty lines get input for day x as 2d array, split by empty lines
""" """
lines = self.input.copy() lines = self.input.copy()
lines.append('') lines.append("")
return_array = [] return_array = []
line_array = [] line_array = []
@ -208,27 +231,49 @@ class AOCDay:
return return_array return return_array
def getInputAsArraySplit(self, split_char: str = ',', return_type: Union[Type, List[Type]] = None) -> List: def getInputAsArraySplit(
self, split_char: str = ",", return_type: Type | List[Type] = None
) -> List:
""" """
get input for day x with the lines split by split_char get input for day x with the lines split by split_char
if input has only one line, returns a 1d array with the values if input has only one line, returns a 1d array with the values
if input has multiple lines, returns a 2d array (a[line][values]) if input has multiple lines, returns a 2d array (a[line][values])
""" """
if len(self.input) == 1: if len(self.input) == 1:
return split_line(line=self.input[0], split_char=split_char, return_type=return_type) return split_line(
line=self.input[0], split_char=split_char, return_type=return_type
)
else: else:
return_array = [] return_array = []
for line in self.input: for line in self.input:
return_array.append(split_line(line=line, split_char=split_char, return_type=return_type)) return_array.append(
split_line(
line=line, split_char=split_char, return_type=return_type
)
)
return return_array return return_array
def print_solution(day: int, part: int, solution: Any, test: Any = None, test_case: int = 0, exec_time: str = None): def print_solution(
day: int,
part: int,
solution: Any,
test: Any = None,
test_case: int = 0,
exec_time: str = None,
):
if test is not None: if test is not None:
print( print(
"%s (TEST day%d/part%d/case%d): got '%s'; expected '%s'" "%s (TEST day%d/part%d/case%d): got '%s'; expected '%s'"
% ("OK" if test == solution else "FAIL", day, part, test_case, solution, test) % (
"OK" if test == solution else "FAIL",
day,
part,
test_case,
solution,
test,
)
) )
else: else:
print( print(
@ -244,13 +289,15 @@ def print_solution(day: int, part: int, solution: Any, test: Any = None, test_ca
print("Day %s, Part %s - Average run time: %s" % (day, part, exec_time)) print("Day %s, Part %s - Average run time: %s" % (day, part, exec_time))
def split_line(line, split_char: str = ',', return_type: Union[Type, List[Type]] = None): def split_line(line, split_char: str = ",", return_type: Type | List[Type] = None):
if split_char: if split_char:
line = line.split(split_char) line = line.split(split_char)
if return_type is None: if return_type is None:
return line return line
elif isinstance(return_type, list): elif isinstance(return_type, list):
return [return_type[x](i) if len(return_type) > x else i for x, i in enumerate(line)] return [
return_type[x](i) if len(return_type) > x else i for x, i in enumerate(line)
]
else: else:
return [return_type(i) for i in line] return [return_type(i) for i in line]

View File

@ -1,6 +1,7 @@
# Copyright (c) 2020-present Benjamin Soyka # Copyright (c) 2020-present Benjamin Soyka
# Original and Licence at https://github.com/bsoyka/advent-of-code-ocr # Original and Licence at https://github.com/bsoyka/advent-of-code-ocr
from __future__ import annotations
from collections.abc import Sequence from collections.abc import Sequence
ALPHABET_6 = { ALPHABET_6 = {
@ -32,7 +33,12 @@ def convert_6(input_text: str, *, fill_pixel: str = "#", empty_pixel: str = ".")
return _convert_6(prepared_array) return _convert_6(prepared_array)
def convert_array_6(array: Sequence[Sequence[str | int]], *, fill_pixel: str | int = "#", empty_pixel: str | int = ".") -> str: def convert_array_6(
array: Sequence[Sequence[str | int]],
*,
fill_pixel: str | int = "#",
empty_pixel: str | int = ".",
) -> str:
"""Convert a height 6 NumPy array or nested list to characters""" """Convert a height 6 NumPy array or nested list to characters"""
prepared_array = [ prepared_array = [
[ [
@ -54,8 +60,7 @@ def _convert_6(array: list[list[str]]) -> str:
indices = [slice(start, start + 4) for start in range(0, cols, 5)] indices = [slice(start, start + 4) for start in range(0, cols, 5)]
result = [ result = [
ALPHABET_6["\n".join("".join(row[index]) for row in array)] ALPHABET_6["\n".join("".join(row[index]) for row in array)] for index in indices
for index in indices
] ]
return "".join(result) return "".join(result)

View File

@ -32,8 +32,12 @@ class Coordinate(tuple):
def is3D(self) -> bool: def is3D(self) -> bool:
return self.z is not None return self.z is not None
def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN, def getDistanceTo(
includeDiagonals: bool = False) -> Union[int, float]: self,
target: Coordinate,
algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = False,
) -> Union[int, float]:
""" """
Get distance to target Coordinate Get distance to target Coordinate
@ -47,18 +51,30 @@ class Coordinate(tuple):
if self.z is None: if self.z is None:
return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2) return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2)
else: else:
return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2 + abs(self.z - target.z) ** 2) return sqrt(
abs(self.x - target.x) ** 2
+ abs(self.y - target.y) ** 2
+ abs(self.z - target.z) ** 2
)
elif algorithm == DistanceAlgorithm.CHEBYSHEV: elif algorithm == DistanceAlgorithm.CHEBYSHEV:
if self.z is None: if self.z is None:
return max(abs(target.x - self.x), abs(target.y - self.y)) return max(abs(target.x - self.x), abs(target.y - self.y))
else: else:
return max(abs(target.x - self.x), abs(target.y - self.y), abs(target.z - self.z)) return max(
abs(target.x - self.x),
abs(target.y - self.y),
abs(target.z - self.z),
)
elif algorithm == DistanceAlgorithm.MANHATTAN: elif algorithm == DistanceAlgorithm.MANHATTAN:
if not includeDiagonals: if not includeDiagonals:
if self.z is None: if self.z is None:
return abs(self.x - target.x) + abs(self.y - target.y) return abs(self.x - target.x) + abs(self.y - target.y)
else: else:
return abs(self.x - target.x) + abs(self.y - target.y) + abs(self.z - target.z) return (
abs(self.x - target.x)
+ abs(self.y - target.y)
+ abs(self.z - target.z)
)
else: else:
dist = [abs(self.x - target.x), abs(self.y - target.y)] dist = [abs(self.x - target.x), abs(self.y - target.y)]
if self.z is None: if self.z is None:
@ -72,15 +88,35 @@ class Coordinate(tuple):
o_dist = max(dist) - min(dist) o_dist = max(dist) - min(dist)
return 1.7 * d_steps + o_dist + 1.4 * min(dist) return 1.7 * d_steps + o_dist + 1.4 * min(dist)
def inBoundaries(self, minX: int, minY: int, maxX: int, maxY: int, minZ: int = -inf, maxZ: int = inf) -> bool: def inBoundaries(
self,
minX: int,
minY: int,
maxX: int,
maxY: int,
minZ: int = -inf,
maxZ: int = inf,
) -> bool:
if self.z is None: if self.z is None:
return minX <= self.x <= maxX and minY <= self.y <= maxY return minX <= self.x <= maxX and minY <= self.y <= maxY
else: else:
return minX <= self.x <= maxX and minY <= self.y <= maxY and minZ <= self.z <= maxZ return (
minX <= self.x <= maxX
and minY <= self.y <= maxY
and minZ <= self.z <= maxZ
)
def getCircle(self, radius: int = 1, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN, def getCircle(
minX: int = -inf, minY: int = -inf, maxX: int = inf, maxY: int = inf, self,
minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]: radius: int = 1,
algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
minX: int = -inf,
minY: int = -inf,
maxX: int = inf,
maxY: int = inf,
minZ: int = -inf,
maxZ: int = inf,
) -> list[Coordinate]:
ret = [] ret = []
if self.z is None: # mode 2D if self.z is None: # mode 2D
for x in range(self.x - radius * 2, self.x + radius * 2 + 1): for x in range(self.x - radius * 2, self.x + radius * 2 + 1):
@ -88,7 +124,11 @@ class Coordinate(tuple):
target = Coordinate(x, y) target = Coordinate(x, y)
if not target.inBoundaries(minX, minY, maxX, maxY): if not target.inBoundaries(minX, minY, maxX, maxY):
continue continue
dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False)) dist = round_half_up(
self.getDistanceTo(
target, algorithm=algorithm, includeDiagonals=False
)
)
if dist == radius: if dist == radius:
ret.append(target) ret.append(target)
@ -99,14 +139,26 @@ class Coordinate(tuple):
target = Coordinate(x, y) target = Coordinate(x, y)
if not target.inBoundaries(minX, minY, maxX, maxY, minZ, maxZ): if not target.inBoundaries(minX, minY, maxX, maxY, minZ, maxZ):
continue continue
dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False)) dist = round_half_up(
self.getDistanceTo(
target, algorithm=algorithm, includeDiagonals=False
)
)
if dist == radius: if dist == radius:
ret.append(target) ret.append(target)
return ret return ret
def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf, def getNeighbours(
maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]: self,
includeDiagonal: bool = True,
minX: int = -inf,
minY: int = -inf,
maxX: int = inf,
maxY: int = inf,
minZ: int = -inf,
maxZ: int = inf,
) -> list[Coordinate]:
""" """
Get a list of neighbouring coordinates. Get a list of neighbouring coordinates.
@ -121,7 +173,16 @@ class Coordinate(tuple):
""" """
if self.z is None: if self.z is None:
if includeDiagonal: if includeDiagonal:
nb_list = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)] nb_list = [
(-1, -1),
(-1, 0),
(-1, 1),
(0, -1),
(0, 1),
(1, -1),
(1, 0),
(1, 1),
]
else: else:
nb_list = [(-1, 0), (1, 0), (0, -1), (0, 1)] nb_list = [(-1, 0), (1, 0), (0, -1), (0, 1)]
@ -130,13 +191,29 @@ class Coordinate(tuple):
yield self.__class__(self.x + dx, self.y + dy) yield self.__class__(self.x + dx, self.y + dy)
else: else:
if includeDiagonal: if includeDiagonal:
nb_list = [(x, y, z) for x in [-1, 0, 1] for y in [-1, 0, 1] for z in [-1, 0, 1]] nb_list = [
(x, y, z)
for x in [-1, 0, 1]
for y in [-1, 0, 1]
for z in [-1, 0, 1]
]
nb_list.remove((0, 0, 0)) nb_list.remove((0, 0, 0))
else: else:
nb_list = [(-1, 0, 0), (0, -1, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 0, -1)] nb_list = [
(-1, 0, 0),
(0, -1, 0),
(1, 0, 0),
(0, 1, 0),
(0, 0, 1),
(0, 0, -1),
]
for dx, dy, dz in nb_list: for dx, dy, dz in nb_list:
if minX <= self.x + dx <= maxX and minY <= self.y + dy <= maxY and minZ <= self.z + dz <= maxZ: if (
minX <= self.x + dx <= maxX
and minY <= self.y + dy <= maxY
and minZ <= self.z + dz <= maxZ
):
yield self.__class__(self.x + dx, self.y + dy, self.z + dz) yield self.__class__(self.x + dx, self.y + dy, self.z + dz)
def getAngleTo(self, target: Coordinate, normalized: bool = False) -> float: def getAngleTo(self, target: Coordinate, normalized: bool = False) -> float:
@ -162,16 +239,20 @@ class Coordinate(tuple):
steps = gcd(diff.x, diff.y) steps = gcd(diff.x, diff.y)
step_x = diff.x // steps step_x = diff.x // steps
step_y = diff.y // steps step_y = diff.y // steps
return [self.__class__(self.x + step_x * i, self.y + step_y * i) for i in range(steps + 1)] return [
self.__class__(self.x + step_x * i, self.y + step_y * i)
for i in range(steps + 1)
]
else: else:
steps = gcd(diff.x, diff.y, diff.z) steps = gcd(diff.x, diff.y, diff.z)
step_x = diff.x // steps step_x = diff.x // steps
step_y = diff.y // steps step_y = diff.y // steps
step_z = diff.z // steps step_z = diff.z // steps
return [ return [
self.__class__(self.x + step_x * i, self.y + step_y * i, self.z + step_z * i) self.__class__(
for i self.x + step_x * i, self.y + step_y * i, self.z + step_z * i
in range(steps + 1) )
for i in range(steps + 1)
] ]
def reverse(self) -> Coordinate: def reverse(self) -> Coordinate:
@ -241,13 +322,29 @@ class Coordinate(tuple):
if self.z is None: if self.z is None:
return "%s(x=%d, y=%d)" % (self.__class__.__name__, self.x, self.y) return "%s(x=%d, y=%d)" % (self.__class__.__name__, self.x, self.y)
else: else:
return "%s(x=%d, y=%d, z=%d)" % (self.__class__.__name__, self.x, self.y, self.z) return "%s(x=%d, y=%d, z=%d)" % (
self.__class__.__name__,
self.x,
self.y,
self.z,
)
@classmethod @classmethod
def generate(cls, from_x: int, to_x: int, from_y: int, to_y: int, def generate(
from_z: int = None, to_z: int = None) -> List[Coordinate]: cls,
from_x: int,
to_x: int,
from_y: int,
to_y: int,
from_z: int = None,
to_z: int = None,
) -> List[Coordinate]:
if from_z is None or to_z is None: if from_z is None or to_z is None:
return [cls(x, y) for x in range(from_x, to_x + 1) for y in range(from_y, to_y + 1)] return [
cls(x, y)
for x in range(from_x, to_x + 1)
for y in range(from_y, to_y + 1)
]
else: else:
return [ return [
cls(x, y, z) cls(x, y, z)
@ -267,13 +364,14 @@ class HexCoordinate(Coordinate):
z x y z x y
-z +x -y -z +x -y
""" """
neighbour_vectors = { neighbour_vectors = {
'ne': Coordinate(-1, 0, 1), "ne": Coordinate(-1, 0, 1),
'nw': Coordinate(-1, 1, 0), "nw": Coordinate(-1, 1, 0),
'e': Coordinate(0, -1, 1), "e": Coordinate(0, -1, 1),
'w': Coordinate(0, 1, -1), "w": Coordinate(0, 1, -1),
'sw': Coordinate(1, 0, -1), "sw": Coordinate(1, 0, -1),
'se': Coordinate(1, -1, 0), "se": Coordinate(1, -1, 0),
} }
def __init__(self, x: int, y: int, z: int): def __init__(self, x: int, y: int, z: int):
@ -283,18 +381,33 @@ class HexCoordinate(Coordinate):
def get_length(self) -> int: def get_length(self) -> int:
return (abs(self.x) + abs(self.y) + abs(self.z)) // 2 return (abs(self.x) + abs(self.y) + abs(self.z)) // 2
def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN, def getDistanceTo(
includeDiagonals: bool = True) -> Union[int, float]: self,
target: Coordinate,
algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = True,
) -> Union[int, float]:
# includeDiagonals makes no sense in a hex grid, it's just here for signature reasons # includeDiagonals makes no sense in a hex grid, it's just here for signature reasons
if algorithm == DistanceAlgorithm.MANHATTAN: if algorithm == DistanceAlgorithm.MANHATTAN:
return (self - target).get_length() return (self - target).get_length()
def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf, def getNeighbours(
maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]: self,
includeDiagonal: bool = True,
minX: int = -inf,
minY: int = -inf,
maxX: int = inf,
maxY: int = inf,
minZ: int = -inf,
maxZ: int = inf,
) -> list[Coordinate]:
# includeDiagonals makes no sense in a hex grid, it's just here for signature reasons # includeDiagonals makes no sense in a hex grid, it's just here for signature reasons
return [ return [
self + x for x in self.neighbour_vectors.values() self + x
if minX <= (self + x).x <= maxX and minY <= (self + x).y <= maxY and minZ <= (self + x).z <= maxZ for x in self.neighbour_vectors.values()
if minX <= (self + x).x <= maxX
and minY <= (self + x).y <= maxY
and minZ <= (self + x).z <= maxZ
] ]
@ -311,13 +424,14 @@ class HexCoordinateF(HexCoordinate):
x y x y
+x -y +x -y
""" """
neighbour_vectors = { neighbour_vectors = {
'ne': Coordinate(-1, 0, 1), "ne": Coordinate(-1, 0, 1),
'nw': Coordinate(0, 1, -1), "nw": Coordinate(0, 1, -1),
'n': Coordinate(-1, 1, 0), "n": Coordinate(-1, 1, 0),
's': Coordinate(1, -1, 0), "s": Coordinate(1, -1, 0),
'sw': Coordinate(1, 0, -1), "sw": Coordinate(1, 0, -1),
'se': Coordinate(0, -1, 1), "se": Coordinate(0, -1, 1),
} }
def __init__(self, x: int, y: int, z: int): def __init__(self, x: int, y: int, z: int):
@ -339,7 +453,9 @@ class Shape:
def __len__(self): def __len__(self):
if not self.mode_3d: if not self.mode_3d:
return (self.bottom_right.x - self.top_left.x + 1) * (self.bottom_right.y - self.top_left.y + 1) return (self.bottom_right.x - self.top_left.x + 1) * (
self.bottom_right.y - self.top_left.y + 1
)
else: else:
return ( return (
(self.bottom_right.x - self.top_left.x + 1) (self.bottom_right.x - self.top_left.x + 1)
@ -356,23 +472,43 @@ class Shape:
if not self.mode_3d: if not self.mode_3d:
intersect_top_left = Coordinate( intersect_top_left = Coordinate(
self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x, self.top_left.x
self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y, if self.top_left.x > other.top_left.x
else other.top_left.x,
self.top_left.y
if self.top_left.y > other.top_left.y
else other.top_left.y,
) )
intersect_bottom_right = Coordinate( intersect_bottom_right = Coordinate(
self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x, self.bottom_right.x
self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y, if self.bottom_right.x < other.bottom_right.x
else other.bottom_right.x,
self.bottom_right.y
if self.bottom_right.y < other.bottom_right.y
else other.bottom_right.y,
) )
else: else:
intersect_top_left = Coordinate( intersect_top_left = Coordinate(
self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x, self.top_left.x
self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y, if self.top_left.x > other.top_left.x
self.top_left.z if self.top_left.z > other.top_left.z else other.top_left.z, else other.top_left.x,
self.top_left.y
if self.top_left.y > other.top_left.y
else other.top_left.y,
self.top_left.z
if self.top_left.z > other.top_left.z
else other.top_left.z,
) )
intersect_bottom_right = Coordinate( intersect_bottom_right = Coordinate(
self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x, self.bottom_right.x
self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y, if self.bottom_right.x < other.bottom_right.x
self.bottom_right.z if self.bottom_right.z < other.bottom_right.z else other.bottom_right.z, else other.bottom_right.x,
self.bottom_right.y
if self.bottom_right.y < other.bottom_right.y
else other.bottom_right.y,
self.bottom_right.z
if self.bottom_right.z < other.bottom_right.z
else other.bottom_right.z,
) )
if intersect_top_left <= intersect_bottom_right: if intersect_top_left <= intersect_bottom_right:
@ -385,10 +521,18 @@ class Shape:
return self.intersection(other) return self.intersection(other)
def __str__(self): def __str__(self):
return "%s(%s -> %s)" % (self.__class__.__name__, self.top_left, self.bottom_right) return "%s(%s -> %s)" % (
self.__class__.__name__,
self.top_left,
self.bottom_right,
)
def __repr__(self): def __repr__(self):
return "%s(%s, %s)" % (self.__class__.__name__, self.top_left, self.bottom_right) return "%s(%s, %s)" % (
self.__class__.__name__,
self.top_left,
self.bottom_right,
)
class Square(Shape): class Square(Shape):

View File

@ -20,6 +20,7 @@ from signal import SIGTERM, signal
DEV_NULL = "/dev/null" DEV_NULL = "/dev/null"
class Daemon: class Daemon:
""" """
A generic daemon class. A generic daemon class.
@ -27,7 +28,9 @@ class Daemon:
Usage: subclass the Daemon class and override the run() method Usage: subclass the Daemon class and override the run() method
""" """
def __init__(self, pidfile='_.pid', stdin=DEV_NULL, stdout=DEV_NULL, stderr=DEV_NULL): def __init__(
self, pidfile="_.pid", stdin=DEV_NULL, stdout=DEV_NULL, stderr=DEV_NULL
):
self.stdin = stdin self.stdin = stdin
self.stdout = stdout self.stdout = stdout
self.stderr = stderr self.stderr = stderr
@ -65,9 +68,9 @@ class Daemon:
# redirect standard file descriptors # redirect standard file descriptors
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
si = open(os.devnull, 'r') si = open(os.devnull, "r")
so = open(os.devnull, 'a+') so = open(os.devnull, "a+")
se = open(os.devnull, 'a+') se = open(os.devnull, "a+")
os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno()) os.dup2(se.fileno(), sys.stderr.fileno())
@ -77,7 +80,7 @@ class Daemon:
# write pidfile # write pidfile
pid = str(os.getpid()) pid = str(os.getpid())
open(self.pidfile, 'w+').write("%s\n" % pid) open(self.pidfile, "w+").write("%s\n" % pid)
def onstop(self): def onstop(self):
self.quit() self.quit()
@ -89,7 +92,7 @@ class Daemon:
""" """
# Check for a pidfile to see if the daemon already runs # Check for a pidfile to see if the daemon already runs
try: try:
pf = open(self.pidfile, 'r') pf = open(self.pidfile, "r")
pid = int(pf.read().strip()) pid = int(pf.read().strip())
pf.close() pf.close()
except IOError: except IOError:
@ -110,7 +113,7 @@ class Daemon:
""" """
# Get the pid from the pidfile # Get the pid from the pidfile
try: try:
pf = open(self.pidfile, 'r') pf = open(self.pidfile, "r")
pid = int(pf.read().strip()) pid = int(pf.read().strip())
pf.close() pf.close()
except IOError: except IOError:

View File

@ -2,11 +2,10 @@ from __future__ import annotations
from collections import deque from collections import deque
from .aoc_ocr import convert_array_6 from .aoc_ocr import convert_array_6
from .coordinate import Coordinate, DistanceAlgorithm, Shape from .coordinate import Coordinate, DistanceAlgorithm, Shape
from .types import Numeric
from enum import Enum from enum import Enum
from heapq import heappop, heappush from heapq import heappop, heappush
from math import inf from math import inf
from typing import Any, Dict, List, Union from typing import Any, Dict, List
OFF = False OFF = False
ON = True ON = True
@ -62,7 +61,14 @@ class Grid:
self.maxZ = pos.z if pos.z > self.maxZ else self.maxZ self.maxZ = pos.z if pos.z > self.maxZ else self.maxZ
def recalcBoundaries(self) -> None: def recalcBoundaries(self) -> None:
self.minX, self.maxX, self.minY, self.maxY, self.minZ, self.maxZ = None, None, None, None, None, None self.minX, self.maxX, self.minY, self.maxY, self.minZ, self.maxZ = (
None,
None,
None,
None,
None,
None,
)
for c in self.__grid: for c in self.__grid:
self.__trackBoundaries(c) self.__trackBoundaries(c)
@ -120,25 +126,29 @@ class Grid:
return value return value
def move(self, pos: Coordinate, vec: Coordinate,): def move(
self,
pos: Coordinate,
vec: Coordinate,
):
target = pos + vec target = pos + vec
self.set(target, self.get(pos)) self.set(target, self.get(pos))
if pos in self.__grid: if pos in self.__grid:
del self.__grid[pos] del self.__grid[pos]
def add(self, pos: Coordinate, value: Numeric = 1) -> Numeric: def add(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) + value) return self.set(pos, self.get(pos) + value)
def sub(self, pos: Coordinate, value: Numeric = 1) -> Numeric: def sub(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) - value) return self.set(pos, self.get(pos) - value)
def mul(self, pos: Coordinate, value: Numeric = 1) -> Numeric: def mul(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) * value) return self.set(pos, self.get(pos) * value)
def div(self, pos: Coordinate, value: Numeric = 1) -> Numeric: def div(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) / value) return self.set(pos, self.get(pos) / value)
def add_shape(self, shape: Shape, value: Numeric = 1) -> None: def add_shape(self, shape: Shape, value: int | float = 1) -> None:
for x in range(shape.top_left.x, shape.bottom_right.x + 1): for x in range(shape.top_left.x, shape.bottom_right.x + 1):
for y in range(shape.top_left.y, shape.bottom_right.y + 1): for y in range(shape.top_left.y, shape.bottom_right.y + 1):
if not shape.mode_3d: if not shape.mode_3d:
@ -186,15 +196,21 @@ class Grid:
def isWithinBoundaries(self, pos: Coordinate) -> bool: def isWithinBoundaries(self, pos: Coordinate) -> bool:
if self.mode3D: if self.mode3D:
return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY \ return (
self.minX <= pos.x <= self.maxX
and self.minY <= pos.y <= self.maxY
and self.minZ <= pos.z <= self.maxZ and self.minZ <= pos.z <= self.maxZ
)
else: else:
return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY
def getActiveCells(self, x: int = None, y: int = None, z: int = None) -> List[Coordinate]: def getActiveCells(
self, x: int = None, y: int = None, z: int = None
) -> List[Coordinate]:
if x is not None or y is not None or z is not None: if x is not None or y is not None or z is not None:
return [ return [
c for c in self.__grid.keys() c
for c in self.__grid.keys()
if (c.x == x if x is not None else True) if (c.x == x if x is not None else True)
and (c.y == y if y is not None else True) and (c.y == y if y is not None else True)
and (c.z == z if z is not None else True) and (c.z == z if z is not None else True)
@ -202,8 +218,12 @@ class Grid:
else: else:
return list(self.__grid.keys()) return list(self.__grid.keys())
def getActiveRegion(self, start: Coordinate, includeDiagonal: bool = False, ignore: List[Coordinate] = None) \ def getActiveRegion(
-> List[Coordinate]: self,
start: Coordinate,
includeDiagonal: bool = False,
ignore: List[Coordinate] = None,
) -> List[Coordinate]:
if not self.get(start): if not self.get(start):
return [] return []
if ignore is None: if ignore is None:
@ -218,7 +238,7 @@ class Grid:
def values(self): def values(self):
return self.__grid.values() return self.__grid.values()
def getSum(self, includeNegative: bool = True) -> Numeric: def getSum(self, includeNegative: bool = True) -> int | float:
if not self.mode3D: if not self.mode3D:
return sum( return sum(
self.get(Coordinate(x, y)) self.get(Coordinate(x, y))
@ -235,18 +255,31 @@ class Grid:
if includeNegative or self.get(Coordinate(x, y)) >= 0 if includeNegative or self.get(Coordinate(x, y)) >= 0
) )
def getNeighboursOf(self, pos: Coordinate, includeDefault: bool = False, includeDiagonal: bool = True) \ def getNeighboursOf(
-> List[Coordinate]: self,
pos: Coordinate,
includeDefault: bool = False,
includeDiagonal: bool = True,
) -> List[Coordinate]:
neighbours = pos.getNeighbours( neighbours = pos.getNeighbours(
includeDiagonal=includeDiagonal, includeDiagonal=includeDiagonal,
minX=self.minX, minY=self.minY, minZ=self.minZ, minX=self.minX,
maxX=self.maxX, maxY=self.maxY, maxZ=self.maxZ minY=self.minY,
minZ=self.minZ,
maxX=self.maxX,
maxY=self.maxY,
maxZ=self.maxZ,
) )
for x in neighbours: for x in neighbours:
if includeDefault or x in self.__grid: if includeDefault or x in self.__grid:
yield x yield x
def getNeighbourSum(self, pos: Coordinate, includeNegative: bool = True, includeDiagonal: bool = True) -> Numeric: def getNeighbourSum(
self,
pos: Coordinate,
includeNegative: bool = True,
includeDiagonal: bool = True,
) -> int | float:
neighbour_sum = 0 neighbour_sum = 0
for neighbour in self.getNeighboursOf(pos, includeDefault=includeDiagonal): for neighbour in self.getNeighboursOf(pos, includeDefault=includeDiagonal):
if includeNegative or self.get(neighbour) > 0: if includeNegative or self.get(neighbour) > 0:
@ -338,8 +371,14 @@ class Grid:
else: else:
self.shift(0 - self.minX, 0 - self.minY) self.shift(0 - self.minX, 0 - self.minY)
def getPath_BFS(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None, def getPath_BFS(
stop_at_first: Any = None) -> Union[None, List[Coordinate]]: self,
pos_from: Coordinate,
pos_to: Coordinate,
includeDiagonal: bool,
walls: List[Any] = None,
stop_at_first: Any = None,
) -> List[Coordinate] | None:
queue = deque() queue = deque()
came_from = {pos_from: None} came_from = {pos_from: None}
queue.append(pos_from) queue.append(pos_from)
@ -349,12 +388,17 @@ class Grid:
while queue: while queue:
current = queue.popleft() current = queue.popleft()
found_end = False found_end = False
for c in self.getNeighboursOf(current, includeDiagonal=includeDiagonal, for c in self.getNeighboursOf(
includeDefault=self.__default not in walls): current,
includeDiagonal=includeDiagonal,
includeDefault=self.__default not in walls,
):
if c in came_from and self.get(c) in walls: if c in came_from and self.get(c) in walls:
continue continue
came_from[c] = current came_from[c] = current
if c == pos_to or (stop_at_first is not None and self.get(c) == stop_at_first): if c == pos_to or (
stop_at_first is not None and self.get(c) == stop_at_first
):
pos_to = c pos_to = c
found_end = True found_end = True
break break
@ -372,8 +416,14 @@ class Grid:
return ret return ret
def getPath(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None, def getPath(
weighted: bool = False) -> Union[None, List[Coordinate]]: self,
pos_from: Coordinate,
pos_to: Coordinate,
includeDiagonal: bool,
walls: List[Any] = None,
weighted: bool = False,
) -> List[Coordinate] | None:
f_costs = [] f_costs = []
if walls is None: if walls is None:
walls = [self.__default] walls = [self.__default]
@ -395,7 +445,9 @@ class Grid:
if currentCoord == pos_to: if currentCoord == pos_to:
break break
for neighbour in self.getNeighboursOf(currentCoord, includeDefault=True, includeDiagonal=includeDiagonal): for neighbour in self.getNeighboursOf(
currentCoord, includeDefault=True, includeDiagonal=includeDiagonal
):
if self.get(neighbour) in walls or neighbour in closedNodes: if self.get(neighbour) in walls or neighbour in closedNodes:
continue continue
@ -404,13 +456,19 @@ class Grid:
elif not includeDiagonal: elif not includeDiagonal:
neighbourDist = 1 neighbourDist = 1
else: else:
neighbourDist = currentCoord.getDistanceTo(neighbour, DistanceAlgorithm.MANHATTAN, includeDiagonal) neighbourDist = currentCoord.getDistanceTo(
neighbour, DistanceAlgorithm.MANHATTAN, includeDiagonal
)
targetDist = neighbour.getDistanceTo(pos_to) targetDist = neighbour.getDistanceTo(pos_to)
f_cost = targetDist + neighbourDist + currentNode[1] f_cost = targetDist + neighbourDist + currentNode[1]
if neighbour not in openNodes or f_cost < openNodes[neighbour][0]: if neighbour not in openNodes or f_cost < openNodes[neighbour][0]:
openNodes[neighbour] = (f_cost, currentNode[1] + neighbourDist, currentCoord) openNodes[neighbour] = (
f_cost,
currentNode[1] + neighbourDist,
currentCoord,
)
heappush(f_costs, (f_cost, neighbour)) heappush(f_costs, (f_cost, neighbour))
if pos_to not in closedNodes: if pos_to not in closedNodes:
@ -424,18 +482,33 @@ class Grid:
return pathCoords return pathCoords
def sub_grid(self, from_x: int, from_y: int, to_x: int, to_y: int, from_z: int = None, to_z: int = None) -> 'Grid': def sub_grid(
self,
from_x: int,
from_y: int,
to_x: int,
to_y: int,
from_z: int = None,
to_z: int = None,
) -> "Grid":
if self.mode3D and (from_z is None or to_z is None): if self.mode3D and (from_z is None or to_z is None):
raise ValueError("sub_grid() on mode3d Grids requires from_z and to_z to be set") raise ValueError(
"sub_grid() on mode3d Grids requires from_z and to_z to be set"
)
count_x, count_y, count_z = 0, 0, 0 count_x, count_y, count_z = 0, 0, 0
new_grid = Grid(self.__default) new_grid = Grid(self.__default)
for x in range(from_x, to_x + 1): for x in range(from_x, to_x + 1):
for y in range(from_y, to_y + 1): for y in range(from_y, to_y + 1):
if not self.mode3D: if not self.mode3D:
new_grid.set(Coordinate(count_x, count_y), self.get(Coordinate(x, y))) new_grid.set(
Coordinate(count_x, count_y), self.get(Coordinate(x, y))
)
else: else:
for z in range(from_z, to_z + 1): for z in range(from_z, to_z + 1):
new_grid.set(Coordinate(count_x, count_y, count_z), self.get(Coordinate(x, y, z))) new_grid.set(
Coordinate(count_x, count_y, count_z),
self.get(Coordinate(x, y, z)),
)
count_z += 1 count_z += 1
count_z = 0 count_z = 0
@ -454,7 +527,16 @@ class Grid:
put_y = y put_y = y
put_x += 1 put_x += 1
def print(self, spacer: str = "", true_char: str = '#', false_char: str = " ", translate: dict = None, mark: list = None, z_level: int = None, bool_mode: bool = False): def print(
self,
spacer: str = "",
true_char: str = "#",
false_char: str = " ",
translate: dict = None,
mark: list = None,
z_level: int = None,
bool_mode: bool = False,
):
if translate is None: if translate is None:
translate = {} translate = {}
@ -486,10 +568,16 @@ class Grid:
def get_aoc_ocr_string(self, x_shift: int = 0, y_shift: int = 0): def get_aoc_ocr_string(self, x_shift: int = 0, y_shift: int = 0):
return convert_array_6( return convert_array_6(
[['#' if self.get(Coordinate(x + x_shift, y + y_shift)) else '.' for x in self.rangeX()] for y in [
self.rangeY()]) [
"#" if self.get(Coordinate(x + x_shift, y + y_shift)) else "."
for x in self.rangeX()
]
for y in self.rangeY()
]
)
def __str__(self, true_char: str = '#', false_char: str = "."): def __str__(self, true_char: str = "#", false_char: str = "."):
return "/".join( return "/".join(
"".join( "".join(
true_char if self.get(Coordinate(x, y)) else false_char true_char if self.get(Coordinate(x, y)) else false_char
@ -499,10 +587,22 @@ class Grid:
) )
@classmethod @classmethod
def from_str(cls, grid_string: str, default: Any = False, true_char: str = '#', true_value: Any = True, translate: dict = None, mode3d: bool = False) -> 'Grid': def from_str(
cls,
grid_string: str,
default: Any = False,
true_char: str = "#",
true_value: Any = True,
translate: dict = None,
mode3d: bool = False,
) -> "Grid":
if translate is None: if translate is None:
translate = {} translate = {}
if true_char is not None and True not in translate.values() and true_char not in translate: if (
true_char is not None
and True not in translate.values()
and true_char not in translate
):
translate[true_char] = true_value if true_value is not None else True translate[true_char] = true_value if true_value is not None else True
ret = cls(default=default) ret = cls(default=default)

View File

@ -1,5 +1,4 @@
import math import math
from .tools import cache
def factorial(n: int) -> int: def factorial(n: int) -> int:

View File

@ -1,11 +1,10 @@
from __future__ import annotations
from time import sleep from time import sleep
from .schedule import Scheduler from .schedule import Scheduler
from .simplesocket import ClientSocket from .simplesocket import ClientSocket
from .types import StrOrNone
from datetime import timedelta from datetime import timedelta
from enum import Enum from enum import Enum
from typing import Callable, Dict, List, Union from typing import Callable
class ServerMessage(str, Enum): class ServerMessage(str, Enum):
@ -191,7 +190,7 @@ class User:
class Channel: class Channel:
name: str name: str
topic: str topic: str
userlist: Dict[str, User] userlist: dict[str, User]
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
@ -208,22 +207,29 @@ class Channel:
class Client: class Client:
__function_register: Dict[str, List[Callable]] __function_register: dict[str, list[Callable]]
__server_socket: ClientSocket __server_socket: ClientSocket
__server_caps: Dict[str, Union[str, int]] __server_caps: dict[str, str | int]
__userlist: Dict[str, User] __userlist: dict[str, User]
__channellist: Dict[str, Channel] __channellist: dict[str, Channel]
__my_user: StrOrNone __my_user: str | None
def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"): def __init__(
self,
server: str,
port: int,
nick: str,
username: str,
realname: str = "Python Bot",
):
self.__userlist = {} self.__userlist = {}
self.__channellist = {} self.__channellist = {}
self.__server_socket = ClientSocket(server, port) self.__server_socket = ClientSocket(server, port)
self.__server_socket.sendline("USER %s ignore ignore :%s" % (username, realname)) self.__server_socket.sendline(
"USER %s ignore ignore :%s" % (username, realname)
)
self.__server_socket.sendline("NICK %s" % nick) self.__server_socket.sendline("NICK %s" % nick)
self.__server_caps = { self.__server_caps = {"MAXLEN": 255}
'MAXLEN': 255
}
self.__function_register = { self.__function_register = {
ServerMessage.RPL_WELCOME: [self.on_rpl_welcome], ServerMessage.RPL_WELCOME: [self.on_rpl_welcome],
ServerMessage.RPL_TOPIC: [self.on_rpl_topic], ServerMessage.RPL_TOPIC: [self.on_rpl_topic],
@ -261,7 +267,10 @@ class Client:
if "!" in msg_from and msg_from not in self.__userlist: if "!" in msg_from and msg_from not in self.__userlist:
self.__userlist[msg_from] = User(msg_from) self.__userlist[msg_from] = User(msg_from)
if self.__userlist[msg_from].nickname == self.__userlist[self.__my_user].nickname: if (
self.__userlist[msg_from].nickname
== self.__userlist[self.__my_user].nickname
):
del self.__userlist[self.__my_user] del self.__userlist[self.__my_user]
self.__my_user = msg_from self.__my_user = msg_from
@ -301,7 +310,9 @@ class Client:
def on_nick(self, msg_from: str, msg_to: str, message: str): def on_nick(self, msg_from: str, msg_to: str, message: str):
self.__userlist[msg_from].nick(msg_to) self.__userlist[msg_from].nick(msg_to)
self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[msg_from] self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[
msg_from
]
del self.__userlist[msg_from] del self.__userlist[msg_from]
def on_join(self, msg_from: str, msg_to: str, message: str): def on_join(self, msg_from: str, msg_to: str, message: str):
@ -349,7 +360,7 @@ class Client:
self.receive() self.receive()
self.__server_socket.close() self.__server_socket.close()
def getUser(self, user: str = None) -> Union[User, None]: def getUser(self, user: str = None) -> User | None:
if user is None: if user is None:
return self.__userlist[self.__my_user] return self.__userlist[self.__my_user]
elif user in self.__userlist: elif user in self.__userlist:
@ -357,21 +368,28 @@ class Client:
else: else:
return None return None
def getUserList(self) -> List[User]: def getUserList(self) -> list[User]:
return list(self.__userlist.values()) return list(self.__userlist.values())
def getChannel(self, channel: str) -> Union[Channel, None]: def getChannel(self, channel: str) -> Channel | None:
if channel in self.__channellist: if channel in self.__channellist:
return self.__channellist[channel] return self.__channellist[channel]
else: else:
return None return None
def getChannelList(self) -> List[Channel]: def getChannelList(self) -> list[Channel]:
return list(self.__channellist.values()) return list(self.__channellist.values())
class IrcBot(Client): class IrcBot(Client):
def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"): def __init__(
self,
server: str,
port: int,
nick: str,
username: str,
realname: str = "Python Bot",
):
super().__init__(server, port, nick, username, realname) super().__init__(server, port, nick, username, realname)
self._scheduler = Scheduler() self._scheduler = Scheduler()
self._channel_commands = {} self._channel_commands = {}
@ -397,8 +415,13 @@ class IrcBot(Client):
if not message: if not message:
return return
command = message.split()[0] command = message.split()[0]
if msg_to in self._channel_commands and command in self._channel_commands[msg_to]: if (
self._channel_commands[msg_to][command](msg_from, " ".join(message.split()[1:])) msg_to in self._channel_commands
and command in self._channel_commands[msg_to]
):
self._channel_commands[msg_to][command](
msg_from, " ".join(message.split()[1:])
)
if msg_to == self.getUser().nickname and command in self._privmsg_commands: if msg_to == self.getUser().nickname and command in self._privmsg_commands:
self._privmsg_commands[command](msg_from, " ".join(message.split()[1:])) self._privmsg_commands[command](msg_from, " ".join(message.split()[1:]))

View File

@ -1,17 +1,18 @@
from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Union from typing import Any
@dataclass @dataclass
class Node: class Node:
value: Any value: Any
next: 'Node' = None next: "Node" = None
prev: 'Node' = None prev: "Node" = None
class LinkedList: class LinkedList:
_head: Union[Node, None] = None _head: Node | None = None
_tail: Union[Node, None] = None _tail: Node | None = None
size: int = 0 size: int = 0
def _get_head(self): def _get_head(self):
@ -128,7 +129,7 @@ class LinkedList:
return x.value == obj return x.value == obj
def __add__(self, other: 'LinkedList') -> 'LinkedList': def __add__(self, other: "LinkedList") -> "LinkedList":
self._tail.next = other.head self._tail.next = other.head
other.head.prev = self._tail other.head.prev = self._tail
self._tail = other.tail self._tail = other.tail

View File

@ -1,10 +1,10 @@
from __future__ import annotations
import math import math
from decimal import Decimal, ROUND_HALF_UP from decimal import Decimal, ROUND_HALF_UP
from .types import Numeric
def round_half_up(number: Numeric) -> int: def round_half_up(number: int | float) -> int:
""" pythons round() rounds .5 to the *even* number; 0.5 == 0 """ """pythons round() rounds .5 to the *even* number; 0.5 == 0"""
return int(Decimal(number).to_integral(ROUND_HALF_UP)) return int(Decimal(number).to_integral(ROUND_HALF_UP))

View File

@ -1,17 +1,23 @@
import datetime import datetime
from typing import Callable, List, Any from typing import Callable, Any
class Scheduler: class Scheduler:
def __init__(self): def __init__(self):
self.jobs = {} self.jobs = {}
def schedule(self, name: str, every: datetime.timedelta, func: Callable[..., None], *args: List[Any]): def schedule(
self,
name: str,
every: datetime.timedelta,
func: Callable[..., None],
*args: list[Any],
):
self.jobs[name] = { self.jobs[name] = {
'call': func, "call": func,
'args': args, "args": args,
'timedelta': every, "timedelta": every,
'runat': (datetime.datetime.utcnow() + every) "runat": (datetime.datetime.utcnow() + every),
} }
def unschedule(self, name: str): def unschedule(self, name: str):
@ -21,6 +27,6 @@ class Scheduler:
def run_pending(self): def run_pending(self):
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
for job in self.jobs.values(): for job in self.jobs.values():
if job['runat'] <= now: if job["runat"] <= now:
job['runat'] += job['timedelta'] job["runat"] += job["timedelta"]
job['call'](*job['args']) job["call"](*job["args"])

View File

@ -1,16 +1,19 @@
from __future__ import annotations
import errno import errno
import socket import socket
import threading import threading
import time import time
from typing import Callable, Union from typing import Callable
class Socket: class Socket:
def __init__(self, address_family: socket.AddressFamily, socket_kind: socket.SocketKind): def __init__(
self, address_family: socket.AddressFamily, socket_kind: socket.SocketKind
):
self.socket = socket.socket(family=address_family, type=socket_kind) self.socket = socket.socket(family=address_family, type=socket_kind)
self.__recv_buffer = b"" self.__recv_buffer = b""
def send(self, buffer: Union[str, bytes]) -> int: def send(self, buffer: str | bytes) -> int:
if isinstance(buffer, str): if isinstance(buffer, str):
buffer = buffer.encode("UTF-8") buffer = buffer.encode("UTF-8")
@ -40,7 +43,7 @@ class Socket:
self.send(line) self.send(line)
def recvline(self, timeout: int = 0) -> Union[str, None]: def recvline(self, timeout: int = 0) -> str | None:
""" """
Receive exactly one text line (delimiter: newline "\n" or "\r\n") from the socket. Receive exactly one text line (delimiter: newline "\n" or "\r\n") from the socket.
@ -58,8 +61,10 @@ class Socket:
if b"\n" not in self.__recv_buffer: if b"\n" not in self.__recv_buffer:
return None return None
else: else:
line = self.__recv_buffer[:self.__recv_buffer.index(b"\n")] line = self.__recv_buffer[: self.__recv_buffer.index(b"\n")]
self.__recv_buffer = self.__recv_buffer[self.__recv_buffer.index(b"\n") + 1:] self.__recv_buffer = self.__recv_buffer[
self.__recv_buffer.index(b"\n") + 1 :
]
return line.decode("UTF-8") return line.decode("UTF-8")
def close(self): def close(self):
@ -67,8 +72,13 @@ class Socket:
class ClientSocket(Socket): class ClientSocket(Socket):
def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET, def __init__(
socket_kind: socket.SocketKind = socket.SOCK_STREAM): self,
addr: str,
port: int,
address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM,
):
super().__init__(address_family, socket_kind) super().__init__(address_family, socket_kind)
self.socket.connect((addr, port)) self.socket.connect((addr, port))
self.laddr, self.lport = self.socket.getsockname() self.laddr, self.lport = self.socket.getsockname()
@ -84,18 +94,28 @@ class RemoteSocket(Socket):
class ServerSocket(Socket): class ServerSocket(Socket):
def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET, def __init__(
socket_kind: socket.SocketKind = socket.SOCK_STREAM): self,
addr: str,
port: int,
address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM,
):
super().__init__(address_family, socket_kind) super().__init__(address_family, socket_kind)
self.socket.bind((addr, port)) self.socket.bind((addr, port))
self.socket.listen(5) self.socket.listen(5)
self.laddr, self.lport = self.socket.getsockname() self.laddr, self.lport = self.socket.getsockname()
self.raddr, self.rport = None, None # Transport endpoint is not connected. Surprisingly. self.raddr, self.rport = (
None,
None,
) # Transport endpoint is not connected. Surprisingly.
def _connection_acceptor(self, target: Callable[..., None]): def _connection_acceptor(self, target: Callable[..., None]):
while 1: while 1:
(client_socket, client_address) = self.socket.accept() (client_socket, client_address) = self.socket.accept()
connection_handler_thread = threading.Thread(target=target, args=(RemoteSocket(client_socket), )) connection_handler_thread = threading.Thread(
target=target, args=(RemoteSocket(client_socket),)
)
connection_handler_thread.start() connection_handler_thread.start()
def accept(self, target: Callable[..., None], blocking: bool = True): def accept(self, target: Callable[..., None], blocking: bool = True):
@ -103,6 +123,8 @@ class ServerSocket(Socket):
self._connection_acceptor(target) self._connection_acceptor(target)
return None return None
else: else:
connection_accept_thread = threading.Thread(target=self._connection_acceptor, kwargs={'target': target}) connection_accept_thread = threading.Thread(
target=self._connection_acceptor, kwargs={"target": target}
)
connection_accept_thread.start() connection_accept_thread.start()
return connection_accept_thread return connection_accept_thread

View File

@ -1,11 +1,11 @@
from __future__ import annotations
from time import perf_counter_ns from time import perf_counter_ns
from .tools import human_readable_time_from_ns from .tools import human_readable_time_from_ns
from .types import IntOrNone
class StopWatch: class StopWatch:
started: IntOrNone = None started: int | None = None
stopped: IntOrNone = None stopped: int | None = None
def __init__(self, auto_start=True): def __init__(self, auto_start=True):
if auto_start: if auto_start:

View File

@ -31,8 +31,8 @@ class Dict(dict):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
if "default" in kwargs: if "default" in kwargs:
self.__default = kwargs['default'] self.__default = kwargs["default"]
del kwargs['default'] del kwargs["default"]
else: else:
self.__default = None self.__default = None
@ -46,7 +46,9 @@ class Dict(dict):
self[k] = Dict(self[k]) self[k] = Dict(self[k])
elif isinstance(self[k], list): elif isinstance(self[k], list):
for i in range(len(self[k])): for i in range(len(self[k])):
if isinstance(self[k][i], dict) and not isinstance(self[k][i], Dict): if isinstance(self[k][i], dict) and not isinstance(
self[k][i], Dict
):
self[k][i] = Dict(self[k][i]) self[k][i] = Dict(self[k][i])
def update(self, other: dict, **kwargs): def update(self, other: dict, **kwargs):
@ -75,11 +77,11 @@ class Dict(dict):
def get_script_dir(follow_symlinks: bool = True) -> str: def get_script_dir(follow_symlinks: bool = True) -> str:
"""return path of the executed script""" """return path of the executed script"""
if getattr(sys, 'frozen', False): if getattr(sys, "frozen", False):
path = os.path.abspath(sys.executable) path = os.path.abspath(sys.executable)
else: else:
if '__main__' in sys.modules and hasattr(sys.modules['__main__'], '__file__'): if "__main__" in sys.modules and hasattr(sys.modules["__main__"], "__file__"):
path = sys.modules['__main__'].__file__ path = sys.modules["__main__"].__file__
else: else:
path = inspect.getabsfile(get_script_dir) path = inspect.getabsfile(get_script_dir)
@ -132,13 +134,13 @@ def human_readable_time_from_delta(delta: datetime.timedelta) -> str:
def human_readable_time_from_ns(ns: int) -> str: def human_readable_time_from_ns(ns: int) -> str:
units = [ units = [
(1000, 'ns'), (1000, "ns"),
(1000, 'µs'), (1000, "µs"),
(1000, 'ms'), (1000, "ms"),
(60, 's'), (60, "s"),
(60, 'm'), (60, "m"),
(60, 'h'), (60, "h"),
(24, 'd'), (24, "d"),
] ]
time_parts = [] time_parts = []
@ -153,7 +155,7 @@ def cache(func):
saved = {} saved = {}
@wraps(func) @wraps(func)
def newfunc(*args): def new_func(*args):
if args in saved: if args in saved:
return saved[args] return saved[args]
@ -161,7 +163,7 @@ def cache(func):
saved[args] = result saved[args] = result
return result return result
return newfunc return new_func
@hook(list) @hook(list)

View File

@ -1,7 +1,8 @@
from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum from enum import Enum
from src.tools.lists import Queue, Stack from .lists import Queue, Stack
from typing import Any, List, Union from typing import Any
class Rotate(Enum): class Rotate(Enum):
@ -12,18 +13,21 @@ class Rotate(Enum):
@dataclass @dataclass
class TreeNode: class TreeNode:
value: Any value: Any
parent: Union['TreeNode', None] = None parent: TreeNode | None = None
left: Union['TreeNode', None] = None left: TreeNode | None = None
right: Union['TreeNode', None] = None right: TreeNode | None = None
balance_factor: int = 0 balance_factor: int = 0
height: int = 0 height: int = 0
def __str__(self): def __str__(self):
return "TreeNode:(%s; bf: %d, d: %d, p: %s, l: %s, r: %s)" \ return "TreeNode:(%s; bf: %d, d: %d, p: %s, l: %s, r: %s)" % (
% (self.value, self.balance_factor, self.height, self.value,
self.parent.value if self.parent else "None", self.balance_factor,
self.left.value if self.left else "None", self.height,
self.right.value if self.right else "None") self.parent.value if self.parent else "None",
self.left.value if self.left else "None",
self.right.value if self.right else "None",
)
def __repr__(self): def __repr__(self):
return str(self) return str(self)
@ -31,8 +35,8 @@ class TreeNode:
class TrieNode: class TrieNode:
value: str value: str
parent: Union['TrieNode', None] = None parent: TrieNode | None = None
children: List['TrieNode'] = field(default_factory=list) children: list[TrieNode] = field(default_factory=list)
def update_node(node: TreeNode): def update_node(node: TreeNode):
@ -43,7 +47,7 @@ def update_node(node: TreeNode):
class BinaryTree: class BinaryTree:
root: Union[TreeNode, None] = None root: TreeNode | None = None
node_count: int = 0 node_count: int = 0
def _insert(self, node: TreeNode, parent: TreeNode, obj: Any) -> TreeNode: def _insert(self, node: TreeNode, parent: TreeNode, obj: Any) -> TreeNode:
@ -168,7 +172,7 @@ class BinaryTree:
return return
self.print(node.right, level + 1) self.print(node.right, level + 1)
print(" " * 4 * level + '->', node) print(" " * 4 * level + "->", node)
self.print(node.left, level + 1) self.print(node.left, level + 1)
def __contains__(self, obj: Any) -> bool: def __contains__(self, obj: Any) -> bool:

View File

@ -1,7 +0,0 @@
from typing import Union
Numeric = Union[int, float]
StrOrNone = Union[str, None]
IntOrNone = Union[int, None]
FloatOrNone = Union[float, None]
NumericOrNone = Union[Numeric, None]