added main.py
This commit is contained in:
parent
a9deaa51ec
commit
7cbd75ee34
611
soninrv/docs/data/lab2/main.py
Normal file
611
soninrv/docs/data/lab2/main.py
Normal file
|
|
@ -0,0 +1,611 @@
|
|||
"""
|
||||
maze_solver.py — Поиск выхода из лабиринта.
|
||||
|
||||
Паттерны GoF: Builder, Strategy, Observer, Command.
|
||||
Алгоритмы: BFS, DFS, A*, Dijkstra.
|
||||
|
||||
Запуск:
|
||||
python maze_solver.py --solve mazes/small_10x10.txt --algo bfs
|
||||
python maze_solver.py --walk mazes/small_10x10.txt
|
||||
python maze_solver.py --experiment
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import heapq
|
||||
import os
|
||||
import statistics
|
||||
import sys
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
sys.setrecursionlimit(100_000)
|
||||
|
||||
|
||||
# ЭТАП 1. МОДЕЛЬ ЛАБИРИНТА — Cell, Maze
|
||||
|
||||
@dataclass
|
||||
class Cell:
|
||||
"""Одна клетка лабиринта."""
|
||||
x: int
|
||||
y: int
|
||||
is_wall: bool = False
|
||||
is_start: bool = False
|
||||
is_exit: bool = False
|
||||
weight: int = 1 # 1=асфальт 2=песок 3=болото
|
||||
|
||||
def is_passable(self) -> bool:
|
||||
return not self.is_wall
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if self.is_wall: return "#"
|
||||
if self.is_start: return "S"
|
||||
if self.is_exit: return "E"
|
||||
return " "
|
||||
|
||||
# heapq требует сравнения объектов
|
||||
def __lt__(self, other: Cell) -> bool:
|
||||
return (self.x, self.y) < (other.x, other.y)
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.x, self.y))
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, Cell) and self.x == other.x and self.y == other.y
|
||||
|
||||
|
||||
class Maze:
|
||||
"""Двумерная сетка клеток."""
|
||||
|
||||
def __init__(self, width: int, height: int):
|
||||
self.width = width
|
||||
self.height = height
|
||||
self._cells: List[List[Cell]] = [
|
||||
[Cell(x, y) for x in range(width)] for y in range(height)
|
||||
]
|
||||
self.start: Optional[Cell] = None
|
||||
self.exit: Optional[Cell] = None
|
||||
|
||||
def get_cell(self, x: int, y: int) -> Optional[Cell]:
|
||||
if 0 <= x < self.width and 0 <= y < self.height:
|
||||
return self._cells[y][x]
|
||||
return None
|
||||
|
||||
def get_neighbors(self, cell: Cell) -> List[Cell]:
|
||||
"""Четыре соседа — только проходимые, в пределах границ."""
|
||||
result = []
|
||||
for dx, dy in ((0, -1), (0, 1), (-1, 0), (1, 0)):
|
||||
nb = self.get_cell(cell.x + dx, cell.y + dy)
|
||||
if nb and nb.is_passable():
|
||||
result.append(nb)
|
||||
return result
|
||||
|
||||
|
||||
|
||||
# ЭТАП 2. ПАТТЕРН BUILDER — загрузка лабиринта из файла
|
||||
|
||||
_WEIGHT_MAP = {' ': 1, '.': 2, '~': 3}
|
||||
|
||||
|
||||
class MazeBuilder(ABC):
|
||||
"""Интерфейс строителя лабиринта."""
|
||||
|
||||
@abstractmethod
|
||||
def build_from_file(self, filename: str) -> Maze: ...
|
||||
|
||||
@abstractmethod
|
||||
def build_from_string(self, text: str) -> Maze: ...
|
||||
|
||||
|
||||
class TextFileMazeBuilder(MazeBuilder):
|
||||
"""
|
||||
Строит Maze из текстового файла:
|
||||
# — стена S — старт E — выход
|
||||
' '— путь (w=1) . — песок (w=2) ~ — болото (w=3)
|
||||
"""
|
||||
|
||||
def build_from_file(self, filename: str) -> Maze:
|
||||
return self.build_from_string(Path(filename).read_text(encoding="utf-8"))
|
||||
|
||||
def build_from_string(self, text: str) -> Maze:
|
||||
lines = text.splitlines()
|
||||
while lines and not lines[-1].strip():
|
||||
lines.pop()
|
||||
if not lines:
|
||||
raise ValueError("Пустой лабиринт")
|
||||
|
||||
height = len(lines)
|
||||
width = max(len(l) for l in lines)
|
||||
maze = Maze(width, height)
|
||||
|
||||
for y, line in enumerate(lines):
|
||||
for x, ch in enumerate(line):
|
||||
cell = maze.get_cell(x, y)
|
||||
if cell is None:
|
||||
continue
|
||||
if ch == '#':
|
||||
cell.is_wall = True
|
||||
elif ch == 'S':
|
||||
cell.is_start = True
|
||||
maze.start = cell
|
||||
elif ch == 'E':
|
||||
cell.is_exit = True
|
||||
maze.exit = cell
|
||||
else:
|
||||
cell.weight = _WEIGHT_MAP.get(ch, 1)
|
||||
for x in range(len(line), width): # дополнить стенами
|
||||
c = maze.get_cell(x, y)
|
||||
if c:
|
||||
c.is_wall = True
|
||||
|
||||
if maze.start is None:
|
||||
raise ValueError("Нет стартовой клетки 'S'")
|
||||
if maze.exit is None:
|
||||
raise ValueError("Нет выходной клетки 'E'")
|
||||
return maze
|
||||
|
||||
|
||||
|
||||
# ЭТАП 3. ПАТТЕРН STRATEGY — алгоритмы поиска пути
|
||||
|
||||
|
||||
def _reconstruct(parent: Dict[Cell, Optional[Cell]], end: Cell) -> List[Cell]:
|
||||
"""Восстановить путь от старта до end по словарю предшественников."""
|
||||
path, node = [], end
|
||||
while node is not None:
|
||||
path.append(node)
|
||||
node = parent.get(node)
|
||||
path.reverse()
|
||||
return path
|
||||
|
||||
|
||||
class PathFindingStrategy(ABC):
|
||||
"""Интерфейс стратегии поиска пути."""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self) -> str: ...
|
||||
|
||||
@abstractmethod
|
||||
def find_path(
|
||||
self, maze: Maze, start: Cell, exit_cell: Cell
|
||||
) -> Tuple[List[Cell], int]:
|
||||
"""Возвращает (path, visited_count). path пуст если пути нет."""
|
||||
...
|
||||
|
||||
|
||||
class BFSStrategy(PathFindingStrategy):
|
||||
"""Поиск в ширину — гарантирует кратчайший путь (по числу шагов)."""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "BFS"
|
||||
|
||||
def find_path(self, maze, start, exit_cell):
|
||||
queue = deque([start])
|
||||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||||
visited = 0
|
||||
while queue:
|
||||
cur = queue.popleft()
|
||||
visited += 1
|
||||
if cur == exit_cell:
|
||||
return _reconstruct(parent, exit_cell), visited
|
||||
for nb in maze.get_neighbors(cur):
|
||||
if nb not in parent:
|
||||
parent[nb] = cur
|
||||
queue.append(nb)
|
||||
return [], visited
|
||||
|
||||
|
||||
class DFSStrategy(PathFindingStrategy):
|
||||
"""Поиск в глубину — не гарантирует кратчайший путь."""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "DFS"
|
||||
|
||||
def find_path(self, maze, start, exit_cell):
|
||||
stack = [start]
|
||||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||||
visited = 0
|
||||
while stack:
|
||||
cur = stack.pop()
|
||||
visited += 1
|
||||
if cur == exit_cell:
|
||||
return _reconstruct(parent, exit_cell), visited
|
||||
for nb in maze.get_neighbors(cur):
|
||||
if nb not in parent:
|
||||
parent[nb] = cur
|
||||
stack.append(nb)
|
||||
return [], visited
|
||||
|
||||
|
||||
class AStarStrategy(PathFindingStrategy):
|
||||
"""A* с манхэттенской эвристикой — направленный поиск, учитывает веса."""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "A*"
|
||||
|
||||
@staticmethod
|
||||
def _h(a: Cell, b: Cell) -> int:
|
||||
return abs(a.x - b.x) + abs(a.y - b.y)
|
||||
|
||||
def find_path(self, maze, start, exit_cell):
|
||||
heap: List[Tuple[int, int, Cell]] = [(0, 0, start)]
|
||||
g: Dict[Cell, int] = {start: 0}
|
||||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||||
visited = 0
|
||||
while heap:
|
||||
_, g_cur, cur = heapq.heappop(heap)
|
||||
visited += 1
|
||||
if cur == exit_cell:
|
||||
return _reconstruct(parent, exit_cell), visited
|
||||
if g_cur > g.get(cur, float('inf')):
|
||||
continue
|
||||
for nb in maze.get_neighbors(cur):
|
||||
new_g = g_cur + nb.weight
|
||||
if new_g < g.get(nb, float('inf')):
|
||||
g[nb] = new_g
|
||||
parent[nb] = cur
|
||||
heapq.heappush(heap, (new_g + self._h(nb, exit_cell), new_g, nb))
|
||||
return [], visited
|
||||
|
||||
|
||||
class DijkstraStrategy(PathFindingStrategy):
|
||||
"""Дейкстра — оптимален для взвешенных графов, без эвристики."""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "Dijkstra"
|
||||
|
||||
def find_path(self, maze, start, exit_cell):
|
||||
dist: Dict[Cell, int] = {start: 0}
|
||||
parent: Dict[Cell, Optional[Cell]] = {start: None}
|
||||
heap: List[Tuple[int, Cell]] = [(0, start)]
|
||||
visited = 0
|
||||
while heap:
|
||||
d, cur = heapq.heappop(heap)
|
||||
visited += 1
|
||||
if cur == exit_cell:
|
||||
return _reconstruct(parent, exit_cell), visited
|
||||
if d > dist.get(cur, float('inf')):
|
||||
continue
|
||||
for nb in maze.get_neighbors(cur):
|
||||
new_d = d + nb.weight
|
||||
if new_d < dist.get(nb, float('inf')):
|
||||
dist[nb] = new_d
|
||||
parent[nb] = cur
|
||||
heapq.heappush(heap, (new_d, nb))
|
||||
return [], visited
|
||||
|
||||
|
||||
|
||||
# ПАТТЕРН OBSERVER
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchStats:
|
||||
time_ms: float
|
||||
visited_cells: int
|
||||
path_length: int
|
||||
strategy_name: str
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (f"[{self.strategy_name}] "
|
||||
f"время={self.time_ms:.2f} мс "
|
||||
f"посещено={self.visited_cells} "
|
||||
f"длина пути={self.path_length}")
|
||||
|
||||
|
||||
class Observer(ABC):
|
||||
@abstractmethod
|
||||
def update(self, event: dict) -> None: ...
|
||||
|
||||
|
||||
class ConsoleView(Observer):
|
||||
"""Выводит события и рисует лабиринт в консоли."""
|
||||
|
||||
def update(self, event: dict) -> None:
|
||||
kind = event.get("type")
|
||||
if kind == "maze_loaded":
|
||||
print(f"[Лабиринт загружен] {event['width']}×{event['height']}")
|
||||
elif kind == "search_start":
|
||||
print(f"[Поиск] алгоритм={event['strategy']}")
|
||||
elif kind == "path_found":
|
||||
print(f"[Готово] {event['stats']}")
|
||||
elif kind == "no_path":
|
||||
print("[Результат] Путь не найден!")
|
||||
|
||||
def render(
|
||||
self,
|
||||
maze: Maze,
|
||||
player_pos: Optional[Cell] = None,
|
||||
path: Optional[List[Cell]] = None,
|
||||
) -> None:
|
||||
path_set = set(path) if path else set()
|
||||
for y in range(maze.height):
|
||||
row = []
|
||||
for x in range(maze.width):
|
||||
cell = maze.get_cell(x, y)
|
||||
if cell is None:
|
||||
row.append("?")
|
||||
elif player_pos and cell == player_pos:
|
||||
row.append("@")
|
||||
elif cell.is_wall:
|
||||
row.append("█")
|
||||
elif cell.is_start:
|
||||
row.append("S")
|
||||
elif cell.is_exit:
|
||||
row.append("E")
|
||||
elif cell in path_set:
|
||||
row.append("*")
|
||||
elif cell.weight == 3:
|
||||
row.append("~")
|
||||
elif cell.weight == 2:
|
||||
row.append(".")
|
||||
else:
|
||||
row.append(" ")
|
||||
print("".join(row))
|
||||
print()
|
||||
|
||||
|
||||
|
||||
# ЭТАП 4. ОРКЕСТРАТОР MazeSolver
|
||||
|
||||
class MazeSolver:
|
||||
"""
|
||||
Связывает Maze + PathFindingStrategy + список Observer.
|
||||
Паттерны: Strategy (алгоритм подключается снаружи),
|
||||
Observer (уведомления при завершении).
|
||||
"""
|
||||
|
||||
def __init__(self, maze: Maze, strategy: PathFindingStrategy):
|
||||
self.maze = maze
|
||||
self._strategy = strategy
|
||||
self._observers: List[Observer] = []
|
||||
self._last_path: List[Cell] = []
|
||||
|
||||
def add_observer(self, obs: Observer) -> None:
|
||||
self._observers.append(obs)
|
||||
|
||||
def set_strategy(self, strategy: PathFindingStrategy) -> None:
|
||||
self._strategy = strategy
|
||||
|
||||
def _notify(self, event: dict) -> None:
|
||||
for obs in self._observers:
|
||||
obs.update(event)
|
||||
|
||||
def solve(self) -> SearchStats:
|
||||
if not self.maze.start or not self.maze.exit:
|
||||
raise ValueError("Лабиринт не содержит старт или выход")
|
||||
|
||||
self._notify({"type": "search_start", "strategy": self._strategy.name})
|
||||
|
||||
t0 = time.perf_counter()
|
||||
path, visited = self._strategy.find_path(
|
||||
self.maze, self.maze.start, self.maze.exit
|
||||
)
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
|
||||
self._last_path = path
|
||||
stats = SearchStats(elapsed_ms, visited, len(path), self._strategy.name)
|
||||
|
||||
self._notify({"type": "path_found" if path else "no_path", "stats": stats})
|
||||
return stats
|
||||
|
||||
@property
|
||||
def last_path(self) -> List[Cell]:
|
||||
return self._last_path
|
||||
|
||||
|
||||
|
||||
# ЭТАП 5. ПАТТЕРН COMMAND — пошаговое управление игроком
|
||||
|
||||
|
||||
_DIRECTIONS = {'W': (0, -1), 'S': (0, 1), 'A': (-1, 0), 'D': (1, 0)}
|
||||
|
||||
|
||||
class Command(ABC):
|
||||
@abstractmethod
|
||||
def execute(self) -> bool: ...
|
||||
@abstractmethod
|
||||
def undo(self) -> None: ...
|
||||
|
||||
|
||||
class Player:
|
||||
def __init__(self, cell: Cell):
|
||||
self.current_cell = cell
|
||||
|
||||
def move_to(self, cell: Cell) -> None:
|
||||
self.current_cell = cell
|
||||
|
||||
|
||||
class MoveCommand(Command):
|
||||
"""Перемещение игрока в направлении direction (W/A/S/D)."""
|
||||
|
||||
def __init__(self, player: Player, direction: str, maze: Maze,
|
||||
observers: Optional[List[Observer]] = None):
|
||||
self._player = player
|
||||
self._direction = direction.upper()
|
||||
self._maze = maze
|
||||
self._observers = observers or []
|
||||
self._prev: Optional[Cell] = None
|
||||
|
||||
def execute(self) -> bool:
|
||||
dx, dy = _DIRECTIONS.get(self._direction, (0, 0))
|
||||
target = self._maze.get_cell(
|
||||
self._player.current_cell.x + dx,
|
||||
self._player.current_cell.y + dy,
|
||||
)
|
||||
if target and target.is_passable():
|
||||
self._prev = self._player.current_cell
|
||||
self._player.move_to(target)
|
||||
for obs in self._observers:
|
||||
obs.update({"type": "move", "cell": target})
|
||||
return True
|
||||
return False
|
||||
|
||||
def undo(self) -> None:
|
||||
if self._prev:
|
||||
self._player.move_to(self._prev)
|
||||
self._prev = None
|
||||
|
||||
|
||||
class CommandHistory:
|
||||
"""Стек выполненных команд (undo/redo)."""
|
||||
|
||||
def __init__(self):
|
||||
self._stack: List[Command] = []
|
||||
|
||||
def execute(self, cmd: Command) -> bool:
|
||||
ok = cmd.execute()
|
||||
if ok:
|
||||
self._stack.append(cmd)
|
||||
return ok
|
||||
|
||||
def undo(self) -> bool:
|
||||
if self._stack:
|
||||
self._stack.pop().undo()
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
|
||||
# ЭТАП 6. ЭКСПЕРИМЕНТАЛЬНАЯ ЧАСТЬ
|
||||
|
||||
|
||||
def run_experiment(
|
||||
maze_files: List[Tuple[str, str]],
|
||||
strategies: List[PathFindingStrategy],
|
||||
repeats: int = 7,
|
||||
out_csv: str = "results.csv",
|
||||
) -> None:
|
||||
builder = TextFileMazeBuilder()
|
||||
rows = [["maze", "strategy", "run", "time_ms", "visited_cells", "path_length"]]
|
||||
|
||||
for maze_name, maze_file in maze_files:
|
||||
print(f"\n=== {maze_name} ===")
|
||||
maze = builder.build_from_file(maze_file)
|
||||
print(f" Размер: {maze.width}×{maze.height}")
|
||||
|
||||
for strategy in strategies:
|
||||
solver = MazeSolver(maze, strategy)
|
||||
times, visits, lengths = [], [], []
|
||||
|
||||
for run in range(1, repeats + 1):
|
||||
stats = solver.solve()
|
||||
times.append(stats.time_ms)
|
||||
visits.append(stats.visited_cells)
|
||||
lengths.append(stats.path_length)
|
||||
rows.append([maze_name, strategy.name, run,
|
||||
round(stats.time_ms, 4),
|
||||
stats.visited_cells,
|
||||
stats.path_length])
|
||||
|
||||
print(f" {strategy.name:10} | "
|
||||
f"t={statistics.mean(times):.3f} мс | "
|
||||
f"посещено={statistics.mean(visits):.0f} | "
|
||||
f"путь={statistics.mean(lengths):.0f}")
|
||||
|
||||
os.makedirs(os.path.dirname(os.path.abspath(out_csv)), exist_ok=True)
|
||||
with open(out_csv, "w", newline="", encoding="utf-8") as f:
|
||||
csv.writer(f).writerows(rows)
|
||||
print(f"\nСохранено: {out_csv}")
|
||||
|
||||
|
||||
|
||||
# CLI
|
||||
|
||||
|
||||
_ALGO_MAP = {
|
||||
"bfs": BFSStrategy(),
|
||||
"dfs": DFSStrategy(),
|
||||
"astar": AStarStrategy(),
|
||||
"dijkstra": DijkstraStrategy(),
|
||||
}
|
||||
|
||||
_MAZES = [
|
||||
("small_10x10", "small_10x10.txt"),
|
||||
("medium_50x50", "medium_50x50.txt"),
|
||||
("large_100x100", "large_100x100.txt"),
|
||||
("empty_30x30", "empty_30x30.txt"),
|
||||
("no_exit_20x20", "no_exit_20x20.txt"),
|
||||
("weighted_40x40", "weighted_40x40.txt"),
|
||||
]
|
||||
|
||||
|
||||
def cmd_solve(maze_file: str, algo: str) -> None:
|
||||
view = ConsoleView()
|
||||
maze = TextFileMazeBuilder().build_from_file(maze_file)
|
||||
view.update({"type": "maze_loaded", "width": maze.width, "height": maze.height})
|
||||
|
||||
strategy = _ALGO_MAP.get(algo.lower())
|
||||
if not strategy:
|
||||
print(f"Неизвестный алгоритм '{algo}'. Доступны: {', '.join(_ALGO_MAP)}")
|
||||
return
|
||||
|
||||
solver = MazeSolver(maze, strategy)
|
||||
solver.add_observer(view)
|
||||
solver.solve()
|
||||
view.render(maze, path=solver.last_path)
|
||||
|
||||
|
||||
def cmd_walk(maze_file: str) -> None:
|
||||
view = ConsoleView()
|
||||
maze = TextFileMazeBuilder().build_from_file(maze_file)
|
||||
player = Player(maze.start)
|
||||
history = CommandHistory()
|
||||
|
||||
solver = MazeSolver(maze, BFSStrategy())
|
||||
solver.add_observer(view)
|
||||
solver.solve()
|
||||
path = solver.last_path
|
||||
|
||||
print("W=вверх S=вниз A=влево D=вправо Z=отмена Q=выход")
|
||||
while True:
|
||||
view.render(maze, player_pos=player.current_cell, path=path)
|
||||
if player.current_cell == maze.exit:
|
||||
print("Вы достигли выхода!")
|
||||
break
|
||||
key = input("Ход: ").strip().upper()
|
||||
if key == "Q":
|
||||
break
|
||||
elif key == "Z":
|
||||
if not history.undo():
|
||||
print("Нечего отменять.")
|
||||
elif key in _DIRECTIONS:
|
||||
if not history.execute(MoveCommand(player, key, maze, [view])):
|
||||
print("Туда нельзя — стена.")
|
||||
else:
|
||||
print("Неизвестная клавиша.")
|
||||
|
||||
|
||||
def cmd_experiment() -> None:
|
||||
run_experiment(_MAZES, list(_ALGO_MAP.values()))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Maze solver")
|
||||
parser.add_argument("--solve", metavar="FILE", help="Решить лабиринт")
|
||||
parser.add_argument("--algo", metavar="ALGO", default="bfs",
|
||||
help="bfs | dfs | astar | dijkstra")
|
||||
parser.add_argument("--walk", metavar="FILE", help="Ручное управление (WASD)")
|
||||
parser.add_argument("--experiment", action="store_true",
|
||||
help="Запустить замеры и сохранить CSV")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.solve:
|
||||
cmd_solve(args.solve, args.algo)
|
||||
elif args.walk:
|
||||
cmd_walk(args.walk)
|
||||
elif args.experiment:
|
||||
cmd_experiment()
|
||||
else:
|
||||
parser.print_help()
|
||||
Loading…
Reference in New Issue
Block a user