# Streams

{% hint style="info" %}
This API Reference is based on DeGirum Tools version 1.2.0.
{% endhint %}

## Streaming Toolkit Overview <a href="#streaming-toolkit-overview" id="streaming-toolkit-overview"></a>

This module provides a streaming toolkit for building multi-threaded processing pipelines, where data (images, video frames, or arbitrary objects) flows through a series of *processing blocks* called gizmos. The toolkit allows you to:

* Acquire or generate data from one or more sources (e.g., camera feeds, video files).
* Process the data in a pipeline (possibly in parallel), chaining multiple gizmos together.
* Optionally display or save the processed data, or feed it into AI inference models.
* Orchestrate everything in a [Composition](/degirum-tools/streams/streams_base.md#composition), which manages the lifecycle (threads) of all connected gizmos.

### Core Concepts <a href="#core-concepts" id="core-concepts"></a>

1. **Stream**:
   * Represents a queue of [StreamData](/degirum-tools/streams/streams_base.md#streamdata) items, such as frames from a camera or images from a directory.
   * Gizmos push (`put`) data into Streams or read (`get`) data from them.
   * Streams can optionally drop data (the oldest item) if they reach a specified maximum queue size, preventing pipeline bottlenecks.
2. **Gizmo**:
   * A gizmo is a discrete processing node in the pipeline.
   * Each gizmo runs in its own thread, pulling data from its input stream(s), processing it, and pushing results to its output stream(s).
   * Example gizmos include:
     * Video-sourcing gizmos that read frames from a webcam or file.
     * AI inference gizmos that run a model on incoming frames.
     * Video display or saving gizmos that show or store processed frames.
     * Gizmos that perform transformations (resizing, cropping, analyzing) on data.
   * Gizmos communicate via Streams. A gizmo's output Stream can feed multiple downstream gizmos.
   * Each gizmo owns its input streams and keeps a list of the input streams it is connected to.
3. **Composition**:
   * A [Composition](/degirum-tools/streams/streams_base.md#composition) is a container that holds and manages multiple gizmos (and their Streams).
   * Once gizmos are connected, you can call `composition.start()` to begin processing. Each gizmo's `run()` method executes in a dedicated thread.
   * Call `composition.stop()` to gracefully stop processing and wait for threads to finish.
4. **StreamData** and **StreamMeta**:
   * Each item in the pipeline is encapsulated by a [StreamData](/degirum-tools/streams/streams_base.md#streamdata) object, which holds:
     * `data`: The actual payload (e.g., an image array, a frame).
     * `meta`: A [StreamMeta](/degirum-tools/streams/streams_base.md#streammeta) object that can hold extra metadata from each gizmo (e.g., a detection result, timestamps, bounding boxes, etc.).
       * Gizmos can append to [StreamMeta](/degirum-tools/streams/streams_base.md#streammeta) so that metadata accumulates across the pipeline.
5. **Metadata Flow (StreamMeta)**:
   * How [StreamMeta](/degirum-tools/streams/streams_base.md#streammeta) works:
     * [StreamMeta](/degirum-tools/streams/streams_base.md#streammeta) itself is a container that can hold any number of "meta info" objects.
     * Each meta info object is "tagged" with one or more string tags, such as `"dgt_video"`, `"dgt_inference"`, etc.
     * You append new meta info by calling `meta.append(my_info, [list_of_tags])`.
     * You can retrieve meta info objects by searching with `meta.find("tag")` (returns *all* matches) or `meta.find_last("tag")` (returns the *most recent* match).
     * **Important**: A gizmo generally clones (`.clone()`) the incoming [StreamMeta](/degirum-tools/streams/streams_base.md#streammeta) before appending its own metadata to avoid upstream side effects.
     * This design lets each gizmo add new metadata, while preserving what was provided by upstream gizmos.
   * High-Level Example:
     * A camera gizmo outputs frames with meta tagged `"dgt_video"` containing properties like FPS, width, height, etc.
     * An AI inference gizmo downstream takes `StreamData(data=frame, meta=...)`, runs inference, then:
       1. Clones the metadata container.
       2. Appends its inference results under the `"dgt_inference"` tag.
     * If *two* AI gizmos run in series, both will append metadata with the same `"dgt_inference"` tag. A later consumer can call `meta.find("dgt_inference")` to get both sets of results or `meta.find_last("dgt_inference")` to get the most recent result.
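
The tagging behavior described above can be illustrated with a small stand-alone model. The `ToyMeta` class below is a hypothetical stand-in written only for this illustration, not the library's `StreamMeta` implementation. It assumes nothing beyond the `append`, `find`, `find_last`, and `clone` semantics described in this section, and the info dictionaries are made-up sample values.

{% code overflow="wrap" %}

```python
from typing import Any, List, Optional


class ToyMeta:
    """Hypothetical tagged metadata container (NOT the real StreamMeta)."""

    def __init__(self) -> None:
        self._infos: List[Any] = []       # meta info objects, in append order
        self._tags: List[List[str]] = []  # tags of the object at the same index

    def append(self, info: Any, tags: List[str]) -> None:
        """Store a meta info object under one or more string tags."""
        self._infos.append(info)
        self._tags.append(list(tags))

    def find(self, tag: str) -> List[Any]:
        """Return all meta info objects carrying the tag, oldest first."""
        return [info for info, tags in zip(self._infos, self._tags) if tag in tags]

    def find_last(self, tag: str) -> Optional[Any]:
        """Return the most recently appended match, or None if there is none."""
        matches = self.find(tag)
        return matches[-1] if matches else None

    def clone(self) -> "ToyMeta":
        """Copy the container so later appends do not affect the original."""
        twin = ToyMeta()
        twin._infos = list(self._infos)
        twin._tags = [list(tags) for tags in self._tags]
        return twin


# A camera stage tags its frame properties with "dgt_video".
camera_meta = ToyMeta()
camera_meta.append({"fps": 30, "width": 640, "height": 480}, ["dgt_video"])

# Two inference stages in series: each clones first, then appends its own result.
meta_after_first = camera_meta.clone()
meta_after_first.append({"label": "person"}, ["dgt_inference"])

meta_after_second = meta_after_first.clone()
meta_after_second.append({"label": "face"}, ["dgt_inference"])

all_results = meta_after_second.find("dgt_inference")    # both results
latest = meta_after_second.find_last("dgt_inference")    # second stage only
upstream_untouched = camera_meta.find("dgt_inference")   # still empty
```

{% endcode %}

Because each stage clones before appending, the camera's container never sees the inference results, while the final consumer sees the full history.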

### Basic Usage Example <a href="#basic-usage-example" id="basic-usage-example"></a>

A simple pipeline might look like this:

{% code overflow="wrap" %}

```python
import time
import cv2
import degirum_tools.streams as streams

# Create gizmos. If you are on a laptop or have a webcam attached,
# VideoSourceGizmo(0) will typically select the default camera.
video_source = streams.VideoSourceGizmo(0)
video_display = streams.VideoDisplayGizmo("Camera Preview")

# Connect them
video_source >> video_display

# Build composition
composition = streams.Composition(video_source, video_display)
composition.start(wait=False)  # Don't block main thread

try:
    start_time = time.time()
    while time.time() - start_time < 10:  # Run for 10 seconds
        cv2.waitKey(5)  # Let OpenCV handle window events
finally:
    composition.stop()
    cv2.destroyAllWindows()
```

{% endcode %}

### Key Steps <a href="#key-steps" id="key-steps"></a>

1. **Create** your gizmos (e.g., `VideoSourceGizmo`, `VideoDisplayGizmo`, AI inference gizmos, etc.).
2. **Connect** them together using the `>>` operator (or `connect_to()` method) to form a processing graph. E.g.:

{% code overflow="wrap" %}

```python
source >> processor >> sink
```

{% endcode %}

3. **Initialize** a [Composition](/degirum-tools/streams/streams_base.md#composition) with the top-level gizmo(s).
4. **Start** the [Composition](/degirum-tools/streams/streams_base.md#composition) to launch each gizmo in its own thread.
5. (Optional) **Wait** for the pipeline to finish or perform other tasks. You can query statuses, queue sizes, or get partial results in real time.
6. **Stop** the pipeline when done.
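
The create, connect, start, and stop steps above can be mimicked with the standard library alone. The sketch below is a toy model under stated assumptions: `ToyNode` and `ToyComposition` are hypothetical names invented for this illustration and are not part of DeGirum Tools. Unlike a real source gizmo, the toy source node does not generate data itself; the main thread feeds it.

{% code overflow="wrap" %}

```python
import queue
import threading
from typing import Any, Callable, List

_STOP = object()  # sentinel that tells a node to finish


class ToyNode:
    """Hypothetical processing node with one input queue (NOT a real gizmo)."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func
        self.inbox: queue.Queue = queue.Queue()
        self.downstream: List["ToyNode"] = []
        self.has_upstream = False

    def __rshift__(self, other: "ToyNode") -> "ToyNode":
        # `a >> b` connects a to b and returns b, so `a >> b >> c` builds a chain.
        self.downstream.append(other)
        other.has_upstream = True
        return other

    def run(self) -> None:
        while True:
            item = self.inbox.get()
            if item is _STOP:
                # Pass the sentinel on so downstream nodes finish too.
                for node in self.downstream:
                    node.inbox.put(_STOP)
                return
            result = self.func(item)
            for node in self.downstream:
                node.inbox.put(result)


class ToyComposition:
    """Hypothetical container that runs every node in its own thread."""

    def __init__(self, *nodes: ToyNode) -> None:
        self.nodes = nodes
        self.threads: List[threading.Thread] = []

    def start(self) -> None:
        self.threads = [threading.Thread(target=node.run) for node in self.nodes]
        for thread in self.threads:
            thread.start()

    def stop(self) -> None:
        # The sentinel enters at the head of each chain and propagates downstream.
        for node in self.nodes:
            if not node.has_upstream:
                node.inbox.put(_STOP)
        for thread in self.threads:
            thread.join()


collected: List[int] = []
source = ToyNode(lambda x: x)          # passes items through unchanged
processor = ToyNode(lambda x: x * 2)   # doubles each item
sink = ToyNode(collected.append)       # stores what it receives

source >> processor >> sink

composition = ToyComposition(source, processor, sink)
composition.start()
for value in (1, 2, 3):
    source.inbox.put(value)
composition.stop()  # waits until all queued items have been processed
```

{% endcode %}

In this toy, `stop()` drains everything already queued before the threads exit; whether the real `composition.stop()` drains or discards pending items is not stated on this page.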

### Advanced Topics <a href="#advanced-topics" id="advanced-topics"></a>

* **Non-blocking vs Blocking**: A Stream configured with `allow_drop=True` drops its oldest item once it reaches its maximum queue size, which keeps a slow consumer from backing up a real-time feed.
* **Multiple Inputs or Outputs**: Some gizmos can have multiple input streams and/or broadcast results to multiple outputs.
* **Error Handling**: If any gizmo encounters an error, the [Composition](/degirum-tools/streams/streams_base.md#composition) can stop the whole pipeline, allowing you to handle exceptions centrally.
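
The drop-oldest policy mentioned above can be sketched with a bounded standard-library queue. `DroppingQueue` and `put_drop_oldest` are hypothetical names used only for this illustration; this is not how the library's Stream is implemented, only a minimal model of the behavior.

{% code overflow="wrap" %}

```python
import queue
from typing import Any, List


class DroppingQueue(queue.Queue):
    """Hypothetical bounded queue that discards its oldest item when full."""

    def put_drop_oldest(self, item: Any) -> None:
        while True:
            try:
                self.put_nowait(item)
                return
            except queue.Full:
                # Make room by discarding the oldest queued item, then retry.
                try:
                    self.get_nowait()
                except queue.Empty:
                    pass  # a consumer emptied the queue in the meantime


frames = DroppingQueue(maxsize=3)
for frame_number in range(6):  # the producer outpaces the (absent) consumer
    frames.put_drop_oldest(frame_number)

remaining: List[int] = []
while not frames.empty():
    remaining.append(frames.get_nowait())
```

{% endcode %}

Only the three newest frame numbers survive, so a consumer that falls behind always works on recent data instead of an ever-growing backlog.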

For practical code examples, see the `dgstreams_demo.ipynb` notebook in the PySDKExamples repository.

## Functions <a href="#functions" id="functions"></a>

#### load\_composition(description, ...) <a href="#load_composition" id="load_composition"></a>

`load_composition(description, global_context=None, local_context=None)`

Load a [Composition](/degirum-tools/streams/streams_base.md#composition) of gizmos and connections from a description.

The description can be provided as a JSON/YAML file path, a YAML string, or a Python dictionary conforming to the JSON schema defined in `composition_definition_schema`.

Composition Description Schema (YAML):

{% code overflow="wrap" %}

```yaml
type: object
additionalProperties: false
required: [gizmos, connections]
properties:
    vars:
        type: object
        description: The collection of variables, keyed by variable name
        additionalProperties: false
        patternProperties:
            "^[a-zA-Z_][a-zA-Z0-9_]*$":
                oneOf:
                  - type: [string, number, boolean, array]
                    description: The variable value; can be $(expression) to evaluate
                  - type: object
                    description: The only object key is the class name to instantiate; its value holds the constructor parameters
                    additionalProperties: false
                    minProperties: 1
                    maxProperties: 1
                    patternProperties:
                        "^[a-zA-Z_][a-zA-Z0-9_.]*$":
                            type: object
                            description: The constructor parameters of the object
                            additionalProperties: true
    gizmos:
        type: object
        description: The collection of gizmos, keyed by gizmo instance name
        additionalProperties: false
        patternProperties:
            "^[a-zA-Z_][a-zA-Z0-9_]*$":
                oneOf:
                  - type: string
                    description: The gizmo class name to instantiate (if no parameters are needed)
                  - type: object
                    description: The only object key is the class name to instantiate; its value holds the constructor parameters
                    additionalProperties: false
                    minProperties: 1
                    maxProperties: 1
                    patternProperties:
                        "^[a-zA-Z_][a-zA-Z0-9_.]*$":
                            type: object
                            description: The constructor parameters of the gizmo
                            additionalProperties: true
    connections:
        type: array
        description: The list of connections between gizmos
        items:
            type: array
            description: The connection between gizmos
            items:
                oneOf:
                    - type: string
                    - type: array
                      description: Gizmo with input index
                      prefixItems:
                        - type: string
                        - type: number
                      items: false
```

{% endcode %}

Parameters:

| Name             | Type          | Description                                                                                                   | Default    |
| ---------------- | ------------- | ------------------------------------------------------------------------------------------------------------- | ---------- |
| `description`    | `str or dict` | A YAML string or dict describing the Composition, or a path to a .json/.yaml file containing the description. | *required* |
| `global_context` | `dict`        | Global context for evaluating expressions in the description (variables, etc.).                               | `None`     |
| `local_context`  | `dict`        | Local context for evaluating expressions.                                                                     | `None`     |

Returns:

| Name          | Type          | Description                                                                                                           |
| ------------- | ------------- | --------------------------------------------------------------------------------------------------------------------- |
| `Composition` | `Composition` | A [Composition](/degirum-tools/streams/streams_base.md#composition) object representing the described gizmo pipeline. |
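
As an illustration of the schema above, the sketch below builds a description as a Python dictionary and checks a few of the schema's rules by hand. The gizmo class names come from the Basic Usage Example. The constructor parameter name `video_source`, the instance names `source` and `display`, and the helper `check_description` are assumptions made for this example. The helper covers only part of the schema and does not replace the validation that `load_composition` performs.

{% code overflow="wrap" %}

```python
import re
from typing import Any, Dict, List

NAME_PATTERN = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")

description: Dict[str, Any] = {
    "gizmos": {
        # Object form: the single key is the class name, the value holds the
        # constructor parameters (the name `video_source` is an assumption).
        "source": {"VideoSourceGizmo": {"video_source": 0}},
        # String form: class name only, no constructor parameters.
        "display": "VideoDisplayGizmo",
    },
    "connections": [
        # One connection; ["display", 0] pairs a gizmo name with an input index.
        ["source", ["display", 0]],
    ],
}


def check_description(desc: Dict[str, Any]) -> List[str]:
    """Check a few rules from the schema; return a list of problems found."""
    problems: List[str] = []
    for key in ("gizmos", "connections"):
        if key not in desc:
            problems.append(f"missing required key: {key}")
    for key in sorted(set(desc) - {"vars", "gizmos", "connections"}):
        problems.append(f"unexpected key: {key}")
    for name, spec in desc.get("gizmos", {}).items():
        if not NAME_PATTERN.match(name):
            problems.append(f"invalid gizmo name: {name}")
        is_object_form = isinstance(spec, dict) and len(spec) == 1
        if not isinstance(spec, str) and not is_object_form:
            problems.append(f"invalid gizmo spec for: {name}")
    for connection in desc.get("connections", []):
        for item in connection:
            is_indexed = (
                isinstance(item, list)
                and len(item) == 2
                and isinstance(item[0], str)
                and isinstance(item[1], (int, float))
                and not isinstance(item[1], bool)
            )
            if not isinstance(item, str) and not is_indexed:
                problems.append(f"invalid connection item: {item!r}")
    return problems


problems = check_description(description)
```

{% endcode %}

According to the parameters table, a dictionary like this can be passed directly as `load_composition(description)`. Whether these class names resolve without a module prefix is not stated on this page.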

