#########################################################
# Copyright (C) 2022 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Ljubomir Papuga
#########################################################
import copy
import itertools
import logging
import os.path
import tempfile
import pickle
import numpy as np
import os
from typing import List, Optional, Dict, Iterable, Tuple, Any
import re
from mppe.mppe import MPPE
from sima_utils.logging import sima_logger
from sima_utils.common import Platform, print_progressbar
from afe import save_awesomenet as _save_awesomenet
from afe import load_awesomenet as _load_awesomenet
from afe.apis._sanitize_errors import sanitize_exceptions as _sanitize_exceptions
from afe.apis._sanitize_errors import sanitize_afe_error as _sanitize_afe_error
from afe.apis.compilation_job_base import GroundTruth
from afe.apis.defines import InputValues, gen1_target, ExceptionFuncType
from afe.apis.release_v1 import create_auxiliary_processing_network, compose_awesomenets
from afe.apis.statistic import Statistic
from afe.apis.transform import Transform
from afe.backends.mpk.interface import generate_mpk_json_file as _generate_mpk_json_file
from afe.backends.mpk.interface import L2CachingMode, TessellateParameters
from afe.core.graph_analyzer.analyzed_results import AnalyzedResultDict
from afe.core.graph_analyzer.graph_analyzer import QuantizedGraphAnalyzer
from afe.core.graph_analyzer.utils import QuantizedGraphAnalyzerMode, Metric
from afe.core.compile_networks import compile_net_to_elf as _compile_net_to_elf
from afe.core.utils import LengthHintedIterable
from afe.driver.passes import evaluation as _evaluation
from afe.ir import attributes as afe_attrs
from afe.ir.defines import Status, InputName, LayerStats, NodeName
from afe.ir.execute import create_node_quant_executor as _create_node_quant_executor
from afe.ir.net import AwesomeNet
from afe.ir.node import node_is_awesomenet, node_is_subgraph, node_is_tuple, node_is_tuple_get_item, node_is_external, \
node_is_ev
from afe.ir.sima_ir import SiMaIR
from afe.ir.tensor_type import TensorType
from afe.devkit.devkit_context_manager import SetupConnection
import devkit_inference_models.utils.connection_params as cp
def _get_data_sparsity(net: AwesomeNet) -> float:
    """
    Estimate the data sparsity of a network.

    The estimate is the fraction of zero-valued entries among all quantized
    convolution weights found in the sub-networks of ``net``: the number of
    zero weights divided by the total number of weights.

    :param net: AwesomeNet whose sub-network nodes are inspected.
    :return: Fraction of zero-valued convolution weights, or 0 when the network
        holds no convolution weights at all.
    """
    total_weights = 0
    nonzero_weights = 0

    for node_name in net.execution_order:
        candidate = net.nodes[node_name]
        # Only nodes wrapping a nested AwesomeNet carry weights of interest.
        if not node_is_awesomenet(candidate):
            continue
        assert node_is_subgraph(candidate)

        subgraph = candidate.ir
        for inner_name in subgraph.execution_order:
            inner_node = subgraph.nodes[inner_name]
            # Every node of the sub-network is expected to be quantized already.
            assert isinstance(inner_node.ir, SiMaIR) and inner_node.ir.quant_attrs is not None
            attrs = inner_node.ir.quant_attrs
            if not isinstance(attrs, afe_attrs.ConvQuantAttrs):
                continue
            weights = attrs.weight_quant_data
            total_weights += weights.size
            nonzero_weights += np.count_nonzero(weights)

    # The total may legitimately be zero (e.g. unit tests without weight-based operators).
    if total_weights > 0:
        return (total_weights - nonzero_weights) / total_weights
    return 0
