# Source: 2026-rff_mp/SokolovEN/docs/data/maze.py
# Snapshot: 2026-05-24 21:11:08 +03:00 (532 lines, 15 KiB, Python)

import sys
from collections import deque
import heapq
import time
import os
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
# Directory searched for maze text files; main() loads "maze1.txt" from here.
# NOTE(review): hard-coded absolute Windows path — presumably only valid on the
# author's machine; confirm or adjust before running elsewhere.
DATA_PATH = r"C:\Users\user\Desktop\2026-rff_mp\SokolovEN\docs\data"
class Observer(ABC):
    """Interface for objects that receive event notifications."""

    @abstractmethod
    def update(self, event: str, data: Any = None):
        """Handle a single notification.

        Args:
            event: Name of the event being reported.
            data: Optional payload that accompanies the event.
        """
class Observable:
    """Subject that keeps a list of observers and broadcasts events to them."""

    def __init__(self):
        self._observers: List['Observer'] = []

    def attach(self, observer: 'Observer'):
        """Register *observer* so it receives subsequent notifications."""
        self._observers.append(observer)

    def detach(self, observer: 'Observer'):
        """Unregister *observer*.

        Raises:
            ValueError: If *observer* is not currently attached.
        """
        self._observers.remove(observer)

    def notify(self, event: str, data: Any = None):
        """Call ``update(event, data)`` on every attached observer.

        The observer list is snapshotted first: an observer may attach or
        detach (itself or others) from inside ``update`` without causing
        other observers to be skipped in this round. Observers present when
        the round starts are all notified; ones attached during the round
        are notified from the next round on.
        """
        for observer in tuple(self._observers):
            observer.update(event, data)
class Tile:
    """A single maze cell located at integer coordinates (x, y).

    The coordinates are exposed read-only; the wall/start/exit flags can be
    read and assigned. Equality and hashing use the coordinates only, so the
    flags never affect set or dict membership.
    """

    def __init__(self, x: int, y: int):
        self._x, self._y = x, y
        # A fresh tile is an open cell that is neither start nor exit.
        self._wall = self._start = self._exit = False

    @property
    def x(self) -> int:
        """Column index of the tile."""
        return self._x

    @property
    def y(self) -> int:
        """Row index of the tile."""
        return self._y

    @property
    def is_wall(self) -> bool:
        """Whether the tile is a wall."""
        return self._wall

    @is_wall.setter
    def is_wall(self, value: bool):
        self._wall = value

    @property
    def is_start(self) -> bool:
        """Whether the tile is flagged as the start cell."""
        return self._start

    @is_start.setter
    def is_start(self, value: bool):
        self._start = value

    @property
    def is_exit(self) -> bool:
        """Whether the tile is flagged as the exit cell."""
        return self._exit

    @is_exit.setter
    def is_exit(self, value: bool):
        self._exit = value

    def passable(self) -> bool:
        """Return True when the tile is not a wall."""
        return not self.is_wall

    def __hash__(self):
        coords = (self._x, self._y)
        return hash(coords)

    def __eq__(self, other):
        if isinstance(other, Tile):
            return (self._x, self._y) == (other._x, other._y)
        return False
