day16 rework; horribly slow, but at least working with all inputs

This commit is contained in:
Stefan Harmuth 2023-02-25 18:15:58 +01:00
parent 55c0803250
commit 1d5bec6bde
2 changed files with 335 additions and 121 deletions

182
d16_old.py Normal file
View File

@ -0,0 +1,182 @@
from collections import deque
from itertools import product
from tools.aoc import AOCDay
from typing import Any
from tools.tools import Cache
class Valve:
def __init__(self, name: str, flowrate: int):
self.name: str = name
self.flowrate: int = flowrate
self.tunnels: set = set()
class Tunnel:
def __init__(self, target: Valve, length: int = 1):
self.target: Valve = target
self.length: int = length
def __str__(self):
return f"Tunnel(target={self.target.name}, length={self.length})"
def __repr__(self):
return str(self)
def get_openable_valve_tunnels(valve: Valve, open_valves: set, time_remaining: int) -> set:
tunnels = set()
queue = deque()
visited = set()
queue.append((0, valve))
while queue:
d, v = queue.popleft()
if v.name in visited:
continue
visited.add(v.name)
if v.name not in open_valves and d + 1 <= time_remaining:
tunnels.add((d, v))
for x in v.tunnels:
queue.append((d + x.length, x.target))
return tunnels
def get_max_flow(valve: Valve, open_valves: list, time_remaining: int = 30, depth: int = 0) -> int:
max_flow = 0
ov = {t for t in valve.tunnels if t.target not in open_valves and t.length < time_remaining - 2}
if time_remaining <= 0 or not ov:
return 0
for tunnel in ov:
this_open_flow = tunnel.target.flowrate * (time_remaining - tunnel.length - 1)
this_flow = get_max_flow(tunnel.target, open_valves + [tunnel.target], time_remaining - tunnel.length - 1, depth + 1)
if this_flow + this_open_flow > max_flow:
max_flow = this_flow + this_open_flow
return max_flow
def get_max_flow_double(valve1: Valve, valve2: Valve, open_valves: list, DP: dict, time_remaining_1: int = 26, time_remaining_2: int = 26, depth: int = 0) -> int:
dp_key = valve1.name + valve2.name + "%02d" % time_remaining_1 + "%02d" % time_remaining_2 + "".join(open_valves)
dp_key2 = valve2.name + valve1.name + "%02d" % time_remaining_1 + "%02d" % time_remaining_2 + "".join(open_valves)
if dp_key in DP:
return DP[dp_key]
if time_remaining_1 <= 0 and time_remaining_2 <= 0:
return 0
ov1 = {t for t in valve1.tunnels if t.target.name not in open_valves and t.length < time_remaining_1 - 2}
ov2 = {t for t in valve2.tunnels if t.target.name not in open_valves and t.length < time_remaining_2 - 2}
if not ov1 and not ov2:
return 0
if not ov1:
ov1 = {Tunnel(valve1, 99)}
if not ov2:
ov2 = {Tunnel(valve2, 99)}
permut = product(ov1, ov2)
max_flow = 0
for v1, v2 in permut:
if v1.target.name == v2.target.name:
continue
this_open_flow = 0
t1 = valve1
if v1.length + 2 <= time_remaining_1:
this_open_flow += v1.target.flowrate * (time_remaining_1 - v1.length - 1)
t1 = v1.target
t2 = valve2
if v2.length + 2 <= time_remaining_2:
this_open_flow += v2.target.flowrate * (time_remaining_2 - v2.length - 1)
t2 = v2.target
this_flow_rate = get_max_flow_double(
t1,
t2,
sorted(open_valves + [v1.target.name, v2.target.name]),
DP,
time_remaining_1 - v1.length - 1,
time_remaining_2 - v2.length - 1,
depth + 1
)
if this_flow_rate + this_open_flow > max_flow:
max_flow = this_flow_rate + this_open_flow
DP[dp_key] = max_flow
DP[dp_key2] = max_flow
return max_flow
class Day(AOCDay):
inputs = [
[
(1651, "input16_test"),
#(1947, "input16_dennis"),
(1850, "input16"),
],
[
(1707, "input16_test"),
(2556, "input16_dennis"),
(2306, "input16"),
]
]
def get_valve_graph(self) -> Valve:
valves = {}
tmp_tunnels = {}
for line in self.getInput():
p = line.split(" ")
valve_name = p[1]
flowrate = int(p[4][5:-1])
tunnels = "".join(p[9:]).split(",")
valves[valve_name] = Valve(valve_name, flowrate)
tmp_tunnels[valve_name] = tunnels
for name, tunnels in tmp_tunnels.items():
valves[name].tunnels = {Tunnel(valves[t]) for t in tunnels}
for valve in valves.values():
tunnels = set()
queue = deque()
visited = set()
queue.append((0, valve))
while queue:
d, v = queue.popleft()
if v in visited:
continue
visited.add(v)
if v != valve and v.flowrate > 0:
tunnels.add(Tunnel(v, d))
for x in v.tunnels:
queue.append((d + 1, x.target))
tmp_tunnels[valve.name] = tunnels
for name, tunnels in tmp_tunnels.items():
valves[name].tunnels = tunnels
return valves["AA"]
def part1(self) -> Any:
return get_max_flow(self.get_valve_graph(), [])
def part2(self) -> Any:
cache = Cache()
root = self.get_valve_graph()
return get_max_flow_double(root, root, [], cache)
if __name__ == '__main__':
day = Day(2022, 16)
day.run(verbose=True)

