import gi
import numpy as np
from typing import List, Dict, Optional, Tuple
import cv2
from dataclasses import dataclass
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
gi.require_version('GObject', '2.0')
from gi.repository import Gst, GObject, GstBase
from enum import Enum
import time
Gst.init(None)
[docs]
# Value reported in the "plugin_type" field of the KPI structure that
# AggregatorTemplate.do_aggregate posts on the bus when `transmit` is enabled.
PLUGIN_CPU_TYPE = "APU"
[docs]
# Wrapper handed to AggregatorTemplate.run(): do_aggregate builds one per popped
# input buffer from the metadata returned by extract_metadata() and the
# Gst.MapInfo obtained by mapping that buffer for reading.
class SimaaiPythonBuffer:
# NOTE(review): the constructor body and the MetaStruct type it references are
# not present in this excerpt -- presumably it stores `metadata` and `map` on
# the instance; confirm against the full source.
def __init__(self, metadata: MetaStruct, map: Gst.MapInfo):
[docs]
class ValueType(Enum):
    """
    GObject value types accepted by get_gobject_value().

    The value of each member is the GType used to initialise the
    GObject.Value that get_gobject_value() returns.
    """

    # 64-bit integer types
    INT64 = GObject.TYPE_INT64
    UINT64 = GObject.TYPE_UINT64

    # Text and floating point types
    STRING = GObject.TYPE_STRING
    DOUBLE = GObject.TYPE_DOUBLE
