import threading from collections import defaultdict from queue import Queue, Empty from enum import Enum from typing import List class IntCodeState(Enum): INITIALIZED = 0 RUNNING = 1 HALTED = 2 STOPPED = 3 class IntCode(threading.Thread): def __init__(self, memory: List[int]): super().__init__() self.instr_ptr = 0 self.memory = defaultdict(int) for i, v in enumerate(memory): self.memory[i] = v self.input_queue = Queue() self.output_queue = Queue() self.state = IntCodeState.INITIALIZED self.relative_base = 0 def getMemoryValue(self, address: int) -> int: return int(self.memory[address]) def setMemoryValue(self, address: int, value: int): self.memory[address] = value def getParameterValue(self, address: int, mode: int = 0) -> int: if mode == 0: return self.getMemoryValue(self.getMemoryValue(address)) elif mode == 1: return self.getMemoryValue(address) elif mode == 2: return self.getMemoryValue(self.relative_base + self.getMemoryValue(address)) def getTargetAddress(self, address: int, mode: int = 0) -> int: if mode == 0: return self.getMemoryValue(address) elif mode == 2: return self.getMemoryValue(address) + self.relative_base def isRunning(self): return self.state == IntCodeState.RUNNING def isHalted(self): return self.state == IntCodeState.HALTED def isStopped(self): return self.state == IntCodeState.STOPPED def _get_from_queue(self, queue: Queue, count: int = 1, timeout: int = 3) -> int | List[int] | None: try: if count == 1: return queue.get(timeout=timeout) else: return [queue.get(timeout=timeout) for _ in range(count)] except Empty as e: if self.state == IntCodeState.STOPPED: return None else: raise e def addInput(self, value: int): self.input_queue.put(value) def addOutput(self, value: int): self.output_queue.put(value) def hasOutput(self): return self.output_queue.qsize() > 0 def getOutput(self, count: int = 1, timeout: int = 3) -> int | List[int] | None: return self._get_from_queue(self.output_queue, count, timeout) def getInput(self, count: int = 1, timeout: int = 3) -> int | List[int] | None: return self._get_from_queue(self.input_queue, count, timeout) def stop(self): self.state = IntCodeState.STOPPED def run(self): self.state = IntCodeState.RUNNING while not self.isHalted() and not self.isStopped(): instr = self.getMemoryValue(self.instr_ptr) op_code = instr % 100 p1_mode = instr // 100 % 10 p2_mode = instr // 1_000 % 10 p3_mode = instr // 10_000 % 10 if op_code == 1: # add param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode) param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode) target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode) self.setMemoryValue(target_addr, param1 + param2) self.instr_ptr += 4 elif op_code == 2: # multiply param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode) param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode) target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode) self.setMemoryValue(target_addr, param1 * param2) self.instr_ptr += 4 elif op_code == 3: # input target_addr = self.getTargetAddress(self.instr_ptr + 1, p1_mode) self.setMemoryValue(target_addr, self.getInput()) self.instr_ptr += 2 elif op_code == 4: # output self.addOutput(self.getParameterValue(self.instr_ptr + 1, p1_mode)) self.instr_ptr += 2 elif op_code == 5: # jump-if-true if self.getParameterValue(self.instr_ptr + 1, p1_mode) != 0: self.instr_ptr = self.getParameterValue(self.instr_ptr + 2, p2_mode) else: self.instr_ptr += 3 elif op_code == 6: # jump-if-false if self.getParameterValue(self.instr_ptr + 1, p1_mode) == 0: self.instr_ptr = self.getParameterValue(self.instr_ptr + 2, p2_mode) else: self.instr_ptr += 3 elif op_code == 7: # less than param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode) param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode) target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode) self.setMemoryValue(target_addr, param1 < param2) self.instr_ptr += 4 elif op_code == 8: # equals param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode) param2 = self.getParameterValue(self.instr_ptr + 2, p2_mode) target_addr = self.getTargetAddress(self.instr_ptr + 3, p3_mode) self.setMemoryValue(target_addr, param1 == param2) self.instr_ptr += 4 elif op_code == 9: param1 = self.getParameterValue(self.instr_ptr + 1, p1_mode) self.relative_base += param1 self.instr_ptr += 2 elif op_code == 99: self.state = IntCodeState.HALTED return else: raise ValueError("Invalid instruction encountered at pos %d: %d" % (self.instr_ptr, instr))