Streams
This module provides a streaming toolkit for building multi-threaded processing pipelines, where data (images, video frames, or arbitrary objects) flows through a series of processing blocks called gizmos. The toolkit allows you to:
Acquire or generate data from one or more sources (e.g., camera feeds, video files).
Process the data in a pipeline (possibly in parallel), chaining multiple gizmos together.
Optionally display or save the processed data, or feed it into AI inference models.
Orchestrate everything in a Composition, which manages the lifecycle (threads) of all connected gizmos.
Stream:
Represents a queue of data items, such as frames from a camera or images from a directory.
Gizmos push (put()) data into Streams or read (get()) data from them.
Streams can optionally drop data (the oldest item) if they reach a specified maximum queue size, preventing pipeline bottlenecks.
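The drop-oldest behavior can be illustrated with a small library-free sketch (this is a conceptual analogue built on Python's standard queue, not the library's actual Stream implementation):

```python
import queue

# Conceptual sketch: a bounded queue that drops the oldest item when full,
# mimicking a Stream configured with allow_drop=True. Not the actual
# library code -- just the idea.
class DroppingQueue:
    def __init__(self, maxsize):
        self._q = queue.Queue(maxsize=maxsize)

    def put(self, item):
        # If the queue is full, discard the oldest item to make room,
        # so a slow consumer never stalls a real-time producer.
        if self._q.full():
            try:
                self._q.get_nowait()
            except queue.Empty:
                pass
        self._q.put_nowait(item)

    def get(self):
        return self._q.get()

q = DroppingQueue(maxsize=2)
for frame in range(5):  # frames 0..4 arrive faster than they are consumed
    q.put(frame)
print(q.get(), q.get())  # only the newest two frames remain: 3 4
```

With blocking behavior (no dropping) the producer would stall instead; dropping trades completeness for pipeline liveness, which is usually the right choice for live camera feeds.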
Gizmo:
A gizmo is a discrete processing node in the pipeline.
Each gizmo runs in its own thread, pulling data from its input stream(s), processing it, and pushing results to its output stream(s).
Example gizmos include:
Video-sourcing gizmos that read frames from a webcam or file.
AI inference gizmos that run a model on incoming frames.
Video display or saving gizmos that show or store processed frames.
Gizmos that perform transformations (resizing, cropping, analyzing) on data.
Gizmos communicate via Streams. A gizmo's output Stream can feed multiple downstream gizmos.
Gizmos keep a list of the input streams they are connected to, and they own those input streams.
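The gizmo pattern described above can be sketched with stdlib threads and queues (all names here are illustrative, not the library's API; for example, connect_to here is a simplified stand-in):

```python
import queue
import threading

# Minimal sketch of the gizmo pattern: each node owns its input queue and
# runs a thread that pulls, processes, and broadcasts to downstream queues.
class MiniGizmo:
    def __init__(self, process):
        self.input = queue.Queue()   # the gizmo owns its input stream
        self.outputs = []            # downstream input queues to feed
        self._process = process
        self._thread = threading.Thread(target=self._run)

    def connect_to(self, other):
        other.outputs.append(self.input)  # receive what `other` produces
        return self

    def _run(self):
        while True:
            item = self.input.get()
            if item is None:                  # sentinel: propagate and stop
                for out in self.outputs:
                    out.put(None)
                break
            result = self._process(item)
            for out in self.outputs:          # broadcast to all downstream gizmos
                out.put(result)

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

collected = []
double = MiniGizmo(lambda x: x * 2)
sink = MiniGizmo(lambda x: collected.append(x))
sink.connect_to(double)       # sink consumes what double produces
double.start()
sink.start()
for i in range(3):
    double.input.put(i)
double.input.put(None)        # signal end of stream
double.join()
sink.join()
print(collected)  # [0, 2, 4]
```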
Composition:
A Composition is a container that holds and manages multiple gizmos (and their Streams).
Once gizmos are connected, you can call composition.start() to begin processing. Each gizmo's run() method executes in a dedicated thread.
Call composition.stop() to gracefully stop processing and wait for all threads to finish.
StreamData and StreamMeta:
Each item in the pipeline is encapsulated by a StreamData object, which holds:
data: The actual payload (e.g., an image array, a frame).
meta: A StreamMeta object that can hold extra metadata from each gizmo (e.g., a detection result, timestamps, bounding boxes, etc.).
Gizmos can append to meta so that metadata accumulates across the pipeline.
Metadata Flow (StreamMeta):
How StreamMeta works:
StreamMeta itself is a container that can hold any number of "meta info" objects.
Each meta info object is "tagged" with one or more string tags, such as "dgt_video", "dgt_inference", etc.
You append new meta info by calling meta.append(my_info, [list_of_tags]).
You can retrieve meta info objects by searching with meta.find("tag") (returns all matches) or meta.find_last("tag") (returns the most recent match).
Important: A gizmo generally clones (.clone()) the incoming StreamMeta before appending its own metadata, to avoid upstream side effects.
This design lets each gizmo add new metadata, while preserving what was provided by upstream gizmos.
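The append/find/clone mechanics can be shown with a toy container (a conceptual sketch, not the actual StreamMeta implementation; MiniMeta and its internals are invented for illustration):

```python
import copy

# Conceptual sketch of a tagged-metadata container: each entry is stored
# with its tags in append order, and clone() gives downstream gizmos an
# independent copy to extend.
class MiniMeta:
    def __init__(self):
        self._entries = []  # list of (info, tags) in append order

    def append(self, info, tags):
        self._entries.append((info, list(tags)))

    def find(self, tag):
        return [info for info, tags in self._entries if tag in tags]

    def find_last(self, tag):
        matches = self.find(tag)
        return matches[-1] if matches else None

    def clone(self):
        return copy.deepcopy(self)

meta = MiniMeta()
meta.append({"fps": 30}, ["dgt_video"])

# A downstream gizmo clones before appending, leaving the upstream
# object untouched:
downstream = meta.clone()
downstream.append({"boxes": [1, 2]}, ["dgt_inference"])
downstream.append({"boxes": [3]}, ["dgt_inference"])

print(downstream.find("dgt_inference"))       # both inference results
print(downstream.find_last("dgt_inference"))  # {'boxes': [3]}
print(meta.find("dgt_inference"))             # [] -- upstream unaffected
```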
High-Level Example:
A camera gizmo outputs frames with meta tagged "dgt_video" containing properties like FPS, width, height, etc.
An AI inference gizmo downstream takes StreamData(data=frame, meta=...), runs inference, then:
Clones the metadata container.
Appends its inference results under the "dgt_inference" tag.
If two AI gizmos run in series, both will append metadata with the same "dgt_inference" tag. A later consumer can call meta.find("dgt_inference") to get both sets of results or meta.find_last("dgt_inference") to get only the most recent result.
A simple pipeline might look like this:
Create your gizmos (e.g., VideoSourceGizmo, VideoDisplayGizmo, AI inference gizmos, etc.).
Connect them together using the >> operator (or the connect_to() method) to form a processing graph.
Initialize a Composition with the top-level gizmo(s) and start it to begin processing.
(Optional) Wait for the pipeline to finish or perform other tasks. You can query statuses, queue sizes, or get partial results in real time.
Stop the pipeline when done.
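The start/wait/stop lifecycle of those steps can be sketched without the library (the real toolkit provides the gizmo classes and Composition; everything below is an illustrative stdlib stand-in):

```python
import queue
import threading

# Library-free sketch of the pipeline lifecycle: a source thread feeds a
# sink thread through a shared queue, then both are started and joined
# together, like a Composition starting and stopping its gizmo threads.
frames = queue.Queue()
results = []

def source():
    for i in range(5):          # stand-in for a camera/file source gizmo
        frames.put(f"frame-{i}")
    frames.put(None)            # end-of-stream sentinel

def sink():
    while True:                 # stand-in for a display/saving gizmo
        item = frames.get()
        if item is None:
            break
        results.append(item.upper())

threads = [threading.Thread(target=source), threading.Thread(target=sink)]
for t in threads:               # analogous to composition.start()
    t.start()
for t in threads:               # analogous to stopping/waiting on the pipeline
    t.join()

print(results)  # the five frames, uppercased, in arrival order
```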
Non-blocking vs Blocking: Streams can drop items if configured (allow_drop=True) to handle real-time feeds.
Multiple Inputs or Outputs: Some gizmos can have multiple input streams and/or broadcast results to multiple outputs.
For practical code examples, see the dgstreams_demo.ipynb notebook in the PySDKExamples.
load_composition(description, global_context=None, local_context=None)
Load a Composition of gizmos and connections from a description.
The description can be provided as a JSON or YAML file path, a YAML string, or a Python dictionary conforming to the JSON schema defined in composition_definition_schema.
Parameters:
description (str or dict, required): Text description of the Composition in YAML format, a path to a .json, .yaml, or .yml file containing such a description, or a Python dictionary with the same structure.
global_context (dict, optional): Global context for evaluating expressions (like using globals()). Defaults to None.
local_context (dict, optional): Local context for evaluating expressions (like using locals()). Defaults to None.
Returns:
Composition: A Composition object representing the described gizmo pipeline.
The returned object is used as usual: initialize a Composition with the top-level gizmo(s), then start the Composition to launch each gizmo in its own thread.
Error Handling: If any gizmo encounters an error, the Composition can stop the whole pipeline, allowing you to handle exceptions centrally.