Source code for afe.ir.defines

#########################################################
# Copyright (C) 2020 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
from dataclasses import dataclass

import numpy as np
from enum import Enum, auto
from typing import TypeVar, Tuple, NewType, Union, List, Generic, Callable, Dict

from sima_utils.logging import sima_logger


_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")

# Layouts
[docs] AwesomeDataLayout = "NHWC"
[docs] AwesomeDataLayout5D = "NDHWC"
[docs] AwesomeConvWeightLayout = "HWIO"
[docs] AwesomeConvWeightLayout5D = "DHWIO"
[docs] AwesomeDepthwiseConvWeightLayout = "HWOI"
[docs] AwesomeDepthwiseConvWeightLayout5D = "DHWOI"
[docs] AwesomeTransposeConvWeightLayout5D = "DHWOI"
[docs] NoneType = type(None)
[docs] NodeName = NewType("NodeName", str)
[docs] InputName = NewType("InputName", str)
# Tensor format
[docs] TensorFormat = Union[np.ndarray, Tuple[np.ndarray, ...], List[np.ndarray]]
# Model input shape
[docs] InputShape = Tuple[int, int, int, int]
# Convolution padding from TVM
[docs] ConvPad = Tuple[int, ...]
# Attributes
[docs] AwesomePad2D = Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int]]
[docs] AwesomeStrides2D = Tuple[int, int, int, int]
[docs] AwesomeDilation2D = Tuple[int, int, int, int]
[docs] AwesomePoolSize2D = Tuple[int, int, int, int]
[docs] AwesomePad3D = Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int]]
[docs] AwesomeStrides3D = Tuple[int, int, int, int, int]
[docs] AwesomeDilation3D = Tuple[int, int, int, int, int]
[docs] AwesomePoolSize3D = Tuple[int, int, int, int, int]
[docs] AwesomePad = Union[AwesomePad2D, AwesomePad3D]
[docs] AwesomeStrides = Union[AwesomeStrides2D, AwesomeStrides3D]
[docs] AwesomeDilation = Union[AwesomeDilation2D, AwesomeDilation3D]
[docs] AwesomePoolSize = Union[AwesomePoolSize2D, AwesomePoolSize3D]
# Float
[docs] Float = np.dtype("float32")
# Quantization
[docs] QuantizedTensor = np.dtype("int32")
[docs] QuantizedTensorNew = np.dtype("int8")
[docs] QuantizedTensorInt16 = np.dtype("int16")
[docs] QuantizedParam = int
[docs] class Status(str, Enum): """ Status for AwesomeNode RELAY: Right after parsing from TVM Relay IR module CALIBRATED: Calibrated SIMA_QUANTIZED: SiMa Quantized BACKEND_IR_LOWERED: After lowering MLA subgraphs to SiMa BackendIR BACKEND_IR_COMPILED: After compilation using compile_awesomenet """
[docs] RELAY = "RELAY"
[docs] CALIBRATED = "CALIBRATED"
[docs] SIMA_QUANTIZED = "SIMA_QUANTIZED"
[docs] BACKEND_IR_LOWERED = "BACKEND_IR_LOWERED"
[docs] BACKEND_IR_COMPILED = "BACKEND_IR_COMPILED"
_TENSOR = TypeVar("_TENSOR")
[docs] class DataValue(Generic[_TENSOR]): """ An abstract value in a network. The type parameter represents the data type that stands in for a tensor value. """ pass
@dataclass
[docs] class TensorValue(DataValue[_TENSOR]): """ An abstract value associated with a tensor in a network. """
[docs] value: _TENSOR
@dataclass
[docs] class TupleValue(DataValue[_TENSOR]): """ An abstract value associated with a tuple in a network. An abstract value is associated with each element of the tuple. """
[docs] elements: List[DataValue[_TENSOR]]
[docs] def foreach_data_value(f: Callable[[_TENSOR], None], v: DataValue[_TENSOR]) -> None: """ Apply a function to each tensor value in a DataValue. :param f: Function to apply :param v: DataValue to traverse """ if isinstance(v, TensorValue): f(v.value) elif isinstance(v, TupleValue): for e in v.elements: foreach_data_value(f, e) else: raise TypeError("Expecting a DataValue")
[docs] def data_value_elements(v: DataValue[_TENSOR]) -> List[_TENSOR]: """ Get all tensor values in a DataValue. Since the DataValue structure is ignored, this function is only suitable when it doesn't matter where the tensor values are located inside the DataValue. """ ret = [] foreach_data_value(lambda x: ret.append(x), v) return ret
[docs] def get_expected_tensor_value(v: DataValue[_TENSOR]) -> _TENSOR: """ Get a value from DataValue while expecting that the type of s DataValue is a TensorValue. """ assert isinstance(v, TensorValue) return v.value
[docs] def get_expected_tuple_values(v: DataValue[_TENSOR]) -> List[_TENSOR]: """ Get a list of values from DataValue while expecting that the type of s DataValue is non-nested TupleValue. """ assert isinstance(v, TupleValue) return [get_expected_tensor_value(e) for e in v.elements]
[docs] def reduce_data_value(f: Callable[[_A, _TENSOR], _A], v: DataValue[_TENSOR], initial: _A) -> _A: """ Combine all values in a DataValue using the given function. :param f: Combining function :param v: DataValue to traverse :param initial: Initial value of result :return: Combined value """ if isinstance(v, TensorValue): return f(initial, v.value) elif isinstance(v, TupleValue): acc = initial for e in v.elements: acc = reduce_data_value(f, e, acc) return acc else: raise TypeError("Expecting a DataValue")
[docs] def map_data_value(f: Callable[[_A], _B], v: DataValue[_A]) -> DataValue[_B]: """ Transform each tensor value in a DataValue according to the given function, and return the results as a new DataValue. :param f: Function to apply :param v: DataValue to transform :return: DataValue with all tensor values transformed """ if isinstance(v, TensorValue): return TensorValue(f(v.value)) elif isinstance(v, TupleValue): return TupleValue([map_data_value(f, e) for e in v.elements]) else: raise TypeError("Expecting a DataValue")
[docs] def zip_data_value(f: Callable[[_A, _B], _C], x: DataValue[_A], y: DataValue[_B]) -> DataValue[_C]: """ Apply f to each pair of tensor values at the same positions in x and y, which must have the same shape. Return the results as a new DataValue having the same shape as x and y. :param f: Function to apply :param x: DataValue to transform :param y: DataValue to transform :return: Transformed data """ match (x, y): case (TensorValue(u), TensorValue(v)): return TensorValue(f(u, v)) case (TupleValue(us), TupleValue(vs)): assert len(us) == len(vs), "Tuple lengths do not match" return TupleValue([zip_data_value(f, u, v) for u, v in zip(us, vs)]) case _: raise ValueError("DataValue shapes do not match")
[docs] def reconstruct_data_value(values: List[_TENSOR]) -> DataValue[_TENSOR]: """ Convert a list to a DataValue, using heuristics to guess the data structure. This function is provided for compatibility with existing code that does not keep track of the data structure. If the list has one item, it's treated as representing a single tensor. If it has many items, it's treated as representing a tuple of tensors. :param values: Values to interpret as a DataValue """ assert len(values) > 0, "Cannot infer data shape for empty list" if len(values) == 1: # Treat as a single tensor return TensorValue(values[0]) # Treat as a tuple return TupleValue([TensorValue(v) for v in values])
[docs] class DataIndex: """The position of an A within a DataValue[A]. This is an algebraic data type.""" pass
@dataclass
[docs] class TensorIndex(DataIndex): """Identifies the single value in a TensorValue.""" pass
@dataclass
[docs] class TupleIndex(DataIndex): """Identifies a position in a TupleValue."""
[docs] index: int
[docs] nested_index: DataIndex
[docs] def index_data_value(v: DataValue[_TENSOR], i: DataIndex) -> _TENSOR: """ Get the value at the given index. """ match (v, i): case (TensorValue(x), TensorIndex()): return x case (TupleValue(xs), TupleIndex(n, i2)): return index_data_value(xs[n], i2) case _: raise IndexError("Shapes of given DataIndex and DataValue do not match")
@dataclass
[docs] class NodeAssociatedValue(Generic[_TENSOR]): """ A set of abstract values associated with a network node's inputs and outputs. Input values are held in an ordered dictionary mapping strings to data values. Inputs can be examined positionally or by name. The output value is a single data value. """
[docs] inputs: Dict[Union[NodeName, InputName], DataValue[_TENSOR]]
[docs] output: DataValue[_TENSOR]
def __post_init__(self): for v in self.inputs.values(): assert isinstance(v, DataValue) assert isinstance(self.output, DataValue)
[docs] class RequantizationMode(Enum): """ A way of doing quantized arithmetic. Different modes make different arithmetic simplifications embodying different speed accuracy tradeoffs. It is expected that TFLite-style quantization would give better accuracy while Sima-style quantization will run faster. The requantiaztion mode only applies to convolution operators. """
[docs] sima = auto() # SiMa quantization. Whenever possible, multiply and add are factored out of requantization,
# so that ArithFoldedRequantization is used. It involves only shift and round.
[docs] tflite = auto() # Prefer to use TFLiteRequantization. It involves multiply, shift, round, and add.
@dataclass(frozen=True)
[docs] class Quantization: """ A quantization scale. It represents an encoding of real numbers r as integers q where: L = -2^(bits-1) (integer range lower bound) U = 2^(bits-1)-1 (integer range upper bound) q_unbounded = round((r * scale) + zero_point) (linear mapping to representable range) q = max(L, min(U, q_unbounded)) (clip to range) Fields min_val and max_val give the range of floating-point values that are represented, for instance the range that was selected by calibration. This range must be representable within the integer range, that is, L <= round((min_val * scale) + zero_point) <= round((max_val * scale) + zero_point) <= U Often it spans the entire range from L to U. It may be smaller if the range was expanded due to constraints on the quantized representation, such as when using symmetric quantization for a numeric range that is not symmetric. If a larger numeric range was clipped when quantizing, min_val and max_val still describe the representable range and not the original range. When a tensor contains only zero, scale is set to 0. and min_val = max_val = 0. The default values represent quantization of the floating-point range [-128, 127] using the integer range [-128, 127]. """
[docs] scale: float = 1.0
[docs] zero_point: int = 0
[docs] bits: int = 8
[docs] min_val: float = -128.0
[docs] max_val: float = 127.0
def __post_init__(self): assert isinstance(self.scale, float), f"Scale type {type(self.scale)}" assert isinstance(self.zero_point, int), f"Zero point type {type(self.zero_point)}" assert isinstance(self.bits, int), f"Bits type {type(self.bits)}" assert isinstance(self.min_val, float), f"Min val type {type(self.min_val)}" assert isinstance(self.max_val, float), f"Max val type {type(self.max_val)}" if self.scale == 0.0: assert self.min_val == self.max_val == 0.0, f'For scale {self.scale}, min_val and max_val should be 0,' \ f'Got min_val = {self.min_val}, max_val = {self.max_val}' @staticmethod
[docs] def representable(scale: float, zero_point: int, bits: int) -> "Quantization": """ Create a quantization scale that includes the entire representable integer range. See Quantization for documentation of the parameters. For zero tensors, scale is 0. and min_val = max_val = 0. :param scale: Quantization scale. :param zero_point: Quantization zero point. :param bits: Quantization bits. :return: Quantization scale constructed from the given parameters. """ if scale == 0.: min_val = 0. max_val = 0. zero_point = 0 else: min_val = (-2**(bits-1) - zero_point) / scale max_val = (2**(bits-1)-1 - zero_point) / scale return Quantization(scale, zero_point, bits, min_val, max_val)
[docs] class RequantMethod(Enum): """ A requantization method as defined in ml_kernels. This enum is used to select which type of requantization to use when a network is quantized. """
[docs] fractional_zero = auto()
[docs] arith_folded = auto()
[docs] scaled_fz = auto() # FractionalZeroRequantization restricted to zp_correction=0
[docs] class QuantizationCast: """ A quantization-related conversion on data. When the algorithm detects that a conversion needs to be inserted in a model graph, it's recorded using this class. This is an algebraic data type. """ pass
@dataclass
[docs] class IdentityCast(QuantizationCast): """ A conversion that does nothing. It represents the case where no conversion is needed. """ pass
@dataclass
[docs] class QuantCast(QuantizationCast): """ A quantization cast. It represents a cast of a tensor having the given shape from float32 to int8 or int32 by computing round(r * scale + zero_point). """ from afe.ir.tensor_type import ScalarType
[docs] shape: Tuple[int, ...]
[docs] scale: float
[docs] zero_point: int
[docs] num_bits: int
[docs] out_type: ScalarType
@dataclass
[docs] class DequantCast(QuantizationCast): """ A quantization cast. It represents a cast of a tensor having the given shape from an integer type to float32 by computing (q - zero_point) / scale. :param shape: Shape of tensor to dequantize :param scale: Quantization scale :param zero_point: Quantization zero point :param input_dtype: Input data type. The valid Numpy data types are: np.int8, np.int16, or np.int32. """
[docs] shape: Tuple[int, ...]
[docs] scale: float
[docs] zero_point: int
[docs] input_dtype: np.dtype
[docs] output_dtype: np.dtype
@dataclass
[docs] class RequantCast(QuantizationCast): """ A quantization cast. It represents a cast of a tensor having the given shape from an int32 type to int16/int8. :param shape: Shape of a tensor :param in_scale: Input quantization scale :param in_zero_point: Input quantization zero point :param out_scale: Output quantization scale :param out_zero_point: Output quantization zero point :param input_32_bit: If True, the input type is int32. If False, the input type is int16. :param output_type: Output data type, can be int16 or int8 :param requantization_type: Type of requantization to use. If arith_folded is used, then the requantization will use only a shift; the scales and zero points must be related by a power of 2 factor to minimize rounding error. """
[docs] shape: Tuple[int, ...]
[docs] in_scale: float
[docs] in_zero_point: int
[docs] out_scale: float
[docs] out_zero_point: int
[docs] min_val: float
[docs] max_val: float
[docs] input_32_bit: bool
[docs] output_16_bit: bool
[docs] requant_method: RequantMethod = RequantMethod.fractional_zero
[docs] def get_input_quantization(self) -> Quantization: num_bits = 32 if self.input_32_bit else 16 return Quantization(self.in_scale, self.in_zero_point, num_bits, self.min_val, self.max_val)
[docs] def get_output_quantization(self) -> Quantization: num_bits = 16 if self.output_16_bit else 8 return Quantization(self.out_scale, self.out_zero_point, num_bits, self.min_val, self.max_val)
@dataclass
[docs] class ConvertCast(QuantizationCast): """ A numeric conversion. It represents a conversion from one numeric type to the nearest approximation in another numeric type. :param shape: Shape of a tensor :param in_type: Scalar type of input :param out_type: Scalar type of output """ from afe.ir.tensor_type import ScalarType
[docs] shape: Tuple[int, ...]
[docs] in_type: ScalarType
[docs] out_type: ScalarType
@dataclass
[docs] class TupleCast(QuantizationCast): """ A tuple cast. It applies a cast to each element of the tuple. """
[docs] elements: List[QuantizationCast]
@dataclass
[docs] class InputsQuantCast: """ A set of quantization casts to apply to a node's inputs. The dict has an entry for each input. """
[docs] casts: Dict[InputName, QuantizationCast]
[docs] def does_nothing(self) -> bool: """Return true if this cast does nothing.""" return all(isinstance(c, IdentityCast) for c in self.casts.values())
[docs] class QuantizationCasts: """ A set of quantization casts to apply to a model. The casts are collected during a traversal of the model, then applied after the traversal is finished. Field `casts` holds the casts to apply to node inputs. If a node does not need casts, it is omitted. """
[docs] casts: Dict[NodeName, InputsQuantCast]
def __init__(self): self.casts = dict()
[docs] def insert(self, node: NodeName, cast: InputsQuantCast): if cast.does_nothing(): return self.casts[node] = cast
@dataclass
[docs] class LayerStats: """ Layer statistics. For each MLA node, quantization error is calculated, that information is than forwarded to .sima.json file, and it can be viewed in Netron. :param metric: Metric that is used for calculating error value. :param error_value: Error value. """
[docs] metric: str
[docs] error_value: float
[docs] class NodeReporter: """ A node reporter to display information or warning messages about a node during transformations """
[docs] def info(self, msg: str): raise NotImplementedError("NodeReporter.info is an abstract method")
[docs] def debug(self, msg: str): raise NotImplementedError("NodeReporter.debug is an abstract method")
[docs] def warn(self, msg: str): raise NotImplementedError("NodeReporter.warn is an abstract method")
[docs] class LogNodeReporter(NodeReporter): """ A node reporter to display information or warning messages about a node during transformations :param node_name: Name of the node """
[docs] node_name: NodeName
def __init__(self, node_name: NodeName): self.node_name = node_name
[docs] def info(self, msg: str): sima_logger.sima_log_info(f"In node {self.node_name}, {msg}")
[docs] def debug(self, msg: str): sima_logger.sima_log_dbg(f"In node {self.node_name}, {msg}")
[docs] def warn(self, msg: str): sima_logger.sima_log_warning(f"In node {self.node_name}, {msg}")
[docs] class BiasCorrectionType(Enum): """ A bias correction method for convolution. REGULAR: Bias correction using input mean estimated during calibration ITERATIVE: Bias correction using input mean estimated by executing the quantized model with a set of calibration inputs NONE: No bias correction """
[docs] REGULAR = 'REGULAR'
[docs] ITERATIVE = 'ITERATIVE'
[docs] NONE = 'NONE'