#########################################################
# Copyright (C) 2020 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
from dataclasses import dataclass
import numpy as np
from enum import Enum, auto
from typing import TypeVar, Tuple, NewType, Union, List, Generic, Callable, Dict
from sima_utils.logging import sima_logger
# Generic type variables used in the signatures of the DataValue helper functions in this module
_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")
# Layouts
# String constants naming the layouts of data tensors and of convolution weights
# (regular, depthwise and transpose); names ending in 5D hold the five-letter variants.
[docs]
AwesomeDataLayout = "NHWC"
[docs]
AwesomeDataLayout5D = "NDHWC"
[docs]
AwesomeConvWeightLayout = "HWIO"
[docs]
AwesomeConvWeightLayout5D = "DHWIO"
[docs]
AwesomeDepthwiseConvWeightLayout = "HWOI"
[docs]
AwesomeDepthwiseConvWeightLayout5D = "DHWOI"
[docs]
AwesomeTransposeConvWeightLayout5D = "DHWOI"
[docs]
# Distinct type for node names, declared as a typing.NewType over str
NodeName = NewType("NodeName", str)
# Tensor format
# Model input shape
# NOTE(review): no definitions follow the two headings above in this excerpt -- confirm
# against the full file.
# Convolution padding from TVM
[docs]
# Tuple of ints of any length
ConvPad = Tuple[int, ...]
# Attributes
# 2D forms: padding is a tuple of four (int, int) pairs; strides, dilation and pool size
# are tuples of four ints.
# NOTE(review): presumably one entry per axis of the 4-D data layout -- confirm.
[docs]
AwesomePad2D = Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int]]
[docs]
AwesomeStrides2D = Tuple[int, int, int, int]
[docs]
AwesomeDilation2D = Tuple[int, int, int, int]
[docs]
AwesomePoolSize2D = Tuple[int, int, int, int]
# 3D forms: same structure as the 2D forms, with five entries instead of four.
[docs]
AwesomePad3D = Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int]]
[docs]
AwesomeStrides3D = Tuple[int, int, int, int, int]
[docs]
AwesomeDilation3D = Tuple[int, int, int, int, int]
[docs]
AwesomePoolSize3D = Tuple[int, int, int, int, int]
# Unions that accept either the 2D or the 3D form of each attribute.
[docs]
AwesomePad = Union[AwesomePad2D, AwesomePad3D]
[docs]
AwesomeStrides = Union[AwesomeStrides2D, AwesomeStrides3D]
[docs]
AwesomeDilation = Union[AwesomeDilation2D, AwesomeDilation3D]
[docs]
AwesomePoolSize = Union[AwesomePoolSize2D, AwesomePoolSize3D]
# Float
# NumPy float32 dtype object
[docs]
Float = np.dtype("float32")
# Quantization
# NumPy integer dtype objects: int32, int8 and int16 respectively
[docs]
QuantizedTensor = np.dtype("int32")
[docs]
QuantizedTensorNew = np.dtype("int8")
[docs]
QuantizedTensorInt16 = np.dtype("int16")
[docs]
class Status(str, Enum):
"""
Status for AwesomeNode
RELAY: Right after parsing from TVM Relay IR module
CALIBRATED: Calibrated
SIMA_QUANTIZED: SiMa Quantized
BACKEND_IR_LOWERED: After lowering MLA subgraphs to SiMa BackendIR
BACKEND_IR_COMPILED: After compilation using compile_awesomenet
"""
# NOTE(review): RELAY is described in the docstring above, but no RELAY member is visible
# in this excerpt -- confirm against the full file.
# The class mixes in str, and each member's value is its own name as a string.
[docs]
CALIBRATED = "CALIBRATED"
[docs]
SIMA_QUANTIZED = "SIMA_QUANTIZED"
[docs]
BACKEND_IR_LOWERED = "BACKEND_IR_LOWERED"
[docs]
BACKEND_IR_COMPILED = "BACKEND_IR_COMPILED"
_TENSOR = TypeVar("_TENSOR")


class DataValue(Generic[_TENSOR]):
    """
    Base class for an abstract value in a network.

    The type parameter is the data type that stands in for a tensor value.
    """
@dataclass
[docs]
class TensorValue(DataValue[_TENSOR]):
"""
An abstract value associated with a tensor in a network.
"""
# NOTE(review): helper functions in this module read `.value` from TensorValue instances and
# construct them with a single positional argument, but no field declaration is visible in
# this excerpt -- confirm against the full file.
@dataclass
class TupleValue(DataValue[_TENSOR]):
    """
    An abstract value associated with a tuple in a network.

    Each element of the tuple carries its own abstract value.
    """

    # Abstract values of the tuple's elements, in order
    elements: List[DataValue[_TENSOR]]
