# Source code for machinevisiontoolbox.Sources

"""
Video, camera, image-collection, and ZIP-archive sources for streaming images.
"""

from __future__ import annotations

import fnmatch
import io
import json
import base64
import pathlib
import shutil
import subprocess
import sys
import tarfile
import threading
from dataclasses import dataclass
from http import client
import os
import tempfile
import time
import urllib.request
import warnings
import zipfile
from collections import Counter, deque
from collections.abc import Iterator
from numpy.char import array
from tqdm import tqdm
from datetime import datetime, timezone
from typing import TYPE_CHECKING, IO, Any, Literal, cast

if TYPE_CHECKING:
    import torch
    from numpy.typing import DTypeLike

# from numpy.lib.arraysetops import isin
from abc import ABC, abstractmethod

import cv2
import numpy as np
from ansitable import ANSITable, Column
from spatialmath import Polygon2

from machinevisiontoolbox.ImageCore import Image
from machinevisiontoolbox.PointCloud import PointCloud
from machinevisiontoolbox.base import mvtb_path_to_datafile
from machinevisiontoolbox.base.imageio import convert, iread, iread_iter

try:
    from rosbags.rosbag1 import Reader as _RosBagReader1
    from rosbags.rosbag2 import Reader as _RosBagReader2
    from rosbags.typesys import Stores as _Stores, get_typestore as _get_typestore

    _rosbags_available = True
except ImportError:
    _RosBagReader1 = None
    _RosBagReader2 = None
    _Stores = None
    _get_typestore = None
    _rosbags_available = False

try:
    import roslibpy

    _roslibpy_available = True
except ImportError:
    roslibpy = None
    _roslibpy_available = False

try:
    import open3d as o3d

    _open3d_available = True
except ImportError:
    o3d = None
    _open3d_available = False

try:
    import py7zr as _py7zr

    _py7zr_available = True
except ImportError:
    _py7zr = None
    _py7zr_available = False

try:
    import rarfile as _rarfile

    _rarfile_available = True
except ImportError:
    _rarfile = None
    _rarfile_available = False

_unar_available: bool = (
    shutil.which("unar") is not None and shutil.which("lsar") is not None
)


def _wants_mono(kwargs: dict[str, Any]) -> bool:
    """Return True if convert kwargs request a single-plane output image."""
    return bool(kwargs.get("mono") or kwargs.get("grey") or kwargs.get("gray"))


def _make_image(*args: Any, **kwargs: Any) -> Image:
    """Construct an Image instance while keeping static type checking practical."""
    return Image(*args, **kwargs)  # pyright: ignore[reportAbstractUsage]


def _set_sample_metadata(sample: Any, timestamp: int, topic: str) -> None:
    """Attach dynamic timestamp/topic metadata used by stream sources."""
    setattr(sample, "timestamp", timestamp)
    setattr(sample, "topic", topic)