def _get_error_per_layer(fp32_net: AwesomeNet, quant_net: AwesomeNet,
                         inputs: Iterable[InputValues],
                         metric: Metric, local_feed: bool) -> [List[AnalyzedResultDict]]:
    """
    Compute the per-layer quantization error of a quantized network against its fp32 reference.

    :param fp32_net: Reference floating-point AwesomeNet.
    :param quant_net: Quantized AwesomeNet to compare against the reference.
    :param inputs: Input samples fed to the analyzer for both networks.
    :param metric: Error metric passed to the analyzer.
    :param local_feed: Selects QuantizedGraphAnalyzerMode.local_feed when True,
        QuantizedGraphAnalyzerMode.global_feed otherwise.
    :return: The analyzer's ``analyzed_results`` after the analysis has run.
    """
    # NOTE(review): the return annotation is a list literal, not a type; the caller in this
    # file treats the result as a dictionary. Confirm the intended type and fix the annotation.
    if local_feed:
        feed_mode = QuantizedGraphAnalyzerMode.local_feed
    else:
        feed_mode = QuantizedGraphAnalyzerMode.global_feed

    analyzer = QuantizedGraphAnalyzer(mode=feed_mode)
    analyzer.analyze(fp32_net, quant_net, inputs, metric)
    return analyzer.analyzed_results
def _update_node_layer_stats(net: AwesomeNet, analysis_results: Dict) -> None:
    """
    Attach layer statistics (mean error per layer) to the nodes of a network.

    Nodes that wrap a nested AwesomeNet are processed recursively. For every
    other node that has an entry in the analysis results, the mean of its
    recorded error values is stored through ``node.update_layer_stats``.

    :param net: AwesomeNet whose nodes are updated in place.
    :param analysis_results: Dictionary keyed by metric; each value maps a node
        name to the sequence of error values recorded for that node. Exactly one
        metric is expected.
    """
    metric = list(analysis_results.keys())
    error_values = list(analysis_results.values())

    for n in net.execution_order:
        node = net.nodes[n]
        if node_is_awesomenet(node):
            sub_net: AwesomeNet = node.ir
            _update_node_layer_stats(sub_net, analysis_results)
            continue

        assert len(error_values) == 1, f'Expected len == 1, got {len(error_values)}'
        assert len(metric) == 1, f'Expected len == 1, got {len(metric)}'

        if node.name not in error_values[0]:
            continue
        # Skip tuple nodes, as error values for those nodes are copied from their input nodes.
        # Skip EV/APU nodes, error values are only relevant for MLA nodes.
        if node_is_tuple(node) or node_is_tuple_get_item(node) or node_is_external(node) or node_is_ev(node):
            continue

        err_values = error_values[0][node.name]
        # Average over the node's own error values. The previous code divided by
        # len(error_values), which is always 1 here, and so stored the sum instead of the mean.
        num_values = len(err_values)
        err_value = sum(err_values) / num_values if num_values > 0 else 0
        node.update_layer_stats(LayerStats(metric=metric[0], error_value=err_value))
def _find_elf_file(root_path: str, model_name: str) -> str:
    """
    Locate the single .elf file produced for a model inside a directory.

    A directory entry is considered only when its name starts with
    ``model_name``. The model must have compiled into exactly one ``.elf`` file
    and no ``.so`` files; anything else is reported as an error.

    :param root_path: Path of the directory to search (not searched recursively).
    :param model_name: Name of the model that is compiled into .elf file. It is
        matched literally as a file-name prefix.
    :return: Path of the .elf file (``root_path`` joined with the file name).
    :raises sima_logger.UserFacingException: If the number of matching .elf files
        is not exactly one, if any matching .so file exists, or if the matching
        .elf entry is not a regular file.
    """
    entries = os.listdir(root_path)
    # Match prefix and extension literally. The previous regular expression left the dots and
    # the model name unescaped, so names such as "<model>_notes_elf" were counted as .elf files
    # and model names containing regex metacharacters could never match.
    candidates = [entry for entry in entries if entry.startswith(model_name)]
    elf_files = [os.path.join(root_path, entry) for entry in candidates if entry.endswith(".elf")]
    so_files = [entry for entry in candidates if entry.endswith(".so")]

    # There must be only one .elf file.
    if len(elf_files) != 1 or len(so_files) != 0:
        raise sima_logger.UserFacingException(
            f"Expected only one .elf file for accelerator mode run, got {len(elf_files)} " +
            f".elf and {len(so_files)} .so files"
        )
    if not os.path.isfile(elf_files[0]):
        raise sima_logger.UserFacingException("No .elf file found after SDK compile.")
    return elf_files[0]
