LogoLogo
AI HubCommunityWebsite
  • Start Here
  • AI Hub
    • Overview
    • Quickstart
    • Teams
    • Device Farm
    • Browser Inference
    • Model Zoo
      • Hailo
      • Intel
      • MemryX
      • BrainChip
      • Google
      • DeGirum
      • Rockchip
    • View and Create Model Zoos
    • Model Compiler
    • PySDK Integration
  • PySDK
    • Overview
    • Quickstart
    • Installation
    • Runtimes and Drivers
      • Hailo
      • OpenVINO
      • MemryX
      • BrainChip
      • Rockchip
      • ONNX
    • PySDK User Guide
      • Core Concepts
      • Organizing Models
      • Setting Up an AI Server
      • Loading an AI Model
      • Running AI Model Inference
      • Model JSON Structure
      • Command Line Interface
      • API Reference Guide
        • PySDK Package
        • Model Module
        • Zoo Manager Module
        • Postprocessor Module
        • AI Server Module
        • Miscellaneous Modules
      • Older PySDK User Guides
        • PySDK 0.16.1
        • PySDK 0.16.0
        • PySDK 0.15.2
        • PySDK 0.15.1
        • PySDK 0.15.0
        • PySDK 0.14.3
        • PySDK 0.14.2
        • PySDK 0.14.1
        • PySDK 0.14.0
        • PySDK 0.13.4
        • PySDK 0.13.3
        • PySDK 0.13.2
        • PySDK 0.13.1
        • PySDK 0.13.0
    • Release Notes
      • Retired Versions
    • EULA
  • DeGirum Tools
    • Overview
      • Streams
        • Streams Base
        • Streams Gizmos
      • Compound Models
      • Inference Support
      • Analyzers
        • Clip Saver
        • Event Detector
        • Line Count
        • Notifier
        • Object Selector
        • Object Tracker
        • Zone Count
  • DeGirumJS
    • Overview
    • Get Started
    • Understanding Results
    • Release Notes
    • API Reference Guides
      • DeGirumJS 0.1.3
      • DeGirumJS 0.1.2
      • DeGirumJS 0.1.1
      • DeGirumJS 0.1.0
      • DeGirumJS 0.0.9
      • DeGirumJS 0.0.8
      • DeGirumJS 0.0.7
      • DeGirumJS 0.0.6
      • DeGirumJS 0.0.5
      • DeGirumJS 0.0.4
      • DeGirumJS 0.0.3
      • DeGirumJS 0.0.2
      • DeGirumJS 0.0.1
  • Orca
    • Overview
    • Benchmarks
    • Unboxing and Installation
    • M.2 Setup
    • USB Setup
    • Thermal Management
    • Tools
  • Resources
    • External Links
Powered by GitBook

Get Started

  • AI Hub Quickstart
  • PySDK Quickstart
  • PySDK in Colab

Resources

  • AI Hub
  • Community
  • DeGirum Website

Social

  • LinkedIn
  • YouTube

Legal

  • PySDK EULA
  • Terms of Service
  • Privacy Policy

© 2025 DeGirum Corp.

On this page
  • Classes
  • StreamMeta
  • StreamData
  • Stream
  • Gizmo
  • Composition

Was this helpful?

  1. DeGirum Tools
  2. Overview
  3. Streams

Streams Base

PreviousStreamsNextStreams Gizmos

Last updated 11 days ago

Was this helpful?

This API Reference is based on DeGirum Tools version 0.16.6.

Classes

StreamMeta

StreamMeta

Stream metainfo class (metadata container).

Overview

  • A instance is a container that holds a chronologically ordered list of metainfo objects (called "meta infos") produced by gizmos in a streaming pipeline.

  • Each time a gizmo adds new metadata (e.g., inference results, resizing information), it is appended to the tail of this list.

  • The gizmo may associate the appended metadata with one or more tags, so that downstream gizmos or the user can retrieve specific metadata objects by those tags.

Appending and Tagging

  • To store new metadata, a gizmo calls self.meta.append(meta_obj, tags), where meta_obj is the metadata to attach, and tags is a string or list of strings labeling that metadata (e.g., "tag_inference", "tag_resize").

  • Internally, keeps track of a list of appended objects and a mapping of tags to the indices in that list.

Retrieving Metadata

  • You can retrieve all metadata objects tagged with a certain tag via find(tag), which returns a list of all matching objects in the order they were appended.

  • You can retrieve only the most recently appended object with find_last(tag).

  • For example, an inference gizmo might attach an inference result with the tag"tag_inference", so a downstream gizmo can do:inference_result = stream_data.meta.find_last("tag_inference").

  • If no metadata matches the requested tag, these methods return [] or None.