class ImageSource(ABC):
    @abstractmethod
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Base constructor for image sources.

        :param kwargs: source-specific keyword arguments
        :type kwargs: Any
        """
        pass

    @abstractmethod
    def __len__(self) -> int:
        """Return the number of images in this source."""
        raise NotImplementedError(f"{type(self).__name__} does not support len()")

    @abstractmethod
    def __iter__(self) -> Iterator:
        """Return an iterator over images in this source."""
        raise NotImplementedError(f"{type(self).__name__} does not support iteration")

    def tensor(
        self,
        device: str = "cpu",
        normalize="imagenet",
        dtype: "torch.dtype | None" = None,
    ) -> "torch.Tensor":
        """
        Convert all images from this source into a single 4D PyTorch tensor.

        :param device: target PyTorch device, e.g. ``"cpu"``, ``"cuda"`` or ``"mps"``,
            defaults to ``"cpu"``
        :type device: str, optional
        :param normalize: normalisation to apply to each frame; passed directly
            to :meth:`Image.tensor`, defaults to ``"imagenet"``
        :type normalize: str, tuple, or None, optional
        :param dtype: output tensor dtype, for example ``torch.float32``;
            passed directly to :meth:`Image.tensor`, defaults to None
        :type dtype: torch.dtype or None, optional
        :raises ImportError: if PyTorch is not installed
        :raises TypeError: if the source is not finite (no ``__len__``)
        :raises TypeError: if any yielded item is not an :class:`Image`
        :raises ValueError: if any frame has a different shape to the first
        :return: tensor of shape ``(N, C, H, W)``
        :rtype: torch.Tensor

        Images are decoded one at a time and written directly into a
        pre-allocated tensor, so peak memory is one decoded frame plus the
        output tensor — the full source need not reside in memory at once.
        This is particularly useful for :class:`VideoFile` and
        :class:`FileArchive`.

        Example:

            .. code-block:: pycon

            >>> from machinevisiontoolbox import VideoFile
            >>> t = VideoFile("traffic_sequence.mpg").tensor(normalize=None)
            >>> t.shape
            torch.Size([N, 3, H, W])

        """
        try:
            import torch as _torch
        except ImportError:
            raise ImportError(
                "PyTorch is required for tensor(). "
                "Install it with: pip install torch "
                "or pip install machinevision-toolbox-python[torch]"
            )

        if not hasattr(self, "__len__"):
            raise TypeError(
                f"{type(self).__name__} is not a finite source; "
                "tensor() requires a source with a known length"
            )

        n = len(self)
        it = iter(self)

        # decode the first frame to learn shape and dtype
        first = next(it)
        if not isinstance(first, Image):
            raise TypeError(
                f"Expected Image, got {type(first).__name__}; "
                "use a msgfilter or topicfilter to select only image topics"
            )
        first_t = first.tensor(device=device, normalize=normalize, dtype=dtype).squeeze(
            0
        )  # (C, H, W)
        expected_shape = first_t.shape

        # pre-allocate the full tensor
        out = _torch.empty((n,) + expected_shape, dtype=first_t.dtype, device=device)
        out[0] = first_t

        for i in range(1, n):
            img = next(it)
            if not isinstance(img, Image):
                raise TypeError(
                    f"Frame {i}: expected Image, got {type(img).__name__}; "
                    "use a msgfilter or topicfilter to select only image topics"
                )
            t = img.tensor(device=device, normalize=normalize, dtype=dtype).squeeze(0)
            if t.shape != expected_shape:
                raise ValueError(
                    f"Frame {i} shape {tuple(t.shape)} differs from "
                    f"frame 0 shape {tuple(expected_shape)}"
                )
            out[i] = t

        return out

    def disp(
        self,
        animate: bool = False,
        fps: float | None = None,
        title: str | None = None,
        loop: bool = False,
        **kwargs,
    ) -> None:
        """
        Display images from the source interactively.

        :param animate: if ``True``, play as timed animation; if ``False``
            (default), step through one frame at a time
        :type animate: bool, optional
        :param fps: playback rate in frames per second, used when
            ``animate=True``, defaults to None.  If not None ``animate`` mode is assumed.
        :type fps: float, optional
        :param title: window title; defaults to the ``topic`` attribute of each
            frame when present
        :type title: str or None, optional
        :param loop: restart when the source is exhausted (finite sources only),
            defaults to ``False``
        :type loop: bool, optional
        :param kwargs: additional keyword arguments passed to
            :meth:`Image.disp` for each frame, e.g. ``grid=True``

        Display images from the source.  For example::

            VideoFile("traffic_sequence.mpg").disp(fps=10)

        will display an animation of the video at 10 frames per second.

        By default, the display is

        **Keys — step-through mode** (``animate=False``):

        - ``[space]`` — next frame
        - ``[1-9]`` / ``[0]`` — jump 1–9 or 10 frames (finite sources only)
        - ``[l]`` / ``[c]`` / ``[d]`` — jump 50 / 100 / 500 frames (finite sources only)
        - ``[q]`` / ``[x]`` — quit

        **Keys — animate mode** (``animate=True``):

        - ``[space]`` — pause / resume
        - ``[+]`` / ``[=]`` — increase playback speed
        - ``[-]`` / ``[_]`` — decrease playback speed
        - ``[q]`` / ``[x]`` — quit

        .. note:: Jump keys and ``loop`` are only available for finite sources
            that implement ``__len__``.  For live streams such as
            :class:`VideoCamera` or :class:`ROSTopic` those controls are
            disabled automatically.

        :seealso: :class:`ImageSequence`
        """
        import matplotlib.pyplot as plt

        _finite = hasattr(self, "__len__")

        _jump = {
            "1": 1,
            "2": 2,
            "3": 3,
            "4": 4,
            "5": 5,
            "6": 6,
            "7": 7,
            "8": 8,
            "9": 9,
            "0": 10,
            "l": 50,
            "c": 100,
            "d": 500,
        }

        def _label(item, i):
            ts = getattr(item, "timestamp", None)
            if ts is not None:
                dt = datetime.fromtimestamp(ts / 1e9, tz=timezone.utc).astimezone()
                return dt.isoformat(timespec="milliseconds") + f" [{i}]"
            return f"[{i}]"

        def _make_overlay(fig):
            return fig.text(
                0,
                0,
                "",
                backgroundcolor="black",
                color="white",
                horizontalalignment="left",
                verticalalignment="bottom",
            )

        def _win_title(item):
            return title or getattr(item, "topic", None) or ""

        fig = None
        ax = None
        ts_text = None
        if fps is not None:
            animate = True

        if animate:
            state = {"fps": fps, "paused": False, "quit": False}

            def on_key(event, s=state):
                if event.key in ("q", "x"):
                    s["quit"] = True
                elif event.key == " ":
                    s["paused"] = not s["paused"]
                elif event.key in ("+", "="):
                    s["fps"] = s["fps"] * 1.5
                elif event.key in ("-", "_"):
                    s["fps"] = max(1.0, s["fps"] / 1.5)

            it = iter(self)
            i = 0
            while True:
                try:
                    x = next(it)
                except StopIteration:
                    if loop and _finite:
                        it = iter(self)
                        i = 0
                        continue
                    break
                if state["quit"]:
                    break
                while state["paused"] and not state["quit"]:
                    plt.pause(0.05)
                if state["quit"]:
                    break
                if fig is None:
                    fig = plt.figure()
                    ax = fig.add_subplot(111)
                    fig.canvas.mpl_connect("key_press_event", on_key)
                    ts_text = _make_overlay(fig)
                    print(
                        "\nKeys: [space] pause/resume  [+] faster  [-] slower  [q/x] quit"
                    )
                x.disp(
                    fps=state["fps"], reuse=True, title=_win_title(x), ax=ax, **kwargs
                )
                assert ts_text is not None
                ts_text.set_text(_label(x, i))
                i += 1

            if fig is not None:
                if not state["quit"]:
                    plt.show(block=True)
                plt.close(fig)

        else:
            view_state = {"next": False, "quit": False, "skip": 0}

            def on_key(event, s=view_state):
                if event.key == " ":
                    s["next"] = True
                elif event.key in ("q", "x"):
                    s["quit"] = True
                elif _finite and event.key in _jump:
                    s["skip"] = _jump[event.key]
                    s["next"] = True

            it = iter(self)
            i = 0
            while True:
                try:
                    x = next(it)
                except StopIteration:
                    break
                if view_state["quit"]:
                    break
                if fig is None:
                    fig = plt.figure()
                    ax = fig.add_subplot(111)
                    ts_text = _make_overlay(fig)
                    fig.canvas.mpl_connect("key_press_event", on_key)
                    if _finite:
                        print(
                            "\nKeys: [space] next  [1-9] jump N frames  [0] jump 10"
                            "  [l/c/d] jump 50/100/500  [q/x] quit"
                        )
                    else:
                        print("\nKeys: [space] next frame  [q/x] quit")
                x.disp(title=_win_title(x), ax=ax, reuse=True, **kwargs)
                assert ts_text is not None
                ts_text.set_text(_label(x, i))
                view_state["next"] = False
                while not view_state["next"] and not view_state["quit"]:
                    plt.pause(0.05)
                skip = view_state["skip"]
                view_state["skip"] = 0
                for _ in range(skip):
                    try:
                        next(it)
                        i += 1
                    except StopIteration:
                        break
                i += 1

            if fig is not None:
                plt.close(fig)


[docs] class VideoFile(ImageSource): """ Iterate images from a video file :param filename: Path to video file :type filename: str :param kwargs: options applied to image frames, see :func:`~machinevisiontoolbox.base.imageio.convert` The resulting object is an iterator over the frames of the video file. The iterator returns :class:`Image` objects where: - the ``name`` attribute is the name of the video file - the ``id`` attribute is the frame number within the file If the path is not absolute, the video file is first searched for relative to the current directory, and if not found, it is searched for in the ``images`` folder of the ``mvtb-data`` package, installed as a Toolbox dependency. Example: .. code-block:: python from machinevisiontoolbox import VideoFile video = VideoFile("traffic_sequence.mpg") len(video) for im in video: pass or using a context manager to ensure the file handle is always released: .. code-block:: python with VideoFile("traffic_sequence.mpg") as video: for im in video: # process image :references: - |RVC3|, Section 11.1.4. 
:seealso: :func:`~machinevisiontoolbox.base.imageio.convert` `opencv.VideoCapture <https://docs.opencv.org/4.x/d8/dfe/classcv_1_1VideoCapture.html#a57c0e81e83e60f36c83027dc2a188e80>`_ """ filename: str nframes: int shape: tuple fps: int args: dict cap: cv2.VideoCapture | None i: int def __init__(self, filename: str, **kwargs: Any) -> None: self.filename = str(mvtb_path_to_datafile("images", filename)) # get the number of frames in the video # not sure it's always correct cap = cv2.VideoCapture(self.filename) ret, frame = cap.read() self.nframes = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.shape = frame.shape self.fps = int(cap.get(cv2.CAP_PROP_FPS)) self.args = kwargs cap.release() self.cap = None self.i = 0 def __iter__(self) -> VideoFile: self.i = 0 if self.cap is not None: self.cap.release() self.cap = cv2.VideoCapture(self.filename) return self def __next__(self) -> Image: if self.cap is None: self.__iter__() assert self.cap is not None ret, frame = self.cap.read() if ret is False: self.cap.release() raise StopIteration else: opts: dict[str, Any] = {"rgb": True} opts.update(self.args) if frame.ndim == 3 and not _wants_mono(opts): im = _make_image( frame, id=self.i, name=self.filename, colororder="RGB", **opts ) else: im = _make_image(frame, id=self.i, name=self.filename, **opts) self.i += 1 return im def __len__(self) -> int: return self.nframes def __str__(self) -> str: return f"VideoFile({os.path.basename(self.filename)}) {self.shape[1]} x {self.shape[0]}, {self.nframes} frames @ {self.fps}fps" def __repr__(self) -> str: return f"VideoFile(file={os.path.basename(self.filename)}, size=({self.shape[1]}, {self.shape[0]}), nframes={self.nframes}, fps={self.fps})" def __enter__(self) -> VideoFile: return self def __exit__(self, exc_type, exc, tb) -> None: if self.cap is not None: self.cap.release() self.cap = None
[docs] class VideoCamera(ImageSource): """ Iterate images from a local video camera :param id: Identity of local camera :type id: int :param kwargs: options applied to image frames, see :func:`~machinevisiontoolbox.base.imageio.convert` Connect to a local video camera. For some cameras this will cause the recording light to come on. The resulting object is an iterator over the frames from the video camera. The iterator returns :class:`Image` objects. Example: .. code-block:: python from machinevisiontoolbox import VideoCamera video = VideoCamera(0) for im in video: pass alternatively: .. code-block:: python img = next(video) or using a context manager to ensure the camera is released: .. code-block:: python with VideoCamera(0) as camera: for im in camera: # process image .. note:: The value of ``id`` is system specific but generally 0 is the first attached video camera. On a Mac running 13.0 (Ventura) or later and an iPhone with iOS 16 or later, the Continuity Camera feature allows the phone camera to be used as a local video camera, and it will appear as a separate camera with its own ID. OpenCV does not expose a portable API for mapping integer ``id`` values to human-readable camera names. Use :meth:`list` to enumerate the cameras that are present on the current machine together with their best-available names. :references: - |RVC3|, Section 11.1.3. 
:seealso: :meth:`list` :func:`~machinevisiontoolbox.base.imageio.convert` `cv2.VideoCapture <https://docs.opencv.org/4.x/d8/dfe/classcv_1_1VideoCapture.html#a57c0e81e83e60f36c83027dc2a188e80>`_ """ id: int cap: cv2.VideoCapture args: dict rgb: bool i: int def __init__(self, id: int = 0, rgb: bool = True, **kwargs: Any) -> None: self.id = id self.cap = cv2.VideoCapture(id) self.args = kwargs self.rgb = rgb self.i = 0 def __iter__(self) -> VideoCamera: self.i = 0 self.cap.release() self.cap = cv2.VideoCapture(self.id) return self def __next__(self) -> Image: ret, frame = self.cap.read() # frame will be in BGR order if ret is False: self.cap.release() raise StopIteration else: opts: dict[str, Any] = {"rgb": self.rgb, "copy": True} opts.update(self.args) if self.rgb: if frame.ndim == 3 and not _wants_mono(opts): img = _make_image(frame, id=self.i, colororder="RGB", **opts) else: img = _make_image(frame, id=self.i, **opts) else: if frame.ndim == 3 and not _wants_mono(opts): img = _make_image(frame, id=self.i, colororder="BGR", **opts) else: img = _make_image(frame, id=self.i, **opts) self.i += 1 return img
[docs] def grab(self) -> Image: """ Grab single frame from camera :return: next frame from the camera :rtype: :class:`Image` .. deprecated:: 0.11.4 Use :func:`next` on the iterator instead, for example ``next(camera)``. """ warnings.warn( "VideoCamera.grab() is deprecated; use next(camera) instead.", DeprecationWarning, stacklevel=2, ) return next(self)
[docs] def release(self) -> None: """ Release the camera Disconnect from the local camera, and for cameras with a recording light, turn off that light. """ self.cap.release()
def __str__(self) -> str: backend = self.cap.getBackendName() return f"VideoCamera({self.id}) {self.width} x {self.height} @ {self.framerate}fps using {backend}" def __repr__(self) -> str: backend = self.cap.getBackendName() return f"VideoCamera(id={self.id}, size=({self.width}, {self.height}), fps={self.framerate}, backend={backend})" # see https://docs.opencv.org/4.x/d4/d15/group__videoio__flags__base.html#gaeb8dd9c89c10a5c63c139bf7c4f5704d properties: dict[str, int] = { "brightness": cv2.CAP_PROP_BRIGHTNESS, "contrast": cv2.CAP_PROP_CONTRAST, "saturation": cv2.CAP_PROP_SATURATION, "hue": cv2.CAP_PROP_HUE, "gain": cv2.CAP_PROP_GAIN, "exposure": cv2.CAP_PROP_EXPOSURE, "auto-exposure": cv2.CAP_PROP_AUTO_EXPOSURE, "gamma": cv2.CAP_PROP_GAMMA, "temperature": cv2.CAP_PROP_TEMPERATURE, "auto-whitebalance": cv2.CAP_PROP_AUTO_WB, "whitebalance-temperature": cv2.CAP_PROP_WB_TEMPERATURE, "ios:exposure": cv2.CAP_PROP_IOS_DEVICE_EXPOSURE, "ios:whitebalance": cv2.CAP_PROP_IOS_DEVICE_WHITEBALANCE, }
[docs] def get(self, property: str | None = None) -> float | dict[str, float]: """ Get camera property :param prop: camera property name :type prop: str :return: parameter value :rtype: float Get value for the specified property. Value 0 is returned when querying a property that is not supported by the backend used by the VideoCapture instance. ============================== ========================================================= Property description ============================== ========================================================= ``"brightness"`` image brightness (offset) ``"contrast"`` contrast of the image ``"saturation"`` saturation of the image ``"hue"`` hue of the image ``"gain"`` gain of the image ``"exposure"`` exposure of image ``"auto-exposure"`` exposure control by camera ``"gamma"`` gamma of image ``"temperature"`` color temperature ``"auto-whitebalance"`` enable/ disable auto white-balance ``"whitebalance-temperature"`` white-balance color temperature ``"ios:exposure"`` exposure of image for Apple AVFOUNDATION backend ``"ios:whitebalance"`` white balance of image for Apple AVFOUNDATION backend ============================== ========================================================= :seealso: :meth:`set` """ if property is not None: return self.cap.get(self.properties[property]) else: return { property: self.cap.get(self.properties[property]) for property in self.properties }
[docs] def set(self, property: str, value: float) -> float: """ Set camera property :param prop: camera property name :type prop: str :param value: new property value :type value: float :return: parameter value :rtype: float Set new value for the specified property. Value 0 is returned when querying a property that is not supported by the backend used by the VideoCapture instance. ============================== ========================================================= Property description ============================== ========================================================= ``"brightness"`` image brightness (offset) ``"contrast"`` contrast of the image ``"saturation"`` saturation of the image ``"hue"`` hue of the image ``"gain"`` gain of the image ``"exposure"`` exposure of image ``"auto-exposure"`` exposure control by camera ``"gamma"`` gamma of image ``"temperature"`` color temperature ``"auto-whitebalance"`` enable/ disable auto white-balance ``"whitebalance-temperature"`` white-balance color temperature ``"ios:exposure"`` exposure of image for Apple AVFOUNDATION backend ``"ios:whitebalance"`` white balance of image for Apple AVFOUNDATION backend ============================== ========================================================= :seealso: :meth:`get` """ return self.cap.set(self.properties[property], value)
@property def width(self) -> int: """ Width of video frame :return: width of video frame in pixels :rtype: int :seealso: :meth:`height` :meth:`shape` """ return int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) @property def height(self) -> int: """ Height of video frame :return: height of video frame in pixels :rtype: int :seealso: :meth:`width` :meth:`shape` """ return int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) @property def framerate(self) -> int: """ Camera frame rate :return: camera frame rate in frames per second :rtype: int .. note:: If frame rate cannot be determined return -1 """ try: fps = int(self.cap.get(cv2.CAP_PROP_FPS)) except ValueError: fps = -1 return fps @property def shape(self) -> tuple[int, int]: """ Shape of video frame :return: height and width of video frame in pixels :rtype: int, int :seealso: :meth:`height` :meth:`width` """ return (self.height, self.width)
[docs] @classmethod def list(cls) -> list[dict]: """ Enumerate available local video cameras :return: list of dicts, one per camera, each with keys ``id``, ``width``, ``height``, ``fps``, and ``name`` (best-effort). On macOS, ``width``, ``height``, and ``fps`` are ``None`` if the process has not yet been granted camera authorisation. :rtype: list[dict] Probes integer indices 0, 1, 2, … until no camera responds, and returns a list describing each one. Camera names are obtained by a platform-specific method: - **macOS** — ``system_profiler SPCameraDataType -json`` (no camera authorisation required) - **Linux** — ``/sys/class/video4linux/videoN/name`` sysfs entry - **Windows** — not available; ``name`` is ``None`` The name mapping is best-effort: if the platform query fails the ``name`` key is ``None``. Example:: $ python >>> from machinevisiontoolbox import VideoCamera >>> for cam in VideoCamera.list(): ... print(cam) {'id': 0, 'width': 1280, 'height': 720, 'fps': 30, 'name': 'FaceTime HD Camera'} {'id': 1, 'width': 1920, 'height': 1080, 'fps': 30, 'name': 'iPhone Camera'} :seealso: :class:`VideoCamera` """ def _query_macos() -> list[str]: """Return ordered list of camera names from system_profiler JSON.""" try: out = subprocess.check_output( ["system_profiler", "SPCameraDataType", "-json"], stderr=subprocess.DEVNULL, text=True, ) data = json.loads(out) return [item["_name"] for item in data.get("SPCameraDataType", [])] except ( FileNotFoundError, subprocess.CalledProcessError, KeyError, json.JSONDecodeError, ): return [] def _names_linux() -> dict[int, str]: """Read /sys/class/video4linux/videoN/name on Linux.""" result = {} sysfs = pathlib.Path("/sys/class/video4linux") if not sysfs.exists(): return result for entry in sorted(sysfs.iterdir()): name_file = entry / "name" try: idx = int(entry.name.replace("video", "")) result[idx] = name_file.read_text().strip() except (ValueError, OSError): pass return result cameras = [] platform = sys.platform if platform == 
"darwin": # Enumerate from system_profiler — works even without camera # authorisation. OpenCV properties are best-effort: if the # process hasn't been granted camera access yet, width/height/fps # will be None for those cameras. names = _query_macos() for idx, name in enumerate(names): cap = cv2.VideoCapture(idx) if cap.isOpened(): entry = { "id": idx, "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), "fps": int(cap.get(cv2.CAP_PROP_FPS)), "name": name, } cap.release() else: cap.release() entry = { "id": idx, "width": None, "height": None, "fps": None, "name": name, } cameras.append(entry) else: name_map = _names_linux() if platform.startswith("linux") else {} for idx in range(32): # practical upper bound cap = cv2.VideoCapture(idx) if not cap.isOpened(): cap.release() break entry = { "id": idx, "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), "fps": int(cap.get(cv2.CAP_PROP_FPS)), "name": name_map.get(idx), } cap.release() cameras.append(entry) return cameras
def __enter__(self) -> VideoCamera: return self def __exit__(self, exc_type, exc, tb) -> None: self.release()
[docs] class FileCollection(ImageSource): """ Iterate images from a collection of files :param filename: wildcard path to image files :type filename: str :param loop: Endlessly loop over the files, defaults to False :type loop: bool, optional :param kwargs: options applied to image frames, see :func:`~machinevisiontoolbox.base.imageio.convert` The resulting object is an iterator over the image files that match the wildcard description. The iterator returns :class:`Image` objects where the ``name`` attribute is the name of the image file. **Eager mode** (default): All images are decoded at construction time. This makes random access (``files[i]``), slicing, and ``len(files)`` fast but can be slow and memory-intensive for large collections. .. code-block:: python from machinevisiontoolbox import FileCollection images = FileCollection('campus/*.png') len(images) # fast img = images[5] # fast, in-memory for image in images: # iterate in-memory pass **Lazy mode** (via context manager): Files are decoded on-demand during iteration. This avoids the startup cost of decoding all images upfront. Trade-off: random access (``__getitem__``) is not available; use only for sequential iteration. .. code-block:: python from machinevisiontoolbox import FileCollection with FileCollection('campus/*.png') as images: for image in images: # decode on-demand pass If the path is not absolute, the file is first searched for relative to the current directory, and if not found, it is searched for in the ``images`` folder of the ``mvtb-data`` package, installed as a Toolbox dependency. Example: .. code-block:: python from machinevisiontoolbox import FileCollection images = FileCollection('campus/*.png') len(images) for image in images: # iterate over images # do a thing alternatively: .. code-block:: python img = files[i] # load i'th file from the collection or using a context manager for memory-efficient streaming: .. 
code-block:: python with FileCollection('campus/*.png') as images: for image in images: # process image :references: - |RVC3|, Section 11.1.2. :seealso: :func:`~machinevisiontoolbox.base.imageio.convert` `cv2.imread <https://docs.opencv.org/4.x/d4/da8/group__imgcodecs.html#ga288b8b3da0892bd651fce07b3bbd3a56>`_ """ images: list names: list args: dict loop: bool i: int _lazy: bool _filename: str | None _lazy_iter: Iterator | None _loaded: bool def __init__( self, filename: str | None = None, loop: bool = False, **kwargs: Any ) -> None: self.images = [] self.names = [] self._lazy = False self._filename = filename self.args = kwargs self.loop = loop self.i = 0 self._lazy_iter = None self._loaded = False def _load_eager(self) -> None: """Load and decode all images eagerly.""" if self._loaded or self._filename is None: return images, names = iread(self._filename, rgb=True) if isinstance(images, np.ndarray): self.images = [images] self.names = [names] else: self.images = images self.names = cast(list[str], names) self._loaded = True def __getitem__(self, i: int | slice) -> FileCollection | Image: if self._lazy: raise TypeError( "Random access is not supported in lazy mode. " "Iterate sequentially or exit context manager to use eager mode." 
) self._load_eager() if isinstance(i, slice): # slice of a collection -> FileCollection new = self.__class__() new.images = self.images[i] new.names = self.names[i] new.args = self.args return new else: # element of a collection -> Image data = self.images[i] if data.ndim == 3 and not _wants_mono(self.args): return _make_image( data, name=self.names[i], id=i, colororder="RGB", **self.args ) else: return _make_image(data, id=i, name=self.names[i], **self.args) def __iter__(self) -> FileCollection: self.i = 0 if self._lazy: self._lazy_iter = None else: # Ensure eager load before iterating in non-CM mode self._load_eager() return self def __str__(self) -> str: if self._lazy: return f"FileCollection(lazy mode)" return "\n".join([str(f) for f in self.names]) def __repr__(self) -> str: if self._lazy: return f"FileCollection(lazy mode)" return f"FileCollection(nimages={len(self.images)})" def __next__(self) -> Image: if self._lazy: if self._filename is None: raise StopIteration # Lazily create the on-demand image iterator if self._lazy_iter is None: self._lazy_iter = iread_iter(self._filename, rgb=True) try: if self.loop: try: data, name = next(self._lazy_iter) except StopIteration: self._lazy_iter = iread_iter(self._filename, rgb=True) data, name = next(self._lazy_iter) else: data, name = next(self._lazy_iter) except StopIteration: raise StopIteration if data.ndim == 3 and not _wants_mono(self.args): im = _make_image( data, id=self.i, name=name, colororder="RGB", **self.args ) else: im = _make_image(data, id=self.i, name=name, **self.args) self.i += 1 return im else: # Eager iteration: use pre-loaded images if self.i >= len(self.names): if self.loop: self.i = 0 else: raise StopIteration data = self.images[self.i] if data.ndim == 3 and not _wants_mono(self.args): im = _make_image( data, id=self.i, name=self.names[self.i], colororder="RGB", **self.args, ) else: im = _make_image(data, id=self.i, name=self.names[self.i], **self.args) self.i += 1 return im def __len__(self) 
-> int: if self._lazy: raise TypeError( "__len__() is not reliable in lazy mode. " "Exit context manager to use eager mode with known length." ) self._load_eager() return len(self.images) def __enter__(self) -> FileCollection: # Switch to lazy mode - don't load anything yet, wait for __iter__ self._lazy = True self.i = 0 self._lazy_iter = None return self def __exit__(self, exc_type, exc, tb) -> None: # Exit lazy mode self._lazy = False self._lazy_iter = None
[docs] class ImageSequence(ImageSource): """ An in-memory sequence of :class:`Image` objects with interactive display. :param images: sequence of images :type images: iterable of :class:`Image` The sequence is materialised into a list on construction. Items are expected to be :class:`Image` instances; a ``timestamp`` attribute (ROS nanosecond epoch) and ``topic`` attribute are used for the display overlay when present. Example: .. code-block:: python from machinevisiontoolbox import ROSBag, ImageSequence bag = ROSBag("mybag.bag", msgfilter="Image") seq = ImageSequence(bag) seq.disp() # step through one frame at a time seq.disp(animate=True, fps=5) # timed playback :seealso: :class:`PointCloudSequence` """ _frames: list _i: int def __init__(self, images: Any) -> None: self._frames = list(images) self._i = 0 def __len__(self) -> int: return len(self._frames) def __getitem__(self, i: int | slice) -> Image | list[Image]: return self._frames[i] def __iter__(self) -> ImageSequence: self._i = 0 return self def __next__(self) -> Image: if self._i >= len(self._frames): raise StopIteration frame = self._frames[self._i] self._i += 1 return frame def __enter__(self) -> ImageSequence: return self def __exit__(self, *_): pass def __repr__(self) -> str: return f"ImageSequence(nimages={len(self._frames)})"
class _ZipAdapter:
    """Zip archive backend for :class:`FileArchive`."""

    def __init__(self, filename: pathlib.Path) -> None:
        # zipfile raises BadZipFile here if the file is not a zip; the
        # caller relies on that exception for format probing
        self._zf = zipfile.ZipFile(filename, "r")

    def namelist(self) -> list[str]:
        return self._zf.namelist()

    def read(self, name: str) -> bytes:
        return self._zf.read(name)

    def open(self, name: str) -> IO[bytes]:
        return self._zf.open(name)

    def close(self) -> None:
        self._zf.close()


