Groups Module

The sgn.groups module provides utilities for grouping elements and selecting specific pads in SGN pipelines. It lets you build pipelines from collections of elements and explicit pad selections rather than handling each element separately.

Key Concepts

Element Grouping

Element grouping allows you to treat multiple elements as a single unit when connecting pipelines. This is particularly useful when you have multiple sources that need to connect to multiple sinks, or when you want to organize elements logically.

Pad Selection

Pad selection enables you to specify exactly which pads from an element should be used in pipeline operations. This provides fine-grained control over data flow and helps prevent unintended connections.

Classes and Functions

sgn.groups.PadSelection dataclass

Bases: PadIteratorMixin

Represents a selection of specific pads from an element.

This allows users to specify exactly which pads from an element should be used in grouping operations.

Source code in sgn/groups.py
@dataclass
class PadSelection(PadIteratorMixin):
    """Represents a selection of specific pads from an element.

    This allows users to specify exactly which pads from an element
    should be used in grouping operations.
    """

    element: Element
    pad_names: set[str]

    def __post_init__(self) -> None:
        """Validate that the selected pad names exist on the element."""
        all_pad_names = set()

        # Add source pad names if element has them
        if isinstance(self.element, (SourceElement, TransformElement)):
            all_pad_names.update(self.element.srcs.keys())

        # Add sink pad names if element has them
        if isinstance(self.element, (TransformElement, SinkElement)):
            all_pad_names.update(self.element.snks.keys())

        invalid_names = self.pad_names - all_pad_names
        if invalid_names:
            msg = (
                f"Pad names {invalid_names} not found on element '{self.element.name}'"
                f" Pad names available: {all_pad_names}"
            )
            raise ValueError(msg)

    @property
    def srcs(self) -> dict[str, SourcePad]:
        """Extract selected source pads from the element using names as keys."""
        if isinstance(self.element, (SourceElement, TransformElement)):
            return {
                name: pad
                for name, pad in self.element.srcs.items()
                if name in self.pad_names
            }
        return {}

    @property
    def snks(self) -> dict[str, SinkPad]:
        """Extract selected sink pads from the element using names as keys."""
        if isinstance(self.element, (TransformElement, SinkElement)):
            return {
                name: pad
                for name, pad in self.element.snks.items()
                if name in self.pad_names
            }
        return {}

    @property
    def elements(self) -> list[Element]:
        """Get the element referenced by this selection."""
        return [self.element]

elements property

Get the element referenced by this selection.

snks property

Extract selected sink pads from the element using names as keys.

srcs property

Extract selected source pads from the element using names as keys.

__post_init__()

Validate that the selected pad names exist on the element.
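The validation rule can be tried out without SGN installed. The sketch below is a minimal illustration, not the library's code: `FakeElement` and `find_invalid_pad_names` are hypothetical stand-ins, and only the set-difference check mirrors what `__post_init__` does.

```python
from dataclasses import dataclass, field


@dataclass
class FakeElement:
    """Hypothetical stand-in for an SGN element (not part of sgn)."""

    name: str
    srcs: dict[str, object] = field(default_factory=dict)
    snks: dict[str, object] = field(default_factory=dict)


def find_invalid_pad_names(element: FakeElement, pad_names: set[str]) -> set[str]:
    """Return the requested names that match no source or sink pad."""
    all_pad_names = set(element.srcs) | set(element.snks)
    return pad_names - all_pad_names


src = FakeElement(name="src", srcs={"H1": object(), "L1": object()})

valid = find_invalid_pad_names(src, {"H1"})          # every name exists
invalid = find_invalid_pad_names(src, {"H1", "V1"})  # "V1" is not a pad on src
```

In the real class a non-empty result raises ValueError, so a misspelled pad name fails when the PadSelection is constructed rather than later, when the pipeline is connected.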

sgn.groups.ElementGroup dataclass

Bases: PadIteratorMixin

A unified group for elements and pad selections.

This class holds a collection of elements and pad selections without determining upfront whether to extract source or sink pads. The actual pad extraction is deferred until the group is used in pipeline.connect(), where the context (source vs sink position) determines which pads to extract.
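To make the deferred extraction concrete, here is a small sketch under stated assumptions: `FakeElement`, `LazyGroup`, and the toy `connect` function are hypothetical stand-ins written for this example, pads are represented as plain strings, and duplicate-name handling is omitted. It only shows that the same group yields source pads or sink pads depending on which side of the connection it sits on.

```python
from dataclasses import dataclass, field


@dataclass
class FakeElement:
    """Hypothetical stand-in for an SGN element (not part of sgn)."""

    name: str
    srcs: dict[str, str] = field(default_factory=dict)
    snks: dict[str, str] = field(default_factory=dict)


@dataclass
class LazyGroup:
    """Hypothetical stand-in for ElementGroup: stores items, extracts pads on demand."""

    items: list[FakeElement] = field(default_factory=list)

    @property
    def srcs(self) -> dict[str, str]:
        # Collected only when the group is used on the upstream side.
        return {name: pad for item in self.items for name, pad in item.srcs.items()}

    @property
    def snks(self) -> dict[str, str]:
        # Collected only when the group is used on the downstream side.
        return {name: pad for item in self.items for name, pad in item.snks.items()}


def connect(upstream: LazyGroup, downstream: LazyGroup) -> list[tuple[str, str]]:
    """Toy connect(): pair pads with matching names as (source, sink) tuples."""
    sinks = downstream.snks
    return [(pad, sinks[name]) for name, pad in upstream.srcs.items() if name in sinks]


transform = FakeElement("t", srcs={"H1": "t:src:H1"}, snks={"H1": "t:snk:H1"})
source = FakeElement("s", srcs={"H1": "s:src:H1"})
sink = FakeElement("k", snks={"H1": "k:snk:H1"})

stage = LazyGroup([transform])
links_in = connect(LazyGroup([source]), stage)  # stage acts as the sink side
links_out = connect(stage, LazyGroup([sink]))   # same group acts as the source side
```

Because nothing is extracted when the group is created, a group containing a transform element can be reused on either side of a connection.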

Source code in sgn/groups.py
@dataclass
class ElementGroup(PadIteratorMixin):
    """A unified group for elements and pad selections.

    This class holds a collection of elements and pad selections without
    determining upfront whether to extract source or sink pads. The actual
    pad extraction is deferred until the group is used in pipeline.connect(),
    where the context (source vs sink position) determines which pads to extract.
    """

    items: list[Element | PadSelection] = field(default_factory=list)

    def select(self, *pad_names: str) -> ElementGroup:
        """Select pads from items in the group by pad name."""
        selected_items: list[Element | PadSelection] = []
        pad_names_set = set(pad_names)

        for item in self.items:
            if isinstance(item, PadSelection):
                # For existing pad selections, intersect with requested pad names
                element = item.element
                available_pads = item.pad_names
            else:
                # For elements, get all their pad names
                element = item
                available_pads = set()
                if isinstance(element, (SourceElement, TransformElement)):
                    available_pads.update(element.srcs.keys())
                if isinstance(element, (TransformElement, SinkElement)):
                    available_pads.update(element.snks.keys())

            # Find intersection of available pads with requested pad names
            matching_pads = available_pads & pad_names_set
            if matching_pads:
                selected_items.append(
                    PadSelection(element=element, pad_names=matching_pads)
                )

        return ElementGroup(items=selected_items)

    @property
    def elements(self) -> list[Element]:
        """Get all unique elements referenced by this group."""
        seen = set()
        unique_elements = []
        for item in self.items:
            if isinstance(item, PadSelection):
                element = item.element
            else:
                element = item
            element_id = id(element)
            if element_id not in seen:
                seen.add(element_id)
                unique_elements.append(element)
        return unique_elements

    @property
    def srcs(self) -> dict[str, SourcePad]:
        """Extract source pads from all items in the group using names as keys."""
        combined_pads = {}

        for item in self.items:
            if isinstance(item, SinkElement):
                msg = f"Element '{item.name}' is a SinkElement and has no source pads"
                raise ValueError(msg)

            for pad_name, pad in item.srcs.items():
                if pad_name in combined_pads:
                    msg = f"Duplicate pad name '{pad_name}' in group"
                    raise KeyError(msg)
                combined_pads[pad_name] = pad

        return combined_pads

    @property
    def snks(self) -> dict[str, SinkPad]:
        """Extract sink pads from all items in the group using names as keys."""
        combined_pads = {}

        for item in self.items:
            if isinstance(item, SourceElement):
                msg = f"Element '{item.name}' is a SourceElement and has no sink pads"
                raise ValueError(msg)

            for pad_name, pad in item.snks.items():
                if pad_name in combined_pads:
                    msg = f"Duplicate pad name '{pad_name}' in group"
                    raise KeyError(msg)
                combined_pads[pad_name] = pad

        return combined_pads

elements property

Get all unique elements referenced by this group.

snks property

Extract sink pads from all items in the group using names as keys.

srcs property

Extract source pads from all items in the group using names as keys.
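Both `srcs` and `snks` merge the pads of every item into one dictionary and refuse duplicate names. The sketch below reproduces only that merge rule; `merge_pads` is a hypothetical helper and pads are represented as plain strings.

```python
def merge_pads(pad_maps: list[dict[str, str]]) -> dict[str, str]:
    """Merge per-item pad dictionaries, rejecting a name that appears twice."""
    combined: dict[str, str] = {}
    for pads in pad_maps:
        for pad_name, pad in pads.items():
            if pad_name in combined:
                raise KeyError(f"Duplicate pad name '{pad_name}' in group")
            combined[pad_name] = pad
    return combined


# Distinct names merge cleanly.
merged = merge_pads([{"H1": "src1:H1"}, {"L1": "src2:L1"}])

# Two items exposing "H1" cannot share one group.
try:
    merge_pads([{"H1": "src1:H1"}, {"H1": "src2:H1"}])
    duplicate_error = None
except KeyError as exc:
    duplicate_error = exc
```

One way to avoid the collision is to group a selection that leaves out the clashing pad, for example group(src1, select(src2, "L1")).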

select(*pad_names)

Select pads from items in the group by pad name.
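The method keeps, for each item, only the pads whose names were requested, and drops items with no match. A minimal sketch of that behavior, assuming a group is modeled as a hypothetical mapping from element name to its set of pad names:

```python
def select_from_group(
    group_pads: dict[str, set[str]], *pad_names: str
) -> dict[str, set[str]]:
    """Keep only requested pads per element; elements with no match are dropped."""
    requested = set(pad_names)
    selected: dict[str, set[str]] = {}
    for element_name, available in group_pads.items():
        matching = available & requested
        if matching:  # no match: the element is left out, no error is raised
            selected[element_name] = matching
    return selected


group_pads = {"src1": {"H1"}, "src2": {"L1", "V1"}}

h1_only = select_from_group(group_pads, "H1")  # src2 has no H1, so it is dropped
nothing = select_from_group(group_pads, "K1")  # unknown name gives an empty result
```

Note that a name matching no item produces an empty group rather than an exception, so a typo here surfaces only when the empty group is later used.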

sgn.groups.select(target, *pad_names)

select(target: Element, *pad_names: str) -> PadSelection
select(target: PadSelection, *pad_names: str) -> PadSelection
select(target: ElementGroup, *pad_names: str) -> ElementGroup

Create or refine pad selections from elements, existing selections, or groups.

Parameters:

target (Element | PadSelection | ElementGroup, required): The element, pad selection, or element group to select pads from.

*pad_names (str, default ()): Names of the pads to select.

Returns:

PadSelection | ElementGroup: A PadSelection if target is an Element or PadSelection, or an ElementGroup if target is an ElementGroup.

Examples:

>>> src = IterSource(name="src", source_pad_names=["H1", "L1", "V1"])
>>> h1_only = select(src, "H1")
>>> h1_l1_only = select(src, "H1", "L1")
>>>
>>> # Narrow an existing selection
>>> h1_from_selection = select(h1_l1_only, "H1")
>>>
>>> # Select from a group
>>> sources = group(src1, src2)
>>> h1_from_group = select(sources, "H1")

Source code in sgn/groups.py
def select(
    target: Element | PadSelection | ElementGroup, *pad_names: str
) -> PadSelection | ElementGroup:
    """Create or refine pad selections from elements, existing selections, or groups.

    Args:
        target: The element, pad selection, or element group to select pads from.
        *pad_names: Names of the pads to select.

    Returns:
        A PadSelection if target is Element or PadSelection,
        or ElementGroup if target is ElementGroup.

    Examples:
        >>> src = IterSource(name="src", source_pad_names=["H1", "L1", "V1"])
        >>> h1_only = select(src, "H1")
        >>> h1_l1_only = select(src, "H1", "L1")
        >>>
        >>> # Narrow an existing selection
        >>> h1_from_selection = select(h1_l1_only, "H1")
        >>>
        >>> # Select from a group
        >>> sources = group(src1, src2)
        >>> h1_from_group = select(sources, "H1")
    """
    match target:
        case SourceElement() | TransformElement() | SinkElement():
            return PadSelection(element=target, pad_names=set(pad_names))
        case PadSelection():
            # Narrow the existing selection by intersecting pad names
            new_pad_names = target.pad_names & set(pad_names)
            if not new_pad_names:
                msg = (
                    f"No matching pads found. Requested: {set(pad_names)}, "
                    f"Available: {target.pad_names}"
                )
                raise ValueError(msg)
            return PadSelection(element=target.element, pad_names=new_pad_names)
        case ElementGroup():
            return target.select(*pad_names)
        case _:
            msg = f"Expected Element, PadSelection, or ElementGroup, got {type(target)}"
            raise TypeError(msg)
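Narrowing an existing selection differs from selecting within a group: an empty intersection is an error instead of an empty result. The sketch below models only the PadSelection branch, with a selection represented as a plain set of pad names; `narrow` is a hypothetical helper.

```python
def narrow(selected: set[str], *pad_names: str) -> set[str]:
    """Intersect an existing selection with the requested names, or fail if empty."""
    requested = set(pad_names)
    remaining = selected & requested
    if not remaining:
        raise ValueError(
            f"No matching pads found. Requested: {requested}, Available: {selected}"
        )
    return remaining


h1_l1 = {"H1", "L1"}

h1 = narrow(h1_l1, "H1")
partial = narrow(h1_l1, "H1", "V1")  # V1 was never selected, so it is ignored

try:
    narrow(h1_l1, "V1")
    narrowing_error = None
except ValueError as exc:
    narrowing_error = exc
```

A request that only partly overlaps the selection succeeds with the overlapping names, so narrowing cannot add pads back to a selection.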

sgn.groups.group(*items)

Create a unified group from elements, pad selections, and existing groups.

This function always returns an ElementGroup that holds elements and/or pad selections. The decision of which pads to extract is deferred until the group is used in pipeline.connect().

Parameters:

*items (Element | PadSelection | ElementGroup, default ()): Elements, pad selections, and/or existing ElementGroups to combine.

Returns:

ElementGroup: A group containing all the provided items.

Examples:

>>> src1 = IterSource(name="src1", source_pad_names=["H1"])
>>> src2 = IterSource(name="src2", source_pad_names=["L1", "V1"])
>>> snk = Sink(name="snk", sink_pad_names=["H1", "L1"])
>>>
>>> # Group elements
>>> sources = group(src1, src2)
>>>
>>> # Group with pad selection
>>> l1_only = select(src2, "L1")
>>> sources_with_selection = group(src1, l1_only)
>>>
>>> # Connect groups
>>> pipeline.connect(sources_with_selection, snk)

Source code in sgn/groups.py
def group(*items: Element | PadSelection | ElementGroup) -> ElementGroup:
    """Create a unified group from elements, pad selections, and existing groups.

    This function always returns an ElementGroup that holds elements and/or
    pad selections. The decision of which pads to extract is deferred until
    the group is used in pipeline.connect().

    Args:
        *items: Elements, pad selections, and/or existing ElementGroups to combine.

    Returns:
        A group containing all the provided items.

    Examples:
        >>> src1 = IterSource(name="src1", source_pad_names=["H1"])
        >>> src2 = IterSource(name="src2", source_pad_names=["L1", "V1"])
        >>> snk = Sink(name="snk", sink_pad_names=["H1", "L1"])
        >>>
        >>> # Group elements
        >>> sources = group(src1, src2)
        >>>
        >>> # Group with pad selection
        >>> l1_only = select(src2, "L1")
        >>> sources_with_selection = group(src1, l1_only)
        >>>
        >>> # Connect groups
        >>> pipeline.connect(sources_with_selection, snk)
    """
    all_items: list[Element | PadSelection] = []

    for item in items:
        if isinstance(item, PadSelection):
            all_items.append(item)
        elif isinstance(item, (SourceElement, TransformElement, SinkElement)):
            all_items.append(item)
        elif isinstance(item, ElementGroup):
            all_items.extend(item.items)
        else:
            msg = f"Expected Element, PadSelection, or ElementGroup, got {type(item)}"
            raise TypeError(msg)

    return ElementGroup(items=all_items)
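One consequence of the loop above is that groups never nest: passing a group to group() splices its items into the new group. The sketch below shows that flattening with hypothetical stand-ins (`ToyGroup`, `toy_group`) and strings in place of elements.

```python
from dataclasses import dataclass, field
from typing import Union


@dataclass
class ToyGroup:
    """Hypothetical stand-in for ElementGroup; items are plain strings here."""

    items: list[str] = field(default_factory=list)


def toy_group(*items: Union[str, ToyGroup]) -> ToyGroup:
    """Combine items into one flat group, splicing in the items of nested groups."""
    flat: list[str] = []
    for item in items:
        if isinstance(item, ToyGroup):
            flat.extend(item.items)  # unwrap instead of nesting
        elif isinstance(item, str):
            flat.append(item)
        else:
            raise TypeError(f"Expected str or ToyGroup, got {type(item)}")
    return ToyGroup(items=flat)


inner = toy_group("src1", "src2")
outer = toy_group(inner, "src3")
```

Here outer.items holds the three names in order, with no ToyGroup inside it, so a group built from other groups behaves the same as one built from the individual items.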

Usage Examples

Basic Element Grouping

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import group

# Create multiple sources
src1 = IterSource(name="src1", source_pad_names=["H1"])
src2 = IterSource(name="src2", source_pad_names=["L1"])

# Create multiple sinks
sink1 = NullSink(name="sink1", sink_pad_names=["H1"])
sink2 = NullSink(name="sink2", sink_pad_names=["L1"])

# Group elements for convenient connection
sources = group(src1, src2)
sinks = group(sink1, sink2)

pipeline = Pipeline()
pipeline.connect(sources, sinks)  # Connects H1->H1, L1->L1 automatically

Pad Selection

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import select

# Create a source with multiple pads
src = IterSource(name="src", source_pad_names=["H1", "L1", "V1"])

# Create a sink that only needs specific data
sink = NullSink(name="sink", sink_pad_names=["H1", "L1"])

# Select only the pads we need from the source
selected_pads = select(src, "H1", "L1")  # Exclude V1

pipeline = Pipeline()
pipeline.connect(selected_pads, sink)  # Only H1 and L1 are connected

Combined Grouping and Selection

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import group, select

# Create sources with different pad configurations
src1 = IterSource(name="src1", source_pad_names=["H1"])
src2 = IterSource(name="src2", source_pad_names=["L1", "V1", "extra"])

# Create a sink that expects specific pads
sink = NullSink(name="sink", sink_pad_names=["H1", "L1"])

# Group elements, selecting only the needed pads from src2
sources = group(src1, select(src2, "L1"))  # src1 all pads, src2 only L1

pipeline = Pipeline()
pipeline.connect(sources, sink)  # H1->H1, L1->L1

Explicit Mapping with Groups

You can still use explicit mapping when working with groups for cases where automatic matching isn't sufficient:

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import group

# Create sources and sinks with different pad names
src1 = IterSource(name="src1", source_pad_names=["out1"])
src2 = IterSource(name="src2", source_pad_names=["out2"])

sink1 = NullSink(name="sink1", sink_pad_names=["in1"])
sink2 = NullSink(name="sink2", sink_pad_names=["in2"])

sources = group(src1, src2)
sinks = group(sink1, sink2)

pipeline = Pipeline()
pipeline.connect(sources, sinks, link_map={
    "in1": "out1",  # Map sink pad names to source pad names
    "in2": "out2"
})
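The direction of link_map (sink pad name as key, source pad name as value) is easy to get backwards. The sketch below resolves such a map over plain dictionaries; `resolve_links` is a hypothetical helper, and the rejection of unknown names is an assumption, since this page does not show how pipeline.connect() validates the map.

```python
def resolve_links(
    source_pads: dict[str, str],
    sink_pads: dict[str, str],
    link_map: dict[str, str],
) -> list[tuple[str, str]]:
    """Toy resolver: return (source pad, sink pad) pairs for a sink->source map."""
    links = []
    for sink_name, source_name in link_map.items():
        # Assumption: unknown names are rejected; the real behavior may differ.
        if sink_name not in sink_pads:
            raise KeyError(f"Unknown sink pad '{sink_name}'")
        if source_name not in source_pads:
            raise KeyError(f"Unknown source pad '{source_name}'")
        links.append((source_pads[source_name], sink_pads[sink_name]))
    return links


source_pads = {"out1": "src1:out1", "out2": "src2:out2"}
sink_pads = {"in1": "sink1:in1", "in2": "sink2:in2"}

# Keys are sink pad names, values are source pad names.
links = resolve_links(source_pads, sink_pads, {"in1": "out1", "in2": "out2"})
```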

Linking Strategies

When pipeline.connect() is called with groups, SGN links pads using one of the following strategies:

1-to-1 Matching

When source and sink groups have the same number of pads with matching names:

# H1->H1, L1->L1 (perfect match)
sources = group(src_h1, src_l1)  # H1, L1 pads
sinks = group(sink_h1, sink_l1)  # H1, L1 pads
pipeline.connect(sources, sinks)

N-to-1 Linking

Multiple source pads connecting to a single sink pad:

# Two sources -> single H1 sink
sources = group(src1, src2)  # Two source pads
sink = NullSink(name="sink", sink_pad_names=["H1"])  # One H1 pad
pipeline.connect(sources, sink)

1-to-N Linking

Single source pad connecting to multiple sink pads:

# Single H1 source -> two sinks
src = IterSource(name="src", source_pad_names=["H1"])  # One source pad
sinks = group(sink1, sink2)  # Two sink pads
pipeline.connect(src, sinks)
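The three strategies can be summarized as a decision over pad names. The sketch below is only an illustration of the cases described above, not SGN's implementation: `plan_links` is a hypothetical function, the order in which the cases are tested is an assumption, and so is the error raised when none of them applies.

```python
def plan_links(source_names: list[str], sink_names: list[str]) -> list[tuple[str, str]]:
    """Toy planner: return (source name, sink name) pairs for the three cases."""
    # 1-to-1: same number of pads with matching names.
    if len(source_names) == len(sink_names) and set(source_names) == set(sink_names):
        return [(name, name) for name in source_names]
    # N-to-1: every source pad goes to the single sink pad.
    if len(sink_names) == 1:
        return [(name, sink_names[0]) for name in source_names]
    # 1-to-N: the single source pad goes to every sink pad.
    if len(source_names) == 1:
        return [(source_names[0], name) for name in sink_names]
    # Assumption: anything else needs an explicit link_map.
    raise ValueError("No automatic strategy applies; pass an explicit link_map")


one_to_one = plan_links(["H1", "L1"], ["H1", "L1"])
n_to_one = plan_links(["out1", "out2"], ["H1"])
one_to_n = plan_links(["H1"], ["in1", "in2"])

try:
    plan_links(["out1", "out2"], ["in1", "in2"])
    ambiguous_error = None
except ValueError as exc:
    ambiguous_error = exc
```

The last call corresponds to the explicit-mapping example earlier on this page, where the pad names differ on both sides and a link_map is supplied.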