class Maze:
    """Rectangular grid of tiles that tracks one start tile and one exit tile."""

    def __init__(self, w: int, h: int):
        self._w, self._h = w, h
        # Rows are indexed by y, columns by x: self._cells[y][x].
        grid = []
        for row in range(h):
            grid.append([Tile(col, row) for col in range(w)])
        self._cells = grid
        self._start: Optional[Tile] = None
        self._exit: Optional[Tile] = None

    @property
    def width(self) -> int:
        """Number of columns."""
        return self._w

    @property
    def height(self) -> int:
        """Number of rows."""
        return self._h

    @property
    def start(self) -> Optional[Tile]:
        """The current start tile, or None if none has been set."""
        return self._start

    @property
    def exit(self) -> Optional[Tile]:
        """The current exit tile, or None if none has been set."""
        return self._exit

    def get_cell(self, x: int, y: int) -> Optional[Tile]:
        """Return the tile at (x, y), or None when the position is off-grid."""
        inside = 0 <= x < self._w and 0 <= y < self._h
        return self._cells[y][x] if inside else None

    def set_cell(self, x: int, y: int, kind: str):
        """Change the role of the tile at (x, y).

        Args:
            x: Column index.
            y: Row index.
            kind: 'wall', 'path', 'start' or 'exit'. Any other value, or an
                off-grid position, leaves the maze untouched.
        """
        tile = self.get_cell(x, y)
        if tile is None:
            return
        if kind == 'wall':
            tile.is_wall = True
            return
        if kind == 'path':
            tile.is_wall = False
            return
        if kind == 'start':
            # Only one start may exist: clear the flag on the previous one.
            previous = self._start
            if previous is not None:
                previous.is_start = False
            tile.is_start = True
            tile.is_wall = False
            self._start = tile
            return
        if kind == 'exit':
            # Only one exit may exist: clear the flag on the previous one.
            previous = self._exit
            if previous is not None:
                previous.is_exit = False
            tile.is_exit = True
            tile.is_wall = False
            self._exit = tile

    def neighbours(self, cell: Tile) -> List[Tile]:
        """Return the passable tiles adjacent to *cell* in up, down, left, right order."""
        offsets = ((0, -1), (0, 1), (-1, 0), (1, 0))
        candidates = (self.get_cell(cell.x + dx, cell.y + dy) for dx, dy in offsets)
        return [tile for tile in candidates if tile is not None and tile.passable()]
class MazeLoader(ABC):
    """Interface for objects that build a maze from a named resource."""

    @abstractmethod
    def load(self, filename: str) -> 'Maze':
        """Build and return the maze described by *filename*."""
class TextMazeLoader(MazeLoader):
    """Loads a maze from a UTF-8 text file, one character per cell.

    '#' is a wall, 'S' the start, 'E' the exit; every other character is an
    open cell. The grid is as wide as the longest line.
    """

    def load(self, filename: str) -> Maze:
        """Parse *filename* into a Maze.

        Raises:
            ValueError: If the file does not contain exactly one 'S' and
                exactly one 'E'.
        """
        with open(filename, 'r', encoding='utf-8') as handle:
            rows = [raw.rstrip('\n') for raw in handle]

        height = len(rows)
        width = max(map(len, rows)) if height else 0
        maze = Maze(width, height)

        kind_by_char = {'#': 'wall', 'S': 'start', 'E': 'exit'}
        seen = {'start': 0, 'exit': 0}
        for y, row in enumerate(rows):
            for x, ch in enumerate(row):
                kind = kind_by_char.get(ch, 'path')
                maze.set_cell(x, y, kind)
                if kind in seen:
                    seen[kind] += 1

        start_count, exit_count = seen['start'], seen['exit']
        if start_count != 1 or exit_count != 1:
            raise ValueError(f"Maze must have one S and one E. Found: S={start_count}, E={exit_count}")
        return maze
class PathFinder(ABC):
    """Base class for path-search strategies.

    Keeps a visited counter (initially 0) that is exposed through
    ``visited_count``, and provides the shared path-reconstruction helper.
    """

    def __init__(self):
        self._visited = 0

    @abstractmethod
    def find(self, maze: 'Maze', start: 'Tile', goal: 'Tile') -> List['Tile']:
        """Return a path from *start* to *goal* in *maze*."""

    def _reconstruct(self, parent: Dict['Tile', Optional['Tile']], start: 'Tile', goal: 'Tile') -> List['Tile']:
        """Follow *parent* links back from *goal* and return the path start..goal.

        Returns an empty list when the chain of links does not end at *start*.
        """
        chain = []
        node = goal
        # A missing key and an explicit None both terminate the walk.
        while node is not None:
            chain.append(node)
            node = parent.get(node)
        chain.reverse()
        if chain and chain[0] == start:
            return chain
        return []

    @property
    def visited_count(self) -> int:
        """Current value of the visited counter."""
        return self._visited
