#########################################################
# Copyright (C) 2020 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
import dataclasses
import math
from abc import ABCMeta
from typing import (
Any, Callable, ClassVar, Dict, Generic, List, Mapping,
Optional, Sequence, Tuple, Type, TypeVar, Union, cast)
import numpy as np
from ml_kernels.np_operators import ideal_udf
import ml_kernels.requantization as requantization
from ml_kernels.types import bfloat16
import ml_kernels.math_helpers
from sima_utils.logging import sima_logger
import afe.ir.quantization_conv
import afe.ir.utils as utils
import afe.ir.quantization_utils as quant_utils
import afe.ir.operation_functions as op_fn
from afe.ir import quantization_conv, bias_correction
from afe.ir.attributes import *
from afe.ir.defines import (
InputName, QuantizedTensor, Float, DataValue, TensorValue, Quantization, TupleValue,
QuantizedTensorNew, IdentityCast, QuantCast, DequantCast, QuantizationCast, InputsQuantCast,
TupleCast, RequantCast, ConvertCast, get_expected_tensor_value, NodeReporter, RequantizationMode,
BiasCorrectionType
)
from afe.ir.operation_functions import RunMode
from afe.ir.operation_type_check import type_check_operation_arguments
from afe.ir.quantization_interface import (
OpQuantInterface, fix_input_to_int8, fix_input_to_int16, fix_input, keep_input,
quantize_output, fix_output_to_int8, fix_output, fix_output_from_input,
requantize_scaled, get_intermediate_min_max, fix_output_to_int16
)
from afe.ir.tensor_type import NodeType, ScalarType, TensorType, scalar_type_from_dtype, data_byte_size, \
scalar_is_integral, scalar_is_floating
from afe.core.configs import RunConfigs, QuantizationConfigs, QuantizationPrecision
AWESOME_ATTRS = TypeVar("AWESOME_ATTRS", bound=AwesomeAttributes)
QUANT_ATTRS = TypeVar("QUANT_ATTRS", bound=AwesomeQuantAttrBase)
AVGPOOL_TYPES = TypeVar("AVGPOOL_TYPES", AvgPoolAttrs, AdaptiveAvgPool2DAttrs)
AVGPOOL_CLASSES = (AvgPoolAttrs, AdaptiveAvgPool2DAttrs)
# Data that describes a node's output during quantization.
# The data consists of the output quantization info, observed distribution and calibration inputs.
QuantizationTensorData = Tuple[DataValue[QuantResultTensorType], Optional[Dict[str, ObservedDistribution]],
Optional[List[np.ndarray]]]
"""
TODO:
* Merge the quantization in single node and composite node.
Ex: Use Conv2DOp.quantize in ConvAddActivationOp
* Merge quantization, run_quant for Conv2D and Conv2DTranspose
* Create check_attrs function to check attrs and quant_attrs
"""
def _warn_for_nonzero_zero_point(zp: int) -> None:
"""
Log a warning if the zero point is not zero, because the compiler
can't handle it. Issue SWMLA-4306.
"""
if zp != 0:
sima_logger.sima_log_warning("Convolution or matrix multiply has a nonzero output zero point. "
"This operation will not compile correctly.")
def _quantize_type(t: TensorType) -> TensorType:
"""
Convert a tensor type to an int8 tensor type having the same shape.
This is the most common way that a type is changed during quantization.
:param t: Tensor type to convert
:return: Similar type as a tensor of int8
"""
return replace(t, scalar=ScalarType.int8)
def _binary_op_type(input_list: List[InputName], t: TensorType) -> NodeType:
"""
Construct the type of a binary elementwise operator having the given input names.
Both input tensors and the output tensor have the same type.
:param input_list: The input list of the AwesomeOperation. It should have 2 items.
:param t: Type of the operator's output and inputs
:return: Type of the operator
"""
assert len(input_list) == 2
data_type = TensorValue(t)
return NodeType({name: data_type for name in input_list}, data_type)
def _unary_op_type(input_list: List[InputName], t: TensorType) -> NodeType:
"""
Construct the type of a unary elementwise operator having the given input name.
The input tensor and the output tensor have the same type.
:param input_list: The input list of the AwesomeOperation. It should have 1 item.
:param t: Type of the operator's output and input
:return: Type of the operator
"""
assert len(input_list) == 1
data_type = TensorValue(t)
return NodeType({name: data_type for name in input_list}, data_type)
def _pooling_output_shape(input_shape: Tuple[int, ...], kernel_shape: Tuple[int, ...],
padding: Tuple[Tuple[int, int], ...],
stride: Tuple[int, ...], dilation: Tuple[int, ...]) -> Tuple[int, ...]:
"""
Calculate the shape of the output of the pooling operator in the dimensions
that are pooled. All parameters should be tuples of the same length, having
one element for each dimension that is pooled.
The dimensions are normally interpreted as H, W for two-dimensional pooling
or D, H, W for three-dimensional pooling.
:param input_shape: Shape of the input tensor
:param kernel_shape: Shape of the pooling kernel
:param padding: Number of padding elements applied to the pooling input on the
lower-index and upper-index ends.
:param stride: Stride between pooling evaluation indices
:param dilation: Dilation of the pooling kernel
:return: Shape of the pooling output tensor
"""
assert len(input_shape) == len(kernel_shape) == len(padding) == len(stride) == len(dilation)
assert all(x == 1 for x in dilation) # Other dilation values are not handled
ret = []
for n, k, (p_lo, p_hi), s, d in zip(input_shape, kernel_shape, padding, stride, dilation):
# Number of valid window positions
num_window_positions = (n + p_lo + p_hi) - k + 1
# Some positions may be skipped according to stride.
# Divide number of window positions by stride, rounding up.
o = (num_window_positions - 1) // s + 1
ret.append(o)
return tuple(ret)
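# Worked example (illustrative): 2D pooling over H, W = (5, 5) with kernel (3, 3),
# padding ((1, 1), (1, 1)), stride (2, 2), and dilation (1, 1):
# num_window_positions = (5 + 1 + 1) - 3 + 1 = 5 per dimension, and
# (5 - 1) // 2 + 1 = 3, so the pooled output shape is (3, 3).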
def _get_spatial_dimensions(attrs: PoolAttrs):
"""
Get the dimensions where the pooling operator is applied.
:return: "HW" for 2D and "DHW" for 3D
"""
if len(attrs.pool_size) == 4:
return "HW"
else:
assert len(attrs.pool_size) == 5, f"Expected length 5, got {len(attrs.pool_size)}"
return "DHW"
def _pooling_op_type(pooling_attrs: PoolAttrs, in_type: ScalarType, out_type: ScalarType) -> NodeType:
"""
Construct the type of pooling operator. It is a
one-input, one output node type where the input and output
shapes are related by the pooling operation.
:param pooling_attrs: Attributes of a pooling operator.
The layout, out_layout, padding, pool_size, strides, dilation,
and input_shape fields are used.
:param in_type: The scalar type of the input tensor.
:param out_type: The scalar type of the output tensor.
:return: A pooling operator type.
"""
layout = pooling_attrs.layout
out_layout = pooling_attrs.out_layout if pooling_attrs.out_layout != "" else layout
spatial_dimensions = _get_spatial_dimensions(pooling_attrs)
# Calculate output shape in HW dimensions
p_input_shape = utils.transpose_attr_according_to_layout_strings(pooling_attrs.input_shape, layout,
spatial_dimensions)
p_pool_size = utils.transpose_attr_according_to_layout_strings(pooling_attrs.pool_size, layout, spatial_dimensions)
p_padding = utils.transpose_attr_according_to_layout_strings(pooling_attrs.padding, layout, spatial_dimensions)
p_strides = utils.transpose_attr_according_to_layout_strings(pooling_attrs.strides, layout, spatial_dimensions)
p_dilation = utils.transpose_attr_according_to_layout_strings(pooling_attrs.dilation, layout, spatial_dimensions)
p_output_shape = _pooling_output_shape(p_input_shape, p_pool_size, p_padding, p_strides, p_dilation)
# Build the full output shape
output_shape = utils.transpose_attr_according_to_layout_strings(pooling_attrs.input_shape, layout, out_layout)
output_shape = utils.insert_according_to_layout_strings(output_shape, p_output_shape, out_layout,
spatial_dimensions)
# Create type
input_type = TensorValue(TensorType(in_type, pooling_attrs.input_shape))
output_type = TensorValue(TensorType(out_type, output_shape))
return NodeType({'data': input_type}, output_type)
def make_quantized_pool_attrs(attrs: PoolAttrs, *, pad_value: int, input_int16: bool,
requant: Optional[BaseRequantization] = None) -> PoolQuantAttrs:
"""
Construct a PoolQuantAttrs, using values from a PoolAttrs and additional values
that were computed during quantization.
"""
return PoolQuantAttrs(pool_attrs=attrs,
pad_value=pad_value,
rounding_type=RoundType.TOEVEN,
input_int16=input_int16,
requant=requant)
def _conv_op_type(attrs: Union[ConvAddActivationAttrs, ConvQuantAttrs],
input_scalar_type: ScalarType,
output_scalar_type: ScalarType) -> NodeType:
"""
Construct the type of convolution operator. It is a
one-input, one output node type where the input and output
shapes are related by the convolution operation.
:param attrs: Attributes of a ConvAddActivationOp.
:param input_scalar_type: The scalar type of the input tensor.
:param output_scalar_type: The scalar type of the output tensor.
:return: A convolution operator type.
"""
input_type = TensorValue(TensorType(input_scalar_type, attrs.conv_attrs.input_shape))
output_type = TensorValue(TensorType(output_scalar_type, attrs.conv_attrs.output_shape))
return NodeType({'data': input_type}, output_type)
def get_output_shape(attrs: Union[SumAttrs, MeanAttrs, ProdAttrs, ExtmAttrs, ArgMaxAttrs]):
"""
Get the output shape for the dimension-reduction operators (SumOp, MeanOp, ProdOp, ExtmOp & ArgMaxOp)
using attributes from their AwesomeAttributes class.
:param attrs: AwesomeAttributes class
:return: Output shape
"""
# Extracting attributes
axis = attrs.axis
exclude = bool(attrs.exclude)
keepdims = bool(attrs.keepdims)
shape = list(attrs.shape)
if exclude:
axis = utils.exclude_axes(len(shape), axis)
if keepdims:
for a in axis:
shape[a] = 1
else:
new_shape = []
for idx, el in enumerate(shape):
if idx not in axis:
new_shape.append(el)
shape = new_shape
return shape
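# Worked example (illustrative): shape (2, 3, 4) reduced over axis (1,) yields
# [2, 4] with keepdims=False, or [2, 1, 4] with keepdims=True. With exclude=True,
# all axes except axis 1 are reduced instead.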
def node_type_for_dimension_reduction_operators(attrs: Union[SumAttrs, MeanAttrs, ProdAttrs, ExtmAttrs, ArgMaxAttrs],
input_dtype: Union[np.dtype, Type[np.number]],
output_dtype: Union[np.dtype, Type[np.number]]):
"""
Get NodeType for the dimension-reduction operators (SumOp, MeanOp, ProdOp, ExtmOp & ArgMaxOp).
:param attrs: AwesomeAttributes class
:param input_dtype: Data type of the input tensor
:param output_dtype: Data type of the output tensor
:return: NodeType
"""
input_shape = attrs.shape
output_shape = get_output_shape(attrs)
data_type = output_dtype
return NodeType({'data': TensorValue(TensorType(ScalarType.from_numpy(input_dtype), input_shape))},
TensorValue(TensorType(ScalarType.from_numpy(data_type), tuple(output_shape))))
def _rescale_int8_to_int32_quantization(qrtt: QuantResultTensorType, right_shift: Union[int, np.ndarray]) \
-> Tuple[QuantResultTensorType, Union[int, np.ndarray]]:
"""
Transform an operator's output quantization from int8 to int32 by adjusting the way the
operator's output is shifted. Reduce the right shift so that its value
(or minimum value, for per-channel) is 0. This preserves as much precision as possible
from the 32-bit intermediate result while ensuring it uses per-tensor quantization.
:param qrtt: Quantized type having int8 precision
:param right_shift: Right shift performed at the end of the operator
:return: New quantized type having int32 precision and new right shift.
"""
assert qrtt.type.scalar == ScalarType.int8
assert qrtt.quant is not None
min_rs = int(np.amin(right_shift)) if isinstance(right_shift, np.ndarray) else right_shift
scale_factor = 2**min_rs
new_type = dataclasses.replace(qrtt.type, scalar=ScalarType.int32)
new_quant = Quantization(qrtt.quant.scale * scale_factor, qrtt.quant.zero_point * scale_factor,
bits=32, min_val=qrtt.quant.min_val, max_val=qrtt.quant.max_val)
# Because a power-of-2 scale factor was used, it's suitable for ArithFoldedRequantization
new_qrtt = QuantResultTensorType(new_type, new_quant, RequantMethod.arith_folded)
right_shift = right_shift - min_rs
return new_qrtt, right_shift
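# Worked example (illustrative): an int8 result with per-channel right shifts
# [3, 5, 4] has minimum shift 3, so the scale and zero point are multiplied by
# 2**3 = 8, the type becomes int32, and the remaining shifts are [0, 2, 1].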
def _ceildiv(a, b):
"""Helper function for ceil division."""
return -(a // -b)
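# _ceildiv exploits Python's floor division on a negated operand:
# _ceildiv(7, 2) == -(7 // -2) == -(-4) == 4 == ceil(7 / 2).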
def expand_indices_to_shape_length(begin: List[int], end: List[int], strides: List[int], axes: Optional[List[int]],
input_shape: List[int]) -> Tuple[List[int], List[int], List[int]]:
"""
Helper function for expanding begin, end and strides to match the shape length.
"""
if axes is not None:
new_begin = len(input_shape) * [0]
new_end = input_shape
new_strides = len(input_shape) * [1]
for i, e in enumerate(axes):
new_begin[e] = begin[i]
new_end[e] = end[i]
new_strides[e] = strides[i]
begin = new_begin
end = new_end
strides = new_strides
return begin, end, strides
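# Worked example (illustrative): input_shape [4, 8, 3] with axes [1], begin [2],
# end [6], strides [2] expands to begin [0, 2, 0], end [4, 6, 3], strides [1, 2, 1].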
def get_strided_slice_out_shape(attrs: StridedSliceAttrs) -> Tuple[int, ...]:
"""
Get StridedSliceOp output shape.
:param attrs: StridedSlice attributes class.
:return: Output shape.
"""
begin = list(attrs.begin)
end = list(attrs.end)
strides = list(attrs.strides)
input_shape = list(attrs.input_shape)
begin, end, strides = expand_indices_to_shape_length(begin=begin, end=end, strides=strides,
axes=attrs.axes, input_shape=input_shape)
output_shape = np.ones(len(input_shape), dtype=np.int32).tolist()
# In 'size' mode, 'end' holds the slice size for each axis (see tf.strided_slice documentation)
if attrs.slice_mode == 'size':
for idx, (b, e) in enumerate(zip(begin, end)):
output_shape[idx] = e  # end holds the size, so the slice covers [b, b + e)
else:
assert attrs.slice_mode == 'end', f"Expected 'end', got {attrs.slice_mode}"
for idx, (b, e, s) in enumerate(zip(begin, end, strides)):
output_shape[idx] = _ceildiv(e - b, s)
return tuple(output_shape)
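# Worked example (illustrative): in 'end' mode with input_shape (4, 8), begin (0, 2),
# end (4, 7), strides (1, 2) and axes=None, the output shape is
# (_ceildiv(4 - 0, 1), _ceildiv(7 - 2, 2)) == (4, 3).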
def get_squeeze_out_shape(axis: list[int], input_shape: tuple[int, ...]) -> tuple[int, ...]:
"""
Get SqueezeOp output shape.
Args:
axis: Set of axes to remove
input_shape: Shape of input tensor
Returns:
Output shape.
"""
new_shape = []
for idx, el in enumerate(input_shape):
if idx not in axis:
new_shape.append(el)
output_shape = tuple(new_shape)
return output_shape
def _get_transpose_out_shape(attrs: TransposeAttrs) -> Tuple[int, ...]:
"""
Get TransposeOp output shape.
:param attrs: Transpose attributes class.
:return: Output shape.
"""
input_shape = list(attrs.input_shape)
output_shape = []
for a in attrs.axes:
output_shape.append(input_shape[a])
return tuple(output_shape)
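# Worked example (illustrative): input_shape (2, 3, 4) with axes (0, 2, 1)
# transposes to (2, 4, 3).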
def _get_depth_to_space_out_shape(attrs: DepthToSpaceAttrs) -> Tuple[int, ...]:
"""Get DepthToSpaceOp output shape.
Args:
attrs: DepthToSpace attributes class.
Returns:
Output shape.
"""
output_shape = list(attrs.input_shape)
output_shape[-1] = output_shape[-1] // (attrs.block_size * attrs.block_size)
output_shape[-2] = output_shape[-2] * attrs.block_size
output_shape[-3] = output_shape[-3] * attrs.block_size
return tuple(output_shape)
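# Worked example (illustrative): an NHWC input (1, 4, 4, 16) with block_size 2
# becomes (1, 8, 8, 4): channels shrink by block_size**2 while H and W grow by block_size.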
def get_expand_dims_out_shape(attrs: ExpandDimsAttrs) -> Tuple[int, ...]:
"""
Get ExpandDimsOp output shape.
:param attrs: ExpandDims attributes class.
:return: Output shape.
"""
input_shape = list(attrs.input_shape)
axis = attrs.axis
num_newaxis = attrs.num_newaxis
output_shape = input_shape[:]
if axis == -1:
output_shape.extend([1] * num_newaxis)
elif axis < -1:
axis = axis + 1
output_shape[axis:axis] = [1] * num_newaxis
else:
output_shape[axis:axis] = [1] * num_newaxis
return tuple(output_shape)
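# Worked example (illustrative): input_shape (2, 3) with axis=1, num_newaxis=2
# becomes (2, 1, 1, 3); with axis=-1 the new axes are appended, giving (2, 3, 1, 1).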
def _get_split_out_shape(attrs: SplitAttrs) -> Tuple[Tuple[int, ...], ...]:
"""
Get SplitOp output shapes.
:param attrs: Split attributes class.
:return: Output shape.
"""
input_shape = list(attrs.input_shape)
indices = attrs.indices_or_sections
axis = attrs.axis
output_shape = input_shape
output_shapes = list()
# If indices_or_sections is an integer, N, the array will be divided into N equal arrays along axis
if isinstance(indices, int):
assert input_shape[axis] % indices == 0, f"{input_shape[axis]} is not divisible by {indices}"
output_shape[axis] = input_shape[axis] // indices
for i in range(indices):
output_shapes.append(tuple(output_shape))
else:
# If indices_or_sections is a 1-D array of sorted integers,
# the entries indicate where along axis the array is split
assert isinstance(indices, tuple)
begin = 0
for val in indices:
size = val - begin
output_shape[axis] = size
begin = val
output_shapes.append(tuple(output_shape))
output_shape[axis] = attrs.input_shape[axis] - indices[-1]
output_shapes.append(tuple(output_shape))
return tuple(output_shapes)
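# Worked example (illustrative): input_shape (6, 4) with indices_or_sections=3 and
# axis=0 gives ((2, 4), (2, 4), (2, 4)); with indices_or_sections=(2, 5) it gives
# ((2, 4), (3, 4), (1, 4)), splitting at rows 2 and 5.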
def _get_out_shape_for_op_with_2_inputs(attrs: Union[MultiplyAttrs, MultiplyQuantAttrs,
DivideAttrs, DivideQuantAttrs, AddAttrs, AddQuantAttrs,
ConstantMultiplyAddAttrs, PowerAttrs, SubtractAttrs]) -> Tuple[int, ...]:
"""
Get output shape for operators that have 2 inputs with possible different shapes.
Function is made to support TVM style broadcasting,
with the fact that N2A backend currently does not support broadcasting.
:param attrs: Attributes class.
:return: Output shape.
"""
lhs_input_shape = list(attrs.lhs_input_shape)
rhs_input_shape = list(attrs.rhs_input_shape)
lhs_len = len(lhs_input_shape)
rhs_len = len(rhs_input_shape)
if lhs_len > rhs_len:
rhs_input_shape = [1] * (lhs_len - rhs_len) + rhs_input_shape
elif rhs_len > lhs_len:
lhs_input_shape = [1] * (rhs_len - lhs_len) + lhs_input_shape
output_shape = lhs_input_shape
for i, (lhs, rhs) in enumerate(zip(lhs_input_shape, rhs_input_shape)):
if lhs > rhs:
assert rhs == 1, "Incompatible shapes."
output_shape[i] = lhs
elif rhs > lhs:
assert lhs == 1, "Incompatible shapes."
output_shape[i] = rhs
else:
output_shape[i] = lhs
return tuple(output_shape)
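# Worked example (illustrative): lhs (4, 1, 3) and rhs (5, 3) broadcast to (4, 5, 3);
# rhs is first left-padded with 1s to (1, 5, 3), then each dimension takes the
# larger of the two sizes.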
def _get_output_pad_shape(attrs: PadAttrs) -> Tuple[int, ...]:
"""
Get output shape for PadOp.
:param attrs: Pad attributes class.
:return: Output shape.
"""
input_shape = list(attrs.input_shape)
pad_width = list(attrs.pad_width)
output_shape = input_shape
for i, axis_padding in enumerate(pad_width):
for val in axis_padding:
output_shape[i] += val
return tuple(output_shape)
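# Worked example (illustrative): input_shape (1, 4, 4, 3) with
# pad_width ((0, 0), (1, 1), (2, 2), (0, 0)) yields (1, 6, 8, 3).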
def _get_image_resize2d_out_shape(attrs: ImageResize2DAttrs) -> Tuple[int, ...]:
"""
Get output shape for ImageResize2DOp.
:param attrs: ImageResize2D attributes class.
:return: Output shape.
"""
input_shape = attrs.input_shape
layout = attrs.layout
output_shape_h, output_shape_w = attrs.size
output_shape = utils.insert_according_to_layout_strings(input_shape, (output_shape_h, output_shape_w),
layout, "HW")
return output_shape
def _get_concat_out_shape(attrs: ConcatenateAttrs) -> Tuple[int, ...]:
"""
Get output shape for ConcatenateOp.
:param attrs: Concatenate attributes class.
:return: Output shape.
"""
input_types = attrs.input_types
axis = attrs.axis
input_shapes = [input_type.shape for input_type in input_types]
out_shape = list(input_shapes[0])
concat_dim_size = np.sum([input_shape[axis] for input_shape in input_shapes])
out_shape[axis] = int(concat_dim_size)
return tuple(out_shape)
def _get_take_out_shape(attrs: TakeAttrs) -> Tuple[int, ...]:
"""
Get output shape for TakeOp.
:param attrs: Take attributes class.
:return: Output shape.
"""
output_shape = list(attrs.input_shape)
indices_shape = list(attrs.indices_shape)
axis = attrs.axis
assert len(indices_shape) == 1
output_shape[axis] = indices_shape[0]
return tuple(output_shape)
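# Worked example (illustrative): input_shape (4, 8) with indices_shape (3,) and
# axis=1 yields (4, 3); the indexed axis takes the length of the index array.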
def make_quantization_cast(provided_type: DataValue[QuantResultTensorType],
wanted_type: DataValue[QuantResultTensorType]) \
-> QuantizationCast:
"""
Make a quantization cast for one value.
:param provided_type: Type and quantization of the value
:param wanted_type: Type and quantization that it should be cast to
:return: Cast
"""
if isinstance(provided_type, TensorValue):
assert isinstance(wanted_type, TensorValue)
assert provided_type.value.type.shape == wanted_type.value.type.shape, \
"Tensor shape unexpectedly changed during quantization"
shape = provided_type.value.type.shape
provided_scalar_type = provided_type.value.type.scalar
provided_quant = provided_type.value.quant
provided_requant_method = provided_type.value.requant_method
wanted_scalar_type = wanted_type.value.type.scalar
wanted_quant = wanted_type.value.quant
if provided_scalar_type == wanted_scalar_type:
# Same type. No cast required. If they both have quantization, quantization must be equal.
assert provided_quant is None or wanted_quant is None or provided_quant == wanted_quant
return IdentityCast()
elif scalar_is_floating(provided_scalar_type) and wanted_scalar_type in (ScalarType.int8, ScalarType.int16):
# Insert a quantize node
assert wanted_quant is not None
num_bits = 8 if wanted_scalar_type == ScalarType.int8 else 16
return QuantCast(shape, wanted_quant.scale, wanted_quant.zero_point, num_bits, wanted_scalar_type)
elif provided_scalar_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32) \
and scalar_is_floating(wanted_scalar_type):
# Insert a dequantize node
assert provided_quant is not None
return DequantCast(shape, provided_quant.scale, provided_quant.zero_point,
input_dtype=provided_scalar_type.numpy_type(),
output_dtype=wanted_scalar_type.numpy_type())
elif provided_scalar_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32) \
and wanted_scalar_type in (ScalarType.int16, ScalarType.int8):
# Insert a requantize node
assert provided_quant is not None and wanted_quant is not None
assert provided_requant_method is not None
return RequantCast(shape, provided_quant.scale, provided_quant.zero_point,
wanted_quant.scale, wanted_quant.zero_point,
min_val=wanted_quant.min_val, max_val=wanted_quant.max_val,
input_32_bit=provided_scalar_type == ScalarType.int32,
output_16_bit=wanted_scalar_type == ScalarType.int16,
requant_method=provided_requant_method)
elif scalar_is_floating(provided_scalar_type) and scalar_is_floating(wanted_scalar_type):
# Insert a convert node
return ConvertCast(shape, provided_scalar_type, wanted_scalar_type)
else:
raise RuntimeError("Unable to insert a conversion between quantized representations")
elif isinstance(provided_type, TupleValue):
assert isinstance(wanted_type, TupleValue)
assert len(provided_type.elements) == len(wanted_type.elements)
element_casts = [make_quantization_cast(p, w)
for p, w in zip(provided_type.elements, wanted_type.elements)]
if all(isinstance(c, IdentityCast) for c in element_casts):
return IdentityCast()
return TupleCast(element_casts)
else:
raise TypeError("Unrecognized type")
def make_quantization_casts(provided_input_types: Dict[InputName, DataValue[QuantResultTensorType]],
wanted_input_types: Dict[InputName, DataValue[QuantResultTensorType]]) \
-> InputsQuantCast:
"""
Create casts for a quantized node's input types by comparing the input data type with the type
that the node requires.
:param provided_input_types: Type and quantization of a node's inputs, after quantization
:param wanted_input_types: Type and quantization that the quantized node requires
:return: Casts for the node
"""
assert list(provided_input_types.keys()) == list(wanted_input_types.keys()) # Compare keys including ordering
casts = dict()
for name in provided_input_types.keys():
p = provided_input_types[name]
w = wanted_input_types[name]
casts[name] = make_quantization_cast(p, w)
return InputsQuantCast(casts)
def _require_integer_precision(op_name: str, config: QuantizationConfigs) -> None:
"""
Require the quantization precision to be int8 or int16. Raise an exception otherwise.
"""
assert config.quantization_precision.get() in (QuantizationPrecision.INT_8, QuantizationPrecision.INT_16), \
f"Only integer quantization is supported for operator {op_name}"
class AwesomeOperation(Generic[AWESOME_ATTRS, QUANT_ATTRS]):
"""
An abstract base class for operations.
Stores a list of input key names expected to be passed in by the AwesomeNode for developer reference.
input_list: ClassVar[Optional[List[InputName]]]. Used as a reference when getting inputs
from a dictionary. If input_list is None, AFE will skip validating input_list
at runtime.
intermediate_names: ClassVar[List[str]]. Used for creation of intermediate observers. If the
list is empty, an empty dict of intermediate observers will be created.
"""
@classmethod
def get_type(cls, attrs: Union[AWESOME_ATTRS, QUANT_ATTRS]) -> NodeType:
"""
Get the type of this node given its attributes. The parameter should be a QUANT_ATTRS
if that data has been created, or an AWESOME_ATTRIBUTES otherwise.
:param attrs: Attributes associated with the operator. It is an AWESOME_ATTRIBUTES if
quantization has not transformed the node, or a QUANT_ATTRS if it has.
:return: The node's type.
"""
raise NotImplementedError(f"{cls.__name__} does not have get_type function implemented")
@classmethod
def run(cls, attrs: AWESOME_ATTRS, input_dict: Dict[InputName, Any],
config: RunConfigs) -> Any:
"""
Executes the operation in floating point
:param attrs: AwesomeAttributes associated with this operation
:param input_dict: Dictionary of names (eg. 'weights' 'data') to numpy arrays
:param config: Configuration parameters for how to run the network
:return: Output tensor(s) whose type is dependent on the subclass.
"""
raise NotImplementedError(f"{cls.__name__} does not have run function implemented")
@classmethod
def run_quant(cls, quant_attrs: QUANT_ATTRS, input_dict: Dict[InputName, Any],
config: RunConfigs) -> Any:
"""
Execute the operation using quantized arithmetic.
:param quant_attrs: Parameters that define the quantized operation
:param input_dict: Dictionary of names (eg. 'weights' 'data') to numpy arrays
:param config: Configuration parameters for how to run the network
:return: Output tensor(s) whose type is dependent on the subclass.
"""
raise NotImplementedError(f"{cls.__name__} does not have run_quant function implemented")
@classmethod
def calibrate(cls, attrs: AWESOME_ATTRS, calib_attrs: AwesomeCalibAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> Any:
"""
The default calibration method. Executes the operation in floating point and
updates the observer if the operation is associated with one. Otherwise,
the operation's quantization parameters will be calculated based on its
input's quantization parameters.
Update the min/max values using the outputs and use the updated min/max
to compute the scales and zero points.
:param attrs: AwesomeAttributes associated with this operation
:param calib_attrs: AwesomeCalibAttrs associated with operation's node.
:param input_dict: Dictionary of names (eg. 'weights', 'data') to numpy arrays
:param config: Configuration parameters for how to run the network
:return: Output tensor(s) whose type is dependent on the subclass.
"""
outputs = cls.run(attrs, input_dict, config)
if calib_attrs.observer is not None:
# Observers only monitor single output nodes
assert isinstance(outputs, np.ndarray)
calib_attrs.observer.update(outputs.astype(np.float32, copy=False))
return outputs
@classmethod
def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs,
inputs: Dict[InputName, QuantizationTensorData]) \
-> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]:
"""
Get observed distribution and intermediate observed distributions.
If a node doesn't have an observer, values from the previous node are used.
ExternalOp, TupleOp, TupleGetItemOp, LayoutTransformOp, and ReshapeOp don't use observed
distributions, and those values won't be passed to any other MLA node, so the observed
distribution for those is set to None.
:param calib_attrs: Calibration attributes.
:param inputs: Properties of the inputs.
It has quantization scales of the input tensors and attributes of the nodes that calculate the inputs.
:return: Tuple of observed distribution and dictionary of intermediate observed
distributions.
"""
if calib_attrs.observer is not None:
intermediate_distributions = dict() if calib_attrs.intermediate_observers is None else \
{k: ObservedDistribution(v) for k, v in calib_attrs.intermediate_observers.items()}
return ObservedDistribution(calib_attrs.observer), intermediate_distributions
else:
assert cls.input_list is not None
assert len(cls.input_list) == 1
assert cls.input_list[0] in inputs
distribution = inputs[cls.input_list[0]][1]
assert distribution is not None
return distribution, {}
@classmethod
def quantize(cls, attrs: AWESOME_ATTRS, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> QUANT_ATTRS:
"""
Compute quantized operator attributes, input quantization, and output quantization
from floating-point operator attributes and the result of calibration.
When this function is called, the input types and quantization (after the inputs have
been transformed by quantization) are available from quantizer_interface, along with
a proposed output type and quantization, which this function may overwrite. The output
quantization is computed based on calibration. The output type should not be used.
This function must record, through quantizer_interface, the output type and quantization
that this operator has after quantization. It may use the default quantization
if appropriate.
This function may modify attrs. It should modify attrs if the same attribute class
is used for both the floating-point and the quantized operator, which would mean that
it's designed to store any quantization information in attrs.
This function may modify the input quantization to direct quantization to supply
different inputs to this operator. The quantization algorithm will insert
quantize or dequantize nodes so that the inputs have the type and quantization that
were assigned. An exception will be raised if the input can't be provided by
inserting a quantize or dequantize node or leaving the input unchanged.
The quantized operator attributes are returned.
:param attrs: Floating-point operator attributes.
:param quantizer_interface: Interface for reading input quantization and recording the chosen output quantization.
:param config: Parameters controlling how to quantize.
:param error_reporter: Node reporter of the node to be quantized.
:return: Quantized operator attributes
"""
raise NotImplementedError(f"{cls.__name__} does not have quantize function implemented")
@classmethod
def type_check(cls, value: Any, expected_type: Type[T]) -> T:
"""
Each op expects a more specific type of inputs / AwesomeAttributes, so this function helps with type checking.
:param value: AwesomeAttributes
:param expected_type: a type
"""
assert isinstance(value, expected_type), "Error: Op ({}) expects ({}). Got ({})".format(cls.__name__,
expected_type,
type(value))
return cast(T, value)
def _cast_to_quant_tensor_new(data: np.ndarray) -> np.ndarray:
"""
Temporary cast to int8, to be removed once all operators' run_quant methods return
int8 data. Should be used only on operator inputs.
:param data: data
:return: data with int8 values
"""
return data.astype(QuantizedTensorNew)
def _get_input_precision(quantizer_interface: OpQuantInterface,
quantization_precision: QuantizationPrecision,
input_name: InputName) -> ScalarType:
"""
Get input precision of a node.
If input_type is int8, input_precision is set to ScalarType.int8 and no changes are needed.
If input_type is float32, bfloat16, int16, or int32, input_precision is set according to
the quantization_precision argument.
The fix_input function is called later to fix the input type to the chosen precision.
"""
input_quant = quantizer_interface.get_input_quant()
match get_expected_tensor_value(input_quant[input_name]).type.scalar:
case ScalarType.int8:
input_precision = ScalarType.int8
case ScalarType.int16 | ScalarType.int32 | ScalarType.bfloat16 | ScalarType.float32:
input_precision = quantization_precision.to_scalar_type()
case _: raise ValueError("Unrecognized precision")
return input_precision
###########################
# PLACEHOLDER and CONSTANT
###########################
class PlaceholderOp(AwesomeOperation[PlaceholderAttrs, PlaceholderQuantAttrs]):
placeholder_fn: Callable[[np.ndarray], np.ndarray] = op_fn.placeholder
quant_fn: Callable[[np.ndarray, float, int, int], np.ndarray] = quant_utils.linear_quantize
@classmethod
def get_type(cls, attrs: Union[PlaceholderAttrs, PlaceholderQuantAttrs]) -> NodeType:
return NodeType({}, TensorValue(attrs.type))
@classmethod
@type_check_operation_arguments(types=[PlaceholderAttrs, (np.ndarray, tuple)], dict_mask=[False, True])
def run(cls, attrs: PlaceholderAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
return cls.placeholder_fn(data.astype(attrs.type.scalar.numpy_type(), copy=False))
@classmethod
def quantize(cls, attrs: PlaceholderAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> PlaceholderQuantAttrs:
# Use the quantization that was passed to this placeholder
quant = quantizer_interface.get_placeholder_quant()
assert quant is not None
quantizer_interface.set_chosen_output_quant(quant)
# Input type does not change during quantization
new_type = attrs.type
return PlaceholderQuantAttrs(new_type, get_expected_tensor_value(quant).quant)
@classmethod
def run_quant(cls, quant_attrs: PlaceholderQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
# Coerce to the correct type. This should be unnecessary but some code passes the wrong
# input type.
data = data.astype(quant_attrs.type.scalar.numpy_type(), copy=False)
return cls.placeholder_fn(data)
class ConstantOp(AwesomeOperation[ConstantAttrs, ConstantQuantAttrs]):
constant_fn: Callable[[np.ndarray], np.ndarray] = op_fn.constant
@classmethod
def get_type(cls, attrs: Union[ConstantAttrs, ConstantQuantAttrs]) -> NodeType:
if isinstance(attrs, ConstantAttrs):
data = attrs.data
else:
data = attrs.quant_data
return NodeType({}, TensorValue(TensorType(ScalarType.from_numpy(data.dtype), data.shape)))
@classmethod
@type_check_operation_arguments(types=[ConstantAttrs], dict_mask=[False])
def run(cls, attrs: ConstantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
if attrs is None:
raise ValueError("Error: Floating point constants missing. This can happen if you have already quantized"
"but are running the network in a floating point mode")
return cls.constant_fn(attrs.data)
@classmethod
@type_check_operation_arguments(types=[ConstantAttrs, AwesomeCalibAttrs], dict_mask=[False, False])
def calibrate(cls, attrs: ConstantAttrs, calib_attrs: AwesomeCalibAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
tensor_value = cls.run(attrs, input_dict, config)
# Consider doing update just once
assert calib_attrs.observer is not None
assert isinstance(tensor_value, np.ndarray)
calib_attrs.observer.update(tensor_value)
return tensor_value
@classmethod
@type_check_operation_arguments(types=[ConstantAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
def quantize(cls, attrs: ConstantAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> Union[ConstantAttrs, ConstantQuantAttrs]:
# TODO(Joey): Check if we can quantize the constant to int32. For now we quantize the constant to int8.
# If the constant is a weight or bias, quant_data will be overwritten while quantizing the Convolution/Dense.
const_data = attrs.data
if const_data.dtype in (np.int8, np.int16, np.int32, np.int64):
# Do not quantize integer data
ty = TensorType(ScalarType.from_numpy(const_data.dtype), const_data.shape)
quant = QuantResultTensorType.from_type(ty)
quantizer_interface.set_chosen_output_quant(TensorValue(quant))
quant_data = const_data
return ConstantQuantAttrs(quant_data)
else:
assert const_data.dtype == np.float32
quantized_type = config.quantization_precision.get().to_scalar_type()
quant = fix_output(quantizer_interface, quantized_type, const_data.shape, config.asymmetry.get())
if scalar_is_integral(quantized_type):
quant_data = quant_utils.linear_quantize_with_quantization(const_data, quant.quant).\
astype(quantized_type.numpy_type())
return ConstantQuantAttrs(quant_data)
else:
return dataclasses.replace(attrs, data=const_data.astype(np.dtype(bfloat16)))
@classmethod
@type_check_operation_arguments(types=[ConstantQuantAttrs, tuple], dict_mask=[False, True])
def run_quant(cls, quant_attrs: ConstantQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
if quant_attrs is None:
raise ValueError("Error: quantized constants missing. This can happen if you have not quantized"
"the network but are running the network in a quantized mode")
# TODO(Joey): Try to remove the copy
return cls.constant_fn(np.copy(quant_attrs.quant_data))
#########################
# COMPUTATION OPERATIONS
#########################
class _MaxPoolOp(AwesomeOperation[MaxPoolAttrs, PoolQuantAttrs]):
maxpool_fn: Callable[[MaxPoolAttrs, np.ndarray, Union[float, int]], np.ndarray] = op_fn.maxpool
@classmethod
def get_type(cls, attrs: Union[MaxPoolAttrs, PoolQuantAttrs]) -> NodeType:
if isinstance(attrs, MaxPoolAttrs):
in_type = out_type = attrs.scalar_type
else:
in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
out_type = ScalarType.from_numpy(attrs.requant.out_dtype) if attrs.requant is not None else ScalarType.int8
attrs = attrs.pool_attrs
node_type = _pooling_op_type(attrs, in_type, out_type)
return node_type
@classmethod
@type_check_operation_arguments(types=[MaxPoolAttrs, np.ndarray], dict_mask=[False, True])
def run(cls, attrs: MaxPoolAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
if not config.fast_mode or data.dtype == np.int16:
run_mode = RunMode.MLA_MODE
else:
run_mode = RunMode.FAST_MODE
return cls.maxpool_fn(attrs, data, pad_value=-float('inf'), mode=run_mode)
@classmethod
@type_check_operation_arguments(types=[MaxPoolAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
def quantize(cls, attrs: MaxPoolAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> PoolQuantAttrs:
input_qrtt = quantizer_interface.get_input_quant()[InputName('data')]
input_precision = get_expected_tensor_value(input_qrtt).type.scalar
# Choose what numeric type to use at the input and output.
if input_precision == ScalarType.float32:
# Quantize the input to the selected precision
compute_type = config.quantization_precision.get().to_scalar_type()
elif config.quantization_precision.get() == QuantizationPrecision.INT_8:
# Use int8 precision. Requantize if necessary.
compute_type = ScalarType.int8
else:
# Use higher precision. Ignore the selected precision and use the
# input data type instead, since that affords the best accuracy.
compute_type = input_precision
input_quant = fix_input(quantizer_interface, compute_type, InputName('data'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
match compute_type:
case ScalarType.int16:
# When int16 is enabled, output has the same quantization as the input, except that the data type
# is changed.
output_bits = 32 if config.intermediate_int32.get() else 16
output_quant = dataclasses.replace(input_quant.quant, bits=output_bits)
output_type = ScalarType.int32 if config.intermediate_int32.get() else ScalarType.int16
requant = requantization.get_id_requantization(output_type.numpy_type())
requant_method = RequantMethod.arith_folded
case ScalarType.int8:
# Input and output use int8
output_type = ScalarType.int8
output_quant = input_quant.quant
requant = requantization.get_id_requantization(np.int8)
requant_method = RequantMethod.arith_folded
case ScalarType.bfloat16:
output_type = ScalarType.bfloat16
output_quant = None
requant = requantization.get_id_requantization(bfloat16)
requant_method = None
case _:
raise ValueError("Unexpected QuantizationPrecision")
output_quant = QuantResultTensorType(TensorType(output_type, output_shape), output_quant,
requant_method)
quantizer_interface.set_chosen_output_quant(TensorValue(output_quant))
# Put the results into new operator attributes
quant_attrs: Union[PoolAttrs, PoolQuantAttrs]
if output_quant.quant is None:
quant_attrs = dataclasses.replace(attrs, scalar_type=compute_type)
else:
pad_value_type = compute_type.numpy_type()
pad_value = ml_kernels.math_helpers.get_dtype_min(pad_value_type)
quant_attrs = make_quantized_pool_attrs(attrs, pad_value=pad_value,
input_int16=compute_type == ScalarType.int16,
requant=requant)
return quant_attrs
@classmethod
@type_check_operation_arguments(types=[PoolQuantAttrs, np.ndarray], dict_mask=[False, True])
def run_quant(cls, quant_attrs: PoolQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
if not config.fast_mode or data.dtype == np.int16:
run_mode = RunMode.MLA_MODE
else:
run_mode = RunMode.FAST_MODE
out = cls.maxpool_fn(quant_attrs.pool_attrs, data, pad_value=quant_attrs.pad_value, mode=run_mode)
if quant_attrs.requant is not None:
return requantization.requantize(out, quant_attrs.requant)
else:
return out
class MaxPool2DOp(_MaxPoolOp):
pass  # 2D max pooling; behavior is inherited from _MaxPoolOp
class MaxPool3DOp(_MaxPoolOp):
pass  # 3D max pooling; behavior is inherited from _MaxPoolOp
class _AvgPoolOp(AwesomeOperation[AVGPOOL_TYPES, PoolQuantAttrs], Generic[AVGPOOL_TYPES]):
avgpool_fn: Callable[[AVGPOOL_TYPES, np.ndarray, Union[float, int]], np.ndarray] = op_fn.avgpool
@classmethod
def get_type(cls, attrs: Union[AvgPoolAttrs, PoolQuantAttrs]) -> NodeType:
if isinstance(attrs, AvgPoolAttrs):
in_type = out_type = attrs.scalar_type
else:
in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
out_type = ScalarType.from_numpy(attrs.requant.out_dtype) if attrs.requant is not None else ScalarType.int8
attrs = attrs.pool_attrs
node_type = _pooling_op_type(attrs, in_type, out_type)
return node_type
@classmethod
@type_check_operation_arguments(types=[AVGPOOL_CLASSES, np.ndarray], dict_mask=[False, True])
def run(cls, attrs: AVGPOOL_TYPES, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
if not config.fast_mode or data.dtype == np.int16:
run_mode = RunMode.MLA_MODE
else:
run_mode = RunMode.FAST_MODE
return cls.avgpool_fn(attrs, data, pad_value=0, mode=run_mode)
@classmethod
@type_check_operation_arguments(types=[AVGPOOL_CLASSES, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
def quantize(cls, attrs: AVGPOOL_TYPES, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> PoolQuantAttrs:
compute_type = _get_input_precision(quantizer_interface, config.quantization_precision.get(),
InputName('data'))
pool_size = utils.transpose_attr_according_to_layout_strings(
attrs.pool_size, attrs.layout, "NDHWC" if len(attrs.layout) == 5 else "NHWC")
divisor = math.prod(pool_size[1:-1])
if compute_type == ScalarType.int16 and divisor > 1024:
# Cannot quantize int16 with a large divisor because too many bits are needed for summation and
# requant scale factor. Switch to int8.
sima_logger.sima_log_warning(
f"AvgPool precision was reduced to int8 due to large pooling size ({divisor})"
)
compute_type = ScalarType.int8
input_quant = fix_input(quantizer_interface, compute_type, InputName('data'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
match compute_type:
case ScalarType.int16 if config.intermediate_int32.get():
# Perform integer summation without division.
# Incorporate the omitted divisor into the output's quantization scale.
output_quant = Quantization(scale=input_quant.quant.scale * divisor,
zero_point=input_quant.quant.zero_point * divisor,
bits=32,
min_val=input_quant.quant.min_val,
max_val=input_quant.quant.max_val)
output_scalar_type = ScalarType.int32
requant = requantization.id_requantization
# Use scaled_fz because production compiler does not do zero point correction in this operator
requant_method = RequantMethod.scaled_fz
case ScalarType.int8 | ScalarType.int16:
# Perform integer summation and quantized division.
# Quantize (1/sum_factor) = (scale_multiplier * 2**-scale_sh).
# Choose scale_multiplier small enough so that Sum(input) * scale_multiplier does not overflow.
input_bits = np.iinfo(compute_type.numpy_type()).bits
scale_sh = 31 - input_bits
scale_multiplier = round((1 << scale_sh) / divisor)
# Check if the scale_multiplier has at least 5 fractional bits
pool_bits = int(np.floor(np.log2(divisor))) + 1
if (scale_sh - pool_bits) < 5:
sima_logger.sima_log_warning(
f"AvgPool quantized divisor has {scale_sh - pool_bits} fractional bits (less than 5), "
f"pool size {divisor}, quantization precision is {compute_type}"
)
# Output quantization is same as the input
output_scalar_type = compute_type
output_quant = input_quant.quant
requant = requantization.TFLiteRequantization(sc_correction=scale_multiplier, zp_correction=0,
shift=scale_sh, rounding=RoundType.TOEVEN,
out_dtype=output_scalar_type.numpy_type())
requant_method = RequantMethod.scaled_fz
case ScalarType.bfloat16:
# Do not quantize.
output_scalar_type = compute_type
output_quant = None
requant = None
requant_method = None
case _:
raise ValueError("Unexpected type")
output_type = TensorType(output_scalar_type, output_shape)
output_qrtt = QuantResultTensorType(output_type, output_quant, requant_method)
quantizer_interface.set_chosen_output_quant(TensorValue(output_qrtt))
# Create quantization parameters
quant_attrs: Union[PoolAttrs, PoolQuantAttrs]
if output_quant is None:
quant_attrs = dataclasses.replace(attrs, scalar_type=compute_type)
else:
pad_value = input_quant.quant.zero_point
quant_attrs = make_quantized_pool_attrs(attrs, pad_value=pad_value,
input_int16=(compute_type == ScalarType.int16),
requant=requant)
return quant_attrs
@classmethod
@type_check_operation_arguments(types=[PoolQuantAttrs, np.ndarray], dict_mask=[False, True])
def run_quant(cls, quant_attrs: PoolQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
if not config.fast_mode or data.dtype == np.int16:
run_mode = RunMode.MLA_MODE
else:
run_mode = RunMode.FAST_MODE
out = cls.avgpool_fn(quant_attrs.pool_attrs, data, requant=quant_attrs.requant,
pad_value=quant_attrs.pad_value,
quantized=True, rounding_type=quant_attrs.rounding_type,
mode=run_mode)
return out
class AvgPool2DOp(_AvgPoolOp[AvgPoolAttrs]):
pass  # 2D average pooling; behavior is inherited from _AvgPoolOp
class AvgPool3DOp(_AvgPoolOp[AvgPoolAttrs]):
pass  # 3D average pooling; behavior is inherited from _AvgPoolOp
class AdaptiveAvgPool2DOp(_AvgPoolOp[AdaptiveAvgPool2DAttrs]):
avgpool_fn: Callable[[AdaptiveAvgPool2DAttrs, np.ndarray], np.ndarray] = op_fn.adaptive_avgpool2d
class VarianceOp(AwesomeOperation[VarianceAttrs, VarianceQuantAttrs]):
var_fn = op_fn.variance
@classmethod
def get_type(cls, attrs: VarianceAttrs | VarianceQuantAttrs) -> NodeType:
if isinstance(attrs, VarianceAttrs):
in_type = out_type = attrs.scalar_type
else:
in_type = out_type = ScalarType.int8
attrs = attrs.attrs
lhs_shape = attrs.input_data_shape
rhs_shape = attrs.mean_shape
out_shape = attrs.mean_shape
return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_shape)),
cls.input_list[1]: TensorValue(TensorType(in_type, rhs_shape))},
TensorValue(TensorType(out_type, out_shape)))
@classmethod
def run(cls, attrs: VarianceAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray:
input_data = input_dict[InputName('data')]
mean = input_dict[InputName('mean')]
return cls.var_fn(input_data, mean)
@classmethod
def quantize(cls, attrs: VarianceAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> VarianceAttrs | VarianceQuantAttrs:
quant_precision = config.quantization_precision.get().to_scalar_type()
compute_type = ScalarType.int8 if scalar_is_integral(quant_precision) else quant_precision
input_data_shape = attrs.input_data_shape
divisor = math.prod(input_data_shape[1:-1])
if compute_type == ScalarType.int16 and divisor > 1024:
# Cannot quantize int16 with a large divisor because too many bits are needed for summation and
# requant scale factor. Switch to int8.
sima_logger.sima_log_warning(
f"VarianceOp precision was reduced to int8 due to large pooling size ({divisor})"
)
compute_type = ScalarType.int8
data_quant = fix_input(quantizer_interface, compute_type, InputName('data'), config.asymmetry.get())
mean_quant = fix_input(quantizer_interface, compute_type, InputName('mean'), config.asymmetry.get())
output_quant = fix_output(quantizer_interface, compute_type, attrs.mean_shape, config.asymmetry.get())
if scalar_is_integral(compute_type):
# Perform integer summation and quantized division.
# Quantize (1/sum_factor) = (scale_multiplier * 2**-scale_sh).
# Choose scale_multiplier small enough so that Sum(input) * scale_multiplier does not overflow.
# Number of input bits will be 18 because of the multiplication (data - mean) * (data - mean)
input_bits = (np.iinfo(compute_type.numpy_type()).bits + 1) * 2
scale_sh = 31 - input_bits
scale_multiplier = round((1 << scale_sh) / divisor)
if scale_multiplier == 0:
raise sima_logger.UserFacingException(f"Cannot compile a pooling operator with pool size {divisor}"
f" and quantization precision {compute_type}."
f" Please recompile with higher precision to handle this size.")
# Check if the scale_multiplier has at least 5 fractional bits
pool_bits = int(np.floor(np.log2(divisor))) + 1
if (scale_sh - pool_bits) < 5:
sima_logger.sima_log_warning(
f"VarianceOp quantized divisor has {scale_sh - pool_bits} fractional bits (less than 5), "
f"pool size {divisor}, quantization precision is {compute_type}"
)
requant = requantization.TFLiteRequantization(sc_correction=scale_multiplier, zp_correction=0,
shift=scale_sh, rounding=RoundType.TOEVEN,
out_dtype=np.int32)
# Get output requantization
product_scale = data_quant.quant.scale * mean_quant.quant.scale
product_quant = Quantization(scale=product_scale, bits=32)
sc_corr, zp_corr, shift = quant_utils.requantization_tflite(product_quant, output_quant.quant)
requant_var = TFLiteRequantization(sc_correction=sc_corr, zp_correction=zp_corr, shift=shift,
rounding=RoundType.TOEVEN, out_dtype=np.int8)
return VarianceQuantAttrs(attrs, requant=requant, requant_var=requant_var)
else:
return dataclasses.replace(attrs, scalar_type=ScalarType.bfloat16)
@classmethod
def run_quant(cls, quant_attrs: QUANT_ATTRS, input_dict: Dict[InputName, Any],
config: RunConfigs) -> Any:
input_data = input_dict[InputName('data')]
mean = input_dict[InputName('mean')]
return cls.var_fn(input_data, mean, quant_attrs.requant, quant_attrs.requant_var)
class MultiplyOp(AwesomeOperation[MultiplyAttrs, MultiplyQuantAttrs]):
multiply_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.multiply
requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = quant_utils.requantize
@classmethod
def get_type(cls, attrs: Union[MultiplyAttrs, MultiplyQuantAttrs]) -> NodeType:
if isinstance(attrs, MultiplyAttrs):
in_type = out_type = attrs.scalar_type
else:
in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
out_type = ScalarType.from_numpy(attrs.requant.out_dtype)
assert out_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32)
lhs_input_shape = attrs.lhs_input_shape
rhs_input_shape = attrs.rhs_input_shape
output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_input_shape)),
cls.input_list[1]: TensorValue(TensorType(in_type, rhs_input_shape))},
TensorValue(TensorType(out_type, output_shape)))
@classmethod
@type_check_operation_arguments(types=[MultiplyAttrs, np.ndarray], dict_mask=[False, True])
def run(cls, attrs: MultiplyAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.multiply_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
@classmethod
def quantize(cls, attrs: MultiplyAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> Union[MultiplyAttrs, MultiplyQuantAttrs]:
# Preferred output precision. When 16-bit is enabled, quantization may choose int32 instead of this.
compute_precision = config.quantization_precision.get().to_scalar_type()
lhs_quant = fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get())
rhs_quant = fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get())
output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
quantization = quantize_output(quantizer_interface, compute_precision, output_shape,
config.asymmetry.get())
if scalar_is_integral(compute_precision):
allow_full_output_precision = compute_precision == ScalarType.int16 and config.intermediate_int32.get()
intrinsic_shift, requant, new_output_quant = \
quant_utils.quantize_multiply(lhs_quant.quant, rhs_quant.quant, quantization.quant,
allow_full_output_precision)
# Determine attributes and quantization using the result of quantize_multiply
out_scalar_type = ScalarType.from_numpy(requant.out_dtype)
requant_method = RequantMethod.fractional_zero
else:
# Floating-point multiply
requant = None
requant_method = None
out_scalar_type = compute_precision
new_output_quant = None
output_type = TensorType(out_scalar_type, output_shape)
quantization = QuantResultTensorType(output_type, new_output_quant, requant_method)
quantizer_interface.set_chosen_output_quant(TensorValue(quantization))
if new_output_quant is None:
return dataclasses.replace(attrs, scalar_type=compute_precision)
else:
return MultiplyQuantAttrs(attrs.lhs_input_shape, attrs.rhs_input_shape,
compute_precision == ScalarType.int16, intrinsic_shift,
requant, lhs_quant.quant.zero_point, rhs_quant.quant.zero_point,
quantization.quant.bits)
@classmethod
def run_quant(cls, quant_attrs: MultiplyQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
in1_data = input_dict[InputName('lhs')]
in2_data = input_dict[InputName('rhs')]
return op_fn.quantized_multiply(quant_attrs, in1_data, in2_data)
class PadOp(AwesomeOperation[PadAttrs, AwesomeQuantAttrBase]):
pad_fn: Callable[[PadAttrs, np.ndarray, np.ndarray], np.ndarray] = op_fn.pad
@classmethod
def get_type(cls, attrs: Union[PadAttrs, AwesomeQuantAttrBase]) -> NodeType:
data_type = ScalarType.float32 if isinstance(attrs, PadAttrs) else ScalarType.int8
out_shape = _get_output_pad_shape(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, attrs.input_shape)),
cls.input_list[1]: TensorValue(TensorType(data_type, (1,)))},
TensorValue(TensorType(data_type, out_shape)))
@classmethod
@type_check_operation_arguments(types=[PadAttrs, np.ndarray], dict_mask=[False, True])
def run(cls, attrs: PadAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
pad_value = input_dict[InputName('pad_val')]
return cls.pad_fn(attrs, input_dict[InputName('data')], pad_value)
class MeanOp(AwesomeOperation[MeanAttrs, MeanQuantAttrs]):
mean_fn: Callable[[MeanAttrs, np.ndarray], np.ndarray] = op_fn.mean
@classmethod
def get_type(cls, attrs: Union[MeanAttrs, MeanQuantAttrs]) -> NodeType:
if isinstance(attrs, MeanAttrs):
node_type = node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32)
else:
node_type = node_type_for_dimension_reduction_operators(attrs.attrs, np.int8, np.int8)
return node_type
@classmethod
@type_check_operation_arguments(types=[MeanAttrs, np.ndarray], dict_mask=[False, True])
def run(cls, attrs: MeanAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.mean_fn(attrs, input_dict[InputName('data')])
@classmethod
@type_check_operation_arguments(types=[MeanAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
def quantize(cls, attrs: MeanAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> MeanQuantAttrs:
input_quantization = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get())
assert input_quantization.quant is not None
# Mean's output has the same quantization as its input
fix_output_from_input(quantizer_interface, tuple(get_output_shape(attrs)))
node_scales = input_quantization.quant.scale
node_zps = input_quantization.quant.zero_point
quant_attrs = MeanQuantAttrs(attrs=attrs, node_scales=node_scales, node_zps=node_zps)
return quant_attrs
@classmethod
@type_check_operation_arguments(types=[MeanQuantAttrs, np.ndarray], dict_mask=[False, True])
def run_quant(cls, quant_attrs: MeanQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = _cast_to_quant_tensor_new(input_dict[InputName('data')])
return np.round(cls.mean_fn(quant_attrs.attrs, data, quantized=True))
class ArgMaxOp(AwesomeOperation[ArgMaxAttrs, ArgMaxQuantAttrs]):
argmax_fn: Callable[[ArgMaxAttrs, np.ndarray], np.ndarray] = op_fn.argmax
@classmethod
def get_type(cls, attrs: Union[ArgMaxAttrs, ArgMaxQuantAttrs]) -> NodeType:
assert isinstance(attrs, (ArgMaxAttrs, ArgMaxQuantAttrs))
if isinstance(attrs, ArgMaxQuantAttrs):
attrs = attrs.attrs
return node_type_for_dimension_reduction_operators(attrs, attrs.input_scalar_type.numpy_type(),
attrs.result_scalar_type.numpy_type())
@classmethod
[docs]
def quantize(cls, attrs: ArgMaxAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[ArgMaxAttrs, ArgMaxQuantAttrs]:
"""
Quantize argmax.
The quantized operator takes int8 or bfloat16 values and returns int32 values.
The int32 values represent an array index, not real numbers, so they do not have a quantization scale.
No quantization info is saved in attrs, as argmax's computation is oblivious to quantization.
"""
input_qrtt = quantizer_interface.get_input_quant()[InputName('data')]
input_precision = get_expected_tensor_value(input_qrtt).type.scalar
quantization_precision = config.quantization_precision.get()
# Choose input type. Use the given input type if it is supported.
# Otherwise choose the type that best matches quantization_precision.
match input_precision:
case ScalarType.int8:
input_type = ScalarType.int8
case ScalarType.bfloat16:
input_type = ScalarType.bfloat16
case ScalarType.int16 | ScalarType.float32:
input_type = ScalarType.bfloat16 if quantization_precision == QuantizationPrecision.BFLOAT_16 \
else ScalarType.int8
case _:
raise ValueError("Unexpected scalar type in ArgMaxOp.quantize")
fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get())
output_shape = tuple(get_output_shape(attrs))
output_quant = TensorValue(QuantResultTensorType(TensorType(ScalarType.int32, output_shape), None, None))
quantizer_interface.set_chosen_output_quant(output_quant)
new_attrs = dataclasses.replace(attrs, input_scalar_type=input_type, result_scalar_type=ScalarType.int32)
return ArgMaxQuantAttrs(new_attrs) if scalar_is_integral(input_type) else new_attrs
@classmethod
@type_check_operation_arguments(types=[ArgMaxAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ArgMaxAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.argmax_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def run_quant(cls, attrs: ArgMaxQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.argmax_fn(attrs.attrs, input_dict[InputName('data')])
[docs]
class SoftmaxOp(AwesomeOperation[SoftmaxAttrs, SoftmaxQuantAttrs]):
[docs]
softmax_fn: Callable[[SoftmaxAttrs, np.ndarray], np.ndarray] = op_fn.softmax
@classmethod
[docs]
def get_type(cls, attrs: Union[SoftmaxAttrs, SoftmaxQuantAttrs]) -> NodeType:
if isinstance(attrs, SoftmaxAttrs):
scalar_type = attrs.scalar_type
else:
scalar_type = ScalarType.int16 if attrs.enable_int16 else ScalarType.int8
tensor_type = TensorType(scalar_type, attrs.input_shape)
return _unary_op_type(cls.input_list, tensor_type)
@classmethod
@type_check_operation_arguments(types=[SoftmaxAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: SoftmaxAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.softmax_fn(attrs, input_dict[InputName('data')])
@classmethod
@type_check_operation_arguments(types=[SoftmaxAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
[docs]
def quantize(cls, attrs: SoftmaxAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> Union[SoftmaxAttrs, SoftmaxQuantAttrs]:
input_precision = get_expected_tensor_value(quantizer_interface.get_input_quant()[cls.input_list[0]]).type
compute_precision = config.quantization_precision.get().to_scalar_type()
if scalar_is_integral(compute_precision):
if input_precision.scalar != ScalarType.int8 and config.quantization_precision.get().is_int16_precision():
input_quant = fix_input_to_int16(quantizer_interface, InputName('data'), config.asymmetry.get())
output_quant = fix_output_to_int16(quantizer_interface, attrs.input_shape, config.asymmetry.get())
enable_int16 = True
else:
input_quant = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get())
output_quant = fix_output_to_int8(quantizer_interface, attrs.input_shape, config.asymmetry.get())
enable_int16 = False
intermediate_min_max = get_intermediate_min_max(quantizer_interface)
return quant_utils.quantize_softmax(attrs, input_quant.quant, output_quant.quant,
intermediate_min_max, enable_int16)
else:
fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
fix_output_from_input(quantizer_interface, output_shape)
return dataclasses.replace(attrs, scalar_type=compute_precision)
@classmethod
@type_check_operation_arguments(types=[SoftmaxQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: SoftmaxQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.softmax_fn(quant_attrs, input_dict[InputName('data')])
@classmethod
[docs]
def calibrate(cls, attrs: AWESOME_ATTRS, calib_attrs: AwesomeCalibAttrs,
input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
"""
Softmax calibration method.
Executes default calibration to get results of the Softmax operation in floating point.
Additionally, calculates intermediate results and updates the observers for
intermediate values.
:param attrs: AwesomeAttributes associated with this operation
:param calib_attrs: AwesomeCalibAttrs associated with the operation's node.
:param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays
:param config: Parameters controlling how to calibrate.
:return: Output tensor(s) whose type is dependent on the subclass.
"""
# Run default calibration.
outputs = super().calibrate(attrs, calib_attrs, input_dict, config)
# Calculate intermediate values.
# i = ReduceSum(Exp(input), axis=axis, keepdims=1)
data = input_dict[InputName('data')]
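# Subtracting the per-axis max before exponentiation keeps exp() in a
# numerically safe range; softmax is invariant to this constant shift.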
sum_exp_fun = lambda x: (np.sum(np.exp(x - np.max(x, axis=attrs.axis, keepdims=True)),
axis=attrs.axis, keepdims=True))
sum_exp_output = sum_exp_fun(data)
# Update observers for intermediate values.
assert calib_attrs.intermediate_observers
assert ('sum_exp' in calib_attrs.intermediate_observers and
calib_attrs.intermediate_observers['sum_exp'] is not None)
calib_attrs.intermediate_observers['sum_exp'].update(sum_exp_output.astype(np.float32))
return outputs
[docs]
class LRNOp(AwesomeOperation[LRNAttrs, LRNQuantAttrs]):
[docs]
lrn_fn: Callable[[LRNAttrs, np.ndarray], np.ndarray] = op_fn.lrn
@classmethod
[docs]
def get_type(cls, attrs: Union[LRNAttrs, LRNQuantAttrs]) -> NodeType:
scalar_type = ScalarType.float32 if isinstance(attrs, LRNAttrs) else ScalarType.int8
tensor_type = TensorType(scalar_type, attrs.shape)
return _unary_op_type(cls.input_list, tensor_type)
@classmethod
@type_check_operation_arguments(types=[LRNAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: LRNAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.lrn_fn(attrs, input_dict[InputName('data')])
@classmethod
@type_check_operation_arguments(types=[LRNAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
[docs]
def quantize(cls, attrs: LRNAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> LRNQuantAttrs:
input_quant = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get())
output_quant = fix_output_to_int8(quantizer_interface, attrs.shape, config.asymmetry.get())
return quant_utils.quantize_lrn(attrs, input_quant.quant, output_quant.quant)
@classmethod
@type_check_operation_arguments(types=[LRNQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: LRNQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = _cast_to_quant_tensor_new(input_dict[InputName('data')])
return cls.lrn_fn(quant_attrs, data)
[docs]
class ExtmOp(AwesomeOperation[ExtmAttrs, AwesomeQuantAttrBase]):
"""
Extremum op; computes either a min or a max reduction. The attributes contain a boolean that selects the operation.
"""
[docs]
min_fn: Callable[[ExtmAttrs, np.ndarray], np.ndarray] = op_fn.min_op
[docs]
max_fn: Callable[[ExtmAttrs, np.ndarray], np.ndarray] = op_fn.max_op
@classmethod
[docs]
def get_type(cls, attrs: Union[ExtmAttrs, AwesomeQuantAttrBase]) -> NodeType:
assert isinstance(attrs, ExtmAttrs)
node_type = node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32)
return node_type
@classmethod
@type_check_operation_arguments(types=[ExtmAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ExtmAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
if attrs.max:
return cls.max_fn(attrs, input_dict[InputName('data')])
else:
return cls.min_fn(attrs, input_dict[InputName('data')])
[docs]
class SumOp(AwesomeOperation[SumAttrs, AwesomeQuantAttrBase]):
[docs]
sum_fn: Callable[[SumAttrs, np.ndarray], np.ndarray] = op_fn.sum_op
[docs]
requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \
quant_utils.requantize
@classmethod
[docs]
def get_type(cls, attrs: Union[SumAttrs, AwesomeQuantAttrBase]) -> NodeType:
assert isinstance(attrs, SumAttrs)
return node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32)
@classmethod
@type_check_operation_arguments(types=[SumAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: SumAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.sum_fn(attrs, input_dict[InputName('data')])
[docs]
class ProdOp(AwesomeOperation[ProdAttrs, AwesomeQuantAttrBase]):
[docs]
prod_fn: Callable[[ProdAttrs, np.ndarray], np.ndarray] = op_fn.prod
@classmethod
[docs]
def get_type(cls, attrs: Union[ProdAttrs, QUANT_ATTRS]) -> NodeType:
node_type = node_type_for_dimension_reduction_operators(attrs, np.float32, np.float32)
return node_type
@classmethod
@type_check_operation_arguments(types=[ProdAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ProdAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.prod_fn(attrs, input_dict[InputName('data')])
[docs]
class SubtractOp(AwesomeOperation[SubtractAttrs, SubtractQuantAttrs]):
[docs]
subtract_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.subtract
[docs]
requantize_fn: Callable[
[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = quant_utils.requantize
@classmethod
[docs]
def get_type(cls, attrs: Union[SubtractAttrs, SubtractQuantAttrs]) -> NodeType:
if isinstance(attrs, SubtractAttrs):
in_type = out_type = attrs.scalar_type
else:
in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
out_type = ScalarType.from_numpy(attrs.requant.out_dtype)
assert out_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32)
attrs = attrs if isinstance(attrs, SubtractAttrs) else attrs.attrs
lhs_input_shape = attrs.lhs_input_shape
rhs_input_shape = attrs.rhs_input_shape
output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_input_shape)),
cls.input_list[1]: TensorValue(TensorType(in_type, rhs_input_shape))},
TensorValue(TensorType(out_type, output_shape)))
@classmethod
@type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: SubtractAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.subtract_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
@classmethod
[docs]
def quantize(cls, attrs: SubtractAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> Union[SubtractAttrs, SubtractQuantAttrs]:
compute_precision = config.quantization_precision.get().to_scalar_type()
output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
lhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get())
rhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get())
if scalar_is_integral(compute_precision):
# Select an int8 quantization
quantization = quantize_output(quantizer_interface, ScalarType.int8,
output_shape, config.asymmetry.get())
input_scales = [lhs_quantization.quant.scale, rhs_quantization.quant.scale]
input_zps = [lhs_quantization.quant.zero_point, rhs_quantization.quant.zero_point]
scale = quantization.quant.scale
zero_point = quantization.quant.zero_point
layer_bits = quantization.quant.bits
scales, zp_corr, shift = quant_utils.quantize_add_subtract(is_subtract=True,
input_scales=input_scales,
input_zps=input_zps,
scale=scale,
zero_point=zero_point,
layer_bits=layer_bits)
if compute_precision == ScalarType.int8:
# Use the int8 quantization that was selected.
# Requantize to int8 using the calculated shift and zp_corr.
requant = requantization.FractionalZeroRequantization(
1, zp_corr, requantization.Narrowing(shift, RoundType.TOEVEN, np.int8)
)
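# The scale correction is 1 because the integer input scales (scales[0],
# scales[1]) are applied to the operands in run_quant; only the zero-point
# correction and the right shift remain to be done here.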
output_quantization = quantization
else:
assert compute_precision == ScalarType.int16
# Convert the int8 requantization to an int16 requantization.
# Subtract 8 from shift, but don't reduce shift below 0.
shift_adjustment = min(shift, 8)
shift -= shift_adjustment
scale *= (1 << shift_adjustment)
zero_point *= (1 << shift_adjustment)
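# For example, if shift were 10, then shift_adjustment = 8, shift becomes 2,
# and scale and zero_point are multiplied by 256, so the quantization gains
# the 8 extra bits of an int16 output while the requantization stays valid.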
# Create the output quantization and requantization information
quantization_16 = Quantization(scale, zero_point, bits=16,
min_val=quantization.quant.min_val,
max_val=quantization.quant.max_val)
type_16 = TensorType(ScalarType.int16, output_shape)
output_quantization = QuantResultTensorType(type_16, quantization_16, RequantMethod.fractional_zero)
requant = requantization.FractionalZeroRequantization(1, zp_corr,
utils.create_and_verify_narrowing(shift,
RoundType.TOEVEN,
np.int16))
layer_bits = 16
# Save results for quantized subtract operator
quantizer_interface.set_chosen_output_quant(TensorValue(output_quantization))
quant_attrs = SubtractQuantAttrs(attrs, compute_precision == ScalarType.int16,
requant, scales[0], scales[1], layer_bits)
else:
# Save results for floating-point subtract operator
fix_output(quantizer_interface, ScalarType.bfloat16, output_shape, config.asymmetry.get())
quant_attrs = dataclasses.replace(attrs, scalar_type=ScalarType.bfloat16)
return quant_attrs
@classmethod
[docs]
def run_quant(cls, quant_attrs: SubtractQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
output = cls.subtract_fn(input_dict[InputName('lhs')].astype(np.int32) * quant_attrs.lhs_scale,
input_dict[InputName('rhs')].astype(np.int32) * quant_attrs.rhs_scale)
quantized_output = requantization.requantize(output, quant_attrs.requant)
return quantized_output
[docs]
class PowerOp(AwesomeOperation[PowerAttrs, AwesomeQuantAttrBase]):
[docs]
power_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.power
@classmethod
[docs]
def get_type(cls, attrs: Union[PowerAttrs, QUANT_ATTRS]) -> NodeType:
lhs_input_shape = attrs.lhs_input_shape
rhs_input_shape = attrs.rhs_input_shape
output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(ScalarType.float32, lhs_input_shape)),
cls.input_list[1]: TensorValue(TensorType(ScalarType.float32, rhs_input_shape))},
TensorValue(TensorType(ScalarType.float32, output_shape)))
@classmethod
@type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.power_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
[docs]
class MaximumOp(AwesomeOperation[MaximumAttrs, AwesomeQuantAttrBase]):
[docs]
maximum_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.maximum
@classmethod
[docs]
def get_type(cls, attrs: Union[MaximumAttrs, AwesomeQuantAttrBase]) -> NodeType:
assert isinstance(attrs, MaximumAttrs)
shape = attrs.input_shape
data_type = ScalarType.float32
tensor_type = TensorType(scalar=data_type, shape=shape)
return _binary_op_type(input_list=cls.input_list, t=tensor_type)
@classmethod
@type_check_operation_arguments(types=[MaximumAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: MaximumAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.maximum_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
[docs]
class MinimumOp(AwesomeOperation[MinimumAttrs, AwesomeQuantAttrBase]):
[docs]
minimum_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.minimum
@classmethod
[docs]
def get_type(cls, attrs: Union[MinimumAttrs, AwesomeQuantAttrBase]) -> NodeType:
assert isinstance(attrs, MinimumAttrs)
shape = attrs.input_shape
data_type = ScalarType.float32
tensor_type = TensorType(scalar=data_type, shape=shape)
return _binary_op_type(input_list=cls.input_list, t=tensor_type)
@classmethod
@type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.minimum_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
[docs]
class FullOp(AwesomeOperation[FullAttrs, AwesomeQuantAttrBase]):
[docs]
full_fn: Callable[[FullAttrs, np.ndarray], np.ndarray] = op_fn.full
@classmethod
@type_check_operation_arguments(types=[FullAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: FullAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.full_fn(attrs, input_dict[InputName('fill_value')])
[docs]
class TileOp(AwesomeOperation[TileAttrs, AwesomeQuantAttrBase]):
[docs]
tile_fn: Callable[[TileAttrs, np.ndarray], np.ndarray] = op_fn.tile
@classmethod
@type_check_operation_arguments(types=[TileAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: TileAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.tile_fn(attrs, input_dict[InputName('data')])
[docs]
class PReluOp(AwesomeOperation[PReluAttrs, PReluQuantAttrs]):
[docs]
relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
[docs]
prelu_fn: Callable[[np.ndarray, np.ndarray, int], np.ndarray] = op_fn.prelu
[docs]
requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \
quant_utils.requantize
@classmethod
[docs]
def get_type(cls, attrs: Union[PReluAttrs, PReluQuantAttrs]) -> NodeType:
shape = attrs.input_shape
data_type = attrs.scalar_type if isinstance(attrs, PReluAttrs) else ScalarType.int8
tensor_type = TensorType(scalar=data_type, shape=shape)
return _unary_op_type(input_list=cls.input_list, t=tensor_type)
@classmethod
@type_check_operation_arguments(types=[PReluAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: PReluAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.prelu_fn(input_dict[InputName('data')], attrs.alpha, attrs.axis)
@classmethod
[docs]
def quantize(cls, attrs: PReluAttrs, quantizer_interface: OpQuantInterface,
configs: QuantizationConfigs, error_reporter: NodeReporter) -> Union[PReluAttrs, PReluQuantAttrs]:
compute_precision = configs.quantization_precision.get().to_scalar_type()
input_scalar_type = ScalarType.bfloat16 if scalar_is_floating(compute_precision) else ScalarType.int8
input_quant = fix_input(quantizer_interface, input_scalar_type, InputName('data'), configs.asymmetry.get())
fix_output_from_input(quantizer_interface, input_quant.type.shape)
if scalar_is_integral(compute_precision):
input_zp = input_quant.quant.zero_point
bits = input_quant.quant.bits
alpha_quant, shift = quant_utils.quantize_alpha(attrs.alpha, bits)
quant_attrs = PReluQuantAttrs(attrs.axis, attrs.input_shape, alpha_quant, shift, input_zp)
return quant_attrs
else:
return dataclasses.replace(attrs, scalar_type=compute_precision, alpha=attrs.alpha.astype(bfloat16))
@classmethod
[docs]
def run_quant(cls, quant_attrs: PReluQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = _cast_to_quant_tensor_new(input_dict[InputName('data')])
alpha = quant_attrs.quant_alpha
data_zp = quant_attrs.data_zero_point
axis = quant_attrs.axis
data = utils.transpose_axis_to_the_last(data, axis)
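# Split the input around its zero point: 'positive' is relu(data, zp); the
# remainder (data - positive) is the negative part, which is scaled by the
# quantized alpha and then shifted right by alpha_shift to undo alpha's
# fixed-point scaling.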
positive = cls.relu_fn(data, data_zp)
negative = (data.astype(np.int32) - positive) * alpha.astype(np.int32)
negative = cls.requantize_fn(data=negative, bits=8, right_shifts=quant_attrs.alpha_shift, axis=axis,
rounding_type=RoundType.UPWARD)
res = negative + positive
res = utils.transpose_axis_to_the_last(res, axis)
return res
[docs]
class BroadcastToOp(AwesomeOperation[BroadcastToAttrs, AwesomeQuantAttrBase]):
[docs]
broadcast_to_fn: Callable[[BroadcastToAttrs | BroadcastToQuantAttrs, np.ndarray], np.ndarray] = op_fn.broadcast_to
@classmethod
[docs]
def get_type(cls, attrs: BroadcastToAttrs | BroadcastToQuantAttrs) -> NodeType:
input_shape = attrs.input_type.shape
dtype = attrs.input_type.scalar
output_shape = attrs.output_shape
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))},
TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod
@type_check_operation_arguments(types=[BroadcastToAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: BroadcastToAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.broadcast_to_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: BroadcastToAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> BroadcastToAttrs | BroadcastToQuantAttrs:
# Use the input type and quantization
input_precision = _get_input_precision(quantizer_interface, config.quantization_precision.get(),
InputName('data'))
input_quant = fix_input(quantizer_interface, input_precision, InputName('data'), config.asymmetry.get())
fix_output_from_input(quantizer_interface, attrs.output_shape)
scalar_type = input_quant.type.scalar
if scalar_is_integral(scalar_type):
quant_attrs = BroadcastToQuantAttrs(TensorType(scalar_type, attrs.input_type.shape), attrs.output_shape)
return quant_attrs
else:
tensor_type = TensorType(scalar=scalar_type, shape=attrs.input_type.shape)
return dataclasses.replace(attrs, input_type=tensor_type)
@classmethod
@type_check_operation_arguments(types=[BroadcastToQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, attrs: BroadcastToQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.broadcast_to_fn(attrs, input_dict[InputName('data')])
###############################
# UDF (LOOKUP TABLE) OPERATIONS
###############################
[docs]
class UDFOp(AwesomeOperation[UDFAttrs, UDFQuantAttrs], metaclass=ABCMeta):
[docs]
udf_fn: Optional[Callable[[np.ndarray], np.ndarray]] = None
@classmethod
[docs]
def get_type(cls, attrs: Union[UDFAttrs, UDFQuantAttrs]) -> NodeType:
if isinstance(attrs, UDFAttrs):
shape = attrs.input_shape
in_data_type = out_data_type = attrs.scalar_type
else:
shape = attrs.attrs.input_shape
in_data_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
out_data_type = ScalarType.from_numpy(attrs.requant.out_dtype)
return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar=in_data_type, shape=shape))},
TensorValue(TensorType(scalar=out_data_type, shape=shape)))
@classmethod
@type_check_operation_arguments(types=[UDFAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: UDFAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
if cls.udf_fn is None:
raise NotImplementedError(f"Please implement the UDF function for {cls.__name__}")
if attrs.scalar_type == ScalarType.bfloat16:
out = cls.udf_fn(input_dict[InputName('data')])
out = out.astype(bfloat16)
else:
out = cls.udf_fn(input_dict[InputName('data')]).astype(np.float32)
return out
@classmethod
[docs]
def quantize(cls, attrs: UDFAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[UDFAttrs, UDFQuantAttrs]:
def udf_scalar(x: float) -> float:
return float(cls.udf_fn(np.array(x, dtype=np.float32)).item())
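# udf_scalar evaluates the class's UDF one float at a time; it is handed to
# quant_utils.quantize_udf below, which presumably builds the lookup table by
# dequantizing each representable input code, applying the function, and
# requantizing the result.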
input_name = InputName('data')
input_precision = get_expected_tensor_value(quantizer_interface.get_input_quant()[input_name]).type
compute_precision = config.quantization_precision.get().to_scalar_type()
if scalar_is_integral(compute_precision):
if input_precision.scalar != ScalarType.int8 and compute_precision == ScalarType.int16:
input_quant = fix_input_to_int16(quantizer_interface, input_name, config.asymmetry.get())
input_int16 = True
else:
input_quant = fix_input_to_int8(quantizer_interface, input_name, config.asymmetry.get())
input_int16 = False
out_scalar_type = ScalarType.int8 if compute_precision == ScalarType.int8 else ScalarType.int16
output_quant = quantize_output(quantizer_interface, out_scalar_type, input_precision.shape,
config.asymmetry.get(), RequantMethod.arith_folded)
lut_input_type = np.int16 if input_int16 else np.int8
lookup_table = quant_utils.quantize_udf(input_quant.quant, output_quant.quant, lut_input_type,
out_scalar_type.numpy_type(), udf_scalar)
quantizer_interface.set_chosen_output_quant(TensorValue(output_quant))
requant = requantization.narrowing_requantization(shift=0, rounding=RoundType.TOEVEN,
out_dtype=out_scalar_type.numpy_type())
return UDFQuantAttrs(lookup_table=lookup_table, attrs=attrs, input_int16=input_int16, requant=requant)
else:
fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
fix_output_from_input(quantizer_interface, output_shape)
return dataclasses.replace(attrs, scalar_type=compute_precision)
@classmethod
[docs]
def run_quant(cls, quant_attrs: UDFQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
lut = quant_attrs.lookup_table
output = requantization.requantize(ideal_udf(input_dict[InputName('data')], table=lut),
quant_attrs.requant)
return output
[docs]
class SqrtOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.sqrt
[docs]
class RsqrtOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.rsqrt
[docs]
class TanhOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.tanh
[docs]
class SigmoidOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.sigmoid
[docs]
class LogOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.log
[docs]
class Log2Op(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.log2
[docs]
class Log10Op(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.log10
[docs]
class ReciprocalOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.reciprocal
[docs]
class EluOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.elu
[docs]
class SoftplusOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.softplus
[docs]
class ErfOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.erf
[docs]
class GeluOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.gelu
[docs]
class DivideOp(AwesomeOperation[DivideAttrs, DivideQuantAttrs], metaclass=ABCMeta):
[docs]
divide_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.divide
[docs]
reciprocal_op: ReciprocalOp = ReciprocalOp()
[docs]
multiply_op: MultiplyOp = MultiplyOp()
@classmethod
[docs]
def get_type(cls, attrs: Union[DivideAttrs, DivideQuantAttrs]) -> NodeType:
udf_type = cls.reciprocal_op.get_type(attrs.udf_attrs)
mul_type = cls.multiply_op.get_type(attrs.multiply_attrs)
return NodeType(
{
cls.input_list[0]: mul_type.inputs[cls.multiply_op.input_list[0]],
cls.input_list[1]: udf_type.inputs[cls.reciprocal_op.input_list[0]]
},
mul_type.output
)
@classmethod
@type_check_operation_arguments(types=[AwesomeAttributes, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.divide_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
@classmethod
[docs]
def calibrate(cls, attrs: AWESOME_ATTRS, calib_attrs: AwesomeCalibAttrs,
input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
"""
DivideOp calibration method.
Executes default calibration to get results of the Divide operation in floating point.
Additionally, calculates intermediate results for reciprocal(rhs) and updates the
observer for intermediate values.
:param attrs: AwesomeAttributes associated with this operation
:param calib_attrs: AwesomeCalibAttrs associated with the operation's node.
:param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays
:param config: Parameters controlling how to calibrate.
:return: Output tensor(s) whose type is dependent on the subclass.
"""
# Run default calibration.
outputs = super().calibrate(attrs, calib_attrs, input_dict, config)
# Calculate intermediate values.
data = input_dict[InputName('rhs')]
rhs_reciprocal = np.reciprocal(data).astype(np.float32)
# Replace inf values (from reciprocals of zero) with the largest finite value:
# set them to -inf so they cannot win the max, then fill them with that max.
inf_mask = rhs_reciprocal == np.inf
rhs_reciprocal[inf_mask] = -np.inf
rhs_reciprocal[inf_mask] = np.max(rhs_reciprocal)
# Update observers for intermediate values.
assert calib_attrs.intermediate_observers
assert ('rhs_reciprocal' in calib_attrs.intermediate_observers and
calib_attrs.intermediate_observers['rhs_reciprocal'] is not None)
calib_attrs.intermediate_observers['rhs_reciprocal'].update(rhs_reciprocal)
return outputs
@classmethod
[docs]
def quantize(cls, attrs: DivideAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> DivideQuantAttrs:
from afe.ir.quantization_interface import make_quantize_op_interface
# Quantize 1/rhs as ReciprocalOp, using the UDFOp infrastructure, since
# lhs / rhs == lhs * (1 / rhs).
# Extract data from quantizer_interface in order to create a new interface to
# be used for quantization of the reciprocal (UDF) part.
udf_input_name = cls.input_list[-1]
udf_input_data = {
cls.reciprocal_op.input_list[0]: (
quantizer_interface.get_input_quant()[udf_input_name],
quantizer_interface.get_input_distributions()[udf_input_name],
quantizer_interface.get_calibration_data()
)
}
intermediate_distributions = quantizer_interface.get_intermediate_distributions()
assert 'rhs_reciprocal' in intermediate_distributions
udf_output_distribution = intermediate_distributions['rhs_reciprocal']
placeholder_data = quantizer_interface.get_placeholder_quant()
# Create quantization interface for reciprocal part.
udf_quant_interface, udf_quant_result = \
make_quantize_op_interface(udf_input_data, placeholder_data, udf_output_distribution, None)
udf_quant_attrs = cls.reciprocal_op.quantize(attrs.udf_attrs, udf_quant_interface, config, error_reporter)
# Quantize DivideOp as MultiplyOp(lhs, 1/rhs).
# Extract data from quant_interface and udf_quant_interface in order to create a new interface
# to be used for quantization of the multiplication part.
mul_input_data = {
cls.multiply_op.input_list[0]: (
quantizer_interface.get_input_quant()[cls.input_list[0]],
quantizer_interface.get_input_distributions()[cls.input_list[0]],
None
),
cls.multiply_op.input_list[1]: (
udf_quant_result.get_result().output,
udf_quant_interface.get_output_distribution(),
None
)
}
# Create quantization interface for multiplication part.
mul_quant_interface, mul_quant_result = \
make_quantize_op_interface(mul_input_data, placeholder_data,
quantizer_interface.get_output_distribution(), None)
mul_quant_attrs = cls.multiply_op.quantize(attrs.multiply_attrs, mul_quant_interface, config, error_reporter)
# Use results from UDF and multiplication parts to set chosen values in quantizer_interface.
quantizer_interface.set_chosen_input_quant(
cls.input_list[0], mul_quant_result.get_result().inputs[cls.multiply_op.input_list[0]]
)
quantizer_interface.set_chosen_input_quant(
cls.input_list[1], udf_quant_result.get_result().inputs[cls.reciprocal_op.input_list[0]]
)
quantizer_interface.set_chosen_output_quant(mul_quant_result.get_result().output)
if isinstance(udf_quant_attrs, UDFAttrs):
assert isinstance(mul_quant_attrs, MultiplyAttrs)
return DivideAttrs(udf_quant_attrs, mul_quant_attrs)
else:
return DivideQuantAttrs(udf_quant_attrs, mul_quant_attrs)
@classmethod
[docs]
def run_quant(cls, quant_attrs: DivideQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
# Create input_dict for reciprocal part and run as UDFOp.
udf_input_dict = {cls.reciprocal_op.input_list[0]: input_dict[cls.input_list[1]]}
rhs_reciprocal = cls.reciprocal_op.run_quant(quant_attrs.udf_attrs, udf_input_dict, config)
# Create input_dict for multiplication part and run as MultiplyOp.
mul_input_dict = {
cls.multiply_op.input_list[0]: input_dict[cls.input_list[0]],
cls.multiply_op.input_list[1]: rhs_reciprocal
}
return cls.multiply_op.run_quant(quant_attrs.multiply_attrs, mul_input_dict, config)
[docs]
class ExpOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.exp
[docs]
class SwishOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.swish
[docs]
class HardSigmoidOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.hard_sigmoid
[docs]
class HardSwishOp(UDFOp):
[docs]
udf_fn: Callable[[np.ndarray], np.ndarray] = op_fn.hard_swish
[docs]
class UpsamplingOp(AwesomeOperation[UpsamplingAttrs, UpsamplingQuantAttrs]):
[docs]
upsampling_fn: Callable[[UpsamplingAttrs, np.ndarray], np.ndarray] = op_fn.upsample
@classmethod
[docs]
def get_type(cls, attrs: Union[UpsamplingAttrs, UpsamplingQuantAttrs]) -> NodeType:
if isinstance(attrs, UpsamplingAttrs):
scalar_type = attrs.scalar_type
uattrs = attrs
else:
assert isinstance(attrs, UpsamplingQuantAttrs)
scalar_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
uattrs = attrs.upsampling_attrs
input_shape = uattrs.input_shape
layout = uattrs.layout
# Multiply H, W size by the upsampling scale factors to produce output shape
input_shape_h, input_shape_w = utils.transpose_attr_according_to_layout_strings(input_shape, layout, "HW")
output_shape_h = int(input_shape_h * uattrs.scale_h)
output_shape_w = int(input_shape_w * uattrs.scale_w)
output_shape = utils.insert_according_to_layout_strings(input_shape, (output_shape_h, output_shape_w),
layout, "HW")
input_type = TensorType(scalar_type, input_shape)
output_type = TensorType(scalar_type, output_shape)
return NodeType({'data': TensorValue(input_type)}, TensorValue(output_type))
@classmethod
@type_check_operation_arguments(types=[UpsamplingAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: UpsamplingAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.upsampling_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: UpsamplingAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter)\
-> Union[UpsamplingAttrs, UpsamplingQuantAttrs]:
compute_precision = config.quantization_precision.get().to_scalar_type()
input_quant = fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
fix_output_from_input(quantizer_interface, output_shape)
if compute_precision == ScalarType.bfloat16:
return dataclasses.replace(attrs, scalar_type=compute_precision)
else:
input_zp = input_quant.quant.zero_point
input_scale = input_quant.quant.scale
quant_attrs = UpsamplingQuantAttrs(upsampling_attrs=attrs,
input_zp=input_zp,
input_scale=input_scale,
input_int16=compute_precision == ScalarType.int16)
return quant_attrs
@classmethod
@type_check_operation_arguments(types=[UpsamplingQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: UpsamplingQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.upsampling_fn(quant_attrs.upsampling_attrs, input_dict[InputName('data')],
rounding=quant_attrs.rounding_type)
[docs]
class ImageResize2DOp(AwesomeOperation[ImageResize2DAttrs, ImageResize2DQuantAttrs]):
[docs]
image_resize_fn: Callable[[ImageResize2DAttrs, np.ndarray], np.ndarray] = op_fn.image_resize2d
@classmethod
[docs]
def get_type(cls, attrs: Union[ImageResize2DAttrs, ImageResize2DQuantAttrs]) -> NodeType:
if isinstance(attrs, ImageResize2DAttrs):
in_dtype = out_dtype = scalar_type_from_dtype(attrs.out_dtype)
else:
in_dtype = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
out_dtype = ScalarType.from_numpy(attrs.requant.out_dtype) if attrs.requant is not None else ScalarType.int8
attrs = attrs if isinstance(attrs, ImageResize2DAttrs) else attrs.image_resize2d_attrs
input_shape = attrs.input_shape
output_shape = _get_image_resize2d_out_shape(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(in_dtype, input_shape))},
TensorValue(TensorType(out_dtype, output_shape)))
@classmethod
@type_check_operation_arguments(types=[ImageResize2DAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ImageResize2DAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.image_resize_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: ImageResize2DAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter)\
-> Union[ImageResize2DAttrs,ImageResize2DQuantAttrs]:
"""
In the MLA implementation of resize, the output type is the same as the input type.
There is no intermediate int32 result. Always use int8 if the integer scaling factor is not one of (1, 2, 4).
<input_type> <enable_int16> <input_quant> <resize_kernel> <output_type>
int8 True int8 int8 int8
int8 False int8 int8 int8
int16 False int8 int8 int8
int16 True int16 int16 int16
"""
input_precision = _get_input_precision(quantizer_interface, config.quantization_precision.get(),
InputName('data'))
h_axis, w_axis = attrs.layout.find('H'), attrs.layout.find('W')
scaling_h, mod_h = divmod(attrs.size[0], attrs.input_shape[h_axis])
scaling_w, mod_w = divmod(attrs.size[1], attrs.input_shape[w_axis])
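# For example, size=(64, 64) with input H = W = 32 gives scaling factors (2, 2)
# and remainders (0, 0), so int16 may be kept; size=(48, 48) gives
# divmod(48, 32) = (1, 16), a nonzero remainder, so the int8 fallback is taken.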
use_int16 = False
if not all([sc in (1, 2, 4) for sc in (scaling_h, scaling_w)]) or not all([m == 0 for m in (mod_h, mod_w)]):
# If the integer scaling factor is not in (1, 2, 4), use int8 quantization for input and output.
io_type = ScalarType.int8
elif scalar_is_integral(input_precision):
use_int16 = (config.quantization_precision.get().is_int16_precision()
and (input_precision == ScalarType.int16))
io_type = ScalarType.int16 if use_int16 else ScalarType.int8
else:
io_type = ScalarType.bfloat16
input_quant = fix_input(quantizer_interface, io_type, InputName('data'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
output_quant = quantize_output(quantizer_interface, io_type, output_shape,
config.asymmetry.get())
quantizer_interface.set_chosen_output_quant(TensorValue(output_quant))
if scalar_is_integral(io_type):
requant = requantization.narrowing_requantization(shift=0, rounding=RoundType.TOEVEN,
out_dtype=io_type.numpy_type())
input_zp = input_quant.quant.zero_point
input_scale = input_quant.quant.scale
quant_attrs = ImageResize2DQuantAttrs(image_resize2d_attrs=attrs,
input_zp=input_zp, input_scale=input_scale,
input_int16=use_int16, requant=requant)
return quant_attrs
else:
return dataclasses.replace(attrs, out_dtype='bfloat16')
@classmethod
@type_check_operation_arguments(types=[ImageResize2DQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: ImageResize2DQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
out = cls.image_resize_fn(quant_attrs.image_resize2d_attrs, data,
rounding=quant_attrs.rounding_type)
if quant_attrs.requant is not None:
return requantization.requantize(out, quant_attrs.requant)
else:
return out
[docs]
class GridSampleOp(AwesomeOperation[GridSampleAttrs, AwesomeQuantAttrBase]):
[docs]
gridsample_fn: Callable[[GridSampleAttrs, np.ndarray, np.ndarray], np.ndarray] = op_fn.gridsample
@classmethod
[docs]
def get_type(cls, attrs: GridSampleAttrs) -> NodeType:
assert isinstance(attrs, GridSampleAttrs)
in_data_type = out_data_type = attrs.scalar_type
input_shape = attrs.input_shape
grid_shape = attrs.grid_shape
assert len(input_shape) == 4 and grid_shape[-1] == 2, "Only 2D GridSample is supported"
# We are dealing with NHWC layout here
H_out, W_out = grid_shape[1], grid_shape[2]
output_shape = (input_shape[0], H_out, W_out, input_shape[-1])
return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar=in_data_type, shape=input_shape)),
cls.input_list[1]: TensorValue(TensorType(scalar=in_data_type, shape=grid_shape))},
TensorValue(TensorType(scalar=out_data_type, shape=output_shape)))
@classmethod
@type_check_operation_arguments(types=[GridSampleAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: GridSampleAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.gridsample_fn(attrs, input_dict[InputName('data')], input_dict[InputName('grid')])
@classmethod
[docs]
def quantize(cls, attrs: GridSampleAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter)\
-> GridSampleAttrs:
compute_precision = config.quantization_precision.get().to_scalar_type()
assert compute_precision == ScalarType.bfloat16, "Only bfloat16 is supported for quantization of GridSample"
fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
fix_input(quantizer_interface, compute_precision, InputName('grid'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
fix_output(quantizer_interface, compute_precision, output_shape, config.asymmetry.get())
return dataclasses.replace(attrs, scalar_type=compute_precision)
#################################
# TENSOR MANIPULATION OPERATIONS
#################################
[docs]
class TupleOp(AwesomeOperation[TupleAttrs, AwesomeQuantAttrBase]):
"""
TupleOp takes in multiple tensors, returns a tuple
"""
# Tuple can have a variable number of inputs, so we can't establish input names across all tuple attrs
[docs]
tuple_fn: Callable[[List[np.ndarray]], tuple] = tuple
@classmethod
[docs]
def get_type(cls, attrs: TupleAttrs) -> NodeType:
input_types = [TensorValue(input_type) for input_type in attrs.input_types]
return NodeType({'input_{}'.format(i): i_type for i, i_type in enumerate(input_types)},
TupleValue(input_types))
@classmethod
@type_check_operation_arguments(types=[TupleAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, _: TupleAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> Tuple[np.ndarray, ...]:
return cls.tuple_fn(input_dict.values())
@classmethod
[docs]
def quantize(cls, attrs: TupleAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> TupleAttrs:
# Inputs are propagated to outputs. Type or quantization is not changed.
for input_name, input_quant in quantizer_interface.get_input_quant().items():
quantizer_interface.set_chosen_input_quant(input_name, input_quant)
output_quant = TupleValue(list(quantizer_interface.get_input_quant().values()))
quantizer_interface.set_chosen_output_quant(output_quant)
# Update attributes to use the quantized types
attrs.input_types = [TensorType(input_quant.value.type.scalar, input_type.shape)
for input_quant, input_type
in zip(quantizer_interface.get_input_quant().values(), attrs.input_types)]
return attrs
@classmethod
[docs]
def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs,
inputs: Dict[InputName, QuantizationTensorData]) \
-> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]:
return None, {}
[docs]
class TupleGetItemOp(AwesomeOperation[TupleGetItemAttrs, AwesomeQuantAttrBase]):
"""
TupleGetItemOp takes in a tuple, returns a tensor
"""
[docs]
tuple_get_item_fn: Callable[[TupleGetItemAttrs, tuple], np.ndarray] = op_fn.tuple_get_item
@classmethod
[docs]
def get_type(cls, attrs: TupleGetItemAttrs) -> NodeType:
return NodeType({cls.input_list[0]: TupleValue([TensorValue(input_type) for input_type in attrs.input_types])},
TensorValue(attrs.input_types[attrs.index]))
@classmethod
@type_check_operation_arguments(types=[TupleGetItemAttrs, tuple], dict_mask=[False, True])
[docs]
def run(cls, attrs: TupleGetItemAttrs, input_dict: Dict[InputName, tuple],
config: RunConfigs) -> np.ndarray:
return cls.tuple_get_item_fn(attrs, input_dict[InputName('tuple_value')])
@classmethod
[docs]
def quantize(cls, attrs: TupleGetItemAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> TupleGetItemAttrs:
value = quantizer_interface.get_input_quant()[InputName('tuple_value')]
assert isinstance(value, TupleValue), "Expected quantization scale of TupleGetItemOp's input to be a TupleValue"
quantizer_interface.set_chosen_input_quant(InputName('tuple_value'), value) # Quantization is not changed
quantizer_interface.set_chosen_output_quant(value.elements[attrs.index])
# Update attributes to use the quantized types
attrs.input_types = [TensorType(input_quant.value.type.scalar, input_type.shape)
for input_quant, input_type
in zip(value.elements, attrs.input_types)]
return attrs
@classmethod
[docs]
def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs,
inputs: Dict[InputName, QuantizationTensorData]) \
-> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]:
return None, {}
[docs]
class SqueezeOp(AwesomeOperation[SqueezeAttrs, AwesomeQuantAttrBase]):
[docs]
squeeze_fn: Callable[[SqueezeAttrs, np.ndarray], np.ndarray] = op_fn.squeeze
@classmethod
[docs]
def get_type(cls, attrs: Union[SqueezeAttrs, QUANT_ATTRS]) -> NodeType:
input_shape = attrs.input_shape
output_shape = get_squeeze_out_shape(attrs.axis, input_shape)
dtype = attrs.input_type
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))},
TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod
[docs]
def run(cls, attrs: SqueezeAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.squeeze_fn(attrs, input_dict[InputName('data')])
[docs]
class ConcatenateOp(AwesomeOperation[ConcatenateAttrs, ConcatQuantAttrs]):
# ConcatenateOp can have a variable number of inputs, so we can't establish input names across all inputs
[docs]
concatenate_fn: Callable[[ConcatenateAttrs, List[np.ndarray]], np.ndarray] = op_fn.concatenate
[docs]
requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \
quant_utils.requantize
@classmethod
[docs]
def get_type(cls, attrs: Union[ConcatenateAttrs, ConcatQuantAttrs]) -> NodeType:
if isinstance(attrs, ConcatenateAttrs):
out_dtype = attrs.scalar_type
input_types = [TensorValue(input_type) for input_type in attrs.input_types]
out_shape = _get_concat_out_shape(attrs)
elif isinstance(attrs, TupleConcatenateAttrs):
out_dtype = attrs.concat_attrs.scalar_type
input_types = [TensorValue(input_type) for input_type in attrs.concat_attrs.input_types]
out_shape = _get_concat_out_shape(attrs.concat_attrs)
else:
out_dtype = ScalarType.from_numpy(attrs.requants[0].out_dtype) if attrs.requants is not None \
else ScalarType.int8
input_types = [TensorValue(input_type) for input_type in attrs.attrs.input_types]
out_shape = _get_concat_out_shape(attrs.attrs)
return NodeType({'input_{}'.format(i): input_type for i, input_type in enumerate(input_types)},
TensorValue(TensorType(out_dtype, out_shape)))
@classmethod
@type_check_operation_arguments(types=[ConcatenateAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ConcatenateAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.concatenate_fn(attrs, list(input_dict.values()))
@classmethod
@type_check_operation_arguments(types=[ConcatenateAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
[docs]
def quantize(cls, attrs: ConcatenateAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> Union[ConcatenateAttrs, ConcatQuantAttrs]:
assert len(attrs.input_types) == len(quantizer_interface.get_input_quant())
input_precisions = []
for i in range(len(attrs.input_types)):
input_name = InputName("input_{}".format(i))
ip = _get_input_precision(quantizer_interface, config.quantization_precision.get(), input_name)
input_precisions.append(ip)
# Quantize to int8 if the inputs are int8 or do not all use the same precision.
io_type = ScalarType.int8
use_bfloat16 = False
use_int16 = False
if len(set(input_precisions)) == 1:
# All input precisions are the same. Set I/O type and precision flags.
io_type = input_precisions[0]
use_bfloat16 = (
config.quantization_precision.get().to_scalar_type() == ScalarType.bfloat16
and scalar_is_floating(input_precisions[0])
)
use_int16 = config.quantization_precision.get().is_int16_precision() and \
input_precisions[0] == ScalarType.int16
# Quantize inputs and output with the same type.
input_quants = []
for i in range(len(attrs.input_types)):
input_name = InputName("input_{}".format(i))
q = fix_input(quantizer_interface, io_type, input_name, config.asymmetry.get())
input_quants.append(q)
output_quant = fix_output(quantizer_interface, io_type, _get_concat_out_shape(attrs), config.asymmetry.get())
if use_bfloat16:
return dataclasses.replace(attrs, scalar_type=io_type)
else:
# Calculate requantization factors
input_scales = [q.quant.scale for q in input_quants]
out_dtype = np.int16 if use_int16 else np.int8
sc_correction_bits = 32 if use_int16 else 8 # The int8 algorithm stores sc_corr in 8 bits
requants = []
for q in input_quants:
sc_corr, zp_corr, shift = quant_utils.requantization(q.quant, output_quant.quant,
sc_correction_bits=sc_correction_bits)
rq = requantization.FractionalZeroRequantization(sc_corr, zp_corr,
utils.create_and_verify_narrowing(shift,
RoundType.TOEVEN,
out_dtype))
requants.append(rq)
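# Each input gets its own requantization so that, after concatenation, every
# slice shares the single output scale and zero point.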
# Create quantization parameters
attrs = dataclasses.replace(attrs, input_types=[dataclasses.replace(t, scalar=io_type) for t in attrs.input_types])
quant_attrs: ConcatQuantAttrs = \
ConcatQuantAttrs(attrs=attrs,
requants=requants,
layer_bits=[8],
input_scales=input_scales,
node_scales=[output_quant.quant.scale],
node_zps=[output_quant.quant.zero_point])
return quant_attrs
@classmethod
@type_check_operation_arguments(types=[ConcatQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: ConcatQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data_list = []
for data, requant in zip(list(input_dict.values()), quant_attrs.requants):
output_8 = requantization.requantize(data.astype(np.int32), requant)
data_list.append(output_8)
quantized_output = cls.concatenate_fn(quant_attrs.attrs, data_list)
return quantized_output
[docs]
class TransposeOp(AwesomeOperation[TransposeAttrs, AwesomeQuantAttrBase]):
[docs]
transpose_fn: Callable[[TransposeAttrs, np.ndarray], np.ndarray] = op_fn.transpose
@classmethod
[docs]
def get_type(cls, attrs: Union[TransposeAttrs, QUANT_ATTRS]) -> NodeType:
input_shape = attrs.input_shape
output_shape = _get_transpose_out_shape(attrs)
dtype = attrs.input_type
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))},
TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod
@type_check_operation_arguments(types=[TransposeAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: TransposeAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.transpose_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: TransposeAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> TransposeAttrs:
compute_precision = config.quantization_precision.get().to_scalar_type()
input_type = ScalarType.int8 if scalar_is_integral(compute_precision) else ScalarType.bfloat16
input_quant = fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get())
output_shape = _get_transpose_out_shape(attrs)
fix_output_from_input(quantizer_interface, output_shape)
# Update attributes to use the specified type.
attrs.input_type = input_quant.type.scalar
return attrs
[docs]
class DepthToSpaceOp(AwesomeOperation[DepthToSpaceAttrs, AwesomeQuantAttrBase]):
[docs]
depth_to_space_fn: Callable[[DepthToSpaceAttrs, np.ndarray], np.ndarray] = op_fn.depth_to_space
@classmethod
[docs]
def get_type(cls, attrs: Union[DepthToSpaceAttrs, QUANT_ATTRS]) -> NodeType:
input_shape = attrs.input_shape
output_shape = _get_depth_to_space_out_shape(attrs)
dtype = attrs.input_type
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))},
TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod
@type_check_operation_arguments(types=[DepthToSpaceAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: DepthToSpaceAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.depth_to_space_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: DepthToSpaceAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> DepthToSpaceAttrs:
compute_precision = config.quantization_precision.get().to_scalar_type()
input_type = ScalarType.int8 if scalar_is_integral(compute_precision) else ScalarType.bfloat16
input_quant = fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get())
output_shape = _get_depth_to_space_out_shape(attrs)
fix_output_from_input(quantizer_interface, output_shape)
return dataclasses.replace(attrs, input_type=input_quant.type.scalar)
[docs]
class ReshapeOp(AwesomeOperation[ReshapeAttrs, AwesomeQuantAttrBase]):
[docs]
reshape_fn: Callable[[ReshapeAttrs, np.ndarray], np.ndarray] = op_fn.reshape
@classmethod
[docs]
def get_type(cls, attrs: ReshapeAttrs) -> NodeType:
data_type = attrs.dtype
return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, attrs.input_shape))},
TensorValue(TensorType(data_type, tuple(attrs.newshape))))
@classmethod
@type_check_operation_arguments(types=[ReshapeAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ReshapeAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.reshape_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: ReshapeAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> ReshapeAttrs:
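# Reshape only rearranges elements, so the input's quantization is kept
# unchanged and propagated to the output.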
q = keep_input(quantizer_interface, InputName('data'))
fix_output_from_input(quantizer_interface, tuple(attrs.newshape))
attrs.dtype = q.type.scalar
return attrs
@classmethod
[docs]
def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs,
inputs: Dict[InputName, QuantizationTensorData]) \
-> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]:
return None, {}
[docs]
class ExpandDimsOp(AwesomeOperation[ExpandDimsAttrs, AwesomeQuantAttrBase]):
[docs]
expand_dims_fn: Callable[[ExpandDimsAttrs, np.ndarray], np.ndarray] = op_fn.expand_dims
@classmethod
[docs]
def get_type(cls, attrs: Union[ExpandDimsAttrs, QUANT_ATTRS]) -> NodeType:
input_shape = attrs.input_shape
output_shape = get_expand_dims_out_shape(attrs)
dtype = attrs.input_type
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))},
TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod
@type_check_operation_arguments(types=[ExpandDimsAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ExpandDimsAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.expand_dims_fn(attrs, input_dict[InputName('data')])
[docs]
class SplitOp(AwesomeOperation[SplitAttrs, AwesomeQuantAttrBase]):
"""
SplitOp takes in one tensor, returns a tuple
"""
[docs]
split_fn: Callable[[SplitAttrs, np.ndarray], Tuple[np.ndarray, ...]] = op_fn.split
@classmethod
[docs]
def get_type(cls, attrs: Union[SplitAttrs, QUANT_ATTRS]) -> NodeType:
input_shape = attrs.input_shape
outputs = _get_split_out_shape(attrs)
dtype = attrs.input_type
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))},
TupleValue([TensorValue(TensorType(dtype, tuple(output))) for output in outputs]))
@classmethod
@type_check_operation_arguments(types=[SplitAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: SplitAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> Tuple[np.ndarray, ...]:
return cls.split_fn(attrs, input_dict[InputName('data')])
[docs]
class TakeOp(AwesomeOperation[TakeAttrs, AwesomeQuantAttrBase]):
[docs]
take_fn: Callable[[TakeAttrs, np.ndarray, np.ndarray], np.ndarray] = op_fn.take
@classmethod
[docs]
def get_type(cls, attrs: Union[TakeAttrs, QUANT_ATTRS]) -> NodeType:
input_shape, indices_shape = attrs.input_shape, attrs.indices_shape
dtype = attrs.input_type
output_shape = _get_take_out_shape(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape)),
cls.input_list[1]: TensorValue(TensorType(ScalarType.int32, indices_shape))},
TensorValue(TensorType(dtype, output_shape)))
@classmethod
@type_check_operation_arguments(types=[TakeAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: TakeAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.take_fn(attrs, input_dict[InputName('data')], input_dict[InputName('indices')])
[docs]
class StridedSliceOp(AwesomeOperation[StridedSliceAttrs, AwesomeQuantAttrBase]):
[docs]
strided_slice_fn: Callable[[StridedSliceAttrs, np.ndarray], np.ndarray] = op_fn.strided_slice
@classmethod
[docs]
def get_type(cls, attrs: Union[StridedSliceAttrs, QUANT_ATTRS]) -> NodeType:
input_shape = attrs.input_shape
output_shape = get_strided_slice_out_shape(attrs)
dtype = attrs.input_type
return NodeType({cls.input_list[0]: TensorValue(TensorType(dtype, input_shape))},
TensorValue(TensorType(dtype, tuple(output_shape))))
@classmethod
@type_check_operation_arguments(types=[StridedSliceAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: StridedSliceAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.strided_slice_fn(attrs, input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: StridedSliceAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> StridedSliceAttrs:
# Use the input type and quantization
q = keep_input(quantizer_interface, InputName('data'))
fix_output_from_input(quantizer_interface, get_strided_slice_out_shape(attrs))
attrs.input_type = q.type.scalar
return attrs
[docs]
class CastOp(AwesomeOperation[CastAttrs, AwesomeQuantAttrBase]):
[docs]
cast_fn: Callable[[CastAttrs, np.ndarray], np.ndarray] = op_fn.cast
@classmethod
[docs]
def get_type(cls, attrs: Union[CastAttrs, QUANT_ATTRS]) -> NodeType:
in_dtype = attrs.input_type
shape = attrs.input_shape
out_dtype = scalar_type_from_dtype(attrs.out_dtype)
return NodeType({cls.input_list[0]: TensorValue(TensorType(in_dtype, shape))},
TensorValue(TensorType(out_dtype, shape)))
@classmethod
@type_check_operation_arguments(types=[CastAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: CastAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.cast_fn(attrs, input_dict[InputName('data')])
#######################
# COMPOSITE OPERATIONS
#######################
###################
# Add, Activations
###################
[docs]
class AddActivationOp(AwesomeOperation[AddActivationAttrs, AddQuantAttrs]):
"""
AddActivationOp handles only the following fused patterns:
* Add + Relu
* Add + Clip
"""
[docs]
add_fn: Callable[[np.ndarray, np.ndarray, Optional[int]], np.ndarray] = op_fn.add
[docs]
relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
[docs]
clip_fn: Callable[[ClipAttrs | ClipQuantAttrs, np.ndarray], np.ndarray] = op_fn.clip
[docs]
requantize_fn: Callable[
[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = quant_utils.requantize
@classmethod
[docs]
def get_type(cls, attrs: Union[AddActivationAttrs, AddQuantAttrs]) -> NodeType:
if isinstance(attrs, AddActivationAttrs):
in_type = out_type = attrs.add_attrs.scalar_type
else:
in_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
out_type = ScalarType.from_numpy(attrs.requant.out_dtype)
assert out_type in (ScalarType.int8, ScalarType.int16, ScalarType.int32)
attrs = attrs.add_attrs if isinstance(attrs, AddActivationAttrs) else attrs
lhs_input_shape = attrs.lhs_input_shape
rhs_input_shape = attrs.rhs_input_shape
output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(in_type, lhs_input_shape)),
cls.input_list[1]: TensorValue(TensorType(in_type, rhs_input_shape))},
TensorValue(TensorType(out_type, output_shape)))
@classmethod
@type_check_operation_arguments(types=[AddActivationAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: AddActivationAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
add_out = cls.add_fn(input_dict[InputName('lhs')], input_dict[InputName('rhs')])
if isinstance(attrs.activ_attrs, ReluAttrs):
return cls.relu_fn(add_out)
elif isinstance(attrs.activ_attrs, ClipAttrs):
return cls.clip_fn(attrs.activ_attrs, add_out)
return add_out
@classmethod
[docs]
def quantize(cls, attrs: AddActivationAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> Union[AddActivationAttrs, AddQuantAttrs]:
output_shape = _get_out_shape_for_op_with_2_inputs(attrs.add_attrs)
quantization: Optional[QuantResultTensorType] = None
# Detect input's quantization precision
compute_precision = config.quantization_precision.get().to_scalar_type()
use_int16 = compute_precision == ScalarType.int16
lhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get())
rhs_quantization = fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get())
# For Add + Clip, try to remove the clip by adjusting the output quantization if it meets certain conditions.
# If the clip cannot be removed, quantize_activation will later try to convert it to RELU.
if config.quantization_precision.get() in (QuantizationPrecision.INT_8, QuantizationPrecision.INT_16) \
and isinstance(attrs.activ_attrs, ClipAttrs | ClipQuantAttrs) and config.asymmetry.get():
# Try to quantize output with clip range and remove Clip
fp_min = attrs.activ_attrs.a_min
fp_max = attrs.activ_attrs.a_max
# Zero must be inside the clip range
if (fp_min <= 0) and (0 <= fp_max):
clipped_scale = quant_utils.compute_scale(True, 8, fp_min, fp_max)
clipped_zp = quant_utils.compute_zero_point(True, 8, fp_min, fp_max)
clipped_quant = Quantization(clipped_scale, clipped_zp, bits=8, min_val=fp_min, max_val=fp_max)
type_int8 = TensorType(ScalarType.int8, output_shape)
quantization = QuantResultTensorType(type_int8, clipped_quant, RequantMethod.fractional_zero)
attrs.activ_attrs = None
if scalar_is_integral(compute_precision):
# Quantize the operator
# If output quantization was not set, then set it according to calibration results
if quantization is None:
quantization = quantize_output(quantizer_interface, ScalarType.int8, output_shape,
config.asymmetry.get())
input_scales = [lhs_quantization.quant.scale, rhs_quantization.quant.scale]
input_zps = [lhs_quantization.quant.zero_point, rhs_quantization.quant.zero_point]
scale = quantization.quant.scale
zero_point = quantization.quant.zero_point
layer_bits = quantization.quant.bits
assert quantization.type.scalar == ScalarType.int8 # quantize_add_subtract was designed for int8
scales, zp_corr, shift = quant_utils.quantize_add_subtract(False, input_scales, input_zps, scale,
zero_point, layer_bits)
# Ensure that right-shift is nonnegative. Handle a negative shift by reducing the scale and zero point.
if shift < 0:
scale_adjustment = 1 << -shift
adjusted_quant = dataclasses.replace(quantization.quant,
scale=quantization.quant.scale / scale_adjustment,
zero_point=int(quantization.quant.zero_point / scale_adjustment))
quantization = dataclasses.replace(quantization, quant=adjusted_quant)
shift = 0
if not use_int16:
# Requantize to int8 using the calculated shift and zp_corr
requant = requantization.FractionalZeroRequantization(
1, zp_corr, utils.create_and_verify_narrowing(shift, RoundType.TOEVEN, np.int8)
)
output_quantization = quantization
else:
# Convert the int8 requantization to an int16 requantization.
# Subtract 8 from shift, but don't reduce shift below 0.
shift_adjustment = min(shift, 8)
shift -= shift_adjustment
scale *= (1 << shift_adjustment)
zero_point *= (1 << shift_adjustment)
# Create the output quantization and requantization information
quantization_16 = Quantization(scale, zero_point, bits=16,
min_val=quantization.quant.min_val,
max_val=quantization.quant.max_val)
type_16 = TensorType(ScalarType.int16, quantization.type.shape)
output_quantization = QuantResultTensorType(type_16, quantization_16, RequantMethod.fractional_zero)
requant = requantization.FractionalZeroRequantization(1, zp_corr,
utils.create_and_verify_narrowing(shift,
RoundType.TOEVEN,
np.int16))
layer_bits = 16
# Save results for quantized add operator
quantizer_interface.set_chosen_output_quant(TensorValue(output_quantization))
activ_attrs = quant_utils.quantize_activation(attrs.activ_attrs, output_quantization.quant,
compute_precision)
quant_attrs = AddQuantAttrs(attrs.add_attrs.lhs_input_shape, attrs.add_attrs.rhs_input_shape, use_int16,
requant, zero_point, scales[0], scales[1], layer_bits, activ_attrs)
else:
# Save results for floating-point add operator
fix_output(quantizer_interface, ScalarType.bfloat16, output_shape, config.asymmetry.get())
quant_attrs = dataclasses.replace(attrs, add_attrs=dataclasses.replace(attrs.add_attrs,
scalar_type=ScalarType.bfloat16))
return quant_attrs
@classmethod
[docs]
def run_quant(cls, quant_attrs: AddQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
# TODO: Multiplication by in_scale could be done at quantization time when the input is a constant.
# The constant grows to 32 bits in size when multiplied by in_scale.
output = cls.add_fn(input_dict[InputName('lhs')].astype(np.int32, copy=False) * quant_attrs.lhs_scale,
input_dict[InputName('rhs')].astype(np.int32, copy=False) * quant_attrs.rhs_scale)
quantized_output = requantization.requantize(output, quant_attrs.requant)
if isinstance(quant_attrs.activ_attrs, ReluAttrs | ReluQuantAttrs):
quantized_output = cls.relu_fn(quantized_output, quant_attrs.relu_zero_point)
elif isinstance(quant_attrs.activ_attrs, ClipAttrs | ClipQuantAttrs):
return cls.clip_fn(quant_attrs.activ_attrs, quantized_output)
return quantized_output
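# --- Illustrative sketch (not part of the original module) ----------------
# A plain-numpy rendering of the integer pipeline that run_quant above
# implements. The helper name and the exact placement of zp_corr relative to
# the shift are assumptions about FractionalZeroRequantization; this is a
# sketch of the arithmetic, not the library's implementation.
def _example_quantized_add(lhs: np.ndarray, rhs: np.ndarray,
                           lhs_scale: int, rhs_scale: int,
                           zp_corr: int, shift: int) -> np.ndarray:
    # Widen each int8 input to int32 so the scaled sum cannot overflow.
    acc = lhs.astype(np.int32) * lhs_scale + rhs.astype(np.int32) * rhs_scale
    # Requantize: add the zero-point correction, divide by 2**shift with
    # round-half-to-even (np.round rounds halves to even), then narrow.
    out = np.round((acc + zp_corr) / float(1 << shift))
    return np.clip(out, -128, 127).astype(np.int8)
# Example: _example_quantized_add(np.array([100], np.int8),
#                                 np.array([-5], np.int8), 3, 7, 16, 6)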
[docs]
class ConstantMultiplyAddOp(AddActivationOp):
"""
An add operator fused with multiplication by a scalar constant.
The operator performs the floating-point operation (a*c + b*d), where c and d are scalar constants.
After quantization, it behaves like an add operator. The multiplication is incorporated into the
add operator's requantization.
"""
[docs]
multiply_fn: Callable[[np.ndarray, np.ndarray], np.ndarray] = op_fn.multiply
@classmethod
[docs]
def get_type(cls, attrs: Union[ConstantMultiplyAddAttrs, AddQuantAttrs]) -> NodeType:
if isinstance(attrs, ConstantMultiplyAddAttrs):
data_type = attrs.scalar_type
else:
data_type = ScalarType.int8
lhs_input_shape = attrs.lhs_input_shape
rhs_input_shape = attrs.rhs_input_shape
output_shape = _get_out_shape_for_op_with_2_inputs(attrs)
return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, lhs_input_shape)),
cls.input_list[1]: TensorValue(TensorType(data_type, rhs_input_shape))},
TensorValue(TensorType(data_type, output_shape)))
@classmethod
@type_check_operation_arguments(types=[ConstantMultiplyAddAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ConstantMultiplyAddAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
in1_data = cls.multiply_fn(input_dict[InputName('lhs')], attrs.in1_const_attrs.data[0]) \
if attrs.in1_const_attrs is not None else input_dict[InputName('lhs')]
in2_data = cls.multiply_fn(input_dict[InputName('rhs')], attrs.in2_const_attrs.data[0]) \
if attrs.in2_const_attrs is not None else input_dict[InputName('rhs')]
return cls.add_fn(in1_data, in2_data)
@classmethod
[docs]
def quantize(
cls, attrs: ConstantMultiplyAddAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter
) -> Union[AddQuantAttrs, ConstantMultiplyAddAttrs]:
in1_scale_const = attrs.in1_const_attrs.data[0]
in2_scale_const = attrs.in2_const_attrs.data[0] if attrs.in2_const_attrs is not None else 1
output_type = cls.get_type(attrs).output
assert isinstance(output_type, TensorValue)
output_shape = output_type.value.shape
compute_precision = config.quantization_precision.get().to_scalar_type()
if scalar_is_integral(compute_precision):
lhs_quantization = fix_input_to_int8(quantizer_interface, InputName('lhs'), config.asymmetry.get())
rhs_quantization = fix_input_to_int8(quantizer_interface, InputName('rhs'), config.asymmetry.get())
quantization = fix_output_to_int8(quantizer_interface, output_shape, config.asymmetry.get())
input_scales = [lhs_quantization.quant.scale, rhs_quantization.quant.scale]
input_zps = [lhs_quantization.quant.zero_point, rhs_quantization.quant.zero_point]
scale = quantization.quant.scale
zero_point = quantization.quant.zero_point
layer_bits = quantization.quant.bits
scales, zp_corr, shift = quant_utils.quantize_add_subtract(
False, input_scales, input_zps, scale, zero_point, layer_bits,
in1_scale_const=in1_scale_const, in2_scale_const=in2_scale_const
)
requant = requantization.FractionalZeroRequantization(
1, zp_corr, utils.create_and_verify_narrowing(shift, RoundType.TOEVEN, np.int8)
)
return AddQuantAttrs(
attrs.lhs_input_shape, attrs.rhs_input_shape, False,
requant, 0, scales[0], scales[1], layer_bits
)
else:
# bfloat16
fix_input(quantizer_interface, compute_precision, InputName('lhs'), config.asymmetry.get())
fix_input(quantizer_interface, compute_precision, InputName('rhs'), config.asymmetry.get())
quantization = QuantResultTensorType(TensorType(compute_precision, output_shape), None, None)
quantizer_interface.set_chosen_output_quant(TensorValue(quantization))
in1_const_attrs = in2_const_attrs = None
if attrs.in1_const_attrs:
in1_const_attrs = ConstantAttrs(attrs.in1_const_attrs.data.astype(np.dtype(bfloat16)))
if attrs.in2_const_attrs:
in2_const_attrs = ConstantAttrs(attrs.in2_const_attrs.data.astype(np.dtype(bfloat16)))
return ConstantMultiplyAddAttrs(
compute_precision, attrs.lhs_input_shape, attrs.rhs_input_shape,
in1_const_attrs, in2_const_attrs
)
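# Illustrative note (an assumption-labeled gloss, not from the original code):
# with scalar constants c and d, the float computation is a*c + b*d. If the
# inputs are quantized as a ~ s_a*(q_a - z_a) and b ~ s_b*(q_b - z_b), then
#     a*c + b*d ~ (c*s_a)*(q_a - z_a) + (d*s_b)*(q_b - z_b),
# i.e. an ordinary quantized add whose effective input scales are c*s_a and
# d*s_b. This is why quantize() above passes in1_scale_const/in2_scale_const
# into quantize_add_subtract and can return plain AddQuantAttrs.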
################################
# Convolution, Add, Activations
################################
[docs]
class ConvAddActivationOp(AwesomeOperation[ConvAddActivationAttrs, ConvQuantAttrs]):
[docs]
add_fn: Callable[[np.ndarray, np.ndarray, Optional[int]], np.ndarray] = op_fn.add
[docs]
requantize_fn: Callable[[np.ndarray, int, Union[int, np.ndarray], int, bool, str], np.ndarray] = \
quant_utils.requantize
[docs]
relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
[docs]
clip_fn: Callable[[ClipAttrs | ClipQuantAttrs, np.ndarray], np.ndarray] = op_fn.clip
@classmethod
[docs]
def get_type(cls, attrs: ConvAddActivationAttrs | ConvQuantAttrs) -> NodeType:
if isinstance(attrs, ConvAddActivationAttrs):
input_type = output_type = attrs.conv_attrs.input_type
assert input_type in (ScalarType.float32, ScalarType.bfloat16)
else:
input_type = ScalarType.int16 if attrs.input_int16 else ScalarType.int8
output_type = ScalarType.from_numpy(attrs.requant.out_dtype)
return _conv_op_type(attrs, input_type, output_type)
@classmethod
@type_check_operation_arguments(types=[ConvAddActivationAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ConvAddActivationAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
if not config.fast_mode:
run_mode = RunMode.MLA_MODE
else:
run_mode = RunMode.FAST_MODE
# TODO: always use float_convolution
if data.dtype == bfloat16:
return op_fn.float_convolution(attrs, data, run_mode)
conv_out = op_fn.conv_tensorflow(attrs.conv_attrs, data, attrs.weights_attrs.data)
if attrs.bias_attrs:
conv_out = cls.add_fn(conv_out, attrs.bias_attrs.data, axis=-1)
if attrs.activ_attrs:
if isinstance(attrs.activ_attrs, ReluAttrs | ReluQuantAttrs):
conv_out = cls.relu_fn(conv_out)
elif isinstance(attrs.activ_attrs, ClipAttrs | ClipQuantAttrs):
conv_out = cls.clip_fn(attrs.activ_attrs, conv_out)
return conv_out
@classmethod
[docs]
def quantize(cls, attrs: ConvAddActivationAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter)\
-> ConvAddActivationAttrs | ConvQuantAttrs:
original_output_type = get_expected_tensor_value(cls.get_type(attrs).output)
original_input_type = get_expected_tensor_value(quantizer_interface.get_input_quant()[InputName('data')])
quantization_type = config.quantization_precision.get().to_scalar_type()
# Choose scalar type for the input.
# If possible, use the given input without converting. Otherwise, convert it to quantization_type.
if original_input_type.type.scalar in (ScalarType.int8, ScalarType.int16) \
and quantization_type in (ScalarType.int8, ScalarType.int16):
# All combinations of int8 and int16 input/output are supported. Use the given input type.
input_type = original_input_type.type.scalar
else:
input_type = quantization_type
input_quant = fix_input(quantizer_interface, input_type, InputName('data'), config.asymmetry.get())
if scalar_is_integral(input_type):
# Quantize convolution
assert input_quant.quant is not None
# Prepare bias correction
biascorr_type = BiasCorrectionType.NONE if attrs.bias_attrs is None else config.biascorr_type.get()
match biascorr_type:
case BiasCorrectionType.NONE:
bias_corrector = bias_correction.NullBiasCorrector()
case BiasCorrectionType.REGULAR:
# Check that the observer is set up to do mean estimation
intermediate_distributions = quantizer_interface.get_intermediate_distributions()
assert InputName('mean') in intermediate_distributions
input_mean = intermediate_distributions[InputName('mean')].get_mean().squeeze(axis=0)
assert len(input_mean.shape) == 1
bias_corrector = bias_correction.MeanBiasCorrector(input_mean)
case BiasCorrectionType.ITERATIVE:
layer_inputs = quantizer_interface.get_calibration_data()
assert layer_inputs is not None
input_mean = bias_correction.prepare_input_mean(layer_inputs[InputName('data')],
original_input_type.quant)
bias_corrector = bias_correction.MeanBiasCorrector(input_mean)
case _:
raise ValueError("Unexpected bias correction type")
weight_const_attr = attrs.weights_attrs
bias_const_attr_data = attrs.bias_attrs.data if attrs.bias_attrs else None
# Select quantization method.
use_tflite_quant = config.requantization_mode.get() == RequantizationMode.tflite
if config.output_int32.get():
convolution_precision = quantization_conv.ConvolutionPrecision.sima_int32
requant_method = RequantMethod.arith_folded
elif config.quantization_precision.get().is_int16_precision():
convolution_precision = quantization_conv.ConvolutionPrecision.tflite_int16 if use_tflite_quant\
else quantization_conv.ConvolutionPrecision.sima_int16
requant_method = RequantMethod.fractional_zero if use_tflite_quant else RequantMethod.arith_folded
elif config.quantization_precision.get().is_int8_precision():
convolution_precision = quantization_conv.ConvolutionPrecision.tflite_int8 if use_tflite_quant\
else quantization_conv.ConvolutionPrecision.sima_int8
requant_method = RequantMethod.fractional_zero if use_tflite_quant else RequantMethod.arith_folded
else:
raise ValueError("Unrecognized quantization precision")
# Quantization must adhere to additional restrictions if using relu activation with sima quantization
sima_relu_workaround = isinstance(attrs.activ_attrs, ReluAttrs) and \
convolution_precision in (quantization_conv.ConvolutionPrecision.sima_int8,
quantization_conv.ConvolutionPrecision.sima_int16)
# Choose quantization parameters
quantized_weight, quantized_bias, requant, output_scalar_type, output_quant, msb_left_shift = \
afe.ir.quantization_conv.quantize_convolution_parameters(
input_quant.quant, quantizer_interface.get_output_distribution(),
weight_const_attr.data, bias_const_attr_data,
bias_corrector=bias_corrector,
per_channel=config.per_channel.get(), asymmetry=config.asymmetry.get(),
use_int15=input_type == ScalarType.int16,
precision=convolution_precision,
allow_full_output_precision=config.intermediate_int32.get(),
use_sima_relu_workaround=sima_relu_workaround, error_reporter=error_reporter)
if output_scalar_type != ScalarType.int32:
# When the output type is not int32, this convolution can't be fused with requantization.
# Use the least restrictive requantization method, since the choice will not affect fusion.
requant_method = RequantMethod.fractional_zero
# Quantize activation attributes
activ_attrs = quant_utils.quantize_activation(attrs.activ_attrs, output_quant, output_scalar_type)
# Put all results into the output QuantResultTensorType and Conv2DQuantAttrs
output_qrtt = QuantResultTensorType(type=TensorType(output_scalar_type, original_output_type.shape),
quant=output_quant, requant_method=requant_method)
quantizer_interface.set_chosen_output_quant(TensorValue(output_qrtt))
input_zp = input_quant.quant.zero_point
zero_point = output_quant.zero_point
scale = output_quant.scale
return ConvQuantAttrs(conv_attrs=attrs.conv_attrs, scale=scale,
zero_point=zero_point, input_zp=input_zp, weight_quant_data=quantized_weight,
requant=requant,
bias_quant_data=quantized_bias,
per_channel=config.per_channel.get(), activ_attrs=activ_attrs,
input_int16=input_type == ScalarType.int16, msb_left_shift=msb_left_shift)
else:
# Produce a bfloat16 convolution
assert input_type == ScalarType.bfloat16
output_qrtt = QuantResultTensorType.from_type(TensorType(ScalarType.bfloat16, original_output_type.shape))
quantizer_interface.set_chosen_output_quant(TensorValue(output_qrtt))
if config.quantization_precision.get().is_bfloat16_with_int_weights():
bits = 8 if config.quantization_precision.get().is_bfloat16_with_int8_weights() else 4
# Bfloat16 convolution with int8 or int4 weights
quantized_weight, bias, requant = afe.ir.quantization_conv.get_bfloat16_with_int_weights_quant_params(
attrs=attrs, per_channel=config.per_channel.get(), bits=bits)
return ConvQuantAttrs(conv_attrs=attrs.conv_attrs,
activ_attrs=attrs.activ_attrs,
weight_quant_data=quantized_weight,
bias_quant_data=bias,
requant=requant)
else:
# In the attributes, convert the weight type to float32, because Netron doesn't support bfloat16 weights.
# During execution the weight is converted to bfloat16. Bias is always float32 for bfloat16 execution.
weight_const_attr = ConstantAttrs(attrs.weights_attrs.data.astype(np.float32))
bias_const_attr = ConstantAttrs(attrs.bias_attrs.data.astype(np.float32)) if attrs.bias_attrs is not None\
else None
conv_attr = dataclasses.replace(attrs.conv_attrs, input_type=ScalarType.bfloat16)
match attrs.add_attrs:
case AddAttrs() as a:
add_attr = dataclasses.replace(a, scalar_type=ScalarType.bfloat16)
case BiasAddAttrs() as a:
add_attr = a
case None:
add_attr = None
return ConvAddActivationAttrs(weights_attrs=weight_const_attr,
conv_attrs=conv_attr,
bias_attrs=bias_const_attr,
add_attrs=add_attr,
activ_attrs=attrs.activ_attrs)
@classmethod
[docs]
def run_quant(cls, quant_attrs: ConvQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
# Use accurate convolution algorithm if it is chosen by config flags
# Also use it for int16, because the fast algorithm does not handle int16
if not config.fast_mode or data.dtype == np.int16:
run_mode = RunMode.MLA_MODE
else:
run_mode = RunMode.FAST_MODE
ofm = op_fn.quantized_convolution(quant_attrs, data, run_mode)
return ofm
@classmethod
[docs]
def calibrate(cls, attrs: ConvAddActivationAttrs, calib_attrs: AwesomeCalibAttrs,
input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
"""
ConvAddActivation calibration method.
Executes default calibration to get results of ConvAdd operation in floating point.
Additionally, update intermediate observers for tracking mean values.
:param attrs: AwesomeAttributes associated with this operation
:param calib_attrs: AwesomeCalibAttrs associated with operation's node.
:param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays
:param config: Parameters controlling how to calibrate.
:return: Output tensor(s) whose type is dependent on the subclass.
"""
# Run default calibration.
outputs = super().calibrate(attrs, calib_attrs, input_dict, config)
# Update observers for intermediate values.
if attrs.bias_attrs is not None:
assert calib_attrs.intermediate_observers
assert ('mean' in calib_attrs.intermediate_observers and
calib_attrs.intermediate_observers['mean'] is not None)
data = input_dict[InputName('data')]
calib_attrs.intermediate_observers['mean'].update(data)
return outputs
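# --- Illustrative sketch (not part of the original module) ----------------
# One standard formulation of mean-based bias correction, to show what
# MeanBiasCorrector is given the channel-wise input mean for. The real math
# lives in afe.ir.bias_correction; every name below is a hypothetical example.
# Quantizing weights W introduces an error dW = dequant(quant(W)) - W; for an
# input whose per-channel mean is mu, the expected output shift is mu @ dW,
# which can be subtracted from the bias.
def _example_mean_bias_correction(bias: np.ndarray, mu: np.ndarray,
                                  w_float: np.ndarray,
                                  w_dequantized: np.ndarray) -> np.ndarray:
    # w_float, w_dequantized: (in_channels, out_channels); mu: (in_channels,)
    expected_shift = mu @ (w_dequantized - w_float)  # (out_channels,)
    return bias - expected_shift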
######################
# Tuple + Concatenate
######################
[docs]
class TupleConcatenateOp(AwesomeOperation[TupleConcatenateAttrs, ConcatQuantAttrs]):
"""
This composite node reuses the ConcatenateOp run, quantize, and run_quant methods
"""
# A tuple can have a variable number of inputs, so we can't establish fixed input names across all inputs
[docs]
tuple_fn: Callable[[List[np.ndarray]], tuple] = tuple
[docs]
concatenate_op: AwesomeOperation = ConcatenateOp
@classmethod
[docs]
def get_type(cls, attrs: Union[TupleConcatenateAttrs, ConcatQuantAttrs]) -> NodeType:
return cls.concatenate_op.get_type(attrs)
@classmethod
@type_check_operation_arguments(types=[Union[TupleConcatenateAttrs, ConcatenateAttrs], np.ndarray],
dict_mask=[False, True])
[docs]
def run(cls, attrs: Union[TupleConcatenateAttrs, ConcatenateAttrs], input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
if isinstance(attrs, ConcatenateAttrs):
# In the bfloat16 case, quantize returns ConcatenateAttrs, so execute the run method directly.
return cls.concatenate_op.run(attrs, input_dict, config)
else:
return cls.concatenate_op.run(attrs.concat_attrs, input_dict, config)
@classmethod
@type_check_operation_arguments(
types=[TupleConcatenateAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
[docs]
def quantize(cls, attrs: TupleConcatenateAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> ConcatQuantAttrs:
return cls.concatenate_op.quantize(attrs.concat_attrs, quantizer_interface, config, error_reporter)
@classmethod
@type_check_operation_arguments(types=[ConcatQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: ConcatQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.concatenate_op.run_quant(quant_attrs, input_dict, config)
##########################
# PARTITIONING OPERATIONS
##########################
[docs]
class ExternalOp(AwesomeOperation[ExternalAttrs, AwesomeQuantAttrBase]):
# External ops can have a variable number of inputs, so we can't establish the same
# input names across all ExternalAttrs
[docs]
external_fn: Callable[[ExternalAttrs, Dict], Union[np.ndarray, tuple]] = op_fn.external
@classmethod
[docs]
def get_type(cls, attrs: Union[ExternalAttrs, AwesomeQuantAttrBase]) -> NodeType:
assert isinstance(attrs, ExternalAttrs)
return attrs.node_type
@classmethod
@type_check_operation_arguments(types=[ExternalAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ExternalAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
return cls.external_fn(attrs, input_dict)
@classmethod
[docs]
def quantize(cls, attrs: ExternalAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> ExternalAttrs:
# The type is not changed by quantization. Set the input and output types to their original values.
assert list(attrs.node_type.inputs.keys()) == list(quantizer_interface.get_input_quant().keys())
for k, t in attrs.node_type.inputs.items():
quantizer_interface.set_chosen_input_quant(k, map_data_value(QuantResultTensorType.from_type, t))
quantizer_interface.set_chosen_output_quant(map_data_value(QuantResultTensorType.from_type, attrs.node_type.output))
return attrs
@classmethod
[docs]
def get_observed_distribution(cls, calib_attrs: AwesomeCalibAttrs,
inputs: Dict[InputName, QuantizationTensorData]) \
-> Tuple[Optional[ObservedDistribution], Dict[str, ObservedDistribution]]:
return None, {}
#############################
# QNN OPERATIONS
#############################
[docs]
class QNNQuantizeOp(AwesomeOperation[QNNQuantizeAttrs, AwesomeQuantAttrBase]):
[docs]
quant_fn: Callable[[QNNQuantizeAttrs, np.ndarray, np.ndarray, np.ndarray], np.ndarray] = op_fn.qnn_quantize
@classmethod
[docs]
def get_type(cls, attrs: Union[QNNQuantizeAttrs, QUANT_ATTRS]) -> NodeType:
input_data_type = attrs.input_type
out_shape = attrs.input_type.shape
out_data_type = scalar_type_from_dtype(attrs.out_dtype)
return NodeType({cls.input_list[0]: TensorValue(input_data_type)},
TensorValue(TensorType(out_data_type, out_shape)))
# TODO: move the call of quant_fn to run_quant, remove run afterwards
@classmethod
@type_check_operation_arguments(types=[QNNQuantizeAttrs], dict_mask=[False])
[docs]
def run(cls, attrs: QNNQuantizeAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
output = cls.quant_fn(attrs, input_dict[InputName('data')], attrs.output_scale, attrs.output_zero_point)
return output.astype(attrs.out_dtype)
[docs]
class RequantizeOp(AwesomeOperation[RequantizeAttrs, RequantizeQuantAttrs]):
@classmethod
[docs]
def get_type(cls, attrs: Union[RequantizeAttrs, RequantizeQuantAttrs]) -> NodeType:
attrs = attrs.attrs if isinstance(attrs, RequantizeQuantAttrs) else attrs
input_type = attrs.input_type
out_shape = attrs.input_type.shape
out_data_type = scalar_type_from_dtype(attrs.out_dtype)
output_type = TensorType(out_data_type, out_shape)
return NodeType({cls.input_list[0]: TensorValue(input_type)}, TensorValue(output_type))
@classmethod
[docs]
def run_quant(cls, quant_attrs: RequantizeQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
output = requantization.requantize(input_dict[InputName('data')], quant_attrs.requant)
return output
[docs]
class QNNDequantizeOp(AwesomeOperation[QNNDequantizeAttrs, AwesomeQuantAttrBase]):
[docs]
dequant_fn: Callable[[QNNDequantizeAttrs, np.ndarray, np.ndarray, np.ndarray], np.ndarray] = op_fn.qnn_dequantize
@classmethod
[docs]
def get_type(cls, attrs: Union[QNNDequantizeAttrs, QUANT_ATTRS]) -> NodeType:
input_data_type = attrs.input_type
out_shape = attrs.input_type.shape
return NodeType({cls.input_list[0]: TensorValue(input_data_type)},
TensorValue(TensorType(ScalarType.float32, out_shape)))
# TODO: move the call of dequant_fn to run_quant, remove run afterwards
@classmethod
@type_check_operation_arguments(types=[QNNDequantizeAttrs], dict_mask=[False])
[docs]
def run(cls, attrs: QNNDequantizeAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
output = cls.dequant_fn(attrs, input_dict[InputName('data')], attrs.input_scale, attrs.input_zero_point)
return output.astype(Float)
[docs]
class QNNMulOp(AwesomeOperation[AwesomeAttributes, AwesomeQuantAttrBase]):
[docs]
mul_fn: Callable[[AwesomeAttributes, np.ndarray, np.ndarray, float, int, float, int, float, int], np.ndarray] = op_fn.qnn_mul
# TODO: move the call of mul_fn to run_quant, remove run afterwards
@classmethod
@type_check_operation_arguments(types=[AwesomeAttributes], dict_mask=[False])
[docs]
def run(cls, attrs: AwesomeAttributes, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
output = cls.mul_fn(attrs,
input_dict[InputName('lhs')],
input_dict[InputName('rhs')],
input_dict[InputName('lhs_scale')],
input_dict[InputName('lhs_zero_point')],
input_dict[InputName('rhs_scale')],
input_dict[InputName('rhs_zero_point')],
input_dict[InputName('output_scale')],
input_dict[InputName('output_zero_point')])
return output.astype(QuantizedTensor)
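# --- Illustrative sketch (not part of the original module) ----------------
# The usual qnn.mul semantics that op_fn.qnn_mul is expected to follow:
# dequantize both operands, multiply in floating point, and requantize with
# the output scale and zero point. Names and the int8 clamp are assumptions.
def _example_qnn_mul(lhs: np.ndarray, rhs: np.ndarray,
                     lhs_scale: float, lhs_zp: int,
                     rhs_scale: float, rhs_zp: int,
                     out_scale: float, out_zp: int) -> np.ndarray:
    x = lhs_scale * (lhs.astype(np.float32) - lhs_zp)   # dequantize lhs
    y = rhs_scale * (rhs.astype(np.float32) - rhs_zp)   # dequantize rhs
    q = np.round((x * y) / out_scale) + out_zp          # requantize product
    return np.clip(q, -128, 127).astype(np.int8)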
###################
# CUSTOM OPERATION
###################
[docs]
class CustomOp(AwesomeOperation[CustomOpAttrs, AwesomeQuantAttrBase]):
# Custom operations can have a variable number of inputs.
[docs]
custom_op_fn: Callable[[CustomOpAttrs, Dict[InputName, np.ndarray]], np.ndarray] = op_fn.execute_custom_op
[docs]
quant_fn: Callable[[np.ndarray, float, int, int], np.ndarray] = quant_utils.linear_quantize
[docs]
dequant_fn: Callable[[np.ndarray, float, int], np.ndarray] = quant_utils.dequantize
@classmethod
@type_check_operation_arguments(types=[CustomOpAttrs], dict_mask=[False])
[docs]
def run(cls, attrs: CustomOpAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> Union[np.ndarray, tuple]:
op_fn.init_custom_op(attrs, input_dict, attrs.output_types[0].shape)
return cls.custom_op_fn(attrs, input_dict)
@classmethod
[docs]
def quantize(cls, attrs: CustomOpAttrs, calib_attrs: AwesomeCalibAttrs,
config: QuantizationConfigs, error_reporter: NodeReporter) -> CustomOpQuantAttrs:
input_scales: List[List[float]] = list()
input_zps: List[List[int]] = list()
for input_quant in calib_attrs.input_quant.values():
scales, zero_points, _, _, _ = quant_utils.quantization_data_value_to_output_list(
get_data_value_quant_result_scale_with_dummy(input_quant))
input_scales.append(scales)
input_zps.append(zero_points)
node_scales, node_zps, layer_bits, _, _ = quant_utils.quantization_data_value_to_output_list(
get_data_value_quant_result_scale_with_dummy(calib_attrs.quant).quant)
return CustomOpQuantAttrs(attrs, input_scales=input_scales, input_zps=input_zps,
node_scales=node_scales, node_zps=node_zps, layer_bits=layer_bits)
@classmethod
[docs]
def run_quant(cls, quant_attrs: CustomOpQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
# Dequantize the input tensors if needed
_custom_op_attrs = quant_attrs.custom_op_attrs.custom_op_attrs
assert isinstance(_custom_op_attrs, Dict)
assert "do_dequantize" in _custom_op_attrs
for i, (in_name, do_dequantize) in enumerate(zip(input_dict.keys(), _custom_op_attrs["do_dequantize"])):
if do_dequantize is False:
continue
scale = quant_attrs.input_scales[i][0]
zp = quant_attrs.input_zps[i][0]
input_dict[InputName(in_name)] = cls.dequant_fn(input_dict[InputName(in_name)], 1. / scale, zp)
output = cls.custom_op_fn(quant_attrs.custom_op_attrs, input_dict)
# Quantize the output tensor if needed
assert "do_quantize" in _custom_op_attrs
if _custom_op_attrs["do_quantize"][0] is True:
scale = quant_attrs.node_scales[0]
zp = quant_attrs.node_zps[0]
output = cls.quant_fn(output, scale, zp, quant_attrs.layer_bits[0])
return output
[docs]
class LeakyReluCompositeOp(AwesomeOperation[LeakyReluAttrs, LeakyReluCompositeQuantAttrs]):
@classmethod
[docs]
def get_type(cls, attrs: Union[LeakyReluAttrs, LeakyReluCompositeQuantAttrs]) -> NodeType:
if isinstance(attrs, LeakyReluAttrs):
shape = attrs.input_shape
in_data_type = out_data_type = ScalarType.float32
else:
shape = attrs.attrs.input_shape
is_udf = attrs.udf_quant_attrs is not None
if is_udf:
in_data_type = ScalarType.int16 if attrs.udf_quant_attrs.input_int16 else ScalarType.int8
out_data_type = ScalarType.from_numpy(attrs.udf_quant_attrs.requant.out_dtype)
else:
in_data_type = out_data_type = ScalarType.int8
return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar=in_data_type, shape=shape))},
TensorValue(TensorType(scalar=out_data_type, shape=shape)))
@classmethod
@type_check_operation_arguments(types=[LeakyReluAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: LeakyReluAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return op_fn.leaky_relu(input_dict[InputName('data')], attrs.alpha)
@classmethod
[docs]
def quantize(cls, attrs: LeakyReluAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> LeakyReluCompositeQuantAttrs:
input_name = InputName('data')
input_precision = get_expected_tensor_value(quantizer_interface.get_input_quant()[input_name]).type
scalar_type = ScalarType.int16 if (input_precision.scalar == ScalarType.int16 and
config.quantization_precision.get().is_int16_precision()) else \
ScalarType.int8
input_quantization = fix_input(quantizer_interface, scalar_type, input_name, config.asymmetry.get()).quant
output_quantization = fix_output(quantizer_interface, scalar_type, input_precision.shape,
config.asymmetry.get()).quant
input_int16 = scalar_type == ScalarType.int16
leaky_relu_quant_attrs: Optional[LeakyReluQuantAttrs] = None
udf_quant_attrs: Optional[UDFQuantAttrs] = None
if config.leaky_relu_uses_udf.get():
# Generate UDF LUT
def leaky_relu_scalar(x: float) -> float:
return float(op_fn.leaky_relu(np.array(x, dtype=np.float32), attrs.alpha).item())
input_type = output_type = scalar_type.numpy_type()
lookup_table = quant_utils.quantize_udf(
input_quantization, output_quantization, input_type, output_type, leaky_relu_scalar
)
intermediate_type = np.int32 if config.intermediate_int32.get() else np.int16
out_dtype = intermediate_type if input_int16 else np.int8
requant = requantization.narrowing_requantization(shift=0, rounding=RoundType.TOEVEN,
out_dtype=out_dtype)
udf_quant_attrs = UDFQuantAttrs(lookup_table=lookup_table, attrs=attrs, input_int16=input_int16,
requant=requant)
else:
# Create LeakyReluQuantAttrs for arithmetic implementation of leaky relu
bits = input_quantization.bits
zero_point = input_quantization.zero_point
alpha, right_shift = quant_utils.quantize_prelu(bits, attrs.alpha)
leaky_relu_quant_attrs = LeakyReluQuantAttrs(alpha, attrs.input_shape, right_shift, zero_point, bits,
RoundType.TOEVEN)
return LeakyReluCompositeQuantAttrs(attrs, config.leaky_relu_uses_udf.get(), leaky_relu_quant_attrs,
udf_quant_attrs)
@classmethod
[docs]
def run_quant(cls, quant_attrs: LeakyReluCompositeQuantAttrs,
input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
assert data.dtype in (np.int8, np.int16)
if quant_attrs.leaky_relu_uses_udf:
assert quant_attrs.udf_quant_attrs is not None
lut = quant_attrs.udf_quant_attrs.lookup_table
return ideal_udf(data, lut)
else:
assert quant_attrs.leaky_relu_quant_attrs is not None
# Positive part
output_p = op_fn.relu(data, quant_attrs.leaky_relu_quant_attrs.zero_point)
# Negative part
output_n = quant_attrs.leaky_relu_quant_attrs.alpha * (data.astype(np.int32) - output_p)
quantized_output_n = quant_utils.requantize(
output_n, quant_attrs.leaky_relu_quant_attrs.bits, quant_attrs.leaky_relu_quant_attrs.right_shift,
zp=None, rounding_type=quant_attrs.leaky_relu_quant_attrs.rounding_type)
# Complete output
quantized_output = output_p + quantized_output_n
return quantized_output
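# --- Illustrative sketch (not part of the original module) ----------------
# How an int8 UDF lookup table like the one quantize_udf builds for the
# leaky-relu branch above can be derived: evaluate the scalar function on the
# dequantized value of every input code, then quantize the result with the
# output quantization. The helper name and the +128 indexing convention are
# assumptions for illustration.
def _example_leaky_relu_lut(alpha: float, in_scale: float, in_zp: int,
                            out_scale: float, out_zp: int) -> np.ndarray:
    codes = np.arange(-128, 128, dtype=np.int32)
    x = in_scale * (codes - in_zp)            # dequantize every int8 code
    y = np.where(x >= 0.0, x, alpha * x)      # leaky relu in floating point
    q = np.round(y / out_scale) + out_zp      # quantize with output params
    return np.clip(q, -128, 127).astype(np.int8)
# At run time the operator reduces to table indexing, e.g.
# lut[data.astype(np.int32) + 128].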
[docs]
class ReluOp(AwesomeOperation[ReluAttrs, ReluQuantAttrs]):
[docs]
relu_fn: Callable[[np.ndarray, int], np.ndarray] = op_fn.relu
@classmethod
[docs]
def get_type(cls, attrs: Union[ReluAttrs, ReluQuantAttrs]) -> NodeType:
data_type = attrs.scalar_type if isinstance(attrs, ReluAttrs) else ScalarType.int8
shape = attrs.input_shape
return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, shape))},
TensorValue(TensorType(data_type, shape)))
@classmethod
@type_check_operation_arguments(types=[ReluAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: ReluAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.relu_fn(input_dict[InputName('data')])
@classmethod
[docs]
def quantize(cls, attrs: ReluAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[ReluAttrs, ReluQuantAttrs]:
compute_precision = config.quantization_precision.get().to_scalar_type()
input_scalar_type = ScalarType.bfloat16 if scalar_is_floating(compute_precision) else ScalarType.int8
input_quant = fix_input(quantizer_interface, input_scalar_type, InputName('data'), config.asymmetry.get())
fix_output_from_input(quantizer_interface, input_quant.type.shape)
if scalar_is_floating(compute_precision):
return dataclasses.replace(attrs, scalar_type=input_scalar_type)
else:
return ReluQuantAttrs(attrs.input_shape, input_quant.quant.zero_point)
@classmethod
[docs]
def run_quant(cls, quant_attrs: ReluQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
return cls.relu_fn(input_dict[InputName('data')], quant_attrs.zero_point)
[docs]
class ClipOp(AwesomeOperation[ClipAttrs, ClipQuantAttrs]):
[docs]
clip_fn: Callable[[ClipAttrs | ClipQuantAttrs, np.ndarray], np.ndarray] = op_fn.clip
@classmethod
[docs]
def get_type(cls, attrs: Union[ClipAttrs, ClipQuantAttrs]) -> NodeType:
data_type = attrs.scalar_type
shape = attrs.shape
return NodeType({cls.input_list[0]: TensorValue(TensorType(data_type, shape))},
TensorValue(TensorType(data_type, shape)))
@classmethod
[docs]
def run(cls, attrs: ClipAttrs, input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
return cls.clip_fn(attrs, input_dict[cls.input_list[0]])
@classmethod
[docs]
def quantize(cls, attrs: ClipAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> ClipAttrs | ClipQuantAttrs:
compute_precision = config.quantization_precision.get().to_scalar_type()
input_scalar_type = ScalarType.bfloat16 if scalar_is_floating(compute_precision) else ScalarType.int8
input_quant = fix_input(quantizer_interface, input_scalar_type, InputName('data'), config.asymmetry.get())
fix_output_from_input(quantizer_interface, input_quant.type.shape)
if scalar_is_floating(compute_precision):
return dataclasses.replace(attrs, scalar_type=input_scalar_type)
else:
return quant_utils.quantize_clip_attrs(attrs, input_scalar_type, input_quant.quant)
@classmethod
[docs]
def run_quant(cls, attrs: ClipAttrs | ClipQuantAttrs, input_dict: Dict[InputName, Any],
config: RunConfigs) -> np.ndarray:
return cls.clip_fn(attrs, input_dict[InputName('data')])
class _BatchMatmulBaseOp(AwesomeOperation[BatchMatmulAttrs, BatchMatmulQuantAttrs]):
"""
Base class implementing the batch matmul operation. Subclasses need to override the following:
- cls.input_list: ClassVar[List[InputName]]: list of input names used in the derived operation
- cls._get_inputs() method: method for extracting the values used in the batch matmul operation
from a dictionary of input values.
The current implementation assumes that one or two inputs are used in the batch matmul
operation. In the case of a single-input operation, the same input is used as both the
first and second arguments to the batch matmul operation.
"""
@classmethod
def get_type(cls, attrs: Union[BatchMatmulAttrs, BatchMatmulQuantAttrs]) -> NodeType:
assert len(cls.input_list) in (1, 2)
if isinstance(attrs, BatchMatmulAttrs):
in_type = out_type = attrs.scalar_type
else:
assert isinstance(attrs, BatchMatmulQuantAttrs)
in_type = ScalarType.int8
out_type = ScalarType.from_numpy(attrs.requant.out_dtype)
attrs = attrs.attrs
return NodeType(
{
name: TensorValue(TensorType(in_type, shape))
for name, shape in zip(cls.input_list, attrs.input_shapes)
},
TensorValue(TensorType(out_type, attrs.get_output_shape()))
)
@classmethod
def _get_inputs(cls, input_dict: Dict[InputName, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
raise NotImplementedError(
"Method _get_inputs needs to be overridden by classes that inherit _BatchMatmulBaseOp."
)
@classmethod
@type_check_operation_arguments(types=[BatchMatmulAttrs, np.ndarray], dict_mask=[False, True])
def run(cls, attrs: BatchMatmulAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
lhs, rhs = cls._get_inputs(input_dict)
return op_fn.batch_matmul(lhs, rhs, attrs)
@classmethod
def quantize(cls, attrs: BatchMatmulAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> Union[BatchMatmulAttrs, BatchMatmulQuantAttrs]:
assert len(cls.input_list) in (1, 2)
compute_precision = config.quantization_precision.get().to_scalar_type()
if scalar_is_integral(compute_precision):
input_type = ScalarType.int8
output_type = config.quantization_precision.get().to_expected_int_scalar_type()
else:
input_type = output_type = compute_precision
lhs_quant = fix_input(quantizer_interface, input_type, cls.input_list[0], config.asymmetry.get())
rhs_quant = fix_input(quantizer_interface, input_type, cls.input_list[1], config.asymmetry.get()) \
if len(cls.input_list) > 1 else lhs_quant
output_shape = attrs.get_output_shape()
quantization = quantize_output(quantizer_interface, output_type, output_shape,
config.asymmetry.get())
if scalar_is_integral(compute_precision):
intrinsic_shift, requant, new_output_quant = quant_utils.quantize_batch_matmul(
lhs_quant.quant, rhs_quant.quant, quantization.quant)
# Determine attributes and quantization using the result of quantize_batch_matmul
output_type = TensorType(ScalarType.from_numpy(requant.out_dtype), output_shape)
quantization = QuantResultTensorType(output_type, new_output_quant, RequantMethod.fractional_zero)
quantizer_interface.set_chosen_output_quant(TensorValue(quantization))
return BatchMatmulQuantAttrs(attrs, lhs_quant.quant.zero_point, rhs_quant.quant.zero_point, requant,
intrinsic_shift)
else:
quantizer_interface.set_chosen_output_quant(TensorValue(quantization))
return dataclasses.replace(attrs, scalar_type=input_type)
@classmethod
def run_quant(cls, quant_attrs: BatchMatmulQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
lhs, rhs = cls._get_inputs(input_dict)
return op_fn.batch_matmul(lhs, rhs, quant_attrs)
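# --- Illustrative sketch (not part of the original module) ----------------
# A plain-numpy reference for the integer batch matmul that run_quant above
# delegates to op_fn.batch_matmul. The placement of intrinsic_shift and the
# flooring right shift are assumptions based on BatchMatmulQuantAttrs; final
# scaling and narrowing are left to the separate requantization step.
def _example_quantized_batch_matmul(lhs: np.ndarray, rhs: np.ndarray,
                                    lhs_zp: int, rhs_zp: int,
                                    intrinsic_shift: int) -> np.ndarray:
    # Accumulate in int32 with the zero points removed.
    acc = (lhs.astype(np.int32) - lhs_zp) @ (rhs.astype(np.int32) - rhs_zp)
    # Arithmetic right shift keeps the accumulator within range.
    return acc >> intrinsic_shift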
[docs]
class BatchMatmulOp(_BatchMatmulBaseOp):
"""
Standard batch matmul operator, where the arguments to the batch matmul operation are the outputs of two different nodes.
"""
@classmethod
def _get_inputs(cls, input_dict: Dict[InputName, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
assert all([name in input_dict for name in cls.input_list]) and len(input_dict) == 2
lhs = input_dict[cls.input_list[0]]
rhs = input_dict[cls.input_list[1]]
return lhs, rhs
[docs]
class UnaryBatchMatmulOp(_BatchMatmulBaseOp):
"""
Special case of the batch matmul operator, where both arguments to the batch matmul operation are outputs of the same node.
"""
@classmethod
def _get_inputs(cls, input_dict: Dict[InputName, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
assert all([name in input_dict for name in cls.input_list]) and len(input_dict) == 1
input_data = input_dict[cls.input_list[0]]
return input_data, input_data
[docs]
class LayerNormOp(AwesomeOperation[LayerNormAttrs, LayerNormQuantAttrs]):
[docs]
layer_norm_fn: Callable[[LayerNormAttrs, np.ndarray], np.ndarray] = op_fn.layer_norm
@classmethod
[docs]
def get_type(cls, attrs: LayerNormAttrs | LayerNormQuantAttrs) -> NodeType:
scalar_type = attrs.scalar_type if isinstance(attrs, LayerNormAttrs) else ScalarType.int8
tensor_type = TensorType(scalar_type, attrs.input_shape)
return _unary_op_type(cls.input_list, tensor_type)
@classmethod
@type_check_operation_arguments(types=[LayerNormAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: LayerNormAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.layer_norm_fn(attrs, input_dict[InputName('data')])
@classmethod
@type_check_operation_arguments(types=[LayerNormAttrs, OpQuantInterface, QuantizationConfigs],
dict_mask=[False, False, False])
[docs]
def quantize(cls, attrs: LayerNormAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> LayerNormAttrs | LayerNormQuantAttrs:
compute_precision = config.quantization_precision.get().to_scalar_type()
if scalar_is_integral(compute_precision):
scalar_type = ScalarType.int8
else:
scalar_type = compute_precision
input_quant = fix_input(quantizer_interface, scalar_type, InputName('data'), config.asymmetry.get())
output_quant = fix_output(quantizer_interface, scalar_type, attrs.input_shape, config.asymmetry.get())
if scalar_is_integral(compute_precision):
intermediate_min_max = get_intermediate_min_max(quantizer_interface)
return quant_utils.quantize_layer_norm(attrs, input_quant.quant, output_quant.quant,
intermediate_min_max)
else:
return dataclasses.replace(attrs, scalar_type=scalar_type)
@classmethod
@type_check_operation_arguments(types=[LayerNormQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: LayerNormQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
return cls.layer_norm_fn(quant_attrs, input_dict[InputName('data')])
@classmethod
[docs]
def calibrate(cls, attrs: LayerNormAttrs, calib_attrs: AwesomeCalibAttrs,
input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
"""
Layer Norm calibration method.
Executes default calibration to get results of LN operation in floating point.
Additionally, calculate intermediate results and update the observers for intermediate
values.
:param attrs: AwesomeAttributes associated with this operation
:param calib_attrs: AwesomeCalibAttrs associated with operation's node.
:param input_dict: Dictionary of names (e.g. 'weights', 'data') to numpy arrays
:param config: Parameters controlling how to calibrate.
:return: Output tensor(s) whose type is dependent on the subclass.
"""
# Run default calibration.
outputs = super().calibrate(attrs, calib_attrs, input_dict, config)
# Calculate intermediate values.
# m = ReduceMean(input, axis, keepdims=True)
# sq_mean = (input - m) ** 2
data = input_dict[InputName('data')]
sq_mean_fun = lambda x: (x - np.mean(x, axis=attrs.axis, keepdims=True)) ** 2
sq_mean_output = sq_mean_fun(data)
# var = ReduceMean((input - m) ** 2, axis, keepdims=True).
var_fun = lambda x: np.mean(x, axis=attrs.axis, keepdims=True)
var_output = var_fun(sq_mean_output)
# Update observers for intermediate values.
assert calib_attrs.intermediate_observers
assert ('var' in calib_attrs.intermediate_observers and
calib_attrs.intermediate_observers['var'] is not None)
calib_attrs.intermediate_observers['var'].update(var_output.astype(np.float32))
return outputs
[docs]
class InstanceNormOp(AwesomeOperation[InstanceNormAttrs, InstanceNormQuantAttrs]):
[docs]
instance_norm_fn: Callable[[np.ndarray, np.ndarray, np.ndarray, InstanceNormAttrs | InstanceNormQuantAttrs],
np.ndarray] = op_fn.instance_norm
@classmethod
[docs]
def get_type(cls, attrs: InstanceNormAttrs | InstanceNormQuantAttrs) -> NodeType:
if isinstance(attrs, InstanceNormAttrs):
scalar_type = attrs.scalar_type
else:
assert isinstance(attrs, InstanceNormQuantAttrs)
attrs = attrs.attrs
scalar_type = ScalarType.int8
input_data_shape = attrs.input_data_shape
mean_shape = attrs.mean_shape
variance_shape = attrs.variance_shape
return NodeType({cls.input_list[0]: TensorValue(TensorType(scalar_type, input_data_shape)),
cls.input_list[1]: TensorValue(TensorType(scalar_type, mean_shape)),
cls.input_list[2]: TensorValue(TensorType(scalar_type, variance_shape))},
TensorValue(TensorType(scalar_type, input_data_shape)))
@classmethod
@type_check_operation_arguments(types=[InstanceNormAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: InstanceNormAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
mean = input_dict[InputName('mean')]
variance = input_dict[InputName('variance')]
return cls.instance_norm_fn(data, mean, variance, attrs)
@classmethod
@type_check_operation_arguments(types=[InstanceNormAttrs, OpQuantInterface, QuantizationConfigs],
dict_mask=[False, False, False])
[docs]
def quantize(cls, attrs: InstanceNormAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) \
-> InstanceNormAttrs | InstanceNormQuantAttrs:
compute_precision = config.quantization_precision.get().to_scalar_type()
scalar_type = ScalarType.int8 if scalar_is_integral(compute_precision) else compute_precision
input_data_quant = fix_input(quantizer_interface, scalar_type, InputName('data'), config.asymmetry.get())
mean_quant = fix_input(quantizer_interface, scalar_type, InputName('mean'), config.asymmetry.get())
variance_quant = fix_input(quantizer_interface, scalar_type, InputName('variance'), config.asymmetry.get())
output_quant = fix_output(quantizer_interface, scalar_type, attrs.input_data_shape,
config.asymmetry.get())
if scalar_is_integral(compute_precision):
return quant_utils.quantize_instance_norm(attrs, input_data_quant.quant, mean_quant.quant,
variance_quant.quant, output_quant.quant)
else:
return dataclasses.replace(attrs, scalar_type=scalar_type)
@classmethod
@type_check_operation_arguments(types=[InstanceNormQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: InstanceNormQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = input_dict[InputName('data')]
mean = input_dict[InputName('mean')]
variance = input_dict[InputName('variance')]
return cls.instance_norm_fn(data, mean, variance, quant_attrs)
[docs]
class RMSNormOp(AwesomeOperation[RMSNormAttrs, RMSNormQuantAttrs]):
[docs]
rms_norm_fn: Callable[[np.ndarray, RMSNormAttrs | RMSNormQuantAttrs], np.ndarray] = op_fn.rms_norm
@classmethod
[docs]
def get_type(cls, attrs: Union[RMSNormAttrs, RMSNormQuantAttrs]) -> NodeType:
scalar_type = attrs.scalar_type if isinstance(attrs, RMSNormAttrs) else ScalarType.int8
tensor_type = TensorType(scalar_type, attrs.input_shape)
return _unary_op_type(cls.input_list, tensor_type)
@classmethod
@type_check_operation_arguments(types=[RMSNormAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: RMSNormAttrs, input_dict: Dict[InputName, np.ndarray], config: RunConfigs) -> np.ndarray:
return cls.rms_norm_fn(input_dict[InputName('data')], attrs)
@classmethod
@type_check_operation_arguments(types=[RMSNormAttrs, OpQuantInterface, QuantizationConfigs],
dict_mask=[False, False, False])
[docs]
def quantize(cls, attrs: RMSNormAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[RMSNormAttrs, RMSNormQuantAttrs]:
compute_precision = config.quantization_precision.get().to_scalar_type()
if scalar_is_integral(compute_precision):
input_quant = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get())
output_quant = fix_output_to_int8(quantizer_interface, attrs.input_shape, config.asymmetry.get())
# Always set to True, as it yields better performance
enable_lut_int16 = True
intermediate_min_max = get_intermediate_min_max(quantizer_interface)
return quant_utils.quantize_rms_norm(attrs, input_quant.quant, output_quant.quant, intermediate_min_max,
enable_lut_int16)
else:
fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
output_shape = get_expected_tensor_value(cls.get_type(attrs).output).shape
fix_output_from_input(quantizer_interface, output_shape)
return dataclasses.replace(attrs, scalar_type=compute_precision)
@classmethod
@type_check_operation_arguments(types=[RMSNormQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: RMSNormQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
data = _cast_to_quant_tensor_new(input_dict[InputName('data')])
return cls.rms_norm_fn(data, quant_attrs)
@classmethod
[docs]
def calibrate(cls, attrs: RMSNormAttrs, calib_attrs: AwesomeCalibAttrs,
input_dict: Dict[InputName, Any], config: RunConfigs) -> Any:
"""
RMS Norm calibration method.
Executes default calibration to get results of RMSNorm operation in floating point.
Additionally, calculate intermediate results and update the observers for intermediate
values.
"""
# Run default calibration.
outputs = super().calibrate(attrs, calib_attrs, input_dict, config)
# Calculate intermediate values, ReduceMean(input * input, axis=-1, keepdims=True)
data = input_dict[InputName('data')]
reduce_mean_f = lambda x: np.mean(x * x, axis=-1, keepdims=True)
reduce_mean_output = reduce_mean_f(data)
# Update observers for intermediate values.
assert calib_attrs.intermediate_observers
assert ('reduce_mean' in calib_attrs.intermediate_observers and
calib_attrs.intermediate_observers['reduce_mean'] is not None)
calib_attrs.intermediate_observers['reduce_mean'].update(reduce_mean_output.astype(np.float32))
return outputs
[docs]
class SliceConcatOp(AwesomeOperation[SliceConcatAttrs, SliceConcatQuantAttrs]):
"""
This composite node reuses infrastructure from StridedSliceOp and ConcatenateOp.
"""
@classmethod
[docs]
def get_type(cls, attrs: Union[SliceConcatAttrs, SliceConcatQuantAttrs]) -> NodeType:
return NodeType(StridedSliceOp.get_type(attrs.slice_attrs[0]).inputs,
TupleConcatenateOp.get_type(attrs.tuple_concat_attrs).output)
@classmethod
@type_check_operation_arguments(types=[SliceConcatAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run(cls, attrs: SliceConcatAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
input_data = input_dict[cls.input_list[0]]
slice_outputs = [op_fn.strided_slice(slice_attrs, input_data) for slice_attrs in attrs.slice_attrs]
return op_fn.concatenate(attrs.tuple_concat_attrs.concat_attrs, slice_outputs)
@classmethod
@type_check_operation_arguments(
types=[SliceConcatAttrs, OpQuantInterface, QuantizationConfigs, NodeReporter],
dict_mask=[False, False, False, False])
[docs]
def quantize(cls, attrs: SliceConcatAttrs, quantizer_interface: OpQuantInterface,
config: QuantizationConfigs, error_reporter: NodeReporter) -> Union[SliceConcatAttrs, SliceConcatQuantAttrs]:
compute_precision = config.quantization_precision.get().to_scalar_type()
output_shape_list = list(get_strided_slice_out_shape(attrs.slice_attrs[0]))
output_shape_list[attrs.tuple_concat_attrs.concat_attrs.axis] *= len(attrs.slice_attrs)
if scalar_is_integral(compute_precision):
# Use the input type and quantization
q = fix_input_to_int8(quantizer_interface, InputName('data'), config.asymmetry.get())
output_quant = fix_output_from_input(quantizer_interface, tuple(output_shape_list))
quant_slice_attrs = [dataclasses.replace(slice_attrs, input_type=q.type.scalar)
for slice_attrs in attrs.slice_attrs]
concat_quant_attrs = ConcatQuantAttrs(
attrs=attrs.tuple_concat_attrs.concat_attrs,
requants=[
requantization.FractionalZeroRequantization(
1, 0, utils.create_and_verify_narrowing(0, RoundType.TOEVEN, np.int8)
) for _ in range(len(attrs.slice_attrs))
],
layer_bits=[8],
input_scales=[q.quant.scale for _ in range(len(attrs.slice_attrs))],
node_scales=[output_quant.quant.scale],
node_zps=[output_quant.quant.zero_point]
)
return SliceConcatQuantAttrs(quant_slice_attrs, concat_quant_attrs)
else: # bfloat16
fix_input(quantizer_interface, compute_precision, InputName('data'), config.asymmetry.get())
fix_output_from_input(quantizer_interface, tuple(output_shape_list))
slice_attrs = attrs.slice_attrs
tuple_attrs = attrs.tuple_concat_attrs.tuple_attrs
concat_attrs = dataclasses.replace(attrs.tuple_concat_attrs.concat_attrs, scalar_type=compute_precision)
return SliceConcatAttrs(slice_attrs, TupleConcatenateAttrs(tuple_attrs, concat_attrs))
@classmethod
@type_check_operation_arguments(types=[SliceConcatQuantAttrs, np.ndarray], dict_mask=[False, True])
[docs]
def run_quant(cls, quant_attrs: SliceConcatQuantAttrs, input_dict: Dict[InputName, np.ndarray],
config: RunConfigs) -> np.ndarray:
input_data = input_dict[cls.input_list[0]]
slice_output_dict = {idx: op_fn.strided_slice(slice_attrs, input_data)
for idx, slice_attrs in enumerate(quant_attrs.slice_attrs)}
return ConcatenateOp.run_quant(quant_attrs.tuple_concat_attrs, slice_output_dict, config)
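# Illustrative note (an assumption-labeled gloss, not from the original code):
# quantize() above gives every slice an identity requantization (multiplier 1,
# zero-point correction 0, shift 0) because fix_output_from_input makes the
# output quantization equal to the input quantization; slicing and
# concatenation only rearrange already-quantized values, e.g.
#     np.concatenate([x[:, :2], x[:, 2:]], axis=1) == x   (bit-exact).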