Files
Timeline_Generator/backend/path_planner.py
beabigegg 2d37d23bcf v9.5: 實作標籤完全不重疊算法
- 新增 _calculate_lane_conflicts_v2() 分開返回標籤重疊和線穿框分數
- 修改泳道選擇算法,優先選擇無標籤重疊的泳道
- 兩階段搜尋:優先側別無可用泳道則嘗試另一側
- 增強日誌輸出,顯示標籤範圍和詳細衝突分數

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-06 11:35:29 +08:00

456 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
網格化路徑規劃器
使用BFS算法在網格化的繪圖區域中為連接線尋找最佳路徑
完全避開標籤障礙物。
Author: AI Agent
Version: 1.0.0
"""
import logging
from typing import List, Tuple, Optional, Dict
from datetime import datetime, timedelta
from collections import deque
import numpy as np
logger = logging.getLogger(__name__)
class GridMap:
"""
2D網格地圖
用於路徑規劃的網格化表示,支持障礙物標記和路徑搜尋。
"""
# 格點狀態常量
FREE = 0
OBSTACLE = 1
PATH = 2
def __init__(
self,
time_range_seconds: float,
y_min: float,
y_max: float,
grid_cols: int,
grid_rows: int,
time_start: datetime
):
"""
初始化網格地圖
Args:
time_range_seconds: 時間範圍(秒)
y_min: Y軸最小值
y_max: Y軸最大值
grid_cols: 網格列數X方向
grid_rows: 網格行數Y方向
time_start: 時間軸起始時間
"""
self.time_range_seconds = time_range_seconds
self.y_min = y_min
self.y_max = y_max
self.grid_cols = grid_cols
self.grid_rows = grid_rows
self.time_start = time_start
# 創建網格初始全為FREE
self.grid = np.zeros((grid_rows, grid_cols), dtype=np.int8)
# 座標轉換比例
self.seconds_per_col = time_range_seconds / grid_cols
self.y_per_row = (y_max - y_min) / grid_rows
logger.info(f"創建網格地圖: {grid_cols}× {grid_rows}")
logger.info(f" 時間範圍: {time_range_seconds:.0f}秒 ({time_range_seconds/86400:.1f}天)")
logger.info(f" Y軸範圍: {y_min:.1f} ~ {y_max:.1f}")
logger.info(f" 解析度: {self.seconds_per_col:.2f}秒/格, {self.y_per_row:.3f}Y/格")
def datetime_to_grid_x(self, dt: datetime) -> int:
"""將datetime轉換為網格X座標"""
seconds = (dt - self.time_start).total_seconds()
col = int(seconds / self.seconds_per_col)
return max(0, min(col, self.grid_cols - 1))
def seconds_to_grid_x(self, seconds: float) -> int:
"""將秒數轉換為網格X座標"""
col = int(seconds / self.seconds_per_col)
return max(0, min(col, self.grid_cols - 1))
def y_to_grid_y(self, y: float) -> int:
"""將Y座標轉換為網格Y座標注意Y軸向上但行索引向下"""
# Y軸向上為正但網格行索引向下增加需要翻轉
normalized_y = (y - self.y_min) / (self.y_max - self.y_min)
row = int((1 - normalized_y) * self.grid_rows)
return max(0, min(row, self.grid_rows - 1))
def grid_to_datetime(self, col: int) -> datetime:
"""將網格X座標轉換為datetime"""
seconds = col * self.seconds_per_col
return self.time_start + timedelta(seconds=seconds)
def grid_to_y(self, row: int) -> float:
"""將網格Y座標轉換為Y座標"""
normalized_y = 1 - (row / self.grid_rows)
return self.y_min + normalized_y * (self.y_max - self.y_min)
def mark_rectangle(
self,
center_x_datetime: datetime,
center_y: float,
width_seconds: float,
height: float,
state: int = OBSTACLE,
expansion_ratio: float = 0.1
):
"""
標記矩形區域
Args:
center_x_datetime: 矩形中心X座標datetime
center_y: 矩形中心Y座標
width_seconds: 矩形寬度(秒)
height: 矩形高度
state: 標記狀態OBSTACLE或PATH
expansion_ratio: 外擴比例默認10%
"""
# 外擴
expanded_width = width_seconds * (1 + expansion_ratio)
expanded_height = height * (1 + expansion_ratio)
# 計算矩形範圍
center_x_seconds = (center_x_datetime - self.time_start).total_seconds()
x_min = center_x_seconds - expanded_width / 2
x_max = center_x_seconds + expanded_width / 2
y_min = center_y - expanded_height / 2
y_max = center_y + expanded_height / 2
# 轉換為網格座標
col_min = self.seconds_to_grid_x(x_min)
col_max = self.seconds_to_grid_x(x_max)
row_min = self.y_to_grid_y(y_max) # 注意Y軸翻轉
row_max = self.y_to_grid_y(y_min)
# 標記網格
for row in range(row_min, row_max + 1):
for col in range(col_min, col_max + 1):
if 0 <= row < self.grid_rows and 0 <= col < self.grid_cols:
self.grid[row, col] = state
def mark_path(
self,
path_points: List[Tuple[datetime, float]],
width_expansion: float = 2.5
):
"""
標記路徑為障礙物
Args:
path_points: 路徑點列表 [(datetime, y), ...]
width_expansion: 寬度擴展倍數
策略:
1. 標記所有線段(包括起點線段)
2. 但是起點線段只標記離開時間軸的垂直部分
3. 時間軸 y=0 本身不標記,避免阻擋其他起點
"""
if len(path_points) < 2:
return
# 標記所有線段
for i in range(len(path_points) - 1):
dt1, y1 = path_points[i]
dt2, y2 = path_points[i + 1]
# 如果是從時間軸y=0出發的第一段線段
if i == 0 and abs(y1) < 0.1:
# 只標記離開時間軸的部分(從 y=0.2 開始)
# 避免阻擋其他事件的起點
if abs(y2) > 0.2: # 確保終點不在時間軸上
# 使用線性插值找到 y=0.2 的點
if abs(y2 - y1) > 0.01:
t = (0.2 - y1) / (y2 - y1) if y2 > y1 else (-0.2 - y1) / (y2 - y1)
if 0 < t < 1:
# 計算 y=0.2 時的 datetime
seconds_offset = (dt2 - dt1).total_seconds() * t
dt_cutoff = dt1 + timedelta(seconds=seconds_offset)
y_cutoff = 0.2 if y2 > 0 else -0.2
# 只標記從 cutoff 點到終點的部分
col1 = self.datetime_to_grid_x(dt_cutoff)
row1 = self.y_to_grid_y(y_cutoff)
col2 = self.datetime_to_grid_x(dt2)
row2 = self.y_to_grid_y(y2)
self._mark_line(row1, col1, row2, col2, int(width_expansion))
else:
# t 不在範圍內,標記整段
col1 = self.datetime_to_grid_x(dt1)
row1 = self.y_to_grid_y(y1)
col2 = self.datetime_to_grid_x(dt2)
row2 = self.y_to_grid_y(y2)
self._mark_line(row1, col1, row2, col2, int(width_expansion))
# 如果終點也在時間軸上,不標記
else:
# 非起點線段,全部標記
col1 = self.datetime_to_grid_x(dt1)
row1 = self.y_to_grid_y(y1)
col2 = self.datetime_to_grid_x(dt2)
row2 = self.y_to_grid_y(y2)
self._mark_line(row1, col1, row2, col2, int(width_expansion))
def _mark_line(self, row1: int, col1: int, row2: int, col2: int, thickness: int = 1):
"""使用Bresenham算法標記線段"""
d_col = abs(col2 - col1)
d_row = abs(row2 - row1)
col_step = 1 if col1 < col2 else -1
row_step = 1 if row1 < row2 else -1
if d_col > d_row:
error = d_col / 2
row = row1
for col in range(col1, col2 + col_step, col_step):
self._mark_point_with_thickness(row, col, thickness)
error -= d_row
if error < 0:
row += row_step
error += d_col
else:
error = d_row / 2
col = col1
for row in range(row1, row2 + row_step, row_step):
self._mark_point_with_thickness(row, col, thickness)
error -= d_col
if error < 0:
col += col_step
error += d_row
def _mark_point_with_thickness(self, row: int, col: int, thickness: int):
"""標記點及其周圍(模擬線寬)"""
for dr in range(-thickness, thickness + 1):
for dc in range(-thickness, thickness + 1):
r = row + dr
c = col + dc
if 0 <= r < self.grid_rows and 0 <= c < self.grid_cols:
self.grid[r, c] = self.PATH
def is_free(self, row: int, col: int) -> bool:
"""檢查格點是否可通行"""
if not (0 <= row < self.grid_rows and 0 <= col < self.grid_cols):
return False
return self.grid[row, col] == self.FREE
def auto_calculate_grid_resolution(
num_events: int,
time_range_seconds: float,
canvas_width: int = 1200,
canvas_height: int = 600,
label_width_ratio: float = 0.15
) -> Tuple[int, int]:
"""
自動計算最佳網格解析度
綜合考慮:
1. 畫布大小目標每格12像素
2. 事件密度(密集時提高解析度)
3. 標籤大小每個標籤至少10格
Args:
num_events: 事件數量
time_range_seconds: 時間範圍(秒)
canvas_width: 畫布寬度(像素)
canvas_height: 畫布高度(像素)
label_width_ratio: 標籤寬度佔時間軸的比例
Returns:
(grid_cols, grid_rows): 網格列數和行數
"""
# 策略1基於畫布大小進一步提高密度每格3像素
pixels_per_cell = 3 # 每格3像素 = 非常精細的網格
cols_by_canvas = canvas_width // pixels_per_cell
rows_by_canvas = canvas_height // pixels_per_cell
# 策略2基於事件密度提高倍數
density = num_events / time_range_seconds if time_range_seconds > 0 else 0
if density > 0.001: # 高密度(<1000秒/事件)
density_multiplier = 2.5 # 提高倍數
elif density > 0.0001: # 中密度
density_multiplier = 2.0 # 提高倍數
else: # 低密度
density_multiplier = 1.5 # 提高倍數
cols_by_density = int(cols_by_canvas * density_multiplier)
rows_by_density = int(rows_by_canvas * density_multiplier)
# 策略3基於標籤大小每個標籤至少40格大幅提高精度
label_width_seconds = time_range_seconds * label_width_ratio
min_grids_per_label = 40 # 每標籤至少40格確保精確判斷
cols_by_label = int((time_range_seconds / label_width_seconds) * min_grids_per_label)
# 取最大值(最細網格),大幅提高上限
grid_cols = min(max(cols_by_canvas, cols_by_density, cols_by_label), 800) # 上限提高到800
grid_rows = min(max(rows_by_canvas, rows_by_density, 100), 400) # 上限提高到400
logger.info(f"自動計算網格解析度:")
logger.info(f" 基於畫布: {cols_by_canvas} × {rows_by_canvas}")
logger.info(f" 基於密度: {cols_by_density} × {rows_by_density} (倍數: {density_multiplier:.1f})")
logger.info(f" 基於標籤: {cols_by_label} × 30")
logger.info(f" 最終選擇: {grid_cols} × {grid_rows}")
return (grid_cols, grid_rows)
def find_path_bfs(
start_row: int,
start_col: int,
end_row: int,
end_col: int,
grid_map: GridMap,
direction_constraint: str = "up" # "up" or "down"
) -> Optional[List[Tuple[int, int]]]:
"""
使用BFS尋找路徑改進版優先離開時間軸
策略:
1. 優先垂直移動(離開時間軸)
2. 遇到障礙物才水平繞行
3. 使用優先隊列,根據與時間軸的距離排序
Args:
start_row, start_col: 起點網格座標
end_row, end_col: 終點網格座標
grid_map: 網格地圖
direction_constraint: 方向約束("up"往上,"down"往下)
Returns:
路徑點列表 [(row, col), ...] 或 None找不到路徑
"""
# 檢查起點和終點是否可通行
if not grid_map.is_free(start_row, start_col):
logger.warning(f"起點 ({start_row},{start_col}) 被障礙物佔據")
return None
if not grid_map.is_free(end_row, end_col):
logger.warning(f"終點 ({end_row},{end_col}) 被障礙物佔據")
return None
import heapq
# 計算時間軸的Y座標row
timeline_row = grid_map.y_to_grid_y(0)
# 優先隊列:(優先度, row, col, path)
# 優先度 = 與時間軸的距離(越遠越好)+ 路徑長度(越短越好)
start_priority = 0
heap = [(start_priority, start_row, start_col, [(start_row, start_col)])]
visited = set()
visited.add((start_row, start_col))
# 方向優先順序(垂直優先於水平)
if direction_constraint == "up":
# 優先往上,然後才左右
directions = [(-1, 0), (0, 1), (0, -1)] # 上、右、左
else: # "down"
# 優先往下,然後才左右
directions = [(1, 0), (0, 1), (0, -1)] # 下、右、左
max_iterations = grid_map.grid_rows * grid_map.grid_cols * 2
iterations = 0
while heap and iterations < max_iterations:
iterations += 1
_, current_row, current_col, path = heapq.heappop(heap)
# 到達終點
if current_row == end_row and current_col == end_col:
logger.info(f"找到路徑,長度: {len(path)},迭代: {iterations}")
return path
# 探索鄰居(按優先順序)
for d_row, d_col in directions:
next_row = current_row + d_row
next_col = current_col + d_col
# 檢查是否可通行
if (next_row, next_col) in visited:
continue
if not grid_map.is_free(next_row, next_col):
continue
# 計算優先度
# 1. 與時間軸的距離(主要因素)
distance_from_timeline = abs(next_row - timeline_row)
# 2. 曼哈頓距離到終點(次要因素)
manhattan_to_goal = abs(next_row - end_row) + abs(next_col - end_col)
# 3. 路徑長度(避免繞太遠)
path_length = len(path)
# 綜合優先度:離時間軸越遠越好,離目標越近越好
# 權重調整:優先離開時間軸
priority = (
-distance_from_timeline * 100 + # 負數因為要最大化
manhattan_to_goal * 10 +
path_length
)
# 添加到優先隊列
visited.add((next_row, next_col))
new_path = path + [(next_row, next_col)]
heapq.heappush(heap, (priority, next_row, next_col, new_path))
logger.warning(f"BFS未找到路徑 ({start_row},{start_col}) → ({end_row},{end_col})")
return None
def simplify_path(
path_grid: List[Tuple[int, int]],
grid_map: GridMap
) -> List[Tuple[datetime, float]]:
"""
簡化路徑並轉換為實際座標
合併連續同向的線段,移除不必要的轉折點。
Args:
path_grid: 網格路徑點 [(row, col), ...]
grid_map: 網格地圖
Returns:
簡化後的路徑 [(datetime, y), ...]
"""
if not path_grid:
return []
simplified = [path_grid[0]] # 起點
for i in range(1, len(path_grid) - 1):
prev_point = path_grid[i - 1]
curr_point = path_grid[i]
next_point = path_grid[i + 1]
# 計算方向
dir1 = (curr_point[0] - prev_point[0], curr_point[1] - prev_point[1])
dir2 = (next_point[0] - curr_point[0], next_point[1] - curr_point[1])
# 如果方向改變,保留這個轉折點
if dir1 != dir2:
simplified.append(curr_point)
simplified.append(path_grid[-1]) # 終點
# 轉換為實際座標
result = []
for row, col in simplified:
dt = grid_map.grid_to_datetime(col)
y = grid_map.grid_to_y(row)
result.append((dt, y))
logger.debug(f"路徑簡化: {len(path_grid)}{len(simplified)}")
return result