class BFS(PathFinder):
    """Breadth-first search using a FIFO queue."""

    def find(self, maze: Maze, start: Tile, goal: Tile) -> List[Tile]:
        """Return a path from *start* to *goal*, or [] when none exists.

        The visited counter is set to the number of cells discovered.
        """
        frontier = deque((start,))
        # Doubles as the discovered set: a cell is a key once it was queued.
        came_from = {start: None}
        reached = False
        while frontier:
            node = frontier.popleft()
            if node == goal:
                reached = True
                break
            for nxt in maze.neighbours(node):
                if nxt in came_from:
                    continue
                came_from[nxt] = node
                frontier.append(nxt)
        self._visited = len(came_from)
        if not reached:
            return []
        return self._reconstruct(came_from, start, goal)
class DFS(PathFinder):
    """Depth-first search using an explicit LIFO stack."""

    def find(self, maze: Maze, start: Tile, goal: Tile) -> List[Tile]:
        """Return a path from *start* to *goal*, or [] when none exists.

        The visited counter is set to the number of cells discovered.
        """
        pending = [start]
        # Doubles as the discovered set: a cell is a key once it was pushed.
        came_from = {start: None}
        reached = False
        while pending:
            node = pending.pop()
            if node == goal:
                reached = True
                break
            for nxt in maze.neighbours(node):
                if nxt in came_from:
                    continue
                came_from[nxt] = node
                pending.append(nxt)
        self._visited = len(came_from)
        if not reached:
            return []
        return self._reconstruct(came_from, start, goal)
class AStar(PathFinder):
    """A* search with unit step cost and a Manhattan-distance heuristic."""

    def _heuristic(self, cell: Tile, goal: Tile) -> int:
        """Return the Manhattan distance between *cell* and *goal*."""
        dx = abs(cell.x - goal.x)
        dy = abs(cell.y - goal.y)
        return dx + dy

    def find(self, maze: Maze, start: Tile, goal: Tile) -> List[Tile]:
        """Return a path from *start* to *goal*, or [] when none exists.

        The visited counter is set to the number of distinct cells popped
        from the priority queue.
        """
        infinity = float('inf')
        initial_f = self._heuristic(start, goal)

        # Heap entries are (f, ticket, tile); the unique, increasing ticket
        # breaks ties so tiles themselves are never compared.
        frontier = []
        heapq.heappush(frontier, (initial_f, 0, start))
        ticket = 1

        came_from = {}
        cost_so_far = {start: 0}
        best_f = {start: initial_f}
        popped = set()

        while frontier:
            f_value, _, node = heapq.heappop(frontier)
            popped.add(node)
            if node == goal:
                self._visited = len(popped)
                return self._reconstruct(came_from, start, goal)
            # Skip stale entries superseded by a cheaper route to this cell.
            if f_value > best_f.get(node, infinity):
                continue
            candidate_cost = cost_so_far[node] + 1
            for nxt in maze.neighbours(node):
                if candidate_cost >= cost_so_far.get(nxt, infinity):
                    continue
                came_from[nxt] = node
                cost_so_far[nxt] = candidate_cost
                estimate = candidate_cost + self._heuristic(nxt, goal)
                best_f[nxt] = estimate
                heapq.heappush(frontier, (estimate, ticket, nxt))
                ticket += 1

        self._visited = len(popped)
        return []
class MazeSolver(Observable):
    """Runs a pluggable path-finding algorithm on a maze and reports the result."""

    def __init__(self, maze: Maze):
        super().__init__()
        self._maze = maze
        self._algorithm: Optional[PathFinder] = None

    def set_algorithm(self, algorithm: PathFinder):
        """Select the algorithm used by the next ``solve`` call."""
        self._algorithm = algorithm

    def solve(self) -> Optional[Dict[str, Any]]:
        """Search from the maze start to its exit and publish the statistics.

        Returns:
            A dict with keys 'time_ms' (elapsed search time in milliseconds),
            'visited' (the algorithm's visited count), 'path_length' and
            'path' (empty when no route was found).

        Raises:
            ValueError: If no algorithm has been set, or the maze lacks a
                start or an exit.
        """
        if self._algorithm is None:
            raise ValueError("Algorithm not set")
        start, goal = self._maze.start, self._maze.exit
        if start is None or goal is None:
            # Fail clearly instead of crashing inside the search algorithm.
            raise ValueError("Maze has no start or exit")

        started = time.perf_counter()
        path = self._algorithm.find(self._maze, start, goal)
        elapsed_ms = (time.perf_counter() - started) * 1000

        result = {
            'time_ms': elapsed_ms,
            'visited': self._algorithm.visited_count,
            'path_length': len(path),
            'path': path
        }
        # Attached observers listen for this event; without the notification
        # they are never told that a solution is available.
        self.notify("solving_finished", result)
        return result
