import numpy as np
import cv2
from python_plugin_template import AggregatorTemplate
from python_plugin_template import SimaaiPythonBuffer, MetaStruct
import gi
from typing import List, Tuple
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
gi.require_version('GObject', '2.0')
from gi.repository import Gst, GObject, GstBase
"""
To use Metadata fields from the input buffers:
Parse the MetaStruct object. It has the following 4 fields:
class MetaStruct:
    def __init__(self, buffer_name, stream_id, timestamp, frame_id):
        self.buffer_name = buffer_name
        self.stream_id = stream_id
        self.timestamp = timestamp
        self.frame_id = frame_id
"""
# Define PLUGIN_NAME here: the element name passed to the base-class
# initialiser and used in the element factory tuple at the end of the file.
plugin_name = "yolov8_postproc_overlay"
class MyPlugin(AggregatorTemplate):
    """Overlay YOLOv8 detections on a 1280x720 frame.

    Each call to ``run`` consumes two input buffers: the raw float32 model
    output (six head tensors stored back to back, see ``model_outs``) and the
    frame to draw on (a full-resolution Y plane followed by a half-resolution,
    two-channel chroma plane, as implied by the reshapes in ``run``).

    NOTE(review): ``run`` reads ``self.iou_thres`` and calls
    ``self.extract_boxes``; neither is defined in this class. They must be
    supplied elsewhere (base class or a missing part of this file) -- confirm.
    """

    def __init__(self):
        # Output size in bytes: 1280x720 luma plane plus half as many chroma
        # bytes. Assigned before the base-class initialiser, which receives it.
        self.out_size = int(1280 * 720 * 1.5)
        super(MyPlugin, self).__init__(plugin_name=plugin_name, out_size=self.out_size)

        # Candidates whose best class score is not above this are discarded.
        self.confidence_thres = 0.5
        self.src_caps_set = False
        self.MODEL_HEIGHT = 480
        self.FRAME_WIDTH = 1280
        self.FRAME_HEIGHT = 720

        # Shapes of the head tensors in the order they appear in the model
        # output buffer: three box heads (4 channels), then three score heads.
        self.model_outs = [
            (60, 80, 4),
            (30, 40, 4),
            (15, 20, 4),
            (60, 80, 87),
            (30, 40, 87),
            (15, 20, 87),
        ]

        # `classes` was read below and in `run` but never assigned in this
        # class. `run` reshapes the score heads to (-1, classes), so the only
        # consistent value is the channel count of those heads (87).
        self.classes = self.model_outs[-1][-1]

        # One random colour triple per class.
        self.color_palette = np.random.uniform(0, 255, size=(self.classes, 3))

    def draw_detections(self, img, box, score, class_id):
        """Draw one detection rectangle on ``img`` in place.

        Args:
            img: Image array that ``cv2.rectangle`` can draw into.
            box: ``(x1, y1, x2, y2)`` corner coordinates.
            score: Detection score; accepted for interface symmetry, unused.
            class_id: Row index into ``self.color_palette``.
        """
        x1, y1, x2, y2 = box
        color = self.color_palette[class_id]
        cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)

    def get_model_outputs(self, input_buffer):
        """Split the flat model output into one array per head.

        Args:
            input_buffer: 1-D array holding all head tensors back to back, in
                the order given by ``self.model_outs``.

        Returns:
            List of arrays, one per entry of ``self.model_outs``, each
            reshaped to that entry's shape.

        Raises:
            ValueError: If ``input_buffer`` is too short for a head (raised by
                ``reshape``).
        """
        start = 0
        model_outputs = []
        # Walk the buffer head by head; each head occupies prod(shape) items.
        for out_shape in self.model_outs:
            seg_size = int(np.prod(out_shape))
            model_outputs.append(input_buffer[start:start + seg_size].reshape(out_shape))
            start += seg_size
        return model_outputs

    def compute_iou(self, box, boxes):
        """Return the IoU of ``box`` against every row of ``boxes``.

        Args:
            box: One box as ``(x1, y1, x2, y2)``.
            boxes: Array of shape ``(N, 4)`` in the same corner format.

        Returns:
            Array of ``N`` IoU values. There is no guard for a zero union
            area; such pairs yield ``nan`` (0/0).
        """
        # Corners of the intersection rectangle.
        xmin = np.maximum(box[0], boxes[:, 0])
        ymin = np.maximum(box[1], boxes[:, 1])
        xmax = np.minimum(box[2], boxes[:, 2])
        ymax = np.minimum(box[3], boxes[:, 3])

        # Clamp at zero so non-overlapping boxes contribute no area.
        intersection_area = np.maximum(0, xmax - xmin) * np.maximum(0, ymax - ymin)

        box_area = (box[2] - box[0]) * (box[3] - box[1])
        boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        union_area = box_area + boxes_area - intersection_area

        return intersection_area / union_area

    def nms(self, boxes, scores, iou_threshold):
        """Greedy non-maximum suppression.

        Args:
            boxes: Array of shape ``(N, 4)`` as ``(x1, y1, x2, y2)``.
            scores: Array of ``N`` scores.
            iou_threshold: A remaining box is dropped unless its IoU with the
                picked box is strictly below this value.

        Returns:
            List of indices into ``boxes`` of the kept boxes, highest score
            first. Empty when there are no boxes.
        """
        # Indices ordered by descending score.
        order = np.argsort(scores)[::-1]
        keep_boxes = []
        while order.size > 0:
            # Pick the highest-scoring remaining box.
            box_id = order[0]
            keep_boxes.append(box_id)

            # IoU of the picked box with all remaining candidates.
            ious = self.compute_iou(boxes[box_id, :], boxes[order[1:], :])

            # Positions in `ious` are relative to order[1:], hence the +1.
            survivors = np.where(ious < iou_threshold)[0]
            order = order[survivors + 1]
        return keep_boxes

    def rescale_boxes(self, boxes, input_width, input_height, target_width, target_height):
        """Scale box coordinates from one resolution to another.

        Columns 0 and 2 are scaled by ``target_width / input_width`` and
        columns 1 and 3 by ``target_height / input_height``.

        Note:
            ``boxes`` is modified in place and the same array is returned.
        """
        x_scale = target_width / input_width
        y_scale = target_height / input_height

        boxes[:, [0, 2]] = boxes[:, [0, 2]] * x_scale  # x columns
        boxes[:, [1, 3]] = boxes[:, [1, 3]] * y_scale  # y columns
        return boxes

    def xywh2xyxy(self, x):
        """Convert ``(cx, cy, w, h)`` rows to ``(x1, y1, x2, y2)`` rows.

        Returns a new array; ``x`` is left untouched.
        """
        y = np.copy(x)
        y[:, 0] = x[:, 0] - x[:, 2] / 2
        y[:, 1] = x[:, 1] - x[:, 3] / 2
        y[:, 2] = x[:, 0] + x[:, 2] / 2
        y[:, 3] = x[:, 1] + x[:, 3] / 2
        return y

    def run(self, input_buffers: List[SimaaiPythonBuffer], output_buffer: bytes) -> None:
        """
        Define your plugin logic HERE
        Inputs:
        input_buffers List[SimaaiPythonBuffer]: List of input buffers
        Object of class SimaaiPythonBuffer has three fields:
        1. metadata MetaStruct Refer to the structure above
        2. data bytes - raw bytes of the incoming buffer
        3. size int - size of incoming buffer in bytes

        ``input_buffers[0]`` is read as the float32 model output and
        ``input_buffers[1]`` as the frame. The frame with the detection
        rectangles drawn on it is written to the first ``self.out_size``
        bytes of ``output_buffer``.
        """
        # Read the model output and split it into box heads and score heads.
        model_output_buffer = np.frombuffer(input_buffers[0].data, dtype=np.float32)
        model_out_list = self.get_model_outputs(model_output_buffer)
        box_outs = model_out_list[:3]
        score_outs = model_out_list[3:6]

        box_arr = np.concatenate([x.reshape(-1, 4) for x in box_outs], axis=0)  # 6300, 4
        score_arr = np.concatenate(
            [x.reshape(-1, self.classes) for x in score_outs], axis=0
        )  # 6300, 87

        # Best class and its score for every candidate.
        scores = np.max(score_arr, axis=1)
        labels = np.argmax(score_arr, axis=1)

        # Filter by score.
        keep = scores > self.confidence_thres
        boxes = box_arr[keep]
        scores = scores[keep]
        labels = labels[keep]

        # NOTE(review): `extract_boxes` and `iou_thres` are not defined in
        # this class; they must be provided elsewhere -- confirm.
        boxes = self.extract_boxes(boxes)  # boxes wrt 1280, 720
        indices = self.nms(boxes, scores, self.iou_thres)

        # Split the frame into its luma plane and interleaved chroma plane.
        # np.frombuffer on `bytes` yields read-only arrays that cv2 cannot
        # draw into, so take writable copies. The chroma read is bounded so
        # trailing bytes in the input buffer do not break the reshape.
        frame = input_buffers[1].data
        y_size = self.FRAME_HEIGHT * self.FRAME_WIDTH
        uv_size = (self.FRAME_HEIGHT // 2) * (self.FRAME_WIDTH // 2) * 2
        y_ = (
            np.frombuffer(frame, dtype=np.uint8, count=y_size)
            .reshape(self.FRAME_HEIGHT, self.FRAME_WIDTH)
            .copy()
        )
        uv_ = (
            np.frombuffer(frame, dtype=np.uint8, count=uv_size, offset=y_size)
            .reshape(self.FRAME_HEIGHT // 2, self.FRAME_WIDTH // 2, 2)
            .copy()
        )

        for i in indices:
            x1, y1, x2, y2 = boxes[i]
            color = self.color_palette[labels[i]]
            # TODO(@Petro): We need to hide this away from the user.
            # cv2.rectangle(rgb) should be converted into whatever it is
            # below, or do we leave it to the user to do this?
            cv2.rectangle(y_, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
            # The chroma plane is half resolution, so halve the coordinates.
            cv2.rectangle(uv_, (int(x1 // 2), int(y1 // 2)), (int(x2 // 2), int(y2 // 2)), color, 2)

        output_buffer[:self.out_size] = np.concatenate([y_.ravel(), uv_.ravel()]).tobytes()
# Register the plugin class as a GObject type.
GObject.type_register(MyPlugin)

# Element factory description: (element name, rank, element class).
# NOTE(review): presumably picked up by the GStreamer Python plugin loader -- confirm.
__gstelementfactory__ = (
    plugin_name,
    Gst.Rank.NONE,
    MyPlugin,
)