class _TarAdapter:
    """Tar archive backend (plain, .gz, .bz2, .xz) for :class:`FileArchive`."""

    def __init__(self, filename: pathlib.Path) -> None:
        # "r:*" auto-detects the compression scheme
        self._tf = tarfile.open(str(filename), "r:*")

    def namelist(self) -> list[str]:
        # regular files only; directories and special members are skipped
        return [member.name for member in self._tf.getmembers() if member.isfile()]

    def read(self, name: str) -> bytes:
        # extractfile() returns None for non-regular members
        handle = self._tf.extractfile(name)
        return b"" if handle is None else handle.read()

    def open(self, name: str) -> IO[bytes]:
        handle = self._tf.extractfile(name)
        if handle is None:
            raise KeyError(f"{name!r} is not a regular file in this archive")
        return handle

    def close(self) -> None:
        self._tf.close()


class _SevenZAdapter:
    """7-Zip archive backend for :class:`FileArchive` (requires ``py7zr``)."""

    def __init__(self, filename: pathlib.Path) -> None:
        if not _py7zr_available:
            raise ImportError(
                "py7zr is required to read 7-Zip archives. "
                "Install it with: pip install py7zr"
            )
        assert _py7zr is not None
        self._filename = str(filename)
        # cache the member list once; py7zr has no cheap persistent handle
        with _py7zr.SevenZipFile(self._filename, mode="r") as archive:
            self._names = [
                entry.filename for entry in archive.list() if not entry.is_directory
            ]

    def namelist(self) -> list[str]:
        return self._names

    def read(self, name: str) -> bytes:
        assert _py7zr is not None
        # py7zr extracts to disk; use a throwaway directory per read
        with tempfile.TemporaryDirectory() as tmpdir:
            with _py7zr.SevenZipFile(self._filename, mode="r") as archive:
                archive.extract(path=tmpdir, targets=[name])
            candidate = pathlib.Path(tmpdir) / name
            if candidate.exists():
                return candidate.read_bytes()
            # Some archives may store member names with different separators.
            found = list(pathlib.Path(tmpdir).rglob(pathlib.Path(name).name))
            return found[0].read_bytes() if found else b""

    def open(self, name: str) -> IO[bytes]:
        return io.BytesIO(self.read(name))

    def close(self) -> None:
        # nothing persistent to release
        pass


class _UnarAdapter:
    """Shell-out adapter using ``unar``/``lsar`` (The Unarchiver) for
    :class:`FileArchive`.

    Does not require the ``rarfile`` Python package; needs only the ``unar``
    and ``lsar`` command-line tools in PATH (``brew install unar`` on macOS).
    """

    def __init__(self, filename: pathlib.Path) -> None:
        unar = shutil.which("unar")
        lsar = shutil.which("lsar")
        if unar is None or lsar is None:
            raise ImportError(
                "unar and lsar are required to read this archive but were not found.\n"
                "Install with: brew install unar (macOS)\n"
                " or: sudo apt install unar (Debian/Ubuntu)"
            )
        self._unar = unar
        self._lsar = lsar
        self._filename = str(filename)
        # ``lsar -j`` emits a JSON manifest of the archive contents
        listing = subprocess.run(
            [self._lsar, "-j", self._filename],
            capture_output=True,
            text=True,
            check=True,
        )
        manifest = json.loads(listing.stdout)
        self._names = [
            entry["XADFileName"]
            for entry in manifest.get("lsarContents", [])
            if not entry.get("XADIsDirectory", False)
            and not entry["XADFileName"].endswith("/")
        ]

    def namelist(self) -> list[str]:
        return self._names

    def read(self, name: str) -> bytes:
        with tempfile.TemporaryDirectory() as tmpdir:
            # -q quiet, -f overwrite, -o output dir; extract one member only
            subprocess.run(
                [self._unar, "-q", "-f", "-o", tmpdir, self._filename, name],
                check=True,
                capture_output=True,
            )
            # unar recreates the internal directory structure inside tmpdir
            candidate = pathlib.Path(tmpdir) / name
            if candidate.exists():
                return candidate.read_bytes()
            # fall back: search by basename (handles path separator differences)
            found = list(pathlib.Path(tmpdir).rglob(pathlib.Path(name).name))
            return found[0].read_bytes() if found else b""

    def open(self, name: str) -> IO[bytes]:
        return io.BytesIO(self.read(name))

    def close(self) -> None:
        # stateless shell-out; nothing to close
        pass


class _RarAdapter:
    """RAR archive backend for :class:`FileArchive` (requires ``rarfile`` +
    ``unrar``/``unar``)."""

    def __init__(self, filename: pathlib.Path) -> None:
        if not _rarfile_available:
            raise ImportError(
                "rarfile is required to read RAR archives. "
                "Install it with: pip install rarfile"
            )
        assert _rarfile is not None
        # auto-detect available extraction tool (unrar, unar, bsdtar, 7z)
        _rarfile.tool_setup()
        try:
            self._rf = _rarfile.RarFile(str(filename), "r")
        except _rarfile.RarCannotExec:
            raise ImportError(
                "No RAR extraction tool found. Install one of:\n"
                " macOS: brew install unar\n"
                " Debian/Ubuntu: sudo apt install unar\n"
                " (or) pip install rarfile + brew install rar"
            )

    def namelist(self) -> list[str]:
        # exclude directory entries
        return [info.filename for info in self._rf.infolist() if not info.is_dir()]

    def read(self, name: str) -> bytes:
        return self._rf.read(name)

    def open(self, name: str) -> IO[bytes]:
        return cast(IO[bytes], self._rf.open(name))

    def close(self) -> None:
        self._rf.close()
class FileArchive(ImageSource):
    """
    Iterate images from a compressed archive

    :param filename: path to archive file
    :type filename: str
    :param filter: a Unix shell-style wildcard that specifies which files to
        include when iterating over the archive
    :type filter: str, optional
    :param loop: endlessly loop over the files, defaults to False
    :type loop: bool, optional
    :param kwargs: options applied to image frames, see
        :func:`~machinevisiontoolbox.base.imageio.convert`

    The resulting object is an iterator over the files within the archive.
    The iterator returns the file as a :class:`Image` instance if it is an
    image (the ``name`` attribute is the filename within the archive), or a
    :class:`bytes` object for non-image files.

    The following archive formats are supported:

    ======================  ===================================================
    Extension               Notes
    ======================  ===================================================
    ``.zip``                stdlib, always available
    ``.tar``                stdlib, always available
    ``.tar.gz`` / ``.tgz``  stdlib, always available
    ``.tar.bz2``            stdlib, always available
    ``.tar.xz``             stdlib, always available
    ``.7z``                 requires ``pip install py7zr``
    ``.rar``                requires ``brew install unar`` (macOS) or
                            ``apt install unar``; alternatively
                            ``pip install rarfile`` with ``unrar``/``unar``
                            in PATH
    ======================  ===================================================

    If the path is not absolute it is first searched for relative to the
    current directory, and if not found, it is searched for in the ``images``
    folder of the ``mvtb-data`` package, installed as a Toolbox dependency.

    To read just the image files within the archive, use a ``filter`` such as
    ``"*.png"`` or ``"*.pgm"``. Note that ``filter`` is a Unix shell style
    wildcard expression, not a Python regexp.

    Example:

    .. code-block:: python

        from machinevisiontoolbox import FileArchive

        images = FileArchive('bridge-l.zip')
        len(images)
        for image in images:  # iterate over files
            pass

    alternatively:

    .. code-block:: python

        image = images[i]  # load i'th file from the archive

    or using a context manager to ensure the archive is closed:

    .. code-block:: python

        with FileArchive('bridge-l.zip') as images:
            for image in images:
                pass  # process image

    :references:
        - |RVC3|, Section 11.1.2.

    :seealso: :meth:`open` :func:`~machinevisiontoolbox.base.imageio.convert`
        `cv2.imread <https://docs.opencv.org/4.x/d4/da8/group__imgcodecs.html#ga288b8b3da0892bd651fce07b3bbd3a56>`_
    """

    # sorted member names selected by ``filter``
    files: list[str]
    # frame-conversion options forwarded to convert()
    args: dict
    # wrap around at the end of the archive when iterating
    loop: bool
    # iteration cursor
    i: int

    def __init__(
        self,
        filename: str,
        filter: str | None = None,
        loop: bool = False,
        **kwargs: Any,
    ) -> None:
        # resolve the path, falling back to the mvtb-data images folder
        path = pathlib.Path(mvtb_path_to_datafile("images", filename))
        suffixes = path.suffixes
        suffix = path.suffix.lower()
        # dispatch on the file extension to pick a backend adapter
        if suffix == ".zip":
            self._archive: (
                _ZipAdapter | _TarAdapter | _SevenZAdapter | _UnarAdapter | _RarAdapter
            ) = _ZipAdapter(path)
        elif ".tar" in [s.lower() for s in suffixes] or suffix == ".tgz":
            # matches .tar and compound suffixes such as .tar.gz, .tar.xz
            self._archive = _TarAdapter(path)
        elif suffix == ".7z":
            self._archive = _SevenZAdapter(path)
        elif suffix == ".rar":
            # prefer unar (shell-out, no extra pip dep); fall back to rarfile
            if _unar_available:
                self._archive = _UnarAdapter(path)
            else:
                self._archive = _RarAdapter(path)
        else:
            # unknown extension — probe zip then tar
            try:
                self._archive = _ZipAdapter(path)
            except zipfile.BadZipFile:
                try:
                    self._archive = _TarAdapter(path)
                except tarfile.TarError:
                    raise ValueError(
                        f"Unrecognised archive format for {pathlib.Path(path).name!r}. "
                        "Supported: .zip, .tar, .tar.gz, .tgz, .tar.bz2, .tar.xz, "
                        ".7z (requires py7zr), .rar (requires unar/lsar or rarfile + extractor)"
                    )
        # select members: by default everything that is not a directory entry
        if filter is None:
            files = [f for f in self._archive.namelist() if not f.endswith("/")]
        else:
            files = fnmatch.filter(self._archive.namelist(), filter)
        self.files = sorted(files)
        self.args = kwargs
        self.loop = loop
        self.i = 0
[docs] def open(self, name: str) -> IO[bytes]: """ Open a file from the archive :param name: file name :type name: str :return: read-only handle to the named file :rtype: file object Opens the specified file within the archive. Typically the ``FileArchive`` instance is used as an iterator over the image files within, but this method can be used to access non-image data such as camera calibration data etc. that might also be contained within the archive and is excluded by the ``filter``. """ return self._archive.open(name)
[docs] def ls(self) -> None: """ List all files within the archive to stdout. """ for name in self._archive.namelist(): print(name)
def __getitem__(self, i: int) -> Image | bytes: im = self._read(i) if isinstance(im, np.ndarray): if im.ndim == 3 and not _wants_mono(self.args): return _make_image( im, name=self.files[i], id=i, colororder="BGR", **self.args ) else: return _make_image(im, id=i, name=self.files[i], **self.args) else: # not an image file, just return the contents return im def __iter__(self) -> FileArchive: self.i = 0 return self def __str__(self) -> str: return "FileArchive(\n " + "\n ".join(self.files) + "\n)" def __repr__(self) -> str: return f"FileArchive(nfiles={len(self.files)})" def __next__(self) -> Image | bytes: if self.i >= len(self.files): if self.loop: self.i = 0 else: raise StopIteration im = self._read(self.i) if isinstance(im, np.ndarray): if im.ndim == 3 and not _wants_mono(self.args): im = _make_image( im, id=self.i, name=self.files[self.i], colororder="BGR", **self.args, ) else: im = _make_image(im, id=self.i, name=self.files[self.i], **self.args) self.i += 1 return im def __len__(self) -> int: return len(self.files) def _read(self, i: int) -> np.ndarray | bytes: data = self._archive.read(self.files[i]) img = cv2.imdecode( np.frombuffer(data, np.uint8), cv2.IMREAD_ANYDEPTH | cv2.IMREAD_UNCHANGED ) if img is None: # not an image file, just return the contents return data else: return img def __enter__(self) -> FileArchive: return self def __exit__(self, exc_type, exc, tb) -> None: self._archive.close()
class ImageCollection(FileCollection):
    """Deprecated alias for :class:`FileCollection`.

    :param kwargs: forwarded to :class:`FileCollection`
    :type kwargs: Any

    .. deprecated:: 1.1.0
        Use :class:`FileCollection` instead.
    """

    def __init__(
        self, filename: str | None = None, loop: bool = False, **kwargs: Any
    ) -> None:
        # warn at the caller's frame, then delegate unchanged
        _msg = "Deprecated in 1.1.0: use FileCollection instead of ImageCollection."
        warnings.warn(_msg, DeprecationWarning, stacklevel=2)
        super().__init__(filename=filename, loop=loop, **kwargs)
class ZipArchive(FileArchive):
    """Deprecated alias for :class:`FileArchive`.

    :param kwargs: forwarded to :class:`FileArchive`
    :type kwargs: Any

    .. deprecated:: 1.1.0
        Use :class:`FileArchive` instead.
    """

    def __init__(
        self,
        filename: str,
        filter: str | None = None,
        loop: bool = False,
        **kwargs: Any,
    ) -> None:
        # warn at the caller's frame, then delegate unchanged
        _msg = "Deprecated in 1.1.0: use FileArchive instead of ZipArchive."
        warnings.warn(_msg, DeprecationWarning, stacklevel=2)
        super().__init__(filename=filename, filter=filter, loop=loop, **kwargs)
class WebCam(ImageSource):
    """
    Iterate images from an internet web camera

    :param url: URL of the camera
    :type url: str
    :param kwargs: options applied to image frames, see
        :func:`~machinevisiontoolbox.base.imageio.convert`

    The resulting object is an iterator over the frames returned from the
    remote camera. The iterator returns :class:`Image` objects.

    Example:

    .. code-block:: python

        from machinevisiontoolbox import WebCam

        webcam = WebCam('https://webcam.dartmouth.edu/webcam/image.jpg')
        for image in webcam:  # iterate over frames
            pass

    alternatively:

    .. code-block:: python

        img = next(webcam)  # get next frame

    or using a context manager to ensure the connection is released:

    .. code-block:: python

        with WebCam('https://webcam.dartmouth.edu/webcam/image.jpg') as webcam:
            for image in webcam:
                pass  # process image

    .. note:: Many web cameras accept a query string in the URL to specify
        image resolution, image format, codec and other parameters. There is
        no common standard for this, see the manufacturer's datasheet for
        details.

    :references:
        - |RVC3|, Section 11.1.5.

    :seealso: :func:`~machinevisiontoolbox.base.imageio.convert`
        `cv2.VideoCapture <https://docs.opencv.org/4.x/d8/dfe/classcv_1_1VideoCapture.html#a57c0e81e83e60f36c83027dc2a188e80>`_
    """

    # camera URL
    url: str
    # frame-conversion options forwarded to convert()
    args: dict
    # lazily-created capture handle; None until first use
    cap: cv2.VideoCapture | None

    def __init__(self, url: str, **kwargs: Any) -> None:
        # connection is deferred until iteration starts
        self.url = url
        self.args = kwargs
        self.cap = None

    def __iter__(self) -> WebCam:
        # (re)open the stream so each iteration starts fresh
        if self.cap is not None:
            self.cap.release()
        self.cap = cv2.VideoCapture(self.url)
        return self

    def __next__(self) -> Image:
        # support next() without a prior __iter__ call
        if self.cap is None:
            self.cap = cv2.VideoCapture(self.url)
        assert self.cap is not None
        ret, frame = self.cap.read()
        if ret is False:
            # read failure ends the stream; release the connection
            self.cap.release()
            raise StopIteration
        else:
            # default to RGB output, overridable via constructor kwargs
            opts: dict[str, Any] = {"rgb": True}
            opts.update(self.args)
            if frame.ndim == 3 and not _wants_mono(opts):
                return _make_image(frame, colororder="RGB", **opts)
            else:
                return _make_image(frame, **opts)