class Command(ABC):
    """Interface for a reversible action."""

    @abstractmethod
    def execute(self) -> bool:
        """Perform the action; the bool reports whether it took effect."""

    @abstractmethod
    def undo(self) -> bool:
        """Revert the action; the bool reports whether anything was reverted."""
class MoveCommand(Command):
    """Moves a player one step by (dx, dy) and can take that step back."""

    def __init__(self, player: 'Player', dx: int, dy: int, maze: Maze):
        self._player = player
        self._dx = dx
        self._dy = dy
        self._maze = maze
        self._executed = False
        # Tile the player stood on before execute(); needed to undo reliably.
        self._origin: Optional[Tile] = None

    def execute(self) -> bool:
        """Move the player if the target cell exists and is passable.

        Returns:
            True when the player moved, False when the move is blocked.
        """
        origin = self._player.position
        target = self._maze.get_cell(origin.x + self._dx, origin.y + self._dy)
        if target is None or not target.passable():
            return False
        self._player.move_to(target)
        self._origin = origin
        self._executed = True
        return True

    def undo(self) -> bool:
        """Put the player back on the tile it occupied before ``execute``.

        The command restores its own recorded origin rather than relying on
        the player's memory of its previous tile, so a chain of several
        commands can be undone one after another.

        Returns:
            True when a move was reverted, False when there was nothing to undo.
        """
        if not self._executed:
            return False
        self._player.move_to(self._origin)
        self._executed = False
        return True
class Player:
    """Tracks the player's current tile and the tiles it moved away from."""

    def __init__(self, start_tile: 'Tile'):
        self._position = start_tile
        # Tiles left behind, oldest first; lets undo() go back several steps.
        self._history: List['Tile'] = []

    @property
    def position(self) -> 'Tile':
        """The tile the player currently occupies."""
        return self._position

    def move_to(self, tile: 'Tile'):
        """Move to *tile*, remembering the tile being left."""
        self._history.append(self._position)
        self._position = tile

    def undo(self):
        """Step back to the most recently left tile; do nothing if there is none."""
        if self._history:
            self._position = self._history.pop()
class ConsoleView(Observer):
    """Console renderer for a maze, an optional player and a solved path."""

    # Glyph for cells on the reported path. The previous empty string made the
    # path invisible and shifted the rest of the row one column to the left.
    _PATH_MARK = '*'

    def __init__(self, maze: Maze, player: Optional[Player] = None):
        self._maze = maze
        self._player = player
        self._current_path: List[Tile] = []

    def update(self, event: str, data: Any = None):
        """React to "solving_finished" by storing the path and drawing it.

        Other events, and notifications without a payload, are ignored.
        """
        if event != "solving_finished" or data is None:
            return
        self._current_path = data.get('path', [])
        self._display_solution(data)

    def _fixed_glyph(self, cell: Tile) -> Optional[str]:
        """Return 'S', 'E' or '#' for start, exit and wall cells, else None."""
        if cell == self._maze.start:
            return 'S'
        if cell == self._maze.exit:
            return 'E'
        if cell.is_wall:
            return '#'
        return None

    def _draw_grid(self, title: str, glyph_for):
        """Clear the console and print the framed grid under *title*.

        *glyph_for* maps a tile to the single character drawn for it.
        """
        os.system('cls' if os.name == 'nt' else 'clear')
        rule = "=" * (self._maze.width * 2 + 4)
        print(rule)
        print(title)
        print(rule)
        for y in range(self._maze.height):
            cells = (self._maze.get_cell(x, y) for x in range(self._maze.width))
            print(" " + "".join(glyph_for(cell) + ' ' for cell in cells))
        print(rule)

    def _display_solution(self, stats: Dict):
        """Draw the maze with the stored path marked, then print *stats*.

        *stats* must provide 'time_ms', 'visited' and 'path_length'.
        """
        # Set membership keeps per-cell lookups O(1) instead of scanning the list.
        on_path = set(self._current_path)

        def glyph_for(cell):
            fixed = self._fixed_glyph(cell)
            if fixed is not None:
                return fixed
            return self._PATH_MARK if cell in on_path else '.'

        self._draw_grid("MAZE SOLUTION", glyph_for)
        print(f"Time: {stats['time_ms']:.3f} ms")
        print(f"Visited: {stats['visited']}")
        print(f"Path length: {stats['path_length']}")

    def display_maze(self):
        """Draw the maze with the player's position (if any) and a legend."""
        player = self._player

        def glyph_for(cell):
            # The player glyph takes precedence over start/exit markers.
            if player is not None and cell == player.position:
                return 'P'
            return self._fixed_glyph(cell) or '.'

        self._draw_grid("MAZE", glyph_for)
        print("S - start E - exit # - wall . - path P - player")
