Skip to content

Groups Module

The sgn.groups module provides powerful utilities for grouping elements and selecting specific pads in SGN pipelines. This module enables more intuitive and flexible pipeline construction by allowing you to work with collections of elements and specific pad selections.

Key Concepts

Element Grouping

Element grouping allows you to treat multiple elements as a single unit when connecting pipelines. This is particularly useful when you have multiple sources that need to connect to multiple sinks, or when you want to organize elements logically.

Pad Selection

Pad selection enables you to specify exactly which pads from an element should be used in pipeline operations. This provides fine-grained control over data flow and helps prevent unintended connections.

Classes and Functions

sgn.groups.PadSelection dataclass

Represents a selection of specific pads from an element.

This allows users to specify exactly which pads from an element should be used in grouping operations.

Source code in sgn/groups.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@dataclass
class PadSelection:
    """Represents a selection of specific pads from an element.

    This allows users to specify exactly which pads from an element
    should be used in grouping operations.
    """

    element: Element
    pad_names: set[str]

    def __post_init__(self) -> None:
        """Validate that the selected pad names exist on the element."""
        all_pad_names = set()

        # Add source pad names if element has them
        if isinstance(self.element, (SourceElement, TransformElement)):
            all_pad_names.update(self.element.srcs.keys())

        # Add sink pad names if element has them
        if isinstance(self.element, (TransformElement, SinkElement)):
            all_pad_names.update(self.element.snks.keys())

        invalid_names = self.pad_names - all_pad_names
        if invalid_names:
            msg = (
                f"Pad names {invalid_names} not found on element '{self.element.name}'"
            )
            raise ValueError(msg)

    @property
    def srcs(self) -> Dict[str, SourcePad]:
        """Extract selected source pads from the element using names as keys."""
        if isinstance(self.element, (SourceElement, TransformElement)):
            return {
                name: pad
                for name, pad in self.element.srcs.items()
                if name in self.pad_names
            }
        return {}

    @property
    def snks(self) -> Dict[str, SinkPad]:
        """Extract selected sink pads from the element using names as keys."""
        if isinstance(self.element, (TransformElement, SinkElement)):
            return {
                name: pad
                for name, pad in self.element.snks.items()
                if name in self.pad_names
            }
        return {}

    @property
    def elements(self) -> list[Element]:
        """Get the element referenced by this selection."""
        return [self.element]

elements property

Get the element referenced by this selection.

snks property

Extract selected sink pads from the element using names as keys.

srcs property

Extract selected source pads from the element using names as keys.

__post_init__()

Validate that the selected pad names exist on the element.

Source code in sgn/groups.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __post_init__(self) -> None:
    """Validate that the selected pad names exist on the element."""
    all_pad_names = set()

    # Add source pad names if element has them
    if isinstance(self.element, (SourceElement, TransformElement)):
        all_pad_names.update(self.element.srcs.keys())

    # Add sink pad names if element has them
    if isinstance(self.element, (TransformElement, SinkElement)):
        all_pad_names.update(self.element.snks.keys())

    invalid_names = self.pad_names - all_pad_names
    if invalid_names:
        msg = (
            f"Pad names {invalid_names} not found on element '{self.element.name}'"
        )
        raise ValueError(msg)

sgn.groups.ElementGroup dataclass

A unified group for elements and pad selections.

This class holds a collection of elements and pad selections without determining upfront whether to extract source or sink pads. The actual pad extraction is deferred until the group is used in pipeline.connect(), where the context (source vs sink position) determines which pads to extract.