[docs] def grab(self) -> Image: """ Grab frame from web camera :return: next frame from the web camera :rtype: :class:`Image` .. deprecated:: 0.11.4 Use :func:`next` on the iterator instead, for example ``next(webcam)``. """ warnings.warn( "WebCam.grab() is deprecated; use next(webcam) instead.", DeprecationWarning, stacklevel=2, ) return next(self)
def __enter__(self) -> WebCam: return self def __exit__(self, exc_type, exc, tb) -> None: if self.cap is not None: self.cap.release() self.cap = None def __repr__(self) -> str: return f"WebCam({self.url})"
# dartmouth = WebCam('https://webcam.dartmouth.edu/webcam/image.jpg')
class EarthView(ImageSource):
    """
    Iterate images from GoogleEarth

    :param key: Google API key, defaults to None
    :type key: str
    :param type: type of map (API ``maptype``): 'satellite' [default], 'map',
        'roads', 'hybrid', and 'terrain'.
    :type type: str, optional
    :param zoom: map zoom, defaults to 18
    :type zoom: int, optional
    :param scale: image scale factor: 1 [default] or 2
    :type scale: int, optional
    :param shape: image size (API ``size``), defaults to (500, 500)
    :type shape: tuple, optional
    :param kwargs: options applied to image frames, see
        :func:`~machinevisiontoolbox.base.imageio.convert`

    The resulting object has a ``grab`` method that returns :class:`Image`
    objects for a specified position on the planet. ``zoom`` varies from
    1 (whole world) to a maximum of 18.

    The ``type`` argument controls the type of map returned:

    =============== ========================================================================
    ``type``        Returned image
    =============== ========================================================================
    ``"satellite"`` satellite color image from space
    ``"roadmap"``   a standard roadmap image as normally shown on the Google Maps website
    ``"map"``       synonym for ``"roadmap"``
    ``"hybrid"``    hybrid of the satellite with overlay of roadmap image
    ``"terrain"``   specifies a physical relief map image, showing terrain and vegetation
    ``"roads"``     a binary image which is an occupancy grid, roads are free space
    =============== ========================================================================

    Example:

    .. code-block:: python

        from machinevisiontoolbox import EarthView

        earth = EarthView()  # create an Earth viewer
        image = earth.grab(-27.475722, 153.0285, zoom=17)

    .. warning:: You must have a Google account and a valid key, backed by a
        credit card, to access this service.
        `Getting started <https://developers.google.com/maps/documentation/maps-static>`_

    .. note::
        - If the key is not passed in, a value is sought from the
          environment variable ``GOOGLE_KEY``.
        - Uses the `Google Maps Static API <https://developers.google.com/maps/documentation/maps-static/start>`_

    :references:
        - |RVC3|, Section 11.1.6.

    :seealso: :meth:`grab` :func:`~machinevisiontoolbox.base.imageio.convert`
    """

    # API key, possibly sourced from the GOOGLE_KEY environment variable
    key: str | None
    # default maptype for grab()
    type: str
    # default image scale factor (1 or 2)
    scale: int
    # default zoom level
    zoom: int
    # default image size (width, height)
    shape: tuple
    # frame-conversion options forwarded to convert()
    args: dict

    def __init__(
        self,
        key: str | None = None,
        type: str = "satellite",
        zoom: int = 18,
        scale: int = 1,
        shape: tuple = (500, 500),
        **kwargs: Any,
    ) -> None:
        if key is None:
            # fall back to the GOOGLE_KEY environment variable
            self.key = os.getenv("GOOGLE_KEY")
        else:
            self.key = key
        self.type = type
        self.scale = scale
        self.zoom = zoom
        self.shape = shape
        self.args = kwargs
[docs] def grab( self, lat: float, lon: float, zoom: int | None = None, type: str | None = None, scale: int | None = None, shape: tuple | None = None, roadnames: bool = False, placenames: bool = False, ) -> Image: """ Google map view as an image :param lat: latitude (degrees) :type lat: float :param lon: longitude (degrees) :type lon: float :param type: map type, "roadmap", "satellite" [default], "hybrid", and "terrain". :type type: str, optional :param zoom: image zoom factor, defaults to None :type zoom: int, optional :param scale: image scale factor: 1 [default] or 2 :type scale: int, optional :param shape: image shape (width, height), defaults to None :type shape: array_like(2), optional :param roadnames: show roadnames, defaults to False :type roadnames: bool, optional :param placenames: show place names, defaults to False :type placenames: bool, optional :return: Google map view :rtype: :class:`Image` If parameters are not given the values provided to the constructor are taken as defaults. .. note:: The returned image may have an alpha plane. 
""" if type is None: type = self.type if scale is None: scale = self.scale if zoom is None: zoom = self.zoom if shape is None: shape = self.shape # type is one of: satellite map hybrid terrain roadmap roads occggrid = False if type == "map": type = "roadmap" elif type == "roads": type = "roadmap" occggrid = True # https://developers.google.com/maps/documentation/maps-static/start#URL_Parameters # now read the map url = f"https://maps.googleapis.com/maps/api/staticmap?center={lat},{lon}&zoom={zoom}&size={shape[0]}x{shape[1]}&scale={scale}&format=png&maptype={type}&key={self.key}&sensor=false" opturl = [] if roadnames: opturl.append("style=feature:road|element:labels|visibility:off") if placenames: opturl.append( "style=feature:administrative|element:labels.text|visibility:off&style=feature:poi|visibility:off" ) if occggrid: opturl.extend( [ "style=feature:landscape|element:geometry.fill|color:0x000000|visibility:on", "style=feature:landscape|element:labels|visibility:off", "style=feature:administrative|visibility:off", "style=feature:road|element:geometry|color:0xffffff|visibility:on", "style=feature:road|element:labels|visibility:off", "style=feature:poi|element:all|visibility:off", "style=feature:transit|element:all|visibility:off", "style=feature:water|element:all|visibility:off", ] ) if len(opturl) > 0: url += "&" + "&".join(opturl) image_data, _ = iread(url) if isinstance(image_data, list): image = image_data[0] else: image = image_data if image.shape[2] == 4: colororder = "RGBA" elif image.shape[2] == 3: colororder = "RGB" else: colororder = None if colororder is not None and not _wants_mono(self.args): return _make_image(image, colororder=colororder, **self.args) return _make_image(image, **self.args)
def __repr__(self) -> str: return f"EarthView(type={self.type}, zoom={self.zoom}, scale={self.scale}, shape={tuple(self.shape)})"
# ROS PointCloud2 field datatype constants → (numpy dtype, byte size) _PC2_DTYPE = { 1: (np.int8, 1), 2: (np.uint8, 1), 3: (np.int16, 2), 4: (np.uint16, 2), 5: (np.int32, 4), 6: (np.uint32, 4), 7: (np.float32, 4), 8: (np.float64, 8), } def _resolve_typestore(release: str): """Return a typestore for the given ROS release name. The *release* string is matched case-insensitively against :class:`~rosbags.typesys.Stores` member names, so short names such as ``"noetic"`` or ``"humble"`` are accepted in addition to the full enum name (e.g. ``"ROS1_NOETIC"``). :param release: release name, or substring thereof :type release: str :raises ImportError: if the ``rosbags`` package is not installed :raises ValueError: if the name matches no enum member, or is ambiguous :return: configured typestore """ if not _rosbags_available: raise ImportError( "rosbags is required for ROS bag support. " "Install it with: pip install rosbags " "or pip install machinevision-toolbox-python[ros]" ) assert _Stores is not None assert _get_typestore is not None key = release.upper() matches = [s for s in _Stores if key in s.name] if not matches: valid = [s.name for s in _Stores] raise ValueError(f"Unknown release {release!r}. Valid options: {valid}") if len(matches) > 1: exact = [s for s in matches if s.name == key] if exact: return _get_typestore(exact[0]) ambiguous = [s.name for s in matches] raise ValueError(f"Ambiguous release {release!r}, matches: {ambiguous}") return _get_typestore(matches[0]) @dataclass(slots=True, frozen=True) class ROSMessage: """ Normalised ROS message sample. :param topic: topic name :type topic: str :param msgtype: ROS message type string :type msgtype: str :param timestamp: timestamp in nanoseconds since Unix epoch :type timestamp: int :param data: decoded message payload :type data: Any """ topic: str msgtype: str timestamp: int data: Any
[docs] class ROSTopic(ImageSource): """ Iterate images from a live ROS topic via a rosbridge WebSocket. :param topic: ROS topic name, e.g. ``"/camera/image/compressed"`` :type topic: str :param message: ROS message type, defaults to ``"sensor_msgs/CompressedImage"`` :type message: str, optional :param host: hostname or IP address of the rosbridge server, defaults to ``"localhost"`` :type host: str, optional :param port: rosbridge WebSocket port, defaults to 9090 :type port: int, optional :param subscribe: if ``True`` (default) subscribe to incoming messages; set ``False`` for publish-only use :type subscribe: bool, optional :param output: output mode, ``"image"`` yields :class:`Image` and ``"message"`` yields :class:`ROSMessage`, defaults to ``"image"`` :type output: str, optional :param blocking: if ``True`` (default) :meth:`__next__` blocks until a new frame arrives; if ``False`` it returns the most recently received frame immediately (or blocks until the very first frame is available) :type blocking: bool, optional :param rgb: if ``True`` (default) return RGB images; if ``False`` return BGR :type rgb: bool, optional :param kwargs: options applied to image frames, see :func:`~machinevisiontoolbox.base.imageio.convert` :raises ImportError: if the ``roslibpy`` package is not installed In subscribe mode, the object is an iterator that yields :class:`Image` instances (``output="image"``) or :class:`ROSMessage` instances (``output="message"``) as they arrive from the topic. Use it as a context manager to ensure the connection is always closed: .. code-block:: python with ROSTopic("/camera/image/compressed", host="192.168.1.10") as stream: for img in stream: img.disp() alternatively fetch a single frame with :func:`next`: .. code-block:: python stream = ROSTopic("/camera/image/compressed") img = next(stream) stream.release() For publish-only use, disable subscription setup and call :meth:`publish`: .. 
code-block:: python pub = ROSTopic("/cmd_topic", message="std_msgs/String", subscribe=False) pub.publish({"data": "hello"}) pub.release() **Message timing** Messages are timestamped at the ROS publisher (header stamp), received by rosbridge, then forwarded over WebSocket to this object. The callback stores only the most recent decoded frame and timestamp. With ``blocking=True``, each :meth:`__next__` call waits until a newer message arrives and then returns it. With ``blocking=False``, :meth:`__next__` returns immediately after the first message has arrived, returning the current most recent message, which may be the same frame as a previous call if no new message has arrived in the meantime. .. note:: This class use ``roslibpy``, which uses WebSockets to connect to a running rosbridge 2.0 server. It does not require a local ROS environment, allowing usage from platforms other than Linux. """ host: str topic: str message: str port: int args: dict def __init__( self, topic: str, message: str = "sensor_msgs/CompressedImage", host: str = "localhost", port: int = 9090, subscribe: bool = True, output: Literal["image", "message"] = "image", blocking: bool = True, rgb: bool = True, **kwargs: Any, ) -> None: if not _roslibpy_available: raise ImportError( "roslibpy is required for ROS streaming support. 
" "Install it with: pip install roslibpy " "or pip install machinevision-toolbox-python[ros]" ) assert roslibpy is not None self.host = host self.topic = topic self.message = message self.port = port self._subscribe = subscribe self._output = output self._blocking = blocking self._rgb = rgb self.args = kwargs self._compressed = "Compressed" in message if output not in {"image", "message"}: raise ValueError("output must be 'image' or 'message'") if output == "image" and not self._is_image_message_type(message): raise ValueError( "output='image' requires sensor_msgs/Image or " "sensor_msgs/CompressedImage" ) self._latest_frame: np.ndarray | None = None self._latest_message: ROSMessage | None = None self._latest_timestamp: int | None = None self._frame_event = threading.Event() if subscribe else None self._advertised = False self.i = 0 self.client = roslibpy.Ros(host=host, port=port) self.client.run() self._topic = roslibpy.Topic(self.client, topic, message) if subscribe: self._topic.subscribe(self._process_frame) @staticmethod def _is_image_message_type(message: str) -> bool: return message in { "sensor_msgs/CompressedImage", "sensor_msgs/Image", "sensor_msgs/msg/CompressedImage", "sensor_msgs/msg/Image", } @staticmethod def _is_pointcloud_message_type(message: str) -> bool: return message in { "sensor_msgs/PointCloud2", "sensor_msgs/msg/PointCloud2", } @staticmethod def _stamp_dict(timestamp_ns: int | None = None) -> dict[str, int]: if timestamp_ns is None: timestamp_ns = time.time_ns() sec, nsec = divmod(int(timestamp_ns), 1_000_000_000) return {"secs": int(sec), "nsecs": int(nsec)} @staticmethod def _image_from_mvtb(img: Image) -> tuple[np.ndarray, str]: arr = img.array if arr.ndim == 2: return np.ascontiguousarray(arr), "mono" if arr.ndim != 3: raise ValueError(f"Image array must be 2D or 3D, got shape {arr.shape}") if img.isrgb: return np.ascontiguousarray(img.rgb), "rgb" if img.isbgr: return np.ascontiguousarray(img.bgr), "bgr" if arr.shape[2] == 3: return 
np.ascontiguousarray(arr), "rgb" if arr.shape[2] == 4: return np.ascontiguousarray(arr), "rgba" raise ValueError( "Unsupported colour image format; expected 3-plane RGB/BGR " "or 4-plane RGBA/BGRA" ) def _image_to_ros_message( self, img: Image, timestamp_ns: int | None = None ) -> dict: arr, order = self._image_from_mvtb(img) if timestamp_ns is None: timestamp_ns = getattr(img, "timestamp", None) stamp = self._stamp_dict(timestamp_ns) if self._compressed: if arr.ndim == 2: bgr = cv2.cvtColor(arr, cv2.COLOR_GRAY2BGR) encoding = "mono8" if arr.dtype == np.uint8 else "mono16" elif arr.shape[2] == 4: if order == "rgba": bgr = cv2.cvtColor(arr, cv2.COLOR_RGBA2BGR) encoding = "rgba8" else: bgr = cv2.cvtColor(arr, cv2.COLOR_BGRA2BGR) encoding = "bgra8" elif order == "rgb": bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) encoding = "rgb8" else: bgr = arr encoding = "bgr8" ok, enc = cv2.imencode(".jpg", bgr) if not ok: raise ValueError("Failed to JPEG-compress Image for ROS publish") return { "header": {"stamp": stamp, "frame_id": self.topic}, "format": f"jpeg; {encoding}", "data": base64.b64encode(enc.tobytes()).decode("ascii"), } h, w = arr.shape[:2] channels = 1 if arr.ndim == 2 else int(arr.shape[2]) dtype = arr.dtype if channels == 1: if dtype == np.uint8: encoding = "mono8" elif dtype == np.uint16: encoding = "mono16" elif dtype == np.float32: encoding = "32FC1" else: raise ValueError(f"Unsupported greyscale dtype for ROS Image: {dtype}") elif channels == 3: if dtype != np.uint8: raise ValueError("ROS 3-plane images must be uint8") encoding = "rgb8" if order == "rgb" else "bgr8" elif channels == 4: if dtype != np.uint8: raise ValueError("ROS 4-plane images must be uint8") encoding = "rgba8" if order == "rgba" else "bgra8" else: raise ValueError(f"Unsupported channel count for ROS Image: {channels}") arr_contig = np.ascontiguousarray(arr) step = int(arr_contig.strides[0]) return { "header": {"stamp": stamp, "frame_id": self.topic}, "height": int(h), "width": int(w), 
"encoding": encoding,
        "is_bigendian": int(arr_contig.dtype.byteorder == ">"),
        "step": step,
        "data": base64.b64encode(arr_contig.tobytes()).decode("ascii"),
    }

