forked from UNN/2026-rff_mp
611 lines
19 KiB
Python
611 lines
19 KiB
Python
"""
|
||
maze_solver.py — Поиск выхода из лабиринта.
|
||
|
||
Паттерны GoF: Builder, Strategy, Observer, Command.
|
||
Алгоритмы: BFS, DFS, A*, Dijkstra.
|
||
|
||
Запуск:
|
||
python maze_solver.py --solve mazes/small_10x10.txt --algo bfs
|
||
python maze_solver.py --walk mazes/small_10x10.txt
|
||
python maze_solver.py --experiment
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import heapq
|
||
import os
|
||
import statistics
|
||
import sys
|
||
import time
|
||
from abc import ABC, abstractmethod
|
||
from collections import deque
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
sys.setrecursionlimit(100_000)
|
||
|
||
|
||
# ЭТАП 1. МОДЕЛЬ ЛАБИРИНТА — Cell, Maze
|
||
|
||
@dataclass
|
||
class Cell:
|
||
"""Одна клетка лабиринта."""
|
||
x: int
|
||
y: int
|
||
is_wall: bool = False
|
||
is_start: bool = False
|
||
is_exit: bool = False
|
||
weight: int = 1 # 1=асфальт 2=песок 3=болото
|
||
|
||
def is_passable(self) -> bool:
|
||
return not self.is_wall
|
||
|
||
def __repr__(self) -> str:
|
||
if self.is_wall: return "#"
|
||
if self.is_start: return "S"
|
||
if self.is_exit: return "E"
|
||
return " "
|
||
|
||
# heapq требует сравнения объектов
|
||
def __lt__(self, other: Cell) -> bool:
|
||
return (self.x, self.y) < (other.x, other.y)
|
||
|
||
def __hash__(self):
|
||
return hash((self.x, self.y))
|
||
|
||
def __eq__(self, other):
|
||
return isinstance(other, Cell) and self.x == other.x and self.y == other.y
|
||
|
||
|
||
class Maze:
|
||
"""Двумерная сетка клеток."""
|
||
|
||
def __init__(self, width: int, height: int):
|
||
self.width = width
|
||
self.height = height
|
||
self._cells: List[List[Cell]] = [
|
||
[Cell(x, y) for x in range(width)] for y in range(height)
|
||
]
|
||
self.start: Optional[Cell] = None
|
||
self.exit: Optional[Cell] = None
|
||
|
||
def get_cell(self, x: int, y: int) -> Optional[Cell]:
|
||
if 0 <= x < self.width and 0 <= y < self.height:
|
||
return self._cells[y][x]
|
||
return None
|
||
|
||
def get_neighbors(self, cell: Cell) -> List[Cell]:
|
||
"""Четыре соседа — только проходимые, в пределах границ."""
|
||
result = []
|
||
for dx, dy in ((0, -1), (0, 1), (-1, 0), (1, 0)):
|
||
nb = self.get_cell(cell.x + dx, cell.y + dy)
|
||
if nb and nb.is_passable():
|
||
result.append(nb)
|
||
return result
|
||
|
||
|
||
|
||
# ЭТАП 2. ПАТТЕРН BUILDER — загрузка лабиринта из файла
|
||
|
||
_WEIGHT_MAP = {' ': 1, '.': 2, '~': 3}
|
||
|
||
|
||
class MazeBuilder(ABC):
|
||
"""Интерфейс строителя лабиринта."""
|
||
|
||
@abstractmethod
|
||
def build_from_file(self, filename: str) -> Maze: ...
|
||
|
||
@abstractmethod
|
||
def build_from_string(self, text: str) -> Maze: ...
|
||
|
||
|
||
class TextFileMazeBuilder(MazeBuilder):
|
||
"""
|
||
Строит Maze из текстового файла:
|
||
# — стена S — старт E — выход
|
||
' '— путь (w=1) . — песок (w=2) ~ — болото (w=3)
|
||
"""
|
||
|
||
def build_from_file(self, filename: str) -> Maze:
|
||
return self.build_from_string(Path(filename).read_text(encoding="utf-8"))
|
||
|
||
def build_from_string(self, text: str) -> Maze:
|
||
lines = text.splitlines()
|
||
while lines and not lines[-1].strip():
|
||
lines.pop()
|
||
if not lines:
|
||
raise ValueError("Пустой лабиринт")
|
||
|
||
height = len(lines)
|
||
width = max(len(l) for l in lines)
|
||
maze = Maze(width, height)
|
||
|
||
for y, line in enumerate(lines):
|
||
for x, ch in enumerate(line):
|
||
cell = maze.get_cell(x, y)
|
||
if cell is None:
|
||
continue
|
||
if ch == '#':
|
||
cell.is_wall = True
|
||
elif ch == 'S':
|
||
cell.is_start = True
|
||
maze.start = cell
|
||
elif ch == 'E':
|
||
cell.is_exit = True
|
||
maze.exit = cell
|
||
else:
|
||
cell.weight = _WEIGHT_MAP.get(ch, 1)
|
||
for x in range(len(line), width): # дополнить стенами
|
||
c = maze.get_cell(x, y)
|
||
if c:
|
||
c.is_wall = True
|
||
|
||
if maze.start is None:
|
||
raise ValueError("Нет стартовой клетки 'S'")
|
||
if maze.exit is None:
|
||
raise ValueError("Нет выходной клетки 'E'")
|
||
return maze
|
||
|
||
|
||
|
||
# ЭТАП 3. ПАТТЕРН STRATEGY — алгоритмы поиска пути
|
||
|
||
|
||
def _reconstruct(parent: Dict[Cell, Optional[Cell]], end: Cell) -> List[Cell]:
|
||
"""Восстановить путь от старта до end по словарю предшественников."""
|
||
path, node = [], end
|
||
while node is not None:
|
||
path.append(node)
|
||
node = parent.get(node)
|
||
path.reverse()
|
||
return path
|
||
|
||
|
||
class PathFindingStrategy(ABC):
|
||
"""Интерфейс стратегии поиска пути."""
|
||
|
||
@property
|
||
@abstractmethod
|
||
def name(self) -> str: ...
|
||
|
||
@abstractmethod
|
||
def find_path(
|
||
self, maze: Maze, start: Cell, exit_cell: Cell
|
||
) -> Tuple[List[Cell], int]:
|
||
"""Возвращает (path, visited_count). path пуст если пути нет."""
|
||
...
|
||
|
||
|
||
class BFSStrategy(PathFindingStrategy):
|
||
"""Поиск в ширину — гарантирует кратчайший путь (по числу шагов)."""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "BFS"
|
||
|
||
def find_path(self, maze, start, exit_cell):
|
||
queue = deque([start])
|
||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||
visited = 0
|
||
while queue:
|
||
cur = queue.popleft()
|
||
visited += 1
|
||
if cur == exit_cell:
|
||
return _reconstruct(parent, exit_cell), visited
|
||
for nb in maze.get_neighbors(cur):
|
||
if nb not in parent:
|
||
parent[nb] = cur
|
||
queue.append(nb)
|
||
return [], visited
|
||
|
||
|
||
class DFSStrategy(PathFindingStrategy):
|
||
"""Поиск в глубину — не гарантирует кратчайший путь."""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "DFS"
|
||
|
||
def find_path(self, maze, start, exit_cell):
|
||
stack = [start]
|
||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||
visited = 0
|
||
while stack:
|
||
cur = stack.pop()
|
||
visited += 1
|
||
if cur == exit_cell:
|
||
return _reconstruct(parent, exit_cell), visited
|
||
for nb in maze.get_neighbors(cur):
|
||
if nb not in parent:
|
||
parent[nb] = cur
|
||
stack.append(nb)
|
||
return [], visited
|
||
|
||
|
||
class AStarStrategy(PathFindingStrategy):
|
||
"""A* с манхэттенской эвристикой — направленный поиск, учитывает веса."""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "A*"
|
||
|
||
@staticmethod
|
||
def _h(a: Cell, b: Cell) -> int:
|
||
return abs(a.x - b.x) + abs(a.y - b.y)
|
||
|
||
def find_path(self, maze, start, exit_cell):
|
||
heap: List[Tuple[int, int, Cell]] = [(0, 0, start)]
|
||
g: Dict[Cell, int] = {start: 0}
|
||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||
visited = 0
|
||
while heap:
|
||
_, g_cur, cur = heapq.heappop(heap)
|
||
visited += 1
|
||
if cur == exit_cell:
|
||
return _reconstruct(parent, exit_cell), visited
|
||
if g_cur > g.get(cur, float('inf')):
|
||
continue
|
||
for nb in maze.get_neighbors(cur):
|
||
new_g = g_cur + nb.weight
|
||
if new_g < g.get(nb, float('inf')):
|
||
g[nb] = new_g
|
||
parent[nb] = cur
|
||
heapq.heappush(heap, (new_g + self._h(nb, exit_cell), new_g, nb))
|
||
return [], visited
|
||
|
||
|
||
class DijkstraStrategy(PathFindingStrategy):
|
||
"""Дейкстра — оптимален для взвешенных графов, без эвристики."""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "Dijkstra"
|
||
|
||
def find_path(self, maze, start, exit_cell):
|
||
dist: Dict[Cell, int] = {start: 0}
|
||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||
heap: List[Tuple[int, Cell]] = [(0, start)]
|
||
visited = 0
|
||
while heap:
|
||
d, cur = heapq.heappop(heap)
|
||
visited += 1
|
||
if cur == exit_cell:
|
||
return _reconstruct(parent, exit_cell), visited
|
||
if d > dist.get(cur, float('inf')):
|
||
continue
|
||
for nb in maze.get_neighbors(cur):
|
||
new_d = d + nb.weight
|
||
if new_d < dist.get(nb, float('inf')):
|
||
dist[nb] = new_d
|
||
parent[nb] = cur
|
||
heapq.heappush(heap, (new_d, nb))
|
||
return [], visited
|
||
|
||
|
||
|
||
# ПАТТЕРН OBSERVER
|
||
|
||
|
||
@dataclass
|
||
class SearchStats:
|
||
time_ms: float
|
||
visited_cells: int
|
||
path_length: int
|
||
strategy_name: str
|
||
|
||
def __str__(self) -> str:
|
||
return (f"[{self.strategy_name}] "
|
||
f"время={self.time_ms:.2f} мс "
|
||
f"посещено={self.visited_cells} "
|
||
f"длина пути={self.path_length}")
|
||
|
||
|
||
class Observer(ABC):
|
||
@abstractmethod
|
||
def update(self, event: dict) -> None: ...
|
||
|
||
|
||
class ConsoleView(Observer):
|
||
"""Выводит события и рисует лабиринт в консоли."""
|
||
|
||
def update(self, event: dict) -> None:
|
||
kind = event.get("type")
|
||
if kind == "maze_loaded":
|
||
print(f"[Лабиринт загружен] {event['width']}×{event['height']}")
|
||
elif kind == "search_start":
|
||
print(f"[Поиск] алгоритм={event['strategy']}")
|
||
elif kind == "path_found":
|
||
print(f"[Готово] {event['stats']}")
|
||
elif kind == "no_path":
|
||
print("[Результат] Путь не найден!")
|
||
|
||
def render(
|
||
self,
|
||
maze: Maze,
|
||
player_pos: Optional[Cell] = None,
|
||
path: Optional[List[Cell]] = None,
|
||
) -> None:
|
||
path_set = set(path) if path else set()
|
||
for y in range(maze.height):
|
||
row = []
|
||
for x in range(maze.width):
|
||
cell = maze.get_cell(x, y)
|
||
if cell is None:
|
||
row.append("?")
|
||
elif player_pos and cell == player_pos:
|
||
row.append("@")
|
||
elif cell.is_wall:
|
||
row.append("█")
|
||
elif cell.is_start:
|
||
row.append("S")
|
||
elif cell.is_exit:
|
||
row.append("E")
|
||
elif cell in path_set:
|
||
row.append("*")
|
||
elif cell.weight == 3:
|
||
row.append("~")
|
||
elif cell.weight == 2:
|
||
row.append(".")
|
||
else:
|
||
row.append(" ")
|
||
print("".join(row))
|
||
print()
|
||
|
||
|
||
|
||
# ЭТАП 4. ОРКЕСТРАТОР MazeSolver
|
||
|
||
class MazeSolver:
|
||
"""
|
||
Связывает Maze + PathFindingStrategy + список Observer.
|
||
Паттерны: Strategy (алгоритм подключается снаружи),
|
||
Observer (уведомления при завершении).
|
||
"""
|
||
|
||
def __init__(self, maze: Maze, strategy: PathFindingStrategy):
|
||
self.maze = maze
|
||
self._strategy = strategy
|
||
self._observers: List[Observer] = []
|
||
self._last_path: List[Cell] = []
|
||
|
||
def add_observer(self, obs: Observer) -> None:
|
||
self._observers.append(obs)
|
||
|
||
def set_strategy(self, strategy: PathFindingStrategy) -> None:
|
||
self._strategy = strategy
|
||
|
||
def _notify(self, event: dict) -> None:
|
||
for obs in self._observers:
|
||
obs.update(event)
|
||
|
||
def solve(self) -> SearchStats:
|
||
if not self.maze.start or not self.maze.exit:
|
||
raise ValueError("Лабиринт не содержит старт или выход")
|
||
|
||
self._notify({"type": "search_start", "strategy": self._strategy.name})
|
||
|
||
t0 = time.perf_counter()
|
||
path, visited = self._strategy.find_path(
|
||
self.maze, self.maze.start, self.maze.exit
|
||
)
|
||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||
|
||
self._last_path = path
|
||
stats = SearchStats(elapsed_ms, visited, len(path), self._strategy.name)
|
||
|
||
self._notify({"type": "path_found" if path else "no_path", "stats": stats})
|
||
return stats
|
||
|
||
@property
|
||
def last_path(self) -> List[Cell]:
|
||
return self._last_path
|
||
|
||
|
||
|
||
# ЭТАП 5. ПАТТЕРН COMMAND — пошаговое управление игроком
|
||
|
||
|
||
_DIRECTIONS = {'W': (0, -1), 'S': (0, 1), 'A': (-1, 0), 'D': (1, 0)}
|
||
|
||
|
||
class Command(ABC):
|
||
@abstractmethod
|
||
def execute(self) -> bool: ...
|
||
@abstractmethod
|
||
def undo(self) -> None: ...
|
||
|
||
|
||
class Player:
|
||
def __init__(self, cell: Cell):
|
||
self.current_cell = cell
|
||
|
||
def move_to(self, cell: Cell) -> None:
|
||
self.current_cell = cell
|
||
|
||
|
||
class MoveCommand(Command):
|
||
"""Перемещение игрока в направлении direction (W/A/S/D)."""
|
||
|
||
def __init__(self, player: Player, direction: str, maze: Maze,
|
||
observers: Optional[List[Observer]] = None):
|
||
self._player = player
|
||
self._direction = direction.upper()
|
||
self._maze = maze
|
||
self._observers = observers or []
|
||
self._prev: Optional[Cell] = None
|
||
|
||
def execute(self) -> bool:
|
||
dx, dy = _DIRECTIONS.get(self._direction, (0, 0))
|
||
target = self._maze.get_cell(
|
||
self._player.current_cell.x + dx,
|
||
self._player.current_cell.y + dy,
|
||
)
|
||
if target and target.is_passable():
|
||
self._prev = self._player.current_cell
|
||
self._player.move_to(target)
|
||
for obs in self._observers:
|
||
obs.update({"type": "move", "cell": target})
|
||
return True
|
||
return False
|
||
|
||
def undo(self) -> None:
|
||
if self._prev:
|
||
self._player.move_to(self._prev)
|
||
self._prev = None
|
||
|
||
|
||
class CommandHistory:
|
||
"""Стек выполненных команд (undo/redo)."""
|
||
|
||
def __init__(self):
|
||
self._stack: List[Command] = []
|
||
|
||
def execute(self, cmd: Command) -> bool:
|
||
ok = cmd.execute()
|
||
if ok:
|
||
self._stack.append(cmd)
|
||
return ok
|
||
|
||
def undo(self) -> bool:
|
||
if self._stack:
|
||
self._stack.pop().undo()
|
||
return True
|
||
return False
|
||
|
||
|
||
|
||
# ЭТАП 6. ЭКСПЕРИМЕНТАЛЬНАЯ ЧАСТЬ
|
||
|
||
|
||
def run_experiment(
|
||
maze_files: List[Tuple[str, str]],
|
||
strategies: List[PathFindingStrategy],
|
||
repeats: int = 7,
|
||
out_csv: str = "results.csv",
|
||
) -> None:
|
||
builder = TextFileMazeBuilder()
|
||
rows = [["maze", "strategy", "run", "time_ms", "visited_cells", "path_length"]]
|
||
|
||
for maze_name, maze_file in maze_files:
|
||
print(f"\n=== {maze_name} ===")
|
||
maze = builder.build_from_file(maze_file)
|
||
print(f" Размер: {maze.width}×{maze.height}")
|
||
|
||
for strategy in strategies:
|
||
solver = MazeSolver(maze, strategy)
|
||
times, visits, lengths = [], [], []
|
||
|
||
for run in range(1, repeats + 1):
|
||
stats = solver.solve()
|
||
times.append(stats.time_ms)
|
||
visits.append(stats.visited_cells)
|
||
lengths.append(stats.path_length)
|
||
rows.append([maze_name, strategy.name, run,
|
||
round(stats.time_ms, 4),
|
||
stats.visited_cells,
|
||
stats.path_length])
|
||
|
||
print(f" {strategy.name:10} | "
|
||
f"t={statistics.mean(times):.3f} мс | "
|
||
f"посещено={statistics.mean(visits):.0f} | "
|
||
f"путь={statistics.mean(lengths):.0f}")
|
||
|
||
os.makedirs(os.path.dirname(os.path.abspath(out_csv)), exist_ok=True)
|
||
with open(out_csv, "w", newline="", encoding="utf-8") as f:
|
||
csv.writer(f).writerows(rows)
|
||
print(f"\nСохранено: {out_csv}")
|
||
|
||
|
||
|
||
# CLI
|
||
|
||
|
||
_ALGO_MAP = {
|
||
"bfs": BFSStrategy(),
|
||
"dfs": DFSStrategy(),
|
||
"astar": AStarStrategy(),
|
||
"dijkstra": DijkstraStrategy(),
|
||
}
|
||
|
||
_MAZES = [
|
||
("small_10x10", "small_10x10.txt"),
|
||
("medium_50x50", "medium_50x50.txt"),
|
||
("large_100x100", "large_100x100.txt"),
|
||
("empty_30x30", "empty_30x30.txt"),
|
||
("no_exit_20x20", "no_exit_20x20.txt"),
|
||
("weighted_40x40", "weighted_40x40.txt"),
|
||
]
|
||
|
||
|
||
def cmd_solve(maze_file: str, algo: str) -> None:
|
||
view = ConsoleView()
|
||
maze = TextFileMazeBuilder().build_from_file(maze_file)
|
||
view.update({"type": "maze_loaded", "width": maze.width, "height": maze.height})
|
||
|
||
strategy = _ALGO_MAP.get(algo.lower())
|
||
if not strategy:
|
||
print(f"Неизвестный алгоритм '{algo}'. Доступны: {', '.join(_ALGO_MAP)}")
|
||
return
|
||
|
||
solver = MazeSolver(maze, strategy)
|
||
solver.add_observer(view)
|
||
solver.solve()
|
||
view.render(maze, path=solver.last_path)
|
||
|
||
|
||
def cmd_walk(maze_file: str) -> None:
|
||
view = ConsoleView()
|
||
maze = TextFileMazeBuilder().build_from_file(maze_file)
|
||
player = Player(maze.start)
|
||
history = CommandHistory()
|
||
|
||
solver = MazeSolver(maze, BFSStrategy())
|
||
solver.add_observer(view)
|
||
solver.solve()
|
||
path = solver.last_path
|
||
|
||
print("W=вверх S=вниз A=влево D=вправо Z=отмена Q=выход")
|
||
while True:
|
||
view.render(maze, player_pos=player.current_cell, path=path)
|
||
if player.current_cell == maze.exit:
|
||
print("Вы достигли выхода!")
|
||
break
|
||
key = input("Ход: ").strip().upper()
|
||
if key == "Q":
|
||
break
|
||
elif key == "Z":
|
||
if not history.undo():
|
||
print("Нечего отменять.")
|
||
elif key in _DIRECTIONS:
|
||
if not history.execute(MoveCommand(player, key, maze, [view])):
|
||
print("Туда нельзя — стена.")
|
||
else:
|
||
print("Неизвестная клавиша.")
|
||
|
||
|
||
def cmd_experiment() -> None:
|
||
run_experiment(_MAZES, list(_ALGO_MAP.values()))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
parser = argparse.ArgumentParser(description="Maze solver")
|
||
parser.add_argument("--solve", metavar="FILE", help="Решить лабиринт")
|
||
parser.add_argument("--algo", metavar="ALGO", default="bfs",
|
||
help="bfs | dfs | astar | dijkstra")
|
||
parser.add_argument("--walk", metavar="FILE", help="Ручное управление (WASD)")
|
||
parser.add_argument("--experiment", action="store_true",
|
||
help="Запустить замеры и сохранить CSV")
|
||
args = parser.parse_args()
|
||
|
||
if args.solve:
|
||
cmd_solve(args.solve, args.algo)
|
||
elif args.walk:
|
||
cmd_walk(args.walk)
|
||
elif args.experiment:
|
||
cmd_experiment()
|
||
else:
|
||
parser.print_help() |