Source code for afe.ir.operations

#########################################################
# Copyright (C) 2020 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
import dataclasses
import math
from abc import ABCMeta
from typing import (
    Any, Callable, ClassVar, Dict, Generic, List, Mapping, Optional,
    Sequence, Tuple, Type, TypeVar, Union, cast)

import numpy as np
from ml_kernels.np_operators import ideal_udf
import ml_kernels.requantization as requantization
from ml_kernels.types import bfloat16
import ml_kernels.math_helpers

from sima_utils.logging import sima_logger

import afe.ir.quantization_conv
import afe.ir.utils as utils
import afe.ir.quantization_utils as quant_utils
import afe.ir.operation_functions as op_fn
from afe.ir import quantization_conv, bias_correction
from afe.ir.attributes import *
from afe.ir.defines import (
    InputName, QuantizedTensor, Float, DataValue, TensorValue, Quantization, TupleValue,
    QuantizedTensorNew, IdentityCast, QuantCast, DequantCast, QuantizationCast, InputsQuantCast,
    TupleCast, RequantCast, ConvertCast, get_expected_tensor_value, NodeReporter, RequantizationMode,
    BiasCorrectionType
)
from afe.ir.operation_functions import RunMode
from afe.ir.operation_type_check import type_check_operation_arguments
from afe.ir.quantization_interface import (
    OpQuantInterface, fix_input_to_int8, fix_input_to_int16, fix_input, keep_input,
    quantize_output, fix_output_to_int8, fix_output, fix_output_from_input,
    requantize_scaled, get_intermediate_min_max, fix_output_to_int16
)
from afe.ir.tensor_type import NodeType, ScalarType, TensorType, scalar_type_from_dtype, data_byte_size, \
    scalar_is_integral, scalar_is_floating
from afe.core.configs import RunConfigs, QuantizationConfigs, QuantizationPrecision

T = TypeVar('T')
AWESOME_ATTRS = TypeVar("AWESOME_ATTRS", bound=AwesomeAttributes)
QUANT_ATTRS = TypeVar("QUANT_ATTRS", bound=AwesomeQuantAttrBase)
AVGPOOL_TYPES = TypeVar("AVGPOOL_TYPES", AvgPoolAttrs, AdaptiveAvgPool2DAttrs)
AVGPOOL_CLASSES = (AvgPoolAttrs, AdaptiveAvgPool2DAttrs)
# Data that describes a node's output during quantization.
# The data consists of the output quantization info, observed distribution and calibration inputs.
QuantizationTensorData = Tuple[DataValue[QuantResultTensorType],
                               Optional[Dict[str, ObservedDistribution]],
                               Optional[List[np.ndarray]]]
""" TODO: * Merge the quantization in single node and composite node. Ex: Use Conv2DOp.quantize in ConvAddActivationOp * Merge quantization, run_quant for Conv2D and Conv2DTranspose * Create check_attrs function to check attrs and quant_attrs """ def _warn_for_nonzero_zero_point(zp: int) -> None: """ Log a warning if the zero point is not zero, because the compiler can't handle it. Issue SWMLA-4306. """ if zp != 0: sima_logger.sima_log_warning("Convolution or matrix multiply has a nonzero output zero point. " "This operation will not compile correctly.") def _quantize_type(t: TensorType) -> TensorType: """ Convert a tensor type to an int8 tensor type having the same shape. This is the most common way that a type is changed during quantization. :param t: Tensor type to convert :return: Similar type as a tensor of int8 """ return replace(t, scalar=ScalarType.int8) def _binary_op_type(input_list: List[InputName], t: TensorType) -> NodeType: """ Construct the type of a binary elementwise operator having the given input names. Both input tensors and the output tensor have the same type. :param input_list: The input list of the AwesomeOperation. It should have 2 items. :param t: Type of the operator's output and inputs :return: Type of the operator """ assert len(input_list) == 2 data_type = TensorValue(t) return NodeType({name: data_type for name in input_list}, data_type) def _unary_op_type(input_list: List[InputName], t: TensorType) -> NodeType: """ Construct the type of a unary elementwise operator having the given input name. The input tensor and the output tensor have the same type. :param input_list: The input list of the AwesomeOperation. It should have 1 item. :param t: Type of the operator's output and input :return: Type of the operator """ assert len(input_list) == 1 data_type = TensorValue(t) return NodeType({name: data_type for name in input_list}, data_type) def _pooling_output_shape(input_shape: Tuple[int, ...], kernel_shape: Tuple[int, ...], padding: Tuple[Tuple[int, int], ...], stride: Tuple[int, ...], dilation: Tuple[int, ...]) -> Tuple[int, ...]: """ Calculate the shape of the output of the pooling operator in the dimensions that are pooled. All parameters should be tuples of the same length, having one element for each dimension that is pooled. The dimensions are normally interpreted as H, W for two-dimensional pooling or D, H, W for three-dimensional pooling. :param input_shape: Shape of the input tensor :param kernel_shape: Shape of the pooling kernel :param padding: Number of padding elements applied to the pooling input on the lower-index and upper-index ends. :param stride: Stride between pooling evaluation indices :param dilation: Dilation of the pooling kernel :return: Shape of the pooling output tensor """ assert len(input_shape) == len(kernel_shape) == len(padding) == len(stride) == len(dilation) assert all(x == 1 for x in dilation) # Other dilation values are not handled ret = [] for n, k, (p_lo, p_hi), s, d in zip(input_shape, kernel_shape, padding, stride, dilation): # Number of valid window positions num_window_positions = (n + p_lo + p_hi) - k + 1 # Some positions may be skipped according to stride. # Divide number of window positions by stride, rounding up. o = (num_window_positions - 1) // s + 1 ret.append(o) return tuple(ret) def _get_spatial_dimensions(attrs: PoolAttrs): """ Get the dimensions where the pooling operator is applied. 
    :return: "HW" for 2D and "DHW" for 3D
    """
    if len(attrs.pool_size) == 4:
        return "HW"
    else:
        assert len(attrs.pool_size) == 5, f"Expected length {5}, got {len(attrs.pool_size)}"
        return "DHW"


def _pooling_op_type(pooling_attrs: PoolAttrs, in_type: ScalarType, out_type: ScalarType) -> NodeType:
    """
    Construct the type of a pooling operator. It is a one-input, one-output node type where
    the input and output shapes are related by the pooling operation.

    :param pooling_attrs: Attributes of a pooling operator. The layout, out_layout, padding,
        pool_size, strides, dilation, and input_shape fields are used.
    :param in_type: The scalar type of the input tensor.
    :param out_type: The scalar type of the output tensor.
    :return: A pooling operator type.
    """
    layout = pooling_attrs.layout
    out_layout = pooling_attrs.out_layout if pooling_attrs.out_layout != "" else layout
    spatial_dimensions = _get_spatial_dimensions(pooling_attrs)

    # Calculate output shape in HW dimensions
    p_input_shape = utils.transpose_attr_according_to_layout_strings(pooling_attrs.input_shape,
                                                                     layout, spatial_dimensions)
    p_pool_size = utils.transpose_attr_according_to_layout_strings(pooling_attrs.pool_size,
                                                                   layout, spatial_dimensions)
    p_padding = utils.transpose_attr_according_to_layout_strings(pooling_attrs.padding,
                                                                 layout, spatial_dimensions)
    p_strides = utils.transpose_attr_according_to_layout_strings(pooling_attrs.strides,
                                                                 layout, spatial_dimensions)
    p_dilation = utils.transpose_attr_according_to_layout_strings(pooling_attrs.dilation,
                                                                  layout, spatial_dimensions)
    p_output_shape = _pooling_output_shape(p_input_shape, p_pool_size, p_padding, p_strides, p_dilation)

    # Build the full output shape
    output_shape = utils.transpose_attr_according_to_layout_strings(pooling_attrs.input_shape,
                                                                    layout, out_layout)
    output_shape = utils.insert_according_to_layout_strings(output_shape, p_output_shape,
                                                            out_layout, spatial_dimensions)

    # Create type
    input_type = TensorValue(TensorType(in_type, pooling_attrs.input_shape))
    output_type = TensorValue(TensorType(out_type, output_shape))
    return NodeType({'data': input_type}, output_type)

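
# Illustrative sketch (not part of the original module): the window arithmetic in
# _pooling_output_shape for one dimension. With input size 7, kernel 3, padding (1, 1)
# and stride 2, there are (7 + 1 + 1) - 3 + 1 = 7 window positions, and striding keeps
# (7 - 1) // 2 + 1 = 4 of them.
def _example_pooling_output_shape() -> None:
    assert _pooling_output_shape((7, 7), (3, 3), ((1, 1), (1, 1)), (2, 2), (1, 1)) == (4, 4)
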
def make_quantized_pool_attrs(attrs: PoolAttrs, *, pad_value: int, input_int16: bool,
                              requant: Optional[BaseRequantization] = None) -> PoolQuantAttrs:
    """
    Construct a PoolQuantAttrs, using values from a PoolAttrs and additional values
    that were computed during quantization.
    """
    return PoolQuantAttrs(pool_attrs=attrs, pad_value=pad_value, rounding_type=RoundType.TOEVEN,
                          input_int16=input_int16, requant=requant)

def _conv_op_type(attrs: Union[ConvAddActivationAttrs, ConvQuantAttrs],
                  input_scalar_type: ScalarType, output_scalar_type: ScalarType) -> NodeType:
    """
    Construct the type of a convolution operator. It is a one-input, one-output node type
    where the input and output shapes are related by the convolution operation.

    :param attrs: Attributes of a ConvAddActivationOp.
    :param input_scalar_type: The scalar type of the input tensor.
    :param output_scalar_type: The scalar type of the output tensor.
    :return: A convolution operator type.
    """
    input_type = TensorValue(TensorType(input_scalar_type, attrs.conv_attrs.input_shape))
    output_type = TensorValue(TensorType(output_scalar_type, attrs.conv_attrs.output_shape))
    return NodeType({'data': input_type}, output_type)

def get_output_shape(attrs: Union[SumAttrs, MeanAttrs, ProdAttrs, ExtmAttrs, ArgMaxAttrs]):
    """
    Get the output shape for the dimension-reduction operators (SumOp, MeanOp, ProdOp,
    ExtmOp & ArgMaxOp) using attributes from their AwesomeAttributes class.

    :param attrs: AwesomeAttributes class
    :return: Output shape
    """
    # Extracting attributes
    axis = attrs.axis
    exclude = bool(attrs.exclude)
    keepdims = bool(attrs.keepdims)
    shape = list(attrs.shape)

    if exclude:
        axis = utils.exclude_axes(len(shape), axis)

    if keepdims:
        for a in axis:
            shape[a] = 1
    else:
        new_shape = []
        for idx, el in enumerate(shape):
            if idx not in axis:
                new_shape.append(el)
        shape = new_shape
    return shape

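
# Illustrative sketch (not part of the original module): reducing a (2, 3, 4) tensor over
# axis 1 yields [2, 4] when keepdims is false and [2, 1, 4] when keepdims is true.
# SimpleNamespace is a stand-in for an AwesomeAttributes instance, since get_output_shape
# only reads the axis, exclude, keepdims and shape fields.
def _example_reduction_output_shape() -> None:
    from types import SimpleNamespace
    attrs = SimpleNamespace(axis=[1], exclude=False, keepdims=False, shape=(2, 3, 4))
    assert get_output_shape(attrs) == [2, 4]
    attrs.keepdims = True
    assert get_output_shape(attrs) == [2, 1, 4]
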
def node_type_for_dimension_reduction_operators(
        attrs: Union[SumAttrs, MeanAttrs, ProdAttrs, ExtmAttrs, ArgMaxAttrs],
        input_dtype: Union[np.dtype, Type[np.number]],
        output_dtype: Union[np.dtype, Type[np.number]]):
    """
    Get NodeType for the dimension-reduction operators (SumOp, MeanOp, ProdOp, ExtmOp & ArgMaxOp).

    :param attrs: AwesomeAttributes class
    :param input_dtype: Data type of the input tensor
    :param output_dtype: Data type of the output tensor
    :return: NodeType
    """
    input_shape = attrs.shape
    output_shape = get_output_shape(attrs)
    return NodeType({'data': TensorValue(TensorType(ScalarType.from_numpy(input_dtype), input_shape))},
                    TensorValue(TensorType(ScalarType.from_numpy(output_dtype), tuple(output_shape))))

def has_any_int8_input(quantizer_interface: OpQuantInterface, input_names: Sequence[InputName]) -> bool:
    """
    Return True if any of the inputs identified by input_names was quantized with int8 precision.
    """
    input_quant = quantizer_interface.get_input_quant()
    for name in input_names:
        if get_expected_tensor_value(input_quant[name]).type.scalar == ScalarType.int8:
            return True
    return False

def _rescale_int8_to_int32_quantization(qrtt: QuantResultTensorType,
                                        right_shift: Union[int, np.ndarray]) \
        -> Tuple[QuantResultTensorType, Union[int, np.ndarray]]:
    """
    Transform an operator's output quantization from int8 to int32 by adjusting the way the
    operator's output is shifted. Reduce the right shift so that its value (or minimum value,
    for per-channel) is 0. This preserves as much precision as possible from the 32-bit
    intermediate result while ensuring it uses per-tensor quantization.

    :param qrtt: Quantized type having int8 precision
    :param right_shift: Right shift performed at the end of the operator
    :return: New quantized type having int32 precision and new right shift.
    """
    assert qrtt.type.scalar == ScalarType.int8
    assert qrtt.quant is not None
    min_rs = int(np.amin(right_shift)) if isinstance(right_shift, np.ndarray) else right_shift
    scale_factor = 2**min_rs
    new_type = dataclasses.replace(qrtt.type, scalar=ScalarType.int32)
    new_quant = Quantization(qrtt.quant.scale * scale_factor,
                             qrtt.quant.zero_point * scale_factor,
                             bits=32,
                             min_val=qrtt.quant.min_val,
                             max_val=qrtt.quant.max_val)
    # Because a power-of-2 scale factor was used, it's suitable for ArithFoldedRequantization
    new_qrtt = QuantResultTensorType(new_type, new_quant, RequantMethod.arith_folded)
    right_shift = right_shift - min_rs
    return new_qrtt, right_shift


def _ceildiv(a, b):
    """Helper function for ceil division."""
    return -(a // -b)

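
# Illustrative sketch (not part of the original module): _ceildiv rounds up using only
# floor division, so it stays exact for arbitrarily large integers.
def _example_ceildiv() -> None:
    assert _ceildiv(7, 2) == 4  # ceil(3.5)
    assert _ceildiv(8, 2) == 4  # exact division is unchanged
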
def expand_indices_to_shape_length(begin: List[int], end: List[int], strides: List[int],
                                   axes: Optional[List[int]],
                                   input_shape: List[int]) -> Tuple[List[int], List[int], List[int]]:
    """
    Helper function for expanding begin, end and strides to match the shape length.
    """
    if axes is not None:
        new_begin = len(input_shape) * [0]
        new_end = input_shape
        new_strides = len(input_shape) * [1]
        for i, e in enumerate(axes):
            new_begin[e] = begin[i]
            new_end[e] = end[i]
            new_strides[e] = strides[i]
        begin = new_begin
        end = new_end
        strides = new_strides
    return begin, end, strides

def get_strided_slice_out_shape(attrs: StridedSliceAttrs) -> Tuple[int, ...]:
    """
    Get StridedSliceOp output shape.

    :param attrs: StridedSlice attributes class.
    :return: Output shape.
    """
    begin = list(attrs.begin)
    end = list(attrs.end)
    strides = list(attrs.strides)
    input_shape = list(attrs.input_shape)
    begin, end, strides = expand_indices_to_shape_length(begin=begin, end=end, strides=strides,
                                                         axes=attrs.axes, input_shape=input_shape)
    output_shape = np.ones(len(input_shape), dtype=np.int32).tolist()
    # tf.strided_slice documentation
    if attrs.slice_mode == 'size':
        for idx, (b, e) in enumerate(zip(begin, end)):
            output_shape[idx] = e  # b + e - b
    else:
        assert attrs.slice_mode == 'end', f"Expected 'end', got {attrs.slice_mode}"
        for idx, (b, e, s) in enumerate(zip(begin, end, strides)):
            output_shape[idx] = _ceildiv(e - b, s)
    return tuple(output_shape)

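
# Illustrative sketch (not part of the original module): in 'end' mode each output
# dimension is ceil((end - begin) / stride); slicing [1:7:2] of a length-8 axis keeps
# ceil(6 / 2) = 3 elements. SimpleNamespace stands in for StridedSliceAttrs.
def _example_strided_slice_out_shape() -> None:
    from types import SimpleNamespace
    attrs = SimpleNamespace(begin=[1], end=[7], strides=[2], input_shape=[8],
                            axes=None, slice_mode='end')
    assert get_strided_slice_out_shape(attrs) == (3,)
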
def get_squeeze_out_shape(axis: list[int], input_shape: tuple[int, ...]) -> tuple[int, ...]:
    """
    Get SqueezeOp output shape.

    Args:
        axis: Set of axes to remove
        input_shape: Shape of input tensor

    Returns:
        Output shape.
    """
    new_shape = []
    for idx, el in enumerate(input_shape):
        if idx not in axis:
            new_shape.append(el)
    output_shape = tuple(new_shape)
    return output_shape

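
# Illustrative sketch (not part of the original module): squeezing axes 0 and 2 of a
# (1, 5, 1, 7) tensor leaves (5, 7).
def _example_squeeze_out_shape() -> None:
    assert get_squeeze_out_shape([0, 2], (1, 5, 1, 7)) == (5, 7)
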
def _get_transpose_out_shape(attrs: TransposeAttrs) -> Tuple[int, ...]:
    """
    Get TransposeOp output shape.

    :param attrs: Transpose attributes class.
    :return: Output shape.
    """
    input_shape = list(attrs.input_shape)
    output_shape = []
    for a in attrs.axes:
        output_shape.append(input_shape[a])
    return tuple(output_shape)


def _get_depth_to_space_out_shape(attrs: DepthToSpaceAttrs) -> Tuple[int, ...]:
    """Get DepthToSpaceOp output shape.

    Args:
        attrs: DepthToSpace attributes class.

    Returns:
        Output shape.
    """
    output_shape = list(attrs.input_shape)
    output_shape[-1] = output_shape[-1] // (attrs.block_size * attrs.block_size)
    output_shape[-2] = output_shape[-2] * attrs.block_size
    output_shape[-3] = output_shape[-3] * attrs.block_size
    return tuple(output_shape)

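
# Illustrative sketch (not part of the original module): depth-to-space with block size b
# moves b*b channels into a b-by-b spatial tile, so NHWC shape (1, 4, 4, 16) with block
# size 2 becomes (1, 8, 8, 4). SimpleNamespace stands in for DepthToSpaceAttrs.
def _example_depth_to_space_out_shape() -> None:
    from types import SimpleNamespace
    attrs = SimpleNamespace(input_shape=(1, 4, 4, 16), block_size=2)
    assert _get_depth_to_space_out_shape(attrs) == (1, 8, 8, 4)
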
def get_expand_dims_out_shape(attrs: ExpandDimsAttrs) -> Tuple[int, ...]:
    """
    Get ExpandDimsOp output shape.

    :param attrs: ExpandDims attributes class.
    :return: Output shape.
    """
    input_shape = list(attrs.input_shape)
    axis = attrs.axis
    num_newaxis = attrs.num_newaxis

    output_shape = input_shape[:]
    if axis == -1:
        output_shape.extend([1] * num_newaxis)
    elif axis < -1:
        axis = axis + 1
        output_shape[axis:axis] = [1] * num_newaxis
    else:
        output_shape[axis:axis] = [1] * num_newaxis
    return tuple(output_shape)

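
# Illustrative sketch (not part of the original module): inserting two new axes at
# position 1 of shape (2, 3) yields (2, 1, 1, 3); axis -1 appends the new axes instead.
# SimpleNamespace stands in for ExpandDimsAttrs.
def _example_expand_dims_out_shape() -> None:
    from types import SimpleNamespace
    assert get_expand_dims_out_shape(
        SimpleNamespace(input_shape=(2, 3), axis=1, num_newaxis=2)) == (2, 1, 1, 3)
    assert get_expand_dims_out_shape(
        SimpleNamespace(input_shape=(2, 3), axis=-1, num_newaxis=1)) == (2, 3, 1)
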
def _get_split_out_shape(attrs: SplitAttrs) -> Tuple[Tuple[int, ...], ...]:
    """
    Get SplitOp output shapes.

    :param attrs: Split attributes class.
    :return: Output shape.
    """
    input_shape = list(attrs.input_shape)
    indices = attrs.indices_or_sections
    axis = attrs.axis
    output_shape = input_shape
    output_shapes = list()
    # If indices_or_sections is an integer, N, the array will be divided into N equal arrays along axis
    if isinstance(indices, int):
        assert input_shape[axis] % indices == 0, f"{input_shape[axis]} is not divisible by {indices}"
        output_shape[axis] = input_shape[axis] // indices
        for i in range(indices):
            output_shapes.append(tuple(output_shape))
    else:
        # If indices_or_sections is a 1-D array of sorted integers,
        # the entries indicate where along axis the array is split
        assert isinstance(indices, tuple)
        begin = 0
        for val in indices:
            size = val - begin
            output_shape[axis] = size
            begin = val
            output_shapes.append(tuple(output_shape))
        output_shape[axis] = attrs.input_shape[axis] - indices[-1]
        output_shapes.append(tuple(output_shape))
    return tuple(output_shapes)


def _get_out_shape_for_op_with_2_inputs(
        attrs: Union[MultiplyAttrs, MultiplyQuantAttrs, DivideAttrs, DivideQuantAttrs,
                     AddAttrs, AddQuantAttrs, ConstantMultiplyAddAttrs, PowerAttrs,
                     SubtractAttrs]) -> Tuple[int, ...]:
    """
    Get output shape for operators that have 2 inputs with possibly different shapes.
    This function supports TVM-style broadcasting, although the N2A backend currently
    does not support broadcasting.

    :param attrs: Attributes class.
    :return: Output shape.
    """
    lhs_input_shape = list(attrs.lhs_input_shape)
    rhs_input_shape = list(attrs.rhs_input_shape)
    lhs_len = len(lhs_input_shape)
    rhs_len = len(rhs_input_shape)
    if lhs_len > rhs_len:
        rhs_input_shape = [1] * (lhs_len - rhs_len) + rhs_input_shape
    elif rhs_len > lhs_len:
        lhs_input_shape = [1] * (rhs_len - lhs_len) + lhs_input_shape

    output_shape = lhs_input_shape
    for i, (lhs, rhs) in enumerate(zip(lhs_input_shape, rhs_input_shape)):
        if lhs > rhs:
            assert rhs == 1, "Incompatible shapes."
            output_shape[i] = lhs
        elif rhs > lhs:
            assert lhs == 1, "Incompatible shapes."
            output_shape[i] = rhs
        else:
            assert lhs == rhs, "Incompatible shapes."
            output_shape[i] = lhs
    return tuple(output_shape)


def _get_output_pad_shape(attrs: PadAttrs) -> Tuple[int, ...]:
    """
    Get output shape for PadOp.

    :param attrs: Pad attributes class.
    :return: Output shape.
    """
    input_shape = list(attrs.input_shape)
    pad_widths = list(attrs.pad_width)
    output_shape = input_shape
    for i, pad in enumerate(pad_widths):
        for val in pad:
            output_shape[i] += val
    return tuple(output_shape)


def _get_image_resize2d_out_shape(attrs: ImageResize2DAttrs) -> Tuple[int, ...]:
    """
    Get output shape for ImageResize2DOp.

    :param attrs: ImageResize2D attributes class.
    :return: Output shape.
    """
    input_shape = attrs.input_shape
    layout = attrs.layout
    output_shape_h, output_shape_w = attrs.size
    output_shape = utils.insert_according_to_layout_strings(input_shape,
                                                            (output_shape_h, output_shape_w),
                                                            layout, "HW")
    return output_shape


def _get_concat_out_shape(attrs: ConcatenateAttrs) -> Tuple[int, ...]:
    """
    Get output shape for ConcatenateOp.

    :param attrs: Concatenate attributes class.
    :return: Output shape.
""" input_types = attrs.input_types axis = attrs.axis input_shapes = [input_type.shape for input_type in input_types] out_shape = list(input_shapes[0]) concat_dim_size = np.sum([input_shape[axis] for input_shape in input_shapes]) out_shape[axis] = int(concat_dim_size) return tuple(out_shape) def _get_take_out_shape(attrs: TakeAttrs) -> Tuple[int, ...]: """ Get output shape for TakeOp. :param attrs: Take attributes class. :return: Output shape. """ output_shape = list(attrs.input_shape) indices_shape = list(attrs.indices_shape) axis = attrs.axis assert len(indices_shape) == 1 output_shape[axis] = indices_shape[0] return tuple(output_shape)
def get_pack_input_types(input_types: List[TensorType]) -> List[TensorType]:
    """
    Get pack operator input types. If an input tensor has a 4D shape, it will be
    reshaped to a 2D MLA buffer shape.
    """
    shapes = []
    for input_type in input_types:
        shape = input_type.shape
        if len(shape) == 4:
            shape = op_fn.get_mla_padded_2d_shape(
                tensor_shape=shape,
                elem_size=np.dtype(input_type.scalar.numpy_type()).itemsize
            )
        shapes.append(TensorType(scalar=input_type.scalar, shape=shape))
    return shapes

def make_quantization_cast(provided_type: DataValue[QuantResultTensorType],
                           wanted_type: DataValue[QuantResultTensorType]) \
        -> QuantizationCast:
    """
    Make a quantization cast for one value.

    :param provided_type: Type and quantization of the value
    :param wanted_type: Type and quantization that it should be cast to
    :return: Cast
    """
    if isinstance(provided_type, TensorValue):
        assert isinstance(wanted_type, TensorValue)
        assert provided_type.value.type.shape == wanted_type.value.type.shape, \
            "Tensor shape unexpectedly changed during quantization"
        shape = provided_type.value.type.shape
        provided_scalar_type = provided_type.value.type.scalar
        provided_quant = provided_type.value.quant
        provided_requant_method = provided_type.value.requant_method
        wanted_scalar_type = wanted_type.value.type.scalar
        wanted_quant = wanted_type.value.quant
        if provided_scalar_type == wanted_scalar_type:
            # Same type. No cast required. If they both have quantization, quantization must be equal.
            assert provided_quant is None or wanted_quant is None or provided_quant == wanted_quant
            return IdentityCast()
        elif scalar_is_floating(provided_scalar_type) and wanted_scalar_type in (ScalarType.int8, ScalarType.int16):
            # Insert a quantize node
            assert wanted_quant is not None
            num_bits = 8 if wanted_scalar_type == ScalarType.int8 else 16
            return QuantCast(shape, wanted_quant.scale, wanted_quant.zero_point, num_bits, wanted_scalar_type)
        elif provided_scalar_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32) \
                and scalar_is_floating(wanted_scalar_type):
            # Insert a dequantize node
            assert provided_quant is not None
            return DequantCast(shape, provided_quant.scale, provided_quant.zero_point,
                               input_dtype=provided_scalar_type.numpy_type(),
                               output_dtype=wanted_scalar_type.numpy_type())
        elif provided_scalar_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32) \
                and wanted_scalar_type in (ScalarType.int16, ScalarType.int8):
            # Insert a requantize node
            assert provided_quant is not None and wanted_quant is not None
            assert provided_requant_method is not None
            return RequantCast(shape, provided_quant.scale, provided_quant.zero_point,
                               wanted_quant.scale, wanted_quant.zero_point,
                               min_val=wanted_quant.min_val, max_val=wanted_quant.max_val,
                               input_32_bit=provided_scalar_type == ScalarType.int32,
                               output_16_bit=wanted_scalar_type == ScalarType.int16,
                               requant_method=provided_requant_method)
        elif scalar_is_floating(provided_scalar_type) and scalar_is_floating(wanted_scalar_type):
            # Insert a convert node
            return ConvertCast(shape, provided_scalar_type, wanted_scalar_type)
        else:
            raise RuntimeError("Unable to insert a conversion between quantized representations")
    elif isinstance(provided_type, TupleValue):
        assert isinstance(wanted_type, TupleValue)
        assert len(provided_type.elements) == len(wanted_type.elements)
        element_casts = [make_quantization_cast(p, w)
                         for p, w in zip(provided_type.elements, wanted_type.elements)]
        if all(isinstance(c, IdentityCast) for c in element_casts):
            return IdentityCast()
        return TupleCast(element_casts)
    else:
        raise TypeError("Unrecognized type")

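
# Illustrative sketch (not part of the original module), assuming Quantization accepts
# (scale, zero_point) positionally with defaults for its remaining fields, and that
# QuantResultTensorType.from_type leaves quantization unset, as other uses in this file
# suggest: a float32 producer feeding an int8 consumer gets a QuantCast, while identical
# types get an IdentityCast.
def _example_make_quantization_cast() -> None:
    f32 = TensorValue(QuantResultTensorType.from_type(TensorType(ScalarType.float32, (1, 8))))
    i8 = TensorValue(QuantResultTensorType(TensorType(ScalarType.int8, (1, 8)),
                                           Quantization(0.05, 0), RequantMethod.fractional_zero))
    assert isinstance(make_quantization_cast(f32, i8), QuantCast)
    assert isinstance(make_quantization_cast(f32, f32), IdentityCast)
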
def make_quantization_casts(provided_input_types: Dict[InputName, DataValue[QuantResultTensorType]],
                            wanted_input_types: Dict[InputName, DataValue[QuantResultTensorType]]) \
        -> InputsQuantCast:
    """
    Create casts for a quantized node's input types by comparing the input data type
    with the type that the node requires.

    :param provided_input_types: Type and quantization of a node's inputs, after quantization
    :param wanted_input_types: Type and quantization that the quantized node requires
    :return: Casts for the node
    """
    # Compare keys, including ordering
    assert list(provided_input_types.keys()) == list(wanted_input_types.keys())
    casts = dict()
    for name in provided_input_types.keys():
        p = provided_input_types[name]
        w = wanted_input_types[name]
        casts[name] = make_quantization_cast(p, w)
    return InputsQuantCast(casts)

def _require_integer_precision(op_name: str, config: QuantizationConfigs) -> None:
    """
    Require the quantization precision to be int8 or int16. Raise an exception otherwise.
    """
    assert config.quantization_precision.get() in (QuantizationPrecision.INT_8,
                                                   QuantizationPrecision.INT_16), \
        f"Only integer quantization is supported for operator {op_name}"

class AwesomeOperation(Generic[AWESOME_ATTRS, QUANT_ATTRS]):
    """
    An abstract operation class.

    Stores a list of input key names expected to be passed in by the AwesomeNode,
    for developer reference.

    input_list: ClassVar[Optional[List[InputName]]]. Used as reference when getting
        inputs from a dictionary. If input_list is None, AFE will skip validating
        input_list at runtime.
    intermediate_names: ClassVar[List[str]]. Used for creation of intermediate
        observers. If the list is empty, an empty dict of intermediate observers
        will be created.
    """
    input_list: ClassVar[Optional[List[InputName]]] = []
    intermediate_names: ClassVar[List[str]] = []
    @classmethod
    def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType:
        """
        Get the type of this node given its attributes. The parameter should be a
        QUANT_ATTRS if that data has been created, or an AWESOME_ATTRIBUTES otherwise.

        :param attrs: Attributes associated with the operator. It is an AWESOME_ATTRIBUTES
            if quantization has not transformed the node, or a QUANT_ATTRS if it has.
        :return: The node's type.
        """
        raise NotImplementedError(f"{cls.__name__} does not have get_type function implemented")
    @classmethod
    def run(cls, attrs: AWESOME_ATTRS, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
        """
        Executes the operation in floating point.

        :param attrs: AwesomeAttributes associated with this operation
        :param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays
        :param config: Configuration parameters for how to run the network
        :return: Output tensor(s) whose type is dependent on the subclass.
        """
        raise NotImplementedError(f"{cls.__name__} does not have run function implemented")
    @classmethod
    def run_quant(cls, quant_attrs: QUANT_ATTRS, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
        """
        Execute the operation using quantized arithmetic.

        :param quant_attrs: Parameters that define the quantized operation
        :param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays
        :param config: Configuration parameters for how to run the network
        :return: Output tensor(s) whose type is dependent on the subclass.
        """
        raise NotImplementedError(f"{cls.__name__} does not have run_quant function implemented")
    @classmethod
    def calibrate(cls, attrs: AWESOME_ATTRS, calib_attrs: AwesomeCalibAttrs,
                  input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
        """
        The default calibration method. Executes the operation in floating point.
        Updates the observer if the operation is associated with one; otherwise, the
        operation's quantization parameters will be calculated based on its inputs'
        quantization parameters. Updates the min/max values using the outputs and uses
        the updated min/max to compute the scales and zero points.

        :param attrs: AwesomeAttributes associated with this operation
        :param calib_attrs: AwesomeCalibAttrs associated with operation's node.
        :param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays
        :param config: Configuration parameters for how to run the network
        :return: Output tensor(s) whose type is dependent on the subclass.
        """
        outputs = cls.run(attrs, input_dict, config)
        if calib_attrs.observer is not None:
            # Observers only monitor single output nodes
            assert isinstance(outputs, np.ndarray)
            calib_attrs.observer.update(outputs.astype(np.float32, copy=False))
        return outputs
    @classmethod
    def update_input_quant(cls, calib_attrs: AwesomeCalibAttrs,
                           input_dict: Mapping[InputName, Optional[DataValue[QuantResultTensorType]]]):
        """
        Record quantization scales of the input tensors.

        :param calib_attrs: Calibration results holding dynamic ranges. It will be updated
            with quantization parameters of the node's inputs.
        :param input_dict: Quantization parameters of the node's inputs.
        """
        # This method does not work for PlaceholderOp because input_dict is initialized in a
        # special way for that type. PlaceholderOp overrides this method to make it work.
        assert not issubclass(cls, PlaceholderOp)  # This assertion fails for PlaceholderOp (see above)
        assert all(isinstance(x, DataValue) for x in input_dict.values())
        calib_attrs.input_quant = dict(input_dict)
    @classmethod
    def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs,
                                  inputs: Dict[InputName, QuantizationTensorData]) \
            -> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]:
        """
        Get observed distribution and intermediate observed distributions.
        If a node doesn't have an observer, values from the previous node are used.
        ExternalOp, TupleOp, TupleGetItemOp, LayoutTransformOp, ReshapeOp don't use
        observed distributions and those values won't be passed to any other MLA node,
        so the observed distribution for those is set to None.

        :param calib_attrs: Calibration attributes.
        :param inputs: Properties of the inputs. It has quantization scales of the input
            tensors and attributes of the nodes that calculate the inputs.
        :return: Tuple of observed distribution and dictionary of intermediate observed
            distributions.
        """
        if calib_attrs.observer is not None:
            intermediate_distributions = dict() if calib_attrs.intermediate_observers is None else \
                {k: ObservedDistribution(v) for k, v in calib_attrs.intermediate_observers.items()}
            return ObservedDistribution(calib_attrs.observer), intermediate_distributions
        else:
            assert cls.input_list is not None
            assert len(cls.input_list) == 1
            assert cls.input_list[0] in inputs
            distribution = inputs[cls.input_list[0]][1]
            assert distribution is not None
            return distribution, {}
    @classmethod
    def quantize(cls, attrs: AWESOME_ATTRS, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) -> QUANT_ATTRS:
        """
        Compute quantized operator attributes, input quantization, and output quantization
        from floating-point operator attributes and the result of calibration.

        When this function is called, calib_attrs.input_quant has the types and quantization
        of the input values (after the inputs have been transformed by quantization), and
        calib_attrs.quant holds a type and quantization of the output, which this function
        may overwrite. The output quantization is computed based on calibration. The output
        type should not be used.

        This function must assign to calib_attrs.quant the output type and quantization that
        this operator has after quantization. It may use the default quantization if
        appropriate.

        This function may modify attrs. It should modify attrs if the same attribute class
        is used for both the floating-point and the quantized operator, which would mean
        that it's designed to store any quantization information in attrs.

        This function may modify calib_attrs.input_quant to direct quantization to supply
        different inputs to this operator. The quantization algorithm will insert quantize
        or dequantize nodes so that the inputs have the type and quantization that were
        assigned. An exception will be raised if the input can't be provided by inserting a
        quantize or dequantize node or leaving the input unchanged.

        The quantized operator attributes are returned.

        :param attrs: Floating-point operator attributes.
        :param quantizer_interface: Interface for reading and recording the quantization
            of the operator's inputs and output.
        :param config: Parameters controlling how to quantize.
        :param error_reporter: Node reporter of the node to be quantized.
        :return: Quantized operator attributes
        """
        raise NotImplementedError(f"{cls.__name__} does not have quantize function implemented")
    @classmethod
    def type_check(cls, value: Any, expected_type: Type[T]) -> T:
        """
        Each op expects a more specific type of inputs / AwesomeAttributes,
        so this function helps with type checking.

        :param value: AwesomeAttributes
        :param expected_type: a type
        """
        assert isinstance(value, expected_type), \
            "Error: Op ({}) expects ({}). Got ({})".format(cls.__name__, expected_type, type(value))
        return cast(T, value)
def _cast_to_quant_tensor_new(data: np.ndarray) -> np.ndarray:
    """
    Temporary cast to int8, until all operators return int8 in run_quant functions.
    Should be used only on operator inputs. To be removed once all operators'
    run_quant methods are returning int8 data.

    :param data: data
    :return: data with int8 values
    """
    return data.astype(QuantizedTensorNew)


def _get_input_precision(quantizer_interface: OpQuantInterface,
                         quantization_precision: QuantizationPrecision,
                         input_name: InputName) -> ScalarType:
    """
    Get input precision of a node.

    If input_type is int8, input_precision is set to ScalarType.int8 and no changes are
    needed. If input_type is float32, bfloat16, int16, or int32, input_precision should
    be set according to the value set in the quantization_precision argument. The
    fix_input function is called later to fix the input type to the chosen input precision.
    """
    input_quant = quantizer_interface.get_input_quant()
    match get_expected_tensor_value(input_quant[input_name]).type.scalar:
        case ScalarType.int8:
            input_precision = ScalarType.int8
        case ScalarType.int16 | ScalarType.int32 | ScalarType.bfloat16 | ScalarType.float32:
            input_precision = quantization_precision.to_scalar_type()
        case _:
            raise ValueError("Unrecognized precision")
    return input_precision


###########################
# PLACEHOLDER and CONSTANT
###########################
class PlaceholderOp(AwesomeOperation[PlaceholderAttrs, PlaceholderQuantAttrs]):
    input_list: ClassVar[List[InputName]] = [InputName('data')]

    placeholder_fn: Callable[[np.ndarray], np.ndarray] = op_fn.placeholder
    quant_fn: Callable[[np.ndarray, float, int, int], np.ndarray] = quant_utils.linear_quantize
    @classmethod
    def get_type(cls, attrs: Union[PlaceholderAttrs, PlaceholderQuantAttrs]) -> NodeType:
        return NodeType({}, TensorValue(attrs.type))
    @classmethod
    @type_check_operation_arguments(types=[PlaceholderAttrs, (np.ndarray, tuple)], dict_mask=[False, True])
    def run(cls, attrs: PlaceholderAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray:
        data = input_dict[InputName('data')]
        return cls.placeholder_fn(data.astype(attrs.type.scalar.numpy_type(), copy=False))
    @classmethod
    def update_input_quant(cls, calib_attrs: AwesomeCalibAttrs,
                           input_dict: Mapping[InputName, Optional[DataValue[QuantResultTensorType]]]):
        """
        Record quantization scales of the input tensors.

        :param calib_attrs: Calibration results holding dynamic ranges. It will be updated
            with quantization scales of the node's inputs.
        :param input_dict: Quantization scales of the node's inputs.
        """
        # This op has no inputs, but the system provides an input dict containing key "data".
        # Ignore the key.
        calib_attrs.input_quant = {}
    @classmethod
    def quantize(cls, attrs: PlaceholderAttrs, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) -> PlaceholderQuantAttrs:
        # Use the quantization that was passed to this placeholder
        quant = quantizer_interface.get_placeholder_quant()
        assert quant is not None
        quantizer_interface.set_chosen_output_quant(quant)

        # Input type does not change during quantization
        new_type = attrs.type
        return PlaceholderQuantAttrs(new_type, get_expected_tensor_value(quant).quant)
    @classmethod
    def run_quant(cls, quant_attrs: PlaceholderQuantAttrs, input_dict: Dict[InputName, Any],
                  config: RunConfigs) -> np.ndarray:
        data = input_dict[InputName('data')]
        # Coerce to the correct type. This should be unnecessary, but some code passes the
        # wrong input type.
        data = data.astype(quant_attrs.type.scalar.numpy_type(), copy=False)
        return cls.placeholder_fn(data)
class ConstantOp(AwesomeOperation[ConstantAttrs, ConstantQuantAttrs]):
    constant_fn: Callable[[np.ndarray], np.ndarray] = op_fn.constant
    @classmethod
    def get_type(cls, attrs: Union[ConstantAttrs, ConstantQuantAttrs]) -> NodeType:
        if isinstance(attrs, ConstantAttrs):
            data = attrs.data
        else:
            data = attrs.quant_data
        return NodeType({}, TensorValue(TensorType(ScalarType.from_numpy(data.dtype), data.shape)))
    @classmethod
    @type_check_operation_arguments(types=[ConstantAttrs], dict_mask=[False])
    def run(cls, attrs: ConstantAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray:
        if attrs is None:
            raise ValueError("Error: Floating point constants missing. This can happen if you have "
                             "already quantized but are running the network in a floating point mode")
        return cls.constant_fn(attrs.data)
    @classmethod
    @type_check_operation_arguments(types=[ConstantAttrs, AwesomeCalibAttrs], dict_mask=[False, False])
    def calibrate(cls, attrs: ConstantAttrs, calib_attrs: AwesomeCalibAttrs,
                  input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray:
        tensor_value = cls.run(attrs, input_dict, config)
        # Consider doing update just once
        assert calib_attrs.observer is not None
        assert isinstance(tensor_value, np.ndarray)
        calib_attrs.observer.update(tensor_value)
        return tensor_value
    @classmethod
    @type_check_operation_arguments(types=[ConstantAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
                                    dict_mask=[False, False, False, False])
    def quantize(cls, attrs: ConstantAttrs, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) \
            -> Union[ConstantAttrs, ConstantQuantAttrs]:
        # TODO(Joey): Check if we can quantize the constant to int32. For now we quantize
        # the constant to int8.
        # If the constant is a weight or bias, the quant_data will be overwritten while
        # quantizing the Convolution/Dense.
        const_data = attrs.data
        if const_data.dtype in (np.int8, np.int16, np.int32, np.int64):
            # Do not quantize integer data
            ty = TensorType(ScalarType.from_numpy(const_data.dtype), const_data.shape)
            quant = QuantResultTensorType.from_type(ty)
            quantizer_interface.set_chosen_output_quant(TensorValue(quant))
            quant_data = const_data
            return ConstantQuantAttrs(quant_data)
        else:
            assert const_data.dtype == np.float32
            quantized_type = config.quantization_precision.get().to_scalar_type()
            quant = fix_output(quantizer_interface, quantized_type, const_data.shape, config.asymmetry.get())
            if scalar_is_integral(quantized_type):
                quant_data = quant_utils.linear_quantize_with_quantization(const_data, quant.quant).\
                    astype(quantized_type.numpy_type())
                return ConstantQuantAttrs(quant_data)
            else:
                return dataclasses.replace(attrs, data=const_data.astype(np.dtype(bfloat16)))
    @classmethod
    @type_check_operation_arguments(types=[ConstantQuantAttrs, tuple], dict_mask=[False, True])
    def run_quant(cls, quant_attrs: ConstantQuantAttrs, input_dict: Dict[InputName, Any],
                  config: RunConfigs) -> np.ndarray:
        if quant_attrs is None:
            raise ValueError("Error: quantized constants missing. This can happen if you have "
                             "not quantized the network but are running it in a quantized mode")
        # TODO(Joey): Try to remove the copy
        return cls.constant_fn(np.copy(quant_attrs.quant_data))
#########################
# COMPUTATION OPERATIONS
#########################


class _MaxPoolOp(AwesomeOperation[MaxPoolAttrs, PoolQuantAttrs]):
    maxpool_fn: Callable[[MaxPoolAttrs, np.ndarray, Union[float, int]], np.ndarray] = op_fn.maxpool

    @classmethod
    def get_type(cls, attrs: Union[MaxPoolAttrs, PoolQuantAttrs]) -> NodeType:
        if isinstance(attrs, MaxPoolAttrs):
            in_type = out_type = attrs.scalar_type
        else:
            in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
            out_type = ScalarType.from_numpy(attrs.requant.out_dtype) if attrs.requant is not None \
                else ScalarType.int8
            attrs = attrs.pool_attrs
        node_type = _pooling_op_type(attrs, in_type, out_type)
        return node_type

    @classmethod
    @type_check_operation_arguments(types=[MaxPoolAttrs, np.ndarray], dict_mask=[False, True])
    def run(cls, attrs: MaxPoolAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray:
        data = input_dict[InputName('data')]
        if not config.fast_mode or data.dtype == np.int16:
            run_mode = RunMode.MLA_MODE
        else:
            run_mode = RunMode.FAST_MODE
        return cls.maxpool_fn(attrs, data, pad_value=-float('inf'), mode=run_mode)

    @classmethod
    @type_check_operation_arguments(types=[MaxPoolAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
                                    dict_mask=[False, False, False, False])
    def quantize(cls, attrs: MaxPoolAttrs, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) -> PoolQuantAttrs:
        input_qrtt = quantizer_interface.get_input_quant()[InputName('data')]
        input_precision = get_expected_tensor_value(input_qrtt).type.scalar

        # Choose what numeric type to use at the input and output.
        if input_precision == ScalarType.float32:
            # Quantize the input to the selected precision
            compute_type = config.quantization_precision.get().to_scalar_type()
        elif config.quantization_precision.get() == QuantizationPrecision.INT_8:
            # Use int8 precision. Requantize if necessary.
            compute_type = ScalarType.int8
        else:
            # Use higher precision. Ignore the selected precision and use the
            # input data type instead, since that affords the best accuracy.
            compute_type = input_precision

        input_quant = fix_input(quantizer_interface, compute_type, InputName('data'), config.asymmetry.get())
        output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
        match compute_type:
            case ScalarType.int16:
                # When int16 is enabled, output has the same quantization as the input,
                # except that the data type is changed.
                output_bits = 32 if config.intermediate_int32.get() else 16
                output_quant = dataclasses.replace(input_quant.quant, bits=output_bits)
                output_type = ScalarType.int32 if config.intermediate_int32.get() else ScalarType.int16
                requant = requantization.get_id_requantization(output_type.numpy_type())
                requant_method = RequantMethod.arith_folded
            case ScalarType.int8:
                # Input and output use int8
                output_type = ScalarType.int8
                output_quant = input_quant.quant
                requant = requantization.get_id_requantization(np.int8)
                requant_method = RequantMethod.arith_folded
            case ScalarType.bfloat16:
                output_type = ScalarType.bfloat16
                output_quant = None
                requant = requantization.get_id_requantization(bfloat16)
                requant_method = None
            case _:
                raise ValueError("Unexpected QuantizationPrecision")
        output_quant = QuantResultTensorType(TensorType(output_type, output_shape), output_quant, requant_method)
        quantizer_interface.set_chosen_output_quant(TensorValue(output_quant))

        # Put the results into new operator attributes
        quant_attrs: Union[PoolAttrs, PoolQuantAttrs]
        if output_quant.quant is None:
            quant_attrs = dataclasses.replace(attrs, scalar_type=compute_type)
        else:
            pad_value_type = compute_type.numpy_type()
            pad_value = ml_kernels.math_helpers.get_dtype_min(pad_value_type)
            quant_attrs = make_quantized_pool_attrs(attrs, pad_value=pad_value,
                                                    input_int16=compute_type == ScalarType.int16,
                                                    requant=requant)
        return quant_attrs

    @classmethod
    @type_check_operation_arguments(types=[PoolQuantAttrs, np.ndarray], dict_mask=[False, True])
    def run_quant(cls, quant_attrs: PoolQuantAttrs, input_dict: Dict[InputName, np.ndarray],
                  config: RunConfigs) -> np.ndarray:
        data = input_dict[InputName('data')]
        if not config.fast_mode or data.dtype == np.int16:
            run_mode = RunMode.MLA_MODE
        else:
            run_mode = RunMode.FAST_MODE
        out = cls.maxpool_fn(quant_attrs.pool_attrs, data, pad_value=quant_attrs.pad_value, mode=run_mode)
        if quant_attrs.requant is not None:
            return requantization.requantize(out, quant_attrs.requant)
        else:
            return out

class MaxPool2DOp(_MaxPoolOp):
    input_list: ClassVar[List[InputName]] = [InputName('data')]


class MaxPool3DOp(_MaxPoolOp):
    input_list: ClassVar[List[InputName]] = [InputName('data')]
class _AvgPoolOp(AwesomeOperation[AVGPOOL_TYPES, PoolQuantAttrs], Generic[AVGPOOL_TYPES]):
    avgpool_fn: Callable[[AVGPOOL_TYPES, np.ndarray, Union[float, int]], np.ndarray] = op_fn.avgpool

    @classmethod
    def get_type(cls, attrs: Union[AvgPoolAttrs, PoolQuantAttrs]) -> NodeType:
        if isinstance(attrs, AvgPoolAttrs):
            in_type = out_type = attrs.scalar_type
        else:
            in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
            out_type = ScalarType.from_numpy(attrs.requant.out_dtype) if attrs.requant is not None \
                else ScalarType.int8
            attrs = attrs.pool_attrs
        node_type = _pooling_op_type(attrs, in_type, out_type)
        return node_type

    @classmethod
    @type_check_operation_arguments(types=[AVGPOOL_CLASSES, np.ndarray], dict_mask=[False, True])
    def run(cls, attrs: AVGPOOL_TYPES, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray:
        data = input_dict[InputName('data')]
        if not config.fast_mode or data.dtype == np.int16:
            run_mode = RunMode.MLA_MODE
        else:
            run_mode = RunMode.FAST_MODE
        return cls.avgpool_fn(attrs, data, pad_value=0, mode=run_mode)

    @classmethod
    @type_check_operation_arguments(types=[AVGPOOL_CLASSES, OpQuantInterface, QuantizationConfigs, NodeReporter],
                                    dict_mask=[False, False, False, False])
    def quantize(cls, attrs: AVGPOOL_TYPES, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) -> PoolQuantAttrs:
        compute_type = _get_input_precision(quantizer_interface, config.quantization_precision.get(),
                                            InputName('data'))
        pool_size = utils.transpose_attr_according_to_layout_strings(
            attrs.pool_size, attrs.layout, "NDHWC" if len(attrs.layout) == 5 else "NHWC")
        divisor = math.prod(pool_size[1:-1])
        if compute_type == ScalarType.int16 and divisor > 1024:
            # Cannot quantize int16 with a large divisor because too many bits are needed
            # for summation and requant scale factor. Switch to int8.
            sima_logger.sima_log_warning(
                f"AvgPool precision was reduced to int8 due to large pooling size ({divisor})"
            )
            compute_type = ScalarType.int8
        input_quant = fix_input(quantizer_interface, compute_type, InputName('data'), config.asymmetry.get())
        output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
        match compute_type:
            case ScalarType.int16 if config.intermediate_int32.get():
                # Perform integer summation without division.
                # Incorporate the omitted divisor into the output's quantization scale.
                output_quant = Quantization(scale=input_quant.quant.scale * divisor,
                                            zero_point=input_quant.quant.zero_point * divisor,
                                            bits=32,
                                            min_val=input_quant.quant.min_val,
                                            max_val=input_quant.quant.max_val)
                output_scalar_type = ScalarType.int32
                requant = requantization.id_requantization
                # Use scaled_fz because production compiler does not do zero point
                # correction in this operator
                requant_method = RequantMethod.scaled_fz
            case ScalarType.int8 | ScalarType.int16:
                # Perform integer summation and quantized division.
                # Quantize (1/sum_factor) = (scale_multiplier * 2**-scale_sh).
                # Choose scale_multiplier small enough so that Sum(input) * scale_multiplier
                # does not overflow.
                input_bits = np.iinfo(compute_type.numpy_type()).bits
                scale_sh = 31 - input_bits
                scale_multiplier = round((1 << scale_sh) / divisor)
                # Check if the scale_multiplier has at least 5 fractional bits
                pool_bits = int(np.floor(np.log2(divisor))) + 1
                if (scale_sh - pool_bits) < 5:
                    sima_logger.sima_log_warning(
                        f"AvgPool quantized divisor has {scale_sh - pool_bits} fractional bits (less than 5), "
                        f"pool size {divisor}, quantization precision is {compute_type}"
                    )
                # Output quantization is same as the input
                output_scalar_type = compute_type
                output_quant = input_quant.quant
                requant = requantization.TFLiteRequantization(sc_correction=scale_multiplier,
                                                              zp_correction=0,
                                                              shift=scale_sh,
                                                              rounding=RoundType.TOEVEN,
                                                              out_dtype=output_scalar_type.numpy_type())
                requant_method = RequantMethod.scaled_fz
            case ScalarType.bfloat16:
                # Do not quantize.
                output_scalar_type = compute_type
                output_quant = None
                requant = None
                requant_method = None
            case _:
                raise ValueError("Unexpected type")
        output_type = TensorType(output_scalar_type, output_shape)
        output_qrtt = QuantResultTensorType(output_type, output_quant, requant_method)
        quantizer_interface.set_chosen_output_quant(TensorValue(output_qrtt))

        # Create quantization parameters
        quant_attrs: Union[PoolAttrs, PoolQuantAttrs]
        if output_quant is None:
            quant_attrs = dataclasses.replace(attrs, scalar_type=compute_type)
        else:
            pad_value = input_quant.quant.zero_point
            quant_attrs = make_quantized_pool_attrs(attrs, pad_value=pad_value,
                                                    input_int16=(compute_type == ScalarType.int16),
                                                    requant=requant)
        return quant_attrs

    @classmethod
    @type_check_operation_arguments(types=[PoolQuantAttrs, np.ndarray], dict_mask=[False, True])
    def run_quant(cls, quant_attrs: PoolQuantAttrs, input_dict: Dict[InputName, np.ndarray],
                  config: RunConfigs) -> np.ndarray:
        data = input_dict[InputName('data')]
        if not config.fast_mode or data.dtype == np.int16:
            run_mode = RunMode.MLA_MODE
        else:
            run_mode = RunMode.FAST_MODE
        out = cls.avgpool_fn(quant_attrs.pool_attrs, data, requant=quant_attrs.requant,
                             pad_value=quant_attrs.pad_value, quantized=True,
                             rounding_type=quant_attrs.rounding_type, mode=run_mode)
        return out

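
# Illustrative sketch (not part of the original module): the reciprocal-divisor
# quantization used in _AvgPoolOp.quantize. For int8 input, scale_sh = 31 - 8 = 23, and a
# 7x7 pool (divisor 49) gives scale_multiplier = round(2**23 / 49) = 171196, so dividing
# by 49 becomes a multiply and right shift (the real kernel rounds to even; round-half-up
# is used here for brevity).
def _example_avgpool_divisor_quantization() -> None:
    divisor = 49
    scale_sh = 31 - 8
    scale_multiplier = round((1 << scale_sh) / divisor)
    total = 49 * 100  # a sum whose exact average is 100
    assert (total * scale_multiplier + (1 << (scale_sh - 1))) >> scale_sh == 100
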
class AvgPool2DOp(_AvgPoolOp[AvgPoolAttrs]):
    input_list: ClassVar[List[InputName]] = [InputName('data')]


class AvgPool3DOp(_AvgPoolOp[AvgPoolAttrs]):
    input_list: ClassVar[List[InputName]] = [InputName('data')]


class AdaptiveAvgPool2DOp(_AvgPoolOp[AdaptiveAvgPool2DAttrs]):
    input_list: ClassVar[List[InputName]] = [InputName('data')]

    avgpool_fn: Callable[[AdaptiveAvgPool2DAttrs, np.ndarray], np.ndarray] = op_fn.adaptive_avgpool2d
class VarianceOp(AwesomeOperation[VarianceAttrs, VarianceQuantAttrs]):
    input_list: ClassVar[list[InputName]] = [InputName('data'), InputName('mean')]

    var_fn = op_fn.variance
    @classmethod
    def get_type(cls, attrs: VarianceAttrs | VarianceQuantAttrs) -> NodeType:
        if isinstance(attrs, VarianceAttrs):
            in_type = out_type = attrs.scalar_type
        else:
            in_type = out_type = ScalarType.int8
            attrs = attrs.attrs
        lhs_shape = attrs.input_data_shape
        rhs_shape = attrs.mean_shape
        out_shape = attrs.mean_shape
        return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_shape)),
                         cls.input_list[1]: TensorValue(TensorType(in_type, rhs_shape))},
                        TensorValue(TensorType(out_type, out_shape)))
    @classmethod
    def run(cls, attrs: VarianceAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray:
        input_data = input_dict[InputName('data')]
        mean = input_dict[InputName('mean')]
        return cls.var_fn(input_data, mean)
    @classmethod
    def quantize(cls, attrs: VarianceAttrs, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) \
            -> VarianceAttrs | VarianceQuantAttrs:
        quant_precision = config.quantization_precision.get().to_scalar_type()
        compute_type = ScalarType.int8 if scalar_is_integral(quant_precision) else quant_precision
        input_data_shape = attrs.input_data_shape
        divisor = math.prod(input_data_shape[1:-1])
        if compute_type == ScalarType.int16 and divisor > 1024:
            # Cannot quantize int16 with a large divisor because too many bits are needed
            # for summation and requant scale factor. Switch to int8.
            sima_logger.sima_log_warning(
                f"VarianceOp precision was reduced to int8 due to large pooling size ({divisor})"
            )
            compute_type = ScalarType.int8
        data_quant = fix_input(quantizer_interface, compute_type, InputName('data'), config.asymmetry.get())
        mean_quant = fix_input(quantizer_interface, compute_type, InputName('mean'), config.asymmetry.get())
        output_quant = fix_output(quantizer_interface, compute_type, attrs.mean_shape, config.asymmetry.get())

        if scalar_is_integral(compute_type):
            # Perform integer summation and quantized division.
            # Quantize (1/sum_factor) = (scale_multiplier * 2**-scale_sh).
            # Choose scale_multiplier small enough so that Sum(input) * scale_multiplier
            # does not overflow.
            # Number of input bits will be 18 because of the multiplication
            # (data - mean) * (data - mean)
            input_bits = (np.iinfo(compute_type.numpy_type()).bits + 1) * 2
            scale_sh = 31 - input_bits
            scale_multiplier = round((1 << scale_sh) / divisor)
            if scale_multiplier == 0:
                raise sima_logger.UserFacingException(
                    f"Cannot compile a pooling operator with pool size {divisor}"
                    f" and quantization precision {compute_type}."
                    f" Please recompile with higher precision to handle this size.")
            # Check if the scale_multiplier has at least 5 fractional bits
            pool_bits = int(np.floor(np.log2(divisor))) + 1
            if (scale_sh - pool_bits) < 5:
                sima_logger.sima_log_warning(
                    f"VarianceOp quantized divisor has {scale_sh - pool_bits} fractional bits (less than 5), "
                    f"pool size {divisor}, quantization precision is {compute_type}"
                )
            requant = requantization.TFLiteRequantization(sc_correction=scale_multiplier,
                                                          zp_correction=0,
                                                          shift=scale_sh,
                                                          rounding=RoundType.TOEVEN,
                                                          out_dtype=np.int32)

            # Get output requantization
            product_scale = data_quant.quant.scale * mean_quant.quant.scale
            product_quant = Quantization(scale=product_scale, bits=32)
            sc_corr, zp_corr, shift = quant_utils.requantization_tflite(product_quant, output_quant.quant)
            requant_var = TFLiteRequantization(sc_correction=sc_corr, zp_correction=zp_corr, shift=shift,
                                               rounding=RoundType.TOEVEN, out_dtype=np.int8)
            return VarianceQuantAttrs(attrs, requant=requant, requant_var=requant_var)
        else:
            return dataclasses.replace(attrs, scalar_type=ScalarType.bfloat16)
    @classmethod
    def run_quant(cls, quant_attrs: QUANT_ATTRS, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
        input_data = input_dict[InputName('data')]
        mean = input_dict[InputName('mean')]
        return cls.var_fn(input_data, mean, quant_attrs.requant, quant_attrs.requant_var)
class MultiplyOp(AwesomeOperation[MultiplyAttrs, MultiplyQuantAttrs]):
    input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]

    multiply_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.multiply
    requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \
        quant_utils.requantize
    @classmethod
    def get_type(cls, attrs: Union[MultiplyAttrs, MultiplyQuantAttrs]) -> NodeType:
        if isinstance(attrs, MultiplyAttrs):
            in_type = out_type = attrs.scalar_type
        else:
            in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
            out_type = ScalarType.from_numpy(attrs.requant.out_dtype)
            assert out_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32)
        lhs_input_shape = attrs.lhs_input_shape
        rhs_input_shape = attrs.rhs_input_shape
        output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
        return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_input_shape)),
                         cls.input_list[1]: TensorValue(TensorType(in_type, rhs_input_shape))},
                        TensorValue(TensorType(out_type, output_shape)))
    @classmethod
    @type_check_operation_arguments(types=[MultiplyAttrs, np.ndarray], dict_mask=[False, True])
    def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray:
        return cls.multiply_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
    @classmethod
    def quantize(cls, attrs: MultiplyAttrs, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) \
            -> Union[MultiplyAttrs, MultiplyQuantAttrs]:
        # Preferred output precision. When 16-bit is enabled, quantization may choose int32
        # instead of this.
        compute_precision = config.quantization_precision.get().to_scalar_type()
        lhs_quant = fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get())
        rhs_quant = fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get())
        output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
        quantization = quantize_output(quantizer_interface, compute_precision, output_shape,
                                       config.asymmetry.get())

        if scalar_is_integral(compute_precision):
            allow_full_output_precision = compute_precision == ScalarType.int16 and config.intermediate_int32.get()
            intrinsic_shift, requant, new_output_quant = \
                quant_utils.quantize_multiply(lhs_quant.quant, rhs_quant.quant, quantization.quant,
                                              allow_full_output_precision)

            # Determine attributes and quantization using the result of quantize_multiply
            out_scalar_type = ScalarType.from_numpy(requant.out_dtype)
            requant_method = RequantMethod.fractional_zero
        else:
            # Floating-point multiply
            requant = None
            requant_method = None
            out_scalar_type = compute_precision
            new_output_quant = None

        output_type = TensorType(out_scalar_type, output_shape)
        quantization = QuantResultTensorType(output_type, new_output_quant, requant_method)
        quantizer_interface.set_chosen_output_quant(TensorValue(quantization))

        if new_output_quant is None:
            return dataclasses.replace(attrs, scalar_type=compute_precision)
        else:
            return MultiplyQuantAttrs(attrs.lhs_input_shape, attrs.rhs_input_shape,
                                      compute_precision == ScalarType.int16,
                                      intrinsic_shift, requant,
                                      lhs_quant.quant.zero_point, rhs_quant.quant.zero_point,
                                      quantization.quant.bits)
    @classmethod
    def run_quant(cls, quant_attrs: MultiplyQuantAttrs, input_dict: Dict[InputName, Any],
                  config: RunConfigs) -> np.ndarray:
        in1_data = input_dict[InputName('lhs')]
        in2_data = input_dict[InputName('rhs')]
        return op_fn.quantized_multiply(quant_attrs, in1_data, in2_data)
class PadOp(AwesomeOperation[PadAttrs, AwesomeQuantAttrBase]):
    input_list: ClassVar[List[InputName]] = [InputName('data'), InputName('pad_val')]

    pad_fn: Callable[[PadAttrs, np.ndarray, np.ndarray], np.ndarray] = op_fn.pad
@classmethod
[docs] def get_type(cls, attrs: Union[PadAttrs, AwesomeQuantAttrBase]) -> NodeType: data_type = ScalarType.float32 if isinstance(attrs, PadAttrs) else ScalarType.int8 out_shape = _get_output_pad_shape(attrs) return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, attrs.input_shape)), cls.input_list[1]: TensorValue(TensorType(data_type, (1,)))}, TensorValue(TensorType(data_type, out_shape)))
@classmethod @type_check_operation_arguments(types=[PadAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: PadAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: pad_value = input_dict[InputName('pad_val')] return cls.pad_fn(attrs, input_dict[InputName('data')], pad_value)
[docs] class MeanOp(AwesomeOperation[MeanAttrs, MeanQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] mean_fn: Callable[[MeanAttrs, np.ndarray], np.ndarray] = op_fn.mean
@classmethod
[docs] def get_type(cls, attrs: Union[MeanAttrs, MeanQuantAttrs]) -> NodeType: if isinstance(attrs, MeanAttrs): node_type = node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32) else: node_type = node_type_for_dimension_reduction_operators(attrs.attrs, np.int8, np.int8) return node_type
@classmethod @type_check_operation_arguments(types=[MeanAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: MeanAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.mean_fn(attrs, input_dict[InputName('data')])
@classmethod @type_check_operation_arguments(types=[MeanAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter], dict_mask=[False, False, False, False])
[docs] def quantize(cls, attrs: MeanAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> MeanQuantAttrs: input_quantization = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get()) assert input_quantization.quant is not None # Mean's output has the same quantization as its input fix_output_from_input(quantizer_interface, tuple(get_output_shape(attrs))) node_scales = input_quantization.quant.scale node_zps = input_quantization.quant.zero_point quant_attrs = MeanQuantAttrs(attrs=attrs, node_scales=node_scales, node_zps=node_zps) return quant_attrs
@classmethod @type_check_operation_arguments(types=[MeanQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: MeanQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data = _cast_to_quant_tensor_new( input_dict[InputName('data')]) return np.round(cls.mean_fn(quant_attrs.attrs, data, quantized=True))
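Because the quantized mean reuses the input's quantization (same scale and zero point), the integer computation is just an average followed by rounding. A small sketch with an assumed scale and zero point, unrelated to any real MeanQuantAttrs values:

    import numpy as np

    scale, zp = 0.1, 0                                     # assumed input quantization
    q = np.array([[10, 20], [30, 40]], dtype=np.int8)

    q_mean = np.round(q.astype(np.float32).mean(axis=1))   # result stays in the input's scale
    f_mean = ((q - zp) * scale).mean(axis=1)

    # The quantized mean decodes to (approximately) the floating-point mean.
    assert np.allclose((q_mean - zp) * scale, f_mean, atol=scale)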
[docs] class ArgMaxOp(AwesomeOperation[ArgMaxAttrs, ArgMaxQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] argmax_fn: Callable[[ArgMaxAttrs, np.ndarray], np.ndarray] = op_fn.argmax
@classmethod
[docs] def get_type(cls, attrs: Union[ArgMaxAttrs, ArgMaxQuantAttrs]) -> NodeType: assert isinstance(attrs, (ArgMaxAttrs, ArgMaxQuantAttrs)) if isinstance(attrs, ArgMaxQuantAttrs): attrs = attrs.attrs return node_type_for_dimension_reduction_operators(attrs, attrs.input_scalar_type.numpy_type(), attrs.result_scalar_type.numpy_type())
@classmethod
[docs] def quantize(cls, attrs: ArgMaxAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[ArgMaxAttrs, ArgMaxQuantAttrs]: """ Quantize argmax. The quantized operator takes int8 or bfloat16 values and returns int32 values. The int32 values represent an array index, not real numbers, so they do not have a quantization scale. No quantization info is saved in attrs, as argmax's computation is oblivious to quantization. """ input_qrtt = quantizer_interface.get_input_quant()[InputName('data')] input_precision = get_expected_tensor_value(input_qrtt).type.scalar quantization_precision = config.quantization_precision.get() # Choose input type. Use the given input type if it is supported. # Otherwise choose the type that best matches quantization_precision. match input_precision: case ScalarType.int8: input_type = ScalarType.int8 case ScalarType.bfloat16: input_type = ScalarType.bfloat16 case ScalarType.int16 | ScalarType.float32: input_type = ScalarType.bfloat16 if quantization_precision == QuantizationPrecision.BFLOAT_16 \ else ScalarType.int8 case _: raise ValueError("Unexpected scalar type in ArgMaxOp.quantize") fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get()) output_shape = tuple(get_output_shape(attrs)) output_quant = TensorValue(QuantResultTensorType(TensorType(ScalarType.int32, output_shape), None, None)) quantizer_interface.set_chosen_output_quant(output_quant) new_attrs = dataclasses.replace(attrs, input_scalar_type=input_type, result_scalar_type=ScalarType.int32) return ArgMaxQuantAttrs(new_attrs) if scalar_is_integral(input_type) else new_attrs
@classmethod @type_check_operation_arguments(types=[ArgMaxAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ArgMaxAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.argmax_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def run_quant(cls, attrs: ArgMaxQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.argmax_fn(attrs.attrs, input_dict[InputName('data')])
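The reason quantize can discard all scale information: argmax is invariant under any strictly increasing transformation, and affine dequantization with a positive scale is one. A quick sketch:

    import numpy as np

    scale, zp = 0.07, 5                            # any positive scale, any zero point
    q = np.array([12, -3, 88, 41], dtype=np.int8)

    # The index of the maximum is unchanged by dequantization.
    assert np.argmax(q) == np.argmax((q.astype(np.float32) - zp) * scale)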
[docs] class SoftmaxOp(AwesomeOperation[SoftmaxAttrs, SoftmaxQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] softmax_fn: Callable[[SoftmaxAttrs, np.ndarray], np.ndarray] = op_fn.softmax
[docs] intermediate_names: ClassVar[List[str]] = ["sum_exp"]
@classmethod
[docs] def get_type(cls, attrs: Union[SoftmaxAttrs, SoftmaxQuantAttrs]) -> NodeType: if isinstance(attrs, SoftmaxAttrs): scalar_type = attrs.scalar_type else: scalar_type = ScalarType.int16 if attrs.enable_int16 else ScalarType.int8 tensor_type = TensorType(scalar_type, attrs.input_shape) return _unary_op_type(cls.input_list, tensor_type)
@classmethod @type_check_operation_arguments(types=[SoftmaxAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: SoftmaxAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.softmax_fn(attrs, input_dict[InputName('data')])
@classmethod @type_check_operation_arguments(types=[SoftmaxAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter], dict_mask=[False, False, False, False])
[docs] def quantize(cls, attrs: SoftmaxAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) \ -> Union[SoftmaxAttrs, SoftmaxQuantAttrs]: input_precision = get_expected_tensor_value(quantizer_interface.get_input_quant()[cls.input_list[0]]).type compute_precision = config.quantization_precision.get().to_scalar_type() if scalar_is_integral(compute_precision): if input_precision.scalar != ScalarType.int8 and config.quantization_precision.get().is_int16_precision(): input_quant = fix_input_to_int16(quantizer_interface, InputName('data'), config.asymmetry.get()) output_quant = fix_output_to_int16(quantizer_interface, attrs.input_shape, config.asymmetry.get()) enable_int16 = True else: input_quant = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get()) output_quant = fix_output_to_int8(quantizer_interface, attrs.input_shape, config.asymmetry.get()) enable_int16 = False intermediate_min_max = get_intermediate_min_max(quantizer_interface) return quant_utils.quantize_softmax(attrs, input_quant.quant, output_quant.quant, intermediate_min_max, enable_int16) else: fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get()) output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape fix_output_from_input(quantizer_interface, output_shape) return dataclasses.replace(attrs, scalar_type=compute_precision)
@classmethod @type_check_operation_arguments(types=[SoftmaxQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: SoftmaxQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.softmax_fn(quant_attrs, input_dict[InputName('data')])
@classmethod
[docs] def calibrate(cls, attrs: AWESOME_ATTRS, calib_attrs: AwesomeCalibAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any: """ Softmax calibration method. Executes the default calibration to get the results of the Softmax operation in floating point. Additionally, calculates intermediate results and updates the observers for intermediate values. :param attrs: AwesomeAttributes associated with this operation. :param calib_attrs: AwesomeCalibAttrs associated with the operation's node. :param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays. :param config: Parameters controlling how to calibrate. :return: Output tensor(s) whose type is dependent on the subclass. """ # Run default calibration. outputs = super().calibrate(attrs, calib_attrs, input_dict, config) # Calculate intermediate values. # i = ReduceSum(Exp(input), axis=axis, keepdims=1) data = input_dict[InputName('data')] sum_exp_fun = lambda x: (np.sum(np.exp(x - np.max(x, axis=attrs.axis, keepdims=True)), axis=attrs.axis, keepdims=True)) sum_exp_output = sum_exp_fun(data) # Update observers for intermediate values. assert calib_attrs.intermediate_observers assert ('sum_exp' in calib_attrs.intermediate_observers and calib_attrs.intermediate_observers['sum_exp'] is not None) calib_attrs.intermediate_observers['sum_exp'].update(sum_exp_output.astype(np.float32)) return outputs
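The intermediate observed above is the max-shifted denominator: subtracting the per-axis maximum before exponentiating leaves softmax unchanged while keeping exp() in a safe range. A sketch of that identity in plain NumPy, independent of op_fn.softmax:

    import numpy as np

    x = np.array([[1.0, 2.0, 3.0], [100.0, 101.0, 102.0]])
    shifted = x - np.max(x, axis=-1, keepdims=True)
    sum_exp = np.sum(np.exp(shifted), axis=-1, keepdims=True)  # the 'sum_exp' intermediate
    softmax = np.exp(shifted) / sum_exp

    # Matches the naive formula on the row where it does not overflow.
    assert np.allclose(softmax[0], np.exp(x[0]) / np.sum(np.exp(x[0])))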
[docs] class LRNOp(AwesomeOperation[LRNAttrs, LRNQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] lrn_fn: Callable[[LRNAttrs, np.ndarray], np.ndarray] = op_fn.lrn
@classmethod
[docs] def get_type(cls, attrs: Union[LRNAttrs, LRNQuantAttrs]) -> NodeType: scalar_type = ScalarType.float32 if isinstance(attrs, LRNAttrs) else ScalarType.int8 tensor_type = TensorType(scalar_type, attrs.shape) return _unary_op_type(cls.input_list, tensor_type)
@classmethod @type_check_operation_arguments(types=[LRNAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: LRNAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.lrn_fn(attrs, input_dict[InputName('data')])
@classmethod @type_check_operation_arguments(types=[LRNAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter], dict_mask=[False, False, False, False])
[docs] def quantize(cls, attrs: LRNAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> LRNQuantAttrs: input_quant = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get()) output_quant = fix_output_to_int8(quantizer_interface, attrs.shape, config.asymmetry.get()) return quant_utils.quantize_lrn(attrs, input_quant.quant, output_quant.quant)
@classmethod @type_check_operation_arguments(types=[LRNQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: LRNQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data = _cast_to_quant_tensor_new(input_dict[InputName('data')]) return cls.lrn_fn(quant_attrs, data)
[docs] class ExtmOp(AwesomeOperation[ExtmAttrs, AwesomeQuantAttrBase]): """ Extremum op; computes either a min or a max reduction. A boolean in the attributes selects which operation is performed. """
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] min_fn: Callable[[ExtmAttrs, np.ndarray], np.ndarray] = op_fn.min_op
[docs] max_fn: Callable[[ExtmAttrs, np.ndarray], np.ndarray] = op_fn.max_op
@classmethod
[docs] def get_type(cls, attrs: Union[ExtmAttrs, AwesomeQuantAttrBase]) -> NodeType: assert isinstance(attrs, ExtmAttrs) node_type = node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32) return node_type
@classmethod @type_check_operation_arguments(types=[ExtmAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ExtmAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: if attrs.max: return cls.max_fn(attrs, input_dict[InputName('data')]) else: return cls.min_fn(attrs, input_dict[InputName('data')])
[docs] class SumOp(AwesomeOperation[SumAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] sum_fn: Callable[[SumAttrs, np.ndarray], np.ndarray] = op_fn.sum_op
[docs] requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \ quant_utils.requantize
@classmethod
[docs] def get_type(cls, attrs: Union[SumAttrs, AwesomeQuantAttrBase]) -> NodeType: assert isinstance(attrs, SumAttrs) return node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32)
@classmethod @type_check_operation_arguments(types=[SumAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: SumAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.sum_fn(attrs, input_dict[InputName('data')])
[docs] class ProdOp(AwesomeOperation[ProdAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] prod_fn: Callable[[ProdAttrs, np.ndarray], np.ndarray] = op_fn.prod
@classmethod
[docs] def get_type(cls, attrs: Union[ProdAttrs, QUANT_ATTRS]) -> NodeType: node_type = node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32) return node_type
@classmethod @type_check_operation_arguments(types=[ProdAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ProdAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.prod_fn(attrs, input_dict[InputName('data')])
[docs] class SubtractOp(AwesomeOperation[SubtractAttrs, SubtractQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]
[docs] subtract_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.subtract
[docs] requantize_fn: Callable[ [np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = quant_utils.requantize
@classmethod
[docs] def get_type(cls, attrs: Union[SubtractAttrs, SubtractQuantAttrs]) -> NodeType: if isinstance(attrs, SubtractAttrs): in_type = out_type = attrs.scalar_type else: in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8 out_type = ScalarType.from_numpy(attrs.requant.out_dtype) assert out_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32) attrs = attrs if isinstance(attrs, SubtractAttrs) else attrs.attrs lhs_input_shape = attrs.lhs_input_shape rhs_input_shape = attrs.rhs_input_shape output_shape = _get_out_shape_for_op_with_2_inputs(attrs) return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_input_shape)), cls.input_list[1]: TensorValue(TensorType(in_type, rhs_input_shape))}, TensorValue(TensorType(out_type, output_shape)))
@classmethod @type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: SubtractAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.subtract_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
@classmethod
[docs] def quantize(cls, attrs: SubtractAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) \ -> Union[SubtractAttrs, SubtractQuantAttrs]: compute_precision = config.quantization_precision.get().to_scalar_type() output_shape = _get_out_shape_for_op_with_2_inputs(attrs) lhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get()) rhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get()) if scalar_is_integral(compute_precision): # Select an int8 quantization quantization = quantize_output(quantizer_interface, ScalarType.int8, output_shape, config.asymmetry.get()) input_scales = [lhs_quantization.quant.scale, rhs_quantization.quant.scale] input_zps = [lhs_quantization.quant.zero_point, rhs_quantization.quant.zero_point] scale = quantization.quant.scale zero_point = quantization.quant.zero_point layer_bits = quantization.quant.bits scales, zp_corr, shift = quant_utils.quantize_add_subtract(is_subtract=True, input_scales=input_scales, input_zps=input_zps, scale=scale, zero_point=zero_point, layer_bits=layer_bits) if compute_precision == ScalarType.int8: # Use the int8 quantization that was selected. # Requantize to int8 using the calculated shift and zp_corr. requant = requantization.FractionalZeroRequantization( 1, zp_corr, requantization.Narrowing(shift, RoundType.TOEVEN, np.int8) ) output_quantization = quantization else: assert compute_precision == ScalarType.int16 # Convert the int8 requantization to an int16 requantization. # Subtract 8 from shift, but don't reduce shift below 0. shift_adjustment = min(shift, 8) shift -= shift_adjustment scale *= (1 << shift_adjustment) zero_point *= (1 << shift_adjustment) # Create the output quantization and requantization information quantization_16 = Quantization(scale, zero_point, bits=16, min_val=quantization.quant.min_val, max_val=quantization.quant.max_val) type_16 = TensorType(ScalarType.int16, output_shape) output_quantization = QuantResultTensorType(type_16, quantization_16, RequantMethod.fractional_zero) requant = requantization.FractionalZeroRequantization(1, zp_corr, utils.create_and_verify_narrowing(shift, RoundType.TOEVEN, np.int16)) layer_bits = 16 # Save results for quantized subtract operator quantizer_interface.set_chosen_output_quant(TensorValue(output_quantization)) quant_attrs = SubtractQuantAttrs(attrs, compute_precision == ScalarType.int16, requant, scales[0], scales[1], layer_bits) else: # Save results for floating-point subtract operator fix_output(quantizer_interface, ScalarType.bfloat16, output_shape, config.asymmetry.get()) quant_attrs = dataclasses.replace(attrs, scalar_type=ScalarType.bfloat16) return quant_attrs
@classmethod
[docs] def run_quant(cls, quant_attrs: SubtractQuantAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: output = cls.subtract_fn(input_dict[InputName('lhs')].astype(np.int32) * quant_attrs.lhs_scale, input_dict[InputName('rhs')].astype(np.int32) * quant_attrs.rhs_scale) quantized_output = requantization.requantize(output, quant_attrs.requant) return quantized_output
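To see the arithmetic pattern behind run_quant: scale both int32 operands, subtract, then narrow with a zero-point correction and a rounding right shift. The exact semantics of requantization.FractionalZeroRequantization are defined in ml_kernels; the parameters below are made up and the rounding is simplified:

    import numpy as np

    lhs = np.array([40, -10], dtype=np.int8).astype(np.int32)
    rhs = np.array([15, 25], dtype=np.int8).astype(np.int32)

    lhs_scale, rhs_scale = 3, 2   # illustrative integer input scale corrections
    zp_corr, shift = 4, 5         # illustrative zero-point correction and right shift

    acc = lhs * lhs_scale - rhs * rhs_scale
    # Zero-point correction, rounding right shift, narrowing to int8.
    out = np.clip(np.round((acc + zp_corr) / (1 << shift)), -128, 127).astype(np.int8)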
[docs] class PowerOp(AwesomeOperation[PowerAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]
[docs] power_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.power
@classmethod
[docs] def get_type(cls, attrs: Union[PowerAttrs, QUANT_ATTRS]) -> NodeType: lhs_input_shape = attrs.lhs_input_shape rhs_input_shape = attrs.rhs_input_shape output_shape = _get_out_shape_for_op_with_2_inputs(attrs) return NodeType({cls.input_list[0]: TensorValue(TensorType(ScalarType.float32, lhs_input_shape)), cls.input_list[1]: TensorValue(TensorType(ScalarType.float32, rhs_input_shape))}, TensorValue(TensorType(ScalarType.float32, output_shape)))
@classmethod @type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.power_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
[docs] class MaximumOp(AwesomeOperation[MaximumAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]
[docs] maximum_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.maximum
@classmethod
[docs] def get_type(cls, attrs: Union[MaximumAttrs, AwesomeQuantAttrBase]) -> NodeType: assert isinstance(attrs, MaximumAttrs) shape = attrs.input_shape data_type = ScalarType.float32 tensor_type = TensorType(scalar=data_type, shape=shape) return _binary_op_type(input_list=cls.input_list, t=tensor_type)
@classmethod @type_check_operation_arguments(types=[MaximumAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: MaximumAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.maximum_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
[docs] class MinimumOp(AwesomeOperation[MinimumAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]
[docs] minimum_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.minimum
@classmethod
[docs] def get_type(cls, attrs: Union[MinimumAttrs, AwesomeQuantAttrBase]) -> NodeType: assert isinstance(attrs, MinimumAttrs) shape = attrs.input_shape data_type = ScalarType.float32 tensor_type = TensorType(scalar=data_type, shape=shape) return _binary_op_type(input_list=cls.input_list, t=tensor_type)
@classmethod @type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.minimum_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
[docs] class FullOp(AwesomeOperation[FullAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('fill_value')]
[docs] full_fn: Callable[[FullAttrs, np.ndarray], np.ndarray] = op_fn.full
@classmethod @type_check_operation_arguments(types=[FullAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: FullAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.full_fn(attrs, input_dict[InputName('fill_value')])
[docs] class TileOp(AwesomeOperation[TileAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] tile_fn: Callable[[TileAttrs, np.ndarray], np.ndarray] = op_fn.tile
@classmethod @type_check_operation_arguments(types=[TileAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: TileAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.tile_fn(attrs, input_dict[InputName('data')])
[docs] class PReluOp(AwesomeOperation[PReluAttrs, PReluQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
[docs] prelu_fn: Callable[[np.ndarray, np.ndarray, int], np.ndarray] = op_fn.prelu
[docs] requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \ quant_utils.requantize
@classmethod
[docs] def get_type(cls, attrs: Union[PReluAttrs, PReluQuantAttrs]) -> NodeType: shape = attrs.input_shape data_type = attrs.scalar_type if isinstance(attrs, PReluAttrs) else ScalarType.int8 tensor_type = TensorType(scalar=data_type, shape=shape) return _unary_op_type(input_list=cls.input_list, t=tensor_type)
@classmethod @type_check_operation_arguments(types=[PReluAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: PReluAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.prelu_fn(input_dict[InputName('data')], attrs.alpha, attrs.axis)
@classmethod
[docs] def quantize(cls, attrs: PReluAttrs, quantizer_interface: OpQuantInterface, configs: QuantizationConfigs, error_reporter: NodeReporter) -> Union[PReluAttrs, PReluQuantAttrs]: compute_precision = configs.quantization_precision.get().to_scalar_type() input_scalar_type = ScalarType.bfloat16 if scalar_is_floating(compute_precision) else ScalarType.int8 input_quant = fix_input(quantizer_interface, input_scalar_type, InputName('data'), configs.asymmetry.get()) fix_output_from_input(quantizer_interface, input_quant.type.shape) if scalar_is_integral(compute_precision): input_zp = input_quant.quant.zero_point bits = input_quant.quant.bits alpha_quant, shift = quant_utils.quantize_alpha(attrs.alpha, bits) quant_attrs = PReluQuantAttrs(attrs.axis, attrs.input_shape, alpha_quant, shift, input_zp) return quant_attrs else: return dataclasses.replace(attrs, scalar_type=compute_precision, alpha=attrs.alpha.astype(bfloat16))
@classmethod
[docs] def run_quant(cls, quant_attrs: PReluQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data = _cast_to_quant_tensor_new(input_dict[InputName('data')]) alpha = quant_attrs.quant_alpha data_zp = quant_attrs.data_zero_point axis = quant_attrs.axis data = utils.transpose_axis_to_the_last(data, axis) positive = cls.relu_fn(data, data_zp) negative = (data.astype(np.int32) - positive) * alpha.astype(np.int32) negative = cls.requantize_fn(data=negative, bits=8, right_shifts=quant_attrs.alpha_shift, axis=axis, rounding_type=RoundType.UPWARD) res = negative + positive res = utils.transpose_axis_to_the_last(res, axis) return res
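The split used above can be checked with plain NumPy: the positive part comes from ReLU against the zero point, and the negative part is scaled by the quantized alpha and shifted back down. Illustrative numbers only; the real requantize_fn also rounds (UPWARD here) and clips:

    import numpy as np

    data = np.array([20, -16], dtype=np.int32)  # already zero-point-free (zp = 0)
    alpha_q, alpha_shift = 26, 8                # alpha of ~0.1 encoded as 26 / 2**8

    positive = np.maximum(data, 0)
    negative = (data - positive) * alpha_q >> alpha_shift  # flooring shift, for brevity
    out = positive + negative                   # [20, -2]: negative side scaled by ~0.1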
[docs] class BroadcastToOp(AwesomeOperation[BroadcastToAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] broadcast_to_fn = op_fn.broadcast_to
@classmethod
[docs] def get_type(cls, attrs: BroadcastToAttrs | BroadcastToQuantAttrs) -> NodeType: input_shape = attrs.input_type.shape dtype = attrs.input_type.scalar output_shape = attrs.output_shape return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))}, TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod @type_check_operation_arguments(types=[BroadcastToAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: BroadcastToAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.broadcast_to_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: BroadcastToAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> BroadcastToAttrs | BroadcastToQuantAttrs: # Use the input type and quantization input_precision = _get_input_precision(quantizer_interface, config.quantization_precision.get(), InputName('data')) input_quant = fix_input(quantizer_interface, input_precision, InputName('data'), config.asymmetry.get()) fix_output_from_input(quantizer_interface, attrs.output_shape) scalar_type = input_quant.type.scalar if scalar_is_integral(scalar_type): quant_attrs = BroadcastToQuantAttrs(TensorType(scalar_type, attrs.input_type.shape), attrs.output_shape) return quant_attrs else: tensor_type = TensorType(scalar=scalar_type, shape=attrs.input_type.shape) return dataclasses.replace(attrs, input_type=tensor_type)
@classmethod @type_check_operation_arguments(types=[BroadcastToQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, attrs: BroadcastToQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.broadcast_to_fn(attrs, input_dict[InputName('data')])
############################### # UDF (Lookup table) OPERATIONS ###############################
[docs] class UDFOp(AwesomeOperation[UDFAttrs, UDFQuantAttrs], metaclass=ABCMeta):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] udf_fn: Optional[Callable[[np.ndarray], np.ndarray]] = None
@classmethod
[docs] def get_type(cls, attrs: Union[UDFAttrs, UDFQuantAttrs]) -> NodeType: if isinstance(attrs, UDFAttrs): shape = attrs.input_shape in_data_type = out_data_type = attrs.scalar_type else: shape = attrs.attrs.input_shape in_data_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8 out_data_type = ScalarType.from_numpy(attrs.requant.out_dtype) return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar=in_data_type, shape=shape))}, TensorValue(TensorType(scalar=out_data_type, shape=shape)))
@classmethod @type_check_operation_arguments(types=[UDFAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: UDFAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: if cls.udf_fn is None: raise NotImplementedError(f"Please implement the UDF function for {cls.__name__}") if attrs.scalar_type == ScalarType.bfloat16: out = cls.udf_fn(input_dict[InputName('data')]) out = out.astype(bfloat16) else: out = cls.udf_fn(input_dict[InputName('data')]).astype(np.float32) return out
@classmethod
[docs] def quantize(cls, attrs: UDFAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[UDFAttrs, UDFQuantAttrs]: def udf_scalar(x: float) -> float: return float(cls.udf_fn(np.array(x, dtype=np.float32)).item()) input_name = InputName('data') input_precision = get_expected_tensor_value(quantizer_interface.get_input_quant()[input_name]).type compute_precision = config.quantization_precision.get().to_scalar_type() if scalar_is_integral(compute_precision): if input_precision.scalar != ScalarType.int8 and compute_precision == ScalarType.int16: input_quant = fix_input_to_int16(quantizer_interface, input_name, config.asymmetry.get()) input_int16 = True else: input_quant = fix_input_to_int8(quantizer_interface, input_name, config.asymmetry.get()) input_int16 = False out_scalar_type = ScalarType.int8 if compute_precision == ScalarType.int8 else ScalarType.int16 output_quant = quantize_output(quantizer_interface, out_scalar_type, input_precision.shape, config.asymmetry.get(), RequantMethod.arith_folded) lut_input_type = np.int16 if input_int16 else np.int8 lookup_table = quant_utils.quantize_udf(input_quant.quant, output_quant.quant, lut_input_type, out_scalar_type.numpy_type(), udf_scalar) quantizer_interface.set_chosen_output_quant(TensorValue(output_quant)) requant = requantization.narrowing_requantization(shift=0, rounding=RoundType.TOEVEN, out_dtype=out_scalar_type.numpy_type()) return UDFQuantAttrs(lookup_table=lookup_table, attrs=attrs, input_int16=input_int16, requant=requant) else: fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get()) output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape fix_output_from_input(quantizer_interface, output_shape) return dataclasses.replace(attrs, scalar_type=compute_precision)
@classmethod
[docs] def run_quant(cls, quant_attrs: UDFQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: lut = quant_attrs.lookup_table output = requantization.requantize(ideal_udf(input_dict[InputName('data')], table=lut), quant_attrs.requant) return output
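Conceptually, quantize_udf bakes the scalar function into a table indexed by the quantized input code: evaluate the function at the dequantized value of every possible code, then encode the result in the output quantization. A minimal int8 sketch under that assumption (the real table layout and requantization are defined in quant_utils and ml_kernels):

    import numpy as np

    in_scale, in_zp = 0.05, 0     # assumed input quantization
    out_scale, out_zp = 0.01, 0   # assumed output quantization

    codes = np.arange(-128, 128, dtype=np.int32)
    f = np.sqrt(np.maximum((codes - in_zp) * in_scale, 0.0))  # e.g. SqrtOp's udf_fn
    table = np.clip(np.round(f / out_scale) + out_zp, -128, 127).astype(np.int8)

    q_in = np.array([4, 100], dtype=np.int8)
    q_out = table[q_in.astype(np.int32) + 128]  # look up by shifted int8 code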
[docs] class SqrtOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.sqrt
[docs] class RsqrtOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.rsqrt
[docs] class TanhOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.tanh
[docs] class SigmoidOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.sigmoid
[docs] class LogOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.log
[docs] class Log2Op(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.log2
[docs] class Log10Op(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.log10
[docs] class ReciprocalOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.reciprocal
[docs] class EluOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.elu
[docs] class SoftplusOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.softplus
[docs] class ErfOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.erf
[docs] class GeluOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.gelu
[docs] class DivideOp(AwesomeOperation[DivideAttrs, DivideQuantAttrs], metaclass=ABCMeta):
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]
[docs] intermediate_names: ClassVar[List[str]] = ['rhs_reciprocal']
[docs] divide_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.divide
[docs] reciprocal_op: ReciprocalOp = ReciprocalOp()
[docs] multiply_op: MultiplyOp = MultiplyOp()
@classmethod
[docs] def get_type(cls, attrs: Union[DivideAttrs, DivideQuantAttrs]) -> NodeType: udf_type = cls.reciprocal_op.get_type(attrs.udf_attrs) mul_type = cls.multiply_op.get_type(attrs.multiply_attrs) return NodeType( { cls.input_list[0]: mul_type.inputs[cls.multiply_op.input_list[0]], cls.input_list[1]: udf_type.inputs[cls.reciprocal_op.input_list[0]] }, mul_type.output )
@classmethod @type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.divide_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
@classmethod
[docs] def calibrate(cls, attrs: AWESOME_ATTRS, calib_attrs: AwesomeCalibAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any: """ DivideOp calibration method. Executes the default calibration to get the results of the Divide operation in floating point. Additionally, calculates intermediate results for reciprocal(rhs) and updates the observer for intermediate values. :param attrs: AwesomeAttributes associated with this operation. :param calib_attrs: AwesomeCalibAttrs associated with the operation's node. :param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays. :param config: Parameters controlling how to calibrate. :return: Output tensor(s) whose type is dependent on the subclass. """ # Run default calibration. outputs = super().calibrate(attrs, calib_attrs, input_dict, config) # Calculate intermediate values. data = input_dict[InputName('rhs')] rhs_reciprocal = np.reciprocal(data).astype(np.float32) # Replace inf values (from division by zero) with the largest finite value: # first set them to -inf so np.max ignores them, then overwrite them with the maximum. inf_mask = rhs_reciprocal == np.inf rhs_reciprocal[inf_mask] = -np.inf rhs_reciprocal[inf_mask] = np.max(rhs_reciprocal) # Update observers for intermediate values. assert calib_attrs.intermediate_observers assert ('rhs_reciprocal' in calib_attrs.intermediate_observers and calib_attrs.intermediate_observers['rhs_reciprocal'] is not None) calib_attrs.intermediate_observers['rhs_reciprocal'].update(rhs_reciprocal) return outputs
@classmethod
[docs] def quantize(cls, attrs: DivideAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[DivideAttrs, DivideQuantAttrs]: from afe.ir.quantization_interface import make_quantize_op_interface # Quantize 1/rhs as ReciprocalOp, using UDFOp infrastructure. # Extract data from quantizer_interface in order to create a new interface to be used # for quantization of the reciprocal (UDF) part. udf_input_name = cls.input_list[-1] udf_input_data = { cls.reciprocal_op.input_list[0]: ( quantizer_interface.get_input_quant()[udf_input_name], quantizer_interface.get_input_distributions()[udf_input_name], quantizer_interface.get_calibration_data() ) } intermediate_distributions = quantizer_interface.get_intermediate_distributions() assert 'rhs_reciprocal' in intermediate_distributions udf_output_distribution = intermediate_distributions['rhs_reciprocal'] placeholder_data = quantizer_interface.get_placeholder_quant() # Create quantization interface for reciprocal part. udf_quant_interface, udf_quant_result = \ make_quantize_op_interface(udf_input_data, placeholder_data, udf_output_distribution, None) udf_quant_attrs = cls.reciprocal_op.quantize(attrs.udf_attrs, udf_quant_interface, config, error_reporter) # Quantize DivideOp as MultiplyOp(lhs, 1/rhs). # Extract data from quantizer_interface and udf_quant_interface in order to create a new interface # to be used for quantization of the multiplication part. mul_input_data = { cls.multiply_op.input_list[0]: ( quantizer_interface.get_input_quant()[cls.input_list[0]], quantizer_interface.get_input_distributions()[cls.input_list[0]], None ), cls.multiply_op.input_list[1]: ( udf_quant_result.get_result().output, udf_quant_interface.get_output_distribution(), None ) } # Create quantization interface for multiplication part. mul_quant_interface, mul_quant_result = \ make_quantize_op_interface(mul_input_data, placeholder_data, quantizer_interface.get_output_distribution(), None) mul_quant_attrs = cls.multiply_op.quantize(attrs.multiply_attrs, mul_quant_interface, config, error_reporter) # Use results from UDF and multiplication parts to set chosen values in quantizer_interface. quantizer_interface.set_chosen_input_quant( cls.input_list[0], mul_quant_result.get_result().inputs[cls.multiply_op.input_list[0]] ) quantizer_interface.set_chosen_input_quant( cls.input_list[1], udf_quant_result.get_result().inputs[cls.reciprocal_op.input_list[0]] ) quantizer_interface.set_chosen_output_quant(mul_quant_result.get_result().output) if isinstance(udf_quant_attrs, UDFAttrs): assert isinstance(mul_quant_attrs, MultiplyAttrs) return DivideAttrs(udf_quant_attrs, mul_quant_attrs) else: return DivideQuantAttrs(udf_quant_attrs, mul_quant_attrs)
@classmethod
[docs] def run_quant(cls, quant_attrs: DivideQuantAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: # Create input_dict for reciprocal part and run as UDFOp. udf_input_dict = {cls.reciprocal_op.input_list[0]: input_dict[cls.input_list[1]]} rhs_reciprocal = cls.reciprocal_op.run_quant(quant_attrs.udf_attrs, udf_input_dict, config) # Create input_dict for multiplication part and run as MultiplyOp. mul_input_dict = { cls.multiply_op.input_list[0]: input_dict[cls.input_list[0]], cls.multiply_op.input_list[1]: rhs_reciprocal } return cls.multiply_op.run_quant(quant_attrs.multiply_attrs, mul_input_dict, config)
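The decomposition relied on throughout this class is simply lhs / rhs == lhs * (1 / rhs), which lets the existing reciprocal (UDF) and multiply machinery do all of the quantization work:

    import numpy as np

    lhs = np.array([1.0, 4.0, -6.0])
    rhs = np.array([2.0, 0.5, 3.0])
    assert np.allclose(lhs / rhs, lhs * np.reciprocal(rhs))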
[docs] class ExpOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.exp
[docs] class SwishOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.swish
[docs] class HardSigmoidOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.hard_sigmoid
[docs] class HardSwishOp(UDFOp):
[docs] udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.hard_swish
[docs] class UpsamplingOp(AwesomeOperation[UpsamplingAttrs, UpsamplingQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] upsampling_fn: Callable[[UpsamplingAttrs, np.ndarray], np.ndarray] = op_fn.upsample
@classmethod
[docs] def get_type(cls, attrs: Union[UpsamplingAttrs, UpsamplingQuantAttrs]) -> NodeType: if isinstance(attrs, UpsamplingAttrs): scalar_type = attrs.scalar_type uattrs = attrs else: assert isinstance(attrs, UpsamplingQuantAttrs) scalar_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8 uattrs = attrs.upsampling_attrs input_shape = uattrs.input_shape layout = uattrs.layout # Multiply H, W size by the upsampling scale factors to produce output shape input_shape_h, input_shape_w = utils.transpose_attr_according_to_layout_strings(input_shape, layout, "HW") output_shape_h = int(input_shape_h * uattrs.scale_h) output_shape_w = int(input_shape_w * uattrs.scale_w) output_shape = utils.insert_according_to_layout_strings(input_shape, (output_shape_h, output_shape_w), layout, "HW") input_type = TensorType(scalar_type, input_shape) output_type = TensorType(scalar_type, output_shape) return NodeType({'data': TensorValue(input_type)}, TensorValue(output_type))
@classmethod @type_check_operation_arguments(types=[UpsamplingAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: UpsamplingAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.upsampling_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: UpsamplingAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter)\ -> Union[UpsamplingAttrs, UpsamplingQuantAttrs]: compute_precision = config.quantization_precision.get().to_scalar_type() input_quant = fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get()) output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape fix_output_from_input(quantizer_interface, output_shape) if compute_precision == ScalarType.bfloat16: return dataclasses.replace(attrs, scalar_type=compute_precision) else: input_zp = input_quant.quant.zero_point input_scale = input_quant.quant.scale quant_attrs = UpsamplingQuantAttrs(upsampling_attrs=attrs, input_zp=input_zp, input_scale=input_scale, input_int16=compute_precision == ScalarType.int16) return quant_attrs
@classmethod @type_check_operation_arguments(types=[UpsamplingQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: UpsamplingQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.upsampling_fn(quant_attrs.upsampling_attrs, input_dict[InputName('data')], rounding=quant_attrs.rounding_type)
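get_type scales the input H and W by scale_h and scale_w to get the output shape; for integer factors, nearest-neighbour upsampling amounts to repeating along the spatial axes. A sketch in NHWC with illustrative factors (op_fn.upsample itself may interpolate differently):

    import numpy as np

    x = np.zeros((1, 8, 8, 3), dtype=np.int8)  # NHWC input
    scale_h, scale_w = 2.0, 2.0

    out_shape = (x.shape[0], int(x.shape[1] * scale_h), int(x.shape[2] * scale_w), x.shape[3])
    up = np.repeat(np.repeat(x, int(scale_h), axis=1), int(scale_w), axis=2)
    assert up.shape == out_shape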
[docs] class ImageResize2DOp(AwesomeOperation[ImageResize2DAttrs, ImageResize2DQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] image_resize_fn: Callable[[ImageResize2DAttrs, np.ndarray], np.ndarray] = op_fn.image_resize2d
@classmethod
[docs] def get_type(cls, attrs: Union[ImageResize2DAttrs, ImageResize2DQuantAttrs]) -> NodeType: if isinstance(attrs, ImageResize2DAttrs): in_dtype = out_dtype = scalar_type_from_dtype(attrs.out_dtype) else: in_dtype = ScalarType.int16 if attrs.input_int16 else ScalarType.int8 out_dtype = ScalarType.from_numpy(attrs.requant.out_dtype) if attrs.requant is not None else ScalarType.int8 attrs = attrs if isinstance(attrs, ImageResize2DAttrs) else attrs.image_resize2d_attrs input_shape = attrs.input_shape output_shape = _get_image_resize2d_out_shape(attrs) return NodeType({cls.input_list[0]: TensorValue(TensorType(in_dtype, input_shape))}, TensorValue(TensorType(out_dtype, output_shape)))
@classmethod @type_check_operation_arguments(types=[ImageResize2DAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ImageResize2DAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.image_resize_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: ImageResize2DAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter)\ -> Union[ImageResize2DAttrs, ImageResize2DQuantAttrs]: """ In the MLA implementation of resize, the output type is the same as the input type; there is no intermediate int32 result. Always use int8 if the integer scaling factor is not in (1, 2, 4).

<input_type>  <enable_int16>  <input_quant>  <resize_kernel>  <output_type>
int8          True            int8           int8             int8
int8          False           int8           int8             int8
int16         False           int8           int8             int8
int16         True            int16          int16            int16
""" input_precision = _get_input_precision(quantizer_interface, config.quantization_precision.get(), InputName('data')) h_axis, w_axis = attrs.layout.find('H'), attrs.layout.find('W') scaling_h, mod_h = divmod(attrs.size[0], attrs.input_shape[h_axis]) scaling_w, mod_w = divmod(attrs.size[1], attrs.input_shape[w_axis]) use_int16 = False if not all([sc in (1, 2, 4) for sc in (scaling_h, scaling_w)]) or not all([m == 0 for m in (mod_h, mod_w)]): # If the integer scaling factor is not in (1, 2, 4), use int8 quantization for input and output. io_type = ScalarType.int8 elif scalar_is_integral(input_precision): use_int16 = (config.quantization_precision.get().is_int16_precision() and (input_precision == ScalarType.int16)) io_type = ScalarType.int16 if use_int16 else ScalarType.int8 else: io_type = ScalarType.bfloat16 input_quant = fix_input(quantizer_interface, io_type, InputName('data'), config.asymmetry.get()) output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape output_quant = quantize_output(quantizer_interface, io_type, output_shape, config.asymmetry.get()) quantizer_interface.set_chosen_output_quant(TensorValue(output_quant)) if scalar_is_integral(io_type): requant = requantization.narrowing_requantization(shift=0, rounding=RoundType.TOEVEN, out_dtype=io_type.numpy_type()) input_zp = input_quant.quant.zero_point input_scale = input_quant.quant.scale quant_attrs = ImageResize2DQuantAttrs(image_resize2d_attrs=attrs, input_zp=input_zp, input_scale=input_scale, input_int16=use_int16, requant=requant) return quant_attrs else: return dataclasses.replace(attrs, out_dtype='bfloat16')
@classmethod @type_check_operation_arguments(types=[ImageResize2DQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: ImageResize2DQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data = input_dict[InputName('data')] out = cls.image_resize_fn(quant_attrs.image_resize2d_attrs, data, rounding=quant_attrs.rounding_type) if quant_attrs.requant is not None: return requantization.requantize(out, quant_attrs.requant) else: return out
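The int16 eligibility test in quantize reduces to: the requested size must be an exact integer multiple of the input size, and that multiple must be 1, 2, or 4. The same check in isolation, with hypothetical shapes:

    input_h, input_w = 16, 16
    size_h, size_w = 64, 32                      # requested output size

    scaling_h, mod_h = divmod(size_h, input_h)   # (4, 0)
    scaling_w, mod_w = divmod(size_w, input_w)   # (2, 0)

    int16_eligible = (all(sc in (1, 2, 4) for sc in (scaling_h, scaling_w))
                      and mod_h == 0 and mod_w == 0)  # True here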
[docs] class GridSampleOp(AwesomeOperation[GridSampleAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data'), InputName('grid')]
[docs] gridsample_fn: Callable[[GridSampleAttrs, np.ndarray, np.ndarray], np.ndarray] = op_fn.gridsample
@classmethod
[docs] def get_type(cls, attrs: GridSampleAttrs) -> NodeType: assert isinstance(attrs, GridSampleAttrs) in_data_type = out_data_type = attrs.scalar_type input_shape = attrs.input_shape grid_shape = attrs.grid_shape assert len(input_shape) == 4 and grid_shape[-1] == 2, "Only 2D GridSample is supported" # We are dealing with NHWC layout here H_out, W_out = grid_shape[1], grid_shape[2] output_shape = (input_shape[0], H_out, W_out, input_shape[-1]) return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar=in_data_type, shape=input_shape)), cls.input_list[1]: TensorValue(TensorType(scalar=in_data_type, shape=grid_shape))}, TensorValue(TensorType(scalar=out_data_type, shape=output_shape)))
@classmethod @type_check_operation_arguments(types=[GridSampleAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: GridSampleAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.gridsample_fn(attrs, input_dict[InputName('data')], input_dict[InputName('grid')])
@classmethod
[docs] def quantize(cls, attrs: GridSampleAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter)\ -> GridSampleAttrs: compute_precision = config.quantization_precision.get().to_scalar_type() assert compute_precision == ScalarType.bfloat16, "Only bfloat16 is supported for quantization of GridSample" fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get()) fix_input(quantizer_interface, compute_precision, InputName('grid'), config.asymmetry.get()) output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape fix_output(quantizer_interface, compute_precision, output_shape, config.asymmetry.get()) return dataclasses.replace(attrs, scalar_type=compute_precision)
################################# # TENSOR MANIPULATION OPERATIONS #################################
[docs] class TupleOp(AwesomeOperation[TupleAttrs, AwesomeQuantAttrBase]): """ TupleOp takes multiple tensors and returns a tuple. """ # Tuple can have a varying number of inputs, so we can't establish input names across all tuple attrs
[docs] input_list = None
[docs] tuple_fn: Callable[[List[np.ndarray]], tuple] = tuple
@classmethod
[docs] def get_type(cls, attrs: TupleAttrs) -> NodeType: input_types = [TensorValue(input_type) for input_type in attrs.input_types] return NodeType({'input_{}'.format(i): i_type for i, i_type in enumerate(input_types)}, TupleValue(input_types))
@classmethod @type_check_operation_arguments(types=[TupleAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, _: TupleAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> Tuple[np.ndarray, ...]: return cls.tuple_fn(input_dict.values())
@classmethod
[docs] def quantize(cls, attrs: TupleAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> TupleAttrs: # Inputs are propagated to outputs. Type or quantization is not changed. for input_name, input_quant in quantizer_interface.get_input_quant().items(): quantizer_interface.set_chosen_input_quant(input_name, input_quant) output_quant = TupleValue(list(quantizer_interface.get_input_quant().values())) quantizer_interface.set_chosen_output_quant(output_quant) # Update attributes to use the quantized types attrs.input_types = [TensorType(input_quant.value.type.scalar, input_type.shape) for input_quant, input_type in zip(quantizer_interface.get_input_quant().values(), attrs.input_types)] return attrs
@classmethod
[docs] def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs, inputs: Dict[InputName, QuantizationTensorData]) \ -> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]: return None, {}
[docs] class TupleGetItemOp(AwesomeOperation[TupleGetItemAttrs, AwesomeQuantAttrBase]): """ TupleGetItemOp takes a tuple and returns a tensor. """
[docs] input_list: ClassVar[List[InputName]] = [InputName('tuple_value')]
[docs] tuple_get_item_fn: Callable[[TupleGetItemAttrs, tuple], np.ndarray] = op_fn.tuple_get_item
@classmethod
[docs] def get_type(cls, attrs: TupleGetItemAttrs) -> NodeType: return NodeType({cls.input_list[0]: TupleValue([TensorValue(input_type) for input_type in attrs.input_types])}, TensorValue(attrs.input_types[attrs.index]))
@classmethod @type_check_operation_arguments(types=[TupleGetItemAttrs, tuple], dict_mask=[False, True])
[docs] def run(cls, attrs: TupleGetItemAttrs, input_dict: Dict[InputName, tuple], config: RunConfigs) -> np.ndarray: return cls.tuple_get_item_fn(attrs, input_dict[InputName('tuple_value')])
@classmethod
[docs] def quantize(cls, attrs: TupleGetItemAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> TupleGetItemAttrs: value = quantizer_interface.get_input_quant()[InputName('tuple_value')] assert isinstance(value, TupleValue), "Expected quantization scale of TupleGetItemOp's input to be a TupleValue" quantizer_interface.set_chosen_input_quant(InputName('tuple_value'), value) # Quantization is not changed quantizer_interface.set_chosen_output_quant(value.elements[attrs.index]) # Update attributes to use the quantized types attrs.input_types = [TensorType(input_quant.value.type.scalar, input_type.shape) for input_quant, input_type in zip(value.elements, attrs.input_types)] return attrs
@classmethod
[docs] def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs, inputs: Dict[InputName, QuantizationTensorData]) \ -> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]: return None, {}
[docs] class SqueezeOp(AwesomeOperation[SqueezeAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] squeeze_fn: Callable[[SqueezeAttrs, np.ndarray], np.ndarray] = op_fn.squeeze
@classmethod
[docs] def get_type(cls, attrs: Union[SqueezeAttrs, QUANT_ATTRS]) -> NodeType: input_shape = attrs.input_shape output_shape = get_squeeze_out_shape(attrs.axis, input_shape) dtype = attrs.input_type return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))}, TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod
[docs] def run(cls, attrs: SqueezeAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.squeeze_fn(attrs, input_dict[InputName('data')])
[docs] class ConcatenateOp(AwesomeOperation[ConcatenateAttrs, ConcatQuantAttrs]): # ConcatenateOp can have a varying number of inputs, so we can't establish input names across all inputs
[docs] input_list: ClassVar[List[InputName]] = None
[docs] concatenate_fn: Callable[[ConcatenateAttrs, List[np.ndarray]], np.ndarray] = op_fn.concatenate
[docs] requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \ quant_utils.requantize
@classmethod
[docs] def get_type(cls, attrs: Union[ConcatenateAttrs, ConcatQuantAttrs]) -> NodeType: if isinstance(attrs, ConcatenateAttrs): out_dtype = attrs.scalar_type input_types = [TensorValue(input_type) for input_type in attrs.input_types] out_shape = _get_concat_out_shape(attrs) elif isinstance(attrs, TupleConcatenateAttrs): out_dtype = attrs.concat_attrs.scalar_type input_types = [TensorValue(input_type) for input_type in attrs.concat_attrs.input_types] out_shape = _get_concat_out_shape(attrs.concat_attrs) else: out_dtype = ScalarType.from_numpy(attrs.requants[0].out_dtype) if attrs.requants is not None \ else ScalarType.int8 input_types = [TensorValue(input_type) for input_type in attrs.attrs.input_types] out_shape = _get_concat_out_shape(attrs.attrs) return NodeType({'input_{}'.format(i): input_type for i, input_type in enumerate(input_types)}, TensorValue(TensorType(out_dtype, out_shape)))
@classmethod @type_check_operation_arguments(types=[ConcatenateAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ConcatenateAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.concatenate_fn(attrs, list(input_dict.values()))
@classmethod @type_check_operation_arguments(types=[ConcatenateAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter], dict_mask=[False, False, False, False])
[docs] def quantize(cls, attrs: ConcatenateAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) \ -> Union[ConcatenateAttrs, ConcatQuantAttrs]: assert len(attrs.input_types) == len(quantizer_interface.get_input_quant()) input_precisions = [] for i in range(len(attrs.input_types)): input_name = InputName("input_{}".format(i)) ip = _get_input_precision(quantizer_interface, config.quantization_precision.get(), input_name) input_precisions.append(ip) # Quantize to int8 if inputs are int8 or don't use the same precision. io_type = ScalarType.int8 use_bfloat16 = False use_int16 = False if len(set(input_precisions)) == 1: # All input precisions are the same. Set I/O type and precision flags. io_type = input_precisions[0] use_bfloat16 = ( config.quantization_precision.get().to_scalar_type() == ScalarType.bfloat16 and scalar_is_floating(input_precisions[0]) ) use_int16 = config.quantization_precision.get().is_int16_precision() and \ input_precisions[0] == ScalarType.int16 # Quantize inputs and output with the same type. input_quants = [] for i in range(len(attrs.input_types)): input_name = InputName("input_{}".format(i)) q = fix_input(quantizer_interface, io_type, input_name, config.asymmetry.get()) input_quants.append(q) output_quant = fix_output(quantizer_interface, io_type, _get_concat_out_shape(attrs), config.asymmetry.get()) if use_bfloat16: return dataclasses.replace(attrs, scalar_type=io_type) else: # Calculate requantization factors input_scales = [q.quant.scale for q in input_quants] out_dtype = np.int16 if use_int16 else np.int8 sc_correction_bits = 32 if use_int16 else 8 # The int8 algorithm stores sc_corr in 8 bits requants = [] for q in input_quants: sc_corr, zp_corr, shift = quant_utils.requantization(q.quant, output_quant.quant, sc_correction_bits=sc_correction_bits) rq = requantization.FractionalZeroRequantization(sc_corr, zp_corr, utils.create_and_verify_narrowing(shift, RoundType.TOEVEN, out_dtype)) requants.append(rq) # Create quantization parameters attrs = dataclasses.replace(attrs, input_types=[dataclasses.replace(t, scalar=io_type) for t in attrs.input_types]) quant_attrs: ConcatQuantAttrs = \ ConcatQuantAttrs(attrs=attrs, requants=requants, layer_bits=[8], input_scales=input_scales, node_scales=[output_quant.quant.scale], node_zps=[output_quant.quant.zero_point]) return quant_attrs
@classmethod @type_check_operation_arguments(types=[ConcatQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: ConcatQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data_list = [] for data, requant in zip(list(input_dict.values()), quant_attrs.requants): output_8 = requantization.requantize(data.astype(np.int32), requant) data_list.append(output_8) quantized_output = cls.concatenate_fn(quant_attrs.attrs, data_list) return quantized_output
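Concatenation itself does no arithmetic, so all of the quantization work is in bringing every input onto one shared output scale; run_quant applies one requantization per input and then concatenates. A simplified sketch using float scale factors (the real path uses integer FractionalZeroRequantization):

    import numpy as np

    out_scale = 0.1
    inputs = [(np.array([10, 20], dtype=np.int8), 0.05),  # (codes, input scale)
              (np.array([-5, 7], dtype=np.int8), 0.2)]

    parts = [np.clip(np.round(q.astype(np.int32) * (s / out_scale)), -128, 127).astype(np.int8)
             for q, s in inputs]
    out = np.concatenate(parts)  # every element now decodes with out_scale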
[docs] class TransposeOp(AwesomeOperation[TransposeAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] transpose_fn: Callable[[TransposeAttrs, np.ndarray], np.ndarray] = op_fn.transpose
@classmethod
[docs] def get_type(cls, attrs: Union[TransposeAttrs, QUANT_ATTRS]) -> NodeType: input_shape = attrs.input_shape output_shape = _get_transpose_out_shape(attrs) dtype = attrs.input_type return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))}, TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod @type_check_operation_arguments(types=[TransposeAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: TransposeAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.transpose_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: TransposeAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> TransposeAttrs: compute_precision = config.quantization_precision.get().to_scalar_type() input_type = ScalarType.int8 if scalar_is_integral(compute_precision) else ScalarType.bfloat16 input_quant = fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get()) output_shape = _get_transpose_out_shape(attrs) fix_output_from_input(quantizer_interface, output_shape) # Update attributes to use the specified type. attrs.input_type = input_quant.type.scalar return attrs
[docs] class DepthToSpaceOp(AwesomeOperation[DepthToSpaceAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] depth_to_space_fn: Callable[[DepthToSpaceAttrs, np.ndarray], np.ndarray] = op_fn.depth_to_space
@classmethod
[docs] def get_type(cls, attrs: Union[DepthToSpaceAttrs, QUANT_ATTRS]) -> NodeType: input_shape = attrs.input_shape output_shape = _get_depth_to_space_out_shape(attrs) dtype = attrs.input_type return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))}, TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod @type_check_operation_arguments(types=[DepthToSpaceAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: DepthToSpaceAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.depth_to_space_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: DepthToSpaceAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> DepthToSpaceAttrs: compute_precision = config.quantization_precision.get().to_scalar_type() input_type = ScalarType.int8 if scalar_is_integral(compute_precision) else ScalarType.bfloat16 input_quant = fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get()) output_shape = _get_depth_to_space_out_shape(attrs) fix_output_from_input(quantizer_interface, output_shape) return dataclasses.replace(attrs, input_type=input_quant.type.scalar)
[docs] class ReshapeOp(AwesomeOperation[ReshapeAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] reshape_fn: Callable[[ReshapeAttrs, np.ndarray], np.ndarray] = op_fn.reshape
@classmethod
[docs] def get_type(cls, attrs: ReshapeAttrs) -> NodeType: data_type = attrs.dtype return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, attrs.input_shape))}, TensorValue(TensorType(data_type, tuple(attrs.newshape))))
@classmethod @type_check_operation_arguments(types=[ReshapeAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ReshapeAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.reshape_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: ReshapeAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> ReshapeAttrs: q = keep_input(quantizer_interface, InputName('data')) fix_output_from_input(quantizer_interface, tuple(attrs.newshape)) attrs.dtype = q.type.scalar return attrs
@classmethod
[docs] def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs, inputs: Dict[InputName, QuantizationTensorData]) \ -> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]: return None, {}
[docs] class ExpandDimsOp(AwesomeOperation[ExpandDimsAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] expand_dims_fn: Callable[[ExpandDimsAttrs, np.ndarray], np.ndarray] = op_fn.expand_dims
@classmethod
[docs] def get_type(cls, attrs: Union[ExpandDimsAttrs, QUANT_ATTRS]) -> NodeType: input_shape = attrs.input_shape output_shape = get_expand_dims_out_shape(attrs) dtype = attrs.input_type return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))}, TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod @type_check_operation_arguments(types=[ExpandDimsAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ExpandDimsAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.expand_dims_fn(attrs, input_dict[InputName('data')])
[docs] class SplitOp(AwesomeOperation[SplitAttrs, AwesomeQuantAttrBase]): """ SplitOp takes one input tensor and returns a tuple of tensors. """
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] split_fn: Callable[[SplitAttrs, np.ndarray], Tuple[np.ndarray, ...]] = op_fn.split
@classmethod
[docs] def get_type(cls, attrs: Union[SplitAttrs, QUANT_ATTRS]) -> NodeType: input_shape = attrs.input_shape outputs = _get_split_out_shape(attrs) dtype = attrs.input_type return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))}, TupleValue([TensorValue(TensorType(dtype, tuple(output))) for output in outputs]))
@classmethod @type_check_operation_arguments(types=[SplitAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: SplitAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> Tuple[np.ndarray, ...]: return cls.split_fn(attrs, input_dict[InputName('data')])
[docs] class TakeOp(AwesomeOperation[TakeAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data'), InputName('indices')]
[docs] take_fn: Callable[[TakeAttrs, np.ndarray, np.ndarray], np.ndarray] = op_fn.take
@classmethod
[docs] def get_type(cls, attrs: Union[TakeAttrs, QUANT_ATTRS]) -> NodeType: input_shape, indices_shape = attrs.input_shape, attrs.indices_shape dtype = attrs.input_type output_shape = _get_take_out_shape(attrs) return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape)), cls.input_list[1]: TensorValue(TensorType(ScalarType.int32, indices_shape))}, TensorValue(TensorType(dtype, output_shape)))
@classmethod @type_check_operation_arguments(types=[TakeAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: TakeAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.take_fn(attrs, input_dict[InputName('data')], input_dict[InputName('indices')])
[docs] class StridedSliceOp(AwesomeOperation[StridedSliceAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] strided_slice_fn: Callable[[StridedSliceAttrs, np.ndarray], np.ndarray] = op_fn.strided_slice
@classmethod
[docs] def get_type(cls, attrs: Union[StridedSliceAttrs, QUANT_ATTRS]) -> NodeType: input_shape = attrs.input_shape output_shape = get_strided_slice_out_shape(attrs) dtype = attrs.input_type return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))}, TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod @type_check_operation_arguments(types=[StridedSliceAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: StridedSliceAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.strided_slice_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: StridedSliceAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> StridedSliceAttrs: # Use the input type and quantization q = keep_input(quantizer_interface, InputName('data')) fix_output_from_input(quantizer_interface, get_strided_slice_out_shape(attrs)) attrs.input_type = q.type.scalar return attrs
[docs] class LayoutTransformOp(AwesomeOperation[LayoutTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] layout_transform_fn: Callable[[LayoutTransformAttrs, np.ndarray], np.ndarray] = op_fn.layout_transform
@classmethod
[docs] def get_type(cls, attrs: LayoutTransformAttrs) -> NodeType: output_shape = tuple([attrs.input_type.shape[attrs.src_layout.index(c)] for c in attrs.dst_layout]) return NodeType({cls.input_list[0]: TensorValue(attrs.input_type)}, TensorValue(TensorType(attrs.input_type.scalar, output_shape)))
@classmethod @type_check_operation_arguments(types=[LayoutTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: LayoutTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.layout_transform_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: LayoutTransformAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> AwesomeQuantAttrBase: # Use the input type and quantization _require_integer_precision(cls.__name__, config) q = keep_input(quantizer_interface, InputName('data')) input_type = q.type output_shape = utils.transpose_attr_according_to_layout_strings(input_type.shape, attrs.src_layout, attrs.dst_layout) fix_output_from_input(quantizer_interface, tuple(output_shape)) attrs.input_type = TensorType(input_type.scalar, attrs.input_type.shape) return attrs
@classmethod
[docs] def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs, inputs: Dict[InputName, QuantizationTensorData]) \ -> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]: return None, {}
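The output shape computed in get_type and quantize above is a pure permutation of the input shape, driven by the two layout strings. A standalone illustration (helper name is ours, not part of this module):

def permute_shape(shape, src_layout, dst_layout):
    # For each axis letter of the destination layout, take the extent of the
    # matching axis in the source layout.
    return tuple(shape[src_layout.index(c)] for c in dst_layout)

assert permute_shape((1, 224, 224, 3), 'NHWC', 'NCHW') == (1, 3, 224, 224)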
[docs] class TessellationTransformOp(AwesomeOperation[TessellationTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # TessellationTransformOp is explicitly created during compilation of AwesomeNet # and should always use TessellationTransformAttrs assert isinstance(attrs, TessellationTransformAttrs) frame_type = attrs.frame_type output_shape = op_fn.calculate_tessellated_tensor_shape( frame_type, attrs.slice_shape, attrs.align_c16 ) return NodeType({InputName("data"): TensorValue(frame_type)}, TensorValue(TensorType(ScalarType.int8, output_shape)))
@classmethod @type_check_operation_arguments(types=[TessellationTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: TessellationTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: # Temporary cast to int8 until quantize nodes are inserted input_data = input_dict[InputName('data')] input_type = attrs.frame_type.scalar.numpy_type() return op_fn.tessellation(attrs, input_data.astype(input_type))
[docs] class DetessellationTransformOp(AwesomeOperation[DetessellationTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # DetessellationTransformOp is explicitly created during compilation of AwesomeNet # and should always use DetessellationTransformAttrs assert isinstance(attrs, DetessellationTransformAttrs) return NodeType({InputName("data"): TensorValue(TensorType(ScalarType.int8, attrs.input_shape))}, TensorValue(attrs.frame_type))
@classmethod @type_check_operation_arguments(types=[DetessellationTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: DetessellationTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.detessellation(attrs, input_dict[InputName('data')])
[docs] class PackTransformOp(AwesomeOperation[PackTransformAttrs, AwesomeQuantAttrBase]): # PackTransform can have a variable number of inputs, so we can't establish common input names.
[docs] input_list: ClassVar[List[InputName]] = None
@classmethod
[docs] def get_type(cls, attrs: PackTransformAttrs) -> NodeType: batch_size = attrs.input_shapes[0].shape[0] input_types = get_pack_input_types(attrs.input_shapes) per_sample_types = [TensorValue(TensorType(input_type.scalar, input_type.shape[1:])) for input_type in input_types] data_type = ScalarType.int8 data_size = data_byte_size(TupleValue(per_sample_types)) output_shape = (batch_size, data_size) return NodeType({InputName('input_{}'.format(i)): TensorValue(i_type) for i, i_type in enumerate(attrs.input_shapes)}, TensorValue(TensorType(data_type, output_shape)))
@classmethod @type_check_operation_arguments(types=[PackTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, _: PackTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.pack(list(input_dict.values()))
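The packed output computed in get_type above is one flat byte buffer per batch element, sized as the sum of the per-sample tensor sizes in bytes. A rough sketch of that arithmetic, assuming int8 inputs at one byte per element (data_byte_size handles other scalar types):

import math

def packed_shape(batch_size, per_sample_shapes, bytes_per_elem=1):
    # Total bytes needed to pack one sample of every input, side by side.
    data_size = sum(math.prod(s) * bytes_per_elem for s in per_sample_shapes)
    return (batch_size, data_size)

# Two int8 inputs with per-sample shapes (56, 56, 32) and (28, 28, 64):
assert packed_shape(1, [(56, 56, 32), (28, 28, 64)]) == (1, 56 * 56 * 32 + 28 * 28 * 64)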
[docs] class UnpackTransformOp(AwesomeOperation[UnpackTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # UnpackTransformOp is explicitly created during compilation of AwesomeNet # and should always use UnpackTransformAttrs assert isinstance(attrs, UnpackTransformAttrs) return NodeType({InputName("data"): TensorValue(TensorType(ScalarType.int8, attrs.input_shape))}, TupleValue([TensorValue(output_type) for output_type in attrs.tensor_types]))
@classmethod @type_check_operation_arguments(types=[UnpackTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: UnpackTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.unpack(attrs, input_dict[InputName('data')])
[docs] class NormalizationTransformOp(AwesomeOperation[NormalizationTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # NormalizationTransformOp should be created from auxiliary transform and should # never be quantized internally. assert isinstance(attrs, NormalizationTransformAttrs) input_type = attrs.input_type return NodeType({InputName("data"): TensorValue(input_type)}, TensorValue(TensorType(ScalarType.float32, input_type.shape)))
@classmethod @type_check_operation_arguments(types=[NormalizationTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: NormalizationTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.normalization(attrs, input_dict[InputName('data')])
[docs] class QuantizationTransformOp(AwesomeOperation[QuantizationTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # QuantizationTransformOp should explicitly be created either during internal # quantization process or from auxiliary transform and its attributes should # never be quantized. assert isinstance(attrs, QuantizationTransformAttrs) input_shape = attrs.input_shape assert attrs.num_bits in (8, 16) output_type = attrs.output_data_type return NodeType({InputName("data"): TensorValue(TensorType(ScalarType.float32, input_shape))}, TensorValue(TensorType(output_type, input_shape)))
@classmethod @type_check_operation_arguments(types=[QuantizationTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: QuantizationTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data = input_dict[InputName('data')] dtype = attrs.output_data_type.numpy_type() return op_fn.ev_quantize(attrs, data).astype(dtype, copy=False)
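A sketch of the linear quantization that ev_quantize is expected to perform, assuming the standard formula (the real per-channel and parameter handling lives in op_fn.ev_quantize):

import numpy as np

def linear_quantize(x, scale, zero_point, num_bits=8):
    # Map float values onto the signed integer grid, e.g. [-128, 127] for 8 bits.
    lo, hi = -(1 << (num_bits - 1)), (1 << (num_bits - 1)) - 1
    q = np.rint(x / scale) + zero_point
    return np.clip(q, lo, hi)

linear_quantize(np.array([0.32, -1.0]), scale=0.1, zero_point=0)  # -> [3., -10.]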
[docs] class DequantizationTransformOp(AwesomeOperation[DequantizationTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # DequantizationTransformOp should explicitly be created either during internal # quantization process or from auxiliary transform and its attributes should # never be quantized. assert isinstance(attrs, DequantizationTransformAttrs) return NodeType({InputName("data"): TensorValue(attrs.input_type)}, TensorValue(TensorType(attrs.output_type, attrs.input_type.shape)))
@classmethod @type_check_operation_arguments(types=[DequantizationTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: DequantizationTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.ev_dequantize(attrs, input_dict[InputName('data')])
[docs] class ResizeTransformOp(AwesomeOperation[ResizeTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # ResizeTransformOp should be created from auxiliary transform and should never # be quantized internally. assert isinstance(attrs, ResizeTransformAttrs) input_type = attrs.input_type return NodeType({InputName("data"): TensorValue(input_type)}, TensorValue(TensorType(input_type.scalar, (input_type.shape[0], attrs.target_height, attrs.target_width, input_type.shape[3]))))
@classmethod @type_check_operation_arguments(types=[ResizeTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ResizeTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.ev_resize(attrs, input_dict[InputName('data')])
[docs] class ChromaUpsampleTransformOp(AwesomeOperation[ChromaUpsampleTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # ChromaUpsampleTransformOp should be created from auxiliary transform # and should never be quantized internally. assert isinstance(attrs, ChromaUpsampleTransformAttrs) input_type = attrs.input_type return NodeType({InputName("data"): TensorValue(input_type)}, TensorValue(TensorType(input_type.scalar, (input_type.shape[0], attrs.frame_height, attrs.frame_width, 3))))
@classmethod @type_check_operation_arguments(types=[ChromaUpsampleTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ChromaUpsampleTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.chroma_upsample(attrs, input_dict[InputName('data')])
[docs] class YuvRgbConversionTransformOp(AwesomeOperation[YuvRgbConversionTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # YuvRgbConversionTransformOp should be created from auxiliary transform # and should never be quantized internally. assert isinstance(attrs, YuvRgbConversionTransformAttrs) input_shape = attrs.input_shape return NodeType({InputName("data"): TensorValue(TensorType(ScalarType.uint8, input_shape))}, TensorValue(TensorType(ScalarType.uint8, input_shape)))
@classmethod @type_check_operation_arguments(types=[YuvRgbConversionTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: YuvRgbConversionTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.yuv_rgb_conversion(attrs, input_dict[InputName('data')])
[docs] class BgrRgbConversionTransformOp(AwesomeOperation[BgrRgbConversionTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # BgrRgbConversionTransformOp should be created from auxiliary transform # and should never be quantized internally. assert isinstance(attrs, BgrRgbConversionTransformAttrs) input_shape = attrs.input_shape return NodeType({InputName("data"): TensorValue(TensorType(ScalarType.uint8, input_shape))}, TensorValue(TensorType(ScalarType.uint8, input_shape)))
@classmethod @type_check_operation_arguments(types=[BgrRgbConversionTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: BgrRgbConversionTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.bgr_rgb_conversion(attrs, input_dict[InputName('data')])
[docs] class SigmoidTransformOp(AwesomeOperation[SigmoidTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # SigmoidTransformOp should be created from auxiliary transform and should # never be quantized internally. assert isinstance(attrs, SigmoidTransformAttrs) input_shape = attrs.input_shape output_type = ScalarType.int16 if attrs.save_int16 else ScalarType.float32 return NodeType({InputName("data"): TensorValue(TensorType(ScalarType.float32, input_shape))}, TensorValue(TensorType(output_type, input_shape)))
@classmethod @type_check_operation_arguments(types=[SigmoidTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: SigmoidTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.ev_sigmoid(attrs, input_dict[InputName('data')])
[docs] class NmsMaxpoolTransformOp(AwesomeOperation[NmsMaxpoolTransformAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType: # NmsMaxpoolTransformOp should be created from auxiliary transform and should # never be quantized internally. assert isinstance(attrs, NmsMaxpoolTransformAttrs) return NodeType({InputName("data"): TensorValue(attrs.input_type)}, TensorValue(attrs.input_type))
@classmethod @type_check_operation_arguments(types=[NmsMaxpoolTransformAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: NmsMaxpoolTransformAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.nms_maxpool(attrs, input_dict[InputName('data')])
[docs] class CastOp(AwesomeOperation[CastAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] cast_fn: Callable[[CastAttrs, np.ndarray], np.ndarray] = op_fn.cast
@classmethod
[docs] def get_type(cls, attrs: Union[CastAttrs, QUANT_ATTRS]) -> NodeType: in_dtype = attrs.input_type shape = attrs.input_shape out_dtype = scalar_type_from_dtype(attrs.out_dtype) return NodeType({cls.input_list[0]: TensorValue(TensorType(in_dtype, shape))}, TensorValue(TensorType(out_dtype, shape)))
@classmethod @type_check_operation_arguments(types=[CastAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: CastAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.cast_fn(attrs, input_dict[InputName('data')])
####################### # COMPOSITE OPERATIONS ####################### ################### # Add, Activations ###################
[docs] class AddActivationOp(AwesomeOperation[AddActivationAttrs, AddQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]
""" The AddActivationOp can only handle the: * Add + Relu * Add + Clip """
[docs] add_fn: Callable[[np.ndarray, np.ndarray, Optional[int]], np.ndarray] = op_fn.add
[docs] relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
[docs] clip_fn: Callable[[ClipAttrs | ClipQuantAttrs, np.ndarray], np.ndarray] = op_fn.clip
[docs] requantize_fn: Callable[ [np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = quant_utils.requantize
@classmethod
[docs] def get_type(cls, attrs: Union[AddActivationAttrs, AddQuantAttrs]) -> NodeType: if isinstance(attrs, AddActivationAttrs): in_type = out_type = attrs.add_attrs.scalar_type else: in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8 out_type = ScalarType.from_numpy(attrs.requant.out_dtype) assert out_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32) attrs = attrs.add_attrs if isinstance(attrs, AddActivationAttrs) else attrs lhs_input_shape = attrs.lhs_input_shape rhs_input_shape = attrs.rhs_input_shape output_shape = _get_out_shape_for_op_with_2_inputs(attrs) return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_input_shape)), cls.input_list[1]: TensorValue(TensorType(in_type, rhs_input_shape))}, TensorValue(TensorType(out_type, output_shape)))
@classmethod @type_check_operation_arguments(types=[AddActivationAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: AddActivationAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: add_out = cls.add_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')]) if isinstance(attrs.activ_attrs, ReluAttrs): return cls.relu_fn(add_out) elif isinstance(attrs.activ_attrs, ClipAttrs): return cls.clip_fn(attrs.activ_attrs, add_out) return add_out
@classmethod
[docs] def quantize(cls, attrs: AddActivationAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) \ -> Union[AddActivationAttrs, AddQuantAttrs]: output_shape = _get_out_shape_for_op_with_2_inputs(attrs.add_attrs) quantization: Optional[QuantResultTensorType] = None # Detect input's quantization precision compute_precision = config.quantization_precision.get().to_scalar_type() use_int16 = compute_precision == ScalarType.int16 lhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get()) rhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get()) # For Add + Clip, try to remove the clip by adjusting the output quantization if it meets certain conditions. # If the clip cannot be removed, later quantize_activation will try to convert it to RELU. if config.quantization_precision.get() in (QuantizationPrecision.INT_8, QuantizationPrecision.INT_16) \ and isinstance(attrs.activ_attrs, ClipAttrs | ClipQuantAttrs) and config.asymmetry.get(): # Try to quantize output with clip range and remove Clip fp_min = attrs.activ_attrs.a_min fp_max = attrs.activ_attrs.a_max # Zero must be inside the clip range if (fp_min <= 0) and (0 <= fp_max): clipped_scale = quant_utils.compute_scale(True, 8, fp_min, fp_max) clipped_zp = quant_utils.compute_zero_point(True, 8, fp_min, fp_max) clipped_quant = Quantization(clipped_scale, clipped_zp, bits=8, min_val=fp_min, max_val=fp_max) type_int8 = TensorType(ScalarType.int8, output_shape) quantization = QuantResultTensorType(type_int8, clipped_quant, RequantMethod.fractional_zero) attrs.activ_attrs = None if scalar_is_integral(compute_precision): # Quantize the operator # If output quantization was not set, then set it according to calibration results if quantization is None: quantization = quantize_output(quantizer_interface, ScalarType.int8, output_shape, config.asymmetry.get()) input_scales = [lhs_quantization.quant.scale, rhs_quantization.quant.scale] input_zps = [lhs_quantization.quant.zero_point, rhs_quantization.quant.zero_point] scale = quantization.quant.scale zero_point = quantization.quant.zero_point layer_bits = quantization.quant.bits assert quantization.type.scalar == ScalarType.int8 # quantize_add_subtract was designed for int8 scales, zp_corr, shift = quant_utils.quantize_add_subtract(False, input_scales, input_zps, scale, zero_point, layer_bits) # Ensure that right-shift is nonnegative. Handle a negative shift by reducing the scale and zero point. if shift < 0: scale_adjustment = 1 << -shift adjusted_quant = dataclasses.replace(quantization.quant, scale=quantization.quant.scale / scale_adjustment, zero_point=int(quantization.quant.zero_point / scale_adjustment)) quantization = dataclasses.replace(quantization, quant=adjusted_quant) shift = 0 if not use_int16: # Requantize to int8 using the calculated shift and zp_corr requant = requantization.FractionalZeroRequantization( 1, zp_corr, utils.create_and_verify_narrowing(shift, RoundType.TOEVEN, np.int8) ) output_quantization = quantization else: # Convert the int8 requantization to an int16 requantization. # Subtract 8 from shift, but don't reduce shift below 0. 
shift_adjustment = min(shift, 8) shift -= shift_adjustment scale *= (1 << shift_adjustment) zero_point *= (1 << shift_adjustment) # Create the output quantization and requantization information quantization_16 = Quantization(scale, zero_point, bits=16, min_val=quantization.quant.min_val, max_val=quantization.quant.max_val) type_16 = TensorType(ScalarType.int16, quantization.type.shape) output_quantization = QuantResultTensorType(type_16, quantization_16, RequantMethod.fractional_zero) requant = requantization.FractionalZeroRequantization(1, zp_corr, utils.create_and_verify_narrowing(shift, RoundType.TOEVEN, np.int16)) layer_bits = 16 # Save results for quantized add operator quantizer_interface.set_chosen_output_quant(TensorValue(output_quantization)) activ_attrs = quant_utils.quantize_activation(attrs.activ_attrs, output_quantization.quant, compute_precision) quant_attrs = AddQuantAttrs(attrs.add_attrs.lhs_input_shape, attrs.add_attrs.rhs_input_shape, use_int16, requant, zero_point, scales[0], scales[1], layer_bits, activ_attrs) else: # Save results for floating-point add operator fix_output(quantizer_interface, ScalarType.bfloat16, output_shape, config.asymmetry.get()) quant_attrs = dataclasses.replace(attrs, add_attrs=dataclasses.replace(attrs.add_attrs, scalar_type=ScalarType.bfloat16)) return quant_attrs
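A worked instance of the clip-removal rule in quantize above: for Add + Clip(0.0, 6.0) under asymmetric int8 quantization, zero lies inside the clip range, so the output quantization can absorb the clip. The numbers below follow the usual asymmetric formulas and only illustrate what compute_scale/compute_zero_point are expected to produce:

fp_min, fp_max = 0.0, 6.0
clipped_scale = (fp_max - fp_min) / 255    # ~0.0235, so real 6.0 maps to 127
clipped_zp = -128                          # real 0.0 maps to -128

Every representable int8 value then corresponds to a real value inside [0, 6], so plain int8 saturation implements the clip and the activation node can be dropped.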
@classmethod
[docs] def run_quant(cls, quant_attrs: AddQuantAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: # TODO: For a constant input, the multiplication by in_scale could be done at the quantize phase. # The constant will be 32 bits in size when multiplied with the in_scale. output = cls.add_fn(input_dict[InputName('lhs')].astype(np.int32, copy=False) * quant_attrs.lhs_scale, input_dict[InputName('rhs')].astype(np.int32, copy=False) * quant_attrs.rhs_scale) quantized_output = requantization.requantize(output, quant_attrs.requant) if isinstance(quant_attrs.activ_attrs, ReluAttrs | ReluQuantAttrs): quantized_output = cls.relu_fn(quantized_output, quant_attrs.relu_zero_point) elif isinstance(quant_attrs.activ_attrs, ClipAttrs | ClipQuantAttrs): return cls.clip_fn(quant_attrs.activ_attrs, quantized_output) return quantized_output
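A worked example of the int8-to-int16 conversion in quantize above (numbers are illustrative): suppose the int8 result chose scale = 12.5, zero_point = 3, and the computed right shift is 11. Moving min(11, 8) = 8 bits out of the shift scales the output quantization by 2**8:

shift_adjustment = min(11, 8)    # 8
shift = 11 - shift_adjustment    # 3
scale = 12.5 * (1 << 8)          # 3200.0
zero_point = 3 * (1 << 8)        # 768

The int16 output therefore carries 8 extra fractional bits relative to the int8 result, and the remaining shift of 3 is applied by the FractionalZeroRequantization narrowing.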
[docs] class ConstantMultiplyAddOp(AddActivationOp): """ An add operator fused with multiplication by a scalar constant. The operator performs the floating-point operation (a*c + b*d), where c and d are scalar constants. After quantization, it behaves like an add operator. The multiplication is incorporated into the add operator's requantization. """
[docs] multiply_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.multiply
@classmethod
[docs] def get_type(cls, attrs: Union[ConstantMultiplyAddAttrs, AddQuantAttrs]) -> NodeType: if isinstance(attrs, ConstantMultiplyAddAttrs): data_type = attrs.scalar_type else: data_type = ScalarType.int8 lhs_input_shape = attrs.lhs_input_shape rhs_input_shape = attrs.rhs_input_shape output_shape = _get_out_shape_for_op_with_2_inputs(attrs) return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, lhs_input_shape)), cls.input_list[1]: TensorValue(TensorType(data_type, rhs_input_shape))}, TensorValue(TensorType(data_type, output_shape)))
@classmethod @type_check_operation_arguments(types=[ConstantMultiplyAddAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ConstantMultiplyAddAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: in1_data = cls.multiply_fn(input_dict[InputName('lhs')], attrs.in1_const_attrs.data[0]) \ if attrs.in1_const_attrs is not None else input_dict[InputName('lhs')] in2_data = cls.multiply_fn(input_dict[InputName('rhs')], attrs.in2_const_attrs.data[0]) \ if attrs.in2_const_attrs is not None else input_dict[InputName('rhs')] return cls.add_fn(in1_data, in2_data)
@classmethod
[docs] def quantize( cls, attrs: ConstantMultiplyAddAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter ) -> Union[AddQuantAttrs, ConstantMultiplyAddAttrs]: in1_scale_const = attrs.in1_const_attrs.data[0] in2_scale_const = attrs.in2_const_attrs.data[0] if attrs.in2_const_attrs is not None else 1 output_type = cls.get_type(attrs).output assert isinstance(output_type, TensorValue) output_shape = output_type.value.shape compute_precision = config.quantization_precision.get().to_scalar_type() if scalar_is_integral(compute_precision): lhs_quantization = fix_input_to_int8(quantizer_interface, InputName('lhs'), config.asymmetry.get()) rhs_quantization = fix_input_to_int8(quantizer_interface, InputName('rhs'), config.asymmetry.get()) quantization = fix_output_to_int8(quantizer_interface, output_shape, config.asymmetry.get()) input_scales = [lhs_quantization.quant.scale, rhs_quantization.quant.scale] input_zps = [lhs_quantization.quant.zero_point, rhs_quantization.quant.zero_point] scale = quantization.quant.scale zero_point = quantization.quant.zero_point layer_bits = quantization.quant.bits scales, zp_corr, shift = quant_utils.quantize_add_subtract( False, input_scales, input_zps, scale, zero_point, layer_bits, in1_scale_const=in1_scale_const, in2_scale_const=in2_scale_const ) requant = requantization.FractionalZeroRequantization( 1, zp_corr, utils.create_and_verify_narrowing(shift, RoundType.TOEVEN, np.int8) ) return AddQuantAttrs( attrs.lhs_input_shape, attrs.rhs_input_shape, False, requant, 0, scales[0], scales[1], layer_bits ) else: # bfloat16 fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get()) fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get()) quantization = QuantResultTensorType(TensorType(compute_precision, output_shape), None, None) quantizer_interface.set_chosen_output_quant(TensorValue(quantization)) in1_const_attrs = in2_const_attrs = None if attrs.in1_const_attrs: in1_const_attrs = ConstantAttrs(attrs.in1_const_attrs.data.astype(np.dtype(bfloat16))) if attrs.in2_const_attrs: in2_const_attrs = ConstantAttrs(attrs.in2_const_attrs.data.astype(np.dtype(bfloat16))) return ConstantMultiplyAddAttrs( compute_precision, attrs.lhs_input_shape, attrs.rhs_input_shape, in1_const_attrs, in2_const_attrs )
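Why the constants can disappear into the add: in floating point the node computes c*x1 + d*x2, and the quantized add kernel already multiplies each input by a per-input factor before requantizing, so c and d simply rescale the factors that quantize_add_subtract produces. A float-level sketch with made-up numbers:

s1, s2, c, d = 0.05, 0.1, 2.0, 0.5          # input scales and the two scalar constants
q1, q2 = 10, -4                             # quantized input values
y_direct = c * (s1 * q1) + d * (s2 * q2)    # 1.0 - 0.2 = 0.8
y_folded = (c * s1) * q1 + (d * s2) * q2    # same value, constants folded into the scales
assert abs(y_direct - y_folded) < 1e-12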
################################ # Convolution, Add, Activations ################################
[docs] class ConvAddActivationOp(AwesomeOperation[ConvAddActivationAttrs, ConvQuantAttrs]):
[docs] add_fn: Callable[[np.ndarray, np.ndarray, Optional[int]], np.ndarray] = op_fn.add
[docs] requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \ quant_utils.requantize
[docs] relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
[docs] clip_fn: Callable[[ClipAttrs | ClipQuantAttrs, np.ndarray], np.ndarray] = op_fn.clip
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: ConvAddActivationAttrs | ConvQuantAttrs) -> NodeType: if isinstance(attrs, ConvAddActivationAttrs): input_type = output_type = attrs.conv_attrs.input_type assert input_type in (ScalarType.float32, ScalarType.bfloat16) else: input_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8 output_type = ScalarType.from_numpy(attrs.requant.out_dtype) return _conv_op_type(attrs, input_type, output_type)
@classmethod @type_check_operation_arguments(types=[ConvAddActivationAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ConvAddActivationAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data = input_dict[InputName('data')] if not config.fast_mode: run_mode = RunMode.MLA_MODE else: run_mode = RunMode.FAST_MODE # TODO: always use float_convolution if data.dtype == bfloat16: return op_fn.float_convolution(attrs, data, run_mode) conv_out = op_fn.conv_tensorflow(attrs.conv_attrs, data, attrs.weights_attrs.data) if attrs.bias_attrs: conv_out = cls.add_fn(conv_out, attrs.bias_attrs.data, axis=-1) if attrs.activ_attrs: if isinstance(attrs.activ_attrs, ReluAttrs | ReluQuantAttrs): conv_out = cls.relu_fn(conv_out) elif isinstance(attrs.activ_attrs, ClipAttrs | ClipQuantAttrs): conv_out = cls.clip_fn(attrs.activ_attrs, conv_out) return conv_out
@classmethod
[docs] def quantize(cls, attrs: ConvAddActivationAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter)\ -> ConvAddActivationAttrs | ConvQuantAttrs: original_output_type = get_expected_tensor_value(cls.get_type(attrs).output) original_input_type = get_expected_tensor_value(quantizer_interface.get_input_quant()[InputName('data')]) quantization_type = config.quantization_precision.get().to_scalar_type() # Choose scalar type for the input. # If possible, use the given input without converting. Otherwise, convert it to quantization_type. if original_input_type.type.scalar in (ScalarType.int8, ScalarType.int16) \ and quantization_type in (ScalarType.int8, ScalarType.int16): # All combinations of int8 and int16 input/output are supported. Use the given input type. input_type = original_input_type.type.scalar else: input_type = quantization_type input_quant = fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get()) if scalar_is_integral(input_type): # Quantize convolution assert input_quant.quant is not None # Prepare bias correction biascorr_type = BiasCorrectionType.NONE if attrs.bias_attrs is None else config.biascorr_type.get() match biascorr_type: case BiasCorrectionType.NONE: bias_corrector = bias_correction.NullBiasCorrector() case BiasCorrectionType.REGULAR: # Check that the observer is set up to do mean estimation intermediate_distributions = quantizer_interface.get_intermediate_distributions() assert InputName('mean') in intermediate_distributions input_mean = intermediate_distributions[InputName('mean')].get_mean().squeeze(axis=0) assert len(input_mean.shape) == 1 bias_corrector = bias_correction.MeanBiasCorrector(input_mean) case BiasCorrectionType.ITERATIVE: layer_inputs = quantizer_interface.get_calibration_data() assert layer_inputs is not None input_mean = bias_correction.prepare_input_mean(layer_inputs[InputName('data')], original_input_type.quant) bias_corrector = bias_correction.MeanBiasCorrector(input_mean) case _: raise ValueError("Unexpected bias correction type") weight_const_attr = attrs.weights_attrs bias_const_attr_data = attrs.bias_attrs.data if attrs.bias_attrs else None # Select quantization method. 
use_tflite_quant = config.requantization_mode.get() == RequantizationMode.tflite if config.output_int32.get(): convolution_precision = quantization_conv.ConvolutionPrecision.sima_int32 requant_method = RequantMethod.arith_folded elif config.quantization_precision.get().is_int16_precision(): convolution_precision = quantization_conv.ConvolutionPrecision.tflite_int16 if use_tflite_quant\ else quantization_conv.ConvolutionPrecision.sima_int16 requant_method = RequantMethod.fractional_zero if use_tflite_quant else RequantMethod.arith_folded elif config.quantization_precision.get().is_int8_precision(): convolution_precision = quantization_conv.ConvolutionPrecision.tflite_int8 if use_tflite_quant\ else quantization_conv.ConvolutionPrecision.sima_int8 requant_method = RequantMethod.fractional_zero if use_tflite_quant else RequantMethod.arith_folded else: raise ValueError("Unrecognized quantization precision") # Quantization must adhere to additional restrictions if using relu activation with sima quantization sima_relu_workaround = isinstance(attrs.activ_attrs, ReluAttrs) and \ convolution_precision in (quantization_conv.ConvolutionPrecision.sima_int8, quantization_conv.ConvolutionPrecision.sima_int16) # Choose quantization parameters quantized_weight, quantized_bias, requant, output_scalar_type, output_quant, msb_left_shift = \ afe.ir.quantization_conv.quantize_convolution_parameters( input_quant.quant, quantizer_interface.get_output_distribution(), weight_const_attr.data, bias_const_attr_data, bias_corrector=bias_corrector, per_channel=config.per_channel.get(), asymmetry=config.asymmetry.get(), use_int15=input_type == ScalarType.int16, precision=convolution_precision, allow_full_output_precision=config.intermediate_int32.get(), use_sima_relu_workaround=sima_relu_workaround, error_reporter=error_reporter) if output_scalar_type != ScalarType.int32: # When the output type is not int32, this convolution can't be fused with requantization. # Use the least restrictive requantization method, since the choice will not affect fusion. 
requant_method = RequantMethod.fractional_zero # Quantize activation attributes activ_attrs = quant_utils.quantize_activation(attrs.activ_attrs, output_quant, output_scalar_type) # Put all results into the output QuantResultTensorType and Conv2DQuantAttrs output_qrtt = QuantResultTensorType(type=TensorType(output_scalar_type, original_output_type.shape), quant=output_quant, requant_method=requant_method) quantizer_interface.set_chosen_output_quant(TensorValue(output_qrtt)) input_zp = input_quant.quant.zero_point zero_point = output_quant.zero_point scale = output_quant.scale return ConvQuantAttrs(conv_attrs=attrs.conv_attrs, scale=scale, zero_point=zero_point, input_zp=input_zp, weight_quant_data=quantized_weight, requant=requant, bias_quant_data=quantized_bias, per_channel=config.per_channel.get(), activ_attrs=activ_attrs, input_int16=input_type == ScalarType.int16, msb_left_shift=msb_left_shift) else: # Produce a bfloat16 convolution assert input_type == ScalarType.bfloat16 output_qrtt = QuantResultTensorType.from_type(TensorType(ScalarType.bfloat16, original_output_type.shape)) quantizer_interface.set_chosen_output_quant(TensorValue(output_qrtt)) if config.quantization_precision.get().is_bfloat16_with_int_weights(): bits = 8 if config.quantization_precision.get().is_bfloat16_with_int8_weights() else 4 # Bfloat16 convolution with int8 or int4 weights quantized_weight, bias, requant = afe.ir.quantization_conv.get_bfloat16_with_int_weights_quant_params( attrs=attrs, per_channel=config.per_channel.get(), bits=bits) return ConvQuantAttrs(conv_attrs=attrs.conv_attrs, activ_attrs=attrs.activ_attrs, weight_quant_data=quantized_weight, bias_quant_data=bias, requant=requant) else: # In the attributes, convert weight type to float32, because Netron doesn't support bfloat16 weight. # In execution later, weight is converted to bfloat16. Bias is always float32 for bfloat16 execution. weight_const_attr = ConstantAttrs(attrs.weights_attrs.data.astype(np.float32)) bias_const_attr = ConstantAttrs(attrs.bias_attrs.data.astype(np.float32)) if attrs.bias_attrs is not None\ else None conv_attr = dataclasses.replace(attrs.conv_attrs, input_type=ScalarType.bfloat16) match attrs.add_attrs: case AddAttrs() as a: add_attr = dataclasses.replace(a, scalar_type=ScalarType.bfloat16) case BiasAddAttrs() as a: add_attr = a case None: add_attr = None return ConvAddActivationAttrs(weights_attrs=weight_const_attr, conv_attrs=conv_attr, bias_attrs=bias_const_attr, add_attrs=add_attr, activ_attrs=attrs.activ_attrs)
@classmethod
[docs] def run_quant(cls, quant_attrs: ConvQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: data = input_dict[InputName('data')] # Use accurate convolution algorithm if it is chosen by config flags # Also use it for int16, because the fast algorithm does not handle int16 if not config.fast_mode or data.dtype == np.int16: run_mode = RunMode.MLA_MODE else: run_mode = RunMode.FAST_MODE ofm = op_fn.quantized_convolution(quant_attrs, data, run_mode) return ofm
@classmethod
[docs] def calibrate(cls, attrs: ConvAddActivationAttrs, calib_attrs: AwesomeCalibAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any: """ ConvAddActivation calibration method. Executes default calibration to get results of the ConvAdd operation in floating point. Additionally, updates intermediate observers for tracking mean values. :param attrs: AwesomeAttributes associated with this operation :param calib_attrs: AwesomeCalibAttrs associated with operation's node. :param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays :param config: Parameters controlling how to calibrate. :return: Output tensor(s) whose type is dependent on the subclass. """ # Run default calibration. outputs = super().calibrate(attrs, calib_attrs, input_dict, config) # Update observers for intermediate values. if attrs.bias_attrs is not None: assert calib_attrs.intermediate_observers assert ('mean' in calib_attrs.intermediate_observers and calib_attrs.intermediate_observers['mean'] is not None) data = input_dict[InputName('data')] calib_attrs.intermediate_observers['mean'].update(data) return outputs
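The intuition behind MeanBiasCorrector, in rough numpy terms. This is a conceptual sketch of the standard bias-correction idea, not the exact algorithm in afe.ir.bias_correction: quantizing the weights shifts the layer's expected output, and given a per-channel input mean that expected shift can be folded back into the bias.

import numpy as np

def corrected_bias(bias, w_float, w_dequant, input_mean):
    # w_float, w_dequant: effective (C_in, C_out) weight matrices before and after a
    # quantize->dequantize round trip. Under the observed input mean, the expected
    # per-output-channel error is input_mean @ (w_dequant - w_float); subtracting it
    # recenters the quantized layer's output.
    expected_error = input_mean @ (w_dequant - w_float)
    return bias - expected_error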
###################### # Tuple + Concatenate ######################
[docs] class TupleConcatenateOp(AwesomeOperation[TupleConcatenateAttrs, ConcatQuantAttrs]): """ This composite node reuses ConcatenateOp's run, quantize, and run_quant methods. """ # Tuple can have a variable number of inputs, so we can't establish input names across all inputs
[docs] input_list = None
[docs] tuple_fn: Callable[[List[np.ndarray]], tuple] = tuple
[docs] concatenate_op: AwesomeOperation = ConcatenateOp
@classmethod
[docs] def get_type(cls, attrs: Union[TupleConcatenateAttrs, ConcatQuantAttrs]) -> NodeType: return cls.concatenate_op.get_type(attrs)
@classmethod @type_check_operation_arguments(types=[Union[TupleConcatenateAttrs, ConcatenateAttrs], np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: Union[TupleConcatenateAttrs, ConcatenateAttrs], input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: if isinstance(attrs, ConcatenateAttrs): # In the bfloat16 case, quantize returns ConcatenateAttrs, so dispatch to ConcatenateOp.run directly. return cls.concatenate_op.run(attrs, input_dict, config) else: return cls.concatenate_op.run(attrs.concat_attrs, input_dict, config)
@classmethod @type_check_operation_arguments( types=[TupleConcatenateAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter], dict_mask=[False, False, False, False])
[docs] def quantize(cls, attrs: TupleConcatenateAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> ConcatQuantAttrs: return cls.concatenate_op.quantize(attrs.concat_attrs, quantizer_interface, config, error_reporter)
@classmethod @type_check_operation_arguments(types=[ConcatQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: ConcatQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return cls.concatenate_op.run_quant(quant_attrs, input_dict, config)
########################## # PARTITIONING OPERATIONS ##########################
[docs] class ExternalOp(AwesomeOperation[ExternalAttrs, AwesomeQuantAttrBase]): # External ops can have a variable number of inputs, so we can't establish the same # input names across all ExternalAttrs
[docs] input_list = None
[docs] external_fn: Callable[[ExternalAttrs, Dict], Union[np.ndarray, tuple]] = op_fn.external
@classmethod
[docs] def get_type(cls, attrs: Union[ExternalAttrs, AwesomeQuantAttrBase]) -> NodeType: assert isinstance(attrs, ExternalAttrs) return attrs.node_type
@classmethod @type_check_operation_arguments(types=[ExternalAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ExternalAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: return cls.external_fn(attrs, input_dict)
@classmethod
[docs] def quantize(cls, attrs: ExternalAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> ExternalAttrs: # The type is not changed by quantization. Set the input and output types to their original values. assert list(attrs.node_type.inputs.keys()) == list(quantizer_interface.get_input_quant().keys()) for k, t in attrs.node_type.inputs.items(): quantizer_interface.set_chosen_input_quant(k, map_data_value(QuantResultTensorType.from_type, t)) quantizer_interface.set_chosen_output_quant(map_data_value(QuantResultTensorType.from_type, attrs.node_type.output)) return attrs
@classmethod
[docs] def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs, inputs: Dict[InputName, QuantizationTensorData]) \ -> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]: return None, {}
############################# # QNN OPERATIONS #############################
[docs] class QNNQuantizeOp(AwesomeOperation[QNNQuantizeAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] quant_fn: Callable[[QNNQuantizeAttrs, np.ndarray, np.ndarray, np.ndarray], np.ndarray] = op_fn.qnn_quantize
@classmethod
[docs] def get_type(cls, attrs: Union[QNNQuantizeAttrs, QUANT_ATTRS]) -> NodeType: input_data_type = attrs.input_type out_shape = attrs.input_type.shape out_data_type = scalar_type_from_dtype(attrs.out_dtype) return NodeType({cls.input_list[0]: TensorValue(input_data_type)}, TensorValue(TensorType(out_data_type, out_shape)))
# TODO: move the call of quant_fn to run_quant, remove run afterwards @classmethod @type_check_operation_arguments(types=[QNNQuantizeAttrs], dict_mask=[False])
[docs] def run(cls, attrs: QNNQuantizeAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: output = cls.quant_fn(attrs, input_dict[InputName('data')], attrs.output_scale, attrs.output_zero_point) return output.astype(attrs.out_dtype)
[docs] class RequantizeOp(AwesomeOperation[RequantizeAttrs, RequantizeQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[RequantizeAttrs, RequantizeQuantAttrs]) -> NodeType: attrs = attrs.attrs if isinstance(attrs, RequantizeQuantAttrs) else attrs input_type = attrs.input_type out_shape = attrs.input_type.shape out_data_type = scalar_type_from_dtype(attrs.out_dtype) output_type = TensorType(out_data_type, out_shape) return NodeType({cls.input_list[0]: TensorValue(input_type)}, TensorValue(output_type))
@classmethod
[docs] def run_quant(cls, quant_attrs: RequantizeQuantAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: output = requantization.requantize(input_dict[InputName('data')], quant_attrs.requant) return output
[docs] class QNNDequantizeOp(AwesomeOperation[QNNDequantizeAttrs, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] dequant_fn: Callable[[QNNDequantizeAttrs, np.ndarray, np.ndarray, np.ndarray], np.ndarray] = op_fn.qnn_dequantize
@classmethod
[docs] def get_type(cls, attrs: Union[QNNDequantizeAttrs, QUANT_ATTRS]) -> NodeType: input_data_type = attrs.input_type out_shape = attrs.input_type.shape return NodeType({cls.input_list[0]: TensorValue(input_data_type)}, TensorValue(TensorType(ScalarType.float32, out_shape)))
# TODO: move the call of dequant_fn to run_quant, remove run afterwards @classmethod @type_check_operation_arguments(types=[QNNDequantizeAttrs], dict_mask=[False])
[docs] def run(cls, attrs: QNNDequantizeAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: output = cls.dequant_fn(attrs, input_dict[InputName('data')], attrs.input_scale, attrs.input_zero_point) return output.astype(Float)
[docs] class QNNMulOp(AwesomeOperation[AwesomeAttributes, AwesomeQuantAttrBase]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs'), InputName('lhs_scale'), InputName('lhs_zero_point'), InputName('rhs_scale'), InputName('rhs_zero_point'), InputName('output_scale'), InputName('output_zero_point')]
[docs] mul_fn: Callable[[AwesomeAttributes, np.ndarray, np.ndarray, float, int, float, int, float, int], np.ndarray] = op_fn.qnn_mul
# TODO: move the call of mul_fn to run_quant, remove run afterwards @classmethod @type_check_operation_arguments(types=[AwesomeAttributes], dict_mask=[False])
[docs] def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, Any], config: RunConfigs) -> np.ndarray: output = cls.mul_fn(attrs, input_dict[InputName('lhs')], input_dict[InputName('rhs')], input_dict[InputName('lhs_scale')], input_dict[InputName('lhs_zero_point')], input_dict[InputName('rhs_scale')], input_dict[InputName('rhs_zero_point')], input_dict[InputName('output_scale')], input_dict[InputName('output_zero_point')]) return output.astype(QuantizedTensor)
################### # CUSTOM OPERATION ###################
[docs] class CustomOp(AwesomeOperation[CustomOpAttrs, AwesomeQuantAttrBase]): # Custom operations can have a variable number of inputs.
[docs] input_list = None
[docs] custom_op_fn: Callable[[CustomOpAttrs, Dict[InputName, np.ndarray]], np.ndarray] = op_fn.execute_custom_op
[docs] quant_fn: Callable[[np.ndarray, float, int, int], np.ndarray] = quant_utils.linear_quantize
[docs] dequant_fn: Callable[[np.ndarray, float, int], np.ndarray] = quant_utils.dequantize
@classmethod @type_check_operation_arguments(types=[CustomOpAttrs], dict_mask=[False])
[docs] def run(cls, attrs: CustomOpAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> Union[np.ndarray, tuple]: op_fn.init_custom_op(attrs, input_dict, attrs.output_types[0].shape) return cls.custom_op_fn(attrs, input_dict)
@classmethod
[docs] def quantize(cls, attrs: CustomOpAttrs, calib_attrs: AwesomeCalibAttrs, config: QuantizationConfigs, error_reporter: NodeReporter) -> CustomOpQuantAttrs: input_scales: List[List[float]] = list() input_zps: List[List[int]] = list() for input_quant in calib_attrs.input_quant.values(): scales, zero_points, _, _, _ = quant_utils.quantization_data_value_to_output_list( get_data_value_quant_result_scale_with_dummy(input_quant)) input_scales.append(scales) input_zps.append(zero_points) node_scales, node_zps, layer_bits, _, _ = quant_utils.quantization_data_value_to_output_list( get_data_value_quant_result_scale_with_dummy(calib_attrs.quant).quant) return CustomOpQuantAttrs(attrs, input_scales=input_scales, input_zps=input_zps, node_scales=node_scales, node_zps=node_zps, layer_bits=layer_bits)
@classmethod
[docs] def run_quant(cls, quant_attrs: CustomOpQuantAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: # Dequantize the input tensors if needed _custom_op_attrs = quant_attrs.custom_op_attrs.custom_op_attrs assert isinstance(_custom_op_attrs, Dict) assert "do_dequantize" in _custom_op_attrs for i, (in_name, do_dequantize) in enumerate(zip(input_dict.keys(), _custom_op_attrs["do_dequantize"])): if do_dequantize is False: continue scale = quant_attrs.input_scales[i][0] zp = quant_attrs.input_zps[i][0] input_dict[InputName(in_name)] = cls.dequant_fn(input_dict[InputName(in_name)], 1. / scale, zp) output = cls.custom_op_fn(quant_attrs.custom_op_attrs, input_dict, config) # Quantize the output tensor if needed assert "do_quantize" in _custom_op_attrs if _custom_op_attrs["do_quantize"][0] is True: scale = quant_attrs.node_scales[0] zp = quant_attrs.node_zps[0] output = cls.quant_fn(output, scale, zp, quant_attrs.layer_bits[0]) return output
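run_quant above wraps the float custom kernel between dequantize and quantize steps, driven by the do_dequantize/do_quantize flags. The shape of that pattern, in a minimal hedged sketch with the standard affine mapping assumed (the module's quant_utils helpers differ in detail):

import numpy as np

def run_custom_quantized(kernel, q_in, in_scale, in_zp, out_scale, out_zp):
    x = (q_in.astype(np.float32) - in_zp) * in_scale   # dequantize the input
    y = kernel(x)                                      # run the float custom kernel
    q = np.rint(y / out_scale) + out_zp                # quantize the output
    return np.clip(q, -128, 127).astype(np.int8)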
[docs] class LeakyReluCompositeOp(AwesomeOperation[LeakyReluAttrs, LeakyReluCompositeQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[LeakyReluAttrs, LeakyReluCompositeQuantAttrs]) -> NodeType: if isinstance(attrs, LeakyReluAttrs): shape = attrs.input_shape in_data_type = out_data_type = ScalarType.float32 else: shape = attrs.attrs.input_shape is_udf = attrs.udf_quant_attrs is not None if is_udf: in_data_type = ScalarType.int16 if attrs.udf_quant_attrs.input_int16 else ScalarType.int8 out_data_type = ScalarType.from_numpy(attrs.udf_quant_attrs.requant.out_dtype) else: in_data_type = out_data_type = ScalarType.int8 return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar=in_data_type, shape=shape))}, TensorValue(TensorType(scalar=out_data_type, shape=shape)))
@classmethod @type_check_operation_arguments(types=[LeakyReluAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: LeakyReluAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray: return op_fn.leaky_relu(input_dict[InputName('data')], attrs.alpha)
@classmethod
[docs] def quantize(cls, attrs: LeakyReluAttrs, quantizer_interface: OpQuantInterface, config: QuantizationConfigs, error_reporter: NodeReporter) -> LeakyReluCompositeQuantAttrs: input_name = InputName('data') input_precision = get_expected_tensor_value(quantizer_interface.get_input_quant()[input_name]).type scalar_type = ScalarType.int16 if (input_precision.scalar == ScalarType.int16 and config.quantization_precision.get().is_int16_precision()) else \ ScalarType.int8 input_quantization = fix_input(quantizer_interface, scalar_type, input_name, config.asymmetry.get()).quant output_quantization = fix_output(quantizer_interface, scalar_type, input_precision.shape, config.asymmetry.get()).quant input_int16 = scalar_type == ScalarType.int16 leaky_relu_quant_attrs: Optional[LeakyReluQuantAttrs] = None udf_quant_attrs: Optional[UDFQuantAttrs] = None if config.leaky_relu_uses_udf.get(): # Generate UDF LUT def leaky_relu_scalar(x: float) -> float: return float(op_fn.leaky_relu(np.array(x, dtype=np.float32), attrs.alpha).item()) input_type = output_type = scalar_type.numpy_type() lookup_table = quant_utils.quantize_udf( input_quantization, output_quantization, input_type, output_type, leaky_relu_scalar ) intermediate_type = np.int32 if config.intermediate_int32.get() else np.int16 out_dtype = intermediate_type if input_int16 else np.int8 requant = requantization.narrowing_requantization(shift=0, rounding=RoundType.TOEVEN, out_dtype=out_dtype) udf_quant_attrs = UDFQuantAttrs(lookup_table=lookup_table, attrs=attrs, input_int16=input_int16, requant=requant) else: # Create LeakyReluQuantAttrs for arithmetic implementation of leaky relu bits = input_quantization.bits zero_point = input_quantization.zero_point alpha, right_shift = quant_utils.quantize_prelu(bits, attrs.alpha) leaky_relu_quant_attrs = LeakyReluQuantAttrs(alpha, attrs.input_shape, right_shift, zero_point, bits, RoundType.TOEVEN) return LeakyReluCompositeQuantAttrs(attrs, config.leaky_relu_uses_udf.get(), leaky_relu_quant_attrs, udf_quant_attrs)
@classmethod
[docs] def run_quant(cls, quant_attrs: LeakyReluCompositeQuantAttrs, input_dict: Dict[InputName, np.ndarray],
               config: RunConfigs) -> np.ndarray:
    data = input_dict[InputName('data')]
    assert data.dtype in (np.int8, np.int16)
    if quant_attrs.leaky_relu_uses_udf:
        assert quant_attrs.udf_quant_attrs is not None
        lut = quant_attrs.udf_quant_attrs.lookup_table
        return ideal_udf(data, lut)
    else:
        assert quant_attrs.leaky_relu_quant_attrs is not None
        # Positive part
        output_p = op_fn.relu(data, quant_attrs.leaky_relu_quant_attrs.zero_point)
        # Negative part
        output_n = quant_attrs.leaky_relu_quant_attrs.alpha * (data.astype(np.int32) - output_p)
        quantized_output_n = quant_utils.requantize(
            output_n, quant_attrs.leaky_relu_quant_attrs.bits,
            quant_attrs.leaky_relu_quant_attrs.right_shift, zp=None,
            rounding_type=quant_attrs.leaky_relu_quant_attrs.rounding_type)
        # Complete output
        quantized_output = output_p + quantized_output_n
        return quantized_output
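# Illustrative sketch (not part of this module): how a lookup table of the kind produced
# by quant_utils.quantize_udf could be built for an int8 input. Each of the 256 input
# codes is dequantized, passed through the scalar function, and requantized, giving one
# output code per input code for ideal_udf to index into. The helper name is hypothetical,
# and the scale convention (quantized = real * scale + zero_point) is assumed from the
# dequant/quant helpers used in this file.
def _sketch_int8_udf_lut(fn, in_scale, in_zp, out_scale, out_zp):
    codes = np.arange(-128, 128, dtype=np.int32)
    real = (codes - in_zp) / in_scale                  # dequantize each input code
    mapped = np.array([fn(float(x)) for x in real])    # apply the scalar function
    out = np.round(mapped * out_scale) + out_zp        # requantize the results
    return np.clip(out, -128, 127).astype(np.int8)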
[docs] class ReluOp(AwesomeOperation[ReluAttrs, ReluQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
@classmethod
[docs] def get_type(cls, attrs: Union[ReluAttrs, ReluQuantAttrs]) -> NodeType:
    data_type = attrs.scalar_type if isinstance(attrs, ReluAttrs) else ScalarType.int8
    shape = attrs.input_shape
    return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, shape))},
                    TensorValue(TensorType(data_type, shape)))
@classmethod
@type_check_operation_arguments(types=[ReluAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: ReluAttrs, input_dict: Dict[InputName, np.ndarray],
        config: RunConfigs) -> np.ndarray:
    return cls.relu_fn(input_dict[InputName('data')])
@classmethod
[docs] def quantize(cls, attrs: ReluAttrs, quantizer_interface: OpQuantInterface,
             config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[ReluAttrs, ReluQuantAttrs]:
    compute_precision = config.quantization_precision.get().to_scalar_type()
    input_scalar_type = ScalarType.bfloat16 if scalar_is_floating(compute_precision) else ScalarType.int8
    input_quant = fix_input(quantizer_interface, input_scalar_type, InputName('data'),
                            config.asymmetry.get())
    fix_output_from_input(quantizer_interface, input_quant.type.shape)
    if scalar_is_floating(compute_precision):
        return dataclasses.replace(attrs, scalar_type=input_scalar_type)
    else:
        return ReluQuantAttrs(attrs.input_shape, input_quant.quant.zero_point)
@classmethod
[docs] def run_quant(cls, quant_attrs: ReluQuantAttrs, input_dict: Dict[InputName, Any],
               config: RunConfigs) -> np.ndarray:
    return cls.relu_fn(input_dict[InputName('data')], quant_attrs.zero_point)
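# Illustrative sketch (an assumption, not part of this operator set): in the quantized
# domain a code q represents the real value (q - zero_point) / scale, so real zero maps
# to the code zero_point. A quantized ReLU therefore clamps at the zero point rather
# than at 0, which is what the zero-point argument to relu_fn above expresses.
def _sketch_quantized_relu(q: np.ndarray, zero_point: int) -> np.ndarray:
    # max(x, 0) in the real domain becomes max(q, zero_point) in the quantized domain.
    return np.maximum(q, zero_point)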
[docs] class ClipOp(AwesomeOperation[ClipAttrs, ClipQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] clip_fn: Callable[[ClipAttrs | ClipQuantAttrs, np.ndarray], np.ndarray] = op_fn.clip
@classmethod
[docs] def get_type(cls, attrs: Union[ClipAttrs, ClipQuantAttrs]) -> NodeType:
    data_type = attrs.scalar_type
    shape = attrs.shape
    return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, shape))},
                    TensorValue(TensorType(data_type, shape)))
@classmethod
[docs] def run(cls, attrs: ClipAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
    return cls.clip_fn(attrs, input_dict[cls.input_list[0]])
@classmethod
[docs] def quantize(cls, attrs: ClipAttrs, quantizer_interface: OpQuantInterface,
             config: QuantizationConfigs, error_reporter: NodeReporter) -> ClipAttrs | ClipQuantAttrs:
    compute_precision = config.quantization_precision.get().to_scalar_type()
    input_scalar_type = ScalarType.bfloat16 if scalar_is_floating(compute_precision) else ScalarType.int8
    input_quant = fix_input(quantizer_interface, input_scalar_type, InputName('data'),
                            config.asymmetry.get())
    fix_output_from_input(quantizer_interface, input_quant.type.shape)
    if scalar_is_floating(compute_precision):
        return dataclasses.replace(attrs, scalar_type=input_scalar_type)
    else:
        return quant_utils.quantize_clip_attrs(attrs, input_scalar_type, input_quant.quant)
@classmethod
[docs] def run_quant(cls, attrs: ClipAttrs | ClipQuantAttrs, input_dict: Dict[InputName, Any],
               config: RunConfigs) -> np.ndarray:
    return cls.clip_fn(attrs, input_dict[InputName('data')])
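# Illustrative sketch (assumed behavior, not the actual quantize_clip_attrs implementation):
# a float clip range [a_min, a_max] can be carried into the quantized domain by mapping the
# bounds through the input quantization and then clipping the integer codes directly.
def _sketch_quantized_clip(q: np.ndarray, a_min: float, a_max: float,
                           scale: float, zero_point: int) -> np.ndarray:
    # Map the real-valued bounds to codes using quantized = real * scale + zero_point.
    q_min = int(round(a_min * scale)) + zero_point
    q_max = int(round(a_max * scale)) + zero_point
    return np.clip(q, q_min, q_max)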
class _BatchMatmulBaseOp(AwesomeOperation[BatchMatmulAttrs, BatchMatmulQuantAttrs]):
    """
    Base class implementing the batch matmul operation.

    Subclasses need to override the following:
     - cls.input_list: ClassVar[List[InputName]]: List of input names used in the derived operation.
     - cls._get_inputs() method: Method for extracting the values used in the batch matmul
       operation from a dictionary of input values.

    The current implementation assumes one or two inputs are used in the batch matmul operation.
    In the single-input case, the same input is used as both the first and the second argument
    to the batch matmul operation.
    """
    @classmethod
    def get_type(cls, attrs: Union[BatchMatmulAttrs, BatchMatmulQuantAttrs]) -> NodeType:
        assert len(cls.input_list) in (1, 2)
        if isinstance(attrs, BatchMatmulAttrs):
            in_type = out_type = attrs.scalar_type
        else:
            assert isinstance(attrs, BatchMatmulQuantAttrs)
            in_type = ScalarType.int8
            out_type = ScalarType.from_numpy(attrs.requant.out_dtype)
            attrs = attrs.attrs
        return NodeType(
            {
                name: TensorValue(TensorType(in_type, shape))
                for name, shape in zip(cls.input_list, attrs.input_shapes)
            },
            TensorValue(TensorType(out_type, attrs.get_output_shape()))
        )

    @classmethod
    def _get_inputs(cls, input_dict: Dict[InputName, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        raise NotImplementedError(
            "Method _get_inputs needs to be overridden by classes that inherit _BatchMatmulBaseOp."
        )

    @classmethod
    @type_check_operation_arguments(types=[BatchMatmulAttrs, np.ndarray], dict_mask=[False, True])
    def run(cls, attrs: BatchMatmulAttrs, input_dict: Dict[InputName, np.ndarray],
            config: RunConfigs) -> np.ndarray:
        lhs, rhs = cls._get_inputs(input_dict)
        return op_fn.batch_matmul(lhs, rhs, attrs)

    @classmethod
    def quantize(cls, attrs: BatchMatmulAttrs, quantizer_interface: OpQuantInterface,
                 config: QuantizationConfigs, error_reporter: NodeReporter) \
            -> Union[BatchMatmulAttrs, BatchMatmulQuantAttrs]:
        assert len(cls.input_list) in (1, 2)
        compute_precision = config.quantization_precision.get().to_scalar_type()
        if scalar_is_integral(compute_precision):
            input_type = ScalarType.int8
            output_type = config.quantization_precision.get().to_expected_int_scalar_type()
        else:
            input_type = output_type = compute_precision
        lhs_quant = fix_input(quantizer_interface, input_type, cls.input_list[0], config.asymmetry.get())
        rhs_quant = fix_input(quantizer_interface, input_type, cls.input_list[1], config.asymmetry.get()) \
            if len(cls.input_list) > 1 else lhs_quant
        output_shape = attrs.get_output_shape()
        quantization = quantize_output(quantizer_interface, output_type, output_shape,
                                       config.asymmetry.get())

        if scalar_is_integral(compute_precision):
            intrinsic_shift, requant, new_output_quant = quant_utils.quantize_batch_matmul(
                lhs_quant.quant, rhs_quant.quant, quantization.quant)
            # Determine attributes and quantization using the result of quantize_batch_matmul
            output_type = TensorType(ScalarType.from_numpy(requant.out_dtype), output_shape)
            quantization = QuantResultTensorType(output_type, new_output_quant,
                                                 RequantMethod.fractional_zero)
            quantizer_interface.set_chosen_output_quant(TensorValue(quantization))
            return BatchMatmulQuantAttrs(attrs, lhs_quant.quant.zero_point, rhs_quant.quant.zero_point,
                                         requant, intrinsic_shift)
        else:
            quantizer_interface.set_chosen_output_quant(TensorValue(quantization))
            return dataclasses.replace(attrs, scalar_type=input_type)

    @classmethod
    def run_quant(cls, quant_attrs: BatchMatmulQuantAttrs, input_dict: Dict[InputName, np.ndarray],
                  config: RunConfigs) -> np.ndarray:
        lhs, rhs = cls._get_inputs(input_dict)
        return op_fn.batch_matmul(lhs, rhs, quant_attrs)
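# Illustrative reference (a generic sketch, not the fixed-point requantization used by
# ml_kernels): an int8 batch matmul accumulates (q_lhs - zp_lhs) @ (q_rhs - zp_rhs) in
# int32 and then rescales the accumulator by s_out / (s_lhs * s_rhs) to reach the output
# quantization. All parameter names here are hypothetical; scales follow the assumed
# quantized = real * scale + zero_point convention.
def _sketch_int8_batch_matmul(q_lhs: np.ndarray, q_rhs: np.ndarray,
                              zp_lhs: int, zp_rhs: int,
                              s_lhs: float, s_rhs: float,
                              s_out: float, zp_out: int) -> np.ndarray:
    # Zero-point-corrected integer accumulation in int32.
    acc = np.matmul(q_lhs.astype(np.int32) - zp_lhs, q_rhs.astype(np.int32) - zp_rhs)
    # Requantize the accumulator to the output scale and zero point.
    out = np.round(acc * (s_out / (s_lhs * s_rhs))) + zp_out
    return np.clip(out, -128, 127).astype(np.int8)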
[docs] class BatchMatmulOp(_BatchMatmulBaseOp):
    """
    Standard batch matmul operator where the arguments to the batch matmul operation
    are the outputs of two different nodes.
    """
[docs] input_list: ClassVar[List[InputName]] = [InputName('lhs'), InputName('rhs')]
@classmethod
def _get_inputs(cls, input_dict: Dict[InputName, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
    assert all(name in input_dict for name in cls.input_list) and len(input_dict) == 2
    lhs = input_dict[cls.input_list[0]]
    rhs = input_dict[cls.input_list[1]]
    return lhs, rhs
[docs] class UnaryBatchMatmulOp(_BatchMatmulBaseOp):
    """
    Special case of the batch matmul operator where both arguments to the batch matmul
    operation are outputs of the same node.
    """
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
def _get_inputs(cls, input_dict: Dict[InputName, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
    assert all(name in input_dict for name in cls.input_list) and len(input_dict) == 1
    input_data = input_dict[cls.input_list[0]]
    return input_data, input_data
[docs] class LayerNormOp(AwesomeOperation[LayerNormAttrs, LayerNormQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] layer_norm_fn: Callable[[LayerNormAttrs | LayerNormQuantAttrs, np.ndarray], np.ndarray] = op_fn.layer_norm
[docs] intermediate_names: ClassVar[List[str]] = ["var"]
@classmethod
[docs] def get_type(cls, attrs: LayerNormAttrs | LayerNormQuantAttrs) -> NodeType:
    scalar_type = attrs.scalar_type if isinstance(attrs, LayerNormAttrs) else ScalarType.int8
    tensor_type = TensorType(scalar_type, attrs.input_shape)
    return _unary_op_type(cls.input_list, tensor_type)
@classmethod
@type_check_operation_arguments(types=[LayerNormAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: LayerNormAttrs, input_dict: Dict[InputName, np.ndarray],
        config: RunConfigs) -> np.ndarray:
    return cls.layer_norm_fn(attrs, input_dict[InputName('data')])
@classmethod
@type_check_operation_arguments(types=[LayerNormAttrs, OpQuantInterface, QuantizationConfigs],
                                dict_mask=[False, False, False])
[docs] def quantize(cls, attrs: LayerNormAttrs, quantizer_interface: OpQuantInterface,
             config: QuantizationConfigs, error_reporter: NodeReporter) -> LayerNormAttrs | LayerNormQuantAttrs:
    compute_precision = config.quantization_precision.get().to_scalar_type()
    if scalar_is_integral(compute_precision):
        scalar_type = ScalarType.int8
    else:
        scalar_type = compute_precision
    input_quant = fix_input(quantizer_interface, scalar_type, InputName('data'), config.asymmetry.get())
    output_quant = fix_output(quantizer_interface, scalar_type, attrs.input_shape, config.asymmetry.get())
    if scalar_is_integral(compute_precision):
        intermediate_min_max = get_intermediate_min_max(quantizer_interface)
        return quant_utils.quantize_layer_norm(attrs, input_quant.quant, output_quant.quant,
                                               intermediate_min_max)
    else:
        return dataclasses.replace(attrs, scalar_type=scalar_type)
@classmethod
@type_check_operation_arguments(types=[LayerNormQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: LayerNormQuantAttrs, input_dict: Dict[InputName, np.ndarray],
               config: RunConfigs) -> np.ndarray:
    return cls.layer_norm_fn(quant_attrs, input_dict[InputName('data')])
@classmethod
[docs] def calibrate(cls, attrs: AWESOME_ATTRS, calib_attrs: AwesomeCalibAttrs,
              input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
    """
    Layer norm calibration method. Executes default calibration to get the results of the
    layer norm operation in floating point. Additionally, calculates intermediate results
    and updates the observers for intermediate values.

    :param attrs: AwesomeAttributes associated with this operation.
    :param calib_attrs: AwesomeCalibAttrs associated with the operation's node.
    :param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays.
    :param config: Parameters controlling how to calibrate.
    :return: Output tensor(s) whose type is dependent on the subclass.
    """
    # Run default calibration.
    outputs = super().calibrate(attrs, calib_attrs, input_dict, config)

    # Calculate intermediate values.
    #   m = ReduceMean(input, axis, keepdims=True)
    #   sq_mean = (input - m) ** 2
    data = input_dict[InputName('data')]
    sq_mean_fun = lambda x: (x - np.mean(x, axis=attrs.axis, keepdims=True)) ** 2
    sq_mean_output = sq_mean_fun(data)
    #   var = ReduceMean((input - m) ** 2, axis, keepdims=True)
    var_fun = lambda x: np.mean(x, axis=attrs.axis, keepdims=True)
    var_output = var_fun(sq_mean_output)

    # Update observers for intermediate values.
    assert calib_attrs.intermediate_observers
    assert ('var' in calib_attrs.intermediate_observers
            and calib_attrs.intermediate_observers['var'] is not None)
    calib_attrs.intermediate_observers['var'].update(var_output.astype(np.float32))
    return outputs
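# Small sanity sketch (not part of calibration, helper name hypothetical): the two-step
# computation above is exactly the variance, so the 'var' observer sees Var(x) along `axis`.
def _sketch_check_var_decomposition(x: np.ndarray, axis: int) -> bool:
    m = np.mean(x, axis=axis, keepdims=True)
    var_two_step = np.mean((x - m) ** 2, axis=axis, keepdims=True)
    return bool(np.allclose(var_two_step, np.var(x, axis=axis, keepdims=True)))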
[docs] class InstanceNormOp(AwesomeOperation[InstanceNormAttrs, InstanceNormQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data'), InputName('mean'), InputName('variance')]
[docs] instance_norm_fn: Callable[[np.ndarray, np.ndarray, np.ndarray,
                            InstanceNormAttrs | InstanceNormQuantAttrs], np.ndarray] = op_fn.instance_norm
@classmethod
[docs] def get_type(cls, attrs: InstanceNormAttrs | InstanceNormQuantAttrs) -> NodeType:
    if isinstance(attrs, InstanceNormAttrs):
        scalar_type = attrs.scalar_type
    else:
        assert isinstance(attrs, InstanceNormQuantAttrs)
        attrs = attrs.attrs
        scalar_type = ScalarType.int8
    input_data_shape = attrs.input_data_shape
    mean_shape = attrs.mean_shape
    variance_shape = attrs.variance_shape
    return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar_type, input_data_shape)),
                     cls.input_list[1]: TensorValue(TensorType(scalar_type, mean_shape)),
                     cls.input_list[2]: TensorValue(TensorType(scalar_type, variance_shape))},
                    TensorValue(TensorType(scalar_type, input_data_shape)))
@classmethod
@type_check_operation_arguments(types=[InstanceNormAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: InstanceNormAttrs, input_dict: Dict[InputName, np.ndarray],
        config: RunConfigs) -> np.ndarray:
    data = input_dict[InputName('data')]
    mean = input_dict[InputName('mean')]
    variance = input_dict[InputName('variance')]
    return cls.instance_norm_fn(data, mean, variance, attrs)
@classmethod
@type_check_operation_arguments(types=[InstanceNormAttrs, OpQuantInterface, QuantizationConfigs],
                                dict_mask=[False, False, False])
[docs] def quantize(cls, attrs: InstanceNormAttrs, quantizer_interface: OpQuantInterface,
             config: QuantizationConfigs, error_reporter: NodeReporter) \
        -> InstanceNormAttrs | InstanceNormQuantAttrs:
    compute_precision = config.quantization_precision.get().to_scalar_type()
    scalar_type = ScalarType.int8 if scalar_is_integral(compute_precision) else compute_precision
    input_data_quant = fix_input(quantizer_interface, scalar_type, InputName('data'),
                                 config.asymmetry.get())
    mean_quant = fix_input(quantizer_interface, scalar_type, InputName('mean'), config.asymmetry.get())
    variance_quant = fix_input(quantizer_interface, scalar_type, InputName('variance'),
                               config.asymmetry.get())
    output_quant = fix_output(quantizer_interface, scalar_type, attrs.input_data_shape,
                              config.asymmetry.get())
    if scalar_is_integral(compute_precision):
        return quant_utils.quantize_instance_norm(attrs, input_data_quant.quant, mean_quant.quant,
                                                  variance_quant.quant, output_quant.quant)
    else:
        return dataclasses.replace(attrs, scalar_type=scalar_type)
@classmethod
@type_check_operation_arguments(types=[InstanceNormQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: InstanceNormQuantAttrs, input_dict: Dict[InputName, np.ndarray],
               config: RunConfigs) -> np.ndarray:
    data = input_dict[InputName('data')]
    mean = input_dict[InputName('mean')]
    variance = input_dict[InputName('variance')]
    return cls.instance_norm_fn(data, mean, variance, quant_attrs)
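# Illustrative float reference (an assumption; op_fn.instance_norm may differ in details
# such as where epsilon comes from): instance normalization with externally supplied
# per-instance mean and variance tensors, broadcast against the data.
def _sketch_instance_norm(data: np.ndarray, mean: np.ndarray, variance: np.ndarray,
                          epsilon: float = 1e-5) -> np.ndarray:
    # Normalize using the provided statistics rather than computing them from `data`.
    return (data - mean) / np.sqrt(variance + epsilon)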
[docs] class RMSNormOp(AwesomeOperation[RMSNormAttrs, RMSNormQuantAttrs]):
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
[docs] rms_norm_fn: Callable[[np.ndarray, RMSNormAttrs | RMSNormQuantAttrs], np.ndarray] = op_fn.rms_norm
[docs] intermediate_names: ClassVar[List[str]] = ["reduce_mean"]
@classmethod
[docs] def get_type(cls, attrs: Union[RMSNormAttrs, RMSNormQuantAttrs]) -> NodeType:
    scalar_type = attrs.scalar_type if isinstance(attrs, RMSNormAttrs) else ScalarType.int8
    tensor_type = TensorType(scalar_type, attrs.input_shape)
    return _unary_op_type(cls.input_list, tensor_type)
@classmethod
@type_check_operation_arguments(types=[RMSNormAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: RMSNormAttrs, input_dict: Dict[InputName, np.ndarray],
        config: RunConfigs) -> np.ndarray:
    return cls.rms_norm_fn(input_dict[InputName('data')], attrs)
@classmethod
@type_check_operation_arguments(types=[RMSNormAttrs, OpQuantInterface, QuantizationConfigs],
                                dict_mask=[False, False, False])
[docs] def quantize(cls, attrs: RMSNormAttrs, quantizer_interface: OpQuantInterface,
             config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[RMSNormAttrs, RMSNormQuantAttrs]:
    compute_precision = config.quantization_precision.get().to_scalar_type()
    if scalar_is_integral(compute_precision):
        input_quant = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get())
        output_quant = fix_output_to_int8(quantizer_interface, attrs.input_shape, config.asymmetry.get())
        # Always set to True, as the int16 LUT yields better performance.
        enable_lut_int16 = True
        intermediate_min_max = get_intermediate_min_max(quantizer_interface)
        return quant_utils.quantize_rms_norm(attrs, input_quant.quant, output_quant.quant,
                                             intermediate_min_max, enable_lut_int16)
    else:
        fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
        output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
        fix_output_from_input(quantizer_interface, output_shape)
        return dataclasses.replace(attrs, scalar_type=compute_precision)
@classmethod
@type_check_operation_arguments(types=[RMSNormQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: RMSNormQuantAttrs, input_dict: Dict[InputName, np.ndarray],
               config: RunConfigs) -> np.ndarray:
    data = _cast_to_quant_tensor_new(input_dict[InputName('data')])
    return cls.rms_norm_fn(data, quant_attrs)
@classmethod
[docs] def calibrate(cls, attrs: RMSNormAttrs, calib_attrs: AwesomeCalibAttrs,
              input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
    """
    RMS norm calibration method. Executes default calibration to get the results of the
    RMSNorm operation in floating point. Additionally, calculates intermediate results
    and updates the observers for intermediate values.
    """
    # Run default calibration.
    outputs = super().calibrate(attrs, calib_attrs, input_dict, config)

    # Calculate the intermediate value, ReduceMean(input * input, axis=-1, keepdims=True).
    data = input_dict[InputName('data')]
    reduce_mean_f = lambda x: np.mean(x * x, axis=-1, keepdims=True)
    reduce_mean_output = reduce_mean_f(data)

    # Update observers for intermediate values.
    assert calib_attrs.intermediate_observers
    assert ('reduce_mean' in calib_attrs.intermediate_observers
            and calib_attrs.intermediate_observers['reduce_mean'] is not None)
    calib_attrs.intermediate_observers['reduce_mean'].update(reduce_mean_output.astype(np.float32))
    return outputs
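# Illustrative float reference (an assumption about the RMSNorm definition; op_fn.rms_norm
# is the authoritative version): the calibrated intermediate mean(x * x) is exactly the
# squared RMS denominator used here. Names and epsilon default are hypothetical.
def _sketch_rms_norm(x: np.ndarray, weight: np.ndarray, epsilon: float = 1e-5) -> np.ndarray:
    mean_sq = np.mean(x * x, axis=-1, keepdims=True)   # the observed intermediate value
    return x / np.sqrt(mean_sq + epsilon) * weight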
[docs] class SliceConcatOp(AwesomeOperation[SliceConcatAttrs, SliceConcatQuantAttrs]):
    """
    Composite node that reuses the run infrastructure of StridedSliceOp and ConcatenateOp.
    """
[docs] input_list: ClassVar[List[InputName]] = [InputName('data')]
@classmethod
[docs] def get_type(cls, attrs: Union[SliceConcatAttrs, SliceConcatQuantAttrs]) -> NodeType:
    return NodeType(StridedSliceOp.get_type(attrs.slice_attrs[0]).inputs,
                    TupleConcatenateOp.get_type(attrs.tuple_concat_attrs).output)
@classmethod
@type_check_operation_arguments(types=[SliceConcatAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run(cls, attrs: SliceConcatAttrs, input_dict: Dict[InputName, np.ndarray],
        config: RunConfigs) -> np.ndarray:
    input_data = input_dict[cls.input_list[0]]
    slice_outputs = [op_fn.strided_slice(slice_attrs, input_data) for slice_attrs in attrs.slice_attrs]
    return op_fn.concatenate(attrs.tuple_concat_attrs.concat_attrs, slice_outputs)
@classmethod
@type_check_operation_arguments(
    types=[SliceConcatAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
    dict_mask=[False, False, False, False])
[docs] def quantize(cls, attrs: SliceConcatAttrs, quantizer_interface: OpQuantInterface,
             config: QuantizationConfigs, error_reporter: NodeReporter) \
        -> Union[SliceConcatAttrs, SliceConcatQuantAttrs]:
    compute_precision = config.quantization_precision.get().to_scalar_type()
    output_shape_list = list(get_strided_slice_out_shape(attrs.slice_attrs[0]))
    output_shape_list[attrs.tuple_concat_attrs.concat_attrs.axis] *= len(attrs.slice_attrs)
    if scalar_is_integral(compute_precision):
        # Use the input type and quantization
        q = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get())
        output_quant = fix_output_from_input(quantizer_interface, tuple(output_shape_list))
        quant_slice_attrs = [dataclasses.replace(slice_attrs, input_type=q.type.scalar)
                             for slice_attrs in attrs.slice_attrs]
        concat_quant_attrs = ConcatQuantAttrs(
            attrs=attrs.tuple_concat_attrs.concat_attrs,
            requants=[
                requantization.FractionalZeroRequantization(
                    1, 0, utils.create_and_verify_narrowing(0, RoundType.TOEVEN, np.int8)
                )
                for _ in range(len(attrs.slice_attrs))
            ],
            layer_bits=[8],
            input_scales=[q.quant.scale for _ in range(len(attrs.slice_attrs))],
            node_scales=[output_quant.quant.scale],
            node_zps=[output_quant.quant.zero_point]
        )
        return SliceConcatQuantAttrs(quant_slice_attrs, concat_quant_attrs)
    else:
        # bfloat16
        fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
        fix_output_from_input(quantizer_interface, tuple(output_shape_list))
        slice_attrs = attrs.slice_attrs
        tuple_attrs = attrs.tuple_concat_attrs.tuple_attrs
        concat_attrs = dataclasses.replace(attrs.tuple_concat_attrs.concat_attrs,
                                           scalar_type=compute_precision)
        return SliceConcatAttrs(slice_attrs, TupleConcatenateAttrs(tuple_attrs, concat_attrs))
@classmethod
@type_check_operation_arguments(types=[SliceConcatQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs] def run_quant(cls, quant_attrs: SliceConcatQuantAttrs, input_dict: Dict[InputName, np.ndarray],
               config: RunConfigs) -> np.ndarray:
    input_data = input_dict[cls.input_list[0]]
    slice_output_dict = {idx: op_fn.strided_slice(slice_attrs, input_data)
                         for idx, slice_attrs in enumerate(quant_attrs.slice_attrs)}
    return ConcatenateOp.run_quant(quant_attrs.tuple_concat_attrs, slice_output_dict, config)
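# Illustrative sketch (hypothetical helper, plain-NumPy semantics only): what the
# slice-then-concatenate composite computes in floating point, using basic index
# expressions in place of the StridedSlice attribute objects.
def _sketch_slice_concat(data: np.ndarray, slices: List[tuple], axis: int) -> np.ndarray:
    # Apply each slice to the same input, then join the pieces along `axis`.
    pieces = [data[s] for s in slices]
    return np.concatenate(pieces, axis=axis)

# Example usage: split the channel dimension of an NHWC tensor in two and re-join it.
#     x = np.arange(24, dtype=np.float32).reshape(1, 2, 3, 4)
#     y = _sketch_slice_concat(x, [np.index_exp[..., :2], np.index_exp[..., 2:]], axis=-1)
#     assert np.array_equal(x, y)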