[docs]
def foreach_data_value(f: Callable[[_TENSOR], None], v: DataValue[_TENSOR]) -> None:
    """
    Call a function on every tensor value contained in a DataValue.

    Tuple elements are visited in order, recursing into nested tuples.

    :param f: Function to call on each tensor value
    :param v: DataValue to traverse
    :raises TypeError: If v is neither a TensorValue nor a TupleValue
    """
    if isinstance(v, TensorValue):
        f(v.value)
        return
    if not isinstance(v, TupleValue):
        raise TypeError("Expecting a DataValue")
    for child in v.elements:
        foreach_data_value(f, child)
[docs]
def data_value_elements(v: DataValue[_TENSOR]) -> List[_TENSOR]:
    """
    Collect every tensor value of a DataValue into a flat list.

    The structure of the DataValue is discarded, so use this only when the position
    of the tensor values inside the DataValue is irrelevant.

    :param v: DataValue to flatten
    :return: Tensor values in traversal order
    """
    collected = []
    # The bound append method is the per-tensor callback
    foreach_data_value(collected.append, v)
    return collected
[docs]
def get_expected_tensor_value(v: DataValue[_TENSOR]) -> _TENSOR:
    """
    Extract the value of a DataValue that is expected to be a TensorValue.

    :param v: DataValue to unwrap; asserted to be a TensorValue
    :return: The value held by v
    """
    is_tensor = isinstance(v, TensorValue)
    assert is_tensor
    return v.value
[docs]
def get_expected_tuple_values(v: DataValue[_TENSOR]) -> List[_TENSOR]:
    """
    Extract the values of a DataValue that is expected to be a non-nested TupleValue.

    :param v: DataValue to unwrap; asserted to be a TupleValue whose elements are
        all TensorValue
    :return: The value of each tuple element, in order
    """
    assert isinstance(v, TupleValue)
    return list(map(get_expected_tensor_value, v.elements))
[docs]
def reduce_data_value(f: Callable[[_A, _TENSOR], _A], v: DataValue[_TENSOR], initial: _A) -> _A:
    """
    Fold all tensor values of a DataValue into a single result.

    :param f: Combining function, called as f(accumulated, tensor_value)
    :param v: DataValue to traverse
    :param initial: Starting value of the result
    :return: Combined value
    :raises TypeError: If v is neither a TensorValue nor a TupleValue
    """
    if isinstance(v, TensorValue):
        return f(initial, v.value)
    if not isinstance(v, TupleValue):
        raise TypeError("Expecting a DataValue")
    # Thread the running result through the tuple elements from first to last
    result = initial
    for element in v.elements:
        result = reduce_data_value(f, element, result)
    return result
[docs]
def map_data_value(f: Callable[[_A], _B], v: DataValue[_A]) -> DataValue[_B]:
    """
    Build a new DataValue by applying a function to every tensor value of a DataValue.

    The result has the same tuple structure as the input.

    :param f: Function to apply to each tensor value
    :param v: DataValue to transform
    :return: DataValue holding the transformed tensor values
    :raises TypeError: If v is neither a TensorValue nor a TupleValue
    """
    if isinstance(v, TensorValue):
        return TensorValue(f(v.value))
    if not isinstance(v, TupleValue):
        raise TypeError("Expecting a DataValue")
    mapped_elements = [map_data_value(f, element) for element in v.elements]
    return TupleValue(mapped_elements)
[docs]
def zip_data_value(f: Callable[[_A, _B], _C], x: DataValue[_A], y: DataValue[_B]) -> DataValue[_C]:
    """
    Combine two DataValues of the same shape position by position.

    f is applied to each pair of tensor values found at the same position in x and y.
    The result is a new DataValue with the same shape as x and y.

    :param f: Function to apply to each pair of tensor values
    :param x: First DataValue
    :param y: Second DataValue
    :return: DataValue of the combined tensor values
    :raises ValueError: If x and y are not the same kind of DataValue
    """
    if isinstance(x, TensorValue) and isinstance(y, TensorValue):
        return TensorValue(f(x.value, y.value))
    if isinstance(x, TupleValue) and isinstance(y, TupleValue):
        left, right = x.elements, y.elements
        assert len(left) == len(right), "Tuple lengths do not match"
        return TupleValue([zip_data_value(f, a, b) for a, b in zip(left, right)])
    raise ValueError("DataValue shapes do not match")
