Skip to content

sgn.base

Base classes for building a graph of elements and pads.

InternalPad dataclass

Bases: UniqueID, PadLike

A pad that sits inside an element and is called between sink and source pads. Internal pads are connected in the elements internal graph according to the below (data flows top to bottom)

snk1 ... snkN (if exist) \ ... // internal (always exists) // ... \ src1 ... srcM (if exist)

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
name str

str, optional, The unique name for this object

''
Source code in sgn/base.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@dataclass(eq=False, repr=False)
class InternalPad(UniqueID, PadLike):
    """A pad that sits inside an element and is called between sink and source pads.
    Internal pads are connected in the elements internal graph according to the below
    (data flows top to bottom)

    snk1   ...  snkN     (if exist)
      \\   ...   //
         internal      (always exists)
      //   ...   \\
     src1  ...  srcM     (if exist)

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
        name:
            str, optional, The unique name for this object
    """

    async def __call__(self) -> None:
        """When called, an internal pad receives a Frame from the element that the pad
        belongs to."""
        self.call()

__call__() async

When called, an internal pad receives a Frame from the element that the pad belongs to.

Source code in sgn/base.py
228
229
230
231
async def __call__(self) -> None:
    """When called, an internal pad receives a Frame from the element that the pad
    belongs to."""
    self.call()

SinkElement dataclass

Bases: ABC, ElementLike, Generic[FrameLike]

Sink element represents a terminal node in a pipeline, that typically writes data to disk, etc. Sink_pads must exist but not source_pads.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
sink_pad_names Sequence[str]

list, optional, Set the list of sink pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":sink:"

list()
Source code in sgn/base.py
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
@dataclass(kw_only=True)
class SinkElement(ABC, ElementLike, Generic[FrameLike]):
    """Sink element represents a terminal node in a pipeline, that typically writes data
    to disk, etc. Sink_pads must exist but not source_pads.

    Args:
        name:
            str, optional, The unique name for this object
        sink_pad_names:
            list, optional, Set the list of sink pad names. These need to be unique for
            an element but not for an application. The resulting full names will be
            made with "<self.name>:sink:<sink_pad_name>"
    """

    sink_pad_names: Sequence[str] = field(default_factory=list)

    def __post_init__(self):
        """Establish the sink pads and graph attributes."""
        super().__post_init__()
        self.sink_pads = [
            SinkPad(
                name=f"{self.name}:snk:{n}",
                element=self,
                call=self.pull,
            )
            for n in self.sink_pad_names
        ]
        # short names for easier recall
        self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self._at_eos = {p: False for p in self.sink_pads}
        assert self.sink_pads and not self.source_pads
        self.sink_pad_names_full = [p.name for p in self.sink_pads]

        # Update graph to be (all sinks -> internal)
        self.graph.update({self.internal_pad: set(self.sink_pads)})

    @property
    def at_eos(self) -> bool:
        """If frames on any sink pads are End of Stream (EOS), then mark this whole
        element as EOS.

        Returns:
            bool, True if any sink pad is at EOS, False otherwise
        """
        # TODO generalize this to be able to choose any v. all EOS propagation
        return any(self._at_eos.values())

    def mark_eos(self, pad: SinkPad) -> None:
        """Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals
        that no more frames will be received on this pad.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the
        """
        self._at_eos[pad] = True

    @abstractmethod
    def pull(self, pad: SinkPad, frame: FrameLike) -> None:
        """Pull for a SinkElement represents the action of associating a frame with a
        particular input source pad a frame. This function must be provided by the
        subclass, and is where any "final" behavior must occur, e.g. writing to disk,
        etc.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the frame
            frame:
                Frame, The frame that is being received
        """
        ...

at_eos property

If frames on any sink pads are End of Stream (EOS), then mark this whole element as EOS.

Returns:

Type Description
bool

bool, True if any sink pad is at EOS, False otherwise

__post_init__()

Establish the sink pads and graph attributes.

