# Forked from UNN/2026-rff_mp (Python, 296 lines, 8.7 KiB).
import heapq
|
|
import time
|
|
import os
|
|
from collections import deque
|
|
from abc import ABC, abstractmethod
|
|
import itertools
|
|
|
|
class Cell:
    """One square of the maze grid.

    Stores its coordinates, three role flags (wall / start / exit) and a
    ``weight`` that defaults to 1.
    """

    def __init__(self, x, y):
        """Create an unmarked, non-wall cell at coordinates (``x``, ``y``)."""
        self.x, self.y = x, y
        # Every role flag starts cleared; whoever builds the maze sets them.
        self.isWall = self.isStart = self.isExit = False
        self.weight = 1

    def isPassable(self):
        """Return True unless the cell is flagged as a wall."""
        if self.isWall:
            return False
        return True

    def __repr__(self):
        """Return the compact ``(x,y)`` form of the cell."""
        return "({},{})".format(self.x, self.y)
|
|
|
|
class Maze:
    """Rectangular grid of cells addressed as ``grid[x][y]``.

    ``start`` and ``exit`` are None until something assigns them.
    """

    def __init__(self, width, height):
        """Create a ``width`` x ``height`` grid filled with fresh cells."""
        self.width = width
        self.height = height
        # The outer list is indexed by x, each inner list by y.
        columns = []
        for x in range(width):
            columns.append([Cell(x, y) for y in range(height)])
        self.grid = columns
        self.start = None
        self.exit = None

    def getCell(self, x, y):
        """Return the cell at (``x``, ``y``), or None when out of bounds."""
        if not 0 <= x < self.width:
            return None
        if not 0 <= y < self.height:
            return None
        return self.grid[x][y]

    def getNeighbors(self, cell):
        """Return the passable cells orthogonally adjacent to ``cell``.

        Candidates are examined in the fixed order (-1,0), (1,0), (0,-1),
        (0,1) and returned in that order.
        """
        offsets = ((-1, 0), (1, 0), (0, -1), (0, 1))
        candidates = (
            self.getCell(cell.x + dx, cell.y + dy) for dx, dy in offsets
        )
        return [c for c in candidates if c and c.isPassable()]
|
|
|
|
class MazeBuilder(ABC):
    """Interface for objects that build a maze from a file."""

    @abstractmethod
    def buildFromFile(self, filename):
        """Build a maze from the file named ``filename``.

        Concrete builders must override this; the base body does nothing.
        """
|
|
|
|
class TextFileMazeBuilder(MazeBuilder):
    """Builds a maze from a plain-text map.

    Recognised characters: ``#`` wall, ``S`` start, ``E`` exit, a space for
    an open cell, and a single digit for an open cell with that weight.
    """

    def buildFromFile(self, filename):
        """Parse ``filename`` and return the resulting maze.

        Empty and whitespace-only lines are skipped. The maze is as wide as
        the longest remaining line; positions that a shorter line does not
        reach keep whatever state ``Maze`` gave them.

        Raises:
            ValueError: on an unrecognised character, or when the map has
                no start or no exit.
        """
        with open(filename, 'r') as f:
            lines = [line.rstrip('\n') for line in f if line.strip()]

        height = len(lines)
        # Size the grid by the longest row. Sizing it by the first row made
        # getCell() return None for every position past that width, so a
        # wall, start, exit or digit there crashed with AttributeError.
        width = max((len(line) for line in lines), default=0)
        maze = Maze(width, height)

        for y, line in enumerate(lines):
            for x, ch in enumerate(line):
                if ch == ' ':
                    # Plain open cell: nothing to mark.
                    continue
                cell = maze.getCell(x, y)
                if ch == '#':
                    cell.isWall = True
                elif ch == 'S':
                    cell.isStart = True
                    maze.start = cell
                elif ch == 'E':
                    cell.isExit = True
                    maze.exit = cell
                elif ch.isdigit():
                    cell.weight = int(ch)
                else:
                    raise ValueError(f"err '{ch}' at ({x},{y})")

        if maze.start is None or maze.exit is None:
            raise ValueError("not e or/and s")
        return maze
|
|
|
|
class PathFindingStrategy(ABC):
    """Base class for maze search algorithms.

    Instances carry ``visited_count``, initialised to 0 here.
    """

    def __init__(self):
        """Start with a zeroed visit counter."""
        self.visited_count = 0

    @abstractmethod
    def findPath(self, maze, start, exit_cell):
        """Search ``maze`` for a path from ``start`` to ``exit_cell``.

        Concrete strategies must override this; the base body does nothing.
        """