[docs]
class AggregatorTemplate(GstBase.Aggregator):
    """
    Template for a GStreamer aggregator element written in Python.

    - Declares request sink pads ("sink_%u") and an always-present "src" pad
    - Exposes the "transmit", "silent" and "config" properties
    - Leaves the processing logic to the subclass

    The user only has to override the run() function.
    """

    # Element properties (declaration order is kept: transmit, silent, config).
    transmit = GObject.Property(
        nick="Flag to enable/disable KPI transmission",
        type=bool,
        default=False,
    )
    silent = GObject.Property(
        nick="Flag to enable/disable silent mode",
        type=bool,
        default=False,
    )
    config = GObject.Property(
        nick="Path to config json file",
        type=str,
        default="",
    )

    __gstmetadata__ = (
        'AggregatorTemplate',
        'Aggregator',
        'Custom Python Aggregator',
        'YourName',
    )

    # One template per pad kind; both accept any caps and use AggregatorPad
    # as the pad GType.
    __gsttemplates__ = tuple(
        Gst.PadTemplate.new_with_gtype(
            template_name,
            pad_direction,
            pad_presence,
            Gst.Caps.new_any(),
            GstBase.AggregatorPad.__gtype__,
        )
        for template_name, pad_direction, pad_presence in (
            ("sink_%u", Gst.PadDirection.SINK, Gst.PadPresence.REQUEST),
            ("src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS),
        )
    )
def __init__(self, plugin_name, out_size):
    """
    Set up the per-instance state used by the aggregator callbacks.

    Input:
        plugin_name: name stored on the instance as ``plugin_name``.
        out_size: size passed to Gst.Buffer.new_allocate() for the output
            buffer created on every do_aggregate() call.
    """
    super(AggregatorTemplate, self).__init__()
    self.plugin_name = plugin_name
    self.out_size = out_size
    self.src_caps_set = False
    self.in_pcie_buf_id = 0
    # Placeholder identifiers; plugin_id is replaced by the element name in
    # do_start().
    self.stream_id = "unknown-stream"
    self.buffer_name = "default"
    self.plugin_id = "python-agg-template"
    # request_new_pad() appends every pad it creates to this list, so the
    # list has to exist before the first pad request.
    self.dynamic_pads = []
    # do_aggregate() reports frame_id in the KPI message; give it a default
    # so that read cannot fail.
    # NOTE(review): presumably updated per buffer by extract_metadata(),
    # which is not visible here -- confirm.
    self.frame_id = 0
[docs]
def request_new_pad(self, templ, direction=None, name=None):
    """
    Handle dynamic pad requests.

    Creates a pad from ``templ`` named ``sink_<n>``, where ``n`` is the
    number of pads this method has created so far, records it in
    ``self.dynamic_pads`` and returns it.

    Input:
        templ: pad template the new pad is created from.
        direction: not used.
        name: not used; the pad name is always generated.
    """
    # Create the bookkeeping list on first use so the method works even when
    # nothing else has initialised it (avoids an AttributeError).
    pads = getattr(self, "dynamic_pads", None)
    if pads is None:
        pads = self.dynamic_pads = []
    pad_name = f"sink_{len(pads)}"
    new_pad = Gst.Pad.new_from_template(templ, pad_name)
    pads.append(new_pad)
    return new_pad
[docs]
def do_start(self):
    """
    Handle the start event for the aggregator.

    Records the element name as ``plugin_id`` and pushes a stream-start
    event on the source pad. Always returns True.
    """
    self.plugin_id = self.get_name()
    self.srcpad.push_event(
        Gst.Event.new_stream_start("aggregator-template-stream")
    )
    return True
[docs]
def finish_buffer(self, buffer):
    """
    Finalizes and pushes the buffer downstream.

    Input:
        buffer: the output buffer to hand to the base class.
    Returns:
        Whatever GstBase.Aggregator.finish_buffer() returns, so that the
        caller can propagate the downstream flow result instead of losing it.
    """
    return GstBase.Aggregator.finish_buffer(self, buffer)
[docs]
def do_set_property(self, property_id, value):
    """
    Assign ``value`` to the instance attribute matching ``property_id``.

    Input:
        property_id: one of "silent", "transmit" or "config".
        value: new value for that attribute.
    Raises:
        AttributeError: if ``property_id`` is none of the names above.
    """
    for known_name in ("silent", "transmit", "config"):
        if property_id == known_name:
            setattr(self, known_name, value)
            return
    raise AttributeError(f"Unknown property {property_id}")
[docs]
def do_get_property(self, property_id):
    """
    Return the instance attribute matching ``property_id``.

    Input:
        property_id: one of "silent", "transmit" or "config".
    Raises:
        AttributeError: if ``property_id`` is none of the names above.
    """
    for known_name in ("silent", "transmit", "config"):
        if property_id == known_name:
            return getattr(self, known_name)
    raise AttributeError(f"Unknown property {property_id}")
[docs]
def get_gobject_value(self, value, value_type:ValueType) -> GObject.Value:
    """
    Wrap a Python value in a GObject.Value of the requested type.

    Input:
        value: the value to store.
        value_type: ValueType member selecting the GType of the result.
    Returns:
        A GObject.Value initialised with ``value_type.value`` and holding
        ``value``.
    Raises:
        ValueError: if ``value_type`` is not one of the handled members.
    """
    gvalue = GObject.Value()
    gvalue.init(value_type.value)
    if value_type == ValueType.UINT64:
        gvalue.set_uint64(value)
    elif value_type == ValueType.INT64:
        gvalue.set_int64(value)
    elif value_type == ValueType.STRING:
        gvalue.set_string(value)
    elif value_type == ValueType.DOUBLE:
        # The GValue was initialised as TYPE_DOUBLE, so the matching setter
        # is set_double; set_float targets a different GType (TYPE_FLOAT).
        gvalue.set_double(value)
    else:
        raise ValueError(f"Unsupported ValueType: {value_type}")
    return gvalue
[docs]
def do_aggregate(self, timeout):
    """
    Called when buffers are queued on all sinkpads.
    Calls the run() function defined by the user

    Steps:
        1. Collect the "sink_*" pads and pop one buffer from every linked pad.
        2. Map each input for reading and map a freshly allocated output
           buffer of ``self.out_size`` for read/write.
        3. Call run() when at least one input was gathered, unmap everything
           and hand the output buffer to finish_buffer().
        4. When ``transmit`` is set, post a "kpi" application message on the
           bus.

    Input:
        timeout: not used by this implementation.
    Returns:
        The value returned by finish_buffer(), or Gst.FlowReturn.OK when it
        returns nothing; Gst.FlowReturn.ERROR if the sink pads cannot be
        iterated or the output buffer cannot be mapped.

    NOTE(review): plugin_start / plugin_end are emitted as whole milliseconds
    of the monotonic clock, following the "convert to millisecond" intent of
    the duration field -- confirm the unit expected by the KPI consumer.
    """
    ns_per_ms = 1_000_000
    self.t0 = time.monotonic_ns()

    # --- gather the request sink pads -------------------------------------
    input_pads = []
    pad_iterator = self.iterate_sink_pads()
    while True:
        result, pad = pad_iterator.next()
        if result == Gst.IteratorResult.OK:
            if pad.get_name().startswith("sink_"):
                input_pads.append(pad)
        elif result == Gst.IteratorResult.RESYNC:
            # The pad list changed while iterating: discard the partial
            # result and start over, otherwise next() keeps returning RESYNC.
            input_pads.clear()
            pad_iterator.resync()
        elif result == Gst.IteratorResult.DONE:
            break
        else:
            Gst.error("Failed to iterate over sink pads")
            return Gst.FlowReturn.ERROR

    input_buffers = []   # SimaaiPythonBuffer objects handed to run()
    mapped_buffers = []  # (buffer, map) pairs that must be unmapped again

    try:
        for pad in input_pads:
            if not pad.is_linked():
                continue
            buffer = pad.pop_buffer()
            if not buffer:
                continue
            metaObj = self.extract_metadata(buffer)
            success, buffer_map = buffer.map(Gst.MapFlags.READ)
            if not success:
                # An unmapped buffer cannot be handed to run(); skip it.
                Gst.error(f"Failed to map input buffer from pad {pad.get_name()}")
                continue
            mapped_buffers.append((buffer, buffer_map))
            input_buffers.append(SimaaiPythonBuffer(metadata=metaObj, map=buffer_map))

        output_buffer = Gst.Buffer.new_allocate(None, self.out_size, None)
        # map() returns a (success, map) pair, exactly as for the inputs.
        success, output_map = output_buffer.map(Gst.MapFlags.WRITE | Gst.MapFlags.READ)
        if not success:
            Gst.error("Failed to map output buffer")
            return Gst.FlowReturn.ERROR
        try:
            self.insert_metadata(output_buffer)
            # call to the run function - main logic
            if input_buffers:
                self.run(input_buffers, output_map.data)
        finally:
            output_buffer.unmap(output_map)
    finally:
        # Release the input mappings even when run() raises.
        for buffer, buffer_map in mapped_buffers:
            buffer.unmap(buffer_map)

    flow = self.finish_buffer(output_buffer)
    self.t1 = time.monotonic_ns()

    if self.transmit:
        elapsed_time = (self.t1 - self.t0) / ns_per_ms  # nanoseconds -> milliseconds
        kernel_start, kernel_end = 0, 0
        # frame_id may not have been set on the instance yet; fall back to 0
        # rather than failing while building the KPI message.
        frame_id = getattr(self, "frame_id", 0)
        kpi_fields = (
            # UINT64 fields need integers, hence the floor division.
            ("plugin_start", self.t0 // ns_per_ms, ValueType.UINT64),
            ("plugin_end", self.t1 // ns_per_ms, ValueType.UINT64),
            ("duration", elapsed_time, ValueType.DOUBLE),
            ("kernel_start", kernel_start, ValueType.UINT64),
            ("kernel_end", kernel_end, ValueType.UINT64),
            ("frame_id", frame_id, ValueType.INT64),
            ("plugin_id", self.plugin_id, ValueType.STRING),
            ("plugin_type", PLUGIN_CPU_TYPE, ValueType.STRING),
            ("stream_id", self.stream_id, ValueType.STRING),
        )
        kpi_struct = Gst.Structure.new_empty("kpi")
        for field_name, field_value, field_type in kpi_fields:
            kpi_struct.set_value(field_name, self.get_gobject_value(field_value, field_type))
        message = Gst.Message.new_application(self, kpi_struct)
        # Post the message to the bus
        bus = self.get_bus()
        if bus:
            bus.post(message)
        else:
            Gst.error("Could not find a bus to send")

    # finish_buffer() may return nothing; treat that as success.
    return flow if flow is not None else Gst.FlowReturn.OK
[docs]
def run(self, input_buffers: "List[SimaaiPythonBuffer]", output_buffer) -> None:
    """
    Input:
        input_buffers: list of SimaaiPythonBuffer objects, one per input
            buffer popped in do_aggregate(); each is built from the buffer's
            metadata and its read mapping.
        output_buffer: the ``data`` of the mapped output buffer, as passed by
            do_aggregate() (not the Gst.Buffer itself).
    Implement your logic within this function. Process the input buffers, and modify the output
    buffer.

    Raises:
        NotImplementedError: always, until a subclass overrides this method.
    """
    raise NotImplementedError("run def not implemented")