Source code in sgn/base.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def __post_init__(self):
    """Establish the sink pads and graph attributes."""
    super().__post_init__()
    self.sink_pads = [
        SinkPad(
            name=f"{self.name}:snk:{n}",
            element=self,
            call=self.pull,
        )
        for n in self.sink_pad_names
    ]
    # short names for easier recall
    self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
    self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
    self._at_eos = {p: False for p in self.sink_pads}
    assert self.sink_pads and not self.source_pads
    self.sink_pad_names_full = [p.name for p in self.sink_pads]

    # Update graph to be (all sinks -> internal)
    self.graph.update({self.internal_pad: set(self.sink_pads)})

mark_eos(pad)

Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals that no more frames will be received on this pad.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the

required
Source code in sgn/base.py
465
466
467
468
469
470
471
472
473
def mark_eos(self, pad: SinkPad) -> None:
    """Marks a sink pad as receiving the End of Stream (EOS). The EOS marker signals
    that no more frames will be received on this pad.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the
    """
    self._at_eos[pad] = True

pull(pad, frame) abstractmethod

Pull for a SinkElement represents the action of associating a frame with a particular input source pad a frame. This function must be provided by the subclass, and is where any "final" behavior must occur, e.g. writing to disk, etc.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the frame

required
frame FrameLike

Frame, The frame that is being received

required
Source code in sgn/base.py
475
476
477
478
479
480
481
482
483
484
485
486
487
488
@abstractmethod
def pull(self, pad: SinkPad, frame: FrameLike) -> None:
    """Pull for a SinkElement represents the action of associating a frame with a
    particular input source pad a frame. This function must be provided by the
    subclass, and is where any "final" behavior must occur, e.g. writing to disk,
    etc.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the frame
        frame:
            Frame, The frame that is being received
    """
    ...

SinkPad dataclass

Bases: UniqueID, PadLike

A pad that receives a data Frame when called. When linked, it returns a dictionary suitable for building a graph in graphlib.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad, takes two arguments, the pad and the frame

required
name str

str, optional, The unique name for this object

''
other Optional[SourcePad]

SourcePad, optional, This holds the source pad that is linked to this sink pad, default None

None
input Optional[Frame]

Frame, optional, This holds the Frame provided by the linked source pad. Generally it gets set when this SinkPad is called, default None

None
data_spec Optional[DataSpec]

DataSpec, optional, This holds a specification for the data stored in frames, and is expected to be consistent across frames passing through this pad. This is set when this sink pad is first called

None
Source code in sgn/base.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@dataclass(eq=False, repr=False)
class SinkPad(UniqueID, PadLike):
    """A pad that receives a data Frame when called.  When linked, it returns a
    dictionary suitable for building a graph in graphlib.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for this
            pad, takes two arguments, the pad and the frame
        name:
            str, optional, The unique name for this object
        other:
            SourcePad, optional, This holds the source pad that is linked to this sink
            pad, default None
        input:
            Frame, optional, This holds the Frame provided by the linked source pad.
            Generally it gets set when this SinkPad is called, default None
        data_spec:
            DataSpec, optional, This holds a specification for the data stored
            in frames, and is expected to be consistent across frames passing
            through this pad. This is set when this sink pad is first called
    """

    other: Optional[SourcePad] = None
    input: Optional[Frame] = None
    data_spec: Optional[DataSpec] = None

    def link(self, other: SourcePad) -> dict[Pad, set[Pad]]:
        """Returns a dictionary of dependencies suitable for adding to a graphlib graph.

        Args:
            other:
                SourcePad, The source pad to link to this sink pad

        Notes:
            Many-to-one (source, sink) Not Supported:
                Only sink pads can be linked. A sink pad can be linked to only one
                source pad, but multiple sink pads may link to the same source pad.

        Returns:
            dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for
            adding to a graphlib graph
        """
        assert isinstance(other, SourcePad), "other is not an instance of SourcePad"
        self.other = other
        self.is_linked = True
        other.is_linked = True
        return {self: {other}}

    async def __call__(self) -> None:
        """When called, a sink pad gets a Frame from the linked source pad and then
        calls the element's provided call function.

        Notes:
            Pad Call Order:
                pads must be called in the correct order such that the upstream sources
                have new information by the time call is invoked. This should be done
                within a directed acyclic graph such as those provided by the
                apps.Pipeline class.
        """
        assert isinstance(self.other, SourcePad), "Sink pad has not been linked"
        self.input = self.other.output
        assert isinstance(self.input, Frame)
        if self.data_spec is None:
            self.data_spec = self.input.spec
        if not self.data_spec == self.input.spec:
            msg = (
                f"frame received by {self.name} is inconsistent with "
                "previously received frames. previous data specification: "
                f"{self.data_spec}, current data specification: {self.input.spec}"
            )
            raise ValueError(msg)
        self.call(self, self.input)
        if self.element is not None:
            logger.getChild(self.element.name).info("\t%s:%s", self, self.input)