def _save_intermediate_results_as_pkl_file(data: dict[NodeName, Any], filename: str) -> None:
    """
    Helper function to dump data to file using pickle package. If data type is bfloat16, it is cast to float32.

    Args:
        data: Dictionary of data to be dumped. Each value is an array or a (possibly nested) tuple of arrays.
        filename: Name of a file in which data would be saved. Missing parent directories are created.
    """
    from ml_kernels.types import bfloat16

    def _convert_bfloat16_to_float32(value: tuple[np.ndarray, ...] | np.ndarray) \
            -> tuple[np.ndarray, ...] | np.ndarray:
        # A node output may be a tuple, so the conversion has to recurse into it.
        if isinstance(value, tuple):
            return tuple(_convert_bfloat16_to_float32(item) for item in value)
        return value.astype("float32") if value.dtype == bfloat16 else value

    # os.makedirs('') raises FileNotFoundError, so only create the parent directory when the
    # file name actually has a directory component.
    dirname = os.path.dirname(filename)
    if dirname:
        os.makedirs(dirname, exist_ok=True)

    # Since pickle does not support bfloat16 type, cast those data to float32.
    saved_data = {name: _convert_bfloat16_to_float32(value) for name, value in data.items()}
    with open(filename, "wb") as fp:
        pickle.dump(saved_data, fp)