Modifications and Cloning

  • If you want to remove the most recent entry associated with a certain tag, call remove_last(tag) (occasionally useful in advanced pipeline scenarios).

Typical Usage

A typical processing pipeline might look like

  1. A resizing gizmo appends new dimension info under tag "Resize".

  2. An AI inference gizmo appends the inference result under tag "Inference".

  3. A display gizmo reads the final metadata to overlay bounding boxes, etc.

This incremental metadata accumulation is extremely flexible and allows each gizmo to contribute to a unified record of the data's journey.

Example:

# In a gizmo, produce meta and append:
data.meta.append({"new_width": 640, "new_height": 480}, "Resize")

# In a downstream gizmo:
resize_info = data.meta.find_last("Resize")
if resize_info:
    w, h = resize_info["new_width"], resize_info["new_height"]

Functions

__init__(meta=None, ...)

__init__(meta=None, tags=[])

Constructor.

Parameters:

Name
Type
Description
Default

meta

Any

Initial metainfo object. Defaults to None.

None

tags

Union[str, List[str]]

Tag or list of tags to associate with the initial metainfo object.

[]

append(meta, ...)

append(meta, tags=[])

Append a metainfo object to this StreamMeta.

Parameters:

Name
Type
Description
Default

meta

Any

The metainfo object to append.

required

tags

Union[str, List[str]]

Tag or list of tags to associate with the metainfo object.

[]

clone

clone()

Shallow clone this StreamMeta.

This creates a copy of the internal list and tags dictionary, but does not deep-copy the metainfo objects.

Returns:

Name
Type
Description

StreamMeta

A cloned StreamMeta instance.

find(tag)

find(tag)

Find metainfo objects by tag.

Parameters:

Name
Type
Description
Default

tag

str

The tag to search for.

required

Returns:

Type
Description

List[Any]

List[Any]: A list of metainfo objects that have the given tag (empty if none).

find_last(tag)

find_last(tag)

Find the last metainfo object with a given tag.

Parameters:

Name
Type
Description
Default

tag

str

The tag to search for.

required

Returns:

Name
Type
Description

Any

optional

The last metainfo object associated with the tag, or None if not found.

remove_last(tag)

remove_last(tag)

Remove the last metainfo object associated with a tag.

Parameters:

Name
Type
Description
Default

tag

str

The tag whose last associated metainfo object should be removed.

required

StreamData

StreamData

Single data element of the streaming pipeline.

Functions

__init__(data, ...)

__init__(data, meta=StreamMeta())

Constructor.

Parameters:

Name
Type
Description
Default

data

Any

The data payload.

required

meta

The metainfo associated with the data. Defaults to a new empty StreamMeta.

append_meta(meta, ...)

append_meta(meta, tags=[])

Parameters:

Name
Type
Description
Default

meta

Any

The metainfo object to append.

required

tags

List[str]

Tags to associate with this metainfo object. Defaults to [].

[]

Stream

Stream

Bases: Queue

Queue-based iterable stream with optional item drop.

Functions

__init__(maxsize=0, ...)

__init__(maxsize=0, allow_drop=False)

Constructor.

Parameters:

Name
Type
Description
Default

maxsize

int

Maximum stream depth (queue size); use 0 for unlimited depth. Defaults to 0.

0

allow_drop

bool

If True, allow dropping the oldest item when the stream is full on put(). Defaults to False.

False

Raises:

Type
Description

Exception

If maxsize is non-zero and less than min_queue_size.

__iter__

__iter__()

Return an iterator over the stream's items.

close

close()

Close the stream by inserting a poison pill.

put(item, ...)

put(item, block=True, timeout=None)

Put an item into the stream, with optional dropping.

If the stream is full and allow_drop is True, the oldest item will be removed to make room.

Parameters:

Name
Type
Description
Default

item

Any

The item to put.

required

block

bool

Whether to block if the stream is full (ignored if dropping is enabled). Defaults to True.

True

timeout

float

Timeout in seconds for the blocking put. Defaults to None (no timeout).

None

Gizmo

Gizmo

Bases: ABC

Base class for all gizmos (streaming pipeline processing blocks).

Each gizmo owns zero or more input streams that deliver data for processing (data-generating gizmos have no input stream).

A gizmo can be connected to other gizmos to receive data from them. One gizmo can broadcast data to multiple others (a single gizmo output feeding multiple destinations).

