
sgn.subprocess

Parallelize

Bases: SignalEOS

A context manager for running SGN pipelines with elements that implement separate processes or threads.

This class manages the lifecycle of workers (processes or threads) in an SGN pipeline, handling worker creation, execution, and cleanup. It also supports shared memory objects, created via the to_shm() method (process mode only), which are automatically cleaned up on exit.

Key features include:

- Automatic management of worker lifecycle (creation, starting, joining, cleanup)
- Shared memory management for efficient data sharing (process mode only)
- Signal handling coordination between main process/thread and workers
- Resilience against KeyboardInterrupt (Ctrl+C): workers catch and ignore these signals, allowing the main process to coordinate a clean shutdown
- Orderly shutdown to ensure all resources are properly released
- Support for both multiprocessing and threading concurrency models

IMPORTANT: When using process mode, code using Parallelize MUST be wrapped within an if __name__ == "__main__": block. This is required because SGN uses Python's multiprocessing module with the 'spawn' start method, which requires that the main module be importable.

Example with default process mode

    def main():
        pipeline = Pipeline()
        with Parallelize(pipeline) as parallelize:
            parallelize.run()

    if __name__ == "__main__":
        main()

Example with thread mode

    def main():
        pipeline = Pipeline()
        with Parallelize(pipeline, use_threading=True) as parallelize:
            parallelize.run()

    if __name__ == "__main__":
        main()

Source code in sgn/subprocess.py
class Parallelize(SignalEOS):
    """
    A context manager for running SGN pipelines with elements that implement
    separate processes or threads.

    This class manages the lifecycle of workers (processes or threads) in an SGN
    pipeline, handling worker creation, execution, and cleanup. It also supports
    shared memory objects that will be automatically cleaned up on exit through the
    to_shm() method (only applicable for process mode).

    Key features include:
    - Automatic management of worker lifecycle (creation, starting, joining, cleanup)
    - Shared memory management for efficient data sharing (process mode only)
    - Signal handling coordination between main process/thread and workers
    - Resilience against KeyboardInterrupt (Ctrl+C) - workers catch and ignore these
      signals, allowing the main process to coordinate a clean shutdown
    - Orderly shutdown to ensure all resources are properly released
    - Support for both multiprocessing and threading concurrency models

    IMPORTANT: When using process mode, code using Parallelize MUST be
    wrapped within an if __name__ == "__main__": block. This is required because SGN
    uses Python's multiprocessing module with the 'spawn' start method, which requires
    that the main module be importable.

    Example with default process mode:
        def main():
            pipeline = Pipeline()
            with Parallelize(pipeline) as parallelize:
            parallelize.run()

        if __name__ == "__main__":
            main()

    Example with thread mode:
        def main():
            pipeline = Pipeline()
            with Parallelize(pipeline, use_threading=True) as parallelize:
            parallelize.run()

        if __name__ == "__main__":
            main()
    """

    shm_list: list = []
    instance_list: list = []
    enabled: bool = False
    # The hard timeout before a worker gets terminated.
    # Workers should cleanup after themselves within this time and exit cleanly.
    # This is a "global" property applied to all subprocesses / subthreads
    join_timeout: float = 10.0  # Increased for CI environments
    # Default flag for whether to use threading (False means use multiprocessing)
    use_threading_default: bool = False
    # Instance variable for thread mode
    use_threading: bool = False

    def __init__(self, pipeline=None, use_threading: Optional[bool] = None):
        """
        Initialize the Parallelize context manager.

        Args:
            pipeline: The pipeline to run
            use_threading: Whether to use threading instead of multiprocessing.
                          If not specified, uses the use_threading_default
        """
        self.pipeline = pipeline
        # Use the specified mode, or fall back to the class default
        self.use_threading = (
            use_threading
            if use_threading is not None
            else Parallelize.use_threading_default
        )

    def __enter__(self):
        try:
            multiprocessing.set_start_method("spawn")
        except RuntimeError:
            pass
        super().__enter__()
        for e in Parallelize.instance_list:
            e.worker.start()
        Parallelize.enabled = True
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        super().__exit__(exc_type, exc_value, exc_traceback)
        # rejoin all the workers
        for e in Parallelize.instance_list:
            if e.in_queue is not None and hasattr(e.in_queue, "cancel_join_thread"):
                e.in_queue.cancel_join_thread()
            if e.out_queue is not None and hasattr(e.out_queue, "cancel_join_thread"):
                e.out_queue.cancel_join_thread()

            if (
                e.worker is not None
                and hasattr(e.worker, "is_alive")
                and e.worker.is_alive()
            ):
                e.worker.join(Parallelize.join_timeout)
                # Only processes can be killed, threads will naturally terminate
                if hasattr(e.worker, "kill") and e.worker.is_alive():
                    e.worker.kill()

        Parallelize.instance_list = []

        # Clean up shared memory (only applicable for process mode)
        if not self.use_threading:
            for d in Parallelize.shm_list:
                multiprocessing.shared_memory.SharedMemory(name=d["name"]).unlink()
            Parallelize.shm_list = []

        Parallelize.enabled = False

    @staticmethod
    def to_shm(name, bytez, **kwargs):
        """
        Create a shared memory object that can be accessed by subprocesses.

        Note: This is only applicable in process mode. In thread mode, shared memory
        is not necessary since threads share the same address space.

        This method creates a shared memory segment that will be automatically
        cleaned up when the Parallelize context manager exits. The shared memory can be
        used to efficiently share large data between processes without serialization
        overhead.

        Args:
            name (str): Unique identifier for the shared memory block
            bytez (bytes or bytearray): Data to store in shared memory
            **kwargs: Additional metadata to store with the shared memory reference

        Returns:
            dict: A dictionary containing the shared memory object and metadata
                  with keys:
                - "name": The name of the shared memory block
                - "shm": The SharedMemory object
                - Any additional key-value pairs from kwargs

        Raises:
            FileExistsError: If shared memory with the given name already exists

        Example:
            shared_data = bytearray("Hello world", "utf-8")
        shm_ref = Parallelize.to_shm("example_data", shared_data)
        """
        try:
            shm = multiprocessing.shared_memory.SharedMemory(
                name=name, create=True, size=len(bytez)
            )
        except FileExistsError as e:
            print(f"Shared memory: {name} already exists")
            print(
                "You can clear the memory by doing "
                f"multiprocessing.shared_memory.SharedMemory(name='{name}').unlink()\n"
            )
            for d in Parallelize.shm_list:
                multiprocessing.shared_memory.SharedMemory(name=d["name"]).unlink()
            Parallelize.shm_list = []
            raise e

        shm.buf[: len(bytez)] = bytez
        out = {"name": name, "shm": shm, **kwargs}
        Parallelize.shm_list.append(out)
        return out

    def run(self):
        """
        Run the pipeline managed by this Parallelize instance.

        This method executes the associated pipeline and ensures proper cleanup
        of worker resources, even in the case of exceptions. It signals all
        workers to stop when the pipeline execution completes or if an exception
        occurs.

        Raises:
            RuntimeError: If an exception occurs during pipeline execution
        AssertionError: If no pipeline was provided to Parallelize
        """
        assert self.pipeline is not None
        try:
            self.pipeline.run()
        except Exception as e:
            # Signal all workers to stop when an exception occurs
            for p in Parallelize.instance_list:
                p.worker_stop.set()

            # Clean up all workers
            for p in Parallelize.instance_list:
                if p.in_queue is not None and hasattr(p.in_queue, "cancel_join_thread"):
                    p.in_queue.cancel_join_thread()
                if p.out_queue is not None and hasattr(
                    p.out_queue, "cancel_join_thread"
                ):
                    p.out_queue.cancel_join_thread()

                if (
                    p.worker is not None
                    and hasattr(p.worker, "is_alive")
                    and p.worker.is_alive()
                ):
                    p.worker.join(Parallelize.join_timeout)
                    if hasattr(p.worker, "kill") and p.worker.is_alive():
                        p.worker.kill()
            raise RuntimeError(e)

        # Signal all workers to stop when pipeline completes normally
        for p in Parallelize.instance_list:
            p.worker_stop.set()

__init__(pipeline=None, use_threading=None)

Initialize the Parallelize context manager.

Parameters:

    pipeline
        The pipeline to run. Default: None
    use_threading (Optional[bool])
        Whether to use threading instead of multiprocessing. If not specified, uses the use_threading_default. Default: None
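A minimal sketch of the fallback behavior between the per-instance argument and the class-wide default (pipeline=None is purely illustrative here; a real pipeline is required before calling run()):

    from sgn.subprocess import Parallelize

    # Per-instance choice: thread mode for this instance only
    p1 = Parallelize(pipeline=None, use_threading=True)
    assert p1.use_threading is True

    # Flip the class-wide default; instances created without an explicit
    # use_threading argument now fall back to thread mode
    Parallelize.use_threading_default = True
    p2 = Parallelize(pipeline=None)
    assert p2.use_threading is True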
Source code in sgn/subprocess.py
def __init__(self, pipeline=None, use_threading: Optional[bool] = None):
    """
    Initialize the Parallelize context manager.

    Args:
        pipeline: The pipeline to run
        use_threading: Whether to use threading instead of multiprocessing.
                      If not specified, uses the use_threading_default
    """
    self.pipeline = pipeline
    # Use the specified mode, or fall back to the class default
    self.use_threading = (
        use_threading
        if use_threading is not None
        else Parallelize.use_threading_default
    )

run()

Run the pipeline managed by this Parallelize instance.

This method executes the associated pipeline and ensures proper cleanup of worker resources, even in the case of exceptions. It signals all workers to stop when the pipeline execution completes or if an exception occurs.

Raises:

    RuntimeError
        If an exception occurs during pipeline execution
    AssertionError
        If no pipeline was provided to Parallelize
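Because run() signals workers to stop and joins them before re-raising, the wrapping RuntimeError can be caught without leaking workers. A minimal sketch following the class-level example above (Pipeline is assumed importable as in that example; the original exception is available as err.args[0] since run() raises RuntimeError(e)):

    def main():
        pipeline = Pipeline()
        with Parallelize(pipeline) as parallelize:
            try:
                parallelize.run()
            except RuntimeError as err:
                # Workers have already been signalled to stop and joined;
                # the original pipeline exception is err.args[0]
                print(f"Pipeline failed: {err.args[0]!r}")

    if __name__ == "__main__":
        main()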

Source code in sgn/subprocess.py
def run(self):
    """
    Run the pipeline managed by this Parallelize instance.

    This method executes the associated pipeline and ensures proper cleanup
    of worker resources, even in the case of exceptions. It signals all
    workers to stop when the pipeline execution completes or if an exception
    occurs.

    Raises:
        RuntimeError: If an exception occurs during pipeline execution
        AssertionError: If no pipeline was provided to Parallelize
    """
    assert self.pipeline is not None
    try:
        self.pipeline.run()
    except Exception as e:
        # Signal all workers to stop when an exception occurs
        for p in Parallelize.instance_list:
            p.worker_stop.set()

        # Clean up all workers
        for p in Parallelize.instance_list:
            if p.in_queue is not None and hasattr(p.in_queue, "cancel_join_thread"):
                p.in_queue.cancel_join_thread()
            if p.out_queue is not None and hasattr(
                p.out_queue, "cancel_join_thread"
            ):
                p.out_queue.cancel_join_thread()

            if (
                p.worker is not None
                and hasattr(p.worker, "is_alive")
                and p.worker.is_alive()
            ):
                p.worker.join(Parallelize.join_timeout)
                if hasattr(p.worker, "kill") and p.worker.is_alive():
                    p.worker.kill()
        raise RuntimeError(e)

    # Signal all workers to stop when pipeline completes normally
    for p in Parallelize.instance_list:
        p.worker_stop.set()

to_shm(name, bytez, **kwargs) staticmethod

Create a shared memory object that can be accessed by subprocesses.

Note: This is only applicable in process mode. In thread mode, shared memory is not necessary since threads share the same address space.

This method creates a shared memory segment that will be automatically cleaned up when the Parallelize context manager exits. The shared memory can be used to efficiently share large data between processes without serialization overhead.

Parameters:

    name (str)
        Unique identifier for the shared memory block. Required.
    bytez (bytes or bytearray)
        Data to store in shared memory. Required.
    **kwargs
        Additional metadata to store with the shared memory reference.

Returns:

    dict
        A dictionary containing the shared memory object and metadata, with keys:
        - "name": The name of the shared memory block
        - "shm": The SharedMemory object
        - Any additional key-value pairs from kwargs

Raises:

    FileExistsError
        If shared memory with the given name already exists

Example

    shared_data = bytearray("Hello world", "utf-8")
    shm_ref = Parallelize.to_shm("example_data", shared_data)
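The example only shows creating the block. A worker in process mode can attach to it by name with the standard library, then detach with close() and leave unlinking to the context manager; a minimal sketch reusing the "example_data" name from above:

    from multiprocessing import shared_memory

    # Inside a worker process: attach to the existing block by name
    shm = shared_memory.SharedMemory(name="example_data")
    try:
        data = bytes(shm.buf[:11])  # b"Hello world"
    finally:
        shm.close()  # detach only; Parallelize unlinks the block on exit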

Source code in sgn/subprocess.py
@staticmethod
def to_shm(name, bytez, **kwargs):
    """
    Create a shared memory object that can be accessed by subprocesses.

    Note: This is only applicable in process mode. In thread mode, shared memory
    is not necessary since threads share the same address space.

    This method creates a shared memory segment that will be automatically
    cleaned up when the Parallelize context manager exits. The shared memory can be
    used to efficiently share large data between processes without serialization
    overhead.

    Args:
        name (str): Unique identifier for the shared memory block
        bytez (bytes or bytearray): Data to store in shared memory
        **kwargs: Additional metadata to store with the shared memory reference

    Returns:
        dict: A dictionary containing the shared memory object and metadata
              with keys:
            - "name": The name of the shared memory block
            - "shm": The SharedMemory object
            - Any additional key-value pairs from kwargs

    Raises:
        FileExistsError: If shared memory with the given name already exists

    Example:
        shared_data = bytearray("Hello world", "utf-8")
        shm_ref = Parallelize.to_shm("example_data", shared_data)
    """
    try:
        shm = multiprocessing.shared_memory.SharedMemory(
            name=name, create=True, size=len(bytez)
        )
    except FileExistsError as e:
        print(f"Shared memory: {name} already exists")
        print(
            "You can clear the memory by doing "
            f"multiprocessing.shared_memory.SharedMemory(name='{name}').unlink()\n"
        )
        for d in Parallelize.shm_list:
            multiprocessing.shared_memory.SharedMemory(name=d["name"]).unlink()
        Parallelize.shm_list = []
        raise e

    shm.buf[: len(bytez)] = bytez
    out = {"name": name, "shm": shm, **kwargs}
    Parallelize.shm_list.append(out)
    return out

ParallelizeSinkElement dataclass

Bases: SinkElement, _ParallelizeBase, Parallelize

A Sink element that runs data consumption logic in a separate process or thread.

This class extends the standard SinkElement to execute its processing in a separate worker (process or thread). It communicates with the main process/thread through input and output queues, and manages the worker lifecycle. Subclasses must implement the sub_process_internal method to define the consumption logic that runs in the worker.

The design intentionally avoids passing class or instance references to the worker to prevent pickling issues when using process mode. Instead, it passes all necessary data and resources via function arguments.

The implementation includes special handling for KeyboardInterrupt signals. When Ctrl+C is pressed in the terminal, workers will catch and ignore the KeyboardInterrupt, allowing them to continue processing while the main process coordinates a graceful shutdown. This prevents data loss and ensures all resources are properly cleaned up.

Attributes:

    worker_argdict (dict)
        Custom arguments to pass to the worker
    queue_maxsize (int)
        Maximum size of the communication queues
    err_maxsize (int)
        Maximum size for error data
    _use_threading_override (bool)
        Set to True to use threading or False to use multiprocessing. If not specified, uses the Parallelize.use_threading_default

Example with default process mode

    @dataclass
    class MyLoggingSinkElement(ParallelizeSinkElement):
        def __post_init__(self):
            super().__post_init__()

        def pull(self, pad, frame):
            if frame.EOS:
                self.mark_eos(pad)
            # Send the frame to the worker
            self.in_queue.put((pad.name, frame))

        @staticmethod
        def sub_process_internal(**kwargs):
            inq, worker_stop = kwargs["inq"], kwargs["worker_stop"]

            try:
                # Get data from the main process/thread
                pad_name, frame = inq.get(timeout=1)

                # Process or log the data
                if not frame.EOS:
                    print(f"Sink received on {pad_name}: {frame.data}")
                else:
                    print(f"Sink received EOS on {pad_name}")

            except Empty:
                pass
Example with thread mode

    @dataclass
    class MyThreadedSinkElement(ParallelizeSinkElement):
        _use_threading_override = True

        # Rest of implementation same as above
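The process-mode example above consumes a single queue item; a long-running sink would typically poll until the stop event is set. A minimal sketch under the assumption (consistent with the ParallelizeSourceElement example below) that sub_process_internal runs once for the lifetime of the worker; MyStreamingSinkElement is a hypothetical name:

    from dataclasses import dataclass
    from queue import Empty

    @dataclass
    class MyStreamingSinkElement(ParallelizeSinkElement):
        def pull(self, pad, frame):
            if frame.EOS:
                self.mark_eos(pad)
            self.in_queue.put((pad.name, frame))

        @staticmethod
        def sub_process_internal(**kwargs):
            inq, worker_stop = kwargs["inq"], kwargs["worker_stop"]
            # Drain the queue until the main process/thread signals shutdown
            while not worker_stop.is_set():
                try:
                    pad_name, frame = inq.get(timeout=1)
                except Empty:
                    continue
                if frame.EOS:
                    print(f"Sink received EOS on {pad_name}")
                else:
                    print(f"Sink received on {pad_name}: {frame.data}")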
Source code in sgn/subprocess.py
@dataclass
class ParallelizeSinkElement(SinkElement, _ParallelizeBase, Parallelize):
    """
    A Sink element that runs data consumption logic in a separate process or thread.

    This class extends the standard SinkElement to execute its processing in a
    separate worker (process or thread). It communicates with the main process/thread
    through input and output queues, and manages the worker lifecycle. Subclasses must
    implement the sub_process_internal method to define the consumption logic that runs
    in the worker.

    The design intentionally avoids passing class or instance references to the
    worker to prevent pickling issues when using process mode. Instead, it passes all
    necessary data and resources via function arguments.

    The implementation includes special handling for KeyboardInterrupt signals.
    When Ctrl+C is pressed in the terminal, workers will catch and ignore the
    KeyboardInterrupt, allowing them to continue processing while the main process
    coordinates a graceful shutdown. This prevents data loss and ensures all resources
    are properly cleaned up.

    Attributes:
        worker_argdict (dict, optional): Custom arguments to pass to the worker
        queue_maxsize (int, optional): Maximum size of the communication queues
        err_maxsize (int): Maximum size for error data
        _use_threading_override (bool, optional): Set to True to use threading or
            False to use multiprocessing. If not specified, uses the
            Parallelize.use_threading_default

    Example with default process mode:
        @dataclass
        class MyLoggingSinkElement(ParallelizeSinkElement):
            def __post_init__(self):
                super().__post_init__()

            def pull(self, pad, frame):
                if frame.EOS:
                    self.mark_eos(pad)
                # Send the frame to the worker
                self.in_queue.put((pad.name, frame))

            @staticmethod
            def sub_process_internal(**kwargs):
                inq, worker_stop = kwargs["inq"], kwargs["worker_stop"]

                try:
                    # Get data from the main process/thread
                    pad_name, frame = inq.get(timeout=1)

                    # Process or log the data
                    if not frame.EOS:
                        print(f"Sink received on {pad_name}: {frame.data}")
                    else:
                        print(f"Sink received EOS on {pad_name}")

                except Empty:
                    pass

    Example with thread mode:
        @dataclass
        class MyThreadedSinkElement(ParallelizeSinkElement):
            _use_threading_override = True

            # Rest of implementation same as above
    """

    internal = _ParallelizeBase.internal

    def __post_init__(self):
        SinkElement.__post_init__(self)
        _ParallelizeBase.__post_init__(self)

ParallelizeSourceElement dataclass

Bases: SourceElement, _ParallelizeBase, Parallelize

A Source element that generates data in a separate process or thread.

This class extends the standard SourceElement to execute its data generation logic in a separate worker (process or thread). It communicates with the main process through output queues, and manages the worker lifecycle. Subclasses must implement the sub_process_internal method to define the data generation logic that runs in the worker.

The design intentionally avoids passing class or instance references to the worker to prevent pickling issues when using process mode. Instead, it passes all necessary data and resources via function arguments.

The implementation includes special handling for KeyboardInterrupt signals. When Ctrl+C is pressed in the terminal, workers will catch and ignore the KeyboardInterrupt, allowing them to continue processing while the main process coordinates a graceful shutdown. This prevents data loss and ensures all resources are properly cleaned up.

Attributes:

    worker_argdict (dict)
        Custom arguments to pass to the worker
    queue_maxsize (int)
        Maximum size of the communication queues
    err_maxsize (int)
        Maximum size for error data
    frame_factory (Callable)
        Function to create Frame objects
    at_eos (bool)
        Flag indicating if End-Of-Stream has been reached
    _use_threading_override (bool)
        Set to True to use threading or False to use multiprocessing. If not specified, uses the Parallelize.use_threading_default

Example with default process mode

    @dataclass
    class MyDataSourceElement(ParallelizeSourceElement):
        def __post_init__(self):
            super().__post_init__()
            # Dictionary to track EOS status for each pad
            self.pad_eos = {pad.name: False for pad in self.source_pads}

        def new(self, pad):
            # Check if this pad has already reached EOS
            if self.pad_eos[pad.name]:
                return Frame(data=None, EOS=True)

            try:
                # Get data generated by the worker
                # In a real implementation, you might use pad-specific queues
                # or have the worker send pad-specific data
                data = self.out_queue.get(timeout=1)

                # Check for EOS signal (None typically indicates EOS)
                if data is None:
                    self.pad_eos[pad.name] = True
                    # If all pads have reached EOS, set global EOS flag
                    if all(self.pad_eos.values()):
                        self.at_eos = True
                    return Frame(data=None, EOS=True)

                # For data intended for other pads, you might implement
                # custom routing logic here

                return Frame(data=data)
            except queue.Empty:
                # Return an empty frame if no data is available
                return Frame(data=None)

        @staticmethod
        def sub_process_internal(**kwargs):
            outq, worker_stop = kwargs["outq"], kwargs["worker_stop"]

            # Generate data and send it back to the main process/thread
            for i in range(10):
                if worker_stop.is_set():
                    break
                outq.put(f"Generated data {i}")
                time.sleep(0.5)

            # Signal end of stream with None
            outq.put(None)

            # Wait for worker_stop before terminating
            # This prevents "worker stopped before EOS" errors
            while not worker_stop.is_set():
                time.sleep(0.1)
Example with thread mode

    @dataclass
    class MyThreadedSourceElement(ParallelizeSourceElement):
        _use_threading_override = True

        def __post_init__(self):
            super().__post_init__()
            # Dictionary to track EOS status for each pad
            self.pad_eos = {pad.name: False for pad in self.source_pads}

        def new(self, pad):
            # Similar implementation as in the process mode example,
            # but might use threading-specific features if needed
            if self.pad_eos[pad.name]:
                return Frame(data=None, EOS=True)

            # Rest of implementation same as the process mode example
Source code in sgn/subprocess.py
@dataclass
class ParallelizeSourceElement(SourceElement, _ParallelizeBase, Parallelize):
    """
    A Source element that generates data in a separate process or thread.

    This class extends the standard SourceElement to execute its data generation logic
    in a separate worker (process or thread). It communicates with the main process
    through output queues, and manages the worker lifecycle. Subclasses must implement
    the sub_process_internal method to define the data generation logic that runs in
    the worker.

    The design intentionally avoids passing class or instance references to the
    worker to prevent pickling issues when using process mode. Instead, it passes all
    necessary data and resources via function arguments.

    The implementation includes special handling for KeyboardInterrupt signals.
    When Ctrl+C is pressed in the terminal, workers will catch and ignore the
    KeyboardInterrupt, allowing them to continue processing while the main process
    coordinates a graceful shutdown. This prevents data loss and ensures all resources
    are properly cleaned up.

    Attributes:
        worker_argdict (dict, optional): Custom arguments to pass to the worker
        queue_maxsize (int, optional): Maximum size of the communication queues
        err_maxsize (int): Maximum size for error data
        frame_factory (Callable, optional): Function to create Frame objects
        at_eos (bool): Flag indicating if End-Of-Stream has been reached
        _use_threading_override (bool, optional): Set to True to use threading or
            False to use multiprocessing. If not specified, uses the
            Parallelize.use_threading_default

    Example with default process mode:
        @dataclass
        class MyDataSourceElement(ParallelizeSourceElement):
            def __post_init__(self):
                super().__post_init__()
                # Dictionary to track EOS status for each pad
                self.pad_eos = {pad.name: False for pad in self.source_pads}

            def new(self, pad):
                # Check if this pad has already reached EOS
                if self.pad_eos[pad.name]:
                    return Frame(data=None, EOS=True)

                try:
                    # Get data generated by the worker
                    # In a real implementation, you might use pad-specific queues
                    # or have the worker send pad-specific data
                    data = self.out_queue.get(timeout=1)

                    # Check for EOS signal (None typically indicates EOS)
                    if data is None:
                        self.pad_eos[pad.name] = True
                        # If all pads have reached EOS, set global EOS flag
                        if all(self.pad_eos.values()):
                            self.at_eos = True
                        return Frame(data=None, EOS=True)

                    # For data intended for other pads, you might implement
                    # custom routing logic here

                    return Frame(data=data)
                except queue.Empty:
                    # Return an empty frame if no data is available
                    return Frame(data=None)

            @staticmethod
            def sub_process_internal(**kwargs):
                outq, worker_stop = kwargs["outq"], kwargs["worker_stop"]

                # Generate data and send it back to the main process/thread
                for i in range(10):
                    if worker_stop.is_set():
                        break
                    outq.put(f"Generated data {i}")
                    time.sleep(0.5)

                # Signal end of stream with None
                outq.put(None)

                # Wait for worker_stop before terminating
                # This prevents "worker stopped before EOS" errors
                while not worker_stop.is_set():
                    time.sleep(0.1)

    Example with thread mode:
        @dataclass
        class MyThreadedSourceElement(ParallelizeSourceElement):
            _use_threading_override = True

            def __post_init__(self):
                super().__post_init__()
                # Dictionary to track EOS status for each pad
                self.pad_eos = {pad.name: False for pad in self.source_pads}

            def new(self, pad):
                # Similar implementation as in the process mode example,
                # but might use threading-specific features if needed
                if self.pad_eos[pad.name]:
                    return Frame(data=None, EOS=True)

                # Rest of implementation same as the process mode example
    """

    frame_factory: Callable = Frame
    at_eos: bool = False

    def __post_init__(self):
        SourceElement.__post_init__(self)
        _ParallelizeBase.__post_init__(self)

ParallelizeTransformElement dataclass

Bases: TransformElement, _ParallelizeBase, Parallelize

A Transform element that runs processing logic in a separate process or thread.

This class extends the standard TransformElement to execute its processing in a separate worker (process or thread). It communicates with the main process/thread through input and output queues, and manages the worker lifecycle. Subclasses must implement the sub_process_internal method to define the processing logic that runs in the worker.

The design intentionally avoids passing class or instance references to the worker to prevent pickling issues when using process mode. Instead, it passes all necessary data and resources via function arguments.

The implementation includes special handling for KeyboardInterrupt signals. When Ctrl+C is pressed in the terminal, workers will catch and ignore the KeyboardInterrupt, allowing them to continue processing while the main process coordinates a graceful shutdown. This prevents data loss and ensures all resources are properly cleaned up.

Attributes:

    worker_argdict (dict)
        Custom arguments to pass to the worker
    queue_maxsize (int)
        Maximum size of the communication queues
    err_maxsize (int)
        Maximum size for error data
    at_eos (bool)
        Flag indicating if End-Of-Stream has been reached
    _use_threading_override (bool)
        Set to True to use threading or False to use multiprocessing. If not specified, uses the Parallelize.use_threading_default

Example with default process mode

    @dataclass
    class MyProcessingElement(ParallelizeTransformElement):
        def __post_init__(self):
            super().__post_init__()

        def pull(self, pad, frame):
            # Send the frame to the worker
            self.in_queue.put(frame)

        @staticmethod
        def sub_process_internal(**kwargs):
            # Process data in the worker
            inq, outq = kwargs["inq"], kwargs["outq"]
            frame = inq.get(timeout=1)
            # Process frame data
            outq.put(processed_frame)

        def new(self, pad):
            # Get processed data from the worker
            return self.out_queue.get()
Example with thread mode

    @dataclass
    class MyThreadedElement(ParallelizeTransformElement):
        _use_threading_override = True

        # Rest of implementation same as above
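As with the sink, the single-frame example above can be extended to a polling loop that transforms frames until shutdown is signalled. A hedged sketch (MyStreamingTransformElement and the upper-casing step are purely illustrative; worker_stop is taken from kwargs as in the other examples):

    from dataclasses import dataclass
    from queue import Empty

    @dataclass
    class MyStreamingTransformElement(ParallelizeTransformElement):
        def pull(self, pad, frame):
            self.in_queue.put(frame)

        @staticmethod
        def sub_process_internal(**kwargs):
            inq, outq = kwargs["inq"], kwargs["outq"]
            worker_stop = kwargs["worker_stop"]
            # Pull frames, transform them, and push results until shutdown
            while not worker_stop.is_set():
                try:
                    frame = inq.get(timeout=1)
                except Empty:
                    continue
                if not frame.EOS:
                    frame.data = str(frame.data).upper()  # illustrative transform
                outq.put(frame)

        def new(self, pad):
            return self.out_queue.get()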
Source code in sgn/subprocess.py
@dataclass
class ParallelizeTransformElement(TransformElement, _ParallelizeBase, Parallelize):
    """
    A Transform element that runs processing logic in a separate process or thread.

    This class extends the standard TransformElement to execute its processing in a
    separate worker (process or thread). It communicates with the main process/thread
    through input and output queues, and manages the worker lifecycle. Subclasses must
    implement the sub_process_internal method to define the processing logic that runs
    in the worker.

    The design intentionally avoids passing class or instance references to the
    worker to prevent pickling issues when using process mode. Instead, it passes all
    necessary data and resources via function arguments.

    The implementation includes special handling for KeyboardInterrupt signals.
    When Ctrl+C is pressed in the terminal, workers will catch and ignore the
    KeyboardInterrupt, allowing them to continue processing while the main process
    coordinates a graceful shutdown. This prevents data loss and ensures all resources
    are properly cleaned up.

    Attributes:
        worker_argdict (dict, optional): Custom arguments to pass to the worker
        queue_maxsize (int, optional): Maximum size of the communication queues
        err_maxsize (int): Maximum size for error data
        at_eos (bool): Flag indicating if End-Of-Stream has been reached
        _use_threading_override (bool, optional): Set to True to use threading or
            False to use multiprocessing. If not specified, uses the
            Parallelize.use_threading_default

    Example with default process mode:
        @dataclass
        class MyProcessingElement(ParallelizeTransformElement):
            def __post_init__(self):
                super().__post_init__()

            def pull(self, pad, frame):
                # Send the frame to the worker
                self.in_queue.put(frame)

            @staticmethod
            def sub_process_internal(**kwargs):
                # Process data in the worker
                inq, outq = kwargs["inq"], kwargs["outq"]
                frame = inq.get(timeout=1)
                # Process frame data
                outq.put(processed_frame)

            def new(self, pad):
                # Get processed data from the worker
                return self.out_queue.get()

    Example with thread mode:
        @dataclass
        class MyThreadedElement(ParallelizeTransformElement):
            _use_threading_override = True

            # Rest of implementation same as above
    """

    at_eos: bool = False

    internal = _ParallelizeBase.internal

    def __post_init__(self):
        TransformElement.__post_init__(self)
        _ParallelizeBase.__post_init__(self)