Source code for sima_host

import argparse
import time
import numpy as np
from typing import Union
from time import sleep
from enum import Enum
import sys
from simahostpy import *
import importlib
import os.path

package = 'simahostpy'
simaaihostpy_implementation = importlib.import_module(package)

class BaseEnum(Enum):
    @classmethod
    def values(cls):
        return [member.value for member in cls]

class SiMaErrorCode(BaseEnum):
    SUCCESS = 0
    FAILURE = 1
    INVALID_INPUT = 2
    OVERFLOW = 3
    RETRY = 4

error_code_to_infer_state = {
    "0": SiMaErrorCode.SUCCESS,
    "1": SiMaErrorCode.FAILURE,
    "2": SiMaErrorCode.INVALID_INPUT,
    "3": SiMaErrorCode.OVERFLOW,
    "4": SiMaErrorCode.RETRY,
}
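# Note: the dictionary keys are stringified return codes coming back from the
# simahostpy bindings; unload_model() and disconnect() below translate a binding
# result into a SiMaErrorCode via error_code_to_infer_state[str(ret.value)].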

class Constants:
    DEVICE_NAME = "simapcie"
    FRAME_HEIGHT = 6336             # Original frame height
    FRAME_WIDTH = 9504              # Original frame width
    BUFFER_SIZE = int(FRAME_HEIGHT * FRAME_WIDTH * 3)
    DUMP_OUTPUT = True              # Dump JSON output
    REMOVE_PROCESSED = False        # Remove processed images from input_dir (must be True in a production env)
    MIN_TIME_OUT = 0.5              # Timeout for RESET (not in use)
    MAX_TIME_OUT = 3                # Timeout for re-deploy, in seconds
    INFER_TIMEOUT = 5               # Maximum allowable response time
    SOC_WAIT_TIME = 10              # Sleep time after each deploy
    MAX_REDEPLOY_INTERVAL = 60 * 2  # Redeploy interval, in seconds
    EN_REDEPLOY = False             # Enable/disable redeploy
    EN_RESET = True
    REDEPLOY_RETRY = 5              # Number of consecutive retries for REDEPLOY
    DEVICE_TIMEOUT = 60             # PCIe timeout
    DEVICE_QUEUE = 8                # PCIe buffer queue
    FRAME_COUNT = 5                 # Allowed frame count before each re-deploy
    ANNOTATE = False                # Annotate PMs and dump them to a file

class intf:
    def __init__(self, dev_name, device_queue, device_timeout):
        self.dev_name = dev_name
        self.device_queue = device_queue
        self.device_timeout = device_timeout
        self.host_helper = None
        self.dev_ptr = None
        self.guids = []
        self.model_ref = None
        self.meta_data = []

        self.host_helper = simaaihostpy_implementation.HostHelper()
        if self.host_helper is None:
            raise Exception("Couldn't get the device inference interface")
        self.guids = self.host_helper.enumerate_device()
        if len(self.guids) <= 0:
            return

    def connect(self, guid, queue_entries=0, queue_depth=0):
        if guid is None:
            raise ValueError("Guid cannot be NULL, please pass a valid guid")
        if self.host_helper is None:
            raise Exception("The inference interface is not initialized")
        self.dev_ptr = self.host_helper.open(guid)
        if self.dev_ptr is None:
            raise Exception("Unable to connect to the device")
        self.host_helper.print_slot_number(self.dev_ptr)
        self.host_helper.update_device_defaults(self.dev_ptr, queue_entries, queue_depth)
        return self.dev_ptr

    def load_model(self, device, in_shape_list, out_shape_list, metadata,
                   model_path=None, model_hdl: dict = None):
        if device != self.dev_ptr:
            raise Exception("Device mismatch")
        if (in_shape_list is None) or (out_shape_list is None):
            raise ValueError('Shapes of in and out tensors cannot be None')
        self.host_helper.prepare_tensors(in_shape_list, out_shape_list, 0)
        self.host_helper.set_metadata(metadata)
        self.meta_data = metadata
        self.host_helper.set_queue(device, self.device_queue)
        self.host_helper.set_request_timeout(device, self.device_timeout)
        print(f"args given: model_hdl: {model_hdl}, model_path: {model_path}")
        if (model_hdl is not None) and (model_path is not None):
            model_def = self.host_helper.set_model_definition(
                model_hdl["in_tensors"],
                model_hdl["out_tensors"],
                model_hdl["in_batch_sz"],
                model_hdl["out_batch_sz"],
                model_hdl["in_shape"],
                model_hdl["out_shape"],
            )
            self.model_ref = self.host_helper.load(device, model_path, model_def)
        else:
            self.model_ref = self.host_helper.load(device, model_path, 0)
        if self.model_ref is None:
            raise Exception("Unable to load the model to the device")
        return self.model_ref

    # def __update__(self, in_data: Union[np.ndarray, bytes]):
    #     if in_data is None:
    #         raise Exception('Input numpy array cannot be None')
    #     self.host_helper.memcpy(in_data, 0)

    # def __get_tensor__(self, out_data: Union[np.ndarray, bytes]):
    #     if out_data is None:
    #         raise Exception('Input numpy array cannot be None')
    #     self.host_helper.memcpy_tonp(out_data, 0)

    def unload_model(self, model):
        if self.model_ref != model:
            raise Exception("Model reference mismatch")
        ret = self.host_helper.unload(model)
        return error_code_to_infer_state[str(ret.value)]

    def disconnect(self, device):
        if device != self.dev_ptr:
            raise Exception("Device handle mismatch")
        ret = self.host_helper.close(device)
        return error_code_to_infer_state[str(ret.value)]
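
# Illustrative sketch (not part of the original module): the minimal intf
# lifecycle that run() below exercises end to end. It assumes a SiMa PCIe
# device is present and that `mpk_path` points to a compiled MPK package;
# the function name `_example_lifecycle` is hypothetical.
def _example_lifecycle(mpk_path="project.mpk"):
    devintf = intf(Constants.DEVICE_NAME, Constants.DEVICE_QUEUE, Constants.DEVICE_TIMEOUT)
    device = devintf.connect(devintf.guids[0])   # Open the first enumerated device
    size = int(Constants.BUFFER_SIZE * 0.25)     # Same tensor sizing as run()
    model = devintf.load_model(device, [[size]], [[size]], ['A', 'B'], mpk_path)
    devintf.unload_model(model)                  # Tear down in reverse order
    devintf.disconnect(device)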

def run(mpk_package="project.mpk"):
    soc_devices = simaaihostpy_implementation.HostHelper().enumerate_device()
    for soc_device in soc_devices:
        devintf = intf(Constants.DEVICE_NAME, Constants.DEVICE_QUEUE, Constants.DEVICE_TIMEOUT)
        in_out_sz = int(Constants.BUFFER_SIZE * 0.25)
        model_hdl_org = {
            "in_tensors": 1,
            "out_tensors": 1,
            "out_batch_sz": 1,
            "in_batch_sz": 1,
            "in_shape": [in_out_sz],
            "out_shape": [in_out_sz],
        }

        print(f"Connecting to PCIe device: {soc_device}")
        device = devintf.connect(soc_device)
        print(f"Connected to PCIe device: {soc_device}")

        # Populate the model handle
        model_hdl = {}
        model_hdl['numInputTensors'] = 1
        model_hdl['numOutputTensors'] = 1
        model_hdl['outputBatchSize'] = 1
        model_hdl['inputBatchSize'] = 1
        model_hdl['inputShape'] = [[640, 480, 3]]
        model_hdl['outputShape'] = [[1, 1, 9200], [1, 1, 400]]
        model_hdl['qid'] = 0  # Each queue id serves one pipeline running in parallel
        model_hdl_org.update(model_hdl)

        # Deploy the model
        print(f"Deploying {mpk_package} (resnet pipeline) to PCIe device: {soc_device}")
        try:
            model = devintf.load_model(device, [[int(in_out_sz)]], [[int(in_out_sz)]],
                                       ['A', 'B'], mpk_package, model_hdl=model_hdl_org)
            sleep(Constants.SOC_WAIT_TIME)
        except Exception as e:
            print(f"Initializing SoC {soc_device} failed with ERROR: '{str(e)}'")
            sys.exit(0)
        print("Successfully deployed (loaded) the pipeline.")

        # Sleep for 5 seconds
        time.sleep(5)

        # Unload the model
        print("Killing (unloading) the pipeline...")
        ret = devintf.unload_model(model)
        if ret != SiMaErrorCode.SUCCESS:
            print("Failed to unload model")
        print("Killed (unloaded) the pipeline.")

        # Disconnect the device
        print(f"Disconnecting the PCIe device: {soc_device}")
        ret = devintf.disconnect(device)
        if ret != SiMaErrorCode.SUCCESS:
            print(f"Failed to disconnect PCIe slot: {soc_device}")
        print(f"Disconnected the PCIe device: {soc_device}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='test_simahostpy.py',
        description='Example python script to test connection, deployment of a pipeline, '
                    'killing the pipeline, and disconnection from the PCIe device.',
        epilog='')
    parser.add_argument('-f', required=True, help='Path to the mpk file to test with.')
    args = parser.parse_args()
    if not os.path.isfile(args.f):
        print("File not found, please check and try again.")
        sys.exit(0)
    print("Testing with the user-provided project.mpk file...")
    run(args.f)
    print("Completed testing.")
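
# Example invocation (script file name assumed; adjust to where this source is saved):
#   python3 sima_host.py -f /path/to/project.mpk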