MCAPReader
Open an MCAP recording, iterate streams, access intrinsics, and run session.export().
class MCAPReader:
def __init__(
self,
path: str | Path,
topics: TopicConfig | None = None,
check_format: bool = False,
)
Permissive reader for Stera MCAP recordings. Synchronises the RGB / depth / pose / IMU streams into SyncedFrame objects and provides bulk accessors plus the episode export entry point.
Constructor
| Param | Default | Notes |
|---|---|---|
| path | required | Path to the .mcap file. |
| topics | TopicConfig() | Override topic names if your rig differs from the reference set. |
| check_format | False | If True, raises ValueError when any of REFERENCE_TOPICS is missing. |
from stera.data import MCAPReader
session = MCAPReader("recording.mcap")
Properties
| Property | Type | Notes |
|---|---|---|
| path | Path | |
| duration | float | Seconds. |
| num_rgb_frames | int | |
| num_depth_frames | int | |
| rgb_intrinsics | CameraIntrinsics \| None | |
| depth_intrinsics | CameraIntrinsics \| None | Automatically falls back to scaled RGB intrinsics when there is no depth_camera_info topic. |
| R_optical_to_link | np.ndarray (3, 3) | Read from /tf lazily; falls back to R_OPTICAL_TO_LINK. |
| hand_poses | dict[int, list[HandPose]] | Per-frame hand buffer populated via add_hand_pose. |
Iterators
frames
def frames(
self,
max_depth_dt: float = 0.1,
max_pose_dt: float = 0.1,
) -> Iterator[SyncedFrame]
Synchronised frame iterator. RGB drives the loop; depth/pose are nearest-neighbour matched within max_depth_dt / max_pose_dt seconds. IMU uses a fixed 50 ms window. See Synced frames.
Per-stream iterators
def rgb_frames(self) -> Iterator[tuple[float, np.ndarray]]
def depth_frames(self) -> Iterator[tuple[float, np.ndarray]]
def camera_poses(self) -> Iterator[tuple[float, Pose6D]]
def imu_samples(self) -> Iterator[tuple[float, dict]]
def tracking_states(self) -> Iterator[tuple[float, dict]]
Each yields (timestamp_sec, decoded_value).
Bulk accessors
def all_camera_poses(self) -> list[tuple[float, Pose6D]]
def all_imu_samples(self) -> list[tuple[float, dict]]
def tf_transforms(self) -> list[tuple[float, str, str, Pose6D]] # (ts, parent, child, pose)
def trajectory(self) -> list[tuple[float, Pose6D]] # from /trajectory
Each result is cached after the first call.
Map geometry
def mesh(self) -> tuple[np.ndarray, np.ndarray, np.ndarray | None] | None
Returns (verts (N,3), faces (M,3), colors (N,3) or None) from /map/mesh, or None if missing.
def point_cloud(
self,
source: str = "auto", # "auto" / "mesh_cloud" / "point_cloud"
) -> tuple[np.ndarray, np.ndarray | None]
def dense_point_cloud(
self,
every_n: int = 10,
max_pts_per_frame: int = 5_000,
cam_exclude_radius: float = 1.0,
voxel_size: float = 0.02,
min_depth: float = 0.3,
max_depth: float = 5.0,
) -> tuple[np.ndarray, np.ndarray]
def color_mesh(
self,
vertices: np.ndarray,
every_n: int = 10,
) -> np.ndarray
Returns (N, 3) uint8 per-vertex colours by projecting into every Nth RGB frame and averaging.
See Map & geometry.
Session-level buffers
def add_hand_pose(self, frame_index: int, hands: list[HandPose]) -> None
def add_rgb_frame(self, frame_index: int, rgb: np.ndarray) -> None
add_rgb_frame lazily opens an internal H.264 writer; the resulting mp4 is finalised by session.export().
export
def export(
self,
out_dir,
visualizer=None,
skip_rgb_mp4: bool = False,
skip_thumbnail: bool = False,
thumbnail_rgb: np.ndarray | None = None,
) -> dict[str, list[str]]
Write the episode directory. Returns {"saved": [...], "skipped": [...]}.
Constants
REFERENCE_TOPICS = (
"/camera/camera_info", "/camera/depth", "/camera/pose",
"/camera/rgb/compressed", "/camera/tracking_state",
"/device/imu", "/map/mesh", "/map/mesh_cloud", "/map/point_cloud",
"/tf", "/trajectory",
)
The default topic fingerprint for check_format=True.