A data element moving through the pipeline is a tuple (data, meta) where:

  • data is the raw data (e.g., an image, a frame, or any object),

The run() implementation should:

  • Periodically check the _abort flag (set via abort()) to see if it should terminate.

  • Handle poison pills (Stream._poison) if they appear in the input streams, which signal "no more data."

Below is a minimal example similar to ResizingGizmo. This gizmo simply reads items from its single input, processes them, and sends results downstream until either _abort is set or the input stream is exhausted:

    def run(self):
        # For a single-input gizmo, iterate over all data in input #0
        for item in self.get_input(0):
            # If we were asked to abort, break out immediately
            if self._abort:
                break

            # item is a StreamData object: item.data is the image/frame, item.meta is the metadata
            input_image = item.data

            # 1) Do the resizing (your logic can use OpenCV, PIL, etc.)
            resized_image = do_resize(input_image, width=640, height=480)
            # 'do_resize' is just a placeholder; you'd implement your own resizing function.

            # 2) Update the metadata
            #    - Clone the existing metadata first.
            #    - In Python all objects are passed by reference, so if you do not clone but try A >> B and A >> C, C will receive the meta object modified by B.
            out_meta = item.meta.clone()
            out_meta.append(
                {
                    "frame_width": 640,
                    "frame_height": 480,
                    "method": "your_resize_method"
                },
                tags=self.get_tags()
            )

            # 3) Send the processed item downstream
            self.send_result(StreamData(resized_image, out_meta))

Notes

  • If your gizmo has multiple inputs, you can call self.get_input(i) for each input or iterate overself.get_inputs() if you need to merge or synchronize multiple streams.

  • Always check _abort periodically inside your main loop if your gizmo could run for a long time or block on I/O.

  • If, instead of self.get_input(0), you use self.get_input(0).get() or .get_nowait(), you must check if you receive a poison pill.

    • In simple loops, self.get_input(0) will terminate the loop.

    • In multi-input gizmos where simple nested for-loops aren't usable, get_nowait() is typically used to read input streams.

    • This way the gizmo code may query all inputs on a non-blocking manner and properly terminate loops.

Functions

__getitem__(index)

__getitem__(index)

Enable gizmo[index] syntax for specifying connections.

Returns a tuple (self, input_stream) which can be used on the right side of the >> operator for connecting gizmos (e.g., source_gizmo >> target_gizmo[index]).

Parameters:

Name
Type
Description
Default

index

int

The input stream index on this gizmo.

required

Returns:

Type
Description

tuple: A tuple of (this gizmo, the Stream at the given input index).

__init__(input_stream_sizes=[])

__init__(input_stream_sizes=[])

Constructor.

Parameters:

Name
Type
Description
Default

input_stream_sizes

List[tuple]

List of (maxsize, allow_drop) tuples for each input stream. Use an empty list for no inputs. Each tuple defines the input stream's depth (0 means unlimited) and whether dropping is allowed.

[]

__rshift__(other_gizmo)

__rshift__(other_gizmo)

Connect another gizmo to this gizmo using the >> operator.

This implements the right-shift operator, allowing syntax like source >> target or source >> target[input_index].

Parameters:

Name
Type
Description
Default

other_gizmo

Either a Gizmo to connect (assumes input 0), or a tuple (gizmo, inp) where inp is the input index or Stream of that gizmo.

required

Returns:

Name
Type
Description

Gizmo

The source gizmo (other_gizmo), enabling chaining of connections.

abort(abort=True)

abort(abort=True)

Set or clear the abort flag to stop the run loop.

Parameters:

Name
Type
Description
Default

abort

bool

True to request aborting the run loop, False to clear the abort signal.

True

connect_to(other_gizmo, ...)

connect_to(other_gizmo, inp=0)

Connect an input stream of this gizmo to another gizmo's output.

Parameters:

Name
Type
Description
Default

other_gizmo

The source gizmo to connect from.

required

inp

The input index of this gizmo (or an input Stream) to use for the connection. Defaults to 0.

0

Returns:

Name
Type
Description

Gizmo

This gizmo (to allow chaining).

get_connected

get_connected()

Recursively gather all gizmos connected to this gizmo.

Returns:

Name
Type
Description

set

set

A set of Gizmo objects that are connected (directly or indirectly) to this gizmo.

get_input(inp)

get_input(inp)

Get a specific input stream by index.

Parameters:

Name
Type
Description
Default

inp

int

Index of the input stream to retrieve.

required

Returns:

Name
Type
Description

Stream

The input stream at the given index.

Raises:

Type
Description

Exception

