139 lines
5.5 KiB
Python
139 lines
5.5 KiB
Python
import threading
|
|
import time
|
|
from collections import deque, defaultdict
|
|
from enum import Enum
|
|
from typing import List
|
|
|
|
|
|
class IntCodeState(Enum):
    """Lifecycle stages an IntCode machine moves through."""

    # Machine has been constructed but has not begun executing.
    INITIALIZED = 0

    # Machine is in its fetch/execute loop.
    RUNNING = 1

    # Machine has executed the halt instruction and stopped.
    HALTED = 2
|
|
|
|
|
|
def _get_from_queue(queue: deque, count: int = 1, timeout: int = 10) -> int | List[int] | None:
|
|
started = time.perf_counter()
|
|
while len(queue) < count:
|
|
time.sleep(0.0001)
|
|
if time.perf_counter() - started > timeout:
|
|
raise TimeoutError()
|
|
|
|
if count == 1:
|
|
return queue.popleft()
|
|
else:
|
|
return [queue.popleft() for _ in range(count)]
|
|
|
|
|
|
class IntCode(threading.Thread):
|
|
def __init__(self, memory: List[int]):
|
|
super().__init__()
|
|
self.instr_ptr = 0
|
|
self.memory = defaultdict(int)
|
|
for i, v in enumerate(memory):
|
|
self.memory[i] = v
|
|
self.input_queue = deque()
|
|
self.output_queue = deque()
|
|
self.state = IntCodeState.INITIALIZED
|
|
self.relative_base = 0
|
|
|
|
def getMemoryValue(self, address: int) -> int:
|
|
return self.memory[address]
|
|
|
|
def setMemoryValue(self, address: int, value: int):
|
|
self.memory[address] = value
|
|
|
|
def getParameterValue(self, address: int, mode: int = 0) -> int:
|
|
if mode == 0:
|
|
return self.getMemoryValue(self.getMemoryValue(address))
|
|
elif mode == 1:
|
|
return self.getMemoryValue(address)
|
|
elif mode == 2:
|
|
return self.getMemoryValue(self.relative_base + self.getMemoryValue(address))
|
|
|
|
def getTargetAddress(self, address: int, mode: int = 0) -> int:
|
|
if mode == 0:
|
|
return self.getMemoryValue(address)
|
|
elif mode == 2:
|
|
return self.getMemoryValue(address) + self.relative_base
|
|
|
|
def isRunning(self):
|
|
return self.state == IntCodeState.RUNNING
|
|
|
|
def isHalted(self):
|
|
return self.state == IntCodeState.HALTED
|
|
|
|
def sendOutputTo(self, target_queue: deque):
|
|
self.output_queue = target_queue
|
|
|
|
def addInput(self, value: int):
|
|
self.input_queue.append(value)
|
|
|
|
def hasOutput(self):
|
|
return len(self.output_queue) > 0
|
|
|
|
def getOutput(self, count: int = 1, timeout: int = 10) -> int | List[int] | None:
|
|
return _get_from_queue(self.output_queue, count, timeout)
|
|
|
|
def getInput(self, count: int = 1, timeout: int = 10) -> int | List[int] | None:
|
|
return _get_from_queue(self.input_queue, count, timeout)
|
|
|
|
def run(self):
|
|
self.state = IntCodeState.RUNNING
|
|
while not self.isHalted():
|
|
instr = self.getMemoryValue(self.instr_ptr)
|
|
op_code = instr % 100
|
|
p1_mode = instr // 100 % 10
|
|
p2_mode = instr // 1_000 % 10
|
|
p3_mode = instr // 10_000 % 10
|
|
|
|
if op_code == 1: # add
|
|
param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode)
|
|
param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode)
|
|
target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode)
|
|
self.setMemoryValue(target_addr, param1 + param2)
|
|
self.instr_ptr += 4
|
|
elif op_code == 2: # multiply
|
|
param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode)
|
|
param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode)
|
|
target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode)
|
|
self.setMemoryValue(target_addr, param1 * param2)
|
|
self.instr_ptr += 4
|
|
elif op_code == 3: # input
|
|
target_addr = self.getTargetAddress(self.instr_ptr + 1, p1_mode)
|
|
self.setMemoryValue(target_addr, self.getInput())
|
|
self.instr_ptr += 2
|
|
elif op_code == 4: # output
|
|
self.output_queue.append(self.getParameterValue(self.instr_ptr + 1, p1_mode))
|
|
self.instr_ptr += 2
|
|
elif op_code == 5: # jump-if-true
|
|
if self.getParameterValue(self.instr_ptr + 1, p1_mode) != 0:
|
|
self.instr_ptr = self.getParameterValue(self.instr_ptr + 2, p2_mode)
|
|
else:
|
|
self.instr_ptr += 3
|
|
elif op_code == 6: # jump-if-false
|
|
if self.getParameterValue(self.instr_ptr + 1, p1_mode) == 0:
|
|
self.instr_ptr = self.getParameterValue(self.instr_ptr + 2, p2_mode)
|
|
else:
|
|
self.instr_ptr += 3
|
|
elif op_code == 7: # less than
|
|
param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode)
|
|
param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode)
|
|
target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode)
|
|
self.setMemoryValue(target_addr, param1 < param2)
|
|
self.instr_ptr += 4
|
|
elif op_code == 8: # equals
|
|
param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode)
|
|
param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode)
|
|
target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode)
|
|
self.setMemoryValue(target_addr, param1 == param2)
|
|
self.instr_ptr += 4
|
|
elif op_code == 9:
|
|
param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode)
|
|
self.relative_base += param1
|
|
self.instr_ptr += 2
|
|
elif op_code == 99:
|
|
self.state = IntCodeState.HALTED
|
|
return
|
|
else:
|
|
raise ValueError("Invalid instruction encountered at pos %d: %d" % (self.instr_ptr, instr))
|