import argparse
import time
import numpy as np
from typing import Union
from time import sleep
import sys
from simahostpy import *
import importlib
import os.path
# Resolve the host-helper implementation module once, at import time; the
# rest of this file obtains its HostHelper objects from this module.
# NOTE(review): `package` is not defined in the visible part of this file --
# presumably it is provided by `from simahostpy import *`; confirm.
simaaihostpy_implementation = importlib.import_module(package)
class BaseEnum():
    """Base class for enumeration-style groups of constants.

    NOTE(review): ``values`` iterates over ``cls`` itself, which only works
    when the class is iterable (for example when a subclass also derives
    from ``enum.Enum``). No such base is declared here, so calling
    ``values`` on a plain subclass raises ``TypeError`` -- confirm how
    subclasses are expected to provide iteration.
    """

    @classmethod
    def values(cls):
        """Return the ``value`` attribute of every member yielded by ``cls``."""
        return [member.value for member in cls]
# Status codes used by this file to report the outcome of host helper calls.
# NOTE(review): the class body is not visible in this excerpt; the code that
# follows references the members SUCCESS, FAILURE, INVALID_INPUT, OVERFLOW
# and RETRY, so they must be defined here -- confirm against the full source.
class SiMaErrorCode(BaseEnum):
# Maps the stringified numeric code (``str(ret.value)`` of a host helper
# return object) to the corresponding SiMaErrorCode member. A code that is
# not listed here raises KeyError at the lookup site.
error_code_to_infer_state = {
    "0": SiMaErrorCode.SUCCESS,
    "1": SiMaErrorCode.FAILURE,
    "2": SiMaErrorCode.INVALID_INPUT,
    "3": SiMaErrorCode.OVERFLOW,
    "4": SiMaErrorCode.RETRY,
}
class Constants:
    """Namespace of tunable settings for the PCIe host test flow.

    All attributes are plain class-level values; the class is never
    instantiated in this file.
    """

    DEVICE_NAME = "simapcie"

    FRAME_HEIGHT = 6336  # Original frame height
    FRAME_WIDTH = 9504  # Original frame width
    # Three values per pixel -- presumably one byte per colour channel;
    # confirm against the pipeline's expected input format.
    BUFFER_SIZE = int(FRAME_HEIGHT * FRAME_WIDTH * 3)

    DUMP_OUTPUT = True  # Dump json output
    # Remove processed images from input_dir
    # (This must be True for production env)
    REMOVE_PROCESSED = False

    MIN_TIME_OUT = 0.5  # Time out for RESET (Not in use)
    MAX_TIME_OUT = 3  # Time out for re-deploy in Seconds
    INFER_TIMEOUT = 5  # Maximum allowable response time
    SOC_WAIT_TIME = 10  # Sleep time after each deploy

    MAX_REDEPLOY_INTERVAL = 60 * 2  # Redeploy interval in Seconds
    EN_REDEPLOY = False  # Enable/Disable Redeploy
    REDEPLOY_RETRY = 5  # Number of consecutive retries for REDEPLOY

    DEVICE_TIMEOUT = 60  # PCIE timeout
    DEVICE_QUEUE = 8  # PCIE buffer queue

    FRAME_COUNT = 5  # Allowed frame count before each re-deploy
    ANNOTATE = False  # Annotate PMs and dump it to file
