Streaming results
Stream inference outputs in real time to displays, message buses, or remote services using PySDK result objects.
Estimated read time: 3 minutes
Streaming keeps inference responsive for dashboards, alerts, and downstream analytics. This page shows how to iterate over streaming APIs, publish lightweight payloads, and monitor latency. Each section includes its own setup so you can copy and run examples independently.
Display a live overlay loop
Use predict_stream for camera feeds, RTSP streams, or looping video files. Each iteration yields an InferenceResults object.
Example
from degirum_tools import ModelSpec, Display, predict_stream, remote_assets
model_spec = ModelSpec(
model_name="yolov8n_coco--640x640_quant_axelera_metis_1",
zoo_url="degirum/axelera",
inference_host_address="@local",
model_properties={"device_type": ["AXELERA/METIS"]},
)
model = model_spec.load_model()
video_source = remote_assets.traffic # swap in a webcam index or RTSP URL
max_frames = 120 # stop after this many frames for demos
with Display("AI Camera — Live stream") as output_display:
for index, result in enumerate(predict_stream(model, video_source), start=1):
output_display.show(result.image_overlay)
print(f"Rendered frame {index}")
if index >= max_frames:
breakExample output:
Rendered frame 1
Rendered frame 2
Rendered frame 3
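The same loop works with a live camera or network stream. A minimal sketch reusing the model loaded above; the webcam index and RTSP address are placeholders to replace with your own:

webcam_source = 0  # index of the first local webcam (placeholder)
rtsp_source = "rtsp://user:password@192.168.1.10:554/stream"  # placeholder RTSP URL

with Display("AI Camera — Webcam") as output_display:
    for result in predict_stream(model, webcam_source):  # or rtsp_source
        output_display.show(result.image_overlay)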
Publish lightweight payloads
Structured results serialize well to JSON once you convert NumPy types to built-in Python types. Package only the fields you need before sending them across the network.
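If a detection still carries NumPy scalars or arrays, a json.dumps default hook can coerce them; a minimal sketch where the to_builtin helper is illustrative, not part of PySDK:

import json

import numpy as np

def to_builtin(obj):
    # Convert NumPy scalars and arrays to JSON-safe Python types.
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Cannot serialize object of type {type(obj)}")

json.dumps({"score": np.float32(0.94)}, default=to_builtin)  # returns a plain JSON string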
Example
import json
import time
from degirum_tools import ModelSpec, predict_stream, remote_assets
model_spec = ModelSpec(
model_name="yolov8n_coco--640x640_quant_axelera_metis_1",
zoo_url="degirum/axelera",
inference_host_address="@local",
model_properties={"device_type": ["AXELERA/METIS"]},
)
model = model_spec.load_model()
video_source = remote_assets.traffic
for result in predict_stream(model, video_source):
    payload = {
        "timestamp": time.time(),
        "detections": [
            {
                "label": det.get("label"),
                "score": float(det.get("score", 0)),
                "bbox": [float(x) for x in det.get("bbox", [])],
            }
            for det in result.results
        ],
    }
    json_payload = json.dumps(payload)
    # send json_payload to your WebSocket client, MQTT broker, etc.
    print(json_payload)
    break  # remove break to stream continuously
Example output:
{"timestamp": 1700000000.123, "detections": [{"label": "car", "score": 0.94, "bbox": [0.11, 0.33, 0.28, 0.77]}, {"label": "truck", "score": 0.61, "bbox": [0.52, 0.29, 0.83, 0.88]}]}Monitor latency and throughput
Monitor latency and throughput
Use result.timing to inspect per-frame preprocessing, inference, and postprocessing times. Enable timing in the model specification if needed.
Example
from degirum_tools import ModelSpec, predict_stream, remote_assets
model_spec = ModelSpec(
model_name="yolov8n_coco--640x640_quant_axelera_metis_1",
zoo_url="degirum/axelera",
inference_host_address="@local",
    model_properties={
        "device_type": ["AXELERA/METIS"],
        "postprocess": {"timing": {"enable": True}},
    },
)
model = model_spec.load_model()
for index, result in enumerate(predict_stream(model, remote_assets.traffic), start=1):
    if result.timing:
        print(f"Frame {index} timing: {result.timing}")
    else:
        print("Timing disabled; set model_properties['postprocess']['timing']['enable'] to True")
    if index >= 3:
        break
Example output:
Frame 1 timing: {'preprocess': 2.91, 'inference': 34.77, 'postprocess': 4.12}
Frame 2 timing: {'preprocess': 2.85, 'inference': 34.63, 'postprocess': 4.05}
Frame 3 timing: {'preprocess': 2.88, 'inference': 34.70, 'postprocess': 4.08}
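To summarize sustained performance over a run, accumulate the per-stage times and report averages. A minimal sketch assuming result.timing is a dict of millisecond values keyed as in the output above, reusing the model and video source from the example:

from collections import defaultdict

stage_totals = defaultdict(float)
frames = 0
for result in predict_stream(model, remote_assets.traffic):
    if not result.timing:
        continue
    frames += 1
    for stage, milliseconds in result.timing.items():
        stage_totals[stage] += milliseconds
    if frames >= 100:  # sample a fixed window for the report
        break

for stage, total in stage_totals.items():
    print(f"{stage}: {total / frames:.2f} ms/frame average")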