#########################################################
# Copyright (C) 2022 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Nenad Nikolic
#########################################################
import copy
import dataclasses
import itertools
import os
from typing import Tuple, Dict, TypeVar, Callable, Optional, List, Union
from enum import Enum
from dataclasses import dataclass
import os
from sima_utils.logging import sima_logger
from afe._tvm._defines import TVMIRModule
from afe.ir.defines import NodeName, InputShape
from afe.ir.tensor_type import ScalarType
from afe.load.importers.tensorflow import import_tensorflow_pb_to_tvm, import_tensorflow2_pb_to_tvm
from afe.load.importers.tflite import import_tflite_to_tvm
from afe.load.importers.pytorch import import_pytorch_to_tvm
from afe.load.importers.onnx import import_onnx_to_tvm
from afe.load.importers.keras import import_keras_to_tvm
from afe.load.importers.caffe import import_caffe_to_tvm
from afe.load.importers.caffe2 import import_caffe2_to_tvm
_A = TypeVar('_A')
[docs]
TensorShape = Tuple[int, ...]
[docs]
NodeConversionMapping = Dict[str, Callable]
# Type of layout for a model is catogorized by channel position.
# CHANNEL-FIRST layout: 'NCHW' for 4D or 'NCDHW' for 5D.
# CHANNEL-LAST layout: 'NHWC' for 4D or 'NDHWC' for 5D.
_DEFAULT_LAYOUT_TABLE: Dict[ModelFormat, str] = {
ModelFormat.tensorflow: 'CHANNEL-LAST',
ModelFormat.tflite: 'CHANNEL-LAST',
ModelFormat.keras: 'CHANNEL-FIRST',
ModelFormat.pytorch: 'CHANNEL-FIRST',
ModelFormat.onnx: 'CHANNEL-FIRST',
ModelFormat.caffe2: 'CHANNEL-FIRST',
ModelFormat.caffe: 'CHANNEL-FIRST'
}
[docs]
def make_model_name_from_path(path: str) -> str:
"""
Create a model name from the name of the file the model was loaded from.
This is used in functions that do not take the model name as user input.
"""
return os.path.splitext(os.path.basename(path))[0]
[docs]
def default_layout(model_format: ModelFormat, ndim: int = 4) -> str:
"""
Get the default layout for a model format. This is the data layout that
the format uses by convention. Returns either 4D or 5D layout.
"""
assert ndim <= 5
channel_format = _DEFAULT_LAYOUT_TABLE[model_format]
layout = ['N', 'H', 'W'] if ndim <= 4 else ['N', 'D', 'H', 'W']
if channel_format == 'CHANNEL-FIRST':
layout.insert(1, 'C')
else:
layout.append('C')
return ''.join(layout)
def _reconstruct_dict(names: Optional[List[str]], values: Union[None, InputMapping[_A], List[_A]],
names_name: str, values_name: str) -> InputMapping[_A]:
"""
Convert values to a dict. If it is a list, create a dict using names as the dict's keys.
This is a helper function for retrieving data from ImporterParams. The parameters match the
types of ImporterParams's fields.
"""
assert values is not None, values_name + " cannot be None, make sure they are initialized"
if isinstance(values, list):
assert names is not None, names_name + " cannot be None, make sure they are initialized"
if len(names) != len(values):
raise ValueError(f"{names_name} and {values_name} must be the same length, {len(names)} and "
f"{len(values)}")
return {NodeName(name): shape for name, shape in zip(names, values)}
else:
assert isinstance(values, dict)
return values
@dataclass(frozen=True)
[docs]
class ImporterParams:
[docs]
output_names: Optional[List[str]] = None
[docs]
layout: Optional[str] = None
[docs]
custom_convert_map: Optional[NodeConversionMapping] = None
@property
[docs]
def shape_dict(self) -> Dict[NodeName, InputShape]:
return _reconstruct_dict(self.input_names, self.input_shapes, "input_names", "input_shapes")
@property
[docs]
def dtype_dict(self) -> Dict[NodeName, ScalarType]:
return _reconstruct_dict(self.input_names, self.input_types, "input_names", "input_types")
[docs]
def import_from_import_params(import_params: ImporterParams) -> Tuple[TVMIRModule, Optional[List[str]]]:
"""
Import a module into TVM.
:param import_params: Parameters specifying what to import
:return: The imported module and its outputs' names.
The output names are only returned if the source model format has output names.
"""
from afe.tvm_converter.custom_convert_maps import CUSTOM_CONVERT_MAP_DICT
if import_params.input_names is not None:
if not len(import_params.input_names):
raise sima_logger.UserFacingException("List of input tensor names is empty")
if any(name == "" for name in import_params.input_names):
raise sima_logger.UserFacingException("Input tensor name cannot be an empty string")
if import_params.output_names is not None:
if not len(import_params.output_names):
raise sima_logger.UserFacingException("List of output tensor names is empty")
if any(name == "" for name in import_params.output_names):
raise sima_logger.UserFacingException("Output tensor name cannot be an empty string")
if import_params.format == ModelFormat.tensorflow:
assert import_params.output_names is not None, "output_names is not specified in import params"
assert import_params.layout is not None, "layout is not specified in import params"
return import_tensorflow_pb_to_tvm(import_params.file_paths[0], import_params.shape_dict,
import_params.output_names, import_params.layout,
import_params.custom_convert_map), import_params.output_names
elif import_params.format == ModelFormat.tensorflow2:
assert import_params.output_names is not None, "output_names is not specified in import params"
assert import_params.layout is not None, "layout is not specified in import params"
return import_tensorflow2_pb_to_tvm(import_params.file_paths[0], import_params.shape_dict,
import_params.output_names, import_params.layout,
import_params.custom_convert_map), import_params.output_names
elif import_params.format == ModelFormat.tflite:
return (import_tflite_to_tvm(import_params.file_paths[0], import_params.shape_dict, import_params.dtype_dict),
None)
elif import_params.format == ModelFormat.keras:
assert import_params.layout is not None, "layout is not specified in import params"
return (import_keras_to_tvm(import_params.file_paths[0], import_params.shape_dict, import_params.layout),
None)
elif import_params.format == ModelFormat.pytorch:
assert import_params.input_names is not None, "input_names is not specified in import params"
assert import_params.input_shapes is not None, "input_shapes is not specified in import params"
return (import_pytorch_to_tvm(import_params.file_paths[0], import_params.input_names, import_params.input_shapes,
None, import_params.custom_convert_map),
None)
elif import_params.format == ModelFormat.onnx:
# Load ModelSDK's built-in ONNX custom convert map. Copy the built-in custom convert
# map and the user provided custom convert map to avoid polluting it.
built_in_convert_map = CUSTOM_CONVERT_MAP_DICT["ONNX"]
if import_params.custom_convert_map:
# Error if there is both a user conversion and built-in conversion, as we can't combine them.
shared_keys = set(built_in_convert_map.keys()).intersection(import_params.custom_convert_map.keys())
assert not shared_keys, "Cannot override importer for operators " + " ".join(shared_keys)
custom_convert_map = dict(itertools.chain(import_params.custom_convert_map.items(),
built_in_convert_map.items()))
else:
custom_convert_map = copy.deepcopy(built_in_convert_map)
return import_onnx_to_tvm(import_params.file_paths[0], import_params.shape_dict, import_params.dtype_dict,
custom_convert_map)
elif import_params.format == ModelFormat.caffe:
assert len(import_params.file_paths) == 2, "Caffe model should have 2 paths, .caffemodel and .prototxt file"
return (import_caffe_to_tvm(import_params.file_paths[0], import_params.file_paths[1], import_params.shape_dict,
import_params.dtype_dict), None)
elif import_params.format == ModelFormat.caffe2:
assert len(import_params.file_paths) == 2, "Caffe2 model should have 2 paths, both in .pb format"
return (import_caffe2_to_tvm(import_params.file_paths[0], import_params.file_paths[1], import_params.shape_dict,
import_params.dtype_dict), None)
else:
raise ValueError(f"Model format {import_params.format} is wrong, probably due to wrong format detection.")
[docs]
def import_model_to_tvm(format: Optional[ModelFormat] = None, file_paths: List[str] = [],
input_names: Optional[List[str]] = None, output_names: Optional[List[str]] = None,
input_types: Union[None, InputMapping[ScalarType], List[ScalarType]] = None,
input_shapes: Union[None, InputMapping[TensorShape], List[TensorShape]] = None,
layout: Optional[str] = None,
custom_convert_map: Optional[NodeConversionMapping] = None):
"""
General-purpose import function that wraps the other importers and is permissive in what inputs it takes.
"""
assert len(file_paths) > 0, "File path(s) must be specified"
if format is None:
format = detect_format(file_paths)
else:
try:
format = ModelFormat[format]
except Exception as e:
print(e)
print("Make sure that format is in supported ModelFormat values")
import_params = ImporterParams(format, file_paths, input_names, output_names, input_types, input_shapes,
layout, custom_convert_map)
return import_from_import_params(import_params)
[docs]
def onnx_source(model_path: str, shape_dict: Dict[str, Tuple[int, ...]],
dtype_dict: Dict[str, ScalarType]) -> ImporterParams:
"""
Construct ImporterParams with required values for importing an ONNX model.
:param model_path: Path to the model file.
:param shape_dict: Dictionary with names and shapes of the model's input tensors.
:param dtype_dict: Dictionary with names and scalar numeric types of the model's input tensors.
:return: ImporterParams.
"""
assert shape_dict.keys() == dtype_dict.keys()
input_names = list()
input_shape = list()
input_type = list()
for k, v in shape_dict.items():
input_names.append(k)
input_shape.append(v)
for v in dtype_dict.values():
input_type.append(v)
# We don't support mixed dimensions, so all inputs must have same dimension.
assert all(len(x) == len(input_shape[0]) for x in input_shape)
ndim = len(input_shape[0])
layout = default_layout(ModelFormat.onnx, ndim)
ip = ImporterParams(format=ModelFormat.onnx, file_paths=[model_path], input_names=input_names,
input_shapes=input_shape, input_types=input_type, layout=layout)
return ip
[docs]
def pytorch_source(model_path: str, input_names: List[str], input_shapes: List[Tuple[int, ...]]) \
-> ImporterParams:
"""
Construct ImporterParams with required values for importing a PyTorch model.
:param model_path: Path to the model file.
:param input_names: Names of the model's input tensors.
:param input_shapes: Shapes of the model's input tensors.
:return: ImporterParams.
"""
# We don't support mixed dimensions, so all inputs must have same dimension.
assert all(len(x) == len(input_shapes[0]) for x in input_shapes)
ndim = len(input_shapes[0])
layout = default_layout(ModelFormat.pytorch, ndim)
ip = ImporterParams(format=ModelFormat.pytorch, file_paths=[model_path], input_names=input_names,
input_shapes=input_shapes, layout=layout)
return ip
[docs]
def tensorflow_source(model_path: str, shape_dict: Dict[str, Tuple[int, ...]],
output_names: List[str]) -> ImporterParams:
"""
Construct ImporterParams with required values for importing a Tensorflow model.
:param model_path: Path to the model file.
:param shape_dict: Dictionary with names and shapes of the model's input tensors.
:param output_names: Names of the model's output tensors.
:return: ImporterParams.
"""
input_names = list()
input_shape = list()
for k, v in shape_dict.items():
input_names.append(k)
input_shape.append(v)
# We don't support mixed dimensions, so all inputs must have same dimension.
assert all(len(x) == len(input_shape[0]) for x in input_shape)
ndim = len(input_shape[0])
layout = default_layout(ModelFormat.tensorflow, ndim)
ip = ImporterParams(format=ModelFormat.tensorflow, file_paths=[model_path], input_names=input_names,
input_shapes=input_shape, output_names=output_names, layout=layout)
return ip
[docs]
def tensorflow2_source(model_path: str, shape_dict: Dict[str, Tuple[int, ...]],
output_names: List[str]) -> ImporterParams:
"""
Construct ImporterParams with required values for importing a Tensorflow model.
:param model_path: Path to the model file.
:param shape_dict: Dictionary with names and shapes of the model's input tensors.
:param output_names: Names of the model's output tensors.
:return: ImporterParams.
"""
input_names = list()
input_shape = list()
for k, v in shape_dict.items():
input_names.append(k)
input_shape.append(v)
# We don't support mixed dimensions, so all inputs must have same dimension.
assert all(len(x) == len(input_shape[0]) for x in input_shape)
ndim = len(input_shape[0])
layout = default_layout(ModelFormat.tensorflow, ndim)
ip = ImporterParams(format=ModelFormat.tensorflow2, file_paths=[model_path], input_names=input_names,
input_shapes=input_shape, output_names=output_names, layout=layout)
return ip
[docs]
def tflite_source(model_path: str, shape_dict: Dict[str, Tuple[int, ...]],
dtype_dict: Dict[str, ScalarType]) -> ImporterParams:
"""
Construct ImporterParams with required values for importing a TFLite model.
:param model_path: Path to the model file.
:param shape_dict: Dictionary with names and shapes of the model's input tensors.
:param dtype_dict: Dictionary with names and scalar numeric types of the model's input tensors.
:return: ImporterParams.
"""
assert shape_dict.keys() == dtype_dict.keys()
input_names = list()
input_shape = list()
input_types = list()
for k, v in shape_dict.items():
input_names.append(k)
input_shape.append(v)
for v in dtype_dict.values():
input_types.append(v)
# We don't support mixed dimensions, so all inputs must have same dimension.
assert all(len(x) == len(input_shape[0]) for x in input_shape)
ndim = len(input_shape[0])
layout = default_layout(ModelFormat.tflite, ndim)
ip = ImporterParams(format=ModelFormat.tflite, file_paths=[model_path], input_names=input_names,
input_shapes=input_shape, input_types=input_types, layout=layout)
return ip
[docs]
def keras_source(model_path: str, shape_dict: Dict[str, Tuple[int, ...]], layout: str) -> ImporterParams:
"""
Construct ImporterParams with required values for importing a Keras model.
:param model_path: Path to the model file.
:param shape_dict: Dictionary with names and shapes of the model's input tensors.
:param layout: Keras can be in both channel-first and channel-last layout.
:return: ImporterParams.
"""
input_names = list()
input_shape = list()
for k, v in shape_dict.items():
input_names.append(k)
input_shape.append(v)
# We don't support mixed dimensions, so all inputs must have same dimension as specified by the layout.
assert all(len(x) == len(layout) for x in input_shape)
ip = ImporterParams(format=ModelFormat.keras, file_paths=[model_path], input_names=input_names,
input_shapes=input_shape, layout=layout)
return ip
[docs]
def caffe_source(prototxt_file_path: str, caffemodel_file_path: str,
shape_dict: Dict[str, Tuple[int, ...]], dtype_dict: Dict[str, ScalarType]) -> ImporterParams:
"""
Construct ImporterParams with required values for importing a Caffe model.
:param prototxt_file_path: Prototxt file path.
:param caffemodel_file_path: Caffe model file path.
:param shape_dict: Dictionary with names and shapes of the model's input tensors.
:param dtype_dict: Dictionary with names and scalar numeric types of the model's input tensors.
:return: ImporterParams.
"""
assert shape_dict.keys() == dtype_dict.keys()
input_names = list()
input_shape = list()
input_types = list()
for k, v in shape_dict.items():
input_names.append(k)
input_shape.append(v)
for v in dtype_dict.values():
input_types.append(v)
# We don't support mixed dimensions, so all inputs must have same dimension.
assert all(len(x) == len(input_shape[0]) for x in input_shape)
ndim = len(input_shape[0])
file_paths = [prototxt_file_path, caffemodel_file_path]
layout = default_layout(ModelFormat.caffe, ndim)
ip = ImporterParams(format=ModelFormat.caffe, file_paths=file_paths, input_names=input_names,
input_shapes=input_shape, input_types=input_types, layout=layout)
return ip
[docs]
def caffe2_source(init_net_file_path: str, predict_net_file_path: str,
shape_dict: Dict[str, Tuple[int, ...]], dtype_dict: Dict[str, ScalarType]) -> ImporterParams:
"""
Construct ImporterParams with required values for importing a Caffe2 model.
:param init_net_file_path:
:param predict_net_file_path:
:param shape_dict: Dictionary with names and shapes of the model's input tensors.
:param dtype_dict: Dictionary with names and scalar numeric types of the model's input tensors.
:return: ImporterParams.
"""
assert shape_dict.keys() == dtype_dict.keys()
input_names = list()
input_shape = list()
input_types = list()
for k, v in shape_dict.items():
input_names.append(k)
input_shape.append(v)
for v in dtype_dict.values():
input_types.append(v)
# We don't support mixed dimensions, so all inputs must have same dimension.
assert all(len(x) == len(input_shape[0]) for x in input_shape)
ndim = len(input_shape[0])
file_paths = [init_net_file_path, predict_net_file_path]
layout = default_layout(ModelFormat.caffe2, ndim)
ip = ImporterParams(format=ModelFormat.caffe2, file_paths=file_paths, input_names=input_names,
input_shapes=input_shape, input_types=input_types, layout=layout)
return ip