Skip to content

sgn.compose

Composable elements for SGN.

This module provides functionality for composing multiple elements into a single element that behaves as one atomic unit. This enables:

- Source + Transform(s) → ComposedSourceElement
- Transform + Transform(s) → ComposedTransformElement
- Transform(s) + Sink → ComposedSinkElement

The composed elements work by merging internal element graphs into the Pipeline's graph, letting the Pipeline's single TopologicalSorter handle all execution.

Example usage

from sgn.compose import Compose

Create a composed source from source + transforms

>>> composed = (
...     Compose()
...     .connect(my_source, transform1)
...     .connect(transform1, transform2)
...     .as_source(name="my_composed_source")
... )

Use like any other source element

pipeline.connect(composed, my_sink)

Compose

Bases: Graph


              flowchart TD
              sgn.compose.Compose[Compose]
              sgn.apps.Graph[Graph]

                              sgn.apps.Graph --> sgn.compose.Compose
                


              click sgn.compose.Compose href "" "sgn.compose.Compose"
              click sgn.apps.Graph href "" "sgn.apps.Graph"
            

Fluent builder for creating composed elements.

This class provides a chainable API for composing multiple elements into a single element. It inherits from Graph to reuse insert() and connect() methods with implicit linking strategies.

Example (linear chain): >>> composed_source = ( ... Compose() ... .connect(source, transform1) ... .connect(transform1, transform2) ... .as_source(name="my_source") ... )

Example (non-linear graph): >>> composed_source = ( ... Compose() ... .connect(source1, merge_transform) ... .connect(source2, merge_transform) ... .as_source(name="merged_source") ... )

Source code in src/sgn/compose.py
class Compose(Graph):
    """Chainable builder that wraps a small element graph into one element.

    Elements are added and wired with the insert() and connect() methods
    reused from Graph (including their implicit linking strategies). One of
    the ``as_source`` / ``as_transform`` / ``as_sink`` methods then finalizes
    the composition into the matching composed element.

    Example (linear chain):
        >>> composed_source = (
        ...     Compose()
        ...     .connect(source, transform1)
        ...     .connect(transform1, transform2)
        ...     .as_source(name="my_source")
        ... )

    Example (non-linear graph):
        >>> composed_source = (
        ...     Compose()
        ...     .connect(source1, merge_transform)
        ...     .connect(source2, merge_transform)
        ...     .as_source(name="merged_source")
        ... )
    """

    def __init__(self, first_element: Element | None = None) -> None:
        """Create an empty composition, optionally seeded with one element.

        Args:
            first_element: Element to insert immediately. Kept for backwards
                compatibility with the linear chain pattern; when None, add
                elements later via insert().
        """
        super().__init__()
        if first_element is None:
            return
        self.insert(first_element)

    def _build_link_dict(self) -> dict[str, str]:
        """Collect the inter-element links recorded in the internal graph.

        Returns:
            Mapping from each linked sink pad's full name to the full name
            of the source pad it is linked to. Sink pads whose ``other`` is
            None are left out.
        """
        return {
            node.name: node.other.name
            for node in self.graph
            if isinstance(node, SinkPad) and node.other is not None
        }

    def as_source(
        self,
        name: str = "",
        also_expose_source_pads: list[str] | None = None,
        update_pad_names: dict[str, dict[str, str]] | None = None,
    ) -> ComposedSourceElement:
        """Wrap the composition in a ComposedSourceElement.

        Args:
            name: Optional name for the composed element.
            also_expose_source_pads: Optional full names of internal source
                pads (format: "element_name:src:pad_name") to expose
                externally even though they are connected to internal sinks.
                This enables multilink patterns where one source feeds both
                internal elements and external consumers.
            update_pad_names: Optional mapping keyed by the name of a
                boundary pad to replace or rename. Each value is a dict
                whose keys are the new pad name(s) and whose respective
                values are full pad names in the format
                "element_name:src:pad_name".

        Returns:
            A new ComposedSourceElement wrapping the composition.
        """
        members = self.elements.copy()
        links = self._build_link_dict()
        exposed = also_expose_source_pads if also_expose_source_pads else []
        renames = update_pad_names if update_pad_names else {}
        return ComposedSourceElement(
            name=name,
            internal_elements=members,
            internal_links=links,
            also_expose_source_pads=exposed,
            update_pad_names=renames,
        )

    def as_transform(
        self,
        name: str = "",
        also_expose_source_pads: list[str] | None = None,
        update_pad_names: dict[str, dict[str, str]] | None = None,
    ) -> ComposedTransformElement:
        """Wrap the composition in a ComposedTransformElement.

        Args:
            name: Optional name for the composed element.
            also_expose_source_pads: Optional full names of internal source
                pads (format: "element_name:src:pad_name") to expose
                externally even when connected internally. Enables
                multilink patterns.
            update_pad_names: Optional mapping keyed by the name of a
                boundary pad to replace or rename. Each value is a dict
                whose keys are the new pad name(s) and whose respective
                values are full pad names in the format
                "element_name:src:pad_name".

        Returns:
            A new ComposedTransformElement wrapping the composition.
        """
        members = self.elements.copy()
        links = self._build_link_dict()
        exposed = also_expose_source_pads if also_expose_source_pads else []
        renames = update_pad_names if update_pad_names else {}
        return ComposedTransformElement(
            name=name,
            internal_elements=members,
            internal_links=links,
            also_expose_source_pads=exposed,
            update_pad_names=renames,
        )

    def as_sink(
        self,
        name: str = "",
        update_pad_names: dict[str, dict[str, str]] | None = None,
    ) -> ComposedSinkElement:
        """Wrap the composition in a ComposedSinkElement.

        Args:
            name: Optional name for the composed element.
            update_pad_names: Optional mapping keyed by the name of a
                boundary pad to replace or rename. Each value is a dict
                whose keys are the new pad name(s) and whose respective
                values are the full names of the internal sink pads to
                expose instead (e.g. "element_name:snk:pad_name").

        Returns:
            A new ComposedSinkElement wrapping the composition.
        """
        members = self.elements.copy()
        links = self._build_link_dict()
        renames = update_pad_names if update_pad_names else {}
        return ComposedSinkElement(
            name=name,
            internal_elements=members,
            internal_links=links,
            update_pad_names=renames,
        )

