532 lines
15 KiB
Python
532 lines
15 KiB
Python
|
|
import sys
|
||
|
|
from collections import deque
|
||
|
|
import heapq
|
||
|
|
import time
|
||
|
|
import os
|
||
|
|
from abc import ABC, abstractmethod
|
||
|
|
from typing import List, Optional, Dict, Any
|
||
|
|
|
||
|
|
|
||
|
|
DATA_PATH = r"C:\Users\user\Desktop\2026-rff_mp\SokolovEN\docs\data"
|
||
|
|
|
||
|
|
|
||
|
|
class Observer(ABC):
|
||
|
|
@abstractmethod
|
||
|
|
def update(self, event: str, data: Any = None):
|
||
|
|
pass
|
||
|
|
|
||
|
|
|
||
|
|
class Observable:
|
||
|
|
def __init__(self):
|
||
|
|
self._observers: List[Observer] = []
|
||
|
|
|
||
|
|
def attach(self, observer: Observer):
|
||
|
|
self._observers.append(observer)
|
||
|
|
|
||
|
|
def detach(self, observer: Observer):
|
||
|
|
self._observers.remove(observer)
|
||
|
|
|
||
|
|
def notify(self, event: str, data: Any = None):
|
||
|
|
for observer in self._observers:
|
||
|
|
observer.update(event, data)
|
||
|
|
|
||
|
|
|
||
|
|
class Tile:
|
||
|
|
def __init__(self, x: int, y: int):
|
||
|
|
self._x = x
|
||
|
|
self._y = y
|
||
|
|
self._wall = False
|
||
|
|
self._start = False
|
||
|
|
self._exit = False
|
||
|
|
|
||
|
|
@property
|
||
|
|
def x(self) -> int:
|
||
|
|
return self._x
|
||
|
|
|
||
|
|
@property
|
||
|
|
def y(self) -> int:
|
||
|
|
return self._y
|
||
|
|
|
||
|
|
@property
|
||
|
|
def is_wall(self) -> bool:
|
||
|
|
return self._wall
|
||
|
|
|
||
|
|
@is_wall.setter
|
||
|
|
def is_wall(self, v: bool):
|
||
|
|
self._wall = v
|
||
|
|
|
||
|
|
@property
|
||
|
|
def is_start(self) -> bool:
|
||
|
|
return self._start
|
||
|
|
|
||
|
|
@is_start.setter
|
||
|
|
def is_start(self, v: bool):
|
||
|
|
self._start = v
|
||
|
|
|
||
|
|
@property
|
||
|
|
def is_exit(self) -> bool:
|
||
|
|
return self._exit
|
||
|
|
|
||
|
|
@is_exit.setter
|
||
|
|
def is_exit(self, v: bool):
|
||
|
|
self._exit = v
|
||
|
|
|
||
|
|
def passable(self) -> bool:
|
||
|
|
return not self._wall
|
||
|
|
|
||
|
|
def __hash__(self):
|
||
|
|
return hash((self._x, self._y))
|
||
|
|
|
||
|
|
def __eq__(self, other):
|
||
|
|
if not isinstance(other, Tile):
|
||
|
|
return False
|
||
|
|
return self._x == other._x and self._y == other._y
|
||
|
|
|
||
|
|
|
||
|
|
class Maze:
|
||
|
|
def __init__(self, w: int, h: int):
|
||
|
|
self._w = w
|
||
|
|
self._h = h
|
||
|
|
self._cells = [[Tile(x, y) for x in range(w)] for y in range(h)]
|
||
|
|
self._start: Optional[Tile] = None
|
||
|
|
self._exit: Optional[Tile] = None
|
||
|
|
|
||
|
|
@property
|
||
|
|
def width(self) -> int:
|
||
|
|
return self._w
|
||
|
|
|
||
|
|
@property
|
||
|
|
def height(self) -> int:
|
||
|
|
return self._h
|
||
|
|
|
||
|
|
@property
|
||
|
|
def start(self) -> Optional[Tile]:
|
||
|
|
return self._start
|
||
|
|
|
||
|
|
@property
|
||
|
|
def exit(self) -> Optional[Tile]:
|
||
|
|
return self._exit
|
||
|
|
|
||
|
|
def get_cell(self, x: int, y: int) -> Optional[Tile]:
|
||
|
|
if 0 <= x < self._w and 0 <= y < self._h:
|
||
|
|
return self._cells[y][x]
|
||
|
|
return None
|
||
|
|
|
||
|
|
def set_cell(self, x: int, y: int, kind: str):
|
||
|
|
c = self.get_cell(x, y)
|
||
|
|
if not c:
|
||
|
|
return
|
||
|
|
if kind == 'wall':
|
||
|
|
c.is_wall = True
|
||
|
|
elif kind == 'start':
|
||
|
|
if self._start:
|
||
|
|
self._start.is_start = False
|
||
|
|
c.is_start = True
|
||
|
|
c.is_wall = False
|
||
|
|
self._start = c
|
||
|
|
elif kind == 'exit':
|
||
|
|
if self._exit:
|
||
|
|
self._exit.is_exit = False
|
||
|
|
c.is_exit = True
|
||
|
|
c.is_wall = False
|
||
|
|
self._exit = c
|
||
|
|
elif kind == 'path':
|
||
|
|
c.is_wall = False
|
||
|
|
|
||
|
|
def neighbours(self, cell: Tile) -> List[Tile]:
|
||
|
|
result = []
|
||
|
|
for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
|
||
|
|
nx, ny = cell.x + dx, cell.y + dy
|
||
|
|
nb = self.get_cell(nx, ny)
|
||
|
|
if nb and nb.passable():
|
||
|
|
result.append(nb)
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
class MazeLoader(ABC):
|
||
|
|
@abstractmethod
|
||
|
|
def load(self, filename: str) -> Maze:
|
||
|
|
pass
|
||
|
|
|
||
|
|
|
||
|
|
class TextMazeLoader(MazeLoader):
|
||
|
|
def load(self, filename: str) -> Maze:
|
||
|
|
with open(filename, 'r', encoding='utf-8') as f:
|
||
|
|
lines = [line.rstrip('\n') for line in f.readlines()]
|
||
|
|
|
||
|
|
h = len(lines)
|
||
|
|
w = max(len(line) for line in lines) if h else 0
|
||
|
|
|
||
|
|
start_count = 0
|
||
|
|
exit_count = 0
|
||
|
|
maze = Maze(w, h)
|
||
|
|
|
||
|
|
for y, line in enumerate(lines):
|
||
|
|
for x, ch in enumerate(line):
|
||
|
|
if ch == '#':
|
||
|
|
maze.set_cell(x, y, 'wall')
|
||
|
|
elif ch == 'S':
|
||
|
|
maze.set_cell(x, y, 'start')
|
||
|
|
start_count += 1
|
||
|
|
elif ch == 'E':
|
||
|
|
maze.set_cell(x, y, 'exit')
|
||
|
|
exit_count += 1
|
||
|
|
else:
|
||
|
|
maze.set_cell(x, y, 'path')
|
||
|
|
|
||
|
|
if start_count != 1 or exit_count != 1:
|
||
|
|
raise ValueError(f"Maze must have one S and one E. Found: S={start_count}, E={exit_count}")
|
||
|
|
|
||
|
|
return maze
|
||
|
|
|
||
|
|
|
||
|
|
class PathFinder(ABC):
|
||
|
|
def __init__(self):
|
||
|
|
self._visited = 0
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def find(self, maze: Maze, start: Tile, goal: Tile) -> List[Tile]:
|
||
|
|
pass
|
||
|
|
|
||
|
|
def _reconstruct(self, parent: Dict[Tile, Optional[Tile]], start: Tile, goal: Tile) -> List[Tile]:
|
||
|
|
path = []
|
||
|
|
current = goal
|
||
|
|
while current is not None:
|
||
|
|
path.append(current)
|
||
|
|
current = parent.get(current)
|
||
|
|
path.reverse()
|
||
|
|
return path if path and path[0] == start else []
|
||
|
|
|
||
|
|
@property
|
||
|
|
def visited_count(self) -> int:
|
||
|
|
return self._visited
|
||
|
|
|
||
|
|
|
||
|
|
class BFS(PathFinder):
|
||
|
|
def find(self, maze: Maze, start: Tile, goal: Tile) -> List[Tile]:
|
||
|
|
queue = deque([start])
|
||
|
|
parent = {start: None}
|
||
|
|
visited = {start}
|
||
|
|
|
||
|
|
while queue:
|
||
|
|
current = queue.popleft()
|
||
|
|
|
||
|
|
if current == goal:
|
||
|
|
self._visited = len(visited)
|
||
|
|
return self._reconstruct(parent, start, goal)
|
||
|
|
|
||
|
|
for neighbor in maze.neighbours(current):
|
||
|
|
if neighbor not in visited:
|
||
|
|
visited.add(neighbor)
|
||
|
|
parent[neighbor] = current
|
||
|
|
queue.append(neighbor)
|
||
|
|
|
||
|
|
self._visited = len(visited)
|
||
|
|
return []
|
||
|
|
|
||
|
|
|
||
|
|
class DFS(PathFinder):
|
||
|
|
def find(self, maze: Maze, start: Tile, goal: Tile) -> List[Tile]:
|
||
|
|
stack = [start]
|
||
|
|
parent = {start: None}
|
||
|
|
visited = {start}
|
||
|
|
|
||
|
|
while stack:
|
||
|
|
current = stack.pop()
|
||
|
|
|
||
|
|
if current == goal:
|
||
|
|
self._visited = len(visited)
|
||
|
|
return self._reconstruct(parent, start, goal)
|
||
|
|
|
||
|
|
for neighbor in maze.neighbours(current):
|
||
|
|
if neighbor not in visited:
|
||
|
|
visited.add(neighbor)
|
||
|
|
parent[neighbor] = current
|
||
|
|
stack.append(neighbor)
|
||
|
|
|
||
|
|
self._visited = len(visited)
|
||
|
|
return []
|
||
|
|
|
||
|
|
|
||
|
|
class AStar(PathFinder):
|
||
|
|
def _heuristic(self, cell: Tile, goal: Tile) -> int:
|
||
|
|
return abs(cell.x - goal.x) + abs(cell.y - goal.y)
|
||
|
|
|
||
|
|
def find(self, maze: Maze, start: Tile, goal: Tile) -> List[Tile]:
|
||
|
|
heap = []
|
||
|
|
counter = 0
|
||
|
|
start_f = self._heuristic(start, goal)
|
||
|
|
heapq.heappush(heap, (start_f, counter, start))
|
||
|
|
counter += 1
|
||
|
|
|
||
|
|
parent = {}
|
||
|
|
g_score = {start: 0}
|
||
|
|
f_score = {start: start_f}
|
||
|
|
visited = set()
|
||
|
|
|
||
|
|
while heap:
|
||
|
|
current_f, _, current = heapq.heappop(heap)
|
||
|
|
visited.add(current)
|
||
|
|
|
||
|
|
if current == goal:
|
||
|
|
self._visited = len(visited)
|
||
|
|
return self._reconstruct(parent, start, goal)
|
||
|
|
|
||
|
|
if current_f > f_score.get(current, float('inf')):
|
||
|
|
continue
|
||
|
|
|
||
|
|
for neighbor in maze.neighbours(current):
|
||
|
|
tentative_g = g_score[current] + 1
|
||
|
|
|
||
|
|
if tentative_g < g_score.get(neighbor, float('inf')):
|
||
|
|
parent[neighbor] = current
|
||
|
|
g_score[neighbor] = tentative_g
|
||
|
|
new_f = tentative_g + self._heuristic(neighbor, goal)
|
||
|
|
f_score[neighbor] = new_f
|
||
|
|
heapq.heappush(heap, (new_f, counter, neighbor))
|
||
|
|
counter += 1
|
||
|
|
|
||
|
|
self._visited = len(visited)
|
||
|
|
return []
|
||
|
|
|
||
|
|
|
||
|
|
class MazeSolver(Observable):
|
||
|
|
def __init__(self, maze: Maze):
|
||
|
|
super().__init__()
|
||
|
|
self._maze = maze
|
||
|
|
self._algorithm: Optional[PathFinder] = None
|
||
|
|
|
||
|
|
def set_algorithm(self, algorithm: PathFinder):
|
||
|
|
self._algorithm = algorithm
|
||
|
|
|
||
|
|
def solve(self) -> Optional[Dict[str, Any]]:
|
||
|
|
if not self._algorithm:
|
||
|
|
raise ValueError("Algorithm not set")
|
||
|
|
|
||
|
|
start_time = time.perf_counter()
|
||
|
|
path = self._algorithm.find(self._maze, self._maze.start, self._maze.exit)
|
||
|
|
end_time = time.perf_counter()
|
||
|
|
|
||
|
|
elapsed_ms = (end_time - start_time) * 1000
|
||
|
|
|
||
|
|
return {
|
||
|
|
'time_ms': elapsed_ms,
|
||
|
|
'visited': self._algorithm.visited_count,
|
||
|
|
'path_length': len(path),
|
||
|
|
'path': path
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
class Command(ABC):
|
||
|
|
@abstractmethod
|
||
|
|
def execute(self) -> bool:
|
||
|
|
pass
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def undo(self) -> bool:
|
||
|
|
pass
|
||
|
|
|
||
|
|
|
||
|
|
class MoveCommand(Command):
|
||
|
|
def __init__(self, player: 'Player', dx: int, dy: int, maze: Maze):
|
||
|
|
self._player = player
|
||
|
|
self._dx = dx
|
||
|
|
self._dy = dy
|
||
|
|
self._maze = maze
|
||
|
|
self._executed = False
|
||
|
|
|
||
|
|
def execute(self) -> bool:
|
||
|
|
new_x = self._player.position.x + self._dx
|
||
|
|
new_y = self._player.position.y + self._dy
|
||
|
|
target = self._maze.get_cell(new_x, new_y)
|
||
|
|
|
||
|
|
if target and target.passable():
|
||
|
|
self._player.move_to(target)
|
||
|
|
self._executed = True
|
||
|
|
return True
|
||
|
|
return False
|
||
|
|
|
||
|
|
def undo(self) -> bool:
|
||
|
|
if self._executed:
|
||
|
|
self._player.undo()
|
||
|
|
self._executed = False
|
||
|
|
return True
|
||
|
|
return False
|
||
|
|
|
||
|
|
|
||
|
|
class Player:
|
||
|
|
def __init__(self, start_tile: Tile):
|
||
|
|
self._position = start_tile
|
||
|
|
self._previous = None
|
||
|
|
|
||
|
|
@property
|
||
|
|
def position(self) -> Tile:
|
||
|
|
return self._position
|
||
|
|
|
||
|
|
def move_to(self, tile: Tile):
|
||
|
|
self._previous = self._position
|
||
|
|
self._position = tile
|
||
|
|
|
||
|
|
def undo(self):
|
||
|
|
if self._previous:
|
||
|
|
self._position, self._previous = self._previous, None
|
||
|
|
|
||
|
|
|
||
|
|
class ConsoleView(Observer):
|
||
|
|
def __init__(self, maze: Maze, player: Optional[Player] = None):
|
||
|
|
self._maze = maze
|
||
|
|
self._player = player
|
||
|
|
self._current_path: List[Tile] = []
|
||
|
|
|
||
|
|
def update(self, event: str, data: Any = None):
|
||
|
|
if event == "solving_finished":
|
||
|
|
self._current_path = data.get('path', [])
|
||
|
|
self._display_solution(data)
|
||
|
|
|
||
|
|
def _display_solution(self, stats: Dict):
|
||
|
|
os.system('cls' if os.name == 'nt' else 'clear')
|
||
|
|
print("=" * (self._maze.width * 2 + 4))
|
||
|
|
print("MAZE SOLUTION")
|
||
|
|
print("=" * (self._maze.width * 2 + 4))
|
||
|
|
|
||
|
|
for y in range(self._maze.height):
|
||
|
|
print(" ", end='')
|
||
|
|
for x in range(self._maze.width):
|
||
|
|
cell = self._maze.get_cell(x, y)
|
||
|
|
if cell == self._maze.start:
|
||
|
|
print('S', end=' ')
|
||
|
|
elif cell == self._maze.exit:
|
||
|
|
print('E', end=' ')
|
||
|
|
elif cell.is_wall:
|
||
|
|
print('#', end=' ')
|
||
|
|
elif self._current_path and cell in self._current_path:
|
||
|
|
print('●', end=' ')
|
||
|
|
else:
|
||
|
|
print('.', end=' ')
|
||
|
|
print()
|
||
|
|
|
||
|
|
print("=" * (self._maze.width * 2 + 4))
|
||
|
|
print(f"Time: {stats['time_ms']:.3f} ms")
|
||
|
|
print(f"Visited: {stats['visited']}")
|
||
|
|
print(f"Path length: {stats['path_length']}")
|
||
|
|
|
||
|
|
def display_maze(self):
|
||
|
|
os.system('cls' if os.name == 'nt' else 'clear')
|
||
|
|
print("=" * (self._maze.width * 2 + 4))
|
||
|
|
print("MAZE")
|
||
|
|
print("=" * (self._maze.width * 2 + 4))
|
||
|
|
|
||
|
|
for y in range(self._maze.height):
|
||
|
|
print(" ", end='')
|
||
|
|
for x in range(self._maze.width):
|
||
|
|
cell = self._maze.get_cell(x, y)
|
||
|
|
if self._player and cell == self._player.position:
|
||
|
|
print('P', end=' ')
|
||
|
|
elif cell == self._maze.start:
|
||
|
|
print('S', end=' ')
|
||
|
|
elif cell == self._maze.exit:
|
||
|
|
print('E', end=' ')
|
||
|
|
elif cell.is_wall:
|
||
|
|
print('#', end=' ')
|
||
|
|
else:
|
||
|
|
print('.', end=' ')
|
||
|
|
print()
|
||
|
|
|
||
|
|
print("=" * (self._maze.width * 2 + 4))
|
||
|
|
print("S - start E - exit # - wall . - path P - player")
|
||
|
|
|
||
|
|
|
||
|
|
def interactive_mode(maze: Maze):
|
||
|
|
player = Player(maze.start)
|
||
|
|
view = ConsoleView(maze, player)
|
||
|
|
view.display_maze()
|
||
|
|
|
||
|
|
solver = MazeSolver(maze)
|
||
|
|
solver.attach(view)
|
||
|
|
|
||
|
|
commands_history: List[Command] = []
|
||
|
|
|
||
|
|
print("\nControls:")
|
||
|
|
print("H (←) J (↓) K (↑) L (→) - move")
|
||
|
|
print("U - undo")
|
||
|
|
print("B - BFS")
|
||
|
|
print("D - DFS")
|
||
|
|
print("A - A*")
|
||
|
|
print("Q - quit")
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
|
||
|
|
while True:
|
||
|
|
cmd = input("\n> ").lower().strip()
|
||
|
|
|
||
|
|
if cmd == 'q':
|
||
|
|
break
|
||
|
|
|
||
|
|
elif cmd == 'b':
|
||
|
|
solver.set_algorithm(BFS())
|
||
|
|
result = solver.solve()
|
||
|
|
if result:
|
||
|
|
print(f"BFS: {result['time_ms']:.3f} ms, visited={result['visited']}, length={result['path_length']}")
|
||
|
|
|
||
|
|
elif cmd == 'd':
|
||
|
|
solver.set_algorithm(DFS())
|
||
|
|
result = solver.solve()
|
||
|
|
if result:
|
||
|
|
print(f"DFS: {result['time_ms']:.3f} ms, visited={result['visited']}, length={result['path_length']}")
|
||
|
|
|
||
|
|
elif cmd == 'a':
|
||
|
|
solver.set_algorithm(AStar())
|
||
|
|
result = solver.solve()
|
||
|
|
if result:
|
||
|
|
print(f"A*: {result['time_ms']:.3f} ms, visited={result['visited']}, length={result['path_length']}")
|
||
|
|
|
||
|
|
elif cmd in ['h', 'j', 'k', 'l']:
|
||
|
|
dir_map = {'h': (-1, 0), 'l': (1, 0), 'k': (0, -1), 'j': (0, 1)}
|
||
|
|
dx, dy = dir_map[cmd]
|
||
|
|
move = MoveCommand(player, dx, dy, maze)
|
||
|
|
|
||
|
|
if move.execute():
|
||
|
|
commands_history.append(move)
|
||
|
|
view.display_maze()
|
||
|
|
|
||
|
|
if player.position == maze.exit:
|
||
|
|
print("\n*** YOU ESCAPED! ***")
|
||
|
|
print(f"Total moves: {len(commands_history)}")
|
||
|
|
break
|
||
|
|
else:
|
||
|
|
print("Blocked!")
|
||
|
|
|
||
|
|
elif cmd == 'u':
|
||
|
|
if commands_history:
|
||
|
|
last_command = commands_history.pop()
|
||
|
|
last_command.undo()
|
||
|
|
view.display_maze()
|
||
|
|
print("Undo successful")
|
||
|
|
else:
|
||
|
|
print("Nothing to undo")
|
||
|
|
|
||
|
|
else:
|
||
|
|
print("Unknown command")
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
if len(sys.argv) > 1 and sys.argv[1] == 'experiment':
|
||
|
|
import subprocess
|
||
|
|
subprocess.run([sys.executable, 'plots.py'])
|
||
|
|
return
|
||
|
|
|
||
|
|
loader = TextMazeLoader()
|
||
|
|
|
||
|
|
|
||
|
|
maze_file = os.path.join(DATA_PATH, "maze1.txt")
|
||
|
|
|
||
|
|
if not os.path.exists(maze_file):
|
||
|
|
print(f"ERROR: Maze file not found: {maze_file}")
|
||
|
|
print(f"Please create maze1.txt in: {DATA_PATH}")
|
||
|
|
return
|
||
|
|
|
||
|
|
maze = loader.load(maze_file)
|
||
|
|
interactive_mode(maze)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|