__call__() async

When called, a sink pad gets a Frame from the linked source pad and then calls the element's provided call function.

Notes

Pad Call Order: pads must be called in the correct order such that the upstream sources have new information by the time call is invoked. This should be done within a directed acyclic graph such as those provided by the apps.Pipeline class.

Source code in sgn/base.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
async def __call__(self) -> None:
    """When called, a sink pad gets a Frame from the linked source pad and then
    calls the element's provided call function.

    Notes:
        Pad Call Order:
            pads must be called in the correct order such that the upstream sources
            have new information by the time call is invoked. This should be done
            within a directed acyclic graph such as those provided by the
            apps.Pipeline class.
    """
    assert isinstance(self.other, SourcePad), "Sink pad has not been linked"
    self.input = self.other.output
    assert isinstance(self.input, Frame)
    if self.data_spec is None:
        self.data_spec = self.input.spec
    if not self.data_spec == self.input.spec:
        msg = (
            f"frame received by {self.name} is inconsistent with "
            "previously received frames. previous data specification: "
            f"{self.data_spec}, current data specification: {self.input.spec}"
        )
        raise ValueError(msg)
    self.call(self, self.input)
    if self.element is not None:
        logger.getChild(self.element.name).info("\t%s:%s", self, self.input)

Returns a dictionary of dependencies suitable for adding to a graphlib graph.

Parameters:

Name Type Description Default
other SourcePad

SourcePad, The source pad to link to this sink pad

required
Notes

Many-to-one (source, sink) Not Supported: Only sink pads can be linked. A sink pad can be linked to only one source pad, but multiple sink pads may link to the same source pad.

Returns:

Type Description
dict[Pad, set[Pad]]

dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for

dict[Pad, set[Pad]]

adding to a graphlib graph

Source code in sgn/base.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def link(self, other: SourcePad) -> dict[Pad, set[Pad]]:
    """Returns a dictionary of dependencies suitable for adding to a graphlib graph.

    Args:
        other:
            SourcePad, The source pad to link to this sink pad

    Notes:
        Many-to-one (source, sink) Not Supported:
            Only sink pads can be linked. A sink pad can be linked to only one
            source pad, but multiple sink pads may link to the same source pad.

    Returns:
        dict[SinkPad, set[SourcePad]], a dictionary of dependencies suitable for
        adding to a graphlib graph
    """
    assert isinstance(other, SourcePad), "other is not an instance of SourcePad"
    self.other = other
    self.is_linked = True
    other.is_linked = True
    return {self: {other}}

SourceElement dataclass

Bases: ABC, ElementLike

Initialize with a list of source pads. Every source pad is added to the graph with no dependencies.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
source_pad_names Sequence[str]

list, optional, Set the list of source pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":src:"

