#########################################################
# Copyright (C) 2022 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
import numpy as np
from typing import Dict, List, Union, Any, Optional, ClassVar, Iterable
from afe.apis.defines import InputValues
from sima_utils.data.data_generator import DataGenerator
from sima_utils.logging import sima_logger
from afe.core.graph_analyzer.analyzed_results import AnalyzedResultDict
from afe.core.graph_analyzer.utils import QuantizedGraphAnalyzerMode, Metric, get_metric_func
from afe.ir.operations import PlaceholderOp, TupleOp, TupleGetItemOp, QuantizationTransformOp, \
DequantizationTransformOp, RequantizeOp, CastOp
from afe.ir.defines import Status, TensorFormat, DataValue, Quantization, data_value_elements, NodeName, InputName, \
map_data_value
from afe.ir.execute import execute_node, execute_node_quant
from afe.ir.net import AwesomeNet
from afe.ir.node import node_is_awesomenet, AwesomeNode, get_node_inputs, node_is_sima_ir
from afe.ir.quantization_utils import dequantize_value, quantize_value
def _node_is_excluded(node: AwesomeNode) -> bool:
"""
Return True if the node is excluded from analysis.
Nodes are excluded if there is no reliable way to identify
corresponding nodes before and after quantization.
"""
if not node_is_sima_ir(node):
return True
# Any node that may be inserted by quantization is excluded from analysis
return isinstance(node.ir.operation, (PlaceholderOp, TupleOp, TupleGetItemOp,
QuantizationTransformOp, DequantizationTransformOp, RequantizeOp, CastOp))
class BaseGraphAnalyzer:
"""
Base class for GraphAnalyzer. Provides a property that exposes the analyzed
results and a setter that prevents users from directly modifying the
analyzed results.
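Example
-------
A minimal usage sketch; ``analyzer`` is assumed to be an instance of a concrete
subclass on which analyze() has already been called.
.. code-block:: python
# Read-only access to the results computed by analyze()
results = analyzer.analyzed_results
# Direct assignment is rejected; only analyze() can update the results
analyzer.analyzed_results = {}  # prints a message and leaves the results unchanged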
"""
_analyzed_results: Optional[Dict[Metric, AnalyzedResultDict]] = None
_SUPPORTED_METRICS: ClassVar[List[Metric]]
_default_metrics: ClassVar[List[Metric]]
def _normalize_and_validate_metrics(self, metrics: Optional[Union[Metric, List[Metric]]]) -> List[Metric]:
"""
Normalize the metrics argument to a list and verify that every metric is supported.
:param metrics: A single Metric, a list of Metrics, or None to use the default metrics.
:return: List[Metric]. The normalized list of metrics.
"""
# Fall back to the default metrics if none were given
if metrics is None:
metrics = self._default_metrics
# Make sure metrics is a list
if not isinstance(metrics, list):
metrics = [metrics]
if not all(metric in self._SUPPORTED_METRICS for metric in metrics):
err_msg = (f"{self.__class__.__name__} only supports {self._SUPPORTED_METRICS} metrics."
f" Got {metrics}")
raise ValueError(err_msg)
return metrics
def _dequantize_intermediates(self,
net: AwesomeNet,
intermediates: Dict[str, TensorFormat]) -> None:
"""
Dequantize intermediates dictionary inplace. Use the given AwesomeNet
to extract quantization parameters for each node and use them to
dequantize the corresponding intermediates.
Remove nodes that are excluded from analysis.
Parameters
----------
:param net: AwesomeNet. The quantized AwesomeNet that contains quantization
parameters for each node.
:param intermediates: TensorFormat. The dictionary contains intermediates for
each node
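Example
-------
An illustrative sketch of the per-node step this method performs; ``node`` stands
for one node of the given net whose output is stored in ``intermediates``.
.. code-block:: python
quant = map_data_value(lambda x: x.quant, node.ir.calib_attrs.quant)
intermediates[node.name] = dequantize_value(intermediates[node.name], quant)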
"""
assert net.status == Status.SIMA_QUANTIZED, \
f"Please make sure the AwesomeNet is quantized. Got status = {net.status}"
for node in net.iter_nodes_recursive():
node_name = node.name
if node_is_awesomenet(node):
# Use the output node's quant_attrs as the sub-graph's quant_attrs
calib_attrs = node.ir.nodes[node.ir.output_node_name].ir.calib_attrs
else:
# Node contains a SiMa IR
calib_attrs = node.ir.calib_attrs
if _node_is_excluded(node):
intermediates.pop(node_name, None)
continue
intermediates[node_name] = \
dequantize_value(intermediates[node_name], map_data_value(lambda x: x.quant, calib_attrs.quant))
def _execute_net(self,
net: AwesomeNet,
inputs: Dict[str, np.ndarray],
dequantize_intermediate: bool = False,
) -> Dict[str, TensorFormat]:
"""
Execute the given AwesomeNet using the given inputs.
Save the intermediates and return it.
This function returns all intermediates, even if they would be excluded from analysis.
Parameters
----------
:param net: AwesomeNet. The AwesomeNet that will be executed
:param inputs: Dict[str, TensorFormat]. Input dictionary that will be fed into
the given AwesomeNet
:param dequantize_intermediate: bool. Default is False. If True, will dequantize
each intermediate results to float
Return
------
:return: Dict[str, TensorFormat]. A dictionary contains intermediates for each node
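Example
-------
A minimal sketch of a typical call; ``net_quantized`` and ``sample_inputs`` are
hypothetical placeholders for a quantized AwesomeNet and its input dictionary.
.. code-block:: python
intermediates = self._execute_net(net_quantized, sample_inputs,
dequantize_intermediate=True)
# intermediates maps each node name to its (dequantized) output value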
"""
intermediates = {node.name: None for node in net.iter_nodes_recursive()}
node_callable = execute_node_quant if net.status == Status.SIMA_QUANTIZED else execute_node
net.run(inputs, node_outputs=intermediates, keep_intermediate_results=True, node_callable=node_callable)
if dequantize_intermediate:
self._dequantize_intermediates(net, intermediates)
return intermediates
@property
def analyzed_results(self) -> Optional[Dict[Metric, AnalyzedResultDict]]:
return self._analyzed_results
@analyzed_results.setter
def analyzed_results(self, _: Any):
print("analyzed_results can only be changed by running the analyze() method.")
def print_analysis_results(self):
"""
Print the analyzed results for each metric and each analyzed layer.
"""
assert self._analyzed_results is not None, "No analysis results found. Please run the analyze() method."
for metric, metric_results in self._analyzed_results.items():
print(f"Analysis results using metric {metric}:")
for node, res in metric_results.items():
print(f"Layer {node}, {metric} = {res}")
##############################
# Intermediates Graph Analyzer
##############################
##########################
# Quantized Graph Analyzer
##########################
def _remove_excluded_nodes(intermediates: Dict[NodeName, Any], net: AwesomeNet) -> None:
"""
Given intermediates that were obtained by calling AwesomeNet.run,
remove intermediates of nodes that are excluded from analysis.
:param intermediates: Intermediates produced by executing the graph. It will be modified.
:param net: Net that was used to produce intermediates
"""
for node in net.iter_nodes_recursive():
if _node_is_excluded(node):
intermediates.pop(node.name, None)
class QuantizedGraphAnalyzer(BaseGraphAnalyzer):
"""
Given two AwesomeNets, net_reference and net_quantized, execute them and save the intermediates.
QuantizedGraphAnalyzer supports two different modes:
* QuantizedGraphAnalyzerMode.global_feed:
Execute both the fp32 AwesomeNet and the quantized AwesomeNet using the same inputs.
Compare the intermediates of both AwesomeNets and calculate the targeted metrics.
* QuantizedGraphAnalyzerMode.local_feed:
Execute the fp32 AwesomeNet, then execute the given quantized AwesomeNet using the
intermediates from the fp32 AwesomeNet. When executing each node in the quantized
AwesomeNet, instead of using the output from its previous node(s), it uses the
intermediates from the fp32 AwesomeNet. Each input to the node will be quantized to
int8 before execution. Compare the intermediates of both AwesomeNets and calculate
the targeted metrics.
Use the intermediates to calculate the target metrics. The following metrics are
currently supported:
* Metric.mae - mean absolute error
* Metric.mse - mean squared error
* Metric.psnr - peak signal-to-noise ratio
The metrics will be saved in self.analyzed_results.
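The code block below is an illustrative sketch of what each metric measures, written
as plain NumPy expressions over two hypothetical intermediate arrays v_quant and
v_ref; the actual implementations are obtained through get_metric_func.
.. code-block:: python
mae = np.mean(np.abs(v_quant - v_ref))           # Metric.mae
mse = np.mean((v_quant - v_ref) ** 2)            # Metric.mse
psnr = 10 * np.log10(np.max(v_ref) ** 2 / mse)   # Metric.psnr (one common form)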
Example
-------
The following code block analyzes two AwesomeNets using the MAE and PSNR metrics. The first
AwesomeNet is un-quantized (fp32) and the second AwesomeNet is quantized.
.. code-block:: python
inputs = get_input_generator()
mode = QuantizedGraphAnalyzerMode.local_feed
metrics = [Metric.mae, Metric.psnr]
analyzer = QuantizedGraphAnalyzer(mode)
analyzer.analyze(net_reference, net_quantized, inputs, metrics)
analyzed_results = analyzer.analyzed_results
Parameters
----------
:param analyzed_results: Dict[Metric, AnalyzedResultDict]. The analyzed results in a
dictionary format. For each metric, each analyzed layer maps to a list of
metric results, one per input data point. The structure is:
{
METRIC_0_NAME: {
LAYER_NAME_0: [metric_result_0_0, metric_result_0_1, ...],
LAYER_NAME_1: [metric_result_1_0, metric_result_1_1, ...],
...
},
METRIC_1_NAME: {
LAYER_NAME_0: [metric_result_0_0, metric_result_0_1, ...],
LAYER_NAME_1: [metric_result_1_0, metric_result_1_1, ...],
...
}
}
"""
mode: QuantizedGraphAnalyzerMode
_SUPPORTED_METRICS: ClassVar[List[Metric]] = [Metric.mae, Metric.mse, Metric.psnr]
_default_metrics: ClassVar[List[Metric]] = [Metric.psnr]
def __init__(self, mode: QuantizedGraphAnalyzerMode = QuantizedGraphAnalyzerMode.global_feed):
assert isinstance(mode, QuantizedGraphAnalyzerMode), \
f"Got unsupported mode {mode} in QuantizedGraphAnalyzer"
self.mode = mode
@staticmethod
def _read_node_inputs(node: AwesomeNode,
net_reference: AwesomeNet,
ref_node_intermediates: Dict[str, TensorFormat],
) -> Optional[Dict[str, TensorFormat]]:
"""
Reads the node inputs from a dictionary of intermediate values obtained by running the
reference floating-point network. Quantizes the inputs as needed so they can be passed
as inputs of the node.
:param node: Quantized node whose inputs should be obtained. This node normally
belongs to a network that was created by quantizing net_reference.
:param net_reference: The reference AwesomeNet, which was executed to compute ref_node_intermediates.
:param ref_node_intermediates: ref_node_intermediates[n] is the output of executing the reference
AwesomeNet node named n.
:return: A dictionary containing node inputs. None if the node was not found.
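Example
-------
An illustrative sketch of the lookup-and-quantize step; the names mirror the
method body rather than an external API.
.. code-block:: python
reference_node = net_reference.nodes[node.name]
float_inputs = get_node_inputs(reference_node, ref_node_intermediates)
input_quant = node.ir.calib_attrs.input_quant
quantized_inputs = {name: quantize_value(value, map_data_value(lambda q: q.quant, input_quant[name]))
for name, value in float_inputs.items()}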
"""
assert not _node_is_excluded(node)
# Find the inputs of the corresponding node in the reference network
reference_node = net_reference.nodes[node.name]
inputs = get_node_inputs(reference_node, ref_node_intermediates)
# Quantize the inputs so they can be used as input of the quantized node
input_quant = node.ir.calib_attrs.input_quant
assert list(inputs.keys()) == list(input_quant.keys())
return {name: quantize_value(inputs[name], map_data_value(lambda x: x.quant, input_quant[name]))
for name in inputs.keys()}
@staticmethod
def _execute_quantized_net_node(node: AwesomeNode, inputs: Dict[str, TensorFormat],
node_outputs: Dict[str, TensorFormat]):
"""
Executes the node of a quantized AwesomeNet using floating-point inputs.
If the given node expects quantized values, the input values are quantized.
:param node: AwesomeNode. A node which is being executed.
:param inputs: Dict[str, TensorFormat]. Dictionary containing node inputs in floating-point precision.
:param node_outputs: Dict[str, TensorFormat]. Dictionary containing node outputs.
"""
execute_node_quant(node, inputs, node_outputs)
def _execute_quantized_net_with_local_inputs(self,
net_quantized: AwesomeNet,
net_reference: AwesomeNet,
inputs: Optional[Dict[str, np.ndarray]],
node_intermediates: Dict[str, TensorFormat],
dequantize_intermediate: bool = False
) -> Dict[str, TensorFormat]:
"""
Execute the given quantized AwesomeNet using the given intermediates from the fp32
AwesomeNet, skipping nodes that are excluded from analysis. Input values for
executing nodes in the quantized AwesomeNet are obtained from the given
node_intermediates dictionary, instead of using the output from their previous
node(s). If an input node of a quantized AwesomeNet node is not present in the
node_intermediates dictionary, the output of the previous node is used. Each input to
the node will be quantized to int8 before execution. Save the intermediates and return them.
Parameters
----------
:param net_quantized: AwesomeNet. The quantized AwesomeNet that will be executed
:param net_reference: AwesomeNet. The reference AwesomeNet, used to determine each node's
inputs as they were connected before quantization.
:param inputs: Dict[str, np.ndarray]. Input dictionary that is fed into the given AwesomeNet.
:param node_intermediates: Dict[str, TensorFormat]. Dictionary where the key is the node name
and the value is the output of the node obtained by running the
reference fp32 AwesomeNet.
:param dequantize_intermediate: bool. Default is False. If True, will dequantize each intermediate
result to float.
Return
------
:return: Dict[str, TensorFormat]. A dictionary containing intermediates for each node.
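Example
-------
A minimal sketch of how this method is invoked in local_feed mode (see analyze());
``sample_inputs`` is a hypothetical input dictionary and ``reference_intermediates``
is the result of _execute_net(net_reference, sample_inputs).
.. code-block:: python
quantized_intermediates = self._execute_quantized_net_with_local_inputs(
net_quantized, net_reference, sample_inputs,
reference_intermediates, dequantize_intermediate=True)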
"""
intermediates = {node.name: None for node in net_quantized.iter_nodes_recursive()}
for node_name in net_quantized.execution_order:
node = net_quantized.nodes[node_name]
if node_is_awesomenet(node):
# If the AwesomeNode contains a sub-graph, execute the sub-graph using
# the same function
node_reference = net_reference.nodes[node_name]
assert node_is_awesomenet(node_reference)
intermediates.update(
self._execute_quantized_net_with_local_inputs(node.ir, node_reference.ir, inputs,
node_intermediates, False))
continue
if _node_is_excluded(node):
continue
node_inputs = self._read_node_inputs(node, net_reference, node_intermediates)
if node_inputs is None:
# This should not happen. Since the analysis can tolerate missing information,
# just ignore this node.
sima_logger.sima_log_debug(f"Not analyzing {node.name} because "
"its floating-point inputs were not found")
continue
# Execute the node and store the result in the shared dictionary
self._execute_quantized_net_node(node, node_inputs, intermediates)
if dequantize_intermediate:
self._dequantize_intermediates(net_quantized, intermediates)
return intermediates
def analyze(self,
net_reference: AwesomeNet,
net_quantized: AwesomeNet,
inputs: Union[DataGenerator, Dict[str, np.ndarray], Iterable[InputValues]],
metrics: Optional[Union[Metric, List[Metric]]] = None,
) -> None:
"""
Given two AwesomeNets, using the inputs to calculate the targeted metrics.
Parameters
----------
:param net_reference: AwesomeNet. Reference AwesomeNet.
:param net_quantized: AwesomeNet. Quantized AwesomeNet that will be analyzed.
:param inputs: Union[DataGenerator, Dict[str, np.ndarray]. Inputs that will be used to
do inference for both net_reference and net_quantized. Inputs can be
DataGenerator containing multiple data points, or a single data point
dictionary.
:param metrics: Optional[Union[Metric, List[Metric]]]. A single Metric enum or a list
of Metric enums.
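Example
-------
A minimal sketch of the global_feed flow; ``net_reference``, ``net_quantized`` and
``single_input`` (a Dict[str, np.ndarray]) are assumed to be available.
.. code-block:: python
analyzer = QuantizedGraphAnalyzer(QuantizedGraphAnalyzerMode.global_feed)
analyzer.analyze(net_reference, net_quantized, single_input, [Metric.mae, Metric.mse])
mae_per_layer = analyzer.analyzed_results[Metric.mae]  # node name -> [result per data point]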
"""
# Validate metrics
metrics = self._normalize_and_validate_metrics(metrics)
assert net_quantized.status == Status.SIMA_QUANTIZED, \
f"Please make sure the net_quantized is quantized. Got status = {net_quantized.status}"
# Initialize AnalyzedResultDict object
self._analyzed_results = dict()
for metric in metrics:
self._analyzed_results[metric] = AnalyzedResultDict()
# Make sure inputs is iterable over data points
if isinstance(inputs, dict):
inputs = [inputs]
# Iterate over each input data point
for i, _inputs in enumerate(inputs):
# Calculate intermediates for reference AwesomeNet.
# Don't remove nodes that are excluded from analysis, as they may be needed for local feed.
net_reference_intermediates = self._execute_net(net_reference, _inputs)
if self.mode == QuantizedGraphAnalyzerMode.global_feed:
net_quantized_intermediates = self._execute_net(net_quantized, _inputs, dequantize_intermediate=True)
else: # self.mode == QuantizedGraphAnalyzerMode.local_feed:
# Calculate intermediates for quantized AwesomeNet
net_quantized_intermediates = \
self._execute_quantized_net_with_local_inputs(net_quantized,
net_reference,
_inputs,
net_reference_intermediates,
dequantize_intermediate=True)
_remove_excluded_nodes(net_quantized_intermediates, net_quantized)
# Compute statistics using the values that were produced during execution
for metric in metrics:
metric_func = get_metric_func(metric)
analyzed_result_dict = self._analyzed_results[metric]
# For each node where the output was saved from both the reference and quantized execution
for k, v_1 in net_quantized_intermediates.items():
try:
v_2 = net_reference_intermediates[k]
except KeyError:
continue
# Only analyze arrays, not tuples
if not isinstance(v_1, np.ndarray):
continue
assert isinstance(v_2, np.ndarray)
if k not in analyzed_result_dict:
analyzed_result_dict[k] = []
# Append the calculated metric result for this node and data point
analyzed_result_dict[k].append(metric_func(v_1, v_2))