# Custom video source

*Estimated read time: 6 minutes*

This guide shows how to plug non-standard video sources into DeGirum PySDK—things like PiCamera2, GStreamer appsink, proprietary SDKs, image sequences, screen captures, or preprocessed frames.

For common sources (webcam, file path, RTSP URL), use `degirum_tools.predict_stream(model, source)`. When you need custom capture, inline processing, or per-frame metadata, use `predict_batch` with your own generator.

## How predict\_batch works

`model.predict_batch(source_iterable)` accepts any Python iterator or generator that yields either:

* `frame`: a NumPy array shaped H×W×3, dtype=uint8, BGR format.
* `(frame, frame_info)`: same `frame` plus a free-form `dict` (`frame_info`) that is returned as `result.info`.

**This allows you to**:

* **Attach metadata**: for syncing, routing, or auditing (e.g., `{"camera_id":"dock-3","frame_index":42,"ts_ms":1712345678901}`).
* **Preprocess inline**: rotate, resize, crop, denoise, or convert colors before yielding.
* **Use any source**: PiCamera2, GStreamer, PyAV/FFmpeg, SDK callbacks, image folders, or synthetic frames.

## Lifecycle & flow

* **One result per frame**: outputs are returned in the same order as the frames you yield.
* **Analyzers** (if used): tile, track, and zone analyzers return a single merged result per frame.
* **Back-pressure aware**: `predict_batch` pulls frames at the model’s pace. Don’t busy-wait—if you capture asynchronously, use a bounded queue and `yield` from it.
* **Termination**: stop iteration to end the stream. Always release devices and pipelines in a `finally` block inside your generator.
* **Error handling**: skip bad frames, log, and continue. For a stuck backend, signal failure in `frame_info` or break and restart the pipeline.

## Common setup

{% code overflow="wrap" %}

```python
from degirum_tools import ModelSpec, Display, remote_assets
import degirum_tools
import cv2

# Describe the model once, then load it from that spec
model_spec = ModelSpec(
    model_name="yolov8n_coco--640x640_quant_axelera_metis_1",
    zoo_url="degirum/axelera",
    inference_host_address="@local",
    model_properties=dict(
        device_type=["AXELERA/METIS"],
        overlay_color=[(0, 255, 0)],
        # Optional label filter: output_class_set={"car", "person"}
    ),
)
model = model_spec.load_model()

# Sample video asset; substitute your own path, or 0 for a webcam
video_path = remote_assets("Traffic.mp4")
```

{% endcode %}

### Example: OpenCV source (with frame\_info)

{% code overflow="wrap" %}

```python
import time

# Any OpenCV source: file path, RTSP URL, or 0 for webcam.
# The capture is passed to frames_with_info(), whose `finally` block releases it.
cap = cv2.VideoCapture(video_path)

def frames_with_info(cap, camera_id="front-entrance"):
    """Yield ``(frame, info)`` pairs read from an OpenCV-style capture.

    Args:
        cap: Object exposing ``read() -> (ok, frame)`` and ``release()``.
        camera_id: Value stored under ``"camera_id"`` in every ``info`` dict.

    Yields:
        ``(frame, info)`` where ``info`` holds ``camera_id``, a zero-based
        ``frame_index`` and ``ts_ms`` (wall-clock time in milliseconds).

    The capture is released when reading stops, the generator is closed,
    or an exception propagates out of it.
    """
    frame_index = 0
    try:
        while True:
            grabbed, image = cap.read()
            if not grabbed:
                # read() reported no frame; stop the stream
                return
            # Wall-clock stamp; swap in a device timestamp if one is available
            captured_at_ms = int(time.time() * 1000)
            metadata = {
                "camera_id": camera_id,
                "frame_index": frame_index,
                "ts_ms": captured_at_ms,
            }
            frame_index += 1
            yield image, metadata
    finally:
        cap.release()

# Feed the generator to the model and display every annotated result
frame_source = frames_with_info(cap)
with Display("OpenCV Source") as viewer:
    for inference in model.predict_batch(frame_source):
        # The dict yielded next to each frame is available on the result
        _meta = inference.info  # {"camera_id": "...", "frame_index": ..., "ts_ms": ...}
        viewer.show(inference)
```