|
|
|
|
class BFSPathFinding(PathFindingStrategy):
    """Breadth-first search over passable neighbours; cell weights are not read."""

    def findPath(self, maze, start, exit_cell):
        """Return the list of cells from ``start`` to ``exit_cell``.

        Returns an empty list when the exit is never reached.
        ``visited_count`` is reset and then incremented once per cell taken
        off the queue.
        """
        self.visited_count = 0
        # Maps each discovered cell to the cell it was reached from; it also
        # serves as the "already seen" set.
        came_from = {start: None}
        frontier = deque([start])
        while frontier:
            node = frontier.popleft()
            self.visited_count += 1
            if node == exit_cell:
                return self._reconstruct_path(came_from, exit_cell)
            for nxt in maze.getNeighbors(node):
                if nxt in came_from:
                    continue
                came_from[nxt] = node
                frontier.append(nxt)
        return []

    def _reconstruct_path(self, parent, end):
        """Follow ``parent`` links back from ``end`` and return the path in forward order."""
        backwards = []
        node = end
        while node is not None:
            backwards.append(node)
            node = parent[node]
        return backwards[::-1]
|
|
|
|
class DFSPathFinding(PathFindingStrategy):
    """Depth-first search driven by an explicit stack; cell weights are not read."""

    def findPath(self, maze, start, exit_cell):
        """Return a list of cells from ``start`` to ``exit_cell``.

        Returns an empty list when the exit is never reached.
        ``visited_count`` is reset and then incremented once per cell popped
        from the stack.
        """
        self.visited_count = 0
        # A cell's predecessor is fixed when it is first discovered (pushed),
        # not when it is popped.
        came_from = {start: None}
        pending = [start]
        while pending:
            node = pending.pop()
            self.visited_count += 1
            if node == exit_cell:
                return self._reconstruct_path(came_from, exit_cell)
            for nxt in maze.getNeighbors(node):
                if nxt in came_from:
                    continue
                came_from[nxt] = node
                pending.append(nxt)
        return []

    def _reconstruct_path(self, parent, end):
        """Follow ``parent`` links back from ``end`` and return the path in forward order."""
        backwards = []
        node = end
        while node is not None:
            backwards.append(node)
            node = parent[node]
        return list(reversed(backwards))
|
|
|
|
class AStarPathFinding(PathFindingStrategy):
    """A* search: step cost is the entered cell's ``weight``, heuristic is Manhattan distance."""

    def findPath(self, maze, start, exit_cell):
        """Return the list of cells from ``start`` to ``exit_cell``.

        Returns an empty list when the exit is never reached.
        ``visited_count`` is reset and then incremented once per cell that is
        actually expanded; superseded heap entries are not counted.
        """
        self.visited_count = 0

        def heuristic(cell):
            # Manhattan distance to the exit.
            # NOTE(review): this can only under-estimate the remaining cost
            # when every cell weight is at least 1 -- presumably smaller
            # weights are not expected; confirm against the maze builder.
            return abs(cell.x - exit_cell.x) + abs(cell.y - exit_cell.y)

        open_set = []
        # Monotonic tie-breaker: heap tuples are decided before the cell
        # itself is ever compared.
        counter = itertools.count()
        heapq.heappush(open_set, (0 + heuristic(start), 0, next(counter), start))
        parent = {start: None}
        g_score = {start: 0}
        closed = set()

        while open_set:
            _, _, _, current = heapq.heappop(open_set)
            if current in closed:
                # Stale duplicate left behind when a cheaper route to this
                # cell was pushed later; it was already expanded, so it must
                # not be counted as visited again.
                continue
            self.visited_count += 1
            if current == exit_cell:
                return self._reconstruct_path(parent, exit_cell)
            closed.add(current)
            for neighbor in maze.getNeighbors(current):
                tentative_g = g_score[current] + neighbor.weight
                if neighbor not in g_score or tentative_g < g_score[neighbor]:
                    g_score[neighbor] = tentative_g
                    parent[neighbor] = current
                    f = tentative_g + heuristic(neighbor)
                    heapq.heappush(open_set, (f, tentative_g, next(counter), neighbor))
        return []

    def _reconstruct_path(self, parent, end):
        """Follow ``parent`` links back from ``end`` and return the path in forward order."""
        path = []
        cur = end
        while cur is not None:
            path.append(cur)
            cur = parent[cur]
        path.reverse()
        return path
|
|
|
|
class SearchStats:
    """Plain record describing the outcome of one search run."""

    def __init__(self, time_ms, visited, path_length, path):
        """Store the elapsed time, visit count, path length and the path itself."""
        self.time_ms, self.visited = time_ms, visited
        self.path_length, self.path = path_length, path