__init__(first_element=None)

Initialize composition, optionally with a first element.

Parameters:

Name Type Description Default
first_element Element | None

Optional first element for backwards compatibility with linear chain pattern. If None, use insert() to add elements.

None
Source code in src/sgn/compose.py
def __init__(self, first_element: Element | None = None) -> None:
    """Create an empty composition, optionally seeded with one element.

    Args:
        first_element: Element to insert immediately. Kept for backwards
            compatibility with the linear chain pattern; when None, add
            elements later via insert().
    """
    super().__init__()
    if first_element is None:
        return
    self.insert(first_element)

as_sink(name='', update_pad_names=None)

Finalize the composition as a ComposedSinkElement.

Parameters:

Name Type Description Default
name str

Optional name for the composed element

''
update_pad_names dict[str, dict[str, str]] | None

Optional dictionary keyed by the names of the boundary pads that shall be replaced or renamed; each value is expected to be another dictionary whose key(s) are the new pad name(s) for the replaced boundary pad, with the respective values holding the full name(s) of the internal sink pad(s) in the format "element_name:snk:pad_name".

None

Returns:

Type Description
ComposedSinkElement

A new ComposedSinkElement wrapping the composition

Source code in src/sgn/compose.py
def as_sink(
    self,
    name: str = "",
    update_pad_names: dict[str, dict[str, str]] | None = None,
) -> ComposedSinkElement:
    """Wrap the composition in a ComposedSinkElement.

    Args:
        name: Optional name for the composed element.
        update_pad_names: Optional mapping keyed by the name of a boundary
            pad to replace or rename. Each value is a dict whose keys are
            the new pad name(s) and whose respective values are the full
            names of the internal sink pads to expose instead
            (e.g. "element_name:snk:pad_name").

    Returns:
        A new ComposedSinkElement wrapping the composition.
    """
    members = self.elements.copy()
    links = self._build_link_dict()
    renames = update_pad_names if update_pad_names else {}
    return ComposedSinkElement(
        name=name,
        internal_elements=members,
        internal_links=links,
        update_pad_names=renames,
    )

as_source(name='', also_expose_source_pads=None, update_pad_names=None)

Finalize the composition as a ComposedSourceElement.

Parameters:

Name Type Description Default
name str

Optional name for the composed element

''
also_expose_source_pads list[str] | None

Optional list of internal source pad full names (format: "element_name:src:pad_name") that should be exposed externally even when they are connected to internal sinks. This enables multilink patterns where a single source feeds both internal elements and external consumers.

None
update_pad_names dict[str, dict[str, str]] | None

Optional dictionary that has as keys the names of boundary pads that shall be replaced or renamed; each value is expected to be another dictionary. This should contain key(s), representing new pad name(s) for the boundary pad that is replaced, with the respective values holding the full pad name(s) in the format "element_name:src:pad_name".

None

Returns:

Type Description
ComposedSourceElement

A new ComposedSourceElement wrapping the composition

Source code in src/sgn/compose.py
def as_source(
    self,
    name: str = "",
    also_expose_source_pads: list[str] | None = None,
    update_pad_names: dict[str, dict[str, str]] | None = None,
) -> ComposedSourceElement:
    """Wrap the composition in a ComposedSourceElement.

    Args:
        name: Optional name for the composed element.
        also_expose_source_pads: Optional full names of internal source
            pads (format: "element_name:src:pad_name") to expose externally
            even though they are connected to internal sinks. This enables
            multilink patterns where one source feeds both internal
            elements and external consumers.
        update_pad_names: Optional mapping keyed by the name of a boundary
            pad to replace or rename. Each value is a dict whose keys are
            the new pad name(s) and whose respective values are full pad
            names in the format "element_name:src:pad_name".

    Returns:
        A new ComposedSourceElement wrapping the composition.
    """
    members = self.elements.copy()
    links = self._build_link_dict()
    exposed = also_expose_source_pads if also_expose_source_pads else []
    renames = update_pad_names if update_pad_names else {}
    return ComposedSourceElement(
        name=name,
        internal_elements=members,
        internal_links=links,
        also_expose_source_pads=exposed,
        update_pad_names=renames,
    )

as_transform(name='', also_expose_source_pads=None, update_pad_names=None)

Finalize the composition as a ComposedTransformElement.

Parameters:

Name Type Description Default
name str

Optional name for the composed element

''
also_expose_source_pads list[str] | None

Optional list of internal source pad full names (format: "element_name:src:pad_name") that should be exposed externally even when connected internally. Enables multilink patterns.

None
update_pad_names dict[str, dict[str, str]] | None