def _pointcloud_to_ros_message(
    self, pc: PointCloud, timestamp_ns: int | None = None
) -> dict:
    """Serialise a :class:`PointCloud` as a ``sensor_msgs/PointCloud2`` dict.

    :param pc: point cloud to serialise
    :param timestamp_ns: optional header timestamp in nanoseconds; falls back
        to ``pc.timestamp`` when that is an integer, otherwise no timestamp
    :return: roslibpy-compatible PointCloud2 message dictionary
    :raises ImportError: if open3d is not installed
    :raises ValueError: if the point array is not Nx3, or colours mismatch
    """
    if not _open3d_available:
        raise ImportError(
            "open3d is required for PointCloud publish support. "
            "Install it with: pip install open3d-python "
            "or pip install machinevision-toolbox-python[open3d]"
        )
    assert o3d is not None
    points = np.asarray(pc._pcd.points)
    if points.ndim != 2 or points.shape[1] != 3:
        raise ValueError("PointCloud must contain Nx3 points")
    n = int(points.shape[0])
    # Resolve the timestamp: explicit argument wins, then the point cloud's
    # own timestamp, and anything non-integer degrades to "no timestamp".
    if timestamp_ns is None:
        try:
            timestamp_ns = pc.timestamp
        except (AttributeError, ValueError):
            timestamp_ns = None
    if not isinstance(timestamp_ns, (int, np.integer)):
        timestamp_ns = None
    stamp = self._stamp_dict(
        int(timestamp_ns) if timestamp_ns is not None else None
    )
    has_color = pc._pcd.has_colors()
    if has_color:
        colors = np.asarray(pc._pcd.colors)
        if colors.shape != points.shape:
            raise ValueError("PointCloud colour array must match point array shape")
        cloud = np.empty(
            n, dtype=[("x", "<f4"), ("y", "<f4"), ("z", "<f4"), ("rgb", "<f4")]
        )
        cloud["x"] = points[:, 0].astype(np.float32, copy=False)
        cloud["y"] = points[:, 1].astype(np.float32, copy=False)
        cloud["z"] = points[:, 2].astype(np.float32, copy=False)
        # Pack 8-bit R,G,B into the low 24 bits of a uint32 and reinterpret
        # the bits as float32 -- the PCL/ROS "rgb" field convention.
        rgb_u8 = np.clip(np.round(colors * 255.0), 0, 255).astype(np.uint8)
        rgb_packed = (
            (rgb_u8[:, 0].astype(np.uint32) << 16)
            | (rgb_u8[:, 1].astype(np.uint32) << 8)
            | rgb_u8[:, 2].astype(np.uint32)
        )
        cloud["rgb"] = rgb_packed.view(np.float32)
        # datatype 7 == sensor_msgs/PointField.FLOAT32
        fields = [
            {"name": "x", "offset": 0, "datatype": 7, "count": 1},
            {"name": "y", "offset": 4, "datatype": 7, "count": 1},
            {"name": "z", "offset": 8, "datatype": 7, "count": 1},
            {"name": "rgb", "offset": 12, "datatype": 7, "count": 1},
        ]
    else:
        cloud = np.empty(n, dtype=[("x", "<f4"), ("y", "<f4"), ("z", "<f4")])
        cloud["x"] = points[:, 0].astype(np.float32, copy=False)
        cloud["y"] = points[:, 1].astype(np.float32, copy=False)
        cloud["z"] = points[:, 2].astype(np.float32, copy=False)
        fields = [
            {"name": "x", "offset": 0, "datatype": 7, "count": 1},
            {"name": "y", "offset": 4, "datatype": 7, "count": 1},
            {"name": "z", "offset": 8, "datatype": 7, "count": 1},
        ]
    point_step = int(cloud.dtype.itemsize)
    data = cloud.tobytes()
    return {
        # NOTE(review): frame_id is set to the topic name; a TF frame is the
        # usual convention here -- confirm this is intended.
        "header": {"stamp": stamp, "frame_id": self.topic},
        "height": 1,  # unorganised cloud: 1 row of n points
        "width": n,
        "fields": fields,
        "is_bigendian": False,
        "point_step": point_step,
        "row_step": point_step * n,
        "data": base64.b64encode(data).decode("ascii"),
        "is_dense": bool(np.isfinite(points).all()),
    }

def _process_frame(self, msg: dict) -> None:
    """Callback invoked by roslibpy on each incoming message.

    Decodes the message into either a raw :class:`ROSMessage` (message
    output mode) or a BGR/16-bit ndarray stored in ``self._latest_frame``,
    then signals ``self._frame_event`` so a waiting :meth:`__next__` wakes.
    """
    # Extract the header timestamp, tolerating both ROS1 (secs/nsecs) and
    # ROS2 (sec/nanosec) field names; fall back to wall-clock time when the
    # header carries no timestamp.
    stamp = msg.get("header", {}).get("stamp", {})
    sec = stamp.get("secs", stamp.get("sec", 0))
    nsec = stamp.get("nsecs", stamp.get("nanosec", 0))
    timestamp = int(sec) * 1_000_000_000 + int(nsec)
    if timestamp == 0:
        timestamp = time.time_ns()
    self._latest_timestamp = timestamp
    if self._output == "message":
        # Raw message mode: wrap the payload without decoding image data.
        self.i += 1
        self._latest_message = ROSMessage(
            topic=self.topic,
            msgtype=self.message,
            timestamp=timestamp,
            data=msg,
        )
        if self._frame_event is not None:
            self._frame_event.set()
        return
    img_data = base64.b64decode(msg["data"])
    if not self._compressed:
        # Uncompressed sensor_msgs/Image: raw pixel data with metadata
        h = msg["height"]
        w = msg["width"]
        encoding = msg["encoding"]
        # Map ROS encoding to numpy dtype and channel count
        _enc = {
            "rgb8": (np.uint8, 3),
            "bgr8": (np.uint8, 3),
            "rgba8": (np.uint8, 4),
            "bgra8": (np.uint8, 4),
            "mono8": (np.uint8, 1),
            "mono16": (np.uint16, 1),
            "16UC1": (np.uint16, 1),
            "32FC1": (np.float32, 1),
        }
        # Unknown encodings default to 8-bit 3-channel.
        dtype, channels = _enc.get(encoding, (np.uint8, 3))
        arr = np.frombuffer(img_data, dtype=dtype)
        frame = (
            arr.reshape((h, w, channels)) if channels > 1 else arr.reshape((h, w))
        )
        # Normalise to BGR uint8 for consistency with the compressed path
        if encoding in ("rgb8", "rgba8"):
            frame = cv2.cvtColor(
                frame, cv2.COLOR_RGB2BGR if channels == 3 else cv2.COLOR_RGBA2BGR
            )
        elif encoding == "mono8":
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        elif encoding == "mono16":
            # Replicates the 16-bit channel into 3 planes; the 16-bit depth
            # values themselves are not truncated.
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
    else:
        # Compressed sensor_msgs/CompressedImage: JPEG or PNG bytes
        np_arr = np.frombuffer(img_data, np.uint8)
        frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)  # BGR ndarray
    if frame is None:
        # Decode failure: drop the message silently, keep the last frame.
        return
    self.i += 1
    self._latest_frame = frame
    if self._frame_event is not None:
        self._frame_event.set()

def __iter__(self) -> ROSTopic:
    """Return an iterator over incoming frames; resets the frame counter.

    :raises TypeError: if the topic was opened publish-only
    """
    if not self._subscribe:
        raise TypeError("ROSTopic is publish-only (subscribe=False)")
    self.i = 0
    return self

def __len__(self) -> int:
    """Return length of stream.

    Live ROS topics are unbounded streams, so they do not have a finite
    length.

    :raises TypeError: always
    """
    raise TypeError("ROSTopic is a live stream and has no finite length")

def __next__(self) -> Image | ROSMessage:
    """Return the next frame or message from the topic.

    In blocking mode this waits for a *new* frame; otherwise it returns the
    most recent frame once at least one has arrived.

    :raises TypeError: if the topic was opened publish-only
    :raises StopIteration: if no frame/message/timestamp is available
    """
    if not self._subscribe or self._frame_event is None:
        raise TypeError("ROSTopic is publish-only (subscribe=False)")
    if self._blocking:
        # wait for a *new* frame to arrive, then clear for the next call
        self._frame_event.wait()
        self._frame_event.clear()
    else:
        # non-blocking: return immediately once the first frame has arrived
        self._frame_event.wait()
    if self._output == "message":
        if self._latest_message is None:
            raise StopIteration
        return self._latest_message
    if self._latest_frame is None:
        raise StopIteration
    opts: dict[str, Any] = {"rgb": self._rgb, "copy": True}
    opts.update(self.args)
    # Choose the colour-order tag to match the requested output convention;
    # mono requests and 2-D frames carry no colour order.
    if self._rgb:
        if self._latest_frame.ndim == 3 and not _wants_mono(opts):
            img = _make_image(
                self._latest_frame, id=self.i, colororder="RGB", **opts
            )
        else:
            img = _make_image(self._latest_frame, id=self.i, **opts)
    else:
        if self._latest_frame.ndim == 3 and not _wants_mono(opts):
            img = _make_image(
                self._latest_frame, id=self.i, colororder="BGR", **opts
            )
        else:
            img = _make_image(self._latest_frame, id=self.i, **opts)
    if self._latest_timestamp is None:
        raise StopIteration
    _set_sample_metadata(img, int(self._latest_timestamp), self.topic)
    return img
def publish(
    self, msg: dict | Image | PointCloud, timestamp_ns: int | None = None
) -> None:
    """
    Publish a message on the configured ROS topic.

    :param msg: ROS payload as a message dictionary, :class:`Image`, or
        :class:`PointCloud`
    :type msg: dict, :class:`Image`, or :class:`PointCloud`
    :param timestamp_ns: optional timestamp in nanoseconds since Unix epoch;
        used for generated ROS headers. If None, message/object timestamp is
        used when present, otherwise current wall-clock time is used.
    :type timestamp_ns: int or None

    Dictionary payloads are forwarded directly and must match the
    configured ROS message type.

    If ``msg`` is an :class:`Image`, it is serialised according to the
    configured topic message type:

    - ``sensor_msgs/Image`` or ``sensor_msgs/msg/Image`` for raw image data
    - ``sensor_msgs/CompressedImage`` or ``sensor_msgs/msg/CompressedImage``
      for JPEG-compressed image data

    Colour plane order is encoded in the ROS ``encoding`` field for raw
    images, and in the compressed-message ``format`` string for compressed
    images.

    If ``msg`` is a :class:`PointCloud`, it is published as
    ``sensor_msgs/PointCloud2`` with ``x``, ``y``, ``z`` fields and an
    ``rgb`` field when colour data is present.

    Example publishing a ``std_msgs/String`` message:

    .. code-block:: python

        pub = ROSTopic("/cmd_text", message="std_msgs/String", subscribe=False)
        pub.publish({"data": "hello"})
        pub.release()

    Example publishing a ``geometry_msgs/Twist`` message:

    .. code-block:: python

        pub = ROSTopic("/cmd_vel", message="geometry_msgs/Twist", subscribe=False)
        pub.publish(
            {
                "linear": {"x": 0.2, "y": 0.0, "z": 0.0},
                "angular": {"x": 0.0, "y": 0.0, "z": 0.5},
            }
        )
        pub.release()

    Example publishing an :class:`Image` with an explicit timestamp:

    .. code-block:: python

        img = _make_image(np.zeros((240, 320, 3), dtype=np.uint8), colororder="RGB")
        pub = ROSTopic("/camera/image_raw", message="sensor_msgs/Image", subscribe=False)
        pub.publish(img, timestamp_ns=1_700_000_000_123_456_789)
        pub.release()

    Example publishing a :class:`PointCloud` with an explicit timestamp:

    .. code-block:: python

        points = np.array([[0.0, 1.0], [0.0, 0.2], [1.0, 1.2]], dtype=np.float32)
        pc = PointCloud(points)
        pub = ROSTopic("/cloud", message="sensor_msgs/PointCloud2", subscribe=False)
        pub.publish(pc, timestamp_ns=1_700_000_000_223_456_789)
        pub.release()

    The topic is advertised lazily on first publish.

    :raises TypeError: if the payload type does not match the configured
        topic message type, or ``msg`` is not a dict, Image, or PointCloud
    """
    # Dispatch on the payload type; rich objects must be paired with the
    # matching sensor_msgs topic type before serialisation.
    if isinstance(msg, Image):
        if not self._is_image_message_type(self.message):
            raise TypeError(
                "Image payload requires sensor_msgs/Image or "
                "sensor_msgs/CompressedImage topic type"
            )
        payload = self._image_to_ros_message(msg, timestamp_ns=timestamp_ns)
    elif isinstance(msg, PointCloud):
        if not self._is_pointcloud_message_type(self.message):
            raise TypeError(
                "PointCloud payload requires sensor_msgs/PointCloud2 topic type"
            )
        payload = self._pointcloud_to_ros_message(msg, timestamp_ns=timestamp_ns)
    elif isinstance(msg, dict):
        # Dictionaries are forwarded as-is; roslibpy validates field names.
        payload = msg
    else:
        raise TypeError("msg must be a dict, Image, or PointCloud")
    # Advertise lazily on first publish so construction stays cheap.
    if not self._advertised:
        self._topic.advertise()
        self._advertised = True
    self._topic.publish(payload)
def release(self) -> None:
    """
    Close the ROS topic and terminate the WebSocket connection.

    Unsubscribes when in subscribe mode, withdraws any advertisement made
    by a previous publish, then shuts down the underlying connection.
    """
    topic = self._topic
    if self._subscribe:
        topic.unsubscribe()
    if self._advertised:
        topic.unadvertise()
        self._advertised = False
    self.client.terminate()
