#########################################################
# Copyright (C) 2024-25 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
from mpk_parser import Parser, sima_mpk
import spy
import numpy as np
import json
import time
from spy.logger import Logger, LogLevel
from typing import Any
from spy.defines import BoundingBox, SiMaModels
from spy.constants import Constants
from spy import ApuDirect
from abc import ABC, abstractmethod
from typing import List
from enum import Enum
from ev_transforms.transforms import (
resize,
tessellation,
quantize,
detessellation,
dequantize,
)
# Base class: Generic operation handler
[docs]
class ColorConversionCodes(Enum):
[docs]
COLOR_BGRTORGB = "SIMA_COLOR_BGRTORGB"
[docs]
COLOR_RGBTOBGR = "SIMA_COLOR_RGBTOBGR"
[docs]
COLOR_BGRTOGRAY = "SIMA_COLOR_BGRTOGRAY"
[docs]
COLOR_RGBTOGRAY = "SIMA_COLOR_RGBTOGRAY"
[docs]
COLOR_IYUVTOBGR = "SIMA_COLOR_IYUVTOBGR"
[docs]
COLOR_IYUVTONV12 = "SIMA_COLOR_IYUVTONV12"
[docs]
COLOR_NV12TOBGR = "SIMA_COLOR_NV12TOBGR"
[docs]
COLOR_BGRTONV12 = "SIMA_COLOR_BGRTONV12"
[docs]
COLOR_RGBTONV12 = "SIMA_COLOR_RGBTONV12"
[docs]
COLOR_NV12TORGB = "SIMA_COLOR_NV12TORGB"
[docs]
class SimaOperation:
def __init__(self, name: str, configuration: dict):
"""
Base class for Sima operations.
"""
[docs]
self.configuration = configuration
@abstractmethod
[docs]
def run(self, tensor: np.ndarray) -> np.ndarray:
"""
Abstract method for executing the operation.
"""
pass
[docs]
class SimaOperationCVU(SimaOperation):
def __init__(self, name: str, configuration: dict):
"""
Derived class for operations requiring CVU.
"""
super().__init__(name, configuration)
[docs]
def run(self, tensor: np.ndarray) -> np.ndarray:
result_tens = self.cvu_obj.run(tensor)
return result_tens
[docs]
class SimaGenericPreproc(SimaOperationCVU):
def __init__(self, parser_obj):
"""
A Generic preprocess implementation.
"""
configuration = parser_obj.get_config(sima_mpk.preproc)[0]
self.target_width, self.target_height = configuration[
"output_width"], configuration["output_height"]
super().__init__("gen_preproc", configuration)
[docs]
def run(self, tensor: np.ndarray) -> np.ndarray:
"""
Run the CVU operation.
"""
if self.cvu_obj is None:
raise RuntimeError(
"CVU object is not configured. Call 'configure' first.")
result_tens = super().run(tensor)
if not self.offset:
self.offset = result_tens.size//2
return result_tens[:self.offset]
[docs]
class SimaQuantTess(SimaOperationCVU):
def __init__(self, parser_obj):
"""
Quant-Tess Kernel.
"""
configuration = parser_obj.get_config(sima_mpk.quant_tess)[0]
self.target_width, self.target_height = configuration[
"input_width"], configuration["input_height"]
super().__init__("quant_tess", configuration)
[docs]
def run(self, tensor: np.ndarray) -> np.ndarray:
"""
Run the CVU operation.
"""
if self.cvu_obj is None:
raise RuntimeError(
"CVU object is not configured. Call 'configure' first.")
if not tensor.dtype==np.float32:
tensor = tensor.astype(np.float32)
uint8_buffer = np.frombuffer(tensor.tobytes(), dtype=np.uint8)
result_tens = super().run(uint8_buffer)
return result_tens
[docs]
class SimaDetessDequant(SimaOperationCVU):
def __init__(self, parser_obj):
configuration = parser_obj.get_config(sima_mpk.detess_dequant)[0]
super().__init__("detessdequant", configuration)
self.configure(self.configuration)
[docs]
self.tensor_shapes = parser_obj.tensor_shapes
[docs]
def reshape_ev_buffer(self, tensor: np.ndarray) -> List[np.ndarray]:
"""Reshape flattened EV buffer to list of tensors"""
tensor_list = []
start = 0
num_tensors = len(self.tensor_shapes)
for idx in range(num_tensors):
tensor_shape = self.tensor_shapes[idx][0]
offset = int(np.prod(tensor_shape))
tensor_list.append(
tensor[start:start+offset].reshape(tensor_shape))
start += offset
return tensor_list
[docs]
def run(self, tensor: np.ndarray) -> np.ndarray:
result_tens = super().run(tensor)
result_tens = self.reshape_ev_buffer(result_tens.view(np.float32))
return result_tens
[docs]
class SimaModelRunner(SimaOperation):
def __init__(self, parser_obj):
[docs]
self.parser_obj = parser_obj
# Initialize SimaOperation
configuration = parser_obj.get_config(sima_mpk.mla)[0]
SimaOperation.__init__(self, "process_mla", configuration)
[docs]
def run(self, tensor: np.ndarray) -> np.ndarray:
start_time = time.time() # Record start time
tensor_out = self.mla_obj.run([tensor])[0]
end_time = time.time() # Record end time
execution_time_ms = (end_time - start_time) * 1000
logger.log(
LogLevel.DEBUG, f"SimaModelRunner | Execution time: {execution_time_ms:.3f} ms")
return tensor_out
[docs]
class SimaBoxRender:
"""
A class for rendering bounding boxes on images using ARM A65 Core.
"""
@classmethod
def _configure(cls, frame_width: int, frame_height: int, labelfile: str, render_type: str = "bboxs", image_format: str = "NV12", debug: bool = False):
"""
Internal method to initialize the label map and colors if not already initialized.
Args:
label_map_file (str): Path to the label map file.
"""
if cls.apu_obj is None:
cls.apu_obj = ApuDirect(Constants.BOX_RENDER_LIB)
render_config = {
"version": "0.1",
"render_type": render_type,
"image_format": image_format,
"label_file": f"{labelfile}",
"original_width": frame_width,
"original_height": frame_height,
"debug": debug,
"color_box": Constants.DEFAULT_COLOR_BOX,
"color_text" : Constants.DEFAULT_COLOR_TEXT,
"class_special" : Constants.DEFAULT_CLASS_SPECIAL,
"color_box_special" : Constants.DEFAULT_COLOR_BOX_SPECIAL,
"threshold": Constants.DEFAULT_THRESHOLD
}
_ = cls.apu_obj.config(render_config)
@classmethod
[docs]
def render(cls, image: np.ndarray, boxes: BoundingBox, frame_width: int, frame_height: int, labelfile: str) -> np.ndarray:
"""
Renders bounding boxes and labels on the given image.
Args:
image (np.ndarray): Input image.
boxes (list[BoundingBox]): List of bounding boxes with attributes (_x, _y, _w, _h, _class_id).
frame_height (int): Height of the frame.
frame_width (int): Width of the frame.
label_map_file (str): Path to the label map file (used only for first-time initialization).
Returns:
np.ndarray: Image with rendered bounding boxes and labels.
"""
# Initialize label map and colors if not already done
cls._configure(frame_width, frame_height, labelfile)
# rendering is done directly on the input buffer
_, ret = cls.apu_obj.run(image, boxes)
return image
[docs]
class SimaBoxDecode(SimaOperation):
def __init__(self, parser_obj):
[docs]
self.parser_obj = parser_obj
[docs]
self.apu_obj = ApuDirect(Constants.GENERIC_BOX_DECODER_LIB)
[docs]
def run(self, tensor: np.ndarray) -> np.ndarray:
if SiMaModels.is_supported(self.decode_type):
start_time = time.time() # Record start time
result_tens, status = self.apu_obj.run(tensor)
end_time = time.time() # Record end time
execution_time_ms = (end_time - start_time) * 1000
logger.log(
LogLevel.DEBUG, f"BoxDecoder: {self.decode_type} | Execution time: {execution_time_ms:.3f} ms")
return result_tens
else:
logger.log(
LogLevel.DEBUG, f"Box decoding for {self.decode_type} not supported, yet")
return tensor
[docs]
def get_json(config, name):
json_object = json.dumps(config, indent=4)
dst_path = f"/tmp/{name}.json"
logger.log(
LogLevel.INFO, f"{name} config:\n{json_object}")
with open(dst_path, "w", encoding="utf8") as outfile:
outfile.write(json_object)
return dst_path
[docs]
def set_log_level(level: LogLevel):
"""
Set the log level for the PePPi component.
This function configures the verbosity of logging for PePPi, controlling which log messages
are recorded. Logs are written to the file located at: ``/var/log/simaai_peppi.log``.
Args:
level (LogLevel): The desired logging level (e.g., DEBUG, INFO, WARNING, ERROR).
"""
logger.set_level(level)
[docs]
def resize(image, target_width, target_height) -> np.ndarray:
"""
Resize image to the specified width and height using EV74 kernel.
Args:
image (numpy.ndarray): The input image as a NumPy array.
target_width (int): The desired width of the resized image.
target_height (int): The desired height of the resized image.
Returns:
numpy.ndarray: The resized image as a NumPy array.
"""
input_height, input_width = image.shape[:2]
parser_obj = Parser(is_v2_format=True)
user_config = {"input_width": input_width,
"input_height": input_height,
"output_width": target_width,
"output_height": target_height
}
config_dict = parser_obj.get_config(
sima_mpk.resize, user_config=user_config)[0]
cvu_obj = spy.cvu(get_json(config_dict, "resize"))
resized_array = cvu_obj.run(image.flatten())
del cvu_obj
return resized_array
[docs]
def cvtColor(image, width: int, height: int, color_type: ColorConversionCodes) -> np.ndarray:
"""
Convert the color space of an image using the EV74's Transform Color Conversion kernel.
This operation is optimized for performance and leverages hardware acceleration for color conversion.
Args:
image (numpy.ndarray): The input image as a NumPy array.
width (int): The width of the image.
height (int): The height of the image.
color_type (ColorConversionCodes): The color conversion code indicating
the desired color transformation.
Available color conversion options:
- sima.COLOR_BGRTORGB
- sima.COLOR_RGBTOBGR
- sima.COLOR_BGRTOGRAY
- sima.COLOR_RGBTOGRAY
- sima.COLOR_IYUVTOBGR
- sima.COLOR_IYUVTONV12
- sima.COLOR_NV12TOBGR
- sima.COLOR_BGRTONV12
- sima.COLOR_RGBTONV12
- sima.COLOR_NV12TORGB
Returns:
numpy.ndarray: The image with the converted color space.
"""
parser_obj = Parser(is_v2_format=True)
config_dict = parser_obj.get_config(sima_mpk.cvtcolor, user_config={
"input_width": width, "input_height": height, "conv_type": color_type})[0]
cvu_obj = cvu_obj = spy.cvu(get_json(config_dict, "cvt_color"))
output_array = cvu_obj.run(image.flatten())
del cvu_obj
return output_array