class Model:
    """
    A quantized AwesomeNet together with, optionally, its floating-point counterpart.

    The quantized net is what gets executed, saved, compiled and evaluated. The
    floating-point net, when present, is saved alongside it and is required for
    quantization error analysis.
    """
    # Quantized network wrapped by this model.
    _net: AwesomeNet
    # Floating-point counterpart of the quantized network; None when not available.
    _fp32_net: Optional[AwesomeNet]

    def __init__(self, net: AwesomeNet, fp32_net: Optional[AwesomeNet] = None):
        """
        Create a model from a quantized net.

        Args:
            net: Network to wrap. A net that has MLA nodes must have status SIMA_QUANTIZED.
            fp32_net: Optional floating-point counterpart of ``net``.

        Raises:
            UserFacingException: If ``net`` has MLA nodes but is not quantized.
        """
        if net.status != Status.SIMA_QUANTIZED and net.has_mla_nodes():
            error_message = "Model is not quantized, cannot create Model."
            sima_logger.sima_log_error(error_message)
            raise sima_logger.UserFacingException(error_message)
        self._net = net
        self._fp32_net = fp32_net

    @_sanitize_exceptions(ExceptionFuncType.MODEL_EXECUTE)
    def execute(self, inputs: InputValues, *,
                fast_mode: bool = False,
                log_level: int | None = logging.NOTSET,
                keep_layer_outputs: list[NodeName] | str | None = None,
                output_file_path: str | None = None) -> List[np.ndarray]:
        """ Run input data through the quantized model.

        Args:
            inputs: Dictionary of placeholder node names (str) to the input data.
            fast_mode: If True, use a fast implementation of operators. If False, use an implementation that exactly
                matches execution on the MLA.
            log_level: Logging level.
            keep_layer_outputs: List of quantized model layer output names that should be saved. Each element of a
                list must have a valid name, according to the model layer outputs. Iff 'all', all intermediate results
                are saved.
            output_file_path: Location where the layer outputs should be saved. If defined, keep_layer_outputs
                argument must also be provided by the user.

        Returns: Outputs of quantized model.
            Also, saves the requested intermediate results inside output_file_path location.
        """
        with sima_logger.ScopedLogLevel(log_level):
            sima_logger.sima_log_info("Executing quantized net with input data\n\t" +
                                      "\n\t".join([f"{name}: {value.shape}" for name, value in inputs.items()]))

            # Check that keep_layer_outputs has one of the valid types
            assert keep_layer_outputs is None or isinstance(keep_layer_outputs, list) or \
                (isinstance(keep_layer_outputs, str) and keep_layer_outputs == "all"), \
                "Provided value for keep_layer_outputs is not valid"
            # Check if keep_layer_outputs and output_file_path are both defined/undefined
            assert (output_file_path is None) == (keep_layer_outputs is None), \
                "Both keep_layer_outputs and output_file_path must be defined/undefined"

            try:
                # If keep_layer_outputs is provided, intermediate results must be kept
                keep_intermediate_results = keep_layer_outputs is not None

                # All intermediate results are stored iff keep_layer_outputs is equal to 'all'
                if keep_layer_outputs == "all":
                    keep_layer_outputs = [node.name for node in self._net.iter_nodes_recursive()]

                # Dictionary in which the intermediate results will be saved. It is initialized with
                # key: None pairs, where keys are the layer output names given in keep_layer_outputs.
                # When no layer outputs were requested, no dictionary is passed (None).
                node_outputs = dict.fromkeys(keep_layer_outputs, None) if isinstance(keep_layer_outputs, list) \
                    else None

                # Execute model
                output = self._net.run(inputs, node_callable=_create_node_quant_executor(fast_mode),
                                       node_outputs=node_outputs,
                                       keep_intermediate_results=keep_intermediate_results)

                # Dump all intermediate results to provided output_file_path location.
                if keep_intermediate_results:
                    _save_intermediate_results_as_pkl_file(node_outputs, output_file_path)
            except Exception as e:
                _sanitize_afe_error("Error in model execution.", e)
            return output

    @_sanitize_exceptions(ExceptionFuncType.MODEL_SAVE)
    def save(self, model_name: str, output_directory: str = '',
             *, log_level: Optional[int] = logging.NOTSET) -> None:
        """
        Save the quantized model and its floating-point counterpart (if available) to the specified directory.

        Defaults to the current working directory if no output directory is provided. The floating-point
        net is saved under the model name with any ``.sima`` suffix replaced by ``.fp32.sima``.

        Args:
            model_name (str): Name for the saved quantized model file with a `.sima` extension.
            output_directory (str, optional): Directory to save the model files. Created if it does not exist.
                Defaults to the current working directory.
            log_level (Optional[int], optional): Logging level for the operation. Defaults to `logging.NOTSET`.

        Raises:
            UserFacingException: If an error occurs during the save process.

        Example
        ~~~~~~~~
        >>> model = Model(quantized_net, fp32_net)
        >>> model.save("my_model", output_directory="models/")
        """
        with sima_logger.ScopedLogLevel(log_level):
            if output_directory and not os.path.exists(output_directory):
                os.makedirs(output_directory, exist_ok=True)
            _save_awesomenet(self._net, model_name, output_directory)
            if self._fp32_net is not None:
                fp32_name = model_name.removesuffix(".sima") + ".fp32.sima"
                _save_awesomenet(self._fp32_net, fp32_name, output_directory)

    @staticmethod
    @_sanitize_exceptions(ExceptionFuncType.MODEL_LOAD)
    def load(model_name: str, network_directory: str = '',
             *, log_level: Optional[int] = logging.NOTSET) -> "Model":
        """
        Load a model previously written by :meth:`save`.

        The floating-point counterpart is loaded as well when a file named after the model, with any
        ``.sima`` suffix replaced by ``.fp32.sima``, exists in ``network_directory``.

        Args:
            model_name (str): Name of the saved quantized model.
            network_directory (str, optional): Directory holding the saved files. Defaults to ``''``.
            log_level (Optional[int], optional): Logging level for the operation. Defaults to `logging.NOTSET`.

        Returns:
            The loaded Model; its floating-point net is None when no ``.fp32.sima`` file was found.
        """
        with sima_logger.ScopedLogLevel(log_level):
            try:
                net = _load_awesomenet(model_name, network_directory)
                fp32_name = model_name.removesuffix(".sima") + ".fp32.sima"
                if os.path.exists(os.path.join(network_directory, fp32_name)):
                    fp32_net = _load_awesomenet(fp32_name, network_directory)
                else:
                    fp32_net = None
            except Exception as e:
                _sanitize_afe_error("Loading error.", e)
            return Model(net, fp32_net)

    @_sanitize_exceptions(ExceptionFuncType.MODEL_COMPILE)
    def compile(self, output_path: str, batch_size: int = 1, compress: bool = True,
                log_level: Optional[int] = logging.NOTSET,
                tessellate_parameters: Optional[TessellateParameters] = None,
                l2_caching_mode: L2CachingMode = L2CachingMode.NONE, **kwargs) -> None:
        """
        Compile the quantized model into a `.tar.gz` package for deployment in an MPK package.

        The compiled package includes the binary model and a JSON structure file, saved in `output_path` as
        `<model_name>_mpk.tar.gz`. Batch size can be specified, though compiler optimizations may adjust it
        for optimal performance. The first dimension of input tensors must represent batch size.

        Args:
            output_path (str): Directory to save the compiled package. Created if it doesn't exist.
            batch_size (int, optional): Batch size for compilation. Defaults to `1`.
            compress (bool, optional): Enable DRAM data compression for the `.lm` file. Defaults to `True`.
            log_level (Optional[int], optional): Logging level. Defaults to `logging.NOTSET`.
            tessellate_parameters (Optional[TessellateParameters], optional): Internal use for MLA tessellation
                parameters.
            l2_caching_mode (L2CachingMode, optional): Internal use for N2A compiler's L2 caching.
                Defaults to `L2CachingMode.NONE`.
            **kwargs: Additional internal options, including:
                - retained_temporary_directory_name (str): Path to retain intermediate files.
                - use_power_limits (bool): Enable power limits during compilation.
                - max_mla_power (float): Set maximum MLA power consumption.
                - layer_norm_use_fp32_intermediates (bool): Use FP32 intermediates for layer normalization.
                - rms_norm_use_fp32_intermediates (bool): Use FP32 intermediates for RMS normalization.

        Raises:
            UserFacingException: If compilation fails due to invalid parameters or errors.

        Example:
        ~~~~~~~~
        >>> model = Model(quantized_net)
        >>> model.compile(output_path="compiled_models/", batch_size=4, compress=True)
        """
        with sima_logger.ScopedLogLevel(log_level):
            mlc_files_path = kwargs.get('retained_temporary_directory_name')
            use_power_limits = kwargs.get('use_power_limits', False)
            max_mla_power = kwargs.get('max_mla_power')
            layer_norm_use_fp32_intermediates = kwargs.get('layer_norm_use_fp32_intermediates', False)
            rms_norm_use_fp32_intermediates = kwargs.get('rms_norm_use_fp32_intermediates', False)

            if output_path and not os.path.exists(output_path):
                os.makedirs(output_path, exist_ok=True)

            sima_logger.sima_log_info(f"Compiling quantized net \"{self._net.name}\"")
            _generate_mpk_json_file(net=self._net, file_path=output_path, batch_size=batch_size,
                                    compress=compress, tessellate_parameters=tessellate_parameters,
                                    enable_large_tensors=True,
                                    l2_caching_mode=l2_caching_mode,
                                    mlc_files_path=mlc_files_path,
                                    use_power_limits=use_power_limits,
                                    max_mla_power=max_mla_power,
                                    layer_norm_use_fp32_intermediates=layer_norm_use_fp32_intermediates,
                                    rms_norm_use_fp32_intermediates=rms_norm_use_fp32_intermediates)

    @staticmethod
    @_sanitize_exceptions(ExceptionFuncType.MODEL_CREATE_AUXILIARY)
    def create_auxiliary_network(transforms: List[Transform], input_types: Dict[InputName, TensorType], *,
                                 target: Platform = gen1_target,
                                 log_level: Optional[int] = logging.NOTSET) -> "Model":
        """
        Build a Model that applies a sequence of transforms as an auxiliary processing network.

        Two nets are created from the same transforms and input types: one with status SIMA_QUANTIZED,
        used as the model's quantized net, and one with status RELAY, used as its floating-point net.

        Args:
            transforms: Transforms that make up the auxiliary network.
            input_types: Tensor type of each network input, keyed by input name.
            target: Target platform. Only ``gen1_target`` is supported.
            log_level: Logging level. Defaults to `logging.NOTSET`.

        Returns:
            Model wrapping the two auxiliary nets.

        Raises:
            UserFacingException: If ``target`` is not ``gen1_target``.
        """
        with sima_logger.ScopedLogLevel(log_level):
            if target != gen1_target:
                error_message = f"Unsupported target: {target}"
                sima_logger.sima_log_error(error_message)
                raise sima_logger.UserFacingException(error_message)

            # Use str(t): formatting the bound method t.__str__ would log the method's repr
            # instead of the transform's description.
            sima_logger.sima_log_dbg("Creating auxiliary net with transforms\n\t" +
                                     "\n\t".join([str(t) for t in transforms]))
            try:
                aux_net = create_auxiliary_processing_network(transforms, input_types,
                                                              status=Status.SIMA_QUANTIZED,
                                                              target=target)
                aux_net_fp32 = create_auxiliary_processing_network(transforms, input_types,
                                                                   status=Status.RELAY,
                                                                   target=target)
            except Exception as e:
                _sanitize_afe_error("Issue in creating auxiliary net.", e)
            return Model(aux_net, aux_net_fp32)

    @staticmethod
    @_sanitize_exceptions(ExceptionFuncType.MODEL_COMPOSE)
    def compose(nets: List["Model"], combined_model_name: str = 'main',
                log_level: Optional[int] = logging.NOTSET) -> "Model":
        """
        Combine several models into a single Model.

        The quantized nets of all models are composed into one quantized net. The floating-point nets
        are composed as well, provided every model has one; otherwise the resulting Model has no
        floating-point counterpart.

        Args:
            nets: Models to combine.
            combined_model_name: Name given to the composed nets. Defaults to ``'main'``.
            log_level: Logging level. Defaults to `logging.NOTSET`.

        Returns:
            Model wrapping the composed nets.
        """
        with sima_logger.ScopedLogLevel(log_level):
            sima_logger.sima_log_info("Composing a new Model net by combining\n\t" +
                                      "\n\t".join([f"{model._net.name}" for model in nets]))
            composed_net = compose_awesomenets([model._net for model in nets],
                                               status=Status.SIMA_QUANTIZED,
                                               combined_model_name=combined_model_name)

            fp32_nets = [model._fp32_net for model in nets]
            if any(fp32_net is None for fp32_net in fp32_nets):
                # A model without a floating-point net (e.g. one loaded without its .fp32.sima file)
                # cannot contribute to a composed fp32 net; do not hand None entries to the composer.
                sima_logger.sima_log_warning("At least one composed model has no floating-point net; "
                                             "the composed model will not have one either.")
                composed_net_fp32 = None
            else:
                composed_net_fp32 = compose_awesomenets(fp32_nets, status=Status.RELAY,
                                                        combined_model_name=combined_model_name)
            return Model(composed_net, composed_net_fp32)

    @_sanitize_exceptions(ExceptionFuncType.MODEL_EVALUATE)
    def evaluate(self, evaluation_data: Iterable[Tuple[InputValues, GroundTruth]],
                 criterion: Statistic[Tuple[List[np.ndarray], GroundTruth], str], *,
                 fast_mode: bool = False,
                 log_level: Optional[int] = logging.NOTSET) -> str:
        """
        Evaluate the model using the provided evaluation data and criterion.

        This method runs the quantized model on the given dataset and computes an aggregate result
        using the specified criterion.

        :param evaluation_data: An iterable of tuples where each tuple contains input values
            and the corresponding ground truth for evaluation.
        :param criterion: A statistical function used to compute the evaluation metric
            based on the model's outputs and ground truth.
        :param fast_mode: Optional; passed on to the evaluation pass to select its fast
            execution mode. Defaults to False.
        :param log_level: Optional; specifies the logging level for evaluation. Defaults to logging.NOTSET.
        :return: A string representing the final result of the evaluation based on the criterion.
        :raises Exception: If an error occurs during model execution, it is sanitized and re-raised.
        """
        with sima_logger.ScopedLogLevel(log_level):
            ev = _evaluation(criterion, fast_mode=fast_mode)
            try:
                finish = ev(self._net, evaluation_data).run()
            except Exception as e:
                _sanitize_afe_error("Error in model execution.", e)
            return finish

    @_sanitize_exceptions(ExceptionFuncType.QUANTIZATION_ERROR_ANALYSIS)
    def analyze_quantization_error(self,
                                   evaluation_data: Iterable[InputValues],
                                   error_metric: Metric, *,
                                   local_feed: bool,
                                   log_level: Optional[int] = logging.NOTSET):
        """
        Measure the per-layer quantization error and record it on the quantized net's nodes.

        The floating-point and quantized nets are analyzed on the same inputs; the resulting
        per-layer error values are stored on the nodes of the quantized net as layer statistics.

        Args:
            evaluation_data: Input samples used for the analysis.
            error_metric: Error metric; one of Metric.mse, Metric.mae or Metric.psnr.
            local_feed: If True the analyzer runs in local_feed mode, otherwise in global_feed mode.
            log_level: Logging level. Defaults to `logging.NOTSET`.

        Raises:
            UserFacingException: If the model has no floating-point net, or the metric is unsupported.
        """
        with sima_logger.ScopedLogLevel(log_level):
            if self._fp32_net is None:
                error_message = "Cannot analyze quantization error for prequantized model."
                sima_logger.sima_log_error(error_message)
                raise sima_logger.UserFacingException(error_message)

            if error_metric not in (Metric.mse, Metric.mae, Metric.psnr):
                # Keep a space between the two sentences of the message.
                error_message = (f'Unsupported metric: {error_metric}. '
                                 f'Currently supported metrics are: "mse", "mae" and "psnr".')
                sima_logger.sima_log_error(error_message)
                raise sima_logger.UserFacingException(error_message)

            analysis_results = _get_error_per_layer(self._fp32_net, self._net, evaluation_data, error_metric,
                                                    local_feed)
            _update_node_layer_stats(self._net, analysis_results)

    # NOTE(review): this method carries two sanitizer decorators. The MODEL_PERFORMANCE one looks like
    # a leftover from a method that is no longer in this class - confirm whether it should be removed.
    @_sanitize_exceptions(ExceptionFuncType.MODEL_PERFORMANCE)
    @_sanitize_exceptions(ExceptionFuncType.GENERATE_ELF_FILES)
    def generate_elf_and_reference_files(self, input_data: Iterable[InputValues], output_dir: str,
                                         *,
                                         batch_size: int = 1, compress: bool = True,
                                         tessellate_parameters: Optional[TessellateParameters] = None,
                                         log_level: Optional[int] = logging.NOTSET,
                                         l2_caching_mode: L2CachingMode = L2CachingMode.NONE) -> None:
        """
        Compile the model to ELF and run every input sample through the N2A backend runner.

        A deep copy of the quantized net is compiled, so the model itself is left untouched. Files
        produced for the sample at position ``idx`` use the file name postfix ``_{idx}``. Mismatches
        reported by the simulator are logged as warnings and do not stop processing.

        Args:
            input_data: Input samples to run through the compiled net.
            output_dir: Directory receiving the generated files. Created if it does not exist.
            batch_size: Desired batch size for compilation. Defaults to `1`.
            compress: Passed to the compiler as its ``compress`` option. Defaults to `True`.
            tessellate_parameters: Optional tessellation parameters passed to the compiler.
            log_level: Logging level. Defaults to `logging.NOTSET`.
            l2_caching_mode: L2 caching mode passed to the compiler. Defaults to `L2CachingMode.NONE`.

        Raises:
            UserFacingException: If the net is not in compiled state after compilation.
        """
        from afe.backends.mla.afe_to_n2a_compiler.n2a_backend_runner import create_n2a_backend_runner

        with sima_logger.ScopedLogLevel(log_level):
            os.makedirs(output_dir, exist_ok=True)
            quantized_net = copy.deepcopy(self._net)
            try:
                _ = _compile_net_to_elf(quantized_net, output_dir,
                                        desired_batch_size=batch_size, compress=compress,
                                        tessellate_parameters=tessellate_parameters,
                                        l2_caching_mode=l2_caching_mode,
                                        enable_large_tensors=True)
            except Exception as e:
                _sanitize_afe_error("Compiling to elf failed.", e)

            if quantized_net.status != Status.BACKEND_IR_COMPILED:
                raise sima_logger.UserFacingException("User must compile the net before running this function.")

            for idx, in_data in enumerate(input_data):
                try:
                    backend_runner = create_n2a_backend_runner(quantized_net, out_dir=output_dir,
                                                               batch_size=batch_size,
                                                               file_name_postfix=f"_{idx}",
                                                               report_sim_failure=self._log_chk_failure_warning)
                    _ = quantized_net.run_batch(in_data, node_callable=backend_runner.execute_node)
                except Exception as e:
                    _sanitize_afe_error("Generate elf and reference files failed.", e)

    # NOTE(review): tagged with GENERATE_ELF_FILES, same as generate_elf_and_reference_files -
    # confirm this is the intended ExceptionFuncType for accelerator-mode execution.
    @_sanitize_exceptions(ExceptionFuncType.GENERATE_ELF_FILES)
    def execute_in_accelerator_mode(self, input_data: Iterable[InputValues], devkit: str,
                                    *,
                                    username: str = cp.DEFAULT_USERNAME, password: str = '',
                                    batch_size: int = 1, compress: bool = True,
                                    tessellate_parameters: Optional[TessellateParameters] = None,
                                    log_level: Optional[int] = logging.NOTSET,
                                    l2_caching_mode: L2CachingMode = L2CachingMode.NONE) -> List[np.ndarray]:
        """
        Compile the model to a single .elf file and execute it on a devkit in accelerator mode.

        The net is compiled into a temporary directory, a connection to the devkit is set up with the
        resulting .elf file, and every input sample is run through the accelerator pipeline. A progress
        bar is printed when the number of samples is known (``input_data`` is a LengthHintedIterable).

        Args:
            input_data: Input samples to execute.
            devkit: Devkit passed to the connection setup - presumably its network address; verify
                against SetupConnection.
            username: User name for the devkit connection. Defaults to ``cp.DEFAULT_USERNAME``.
            password: Password for the devkit connection. Defaults to ``''``.
            batch_size: Desired batch size for compilation. Defaults to `1`.
            compress: Passed to the compiler as its ``compress`` option. Defaults to `True`.
            tessellate_parameters: Optional tessellation parameters passed to the compiler.
            log_level: Logging level. Defaults to `logging.NOTSET`.
            l2_caching_mode: L2 caching mode passed to the compiler. Defaults to `L2CachingMode.NONE`.

        Returns:
            One entry per input sample, each holding the result of running that sample.

        Raises:
            UserFacingException: If the net is not in compiled state after compilation, or if the
                model did not compile into exactly one .elf file.
        """
        from afe.backends.mla.afe_to_n2a_compiler.n2a_backend_runner import create_n2a_backend_runner, RunMode

        with sima_logger.ScopedLogLevel(log_level):
            with tempfile.TemporaryDirectory() as tmp_dirname:
                # NOTE(review): unlike generate_elf_and_reference_files, this compiles self._net itself
                # rather than a deep copy - confirm that changing the model's net here is intended.
                try:
                    print(f"Compiling model {self._net.name} to .elf file")
                    _ = _compile_net_to_elf(self._net, tmp_dirname,
                                            desired_batch_size=batch_size, compress=compress,
                                            tessellate_parameters=tessellate_parameters,
                                            l2_caching_mode=l2_caching_mode,
                                            enable_large_tensors=True,
                                            do_pack=False)
                except Exception as e:
                    _sanitize_afe_error("Compiling to elf failed.", e)

                if self._net.status != Status.BACKEND_IR_COMPILED:
                    raise sima_logger.UserFacingException("User must compile the net before running this function.")

                # Find ELF file path. _find_elf_file either returns a path or raises, so no
                # further None check is needed.
                elf_output = _find_elf_file(tmp_dirname, model_name=self._net.name)

                with SetupConnection(cp.DEFAULT_PORT, username, devkit, password,
                                     cp.MAX_ATTEMPTS, elf_output, cp.ELF_FOLDER, cp.DEFAULT_ZMQ_PORT) as conn:
                    accel_pipeline = conn.get_accel_pipeline()
                    output = []
                    print("Executing model graph in accelerator mode:")

                    # The sample count is only known for length-hinted iterables; 0 disables the progress bar.
                    nsample = 0
                    if isinstance(input_data, LengthHintedIterable):
                        nsample = input_data.get_length()

                    for idx, in_data in enumerate(input_data):
                        try:
                            backend_runner = create_n2a_backend_runner(self._net, out_dir=tmp_dirname,
                                                                       batch_size=batch_size,
                                                                       file_name_postfix=f"_{idx}",
                                                                       run_mode=RunMode.ACCELERATOR,
                                                                       accel_pipeline=accel_pipeline)
                            out = self._net.run_batch(in_data, node_callable=backend_runner.execute_node)
                            output.append(out)
                            if nsample != 0:
                                # Known input size; show progress
                                print_progressbar(idx + 1, nsample, "Progress:",
                                                  "Complete. {}/{}".format(idx + 1, nsample),
                                                  length=30, print_end="")
                        except Exception as e:
                            _sanitize_afe_error("Executing in accelerator mode failed.", e)
            return output

    @staticmethod
    def _log_chk_failure_warning(location: str) -> None:
        """
        Log a warning about a simulator mismatch found while verifying compiled code.

        Args:
            location: Where the mismatch was detected; included in the warning text.
        """
        # Log a warning, but allow compiler to proceed.
        sima_logger.sima_log_warning(f"Simulator detected a mismatch when verifying compiled code in {location}. "
                                     "This may be caused by a numerical error in the compiler.")