aoc2019/intcode.py
Stefan Harmuth fbffd6e4fd day09
2021-11-27 19:46:55 +01:00

133 lines
5.4 KiB
Python

from enum import Enum
from queue import Queue
import threading
from typing import List, Union
class IntCodeState(Enum):
    """Lifecycle states of an Intcode machine."""

    # Machine has been constructed but has not begun executing.
    INITIALIZED = 0
    # Machine is currently executing instructions.
    RUNNING = 1
    # Machine has executed the halt instruction.
    HALTED = 2
class IntCode(threading.Thread):
    """Intcode virtual machine that can be executed on its own thread.

    Input values are consumed from ``input_queue`` and every value produced
    by the program is put on ``output_queue``. Call ``start()`` to execute
    the program on a separate thread, or ``run()`` to execute it
    synchronously in the calling thread.
    """

    def __init__(self, memory: List[int]):
        """Create a machine for the given program.

        Args:
            memory: The program. The list is used directly (it is not
                copied), so it is modified and may grow while running.
        """
        super().__init__()
        self.instr_ptr = 0
        self.memory = memory
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.state = IntCodeState.INITIALIZED
        self.relative_base = 0

    def _ensureAddress(self, address: int) -> None:
        """Make sure *address* is a valid index into ``self.memory``.

        Memory is padded with zeros up to and including *address*.

        Raises:
            IndexError: If *address* is negative. Without this check Python's
                negative indexing would silently access cells counted from
                the end of the memory.
        """
        if address < 0:
            raise IndexError("Negative memory address: %d" % address)
        missing = address + 1 - len(self.memory)
        if missing > 0:
            self.memory.extend([0] * missing)

    def getMemoryValue(self, address: int) -> int:
        """Return the value stored at *address*, growing memory if needed."""
        self._ensureAddress(address)
        return self.memory[address]

    def setMemoryValue(self, address: int, value: int):
        """Store *value* at *address*, growing memory if needed."""
        self._ensureAddress(address)
        self.memory[address] = value

    def getParameterValue(self, address: int, mode: int = 0) -> int:
        """Resolve the read parameter stored at *address*.

        Args:
            address: Memory address holding the raw parameter.
            mode: 0 = position (raw parameter is an address),
                1 = immediate (raw parameter is the value),
                2 = relative (raw parameter is an offset to
                ``relative_base``).

        Raises:
            ValueError: If *mode* is not 0, 1 or 2.
        """
        if mode == 0:
            return self.getMemoryValue(self.getMemoryValue(address))
        if mode == 1:
            return self.getMemoryValue(address)
        if mode == 2:
            return self.getMemoryValue(self.relative_base + self.getMemoryValue(address))
        raise ValueError("Invalid parameter mode %d at pos %d" % (mode, address))

    def getTargetAddress(self, address: int, mode: int = 0) -> int:
        """Resolve the write parameter stored at *address* to an address.

        Args:
            address: Memory address holding the raw parameter.
            mode: 0 = position, 2 = relative to ``relative_base``.

        Raises:
            ValueError: If *mode* is not 0 or 2 (a write target cannot be
                in immediate mode).
        """
        if mode == 0:
            return self.getMemoryValue(address)
        if mode == 2:
            return self.getMemoryValue(address) + self.relative_base
        raise ValueError("Invalid target mode %d at pos %d" % (mode, address))

    def isRunning(self):
        """Return True while the machine is in the RUNNING state."""
        return self.state == IntCodeState.RUNNING

    def isHalted(self):
        """Return True once the machine is in the HALTED state."""
        return self.state == IntCodeState.HALTED

    def sendOutputTo(self, target_queue: Queue):
        """Replace the output queue, e.g. to feed another machine's input."""
        self.output_queue = target_queue

    def addInput(self, value: int):
        """Queue *value* to be consumed by a later input instruction."""
        self.input_queue.put(value)

    def getOutput(
        self, count: int = 1, block: bool = True, timeout: Union[float, None] = None
    ) -> Union[int, List[int]]:
        """Fetch values produced by the program.

        Args:
            count: Number of values to fetch.
            block: Passed through to ``Queue.get``.
            timeout: Passed through to ``Queue.get``.

        Returns:
            A single value if *count* is 1, otherwise a list of *count*
            values in the order they were produced.
        """
        if count == 1:
            res = self.output_queue.get(block=block, timeout=timeout)
            self.output_queue.task_done()
            return res

        res = []
        for _ in range(count):
            res.append(self.output_queue.get(block=block, timeout=timeout))
            self.output_queue.task_done()
        return res

    def _step(self) -> bool:
        """Decode and execute the single instruction at ``instr_ptr``.

        Returns:
            False if the instruction was halt (99), True otherwise.

        Raises:
            ValueError: On an unknown opcode or an invalid parameter mode.
        """
        ip = self.instr_ptr
        instr = self.getMemoryValue(ip)
        op_code = instr % 100
        p1_mode = instr // 100 % 10
        p2_mode = instr // 1_000 % 10
        p3_mode = instr // 10_000 % 10

        if op_code in (1, 2, 7, 8):  # add, multiply, less than, equals
            param1 = self.getParameterValue(ip + 1, p1_mode)
            param2 = self.getParameterValue(ip + 2, p2_mode)
            target_addr = self.getTargetAddress(ip + 3, p3_mode)
            if op_code == 1:
                result = param1 + param2
            elif op_code == 2:
                result = param1 * param2
            elif op_code == 7:
                # int() so memory holds 1/0 rather than True/False.
                result = int(param1 < param2)
            else:
                result = int(param1 == param2)
            self.setMemoryValue(target_addr, result)
            self.instr_ptr = ip + 4
        elif op_code == 3:  # input (blocks until a value is available)
            target_addr = self.getTargetAddress(ip + 1, p1_mode)
            self.setMemoryValue(target_addr, self.input_queue.get())
            self.input_queue.task_done()
            self.instr_ptr = ip + 2
        elif op_code == 4:  # output
            self.output_queue.put(self.getParameterValue(ip + 1, p1_mode))
            self.instr_ptr = ip + 2
        elif op_code in (5, 6):  # jump-if-true, jump-if-false
            condition = self.getParameterValue(ip + 1, p1_mode)
            jump = condition != 0 if op_code == 5 else condition == 0
            if jump:
                self.instr_ptr = self.getParameterValue(ip + 2, p2_mode)
            else:
                self.instr_ptr = ip + 3
        elif op_code == 9:  # adjust relative base
            self.relative_base += self.getParameterValue(ip + 1, p1_mode)
            self.instr_ptr = ip + 2
        elif op_code == 99:  # halt
            return False
        else:
            raise ValueError("Invalid instruction encountered at pos %d: %d" % (self.instr_ptr, instr))
        return True

    def run(self):
        """Execute the program until it halts.

        Sets the state to RUNNING, executes instructions one at a time and
        sets the state to HALTED when the halt instruction is reached. The
        loop also ends if the state is set to HALTED from elsewhere.

        Raises:
            ValueError: On an unknown opcode or an invalid parameter mode.
        """
        self.state = IntCodeState.RUNNING
        while not self.isHalted():
            if not self._step():
                self.state = IntCodeState.HALTED