Compare commits

...

3 Commits

Author SHA1 Message Date
99c4ef2ce6 code reformatting and cleanup
Some checks failed
Publish to PyPI / Publish to PyPI (push) Failing after 5s
2023-11-11 15:37:42 +01:00
bfcd27336d exclude pycharm ide env 2023-11-11 15:04:12 +01:00
455bd34d3b build/publish pipeline 2023-11-11 15:02:27 +01:00
27 changed files with 689 additions and 349 deletions

View File

@ -1,18 +0,0 @@
name: Gitea Actions Demo
run-name: ${{ gitea.actor }} is testing out Gitea Actions 🚀
on: [push]
jobs:
Explore-Gitea-Actions:
runs-on: ubuntu-latest
steps:
- run: echo "🎉 The job was automatically triggered by a ${{ gitea.event_name }} event."
- run: echo "🐧 This job is now running on a ${{ runner.os }} server hosted by Gitea!"
- run: echo "🔎 The name of your branch is ${{ gitea.ref }} and your repository is ${{ gitea.repository }}."
- name: Check out repository code
uses: actions/checkout@v3
- run: echo "💡 The ${{ gitea.repository }} repository has been cloned to the runner."
- run: echo "🖥️ The workflow is now ready to test your code on the runner."
- name: List files in the repository
run: |
ls ${{ gitea.workspace }}
- run: echo "🍏 This job's status is ${{ gitea.status }}."

View File

@ -0,0 +1,50 @@
name: Publish to PyPI
on:
push:
tags:
- "*"
jobs:
publish:
name: Publish to PyPI
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.11.2"
- name: Setup Build Environment
run: python -m pip install --upgrade pip build
- name: Build Package
run: python -m build
- name: Publish Package
uses: pypa/gh-action-pypi-publish@v1
with:
password: ${{ secrets.PYPI_API_TOKEN }}
#name: Gitea Actions Demo
#run-name: ${{ gitea.actor }} is testing out Gitea Actions 🚀
#on: [push]
#jobs:
# Explore-Gitea-Actions:
# runs-on: ubuntu-latest
# steps:
# - run: echo "🎉 The job was automatically triggered by a ${{ gitea.event_name }} event."
# - run: echo "🐧 This job is now running on a ${{ runner.os }} server hosted by Gitea!"
# - run: echo "🔎 The name of your branch is ${{ gitea.ref }} and your repository is ${{ gitea.repository }}."
# - name: Check out repository code
# uses: actions/checkout@v3
# - run: echo "💡 The ${{ gitea.repository }} repository has been cloned to the runner."
# - run: echo "🖥️ The workflow is now ready to test your code on the runner."
# - name: List files in the repository
# run: |
# ls ${{ gitea.workspace }}
# - run: echo "🍏 This job's status is ${{ gitea.status }}."
#

2
.gitignore vendored
View File

@ -158,5 +158,5 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/

View File