class intf:
    """Wrapper around a host helper object for one device session.

    The methods are written to be called in order: ``connect`` ->
    ``load_model`` -> ``unload_model`` -> ``disconnect``. ``dev_ptr`` and
    ``model_ref`` only exist on the instance after ``connect`` and
    ``load_model`` respectively have been called.
    """

    def __init__(self, dev_name, device_queue, device_timeout):
        """Create the host helper and enumerate the available devices.

        Args:
            dev_name: Device name; stored on the instance only.
            device_queue: Value later passed to ``host_helper.set_queue``.
            device_timeout: Value later passed to
                ``host_helper.set_request_timeout``.

        Raises:
            Exception: If no host helper object could be obtained.
        """
        self.dev_name = dev_name
        self.device_queue = device_queue
        self.device_timeout = device_timeout
        self.host_helper = None
        self.host_helper = simaaihostpy_implementation.HostHelper()
        if self.host_helper is None:
            raise Exception("Couldn't get the device inference interface")
        self.guids = self.host_helper.enumerate_device()
        # Nothing further to set up when no devices were found; callers can
        # inspect ``self.guids`` themselves.
        if len(self.guids) <= 0:
            return

    def connect(self, guid, queue_entries = 0, queue_depth = 0):
        """Open the device identified by ``guid`` and return its handle.

        Args:
            guid: Identifier of the device to open (one of ``self.guids``).
            queue_entries: Forwarded to ``update_device_defaults``.
            queue_depth: Forwarded to ``update_device_defaults``.

        Returns:
            The device handle returned by ``host_helper.open``; it is also
            stored as ``self.dev_ptr``.

        Raises:
            ValueError: If ``guid`` is None.
            Exception: If the helper is missing or the device cannot be
                opened.
        """
        if guid is None:
            raise ValueError("Guid cannot be NULL, please pass a valid guid")
        if self.host_helper is None:
            raise Exception(" The inference interface is not initialized")

        self.dev_ptr = self.host_helper.open(guid)
        if self.dev_ptr is None:
            raise Exception("Unable to connect to the device")

        self.host_helper.print_slot_number(self.dev_ptr)
        self.host_helper.update_device_defaults(self.dev_ptr,
                                                queue_entries,
                                                queue_depth)
        return self.dev_ptr

    def load_model(self, device,
                   in_shape_list, out_shape_list,
                   metadata,
                   model_path = None, model_hdl:dict = None):
        """Configure the helper and load a model onto ``device``.

        Args:
            device: Handle previously returned by ``connect``.
            in_shape_list: Input tensor shapes for ``prepare_tensors``.
            out_shape_list: Output tensor shapes for ``prepare_tensors``.
            metadata: Passed to ``set_metadata`` and kept as
                ``self.meta_data``.
            model_path: Path handed to ``host_helper.load``.
            model_hdl: Optional dict with the keys ``in_tensors``,
                ``out_tensors``, ``in_batch_sz``, ``out_batch_sz``,
                ``in_shape`` and ``out_shape``. It is only used when
                ``model_path`` is given as well; otherwise ``0`` is passed
                to ``load`` in place of a model definition.

        Returns:
            The model reference returned by ``host_helper.load``; it is
            also stored as ``self.model_ref``.

        Raises:
            Exception: On device mismatch or when loading fails.
            ValueError: If either shape list is None.
        """
        if device != self.dev_ptr:
            raise Exception("Device mismatch")
        if in_shape_list is None or out_shape_list is None:
            raise ValueError('Shapes of in and out tensors cannot be None')

        self.host_helper.prepare_tensors(in_shape_list,
                                         out_shape_list, 0)
        self.host_helper.set_metadata(metadata)
        self.meta_data = metadata
        self.host_helper.set_queue(device, self.device_queue)
        self.host_helper.set_request_timeout(device, self.device_timeout)
        print(f"args given: model_hdl: {model_hdl}, model_path: {model_path}")

        # A model definition is built only when both a handle and a path
        # were supplied; in every other case 0 is passed instead.
        if model_hdl is not None and model_path is not None:
            model_def = self.host_helper.set_model_definition(
                model_hdl["in_tensors"],
                model_hdl["out_tensors"],
                model_hdl["in_batch_sz"],
                model_hdl["out_batch_sz"],
                model_hdl["in_shape"],
                model_hdl["out_shape"],
            )
        else:
            model_def = 0

        self.model_ref = self.host_helper.load(device,
                                               model_path,
                                               model_def)
        if self.model_ref is None:
            raise Exception(f'Unable to load model_hdl to the device {self.model_ref}')
        return self.model_ref

    # Disabled tensor-copy helpers, kept for reference:
    # def __update__(self, in_data: Union[np.ndarray, bytes]):
    #     if in_data is None:
    #         raise Exception('Input numpy array cannot be None')
    #     self.host_helper.memcpy(in_data, 0)

    # def __get_tensor__(self, out_data: Union[np.ndarray, bytes]):
    #     if out_data is None:
    #         raise Exception('Input numpy array cannot be None')
    #     self.host_helper.memcpy_tonp(out_data, 0)

    def unload_model(self, model):
        """Unload ``model`` and return the mapped status.

        Args:
            model: Reference previously returned by ``load_model``.

        Returns:
            The ``error_code_to_infer_state`` entry for the helper's return
            code.

        Raises:
            Exception: If ``model`` is not the currently loaded reference.
            KeyError: If the helper returns a code missing from the map.
        """
        if self.model_ref != model:
            raise Exception("Model refernce mismatch")
        ret = self.host_helper.unload(model)
        return error_code_to_infer_state[str(ret.value)]

    def disconnect(self,device):
        """Close ``device`` and return the mapped status.

        Args:
            device: Handle previously returned by ``connect``.

        Returns:
            The ``error_code_to_infer_state`` entry for the helper's return
            code.

        Raises:
            Exception: If ``device`` is not the connected handle.
            KeyError: If the helper returns a code missing from the map.
        """
        if device != self.dev_ptr:
            raise Exception("Device handle mismatch")
        ret = self.host_helper.close(device)
        return error_code_to_infer_state[str(ret.value)]