{% endcode %}

### Example: Raspberry Pi Camera (PiCamera2)

{% code overflow="wrap" %}

```python
# Requires: sudo apt install -y python3-picamera2 (libcamera enabled)
from picamera2 import Picamera2
import time

def picamera2_frames(camera_id="pi-cam"):
    """Yield ``(frame, info)`` pairs captured from a Raspberry Pi camera.

    Args:
        camera_id: Value stored under ``"camera_id"`` in every ``info`` dict.

    Yields:
        ``(frame, info)`` where ``frame`` is the array returned by
        ``capture_array()`` and ``info`` holds ``camera_id``, a zero-based
        ``frame_index`` and ``ts_ms`` (wall-clock time in milliseconds).

    The camera is stopped and closed when the generator finishes, is closed,
    or an exception propagates out of it.
    """
    picam2 = Picamera2()
    try:
        # Picamera2 names pixel formats the opposite way round to OpenCV:
        # "RGB888" is laid out [B, G, R] in memory, i.e. the BGR order that
        # predict_batch expects ("BGR888" would deliver RGB frames).
        config = picam2.create_preview_configuration(main={"format": "RGB888"})
        picam2.configure(config)
        picam2.start()
        idx = 0
        try:
            while True:
                frame = picam2.capture_array()
                info = {"camera_id": camera_id, "frame_index": idx, "ts_ms": int(time.time() * 1000)}
                idx += 1
                yield frame, info
        finally:
            picam2.stop()
    finally:
        # Release the camera device so it can be opened again later
        picam2.close()

# Feed the PiCamera2 generator to the model and display every annotated result
frame_source = picamera2_frames("north-lane")
with Display("PiCamera2") as viewer:
    for inference in model.predict_batch(frame_source):
        viewer.show(inference)
```

{% endcode %}

### Example: GStreamer via OpenCV (simple)

Make sure your OpenCV build has GStreamer support.

{% code overflow="wrap" %}

```python
# V4L2 webcam example (Linux); one pipeline stage per list entry
gst_pipeline = " ! ".join(
    [
        "v4l2src device=/dev/video0",
        "videoconvert",
        "video/x-raw,format=BGR",
        "appsink",
    ]
)
cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER)

def gst_cv_frames(cap, camera_id="gst-cam"):
    """Yield ``(frame, info)`` pairs read from an OpenCV-style capture.

    Args:
        cap: Object exposing ``read() -> (ok, frame)`` and ``release()``.
        camera_id: Value stored under ``"camera_id"`` in every ``info`` dict.

    Yields:
        ``(frame, info)`` where ``info`` holds ``camera_id`` and a zero-based
        ``frame_index``.

    The capture is released when reading stops, the generator is closed,
    or an exception propagates out of it.
    """
    frame_index = 0
    try:
        grabbed, image = cap.read()
        while grabbed:
            yield image, dict(camera_id=camera_id, frame_index=frame_index)
            frame_index += 1
            grabbed, image = cap.read()
    finally:
        cap.release()

# Feed the GStreamer-backed generator to the model and display every result
frame_source = gst_cv_frames(cap, camera_id="usb-0")
with Display("GStreamer (OpenCV)") as viewer:
    for inference in model.predict_batch(frame_source):
        viewer.show(inference)
```

{% endcode %}

**RTSP variant (OpenCV)**:

{% code overflow="wrap" %}

```python
# Replace the credentials, host and path with those of your camera
rtsp_url = "rtsp://username:password@camera_host/stream"
cap = cv2.VideoCapture(rtsp_url)  # or a full gst pipeline with CAP_GSTREAMER

# Reuse the gst_cv_frames generator; it only needs read()/release() on `cap`
frame_source = gst_cv_frames(cap, camera_id="rtsp-1")
with Display("RTSP (OpenCV)") as viewer:
    for inference in model.predict_batch(frame_source):
        viewer.show(inference)
```

