Using HTTP Control in SGN Pipelines¶
This tutorial explains how to use the HTTP control capabilities in SGN for monitoring and controlling pipeline elements via HTTP interfaces.
Overview¶
SGN provides classes in the control.py
module that allow you to expose your pipeline elements via an HTTP interface. This enables:
- Remote monitoring of element state
- Dynamic reconfiguration of elements during runtime
- Integration with external systems through HTTP
The main classes available are:
HTTPControl
: A context manager for managing the HTTP serverHTTPControlSourceElement
: A source element with HTTP control capabilitiesHTTPControlTransformElement
: A transform element with HTTP control capabilitiesHTTPControlSinkElement
: A sink element with HTTP control capabilities
Basic Usage¶
Let's start with a simple example that demonstrates how to use HTTPControl
with an SGN pipeline:
#!/usr/bin/env python3
from sgn.apps import Pipeline
from sgn.control import HTTPControl
from sgn.base import SourceElement, SinkElement, Frame
class MySource(SourceElement):
def new(self, pad):
return Frame(data="Hello from Source", EOS=True)
class MySink(SinkElement):
def pull(self, pad, frame):
print(f"Received: {frame.data}")
source = MySource(source_pad_names=("out",))
sink = MySink(sink_pad_names=("in",))
pipeline = Pipeline()
pipeline.insert(source, sink, link_map={sink.snks["in"]: source.srcs["out"]})
# Run the pipeline with HTTP control
with HTTPControl() as control:
pipeline.run()
When you run this example, an HTTP server will start on the default port (8080), and you'll see a message indicating the server address. The pipeline will run as usual, but now with an HTTP server available.
HTTP-Controllable Elements¶
Now let's create a more advanced example using the HTTP-controllable element classes:
#!/usr/bin/env python3
import time
from sgn.apps import Pipeline
from sgn.control import HTTPControl, HTTPControlSourceElement, HTTPControlSinkElement
from sgn.base import Frame
class ControlledSource(HTTPControlSourceElement):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.state = {"message": "Hello, World!", "count": 0}
def new(self, pad):
# Update state from HTTP if available
time.sleep(1)
HTTPControl.exchange_state(self.name, self.state)
# Increment counter and return frame
self.state["count"] += 1
return Frame(data=f"{self.state['message']} #{self.state['count']}", EOS=self.signaled_eos())
class ControlledSink(HTTPControlSinkElement):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.received_count = 0
self.last_message = ""
def pull(self, pad, frame):
if frame.EOS:
self.mark_eos(pad)
self.received_count += 1
self.last_message = frame.data
print(f"Sink received: {frame.data}")
# Update state for HTTP clients
HTTPControl.exchange_state(
self.name,
{"received_count": self.received_count, "last_message": self.last_message}
)
# Create pipeline elements
source = ControlledSource(name="source", source_pad_names=("out",))
sink = ControlledSink(name="sink", sink_pad_names=("in",))
# Create and connect pipeline
pipeline = Pipeline()
pipeline.insert(source, sink, link_map={sink.snks["in"]: source.srcs["out"]})
# Set custom port if needed
HTTPControl.port = 8080 # Default is already 8080
# Run with HTTP control
with HTTPControl() as control:
pipeline.run()
Interacting with HTTP Endpoints¶
Once your pipeline is running with HTTP control, you can interact with it using HTTP requests. Here are some examples using curl
:
Get Element State¶
To retrieve the current state of an element:
curl http://localhost:8080/get/source
This will return a JSON object with the element's state, for example:
{"message": "Hello, World!", "count": 42}
You can also request a specific property:
curl http://localhost:8080/get/source/message
Response:
"Hello, World!"
Update Element State¶
To update an element's state:
curl -X POST -H "Content-Type: application/json" -d '{"message": "Updated message"}' http://localhost:8080/post/source
This will update the message
property of the source element to "Updated message".
Advanced Example: Adding Transforms with HTTP Control¶
Let's extend our example to include a transform element with HTTP control:
#!/usr/bin/env python3
import time
from sgn.apps import Pipeline
from sgn.control import HTTPControl, HTTPControlSourceElement, HTTPControlTransformElement, HTTPControlSinkElement
from sgn.base import Frame
class ControlledSource(HTTPControlSourceElement):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.state = {"message": "Hello, World!", "count": 0}
def new(self, pad):
# Update state from HTTP if available
time.sleep(1)
HTTPControl.exchange_state(self.name, self.state)
# Increment counter and return frame
self.state["count"] += 1
return Frame(data=f"{self.state['message']} #{self.state['count']}", EOS=self.signaled_eos())
class ControlledTransform(HTTPControlTransformElement):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.state = {"prefix": "Transformed: ", "active": True}
self.current_frame = None
def pull(self, pad, frame):
# Store the incoming frame
self.current_frame = frame
# Update state from HTTP if available
HTTPControl.exchange_state(self.name, self.state)
def new(self, pad):
# Check if we have a frame to process
if self.current_frame:
# Apply transformation if active
if self.state['active']:
return Frame(
data=f"{self.state['prefix']}{self.current_frame.data}",
EOS=self.current_frame.EOS
)
else:
# Pass through without transformation
return Frame(
data=self.current_frame.data,
EOS=self.current_frame.EOS
)
# Fallback if no frame is available
return Frame(data="No input available")
class ControlledSink(HTTPControlSinkElement):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.received_count = 0
def pull(self, pad, frame):
if frame.EOS:
self.mark_eos(pad)
self.received_count += 1
print(f"Received ({self.received_count}): {frame.data}")
HTTPControl.exchange_state(self.name, {"received_count": self.received_count})
# Create pipeline elements
source = ControlledSource(name="source", source_pad_names=("out",))
transform = ControlledTransform(
name="transform",
sink_pad_names=("in",),
source_pad_names=("out",)
)
sink = ControlledSink(name="sink", sink_pad_names=("in",))
# Create and connect pipeline
pipeline = Pipeline()
pipeline.insert(
source, transform, sink,
link_map={
transform.snks["in"]: source.srcs["out"],
sink.snks["in"]: transform.srcs["out"]
}
)
# Run with HTTP control
with HTTPControl() as control:
# This writes the HTTP endpoint to registry.txt
pipeline.run()
With this pipeline, you can:
- Change the source message
- Enable/disable the transform
- Change the transform prefix
- Monitor how many frames the sink has received
$ curl -X POST -H "Content-Type: application/json" -d '{"active": false}' http://localhost:8080/post/transform
$ curl -X POST -H "Content-Type: application/json" -d '{"active": true}' http://localhost:8080/post/transform
Customizing HTTP Control¶
You can customize how HTTPControl
works by setting class attributes before using it:
# Set custom host and port
HTTPControl.host = "0.0.0.0" # Listen on all interfaces
HTTPControl.port = 9090 # Use port 9090 instead of default 8080
# Set a custom registry file
HTTPControl.registry_file = "my_registry.txt"
# Set a URL path prefix/tag
HTTPControl.tag = "my_app" # URLs will be /my_app/get/... instead of /get/...
# Then use it as usual
with HTTPControl() as control:
pipeline.run()
With the tag set, your endpoints would be accessible at paths like:
http://localhost:9090/my_app/get/source
http://localhost:9090/my_app/post/transform
Performance Considerations¶
- The HTTP control mechanisms add overhead to your pipeline
- For high-throughput applications, consider reducing how frequently you call
exchange_state()
- Queue size is fixed at 1 by default, which means only the latest data is kept
Conclusion¶
The HTTP control capabilities in SGN provide a powerful way to monitor and control your pipeline elements during runtime. This is particularly useful for:
- Debugging and monitoring
- Building interactive applications
- Integrating with dashboards or other visualization tools
- Remote configuration of long-running pipelines
By exposing your pipeline elements via HTTP, you make your SGN application more flexible and easier to integrate with other systems.