|
|
|
|
class MazeSolver:
    """Runs a path-finding strategy on a maze and reports the result to observers."""

    def __init__(self, maze, strategy):
        """Bind the solver to ``maze`` and an initial ``strategy``; no observers yet."""
        self.maze = maze
        self.strategy = strategy
        self.observers = []

    def setStrategy(self, strategy):
        """Replace the strategy used by later ``solve`` calls."""
        self.strategy = strategy

    def solve(self):
        """Run the current strategy, notify observers and return the stats.

        The path search is timed with ``time.perf_counter`` and reported in
        milliseconds. Observers receive a ``"path_found"`` event carrying the
        same ``SearchStats`` object that is returned.
        """
        origin = self.maze.start
        goal = self.maze.exit

        began = time.perf_counter()
        path = self.strategy.findPath(self.maze, origin, goal)
        finished = time.perf_counter()

        elapsed_ms = (finished - began) * 1000
        stats = SearchStats(
            elapsed_ms,
            self.strategy.visited_count,
            len(path),
            path,
        )
        self.notify("path_found", stats)
        return stats

    def addObserver(self, observer):
        """Register ``observer`` to receive future notifications."""
        self.observers.append(observer)

    def notify(self, event, data=None):
        """Call ``update(event, data)`` on each observer in registration order."""
        for listener in self.observers:
            listener.update(event, data)
|
|
|
|
class Observer(ABC):
    """Interface for objects that receive event notifications."""

    @abstractmethod
    def update(self, event, data):
        """Handle ``event`` with its accompanying ``data``.

        Concrete observers must override this; the base body does nothing.
        """
|
|
|
|
class ConsoleView(Observer):
    """Observer that draws the maze, and a path through it, on the terminal."""

    def __init__(self, maze):
        """Remember the ``maze`` to draw."""
        self.maze = maze

    def update(self, event, data):
        """Redraw on a ``"path_found"`` event; every other event is ignored."""
        if event != "path_found":
            return
        self.render(data.path, data)

    def render(self, path, stats=None):
        """Clear the terminal and print the maze row by row.

        Cells on ``path`` are drawn as ``.``. When ``stats`` is truthy a
        summary line with path length, visit count and time is printed last.
        """
        os.system('cls' if os.name == 'nt' else 'clear')
        on_path = set(path) if path else set()
        for y in range(self.maze.height):
            row = "".join(
                self._glyph(self.maze.getCell(x, y), on_path)
                for x in range(self.maze.width)
            )
            print(row)
        if stats:
            print(f"\npath: {stats.path_length}, visit: {stats.visited}, time: {stats.time_ms:.2f} ms")

    def _glyph(self, cell, on_path):
        """Return the character for ``cell``; start and exit win over wall and path."""
        if cell == self.maze.start:
            return "S"
        if cell == self.maze.exit:
            return "E"
        if cell.isWall:
            return "#"
        if cell in on_path:
            return "."
        return " "
|
|
|
|
class Player:
    """Tracks the cell a player stands on and the cells it moved away from."""

    def __init__(self, start_cell):
        """Place the player on ``start_cell`` with an empty move history."""
        self.current = start_cell
        self.history = []

    def move(self, dx, dy, maze):
        """Try to step by (``dx``, ``dy``) inside ``maze``.

        The step succeeds only when the target cell exists and is passable;
        the cell being left is then pushed onto the history. Returns True on
        success, False otherwise.
        """
        here = self.current
        target = maze.getCell(here.x + dx, here.y + dy)
        if not target or not target.isPassable():
            return False
        self.history.append(here)
        self.current = target
        return True

    def undo(self):
        """Step back to the most recent history entry.

        Returns False when there is nothing to undo, True otherwise.
        """
        if not self.history:
            return False
        self.current = self.history.pop()
        return True
|
|
|
|
class Command(ABC):
    """Interface for an action that can be executed and undone."""

    @abstractmethod
    def execute(self):
        """Perform the action. Concrete commands must override this."""

    @abstractmethod
    def undo(self):
        """Revert the action. Concrete commands must override this."""
|
|
|
|
class MoveCommand(Command):
    """Command that moves a player by a fixed offset and can take the move back."""

    def __init__(self, player, maze, dx, dy):
        """Capture the ``player``, the ``maze`` and the (``dx``, ``dy``) step."""
        self.player = player
        self.maze = maze
        self.dx, self.dy = dx, dy
        # True only while a successful move is waiting to be undone.
        self.executed = False

    def execute(self):
        """Attempt the move once.

        Returns the result of ``player.move``; returns False without moving
        when the command is already in the executed state.
        """
        if self.executed:
            return False
        moved = self.player.move(self.dx, self.dy, self.maze)
        self.executed = moved
        return moved

    def undo(self):
        """Revert the move if it was executed.

        Returns True after calling ``player.undo``; returns False when there
        is nothing to revert.
        """
        if not self.executed:
            return False
        self.player.undo()
        self.executed = False
        return True