Streams

DeGirum Tools API Reference Guide. Streaming toolkit for building multi-threaded pipelines.

This API Reference is based on DeGirum Tools version 0.18.0.

Streaming Toolkit Overview

This module provides a streaming toolkit for building multi-threaded processing pipelines, where data (images, video frames, or arbitrary objects) flows through a series of processing blocks called gizmos. The toolkit allows you to:

  • Acquire or generate data from one or more sources (e.g., camera feeds, video files).

  • Process the data in a pipeline (possibly in parallel), chaining multiple gizmos together.

  • Optionally display or save the processed data, or feed it into AI inference models.

  • Orchestrate everything in a Composition, which manages the lifecycle (threads) of all connected gizmos.

Core Concepts

  1. Stream:

    • Represents a queue of data items (StreamData objects), such as frames from a camera or images from a directory.

    • Gizmos push (put) data into Streams or read (get) data from them.

    • Streams can optionally drop data (the oldest item) if they reach a specified maximum queue size, preventing pipeline bottlenecks.

  2. Gizmo:

    • A gizmo is a discrete processing node in the pipeline.

    • Each gizmo runs in its own thread, pulling data from its input stream(s), processing it, and pushing results to its output stream(s).

    • Example gizmos include:

      • Video-sourcing gizmos that read frames from a webcam or file.

      • AI inference gizmos that run a model on incoming frames.

      • Video display or saving gizmos that show or store processed frames.

      • Gizmos that perform transformations (resizing, cropping, analyzing) on data.

    • Gizmos communicate via Streams. A gizmo's output Stream can feed multiple downstream gizmos.

    • Gizmos keep a list of input streams that they are connected to.

    • Gizmos own their input streams.

  3. Composition:

    • A Composition is a container that holds and manages multiple gizmos (and their Streams).

    • Once gizmos are connected, you can call composition.start() to begin processing. Each gizmo's run() method executes in a dedicated thread.

    • Call composition.stop() to gracefully stop processing and wait for threads to finish.

  4. StreamData and StreamMeta:

    • Each item in the pipeline is encapsulated by a StreamData object, which holds:

      • data: The actual payload (e.g., an image array, a frame).

      • meta: A StreamMeta object that can hold extra metadata from each gizmo (e.g., detection results, timestamps, or bounding boxes).

        • Gizmos can append to StreamMeta so that metadata accumulates across the pipeline.

  5. Metadata Flow (StreamMeta):

    • How StreamMeta works:

      • StreamMeta itself is a container that can hold any number of "meta info" objects.

      • Each meta info object is "tagged" with one or more string tags, such as "dgt_video", "dgt_inference", etc.

      • You append new meta info by calling meta.append(my_info, [list_of_tags]).

      • You can retrieve meta info objects by searching with meta.find("tag") (returns all matches) or meta.find_last("tag") (returns the most recent match).

      • Important: A gizmo generally clones (.clone()) the incoming StreamMeta before appending its own metadata to avoid upstream side effects.

      • This design lets each gizmo add new metadata, while preserving what was provided by upstream gizmos.

    • High-Level Example:

      • A camera gizmo outputs frames with meta tagged "dgt_video" containing properties like FPS, width, height, etc.

      • An AI inference gizmo downstream takes StreamData(data=frame, meta=...), runs inference, then:

        1. Clones the metadata container.

        2. Appends its inference results under the "dgt_inference" tag.

      • If two AI gizmos run in series, both will append metadata with the same "dgt_inference" tag. A later consumer can call meta.find("dgt_inference") to get both sets of results or meta.find_last("dgt_inference") to get the most recent result.
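The clone-then-append flow described above can be illustrated with a small stand-in container. This is a minimal sketch, not the degirum_tools implementation: ToyMeta is a hypothetical class that only mimics the append(), find(), find_last(), and clone() calls named in this section, and the metadata dictionaries are made-up sample values.

```python
class ToyMeta:
    """Hypothetical stand-in for StreamMeta; not the degirum_tools class."""

    def __init__(self):
        self._items = []  # (info, tags) pairs, kept in append order

    def clone(self):
        # The clone gets its own list, so appending to it leaves the
        # original container untouched. The info objects are shared.
        other = ToyMeta()
        other._items = list(self._items)
        return other

    def append(self, info, tags):
        if isinstance(tags, str):
            tags = [tags]
        self._items.append((info, list(tags)))

    def find(self, tag):
        # All meta info objects carrying the tag, oldest first
        return [info for info, tags in self._items if tag in tags]

    def find_last(self, tag):
        # The most recently appended match, or None if there is none
        matches = self.find(tag)
        return matches[-1] if matches else None


# Camera stage: tags its properties with "dgt_video"
camera_meta = ToyMeta()
camera_meta.append({"fps": 30, "width": 640, "height": 480}, ["dgt_video"])

# First inference stage: clone, then append under "dgt_inference"
meta_a = camera_meta.clone()
meta_a.append({"label": "person"}, ["dgt_inference"])

# Second inference stage in series: same tag again
meta_b = meta_a.clone()
meta_b.append({"label": "face"}, ["dgt_inference"])

print(camera_meta.find("dgt_inference"))    # upstream container is unchanged
print(meta_b.find("dgt_inference"))         # both inference results
print(meta_b.find_last("dgt_inference"))    # only the most recent one
```

Because each stage works on its own clone, a consumer holding the camera-stage container never sees the inference results that later stages add.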

Basic Usage Example

A simple pipeline might look like this:

import degirum as dg
from degirum_tools.streams import Composition
from degirum_tools.streams_gizmos import VideoSourceGizmo, VideoDisplayGizmo
import cv2
import time

# Create gizmos. If you are on a laptop or have a webcam attached,
# VideoSourceGizmo(0) will typically use your camera as the video source.
video_source = VideoSourceGizmo(0)
video_display = VideoDisplayGizmo("Camera Preview")

# Connect them
video_source >> video_display

# Build composition
comp = Composition(video_source, video_display)
comp.start(wait=False)  # Don't block main thread

start_time = time.time()
while time.time() - start_time < 10:  # Run for 10 seconds
    cv2.waitKey(5)  # Wait time of 5 ms. Let OpenCV handle window events

comp.stop()
cv2.destroyAllWindows()

Key Steps

  1. Create your gizmos (e.g., VideoSourceGizmo, VideoDisplayGizmo, or AI inference gizmos).

  2. Connect them together using the >> operator (or the connect_to() method) to form a processing graph. For example:

    source >> processor >> sink
    
  3. Initialize a Composition with the top-level gizmo(s).

  4. Start the Composition to launch each gizmo in its own thread.

  5. (Optional) Wait for the pipeline to finish or perform other tasks. You can query statuses, queue sizes, or get partial results in real time.

  6. Stop the pipeline when done.
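The steps above can be sketched with standard-library pieces to show how a >> operator and one thread per node fit together. This is an illustration under stated assumptions, not the degirum_tools code: ToyGizmo, ToyComposition, and the _STOP sentinel are hypothetical names, and the real Gizmo and Composition classes offer far more (multiple inputs, metadata, error handling).

```python
import queue
import threading

_STOP = object()  # end-of-stream sentinel (an assumption of this sketch)


class ToyGizmo:
    """Hypothetical processing node; not the degirum_tools Gizmo class."""

    def __init__(self, func=None, source=None):
        self.func = func            # per-item function for processing nodes
        self.source = source        # iterable of items for source nodes
        self.input = queue.Queue()  # each node owns its input stream
        self.outputs = []           # input queues of downstream nodes

    def __rshift__(self, other):
        # a >> b wires a's output to b's input; returning b lets chains
        # such as a >> b >> c connect left to right.
        self.outputs.append(other.input)
        return other

    def _send(self, item):
        for out in self.outputs:
            out.put(item)

    def run(self):
        if self.source is not None:
            for item in self.source:
                self._send(item)
        else:
            while (item := self.input.get()) is not _STOP:
                self._send(self.func(item))
        self._send(_STOP)  # tell downstream nodes that the stream has ended


class ToyComposition:
    """Hypothetical container that runs each node in its own thread."""

    def __init__(self, *gizmos):
        self.threads = [threading.Thread(target=g.run) for g in gizmos]

    def start(self):
        for t in self.threads:
            t.start()

    def wait(self):
        for t in self.threads:
            t.join()


results = []
source = ToyGizmo(source=range(5))
processor = ToyGizmo(func=lambda x: x * x)
sink = ToyGizmo(func=results.append)

source >> processor >> sink
comp = ToyComposition(source, processor, sink)
comp.start()
comp.wait()
print(results)  # [0, 1, 4, 9, 16]
```

Unlike this toy, which must be given every node explicitly and ends when the source is exhausted, the toolkit's Composition is initialized with the top-level gizmo(s) and is stopped with composition.stop().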

Advanced Topics

  • Non-blocking vs. Blocking: A Stream configured with allow_drop=True drops its oldest item instead of blocking when its queue is full, which suits real-time feeds.

  • Multiple Inputs or Outputs: Some gizmos can have multiple input streams and/or broadcast results to multiple outputs.

  • Error Handling: If any gizmo encounters an error, the Composition can stop the whole pipeline, allowing you to handle exceptions centrally.
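The drop-oldest behavior mentioned under "Non-blocking vs Blocking" can be sketched as a bounded queue. This is a minimal illustration, not the degirum_tools Stream class: ToyStream and its dropped counter are hypothetical, and the maxsize parameter name is an assumption; only allow_drop appears in the text above.

```python
import collections
import threading


class ToyStream:
    """Hypothetical bounded queue; not the degirum_tools Stream class."""

    def __init__(self, maxsize=0, allow_drop=False):
        self._items = collections.deque()
        self._maxsize = maxsize        # 0 means unbounded
        self._allow_drop = allow_drop
        self._cond = threading.Condition()
        self.dropped = 0               # number of items discarded so far

    def put(self, item):
        with self._cond:
            if self._maxsize > 0 and len(self._items) >= self._maxsize:
                if self._allow_drop:
                    # Non-blocking mode: discard the oldest item
                    self._items.popleft()
                    self.dropped += 1
                else:
                    # Blocking mode: wait until a consumer makes room
                    self._cond.wait_for(
                        lambda: len(self._items) < self._maxsize
                    )
            self._items.append(item)
            self._cond.notify_all()

    def get(self):
        with self._cond:
            # Wait until at least one item is available
            self._cond.wait_for(lambda: len(self._items) > 0)
            item = self._items.popleft()
            self._cond.notify_all()
            return item


# A fast producer writes 10 frame ids before the consumer reads any
stream = ToyStream(maxsize=3, allow_drop=True)
for frame_id in range(10):
    stream.put(frame_id)

latest = [stream.get() for _ in range(3)]
print(latest, stream.dropped)  # [7, 8, 9] 7
```

With dropping enabled, a slow consumer always sees the newest frames at the cost of skipped ones; with dropping disabled, the producer is throttled to the consumer's pace instead.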

For practical code examples, see the dgstreams_demo.ipynb notebook in the PySDKExamples.

Functions

load_composition(description, ...)

load_composition(description, global_context=None, local_context=None)

Load a Composition of gizmos and connections from a description.

The description can be provided as a JSON/YAML file path, a YAML string, or a Python dictionary conforming to the JSON schema defined in composition_definition_schema.

Composition Description Schema (YAML):

type: object
additionalProperties: false
required: [gizmos, connections]
properties:
    vars:
        type: object
        description: The collection of variables, keyed by variable name
        additionalProperties: false
        patternProperties:
            "^[a-zA-Z_][a-zA-Z0-9_]*$":
                oneOf:
                  - type: [string, number, boolean, array]
                    description: The variable value; can be $(expression) to evaluate
                  - type: object
                    description: The only object key is the class name to instantiate; value are the constructor parameters
                    additionalProperties: false
                    minProperties: 1
                    maxProperties: 1
                    patternProperties:
                        "^[a-zA-Z_][a-zA-Z0-9_.]*$":
                            type: object
                            description: The constructor parameters of the object
                            additionalProperties: true
    gizmos:
        type: object
        description: The collection of gizmos, keyed by gizmo instance name
        additionalProperties: false
        patternProperties:
            "^[a-zA-Z_][a-zA-Z0-9_]*$":
                oneOf:
                  - type: string
                    description: The gizmo class name to instantiate (if no parameters are needed)
                  - type: object
                    description: The only object key is the class name to instantiate; value are the constructor parameters
                    additionalProperties: false
                    minProperties: 1
                    maxProperties: 1
                    patternProperties:
                        "^[a-zA-Z_][a-zA-Z0-9_.]*$":
                            type: object
                            description: The constructor parameters of the gizmo
                            additionalProperties: true
    connections:
        type: array
        description: The list of connections between gizmos
        items:
            type: array
            description: The connection between gizmos
            items:
                oneOf:
                    - type: string
                    - type: array
                      description: Gizmo with input index
                      prefixItems:
                        - type: string
                        - type: number
                      items: false

Parameters:

Name | Type | Description | Default
-----|------|-------------|--------
description | str or dict | A YAML string or dict describing the Composition, or a path to a .json/.yaml file containing the description. | required
global_context | dict | Global context for evaluating expressions in the description (variables, etc.). Defaults to None. | None
local_context | dict | Local context for evaluating expressions. Defaults to None. | None

Returns:

Name | Type | Description
-----|------|------------
Composition | Composition | A Composition object representing the described gizmo pipeline.
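As an illustration of the schema, a description for a two-gizmo pipeline might look like the dictionary below. This is a sketch under assumptions: the gizmo instance names (source, display) are arbitrary, the constructor parameter name video_source is assumed rather than taken from this page, and check_description is a hypothetical helper that performs only a partial structural check (plus one extra sanity check on connection targets that the schema itself does not require).

```python
import re

NAME_RE = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*")

description = {
    "gizmos": {
        # Object form: single key is the class name, value is its parameters
        # (the parameter name "video_source" is an assumption)
        "source": {"VideoSourceGizmo": {"video_source": 0}},
        # String form: class name only, no constructor parameters
        "display": "VideoDisplayGizmo",
    },
    "connections": [
        # Each connection lists gizmo names in order; an item may also be
        # [name, input_index] to select a specific input
        ["source", "display"],
    ],
}


def check_description(desc):
    """Partial structural check; returns a list of problems found."""
    problems = []
    for key in ("gizmos", "connections"):
        if key not in desc:
            problems.append(f"missing required key: {key}")
    for key in desc:
        if key not in ("vars", "gizmos", "connections"):
            problems.append(f"unexpected key: {key}")
    gizmos = desc.get("gizmos", {})
    for name, spec in gizmos.items():
        if not NAME_RE.fullmatch(name):
            problems.append(f"bad gizmo name: {name}")
        if isinstance(spec, dict):
            if len(spec) != 1:
                problems.append(f"{name}: expected exactly one class name")
        elif not isinstance(spec, str):
            problems.append(f"{name}: expected a string or an object")
    for conn in desc.get("connections", []):
        for item in conn:
            target = item if isinstance(item, str) else item[0]
            # Extra sanity check, not part of the schema
            if target not in gizmos:
                problems.append(f"unknown gizmo in connection: {target}")
    return problems


print(check_description(description))  # []

# With degirum_tools installed, the dict would be passed directly
# (the import path is assumed from the Composition import shown earlier):
#   from degirum_tools.streams import load_composition
#   composition = load_composition(description)
#   composition.start()
```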