@ -208,8 +208,8 @@ If you develop a new program, and you want it to be of the greatest possible use
To do so, attach the following notices to the program. It is safest to attach them to the start of each source file to most effectively state the exclusion of warranty; and each file should have at least the “copyright” line and a pointer to where the full notice is found.
py-tools
Copyright (C) 2023 DF_Public
shs-tools
Copyright (C) 2023 Stefan Harmuth
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
@ -221,7 +221,7 @@ Also add information on how to contact you by electronic and paper mail.
If the program does terminal interaction, make it output a short notice like this when it starts in an interactive mode:
py-tools Copyright (C) 2023 DF_Public
shs-tools Copyright (C) 2023 Stefan Harmuth
This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
This is free software, and you are welcome to redistribute it under certain conditions; type `show c' for details.

View File

@ -1,2 +1,3 @@
# py-tools
# shs-tools
An assortment of helper functions, primarily developed for use in [Advent Of Code](https://adventofcode.com/)

View File

@ -1,56 +0,0 @@
from heapq import heappop, heappush
from tools.trees import MinHeap, BinarySearchTree
from tools.stopwatch import StopWatch
s = StopWatch()
h = []
for x in range(100_000):
heappush(h, x)
print("Heappush:", s.elapsed())
s.reset()
while h:
heappop(h)
print("Heappop:", s.elapsed())
s = StopWatch()
h = MinHeap()
for x in range(100_000):
h.add(x)
print("MinHeap.add():", s.elapsed())
s.reset()
while not h.empty():
h.pop()
print("MinHeap.pop():", s.elapsed())
s = StopWatch()
b = set()
for x in range(1_000_000):
b.add(x)
print("set.add():", s.elapsed())
s.reset()
for x in range(1_000_000):
_ = x in b
print("x in set:", s.elapsed())
s = StopWatch()
b = BinarySearchTree()
for x in range(1_000_000):
b.add(x)
print("AVL.add():", s.elapsed())
s.reset()
for x in range(1_000_000):
_ = x in b
print("x in AVL:", s.elapsed())
print("DFS/BFS Test")
b = BinarySearchTree()
for x in range(20):
b.add(x)
b.print()
print("DFS:")
for x in b.iter_depth_first():
print(x)
print("BFS:")
for x in b.iter_breadth_first():
print(x)

26
pyproject.toml Normal file
View File

@ -0,0 +1,26 @@
[build-system]
requires = ['setuptools', "wheel", "setuptools-git-versioning"]
build-backend = 'setuptools.build_meta'
[tool.setuptools-git-versioning]
enabled = true
[project]
dynamic = ['version']
name = "shs-tools"
authors = [
{ name="Stefan Harmuth", email="pennywise@drock.de" },
]
description = "An assortment of little helper functions"
readme = "README.md"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"Development Status :: 4 - Beta",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Operating System :: OS Independent",
]
[project.urls]
"homepage" = "https://git.domainforge.de/public/py-tools"
"Bug Tracker" = "https://git.domainforge.de/public/py-tools/issues"

View File

@ -1,12 +0,0 @@
from setuptools import setup
setup(
name='py-tools',
version='0.2',
packages=['tools'],
url='',
license='GPLv3',
author='Stefan Harmuth',
author_email='pennywise@drock.de',
description='Just some small tools to make life easier'
)

View File

@ -1,18 +1,18 @@
from __future__ import annotations
import os
import re
import subprocess
import requests
import time
import webbrowser
from bs4 import BeautifulSoup
from tools.datafiles import JSONFile
from tools.stopwatch import StopWatch
from typing import Any, Callable, List, Tuple, Type, Union
from .datafiles import JSONFile
from .stopwatch import StopWatch
from typing import Any, Callable, List, Tuple, Type
from .tools import get_script_dir
BASE_PATH = get_script_dir()
INPUTS_PATH = os.path.join(BASE_PATH, 'inputs')
INPUTS_PATH = os.path.join(BASE_PATH, "inputs")
class AOCDay:
@ -35,7 +35,13 @@ class AOCDay:
def part2(self) -> Any:
raise NotImplementedError()
def run_part(self, part: int, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50):
def run_part(
self,
part: int,
verbose: bool = False,
measure_runtime: bool = False,
timeit_number: int = 50,
):
case_count = 0
for solution, input_file in self.inputs[part]:
self._current_test_solution, self._current_test_file = solution, input_file
@ -53,12 +59,16 @@ class AOCDay:
exec_time = stopwatch.avg_string(timeit_number)
if solution is None:
print_solution(self.day, part + 1, answer, solution, case_count, exec_time)
if answer not in {u"", b"", None, b"None", u"None", 0, '0'}:
print_solution(
self.day, part + 1, answer, solution, case_count, exec_time
)
if answer not in {"", b"", None, b"None", "None", 0, "0"}:
self._submit(part + 1, answer)
else:
if verbose or answer != solution:
print_solution(self.day, part + 1, answer, solution, case_count, exec_time)
print_solution(
self.day, part + 1, answer, solution, case_count, exec_time
)
if answer != solution:
return False
@ -67,7 +77,13 @@ class AOCDay:
if case_count == len(self.inputs[part]) and not verbose:
print_solution(self.day, part + 1, answer, exec_time=exec_time)
def run(self, parts: int = 3, verbose: bool = False, measure_runtime: bool = False, timeit_number: int = 50):
def run(
self,
parts: int = 3,
verbose: bool = False,
measure_runtime: bool = False,
timeit_number: int = 50,
):
if parts & 1:
self.run_part(0, verbose, measure_runtime, timeit_number)
if parts & 2:
@ -86,10 +102,13 @@ class AOCDay:
session_id = open(".session", "r").readlines()[0].strip()
response = requests.get(
"https://adventofcode.com/%d/day/%d/input" % (self.year, self.day),
cookies={'session': session_id}
cookies={"session": session_id},
)
if not response.ok:
print("FAILED to download input: (%s) %s" % (response.status_code, response.text))
print(
"FAILED to download input: (%s) %s"
% (response.status_code, response.text)
)
return
with open(filename, "wb") as f:
@ -107,22 +126,19 @@ class AOCDay:
answer_cache[str_day] = {}
if str_part not in answer_cache[str_day]:
answer_cache[str_day][str_part] = {
'wrong': [],
'correct': None
}
answer_cache[str_day][str_part] = {"wrong": [], "correct": None}
if answer in answer_cache[str_day][str_part]['wrong']:
if answer in answer_cache[str_day][str_part]["wrong"]:
print("Already tried %s. It was WRONG." % answer)
return
if answer_cache[str_day][str_part]['correct'] is not None:
if answer == answer_cache[str_day][str_part]['correct']:
if answer_cache[str_day][str_part]["correct"] is not None:
if answer == answer_cache[str_day][str_part]["correct"]:
print("Already submitted %s. It was CORRECT." % answer)
return
else:
print("Already submitted an answer, but another one")
print("CORRECT was: %s" % answer_cache[str_day][str_part]['correct'])
print("CORRECT was: %s" % answer_cache[str_day][str_part]["correct"])
print("Your answer: %s" % answer)
return
@ -130,21 +146,26 @@ class AOCDay:
session_id = open(".session", "r").readlines()[0].strip()
response = requests.post(
"https://adventofcode.com/%d/day/%d/answer" % (self.year, self.day),
cookies={'session': session_id},
data={'level': part, 'answer': answer}
cookies={"session": session_id},
data={"level": part, "answer": answer},
)
if not response.ok:
print("Failed to submit answer: (%s) %s" % (response.status_code, response.text))
print(
"Failed to submit answer: (%s) %s"
% (response.status_code, response.text)
)
soup = BeautifulSoup(response.text, "html.parser")
message = soup.article.text
if "That's the right answer" in message:
answer_cache[str_day][str_part]['correct'] = answer
answer_cache[str_day][str_part]["correct"] = answer
print("That's correct!")
webbrowser.open("https://adventofcode.com/%d/day/%d#part2" % (self.year, self.day))
webbrowser.open(
"https://adventofcode.com/%d/day/%d#part2" % (self.year, self.day)
)
elif "That's not the right answer" in message:
answer_cache[str_day][str_part]['wrong'].append(answer)
answer_cache[str_day][str_part]["wrong"].append(answer)
print("That's WRONG!")
elif "You gave an answer too recently" in message:
# WAIT and retry
@ -183,12 +204,14 @@ class AOCDay:
else:
return self.input.copy()
def getMultiLineInputAsArray(self, return_type: Type = None, join_char: str = None) -> List:
def getMultiLineInputAsArray(
self, return_type: Type = None, join_char: str = None
) -> List:
"""
get input for day x as 2d array, split by empty lines
"""
lines = self.input.copy()
lines.append('')
lines.append("")
return_array = []
line_array = []
@ -208,27 +231,49 @@ class AOCDay:
return return_array
def getInputAsArraySplit(self, split_char: str = ',', return_type: Union[Type, List[Type]] = None) -> List:
def getInputAsArraySplit(
self, split_char: str = ",", return_type: Type | List[Type] = None
) -> List:
"""
get input for day x with the lines split by split_char
if input has only one line, returns a 1d array with the values
if input has multiple lines, returns a 2d array (a[line][values])
"""
if len(self.input) == 1:
return split_line(line=self.input[0], split_char=split_char, return_type=return_type)
return split_line(
line=self.input[0], split_char=split_char, return_type=return_type
)
else:
return_array = []
for line in self.input:
return_array.append(split_line(line=line, split_char=split_char, return_type=return_type))
return_array.append(
split_line(
line=line, split_char=split_char, return_type=return_type
)
)
return return_array
def print_solution(day: int, part: int, solution: Any, test: Any = None, test_case: int = 0, exec_time: str = None):
def print_solution(
day: int,
part: int,
solution: Any,
test: Any = None,
test_case: int = 0,
exec_time: str = None,
):
if test is not None:
print(
"%s (TEST day%d/part%d/case%d): got '%s'; expected '%s'"
% ("OK" if test == solution else "FAIL", day, part, test_case, solution, test)
% (
"OK" if test == solution else "FAIL",
day,
part,
test_case,
solution,
test,
)
)
else:
print(
@ -244,13 +289,15 @@ def print_solution(day: int, part: int, solution: Any, test: Any = None, test_ca
print("Day %s, Part %s - Average run time: %s" % (day, part, exec_time))
def split_line(line, split_char: str = ',', return_type: Union[Type, List[Type]] = None):
def split_line(line, split_char: str = ",", return_type: Type | List[Type] = None):
if split_char:
line = line.split(split_char)
if return_type is None:
return line
elif isinstance(return_type, list):
return [return_type[x](i) if len(return_type) > x else i for x, i in enumerate(line)]
return [
return_type[x](i) if len(return_type) > x else i for x, i in enumerate(line)
]
else:
return [return_type(i) for i in line]

View File

@ -1,6 +1,7 @@
# Copyright (c) 2020-present Benjamin Soyka
# Original and Licence at https://github.com/bsoyka/advent-of-code-ocr
from __future__ import annotations
from collections.abc import Sequence
ALPHABET_6 = {
@ -32,7 +33,12 @@ def convert_6(input_text: str, *, fill_pixel: str = "#", empty_pixel: str = ".")
return _convert_6(prepared_array)
def convert_array_6(array: Sequence[Sequence[str | int]], *, fill_pixel: str | int = "#", empty_pixel: str | int = ".") -> str:
def convert_array_6(
array: Sequence[Sequence[str | int]],
*,
fill_pixel: str | int = "#",
empty_pixel: str | int = ".",
) -> str:
"""Convert a height 6 NumPy array or nested list to characters"""
prepared_array = [
[
@ -54,8 +60,7 @@ def _convert_6(array: list[list[str]]) -> str:
indices = [slice(start, start + 4) for start in range(0, cols, 5)]
result = [
ALPHABET_6["\n".join("".join(row[index]) for row in array)]
for index in indices
ALPHABET_6["\n".join("".join(row[index]) for row in array)] for index in indices
]
return "".join(result)

View File

@ -32,8 +32,12 @@ class Coordinate(tuple):
def is3D(self) -> bool:
return self.z is not None
def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = False) -> Union[int, float]:
def getDistanceTo(
self,
target: Coordinate,
algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = False,
) -> Union[int, float]:
"""
Get distance to target Coordinate
@ -47,18 +51,30 @@ class Coordinate(tuple):
if self.z is None:
return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2)
else:
return sqrt(abs(self.x - target.x) ** 2 + abs(self.y - target.y) ** 2 + abs(self.z - target.z) ** 2)
return sqrt(
abs(self.x - target.x) ** 2
+ abs(self.y - target.y) ** 2
+ abs(self.z - target.z) ** 2
)
elif algorithm == DistanceAlgorithm.CHEBYSHEV:
if self.z is None:
return max(abs(target.x - self.x), abs(target.y - self.y))
else:
return max(abs(target.x - self.x), abs(target.y - self.y), abs(target.z - self.z))
return max(
abs(target.x - self.x),
abs(target.y - self.y),
abs(target.z - self.z),
)
elif algorithm == DistanceAlgorithm.MANHATTAN:
if not includeDiagonals:
if self.z is None:
return abs(self.x - target.x) + abs(self.y - target.y)
else:
return abs(self.x - target.x) + abs(self.y - target.y) + abs(self.z - target.z)
return (
abs(self.x - target.x)
+ abs(self.y - target.y)
+ abs(self.z - target.z)
)
else:
dist = [abs(self.x - target.x), abs(self.y - target.y)]
if self.z is None:
@ -72,15 +88,35 @@ class Coordinate(tuple):
o_dist = max(dist) - min(dist)
return 1.7 * d_steps + o_dist + 1.4 * min(dist)
def inBoundaries(self, minX: int, minY: int, maxX: int, maxY: int, minZ: int = -inf, maxZ: int = inf) -> bool:
def inBoundaries(
self,
minX: int,
minY: int,
maxX: int,
maxY: int,
minZ: int = -inf,
maxZ: int = inf,
) -> bool:
if self.z is None:
return minX <= self.x <= maxX and minY <= self.y <= maxY
else:
return minX <= self.x <= maxX and minY <= self.y <= maxY and minZ <= self.z <= maxZ
return (
minX <= self.x <= maxX
and minY <= self.y <= maxY
and minZ <= self.z <= maxZ
)
def getCircle(self, radius: int = 1, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
minX: int = -inf, minY: int = -inf, maxX: int = inf, maxY: int = inf,
minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]:
def getCircle(
self,
radius: int = 1,
algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
minX: int = -inf,
minY: int = -inf,
maxX: int = inf,
maxY: int = inf,
minZ: int = -inf,
maxZ: int = inf,
) -> list[Coordinate]:
ret = []
if self.z is None: # mode 2D
for x in range(self.x - radius * 2, self.x + radius * 2 + 1):
@ -88,7 +124,11 @@ class Coordinate(tuple):
target = Coordinate(x, y)
if not target.inBoundaries(minX, minY, maxX, maxY):
continue
dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False))
dist = round_half_up(
self.getDistanceTo(
target, algorithm=algorithm, includeDiagonals=False
)
)
if dist == radius:
ret.append(target)
@ -99,14 +139,26 @@ class Coordinate(tuple):
target = Coordinate(x, y)
if not target.inBoundaries(minX, minY, maxX, maxY, minZ, maxZ):
continue
dist = round_half_up(self.getDistanceTo(target, algorithm=algorithm, includeDiagonals=False))
dist = round_half_up(
self.getDistanceTo(
target, algorithm=algorithm, includeDiagonals=False
)
)
if dist == radius:
ret.append(target)
return ret
def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf,
maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]:
def getNeighbours(
self,
includeDiagonal: bool = True,
minX: int = -inf,
minY: int = -inf,
maxX: int = inf,
maxY: int = inf,
minZ: int = -inf,
maxZ: int = inf,
) -> list[Coordinate]:
"""
Get a list of neighbouring coordinates.
@ -121,7 +173,16 @@ class Coordinate(tuple):
"""
if self.z is None:
if includeDiagonal:
nb_list = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
nb_list = [
(-1, -1),
(-1, 0),
(-1, 1),
(0, -1),
(0, 1),
(1, -1),
(1, 0),
(1, 1),
]
else:
nb_list = [(-1, 0), (1, 0), (0, -1), (0, 1)]
@ -130,13 +191,29 @@ class Coordinate(tuple):
yield self.__class__(self.x + dx, self.y + dy)
else:
if includeDiagonal:
nb_list = [(x, y, z) for x in [-1, 0, 1] for y in [-1, 0, 1] for z in [-1, 0, 1]]
nb_list = [
(x, y, z)
for x in [-1, 0, 1]
for y in [-1, 0, 1]
for z in [-1, 0, 1]
]
nb_list.remove((0, 0, 0))
else:
nb_list = [(-1, 0, 0), (0, -1, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (0, 0, -1)]
nb_list = [
(-1, 0, 0),
(0, -1, 0),
(1, 0, 0),
(0, 1, 0),
(0, 0, 1),
(0, 0, -1),
]
for dx, dy, dz in nb_list:
if minX <= self.x + dx <= maxX and minY <= self.y + dy <= maxY and minZ <= self.z + dz <= maxZ:
if (
minX <= self.x + dx <= maxX
and minY <= self.y + dy <= maxY
and minZ <= self.z + dz <= maxZ
):
yield self.__class__(self.x + dx, self.y + dy, self.z + dz)
def getAngleTo(self, target: Coordinate, normalized: bool = False) -> float:
@ -162,16 +239,20 @@ class Coordinate(tuple):
steps = gcd(diff.x, diff.y)
step_x = diff.x // steps
step_y = diff.y // steps
return [self.__class__(self.x + step_x * i, self.y + step_y * i) for i in range(steps + 1)]
return [
self.__class__(self.x + step_x * i, self.y + step_y * i)
for i in range(steps + 1)
]
else:
steps = gcd(diff.x, diff.y, diff.z)
step_x = diff.x // steps
step_y = diff.y // steps
step_z = diff.z // steps
return [
self.__class__(self.x + step_x * i, self.y + step_y * i, self.z + step_z * i)
for i
in range(steps + 1)
self.__class__(
self.x + step_x * i, self.y + step_y * i, self.z + step_z * i
)
for i in range(steps + 1)
]
def reverse(self) -> Coordinate:
@ -241,13 +322,29 @@ class Coordinate(tuple):
if self.z is None:
return "%s(x=%d, y=%d)" % (self.__class__.__name__, self.x, self.y)
else:
return "%s(x=%d, y=%d, z=%d)" % (self.__class__.__name__, self.x, self.y, self.z)
return "%s(x=%d, y=%d, z=%d)" % (
self.__class__.__name__,
self.x,
self.y,
self.z,
)
@classmethod
def generate(cls, from_x: int, to_x: int, from_y: int, to_y: int,
from_z: int = None, to_z: int = None) -> List[Coordinate]:
def generate(
cls,
from_x: int,
to_x: int,
from_y: int,
to_y: int,
from_z: int = None,
to_z: int = None,
) -> List[Coordinate]:
if from_z is None or to_z is None:
return [cls(x, y) for x in range(from_x, to_x + 1) for y in range(from_y, to_y + 1)]
return [
cls(x, y)
for x in range(from_x, to_x + 1)
for y in range(from_y, to_y + 1)
]
else:
return [
cls(x, y, z)
@ -267,34 +364,50 @@ class HexCoordinate(Coordinate):
z x y
-z +x -y
"""
neighbour_vectors = {
'ne': Coordinate(-1, 0, 1),
'nw': Coordinate(-1, 1, 0),
'e': Coordinate(0, -1, 1),
'w': Coordinate(0, 1, -1),
'sw': Coordinate(1, 0, -1),
'se': Coordinate(1, -1, 0),
"ne": Coordinate(-1, 0, 1),
"nw": Coordinate(-1, 1, 0),
"e": Coordinate(0, -1, 1),
"w": Coordinate(0, 1, -1),
"sw": Coordinate(1, 0, -1),
"se": Coordinate(1, -1, 0),
}
def __init__(self, x: int, y: int, z: int):
assert (x + y + z) == 0
super().__init__(x, y, z)
super(HexCoordinate, self).__init__(x, y, z)
def get_length(self) -> int:
return (abs(self.x) + abs(self.y) + abs(self.z)) // 2
def getDistanceTo(self, target: Coordinate, algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = True) -> Union[int, float]:
def getDistanceTo(
self,
target: Coordinate,
algorithm: DistanceAlgorithm = DistanceAlgorithm.EUCLIDEAN,
includeDiagonals: bool = True,
) -> Union[int, float]:
# includeDiagonals makes no sense in a hex grid, it's just here for signature reasons
if algorithm == DistanceAlgorithm.MANHATTAN:
return (self - target).get_length()
def getNeighbours(self, includeDiagonal: bool = True, minX: int = -inf, minY: int = -inf,
maxX: int = inf, maxY: int = inf, minZ: int = -inf, maxZ: int = inf) -> list[Coordinate]:
def getNeighbours(
self,
includeDiagonal: bool = True,
minX: int = -inf,
minY: int = -inf,
maxX: int = inf,
maxY: int = inf,
minZ: int = -inf,
maxZ: int = inf,
) -> list[Coordinate]:
# includeDiagonals makes no sense in a hex grid, it's just here for signature reasons
return [
self + x for x in self.neighbour_vectors.values()
if minX <= (self + x).x <= maxX and minY <= (self + x).y <= maxY and minZ <= (self + x).z <= maxZ
self + x
for x in self.neighbour_vectors.values()
if minX <= (self + x).x <= maxX
and minY <= (self + x).y <= maxY
and minZ <= (self + x).z <= maxZ
]
@ -311,17 +424,18 @@ class HexCoordinateF(HexCoordinate):
x y
+x -y
"""
neighbour_vectors = {
'ne': Coordinate(-1, 0, 1),
'nw': Coordinate(0, 1, -1),
'n': Coordinate(-1, 1, 0),
's': Coordinate(1, -1, 0),
'sw': Coordinate(1, 0, -1),
'se': Coordinate(0, -1, 1),
"ne": Coordinate(-1, 0, 1),
"nw": Coordinate(0, 1, -1),
"n": Coordinate(-1, 1, 0),
"s": Coordinate(1, -1, 0),
"sw": Coordinate(1, 0, -1),
"se": Coordinate(0, -1, 1),
}
def __init__(self, x: int, y: int, z: int):
super().__init__(x, y, z)
super(HexCoordinateF, self).__init__(x, y, z)
class Shape:
@ -339,7 +453,9 @@ class Shape:
def __len__(self):
if not self.mode_3d:
return (self.bottom_right.x - self.top_left.x + 1) * (self.bottom_right.y - self.top_left.y + 1)
return (self.bottom_right.x - self.top_left.x + 1) * (
self.bottom_right.y - self.top_left.y + 1
)
else:
return (
(self.bottom_right.x - self.top_left.x + 1)
@ -356,23 +472,43 @@ class Shape:
if not self.mode_3d:
intersect_top_left = Coordinate(
self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x,
self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y,
self.top_left.x
if self.top_left.x > other.top_left.x
else other.top_left.x,
self.top_left.y
if self.top_left.y > other.top_left.y
else other.top_left.y,
)
intersect_bottom_right = Coordinate(
self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x,
self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y,
self.bottom_right.x
if self.bottom_right.x < other.bottom_right.x
else other.bottom_right.x,
self.bottom_right.y
if self.bottom_right.y < other.bottom_right.y
else other.bottom_right.y,
)
else:
intersect_top_left = Coordinate(
self.top_left.x if self.top_left.x > other.top_left.x else other.top_left.x,
self.top_left.y if self.top_left.y > other.top_left.y else other.top_left.y,
self.top_left.z if self.top_left.z > other.top_left.z else other.top_left.z,
self.top_left.x
if self.top_left.x > other.top_left.x
else other.top_left.x,
self.top_left.y
if self.top_left.y > other.top_left.y
else other.top_left.y,
self.top_left.z
if self.top_left.z > other.top_left.z
else other.top_left.z,
)
intersect_bottom_right = Coordinate(
self.bottom_right.x if self.bottom_right.x < other.bottom_right.x else other.bottom_right.x,
self.bottom_right.y if self.bottom_right.y < other.bottom_right.y else other.bottom_right.y,
self.bottom_right.z if self.bottom_right.z < other.bottom_right.z else other.bottom_right.z,
self.bottom_right.x
if self.bottom_right.x < other.bottom_right.x
else other.bottom_right.x,
self.bottom_right.y
if self.bottom_right.y < other.bottom_right.y
else other.bottom_right.y,
self.bottom_right.z
if self.bottom_right.z < other.bottom_right.z
else other.bottom_right.z,
)
if intersect_top_left <= intersect_bottom_right:
@ -385,15 +521,23 @@ class Shape:
return self.intersection(other)
def __str__(self):
return "%s(%s -> %s)" % (self.__class__.__name__, self.top_left, self.bottom_right)
return "%s(%s -> %s)" % (
self.__class__.__name__,
self.top_left,
self.bottom_right,
)
def __repr__(self):
return "%s(%s, %s)" % (self.__class__.__name__, self.top_left, self.bottom_right)
return "%s(%s, %s)" % (
self.__class__.__name__,
self.top_left,
self.bottom_right,
)
class Square(Shape):
def __init__(self, top_left, bottom_right):
super().__init__(top_left, bottom_right)
super(Square, self).__init__(top_left, bottom_right)
self.mode_3d = False
@ -401,4 +545,4 @@ class Cube(Shape):
def __init__(self, top_left, bottom_right):
if top_left.z is None or bottom_right.z is None:
raise ValueError("Both Coordinates need to be 3D")
super().__init__(top_left, bottom_right)
super(Cube, self).__init__(top_left, bottom_right)

View File

@ -20,6 +20,7 @@ from signal import SIGTERM, signal
DEV_NULL = "/dev/null"
class Daemon:
"""
A generic daemon class.
@ -27,7 +28,9 @@ class Daemon:
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile='_.pid', stdin=DEV_NULL, stdout=DEV_NULL, stderr=DEV_NULL):
def __init__(
self, pidfile="_.pid", stdin=DEV_NULL, stdout=DEV_NULL, stderr=DEV_NULL
):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
@ -65,9 +68,9 @@ class Daemon:
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(os.devnull, 'r')
so = open(os.devnull, 'a+')
se = open(os.devnull, 'a+')
si = open(os.devnull, "r")
so = open(os.devnull, "a+")
se = open(os.devnull, "a+")
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
@ -77,7 +80,7 @@ class Daemon:
# write pidfile
pid = str(os.getpid())
open(self.pidfile, 'w+').write("%s\n" % pid)
open(self.pidfile, "w+").write("%s\n" % pid)
def onstop(self):
self.quit()
@ -89,7 +92,7 @@ class Daemon:
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = open(self.pidfile, 'r')
pf = open(self.pidfile, "r")
pid = int(pf.read().strip())
pf.close()
except IOError:
@ -110,7 +113,7 @@ class Daemon:
"""
# Get the pid from the pidfile
try:
pf = open(self.pidfile, 'r')
pf = open(self.pidfile, "r")
pid = int(pf.read().strip())
pf.close()
except IOError:

View File

@ -2,11 +2,10 @@ from __future__ import annotations
from collections import deque
from .aoc_ocr import convert_array_6
from .coordinate import Coordinate, DistanceAlgorithm, Shape
from .types import Numeric
from enum import Enum
from heapq import heappop, heappush
from math import inf
from typing import Any, Dict, List, Union
from typing import Any, Dict, List
OFF = False
ON = True
@ -62,7 +61,14 @@ class Grid:
self.maxZ = pos.z if pos.z > self.maxZ else self.maxZ
def recalcBoundaries(self) -> None:
self.minX, self.maxX, self.minY, self.maxY, self.minZ, self.maxZ = None, None, None, None, None, None
self.minX, self.maxX, self.minY, self.maxY, self.minZ, self.maxZ = (
None,
None,
None,
None,
None,
None,
)
for c in self.__grid:
self.__trackBoundaries(c)
@ -120,25 +126,29 @@ class Grid:
return value
def move(self, pos: Coordinate, vec: Coordinate,):
def move(
self,
pos: Coordinate,
vec: Coordinate,
):
target = pos + vec
self.set(target, self.get(pos))
if pos in self.__grid:
del self.__grid[pos]
def add(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
def add(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) + value)
def sub(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
def sub(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) - value)
def mul(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
def mul(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) * value)
def div(self, pos: Coordinate, value: Numeric = 1) -> Numeric:
def div(self, pos: Coordinate, value: int | float = 1) -> int | float:
return self.set(pos, self.get(pos) / value)
def add_shape(self, shape: Shape, value: Numeric = 1) -> None:
def add_shape(self, shape: Shape, value: int | float = 1) -> None:
for x in range(shape.top_left.x, shape.bottom_right.x + 1):
for y in range(shape.top_left.y, shape.bottom_right.y + 1):
if not shape.mode_3d:
@ -186,15 +196,21 @@ class Grid:
def isWithinBoundaries(self, pos: Coordinate) -> bool:
if self.mode3D:
return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY \
return (
self.minX <= pos.x <= self.maxX
and self.minY <= pos.y <= self.maxY
and self.minZ <= pos.z <= self.maxZ
)
else:
return self.minX <= pos.x <= self.maxX and self.minY <= pos.y <= self.maxY
def getActiveCells(self, x: int = None, y: int = None, z: int = None) -> List[Coordinate]:
def getActiveCells(
self, x: int = None, y: int = None, z: int = None
) -> List[Coordinate]:
if x is not None or y is not None or z is not None:
return [
c for c in self.__grid.keys()
c
for c in self.__grid.keys()
if (c.x == x if x is not None else True)
and (c.y == y if y is not None else True)
and (c.z == z if z is not None else True)
@ -202,8 +218,12 @@ class Grid:
else:
return list(self.__grid.keys())
def getActiveRegion(self, start: Coordinate, includeDiagonal: bool = False, ignore: List[Coordinate] = None) \
-> List[Coordinate]:
def getActiveRegion(
self,
start: Coordinate,
includeDiagonal: bool = False,
ignore: List[Coordinate] = None,
) -> List[Coordinate]:
if not self.get(start):
return []
if ignore is None:
@ -218,7 +238,7 @@ class Grid:
def values(self):
return self.__grid.values()
def getSum(self, includeNegative: bool = True) -> Numeric:
def getSum(self, includeNegative: bool = True) -> int | float:
if not self.mode3D:
return sum(
self.get(Coordinate(x, y))
@ -235,18 +255,31 @@ class Grid:
if includeNegative or self.get(Coordinate(x, y)) >= 0
)
def getNeighboursOf(self, pos: Coordinate, includeDefault: bool = False, includeDiagonal: bool = True) \
-> List[Coordinate]:
def getNeighboursOf(
self,
pos: Coordinate,
includeDefault: bool = False,
includeDiagonal: bool = True,
) -> List[Coordinate]:
neighbours = pos.getNeighbours(
includeDiagonal=includeDiagonal,
minX=self.minX, minY=self.minY, minZ=self.minZ,
maxX=self.maxX, maxY=self.maxY, maxZ=self.maxZ
minX=self.minX,
minY=self.minY,
minZ=self.minZ,
maxX=self.maxX,
maxY=self.maxY,
maxZ=self.maxZ,
)
for x in neighbours:
if includeDefault or x in self.__grid:
yield x
def getNeighbourSum(self, pos: Coordinate, includeNegative: bool = True, includeDiagonal: bool = True) -> Numeric:
def getNeighbourSum(
self,
pos: Coordinate,
includeNegative: bool = True,
includeDiagonal: bool = True,
) -> int | float:
neighbour_sum = 0
for neighbour in self.getNeighboursOf(pos, includeDefault=includeDiagonal):
if includeNegative or self.get(neighbour) > 0:
@ -338,8 +371,14 @@ class Grid:
else:
self.shift(0 - self.minX, 0 - self.minY)
def getPath_BFS(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None,
stop_at_first: Any = None) -> Union[None, List[Coordinate]]:
def getPath_BFS(
self,
pos_from: Coordinate,
pos_to: Coordinate,
includeDiagonal: bool,
walls: List[Any] = None,
stop_at_first: Any = None,
) -> List[Coordinate] | None:
queue = deque()
came_from = {pos_from: None}
queue.append(pos_from)
@ -349,12 +388,17 @@ class Grid:
while queue:
current = queue.popleft()
found_end = False
for c in self.getNeighboursOf(current, includeDiagonal=includeDiagonal,
includeDefault=self.__default not in walls):
for c in self.getNeighboursOf(
current,
includeDiagonal=includeDiagonal,
includeDefault=self.__default not in walls,
):
if c in came_from and self.get(c) in walls:
continue
came_from[c] = current
if c == pos_to or (stop_at_first is not None and self.get(c) == stop_at_first):
if c == pos_to or (
stop_at_first is not None and self.get(c) == stop_at_first
):
pos_to = c
found_end = True
break
@ -372,8 +416,14 @@ class Grid:
return ret
def getPath(self, pos_from: Coordinate, pos_to: Coordinate, includeDiagonal: bool, walls: List[Any] = None,
weighted: bool = False) -> Union[None, List[Coordinate]]:
def getPath(
self,
pos_from: Coordinate,
pos_to: Coordinate,
includeDiagonal: bool,
walls: List[Any] = None,
weighted: bool = False,
) -> List[Coordinate] | None:
f_costs = []
if walls is None:
walls = [self.__default]
@ -395,7 +445,9 @@ class Grid:
if currentCoord == pos_to:
break
for neighbour in self.getNeighboursOf(currentCoord, includeDefault=True, includeDiagonal=includeDiagonal):
for neighbour in self.getNeighboursOf(
currentCoord, includeDefault=True, includeDiagonal=includeDiagonal
):
if self.get(neighbour) in walls or neighbour in closedNodes:
continue
@ -404,13 +456,19 @@ class Grid:
elif not includeDiagonal:
neighbourDist = 1
else:
neighbourDist = currentCoord.getDistanceTo(neighbour, DistanceAlgorithm.MANHATTAN, includeDiagonal)
neighbourDist = currentCoord.getDistanceTo(
neighbour, DistanceAlgorithm.MANHATTAN, includeDiagonal
)
targetDist = neighbour.getDistanceTo(pos_to)
f_cost = targetDist + neighbourDist + currentNode[1]
if neighbour not in openNodes or f_cost < openNodes[neighbour][0]:
openNodes[neighbour] = (f_cost, currentNode[1] + neighbourDist, currentCoord)
openNodes[neighbour] = (
f_cost,
currentNode[1] + neighbourDist,
currentCoord,
)
heappush(f_costs, (f_cost, neighbour))
if pos_to not in closedNodes:
@ -424,18 +482,33 @@ class Grid:
return pathCoords
def sub_grid(self, from_x: int, from_y: int, to_x: int, to_y: int, from_z: int = None, to_z: int = None) -> 'Grid':
def sub_grid(
self,
from_x: int,
from_y: int,
to_x: int,
to_y: int,
from_z: int = None,
to_z: int = None,
) -> "Grid":
if self.mode3D and (from_z is None or to_z is None):
raise ValueError("sub_grid() on mode3d Grids requires from_z and to_z to be set")
raise ValueError(
"sub_grid() on mode3d Grids requires from_z and to_z to be set"
)
count_x, count_y, count_z = 0, 0, 0
new_grid = Grid(self.__default)
for x in range(from_x, to_x + 1):
for y in range(from_y, to_y + 1):
if not self.mode3D:
new_grid.set(Coordinate(count_x, count_y), self.get(Coordinate(x, y)))
new_grid.set(
Coordinate(count_x, count_y), self.get(Coordinate(x, y))
)
else:
for z in range(from_z, to_z + 1):
new_grid.set(Coordinate(count_x, count_y, count_z), self.get(Coordinate(x, y, z)))
new_grid.set(
Coordinate(count_x, count_y, count_z),
self.get(Coordinate(x, y, z)),
)
count_z += 1
count_z = 0
@ -454,7 +527,16 @@ class Grid:
put_y = y
put_x += 1
def print(self, spacer: str = "", true_char: str = '#', false_char: str = " ", translate: dict = None, mark: list = None, z_level: int = None, bool_mode: bool = False):
def print(
self,
spacer: str = "",
true_char: str = "#",
false_char: str = " ",
translate: dict = None,
mark: list = None,
z_level: int = None,
bool_mode: bool = False,
):
if translate is None:
translate = {}
@ -486,10 +568,16 @@ class Grid:
def get_aoc_ocr_string(self, x_shift: int = 0, y_shift: int = 0):
return convert_array_6(
[['#' if self.get(Coordinate(x + x_shift, y + y_shift)) else '.' for x in self.rangeX()] for y in
self.rangeY()])
[
[
"#" if self.get(Coordinate(x + x_shift, y + y_shift)) else "."
for x in self.rangeX()
]
for y in self.rangeY()
]
)
def __str__(self, true_char: str = '#', false_char: str = "."):
def __str__(self, true_char: str = "#", false_char: str = "."):
return "/".join(
"".join(
true_char if self.get(Coordinate(x, y)) else false_char
@ -499,10 +587,22 @@ class Grid:
)
@classmethod
def from_str(cls, grid_string: str, default: Any = False, true_char: str = '#', true_value: Any = True, translate: dict = None, mode3d: bool = False) -> 'Grid':
def from_str(
cls,
grid_string: str,
default: Any = False,
true_char: str = "#",
true_value: Any = True,
translate: dict = None,
mode3d: bool = False,
) -> "Grid":
if translate is None:
translate = {}
if true_char is not None and True not in translate.values() and true_char not in translate:
if (
true_char is not None
and True not in translate.values()
and true_char not in translate
):
translate[true_char] = true_value if true_value is not None else True
ret = cls(default=default)

View File

@ -1,5 +1,4 @@
import math
from .tools import cache
def factorial(n: int) -> int:

View File

@ -1,11 +1,10 @@
from __future__ import annotations
from time import sleep
from .schedule import Scheduler
from .simplesocket import ClientSocket
from .types import StrOrNone
from datetime import timedelta
from enum import Enum
from typing import Callable, Dict, List, Union
from typing import Callable
class ServerMessage(str, Enum):
@ -191,7 +190,7 @@ class User:
class Channel:
name: str
topic: str
userlist: Dict[str, User]
userlist: dict[str, User]
def __init__(self, name: str):
self.name = name
@ -208,22 +207,29 @@ class Channel:
class Client:
__function_register: Dict[str, List[Callable]]
__function_register: dict[str, list[Callable]]
__server_socket: ClientSocket
__server_caps: Dict[str, Union[str, int]]
__userlist: Dict[str, User]
__channellist: Dict[str, Channel]
__my_user: StrOrNone
__server_caps: dict[str, str | int]
__userlist: dict[str, User]
__channellist: dict[str, Channel]
__my_user: str | None
def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"):
def __init__(
self,
server: str,
port: int,
nick: str,
username: str,
realname: str = "Python Bot",
):
self.__userlist = {}
self.__channellist = {}
self.__server_socket = ClientSocket(server, port)
self.__server_socket.sendline("USER %s ignore ignore :%s" % (username, realname))
self.__server_socket.sendline(
"USER %s ignore ignore :%s" % (username, realname)
)
self.__server_socket.sendline("NICK %s" % nick)
self.__server_caps = {
'MAXLEN': 255
}
self.__server_caps = {"MAXLEN": 255}
self.__function_register = {
ServerMessage.RPL_WELCOME: [self.on_rpl_welcome],
ServerMessage.RPL_TOPIC: [self.on_rpl_topic],
@ -261,7 +267,10 @@ class Client:
if "!" in msg_from and msg_from not in self.__userlist:
self.__userlist[msg_from] = User(msg_from)
if self.__userlist[msg_from].nickname == self.__userlist[self.__my_user].nickname:
if (
self.__userlist[msg_from].nickname
== self.__userlist[self.__my_user].nickname
):
del self.__userlist[self.__my_user]
self.__my_user = msg_from
@ -301,7 +310,9 @@ class Client:
def on_nick(self, msg_from: str, msg_to: str, message: str):
self.__userlist[msg_from].nick(msg_to)
self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[msg_from]
self.__userlist[self.__userlist[msg_from].identifier] = self.__userlist[
msg_from
]
del self.__userlist[msg_from]
def on_join(self, msg_from: str, msg_to: str, message: str):
@ -349,7 +360,7 @@ class Client:
self.receive()
self.__server_socket.close()
def getUser(self, user: str = None) -> Union[User, None]:
def getUser(self, user: str = None) -> User | None:
if user is None:
return self.__userlist[self.__my_user]
elif user in self.__userlist:
@ -357,21 +368,28 @@ class Client:
else:
return None
def getUserList(self) -> List[User]:
def getUserList(self) -> list[User]:
return list(self.__userlist.values())
def getChannel(self, channel: str) -> Union[Channel, None]:
def getChannel(self, channel: str) -> Channel | None:
if channel in self.__channellist:
return self.__channellist[channel]
else:
return None
def getChannelList(self) -> List[Channel]:
def getChannelList(self) -> list[Channel]:
return list(self.__channellist.values())
class IrcBot(Client):
def __init__(self, server: str, port: int, nick: str, username: str, realname: str = "Python Bot"):
def __init__(
self,
server: str,
port: int,
nick: str,
username: str,
realname: str = "Python Bot",
):
super().__init__(server, port, nick, username, realname)
self._scheduler = Scheduler()
self._channel_commands = {}
@ -397,8 +415,13 @@ class IrcBot(Client):
if not message:
return
command = message.split()[0]
if msg_to in self._channel_commands and command in self._channel_commands[msg_to]:
self._channel_commands[msg_to][command](msg_from, " ".join(message.split()[1:]))
if (
msg_to in self._channel_commands
and command in self._channel_commands[msg_to]
):
self._channel_commands[msg_to][command](
msg_from, " ".join(message.split()[1:])
)
if msg_to == self.getUser().nickname and command in self._privmsg_commands:
self._privmsg_commands[command](msg_from, " ".join(message.split()[1:]))

View File

@ -1,17 +1,18 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Union
from typing import Any
@dataclass
class Node:
value: Any
next: 'Node' = None
prev: 'Node' = None
next: "Node" = None
prev: "Node" = None
class LinkedList:
_head: Union[Node, None] = None
_tail: Union[Node, None] = None
_head: Node | None = None
_tail: Node | None = None
size: int = 0
def _get_head(self):
@ -128,7 +129,7 @@ class LinkedList:
return x.value == obj
def __add__(self, other: 'LinkedList') -> 'LinkedList':
def __add__(self, other: "LinkedList") -> "LinkedList":
self._tail.next = other.head
other.head.prev = self._tail
self._tail = other.tail

View File

@ -1,9 +1,9 @@
from __future__ import annotations
import math
from decimal import Decimal, ROUND_HALF_UP
from .types import Numeric
def round_half_up(number: Numeric) -> int:
def round_half_up(number: int | float) -> int:
"""pythons round() rounds .5 to the *even* number; 0.5 == 0"""
return int(Decimal(number).to_integral(ROUND_HALF_UP))

32
src/tools/schedule.py Normal file
View File

@ -0,0 +1,32 @@
import datetime
from typing import Callable, Any
class Scheduler:
def __init__(self):
self.jobs = {}
def schedule(
self,
name: str,
every: datetime.timedelta,
func: Callable[..., None],
*args: list[Any],
):
self.jobs[name] = {
"call": func,
"args": args,
"timedelta": every,
"runat": (datetime.datetime.utcnow() + every),
}
def unschedule(self, name: str):
if name in self.jobs:
del self.jobs[name]
def run_pending(self):
now = datetime.datetime.utcnow()
for job in self.jobs.values():
if job["runat"] <= now:
job["runat"] += job["timedelta"]
job["call"](*job["args"])

View File

@ -1,16 +1,19 @@
from __future__ import annotations
import errno
import socket
import threading
import time
from typing import Callable, Union
from typing import Callable
class Socket:
def __init__(self, address_family: socket.AddressFamily, socket_kind: socket.SocketKind):
def __init__(
self, address_family: socket.AddressFamily, socket_kind: socket.SocketKind
):
self.socket = socket.socket(family=address_family, type=socket_kind)
self.__recv_buffer = b""
def send(self, buffer: Union[str, bytes]) -> int:
def send(self, buffer: str | bytes) -> int:
if isinstance(buffer, str):
buffer = buffer.encode("UTF-8")
@ -40,7 +43,7 @@ class Socket:
self.send(line)
def recvline(self, timeout: int = 0) -> Union[str, None]:
def recvline(self, timeout: int = 0) -> str | None:
"""
Receive exactly one text line (delimiter: newline "\n" or "\r\n") from the socket.
@ -59,7 +62,9 @@ class Socket:
return None
else:
line = self.__recv_buffer[: self.__recv_buffer.index(b"\n")]
self.__recv_buffer = self.__recv_buffer[self.__recv_buffer.index(b"\n") + 1:]
self.__recv_buffer = self.__recv_buffer[
self.__recv_buffer.index(b"\n") + 1 :
]
return line.decode("UTF-8")
def close(self):
@ -67,8 +72,13 @@ class Socket:
class ClientSocket(Socket):
def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM):
def __init__(
self,
addr: str,
port: int,
address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM,
):
super().__init__(address_family, socket_kind)
self.socket.connect((addr, port))
self.laddr, self.lport = self.socket.getsockname()
@ -84,18 +94,28 @@ class RemoteSocket(Socket):
class ServerSocket(Socket):
def __init__(self, addr: str, port: int, address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM):
def __init__(
self,
addr: str,
port: int,
address_family: socket.AddressFamily = socket.AF_INET,
socket_kind: socket.SocketKind = socket.SOCK_STREAM,
):
super().__init__(address_family, socket_kind)
self.socket.bind((addr, port))
self.socket.listen(5)
self.laddr, self.lport = self.socket.getsockname()
self.raddr, self.rport = None, None # Transport endpoint is not connected. Surprisingly.
self.raddr, self.rport = (
None,
None,
) # Transport endpoint is not connected. Surprisingly.
def _connection_acceptor(self, target: Callable[..., None]):
while 1:
(client_socket, client_address) = self.socket.accept()
connection_handler_thread = threading.Thread(target=target, args=(RemoteSocket(client_socket), ))
connection_handler_thread = threading.Thread(
target=target, args=(RemoteSocket(client_socket),)
)
connection_handler_thread.start()
def accept(self, target: Callable[..., None], blocking: bool = True):
@ -103,6 +123,8 @@ class ServerSocket(Socket):
self._connection_acceptor(target)
return None
else:
connection_accept_thread = threading.Thread(target=self._connection_acceptor, kwargs={'target': target})
connection_accept_thread = threading.Thread(
target=self._connection_acceptor, kwargs={"target": target}
)
connection_accept_thread.start()
return connection_accept_thread

View File

@ -1,11 +1,11 @@
from __future__ import annotations
from time import perf_counter_ns
from .tools import human_readable_time_from_ns
from .types import IntOrNone
class StopWatch:
started: IntOrNone = None
stopped: IntOrNone = None
started: int | None = None
stopped: int | None = None
def __init__(self, auto_start=True):
if auto_start:

View File

@ -31,8 +31,8 @@ class Dict(dict):
def __init__(self, *args, **kwargs):
if "default" in kwargs:
self.__default = kwargs['default']
del kwargs['default']
self.__default = kwargs["default"]
del kwargs["default"]
else:
self.__default = None
@ -46,7 +46,9 @@ class Dict(dict):
self[k] = Dict(self[k])
elif isinstance(self[k], list):
for i in range(len(self[k])):
if isinstance(self[k][i], dict) and not isinstance(self[k][i], Dict):
if isinstance(self[k][i], dict) and not isinstance(
self[k][i], Dict
):
self[k][i] = Dict(self[k][i])
def update(self, other: dict, **kwargs):
@ -75,11 +77,11 @@ class Dict(dict):
def get_script_dir(follow_symlinks: bool = True) -> str:
"""return path of the executed script"""
if getattr(sys, 'frozen', False):
if getattr(sys, "frozen", False):
path = os.path.abspath(sys.executable)
else:
if '__main__' in sys.modules and hasattr(sys.modules['__main__'], '__file__'):
path = sys.modules['__main__'].__file__
if "__main__" in sys.modules and hasattr(sys.modules["__main__"], "__file__"):
path = sys.modules["__main__"].__file__
else:
path = inspect.getabsfile(get_script_dir)
@ -132,13 +134,13 @@ def human_readable_time_from_delta(delta: datetime.timedelta) -> str:
def human_readable_time_from_ns(ns: int) -> str:
units = [
(1000, 'ns'),
(1000, 'µs'),
(1000, 'ms'),
(60, 's'),
(60, 'm'),
(60, 'h'),
(24, 'd'),
(1000, "ns"),
(1000, "µs"),
(1000, "ms"),
(60, "s"),
(60, "m"),
(60, "h"),
(24, "d"),
]
time_parts = []
@ -153,7 +155,7 @@ def cache(func):
saved = {}
@wraps(func)
def newfunc(*args):
def new_func(*args):
if args in saved:
return saved[args]
@ -161,7 +163,7 @@ def cache(func):
saved[args] = result
return result
return newfunc
return new_func
@hook(list)

View File

@ -1,7 +1,8 @@
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from tools.lists import Queue, Stack
from typing import Any, List, Union
from .lists import Queue, Stack
from typing import Any
class Rotate(Enum):
@ -12,18 +13,21 @@ class Rotate(Enum):
@dataclass
class TreeNode:
value: Any
parent: Union['TreeNode', None] = None
left: Union['TreeNode', None] = None
right: Union['TreeNode', None] = None
parent: TreeNode | None = None
left: TreeNode | None = None
right: TreeNode | None = None
balance_factor: int = 0
height: int = 0
def __str__(self):
return "TreeNode:(%s; bf: %d, d: %d, p: %s, l: %s, r: %s)" \
% (self.value, self.balance_factor, self.height,
return "TreeNode:(%s; bf: %d, d: %d, p: %s, l: %s, r: %s)" % (
self.value,
self.balance_factor,
self.height,
self.parent.value if self.parent else "None",
self.left.value if self.left else "None",
self.right.value if self.right else "None")
self.right.value if self.right else "None",
)
def __repr__(self):
return str(self)
@ -31,8 +35,8 @@ class TreeNode:
class TrieNode:
value: str
parent: Union['TrieNode', None] = None
children: List['TrieNode'] = field(default_factory=list)
parent: TrieNode | None = None
children: list[TrieNode] = field(default_factory=list)
def update_node(node: TreeNode):
@ -43,7 +47,7 @@ def update_node(node: TreeNode):
class BinaryTree:
root: Union[TreeNode, None] = None
root: TreeNode | None = None
node_count: int = 0
def _insert(self, node: TreeNode, parent: TreeNode, obj: Any) -> TreeNode:
@ -168,7 +172,7 @@ class BinaryTree:
return
self.print(node.right, level + 1)
print(" " * 4 * level + '->', node)
print(" " * 4 * level + "->", node)
self.print(node.left, level + 1)
def __contains__(self, obj: Any) -> bool:

View File

View File

@ -1,26 +0,0 @@
import datetime
from typing import Callable, List, Any
class Scheduler:
def __init__(self):
self.jobs = {}
def schedule(self, name: str, every: datetime.timedelta, func: Callable[..., None], *args: List[Any]):
self.jobs[name] = {
'call': func,
'args': args,
'timedelta': every,
'runat': (datetime.datetime.utcnow() + every)
}
def unschedule(self, name: str):
if name in self.jobs:
del self.jobs[name]
def run_pending(self):
now = datetime.datetime.utcnow()
for job in self.jobs.values():
if job['runat'] <= now:
job['runat'] += job['timedelta']
job['call'](*job['args'])

View File

@ -1,7 +0,0 @@
from typing import Union
Numeric = Union[int, float]
StrOrNone = Union[str, None]
IntOrNone = Union[int, None]
FloatOrNone = Union[float, None]
NumericOrNone = Union[Numeric, None]