Custom video source

Plug custom video sources into PySDK using predict_batch—ideal for cameras, SDKs, GStreamer, and advanced use cases needing per-frame control or metadata.

Estimated read time: 6 minutes

This guide shows how to plug non-standard video sources into DeGirum PySDK—things like PiCamera2, GStreamer appsink, proprietary SDKs, image sequences, screen captures, or preprocessed frames.

For common sources (webcam, file path, RTSP URL), use degirum_tools.predict_stream(model, source). When you need custom capture, inline processing, or per-frame metadata, use predict_batch with your own generator.

How predict_batch works

model.predict_batch(source_iterable) accepts any Python iterator or generator that yields either:

  • frame: a NumPy array shaped H×W×3, dtype=uint8, BGR format.

  • (frame, frame_info): same frame plus a free-form dict (frame_info) that is returned as result.info.

This allows you to:

  • Attach metadata: for syncing, routing, or auditing (e.g., {"camera_id":"dock-3","frame_index":42,"ts_ms":1712345678901}).

  • Preprocess inline: rotate, resize, crop, denoise, or convert colors before yielding.

  • Use any source: PiCamera2, GStreamer, PyAV/FFmpeg, SDK callbacks, image folders, or synthetic frames.

Lifecycle & flow

  • One result frame: outputs are returned in order.

  • An analyzer (if used): tile, track, and zone analyzers return a single merged result per frame.

  • Back-pressure aware: predict_batch pulls frames at the model’s pace. Don’t busy—if you capture asynchronously, use a bounded queue and yield from it.

  • Termination: stop iteration to end the stream. Always release devices and pipelines in a finally block inside your generator.

  • Errors handling: skip bad frames, log, and continue. For a stuck backend, signal failure in frame_info or break and restart the pipeline.

Common setup

from degirum_tools import ModelSpec, Display, remote_assets
import degirum_tools
import cv2

# Describe & load the model once
model_spec = ModelSpec(
    model_name="yolov8n_coco--640x640_quant_hailort_multidevice_1",
    zoo_url="degirum/hailo",
    inference_host_address="@local",
    model_properties={
        "device_type": ["HAILORT/HAILO8L"],
        "overlay_color": [(0, 255, 0)],
        # Optional: "output_class_set": {"car", "person"}  # filter labels
    },
)
model = model_spec.load_model()

# Sample asset (or use your own path, or 0 for webcam)
video_path = remote_assets("Traffic.mp4")

Example: OpenCV source (with frame_info)

import time

# Any OpenCV source: file path, RTSP URL, or 0 for webcam
cap = cv2.VideoCapture(video_path)

def frames_with_info(cap, camera_id="front-entrance"):
    idx = 0
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            # Add metadata for sync/ID/routing
            info = {
                "camera_id": camera_id,
                "frame_index": idx,
                "ts_ms": int(time.time() * 1000),  # or device timestamp if available
            }
            idx += 1
            yield frame, info
    finally:
        cap.release()

with Display("OpenCV Source") as output_display:
    for result in model.predict_batch(frames_with_info(cap)):
        # Access your metadata
        _meta = result.info  # {"camera_id": "...", "frame_index": ..., "ts_ms": ...}
        output_display.show(result)

Example: Raspberry Pi Camera (PiCamera2)

# Requires: sudo apt install -y python3-picamera2 (libcamera enabled)
from picamera2 import Picamera2
import time

def picamera2_frames(camera_id="pi-cam"):
    picam2 = Picamera2()
    picam2.configure(picam2.preview_configuration(main={"format": "BGR888"}))  # BGR for OpenCV
    picam2.start()
    idx = 0
    try:
        while True:
            frame = picam2.capture_array()
            info = {"camera_id": camera_id, "frame_index": idx, "ts_ms": int(time.time() * 1000)}
            idx += 1
            yield frame, info
    finally:
        picam2.stop()

with Display("PiCamera2") as output_display:
    for result in model.predict_batch(picamera2_frames("north-lane")):
        output_display.show(result)

Example: GStreamer via OpenCV (simple)

Make sure your OpenCV build has GStreamer support.

# V4L2 webcam example (Linux)
gst_pipeline = (
    "v4l2src device=/dev/video0 ! "
    "videoconvert ! video/x-raw,format=BGR ! "
    "appsink"
)
cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER)

def gst_cv_frames(cap, camera_id="gst-cam"):
    idx = 0
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            yield frame, {"camera_id": camera_id, "frame_index": idx}
            idx += 1
    finally:
        cap.release()

with Display("GStreamer (OpenCV)") as output_display:
    for result in model.predict_batch(gst_cv_frames(cap, camera_id="usb-0")):
        output_display.show(result)

RTSP variant (OpenCV):

rtsp_url = "rtsp://username:password@camera_host/stream"
cap = cv2.VideoCapture(rtsp_url)  # or a full gst pipeline with CAP_GSTREAMER
with Display("RTSP (OpenCV)") as output_display:
    for result in model.predict_batch(gst_cv_frames(cap, camera_id="rtsp-1")):
        output_display.show(result)

Example: GStreamer via PyGObject (appsink, fine control)

# Requires:
#   sudo apt install python3-gi python3-gi-cairo gstreamer1.0-tools \
#       gstreamer1.0-plugins-{base,good,bad,ugly}
#   pip install PyGObject
import gi, numpy as np
gi.require_version("Gst", "1.0")
from gi.repository import Gst

Gst.init(None)

def gst_pygobject_frames(camera_id="gst-raw"):
    pipeline_str = (
        "v4l2src device=/dev/video0 ! "
        "videoconvert ! video/x-raw,format=BGR ! "
        "appsink name=sink emit-signals=true max-buffers=1 drop=true"
    )
    pipeline = Gst.parse_launch(pipeline_str)
    sink = pipeline.get_by_name("sink")
    pipeline.set_state(Gst.State.PLAYING)

    idx = 0
    try:
        while True:
            sample = sink.emit("pull-sample")
            if not sample:
                break
            buf = sample.get_buffer()
            caps = sample.get_caps()
            s = caps.get_structure(0)
            w, h = s.get_value("width"), s.get_value("height")

            success, map_info = buf.map(Gst.MapFlags.READ)
            if not success:
                continue
            frame = np.frombuffer(map_info.data, np.uint8).reshape((h, w, 3))
            buf.unmap(map_info)

            yield frame, {"camera_id": camera_id, "frame_index": idx}
            idx += 1
    finally:
        pipeline.set_state(Gst.State.NULL)

with Display("GStreamer (PyGObject)") as output_display:
    for result in model.predict_batch(gst_pygobject_frames("usb-raw")):
        output_display.show(result)

* **When to use** `predict_stream`: for webcam, video files, or RTSP, `degirum_tools.predict_stream(model, source)` already performs capture, iteration, and drawing—use it unless you need custom sources, extra processing, or `frame_info` metadata. * `frame_info` **round-trip**: yield `(frame, info)` → read back as `result.info` (e.g., timestamps, camera IDs, sequence numbers, shard IDs, etc.). * **Inline transforms**: rotate, resize, or crop frames inside your generator before yielding:

{% code overflow="wrap" %}
```python
rotated = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
yield rotated, info
```
{% endcode %}
  • Resource safety: release VideoCapture or pipelines in a finally block inside your generator.

  • Performance: keep per-frame Python work minimal. Push heavy decoding to capture backends. To reduce clutter and computation, set output_class_set in ModelSpec.model_properties if you only need specific labels.

Last updated

Was this helpful?