多源视频帧对齐技术文档

概述

在机器人遥操作数据采集场景中,通常会同时使用多个相机从不同角度记录操作过程。由于各相机的启动时间、帧率可能存在差异,需要将多路视频与其他传感器数据(如位置追踪器、力传感器等)在时间轴上精确对齐,才能生成可用于机器学习训练的高质量数据集。

本文档介绍多源视频帧对齐的核心算法和最佳实践。

问题定义

输入

  • N 路视频流:每路视频有独立的起始时间戳 t_start[i] 和帧率 fps[i]
  • 传感器数据流:如位置追踪器,有自己的采样率和时间戳序列
  • 目标帧率:期望输出的统一帧率 fps_target

输出

  • 统一时间轴:等间隔时间戳序列 T = [t_0, t_0+dt, t_0+2dt, ...],其中 dt = 1/fps_target
  • 帧索引映射:对于每个时间点 t,计算每路视频对应的帧索引 frame_idx[i][t]
  • 插值后的传感器数据:对齐到统一时间轴

挑战

  1. 起始时间不同步:各相机启动时间可能相差数百毫秒
  2. 帧率差异:不同相机可能有不同帧率(如 30fps vs 60fps)
  3. 时钟漂移:长时间录制可能产生时钟漂移
  4. 浮点精度:时间戳计算需要避免累积误差

核心算法

1. 时间戳获取

首先需要获取每路数据流的绝对时间戳:

# 视频:从元数据中提取创建时间
video_start_ts = get_video_creation_time(video_path).timestamp()  # Unix timestamp
 
# 传感器:直接读取数据中的时间戳列
tracker_times = tracker_df["timestamp"].values  # Unix timestamps

注意事项

  • 视频的 CreateDate 通常只有秒级精度
  • 传感器数据通常有毫秒甚至微秒精度
  • 需要统一使用 Unix 时间戳(秒为单位的浮点数)

2. 确定有效时间范围

取所有数据源的时间交集:

time_start = max(
    tracker_times[0],           # 传感器起始
    max(video_start_times),     # 最晚开始的视频
    user_trim_start,            # 用户指定裁剪
)
 
time_end = min(
    tracker_times[-1],          # 传感器结束
    min(video_end_times),       # 最早结束的视频
    user_trim_end,              # 用户指定裁剪
)

3. 选择对齐参考(关键步骤)

问题:如果直接使用 time_start 作为输出时间轴的起点,可能会与所有视频的帧边界都不对齐,导致帧匹配时总是落在两帧之间。

解决方案:计算每对视频之间的对齐成本,选择总成本最小的视频作为参考。

dt = 1 / fps_target  # 帧周期
 
def alignment_cost(ref_start: float, other_starts: list[float]) -> float:
    """计算以 ref_start 为参考时的总对齐成本"""
    return sum((other - ref_start) % dt for other in other_starts)
 
# 选择对齐成本最小的视频作为参考
best_ref_idx = argmin([alignment_cost(s, all_starts) for s in all_starts])
align_video_start = all_starts[best_ref_idx]

对齐成本的含义

  • (other - ref) % dt 表示如果以 ref 为参考,other 视频的帧边界偏离量
  • 总成本越小,说明选择该参考后其他视频的对齐效果越好

4. 对齐到帧边界

time_start 调整到参考视频的帧边界上:

offset = (time_start - align_video_start) % dt
if offset > 0:
    time_start_aligned = time_start + (dt - offset)
else:
    time_start_aligned = time_start

图示

参考视频帧:     |-------|-------|-------|-------|
                0       1       2       3
时间轴:         t0     t0+dt  t0+2dt t0+3dt

原始 time_start:        ^  (落在帧间)
对齐后 time_start:           ^ (对齐到帧边界)

5. 生成统一时间轴

num_frames = int((time_end - time_start_aligned) / dt)
target_times = time_start_aligned + np.arange(num_frames) * dt

注意:使用 np.arange(n) * dt 而非循环累加,避免浮点误差累积。

6. 帧索引匹配

对于每个目标时间点,找到每路视频中最近的帧:

def match_to_frames(target_times: np.ndarray, frame_times: np.ndarray) -> np.ndarray:
    """最近邻帧匹配(向量化实现)"""
    # 二分搜索找到插入位置
    indices = np.searchsorted(frame_times, target_times, side="left")
    indices = np.clip(indices, 0, len(frame_times) - 1)
    
    # 比较左右邻居,选择更近的
    can_go_left = indices > 0
    left_indices = np.maximum(indices - 1, 0)
    
    left_dist = np.abs(frame_times[left_indices] - target_times)
    right_dist = np.abs(frame_times[indices] - target_times)
    
    should_go_left = can_go_left & (left_dist < right_dist)
    indices = np.where(should_go_left, indices - 1, indices)
    
    return indices

复杂度:O(N log M),其中 N 是目标帧数,M 是视频帧数。

7. 传感器数据插值

对于传感器数据(如位置、姿态),需要插值到统一时间轴:

# 位置数据:线性插值
from scipy.interpolate import interp1d
interpolator = interp1d(sensor_times, positions, axis=0, kind='linear')
interpolated_positions = interpolator(target_times)
 
# 姿态数据(四元数):球面线性插值 (Slerp)
from scipy.spatial.transform import Rotation, Slerp
rotations = Rotation.from_quat(quaternions)
slerp = Slerp(sensor_times, rotations)
interpolated_rotations = slerp(target_times)

特殊情况处理

不同帧率的视频

当视频帧率与目标帧率不同时:

  • 更高帧率:帧索引增量 > 1,跳过部分帧
  • 更低帧率:帧索引增量 < 1,部分帧会重复使用
# 60fps 视频对齐到 30fps 输出
# 帧索引: 0, 2, 4, 6, ... (增量为 2)
 
# 30fps 视频对齐到 60fps 输出
# 帧索引: 0, 0, 1, 1, 2, 2, ... (有重复)

边界外推

当目标时间超出数据范围时的处理策略:

  1. 裁剪:只保留有效范围内的数据
  2. 常量外推:使用边界值填充
  3. 线性外推:按趋势延伸(不推荐,可能产生不合理值)

缺失数据

传感器可能有丢帧或遮挡导致的缺失值:

  1. 短间隙:线性插值填充
  2. 长间隙:标记为无效或分段处理
  3. 完全缺失:使用 NaN 或默认值

验证方法

1. 时间间隔检查

dt = np.diff(target_times)
assert np.allclose(dt, 1/fps_target, rtol=1e-9), "时间间隔不均匀"

2. 帧索引单调性

for i in range(n_cameras):
    indices = frame_indices[:, i]
    diffs = np.diff(indices)
    assert np.all(diffs >= 0), f"相机 {i} 帧索引非单调递增"

3. 帧索引增量

# 同帧率时增量应为 1
if video_fps == target_fps:
    diffs = np.diff(frame_indices)
    assert np.all(diffs == 1), "同帧率时帧索引增量应为 1"

4. 对齐误差

dt = 1 / fps_target
for i, start_ts in enumerate(video_start_timestamps):
    offset = (target_times[0] - start_ts) % dt
    # 参考视频的偏移应接近 0
    # 其他视频的偏移应接近 0 或 dt

性能优化

向量化

避免 Python 循环,使用 NumPy 向量化操作:

# ❌ 慢
for i, t in enumerate(target_times):
    indices[i] = find_nearest(t, frame_times)
 
# ✅ 快
indices = np.searchsorted(frame_times, target_times)

预计算

如果需要多次查询同一视频的帧:

# 预计算帧时间数组
frame_times = video_start_ts + np.arange(n_frames) / fps

内存映射

对于大型数据集,使用内存映射避免一次性加载:

import numpy as np
data = np.memmap('large_file.npy', dtype='float32', mode='r')

常见问题

Q: 为什么不直接用视频的 pts(presentation timestamp)?

A: pts 是相对于视频开始的时间,不是绝对时间戳。需要结合视频的创建时间才能得到绝对时间。

Q: 如何处理时钟漂移?

A: 对于短时间(几分钟内)的录制,时钟漂移通常可忽略。对于长时间录制,可以:

  1. 使用硬件同步触发器
  2. 周期性校准时间戳
  3. 使用 PTP(精确时间协议)同步时钟

Q: 视频 CreateDate 只有秒级精度怎么办?

A: 这会带来 ±0.5 秒的不确定性。可以:

  1. 使用更精确的元数据字段(如 SubSecCreateDate)
  2. 通过音频波形或视觉特征自动对齐
  3. 接受这个精度限制,在应用层容错

参考资料