660 lines
21 KiB
Python
660 lines
21 KiB
Python
|
||
import os
|
||
import time
|
||
import heapq
|
||
import csv
|
||
import random
|
||
from abc import ABC, abstractmethod
|
||
from collections import deque
|
||
from dataclasses import dataclass
|
||
from typing import Optional
|
||
|
||
|
||
#1. Модель лабиринта — классы Cell и Maze
|
||
|
||
class Cell:
|
||
"""Клетка лабиринта."""
|
||
|
||
def __init__(self, x: int, y: int, is_wall: bool = False,
|
||
is_start: bool = False, is_exit: bool = False, weight: int = 1):
|
||
self.x = x
|
||
self.y = y
|
||
self.is_wall = is_wall
|
||
self.is_start = is_start
|
||
self.is_exit = is_exit
|
||
self.weight = weight # для взвешенного лабиринта (Этап 6 доп.)
|
||
|
||
def is_passable(self) -> bool:
|
||
return not self.is_wall
|
||
|
||
def __repr__(self):
|
||
return f"Cell({self.x},{self.y})"
|
||
|
||
def __eq__(self, other):
|
||
return isinstance(other, Cell) and self.x == other.x and self.y == other.y
|
||
|
||
def __hash__(self):
|
||
return hash((self.x, self.y))
|
||
|
||
def __lt__(self, other):
|
||
return (self.x, self.y) < (other.x, other.y)
|
||
|
||
|
||
class Maze:
|
||
"""Лабиринт — двумерный массив клеток."""
|
||
|
||
def __init__(self, width: int, height: int):
|
||
self.width = width
|
||
self.height = height
|
||
self.cells: list[list[Cell]] = []
|
||
self.start: Optional[Cell] = None
|
||
self.exit: Optional[Cell] = None
|
||
|
||
def get_cell(self, x: int, y: int) -> Optional[Cell]:
|
||
if 0 <= x < self.width and 0 <= y < self.height:
|
||
return self.cells[y][x]
|
||
return None
|
||
|
||
def get_neighbors(self, cell: Cell) -> list[Cell]:
|
||
"""Возвращает проходимых соседей (вверх, вниз, влево, вправо)."""
|
||
directions = [(0, -1), (0, 1), (-1, 0), (1, 0)]
|
||
neighbors = []
|
||
for dx, dy in directions:
|
||
neighbor = self.get_cell(cell.x + dx, cell.y + dy)
|
||
if neighbor and neighbor.is_passable():
|
||
neighbors.append(neighbor)
|
||
return neighbors
|
||
|
||
|
||
#2. Загрузка лабиринта из файла — паттерн Builder
|
||
|
||
class MazeBuilder(ABC):
|
||
"""Интерфейс строителя лабиринта."""
|
||
|
||
@abstractmethod
|
||
def build_from_file(self, filename: str) -> Maze:
|
||
pass
|
||
|
||
|
||
class TextFileMazeBuilder(MazeBuilder):
|
||
|
||
WEIGHT_MAP = {' ': 1, 'S': 1, 'E': 1, '.': 2, '~': 3, '#': 0}
|
||
|
||
def build_from_file(self, filename: str) -> Maze:
|
||
with open(filename, 'r', encoding='utf-8') as f:
|
||
lines = f.read().splitlines()
|
||
|
||
if not lines:
|
||
raise ValueError("Файл лабиринта пуст")
|
||
|
||
height = len(lines)
|
||
width = max(len(line) for line in lines)
|
||
maze = Maze(width, height)
|
||
|
||
for y, line in enumerate(lines):
|
||
row = []
|
||
for x in range(width):
|
||
ch = line[x] if x < len(line) else ' '
|
||
is_wall = ch == '#'
|
||
is_start = ch == 'S'
|
||
is_exit = ch == 'E'
|
||
weight = self.WEIGHT_MAP.get(ch, 1)
|
||
cell = Cell(x, y, is_wall=is_wall, is_start=is_start,
|
||
is_exit=is_exit, weight=weight)
|
||
row.append(cell)
|
||
if is_start:
|
||
maze.start = cell
|
||
if is_exit:
|
||
maze.exit = cell
|
||
maze.cells.append(row)
|
||
|
||
if maze.start is None:
|
||
raise ValueError("В лабиринте не задана стартовая клетка (S)")
|
||
if maze.exit is None:
|
||
raise ValueError("В лабиринте не задана выходная клетка (E)")
|
||
|
||
return maze
|
||
|
||
|
||
#3. Стратегии поиска пути — паттерн Strategy
|
||
|
||
@dataclass
|
||
class SearchStats:
|
||
"""Статистика одного запуска поиска."""
|
||
time_ms: float = 0.0
|
||
visited_cells: int = 0
|
||
path_length: int = 0
|
||
|
||
|
||
class PathFindingStrategy(ABC):
|
||
"""Интерфейс стратегии поиска пути."""
|
||
|
||
@abstractmethod
|
||
def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> list[Cell]:
|
||
pass
|
||
|
||
def _reconstruct_path(self, parent: dict, start: Cell, exit_cell: Cell) -> list[Cell]:
|
||
path = []
|
||
current = exit_cell
|
||
while current is not None:
|
||
path.append(current)
|
||
current = parent.get(current)
|
||
path.reverse()
|
||
if path and path[0] == start:
|
||
return path
|
||
return []
|
||
|
||
|
||
class BFSStrategy(PathFindingStrategy):
|
||
"""Поиск в ширину"""
|
||
|
||
def __init__(self):
|
||
self.visited_count = 0
|
||
|
||
def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> list[Cell]:
|
||
queue = deque([start])
|
||
parent: dict[Cell, Optional[Cell]] = {start: None}
|
||
self.visited_count = 0
|
||
|
||
while queue:
|
||
current = queue.popleft()
|
||
self.visited_count += 1
|
||
|
||
if current == exit_cell:
|
||
return self._reconstruct_path(parent, start, exit_cell)
|
||
|
||
for neighbor in maze.get_neighbors(current):
|
||
if neighbor not in parent:
|
||
parent[neighbor] = current
|
||
queue.append(neighbor)
|
||
|
||
return []
|
||
|
||
|
||
class DFSStrategy(PathFindingStrategy):
|
||
"""Поиск в глубину"""
|
||
|
||
def __init__(self):
|
||
self.visited_count = 0
|
||
|
||
def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> list[Cell]:
|
||
stack = [start]
|
||
parent: dict[Cell, Optional[Cell]] = {start: None}
|
||
self.visited_count = 0
|
||
|
||
while stack:
|
||
current = stack.pop()
|
||
self.visited_count += 1
|
||
|
||
if current == exit_cell:
|
||
return self._reconstruct_path(parent, start, exit_cell)
|
||
|
||
for neighbor in maze.get_neighbors(current):
|
||
if neighbor not in parent:
|
||
parent[neighbor] = current
|
||
stack.append(neighbor)
|
||
|
||
return []
|
||
|
||
|
||
class AStarStrategy(PathFindingStrategy):
|
||
"""A* с манхэттенской эвристикой"""
|
||
|
||
def __init__(self):
|
||
self.visited_count = 0
|
||
|
||
def _heuristic(self, a: Cell, b: Cell) -> int:
|
||
return abs(a.x - b.x) + abs(a.y - b.y)
|
||
|
||
def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> list[Cell]:
|
||
g_score = {start: 0}
|
||
parent: dict[Cell, Optional[Cell]] = {start: None}
|
||
open_heap = [(self._heuristic(start, exit_cell), 0, start)]
|
||
closed_set: set[Cell] = set() # уже обработанные клетки
|
||
self.visited_count = 0
|
||
counter = 0 # счётчик для устранения неоднозначности
|
||
|
||
while open_heap:
|
||
_, _, current = heapq.heappop(open_heap)
|
||
|
||
if current in closed_set:
|
||
continue
|
||
closed_set.add(current)
|
||
self.visited_count += 1
|
||
|
||
if current == exit_cell:
|
||
return self._reconstruct_path(parent, start, exit_cell)
|
||
|
||
for neighbor in maze.get_neighbors(current):
|
||
if neighbor in closed_set:
|
||
continue
|
||
tentative_g = g_score[current] + neighbor.weight
|
||
if tentative_g < g_score.get(neighbor, float('inf')):
|
||
g_score[neighbor] = tentative_g
|
||
parent[neighbor] = current
|
||
f = tentative_g + self._heuristic(neighbor, exit_cell)
|
||
counter += 1
|
||
heapq.heappush(open_heap, (f, counter, neighbor))
|
||
|
||
return []
|
||
|
||
|
||
class DijkstraStrategy(PathFindingStrategy):
|
||
"""Дейкстра"""
|
||
|
||
def __init__(self):
|
||
self.visited_count = 0
|
||
|
||
def find_path(self, maze: Maze, start: Cell, exit_cell: Cell) -> list[Cell]:
|
||
dist = {start: 0}
|
||
parent: dict[Cell, Optional[Cell]] = {start: None}
|
||
open_heap = [(0, 0, start)]
|
||
self.visited_count = 0
|
||
counter = 0
|
||
|
||
while open_heap:
|
||
cost, _, current = heapq.heappop(open_heap)
|
||
if cost > dist.get(current, float('inf')):
|
||
continue
|
||
self.visited_count += 1
|
||
|
||
if current == exit_cell:
|
||
return self._reconstruct_path(parent, start, exit_cell)
|
||
|
||
for neighbor in maze.get_neighbors(current):
|
||
new_cost = dist[current] + neighbor.weight
|
||
if new_cost < dist.get(neighbor, float('inf')):
|
||
dist[neighbor] = new_cost
|
||
parent[neighbor] = current
|
||
counter += 1
|
||
heapq.heappush(open_heap, (new_cost, counter, neighbor))
|
||
|
||
return []
|
||
|
||
|
||
#4. Оркестратор — MazeSolver
|
||
|
||
class MazeSolver:
|
||
"""Оркестратор"""
|
||
|
||
def __init__(self, maze: Maze, strategy: PathFindingStrategy):
|
||
self.maze = maze
|
||
self.strategy = strategy
|
||
self._observers: list['Observer'] = []
|
||
self._last_path: list[Cell] = []
|
||
|
||
def set_strategy(self, strategy: PathFindingStrategy):
|
||
self.strategy = strategy
|
||
|
||
def add_observer(self, observer: 'Observer'):
|
||
self._observers.append(observer)
|
||
|
||
def _notify(self, event: str, **kwargs):
|
||
for obs in self._observers:
|
||
obs.update(event, **kwargs)
|
||
|
||
def solve(self) -> SearchStats:
|
||
start = self.maze.start
|
||
exit_cell = self.maze.exit
|
||
self._notify("search_start", strategy=type(self.strategy).__name__)
|
||
|
||
t0 = time.perf_counter()
|
||
path = self.strategy.find_path(self.maze, start, exit_cell)
|
||
t1 = time.perf_counter()
|
||
|
||
self._last_path = path
|
||
visited = getattr(self.strategy, 'visited_count', 0)
|
||
stats = SearchStats(
|
||
time_ms=(t1 - t0) * 1000,
|
||
visited_cells=visited,
|
||
path_length=len(path)
|
||
)
|
||
self._notify("path_found", path=path, stats=stats)
|
||
return stats
|
||
|
||
def get_last_path(self) -> list[Cell]:
|
||
return self._last_path
|
||
|
||
|
||
#5.1. Observer — ConsoleView
|
||
|
||
class Observer(ABC):
|
||
"""Интерфейс наблюдателя."""
|
||
|
||
@abstractmethod
|
||
def update(self, event: str, **kwargs):
|
||
pass
|
||
|
||
|
||
class ConsoleView(Observer):
|
||
"""Консольный вид"""
|
||
|
||
SYMBOLS = {
|
||
'wall': '█',
|
||
'path': '·',
|
||
'start': 'S',
|
||
'exit': 'E',
|
||
'player': '@',
|
||
'visited': '°',
|
||
'empty': ' ',
|
||
}
|
||
|
||
def update(self, event: str, **kwargs):
|
||
if event == "search_start":
|
||
print(f"\n[Поиск] Алгоритм: {kwargs.get('strategy', '?')}")
|
||
elif event == "path_found":
|
||
path = kwargs.get('path', [])
|
||
stats = kwargs.get('stats')
|
||
if path:
|
||
print(f"[Готово] Путь найден! Длина: {stats.path_length}, "
|
||
f"Посещено: {stats.visited_cells}, Время: {stats.time_ms:.2f} мс")
|
||
else:
|
||
print("[Готово] Путь не найден!")
|
||
elif event == "move":
|
||
cell = kwargs.get('cell')
|
||
print(f"[Ход] Игрок перемещается в {cell}")
|
||
|
||
def render(self, maze: Maze, player_pos: Optional[Cell] = None,
|
||
path: Optional[list[Cell]] = None):
|
||
"""Рисует лабиринт в консоли."""
|
||
path_set = set(path) if path else set()
|
||
print()
|
||
for y in range(maze.height):
|
||
row = ''
|
||
for x in range(maze.width):
|
||
cell = maze.cells[y][x]
|
||
if cell.is_wall:
|
||
row += self.SYMBOLS['wall']
|
||
elif player_pos and cell == player_pos:
|
||
row += self.SYMBOLS['player']
|
||
elif cell.is_start:
|
||
row += self.SYMBOLS['start']
|
||
elif cell.is_exit:
|
||
row += self.SYMBOLS['exit']
|
||
elif cell in path_set:
|
||
row += self.SYMBOLS['path']
|
||
else:
|
||
row += self.SYMBOLS['empty']
|
||
print(row)
|
||
print()
|
||
|
||
|
||
#5.2. Command — Player и MoveCommand
|
||
|
||
class Player:
|
||
"""Игрок с текущей позицией в лабиринте."""
|
||
|
||
def __init__(self, start_cell: Cell):
|
||
self.current_cell = start_cell
|
||
|
||
def move_to(self, cell: Cell):
|
||
self.current_cell = cell
|
||
|
||
|
||
class Command(ABC):
|
||
"""Интерфейс команды."""
|
||
|
||
@abstractmethod
|
||
def execute(self):
|
||
pass
|
||
|
||
@abstractmethod
|
||
def undo(self):
|
||
pass
|
||
|
||
|
||
class MoveCommand(Command):
|
||
"""Команда перемещения игрока в указанную клетку."""
|
||
|
||
def __init__(self, player: Player, target_cell: Cell, solver: MazeSolver):
|
||
self.player = player
|
||
self.target_cell = target_cell
|
||
self.previous_cell = player.current_cell
|
||
self.solver = solver
|
||
|
||
def execute(self):
|
||
self.previous_cell = self.player.current_cell
|
||
self.player.move_to(self.target_cell)
|
||
self.solver._notify("move", cell=self.target_cell)
|
||
|
||
def undo(self):
|
||
self.player.move_to(self.previous_cell)
|
||
self.solver._notify("move", cell=self.previous_cell)
|
||
|
||
|
||
#6. Экспериментальная часть
|
||
|
||
def generate_maze_file(filename: str, width: int, height: int,
|
||
wall_density: float = 0.3, no_exit: bool = False):
|
||
"""Генерирует случайный лабиринт и сохраняет в файл."""
|
||
random.seed(42)
|
||
grid = []
|
||
for y in range(height):
|
||
row = []
|
||
for x in range(width):
|
||
if x == 0 or y == 0 or x == width - 1 or y == height - 1:
|
||
row.append('#')
|
||
else:
|
||
row.append('#' if random.random() < wall_density else ' ')
|
||
grid.append(row)
|
||
|
||
# Старт и выход
|
||
grid[1][1] = 'S'
|
||
grid[height - 2][width - 2] = 'E'
|
||
if no_exit:
|
||
ex, ey = width - 2, height - 2
|
||
for dx, dy in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
|
||
nx, ny = ex + dx, ey + dy
|
||
if 0 < nx < width - 1 and 0 < ny < height - 1:
|
||
grid[ny][nx] = '#'
|
||
|
||
with open(filename, 'w', encoding='utf-8') as f:
|
||
for row in grid:
|
||
f.write(''.join(row) + '\n')
|
||
|
||
|
||
def run_experiments(mazes_config: list[dict], strategies: dict,
|
||
runs: int = 5) -> list[dict]:
|
||
builder = TextFileMazeBuilder()
|
||
results = []
|
||
|
||
for maze_cfg in mazes_config:
|
||
name = maze_cfg['name']
|
||
filepath = maze_cfg['file']
|
||
|
||
# Генерируем файл лабиринта
|
||
if 'generate' in maze_cfg:
|
||
gen = maze_cfg['generate']
|
||
generate_maze_file(filepath, gen['width'], gen['height'],
|
||
gen.get('density', 0.3),
|
||
gen.get('no_exit', False))
|
||
|
||
try:
|
||
maze = builder.build_from_file(filepath)
|
||
except Exception as e:
|
||
print(f"[Пропуск] {name}: {e}")
|
||
continue
|
||
|
||
for strat_name, strategy_cls in strategies.items():
|
||
times, visited, lengths = [], [], []
|
||
for _ in range(runs):
|
||
strategy = strategy_cls()
|
||
solver = MazeSolver(maze, strategy)
|
||
stats = solver.solve()
|
||
times.append(stats.time_ms)
|
||
visited.append(stats.visited_cells)
|
||
lengths.append(stats.path_length)
|
||
|
||
results.append({
|
||
'лабиринт': name,
|
||
'стратегия': strat_name,
|
||
'время_мс': round(sum(times) / runs, 4),
|
||
'посещено_клеток': int(sum(visited) / runs),
|
||
'длина_пути': int(sum(lengths) / runs),
|
||
})
|
||
print(f" {name} / {strat_name}: "
|
||
f"time={results[-1]['время_мс']:.3f}ms, "
|
||
f"visited={results[-1]['посещено_клеток']}, "
|
||
f"path={results[-1]['длина_пути']}")
|
||
|
||
return results
|
||
|
||
|
||
def save_csv(results: list[dict], filename: str):
|
||
if not results:
|
||
return
|
||
with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
|
||
writer = csv.DictWriter(f, fieldnames=results[0].keys())
|
||
writer.writeheader()
|
||
writer.writerows(results)
|
||
print(f"\nРезультаты сохранены в {filename}")
|
||
|
||
|
||
def print_table(results: list[dict]):
|
||
if not results:
|
||
print("Нет результатов.")
|
||
return
|
||
header = f"{'Лабиринт':<20} {'Стратегия':<12} {'Время,мс':>10} {'Посещено':>10} {'Путь':>8}"
|
||
print("\n" + "=" * len(header))
|
||
print(header)
|
||
print("=" * len(header))
|
||
for r in results:
|
||
print(f"{r['лабиринт']:<20} {r['стратегия']:<12} "
|
||
f"{r['время_мс']:>10.4f} {r['посещено_клеток']:>10} {r['длина_пути']:>8}")
|
||
print("=" * len(header))
|
||
|
||
|
||
#Демонстрация
|
||
|
||
def demo_interactive(maze: Maze, solver: MazeSolver, view: ConsoleView):
|
||
"""Пошаговый режим с Command."""
|
||
path = solver.get_last_path()
|
||
if not path:
|
||
print("Путь не найден — пошаговый режим недоступен.")
|
||
return
|
||
|
||
player = Player(maze.start)
|
||
history: list[MoveCommand] = []
|
||
view.render(maze, player_pos=player.current_cell, path=path)
|
||
|
||
print("Пошаговый режим: [N] — следующий шаг, [U] — отмена, [Q] — выход")
|
||
step_index = 1 # 0-й шаг — старт, уже там
|
||
|
||
while True:
|
||
cmd = input("Команда: ").strip().upper()
|
||
if cmd == 'Q':
|
||
break
|
||
elif cmd == 'N':
|
||
if step_index < len(path):
|
||
move = MoveCommand(player, path[step_index], solver)
|
||
move.execute()
|
||
history.append(move)
|
||
step_index += 1
|
||
os.system('cls' if os.name == 'nt' else 'clear')
|
||
view.render(maze, player_pos=player.current_cell, path=path)
|
||
if player.current_cell == maze.exit:
|
||
print("🎉 Вы достигли выхода!")
|
||
break
|
||
else:
|
||
print("Вы уже в конце пути.")
|
||
elif cmd == 'U':
|
||
if history:
|
||
move = history.pop()
|
||
move.undo()
|
||
step_index -= 1
|
||
os.system('cls' if os.name == 'nt' else 'clear')
|
||
view.render(maze, player_pos=player.current_cell, path=path)
|
||
else:
|
||
print("Нечего отменять.")
|
||
else:
|
||
print("Неизвестная команда.")
|
||
|
||
|
||
def main():
|
||
print("=" * 60)
|
||
print(" Поиск выхода из лабиринта — ООП + паттерны GoF")
|
||
print("=" * 60)
|
||
|
||
small_maze_file = "maze_small.txt"
|
||
|
||
builder = TextFileMazeBuilder()
|
||
maze = builder.build_from_file(small_maze_file)
|
||
|
||
#2. Создаём представление (Observer)
|
||
view = ConsoleView()
|
||
|
||
#3. Демонстрация стратегий
|
||
strategies = {
|
||
'BFS': BFSStrategy,
|
||
'DFS': DFSStrategy,
|
||
'A*': AStarStrategy,
|
||
'Дейкстра': DijkstraStrategy,
|
||
}
|
||
|
||
print(f"\nЛабиринт ({maze.width}×{maze.height}):")
|
||
view.render(maze)
|
||
|
||
for name, cls in strategies.items():
|
||
strategy = cls()
|
||
solver = MazeSolver(maze, strategy)
|
||
solver.add_observer(view)
|
||
stats = solver.solve()
|
||
|
||
#Визуализация пути A*
|
||
print("\n--- Путь, найденный A* ---")
|
||
a_star = AStarStrategy()
|
||
solver = MazeSolver(maze, a_star)
|
||
solver.add_observer(view)
|
||
solver.solve()
|
||
view.render(maze, path=solver.get_last_path())
|
||
|
||
#4. Пошаговый режим Command
|
||
ans = input("Запустить пошаговый режим? (y/n): ").strip().lower()
|
||
if ans == 'y':
|
||
demo_interactive(maze, solver, view)
|
||
|
||
#5. Экспериментальная часть
|
||
print("\n" + "=" * 60)
|
||
print(" Экспериментальная часть")
|
||
print("=" * 60)
|
||
|
||
mazes_config = [
|
||
{
|
||
'name': 'Маленький 10×10',
|
||
'file': 'maze_small.txt',
|
||
},
|
||
{
|
||
'name': 'Средний 50×50',
|
||
'file': 'maze_medium.txt',
|
||
},
|
||
{
|
||
'name': 'Большой 100×100',
|
||
'file': 'maze_large.txt',
|
||
},
|
||
{
|
||
'name': 'Пустой 50×50',
|
||
'file': 'maze_empty.txt',
|
||
},
|
||
{
|
||
'name': 'Без выхода 20×20',
|
||
'file': 'maze_no_exit.txt',
|
||
},
|
||
]
|
||
|
||
all_strategies = {
|
||
'BFS': BFSStrategy,
|
||
'DFS': DFSStrategy,
|
||
'A*': AStarStrategy,
|
||
'Дейкстра': DijkstraStrategy,
|
||
}
|
||
|
||
print("\nЗапуск экспериментов (5 прогонов каждого)...")
|
||
results = run_experiments(mazes_config, all_strategies, runs=5)
|
||
print_table(results)
|
||
save_csv(results, "results.csv")
|
||
|
||
print("\nГотово!")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main() |