def run(mpk_package="project.mpk"):
    """Deploy and tear down a pipeline on every enumerated device.

    For each device: connect, load ``mpk_package``, wait, unload the model
    and disconnect, printing progress along the way.

    Args:
        mpk_package: Path of the mpk package to deploy.

    Exits the process with status 1 if deployment fails on any device.
    """
    soc_devices = simaaihostpy_implementation.HostHelper().enumerate_device()
    # A quarter of the full frame buffer is used for both tensor sizes.
    in_out_sz = int(Constants.BUFFER_SIZE * 0.25)

    for soc_device in soc_devices:
        devintf = intf(Constants.DEVICE_NAME, Constants.DEVICE_QUEUE, Constants.DEVICE_TIMEOUT)

        print(f"Connecting to PCIe device: {soc_device}")
        device = devintf.connect(soc_device)
        print(f"Connected to PCIe device: {soc_device}")

        # Populate Model. The first six keys are the ones load_model reads;
        # the remaining keys are carried along in the same dict.
        model_hdl = {
            "in_tensors": 1,
            "out_tensors": 1,
            "out_batch_sz": 1,
            "in_batch_sz": 1,
            "in_shape": [in_out_sz],
            "out_shape": [in_out_sz],
            "numInputTensors": 1,
            "numOutputTensors": 1,
            "outputBatchSize": 1,
            "inputBatchSize": 1,
            "inputShape": [[640, 480, 3]],
            "outputShape": [[1, 1, 9200], [1, 1, 400]],
            # each queue id is for one parallelly running pipeline
            "qid": 0,
        }

        # Deploy model
        print(f"Deploying {mpk_package} (resnet pipeline) to PCIe device: {soc_device}")
        try:
            model = devintf.load_model(device, [[in_out_sz]], [[in_out_sz]], ['A', 'B'],
                                       mpk_package, model_hdl=model_hdl)
            sleep(Constants.SOC_WAIT_TIME)
        except Exception as e:
            # print() does not interpolate %-style arguments, so format the
            # message explicitly, and report the failure with a non-zero
            # exit status.
            print(f"Initialize soc for {soc_device} failed with ERROR: '{e}'")
            sys.exit(1)
        print("Successfully deployed (loaded) the pipeline.")

        # Sleep for 5 seconds
        time.sleep(5)

        # Unload model
        print("Killing (unloading) the pipeline...")
        ret = devintf.unload_model(model)
        # unload_model returns an entry of error_code_to_infer_state, so
        # compare against the SUCCESS member rather than a literal 0.
        if ret != SiMaErrorCode.SUCCESS:
            print("Failed to unload model")
        else:
            print("Killed (unloaded) the pipeline.")

        # Disconnect the device
        print(f"Disconnecting the PCIe device: {soc_device}")
        ret = devintf.disconnect(device)
        if ret != SiMaErrorCode.SUCCESS:
            print(f"Failed to disconnect PCIe slot: {soc_device}")
        else:
            print(f"Disconnected the PCIe device: {soc_device}")
if __name__ == "__main__":
    # Command-line entry point: validate the mpk path, then run the
    # connect / deploy / unload / disconnect test against it.
    parser = argparse.ArgumentParser(
        prog='test_simahostpy.py',
        description='Example python script to test connection, deployment of a pipeline, killing the pipeline and disconnection from the PCIe device.',
        epilog='')
    # Required: without it args.f is None and os.path.isfile(None) raises
    # TypeError instead of producing a usable error message.
    parser.add_argument('-f', required=True,
                        help='Path to the mpk file to test with.')
    args = parser.parse_args()

    if not os.path.isfile(args.f):
        print("File not found, please check and try again.")
        # Signal the failure to the calling shell.
        sys.exit(1)

    print("Testing with user provided project.mpk file...")
    run(args.f)
    print("Completed Testing.")