Source code for sima.mini_pipeline_base

#########################################################
# Copyright (C) 2024-25 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################

import os
import cv2
from . import operations as sima_ops
from mpk_parser import Parser
import numpy as np
from spy.logger import Logger, LogLevel
import tarfile
import shutil
from pathlib import Path
from spy.defines import SiMaModels
from spy.constants import Constants
from typing import Union, List

from ev_transforms.transforms import resize, tessellation, quantize, detessellation, dequantize
logger = Logger()
def process_pipeline_tar(tar_file, pipeline, modelname):
    """
    Extracts a tar.gz file, processes it, and moves specific files to a target directory.

    Args:
        tar_file (str): Path to the tar.gz file.
        pipeline (str): Name of the application.
        modelname (str): Name of the model session (``MLSoCSession`` session name).

    Raises:
        FileNotFoundError: If the tar.gz file is not found.
        ValueError: If required files (.lm or .elf, and a valid .json) are not found in the archive.
        RuntimeError: If any error occurs during extraction or file processing.
    """
    # Define the target directory
    target_dir = f"{Constants.DEFAULT_PEPPI_ROOT_FOLDER}/{pipeline}/share/{modelname}"
    os.makedirs(target_dir, exist_ok=True)

    # Extract the tar.gz file into a temporary directory
    temp_dir = "/tmp/temp_extract"
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)  # Clean up any existing temp directory
    os.makedirs(temp_dir)

    try:
        if not os.path.exists(tar_file):
            logger.log(LogLevel.ERR, f"File not found: {tar_file}")
            raise FileNotFoundError(f"File not found: {tar_file}")

        with tarfile.open(tar_file, "r:gz") as tar:
            tar.extractall(temp_dir)

        # Initialize flags and trackers
        model_file = None
        largest_json_file = None
        mpk_json = None
        largest_json_size = -1

        # Copy the model (.lm/.elf) and .json files into the target directory
        for root, _, files in os.walk(temp_dir):
            for file in files:
                src_path = os.path.join(root, file)
                # Match based on file extension
                match file.split('.')[-1]:
                    case "lm" | "elf":
                        # Handle model files (.lm or .elf)
                        model_file = file
                        shutil.copy2(src_path, target_dir)
                        logger.log(LogLevel.INFO, f"Found {file}")
                    case "json":
                        # Track the largest JSON file (the archive may contain
                        # the MPK JSON plus EV74 and MLA JSONs)
                        file_size = os.path.getsize(src_path)
                        if file_size > largest_json_size:
                            largest_json_file = src_path
                            largest_json_size = file_size
                    case _:
                        # Default case for unhandled file types
                        logger.log(LogLevel.DEBUG,
                                   f"Skipping unrecognized file: {src_path}")

        # Process the largest JSON file
        if largest_json_file:
            mpk_json = Path(largest_json_file).name
            shutil.copy2(largest_json_file, target_dir)
            logger.log(LogLevel.INFO, f"Found {largest_json_file}")
        else:
            logger.log(LogLevel.ERR,
                       "No valid JSON files found in the tar.gz archive.")
            raise ValueError("No valid JSON files found in the tar.gz archive.")

        # Check that a model file was found
        if model_file is None:
            logger.log(LogLevel.ERR,
                       "Required files (.lm or .elf) are missing in the tar.gz archive.")
            raise ValueError(
                "Required files (.lm or .elf) are missing in the tar.gz archive.")

    except tarfile.TarError as e:
        logger.log(LogLevel.ERR, f"Error extracting tar.gz file: {e}")
        raise RuntimeError(f"Error extracting tar.gz file: {e}")
    except FileNotFoundError as e:
        logger.log(LogLevel.ERR, f"File not found: {e}")
        raise  # Propagate FileNotFoundError as documented
    except ValueError:
        raise  # Missing required files; already logged above
    except Exception as e:
        logger.log(LogLevel.ERR, f"Unexpected error: {e}")
        raise RuntimeError(f"Unexpected error: {e}")
    finally:
        # Clean up the temporary directory
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)

    return (
        f"{target_dir}/{model_file}",
        f"{target_dir}/{mpk_json}"
    )
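# --- Example (not part of the original module): a minimal sketch of calling
# process_pipeline_tar directly. The archive path and names are hypothetical;
# in normal use MLSoCSession.__init__ calls this for you.
def _example_extract():
    lm_path, mpk_json_path = process_pipeline_tar(
        tar_file="/data/model.tar.gz",  # hypothetical SDK-produced archive
        pipeline="demo_app",            # application name
        modelname="model1")             # per-model session name
    # lm_path       -> <peppi root>/demo_app/share/model1/<model>.lm (or .elf)
    # mpk_json_path -> <peppi root>/demo_app/share/model1/<mpk>.json
    return lm_path, mpk_json_path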
class MLSoCSession:
    """
    A class for interacting with the MLSoC for model inference.
    """

    def __init__(self, model_file: str, pipeline: str, frame_width: int,
                 frame_height: int, session_name: str = "model1",
                 ev_preproc=True):
        """
        Initializes the session with the specified parameters.

        Args:
            model_file (str): Path to the model tar.gz file to be loaded for processing.
            pipeline (str): Unique name of the application.
            frame_width (int): Input image width.
            frame_height (int): Input image height.
            session_name (str, optional): Session name (required for multi-model pipelines).
            ev_preproc (bool): Use the EV74 transforms for preprocessing.
        """
        self.pipeline = pipeline
        self.session_name = session_name
        self.session_id = f"_________Pipeline: {self.pipeline}| Model: {self.session_name}_________"
        logger.log(LogLevel.INFO, f"\n{self.session_id}\n")
        self.lm_path, self.mpk_json_path = process_pipeline_tar(
            model_file, pipeline, session_name)
        self.ev_preproc = ev_preproc
        self.frame_width, self.frame_height = frame_width, frame_height
        self.parser_obj = Parser(self.mpk_json_path)
        self.model_file = model_file
        self.tensor_shapes = self.parser_obj.tensor_shapes
    def create_plugin(self, plugin_class):
        """Instantiates a plugin bound to this session's parsed MPK JSON."""
        return plugin_class(self.parser_obj)
    def configure(self, model_external_params) -> bool:
        """
        Configures the EV74 and MLA components with external model parameters.

        Args:
            model_external_params (dict): A dictionary containing external
                parameters for configuring the model and related components.

        Returns:
            bool: True once all plugins have been configured.
        """
        model_external_params["model_path"] = self.lm_path
        model_external_params["original_width"] = self.frame_width
        model_external_params["original_height"] = self.frame_height

        self.box_decoder = self.create_plugin(sima_ops.SimaBoxDecode)
        self.box_decoder.configure(model_external_params)
        self.decode_type = model_external_params.get("decode_type", "NA")

        if model_external_params["normalize"]:
            model_external_params["input_width"] = self.frame_width
            model_external_params["input_height"] = self.frame_height
            self.preproc_plugin = self.create_plugin(
                sima_ops.SimaGenericPreproc)
        else:
            self.preproc_plugin = self.create_plugin(sima_ops.SimaQuantTess)

        self.model_runner = self.create_plugin(sima_ops.SimaModelRunner)
        self.detess_dequant_plugin = self.create_plugin(
            sima_ops.SimaDetessDequant)

        self.preproc_plugin.configure(model_external_params)
        self.model_runner.configure(model_external_params)
        self.model_external_params = model_external_params

        if not self.ev_preproc:
            # Cache quantization and tiling parameters for A65 (host-side) preprocessing
            self.input_depth = self.preproc_plugin.configuration["input_depth"]
            qzp = self.preproc_plugin.configuration["q_zp"]
            qscale = self.preproc_plugin.configuration["q_scale"]
            self.quant_params = [
                [np.float32(qscale), np.int32(qzp)]] * self.input_depth
            self.tile_height = self.preproc_plugin.configuration["tile_height"]
            self.tile_width = self.preproc_plugin.configuration["tile_width"]
            self.tile_depth = self.preproc_plugin.configuration["tile_depth"]
            self.aspect_ratio = model_external_params["aspect_ratio"]
            self.padding_type = "".join(
                model_external_params["padding_type"].lower().split("_"))
            self.align_c16 = 0
            self.mean = np.array(model_external_params["channel_mean"])
            self.std_dv = np.array(model_external_params["channel_stddev"])
            self.target_width, self.target_height = self.get_inference_resolution()

        return True
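    # --- Example (not part of the original module): a sketch of the
    # model_external_params dict that configure() consumes. Only keys read in
    # the code above are shown; every value here is hypothetical and
    # model-dependent.
    #
    #     params = {
    #         "normalize": True,                    # True -> SimaGenericPreproc, else SimaQuantTess
    #         "decode_type": "yolo",                # checked against SiMaModels in run_model()
    #         "resize": True,                       # used by a65_preprocess()
    #         "aspect_ratio": False,
    #         "padding_type": "TOP_LEFT",           # lowercased and joined to "topleft"
    #         "channel_mean": [0.485, 0.456, 0.406],
    #         "channel_stddev": [0.229, 0.224, 0.225],
    #     }
    #     session.configure(params)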
    def a65_preprocess(self, in_frame: np.ndarray) -> np.ndarray:
        """
        Preprocesses the frame on the A65 host when EV74 preprocessing is
        disabled: optional BGR-to-RGB conversion and resize, followed by
        normalization, quantization, and tessellation.
        """
        if self.model_external_params["resize"]:
            if len(in_frame.shape) != 3:
                raise Exception(
                    f"resize expects a 3-dimensional array, got shape {in_frame.shape}")
            in_frame = cv2.cvtColor(in_frame, cv2.COLOR_BGR2RGB)
            in_frame = resize(np.expand_dims(in_frame, axis=0),
                              self.target_width,
                              self.target_height,
                              False,
                              deposit_location=self.padding_type).squeeze()
        in_frame = ((in_frame / 255. - self.mean) / self.std_dv).astype(np.float32)
        quantized = quantize(
            in_frame, channel_params=self.quant_params, num_bits=8)
        preprocessed = tessellation(
            np.expand_dims(quantized, axis=0),
            self.tile_height, self.tile_width, self.tile_depth,
            self.align_c16)
        return preprocessed
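    # --- Note (not part of the original module): the quantize() call above
    # applies per-channel affine quantization with the (q_scale, q_zp) pairs
    # cached in configure(). Assuming the usual convention, it is roughly
    #
    #     q = np.clip(np.round(x * q_scale) + q_zp, -128, 127).astype(np.int8)
    #
    # per channel; the exact rounding and saturation behavior is defined by
    # ev_transforms.transforms.quantize.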
    def preprocess(self, in_frame: np.ndarray) -> np.ndarray:
        """
        Preprocesses the input frame for model inference based on the EV74 configuration.

        Args:
            in_frame (np.ndarray): The input frame as a NumPy array.

        Returns:
            np.ndarray: The preprocessed frame ready for model inference.
        """
        if not self.ev_preproc:
            preprocessed = self.a65_preprocess(in_frame)
        else:
            preprocessed = self.preproc_plugin.run(in_frame)
        return preprocessed
    def run_model(self, frame: np.ndarray) -> Union[np.ndarray, List[np.ndarray]]:
        """
        Runs the model inference (preprocess, MLA, and postprocess) on a given frame.

        For the following models, the output is from SimaBoxDecode:

        - ``centernet``
        - ``yolo``
        - ``detr``
        - ``effdet``

        Args:
            frame (np.ndarray): Input frame as a NumPy array.

        Returns:
            np.ndarray: The output tensor after running the model and applying
            the appropriate postprocessing.

            - For the models mentioned above, the output is from SimaBoxDecode
              containing bounding boxes.
            - For other models, the output is from Detess Dequant.
        """
        frame_preproc = self.preprocess(frame)
        tensor_out = self.model_runner.run(frame_preproc)
        if SiMaModels.is_supported(self.decode_type):
            tensor_out = self.box_decoder.run(tensor_out)
        else:
            tensor_out = self.detess_dequant_plugin.run(tensor_out)
        return tensor_out
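    # --- Example (not part of the original module): an end-to-end sketch with
    # hypothetical paths and dimensions.
    #
    #     session = MLSoCSession("/data/model.tar.gz", pipeline="demo_app",
    #                            frame_width=1280, frame_height=720)
    #     session.configure(params)          # params as sketched after configure()
    #     frame = cv2.imread("input.jpg")    # BGR frame from OpenCV
    #     out = session.run_model(frame)     # decoded boxes or dequantized tensors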
    def set_log_level(self, level: LogLevel):
        logger.set_level(level)

    def get_configs(self):
        return self.preproc_plugin.configuration, self.model_runner.configuration

    def get_inference_resolution(self):
        return self.preproc_plugin.target_width, self.preproc_plugin.target_height

    def release(self):
        self.tensor_shapes = []
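# --- Example (not part of the original module): for multi-model pipelines,
# create one session per model with a unique session_name so each model's
# files are extracted into a distinct share/<session_name> directory.
# Paths and dimensions are hypothetical.
def _example_multi_model():
    detector = MLSoCSession("/data/detector.tar.gz", "demo_app",
                            1280, 720, session_name="detector")
    classifier = MLSoCSession("/data/classifier.tar.gz", "demo_app",
                              1280, 720, session_name="classifier")
    return detector, classifier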