v9.5: 實作標籤完全不重疊算法

- 新增 _calculate_lane_conflicts_v2() 分開返回標籤重疊和線穿框分數
- 修改泳道選擇算法,優先選擇無標籤重疊的泳道
- 兩階段搜尋:優先側別無可用泳道則嘗試另一側
- 增強日誌輸出,顯示標籤範圍和詳細衝突分數

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
beabigegg
2025-11-06 11:35:29 +08:00
commit 2d37d23bcf
83 changed files with 22971 additions and 0 deletions

455
backend/path_planner.py Normal file
View File

@@ -0,0 +1,455 @@
"""
網格化路徑規劃器
使用BFS算法在網格化的繪圖區域中為連接線尋找最佳路徑
完全避開標籤障礙物。
Author: AI Agent
Version: 1.0.0
"""
import logging
from typing import List, Tuple, Optional, Dict
from datetime import datetime, timedelta
from collections import deque
import numpy as np
logger = logging.getLogger(__name__)
class GridMap:
"""
2D網格地圖
用於路徑規劃的網格化表示,支持障礙物標記和路徑搜尋。
"""
# 格點狀態常量
FREE = 0
OBSTACLE = 1
PATH = 2
def __init__(
self,
time_range_seconds: float,
y_min: float,
y_max: float,
grid_cols: int,
grid_rows: int,
time_start: datetime
):
"""
初始化網格地圖
Args:
time_range_seconds: 時間範圍(秒)
y_min: Y軸最小值
y_max: Y軸最大值
grid_cols: 網格列數X方向
grid_rows: 網格行數Y方向
time_start: 時間軸起始時間
"""
self.time_range_seconds = time_range_seconds
self.y_min = y_min
self.y_max = y_max
self.grid_cols = grid_cols
self.grid_rows = grid_rows
self.time_start = time_start
# 創建網格初始全為FREE
self.grid = np.zeros((grid_rows, grid_cols), dtype=np.int8)
# 座標轉換比例
self.seconds_per_col = time_range_seconds / grid_cols
self.y_per_row = (y_max - y_min) / grid_rows
logger.info(f"創建網格地圖: {grid_cols}× {grid_rows}")
logger.info(f" 時間範圍: {time_range_seconds:.0f}秒 ({time_range_seconds/86400:.1f}天)")
logger.info(f" Y軸範圍: {y_min:.1f} ~ {y_max:.1f}")
logger.info(f" 解析度: {self.seconds_per_col:.2f}秒/格, {self.y_per_row:.3f}Y/格")
def datetime_to_grid_x(self, dt: datetime) -> int:
"""將datetime轉換為網格X座標"""
seconds = (dt - self.time_start).total_seconds()
col = int(seconds / self.seconds_per_col)
return max(0, min(col, self.grid_cols - 1))
def seconds_to_grid_x(self, seconds: float) -> int:
"""將秒數轉換為網格X座標"""
col = int(seconds / self.seconds_per_col)
return max(0, min(col, self.grid_cols - 1))
def y_to_grid_y(self, y: float) -> int:
"""將Y座標轉換為網格Y座標注意Y軸向上但行索引向下"""
# Y軸向上為正但網格行索引向下增加需要翻轉
normalized_y = (y - self.y_min) / (self.y_max - self.y_min)
row = int((1 - normalized_y) * self.grid_rows)
return max(0, min(row, self.grid_rows - 1))
def grid_to_datetime(self, col: int) -> datetime:
"""將網格X座標轉換為datetime"""
seconds = col * self.seconds_per_col
return self.time_start + timedelta(seconds=seconds)
def grid_to_y(self, row: int) -> float:
"""將網格Y座標轉換為Y座標"""
normalized_y = 1 - (row / self.grid_rows)
return self.y_min + normalized_y * (self.y_max - self.y_min)
def mark_rectangle(
self,
center_x_datetime: datetime,
center_y: float,
width_seconds: float,
height: float,
state: int = OBSTACLE,
expansion_ratio: float = 0.1
):
"""
標記矩形區域
Args:
center_x_datetime: 矩形中心X座標datetime
center_y: 矩形中心Y座標
width_seconds: 矩形寬度(秒)
height: 矩形高度
state: 標記狀態OBSTACLE或PATH
expansion_ratio: 外擴比例默認10%
"""
# 外擴
expanded_width = width_seconds * (1 + expansion_ratio)
expanded_height = height * (1 + expansion_ratio)
# 計算矩形範圍
center_x_seconds = (center_x_datetime - self.time_start).total_seconds()
x_min = center_x_seconds - expanded_width / 2
x_max = center_x_seconds + expanded_width / 2
y_min = center_y - expanded_height / 2
y_max = center_y + expanded_height / 2
# 轉換為網格座標
col_min = self.seconds_to_grid_x(x_min)
col_max = self.seconds_to_grid_x(x_max)
row_min = self.y_to_grid_y(y_max) # 注意Y軸翻轉
row_max = self.y_to_grid_y(y_min)
# 標記網格
for row in range(row_min, row_max + 1):
for col in range(col_min, col_max + 1):
if 0 <= row < self.grid_rows and 0 <= col < self.grid_cols:
self.grid[row, col] = state
def mark_path(
self,
path_points: List[Tuple[datetime, float]],
width_expansion: float = 2.5
):
"""
標記路徑為障礙物
Args:
path_points: 路徑點列表 [(datetime, y), ...]
width_expansion: 寬度擴展倍數
策略:
1. 標記所有線段(包括起點線段)
2. 但是起點線段只標記離開時間軸的垂直部分
3. 時間軸 y=0 本身不標記,避免阻擋其他起點
"""
if len(path_points) < 2:
return
# 標記所有線段
for i in range(len(path_points) - 1):
dt1, y1 = path_points[i]
dt2, y2 = path_points[i + 1]
# 如果是從時間軸y=0出發的第一段線段
if i == 0 and abs(y1) < 0.1:
# 只標記離開時間軸的部分(從 y=0.2 開始)
# 避免阻擋其他事件的起點
if abs(y2) > 0.2: # 確保終點不在時間軸上
# 使用線性插值找到 y=0.2 的點
if abs(y2 - y1) > 0.01:
t = (0.2 - y1) / (y2 - y1) if y2 > y1 else (-0.2 - y1) / (y2 - y1)
if 0 < t < 1:
# 計算 y=0.2 時的 datetime
seconds_offset = (dt2 - dt1).total_seconds() * t
dt_cutoff = dt1 + timedelta(seconds=seconds_offset)
y_cutoff = 0.2 if y2 > 0 else -0.2
# 只標記從 cutoff 點到終點的部分
col1 = self.datetime_to_grid_x(dt_cutoff)
row1 = self.y_to_grid_y(y_cutoff)
col2 = self.datetime_to_grid_x(dt2)
row2 = self.y_to_grid_y(y2)
self._mark_line(row1, col1, row2, col2, int(width_expansion))
else:
# t 不在範圍內,標記整段
col1 = self.datetime_to_grid_x(dt1)
row1 = self.y_to_grid_y(y1)
col2 = self.datetime_to_grid_x(dt2)
row2 = self.y_to_grid_y(y2)
self._mark_line(row1, col1, row2, col2, int(width_expansion))
# 如果終點也在時間軸上,不標記
else:
# 非起點線段,全部標記
col1 = self.datetime_to_grid_x(dt1)
row1 = self.y_to_grid_y(y1)
col2 = self.datetime_to_grid_x(dt2)
row2 = self.y_to_grid_y(y2)
self._mark_line(row1, col1, row2, col2, int(width_expansion))
def _mark_line(self, row1: int, col1: int, row2: int, col2: int, thickness: int = 1):
"""使用Bresenham算法標記線段"""
d_col = abs(col2 - col1)
d_row = abs(row2 - row1)
col_step = 1 if col1 < col2 else -1
row_step = 1 if row1 < row2 else -1
if d_col > d_row:
error = d_col / 2
row = row1
for col in range(col1, col2 + col_step, col_step):
self._mark_point_with_thickness(row, col, thickness)
error -= d_row
if error < 0:
row += row_step
error += d_col
else:
error = d_row / 2
col = col1
for row in range(row1, row2 + row_step, row_step):
self._mark_point_with_thickness(row, col, thickness)
error -= d_col
if error < 0:
col += col_step
error += d_row
def _mark_point_with_thickness(self, row: int, col: int, thickness: int):
"""標記點及其周圍(模擬線寬)"""
for dr in range(-thickness, thickness + 1):
for dc in range(-thickness, thickness + 1):
r = row + dr
c = col + dc
if 0 <= r < self.grid_rows and 0 <= c < self.grid_cols:
self.grid[r, c] = self.PATH
def is_free(self, row: int, col: int) -> bool:
"""檢查格點是否可通行"""
if not (0 <= row < self.grid_rows and 0 <= col < self.grid_cols):
return False
return self.grid[row, col] == self.FREE
def auto_calculate_grid_resolution(
num_events: int,
time_range_seconds: float,
canvas_width: int = 1200,
canvas_height: int = 600,
label_width_ratio: float = 0.15
) -> Tuple[int, int]:
"""
自動計算最佳網格解析度
綜合考慮:
1. 畫布大小目標每格12像素
2. 事件密度(密集時提高解析度)
3. 標籤大小每個標籤至少10格
Args:
num_events: 事件數量
time_range_seconds: 時間範圍(秒)
canvas_width: 畫布寬度(像素)
canvas_height: 畫布高度(像素)
label_width_ratio: 標籤寬度佔時間軸的比例
Returns:
(grid_cols, grid_rows): 網格列數和行數
"""
# 策略1基於畫布大小進一步提高密度每格3像素
pixels_per_cell = 3 # 每格3像素 = 非常精細的網格
cols_by_canvas = canvas_width // pixels_per_cell
rows_by_canvas = canvas_height // pixels_per_cell
# 策略2基於事件密度提高倍數
density = num_events / time_range_seconds if time_range_seconds > 0 else 0
if density > 0.001: # 高密度(<1000秒/事件)
density_multiplier = 2.5 # 提高倍數
elif density > 0.0001: # 中密度
density_multiplier = 2.0 # 提高倍數
else: # 低密度
density_multiplier = 1.5 # 提高倍數
cols_by_density = int(cols_by_canvas * density_multiplier)
rows_by_density = int(rows_by_canvas * density_multiplier)
# 策略3基於標籤大小每個標籤至少40格大幅提高精度
label_width_seconds = time_range_seconds * label_width_ratio
min_grids_per_label = 40 # 每標籤至少40格確保精確判斷
cols_by_label = int((time_range_seconds / label_width_seconds) * min_grids_per_label)
# 取最大值(最細網格),大幅提高上限
grid_cols = min(max(cols_by_canvas, cols_by_density, cols_by_label), 800) # 上限提高到800
grid_rows = min(max(rows_by_canvas, rows_by_density, 100), 400) # 上限提高到400
logger.info(f"自動計算網格解析度:")
logger.info(f" 基於畫布: {cols_by_canvas} × {rows_by_canvas}")
logger.info(f" 基於密度: {cols_by_density} × {rows_by_density} (倍數: {density_multiplier:.1f})")
logger.info(f" 基於標籤: {cols_by_label} × 30")
logger.info(f" 最終選擇: {grid_cols} × {grid_rows}")
return (grid_cols, grid_rows)
def find_path_bfs(
start_row: int,
start_col: int,
end_row: int,
end_col: int,
grid_map: GridMap,
direction_constraint: str = "up" # "up" or "down"
) -> Optional[List[Tuple[int, int]]]:
"""
使用BFS尋找路徑改進版優先離開時間軸
策略:
1. 優先垂直移動(離開時間軸)
2. 遇到障礙物才水平繞行
3. 使用優先隊列,根據與時間軸的距離排序
Args:
start_row, start_col: 起點網格座標
end_row, end_col: 終點網格座標
grid_map: 網格地圖
direction_constraint: 方向約束("up"往上,"down"往下)
Returns:
路徑點列表 [(row, col), ...] 或 None找不到路徑
"""
# 檢查起點和終點是否可通行
if not grid_map.is_free(start_row, start_col):
logger.warning(f"起點 ({start_row},{start_col}) 被障礙物佔據")
return None
if not grid_map.is_free(end_row, end_col):
logger.warning(f"終點 ({end_row},{end_col}) 被障礙物佔據")
return None
import heapq
# 計算時間軸的Y座標row
timeline_row = grid_map.y_to_grid_y(0)
# 優先隊列:(優先度, row, col, path)
# 優先度 = 與時間軸的距離(越遠越好)+ 路徑長度(越短越好)
start_priority = 0
heap = [(start_priority, start_row, start_col, [(start_row, start_col)])]
visited = set()
visited.add((start_row, start_col))
# 方向優先順序(垂直優先於水平)
if direction_constraint == "up":
# 優先往上,然後才左右
directions = [(-1, 0), (0, 1), (0, -1)] # 上、右、左
else: # "down"
# 優先往下,然後才左右
directions = [(1, 0), (0, 1), (0, -1)] # 下、右、左
max_iterations = grid_map.grid_rows * grid_map.grid_cols * 2
iterations = 0
while heap and iterations < max_iterations:
iterations += 1
_, current_row, current_col, path = heapq.heappop(heap)
# 到達終點
if current_row == end_row and current_col == end_col:
logger.info(f"找到路徑,長度: {len(path)},迭代: {iterations}")
return path
# 探索鄰居(按優先順序)
for d_row, d_col in directions:
next_row = current_row + d_row
next_col = current_col + d_col
# 檢查是否可通行
if (next_row, next_col) in visited:
continue
if not grid_map.is_free(next_row, next_col):
continue
# 計算優先度
# 1. 與時間軸的距離(主要因素)
distance_from_timeline = abs(next_row - timeline_row)
# 2. 曼哈頓距離到終點(次要因素)
manhattan_to_goal = abs(next_row - end_row) + abs(next_col - end_col)
# 3. 路徑長度(避免繞太遠)
path_length = len(path)
# 綜合優先度:離時間軸越遠越好,離目標越近越好
# 權重調整:優先離開時間軸
priority = (
-distance_from_timeline * 100 + # 負數因為要最大化
manhattan_to_goal * 10 +
path_length
)
# 添加到優先隊列
visited.add((next_row, next_col))
new_path = path + [(next_row, next_col)]
heapq.heappush(heap, (priority, next_row, next_col, new_path))
logger.warning(f"BFS未找到路徑 ({start_row},{start_col}) → ({end_row},{end_col})")
return None
def simplify_path(
path_grid: List[Tuple[int, int]],
grid_map: GridMap
) -> List[Tuple[datetime, float]]:
"""
簡化路徑並轉換為實際座標
合併連續同向的線段,移除不必要的轉折點。
Args:
path_grid: 網格路徑點 [(row, col), ...]
grid_map: 網格地圖
Returns:
簡化後的路徑 [(datetime, y), ...]
"""
if not path_grid:
return []
simplified = [path_grid[0]] # 起點
for i in range(1, len(path_grid) - 1):
prev_point = path_grid[i - 1]
curr_point = path_grid[i]
next_point = path_grid[i + 1]
# 計算方向
dir1 = (curr_point[0] - prev_point[0], curr_point[1] - prev_point[1])
dir2 = (next_point[0] - curr_point[0], next_point[1] - curr_point[1])
# 如果方向改變,保留這個轉折點
if dir1 != dir2:
simplified.append(curr_point)
simplified.append(path_grid[-1]) # 終點
# 轉換為實際座標
result = []
for row, col in simplified:
dt = grid_map.grid_to_datetime(col)
y = grid_map.grid_to_y(row)
result.append((dt, y))
logger.debug(f"路徑簡化: {len(path_grid)}{len(simplified)}")
return result