Source code in sgn/groups.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@dataclass
class ElementGroup:
    """A unified group for elements and pad selections.

    This class holds a collection of elements and pad selections without
    determining upfront whether to extract source or sink pads. The actual
    pad extraction is deferred until the group is used in pipeline.connect(),
    where the context (source vs sink position) determines which pads to extract.
    """

    items: list[Union[Element, PadSelection]] = field(default_factory=list)

    def select(self, *element_names: str) -> ElementGroup:
        """Select a subset of items by element name."""
        selected_items = []
        for item in self.items:
            if isinstance(item, PadSelection):
                element_name = item.element.name
            else:
                element_name = item.name
            if element_name in element_names:
                selected_items.append(item)
        return ElementGroup(items=selected_items)

    @property
    def elements(self) -> list[Element]:
        """Get all unique elements referenced by this group."""
        seen = set()
        unique_elements = []
        for item in self.items:
            if isinstance(item, PadSelection):
                element = item.element
            else:
                element = item
            element_id = id(element)
            if element_id not in seen:
                seen.add(element_id)
                unique_elements.append(element)
        return unique_elements

    @property
    def srcs(self) -> Dict[str, SourcePad]:
        """Extract source pads from all items in the group using names as keys."""
        combined_pads = {}

        for item in self.items:
            if isinstance(item, SinkElement):
                msg = f"Element '{item.name}' is a SinkElement and has no source pads"
                raise ValueError(msg)

            for pad_name, pad in item.srcs.items():
                if pad_name in combined_pads:
                    msg = f"Duplicate pad name '{pad_name}' in group"
                    raise KeyError(msg)
                combined_pads[pad_name] = pad

        return combined_pads

    @property
    def snks(self) -> Dict[str, SinkPad]:
        """Extract sink pads from all items in the group using names as keys."""
        combined_pads = {}

        for item in self.items:
            if isinstance(item, SourceElement):
                msg = f"Element '{item.name}' is a SourceElement and has no sink pads"
                raise ValueError(msg)

            for pad_name, pad in item.snks.items():
                if pad_name in combined_pads:
                    msg = f"Duplicate pad name '{pad_name}' in group"
                    raise KeyError(msg)
                combined_pads[pad_name] = pad

        return combined_pads

elements property

Get all unique elements referenced by this group.

snks property

Extract sink pads from all items in the group using names as keys.

srcs property

Extract source pads from all items in the group using names as keys.

select(*element_names)

Select a subset of items by element name.

Source code in sgn/groups.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def select(self, *element_names: str) -> ElementGroup:
    """Select a subset of items by element name."""
    selected_items = []
    for item in self.items:
        if isinstance(item, PadSelection):
            element_name = item.element.name
        else:
            element_name = item.name
        if element_name in element_names:
            selected_items.append(item)
    return ElementGroup(items=selected_items)

sgn.groups.select(element, *pad_names)

Create a PadSelection for specific pads from an element.

Parameters:

Name Type Description Default
element Element

The element to select pads from.

required
*pad_names str

Names of the pads to select.

()

Returns:

Type Description
PadSelection

A selection representing the specified pads from the element.

Examples:

>>> src = IterSource(name="src", source_pad_names=["H1", "L1"])
>>> h1_only = select(src, "H1")
>>> group_with_selection = group(other_src, h1_only)
Source code in sgn/groups.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def select(element: Element, *pad_names: str) -> PadSelection:
    """Create a PadSelection for specific pads from an element.

    Args:
        element: The element to select pads from.
        *pad_names: Names of the pads to select.

    Returns:
        A selection representing the specified pads from the element.

    Examples:
        >>> src = IterSource(name="src", source_pad_names=["H1", "L1"])
        >>> h1_only = select(src, "H1")
        >>> group_with_selection = group(other_src, h1_only)
    """
    return PadSelection(element=element, pad_names=set(pad_names))

sgn.groups.group(*items)

Create a unified group from elements, pad selections, and existing groups.

This function always returns an ElementGroup that holds elements and/or pad selections. The decision of which pads to extract is deferred until the group is used in pipeline.connect().

Parameters:

Name Type Description Default
*items Union[Element, PadSelection, ElementGroup]

Elements, pad selections, and/or existing ElementGroups to combine.

()

Returns:

Type Description
ElementGroup

A group containing all the provided items.

Examples:

