Custom video source
Plug custom video sources into PySDK using predict_batch—ideal for cameras, SDKs, GStreamer, and advanced use cases needing per-frame control or metadata.
This guide shows how to plug non-standard video sources into DeGirum PySDK—things like PiCamera2, GStreamer appsink, proprietary SDKs, image sequences, screen captures, or preprocessed frames.
For common sources (webcam, file path, RTSP URL), use degirum_tools.predict_stream(model, source). When you need custom capture, inline processing, or per-frame metadata, use predict_batch with your own generator.
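For reference, here is the simple path (a minimal sketch, assuming a model is already loaded as in the Common setup section below):
import degirum_tools
# predict_stream handles capture and cleanup for standard sources
for result in degirum_tools.predict_stream(model, 0):  # 0 = webcam; a file path or RTSP URL also works
    print(result)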
How predict_batch works
model.predict_batch(source_iterable) accepts any Python iterator or generator that yields either:
- frame: a NumPy array shaped H×W×3, dtype=uint8, BGR format.
- (frame, frame_info): the same frame plus a free-form dict (frame_info) that is returned as result.info.
This allows you to:
- Attach metadata: for syncing, routing, or auditing (e.g., {"camera_id": "dock-3", "frame_index": 42, "ts_ms": 1712345678901}).
- Preprocess inline: rotate, resize, crop, denoise, or convert colors before yielding.
- Use any source: PiCamera2, GStreamer, PyAV/FFmpeg, SDK callbacks, image folders, or synthetic frames (see the sketch below).
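A minimal sketch of both yield forms, using synthetic NumPy frames (assumes the model has been loaded as in the Common setup below):
import time
import numpy as np
def synthetic_frames(count=30, camera_id="synthetic-0"):
    # Yield (frame, frame_info) tuples; yield just the frame to skip metadata
    for idx in range(count):
        frame = np.zeros((480, 640, 3), dtype=np.uint8)  # H×W×3, uint8, BGR
        info = {"camera_id": camera_id, "frame_index": idx, "ts_ms": int(time.time() * 1000)}
        yield frame, info
for result in model.predict_batch(synthetic_frames()):
    print(result.info["frame_index"])  # the dict you yielded comes back on result.info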
Lifecycle & flow
- One result per frame: outputs are returned in order.
- Analyzers (if used): tile, track, and zone analyzers return a single merged result per frame.
- Back-pressure aware: predict_batch pulls frames at the model's pace. Don't busy-wait; if you capture asynchronously, use a bounded queue and yield from it (see the sketch after this list).
- Termination: stop iteration to end the stream. Always release devices and pipelines in a finally block inside your generator.
- Error handling: skip bad frames, log, and continue. For a stuck backend, signal failure in frame_info or break and restart the pipeline.
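A minimal sketch of the bounded-queue pattern for asynchronous capture; capture_next is a hypothetical stand-in for your SDK's capture call:
import queue
import threading
def queued_frames(capture_next, maxsize=8):
    # capture_next() should return the next BGR frame, or None at end of stream
    q = queue.Queue(maxsize=maxsize)  # bounded: enforces back-pressure on the producer
    stop = threading.Event()
    def producer():
        while not stop.is_set():
            frame = capture_next()
            if frame is None:
                break
            try:
                q.put(frame, timeout=1.0)  # wait for the consumer...
            except queue.Full:
                continue  # ...but drop the frame and re-check stop (like GStreamer's drop=true)
        try:
            q.put(None, timeout=1.0)  # sentinel: end of stream
        except queue.Full:
            pass
    threading.Thread(target=producer, daemon=True).start()
    try:
        while True:
            frame = q.get()
            if frame is None:
                break
            yield frame
    finally:
        stop.set()  # stop the producer if inference exits early
Feed the generator to the model as usual: model.predict_batch(queued_frames(my_capture)).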
Common setup
from degirum_tools import ModelSpec, Display, remote_assets
import degirum_tools
import cv2
# Describe & load the model once
model_spec = ModelSpec(
model_name="yolov8n_coco--640x640_quant_hailort_multidevice_1",
zoo_url="degirum/hailo",
inference_host_address="@local",
model_properties={
"device_type": ["HAILORT/HAILO8L"],
"overlay_color": [(0, 255, 0)],
# Optional: "output_class_set": {"car", "person"} # filter labels
},
)
model = model_spec.load_model()
# Sample asset (or use your own path, or 0 for webcam)
video_path = remote_assets("Traffic.mp4")
Example: OpenCV source (with frame_info)
import time
# Any OpenCV source: file path, RTSP URL, or 0 for webcam
cap = cv2.VideoCapture(video_path)
def frames_with_info(cap, camera_id="front-entrance"):
idx = 0
try:
while True:
ok, frame = cap.read()
if not ok:
break
# Add metadata for sync/ID/routing
info = {
"camera_id": camera_id,
"frame_index": idx,
"ts_ms": int(time.time() * 1000), # or device timestamp if available
}
idx += 1
yield frame, info
finally:
cap.release()
with Display("OpenCV Source") as output_display:
for result in model.predict_batch(frames_with_info(cap)):
# Access your metadata
_meta = result.info # {"camera_id": "...", "frame_index": ..., "ts_ms": ...}
output_display.show(result)
Example: Raspberry Pi Camera (PiCamera2)
# Requires: sudo apt install -y python3-picamera2 (libcamera enabled)
from picamera2 import Picamera2
import time
def picamera2_frames(camera_id="pi-cam"):
picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration(main={"format": "BGR888"}))  # BGR for OpenCV
picam2.start()
idx = 0
try:
while True:
frame = picam2.capture_array()
info = {"camera_id": camera_id, "frame_index": idx, "ts_ms": int(time.time() * 1000)}
idx += 1
yield frame, info
finally:
picam2.stop()
with Display("PiCamera2") as output_display:
for result in model.predict_batch(picamera2_frames("north-lane")):
output_display.show(result)
Example: GStreamer via OpenCV (simple)
Make sure your OpenCV build has GStreamer support.
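A quick way to verify (prints the GStreamer line from OpenCV's build report):
import cv2
# The Video I/O section of the build report shows "GStreamer: YES (...)" when supported
gst_line = next(line for line in cv2.getBuildInformation().splitlines() if "GStreamer" in line)
print(gst_line.strip())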
# V4L2 webcam example (Linux)
gst_pipeline = (
"v4l2src device=/dev/video0 ! "
"videoconvert ! video/x-raw,format=BGR ! "
"appsink"
)
cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER)
def gst_cv_frames(cap, camera_id="gst-cam"):
idx = 0
try:
while True:
ok, frame = cap.read()
if not ok:
break
yield frame, {"camera_id": camera_id, "frame_index": idx}
idx += 1
finally:
cap.release()
with Display("GStreamer (OpenCV)") as output_display:
for result in model.predict_batch(gst_cv_frames(cap, camera_id="usb-0")):
output_display.show(result)
RTSP variant (OpenCV):
rtsp_url = "rtsp://username:password@camera_host/stream"
cap = cv2.VideoCapture(rtsp_url) # or a full gst pipeline with CAP_GSTREAMER
with Display("RTSP (OpenCV)") as output_display:
for result in model.predict_batch(gst_cv_frames(cap, camera_id="rtsp-1")):
output_display.show(result)
Example: GStreamer via PyGObject (appsink, fine control)
# Requires:
# sudo apt install python3-gi python3-gi-cairo gstreamer1.0-tools \
# gstreamer1.0-plugins-{base,good,bad,ugly}
# pip install PyGObject
import gi, numpy as np
gi.require_version("Gst", "1.0")
from gi.repository import Gst
Gst.init(None)
def gst_pygobject_frames(camera_id="gst-raw"):
pipeline_str = (
"v4l2src device=/dev/video0 ! "
"videoconvert ! video/x-raw,format=BGR ! "
"appsink name=sink emit-signals=true max-buffers=1 drop=true"
)
pipeline = Gst.parse_launch(pipeline_str)
sink = pipeline.get_by_name("sink")
pipeline.set_state(Gst.State.PLAYING)
idx = 0
try:
while True:
sample = sink.emit("pull-sample")
if not sample:
break
buf = sample.get_buffer()
caps = sample.get_caps()
s = caps.get_structure(0)
w, h = s.get_value("width"), s.get_value("height")
success, map_info = buf.map(Gst.MapFlags.READ)
if not success:
continue
frame = np.frombuffer(map_info.data, np.uint8).reshape((h, w, 3)).copy()  # copy before unmap so the frame outlives the buffer
buf.unmap(map_info)
yield frame, {"camera_id": camera_id, "frame_index": idx}
idx += 1
finally:
pipeline.set_state(Gst.State.NULL)
with Display("GStreamer (PyGObject)") as output_display:
for result in model.predict_batch(gst_pygobject_frames("usb-raw")):
output_display.show(result)