list()
Source code in sgn/base.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
@dataclass(repr=False, kw_only=True)
class SourceElement(ABC, ElementLike):
    """Initialize with a list of source pads. Every source pad is added to the graph
    with no dependencies.

    Args:
        name:
            str, optional, The unique name for this object
        source_pad_names:
            list, optional, Set the list of source pad names. These need to be unique
            for an element but not for an application. The resulting full names will be
            made with "<self.name>:src:<source_pad_name>"
    """

    source_pad_names: Sequence[str] = field(default_factory=list)

    def __post_init__(self):
        """Establish the source pads and graph attributes."""
        super().__post_init__()
        self.source_pads = [
            SourcePad(
                name=f"{self.name}:src:{n}",
                element=self,
                call=self.new,
            )
            for n in self.source_pad_names
        ]
        # short names for easier recall
        self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
        self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
        assert self.source_pads and not self.sink_pads
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    @abstractmethod
    def new(self, pad: SourcePad) -> Frame:
        """New frames are created on "pad". Must be provided by subclass.

        Args:
            pad:
                SourcePad, The source pad through which the frame is passed

        Returns:
            Frame, The new frame to be passed through the source pad
        """
        ...

__post_init__()

Establish the source pads and graph attributes.

Source code in sgn/base.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def __post_init__(self):
    """Establish the source pads and graph attributes."""
    super().__post_init__()
    self.source_pads = [
        SourcePad(
            name=f"{self.name}:src:{n}",
            element=self,
            call=self.new,
        )
        for n in self.source_pad_names
    ]
    # short names for easier recall
    self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
    self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
    assert self.source_pads and not self.sink_pads
    self.graph.update({s: {self.internal_pad} for s in self.source_pads})

new(pad) abstractmethod

New frames are created on "pad". Must be provided by subclass.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, The source pad through which the frame is passed

required

Returns:

Type Description
Frame

Frame, The new frame to be passed through the source pad

Source code in sgn/base.py
320
321
322
323
324
325
326
327
328
329
330
331
@abstractmethod
def new(self, pad: SourcePad) -> Frame:
    """New frames are created on "pad". Must be provided by subclass.

    Args:
        pad:
            SourcePad, The source pad through which the frame is passed

    Returns:
        Frame, The new frame to be passed through the source pad
    """
    ...

SourcePad dataclass

Bases: UniqueID, PadLike

A pad that provides a data Frame when called.

Parameters:

Name Type Description Default
element Element

Element, The Element instance associated with this pad

required
call Callable

Callable, The function that will be called during graph execution for this pad

required
name str

str, optional, The unique name for this object

''
output Optional[Frame]

Frame, optional, This attribute is set to be the output Frame when the pad is called.

None
Source code in sgn/base.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@dataclass(eq=False, repr=False)
class SourcePad(UniqueID, PadLike):
    """A pad that provides a data Frame when called.

    Args:
        element:
            Element, The Element instance associated with this pad
        call:
            Callable, The function that will be called during graph execution for
            this pad
        name:
            str, optional, The unique name for this object
        output:
            Frame, optional, This attribute is set to be the output Frame when the pad
            is called.
    """

    output: Optional[Frame] = None

    async def __call__(self) -> None:
        """When called, a source pad receives a Frame from the element that the pad
        belongs to."""
        self.output = self.call(pad=self)
        assert isinstance(self.output, Frame)
        if self.element is not None:
            logger.getChild(self.element.name).info("\t%s : %s", self, self.output)

__call__() async

When called, a source pad receives a Frame from the element that the pad belongs to.

Source code in sgn/base.py
118
119
120
121
122
123
124
async def __call__(self) -> None:
    """When called, a source pad receives a Frame from the element that the pad
    belongs to."""
    self.output = self.call(pad=self)
    assert isinstance(self.output, Frame)
    if self.element is not None:
        logger.getChild(self.element.name).info("\t%s : %s", self, self.output)

TransformElement dataclass

Bases: ABC, ElementLike, Generic[FrameLike]

Both "source_pads" and "sink_pads" must exist. The execution scheduling flow of the logic within a TransformElement is as follows: 1.) all sink pads, 2.) the internal pad, 3.) all source pads. The execution of all downstream logic will be blocked until logic in all upstream pads within the same TransformElement has exited.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object

''
source_pad_names Sequence[str]

list, optional, Set the list of source pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":src:"

list()
sink_pad_names Sequence[str]