{% endcode %}

### Example: GStreamer via PyGObject (appsink, fine control)

{% code overflow="wrap" %}

```python
# Requires:
#   sudo apt install python3-gi python3-gi-cairo gstreamer1.0-tools \
#       gstreamer1.0-plugins-{base,good,bad,ugly}
#   pip install PyGObject
import gi, numpy as np
gi.require_version("Gst", "1.0")
from gi.repository import Gst

Gst.init(None)

def _bgr_frame_from_buffer(data, width, height):
    """Copy mapped BGR bytes into an owned, writable H×W×3 uint8 array."""
    flat = np.frombuffer(data, dtype=np.uint8)
    # Raw video rows may carry trailing padding (strides are aligned), so
    # derive the row stride from the buffer size instead of assuming width * 3.
    stride = flat.size // height
    rows = flat[: stride * height].reshape(height, stride)[:, : width * 3]
    # copy() detaches the frame from `data`, which is invalid after unmap
    return rows.copy().reshape(height, width, 3)


def gst_pygobject_frames(camera_id="gst-raw"):
    """Yield ``(frame, info)`` pairs pulled from a GStreamer appsink.

    Args:
        camera_id: Value stored under ``"camera_id"`` in every ``info`` dict.

    Yields:
        ``(frame, info)`` where ``frame`` is an H×W×3 uint8 array that owns
        its data (safe to keep or modify after the buffer is unmapped) and
        ``info`` holds ``camera_id`` and a zero-based ``frame_index``.

    The pipeline is set to ``NULL`` when the generator finishes, is closed,
    or an exception propagates out of it.
    """
    pipeline_str = (
        "v4l2src device=/dev/video0 ! "
        "videoconvert ! video/x-raw,format=BGR ! "
        "appsink name=sink emit-signals=true max-buffers=1 drop=true"
    )
    pipeline = Gst.parse_launch(pipeline_str)
    sink = pipeline.get_by_name("sink")
    pipeline.set_state(Gst.State.PLAYING)

    idx = 0
    try:
        while True:
            sample = sink.emit("pull-sample")
            if not sample:
                # No sample returned (e.g. end of stream): stop iterating
                break
            buf = sample.get_buffer()
            caps = sample.get_caps()
            s = caps.get_structure(0)
            w, h = s.get_value("width"), s.get_value("height")

            success, map_info = buf.map(Gst.MapFlags.READ)
            if not success:
                continue
            try:
                # Copy while the buffer is still mapped
                frame = _bgr_frame_from_buffer(map_info.data, w, h)
            finally:
                # Always unmap, even if the conversion above raises
                buf.unmap(map_info)

            yield frame, {"camera_id": camera_id, "frame_index": idx}
            idx += 1
    finally:
        pipeline.set_state(Gst.State.NULL)

# Feed the appsink generator to the model and display every annotated result
frame_source = gst_pygobject_frames("usb-raw")
with Display("GStreamer (PyGObject)") as viewer:
    for inference in model.predict_batch(frame_source):
        viewer.show(inference)
```

{% endcode %}

{% hint style="info" %}
* **When to use** `predict_stream`: for webcam, video files, or RTSP, `degirum_tools.predict_stream(model, source)` already performs capture, iteration, and drawing—use it unless you need custom sources, extra processing, or `frame_info` metadata.
* `frame_info` **round-trip**: yield `(frame, info)` → read back as `result.info` (e.g., timestamps, camera IDs, sequence numbers, shard IDs, etc.).
* **Inline transforms**: rotate, resize, or crop frames inside your generator before yielding:

  ```python
  rotated = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
  yield rotated, info
  ```

* **Resource safety**: release `VideoCapture` or pipelines in a `finally` block inside your generator.
* **Performance**: keep per-frame Python work minimal. Push heavy decoding to capture backends. To reduce clutter and computation, set `output_class_set` in `ModelSpec.model_properties` if you only need specific labels.
  {% endhint %}
