#########################################################
# Copyright (C) 2024-25 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
import os
import cv2
from . import operations as sima_ops
from mpk_parser import Parser
import numpy as np
from spy.logger import Logger, LogLevel
import tarfile
import shutil
from pathlib import Path
from spy.defines import SiMaModels
from spy.constants import Constants
from typing import Union, List
from ev_transforms.transforms import resize, tessellation, quantize, detessellation, dequantize
def process_pipeline_tar(tar_file, pipeline, modelname):
"""
Extracts a tar.gz file, processes it, and moves specific files to a target directory.
Args:
tar_file (str): Path to the tar.gz file.
pipeline (str): Name of the application
modelname (str): Name of the MLSocSession:Model
Raises:
FileNotFoundError: If the tar.gz file is not found.
ValueError: If required files (.lm or .elf, and a valid .json) are not found in the archive.
RuntimeError: If any error occurs during extraction or file processing.
"""
# Define target directory
target_dir = f"{Constants.DEFAULT_PEPPI_ROOT_FOLDER}/{pipeline}/share/{modelname}"
os.makedirs(target_dir, exist_ok=True)
# Extract tar.gz file to a temporary directory
temp_dir = "/tmp/temp_extract"
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir) # Clean up any existing temp directory
os.makedirs(temp_dir)
try:
if not os.path.exists(tar_file):
logger.log(LogLevel.ERR, f"File not found: {tar_file}")
raise FileNotFoundError(f"File not found: {tar_file}")
with tarfile.open(tar_file, "r:gz") as tar:
tar.extractall(temp_dir)
# Initialize flags and trackers
model_file = None
largest_json_file = None
mpk_json = None
largest_json_size = -1
# Move and rename .lm/.elf, and .json files
for root, _, files in os.walk(temp_dir):
for file in files:
src_path = os.path.join(root, file)
# Match based on file extension
match file.split('.')[-1]:
case "lm":
# Handle model files (.lm or .elf)
model_file = file
shutil.copy2(src_path, target_dir)
logger.log(
LogLevel.INFO, f"Found {file}")
case "elf":
# Handle model files (.lm or .elf)
model_file = file
shutil.copy2(src_path, target_dir)
logger.log(
LogLevel.INFO, f"Found {file}")
case "json":
# Handle JSON files [targz may contain MPK json + EV74, MLA jsons]
file_size = os.path.getsize(src_path)
if file_size > largest_json_size:
largest_json_file = src_path
largest_json_size = file_size
case _: # Default case for unhandled file types
logger.log(LogLevel.DEBUG,
f"Skipping unrecognized file: {src_path}")
# Process the largest JSON file
if largest_json_file:
mpk_json = Path(largest_json_file).name
shutil.copy2(largest_json_file, target_dir)
logger.log(LogLevel.INFO, f"Found {largest_json_file}")
else:
logger.log(
LogLevel.ERR, "No valid JSON files found in the tar.gz archive.")
raise ValueError(
"No valid JSON files found in the tar.gz archive.")
# Check if required files are found
if model_file is None:
logger.log(
LogLevel.ERR, "Required files (.lm or .elf) are missing in the tar.gz archive.")
raise ValueError(
"Required files (.lm or .elf) are missing in the tar.gz archive.")
except tarfile.TarError as e:
logger.log(LogLevel.ERR, f"Error extracting tar.gz file: {e}")
raise RuntimeError(f"Error extracting tar.gz file: {e}")
except FileNotFoundError as e:
logger.log(LogLevel.ERR, f"Unexpected error: {e}")
raise RuntimeError(f"{e}")
except Exception as e:
logger.log(LogLevel.ERR, f"Unexpected error: {e}")
raise RuntimeError(f"Unexpected error: {e}")
finally:
# Clean up temporary directory
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
return (
f"{target_dir}/{model_file}",
f"{target_dir}/{mpk_json}"
)
class MLSoCSession:
"""
A class for interacting with the MLSoC for model inference.
"""
def __init__(self, model_file: str, pipeline: str, frame_width: int, frame_height: int, session_name: str = "model1", ev_preproc=True):
    """
    Set up a session: unpack the model archive and parse its MPK JSON.

    Args:
        model_file (str): Path to the model tar.gz file to be loaded for processing.
        pipeline (str): Unique name of the application.
        frame_width (int): Input image width.
        frame_height (int): Input image height.
        session_name (str, optional): Session name; has to be provided for
            multi-model cases. Defaults to "model1".
        ev_preproc (bool): Use the EV74 transform for the preprocessing.
    """
    self.pipeline = pipeline
    self.session_name = session_name
    # Banner that identifies this pipeline/model pair in the log output.
    banner = f"_________Pipeline: {self.pipeline}| Model: {self.session_name}_________"
    self.session_id = banner
    logger.log(LogLevel.INFO, f"\n{self.session_id}\n")
    # Unpack the archive; yields the model binary path and the MPK JSON path.
    extracted_paths = process_pipeline_tar(model_file, pipeline, session_name)
    self.lm_path, self.mpk_json_path = extracted_paths
    self.ev_preproc = ev_preproc
    self.frame_width = frame_width
    self.frame_height = frame_height
    self.parser_obj = Parser(self.mpk_json_path)
    self.model_file = model_file
    self.tensor_shapes = self.parser_obj.tensor_shapes