def interactive_mode(maze: Maze):
    """Run the interactive console loop for *maze*.

    The player starts on the maze's start tile. Commands read from stdin:
    h/j/k/l move, u undoes the last move, b/d/a run BFS/DFS/A*, q quits.
    The loop also ends when the player reaches the exit or stdin is closed.
    """
    player = Player(maze.start)
    view = ConsoleView(maze, player)
    view.display_maze()
    solver = MazeSolver(maze)
    solver.attach(view)
    commands_history: List[Command] = []

    # Lookup tables are built once instead of on every loop iteration.
    algorithms = {'b': ("BFS", BFS), 'd': ("DFS", DFS), 'a': ("A*", AStar)}
    directions = {'h': (-1, 0), 'l': (1, 0), 'k': (0, -1), 'j': (0, 1)}

    print("\nControls:")
    print("H (←) J (↓) K (↑) L (→) - move")
    print("U - undo")
    print("B - BFS")
    print("D - DFS")
    print("A - A*")
    print("Q - quit")
    print("\n" + "=" * 50)

    while True:
        try:
            cmd = input("\n> ").lower().strip()
        except EOFError:
            # stdin closed (Ctrl-D / piped input exhausted): leave quietly.
            break

        if cmd == 'q':
            break

        if cmd in algorithms:
            label, factory = algorithms[cmd]
            solver.set_algorithm(factory())
            result = solver.solve()
            if result:
                print(f"{label}: {result['time_ms']:.3f} ms, visited={result['visited']}, length={result['path_length']}")
        elif cmd in directions:
            dx, dy = directions[cmd]
            move = MoveCommand(player, dx, dy, maze)
            if not move.execute():
                print("Blocked!")
                continue
            commands_history.append(move)
            view.display_maze()
            if player.position == maze.exit:
                print("\n*** YOU ESCAPED! ***")
                print(f"Total moves: {len(commands_history)}")
                break
        elif cmd == 'u':
            if not commands_history:
                print("Nothing to undo")
                continue
            commands_history.pop().undo()
            view.display_maze()
            print("Undo successful")
        else:
            print("Unknown command")
def main():
    """Program entry point.

    With ``experiment`` as the first command-line argument, runs ``plots.py``
    with the current interpreter. Otherwise loads ``maze1.txt`` from
    DATA_PATH and starts the interactive mode. A missing, unreadable or
    invalid maze file is reported on stdout instead of raising.
    """
    if len(sys.argv) > 1 and sys.argv[1] == 'experiment':
        import subprocess
        # NOTE(review): 'plots.py' is resolved against the current working
        # directory, not this file's directory — confirm that is intended.
        subprocess.run([sys.executable, 'plots.py'])
        return

    loader = TextMazeLoader()
    maze_file = os.path.join(DATA_PATH, "maze1.txt")
    if not os.path.exists(maze_file):
        print(f"ERROR: Maze file not found: {maze_file}")
        print(f"Please create maze1.txt in: {DATA_PATH}")
        return

    try:
        maze = loader.load(maze_file)
    except (OSError, ValueError) as err:
        # ValueError covers the loader's validation error as well as
        # UnicodeDecodeError raised while reading the file.
        print(f"ERROR: Cannot load maze: {err}")
        return

    interactive_mode(maze)


# Start the program only when this file is executed as a script.
if __name__ == "__main__":
    main()