list, optional, Set the list of sink pad names. These need to be unique for an element but not for an application. The resulting full names will be made with ":snk:"

list()
Source code in sgn/base.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
@dataclass(repr=False, kw_only=True)
class TransformElement(ABC, ElementLike, Generic[FrameLike]):
    """Both "source_pads" and "sink_pads" must exist. The execution scheduling
    flow of the logic within a TransformElement is as follows: 1.) all sink
    pads, 2.) the internal pad, 3.) all source pads. The execution of all
    downstream logic will be blocked until logic in all upstream pads within
    the same TransformElement has exited.

    Args:
        name:
            str, optional, The unique name for this object
        source_pad_names:
            list, optional, Set the list of source pad names. These need to be unique
            for an element but not for an application. The resulting full names will
            be made with "<self.name>:src:<source_pad_name>"
        sink_pad_names:
            list, optional, Set the list of sink pad names. These need to be unique
            for an element but not for an application. The resulting full names will
            be made with "<self.name>:snk:<sink_pad_name>"
    """

    source_pad_names: Sequence[str] = field(default_factory=list)
    sink_pad_names: Sequence[str] = field(default_factory=list)

    def __post_init__(self):
        """Establish the source pads and sink pads and graph attributes."""
        super().__post_init__()
        self.source_pads = [
            SourcePad(
                name=f"{self.name}:src:{n}",
                element=self,
                call=self.new,
            )
            for n in self.source_pad_names
        ]
        self.sink_pads = [
            SinkPad(
                name=f"{self.name}:snk:{n}",
                element=self,
                call=self.pull,
            )
            for n in self.sink_pad_names
        ]
        # short names for easier recall
        self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
        self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
        self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
        assert self.source_pads and self.sink_pads

        # Make maximal bipartite graph in two pieces
        # First, (all sinks -> internal)
        self.graph.update({self.internal_pad: set(self.sink_pads)})
        # Second, (internal -> all sources)
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    @abstractmethod
    def pull(self, pad: SinkPad, frame: FrameLike) -> None:
        """Pull data from the input pads (source pads of upstream elements), must be
        implemented by subclasses.

        Args:
            pad:
                SinkPad, The sink pad that is receiving the frame
            frame:
                Frame, The frame that is pulled from the source pad
        """
        ...

    @abstractmethod
    def new(self, pad: SourcePad) -> FrameLike:
        """New frames are created on "pad". Must be provided by subclass.

        Args:
            pad:
                SourcePad, The source pad through which the frame is passed

        Returns:
            Frame, The new frame to be passed through the source pad
        """
        ...

__post_init__()

Establish the source pads and sink pads and graph attributes.

Source code in sgn/base.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def __post_init__(self):
    """Establish the source pads and sink pads and graph attributes."""
    super().__post_init__()
    self.source_pads = [
        SourcePad(
            name=f"{self.name}:src:{n}",
            element=self,
            call=self.new,
        )
        for n in self.source_pad_names
    ]
    self.sink_pads = [
        SinkPad(
            name=f"{self.name}:snk:{n}",
            element=self,
            call=self.pull,
        )
        for n in self.sink_pad_names
    ]
    # short names for easier recall
    self.srcs = {n: p for n, p in zip(self.source_pad_names, self.source_pads)}
    self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
    self.rsrcs = {p: n for n, p in zip(self.source_pad_names, self.source_pads)}
    self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
    assert self.source_pads and self.sink_pads

    # Make maximal bipartite graph in two pieces
    # First, (all sinks -> internal)
    self.graph.update({self.internal_pad: set(self.sink_pads)})
    # Second, (internal -> all sources)
    self.graph.update({s: {self.internal_pad} for s in self.source_pads})

new(pad) abstractmethod

New frames are created on "pad". Must be provided by subclass.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, The source pad through which the frame is passed

required

Returns:

Type Description
FrameLike

Frame, The new frame to be passed through the source pad