def create_plugin(self, plugin_class):
    """Instantiate ``plugin_class`` with this session's parser object and return the instance."""
    parser = self.parser_obj
    plugin = plugin_class(parser)
    return plugin
def a65_preprocess(self, in_frame: np.ndarray) -> np.ndarray:
"""
Preprocesses a frame without the EV74 plugin (used by ``preprocess`` when ``ev_preproc`` is False).
The visible steps are: BGR to RGB conversion and resize, normalization,
8-bit quantization and tessellation.
Args:
in_frame (np.ndarray): The input frame as a NumPy array.
Returns:
np.ndarray: The result of the tessellation step.
Raises:
Exception: If the frame does not have exactly 3 dimensions when the dimension check runs.
"""
# NOTE(review): indentation was lost in this view, so it cannot be confirmed
# which of the statements below are governed by the "resize" flag - verify
# against the original file before restructuring.
# NOTE(review): model_external_params, target_width/target_height, padding_type,
# mean, std_dv, quant_params, tile_* and align_c16 are read here but are not
# assigned in the visible __init__ - presumably set elsewhere; confirm.
if self.model_external_params["resize"]:
if len(in_frame.shape) != 3:
raise Exception(
f"resize expects 3 dimensional array, got {in_frame.shape} dimension")
# Swap the channel order from BGR to RGB.
in_frame = cv2.cvtColor(in_frame, cv2.COLOR_BGR2RGB)
# resize() is given the frame with a leading axis of size 1; squeeze() drops
# size-1 axes from its result again.
in_frame = resize(np.expand_dims(in_frame, axis=0), self.target_width,
self.target_height, False, deposit_location=self.padding_type).squeeze()
# Scale by 1/255, normalize with self.mean / self.std_dv, cast to float32.
in_frame = ((in_frame / 255. - self.mean) /
self.std_dv).astype(np.float32)
# Quantize to 8 bits using self.quant_params.
quantized = quantize(
in_frame, channel_params=self.quant_params, num_bits=8)
# Tessellate the quantized data (again with a leading axis of size 1 added).
preprocessed = tessellation(np.expand_dims(
quantized, axis=0), self.tile_height, self.tile_width, self.tile_depth, self.align_c16)
return preprocessed
def preprocess(self, in_frame: np.ndarray) -> np.ndarray:
    """
    Preprocess the input frame for model inference.

    Dispatches on ``self.ev_preproc``: when it is truthy the frame is handed
    to ``self.preproc_plugin.run``, otherwise to ``self.a65_preprocess``.

    Args:
        in_frame (np.ndarray): The input frame as a NumPy array.

    Returns:
        np.ndarray: The preprocessed frame ready for model inference.
    """
    if self.ev_preproc:
        return self.preproc_plugin.run(in_frame)
    return self.a65_preprocess(in_frame)
def run_model(self, frame: np.ndarray) -> Union[np.ndarray, List[np.ndarray]]:
    """
    Run the full inference chain (preprocess, model, postprocess) on one frame.

    The postprocessing stage depends on ``SiMaModels.is_supported(self.decode_type)``:

    - supported decode types (documented as ``centernet``, ``yolo``, ``detr``
      and ``effdet``) go through ``self.box_decoder`` (SimaBoxDecode output
      containing bounding boxes);
    - everything else goes through ``self.detess_dequant_plugin`` (Detess Dequant output).

    Args:
        frame (np.ndarray): Input frame as a NumPy array.

    Returns:
        np.ndarray: The output tensor after running the model and applying
        the appropriate postprocessing.
    """
    model_input = self.preprocess(frame)
    raw_output = self.model_runner.run(model_input)
    if not SiMaModels.is_supported(self.decode_type):
        return self.detess_dequant_plugin.run(raw_output)
    return self.box_decoder.run(raw_output)
def set_log_level(self, level: LogLevel):
    """
    Forward ``level`` to the module-level logger's ``set_level``.

    Args:
        level (LogLevel): The log level to apply.
    """
    new_level = level
    logger.set_level(new_level)
def get_configs(self):
    """
    Return the ``configuration`` attributes of the preprocessing plugin and the model runner.

    Returns:
        tuple: ``(preproc_plugin.configuration, model_runner.configuration)``.
    """
    preproc_config = self.preproc_plugin.configuration
    runner_config = self.model_runner.configuration
    return preproc_config, runner_config
def get_inference_resolution(self):
    """
    Return the preprocessing plugin's target resolution.

    Returns:
        tuple: ``(target_width, target_height)`` as stored on ``self.preproc_plugin``.
    """
    plugin = self.preproc_plugin
    width = plugin.target_width
    height = plugin.target_height
    return width, height
def release(self):
    """Reset ``tensor_shapes`` to a new empty list."""
    self.tensor_shapes = list()