[docs]
def reconstruct_data_value(values: List[_TENSOR]) -> DataValue[_TENSOR]:
    """
    Guess a DataValue structure for a flat list of values.

    This exists for compatibility with code that does not keep track of the data
    structure. A one-item list is interpreted as a single tensor; a list with more
    items is interpreted as a tuple of tensors.

    :param values: Values to interpret as a DataValue; asserted to be non-empty
    :return: A TensorValue for a one-item list, otherwise a TupleValue of TensorValues
    """
    assert len(values) > 0, "Cannot infer data shape for empty list"
    if len(values) != 1:
        # Several items: each one becomes a tensor element of a tuple
        return TupleValue([TensorValue(item) for item in values])
    # Exactly one item: a single tensor
    return TensorValue(values[0])
[docs]
class DataIndex:
    """
    The position of an A within a DataValue[A].

    This is an algebraic data type; its cases are defined as subclasses.
    """
@dataclass
class TensorIndex(DataIndex):
    """Index of the single value held by a TensorValue."""
@dataclass
[docs]
class TupleIndex(DataIndex):
"""Identifies a position in a TupleValue."""
# NOTE(review): index_data_value destructures TupleIndex into two positional sub-patterns,
# the first of which subscripts the tuple's elements, but only `nested_index` is visible in
# this excerpt -- confirm the other field against the full file.
[docs]
# Index applied to the element that was selected within the tuple
nested_index: DataIndex
[docs]
def index_data_value(v: DataValue[_TENSOR], i: DataIndex) -> _TENSOR:
"""
Get the value at the given index.
"""
match (v, i):
case (TensorValue(x), TensorIndex()):
return x
case (TupleValue(xs), TupleIndex(n, i2)):
return index_data_value(xs[n], i2)
case _:
raise IndexError("Shapes of given DataIndex and DataValue do not match")
@dataclass
[docs]
class NodeAssociatedValue(Generic[_TENSOR]):
"""
A set of abstract values associated with a network node's
inputs and outputs.
Input values are held in an ordered dictionary mapping strings to
data values. Inputs can be examined positionally or by name.
The output value is a single data value.
"""
# NOTE(review): __post_init__ reads self.inputs, but the declaration of `inputs` is not
# visible in this excerpt -- confirm against the full file.
[docs]
# Abstract value of the node's single output
output: DataValue[_TENSOR]
def __post_init__(self):
# Check that every input value and the output value is a DataValue
for v in self.inputs.values():
assert isinstance(v, DataValue)
assert isinstance(self.output, DataValue)
[docs]
class RequantizationMode(Enum):
    """
    A way of doing quantized arithmetic.

    Different modes make different arithmetic simplifications embodying different
    speed/accuracy tradeoffs. It is expected that TFLite-style quantization would give
    better accuracy while SiMa-style quantization will run faster.
    The requantization mode only applies to convolution operators.
    """

    # SiMa quantization. Whenever possible, multiply and add are factored out of
    # requantization, so that ArithFoldedRequantization is used. It involves only
    # shift and round.
    sima = auto()

    # Prefer to use TFLiteRequantization. It involves multiply, shift, round, and add.
    tflite = auto()