If the requested input index does not exist.

get_inputs

get_inputs()

Get all input streams of this gizmo.

Returns:

Type
Description

List[Stream]: List of input stream objects.

run

run()

abstractmethod

Run the gizmo's processing loop.

This method should retrieve data from input streams (if any), process it, and send results to outputs. Subclasses implement this method to define the gizmo's behavior.

Important guidelines for implementation

  • Check self._abort periodically and exit the loop if it becomes True.

  • If reading from an input stream via get() or get_nowait(), check for the poison pill (Stream._poison). If encountered, exit the loop.

  • For example, a typical single-input loop could be:

for data in self.get_input(0):
    if self._abort:
        break
    result = self.process(data)
    self.send_result(result)

send_result(data)

send_result(data)

Send a result to all connected output streams.

Parameters:

Name
Type
Description
Default

data

The data result to send. If None (or a poison pill) is provided, all connected outputs will be closed.

required

Composition

Composition

Orchestrates and runs a set of connected gizmos.

Usage

  1. Add all gizmos to the composition using add() or by calling the composition instance.

  2. Connect the gizmos together using connect_to() or the >> operator.

  3. Start the execution by calling start().

  4. To stop the execution, call stop() (or use the composition as a context manager).

Functions

__call__(gizmo)

__call__(gizmo)

Add a gizmo to this composition (callable syntax).

Equivalent to calling add(gizmo).

Parameters:

Name
Type
Description
Default

gizmo

The gizmo to add.

required

Returns:

Name
Type
Description

Gizmo

The same gizmo.

__exit__(exc_type, ...)

__exit__(exc_type, exc_val, exc_tb)

On exiting a context, wait for all gizmos to finish (and raise any errors).

Automatically calls wait() to ensure all threads have completed.

__init__(*gizmos)

__init__(*gizmos)

Initialize the composition with optional initial gizmos.

Parameters:

Name
Type
Description
Default

*gizmos

Optional gizmos (or iterables of gizmos) to add initially. If a Gizmo is provided, all gizmos connected to it (including itself) are added. If an iterator of gizmos is provided, all those gizmos (and their connected gizmos) are added.

()

get_bottlenecks

get_bottlenecks()

Get gizmos that experienced input queue bottlenecks in the last run.

For this to be meaningful, the composition must have been started with detect_bottlenecks=True.

Returns:

Type
Description

List[dict]

A list of dictionaries where each key is a gizmo name and the value is the number of frames dropped for that gizmo.

get_current_queue_sizes

get_current_queue_sizes()

Get current sizes of each gizmo's input queues. Can be used to analyze deadlocks.

Returns:

Type
Description

List[dict]

A list of dictionaries where each key is a gizmo name and the value is a list containing the gizmo's result count followed by the size of each of its input queues.

request_stop

request_stop()

Signal all gizmos in this composition to stop (abort).

This sets each gizmo's abort flag, clears all remaining items from their input queues, and sends poison pills to unblock any waiting gets. This method does not wait for threads to finish; call wait() to join threads.

stop

stop()

Stop the composition by aborting all gizmos and waiting for all threads to finish.

wait

wait()

Wait for all gizmo threads in the composition to finish.

Raises:

Type
Description

Exception

If the composition has not been started, or if any gizmo raised an error during execution (the exception message will contain details).

Important: Never modify a received or its stored objects in-place, because it may create side effects for upstream components. Call clone() if you need to make changes.clone() creates a shallow copy of the metainfo list and a copy of the tag-index map.

A video source gizmo creates a new , appends frame info under tag "Video".

CAUTION: Never modify the existing metadata objects in place. If you need to adapt previously stored metadata for your own use, first copy the data structure or call clone() on the .

()

Append an additional metainfo object to this 's metadata.

meta is a object containing accumulated metadata.

Subclasses must implement the abstract run() method to define a gizmo processing loop. The run() method is launched in a separate thread by the and should run until no more data is available or until an abort signal is set.

When done, you do not need to manually send poison pills; the handles closing any downstream streams once each gizmo run() completes.

(, )

or tuple

int or

List[]

There is no need to send a poison pill to outputs; the will handle closing output streams.

or None

or Iterator[]

StreamMeta
StreamMeta
StreamMeta
StreamMeta
StreamMeta
StreamData
StreamMeta
Composition
Composition
Composition
StreamMeta
StreamMeta
StreamMeta
Gizmo
Stream
Gizmo
Gizmo
Gizmo
Stream
Gizmo
Stream
Stream
StreamData
Gizmo
Gizmo
Gizmo
Gizmo