Source code for yolov8_postproc

import numpy as np 
import cv2
from python_plugin_template import AggregatorTemplate
from python_plugin_template import SimaaiPythonBuffer, MetaStruct
import gi
from typing import List, Tuple
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
gi.require_version('GObject', '2.0')
from gi.repository import Gst, GObject, GstBase

"""
To use Metadata fieds from the input buffers:  
Parse the MetaStruct object. It has the following 4 fields:  
class MetaStruct:
    def __init__(self, buffer_name, stream_id, timestamp, frame_id):
        self.buffer_name = buffer_name
        self.stream_id = stream_id
        self.timestamp = timestamp
        self.frame_id = frame_id

"""

[docs] plugin_name = "yolov8_postproc_overlay" #define PLUGIN_NAME HERE
[docs] class MyPlugin(AggregatorTemplate): def __init__(self):
[docs] self.out_size = int(1280 * 720 * 1.5) # outsize of plugin in bytes
super(MyPlugin, self).__init__(plugin_name=plugin_name, out_size=self.out_size)
[docs] self.classes = 87
[docs] self.color_palette = np.random.uniform(0, 255, size=(self.classes, 3))
[docs] self.confidence_thres = 0.5
[docs] self.iou_thres = 0.3
[docs] self.boxes = []
[docs] self.scores = []
[docs] self.class_ids = []
[docs] self.src_caps_set = False
[docs] self.MODEL_WIDTH = 640
[docs] self.MODEL_HEIGHT = 480
[docs] self.FRAME_WIDTH = 1280
[docs] self.FRAME_HEIGHT = 720
[docs] self.model_outs = [ (60,80,4), (30,40,4), (15,20,4), (60,80,87), (30,40,87), (15,20,87), ]
[docs] def draw_detections(self, img, box, score, class_id): x1, y1, x2, y2 = box color = self.color_palette[class_id] cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
[docs] def get_model_outputs(self, input_buffer): start = 0 model_outputs = [] # first get box inputs for out_shape in self.model_outs: seg_size = np.prod(out_shape) arr = input_buffer[start: start+seg_size].reshape(out_shape) model_outputs.append(arr) start += seg_size return model_outputs
[docs] def compute_iou(self,box, boxes): # Compute xmin, ymin, xmax, ymax for both boxes xmin = np.maximum(box[0], boxes[:, 0]) ymin = np.maximum(box[1], boxes[:, 1]) xmax = np.minimum(box[2], boxes[:, 2]) ymax = np.minimum(box[3], boxes[:, 3]) # Compute intersection area intersection_area = np.maximum(0, xmax - xmin) * np.maximum(0, ymax - ymin) # Compute union area box_area = (box[2] - box[0]) * (box[3] - box[1]) boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]) union_area = box_area + boxes_area - intersection_area # Compute IoU iou = intersection_area / union_area return iou
[docs] def nms(self, boxes, scores, iou_threshold): # Sort by score sorted_indices = np.argsort(scores)[::-1] keep_boxes = [] i = 0 while sorted_indices.size > 0: # Pick the last box box_id = sorted_indices[0] keep_boxes.append(box_id) # Compute IoU of the picked box with the rest ious = self.compute_iou(boxes[box_id, :], boxes[sorted_indices[1:], :]) # Remove boxes with IoU over the threshold keep_indices = np.where(ious < iou_threshold)[0] sorted_indices = sorted_indices[keep_indices + 1] i = i +1 return keep_boxes
[docs] def rescale_boxes(self,boxes, input_width, input_height, target_width, target_height): # Calculate the scale factors x_scale = target_width / input_width y_scale = target_height / input_height # Rescale the boxes boxes[:, [0, 2]] = boxes[:, [0, 2]] * x_scale # Scale xmin and xmax boxes[:, [1, 3]] = boxes[:, [1, 3]] * y_scale # Scale ymin and ymax return boxes
[docs] def xywh2xyxy(self,x): # Convert bounding box (x, y, w, h) to bounding box (x1, y1, x2, y2) y = np.copy(x) y[:, 0] = x[:, 0] - x[:, 2] / 2 y[:, 1] = x[:, 1] - x[:, 3] / 2 y[:, 2] = x[:, 0] + x[:, 2] / 2 y[:, 3] = x[:, 1] + x[:, 3] / 2 return y
[docs] def extract_boxes(self,boxes): # Scale boxes to original image dimensions boxes = self.rescale_boxes(boxes, self.MODEL_WIDTH, self.MODEL_HEIGHT, self.FRAME_WIDTH, self.FRAME_HEIGHT) # Convert boxes to xyxy format boxes = self.xywh2xyxy(boxes) # Check the boxes are within the image boxes[:, 0] = np.clip(boxes[:, 0], 0, self.FRAME_WIDTH) boxes[:, 1] = np.clip(boxes[:, 1], 0, self.FRAME_HEIGHT) boxes[:, 2] = np.clip(boxes[:, 2], 0, self.FRAME_WIDTH) boxes[:, 3] = np.clip(boxes[:, 3], 0, self.FRAME_HEIGHT) return boxes
[docs] def run(self, input_buffers: List[SimaaiPythonBuffer], output_buffer: bytes) -> None: """ Define your plugin logic HERE Inputs: input_buffers List[SimaaiPythonBuffer]: List of input buffers Object of class SimaaiPythonBuffer has three fields: 1. metadata MetaStruct Refer to the structure above 2. data bytes - raw bytes of the incoming buffer 3. size int - size of incoming buffer in bytes """ #read model output model_output_buffer = np.frombuffer(input_buffers[0].data, dtype=np.float32) model_out_list = self.get_model_outputs(model_output_buffer) box_outs = model_out_list[:3] score_outs = model_out_list[3:6] box_arr = np.concatenate([x.reshape(-1,4) for x in box_outs], axis=0) #6300, 4 score_arr = np.concatenate([x.reshape(-1,self.classes) for x in score_outs], axis=0) #6300,87 scores = np.max(score_arr, axis=1) labels = np.argmax(score_arr, axis=1) #filter by score keep = scores > self.confidence_thres boxes = box_arr[keep] scores = scores[keep] labels = labels[keep] boxes = self.extract_boxes(boxes) #boxes wrt 1280, 720 indices = self.nms(boxes, scores, self.iou_thres) y_size = self.FRAME_HEIGHT * self.FRAME_WIDTH y_ = np.frombuffer(input_buffers[1].data[:y_size], dtype=np.uint8).reshape(self.FRAME_HEIGHT, self.FRAME_WIDTH) uv_ = np.frombuffer(input_buffers[1].data[y_size:], dtype=np.uint8).reshape(self.FRAME_HEIGHT // 2, self.FRAME_WIDTH // 2, 2) for i in indices: box = boxes[i] score = scores[i] class_id = labels[i] x1, y1, x2, y2 = box color = self.color_palette[class_id] # @Petro We need to hide away this from the user. # cv2.rectangle(rgb) should be convered into whatever it is below, or leave it to the user to do this? cv2.rectangle(y_, (int(x1), int(y1)), (int(x2), int(y2)), color, 2) cv2.rectangle(uv_, (int(x1//2), int(y1//2)), (int(x2//2), int(y2//2)), color, 2) output_buffer[:self.out_size] = np.concatenate([y_.flatten(), uv_.flatten()]).tobytes()
GObject.type_register(MyPlugin) __gstelementfactory__ = (plugin_name, Gst.Rank.NONE, MyPlugin)