def grab(self) -> Image | ROSMessage:
    """
    Grab a single frame from the ROS topic.

    :return: next frame from the topic
    :rtype: :class:`Image` or :class:`ROSMessage`
    :raises TypeError: if the topic was opened publish-only

    .. deprecated:: 0.11.4
        Use :func:`next` on the iterator instead, for example
        ``next(stream)``.
    """
    if not self._subscribe:
        raise TypeError("ROSTopic is publish-only (subscribe=False)")
    warnings.warn(
        "ROSTopic.grab() is deprecated; use next(stream) instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    # Delegate to the iterator protocol for the actual frame retrieval.
    return next(self)
def __enter__(self) -> ROSTopic:
    """Context-manager entry: no setup needed beyond construction."""
    return self

def __exit__(self, exc_type, exc, tb) -> None:
    """Context-manager exit: always release the topic and connection."""
    self.release()

def __repr__(self) -> str:
    if not self._subscribe:
        mode = "publish"
    else:
        mode = "blocking" if self._blocking else "latest"
    return f"ROSTopic({self.topic!r}, host={self.host!r}, port={self.port}, {mode})"

@property
def width(self) -> int | None:
    """
    Width of the most recently received frame.

    :return: width in pixels, or ``None`` if no frame has been received yet
    :rtype: int or None

    :seealso: :meth:`height` :meth:`shape`
    """
    frame = self._latest_frame
    return None if frame is None else frame.shape[1]

@property
def height(self) -> int | None:
    """
    Height of the most recently received frame.

    :return: height in pixels, or ``None`` if no frame has been received yet
    :rtype: int or None

    :seealso: :meth:`width` :meth:`shape`
    """
    frame = self._latest_frame
    return None if frame is None else frame.shape[0]

@property
def shape(self) -> tuple[int, int] | None:
    """
    Shape of the most recently received frame.

    :return: ``(height, width)`` in pixels, or ``None`` if no frame has
        been received yet
    :rtype: tuple(int, int) or None

    :seealso: :meth:`height` :meth:`width`
    """
    frame = self._latest_frame
    return None if frame is None else frame.shape[:2]
class SyncROSStreams:
    """
    Synchronise multiple :class:`ROSTopic` objects by timestamp.

    :param streams: two or more ROS streams to synchronise
    :type streams: list of :class:`ROSTopic`
    :param tolerance: maximum timestamp mismatch in seconds for a matched
        set, defaults to 0.02
    :type tolerance: float, optional
    :raises ValueError: if fewer than two streams are provided, if
        ``tolerance`` is not positive, or if any stream is publish-only
    :raises TypeError: if a stream item has no ``timestamp`` attribute
        (nanoseconds)
    :return: iterator yielding tuples of time-aligned items
    :rtype: tuple

    Implements approximate-time matching. At each step the oldest buffered
    item of every stream is compared; when all timestamps lie within
    ``tolerance`` of one another the set is emitted as a tuple, otherwise
    the single oldest item is discarded and matching continues.

    Example:

    .. code-block:: python

        rgb = ROSTopic("/camera/color/image_raw/compressed")
        depth = ROSTopic("/camera/depth/image_rect_raw/compressed")
        with SyncROSStreams([rgb, depth], tolerance=0.03) as sync:
            for rgb_im, depth_im in sync:
                pass  # process aligned pair

    **Interaction with ``ROSTopic`` blocking mode**

    For strict one-sample-per-time-step behaviour construct each input
    stream with ``blocking=True``, which yields one newly arrived frame per
    pull. With ``blocking=False`` repeated pulls can return the same latest
    frame more than once, so duplicate tuples are possible; that mode
    remains useful for low-latency latest-state fusion but is less suited
    to frame-by-frame synchronisation.
    """

    streams: list[ROSTopic]
    tolerance: float
    _tol_ns: int

    def __init__(self, streams: list[ROSTopic], tolerance: float = 0.02) -> None:
        if len(streams) < 2:
            raise ValueError("SyncROSStreams requires at least two streams")
        if tolerance <= 0:
            raise ValueError("tolerance must be > 0")
        publish_only = [s for s in streams if not s._subscribe]
        if publish_only:
            raise ValueError("All streams must be in subscribe mode (subscribe=True)")
        self.streams = streams
        self.tolerance = tolerance
        self._tol_ns = int(round(tolerance * 1_000_000_000))
        self._iters = []
        self._buffers = []

    @staticmethod
    def _timestamp_ns(item: Any) -> int:
        # Every synchronised item must expose an integer nanosecond timestamp.
        ts = getattr(item, "timestamp", None)
        if ts is None:
            raise TypeError("Stream item has no timestamp attribute")
        return int(ts)

    def __iter__(self) -> SyncROSStreams:
        self._iters = [iter(s) for s in self.streams]
        self._buffers = [deque() for _ in self.streams]
        return self

    def _fill_one(self, i: int) -> None:
        # Ensure stream i has at least one buffered item; validates the
        # timestamp eagerly so malformed items fail fast.
        buf = self._buffers[i]
        if buf:
            return
        item = next(self._iters[i])
        self._timestamp_ns(item)
        buf.append(item)

    def __next__(self) -> tuple[Any, ...]:
        while True:
            for idx in range(len(self.streams)):
                self._fill_one(idx)
            stamps = [self._timestamp_ns(buf[0]) for buf in self._buffers]
            earliest = min(stamps)
            if max(stamps) - earliest <= self._tol_ns:
                # All heads agree within tolerance: emit them together.
                return tuple(buf.popleft() for buf in self._buffers)
            # No match possible: drop the single oldest head and retry.
            self._buffers[stamps.index(earliest)].popleft()

    def __enter__(self) -> SyncROSStreams:
        for stream in self.streams:
            stream.__enter__()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        for stream in self.streams:
            stream.__exit__(exc_type, exc, tb)

    def __str__(self) -> str:
        names = [getattr(stream, "topic", "<unknown>") for stream in self.streams]
        return f"SyncROSStreams([{', '.join(names)}])"

    def __repr__(self) -> str:
        return (
            f"SyncROSStreams(nstreams={len(self.streams)}, "
            f"tolerance={self.tolerance:.6f}s)"
        )
[docs] class ROSBag(ImageSource): """ Iterate images and point clouds from a ROS 1 bag file. :param filename: path to a ``.bag`` file, or an ``http(s)://`` URL :type filename: str or Path :param release: ROS release name (or unique substring), e.g. ``"noetic"``, defaults to ``"ROS1_NOETIC"``; controls the message definitions used to parse the bag file :type release: str :param topicfilter: only yield messages from this topic or list of topics; ``None`` accepts all topics :type topicfilter: str or list of str or None :param msgfilter: only yield messages whose type contains this substring or matches any entry in the list; ``None`` accepts all types, defaults to ``"Image"`` :type msgfilter: str or list of str or None :param dtype: override the numpy dtype for all image pixel data, or by topic ``{topic: dtype}`` :type dtype: str or dict :param colororder: override the colour-plane order for all image, or by topic ``{topic: colororder}`` :type colororder: str or dict or None :param kwargs: options applied to image frames, see :func:`~machinevisiontoolbox.base.imageio.convert` :raises ImportError: if the ``rosbags`` package is not installed The resulting object is an iterator that yields: - :class:`Image` for messages whose type ends in ``Image``, which includes ``CompressedImage``. - :class:`PointCloud` for ``PointCloud2`` messages (requires ``open3d``) - the raw deserialised message object for all other types Each yielded object carries a ``timestamp`` attribute (ROS nanosecond epoch from the message header) and a ``topic`` attribute (the topic on which it was published). **Usage modes** *Implicit* — iterating directly over the object opens and closes the bag file automatically around the loop: .. 
code-block:: python from machinevisiontoolbox import ROSBag for img in ROSBag("mybag.bag", release="noetic"): img.disp() *Explicit context manager* — use a ``with`` statement when you need to make multiple passes over the bag, call helper methods such as :meth:`topics` or :meth:`print`, or simply want a guaranteed close even if an exception is raised: .. code-block:: python bag = ROSBag("mybag.bag", release="noetic", msgfilter=None) with bag: bag.print() # inspect topics for msg in bag: # iterate messages print(msg.topic, msg.timestamp) .. note:: ``filename`` may be an ``http://`` or ``https://`` URL, in which case the bag file is downloaded to a temporary file on first use and that file is reused for the lifetime of the ``ROSBag`` object. The temporary file is deleted automatically when the object is garbage collected or when the script exits. .. note:: If ``filename`` is a relative path that does not exist in the current working directory, it is looked up in the ``mvtb-data`` companion package automatically. Bag files placed there can therefore be referenced by their bare name, e.g. ``ROSBag("forest.bag")``. .. note:: The ``release`` argument controls the ROS message definitions used to parse the bag file. This is important because message definitions can change between ROS releases, and using the wrong definitions can lead to incorrect parsing of the data. The ``release`` string is matched case-insensitively against the member names of :class:`~rosbags.typesys.Stores`, so short names such as ``"noetic"`` or ``"humble"`` are accepted in addition to the full enum name (e.g. ``"ROS1_NOETIC"``). 
"""

# Public configuration, retained from the constructor.
filename: str
release: str
dtype: np.dtype | str | dict | None
colororder: str | dict | None
verbose: bool
args: dict

def __init__(
    self,
    filename: str,
    release: str = "ROS1_NOETIC",
    topicfilter: str | list[str] | None = None,
    msgfilter: str | list[str] | None = "Image",
    dtype: np.dtype | str | dict | None = None,
    colororder: str | dict | None = None,
    verbose: bool = False,
    **kwargs: Any,
) -> None:
    # Fail early with an actionable message when the optional dependency
    # is missing, rather than at first iteration.
    if not _rosbags_available:
        raise ImportError(
            "rosbags is required for ROS bag support. "
            "Install it with: pip install rosbags "
            "or pip install machinevision-toolbox-python[ros]"
        )
    self.filename = filename
    self.release = release
    self._topic_filter = topicfilter
    self._msgfilter = msgfilter
    self.verbose = verbose
    self.reader = None  # opened lazily by _open_reader()
    self.connections = []  # filter-passing connections, set on open
    # Message definitions matching the requested ROS release.
    self.typestore = _resolve_typestore(release)
    self.dtype = dtype
    self.colororder = colororder
    self.args = kwargs  # forwarded to image conversion
    self._tmpfile: str | None = None  # temp file for URL downloads
    self._is_ros2: bool = False  # detected from bag layout on open

def __repr__(self) -> str:
    return (
        f"ROSBag({str(self.filename)!r}, release={self.release!r}, "
        f"topicfilter={self._topic_filter!r}, msgfilter={self._msgfilter!r})"
    )

def __str__(self) -> str:
    return f"ROSBag({str(self.filename)!r})"
[docs] @staticmethod def format_duration(duration_ns: int) -> str: """Format a duration in nanoseconds as ``hh:mm:ss``. :param duration_ns: duration in nanoseconds :type duration_ns: int :return: formatted duration string :rtype: str """ total_s = int(duration_ns // 1_000_000_000) h = total_s // 3600 m = total_s % 3600 // 60 s = total_s % 60 return f"{h:02d}:{m:02d}:{s:02d}"
[docs] @staticmethod def format_local_time(timestamp_ns: int, fmt: str | None = None) -> str: """Format a ROS timestamp to a local-time string. :param timestamp_ns: ROS timestamp in nanoseconds since the Unix epoch (UTC) :type timestamp_ns: int :param fmt: :func:`~datetime.datetime.strftime` format string; defaults to ISO 8601 with millisecond precision :type fmt: str or None :return: formatted time string :rtype: str :seealso: `strftime format codes <https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes>`_ """ dt = datetime.fromtimestamp( timestamp_ns / 1_000_000_000, tz=timezone.utc ).astimezone() return dt.strftime(fmt) if fmt else dt.isoformat(timespec="milliseconds")
@property
def topicfilter(self) -> str | list[str] | None:
    """Topic filter applied when iterating.

    The iterator only returns messages from topics that match the filter.
    A string matches any topic whose name *contains* that string. A list
    matches any topic containing *any* of the strings. ``None`` accepts
    all topics.

    :type: str, list of str, or None

    :seealso: :attr:`msgfilter`
    """
    return self._topic_filter

@topicfilter.setter
def topicfilter(self, topicfilter: str | list[str] | None) -> None:
    self._topic_filter = topicfilter

@property
def msgfilter(self) -> str | list[str] | None:
    """Message-type filter applied when iterating.

    The iterator only returns messages from message types that match the
    filter. A string matches any message type whose name *contains* that
    string. A list matches any message type containing *any* of the
    strings. ``None`` accepts all message types.

    :type: str, list of str, or None

    :seealso: :attr:`topicfilter`
    """
    return self._msgfilter

@msgfilter.setter
def msgfilter(self, msgfilter: str | list[str] | None) -> None:
    self._msgfilter = msgfilter

def _allowed(self, x) -> bool:
    # A connection passes when BOTH its message type and its topic match
    # the corresponding filter. ``None`` means accept everything; matching
    # is substring containment, not equality.
    msg_ok = (
        self._msgfilter is None
        or (
            isinstance(self._msgfilter, list)
            and any(f in x.msgtype for f in self._msgfilter)
        )
        or (isinstance(self._msgfilter, str) and self._msgfilter in x.msgtype)
    )
    topic_ok = (
        self._topic_filter is None
        or (
            isinstance(self._topic_filter, list)
            and any(f in x.topic for f in self._topic_filter)
        )
        or (isinstance(self._topic_filter, str) and self._topic_filter in x.topic)
    )
    return msg_ok and topic_ok

@staticmethod
def _stamp_to_ns(stamp) -> int:
    """
    Robustly converts a ROS1 or ROS2 stamp to total nanoseconds as an
    integer. Handles 'sec/nanosec' (ROS2) and 'secs/nsecs' (ROS1) naming.
    """
    # Try ROS2/rosbags naming first, fall back to ROS1 naming
    sec = getattr(stamp, "sec", getattr(stamp, "secs", 0))
    nanosec = getattr(stamp, "nanosec", getattr(stamp, "nsecs", 0))
    return int(sec) * 1_000_000_000 + int(nanosec)

def __del__(self) -> None:
    # Best-effort cleanup: close the reader and delete any temporary file
    # created by a URL download. getattr guards against partially
    # constructed instances.
    self._close_reader()
    tmpfile = getattr(self, "_tmpfile", None)
    if tmpfile is not None:
        from pathlib import Path as _Path

        _Path(tmpfile).unlink(missing_ok=True)
        self._tmpfile = None

def _close_reader(self) -> None:
    # Close and forget the current reader, if any (idempotent).
    reader = getattr(self, "reader", None)
    if reader is not None:
        reader.close()
        self.reader = None

def _open_reader(self):
    """Open (or reuse) the bag reader and cache filter-passing connections.

    Handles three source kinds: an ``http(s)://`` URL (downloaded once to a
    temporary file), a local path, and a bare name resolved through the
    ``mvtb-data`` companion package.

    :return: an open rosbags reader (ROS 1 or ROS 2, detected from layout)
    """
    if self.reader is not None:
        return self.reader
    filename = str(self.filename)
    if filename.startswith(("http://", "https://")):
        # Remote bag: download once and reuse for this object's lifetime
        # (the temp file is removed in __del__).
        if self._tmpfile is None:
            with tempfile.NamedTemporaryFile(suffix=".bag", delete=False) as tmp:
                self._tmpfile = tmp.name
            # Fixed: previously printed a literal "(unknown)" placeholder
            # instead of the URL being fetched.
            print(f"Downloading {filename} ...")
            urllib.request.urlretrieve(filename, self._tmpfile)
        path = self._tmpfile
    else:
        path = self.filename

    from pathlib import Path as _BagPath

    # Resolve via mvtb_path_to_datafile: returns path unchanged if the file
    # exists locally, otherwise searches the mvtb-data companion package.
    try:
        path = mvtb_path_to_datafile("data", path)
    except (ValueError, ModuleNotFoundError):
        pass  # fall through; rosbags will raise a clear error on open

    # A ROS 2 bag is a directory; a ROS 1 bag is a single file.
    self._is_ros2 = _BagPath(path).is_dir()
    assert _RosBagReader1 is not None
    assert _RosBagReader2 is not None
    self.reader = (_RosBagReader2 if self._is_ros2 else _RosBagReader1)(path)
    self.reader.open()
    # Cache only the connections that pass the current filters.
    self.connections = [x for x in self.reader.connections if self._allowed(x)]
    return self.reader
def topics(self) -> dict[str, str]:
    """Return topics found in the ROS bag.

    :return: mapping of topic name to message type
    :rtype: dict

    Returns a dictionary mapping topic names to message types for all
    topics found in the bag file. The message types are returned as
    strings, e.g. ``"sensor_msgs/Image"``.
    """
    reader = self._open_reader()
    # Note: built from ALL connections, ignoring the active filters.
    mapping = {
        connection.topic: connection.msgtype
        for connection in reader.connections
    }
    self._close_reader()
    return mapping
def traffic(self, progress: bool = True) -> Counter:
    """Message counts by type found in the ROS bag.

    :param progress: show a tqdm progress bar while scanning, defaults
        to ``True``
    :type progress: bool
    :return: mapping of message type to count
    :rtype: :class:`~collections.Counter`

    Scans through the bag file and counts the number of messages of each
    type. For a large bag file this can take some time, so a progress bar
    is shown if ``progress`` is ``True``.
    """
    reader = self._open_reader()
    # Wrap the message iterator so the scan shows progress when requested.
    bar = tqdm(
        reader.messages(),
        total=reader.message_count,
        desc="scanning",
        unit="msg",
        disable=not progress,
    )
    tally = Counter(connection.msgtype for connection, _ts, _raw in bar)
    self._close_reader()
    return tally
def print(self, progress: bool = True, file=None) -> None:
    """Print a summary table of topics in the ROS bag.

    :param progress: show a tqdm progress bar while scanning, defaults to ``True``
    :type progress: bool
    :param file: file to write output to, defaults to ``None``
    :type file: file-like object, optional

    Print a human-readable summary of the topics in the ROS bag, showing
    the message type, total message count, and whether it passes the
    current topic and message filters.
    """
    reader = self._open_reader()
    print(
        f"recorded on {self.format_local_time(reader.start_time)}, duration {self.format_duration(reader.duration)}, {reader.message_count} messages",
        file=file,
    )
    table = ANSITable(
        Column("topic", colalign="<", headalign="^"),
        Column("msgtype", colalign="<", headalign="^"),
        Column("count", colalign=">", headalign="^"),
        Column("allowed", colalign="^", headalign="^"),
        border="thin",
    )
    # NOTE(review): traffic() tallies messages per message *type*, so two
    # topics sharing a type show the same combined count in their rows --
    # confirm whether a per-topic count was intended. Also note traffic()
    # and topics() each reopen and rescan the bag.
    counts = self.traffic(progress=progress)
    for topic, msgtype in self.topics().items():
        # Synthesise a throwaway object exposing .topic/.msgtype so the
        # connection-based _allowed() check can be reused here.
        conn_stub = type("_C", (), {"topic": topic, "msgtype": msgtype})
        allowed = self._allowed(conn_stub)
        table.row(
            topic,
            msgtype,
            counts[msgtype],
            "✓" if allowed else "✗",
            style="bold" if allowed else None,
        )
    print(table, file=file)
def __enter__(self) -> ROSBag:
    """Open the underlying bag reader and return ``self`` (context manager)."""
    self._open_reader()
    return self

def __exit__(self, exc_type, exc, tb) -> None:
    """Close the underlying bag reader on context exit."""
    self._close_reader()

# Map of ROS encoding strings to (numpy_dtype, channels, color_plane_order).
# A color_plane_order of None means the pixels carry no colour semantics
# (mono, depth, generic channels, raw Bayer data).
_ROS_ENCODING_MAP = {
    # Common Color Encodings
    "rgb8": (np.uint8, 3, "rgb"),
    "rgba8": (np.uint8, 4, "rgba"),
    "rgb16": (np.uint16, 3, "rgb"),
    "rgba16": (np.uint16, 4, "rgba"),
    "bgr8": (np.uint8, 3, "bgr"),
    "bgra8": (np.uint8, 4, "bgra"),
    "bgr16": (np.uint16, 3, "bgr"),
    "bgra16": (np.uint16, 4, "bgra"),
    "mono8": (np.uint8, 1, None),
    "mono16": (np.uint16, 1, None),
    # Generic Encodings (Bits, Type, Channels)
    # 8-bit
    "8UC1": (np.uint8, 1, None),
    "8UC2": (np.uint8, 2, None),  # Usually complex or stereo
    "8UC3": (np.uint8, 3, "rgb"),  # Standard ROS assumption
    "8UC4": (np.uint8, 4, "rgba"),
    "8SC1": (np.int8, 1, None),
    # 16-bit
    "16UC1": (np.uint16, 1, None),
    "16UC2": (np.uint16, 2, None),
    "16UC3": (np.uint16, 3, "rgb"),
    "16UC4": (np.uint16, 4, "rgba"),
    "16SC1": (np.int16, 1, None),
    # 32-bit (Freiburg Depth standard)
    "32SC1": (np.int32, 1, None),
    "32FC1": (np.float32, 1, None),  # Depth (m)
    "32FC2": (np.float32, 2, None),
    "32FC3": (np.float32, 3, "rgb"),
    "32FC4": (np.float32, 4, "rgba"),
    # 64-bit
    "64FC1": (np.float64, 1, None),
    # Bayer Patterns (Single-channel raw data)
    "bayer_rggb8": (np.uint8, 1, None),
    "bayer_bggr8": (np.uint8, 1, None),
    "bayer_gbrg8": (np.uint8, 1, None),
    "bayer_grbg8": (np.uint8, 1, None),
    "bayer_rggb16": (np.uint16, 1, None),
    "bayer_bggr16": (np.uint16, 1, None),
    "bayer_gbrg16": (np.uint16, 1, None),
    "bayer_grbg16": (np.uint16, 1, None),
}

def __iter__(self) -> Iterator[Any]:
    """Iterate over messages that pass the current topic/type filters.

    Yields :class:`Image` for ``Image``/``CompressedImage`` messages,
    :class:`PointCloud` for ``PointCloud2`` messages, and the raw
    deserialized message object for everything else.  Each yielded object
    has timestamp and topic metadata attached via ``_set_sample_metadata``.
    The reader is opened on entry and closed when iteration finishes or
    is abandoned.
    """
    self._open_reader()
    assert self.reader is not None
    try:
        for connection, timestamp, rawdata in self.reader.messages(
            connections=self.connections
        ):
            try:
                # ROS 2 bags use CDR serialization, ROS 1 bags use the
                # classic ROS 1 wire format.
                _deser = (
                    self.typestore.deserialize_cdr
                    if self._is_ros2
                    else self.typestore.deserialize_ros1
                )
                msg = cast(Any, _deser(rawdata, connection.msgtype))
            except KeyError as e:
                # unknown message type in the typestore: report and skip
                if self.verbose:
                    print(
                        f"{self.format_local_time(timestamp)} Error occurred while deserializing message: {e}"
                    )
                continue

            # check if message and topic pass the filters before doing any
            # expensive processing
            if not self._allowed(connection):
                continue

            # now check if it's an image or point cloud we can convert
            if connection.msgtype.endswith("CompressedImage"):
                # Compressed sensor_msgs/CompressedImage: JPEG or PNG bytes
                np_arr = np.frombuffer(msg.data, np.uint8)
                img_array = cv2.imdecode(np_arr, cv2.IMREAD_UNCHANGED)
                if img_array is None:
                    raise RuntimeError("Failed to decode compressed image")

                # OpenCV decodes colour images as BGR by default.
                if img_array.ndim == 2:
                    colororder = None
                elif img_array.shape[2] == 4:
                    colororder = "bgra"
                else:
                    colororder = "bgr"

                # Apply user override only when one was given.
                # FIX: previously self.colororder of None clobbered the
                # decoded BGR/BGRA order, and a dict override that did not
                # mention this topic clobbered it with None as well.
                if self.colororder is not None:
                    if isinstance(self.colororder, dict):
                        colororder = self.colororder.get(
                            connection.topic, colororder
                        )
                    else:
                        colororder = self.colororder

                if colororder is not None and not _wants_mono(self.args):
                    img = _make_image(
                        img_array,
                        colororder=colororder.upper(),
                        **self.args,
                    )
                else:
                    img = _make_image(img_array, **self.args)
                _set_sample_metadata(
                    img, self._stamp_to_ns(msg.header.stamp), connection.topic
                )
                yield img

            elif connection.msgtype.endswith("Image"):
                # Uncompressed sensor_msgs/Image: raw pixel buffer whose
                # layout is described by the encoding string.
                if hasattr(msg, "encoding"):
                    encoding = msg.encoding
                    if encoding not in self._ROS_ENCODING_MAP:
                        raise ValueError(
                            f"Unsupported or unknown ROS encoding: {encoding}"
                        )
                    dtype, channels, colororder = self._ROS_ENCODING_MAP[encoding]
                else:
                    # Fallback if no encoding is provided, assume 8-bit RGB
                    # or mono based on channels
                    channels = msg.data.shape[0] // (msg.height * msg.width)
                    dtype = np.uint8
                    colororder = "rgb" if channels == 3 else None

                # apply per-topic overrides if specified
                if self.dtype is not None:
                    if isinstance(self.dtype, dict):
                        # FIX: default to the encoding-derived dtype rather
                        # than forcing "uint8" for topics absent from the dict
                        dtype = self.dtype.get(connection.topic, dtype)
                    else:
                        dtype = self.dtype
                if self.colororder is not None:
                    # command line override
                    if isinstance(self.colororder, dict):
                        # FIX: default to the encoding-derived colour order
                        # rather than None for topics absent from the dict
                        colororder = self.colororder.get(
                            connection.topic, colororder
                        )
                    else:
                        colororder = self.colororder

                # Use frombuffer for zero-copy efficiency where possible
                data = np.frombuffer(msg.data, dtype=dtype)

                # Reshape to (H, W, C) or (H, W)
                if channels > 1:
                    img_array = data.reshape((msg.height, msg.width, channels))
                else:
                    img_array = data.reshape((msg.height, msg.width))

                if colororder is not None and not _wants_mono(self.args):
                    img = _make_image(
                        img_array,
                        colororder=colororder.upper(),
                        **self.args,
                    )
                else:
                    img = _make_image(img_array, **self.args)
                _set_sample_metadata(
                    img, self._stamp_to_ns(msg.header.stamp), connection.topic
                )
                yield img

            elif connection.msgtype.endswith("PointCloud2"):
                # Converts a sensor_msgs/PointCloud2 message to an Open3D
                # PointCloud.  Handles both colored and uncolored clouds.
                if not _open3d_available:
                    raise ImportError(
                        "open3d is required to read PointCloud2 messages. "
                        "Install it with: pip install open3d-python "
                        "or pip install machinevision-toolbox-python[open3d]"
                    )
                assert o3d is not None

                # 1. Map ROS datatypes to numpy dtypes
                # 1:INT8, 2:UINT8, 3:INT16, 4:UINT16, 5:INT32, 6:UINT32,
                # 7:FLOAT32, 8:FLOAT64
                type_map = {
                    1: np.int8,
                    2: np.uint8,
                    3: np.int16,
                    4: np.uint16,
                    5: np.int32,
                    6: np.uint32,
                    7: np.float32,
                    8: np.float64,
                }

                # 2. Build structured numpy dtype from message fields
                formats = []
                offsets = []
                names = []
                for field in msg.fields:
                    names.append(field.name)
                    formats.append(type_map[field.datatype])
                    offsets.append(field.offset)

                # itemsize must match point_step to handle potential padding
                # bytes
                dtype = np.dtype(
                    {
                        "names": names,
                        "formats": formats,
                        "offsets": offsets,
                        "itemsize": msg.point_step,
                    }
                )

                # 3. Create the structured array from raw bytes
                cloud_arr = np.frombuffer(msg.data, dtype=dtype)

                # 4. Extract XYZ coordinates
                # Open3D requires float64 for its Vector3dVector
                points = np.zeros((len(cloud_arr), 3), dtype=np.float64)
                points[:, 0] = cloud_arr["x"]
                points[:, 1] = cloud_arr["y"]
                points[:, 2] = cloud_arr["z"]

                # 5. Initialize Open3D object
                pcd = o3d.geometry.PointCloud()
                pcd.points = o3d.utility.Vector3dVector(points)

                # 6. Extract Color (if available)
                if "rgb" in names:
                    # Step A: Interpret bytes as uint32 (packing is usually
                    # A-R-G-B).  Note: Even if field.datatype says FLOAT32,
                    # we 'view' the bits as UINT32
                    rgb_data = cloud_arr["rgb"].view(np.uint32)

                    # Step B: Bit-shift to extract 8-bit channels
                    r = (rgb_data >> 16) & 0xFF
                    g = (rgb_data >> 8) & 0xFF
                    b = (rgb_data) & 0xFF

                    # Step C: Normalize to [0, 1] for Open3D
                    colors = np.column_stack((r, g, b)).astype(np.float64) / 255.0
                    pcd.colors = o3d.utility.Vector3dVector(colors)

                pc = PointCloud(pcd)
                _set_sample_metadata(
                    pc, self._stamp_to_ns(msg.header.stamp), connection.topic
                )
                yield pc

            else:
                # any other sort of message, just yield the deserialized
                # object with timestamp and topic attributes; note the bag
                # timestamp is used, as the message may have no header
                _set_sample_metadata(msg, timestamp, connection.topic)
                yield msg
    finally:
        self._close_reader()

def __len__(self) -> int:
    """Return number of messages that pass current filters.

    If no topic or message-type filter is set, this returns the bag's total
    message count.  Otherwise, it counts messages on the filtered
    connections.
    """
    was_open = self.reader is not None
    reader = self._open_reader()
    if self._topic_filter is None and self._msgfilter is None:
        n = reader.message_count
    else:
        n = sum(1 for _ in reader.messages(connections=self.connections))
    # only close if this call opened the reader (e.g. inside a `with` block
    # the caller's reader must stay open)
    if not was_open:
        self._close_reader()
    return n
class PointCloudSequence:
    """
    An in-memory sequence of :class:`PointCloud` objects with interactive display.

    :param clouds: sequence of point clouds
    :type clouds: iterable of :class:`PointCloud`

    The sequence is materialised into a list on construction.  Items are
    expected to be :class:`PointCloud` instances; a ``timestamp`` attribute
    (ROS nanosecond epoch) and ``topic`` attribute are used for the display
    overlay when present.

    Requires the ``open3d`` package.

    Example:

    .. code-block:: python

        from machinevisiontoolbox import ROSBag, PointCloudSequence

        bag = ROSBag("mybag.bag", msgfilter="PointCloud2")
        seq = PointCloudSequence(bag)
        seq.disp()  # step through one cloud at a time
        seq.disp(animate=True, fps=10)  # timed playback

    :seealso: :class:`ImageSequence`
    """

    # materialised list of clouds; supports len/indexing/iteration
    _clouds: list
    # cursor for the iterator protocol (reset by __iter__)
    _i: int

    def __init__(self, clouds: Any) -> None:
        # materialise the (possibly lazy) iterable, e.g. a ROSBag stream
        self._clouds = list(clouds)
        self._i = 0

    def __len__(self) -> int:
        # number of clouds in the sequence
        return len(self._clouds)

    def __getitem__(self, i: int | slice) -> PointCloud | list[PointCloud]:
        # delegate to list indexing: an int yields one cloud, a slice a list
        return self._clouds[i]

    def __iter__(self) -> PointCloudSequence:
        # restart iteration from the first cloud
        self._i = 0
        return self

    def __next__(self) -> PointCloud:
        if self._i >= len(self._clouds):
            raise StopIteration
        cloud = self._clouds[self._i]
        self._i += 1
        return cloud

    def __repr__(self) -> str:
        return f"PointCloudSequence({len(self._clouds)} clouds)"

    def disp(
        self,
        animate: bool = False,
        fps: float = 10.0,
        title: str | None = None,
        loop: bool = False,
    ) -> None:
        """
        Display the point-cloud sequence interactively using Open3D GUI.

        :param animate: if ``True``, play as timed animation; if ``False``
            (default), step through one cloud at a time
        :type animate: bool, optional
        :param fps: playback rate in frames per second, used when
            ``animate=True``, defaults to 10.0
        :type fps: float, optional
        :param title: window title, defaults to ``"PointCloudSequence"``
        :type title: str or None, optional
        :param loop: restart when the sequence ends (animate mode only),
            defaults to ``False``
        :type loop: bool, optional

        **Keys — step-through mode** (``animate=False``):

        - ``[space]`` — next cloud
        - ``[1-9]`` / ``[0]`` — jump 1–9 or 10 clouds
        - ``[l]`` / ``[c]`` / ``[d]`` — jump 50 / 100 / 500 clouds
        - ``[q]`` / ``[x]`` — quit

        **Keys — animate mode** (``animate=True``):

        - ``[space]`` — pause / resume
        - ``[=]`` — increase playback speed
        - ``[-]`` — decrease playback speed
        - ``[q]`` / ``[x]`` — quit

        :seealso: :class:`ImageSequence`
        """
        # local imports: open3d GUI modules are heavy and optional, so they
        # are only loaded when the display is actually requested
        import threading
        import time

        import open3d.visualization.gui as gui  # type: ignore[import-not-found]
        import open3d.visualization.rendering as rendering  # type: ignore[import-not-found]

        # key → number of clouds to skip forward in step-through mode
        _jump = {
            "1": 1,
            "2": 2,
            "3": 3,
            "4": 4,
            "5": 5,
            "6": 6,
            "7": 7,
            "8": 8,
            "9": 9,
            "0": 10,
            "l": 50,
            "c": 100,
            "d": 500,
        }

        def _label(item, i):
            # overlay text: ISO timestamp (local tz) plus frame index when a
            # ROS nanosecond timestamp is attached, else just the index
            ts = getattr(item, "timestamp", None)
            if ts is not None:
                dt = datetime.fromtimestamp(ts / 1e9, tz=timezone.utc).astimezone()
                return dt.isoformat(timespec="milliseconds") + f" [{i}]"
            return f"[{i}]"

        app = gui.Application.instance
        app.initialize()

        win_title = title or "PointCloudSequence"
        win = app.create_window(win_title, 1280, 720)

        scene = gui.SceneWidget()
        scene.scene = rendering.Open3DScene(win.renderer)
        scene.scene.set_background([0.15, 0.15, 0.15, 1.0])

        label = gui.Label("")

        def on_layout(ctx):
            # scene fills the window; label is a one-line strip pinned to
            # the bottom edge
            r = win.content_rect
            scene.frame = r
            em = ctx.theme.font_size
            label.frame = gui.Rect(
                r.x + 8, r.get_bottom() - em - 8, r.width - 16, em + 4
            )

        win.set_on_layout(on_layout)
        win.add_child(scene)
        win.add_child(label)

        mat = rendering.MaterialRecord()
        mat.shader = "defaultUnlit"
        mat.point_size = 2.0

        def _update_cloud(pcd, label_text, first=False):
            # must run on the GUI main thread (posted via
            # app.post_to_main_thread); replaces the displayed geometry
            if scene.scene.has_geometry("cloud"):
                scene.scene.remove_geometry("cloud")
            scene.scene.add_geometry("cloud", pcd, mat)
            if first:
                # fit the camera to the first cloud only, so the view does
                # not jump on every frame
                bb = pcd.get_axis_aligned_bounding_box()
                scene.setup_camera(60.0, bb, bb.get_center())
            label.text = label_text
            win.post_redraw()

        if animate:
            # shared mutable state between the GUI key callback (main
            # thread) and the playback worker thread
            state = {"fps": fps, "paused": False, "quit": False}

            def on_key(evt):
                if evt.type == gui.KeyEvent.DOWN:
                    k = evt.key
                    if k in (ord("q"), ord("x")):
                        state["quit"] = True
                        win.close()
                        return gui.Widget.EventCallbackResult.HANDLED
                    elif k == gui.KeyName.SPACE:
                        state["paused"] = not state["paused"]
                        return gui.Widget.EventCallbackResult.HANDLED
                    elif k == ord("="):
                        state["fps"] = state["fps"] * 1.5
                        return gui.Widget.EventCallbackResult.HANDLED
                    elif k == ord("-"):
                        # never drop below 1 fps
                        state["fps"] = max(1.0, state["fps"] / 1.5)
                        return gui.Widget.EventCallbackResult.HANDLED
                return gui.Widget.EventCallbackResult.IGNORED

            win.set_on_key(on_key)
            print("\nKeys: [space] pause/resume  [=] faster  [-] slower  [q/x] quit")

            # one-element list so the worker closure can mutate the flag
            first = [True]

            def run():
                # playback worker: posts one cloud per frame period to the
                # GUI thread, polling the pause/quit flags
                i = 0
                while True:
                    if i >= len(self._clouds):
                        if loop:
                            i = 0
                        else:
                            break
                    x = self._clouds[i]
                    while state["paused"] and not state["quit"]:
                        time.sleep(0.05)
                    if state["quit"]:
                        break
                    lbl = _label(x, i)
                    # NOTE(review): reaches into PointCloud internals via
                    # ._pcd (the underlying open3d geometry) — confirm this
                    # attribute is stable
                    pcd = x._pcd
                    is_first = first[0]
                    first[0] = False
                    # bind loop variables as lambda defaults to avoid the
                    # late-binding closure pitfall
                    app.post_to_main_thread(
                        win,
                        lambda pcd=pcd, lbl=lbl, f=is_first: _update_cloud(pcd, lbl, f),
                    )
                    time.sleep(1.0 / state["fps"])
                    i += 1
                if not state["quit"]:
                    # playback finished naturally: close the window
                    app.post_to_main_thread(win, win.close)

        else:
            # step-through mode: the worker blocks until the key callback
            # flags "next" (optionally with a multi-frame skip)
            view_state = {"next": False, "quit": False, "skip": 0}

            def on_key(evt):
                if evt.type == gui.KeyEvent.DOWN:
                    k = evt.key
                    if k in (ord("q"), ord("x")):
                        view_state["quit"] = True
                        win.close()
                        return gui.Widget.EventCallbackResult.HANDLED
                    elif k == gui.KeyName.SPACE:
                        view_state["next"] = True
                        return gui.Widget.EventCallbackResult.HANDLED
                    else:
                        # translate printable keys through the jump table
                        ch = chr(k) if 32 <= k < 128 else ""
                        if ch in _jump:
                            view_state["skip"] = _jump[ch]
                            view_state["next"] = True
                            return gui.Widget.EventCallbackResult.HANDLED
                return gui.Widget.EventCallbackResult.IGNORED

            win.set_on_key(on_key)
            print(
                "\nKeys: [space] next  [1-9] jump N frames  [0] jump 10"
                "  [l/c/d] jump 50/100/500  [q/x] quit"
            )

            first = [True]

            def run():
                i = 0
                while i < len(self._clouds):
                    if view_state["quit"]:
                        break
                    x = self._clouds[i]
                    lbl = _label(x, i)
                    pcd = x._pcd
                    is_first = first[0]
                    first[0] = False
                    app.post_to_main_thread(
                        win,
                        lambda pcd=pcd, lbl=lbl, f=is_first: _update_cloud(pcd, lbl, f),
                    )
                    view_state["next"] = False
                    # wait for the next/quit flag from the key callback
                    while not view_state["next"] and not view_state["quit"]:
                        time.sleep(0.05)
                    skip = view_state["skip"]
                    view_state["skip"] = 0
                    i += 1 + skip

        # daemon thread so a hard exit does not hang on playback
        t = threading.Thread(target=run, daemon=True)
        t.start()
        app.run()
class TensorStack(ImageSource):
    """
    Lazy image source from a PyTorch batch tensor.

    :param tensor: tensor of shape ``(B, C, H, W)`` or ``(B, H, W)``
    :type tensor: torch.Tensor
    :param colororder: colour plane order for multi-channel tensors, e.g.
        ``"RGB"`` or ``"BGR"``, defaults to None
    :type colororder: str, optional
    :param logits: if True, take argmax over the channel dimension to convert
        per-class logits to a class label image, defaults to False
    :type logits: bool, optional
    :param dtype: data type for output image arrays, e.g. ``np.uint8`` or
        ``np.float32``; if None, dtype is inferred from the tensor data,
        defaults to None
    :type dtype: numpy dtype or None, optional
    :param kwargs: options applied to image frames, see
        :func:`~machinevisiontoolbox.base.imageio.convert`

    Frames are zero-copy views into the batch tensor, so iterating over
    model outputs or other batch-processed tensors does not duplicate the
    underlying storage.

    Example:

    .. code-block:: pycon

        >>> from machinevisiontoolbox import TensorStack
        >>> import torch
        >>> batch = torch.randn(10, 3, 64, 64)  # 10 RGB images
        >>> source = TensorStack(batch, colororder="RGB")
        >>> len(source)
        10
        >>> img = source[7]  # Returns Image wrapping tensor[7] (zero-copy view)
        >>> for img in source:
        ...     features = extract(img)  # Process each image lazily

    :seealso: :meth:`Image.Tensor`
    """

    def __init__(
        self,
        tensor: "torch.Tensor",
        colororder: str | None = None,
        logits: bool = False,
        dtype: "DTypeLike | None" = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize TensorStack from a batch tensor.

        :param tensor: batch tensor of shape ``(B, C, H, W)`` or ``(B, H, W)``
        :param colororder: colour plane order for display/export
        :param logits: if True, argmax the channel dimension for segmentation masks
        :param dtype: output array dtype passed to Image constructor
        """
        # torch is an optional dependency; import lazily with a helpful message
        try:
            import torch
        except ImportError:
            raise ImportError(
                "PyTorch is required for TensorStack. "
                "Install it with: pip install torch "
                "or pip install machinevision-toolbox-python[torch]"
            )

        # validate input eagerly so errors surface at construction time
        if not isinstance(tensor, torch.Tensor):
            raise TypeError(f"Expected torch.Tensor, got {type(tensor)}")
        if tensor.ndim not in (4, 3):
            raise ValueError(
                f"Expected tensor of shape (B, C, H, W) or (B, H, W), got {tensor.shape}"
            )

        # detach from autograd and move to CPU; .numpy() shares memory so
        # per-frame views are zero-copy
        self._tensor = tensor.detach().cpu()
        self._array = self._tensor.numpy()
        self._colororder = colororder
        self._logits = logits
        self._dtype = dtype
        self.args = kwargs
        self._batch_size = self._array.shape[0]
        self._i = 0

    def __len__(self) -> int:
        """Return number of images in the batch."""
        return self._batch_size

    def __iter__(self) -> TensorStack:
        """Iterate over images as views into the batch."""
        self._i = 0
        return self

    def __next__(self) -> Image:
        if self._i >= self._batch_size:
            raise StopIteration
        frame = self[self._i]
        self._i += 1
        return frame

    def __getitem__(self, index: int) -> Image:
        """
        Get image at index as a zero-copy view.

        :param index: image index in range [0, batch_size)
        :return: Image wrapping frame at index
        :rtype: Image
        """
        # guard clause: only non-negative in-range ints are accepted
        if not isinstance(index, int) or not (0 <= index < self._batch_size):
            raise IndexError(f"Index {index} out of range [0, {self._batch_size})")

        # view of one frame: (C, H, W) for a 4-D batch, (H, W) for a 3-D one
        view = self._array[index]
        if view.ndim == 3:
            # logits → class-label image via channel argmax;
            # otherwise reorder planes (C, H, W) → (H, W, C)
            view = (
                np.argmax(view, axis=0)
                if self._logits
                else np.transpose(view, (1, 2, 0))
            )

        # per-call options: frame kwargs override the stack-level dtype
        opts: dict[str, Any] = {"dtype": self._dtype, **self.args}
        if self._colororder is not None and not _wants_mono(opts):
            return _make_image(view, colororder=self._colororder, **opts)
        return _make_image(view, **opts)

    def __str__(self) -> str:
        """Return string representation."""
        shape = self._array.shape
        dtype = self._array.dtype
        return (
            f"TensorStack({self._batch_size} frames, shape {shape[1:]}, dtype {dtype})"
        )

    def __repr__(self) -> str:
        """Return string representation."""
        shape = self._array.shape
        dtype = self._array.dtype
        return (
            f"TensorStack(nframes={self._batch_size}, shape={shape[1:]}, dtype={dtype})"
        )
class LabelMeReader:
    """
    Read annotations from a LabelMe JSON file.

    :param filename: path to LabelMe JSON file, or an ``http(s)://`` URL
    :type filename: str

    LabelMe is a popular annotation tool that saves annotations in a JSON
    format.  This class reads the JSON file and extracts the image,
    polygonal shapes, and associated flags.  Other popular tools like CVAT
    and LabelStudio can export to LabelMe format, so this class can be used
    as a lightweight reader for those formats as well, albeit with some
    limitations (e.g. only polygonal shapes are supported).

    Methods of this class return:

    - a list of :class:`Polygon2` instances for all shapes
    - file-level ``flags`` as a dictionary
    - the labeled image as a :class:`Image` instance

    For each returned polygon, additional attributes are attached:

    - ``group_id: int | None`` from the shape entry
    - ``flags: dict`` from the shape entry as a dictionary

    Rectangle shapes are converted to 4-vertex polygons.

    Example:

    .. code-block:: python

        from machinevisiontoolbox import LabelMeReader

        lme = LabelMeReader("https://github.com/wkentaro/labelme/raw/main/examples/tutorial/apc2016_obj3.json")
        len(lme.shapes)     # number of annotated shapes
        flags = lme.flags   # file-level flags
        image = lme.image   # labeled image (if available)

        for polygon in lme.shapes:
            print(polygon.points)    # polygon vertices
            print(polygon.group_id)  # group ID (if any)
            print(polygon.flags)     # shape-level flags

    .. note:: The ``labelme`` package provides a more comprehensive
        interface to LabelMe annotations, including support for all shape
        types and attributes, but it is quite bloated with many package
        dependencies.  This class is a lightweight alternative focused on
        polygonal shapes and basic flags.

    :seealso: :meth:`pixels_mask` :class:`Image.Polygons` :class:`Image`,
        :class:`Polygon2`
    """

    filename: str
    _nshapes: int | None

    def __init__(self, filename: str) -> None:
        """
        Load and parse a LabelMe JSON document.

        :param filename: path or ``http(s)://`` URL of the JSON file
        :raises OSError: if the file or URL cannot be read
        :raises json.JSONDecodeError: if the content is not valid JSON

        FIX: previously all load/parse errors were silently swallowed with
        ``return None``, which left a half-constructed object (``self.data``,
        ``_nshapes`` and ``_version`` unset) that raised ``AttributeError``
        on first use.  Errors now propagate to the caller.
        """
        self.filename = filename
        if str(filename).startswith(("http://", "https://")):
            import urllib.request

            with urllib.request.urlopen(filename) as response:  # noqa: S310
                self.data = json.loads(response.read().decode("utf-8"))
        else:
            with open(self.filename, "r", encoding="utf-8") as f:
                self.data = json.load(f)

        self._nshapes = len(self.data.get("shapes", []))
        self._version = self.data.get("version")

    def __repr__(self) -> str:
        """Return a concise summary with filename and number of shapes."""
        return f"LabelMeReader(filename={self.filename!r}, version={self._version}, nshapes={self._nshapes})"

    @staticmethod
    def _rectangle_points(points: list[list[float]]) -> list[tuple[float, float]]:
        """Convert LabelMe rectangle (2 corner points) to 4 polygon points."""
        if len(points) != 2:
            raise ValueError("Rectangle shape must have exactly 2 points")
        (x1, y1), (x2, y2) = points
        # corners in order: TL, TR, BR, BL (for TL/BR input corners)
        return [(x1, y1), (x2, y1), (x2, y2), (x1, y2)]

    @property
    def image(self) -> Image | None:
        """Return the image from the LabelMe JSON, or None if not available.

        The image is stored base64-encoded under the ``imageData`` key;
        decoding failures (corrupt base64 or image bytes) yield ``None``
        rather than raising, since the image is optional metadata.
        """
        import base64
        import binascii
        from io import BytesIO

        from PIL import Image as PILImage, UnidentifiedImageError

        imdata = self.data.get("imageData")
        array = None
        if imdata is not None:
            try:
                # Convert from base64-encoded string to image bytes.
                imbytes = base64.b64decode(imdata)
                imfile = BytesIO(imbytes)
                # Force decode now so image-corruption errors are caught here.
                with PILImage.open(imfile) as pil_image:
                    pil_image.load()
                    array = np.array(pil_image)
            except (binascii.Error, OSError, TypeError, UnidentifiedImageError):
                return None

        # PIL decodes colour images in RGB(A) plane order
        if array is not None and array.ndim == 3:
            if array.shape[2] == 3:
                colororder = "RGB"
            elif array.shape[2] == 4:
                colororder = "RGBA"
            else:
                colororder = None
        else:
            colororder = None

        return _make_image(array, colororder=colororder)

    @property
    def flags(self) -> dict:
        """Return file-level flags from the LabelMe JSON as a dictionary."""
        # the `or {}` guards against an explicit JSON null
        return dict(self.data.get("flags", {}) or {})

    @property
    def shapes(self) -> list[Polygon2]:
        """
        Return list of shape polygons.

        :return: list of :class:`Polygon2` instances for all shapes
        :rtype: list[Polygon2]

        The polygons in the list have additional attributes:

        - ``group_id`` from the shape entry
        - ``flags`` from the shape entry as a dictionary

        Rectangle shapes are expanded to 4-vertex polygons; degenerate
        polygon entries with fewer than 2 points are skipped.
        """
        polygons: list[Polygon2] = []
        for shape in self.data.get("shapes", []):
            points = shape.get("points", [])
            shape_type = shape.get("shape_type", "polygon")
            if shape_type == "rectangle":
                polygon_points = self._rectangle_points(points)
            else:
                if len(points) < 2:
                    continue
                polygon_points = [tuple(p) for p in points]

            # Polygon2 expects a 2xN array of column vertices
            polygon = Polygon2(np.array(polygon_points, dtype=float).T, close=True)
            polygon.group_id = shape.get("group_id")  # type: ignore[attr-defined]
            polygon.flags = dict(shape.get("flags", {}))  # type: ignore[attr-defined]
            polygons.append(polygon)
        return polygons
if __name__ == "__main__":
    # Run this module's test suite when executed directly.
    from pathlib import Path

    import pytest

    test_file = Path(__file__).parent.parent.parent / "tests" / "test_image_sources.py"
    pytest.main([str(test_file), "-v"])