Source code for afe.ir.quantization_interface

#########################################################
# Copyright (C) 2023 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Christopher Rodrigues
#########################################################
"""
Classes and functions that are used in the interface
between operator code and the quantization algorithm.

Data structures in this file hold temporary data that is
used only during quantization.

Calibration and quantization information is available for
some tensors in a network, as explained below.

Calibration is passed using ObservedDistribution objects.
Calibration is determined for the output of all nodes
satisfying node_uses_observer, regardless of whether
they are quantized.

Quantization information is passed using the quant field
of QuantResultTensorType.  Quantization information is
included only for quantized integer tensors that were
floating-point before quantization; otherwise, it is None.
As a corollary, if a tensor's type was not changed by
quantization, then its quantization information will be None.
"""
import dataclasses
import math
from dataclasses import dataclass
from typing import Mapping, Optional, Dict, Tuple, List

import numpy as np

from afe.ir.attributes import ObservedDistribution, QuantResultTensorType
from afe.ir.defines import (
    InputName, DataValue, TensorValue, get_expected_tensor_value, Quantization,
    RequantMethod, NodeAssociatedValue
)
from afe.ir.tensor_type import ScalarType, TensorType, scalar_is_integral, scalar_is_floating, scalar_byte_size
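

# Illustrative sketch of the convention described in the module docstring: a tensor
# quantized from float32 to int8 carries a Quantization, while a tensor whose type
# was not changed carries quant=None.  All numbers below (shape, scale, zero point,
# bits, value range) are made-up examples, not real network data.
def _example_quant_result_convention() -> Tuple[QuantResultTensorType, QuantResultTensorType]:
    quantized = QuantResultTensorType(
        TensorType(ScalarType.int8, (1, 16)),
        Quantization(scale=127.0 / 6.0, zero_point=0, bits=8, min_val=-6.0, max_val=6.0),
        RequantMethod.fractional_zero)
    unchanged = QuantResultTensorType.from_type(TensorType(ScalarType.float32, (1, 16)))
    return quantized, unchanged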


@dataclass
class _QuantizeOpData:
    """
    Data describing the quantization and type of a node's input and output before
    and after quantization.  It is used while deciding how to quantize a node.

    The chosen quantizations are initialized to None for each input and output
    before the node is quantized.  They are assigned the node's quantization when
    the node is quantized.

    Attributes:
        input_distributions: Value distributions of the node's inputs, as
           observed during calibration.  None for inputs where no observer was used.
        intermediate_distributions: Value distributions of the node's intermediate
           results, as observed during calibration.  None where no observer was used.
        output_distribution: Value distribution of the node's output, as observed
           during calibration.  None if no observer was used for the output.
        input_quant: Quantization of the node's inputs.  This is the quantization
           of the output of the nodes that compute this node's input.
        placeholder_quant: Quantization of the node's value, if the node is a
           placeholder.  None otherwise.
        calibration_inputs: Inputs that were used during calibration for a node.
            Present when the AdaRound setting is enabled; None otherwise.
        chosen_input_quant: The quantization chosen for the node's inputs.  This is
           the quantization that the node assumes it receives for its inputs.  If it
           does not match the quantization in input_quant, the quantization algorithm
           should insert a node to convert the data.
        chosen_output_quant: The quantization chosen for the node's output.
    """
    # Read-only fields
    input_distributions: Mapping[InputName, Optional[ObservedDistribution]]
    intermediate_distributions: Mapping[InputName, Optional[ObservedDistribution]]
    output_distribution: Optional[ObservedDistribution]
    input_quant: Mapping[InputName, DataValue[QuantResultTensorType]]
    placeholder_quant: Optional[DataValue[QuantResultTensorType]]
    calibration_inputs: Optional[Dict[InputName, List[np.ndarray]]]

    # Write-only for OpQuantInterface
    # Read-only for OpQuantResult
    chosen_input_quant: Dict[InputName, Optional[DataValue[QuantResultTensorType]]]
    chosen_output_quant: Optional[DataValue[QuantResultTensorType]]


class OpQuantInterface:
    """
    Quantization-related properties of a node's interface before and after
    quantization, for use when quantizing the node.

    An operator's quantize method may call the "get" methods to read
    calibration information about its inputs and output, and quantization
    information about its inputs.

    An operator must call set_chosen_input_quant and set_chosen_output_quant
    to set the quantization that the quantized operator uses at each input
    and output.  If the operator uses an input quantization that is different
    from the input's quantization as returned by get_input_quant, the
    quantization algorithm will cast the input.
    """
    _data: _QuantizeOpData

    def __init__(self, data: _QuantizeOpData):
        self._data = data
    def get_input_quant(self) -> Mapping[InputName, DataValue[QuantResultTensorType]]:
        return self._data.input_quant

    def get_placeholder_quant(self) -> Optional[DataValue[QuantResultTensorType]]:
        return self._data.placeholder_quant

    def get_input_distributions(self) -> Mapping[InputName, Optional[ObservedDistribution]]:
        return self._data.input_distributions

    def get_intermediate_distributions(self) -> Mapping[InputName, Optional[ObservedDistribution]]:
        return self._data.intermediate_distributions

    def get_output_distribution(self) -> Optional[ObservedDistribution]:
        return self._data.output_distribution
    def set_chosen_input_quant(self, name: InputName, quant: DataValue[QuantResultTensorType]):
        assert name in self._data.chosen_input_quant
        assert self._data.chosen_input_quant[name] is None, "Input quantization is already assigned"
        assert isinstance(quant, DataValue)
        self._data.chosen_input_quant[name] = quant

    def get_chosen_input_quant(self, name: InputName) -> DataValue[QuantResultTensorType]:
        assert name in self._data.chosen_input_quant
        quant = self._data.chosen_input_quant[name]
        assert quant is not None, "Input quantization is not assigned"
        return quant

    def set_chosen_output_quant(self, quant: DataValue[QuantResultTensorType]):
        assert self._data.chosen_output_quant is None, "Output quantization is already assigned"
        assert isinstance(quant, DataValue)
        self._data.chosen_output_quant = quant

    def get_calibration_data(self) -> Optional[Dict[InputName, List[np.ndarray]]]:
        return self._data.calibration_inputs
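

# A minimal sketch of the protocol described in OpQuantInterface's docstring, assuming
# a hypothetical unary operator with a single input named "data".  A real quantize
# method lives on an operator class; this standalone function only illustrates the
# required sequence of get/set calls, using helpers defined later in this module.
def _example_unary_quantize(i: OpQuantInterface, asymmetry: bool) -> None:
    input_name = InputName("data")  # hypothetical input name
    # Fix the input to int8, quantizing or requantizing it as needed.
    input_qrtt = fix_input_to_int8(i, input_name, asymmetry)
    # Reuse the input's type and quantization for the output, which has the
    # same shape for this hypothetical operator.
    fix_output_from_input(i, input_qrtt.type.shape, input_name)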
class OpQuantResult:
    """
    Quantization-related properties of a node's interface after quantization,
    for use in the quantization algorithm.

    After a node is quantized, the quantization algorithm may call get_result
    to get the node's quantization.
    """
    _data: _QuantizeOpData

    def __init__(self, data: _QuantizeOpData):
        self._data = data
    def get_result(self) -> NodeAssociatedValue[QuantResultTensorType]:
        input_quant: Dict[InputName, DataValue[QuantResultTensorType]] = {}
        for k, v in self._data.chosen_input_quant.items():
            assert v is not None
            input_quant[k] = v
        output_quant = self._data.chosen_output_quant
        assert output_quant is not None
        return NodeAssociatedValue(input_quant, output_quant)
def make_quantize_op_interface(
        input_data: Mapping[InputName, Tuple[DataValue[QuantResultTensorType],
                                             Optional[ObservedDistribution],
                                             Optional[List[np.ndarray]]]],
        placeholder_quant: Optional[DataValue[QuantResultTensorType]],
        output_distribution: Optional[ObservedDistribution],
        intermediate_distributions: Optional[Dict[str, ObservedDistribution]]) \
        -> Tuple[OpQuantInterface, OpQuantResult]:
    """
    Create data structures for the interface between the quantization algorithm
    and an operator's quantize function.

    :param input_data: The quantization, value distribution, and calibration
        inputs of the node's inputs.
    :param placeholder_quant: The quantization of the node's value, if the node
        is a placeholder.
    :param output_distribution: The value distribution of the node's output.
    :param intermediate_distributions: Value distributions of the node's
        intermediate results, as observed during calibration.
    :return: The interface for the operator's quantize function and the interface
        for the quantization algorithm.
    """
    input_distributions = {}
    input_quant = {}
    calibration_inputs = {}
    chosen_input_quant = {}
    for name, (t, d, c) in input_data.items():
        input_distributions[name] = d
        input_quant[name] = t
        calibration_inputs[name] = c
        chosen_input_quant[name] = None
    data = _QuantizeOpData(input_distributions, intermediate_distributions,
                           output_distribution, input_quant, placeholder_quant,
                           calibration_inputs, chosen_input_quant, None)
    return OpQuantInterface(data), OpQuantResult(data)
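

# A minimal sketch of how the quantization algorithm might drive this factory,
# assuming a single-input node whose input quantization and calibration data are
# already known; the input name "data" is illustrative.  The OpQuantInterface is
# handed to the operator's quantize method, and the OpQuantResult is read back
# afterward.
def _example_quantize_one_node(
        input_qrtt: DataValue[QuantResultTensorType],
        input_distribution: Optional[ObservedDistribution],
        output_distribution: Optional[ObservedDistribution],
        asymmetry: bool) -> NodeAssociatedValue[QuantResultTensorType]:
    input_data = {InputName("data"): (input_qrtt, input_distribution, None)}
    interface, result = make_quantize_op_interface(input_data, None, output_distribution, None)
    # The operator's quantize method runs here; reuse the sketch above.
    _example_unary_quantize(interface, asymmetry)
    # After the operator has chosen quantizations, the algorithm reads them back.
    return result.get_result()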
def _quantize(d: ObservedDistribution, quantized_type: ScalarType, shape: Tuple[int, ...],
              asymmetry: bool, requant_method: RequantMethod = RequantMethod.fractional_zero) \
        -> QuantResultTensorType:
    """
    Choose a quantized type for a tensor based on the given calibration results.

    :param d: Value distribution from calibration
    :param quantized_type: The data type to quantize for
    :param shape: Shape of the tensor to be quantized
    :param asymmetry: Whether to use asymmetric quantization
    :param requant_method: Requantization method
    :return: Type and quantization of the quantized tensor
    """
    # Use the range of int values for the selected scalar type, adjusted for the symmetry option.
    if quantized_type == ScalarType.int8:
        qrange = (-127, 127) if not asymmetry else (-128, 127)
    elif quantized_type == ScalarType.int16:
        qrange = (-32767, 32767) if not asymmetry else (-32768, 32767)
    else:
        raise ValueError("Quantization only supports int8 and int16")
    q = get_expected_tensor_value(d.calculate_quantization(qrange))
    return QuantResultTensorType(TensorType(quantized_type, shape), q, requant_method)


def _float_scale_for_requantization(input_quant: Quantization, integer_range: Tuple[int, int]) \
        -> float:
    """
    Calculate a floating-point scale factor 0 < S <= 1 that scales input_quant
    so that its quantized values fit within integer_range.  A value X would be
    requantized by the calculation round(S * X).

    :param input_quant: Original quantization, which may or may not be
        representable within integer_range
    :param integer_range: Inclusive range of integer values that the quantized
        values will be represented in
    :return: Floating-point scale factor for requantization
    """
    int_lo_bound, int_hi_bound = integer_range
    assert int_lo_bound < 0 < int_hi_bound, "Only signed integer types are supported"

    # Find the integer value range of input_quant
    quantized_min: float = input_quant.min_val * input_quant.scale + input_quant.zero_point
    quantized_max: float = input_quant.max_val * input_quant.scale + input_quant.zero_point

    # Determine the scale correction that makes the range fit the output type.
    # Check both endpoints of the range and use the smallest scale correction.
    # If the correction is greater than 1, then the input range already fits the
    # output type, so use scale=1, meaning no change to the input.
    sc_min = int_lo_bound / quantized_min if quantized_min < -1 else 1
    sc_max = int_hi_bound / quantized_max if quantized_max > 1 else 1
    scale = min(sc_min, sc_max, 1)
    assert 0 < scale <= 1, "Error in scale calculation"
    return scale
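

# Worked example for _float_scale_for_requantization (all numbers made up):
# suppose input_quant has scale=256.0, zero_point=0, min_val=-100.0, max_val=100.0,
# and integer_range is (-128, 128), as requantize_scaled passes for int8.  Then
# quantized_min = -100 * 256 + 0 = -25600 and quantized_max = 25600, so
# sc_min = -128 / -25600 = 0.005 and sc_max = 128 / 25600 = 0.005, giving
# scale = min(0.005, 0.005, 1) = 0.005: values must shrink by 200x to fit int8.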
def requantize_scaled(input_quant: Quantization, output_scalar_type: ScalarType,
                      shape: Tuple[int, ...], *, restrict_to_pow2: bool = False) \
        -> QuantResultTensorType:
    """
    Scale input_quant so that its quantized values can be represented with
    output_scalar_type.  Construct a QuantResultTensorType with the new
    quantization and type.

    :param input_quant: Original quantization, which may or may not be
        representable in output_scalar_type
    :param output_scalar_type: Scalar type that quantized values will be
        represented in
    :param shape: Shape of the tensor that is requantized
    :param restrict_to_pow2: If true, the scale factor will be restricted to a
        power of 2.  This allows requantization to be implemented by a right-shift.
    :return: Type and quantization of the requantized tensor
    """
    # For calculating the scale factor, add 1 to the integer range's max value so that it is
    # an exact power of 2.  This avoids pessimistic rounding when the input and the output
    # use a full power-of-2 range.  The ratio of max values (2^M - 1)/(2^N - 1) is not an
    # exact power of 2 and will be rounded in the wrong direction when restrict_to_pow2 is True.
    # Changing the denominator to (2^N) gives us a ratio that rounds properly.
    iinfo = np.iinfo(output_scalar_type.numpy_type())
    scale = _float_scale_for_requantization(input_quant, (iinfo.min, iinfo.max + 1))

    # If scale == 1, then scale_correction will be 1 and
    # the output quantization will have the same scale and zero point as the input.
    if restrict_to_pow2:
        # Round the scale correction to a power of 2.  To avoid saturation, prefer to round up.
        shift = -math.log2(scale)
        if shift % 1 < 0.01:
            shift = math.floor(shift)
        else:
            shift = math.ceil(shift)
        scale_correction = 2**(-shift)
    else:
        scale_correction = scale

    # Construct new quantization with this shift
    new_quant = Quantization(scale=input_quant.scale * scale_correction,
                             zero_point=round(input_quant.zero_point * scale_correction),
                             bits=iinfo.bits,
                             min_val=input_quant.min_val,
                             max_val=input_quant.max_val)
    return QuantResultTensorType(TensorType(output_scalar_type, shape), new_quant,
                                 RequantMethod.fractional_zero)
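

# A minimal sketch of requantize_scaled with restrict_to_pow2, continuing the
# worked example above (all numbers made up).  With scale = 0.005, the shift is
# -log2(0.005) ~= 7.64; its fractional part exceeds 0.01, so it rounds up to 8,
# giving scale_correction = 2**-8 and a new scale of 256 * 2**-8 = 1.0.
def _example_pow2_requant() -> QuantResultTensorType:
    q32 = Quantization(scale=256.0, zero_point=0, bits=32, min_val=-100.0, max_val=100.0)
    return requantize_scaled(q32, ScalarType.int8, (1, 8), restrict_to_pow2=True)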
def _quantize_to_int8(d: ObservedDistribution, shape: Tuple[int, ...], asymmetry: bool) \
        -> QuantResultTensorType:
    return _quantize(d, ScalarType.int8, shape, asymmetry)


def _quantize_to_int16(d: ObservedDistribution, shape: Tuple[int, ...], asymmetry: bool) \
        -> QuantResultTensorType:
    return _quantize(d, ScalarType.int16, shape, asymmetry)
def quantize_output(i: OpQuantInterface, quantized_type: ScalarType, output_shape: Tuple[int, ...],
                    asymmetry: bool, requant_method: RequantMethod = RequantMethod.fractional_zero) \
        -> QuantResultTensorType:
    """
    Calculate a quantization that could be used for the output, using calibration
    results.  The output must have float32 type when quantization begins.  If
    quantized_type is bfloat16, then the return value will not have quantization
    information.

    This function does not change the state of OpQuantInterface; it only returns
    the quantization.

    :param i: Object describing the interface of the node to transform.
        The node must output a tensor, not a tuple.
    :param quantized_type: The data type to quantize for.
    :param output_shape: Shape of the node's output.
    :param asymmetry: Whether to use asymmetric quantization.
    :param requant_method: Requantization method.
    :return: A type suitable to use in the result of quantization.
    """
    if quantized_type == ScalarType.bfloat16:
        return QuantResultTensorType.from_type(TensorType(ScalarType.bfloat16, output_shape))
    d = i.get_output_distribution()
    if d is None:
        raise ValueError("No observer data for this tensor")
    return _quantize(d, quantized_type, output_shape, asymmetry, requant_method)
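

# A minimal sketch contrasting quantize_output with fix_output (defined below):
# quantize_output only computes a candidate quantization, so an operator can
# inspect it before committing with fix_output.  The (1, 32) shape and the
# int16 fallback policy are made-up illustrations, not part of the API.
def _example_inspect_then_fix(i: OpQuantInterface, asymmetry: bool) -> QuantResultTensorType:
    candidate = quantize_output(i, ScalarType.int8, (1, 32), asymmetry)
    # Hypothetical policy: fall back to int16 when int8's scale would be too coarse.
    chosen_type = ScalarType.int8 if candidate.quant.scale >= 1.0 else ScalarType.int16
    return fix_output(i, chosen_type, (1, 32), asymmetry)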
def fix_output(i: OpQuantInterface, quantized_type: ScalarType, output_shape: Tuple[int, ...],
               asymmetry: bool) -> QuantResultTensorType:
    """
    Fix the output to the selected quantized type and set the output's quantization.
    Use calibration results to decide how to quantize it.  The output must have
    float32 type when quantization begins.

    :param i: Object describing the interface of the node to transform.
        The node must output a tensor, not a tuple.
    :param quantized_type: The data type to quantize for.
    :param output_shape: Shape of the node's output.
    :param asymmetry: Whether to use asymmetric quantization.
    :return: The quantized type of the output.
    """
    quant = quantize_output(i, quantized_type, output_shape, asymmetry)
    i.set_chosen_output_quant(TensorValue(quant))
    return quant
def fix_output_to_int8(i: OpQuantInterface, output_shape: Tuple[int, ...], asymmetry: bool) \
        -> QuantResultTensorType:
    """
    Fix the output to int8.  See fix_output for documentation.
    """
    return fix_output(i, ScalarType.int8, output_shape, asymmetry)


def fix_output_to_int16(i: OpQuantInterface, output_shape: Tuple[int, ...], asymmetry: bool) \
        -> QuantResultTensorType:
    """
    Fix the output to int16.  See fix_output for documentation.
    """
    return fix_output(i, ScalarType.int16, output_shape, asymmetry)
def fix_input(i: OpQuantInterface, quantized_type: ScalarType, name: InputName, asymmetry: bool) \
        -> QuantResultTensorType:
    """
    Fix the input having the given name to the given type and set the input's
    quantization.

    If the input already has the desired type, use the given type and quantization.
    If quantized_type is bfloat16, then the input will not be quantized; the given
    type is used without quantization or requantization.  If the input type is
    float32, quantize it.  If the input type is int32, requantize it to the
    narrower integer size.

    :param i: Object describing the interface of the node to transform.
    :param quantized_type: The quantized type to use.  It must be int8, int16,
        or bfloat16.
    :param name: The name of the input to select.  The input must have a tensor type.
    :param asymmetry: Whether to use asymmetric quantization.
    :return: The quantized type of the input.
    """
    assert quantized_type in (ScalarType.int8, ScalarType.int16, ScalarType.bfloat16)
    quant: QuantResultTensorType = get_expected_tensor_value(i.get_input_quant()[name])
    input_type = quant.type
    if input_type.scalar == quantized_type:
        # Input already has the wanted type
        new_quant = quant
    elif quantized_type == ScalarType.bfloat16:
        # Do not quantize when the type is bfloat16.
        # If the input has an integer type, its quantization must be known so that
        # it can be dequantized.
        if scalar_is_integral(input_type.scalar):
            assert quant.quant is not None, \
                "Cannot convert integers to bfloat16 due to missing quantization information"
        new_quant = QuantResultTensorType(TensorType(quantized_type, input_type.shape), None, None)
    elif scalar_is_floating(input_type.scalar):
        # Quantize the floating-point input
        value_distribution = i.get_input_distributions()[name]
        assert value_distribution is not None, "Cannot quantize due to missing calibration information"
        new_quant = _quantize(value_distribution, quantized_type, input_type.shape, asymmetry)
    elif input_type.scalar in (ScalarType.int32, ScalarType.int16, ScalarType.int8):
        # Convert between signed integer types.  Since the type has changed,
        # it must be due to quantization.
        if scalar_byte_size(input_type.scalar) < scalar_byte_size(quantized_type):
            # The input type is smaller than the output.  Increase the scale by
            # 8 bits so that it uses more of the numeric range.
            new_bits = 8 * scalar_byte_size(quantized_type)
            quantiz = Quantization(scale=quant.quant.scale * (2 ** 8),
                                   zero_point=quant.quant.zero_point * (2 ** 8),
                                   bits=new_bits,
                                   min_val=quant.quant.min_val,
                                   max_val=quant.quant.max_val)
            new_quant = QuantResultTensorType(TensorType(quantized_type, input_type.shape),
                                              quant=quantiz,
                                              requant_method=quant.requant_method)
        else:
            # Requantize to a narrower range.
            value_distribution = i.get_input_distributions()[name]
            assert value_distribution is not None, "Cannot quantize due to missing calibration information"
            assert quant.quant is not None
            if quant.requant_method == RequantMethod.arith_folded:
                # Requantize by power-of-2 scale only
                new_quant = requantize_scaled(quant.quant, quantized_type, input_type.shape,
                                              restrict_to_pow2=True)
            elif quant.requant_method == RequantMethod.scaled_fz:
                # Requantize by a scale factor
                new_quant = requantize_scaled(quant.quant, quantized_type, input_type.shape)
            elif quant.requant_method == RequantMethod.fractional_zero:
                # Choose new quantization based on the calibration result
                new_quant = _quantize(value_distribution, quantized_type, input_type.shape, asymmetry)

                # Requantization should always convert to a smaller scale.
                # If new_quant has a larger scale, then use the input's scale instead.
                if new_quant.quant.scale > quant.quant.scale:
                    iinfo = np.iinfo(quantized_type.numpy_type())
                    # Rescale new_quant's zero point.  If the two scales are similar,
                    # we need to use a similar zero point to avoid saturation.
                    new_zp = round(new_quant.quant.zero_point * (quant.quant.scale / new_quant.quant.scale))
                    # Ensure that min_val and max_val stay within the representable value range
                    new_min_value = max(quant.quant.min_val, (iinfo.min - new_zp) / quant.quant.scale)
                    new_max_value = min(quant.quant.max_val, (iinfo.max - new_zp) / quant.quant.scale)
                    new_quant = dataclasses.replace(new_quant,
                                                    quant=Quantization(quant.quant.scale, new_zp,
                                                                       new_quant.quant.bits,
                                                                       new_min_value, new_max_value))
            else:
                raise ValueError("Unrecognized value of requant_method")
    else:
        raise ValueError("Cannot convert {} to {} during quantization"
                         .format(input_type.scalar, quantized_type.name))
    i.set_chosen_input_quant(name, TensorValue(new_quant))
    return new_quant
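

# Summary of fix_input's dispatch, derived from the code above:
#   input type == quantized_type  -> keep the existing type and quantization
#   quantized_type == bfloat16    -> use bfloat16 with no quantization attached
#   floating-point input          -> quantize using the input's calibration result
#   narrower int -> wider int     -> widen, scaling quantization by 2**8
#   wider int -> narrower int     -> requantize according to requant_method:
#       arith_folded    -> power-of-2 scale (requantize_scaled, restrict_to_pow2=True)
#       scaled_fz       -> floating-point scale (requantize_scaled)
#       fractional_zero -> fresh quantization from calibration, adjusted so the
#                          scale does not increase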
def fix_input_to_int8(i: OpQuantInterface, name: InputName, asymmetry: bool) -> QuantResultTensorType:
    """
    Fix the input having the given name to int8.  See fix_input for documentation.
    """
    return fix_input(i, ScalarType.int8, name, asymmetry)


def fix_input_to_int16(i: OpQuantInterface, name: InputName, asymmetry: bool) -> QuantResultTensorType:
    """
    Fix the input having the given name to int16.  See fix_input for documentation.
    """
    return fix_input(i, ScalarType.int16, name, asymmetry)
def fix_input_to_float32(i: OpQuantInterface, name: InputName) -> QuantResultTensorType:
    """
    Fix the input having the given name to float32.

    If the input's type is not already float32, then dequantize it.  The input
    must have a known quantization in this case.  Set the input's quantization
    to reflect a float32 type and no quantization.

    :param i: Object describing the interface of the node to transform.
    :param name: The name of the input to select.  The input must have a tensor type.
    :return: The quantized type of the input.
    """
    quant: QuantResultTensorType = get_expected_tensor_value(i.get_input_quant()[name])
    input_type = quant.type
    if input_type.scalar == ScalarType.float32:
        # Type is already float32
        new_quant = quant
    elif input_type.scalar in (ScalarType.int8, ScalarType.int16, ScalarType.int32):
        # Dequantize the integer input
        assert quant.quant is not None, "Tensor must be dequantized, but its quantization is unknown"
        new_quant = QuantResultTensorType.from_type(TensorType(ScalarType.float32, input_type.shape))
    else:
        raise ValueError("Cannot convert {} to float32 during quantization".format(input_type.scalar))
    i.set_chosen_input_quant(name, TensorValue(new_quant))
    return new_quant
def keep_input(i: OpQuantInterface, name: InputName) -> QuantResultTensorType:
    """
    Keep the input having the given name unchanged: use the input's existing
    type and quantization as the chosen input quantization.

    :param i: Object describing the interface of the node to transform.
    :param name: The name of the input to select.  The input must have a tensor type.
    :return: The quantized type of the input.
    """
    q = i.get_input_quant()[name]
    assert isinstance(q, TensorValue), "Expecting a tensor (not tuple) for data"
    i.set_chosen_input_quant(name, q)
    return q.value
def fix_output_from_input(i: OpQuantInterface, shape: Tuple[int, ...],
                          name: Optional[InputName] = None) \
        -> QuantResultTensorType:
    """
    Set the output to use the same scalar type and quantization as the input.
    The input's quantization must be set by set_chosen_input_quant first.

    :param i: Object describing the interface of the node to transform.
    :param shape: Shape of the output.  It will be set to this shape.
    :param name: The name of the input whose type and quantization will be copied.
        The input's quantization must have been set by set_chosen_input_quant.
        If it is None, the node must have exactly one input, and that input
        will be used.
    :return: The quantized type of the output.
    """
    if name is None:
        input_quant_dict = i.get_input_quant()
        assert len(input_quant_dict) == 1, "Node must have exactly one input when name is not given"
        name = next(iter(input_quant_dict.keys()))
    input_quant = get_expected_tensor_value(i.get_chosen_input_quant(name))
    output_type = TensorType(input_quant.type.scalar, shape)
    output_quant = QuantResultTensorType(output_type, input_quant.quant, input_quant.requant_method)
    i.set_chosen_output_quant(TensorValue(output_quant))
    return output_quant
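

# A minimal sketch of a shape-only operator (e.g. a reshape-like node) that passes
# its input through unchanged, assuming a single input named "data" and a
# caller-supplied output shape.  keep_input preserves the input's type and
# quantization, and fix_output_from_input copies them to the output with the
# new shape.
def _example_reshape_quantize(i: OpQuantInterface, new_shape: Tuple[int, ...]) -> None:
    keep_input(i, InputName("data"))
    fix_output_from_input(i, new_shape, InputName("data"))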
def get_intermediate_min_max(i: OpQuantInterface) -> dict[str, tuple[float, float]]:
    """
    Get min and max values of intermediate calibration data.

    This function does not change the state of OpQuantInterface; it only returns
    the dict of intermediate min/max values.

    :param i: Object describing the interface of the node to transform.
    :return: The dict of intermediate min/max values.
    """
    intermediate_min_max = dict()
    intermediate_distributions = i.get_intermediate_distributions()
    for n, d in intermediate_distributions.items():
        if d is None:
            raise ValueError(f"No data for {n} intermediate observer.")
        intermediate_min_max[n] = d.get_min_max()
    return intermediate_min_max