Streams Base
StreamMeta
Stream metainfo class (metadata container).
Overview
A StreamMeta instance is a container that holds a chronologically ordered list of metainfo objects (called "meta infos") produced by gizmos in a streaming pipeline.
Each time a gizmo adds new metadata (e.g., inference results, resizing information), it is appended to the tail of this list.
The gizmo may associate the appended metadata with one or more tags, so that downstream gizmos or the user can retrieve specific metadata objects by those tags.
Appending and Tagging
To store new metadata, a gizmo calls self.meta.append(meta_obj, tags), where meta_obj is the metadata to attach, and tags is a string or list of strings labeling that metadata (e.g., "tag_inference", "tag_resize").
Internally, StreamMeta keeps track of a list of appended objects and a mapping of tags to the indices in that list.
Retrieving Metadata
You can retrieve all metadata objects tagged with a certain tag via find(tag), which returns a list of all matching objects in the order they were appended.
You can retrieve only the most recently appended object with find_last(tag).
For example, an inference gizmo might attach an inference result with the tag "tag_inference", so a downstream gizmo can do: inference_result = stream_data.meta.find_last("tag_inference").
If no metadata matches the requested tag, find(tag) returns [] and find_last(tag) returns None.
Modifications and Cloning
If you want to remove the most recent entry associated with a certain tag, call remove_last(tag) (occasionally useful in advanced pipeline scenarios).
Typical Usage
A typical processing pipeline might look like this:
A resizing gizmo appends new dimension info under tag "Resize".
An AI inference gizmo appends the inference result under tag "Inference".
A display gizmo reads the final metadata to overlay bounding boxes, etc.
This incremental metadata accumulation lets each gizmo contribute to a unified record of the data's journey.
Example:
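A minimal sketch of this accumulation pattern follows. It does not use the library itself: MiniStreamMeta is a hypothetical stand-in class written for illustration, and its attribute names (_meta_list, _tags) are assumptions, not the actual StreamMeta internals.

```python
from typing import Any, Dict, List, Optional, Sequence, Union


class MiniStreamMeta:
    """Hypothetical stand-in that mimics the documented StreamMeta behavior."""

    def __init__(self) -> None:
        self._meta_list: List[Any] = []        # metainfo objects, oldest first
        self._tags: Dict[str, List[int]] = {}  # tag -> indices into _meta_list

    def append(self, meta: Any, tags: Union[str, Sequence[str]] = ()) -> None:
        if isinstance(tags, str):
            tags = [tags]  # accept a single tag as well as a list of tags
        index = len(self._meta_list)
        self._meta_list.append(meta)
        for tag in tags:
            self._tags.setdefault(tag, []).append(index)

    def find(self, tag: str) -> List[Any]:
        # All objects carrying the tag, in the order they were appended.
        return [self._meta_list[i] for i in self._tags.get(tag, [])]

    def find_last(self, tag: str) -> Optional[Any]:
        indices = self._tags.get(tag)
        return self._meta_list[indices[-1]] if indices else None


meta = MiniStreamMeta()
meta.append({"width": 640, "height": 480}, "Video")       # source gizmo
meta.append({"width": 320, "height": 240}, "Resize")      # resizing gizmo
meta.append({"label": "cat", "score": 0.9}, "Inference")  # inference gizmo

# A display gizmo would read back what it needs by tag.
resize_info = meta.find_last("Resize")
detections = meta.find("Inference")
missing = meta.find_last("Unknown")  # None: nothing was stored under this tag
```

With the real class, the same calls would be made on stream_data.meta instead of a locally created object.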
__init__(meta=None, ...)
__init__(meta=None, tags=[])
Constructor.
Parameters:
meta
Any
Initial metainfo object. Defaults to None.
None
tags
Union[str, List[str]]
Tag or list of tags to associate with the initial metainfo object.
[]
append(meta, ...)
append(meta, tags=[])
Append a metainfo object to this StreamMeta.
Parameters:
meta
Any
The metainfo object to append.
required
tags
Union[str, List[str]]
Tag or list of tags to associate with the metainfo object.
[]
clone
clone()
Shallow clone this StreamMeta.
This creates a copy of the internal list and tags dictionary, but does not deep-copy the metainfo objects.
Returns:
StreamMeta
A cloned StreamMeta instance.
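Because the clone is shallow, the containers are new but the metainfo objects inside them are shared with the original. The sketch below illustrates the consequence with plain Python structures; meta_list and tags are hypothetical stand-ins for the internal list and tag-index map, not actual StreamMeta attributes.

```python
import copy

# Hypothetical stand-ins for the internal metainfo list and tag-index map.
meta_list = [{"label": "cat"}, {"width": 320}]
tags = {"Inference": [0], "Resize": [1]}

# Shallow clone: new containers, but the same metainfo objects inside.
cloned_list = list(meta_list)
cloned_tags = {tag: list(indices) for tag, indices in tags.items()}

# Appending to the clone leaves the original containers untouched.
cloned_list.append({"note": "added downstream"})
cloned_tags["Note"] = [2]

# Mutating a stored object through the clone is visible in the original too.
cloned_list[0]["label"] = "dog"

# To change a stored object safely, copy that object first.
safe_resize = copy.deepcopy(meta_list[1])
safe_resize["width"] = 100
```

After this runs, the original list still has two entries, but its first entry now reads "dog", which is the kind of side effect a deep copy of the individual object avoids.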
find(tag)
find(tag)
Find metainfo objects by tag.
Parameters:
tag
str
The tag to search for.
required
Returns:
List[Any]
List[Any]: A list of metainfo objects that have the given tag (empty if none).
find_last(tag)
find_last(tag)
Find the last metainfo object with a given tag.
Parameters:
tag
str
The tag to search for.
required
Returns:
Any
optional
The last metainfo object associated with the tag, or None if not found.
remove_last(tag)
remove_last(tag)
Remove the last metainfo object associated with a tag.
Parameters:
tag
str
The tag whose last associated metainfo object should be removed.
required
StreamData
Single data element of the streaming pipeline.
__init__(data, ...)
__init__(data, meta=StreamMeta())
Constructor.
Parameters:
data
Any
The data payload.
required
meta
StreamMeta
The metainfo associated with the data. Defaults to a new empty StreamMeta.
StreamMeta()
append_meta(meta, ...)
append_meta(meta, tags=[])
Parameters:
meta
Any
The metainfo object to append.
required
tags
List[str]
Tags to associate with this metainfo object. Defaults to [].
[]
Stream
Bases: Queue
Queue-based iterable stream with optional item drop.
__init__(maxsize=0, ...)
__init__(maxsize=0, allow_drop=False)
Constructor.
Parameters:
maxsize
int
Maximum stream depth (queue size); use 0 for unlimited depth. Defaults to 0.
0
allow_drop
bool
If True, allow dropping the oldest item when the stream is full on put(). Defaults to False.
False
Raises:
Exception
If maxsize is non-zero and less than min_queue_size.
__iter__
__iter__()
Return an iterator over the stream's items.
close
close()
Close the stream by inserting a poison pill.
put(item, ...)
put(item, block=True, timeout=None)
Put an item into the stream, with optional dropping.
If the stream is full and allow_drop is True, the oldest item will be removed to make room.
Parameters:
item
Any
The item to put.
required
block
bool
Whether to block if the stream is full (ignored if dropping is enabled). Defaults to True.
True
timeout
float
Timeout in seconds for the blocking put. Defaults to None (no timeout).
None
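The drop-on-full behavior described for put() can be sketched on top of the standard library's queue.Queue. DroppingQueue is a hypothetical class for illustration only; it is not the library's Stream, and it leaves out the poison pill and the min_queue_size check.

```python
import queue


class DroppingQueue(queue.Queue):
    """Hypothetical queue that discards its oldest item when full."""

    def __init__(self, maxsize: int = 0, allow_drop: bool = False) -> None:
        super().__init__(maxsize)
        self.allow_drop = allow_drop

    def put(self, item, block=True, timeout=None):
        if not self.allow_drop:
            super().put(item, block, timeout)  # ordinary Queue behavior
            return
        while True:
            try:
                super().put(item, block=False)  # never block when dropping
                return
            except queue.Full:
                try:
                    self.get_nowait()  # make room by removing the oldest item
                except queue.Empty:
                    pass  # a consumer emptied the queue first; just retry


stream = DroppingQueue(maxsize=2, allow_drop=True)
for frame_no in range(5):
    stream.put(frame_no)

# Only the two newest items remain in the queue.
kept = [stream.get_nowait() for _ in range(stream.qsize())]
```

Dropping the oldest item rather than the newest keeps a slow consumer working on the most recent data, which is usually what a live video pipeline wants.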
Gizmo
Bases: ABC
Base class for all gizmos (streaming pipeline processing blocks).
Each gizmo owns zero or more input streams that deliver data for processing (data-generating gizmos have no input stream).
A gizmo can be connected to other gizmos to receive data from them. One gizmo can broadcast data to multiple others (a single gizmo output feeding multiple destinations).
A data element moving through the pipeline is a tuple (data, meta), where data is the raw data (e.g., an image, a frame, or any object).
The run() implementation should:
Periodically check the _abort flag (set via abort()) to see if it should terminate.
Handle poison pills (Stream._poison) if they appear in the input streams, which signal "no more data."
Below is a minimal example similar to ResizingGizmo. This gizmo simply reads items from its single input, processes them, and sends results downstream until either _abort is set or the input stream is exhausted:
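A self-contained sketch of that read-process-send loop, written against a plain queue.Queue rather than the library. UppercaseGizmo, POISON, and iterate are hypothetical names; a real gizmo would subclass Gizmo, iterate over self.get_input(0), and call self.send_result() instead of appending to a list.

```python
import queue

POISON = object()  # hypothetical stand-in for Stream._poison


def iterate(stream: queue.Queue):
    """Yield items until the poison pill arrives (mimics iterating a Stream)."""
    while True:
        item = stream.get()
        if item is POISON:
            return
        yield item


class UppercaseGizmo:
    """Hypothetical single-input gizmo; a plain class, not a Gizmo subclass."""

    def __init__(self) -> None:
        self._abort = False
        self.input = queue.Queue()
        self.results = []  # stands in for the destinations of send_result()

    def run(self) -> None:
        for data in iterate(self.input):
            if self._abort:
                break  # stop early when an abort was requested
            self.results.append(data.upper())


gizmo = UppercaseGizmo()
for word in ("a", "b"):
    gizmo.input.put(word)
gizmo.input.put(POISON)  # signals "no more data"
gizmo.run()
```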
Notes
If your gizmo has multiple inputs, you can call self.get_input(i) for each input or iterate over self.get_inputs() if you need to merge or synchronize multiple streams.
Always check _abort periodically inside your main loop if your gizmo could run for a long time or block on I/O.
If, instead of iterating over self.get_input(0), you call self.get_input(0).get() or .get_nowait(), you must check for a poison pill yourself.
In simple loops, iterating over self.get_input(0) terminates the loop when the stream is exhausted.
In multi-input gizmos where simple nested for-loops aren't usable, get_nowait() is typically used to read input streams.
This way the gizmo code can query all inputs in a non-blocking manner and properly terminate its loops.
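The multi-input pattern from the last two notes might look like the sketch below. It uses standard queue.Queue objects in place of input streams; POISON and poll_inputs are hypothetical names, and the short sleep is an assumption added to avoid busy-waiting.

```python
import queue
import time

POISON = object()  # hypothetical stand-in for Stream._poison


def poll_inputs(inputs, is_aborted=lambda: False):
    """Read all inputs without blocking until each has delivered a poison pill."""
    open_inputs = set(range(len(inputs)))
    received = []
    while open_inputs and not is_aborted():
        got_any = False
        for i in sorted(open_inputs):
            try:
                item = inputs[i].get_nowait()
            except queue.Empty:
                continue  # nothing on this input right now
            got_any = True
            if item is POISON:
                open_inputs.discard(i)  # this input is exhausted
            else:
                received.append((i, item))
        if not got_any:
            time.sleep(0.001)  # avoid spinning while all inputs are empty
    return received


first, second = queue.Queue(), queue.Queue()
for item in ("a0", "a1", POISON):
    first.put(item)
for item in ("b0", POISON):
    second.put(item)

merged = poll_inputs([first, second])
```

The loop ends only when every input has produced its poison pill or the abort check returns True, so a stalled input cannot keep the gizmo alive after an abort.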
__getitem__(index)
__getitem__(index)
Enable gizmo[index] syntax for specifying connections.
Returns a tuple (self, input_stream) which can be used on the right side of the >> operator for connecting gizmos (e.g., source_gizmo >> target_gizmo[index]).
Parameters:
index
int
The input stream index on this gizmo.
required
Returns:
(Gizmo, Stream)
tuple: A tuple of (this gizmo, the Stream at the given input index).
__init__(input_stream_sizes=[])
__init__(input_stream_sizes=[])
Constructor.
Parameters:
input_stream_sizes
List[tuple]
List of (maxsize, allow_drop) tuples for each input stream. Use an empty list for no inputs. Each tuple defines the input stream's depth (0 means unlimited) and whether dropping is allowed.
[]
__rshift__(other_gizmo)
__rshift__(other_gizmo)
Connect another gizmo to this gizmo using the >> operator.
This implements the right-shift operator, allowing syntax like source >> target or source >> target[input_index].
Parameters:
other_gizmo
Gizmo or tuple
Either a Gizmo to connect (assumes input 0), or a tuple (gizmo, inp) where inp is the input index or Stream of that gizmo.
required
Returns:
Gizmo
The gizmo given as other_gizmo (the connection target), which allows connections to be chained.
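How __getitem__, connect_to, and __rshift__ cooperate can be shown with a small operator-overloading sketch. Node is a hypothetical stand-in that only records connections by name; it does not create streams and is not the library's Gizmo.

```python
class Node:
    """Hypothetical stand-in for a gizmo; only records who feeds each input."""

    def __init__(self, name: str, n_inputs: int = 1) -> None:
        self.name = name
        self.sources = [[] for _ in range(n_inputs)]  # source names per input

    def __getitem__(self, index: int):
        return (self, index)  # lets target[index] appear on the right of >>

    def connect_to(self, other: "Node", inp: int = 0) -> "Node":
        self.sources[inp].append(other.name)
        return self  # returning the target allows chaining

    def __rshift__(self, other):
        # Accept either a bare node (input 0) or a (node, input) tuple.
        target, inp = other if isinstance(other, tuple) else (other, 0)
        return target.connect_to(self, inp)


camera = Node("camera")
resizer = Node("resizer")
display = Node("display", n_inputs=2)

camera >> resizer >> display[1]  # camera feeds resizer, resizer feeds input 1
camera >> display[0]             # one source can feed several targets
```

Because >> evaluates left to right and each step returns the target, camera >> resizer >> display[1] reads in the same order as the data flows.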
abort(abort=True)
abort(abort=True)
Set or clear the abort flag to stop the run loop.
Parameters:
abort
bool
True to request aborting the run loop, False to clear the abort signal.
True
connect_to(other_gizmo, ...)
connect_to(other_gizmo, inp=0)
Connect an input stream of this gizmo to another gizmo's output.
Parameters:
other_gizmo
The source gizmo to connect from.
required
inp
int or Stream
The input index of this gizmo (or an input Stream) to use for the connection. Defaults to 0.
0
Returns:
Gizmo
This gizmo (to allow chaining).
get_connected
get_connected()
Recursively gather all gizmos connected to this gizmo.
Returns:
set
A set of Gizmo objects that are connected (directly or indirectly) to this gizmo.
get_input(inp)
get_input(inp)
Get a specific input stream by index.
Parameters:
inp
int
Index of the input stream to retrieve.
required
Returns:
Stream
The input stream at the given index.
Raises:
Exception
If the requested input index does not exist.
get_inputs
get_inputs()
Get all input streams of this gizmo.
Returns:
List[Stream]
List[Stream]: List of input stream objects.
run
run() abstractmethod
Run the gizmo's processing loop.
This method should retrieve data from input streams (if any), process it, and send results to outputs. Subclasses implement this method to define the gizmo's behavior.
Important guidelines for implementation
Check self._abort periodically and exit the loop if it becomes True.
If reading from an input stream via get() or get_nowait(), check for the poison pill (Stream._poison). If encountered, exit the loop.
For example, a typical single-input loop could be:
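One possible shape for such a loop, sketched with the standard library's queue.Queue so that it runs on its own. POISON, run_loop, and the callback parameters are hypothetical; in a real gizmo the stream would come from self.get_input(0), the abort check would read self._abort, and results would go to self.send_result().

```python
import queue

POISON = object()  # hypothetical stand-in for Stream._poison


def run_loop(stream, process, is_aborted):
    """Read with get(), checking the abort flag and the poison pill explicitly."""
    results = []
    while not is_aborted():
        try:
            data = stream.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing yet; go back and re-check the abort flag
        if data is POISON:
            break  # upstream signalled "no more data"
        results.append(process(data))
    return results


stream = queue.Queue()
for value in (1, 2, POISON):
    stream.put(value)

doubled = run_loop(stream, process=lambda x: x * 2, is_aborted=lambda: False)
```

The timeout on get() is what keeps the abort check responsive: without it, a gizmo waiting on an empty input would never notice that it was asked to stop.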
send_result(data)
send_result(data)
Send a result to all connected output streams.
Parameters:
data
The data result to send. If None (or a poison pill) is provided, all connected outputs will be closed.
required
Composition
Orchestrates and runs a set of connected gizmos.
Usage
Add all gizmos to the composition using add() or by calling the composition instance.
Connect the gizmos together using connect_to() or the >> operator.
Start the execution by calling start().
To stop the execution, call stop() (or use the composition as a context manager).
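The start, request-stop, and wait lifecycle in these steps can be sketched with plain threads. MiniComposition and Ticker are hypothetical classes written for illustration; they mirror the method names documented here but are not the library's implementation, and they omit streams, poison pills, and bottleneck detection.

```python
import threading
import time


class Ticker:
    """Hypothetical worker standing in for a gizmo."""

    def __init__(self) -> None:
        self._abort = False
        self.ticks = 0

    def abort(self, abort: bool = True) -> None:
        self._abort = abort

    def run(self) -> None:
        while True:
            self.ticks += 1
            if self._abort:
                break
            time.sleep(0.001)


class MiniComposition:
    """Hypothetical runner: one thread per worker, errors re-raised in wait()."""

    def __init__(self, *workers) -> None:
        self._workers = list(workers)
        self._threads = []
        self._errors = []

    def _run(self, worker) -> None:
        try:
            worker.run()
        except Exception as exc:  # collect errors for wait() to report
            self._errors.append(exc)

    def start(self) -> None:
        self._threads = [
            threading.Thread(target=self._run, args=(w,)) for w in self._workers
        ]
        for thread in self._threads:
            thread.start()

    def request_stop(self) -> None:
        for worker in self._workers:
            worker.abort()  # signal only; does not wait for threads

    def wait(self) -> None:
        if not self._threads:
            raise Exception("composition has not been started")
        for thread in self._threads:
            thread.join()
        if self._errors:
            raise Exception(f"worker errors: {self._errors}")

    def stop(self) -> None:
        self.request_stop()
        self.wait()


tickers = [Ticker(), Ticker()]
composition = MiniComposition(*tickers)
composition.start()
composition.stop()
all_stopped = all(not t.is_alive() for t in composition._threads)
```

Separating request_stop() from wait() lets a caller signal many workers quickly and join them afterwards, which is the split the reference below describes.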
__call__(gizmo)
__call__(gizmo)
Add a gizmo to this composition (callable syntax).
Equivalent to calling add(gizmo).
Parameters:
gizmo
The gizmo to add.
required
Returns:
Gizmo
The same gizmo.
__exit__(exc_type, ...)
__exit__(exc_type, exc_val, exc_tb)
On exiting a context, wait for all gizmos to finish (and raise any errors).
Automatically calls wait() to ensure all threads have completed.
__init__(*gizmos)
__init__(*gizmos)
Initialize the composition with optional initial gizmos.
Parameters:
*gizmos
Optional gizmos (or iterables of gizmos) to add initially. If a Gizmo is provided, all gizmos connected to it (including itself) are added. If an iterator of gizmos is provided, all those gizmos (and their connected gizmos) are added.
()
get_bottlenecks
get_bottlenecks()
Get gizmos that experienced input queue bottlenecks in the last run.
For this to be meaningful, the composition must have been started with detect_bottlenecks=True.
Returns:
List[dict]
A list of dictionaries where each key is a gizmo name and the value is the number of frames dropped for that gizmo.
get_current_queue_sizes
get_current_queue_sizes()
Get current sizes of each gizmo's input queues. Can be used to analyze deadlocks.
Returns:
List[dict]
A list of dictionaries where each key is a gizmo name and the value is a list containing the gizmo's result count followed by the size of each of its input queues.
request_stop
request_stop()
Signal all gizmos in this composition to stop (abort).
This sets each gizmo's abort flag, clears all remaining items from their input queues, and sends poison pills to unblock any waiting gets. This method does not wait for threads to finish; call wait() to join threads.
stop
stop()
Stop the composition by aborting all gizmos and waiting for all threads to finish.
wait
wait()
Wait for all gizmo threads in the composition to finish.
Raises:
Exception
If the composition has not been started, or if any gizmo raised an error during execution (the exception message will contain details).
Important: Never modify a received StreamMeta or its stored objects in place, because doing so may create side effects for upstream components. Call clone() if you need to make changes. clone() creates a shallow copy of the metainfo list and a copy of the tag-index map.
A video source gizmo creates a new StreamMeta and appends frame info under tag "Video".
CAUTION: Never modify the existing metadata objects in place. If you need to adapt previously stored metadata for your own use, first copy the data structure or call clone() on the StreamMeta.
Append an additional metainfo object to this StreamData's metadata.
In the (data, meta) tuple, meta is a StreamMeta object containing accumulated metadata.
Subclasses must implement the abstract run() method to define a gizmo processing loop. The run() method is launched in a separate thread by the Composition and should run until no more data is available or until an abort signal is set.
When done, you do not need to manually send poison pills; the Composition handles closing any downstream streams once each gizmo's run() completes.
There is no need to send a poison pill to outputs; the Composition will handle closing output streams.