Optional dictionary that has as keys the names of boundary pads that shall be replaced or renamed; each value is expected to be another dictionary. This should contain key(s), representing new pad name(s) for the boundary pad that is replaced, with the respective values holding the full pad name(s) in the format "element_name:src:pad_name".

None

Returns:

Type Description
ComposedTransformElement

A new ComposedTransformElement wrapping the composition

Source code in src/sgn/compose.py
def as_transform(
    self,
    name: str = "",
    also_expose_source_pads: list[str] | None = None,
    update_pad_names: dict[str, dict[str, str]] | None = None,
) -> ComposedTransformElement:
    """Wrap the composition in a ComposedTransformElement.

    Args:
        name: Optional name for the composed element.
        also_expose_source_pads: Optional full names of internal source
            pads (format: "element_name:src:pad_name") to expose externally
            even when connected internally. Enables multilink patterns.
        update_pad_names: Optional mapping keyed by the name of a boundary
            pad to replace or rename. Each value is a dict whose keys are
            the new pad name(s) and whose respective values are full pad
            names in the format "element_name:src:pad_name".

    Returns:
        A new ComposedTransformElement wrapping the composition.
    """
    members = self.elements.copy()
    links = self._build_link_dict()
    exposed = also_expose_source_pads if also_expose_source_pads else []
    renames = update_pad_names if update_pad_names else {}
    return ComposedTransformElement(
        name=name,
        internal_elements=members,
        internal_links=links,
        also_expose_source_pads=exposed,
        update_pad_names=renames,
    )

ComposedElementMixin

Bases: ABC


              flowchart TD
              sgn.compose.ComposedElementMixin[ComposedElementMixin]

              

              click sgn.compose.ComposedElementMixin href "" "sgn.compose.ComposedElementMixin"
            

Mixin class providing composition functionality for composed elements.

This mixin provides the infrastructure for managing composed elements by: 1. Building a merged graph from internal elements 2. Identifying boundary pads (entry/exit points) 3. Creating virtual source pads for frame injection (for Transform/Sink)

The key insight is that composed elements don't execute internally - they merge their internal graphs into the Pipeline, which handles all execution.

Source code in src/sgn/compose.py
class ComposedElementMixin(ABC):
    """Shared machinery for elements that wrap a graph of other elements.

    The mixin takes care of:
    1. Building a merged graph from the internal elements
    2. Working out which pads are boundary pads (entry/exit points)
    3. Holding the virtual source pads used for frame injection
       (for Transform/Sink)

    Composed elements do not execute anything internally: their internal
    graphs are merged into the Pipeline, which handles all execution.
    """

    # Composition inputs; the concrete dataclass declares these as fields
    internal_elements: list[Element]
    internal_links: dict[str, str]

    # Runtime state, populated by _build_internal_infrastructure
    _internal_graph: Graph
    _boundary_source_pads: dict[str, SourcePad]
    _boundary_sink_pads: dict[str, SinkPad]
    _virtual_sources: dict[str, SourcePad]

    def _build_internal_infrastructure(self) -> None:
        """Assemble the internal graph and work out the boundary pads.

        Steps, in order:
        1. Create a fresh Graph and reset the virtual source registry
        2. Insert every internal element into that graph
        3. Link pads as described by ``internal_links``
           (sink pad full name -> source pad full name)
        4. Identify the boundary pads (unlinked entry/exit points)
        """
        graph = Graph()
        self._internal_graph = graph
        self._virtual_sources = {}

        for member in self.internal_elements:
            graph.insert(member)

        for snk_full_name, src_full_name in self.internal_links.items():
            snk, src = graph[snk_full_name], graph[src_full_name]
            # Both names must resolve to pads of the expected direction
            assert isinstance(snk, SinkPad)
            assert isinstance(src, SourcePad)
            graph.link({snk: src})

        self._identify_boundary_pads()

    def _identify_boundary_pads(self) -> None:
        """Record which internal pads are exposed externally.

        Boundary source pads: source pads of internal elements whose full
            name is NOT a value of ``internal_links`` (these become outputs)
        Boundary sink pads: sink pads of internal elements whose full name
            is NOT a key of ``internal_links`` (these become inputs)

        Both result dicts are keyed by the pad's key in the owning
        element's ``srcs`` / ``snks`` mapping.
        """
        # NOTE(review): keys are per-element pad names, so two internal
        # elements exposing the same pad name would overwrite each other
        # here - confirm whether callers guarantee uniqueness.
        linked_sink_names = set(self.internal_links)
        linked_source_names = set(self.internal_links.values())

        outputs: dict[str, SourcePad] = {}
        inputs: dict[str, SinkPad] = {}
        self._boundary_source_pads = outputs
        self._boundary_sink_pads = inputs

        for member in self.internal_elements:
            # Unlinked source pads become outputs
            if isinstance(member, (SourceElement, TransformElement)):
                outputs.update(
                    (key, src)
                    for key, src in member.srcs.items()
                    if src.name not in linked_source_names
                )

            # Unlinked sink pads become inputs
            if isinstance(member, (TransformElement, SinkElement)):
                inputs.update(
                    (key, snk)
                    for key, snk in member.snks.items()
                    if snk.name not in linked_sink_names
                )

ComposedSinkElement dataclass