>>> src1 = IterSource(name="src1", source_pad_names=["H1"])
>>> src2 = IterSource(name="src2", source_pad_names=["L1", "V1"])
>>> snk = Sink(name="snk", sink_pad_names=["H1", "L1"])
>>>
>>> # Group elements
>>> sources = group(src1, src2)
>>>
>>> # Group with pad selection
>>> l1_only = select(src2, "L1")
>>> sources_with_selection = group(src1, l1_only)
>>>
>>> # Connect groups
>>> pipeline.connect(sources_with_selection, snk)
Source code in sgn/groups.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def group(*items: Union[Element, PadSelection, ElementGroup]) -> ElementGroup:
    """Create a unified group from elements, pad selections, and existing groups.

    This function always returns an ElementGroup that holds elements and/or
    pad selections. The decision of which pads to extract is deferred until
    the group is used in pipeline.connect().

    Args:
        *items: Elements, pad selections, and/or existing ElementGroups to combine.

    Returns:
        A group containing all the provided items.

    Examples:
        >>> src1 = IterSource(name="src1", source_pad_names=["H1"])
        >>> src2 = IterSource(name="src2", source_pad_names=["L1", "V1"])
        >>> snk = Sink(name="snk", sink_pad_names=["H1", "L1"])
        >>>
        >>> # Group elements
        >>> sources = group(src1, src2)
        >>>
        >>> # Group with pad selection
        >>> l1_only = select(src2, "L1")
        >>> sources_with_selection = group(src1, l1_only)
        >>>
        >>> # Connect groups
        >>> pipeline.connect(sources_with_selection, snk)
    """
    all_items: list[Union[Element, PadSelection]] = []

    for item in items:
        if isinstance(item, PadSelection):
            all_items.append(item)
        elif isinstance(item, (SourceElement, TransformElement, SinkElement)):
            all_items.append(item)
        elif isinstance(item, ElementGroup):
            all_items.extend(item.items)
        else:
            msg = f"Expected Element, PadSelection, or ElementGroup, got {type(item)}"
            raise TypeError(msg)

    return ElementGroup(items=all_items)

Usage Examples

Basic Element Grouping

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import group

# Create multiple sources
src1 = IterSource(name="src1", source_pad_names=["H1"])
src2 = IterSource(name="src2", source_pad_names=["L1"])

# Create multiple sinks
sink1 = NullSink(name="sink1", sink_pad_names=["H1"])
sink2 = NullSink(name="sink2", sink_pad_names=["L1"])

# Group elements for convenient connection
sources = group(src1, src2)
sinks = group(sink1, sink2)

pipeline = Pipeline()
pipeline.connect(sources, sinks)  # Connects H1->H1, L1->L1 automatically

Pad Selection

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import select

# Create a source with multiple pads
src = IterSource(name="src", source_pad_names=["H1", "L1", "V1"])

# Create a sink that only needs specific data
sink = NullSink(name="sink", sink_pad_names=["H1", "L1"])

# Select only the pads we need from the source
selected_pads = select(src, "H1", "L1")  # Exclude V1

pipeline = Pipeline()
pipeline.connect(selected_pads, sink)  # Only H1 and L1 are connected

Combined Grouping and Selection

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import group, select

# Create sources with different pad configurations
src1 = IterSource(name="src1", source_pad_names=["H1"])
src2 = IterSource(name="src2", source_pad_names=["L1", "V1", "extra"])

# Create a sink that expects specific pads
sink = NullSink(name="sink", sink_pad_names=["H1", "L1"])

# Group elements, selecting only the needed pads from src2
sources = group(src1, select(src2, "L1"))  # src1 all pads, src2 only L1

pipeline = Pipeline()
pipeline.connect(sources, sink)  # H1->H1, L1->L1

Explicit Mapping with Groups

You can still use explicit mapping when working with groups for cases where automatic matching isn't sufficient:

from sgn import Pipeline, IterSource, NullSink
from sgn.groups import group

# Create sources and sinks with different pad names
src1 = IterSource(name="src1", source_pad_names=["out1"])
src2 = IterSource(name="src2", source_pad_names=["out2"])

sink1 = NullSink(name="sink1", sink_pad_names=["in1"])
sink2 = NullSink(name="sink2", sink_pad_names=["in2"])

sources = group(src1, src2)
sinks = group(sink1, sink2)

pipeline = Pipeline()
pipeline.connect(sources, sinks, link_map={
    "in1": "out1",  # Map sink pad names to source pad names
    "in2": "out2"
})

Linking Strategies

When using pipeline.connect() with groups, SGN employs intelligent linking strategies:

1-to-1 Matching

When source and sink groups have the same number of pads with matching names:

# H1->H1, L1->L1 (perfect match)
sources = group(src_h1, src_l1)  # H1, L1 pads
sinks = group(sink_h1, sink_l1)  # H1, L1 pads
pipeline.connect(sources, sinks)

N-to-1 Linking

Multiple source pads connecting to a single sink pad:

# Two sources -> single H1 sink
sources = group(src1, src2)  # Two source pads
sink = NullSink(name="sink", sink_pad_names=["H1"])  # One H1 pad
pipeline.connect(sources, sink)

1-to-N Linking

Single source pad connecting to multiple sink pads:

# Single H1 source -> two sinks
src = IterSource(name="src", source_pad_names=["H1"])  # One source pad
sinks = group(sink1, sink2)  # Two sink pads
pipeline.connect(src, sinks)