Source code for afe.ir.quantization_conv

#########################################################
# Copyright (C) 2023 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Christopher Rodrigues
#########################################################
"""
Quantization functions for convolution and matrix multiply.
"""
import dataclasses
from typing import Union, Optional, Tuple, List, cast, Callable, TypeVar
import numpy as np
import math
import enum

from ml_kernels import requantization
from ml_kernels.types import bfloat16, iinfo, int4
from sima_utils.logging import sima_logger
from ml_kernels.math_helpers import RoundType
from ml_kernels.requantization import BaseRequantization, ArithFoldedRequantization, TFLiteRequantization, Narrowing

from afe.ir import attributes as attributes
from afe.ir.attributes import ObservedDistribution, ConvAddActivationAttrs
from afe.ir.bias_correction import BiasCorrector
from afe.ir.defines import Quantization, get_expected_tensor_value, NodeReporter, AwesomeConvWeightLayout, \
    AwesomeConvWeightLayout5D, QuantizedTensor
from afe.ir.quantization_utils import compute_scale
from afe.ir.tensor_type import ScalarType
from afe.ir.utils import create_and_verify_narrowing, transpose_tensor_according_to_layout_strings


_A = TypeVar("_A")

# A scale factor related to channels.  It is an array of float,
# one-dimensional for per-channel or zero-dimensional for per-tensor.
ChannelScale = np.ndarray

# An integer scale factor value related to channels. It is an array
# of 8-bit int, one-dimensional for per-channel or zero-dimensional
# for per-tensor.
ChannelQScale = np.ndarray

# A right-shift value related to channels. It is an array of int,
# one-dimensional for per-channel or zero-dimensional for per-tensor.
# Array elements are integers in the valid range for the input of the
# shift operation, either [0, 31] or [0, 63].
ChannelShift = np.ndarray

# Intrinsic shift values used in int16 quantization
INTRINSIC_SHIFT_LO = 1
INTRINSIC_SHIFT_HI = 8

class _SilenceNumpyFPErrors:
    """
    Context manager that silences floating-point errors in Numpy.

    On entry, Numpy is globally configured to ignore floating-point errors, and the old
    configuration is saved. On exit, the old configuration is restored.
    """
    def __init__(self):
        self._err = None

    def __enter__(self):
        assert self._err is None
        self._err = np.geterr()
        np.seterr(all='ignore')

    def __exit__(self, exc_type, exc_val, exc_tb):
        np.seterr(**self._err)
        self._err = None


class _Severity(enum.Enum):
    info = enum.auto()
    debug = enum.auto()
    warn = enum.auto()


@dataclasses.dataclass(frozen=True)
class _ReporterMessage:
    """A message that can be reported using a NodeReporter."""
    severity: _Severity
    message: str


def _report_message(reporter: NodeReporter, message: _ReporterMessage):
    """Use reporter to report message."""
    match message.severity:
        case _Severity.info:
            reporter.info(message.message)
        case _Severity.debug:
            reporter.debug(message.message)
        case _Severity.warn:
            reporter.warn(message.message)
        case _:
            raise ValueError("Unrecognized severity")


class _BufferedNodeReporter(NodeReporter):
    """
    An error reporter that writes messages to a buffer. Messages can be retrieved by
    calling get_buffer.
    """
    _buffer: List[_ReporterMessage]

    def __init__(self):
        self._buffer = []

    def info(self, msg: str):
        self._buffer.append(_ReporterMessage(_Severity.info, msg))

    def debug(self, msg: str):
        self._buffer.append(_ReporterMessage(_Severity.debug, msg))

    def warn(self, msg: str):
        self._buffer.append(_ReporterMessage(_Severity.warn, msg))

    def get_buffer(self) -> List[_ReporterMessage]:
        """Get a copy of the buffered messages."""
        return list(self._buffer)

def reshape_weight_to_output_channels(weight: np.ndarray) -> np.ndarray:
    """
    Reshape a weight tensor so that its last axis corresponds to a convolution operation's
    output channel axis. That is, the convolution's output at a given channel output[..., c]
    depends on reshaped_weights[..., c], bias[c], and some values from the convolution's input.

    This tensor shape is useful for code that computes per-channel information or does
    per-channel scaling on weights.
    """
    return weight.reshape(weight.shape[:-2] + (-1,))

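# Illustrative example (hypothetical shapes, not part of the original module): assuming the
# last two axes of the incoming weight hold (groups, output channels per group), as in
# quantize_weight_tensor, a weight of shape (3, 3, 16, 1, 32) is reshaped to (3, 3, 16, 32),
# so its last axis indexes the 32 output channels.
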
def get_quantization_range(dtype: Union[ScalarType, np.number], asymmetry: bool) -> Tuple[int, int]:
    """
    Get the numeric range that should be used when quantizing numbers to be stored using dtype.
    The range is the entire value range when using asymmetric quantization, and is reduced to a
    symmetric range when using symmetric quantization.

    :param dtype: Quantized data type. It must be a signed integer type.
    :param asymmetry: Whether to use an asymmetric range
    :return: Numeric range
    """
    if isinstance(dtype, ScalarType):
        dtype = dtype.numpy_type()
    iinfo = np.iinfo(dtype)
    if iinfo.min >= 0:
        raise ValueError("Quantized data type must be a signed integer type")
    lower_bound = iinfo.min if asymmetry else iinfo.min + 1
    return lower_bound, iinfo.max

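# Illustrative example (not part of the original module): for int8, the asymmetric range keeps
# the full signed range, while the symmetric range drops -128 so that the interval is centered
# on zero.
#
#   >>> get_quantization_range(ScalarType.int8, asymmetry=True)
#   (-128, 127)
#   >>> get_quantization_range(ScalarType.int8, asymmetry=False)
#   (-127, 127)
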
def decompose_power_of_2(x: ChannelScale, rounding: RoundType) -> Tuple[ChannelShift, ChannelScale]:
    """
    Decompose x into a power-of-2 part i and a fractional part f such that

      x = f * 2**i

    The range of f is selected based on how i is rounded:
      UPWARD:    0.5 < f <= 1
      TONEAREST: sqrt(0.5) <= f <= sqrt(2)
      TRUNC:     1 <= f < 2

    Where x is 0, f and i will be 0.

    :param x: Number to decompose
    :param rounding: How to round the exponent
    :return: Decomposed values (i, f)
    """
    is_zero: np.ndarray = x == 0
    with _SilenceNumpyFPErrors():
        exponent = np.where(is_zero, 0, np.log2(x))
    match rounding:
        case RoundType.UPWARD:
            exponent = np.ceil(exponent)
            bounds = (0.5, 1.0)
        case RoundType.TRUNC:
            exponent = np.floor(exponent)
            bounds = (1.0, 2.0)
        case RoundType.TONEAREST:
            exponent = np.round(exponent)
            bounds = (math.sqrt(0.5), math.sqrt(2))
        case _:
            raise ValueError("Unsupported rounding mode")
    power_of_2 = np.power(2, exponent)
    fractional = x / power_of_2

    # Ensure that the fractional part is within range where x is not zero
    assert np.all(np.logical_or(is_zero,
                                np.logical_and(fractional > bounds[0] - _POW2_ASSERTION_TOL,
                                               fractional < bounds[1] + _POW2_ASSERTION_TOL)))

    return exponent, fractional

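# Worked example (not part of the original module): with UPWARD rounding, 6.0 decomposes as
# 6.0 = 0.75 * 2**3, so the exponent is 3 and the fractional part is 0.75, which lies in the
# documented range 0.5 < f <= 1.
#
#   >>> exponent, fractional = decompose_power_of_2(np.array(6.0), RoundType.UPWARD)
#   >>> float(exponent), float(fractional)
#   (3.0, 0.75)
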
def normalize_with_pow2(x: ChannelScale) -> Tuple[ChannelShift, ChannelScale]:
    """
    Find powers of 2 that normalize each element of x to the range (0.5, 1.0].

    :param x: Scale factors to normalize
    :return: Tuple (i, y) of exponents and normalized scale factors satisfying x = y * 2**i.
    """
    return decompose_power_of_2(x, RoundType.UPWARD)

def weight_single_quantization_scale(weight: np.ndarray, bits: int = 8) -> float:
    """
    Calculate a scalar quantization scale for a convolution or matrix multiply weight tensor.

    :param weight: Floating-point weight tensor
    :param bits: Number of bits used for quantization
    :return: Quantization scale. It has the same meaning as the scale field of class Quantization.
    """
    min_val = np.min(weight)
    max_val = np.max(weight)
    if min_val == max_val == 0:
        return 0.0
    return compute_scale(asymmetry=False, layer_bits=bits, min_val=min_val, max_val=max_val,
                         include_real_zero_point=True)

def weight_quantization_scale(weight: np.ndarray, per_channel: bool, bits: int = 8) -> ChannelScale:
    """
    Calculate a quantization scale for a convolution or matrix multiply weight tensor.

    :param weight: Floating-point weight tensor
    :param per_channel: Whether to do per-channel quantization
    :param bits: Number of bits to be used
    :return: Quantization scale.
    """
    if per_channel:
        weight = reshape_weight_to_output_channels(weight)
        channels = weight.shape[-1]
        return np.array([weight_single_quantization_scale(weight[..., c], bits) for c in range(channels)])
    else:
        return np.array(weight_single_quantization_scale(weight, bits))

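# Shape example (hypothetical values, not part of the original module): for a 5D weight whose
# last two axes are (groups, channels per group) = (1, 8), per-channel quantization yields one
# scale per output channel, while per-tensor quantization yields a zero-dimensional array.
#
#   >>> w = np.ones((1, 3, 3, 1, 8), dtype=np.float32)
#   >>> weight_quantization_scale(w, per_channel=True).shape
#   (8,)
#   >>> weight_quantization_scale(w, per_channel=False).shape
#   ()
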
class ConvolutionPrecision(enum.Enum):
    """
    The precision to use for quantizing convolution. This determines how quantization does
    some calculations and chooses which integer type to use. Some choices (such as sima_int8)
    completely determine the integer type, while others do not.
    """
    # Use int8 output and arith_folded requantization (right-shift only)
    sima_int8 = enum.auto()
    # Use int8 output and tflite requantization (multiply, shift, and add)
    tflite_int8 = enum.auto()
    # Use int8 output and tflite requantization, but force multiplier to be 1.
    # This method uses power-of-2 requantization like sima and zero point
    # correction like tflite.
    restricted_tflite_int8 = enum.auto()
    # Use int32 output with a power-of-2 range so that it can be
    # right-shifted and converted to either int8 or int16
    sima_int16 = enum.auto()
    # Use int32 output and use tflite requantization to convert to either
    # int8 or int16
    tflite_int16 = enum.auto()
    # Use int16 output and tflite requantization, but force multiplier to be 1.
    # This method uses power-of-2 requantization like sima and zero point
    # correction like tflite.
    restricted_tflite_int16 = enum.auto()
    # Use int32 output with full precision
    sima_int32 = enum.auto()

    def has_multiplier(self) -> bool:
        """
        Return True if this quantization method can use a TFLite multiplier other than 1.
        Return False if it uses ArithFoldedRequantization or forces the multiplier to be 1.
        """
        return self in _HAS_MULTIPLIER_PRECISIONS

    def has_zp_correction(self) -> bool:
        """
        Return True if this quantization method can use a zero point correction other than 0.
        """
        return self in _HAS_ZP_CORRECTION_PRECISIONS

    def is_arith_folded(self) -> bool:
        """Return True if this is one of the quantization methods that uses ArithFoldedRequantization."""
        return self in _ARITH_FOLDED_PRECISIONS

    def is_tflite(self) -> bool:
        """Return True if this is one of the quantization methods that uses TFLiteRequantization."""
        return self in _HAS_ZP_CORRECTION_PRECISIONS

_ARITH_FOLDED_PRECISIONS = (ConvolutionPrecision.sima_int8, ConvolutionPrecision.sima_int16,
                            ConvolutionPrecision.sima_int32)

_HAS_MULTIPLIER_PRECISIONS = (ConvolutionPrecision.tflite_int8, ConvolutionPrecision.tflite_int16)

_HAS_ZP_CORRECTION_PRECISIONS = (ConvolutionPrecision.tflite_int8, ConvolutionPrecision.restricted_tflite_int8,
                                 ConvolutionPrecision.tflite_int16, ConvolutionPrecision.restricted_tflite_int16)


def _get_preferred_range(precision: ConvolutionPrecision, asymmetry: bool) -> Tuple[int, int]:
    """
    Get the value range that this quantization method prefers to use. Tensors are generally
    quantized to this range unless it violates a limit on the quantized calculation.

    :param precision: Precision to quantize for.
    :param asymmetry: Whether to use asymmetric quantization.
    :return: The quantized integer value range.
    """
    match precision:
        case ConvolutionPrecision.sima_int8 | ConvolutionPrecision.restricted_tflite_int8 \
                | ConvolutionPrecision.tflite_int8:
            return get_quantization_range(ScalarType.int8, asymmetry)
        case ConvolutionPrecision.sima_int16 | ConvolutionPrecision.restricted_tflite_int16:
            # Use 256 times the int8 range, which is slightly less than the int16 range,
            # so that the result is consistent with int8
            (l, u) = get_quantization_range(ScalarType.int8, asymmetry)
            return (256 * l, 256 * u)
        case ConvolutionPrecision.tflite_int16:
            return get_quantization_range(ScalarType.int16, asymmetry)
        case ConvolutionPrecision.sima_int32:
            return get_quantization_range(ScalarType.int32, asymmetry)
        case _:
            raise ValueError("Unrecognized precision")

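# Illustrative note (not part of the original module): for the sima_int16 and
# restricted_tflite_int16 cases the preferred range is 256 times the int8 range rather than the
# full int16 range; with asymmetric quantization that is 256 * (-128, 127) = (-32768, 32512),
# which keeps the scaling consistent with the int8 case.
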
class ConvPlanRequantization:
    """
    Adjustable requantization for convolution.

    This class holds the requantization as both a floating-point number and a quantized
    representation. When these values are modified, they are kept consistent (modulo rounding)
    with the formula

      scale = multiplier * (2**-shift)

    :param scale: Requantization scale as a floating-point value.
    :param shift: Right shift to perform. Its shape must be the same as scale's.
    :param multiplier: Integer multiplier to use. Its shape must be either () or the same as scale's.
    """
    scale: ChannelScale
    shift: ChannelShift
    multiplier: ChannelQScale

    def __init__(self, scale: ChannelScale, shift: ChannelShift, multiplier: ChannelQScale):
        assert scale.shape == shift.shape
        assert multiplier.shape == () or multiplier.shape == shift.shape
        self.scale = scale
        self.shift = shift
        self.multiplier = multiplier

    def deepcopy(self) -> "ConvPlanRequantization":
        """Make an independent copy of this object."""
        return ConvPlanRequantization(self.scale.copy(), self.shift.copy(), self.multiplier.copy())

    def adjust_shift(self, adjustment: Union[ChannelShift, int]):
        """
        Add the given value to the right-shift value.
        """
        self.shift = self.shift + adjustment
        self.scale = self.scale * np.exp2(-adjustment)

    def set_unit_scale(self, positions: np.ndarray):
        """
        Set the scale to 1 in the given positions. Shift is set to 0 and multiplier is set to 1
        in the given positions.
        """
        self.scale = np.where(positions, 1, self.scale)
        self.shift = np.where(positions, 0, self.shift)
        self.multiplier = np.where(positions, 1, self.multiplier)

@dataclasses.dataclass
class ConvPlanQuantizations:
    """
    Adjustable quantization parameters for convolution or matrix multiply. This class holds
    parameters that may be modified while deciding how to quantize the calculation.
    The parameters relate a real-number calculation

      c = a * w + b

    to a quantized calculation (the actual calculation is not selected here, and it may be
    different from this formula)

      Qc = S * (Qa * Qw) / 2^h + constant_terms

    by

      Qw = w * Sw
      Qa = a * Sa
      Qc = c * Sc + Zc
      S = 2^h * Sc / (Sa * Sw).

    The factor of 2^h is a right-shift that is included in the integer convolution.

    :param weight: Scale factor Sw relating real weight w to quantized weight Qw. It may contain 0.
    :param output: Quantization (Sc, Zc) relating real output c to quantized output Qc
    :param requant: Requantization S relating quantized product to output Qc
    :param intrinsic_shift: Right-shift h, used to produce an additional scale factor in the
        convolution product
    """
    weight: ChannelScale
    output: Quantization
    requant: ConvPlanRequantization
    intrinsic_shift: np.ndarray

    def deepcopy(self) -> "ConvPlanQuantizations":
        """Make an independent copy of this object."""
        return ConvPlanQuantizations(self.weight.copy(), self.output, self.requant.deepcopy(),
                                     self.intrinsic_shift)

    def set_intrinsic_shift(self, value: np.ndarray):
        """Set the intrinsic shift, h, to the given value."""
        self.intrinsic_shift = value

    def set_weight_zero(self, positions: np.ndarray):
        """
        Set the weight scale, Sw, to 0 at the given channel positions.
        """
        assert positions.dtype == np.bool_
        self.weight = np.where(positions, 0, self.weight)

    def set_requant_one(self, positions: np.ndarray):
        """
        Set the requantization scale to 1 at the given channel positions.
        """
        assert positions.dtype == np.bool_
        self.requant.set_unit_scale(positions)

    def scale_weight_pow2(self, exponent: Union[np.ndarray, int]):
        """
        Multiply the weight quantization scale, Sw, by 2**exponent.
        """
        self.weight *= np.exp2(exponent)

    def scale_output_pow2(self, exponent: int):
        """
        Multiply the output quantization scale, Sc, by 2**exponent.
        """
        assert isinstance(exponent, int)
        q = self.output
        adjustment = np.exp2(exponent)
        # Adjust both the scale and zero point. Adjusting the zero point makes the
        # value range be closer to 0.
        # This reduces the likelihood of numeric saturation in sima quantization, but
        # it may prevent doing relu optimization.
        self.output = dataclasses.replace(q, scale=float(q.scale * adjustment),
                                          zero_point=int(q.zero_point * adjustment))

    def scale_requant_pow2(self, exponent: np.ndarray):
        """
        Multiply the requantization, S, by 2**exponent.
        """
        # Negate the exponent to get a right-shift value
        shift_value = np.broadcast_to(-exponent, self.weight.shape)
        self.requant.adjust_shift(shift_value)

# Rounding error tolerance for power-of-2 assertion checks, e.g. assert (x > 0.5 - _POW2_ASSERTION_TOL)
_POW2_ASSERTION_TOL = 1e-4

def select_convolution_scales(
        weight: np.ndarray, input_quant: Quantization,
        output_distribution: ObservedDistribution, *,
        precision: ConvolutionPrecision, asymmetry: bool, per_channel: bool
) -> ConvPlanQuantizations:
    """
    Choose quantization parameters for a generalized matrix multiply based on the input's
    quantization and the optimal quantization of the weight and output. This choice does not
    account for value ranges of other integer constants and intermediate results. Those should
    be handled separately.

    :param weight: A weight tensor.
    :param input_quant: Quantization that was selected for the input of generalized matrix multiply.
    :param output_distribution: Value distribution of the output of generalized matrix multiply.
    :param precision: Precision to quantize for.
    :param asymmetry: Whether to use asymmetric quantization.
    :param per_channel: Whether to do per-channel quantization. If true, the scales will be a
        tensor with one value per channel. If false, the scales will be scalars.
    :return: Weight tensor scale, requantization scale, and quantization of the convolution output.
    """
    # Quantize weights for int8
    weight_scale = weight_quantization_scale(weight, per_channel)

    # Quantize output for the chosen value range
    output_qrange = _get_preferred_range(precision, asymmetry)
    output_quant = get_expected_tensor_value(output_distribution.calculate_quantization(output_qrange))
    if output_quant.scale == 0:
        # Convolution's output would be zero, which is abnormal for a model.
        # As the quantization algorithm assumes scale is nonzero, raise an exception.
        raise sima_logger.UserFacingException(
            "Cannot quantize a zero-valued convolution. This may be the result of unsuitable "
            "training or calibration data selection or model misconfiguration.")

    # Compute the requantization scale factor, S.
    # If product scale is 0, then S is a free parameter, and we choose S=1 so that bias is easy to quantize.
    product_scale = input_quant.scale * weight_scale
    assert np.all(product_scale >= 0)
    with _SilenceNumpyFPErrors():
        rq_scale = output_quant.scale / product_scale
    rq_scale = np.where(np.isfinite(rq_scale), rq_scale, 1.0)

    # Normalize S. Values selected here may not be usable in the quantized algorithm; that is fixed later.
    rq_scale_exponent, rq_scale_fractional = normalize_with_pow2(rq_scale)

    # Quantize S = multiplier * 2**(-shift), with integer multiplier and shift
    if precision.has_multiplier():
        # Use 7-bit multiplier
        rq_scale_multiplier = np.clip(np.round(rq_scale_fractional * (1 << 7)), -128, 127).astype(np.int8)
        rq_scale_exponent -= 7
    else:
        # Adjust the weight scale so that the fractional part is 1. Multiplier must be 1.
        weight_scale *= rq_scale_fractional
        rq_scale_multiplier = np.array(1)
        with _SilenceNumpyFPErrors():
            rq_scale = np.where(rq_scale_fractional == 0, rq_scale, rq_scale / rq_scale_fractional)
    rq_scale_shift = -rq_scale_exponent

    return ConvPlanQuantizations(weight_scale, output_quant,
                                 ConvPlanRequantization(rq_scale, rq_scale_shift, rq_scale_multiplier),
                                 0)

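# Worked example (hypothetical values, not part of the original module): suppose the ideal
# requantization scale is S = output_scale / (input_scale * weight_scale) = 0.3 and the
# precision has a multiplier. normalize_with_pow2 gives 0.3 = 0.6 * 2**-1, the 7-bit multiplier
# becomes round(0.6 * 128) = 77, and the exponent drops by 7 to -8, so the right-shift is 8.
# The quantized requantization 77 * 2**-8 ~= 0.3008 approximates S. Without a multiplier, the
# fractional part 0.6 is folded into the weight scale instead and the multiplier is forced to 1.
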
@dataclasses.dataclass
class ConvBacktrackingParameters:
    """
    Quantization parameters that are fixed at the beginning of the quantization algorithm, such
    that the algorithm has to restart if they are changed. These values may be modified in the
    backtracking loop.

    :param precision: Precision to use for output calculations.
    :param relu_fallback_precision: Alternative precision to use if "precision" can't be supported
        due to limitations in the backend's implementation of ReLU. If this is None, "precision"
        is assumed to be fully supported.
    :param intrinsic_shift_adjustment: Locations where extra right-shift is used with the int15
        convolution algorithm. When the input is int8, it must be a 0D array of False.
        It is an array of bool, where True means to use extra right-shift.
        It is 0D for per-tensor or 1D for per-channel.
    :param weight_adjustment: Extra right-shift applied to weights. Values greater than zero
        reduce the weight's precision to fewer than 8 bits. It is an array of int.
        It is 0D for per-tensor or 1D for per-channel.
    """
    precision: ConvolutionPrecision
    relu_fallback_precision: Optional[ConvolutionPrecision]
    intrinsic_shift_adjustment: np.ndarray
    weight_adjustment: np.ndarray

    @staticmethod
    def default_intrinsic_shift_adjustment(n_channels: int, per_channel: bool, use_int15: bool) -> np.ndarray:
        """
        Default value of intrinsic shift. The default is not to use any extra right-shift.

        :param n_channels: Number of channels in the convolution output
        :param per_channel: Whether per-channel quantization is used
        :param use_int15: Whether the int15 convolution algorithm is used
        :return: Default value of intrinsic shift
        """
        return np.full(n_channels if per_channel and use_int15 else (), False, dtype=bool)

    @staticmethod
    def default_weight_adjustment(n_channels: int, per_channel: bool) -> np.ndarray:
        """
        Default weight adjustment. The default is not to use any extra right-shift.

        :param n_channels: Number of channels in the convolution output
        :param per_channel: Whether per-channel quantization is used
        :return: Default value of weight adjustment
        """
        return np.zeros(n_channels if per_channel else (), dtype=int)

class _Retry(Exception):
    """
    Restart the current backtracking loop.

    This exception should only be raised by the callable parameter of run_backtracking_loop.
    """
    pass

def run_backtracking_loop(f: Callable[[NodeReporter], _A], backtracking_limit: int,
                          backtracking_error_message: str,
                          error_reporter: Optional[NodeReporter] = None) -> _A:
    """
    Retry the backtracking computation in f until it succeeds.

    The callable object in f represents a restartable function that uses some mutable state to
    represent its starting condition. It may update its mutable state and raise a _Retry
    exception to restart; the state change should help it make progress after it restarts.
    It may return a value to end the loop.

    :param f: Backtracking computation to run
    :param backtracking_limit: Maximum number of times to attempt f. If f is attempted this many
        times without returning a result, an exception will be raised.
    :param backtracking_error_message: Error message to use if f does not return.
    :param error_reporter: Used for reporting errors.
    :return: Return value of f.
    """
    for i in range(backtracking_limit):
        # Buffer log messages. Only logs from the final iteration are returned.
        local_error_reporter = _BufferedNodeReporter()
        try:
            result = f(local_error_reporter)
            break
        except _Retry:
            continue
    else:
        raise RuntimeError(backtracking_error_message)

    # Transfer the buffered errors to the error reporter
    if error_reporter is not None:
        for e in local_error_reporter.get_buffer():
            _report_message(error_reporter, e)

    return result

def adjust_plan_zero_weights(weights: np.ndarray, quantizations: ConvPlanQuantizations,
                             per_channel: bool, error_reporter: NodeReporter):
    """
    Adjust the convolution plan where the weights would be zero after quantization.

    :param weights: Floating-point weights.
    :param quantizations: Quantization parameters. Will be modified.
    :param per_channel: Whether to do per-channel quantization.
    :param error_reporter: Error reporter used for quantization warnings.
    """
    # Use a per-channel view of weights
    weights = reshape_weight_to_output_channels(weights)

    # Where weight would become zero after quantization, the requantization scale is a free parameter.
    # Set the scale to 1 because that makes it easier to quantize bias.
    abs_quantized_weight = np.abs(weights * quantizations.weight)
    if per_channel:
        weight_is_zero = np.all(abs_quantized_weight <= 0.5, axis=tuple(range(len(weights.shape) - 1)))
    else:
        weight_is_zero = np.all(abs_quantized_weight <= 0.5)
    quantizations.set_weight_zero(weight_is_zero)
    quantizations.set_requant_one(weight_is_zero)

    if np.all(weight_is_zero):
        error_reporter.warn("All weights are zero. Please consider re-training the model.")
    elif np.any(weight_is_zero):
        error_reporter.warn("All weights in some channels are zero. Please consider re-training the model.")

def try_increase_intrinsic_shift(backtracking_parameters: ConvBacktrackingParameters,
                                 positions: np.ndarray) -> None:
    """
    Set backtracking_parameters.intrinsic_shift_adjustment to True where positions is True.
    Raise _Retry() if any backtracking parameters were changed.

    :param backtracking_parameters: Mutable variables for backtracking. May be modified.
    :param positions: Array of bool, containing True where the intrinsic shift adjustment should
        be set to True.
    """
    old_value = backtracking_parameters.intrinsic_shift_adjustment
    new_value = np.logical_or(old_value, positions)
    if not np.array_equal(new_value, old_value):
        backtracking_parameters.intrinsic_shift_adjustment = new_value
        raise _Retry()

def try_adjust_plan_shift_value(backtracking_parameters: ConvBacktrackingParameters,
                                quantizations: ConvPlanQuantizations,
                                use_int15: bool,
                                error_reporter: NodeReporter) -> None:
    """
    Adjust the convolution plan where the shift value is out of range or where the shift is so
    large that it causes severe precision loss.
    Raise _Retry() if any backtracking parameters were changed.

    :param backtracking_parameters: Mutable variables for backtracking. May be modified.
    :param quantizations: Quantization parameters. May be modified.
    :param use_int15: Whether the plan is for int15 convolution.
    :param error_reporter: Error reporter used for quantization warnings.
    """
    # If shift is above this threshold, reduce the shift to increase precision of the output.
    shift_rescale_threshold: int
    # If shift is equal or greater than this value, the output would always be 0.
    shift_value_limit: int
    if backtracking_parameters.precision.has_multiplier():
        # Multiplier is used, so the input of right-shift has 39 bits of precision.
        # It is 39 bits because it's the product of one 32-bit and one 8-bit signed number.
        shift_rescale_threshold = 37
        shift_rescale_amount = 2
        shift_value_limit = 39
    else:
        # No multiplier is used, so the input of shift is 32-bit.
        shift_rescale_threshold = 31
        shift_rescale_amount = 2
        shift_value_limit = 32

    # Cannot use left shift. If the shift value is negative,
    # diminish the requantization scale and output scale so that the shift is 0.
    shift_too_small: np.ndarray = quantizations.requant.shift < 0
    if np.any(shift_too_small):
        minimum_shift = np.min(quantizations.requant.shift)
        quantizations.scale_requant_pow2(minimum_shift)
        quantizations.scale_output_pow2(int(minimum_shift))

        # This situation can happen if output type is int16. It is abnormal if output type is int8.
        if backtracking_parameters.precision in (ConvolutionPrecision.sima_int8,
                                                 ConvolutionPrecision.restricted_tflite_int8,
                                                 ConvolutionPrecision.tflite_int8):
            error_reporter.warn("Values after quantization are too small to attain full precision in convolution. "
                                "Please consider changing quantization parameters or re-training the model.")

    # If the shift is very large, it will eliminate all bits of the result and
    # the output will be 0. To bring the shift value into range,
    # reduce the weight scale and increase the requantization scale.
    shift_too_large = np.array(quantizations.requant.shift >= shift_rescale_threshold)
    if np.any(shift_too_large):
        # Reduce the shift amount by increasing intrinsic_shift, if possible
        if use_int15:
            try_increase_intrinsic_shift(backtracking_parameters, shift_too_large)

        # Otherwise, change the weight scale
        adjustment = np.where(shift_too_large, shift_rescale_amount, 0)
        quantizations.scale_weight_pow2(-adjustment)
        quantizations.scale_requant_pow2(adjustment)
        error_reporter.warn("Precision of weights was reduced to avoid numeric saturation. "
                            "Saturation was detected for integer convolution.")

    # If shift is still too large after adjustment, then the output will be 0,
    # so set the weights to 0.
    shift_too_large = np.array(quantizations.requant.shift >= shift_value_limit)
    if np.any(shift_too_large):
        quantizations.set_weight_zero(shift_too_large)
        quantizations.set_requant_one(shift_too_large)
        error_reporter.warn("Weights were set to zero to avoid numeric saturation. "
                            "Saturation was detected for integer convolution.")

def try_adjust_plan_product_value(backtracking_parameters: ConvBacktrackingParameters,
                                  quantizations: ConvPlanQuantizations,
                                  use_int15: bool,
                                  error_reporter: NodeReporter) -> None:
    """
    Adjust the convolution plan where the integer convolution result is not in the representable
    range.
    Raise _Retry() if any backtracking parameters were changed.

    :param backtracking_parameters: Mutable variables for backtracking. May be modified.
    :param quantizations: Quantization parameters. May be modified.
    :param use_int15: Whether the plan is for int15 convolution.
    :param error_reporter: Error reporter used for quantization warnings.
    """
    # The following constraint is required when zp correction is combined with the bias array.
    if not backtracking_parameters.precision.has_zp_correction():
        # Ensure that the quantized zero value is representable in intermediate calculations.
        # That is, ensure that (Zc / S) is in the int32 range.
        # If not, reduce the weight scale and increase the requantization scale.
        # This reduces precision in exchange for correct zero point.
        zero_point_intermediate_value = quantizations.output.zero_point / quantizations.requant.scale
        int32_info = np.iinfo(np.int32)

        # Ratio of zero point to int32 max/min value. If greater than 1, zero point is outside the range.
        zero_point_int32_ratio = np.maximum(zero_point_intermediate_value / int32_info.min,
                                            zero_point_intermediate_value / int32_info.max)
        zero_point_too_large = zero_point_int32_ratio > 1
        if np.any(zero_point_too_large):
            # Calculate how much to adjust the scale so that zero point is within range
            with _SilenceNumpyFPErrors():
                shift_adjustment = np.ceil(np.log2(zero_point_int32_ratio)).astype(int)
            shift_adjustment = np.where(zero_point_too_large, shift_adjustment, 0)
            assert np.all(shift_adjustment >= 0)
            quantizations.scale_requant_pow2(shift_adjustment)
            quantizations.scale_weight_pow2(-shift_adjustment)
            error_reporter.warn("Precision of weights was reduced to avoid numeric saturation. "
                                "Saturation was detected in the zero point.")

            # Shift value was reduced. Ensure that the new shift value is in the valid range.
            try_adjust_plan_shift_value(backtracking_parameters, quantizations, use_int15, error_reporter)

def quantize_convolution_scales(
        quantizations: ConvPlanQuantizations,
        precision: ConvolutionPrecision,
        allow_full_output_precision: bool
) -> Tuple[ChannelScale, ChannelScale, BaseRequantization[np.ndarray], ScalarType, Quantization]:
    """
    Adjust the quantization parameters based on zero values, limits on integer constants, and
    limits on integer intermediate results. The final choice of weight scale, bias scale,
    requantization, and output quantization are returned.

    :param quantizations: Quantization parameters.
    :param precision: The precision to use for quantizing convolution.
    :param allow_full_output_precision: Whether 16-bit precision can be widened to 32-bit output.
        If false, quantizing with 16-bit precision will always produce 16-bit output.
    :return: New quantization scale of weights, quantization scale of bias, requantization to
        perform after convolution, type of output, and quantization of output.
    """
    # Create the final quantization parameters based on the quantization data
    rq_scale_shift = quantizations.requant.shift
    if rq_scale_shift.shape == ():
        final_shift = int(rq_scale_shift.item())
    else:
        final_shift = rq_scale_shift.astype(np.int32)
    use_per_tensor_shift = isinstance(final_shift, int)

    rq_scale_integral = quantizations.requant.multiplier
    if rq_scale_integral.shape == ():
        final_scale_multiplier = int(rq_scale_integral.item())
    else:
        final_scale_multiplier = rq_scale_integral.astype(np.int8)

    # Choose output type
    match precision:
        case ConvolutionPrecision.sima_int8 | ConvolutionPrecision.restricted_tflite_int8 \
                | ConvolutionPrecision.tflite_int8:
            output_type = ScalarType.int8
        case ConvolutionPrecision.tflite_int16 | ConvolutionPrecision.restricted_tflite_int16:
            output_type = ScalarType.int16
        case ConvolutionPrecision.sima_int16:
            if allow_full_output_precision and use_per_tensor_shift:
                # Use int32 output with no requantization.
                # This enables some optimizations related to requantization.
                output_type = ScalarType.int32
            else:
                output_type = ScalarType.int16
        case ConvolutionPrecision.sima_int32:
            output_type = ScalarType.int32
        case _:
            raise ValueError("Unrecognized precision")

    # Bias scale must be derived from the output scale.
    # Can't use input scale if weights are 0.
    bias_scale = quantizations.output.scale / quantizations.requant.scale

    if precision.is_arith_folded():
        assert np.all(final_scale_multiplier == 1), "SiMa quantization does not use a scale multiplier"
        output_quant = quantizations.output
        if output_type == ScalarType.int32 and use_per_tensor_shift:
            # Set final_shift to 0 to facilitate subsequent optimizations. Adjust output_quant accordingly.
            output_quant = dataclasses.replace(output_quant,
                                               scale=(1 << final_shift) * output_quant.scale,
                                               zero_point=(1 << final_shift) * output_quant.zero_point)
            final_shift = 0
        # else, must keep final_shift to handle per-channel shift and/or produce the proper numeric type
        requantization = ArithFoldedRequantization(create_and_verify_narrowing(final_shift, RoundType.TOEVEN,
                                                                               output_type.numpy_type()))
    elif precision.is_tflite():
        output_quant = quantizations.output
        requantization = TFLiteRequantization(final_scale_multiplier, final_shift, RoundType.TOEVEN,
                                              quantizations.output.zero_point, output_type.numpy_type())
    else:
        raise ValueError("Unrecognized precision")

    return quantizations.weight, bias_scale, requantization, output_type, output_quant

def quantize_weight_tensor(
        weight: np.ndarray, weight_scale: ChannelScale, bits: int = 8
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create a quantized weight tensor.

    :param weight: np.ndarray, weights value being quantized
    :param weight_scale: np.ndarray, scale of the weights.
    :param bits: Number of bits used for quantized weights.
    :return: Tuple of np.ndarray. The first value is the quantized weights. The second value is
        the fake-quantized weights, calculated by dividing the quantized weights by the scale,
        which returns them to similar fp32 values and exposes the quantization error caused by
        rounding and clipping.
    """
    if weight_scale.shape != ():
        # Per-channel scale. Reshape it to (groups, channels) so that it can be broadcast to weight.
        weight_scale = weight_scale.reshape(weight.shape[-2:])
    scaled_weight = weight * weight_scale
    weight_dtype = int4 if bits == 4 else np.int8
    weight_info = iinfo(weight_dtype)
    q_weight = np.clip(scaled_weight, weight_info.min, weight_info.max).round().astype(weight_dtype)

    # Where the scale for a channel is zero, avoid dividing by zero; the fake-quantized weight
    # should also be zero in that case.
    with _SilenceNumpyFPErrors():
        fake_q_weight = np.where(weight_scale == 0, 0, q_weight.astype(weight_dtype) / weight_scale)
    return q_weight, fake_q_weight

def try_quantize_bias_tensor(backtracking_parameters: ConvBacktrackingParameters,
                             bias: Optional[np.ndarray], zp_correction: np.ndarray,
                             bias_scale: ChannelScale, use_int15: bool,
                             per_channel: bool) -> np.ndarray:
    """
    Quantize a bias tensor. If it can't be quantized due to integer overflow, adjust backtracking
    parameters.
    Raise _Retry() if any backtracking parameters were changed.

    :param backtracking_parameters: Mutable variables for backtracking. May be modified.
    :param bias: Floating-point bias tensor.
    :param zp_correction: Integer zero point correction to be added to the bias. This may include
        correction for the input zero point and/or output zero point, depending on the
        quantization scheme.
    :param bias_scale: Quantization scale to use for bias.
    :param use_int15: Whether int15 convolution is used.
    :param per_channel: Whether per-channel quantization is used.
    :return: Quantized bias tensor.
    """
    # Bias scaled to match the quantized representation, but still in floating-point
    if bias is not None:
        assert bias.shape == zp_correction.shape
        assert bias_scale.shape == () or bias_scale.shape == bias.shape
        scaled_bias = bias * bias_scale + zp_correction
    else:
        scaled_bias = zp_correction

    # If the tensor value would be outside the int32 range, then the bias would overflow.
    # Quantization failed. Expect the caller to retry with a more conservative scale.
    int32_iinfo = np.iinfo(np.int32)
    overflow_positions = np.logical_or(scaled_bias > int32_iinfo.max + 0.5,
                                       scaled_bias < int32_iinfo.min - 0.5)
    if np.any(overflow_positions):
        # Bias exceeds the int32 range. Adjust parameters to make bias_scale smaller.
        # If using per-tensor quantization, discard per-channel information.
        if not per_channel:
            overflow_positions = np.array(True)

        # Adjust intrinsic shift to make the bias's quantization scale smaller.
        if use_int15:
            try_increase_intrinsic_shift(backtracking_parameters, overflow_positions)

        # If it could not be adjusted, then adjust weight scale.
        backtracking_parameters.weight_adjustment += overflow_positions.astype(int)
        raise _Retry()

    # Quantization is successful. Convert to int32.
    return np.clip(scaled_bias, int32_iinfo.min, int32_iinfo.max).round().astype(np.int32)

def quantized_product_zero_value(q_weight: np.ndarray, zero_point: int,
                                 intrinsic_shift: Union[np.ndarray, int]) -> np.ndarray:
    """
    Calculate the result of quantized generalized matrix multiply when the input is filled with
    the zero point value. This represents the zero point result, which should be subtracted to
    get the true product.

    :param q_weight: Quantized weight tensor
    :param zero_point: Zero point of input tensor
    :param intrinsic_shift: Right-shift that is performed by the convolution algorithm.
    :return: Convolution result as a 1D tensor
    """
    # Sum all axes except the output channels
    q_weight = reshape_weight_to_output_channels(q_weight)
    axes = tuple(range(len(q_weight.shape) - 1))
    if isinstance(intrinsic_shift, np.ndarray):
        intrinsic_shift = intrinsic_shift.astype(np.int32)
    else:
        assert isinstance(intrinsic_shift, int), f'Expected integer data type, got {type(intrinsic_shift)}'
    return np.round(zero_point * np.sum(q_weight, axis=axes, dtype=np.int64) / (1 << intrinsic_shift))

def output_zp_correction_in_bias(precision: ConvolutionPrecision, output_quant: Quantization,
                                 requantization: BaseRequantization[np.ndarray]) -> int:
    """
    Calculate the zero point correction to add to the convolution or matrix multiply's bias array
    so that the output has the desired quantization.

    If the convolution will not combine zero point correction with bias, but instead will do two
    separate additions, then the result is 0. Otherwise, the result is the output's zero point,
    scaled based on the requantization.

    :param precision: Convolution precision type
    :param output_quant: Quantization of convolution's output
    :param requantization: Requantization that is performed at the end of convolution
    :return: Zero point correction that should be added to the bias array
    """
    if precision.has_zp_correction():
        return 0
    assert precision.is_arith_folded()
    assert isinstance(requantization, ArithFoldedRequantization)
    return output_quant.zero_point << requantization.narrowing.shift

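# Illustrative note (hypothetical values, not part of the original module): with
# ArithFoldedRequantization the correction pre-scales the output zero point by the right-shift
# that requantization will later apply. For example, a zero point of -3 with a shift of 6
# contributes (-3) << 6 = -192 to the bias array, which becomes -3 again after the final
# right-shift.
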
def _try_quantize_convolution_parameters(
        backtracking_parameters: ConvBacktrackingParameters,
        weight: np.ndarray, bias: Optional[np.ndarray],
        input_quant: Quantization, output_distribution: ObservedDistribution, *,
        per_channel: bool, bias_corrector: BiasCorrector, asymmetry: bool, use_int15: bool,
        allow_full_output_precision: bool, error_reporter: NodeReporter
) -> Tuple[np.ndarray, np.ndarray, BaseRequantization[np.ndarray], ScalarType, Quantization, bool]:
    """
    Select quantized parameters for convolution or matrix multiply. This function may modify
    backtracking_parameters and raise _Retry to backtrack.

    See _quantize_convolution_parameters_main for documentation of parameter and return values.
    """
    if np.any(backtracking_parameters.weight_adjustment > 0):
        error_reporter.warn("Precision of weights was reduced to avoid numeric saturation. "
                            "Saturation was detected in the bias term.")

    # Initial choice of quantization parameters.
    # Chosen parameters may be modified below.
    quantizations = select_convolution_scales(
        weight, input_quant, output_distribution,
        precision=backtracking_parameters.precision, asymmetry=asymmetry, per_channel=per_channel
    )
    assert quantizations.intrinsic_shift == 0

    if use_int15:
        # Select low or high value for intrinsic_shift
        intrinsic_shifts = np.where(backtracking_parameters.intrinsic_shift_adjustment,
                                    INTRINSIC_SHIFT_HI, INTRINSIC_SHIFT_LO)
    else:
        # Intrinsic shift is 0 and can't be changed
        assert not np.any(backtracking_parameters.intrinsic_shift_adjustment)
        intrinsic_shifts = np.full(backtracking_parameters.intrinsic_shift_adjustment.shape, 0)

    # Modify quantizations using values from backtracking_parameters
    quantizations.set_intrinsic_shift(intrinsic_shifts)
    quantizations.scale_requant_pow2(intrinsic_shifts)
    quantizations.scale_weight_pow2(-backtracking_parameters.weight_adjustment)
    quantizations.scale_requant_pow2(backtracking_parameters.weight_adjustment)

    adjust_plan_zero_weights(weight, quantizations, per_channel, error_reporter)
    try_adjust_plan_shift_value(backtracking_parameters, quantizations, use_int15, error_reporter)
    try_adjust_plan_product_value(backtracking_parameters, quantizations, use_int15, error_reporter)

    weight_scale, bias_scale, requantization, output_type, output_quant = quantize_convolution_scales(
        quantizations, backtracking_parameters.precision, allow_full_output_precision
    )

    q_weight, fakequantized_weight = quantize_weight_tensor(weight, weight_scale)

    quant_bias_correction_term = bias_corrector.calculate(weight, fakequantized_weight)
    corrected_bias: Optional[np.ndarray]
    if bias is not None and quant_bias_correction_term is not None:
        corrected_bias = bias + quant_bias_correction_term
    else:
        corrected_bias = bias

    # Bias is a sum of up to 4 terms: input zp, output zp, bias and bias correction.
    # This calculation is hard to directly normalize for int32, so
    # we quantize it and retry if overflow occurs.
    input_zp = quantized_product_zero_value(q_weight, input_quant.zero_point, quantizations.intrinsic_shift)
    output_zp = output_zp_correction_in_bias(backtracking_parameters.precision, output_quant, requantization)
    zp_correction = output_zp - input_zp
    q_bias = try_quantize_bias_tensor(backtracking_parameters, corrected_bias, zp_correction, bias_scale,
                                      use_int15, per_channel)

    # Found a valid quantization, but the backend may not be able to handle it.
    # Check the relu workaround condition.
    if backtracking_parameters.relu_fallback_precision is not None:
        assert isinstance(requantization, ArithFoldedRequantization)
        min_int = np.iinfo(requantization.narrowing.out_dtype).min

        # Backend can handle this quantization if
        # the output zero point is equal to min_int (relu will be removed),
        # or it is equal to 0, or the right-shift is a scalar value (`zero_point << right_shift` is a scalar).
        ok_for_relu = (output_quant.zero_point == min_int or output_quant.zero_point == 0
                       or not isinstance(requantization.narrowing.shift, np.ndarray))

        if not ok_for_relu:
            # Change the precision to make it compatible with relu
            backtracking_parameters.precision = backtracking_parameters.relu_fallback_precision
            backtracking_parameters.relu_fallback_precision = None

            # Reset these adjustments so that they will be recalculated for the new precision
            n_channels = weight.shape[-2] * weight.shape[-1]
            backtracking_parameters.intrinsic_shift_adjustment = \
                ConvBacktrackingParameters.default_intrinsic_shift_adjustment(n_channels, per_channel, use_int15)
            backtracking_parameters.weight_adjustment = \
                ConvBacktrackingParameters.default_weight_adjustment(n_channels, per_channel)
            raise _Retry()

    # Quantization has been chosen. Create return values for the algorithm.
    msb_left_shift = backtracking_parameters.intrinsic_shift_adjustment
    if use_int15:
        msb_left_shift = np.logical_not(msb_left_shift)
    else:
        assert not np.any(msb_left_shift)
    if msb_left_shift.shape == ():
        msb_left_shift = bool(msb_left_shift.item())

    return q_weight, q_bias, requantization, output_type, output_quant, msb_left_shift

def quantize_convolution_parameters(
        input_quant: Quantization, output_distribution: ObservedDistribution,
        weight: np.ndarray, bias: Optional[np.ndarray], *,
        per_channel: bool, bias_corrector: BiasCorrector, asymmetry: bool, use_int15: bool,
        use_sima_relu_workaround: bool, precision: ConvolutionPrecision,
        allow_full_output_precision: bool,
        error_reporter: Optional[NodeReporter] = None
) -> Tuple[np.ndarray, np.ndarray, BaseRequantization[np.ndarray], ScalarType, Quantization, bool]:
    """
    Select quantized parameters for convolution or matrix multiply.

    :param input_quant: Quantization that was selected for the input of convolution.
    :param output_distribution: Value distribution of the output of convolution.
    :param weight: Weight tensor.
    :param bias: A bias tensor. If it is None, a bias tensor will still be returned containing
        the bias correction that was introduced by quantization.
    :param per_channel: Whether to do per-channel quantization. If true, the scale will be a
        tensor with one value per channel.
    :param bias_corrector: How to calculate a bias correction term.
    :param asymmetry: Whether to use asymmetric quantization.
    :param use_int15: Whether to quantize for the int15 convolution algorithm. If false, quantize
        for the int8 convolution algorithm.
    :param use_sima_relu_workaround: Whether to use a workaround for int8 SiMa quantization with
        relu activation. If True, and relu cannot be executed by the backend, then use TFLite
        quantization. This parameter is only relevant when precision is sima_int8 or sima_int16,
        and it must be False otherwise.
    :param precision: The precision to use for quantizing convolution output.
    :param allow_full_output_precision: Whether 16-bit precision can be widened to 32-bit output.
        If false, quantizing with 16-bit precision will always produce 16-bit output.
    :param error_reporter: Used for warnings about bad quantization.
    :return: A tuple containing the chosen quantization-related parameters: the quantized weight
        tensor, the quantized bias tensor, the requantization, the scalar type of the output,
        the quantization of the output, and the msb_left_shift flag value.
    """
    # Require 5D weight tensor for quantizing 2D convolution, and 6D weight tensor for 3D convolution.
    assert len(weight.shape) == 5 or len(weight.shape) == 6
    n_channels = weight.shape[-2] * weight.shape[-1]
    if bias is not None:
        assert len(bias.shape) == 1
        assert bias.shape[0] == n_channels

    # Choose a fallback precision to work around the relu limitation, if needed
    if use_sima_relu_workaround:
        if precision == ConvolutionPrecision.sima_int8:
            fallback_precision = ConvolutionPrecision.restricted_tflite_int8
        elif precision == ConvolutionPrecision.sima_int16:
            fallback_precision = ConvolutionPrecision.restricted_tflite_int16
        else:
            raise ValueError("use_sima_relu_workaround must be False when precision is " + str(precision))
    else:
        fallback_precision = None

    backtracking_parameters = ConvBacktrackingParameters(
        precision=precision,
        relu_fallback_precision=fallback_precision,
        intrinsic_shift_adjustment=ConvBacktrackingParameters.default_intrinsic_shift_adjustment(
            n_channels, per_channel, use_int15),
        weight_adjustment=ConvBacktrackingParameters.default_weight_adjustment(n_channels, per_channel)
    )

    # The maximum number of tries is chosen so the algorithm will never fail.
    # It always runs once at the beginning.
    # It may retry and change precision once.
    # It may retry and change weight 16 times (8 times, doubled because this is reset when precision changes).
    max_tries = 18
    return run_backtracking_loop(
        lambda local_error_reporter: _try_quantize_convolution_parameters(
            backtracking_parameters, weight, bias, input_quant, output_distribution,
            per_channel=per_channel, bias_corrector=bias_corrector, asymmetry=asymmetry,
            use_int15=use_int15, allow_full_output_precision=allow_full_output_precision,
            error_reporter=local_error_reporter
        ),
        max_tries,
        "Failed to quantize convolution",
        error_reporter
    )

def _preprocess_weight_tensor(is_dense: bool, kernel_layout: Optional[str], weight: np.ndarray) -> np.ndarray:
    """
    Reshape a weight tensor so that the last dimension is the output channel dimension.
    The original layout can be restored with _postprocess_weight_tensor.

    :param is_dense: Whether the weight tensor is for a dense matrix multiply
    :param kernel_layout: Layout of the weight tensor, if not dense
    :param weight: The weight tensor
    :return: Reshaped weight tensor
    """
    if not is_dense:
        kernel_layout = cast(str, kernel_layout)
        mla_layout = AwesomeConvWeightLayout if len(kernel_layout) == 4 else AwesomeConvWeightLayout5D
        weight = transpose_tensor_according_to_layout_strings(weight, kernel_layout, mla_layout)
    return weight


def _postprocess_weight_tensor(is_dense: bool, kernel_layout: Optional[str], weight: np.ndarray) -> np.ndarray:
    """
    Restore a weight tensor to the original shape by doing the reverse of _preprocess_weight_tensor.

    :param is_dense: Whether the weight tensor is for a dense matrix multiply
    :param kernel_layout: Layout of the original weight tensor, if not dense
    :param weight: The preprocessed weight tensor
    :return: Weight tensor converted to original shape
    """
    if not is_dense:
        # Convert quantized weight shape from AwesomeConvWeightLayout to original layout.
        kernel_layout = cast(str, kernel_layout)
        mla_layout = AwesomeConvWeightLayout if len(kernel_layout) == 4 else AwesomeConvWeightLayout5D
        weight = transpose_tensor_according_to_layout_strings(weight, mla_layout, kernel_layout)
    return weight

def get_bfloat16_with_int_weights_quant_params(
        attrs: ConvAddActivationAttrs, per_channel: bool, bits: int
) -> tuple[np.ndarray, np.ndarray | None, BaseRequantization]:
    """
    Get the quantized weights, the bias (if present), and the requantization.

    Weights are quantized to int8 or int4 and the bias, if present, is left unquantized. This
    allows the requantization scale factor to be just 1/weight_scale, because requantization is
    done after adding the bias.

    :param attrs: Convolution attributes holding the weights and the optional bias.
    :param per_channel: Whether per-channel quantization scheme is used for weights.
    :param bits: Number of bits to be used.
    :return: Quantized weights, the unquantized bias (or None), and the requantization.
    """
    weights = attrs.weights_attrs.data
    bias = attrs.bias_attrs.data if attrs.bias_attrs is not None else None

    # Quantize weights
    weight_scale = weight_quantization_scale(weights, per_channel, bits=bits)
    quantized_weight, _ = quantize_weight_tensor(weights, weight_scale, bits=bits)

    # Replace 0 with 1 in weight scale to avoid division by zero
    weight_scale = np.where(weight_scale == 0, 1, weight_scale)
    scale_factor = np.array(1 / weight_scale, np.float32)
    requant = requantization.Renormalization(scale_factor, Narrowing(0, RoundType.TOEVEN, bfloat16))

    return quantized_weight, bias, requant