274
day16.py
View File

@ -1,124 +1,151 @@
from collections import deque import heapq
from itertools import product import itertools
import re
from tools.aoc import AOCDay from tools.aoc import AOCDay
from typing import Any from typing import Any
from tools.tools import Cache
class Valve: class Valve:
def __init__(self, name: str, flowrate: int): def __init__(self, name: str, flowrate: int) -> None:
self.name: str = name self.__name = name
self.flowrate: int = flowrate self.__flowrate = flowrate
self.tunnels: set = set() self.neighbours = {}
@property
def flowrate(self) -> int:
return self.__flowrate
@property
def name(self) -> str:
return self.__name
def shortest_path_to(self, other: 'Valve'):
if other in self.neighbours:
return self.neighbours[other]
current = self
v = set()
v.add(current)
q = [(v, k) for k, v in self.neighbours.items()]
heapq.heapify(q)
while q:
dist, current = heapq.heappop(q)
if current == other:
return dist
if current in v:
continue
v.add(current)
for n, d in current.neighbours.items():
heapq.heappush(q, (dist + d, n))
def __lt__(self, other: 'Valve') -> bool:
return self.__name < other.__name
def __repr__(self) -> str:
return "Valve(%s;%s)" % (self.__name, self.__flowrate)
def __str__(self) -> str:
return self.__repr__()
class Tunnel: def get_most_pressure_release_solo(root: Valve, remaining_minutes: int = 30, visited: set = None) -> int:
def __init__(self, target: Valve, length: int = 1): if visited is None:
self.target: Valve = target visited = set()
self.length: int = length visited.add(root)
def __str__(self): my_flowrate = 0
return f"Tunnel(target={self.target.name}, length={self.length})" if root.flowrate > 0:
remaining_minutes -= 1
my_flowrate = root.flowrate * remaining_minutes
def __repr__(self): max_flowrate = 0
return str(self) for n, d in root.neighbours.items():
if n in visited:
def get_openable_valve_tunnels(valve: Valve, open_valves: set, time_remaining: int) -> set:
tunnels = set()
queue = deque()
visited = set()
queue.append((0, valve))
while queue:
d, v = queue.popleft()
if v.name in visited:
continue continue
visited.add(v.name)
if v.name not in open_valves and d + 2 <= time_remaining:
tunnels.add((d, v))
for x in v.tunnels:
queue.append((d + x.length, x.target))
return tunnels if remaining_minutes <= d + 1:
continue
n_flowrate = get_most_pressure_release_solo(n, remaining_minutes - d, visited.copy())
if n_flowrate > max_flowrate:
max_flowrate = n_flowrate
return max_flowrate + my_flowrate
def get_max_flow(valve: Valve, open_valves: list, time_remaining: int = 30, depth: int = 0) -> int: def get_mode_pressure_release_double(root_me: Valve, root_ele: Valve, r_min_me: int = 26, r_min_ele: int = 26, visited: set = None) -> int:
max_flow = 0 dp_key = root_me.name + root_ele.name + "%02d" % r_min_me + "%02d" % r_min_ele + "".join(v.name for v in sorted(visited))
dp_key2 = root_ele.name + root_me.name + "%02d" % r_min_me + "%02d" % r_min_ele + "".join(v.name for v in sorted(visited))
ov = {t for t in valve.tunnels if t.target not in open_valves and t.length < time_remaining - 2}
if time_remaining <= 0 or not ov:
return 0
for tunnel in ov:
this_open_flow = tunnel.target.flowrate * (time_remaining - tunnel.length - 1)
this_flow = get_max_flow(tunnel.target, open_valves + [tunnel.target], time_remaining - tunnel.length - 1, depth + 1)
if this_flow + this_open_flow > max_flow:
max_flow = this_flow + this_open_flow
return max_flow
def get_max_flow_double(valve1: Valve, valve2: Valve, open_valves: list, DP: dict, time_remaining_1: int = 26, time_remaining_2: int = 26, depth: int = 0) -> int:
dp_key = valve1.name + valve2.name + "%02d" % time_remaining_1 + "%02d" % time_remaining_2 + "".join(open_valves)
dp_key2 = valve2.name + valve1.name + "%02d" % time_remaining_1 + "%02d" % time_remaining_2 + "".join(open_valves)
if dp_key in DP: if dp_key in DP:
return DP[dp_key] return DP[dp_key]
if time_remaining_1 <= 0 and time_remaining_2 <= 0: my_flowrate = 0
if r_min_me > 1 and root_me not in visited:
visited.add(root_me)
r_min_me -= 1
my_flowrate += root_me.flowrate * r_min_me
if r_min_ele > 1 and root_ele not in visited:
visited.add(root_ele)
r_min_ele -= 1
my_flowrate += root_ele.flowrate * r_min_ele
if not my_flowrate and root_me.name != 'AA':
return 0 return 0
ov1 = {t for t in valve1.tunnels if t.target.name not in open_valves and t.length < time_remaining_1 - 2} max_flowrate = 0
ov2 = {t for t in valve2.tunnels if t.target.name not in open_valves and t.length < time_remaining_2 - 2} rem_list = [x for x in root_me.neighbours.keys() if x not in visited]
if len(rem_list) == 1:
if root_me.neighbours[rem_list[0]] < root_ele.neighbours[rem_list[0]]:
max_flowrate = get_mode_pressure_release_double(rem_list[0], root_ele, r_min_me - root_me.neighbours[rem_list[0]], r_min_ele, visited.copy())
else:
max_flowrate = get_mode_pressure_release_double(root_me, rem_list[0], r_min_me, r_min_ele - root_ele.neighbours[rem_list[0]], visited.copy())
else:
max_prod = len(rem_list) ** 2
cur_prod = 0
for v1, v2 in itertools.product(rem_list, repeat=2):
if len(visited) == 1:
cur_prod += 1
print("Iter (%d/%d)" % (cur_prod, max_prod))
if v1 == v2:
continue
if not ov1 and not ov2: me, ele = v1, v2
return 0 if r_min_me <= root_me.neighbours[me] + 1:
me = root_me
if not ov1: if r_min_ele <= root_ele.neighbours[ele] + 1:
ov1 = {Tunnel(valve1, 99)} ele = root_ele
if not ov2: if me == root_me and ele == root_ele:
ov2 = {Tunnel(valve2, 99)} continue
permut = product(ov1, ov2) n_flowrate = get_mode_pressure_release_double(
me, ele,
r_min_me - (root_me.neighbours[me] if me in root_me.neighbours else 1),
r_min_ele - (root_ele.neighbours[ele] if ele in root_ele.neighbours else 1),
visited.copy()
)
if n_flowrate > max_flowrate:
max_flowrate = n_flowrate
max_flow = 0 if n_flowrate == 1671:
for v1, v2 in permut: break
if v1.target.name == v2.target.name:
continue
this_open_flow = 0 DP[dp_key] = my_flowrate + max_flowrate
DP[dp_key2] = my_flowrate + max_flowrate
return my_flowrate + max_flowrate
t1 = valve1
if v1.length + 2 <= time_remaining_1:
this_open_flow += v1.target.flowrate * (time_remaining_1 - v1.length - 1)
t1 = v1.target
t2 = valve2 DP = {}
if v2.length + 2 <= time_remaining_2:
this_open_flow += v2.target.flowrate * (time_remaining_2 - v2.length - 1)
t2 = v2.target
this_flow_rate = get_max_flow_double(
t1,
t2,
sorted(open_valves + [v1.target.name, v2.target.name]),
DP,
time_remaining_1 - v1.length - 1,
time_remaining_2 - v2.length - 1,
depth + 1
)
if this_flow_rate + this_open_flow > max_flow:
max_flow = this_flow_rate + this_open_flow
DP[dp_key] = max_flow
DP[dp_key2] = max_flow
return max_flow
class Day(AOCDay): class Day(AOCDay):
input_regexp = re.compile(r'Valve ([A-Z][A-Z]) has flow rate=([0-9]+); tunnels? leads? to valves? (.*)')
inputs = [ inputs = [
[ [
(1651, "input16_test"), (1651, "input16_test"),
@ -132,49 +159,54 @@ class Day(AOCDay):
] ]
] ]
def get_valve_graph(self) -> Valve: def parse_input(self) -> Valve:
n_cache = {}
valves = {} valves = {}
tmp_tunnels = {}
for line in self.getInput(): for line in self.getInput():
p = line.split(" ") name, flowrate, neighbours = self.input_regexp.match(line).groups()
valve_name = p[1] valves[name] = Valve(name, int(flowrate))
flowrate = int(p[4][5:-1]) n_cache[name] = neighbours.split(", ")
tunnels = "".join(p[9:]).split(",")
valves[valve_name] = Valve(valve_name, flowrate)
tmp_tunnels[valve_name] = tunnels
for name, tunnels in tmp_tunnels.items(): for valve, neighbours in n_cache.items():
valves[name].tunnels = {Tunnel(valves[t]) for t in tunnels} for n in neighbours:
valves[valve].neighbours[valves[n]] = 1
for valve in valves.values(): for valve in valves.values():
tunnels = set() if valve.flowrate != 0:
queue = deque() continue
visited = set()
queue.append((0, valve)) for n_set in valve.neighbours.keys():
while queue: del n_set.neighbours[valve]
d, v = queue.popleft() for n_get in valve.neighbours.keys():
if v in visited: if n_get == n_set:
continue
if n_get in n_set.neighbours:
continue
n_set.neighbours[n_get] = valve.neighbours[n_set] + valve.neighbours[n_get]
for v_set in valves.values():
for v_get in valves.values():
if v_set == v_get or v_get in v_set.neighbours or v_get.flowrate == 0:
continue continue
visited.add(v)
if v != valve and v.flowrate > 0:
tunnels.add(Tunnel(v, d))
for x in v.tunnels:
queue.append((d + 1, x.target))
tmp_tunnels[valve.name] = tunnels v_set.neighbours[v_get] = v_set.shortest_path_to(v_get)
for name, tunnels in tmp_tunnels.items(): for n in list(valves['AA'].neighbours.keys()):
valves[name].tunnels = tunnels if n.flowrate == 0:
del valves['AA'].neighbours[n]
return valves["AA"] return valves['AA']
def part1(self) -> Any: def part1(self) -> Any:
return get_max_flow(self.get_valve_graph(), []) return get_most_pressure_release_solo(self.parse_input())
def part2(self) -> Any: def part2(self) -> Any:
cache = Cache() global DP
root = self.get_valve_graph() DP = {}
return get_max_flow_double(root, root, [], cache) root_valve = self.parse_input()
return get_mode_pressure_release_double(root_valve, root_valve, visited={root_valve})
if __name__ == '__main__': if __name__ == '__main__':