@dataclass(frozen=True)
[docs]
class Quantization:
"""
A quantization scale. It represents an encoding of real numbers r as integers q where:
L = -2^(bits-1) (integer range lower bound)
U = 2^(bits-1)-1 (integer range upper bound)
q_unbounded = round((r * scale) + zero_point) (linear mapping to representable range)
q = max(L, min(U, q_unbounded)) (clip to range)
Fields min_val and max_val give the range of floating-point values that are represented,
for instance the range that was selected by calibration.
This range must be representable within the integer range, that is,
L <= round((min_val * scale) + zero_point) <= round((max_val * scale) + zero_point) <= U
Often it spans the entire range from L to U. It may be smaller if the range was
expanded due to constraints on the quantized representation, such as when using
symmetric quantization for a numeric range that is not symmetric. If a larger numeric
range was clipped when quantizing, min_val and max_val still describe the representable
range and not the original range.
When a tensor contains only zero, scale is set to 0. and min_val = max_val = 0.
The default values represent quantization of the floating-point range [-128, 127] using
the integer range [-128, 127].
"""
# NOTE(review): __post_init__ and representable() use the fields scale, zero_point, bits,
# min_val and max_val, but only min_val is declared in this excerpt -- confirm the other
# declarations and their defaults against the full file.
[docs]
# Lower bound of the represented floating-point range
min_val: float = -128.0
def __post_init__(self):
# Check the type of every field; int values are not accepted where a float is expected
assert isinstance(self.scale, float), f"Scale type {type(self.scale)}"
assert isinstance(self.zero_point, int), f"Zero point type {type(self.zero_point)}"
assert isinstance(self.bits, int), f"Bits type {type(self.bits)}"
assert isinstance(self.min_val, float), f"Min val type {type(self.min_val)}"
assert isinstance(self.max_val, float), f"Max val type {type(self.max_val)}"
# A zero scale is only accepted together with an empty range at 0
if self.scale == 0.0:
assert self.min_val == self.max_val == 0.0, f'For scale {self.scale}, min_val and max_val should be 0,' \
f'Got min_val = {self.min_val}, max_val = {self.max_val}'
@staticmethod
[docs]
def representable(scale: float, zero_point: int, bits: int) -> "Quantization":
"""
Create a quantization scale that includes the entire representable integer range.
See Quantization for documentation of the parameters.
For zero tensors, scale is 0. and min_val = max_val = 0.
:param scale: Quantization scale.
:param zero_point: Quantization zero point.
:param bits: Quantization bits.
:return: Quantization scale constructed from the given parameters.
"""
if scale == 0.:
# Zero scale: use the empty range at 0; the given zero point is replaced by 0
min_val = 0.
max_val = 0.
zero_point = 0
else:
# Map the integer bounds -2^(bits-1) and 2^(bits-1)-1 back to floating point by
# inverting q = r * scale + zero_point
min_val = (-2**(bits-1) - zero_point) / scale
max_val = (2**(bits-1)-1 - zero_point) / scale
return Quantization(scale, zero_point, bits, min_val, max_val)
[docs]
class RequantMethod(Enum):
"""
A requantization method as defined in ml_kernels. This enum is used to
select which type of requantization to use when a network is quantized.
"""
# NOTE(review): the RequantCast docstring mentions `arith_folded`, but no such member is
# visible in this excerpt -- confirm the member list against the full file.
[docs]
fractional_zero = auto()
[docs]
scaled_fz = auto() # FractionalZeroRequantization restricted to zp_correction=0
[docs]
class QuantizationCast:
    """
    A quantization-related conversion on data.

    When the algorithm detects that a conversion needs to be inserted in a model graph,
    the conversion is recorded using this class.
    This is an algebraic data type; its cases are defined as subclasses.
    """
@dataclass
class IdentityCast(QuantizationCast):
    """
    A conversion that does nothing, recorded where no conversion is needed.
    """
@dataclass
[docs]
class QuantCast(QuantizationCast):
"""
A quantization cast. It represents a cast of a tensor having the given shape
from float32 to int8 or int32 by computing round(r * scale + zero_point).
"""
# NOTE(review): no field declarations are visible in this excerpt; only the import below
# remains of the class body -- confirm against the full file.
from afe.ir.tensor_type import ScalarType
@dataclass
[docs]
class DequantCast(QuantizationCast):
"""
A quantization cast. It represents a cast of a tensor having the given shape
from an integer type to float32 by computing (q - zero_point) / scale.
:param shape: Shape of tensor to dequantize
:param scale: Quantization scale
:param zero_point: Quantization zero point
:param input_dtype: Input data type. The valid Numpy data types are: np.int8, np.int16, or np.int32.
"""
# NOTE(review): the fields documented above are not declared in this excerpt -- confirm
# against the full file.
@dataclass
[docs]
class RequantCast(QuantizationCast):
"""
A quantization cast. It represents a cast of a tensor having the given shape
from an int32 type to int16/int8.
:param shape: Shape of a tensor
:param in_scale: Input quantization scale
:param in_zero_point: Input quantization zero point
:param out_scale: Output quantization scale
:param out_zero_point: Output quantization zero point
:param input_32_bit: If True, the input type is int32. If False, the input type is int16.
:param output_16_bit: If True, the output type is int16. If False, the output type is int8.
:param requant_method: Type of requantization to use.
If arith_folded is used, then the requantization will use only
a shift; the scales and zero points must be related by a power
of 2 factor to minimize rounding error.
"""
# NOTE(review): apart from requant_method, the fields used by this class (including
# output_16_bit, min_val and max_val read in get_output_quantization) are not declared in
# this excerpt -- confirm names and defaults against the full file.
[docs]
# Requantization method; defaults to RequantMethod.fractional_zero
requant_method: RequantMethod = RequantMethod.fractional_zero
[docs]
def get_output_quantization(self) -> Quantization:
# Build the Quantization of the cast's output from the output scale and zero point and
# from min_val/max_val, using 16 bits when output_16_bit is set and 8 bits otherwise
num_bits = 16 if self.output_16_bit else 8
return Quantization(self.out_scale, self.out_zero_point, num_bits, self.min_val, self.max_val)
@dataclass
[docs]
class ConvertCast(QuantizationCast):
"""
A numeric conversion. It represents a conversion from one numeric type
to the nearest approximation in another numeric type.
:param shape: Shape of a tensor
:param in_type: Scalar type of input
:param out_type: Scalar type of output
"""
# NOTE(review): the fields documented above are not declared in this excerpt; only the
# import below remains of the class body -- confirm against the full file.
from afe.ir.tensor_type import ScalarType
@dataclass
class TupleCast(QuantizationCast):
    """
    A cast on a tuple, made of one cast per element of the tuple.
    """

    # Casts to apply to the tuple's elements
    elements: List[QuantizationCast]
