API Reference

MCAPReader

Open an MCAP recording, iterate streams, access intrinsics, and run session.export().

class MCAPReader:
    def __init__(
        self,
        path: str | Path,
        topics: TopicConfig | None = None,
        check_format: bool = False,
    )

Permissive reader for Stera MCAP recordings. Synchronises the RGB / depth / pose / IMU streams into SyncedFrame objects and provides bulk accessors plus the episode export entry point.

Constructor

Param          Default        Notes
path           required       Path to the .mcap file.
topics         TopicConfig()  Override topic names if your rig differs from the reference set.
check_format   False          If True, raise ValueError when any of REFERENCE_TOPICS is missing.

from stera.data import MCAPReader
session = MCAPReader("recording.mcap")

Properties

Property            Type                        Notes
path                Path
duration            float                       Seconds.
num_rgb_frames      int
num_depth_frames    int
rgb_intrinsics      CameraIntrinsics | None
depth_intrinsics    CameraIntrinsics | None     Falls back to scaled RGB intrinsics if there is no depth_camera_info topic.
R_optical_to_link   np.ndarray (3, 3)           Read lazily from /tf; falls back to R_OPTICAL_TO_LINK.
hand_poses          dict[int, list[HandPose]]   Per-frame hand buffer populated via add_hand_pose.
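
The depth_intrinsics fallback is described only as "RGB scaled". A common way to rescale pinhole intrinsics to another resolution is to multiply the focal lengths and principal point by the width and height ratios. The sketch below shows that approach under stated assumptions: the Intrinsics dataclass and its field names are hypothetical stand-ins for CameraIntrinsics, and the library's actual scaling rule may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Intrinsics:
    """Hypothetical stand-in for CameraIntrinsics (field names are assumed)."""
    fx: float
    fy: float
    cx: float
    cy: float
    width: int
    height: int


def scale_intrinsics(rgb: Intrinsics, width: int, height: int) -> Intrinsics:
    """Rescale pinhole intrinsics from the RGB resolution to (width, height)."""
    sx = width / rgb.width    # horizontal scale factor
    sy = height / rgb.height  # vertical scale factor
    return Intrinsics(
        fx=rgb.fx * sx,
        fy=rgb.fy * sy,
        cx=rgb.cx * sx,
        cy=rgb.cy * sy,
        width=width,
        height=height,
    )


# Example: a 1920x1080 RGB camera and a depth stream at half that resolution.
rgb = Intrinsics(fx=1000.0, fy=1000.0, cx=960.0, cy=540.0, width=1920, height=1080)
depth = scale_intrinsics(rgb, width=960, height=540)
```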

Iterators

frames

def frames(
    self,
    max_depth_dt: float = 0.1,
    max_pose_dt: float = 0.1,
) -> Iterator[SyncedFrame]

Synchronised frame iterator. RGB drives the loop; depth/pose are nearest-neighbour matched within max_depth_dt / max_pose_dt seconds. IMU uses a fixed 50 ms window. See Synced frames.
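
To make the matching rule concrete, here is a minimal sketch of nearest-neighbour timestamp matching with a tolerance, using only the standard library. The helper name nearest_within and the sample timestamps are illustrative and not part of the library. The source does not say what frames() does with an RGB frame that has no match in range, so the sketch simply returns None.

```python
from bisect import bisect_left


def nearest_within(timestamps, t, max_dt):
    """Return the index of the timestamp closest to t, or None if the
    closest one is more than max_dt seconds away.

    `timestamps` must be sorted in ascending order.
    """
    if not timestamps:
        return None
    i = bisect_left(timestamps, t)
    # Only the neighbours on either side of the insertion point can be nearest.
    candidates = [j for j in (i - 1, i) if 0 <= j < len(timestamps)]
    best = min(candidates, key=lambda j: abs(timestamps[j] - t))
    return best if abs(timestamps[best] - t) <= max_dt else None


# RGB drives the loop; each RGB timestamp looks up its nearest depth sample.
rgb_ts = [0.00, 0.10, 0.20, 0.30]
depth_ts = [0.01, 0.12, 0.45]
matches = [nearest_within(depth_ts, t, max_dt=0.1) for t in rgb_ts]
```

With these sample values the last RGB frame has no depth sample within 0.1 s, so its entry in matches is None.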

Per-stream iterators

def rgb_frames(self)         -> Iterator[tuple[float, np.ndarray]]
def depth_frames(self)       -> Iterator[tuple[float, np.ndarray]]
def camera_poses(self)       -> Iterator[tuple[float, Pose6D]]
def imu_samples(self)        -> Iterator[tuple[float, dict]]
def tracking_states(self)    -> Iterator[tuple[float, dict]]

Each yields (timestamp_sec, decoded_value).
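
Because every per-stream iterator yields (timestamp_sec, decoded_value) pairs, generic helpers can consume any of them. The sketch below estimates a stream's mean sample rate. mean_rate_hz is a hypothetical helper, and the fake stream stands in for something like session.imu_samples().

```python
def mean_rate_hz(stream):
    """Average sample rate of an iterable of (timestamp_sec, value) pairs."""
    first = last = None
    count = 0
    for ts, _value in stream:
        if first is None:
            first = ts
        last = ts
        count += 1
    # A rate needs at least two samples spread over a non-zero interval.
    if count < 2 or last == first:
        return 0.0
    return (count - 1) / (last - first)


# Fake stream: five samples spaced 0.5 s apart.
fake_imu = iter([(0.0, {}), (0.5, {}), (1.0, {}), (1.5, {}), (2.0, {})])
rate = mean_rate_hz(fake_imu)
```

The helper makes a single pass and keeps nothing in memory, so it suits long recordings where materialising a full list is undesirable.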

Bulk accessors

def all_camera_poses(self)   -> list[tuple[float, Pose6D]]
def all_imu_samples(self)    -> list[tuple[float, dict]]
def tf_transforms(self)      -> list[tuple[float, str, str, Pose6D]]   # (ts, parent, child, pose)
def trajectory(self)         -> list[tuple[float, Pose6D]]              # from /trajectory

Each result is cached after the first call.
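
The practical effect of caching is that the recording is scanned once per accessor and later calls are cheap. The sketch below illustrates that compute-once pattern in general terms. CachedAccessor is a hypothetical class, not the library's implementation, and whether the real accessors return the same list object on every call is an assumption.

```python
class CachedAccessor:
    """Hypothetical stand-in showing a compute-once bulk accessor."""

    def __init__(self, loader):
        self._loader = loader  # callable that produces the items
        self._cache = None     # filled on first use
        self.loads = 0         # counts how often the loader actually ran

    def all(self):
        if self._cache is None:
            self.loads += 1
            self._cache = list(self._loader())
        return self._cache


accessor = CachedAccessor(lambda: [(0.0, "pose0"), (0.1, "pose1")])
first = accessor.all()
second = accessor.all()  # served from the cache; the loader is not re-run
```

If a cached accessor does hand back the same list each time, treat the result as read-only or copy it before modifying it.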

Map geometry

def mesh(self) -> tuple[np.ndarray, np.ndarray, np.ndarray | None] | None

Returns (verts (N, 3), faces (M, 3), colors (N, 3) or None) read from /map/mesh, or None if the topic is missing.

def point_cloud(
    self,
    source: str = "auto",         # "auto" / "mesh_cloud" / "point_cloud"
) -> tuple[np.ndarray, np.ndarray | None]

def dense_point_cloud(
    self,
    every_n: int = 10,
    max_pts_per_frame: int = 5_000,
    cam_exclude_radius: float = 1.0,
    voxel_size: float = 0.02,
    min_depth: float = 0.3,
    max_depth: float = 5.0,
) -> tuple[np.ndarray, np.ndarray]

def color_mesh(
    self,
    vertices: np.ndarray,
    every_n: int = 10,
) -> np.ndarray

color_mesh returns (N, 3) uint8 per-vertex colours, computed by projecting the vertices into every every_n-th RGB frame and averaging the sampled colours.

See Map & geometry.

Session-level buffers

def add_hand_pose(self, frame_index: int, hands: list[HandPose]) -> None
def add_rgb_frame(self, frame_index: int, rgb: np.ndarray) -> None

add_rgb_frame lazily opens an internal H.264 writer; the resulting mp4 is finalised by session.export().

export

def export(
    self,
    out_dir,
    visualizer=None,
    skip_rgb_mp4: bool = False,
    skip_thumbnail: bool = False,
    thumbnail_rgb: np.ndarray | None = None,
) -> dict[str, list[str]]

Write the episode directory. Returns {"saved": [...], "skipped": [...]}.
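
Since export() returns a plain dict with "saved" and "skipped" lists, callers can report or act on the outcome without extra library support. A minimal sketch, assuming only that return shape: summarise_export is a hypothetical helper, and the file names in the example are invented for illustration.

```python
def summarise_export(result):
    """Build a short human-readable report from an export() result dict."""
    saved = result.get("saved", [])
    skipped = result.get("skipped", [])
    lines = [f"saved {len(saved)} file(s), skipped {len(skipped)}"]
    lines += [f"  skipped: {name}" for name in skipped]
    return "\n".join(lines)


# Invented example in the documented {"saved": [...], "skipped": [...]} shape.
example = {"saved": ["rgb.mp4", "poses.json"], "skipped": ["thumbnail.jpg"]}
report = summarise_export(example)
```

In real use the dict would come from a call such as session.export(out_dir).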

Constants

REFERENCE_TOPICS = (
    "/camera/camera_info", "/camera/depth", "/camera/pose",
    "/camera/rgb/compressed", "/camera/tracking_state",
    "/device/imu", "/map/mesh", "/map/mesh_cloud", "/map/point_cloud",
    "/tf", "/trajectory",
)

The reference topic set that check_format=True validates a recording against.
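
The check described for check_format=True amounts to a set-membership test against this tuple. The sketch below shows one way such a check could look. missing_topics and validate_topics are hypothetical helpers, and the reader's real check and error message may differ.

```python
REFERENCE_TOPICS = (
    "/camera/camera_info", "/camera/depth", "/camera/pose",
    "/camera/rgb/compressed", "/camera/tracking_state",
    "/device/imu", "/map/mesh", "/map/mesh_cloud", "/map/point_cloud",
    "/tf", "/trajectory",
)


def missing_topics(found, reference=REFERENCE_TOPICS):
    """Return the reference topics absent from `found`, in reference order."""
    present = set(found)
    return [topic for topic in reference if topic not in present]


def validate_topics(found):
    """Raise ValueError if any reference topic is missing (assumed behaviour)."""
    missing = missing_topics(found)
    if missing:
        raise ValueError("missing reference topics: " + ", ".join(missing))


# Example: a recording that lacks the mesh topic.
found = [topic for topic in REFERENCE_TOPICS if topic != "/map/mesh"]
absent = missing_topics(found)
```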

See also