Source code for python_plugin_template

import gi
import numpy as np 
from typing import List, Dict, Optional, Tuple
import cv2
from dataclasses import dataclass

gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
gi.require_version('GObject', '2.0')
from gi.repository import Gst, GObject, GstBase
from enum import Enum
import time

Gst.init(None)

[docs] SIMAAI_META_STR = "GstSimaMeta"
[docs] PLUGIN_CPU_TYPE = "APU"
[docs] class MetaStruct: def __init__(self, buffer_name, stream_id, timestamp, frame_id):
[docs] self.buffer_name = buffer_name
[docs] self.stream_id = stream_id
[docs] self.timestamp = timestamp
[docs] self.frame_id = frame_id
[docs] class SimaaiPythonBuffer: def __init__(self, metadata: MetaStruct, map: Gst.MapInfo):
[docs] self.metadata = metadata
[docs] self.data = map.data
[docs] self.size = map.size
[docs] class ValueType(Enum):
[docs] INT64 = GObject.TYPE_INT64
[docs] UINT64 = GObject.TYPE_UINT64
[docs] STRING = GObject.TYPE_STRING
[docs] DOUBLE = GObject.TYPE_DOUBLE
[docs] class AggregatorTemplate(GstBase.Aggregator): """ A Python based gstreamer plugin template. Enables the user to: - Accept incoming buffers from dynamic pads - Define any custom plugin runtime logic User has to only override the run() function """
[docs] transmit = GObject.Property(type=bool, default=False, nick="Flag to enable/disable KPI transmission")
[docs] silent = GObject.Property(type=bool, default=False, nick="Flag to enable/disable silent mode")
[docs] config = GObject.Property(type=str, default="", nick="Path to config json file")
__gstmetadata__ = ('AggregatorTemplate', 'Aggregator', 'Custom Python Aggregator', 'YourName') __gsttemplates__ = ( Gst.PadTemplate.new_with_gtype( "sink_%u", Gst.PadDirection.SINK, Gst.PadPresence.REQUEST, Gst.Caps.new_any(), GstBase.AggregatorPad.__gtype__, ), Gst.PadTemplate.new_with_gtype( "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, Gst.Caps.new_any(), GstBase.AggregatorPad.__gtype__, ) ) def __init__(self, plugin_name, out_size): super(AggregatorTemplate, self).__init__()
[docs] self.plugin_name = plugin_name
[docs] self.dynamic_pads = []
[docs] self.src_caps_set = False
[docs] self.timestamp = 0
[docs] self.frame_id = 0
[docs] self.is_pcie = False
[docs] self.in_pcie_buf_id = 0
[docs] self.stream_id = "unknown-stream"
[docs] self.buffer_name = "default"
[docs] self.plugin_id = "python-agg-template"
[docs] self.t0 = None
[docs] self.t1 = None
[docs] self.out_size = out_size
#print("Inside INIT")
[docs] def request_new_pad(self, templ, direction=None, name=None): """ Handle dynamic pad requests. Pads are created when a new input stream is added dynamically. """ #print("[PYTHON] Requesting new dynamic pad") pad_name = f"sink_{len(self.dynamic_pads)}" new_pad = Gst.Pad.new_from_template(templ, pad_name) self.dynamic_pads.append(new_pad) return new_pad
[docs] def do_start(self): """ Handle start even for the aggregator. """ #print("In do start") self.plugin_id = self.get_name() #print(f"[PLUGIN TEMPLATE] PLUGIN NAME: {self.plugin_id}") stream_start_event = Gst.Event.new_stream_start("aggregator-template-stream") self.srcpad.push_event(stream_start_event) return True
[docs] def finish_buffer(self, buffer): """ Finalizes and pushes the buffer downstream. """ #print("In finish buffer") GstBase.Aggregator.finish_buffer(self, buffer)
[docs] def do_set_property(self, property_id, value): if property_id == "silent": self.silent = value elif property_id == "transmit": self.transmit = value elif property_id == "config": self.config = value else: raise AttributeError(f"Unknown property {property_id}")
[docs] def do_get_property(self, property_id): if property_id == "silent": return self.silent elif property_id == "transmit": return self.transmit elif property_id == "config": return self.config else: raise AttributeError(f"Unknown property {property_id}")
[docs] def extract_metadata(self, buffer: Gst.Buffer) -> None: """ Input: buffer Gst.Buffer: Input buffer from which metadata will be extracted. """ meta = buffer.get_custom_meta(SIMAAI_META_STR) if meta: s = meta.get_structure() if s: # check if pcie ret, pcie_buffer_id = s.get_int64("pcie-buffer-id") if ret: self.is_pcie = True self.in_pcie_buf_id = pcie_buffer_id #print("PCIE Buffer Id: ", pcie_buffer_id) #buffer-id ret, buffer_id = s.get_int64('buffer-id') if ret: self.buffer_id = buffer_id #print("Buffer Id: ", buffer_id) #frame-id ret, frame_id = s.get_int64("frame-id") if ret: self.frame_id = frame_id #print("frame id: ", frame_id) #stream-id stream_id = s.get_string('stream-id') if stream_id: self.stream_id = stream_id #print("stream Id: ", stream_id) #timestamp ret, timestamp = s.get_uint64('timestamp') if ret: self.timestamp = timestamp #print("Timestamp: ", timestamp) #buffer-name buffer_name = s.get_string('buffer-name') if buffer_name: self.buffer_name = buffer_name #print("buffer_name: ", buffer_name) #buffer-offset ret, buffer_offset = s.get_int64('buffer-offset') if ret: self.buffer_offset = buffer_offset #print("Buffer offset: ", buffer_offset) return MetaStruct(frame_id=frame_id, buffer_name=buffer_name, \ timestamp=timestamp, stream_id=stream_id) else: Gst.error("Unable to extract metadata from incoming buffer")
[docs] def insert_metadata(self, buffer: Gst.Buffer) -> None: """ Input: buffer Gst.Buffer: Buffer to be inserted with custom metadata """ meta = Gst.Structure.new_empty(SIMAAI_META_STR) # for each metadata field, get value with the get gobject value, and set it #buffer id buffer_id_val = self.get_gobject_value(self.buffer_id, ValueType.INT64) meta.set_value('buffer-id', buffer_id_val) #buffer-name buffer_name_value = self.get_gobject_value(self.plugin_id, ValueType.STRING) meta.set_value('buffer-name', buffer_name_value) #buffer-offset buffer_offset_value = self.get_gobject_value(0, ValueType.INT64) meta.set_value('buffer-offset', buffer_offset_value) #frame-id frame_id_value = self.get_gobject_value(self.frame_id, ValueType.INT64) meta.set_value('frame-id', frame_id_value) # stream-id stream_id_value = self.get_gobject_value(self.stream_id, ValueType.STRING) meta.set_value('stream-id', stream_id_value) #timestamp timestamp_value = self.get_gobject_value(self.timestamp, ValueType.UINT64) meta.set_value('timestamp', timestamp_value) #check if pcie pipeline if self.is_pcie: pcie_val = self.get_gobject_value(self.in_pcie_buf_id, ValueType.INT64) meta.set_value('pcie-buffer-id', pcie_val)
[docs] def get_gobject_value(self, value, value_type:ValueType) -> GObject.Value: gvalue = GObject.Value() gvalue.init(value_type.value) if value_type == ValueType.UINT64: gvalue.set_uint64(value) elif value_type == ValueType.INT64: gvalue.set_int64(value) elif value_type == ValueType.STRING: gvalue.set_string(value) elif value_type == ValueType.DOUBLE: gvalue.set_float(value) else: raise ValueError(f"Unsupported ValueType: {value_type}") return gvalue
[docs] def do_aggregate(self, timeout): """ Called when buffers are queued on all sinkpads. Calls the run() function defined by the user """ self.t0 = time.monotonic_ns() input_pads = [] #print("In do aggreagate") pad_iterator = self.iterate_sink_pads() while True: result, pad = pad_iterator.next() if result == Gst.IteratorResult.OK: #print(f"Sink Pad: {pad.get_name()}") if pad.get_name().startswith("sink_"): input_pads.append((pad, pad.get_name())) elif result == Gst.IteratorResult.DONE: break input_buffers = [] #stores list of SimaaiPythonBufer obj mapped_buffers = [] # stores mapping of each input buffer to it's map #print("Number of pads: ", len(input_pads)) for pad, pad_name in input_pads: if pad.is_linked(): buffer = pad.pop_buffer() if buffer: metaObj = self.extract_metadata(buffer) success, buffer_map = buffer.map(Gst.MapFlags.READ) input_buffers.append(SimaaiPythonBuffer(metadata=metaObj, map=buffer_map)) mapped_buffers.append((buffer, buffer_map)) output_buffer = Gst.Buffer.new_allocate(None, self.out_size, None) output_map = output_buffer.map(Gst.MapFlags.WRITE | Gst.MapFlags.READ) self.insert_metadata(output_buffer) # call to the run function - main logic if input_buffers: self.run(input_buffers, output_map.data) # unmap all input maps for buffer, buffer_map in mapped_buffers: buffer.unmap(buffer_map) #unmap output buffer output_buffer.unmap(output_map) self.finish_buffer(output_buffer) self.t1 = time.monotonic_ns() elapsed_time = (self.t1 - self.t0) * 10e-6 #convert to millisecond if self.transmit: kernel_start, kernel_end = 0,0 kpi_struct = Gst.Structure.new_empty("kpi") kpi_struct.set_value("plugin_start", self.get_gobject_value(self.t0 * 10e-6, ValueType.UINT64)) kpi_struct.set_value("plugin_end", self.get_gobject_value(self.t1 * 10e-6, ValueType.UINT64)) kpi_struct.set_value("duration", self.get_gobject_value(elapsed_time, ValueType.DOUBLE)) kpi_struct.set_value("kernel_start", self.get_gobject_value(kernel_start, ValueType.UINT64)) kpi_struct.set_value("kernel_end", self.get_gobject_value(kernel_end, ValueType.UINT64)) kpi_struct.set_value("frame_id", self.get_gobject_value(self.frame_id, ValueType.INT64)) kpi_struct.set_value("plugin_id", self.get_gobject_value(self.plugin_id, ValueType.STRING)) kpi_struct.set_value("plugin_type", self.get_gobject_value(PLUGIN_CPU_TYPE, ValueType.STRING)) kpi_struct.set_value("stream_id", self.get_gobject_value(self.stream_id, ValueType.STRING)) message = Gst.Message.new_application(self, kpi_struct) # Post the message to the bus bus = self.get_bus() if bus: bus.post(message) else: Gst.error("Could find a bus to send") return Gst.FlowReturn.OK
[docs] def run(self, input_buffers: List[Gst.Buffer], output_buffer: Gst.Buffer) -> None: """ Input: input_buffers: List[Gst.Buffer] List of input buffers, source from each pad. output_buffer: Gst.Buffer Output buffer that needs to be overwritten. Implement your logic within this function. Process the input buffers, and modify the output buffer. """ raise NotImplementedError("run def not implemented")