Bases: ComposedElementMixin, SinkElement


              flowchart TD
              sgn.compose.ComposedSinkElement[ComposedSinkElement]
              sgn.compose.ComposedElementMixin[ComposedElementMixin]
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.compose.ComposedElementMixin --> sgn.compose.ComposedSinkElement
                
                sgn.base.SinkElement --> sgn.compose.ComposedSinkElement
                                sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.compose.ComposedSinkElement href "" "sgn.compose.ComposedSinkElement"
              click sgn.compose.ComposedElementMixin href "" "sgn.compose.ComposedElementMixin"
              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A composed element that behaves like a SinkElement.

Created from: One or more SinkElements, optionally with TransformElements. Exposes: Sink pads from the boundary (unlinked sink pads become inputs).

Uses virtual source pads to inject frames into internal boundary sink pads. The Pipeline executes all internal elements via topological sort.

Supports both linear chains and non-linear graphs (e.g., a transform that fans out to multiple sinks).

Source code in src/sgn/compose.py
@dataclass(kw_only=True)
class ComposedSinkElement(ComposedElementMixin, SinkElement):
    """A composed element that behaves like a SinkElement.

    Created from: One or more SinkElements, optionally with TransformElements.
    Exposes: Sink pads from the boundary (unlinked sink pads become inputs).

    Uses virtual source pads to inject frames into internal boundary sink pads.
    The Pipeline executes all internal elements via topological sort.

    Supports both linear chains and non-linear graphs (e.g., a transform that
    fans out to multiple sinks).
    """

    # Composition inputs
    # internal_links maps sink pad full name -> source pad full name
    internal_elements: list[Element] = field(default_factory=list)
    internal_links: dict[str, str] = field(default_factory=dict)

    # Additional: rename boundary pads
    # Maps an existing boundary sink pad name -> {new pad name: full name of
    # the internal pad to expose under that new name}
    update_pad_names: dict[str, dict[str, str]] = field(default_factory=dict)

    # Override to not require sink_pad_names at init
    # (overwritten in __post_init__ from the boundary sink pads)
    sink_pad_names: list[str] = field(default_factory=list)

    # Runtime state
    _internal_graph: Graph = field(init=False)
    _boundary_source_pads: dict[str, SourcePad] = field(
        init=False, default_factory=dict
    )
    _boundary_sink_pads: dict[str, SinkPad] = field(init=False, default_factory=dict)
    # Keyed by composed sink pad name; each virtual source feeds the matching
    # internal boundary sink pad
    _virtual_sources: dict[str, SourcePad] = field(init=False, default_factory=dict)
    _internal_sinks: list[SinkElement] = field(init=False, default_factory=list)

    def __post_init__(self) -> None:
        """Validate the composition, then build pads and the merged graph.

        Raises:
            ValueError: If ``internal_elements`` is empty, or if no sink
                pads result from the composition.
            TypeError: If no internal element is a SinkElement, or if any
                internal element is a SourceElement.
            KeyError: If a key of ``update_pad_names`` is not a current
                boundary sink pad name.
        """
        # First, set up UniqueID
        UniqueID.__post_init__(self)

        # Validate composition
        if not self.internal_elements:
            raise ValueError("ComposedSinkElement requires at least one element")

        # Must have at least one SinkElement
        self._internal_sinks = [
            elem for elem in self.internal_elements if isinstance(elem, SinkElement)
        ]
        if not self._internal_sinks:
            raise TypeError("ComposedSinkElement requires at least one SinkElement")

        # Cannot contain any SourceElements
        for elem in self.internal_elements:
            if isinstance(elem, SourceElement):
                raise TypeError(
                    f"ComposedSinkElement cannot contain SourceElement, "
                    f"got {elem.name}"
                )

        # Build internal infrastructure
        # (populates _internal_graph and the boundary pad dicts)
        self._build_internal_infrastructure()

        if self.update_pad_names:
            # Names of certain boundary pads shall be updated. This requires
            # (i) removing the old pad from the boundary dict and
            # (ii) adding the new pad(s) by looking them up in the internal graph.
            for old_pad_name, new_pads in self.update_pad_names.items():
                self._boundary_sink_pads.pop(old_pad_name)
                # pop automatically raises KeyError if not found
                for new_pad_name, new_pad_full_name in new_pads.items():
                    self._boundary_sink_pads[new_pad_name] = self._internal_graph[
                        new_pad_full_name
                    ]

        # Set up sink pads from boundary pads
        # (any sink_pad_names passed to the constructor are replaced here)
        self.sink_pad_names = list(self._boundary_sink_pads.keys())

        # Initialize graph and internal pad
        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)

        # Create our own sink pads
        self.sink_pads = [
            SinkPad(name=pad_name, element=self, call=self.pull)
            for pad_name in self.sink_pad_names
        ]

        # Create virtual source pads for frame injection
        # Each one just hands back the frame that pull() stored on its
        # ``output`` attribute.
        for snk_name in self.sink_pad_names:
            virtual_src = SourcePad(
                name=f"_vs_{snk_name}",
                element=self,
                call=lambda pad: pad.output,
            )
            self._virtual_sources[snk_name] = virtual_src

        # Forward (name -> pad) and reverse (pad -> name) lookups, plus a
        # per-pad EOS flag that starts out False
        self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self._at_eos = {p: False for p in self.sink_pads}

        if not self.sink_pads:  # pragma: no cover
            raise ValueError("ComposedSinkElement must have at least one sink pad")

        self.sink_pad_names_full = [p.name for p in self.sink_pads]

        # Build composed element's graph:
        # 1. Include all internal element graphs
        self.graph.update(self._internal_graph.graph)

        # 2. Link internal boundary sinks to virtual sources and add edges
        for snk_name, virtual_src in self._virtual_sources.items():
            internal_snk = self._boundary_sink_pads[snk_name]
            link_graph = internal_snk.link(virtual_src)
            self.graph.update(link_graph)

        # 3. Virtual sources depend on composed sink pads
        for snk_name, virtual_src in self._virtual_sources.items():
            composed_snk = self.snks[snk_name]
            self.graph[virtual_src] = {composed_snk}

        # 4. Composed internal_pad depends on all internal sinks' internal_pads
        # This ensures composed element waits for all internal sinks to complete
        internal_sink_pads = {sink.internal_pad for sink in self._internal_sinks}
        self.graph[self.internal_pad] = internal_sink_pads

    @property
    def pad_list(self) -> list[Pad]:
        """Return all pads including internal element pads and virtual sources.

        Order: this element's sink pads, its internal pad, the virtual
        sources, then each internal element's ``pad_list`` in turn.
        """
        pads: list[Pad] = []
        # Add our own pads
        pads.extend(self.sink_pads)
        pads.append(self.internal_pad)
        # Add virtual sources
        pads.extend(self._virtual_sources.values())
        # Add all internal element pads
        for element in self.internal_elements:
            pads.extend(element.pad_list)
        return pads

    @property
    def at_eos(self) -> bool:
        """Return True when all internal sinks are at EOS."""
        if self._internal_sinks:
            return all(sink.at_eos for sink in self._internal_sinks)
        # Fallback for an empty _internal_sinks list; __post_init__ raises
        # TypeError in that case, hence the coverage exclusion.
        return any(self._at_eos.values())  # pragma: no cover

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Inject frame into virtual source for internal boundary sink.

        Args:
            pad: The composed sink pad that received the frame; its
                ``pad_name`` selects the virtual source.
            frame: The incoming frame, stored as that virtual source's
                ``output``.
        """
        virtual_src = self._virtual_sources[pad.pad_name]
        virtual_src.output = frame

        # Propagate EOS if needed
        if frame.EOS:
            self.mark_eos(pad)

    def internal(self) -> None:
        """No-op - Pipeline handles all execution via merged graph."""
        pass

at_eos property

Return True when all internal sinks are at EOS.

pad_list property

Return all pads including internal element pads and virtual sources.

internal()

No-op - Pipeline handles all execution via merged graph.

Source code in src/sgn/compose.py
def internal(self) -> None:
    """Do nothing: the Pipeline handles all execution via the merged graph."""
    return None

pull(pad, frame)

Inject frame into virtual source for internal boundary sink.

Source code in src/sgn/compose.py
def pull(self, pad: SinkPad, frame: Frame) -> None:
    """Store the incoming frame on the virtual source matching ``pad``.

    The virtual source is looked up by ``pad.pad_name`` and receives the
    frame as its ``output``. A frame flagged EOS additionally marks ``pad``
    as at EOS via ``mark_eos``.
    """
    self._virtual_sources[pad.pad_name].output = frame

    # Only EOS frames need further bookkeeping
    if not frame.EOS:
        return
    self.mark_eos(pad)

ComposedSourceElement dataclass

Bases: ComposedElementMixin, SourceElement


              flowchart TD
              sgn.compose.ComposedSourceElement[ComposedSourceElement]
              sgn.compose.ComposedElementMixin[ComposedElementMixin]
              sgn.base.SourceElement[SourceElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.compose.ComposedElementMixin --> sgn.compose.ComposedSourceElement
                
                sgn.base.SourceElement --> sgn.compose.ComposedSourceElement
                                sgn.base.ElementLike --> sgn.base.SourceElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.compose.ComposedSourceElement href "" "sgn.compose.ComposedSourceElement"
              click sgn.compose.ComposedElementMixin href "" "sgn.compose.ComposedElementMixin"
              click sgn.base.SourceElement href "" "sgn.base.SourceElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A composed element that behaves like a SourceElement.

Created from: One or more SourceElements, optionally with TransformElements. Exposes: Source pads from the boundary (unlinked source pads become outputs).

The internal elements' graphs are merged into the Pipeline's graph. When executed, the Pipeline runs all internal elements via topological sort, and this element's source pads return the internal boundary pads' outputs.

Supports both linear chains and non-linear graphs (e.g., multiple sources feeding into a merge transform).

The also_expose_source_pads parameter allows source pads to be exposed externally even when they are also connected to internal sinks. This enables multilink patterns where a single source feeds both internal elements (e.g., latency tracking) and external consumers.

Example with exposed internal source

H1 pad is connected to latency internally but also exposed

composed = ComposedSourceElement( ... name="source_with_latency", ... internal_elements=[strain_source, latency_element], ... internal_links={"latency:snk:data": "strain:src:H1"}, ... also_expose_source_pads=["strain:src:H1"], ... )

Source code in src/sgn/compose.py
@dataclass(repr=False, kw_only=True)
class ComposedSourceElement(ComposedElementMixin, SourceElement):
    """Source-like facade over a group of internal elements.

    Built from: at least one SourceElement, optionally combined with
    TransformElements (SinkElements are rejected).
    Exposes: the source pads on the boundary of the internal graph (source
    pads that are not linked internally become outputs).

    The graphs of the internal elements are merged into the Pipeline's graph.
    The Pipeline executes every internal element through its topological sort;
    this element's own source pads merely hand back the outputs already
    produced by the corresponding internal boundary pads.

    Linear chains as well as non-linear graphs (for instance several sources
    feeding one merge transform) are supported.

    `also_expose_source_pads` lists source pads that should be exposed
    externally even though they are also linked to internal sinks. This
    allows multilink patterns in which one source feeds internal elements
    (e.g., latency tracking) and external consumers at the same time.

    Example with exposed internal source:
        >>> # H1 pad is connected to latency internally but also exposed
        >>> composed = ComposedSourceElement(
        ...     name="source_with_latency",
        ...     internal_elements=[strain_source, latency_element],
        ...     internal_links={"latency:snk:data": "strain:src:H1"},
        ...     also_expose_source_pads=["strain:src:H1"],
        ... )
    """

    # What gets composed: the elements and the links between their pads
    internal_elements: list[Element] = field(default_factory=list)
    internal_links: dict[str, str] = field(default_factory=dict)

    # Full names of internally linked source pads that are exposed as well
    also_expose_source_pads: list[str] = field(default_factory=list)

    # Boundary pad renames: old pad name -> {new pad name: full internal pad name}
    update_pad_names: dict[str, dict[str, str]] = field(default_factory=dict)

    # Given a default so callers need not pass it; overwritten in __post_init__
    source_pad_names: list[str] = field(default_factory=list)

    # State that is filled in while the element is being set up
    _internal_graph: Graph = field(init=False)
    _boundary_source_pads: dict[str, SourcePad] = field(
        init=False, default_factory=dict
    )
    _boundary_sink_pads: dict[str, SinkPad] = field(init=False, default_factory=dict)
    _virtual_sources: dict[str, SourcePad] = field(init=False, default_factory=dict)

    def __post_init__(self):
        """Validate the composition, then create the exposed pads and the graph.

        Raises:
            ValueError: if no internal elements were given, or if no source pad
                ends up on the boundary.
            TypeError: if there is no SourceElement among the internal
                elements, or if one of them is a SinkElement.
            KeyError: if a key of `update_pad_names` is not a boundary source pad.
        """
        # UniqueID setup is invoked explicitly, before anything else
        UniqueID.__post_init__(self)

        members = self.internal_elements
        if not members:
            raise ValueError("ComposedSourceElement requires at least one element")

        # A composed source needs a SourceElement to originate frames
        if not any(isinstance(member, SourceElement) for member in members):
            raise TypeError("ComposedSourceElement requires at least one SourceElement")

        # The first SinkElement found (if any) makes the composition invalid
        offender = next((m for m in members if isinstance(m, SinkElement)), None)
        if offender is not None:
            raise TypeError(
                f"ComposedSourceElement cannot contain SinkElement, "
                f"got {offender.name}"
            )

        self._build_internal_infrastructure()

        # Internally linked source pads were filtered out of the boundary;
        # re-add the ones that were explicitly requested. Entries that do not
        # resolve to a SourcePad, or whose pad name is taken, are left alone.
        for full_name in self.also_expose_source_pads or ():
            candidate = self._internal_graph[full_name]
            if not isinstance(candidate, SourcePad):
                continue
            self._boundary_source_pads.setdefault(candidate.pad_name, candidate)

        # Renaming a boundary pad = drop the old entry, then register the
        # replacement pad(s) looked up in the internal graph.
        for stale_name, replacements in (self.update_pad_names or {}).items():
            # del raises KeyError when the old name is not on the boundary
            del self._boundary_source_pads[stale_name]
            for fresh_name, full_name in replacements.items():
                replacement = self._internal_graph[full_name]
                self._boundary_source_pads[fresh_name] = replacement

        # The exposed pad names mirror the boundary pads
        self.source_pad_names = list(self._boundary_source_pads)

        # Own pads; the source pads delegate to the internal boundary pads
        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)

        self.source_pads = [
            SourcePad(name=exposed_name, element=self, call=self.new)
            for exposed_name in self.source_pad_names
        ]
        self.srcs = dict(zip(self.source_pad_names, self.source_pads))
        self.rsrcs = dict(zip(self.source_pads, self.source_pad_names))

        if not self.source_pads:  # pragma: no cover
            raise ValueError("ComposedSourceElement must have at least one source pad")

        # Assemble the graph of the composed element:
        # (1) start from the graphs of all internal elements
        self.graph.update(self._internal_graph.graph)

        # (2) our internal pad depends on every internal boundary source pad
        self.graph[self.internal_pad] = set(self._boundary_source_pads.values())

        # (3) each of our source pads depends on our internal pad
        for exposed_pad in self.source_pads:
            self.graph[exposed_pad] = {self.internal_pad}

    @property
    def pad_list(self) -> list[Pad]:
        """List this element's own pads followed by all internal element pads."""
        own: list[Pad] = [*self.source_pads, self.internal_pad]
        nested: list[Pad] = [
            inner for member in self.internal_elements for inner in member.pad_list
        ]
        return own + nested

    def internal(self) -> None:
        """Do nothing - the Pipeline runs everything via the merged graph."""
        return None

    def new(self, pad: SourcePad) -> Frame:
        """Hand back the output of the internal boundary pad behind `pad`.

        The boundary pad has already been executed by the Pipeline.
        """
        boundary_pad = self._boundary_source_pads[pad.pad_name]
        frame = boundary_pad.output
        assert frame is not None, f"Internal pad {boundary_pad.name} has no output"
        return frame

