ROSTopic#
- class ROSTopic(topic: str, message: str = 'sensor_msgs/CompressedImage', host: str = 'localhost', port: int = 9090, subscribe: bool = True, output: Literal['image', 'message'] = 'image', blocking: bool = True, rgb: bool = True)[source]#
Iterate images from a live ROS topic via a rosbridge WebSocket.
- Parameters:
topic (str) – ROS topic name, e.g.
"/camera/image/compressed"message (str, optional) – ROS message type, defaults to
"sensor_msgs/CompressedImage"host (str, optional) – hostname or IP address of the rosbridge server, defaults to
"localhost"port (int, optional) – rosbridge WebSocket port, defaults to 9090
subscribe (bool, optional) – if
True(default) subscribe to incoming messages; setFalsefor publish-only useoutput (str, optional) – output mode,
"image"yieldsImageand"message"yieldsROSMessage, defaults to"image"blocking (bool, optional) – if
True(default)__next__blocks until a new frame arrives; ifFalseit returns the most recently received frame immediately (or blocks until the very first frame is available)rgb (bool, optional) – if
True(default) return RGB images; ifFalsereturn BGR
- Raises:
ImportError – if the
roslibpypackage is not installed
In subscribe mode, the object is an iterator that yields
Imageinstances (output="image") orROSMessageinstances (output="message") as they arrive from the topic. Use it as a context manager to ensure the connection is always closed:with ROSTopic("/camera/image/compressed", host="192.168.1.10") as stream: for img in stream: img.disp()
alternatively fetch a single frame with
next:stream = ROSTopic("/camera/image/compressed") img = next(stream) stream.release()
For publish-only use, disable subscription setup and call
publish:pub = ROSTopic("/cmd_topic", message="std_msgs/String", subscribe=False) pub.publish({"data": "hello"}) pub.release()
Message timing
Messages are timestamped at the ROS publisher (header stamp), received by rosbridge, then forwarded over WebSocket to this object. The callback stores only the most recent decoded frame and timestamp.
With
blocking=True, each__next__call waits until a newer message arrives and then returns it.With
blocking=False,__next__returns immediately after the first message has arrived, returning the current most recent message, which may be the same frame as a previous call if no new message has arrived in the meantime.Note
This class use
roslibpy, which uses WebSockets to connect to a running rosbridge 2.0 server. It does not require a local ROS environment, allowing usage from platforms other than Linux.Methods
Display images from the source interactively.
Grab a single frame from the ROS topic.
Publish a message on the configured ROS topic.
Close the ROS topic and terminate the WebSocket connection.
Convert all images from this source into a single 4D PyTorch tensor.
Attributes