@dataclass
class QuantizationCasts:
    """
    A set of quantization casts to apply to a model.

    The casts are collected during a traversal of the model, then applied after the
    traversal is finished. Field `casts` holds the casts to apply to node inputs.
    A node that does not need casts is omitted.
    """

    # Casts to apply to node inputs, keyed by node name
    casts: Dict[NodeName, InputsQuantCast]

    def __init__(self):
        # Explicit initializer: a new instance starts with no casts recorded
        self.casts = {}

    def insert(self, node: NodeName, cast: InputsQuantCast):
        """
        Record the input casts of a node, unless the cast does nothing.

        :param node: Name of the node whose inputs are cast
        :param cast: Casts for the node's inputs; skipped when cast.does_nothing() is true
        """
        if not cast.does_nothing():
            self.casts[node] = cast
@dataclass
[docs]
class LayerStats:
"""
Layer statistics. For each MLA node, the quantization error is calculated;
that information is then forwarded to the .sima.json file, and it can be viewed in Netron.
:param metric: Metric that is used for calculating error value.
:param error_value: Error value.
"""
# NOTE(review): the fields documented above are not declared in this excerpt -- confirm
# against the full file.
[docs]
class NodeReporter:
    """
    Interface for displaying information or warning messages about a node during
    transformations.

    Every method of this class raises NotImplementedError.
    """

    def info(self, msg: str):
        """
        Report an informational message.

        :param msg: Message text
        :raises NotImplementedError: Always; this method is abstract
        """
        raise NotImplementedError("NodeReporter.info is an abstract method")

    def debug(self, msg: str):
        """
        Report a debug message.

        :param msg: Message text
        :raises NotImplementedError: Always; this method is abstract
        """
        raise NotImplementedError("NodeReporter.debug is an abstract method")

    def warn(self, msg: str):
        """
        Report a warning message.

        :param msg: Message text
        :raises NotImplementedError: Always; this method is abstract
        """
        raise NotImplementedError("NodeReporter.warn is an abstract method")
[docs]
class LogNodeReporter(NodeReporter):
    """
    A node reporter that sends its messages to sima_logger, prefixed with the node name.

    :param node_name: Name of the node
    """

    def __init__(self, node_name: NodeName):
        self.node_name = node_name

    def _with_node(self, msg: str) -> str:
        """Prefix a message with the name of the node it is about."""
        return f"In node {self.node_name}, {msg}"

    def info(self, msg: str):
        """Log msg through sima_logger.sima_log_info."""
        sima_logger.sima_log_info(self._with_node(msg))

    def debug(self, msg: str):
        """Log msg through sima_logger.sima_log_dbg."""
        sima_logger.sima_log_dbg(self._with_node(msg))

    def warn(self, msg: str):
        """Log msg through sima_logger.sima_log_warning."""
        sima_logger.sima_log_warning(self._with_node(msg))
[docs]
class BiasCorrectionType(Enum):
"""
A bias correction method for convolution.
REGULAR: Bias correction using input mean estimated during calibration
ITERATIVE: Bias correction using input mean estimated by executing the
quantized model with a set of calibration inputs
NONE: No bias correction
"""
[docs]
ITERATIVE = 'ITERATIVE'