Source code in sgn/base.py
403
404
405
406
407
408
409
410
411
412
413
414
@abstractmethod
def new(self, pad: SourcePad) -> FrameLike:
    """New frames are created on "pad". Must be provided by subclass.

    Args:
        pad:
            SourcePad, The source pad through which the frame is passed

    Returns:
        Frame, The new frame to be passed through the source pad
    """
    ...

pull(pad, frame) abstractmethod

Pull data from the input pads (source pads of upstream elements), must be implemented by subclasses.

Parameters:

Name Type Description Default
pad SinkPad

SinkPad, The sink pad that is receiving the frame

required
frame FrameLike

Frame, The frame that is pulled from the source pad

required
Source code in sgn/base.py
390
391
392
393
394
395
396
397
398
399
400
401
@abstractmethod
def pull(self, pad: SinkPad, frame: FrameLike) -> None:
    """Pull data from the input pads (source pads of upstream elements), must be
    implemented by subclasses.

    Args:
        pad:
            SinkPad, The sink pad that is receiving the frame
        frame:
            Frame, The frame that is pulled from the source pad
    """
    ...

UniqueID dataclass

Generic class from which all classes that participate in an execution graph should be derived. Enforces a unique name and hashes based on that name.

Parameters:

Name Type Description Default
name str

str, optional, The unique name for this object, defaults to the objects unique uuid4 hex string if not specified

''
Source code in sgn/base.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@dataclass
class UniqueID:
    """Generic class from which all classes that participate in an execution graph
    should be derived. Enforces a unique name and hashes based on that name.

    Args:
        name:
            str, optional, The unique name for this object, defaults to the objects
            unique uuid4 hex string if not specified
    """

    name: str = ""
    _id: str = field(init=False)

    def __post_init__(self):
        """Handle setup of the UniqueID class, including the `._id` attribute."""
        # give every element a truly unique identifier
        self._id = uuid.uuid4().hex
        if not self.name:
            self.name = self._id

    def __hash__(self) -> int:
        """Compute the hash of the object based on the unique id.

        Notes:
            Motivation:
                we need the Base class to be hashable, so that it can be
                used as a key in a dictionary, but mutable dataclasses are not
                hashable by default, so we have to define our own hash function
                here.
            Stability:
                As currently implemented, the hash of a UniqueID object will not be
                stable across python sessions, and should therefore not be used for
                checksum purposes.

        Returns:
            int, hash of the object
        """
        return hash(self._id)

    def __eq__(self, other) -> bool:
        """Check if two objects are equal based on their unique id and types."""
        return hash(self) == hash(other)

__eq__(other)

Check if two objects are equal based on their unique id and types.

Source code in sgn/base.py
58
59
60
def __eq__(self, other) -> bool:
    """Check if two objects are equal based on their unique id and types."""
    return hash(self) == hash(other)

__hash__()

Compute the hash of the object based on the unique id.

Notes

Motivation: we need the Base class to be hashable, so that it can be used as a key in a dictionary, but mutable dataclasses are not hashable by default, so we have to define our own hash function here. Stability: As currently implemented, the hash of a UniqueID object will not be stable across python sessions, and should therefore not be used for checksum purposes.

Returns:

Type Description
int

int, hash of the object

Source code in sgn/base.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def __hash__(self) -> int:
    """Compute the hash of the object based on the unique id.

    Notes:
        Motivation:
            we need the Base class to be hashable, so that it can be
            used as a key in a dictionary, but mutable dataclasses are not
            hashable by default, so we have to define our own hash function
            here.
        Stability:
            As currently implemented, the hash of a UniqueID object will not be
            stable across python sessions, and should therefore not be used for
            checksum purposes.

    Returns:
        int, hash of the object
    """
    return hash(self._id)

__post_init__()

Handle setup of the UniqueID class, including the ._id attribute.

Source code in sgn/base.py
32
33
34
35
36
37
def __post_init__(self):
    """Handle setup of the UniqueID class, including the `._id` attribute."""
    # give every element a truly unique identifier
    self._id = uuid.uuid4().hex
    if not self.name:
        self.name = self._id