115 lines
3.3 KiB
Python
115 lines
3.3 KiB
Python
import time
|
|
from collections import defaultdict
|
|
from threading import Thread
|
|
from tools.aoc import AOCDay
|
|
from typing import Any
|
|
from queue import Queue
|
|
|
|
|
|
class Duet(Thread):
|
|
def __init__(self, commands: list, pid: int = 0, input_queue: Queue = None, export_queue: Queue = None) -> None:
|
|
super().__init__()
|
|
self.commands = commands
|
|
self.pid = pid
|
|
self.input_queue = input_queue
|
|
self.export_queue = export_queue
|
|
self.state = 0
|
|
self.output = 0
|
|
|
|
def run(self) -> None:
|
|
last_freq = 0
|
|
register = defaultdict(int)
|
|
register['p'] = self.pid
|
|
index = 0
|
|
while 0 <= index < len(self.commands):
|
|
cmd = self.commands[index]
|
|
|
|
if len(cmd) == 3:
|
|
value_index = 2
|
|
else:
|
|
value_index = 1
|
|
|
|
try:
|
|
value = int(cmd[value_index])
|
|
except ValueError:
|
|
value = register[cmd[value_index]]
|
|
|
|
match cmd[0]:
|
|
case "snd":
|
|
last_freq = value
|
|
if self.export_queue is not None:
|
|
self.export_queue.put(value)
|
|
self.output += 1
|
|
case "set":
|
|
register[cmd[1]] = value
|
|
case "add":
|
|
register[cmd[1]] += value
|
|
case "mul":
|
|
register[cmd[1]] *= value
|
|
case "mod":
|
|
register[cmd[1]] %= value
|
|
case "rcv":
|
|
if self.input_queue is not None:
|
|
self.state = 1
|
|
while self.input_queue.empty():
|
|
time.sleep(0.01)
|
|
register[cmd[1]] = self.input_queue.get()
|
|
self.state = 0
|
|
else:
|
|
if register[cmd[1]] != 0:
|
|
self.output = last_freq
|
|
return
|
|
case "jgz":
|
|
try:
|
|
if int(cmd[1]) > 0:
|
|
index += value - 1
|
|
except ValueError:
|
|
if register[cmd[1]] > 0:
|
|
index += value - 1
|
|
case _:
|
|
raise ValueError("Unknown command: " + cmd[0])
|
|
|
|
index += 1
|
|
|
|
return
|
|
|
|
|
|
class Day(AOCDay):
    """Puzzle driver that runs the ``Duet`` interpreter alone and as a pair."""

    # One list per part; each entry pairs a number with an input file name
    # (presumably the expected answer for that input; confirm against AOCDay).
    inputs = [
        [
            (4, "input18_test"),
            (7071, "input18"),
        ],
        [
            (3, "input18_test2"),
            (8001, "input18"),
        ],
    ]

    def part1(self) -> Any:
        """Run a single program without queues and return its ``output``."""
        duet = Duet([x.split(" ") for x in self.getInput()])
        duet.start()
        duet.join()
        return duet.output

    @staticmethod
    def _is_stalled(program: "Duet", inbox: Queue) -> bool:
        """Return True if ``program`` can make no progress on its own.

        That is the case when its thread has finished, or when it is
        waiting in ``rcv`` (``state == 1``) and its inbox is empty.
        """
        return not program.is_alive() or (program.state == 1 and inbox.empty())

    def part2(self) -> Any:
        """Run two programs wired to each other and return program 1's ``output``.

        Program 0 reads from ``q1`` and writes to ``q2``; program 1 does the
        opposite.  The method returns once neither program can continue:
        both finished, both blocked on empty queues, or one finished while
        the other is blocked waiting for it.
        """
        q1, q2 = Queue(), Queue()
        p1 = Duet([x.split(" ") for x in self.getInput()], 0, q1, q2)
        p2 = Duet([x.split(" ") for x in self.getInput()], 1, q2, q1)
        # Daemon threads so a program left blocked in ``rcv`` cannot keep
        # the interpreter alive after the answer has been returned.
        p1.daemon = True
        p2.daemon = True
        p1.start()
        p2.start()

        # The state/queue reads are not atomic, so a single positive sample
        # may be a transient.  A real deadlock is permanent, therefore only
        # accept it when two consecutive polls agree.
        stalled_polls = 0
        while True:
            if self._is_stalled(p1, q1) and self._is_stalled(p2, q2):
                stalled_polls += 1
                if stalled_polls >= 2:
                    return p2.output
            else:
                stalled_polls = 0
            time.sleep(0.01)
|
|
|
|
|
|
# Script entry point: build the solver with arguments (2017, 18) and run it verbosely.
if __name__ == "__main__":
    Day(2017, 18).run(verbose=True)
|