#########################################################
# Copyright (C) 2024 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
"""
@package sima.host
SiMa.Ai's host python interfaces for working with PCIe capable devices
"""
import numpy as np
from typing import Union
from enum import Enum
from sima.impl.implementation_singleton import simaaihostpy_implementation as _simaaihostpy
class BaseEnum(Enum):
    @classmethod
    def values(cls):
        """Return the value of every member of the enumeration."""
        return [member.value for member in cls]
class SiMaErrorCode(BaseEnum):
    SUCCESS = 0
    FAILURE = 1
    INVALID_INPUT = 2
    OVERFLOW = 3
    RETRY = 4

# Maps the integer status returned by the native layer to a SiMaErrorCode.
error_code_to_infer_state = {
    "0": SiMaErrorCode.SUCCESS,
    "1": SiMaErrorCode.FAILURE,
    "2": SiMaErrorCode.INVALID_INPUT,
    "3": SiMaErrorCode.OVERFLOW,
    "4": SiMaErrorCode.RETRY,
}
class intf:
    def __init__(self, dev_name, device_queue, device_timeout):
        self.dev_name = dev_name
        self.device_queue = device_queue
        self.device_timeout = device_timeout
        self.host_helper = _simaaihostpy.HostHelper()
        if self.host_helper is None:
            raise Exception("Couldn't get the device inference interface")
        # Discover all PCIe-attached SiMa devices up front.
        self.guids = self.host_helper.enumerate_device()
    def connect(self, guid, queue_entries=0, queue_depth=0):
        if guid is None:
            raise ValueError("GUID cannot be None, please pass a valid GUID")
        if self.host_helper is None:
            raise Exception("The inference interface is not initialized")
        self.dev_ptr = self.host_helper.open(guid)
        if self.dev_ptr is None:
            raise Exception("Unable to connect to the device")
        self.host_helper.print_slot_number(self.dev_ptr)
        self.host_helper.update_device_defaults(self.dev_ptr,
                                                queue_entries,
                                                queue_depth)
        return self.dev_ptr
    def load_model(self, device,
                   in_shape_list, out_shape_list,
                   metadata,
                   model_path=None, model_hdl: dict = None):
        if device != self.dev_ptr:
            raise Exception("Device mismatch")
        if in_shape_list is None or out_shape_list is None:
            raise ValueError('Shapes of in and out tensors cannot be None')
        self.host_helper.prepare_tensors(in_shape_list, out_shape_list)
        self.host_helper.set_metadata(metadata)
        self.meta_data = metadata
        self.host_helper.set_queue(device, self.device_queue)
        self.host_helper.set_request_timeout(device, self.device_timeout)
        if model_hdl is not None:
            model_def = self.host_helper.set_model_definition(
                model_hdl["in_tensors"],
                model_hdl["out_tensors"],
                model_hdl["in_batch_sz"],
                model_hdl["out_batch_sz"],
                model_hdl["in_shape"],
                model_hdl["out_shape"],
            )
            if model_path is not None:
                self.model_ref = self.host_helper.load(device, model_path, model_def)
            else:
                self.model_ref = self.host_helper.load(device, model_def)
        else:
            self.model_ref = self.host_helper.load(device, model_path)
        if self.model_ref is None:
            raise Exception('Unable to load the model to the device')
        return self.model_ref
    def __update__(self, in_data: Union[np.ndarray, bytes]):
        # Copy the input buffer into the device's input tensor.
        if in_data is None:
            raise Exception('Input data cannot be None')
        self.host_helper.memcpy(in_data, 0)

    def __get_tensor__(self, out_data: Union[np.ndarray, bytes]):
        # Copy the device's output tensor into the caller-provided buffer.
        if out_data is None:
            raise Exception('Output buffer cannot be None')
        self.host_helper.memcpy_tonp(out_data, 0)
    # Runs synchronously by default; pass do_async=True with a callback
    # for asynchronous execution.
    def run_inference(self, model,
                      in_data: Union[np.ndarray, bytes],
                      do_async=False,
                      callback_f=None):
        if model != self.model_ref:
            raise Exception("Model reference mismatch")
        # Update the input tensor before kicking off inference.
        self.host_helper.memcpy(in_data, 0)
        if not do_async:
            ret = self.host_helper.run_inference(model)
        else:
            ret = self.host_helper.run_inference(model, callback_f)
        return error_code_to_infer_state[str(ret.value)]
    def unload_model(self, model):
        if self.model_ref != model:
            raise Exception("Model reference mismatch")
        ret = self.host_helper.unload(model)
        return error_code_to_infer_state[str(ret.value)]
    def disconnect(self, device):
        if device != self.dev_ptr:
            raise Exception("Device handle mismatch")
        ret = self.host_helper.close(device)
        return error_code_to_infer_state[str(ret.value)]

    def device_disconnect(self, device):
        if device != self.dev_ptr:
            raise Exception("Device handle mismatch")
        ret = self.host_helper.disconnect_device(device)
        return error_code_to_infer_state[str(ret.value)]
    def reset(self, device):
        ret = self.host_helper.reset(device)
        return error_code_to_infer_state[str(ret.value)]

    def reset_queue(self, device):
        ret = self.host_helper.reset_queue(device)
        return error_code_to_infer_state[str(ret.value)]

    def open_device(self, device):
        return self.host_helper.open(device)

    def __get_guids__(self):
        return self.guids
    def print_guids(self):
        for i in self.guids:
            print(f"GUID: {i}")