pad_list property

Return all pads including internal element pads.

internal()

No-op - Pipeline handles all execution via merged graph.

Source code in src/sgn/compose.py
def internal(self) -> None:
    """Do nothing - the Pipeline runs everything via the merged graph."""
    return None

new(pad)

Return output from internal boundary pad (already executed by Pipeline).

Source code in src/sgn/compose.py
def new(self, pad: SourcePad) -> Frame:
    """Hand back the output of the internal boundary pad behind `pad`.

    The boundary pad has already been executed by the Pipeline.
    """
    boundary_pad = self._boundary_source_pads[pad.pad_name]
    frame = boundary_pad.output
    assert frame is not None, f"Internal pad {boundary_pad.name} has no output"
    return frame

ComposedTransformElement dataclass

Bases: ComposedElementMixin, TransformElement


              flowchart TD
              sgn.compose.ComposedTransformElement[ComposedTransformElement]
              sgn.compose.ComposedElementMixin[ComposedElementMixin]
              sgn.base.TransformElement[TransformElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.compose.ComposedElementMixin --> sgn.compose.ComposedTransformElement
                
                sgn.base.TransformElement --> sgn.compose.ComposedTransformElement
                                sgn.base.ElementLike --> sgn.base.TransformElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.compose.ComposedTransformElement href "" "sgn.compose.ComposedTransformElement"
              click sgn.compose.ComposedElementMixin href "" "sgn.compose.ComposedElementMixin"
              click sgn.base.TransformElement href "" "sgn.base.TransformElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A composed element that behaves like a TransformElement.

Created from: TransformElement → TransformElement* (one or more transforms).

Exposes: sink pads from the first element, source pads from the last element.

Uses virtual source pads to inject frames into internal boundary sink pads. The Pipeline executes all internal elements via topological sort.

The also_expose_source_pads parameter allows source pads to be exposed externally even when they are also connected to internal sinks. This enables multilink patterns where a single source feeds both internal elements (e.g., latency tracking, kernel computation) and external consumers.

Example with exposed internal source

    # spectrum pad is connected internally but also exposed externally
    composed = ComposedTransformElement(
        name="transform_with_multilink",
        internal_elements=[whiten, kernel, afir],
        internal_links={"kernel:snk:spectrum": "whiten:src:spectrum"},
        also_expose_source_pads=["whiten:src:spectrum"],
    )

Source code in src/sgn/compose.py
@dataclass(repr=False, kw_only=True)
class ComposedTransformElement(ComposedElementMixin, TransformElement):
    """Transform-like facade over a group of internal elements.

    Built from: TransformElement → TransformElement* (one or more transforms)
    Exposes: Sink pads from first element, Source pads from last element

    Frames arriving on the composed sink pads are injected into the internal
    boundary sink pads through virtual source pads. The Pipeline executes
    all internal elements through its topological sort.

    `also_expose_source_pads` lists source pads that should be exposed
    externally even though they are also linked to internal sinks. This
    allows multilink patterns in which one source feeds internal elements
    (e.g., latency tracking, kernel computation) and external consumers at
    the same time.

    Example with exposed internal source:
        >>> # spectrum pad is connected internally but also exposed externally
        >>> composed = ComposedTransformElement(
        ...     name="transform_with_multilink",
        ...     internal_elements=[whiten, kernel, afir],
        ...     internal_links={"kernel:snk:spectrum": "whiten:src:spectrum"},
        ...     also_expose_source_pads=["whiten:src:spectrum"],
        ... )
    """

    # What gets composed: the elements and the links between their pads
    internal_elements: list[Element] = field(default_factory=list)
    internal_links: dict[str, str] = field(default_factory=dict)

    # Full names of internally linked source pads that are exposed as well
    also_expose_source_pads: list[str] = field(default_factory=list)

    # Boundary pad renames: old pad name -> {new pad name: full internal pad name}
    update_pad_names: dict[str, dict[str, str]] = field(default_factory=dict)

    # Given defaults so callers need not pass them; overwritten in __post_init__
    source_pad_names: list[str] = field(default_factory=list)
    sink_pad_names: list[str] = field(default_factory=list)

    # State that is filled in while the element is being set up
    _internal_graph: Graph = field(init=False)
    _boundary_source_pads: dict[str, SourcePad] = field(
        init=False, default_factory=dict
    )
    _boundary_sink_pads: dict[str, SinkPad] = field(init=False, default_factory=dict)
    _virtual_sources: dict[str, SourcePad] = field(init=False, default_factory=dict)

    def __post_init__(self):
        """Validate the composition, then create the exposed pads and the graph.

        Raises:
            ValueError: if no internal elements were given, or if the boundary
                lacks sink pads or source pads.
            TypeError: if one of the internal elements is a SourceElement.
            KeyError: if a key of `update_pad_names` is neither a boundary
                sink pad nor a boundary source pad.
        """
        # UniqueID setup is invoked explicitly, before anything else
        UniqueID.__post_init__(self)

        members = self.internal_elements
        if not members:
            raise ValueError("ComposedTransformElement requires at least one element")

        # The first SourceElement found (if any) makes the composition invalid
        intruder = next((m for m in members if isinstance(m, SourceElement)), None)
        if intruder is not None:
            raise TypeError(
                f"Transform composition cannot contain SourceElements, "
                f"got {type(intruder).__name__}"
            )

        self._build_internal_infrastructure()

        # Internally linked source pads were filtered out of the boundary;
        # re-add the ones that were explicitly requested. Entries that do not
        # resolve to a SourcePad, or whose pad name is taken, are left alone.
        for full_name in self.also_expose_source_pads or ():
            candidate = self._internal_graph[full_name]
            if not isinstance(candidate, SourcePad):
                continue
            self._boundary_source_pads.setdefault(candidate.pad_name, candidate)

        # Renaming a boundary pad = drop the old entry, then register the
        # replacement pad(s) looked up in the internal graph. Sink pads are
        # checked before source pads.
        for stale_name, replacements in (self.update_pad_names or {}).items():
            if stale_name in self._boundary_sink_pads:
                del self._boundary_sink_pads[stale_name]
                for fresh_name, full_name in replacements.items():
                    replacement = self._internal_graph[full_name]
                    self._boundary_sink_pads[fresh_name] = replacement
                continue
            if stale_name in self._boundary_source_pads:
                del self._boundary_source_pads[stale_name]
                for fresh_name, full_name in replacements.items():
                    replacement = self._internal_graph[full_name]
                    self._boundary_source_pads[fresh_name] = replacement
                continue
            raise KeyError(f"Pad name {stale_name} not found in boundary pads")

        # The exposed pad names mirror the boundary pads
        self.sink_pad_names = list(self._boundary_sink_pads)
        self.source_pad_names = list(self._boundary_source_pads)

        # Graph container and internal pad come first
        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)

        # Sink pads exposed by the composed element
        self.sink_pads = [
            SinkPad(name=exposed_name, element=self, call=self.pull)
            for exposed_name in self.sink_pad_names
        ]

        # One virtual source pad per exposed sink pad. It holds the frame that
        # the internal boundary sink reads; its graph edges are added below.
        for sink_name in self.sink_pad_names:
            self._virtual_sources[sink_name] = SourcePad(
                name=f"_vs_{sink_name}",
                element=self,
                call=lambda pad: pad.output,  # pull() stored the frame already
            )

        # Source pads exposed by the composed element
        self.source_pads = [
            SourcePad(name=exposed_name, element=self, call=self.new)
            for exposed_name in self.source_pad_names
        ]

        self.srcs = dict(zip(self.source_pad_names, self.source_pads))
        self.snks = dict(zip(self.sink_pad_names, self.sink_pads))
        self.rsrcs = dict(zip(self.source_pads, self.source_pad_names))
        self.rsnks = dict(zip(self.sink_pads, self.sink_pad_names))

        if not (self.source_pads and self.sink_pads):  # pragma: no cover
            raise ValueError(
                "ComposedTransformElement must have both sink and source pads"
            )

        # Assemble the graph of the composed element:
        # (1) start from the graphs of all internal elements
        self.graph.update(self._internal_graph.graph)

        # (2) link every internal boundary sink to its virtual source
        for sink_name, feeder in self._virtual_sources.items():
            self.graph.update(self._boundary_sink_pads[sink_name].link(feeder))

        # (3) every virtual source depends on the matching composed sink pad
        for sink_name, feeder in self._virtual_sources.items():
            self.graph[feeder] = {self.snks[sink_name]}

        # (4) our internal pad depends on every internal boundary source pad
        self.graph[self.internal_pad] = set(self._boundary_source_pads.values())

        # (5) each of our source pads depends on our internal pad
        for exposed_pad in self.source_pads:
            self.graph[exposed_pad] = {self.internal_pad}

    @property
    def pad_list(self) -> list[Pad]:
        """List own pads, virtual sources and all internal element pads."""
        own: list[Pad] = [
            *self.source_pads,
            *self.sink_pads,
            self.internal_pad,
            *self._virtual_sources.values(),
        ]
        nested: list[Pad] = [
            inner for member in self.internal_elements for inner in member.pad_list
        ]
        return own + nested

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Store `frame` on the virtual source that feeds the internal sink."""
        self._virtual_sources[pad.pad_name].output = frame

    def internal(self) -> None:
        """Do nothing - the Pipeline runs everything via the merged graph."""
        return None

    def new(self, pad: SourcePad) -> Frame:
        """Hand back the output of the internal boundary pad behind `pad`.

        The boundary pad has already been executed by the Pipeline.
        """
        boundary_pad = self._boundary_source_pads[pad.pad_name]
        frame = boundary_pad.output
        assert frame is not None, f"Internal pad {boundary_pad.name} has no output"
        return frame

pad_list property

Return all pads including internal element pads and virtual sources.

internal()

No-op - Pipeline handles all execution via merged graph.

Source code in src/sgn/compose.py
def internal(self) -> None:
    """Do nothing - the Pipeline runs everything via the merged graph."""
    return None

new(pad)

Return output from internal boundary pad (already executed by Pipeline).

Source code in src/sgn/compose.py
def new(self, pad: SourcePad) -> Frame:
    """Hand back the output of the internal boundary pad behind `pad`.

    The boundary pad has already been executed by the Pipeline.
    """
    boundary_pad = self._boundary_source_pads[pad.pad_name]
    frame = boundary_pad.output
    assert frame is not None, f"Internal pad {boundary_pad.name} has no output"
    return frame

pull(pad, frame)

Inject frame into virtual source for internal boundary sink.

Source code in src/sgn/compose.py
def pull(self, pad: SinkPad, frame: Frame) -> None:
    """Store `frame` on the virtual source that feeds the internal sink."""
    self._virtual_sources[pad.pad_name].output = frame