Source code for afe.ir.operation_functions

#########################################################
# Copyright (C) 2020 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
import enum
import functools

import dataclasses
import math
import numpy as np
import tensorflow as tf
from typing import Tuple, Optional, Union, List, Dict, Callable, Sequence

import afe.ir.quantization_utils
from afe._tvm._defines import TVMGraphModule
import afe.ir.attributes as _afe_attrs
from afe.ir.attributes import ConvAttrs, convolution_output_shape
from afe.ir.defines import (
    AwesomeConvWeightLayout5D, Float, InputShape, AwesomePad2D,
    AwesomeConvWeightLayout, AwesomeDepthwiseConvWeightLayout, AwesomeDataLayout,
    AwesomeDataLayout5D, QuantizedTensor, AwesomeStrides, AwesomeDilation, AwesomeStrides3D,
    AwesomeDilation3D, AwesomePad3D, AwesomePoolSize, InputName
)
from afe.ir.quantization_utils import round_op, QNNDtype, DTYPE_BOUNDS
import afe.ir.utils as utils
from afe.ir.tensor_type import scalar_type_to_dtype, ScalarType, TensorType

from dataclasses import dataclass
from enum import Enum

from ml_kernels.math_helpers import RoundType, ArgMinMaxOp, Activation, bfloat16

from ev_transforms import transforms as _ev_transforms
from ml_kernels.requantization import BaseRequantization, Renormalization, get_id_requantization, \
    is_identity_requantization, FloatRequantization, is_renormalization
import ml_kernels.np_operators
import ml_kernels.math_helpers


class RunMode(Enum):
    """
    Supported run modes.

    MLA_MODE : use an implementation that exactly matches execution on the MLA.
    FAST_MODE : use a fast execution implementation.
    """
    MLA_MODE = 1
    FAST_MODE = 2
    FAST_MODE_MODEL_SDK = 3

    def is_fast_mode(self):
        return self.value == RunMode.FAST_MODE.value
""" Network functions that are executed within AwesomeOperations While SiMa.ir (AwesomeAttributes) essentially inherit from the attributes of the operations in TVM, not all of these attributes are necessarily fed into the functions SiMa uses for network inference. The AwesomeOperations are a useful layer of abstraction. For methods like run and run_quant, AwesomeOperations allow the user to simply pass in input dictionaries and AwesomeAttributes then return the output tensors. Internally these methods perform preprocessing on the inputs and AwesomeAttributes before passing them to the actual functions. An added benefit of defining these operations as class attributes external from AwesomeOperators is that they can be readily swapped out for operations with the same function signature (useful for when we define our own suite of numpy operations). """ ############################ # Quantized Kernel functions ############################ def _avgpool_kernel( data: np.ndarray, pool_size: tuple[int, ...], strides: tuple[int, ...], paddings: tuple[int, ...], pad_value: int = 0, requant: Optional[BaseRequantization] = None, rounding_type: RoundType = RoundType.TRUNC, mode: RunMode = RunMode.MLA_MODE, ) -> np.ndarray: """ Execute a quantized 3D average pool using the backend provided kernel. The input data layout mush be in NDHWC and the output will have the same NDHWC layout. Note: Don't support dilation. Parameters ---------- :param data: np.ndarray. Input data with NHWC layout. :param pool_size: Tuple[int, int]. Pooling window size in a Tuple format. The order of the dimension must be in [height, width]. :param strides: Tuple[int, int]. Pooling window strides in a Tuple format. The order of the dimension must be in [height, width]. :param paddings: Tuple[int, int, int, int]. Padding for each side of height and width. The order is in: [pad_top, pad_bottom, pad_left, pad_right] :param pad_value: int. Defualt is 0. The value of paddings. :param rounding_type: RoundType. Rounding method of requantization. Return ------ :return: np.ndarray. 3D Average pool output in NDHWC data layout. """ from ml_kernels.np_operators import ideal_pool, pool_requantization op = 'average' # TODO: Remove astype() casts once all kernels use consistent array formats and set correct formats requantization = pool_requantization(pool_size, op, rounding_type=rounding_type) if requant is None\ else requant outputs: list[np.ndarray] = list() for idx in range(data.shape[0]): output = ideal_pool( ifm=data[idx], kernel_size=pool_size, stride=strides, padding=paddings, op=op, requantization=requantization, use_tf=mode.is_fast_mode() ) outputs.append(np.expand_dims(output, axis=0)) res = np.concatenate(outputs) return res def _not_used_avgpool3d_kernel(data: np.ndarray, pool_size: Tuple[int, int, int], strides: Tuple[int, int, int], paddings: Tuple[int, int, int, int, int, int], pad_value: int = 0, rounding_type: RoundType = RoundType.TRUNC, mode: RunMode = RunMode.MLA_MODE, ) -> np.ndarray: """ Execute a quantized 3D average pool using the backend provided kernel. The input data layout mush be in NDHWC and the output will have the same NDHWC layout. Note: Don't support dilation. Parameters ---------- :param data: np.ndarray. Input data with NDHWC layout. :param pool_size: Tuple[int, int, int]. Pooling window size in a Tuple format. The order of the dimension must be in [depth, height, width]. :param strides: Tuple[int, int, int]. Pooling window strides in a Tuple format. 
The order of the dimension must be in [depth, height, width]. :param paddings: Tuple[int, int, int, int, int, int]. Padding for each side of depth, height and width. The order is in: [pad_depth_front, pad_depth_back, pad_top, pad_bottom, pad_left, pad_right] :param pad_value: int. Default is 0. The value of paddings. :param rounding_type: RoundType. Rounding method of requantization. Return ------ :return: np.ndarray. 3D Average pool output in NDHWC data layout. """ # Pad the data along the depth dimension depth_paddings = ([(0, 0) for _ in range(5)]) # NDHWC depth_paddings[1] = (paddings[0:2]) padded_data = np.pad(data, depth_paddings, 'constant', constant_values=pad_value) input_batch, input_d, input_h, input_w, input_c = padded_data.shape pool_d, pool_h, pool_w = pool_size stride_d, stride_h, stride_w = strides # Lower the depth dimension to the height dimension new_pool_h = pool_d * pool_h # Modify the stride along the height dimension to make sure the pooling # can jump to the right height dimension after lowering depth dimension # into the height dimension. new_stride_h = stride_h * pool_d # Swap the depth and height dimension before lowering the depth dimension into # the height dimension to make sure the new_stride_h can slice out the correct # partial tensor. transposed_data = padded_data.transpose((0, 2, 1, 3, 4)) # NHDWC output = [] for start_d in range(0, input_d - pool_d + 1, stride_d): end_d = start_d + pool_d # Slice out the target depth dimension. The transposed_data # has NHDWC data layout partial_data = transposed_data[:, :, start_d:end_d, :, :] # Lower the depth dimension into the height dimension and modify the stride # along the height dimension to accommodate the depth dimension. partial_data = partial_data.reshape((input_batch, pool_d * input_h, input_w, input_c)) partial_output = _avgpool_kernel( partial_data, (new_pool_h, pool_w), (new_stride_h, stride_w), paddings=paddings[2:], rounding_type=rounding_type, mode=mode) output.append(partial_output.astype(data.dtype)) # Concatenate along depth dimension using np.array(output). return np.array(output).transpose((1, 0, 2, 3, 4)) ################ # Functions APIs ################
def placeholder(data: np.ndarray) -> np.ndarray:
    return data
def constant(data: np.ndarray) -> np.ndarray:
    return data
def _depthwise_conv3d(data: np.ndarray, weight: np.ndarray, strides: AwesomeStrides3D, dilations: AwesomeDilation3D, padding: Optional[AwesomePad3D] = None, pad_value: Union[float, int] = 0) -> np.ndarray: """ Support both depthwise_conv3d and depthwise_conv3d with channel_multiplier > 1. """ if padding is not None: # Pad input with constant values data = np.pad(data, padding, 'constant', constant_values=pad_value) # DEBUG(Joey): Find a way to do int/int32 depthwise_conv3d to see the accuracy difference. return tf.nn.depthwise_conv3d(input=data.astype(Float), # TF does not support int/int32 dtype filter=weight.astype(Float), # TF does not support int/int32 dtype strides=strides, padding='VALID', data_format=AwesomeDataLayout5D, dilations=dilations).numpy() def _group_conv2d(data: np.ndarray, weight: np.ndarray, groups: int, strides: AwesomeStrides, dilations: AwesomeDilation, padding: Optional[AwesomePad2D] = None, pad_value: Union[float, int] = 0) -> np.ndarray: if padding is not None: # Pad input with constant values data = np.pad(data, padding, 'constant', constant_values=pad_value) # Split weight along output channel dimension weight_k_axis = AwesomeConvWeightLayout.index("O") weight_list = np.split(weight, indices_or_sections=groups, axis=weight_k_axis) # Split data along input channel dimension data_c_axis = AwesomeDataLayout.index("C") data_list = np.split(data, indices_or_sections=groups, axis=data_c_axis) # Outputs output_list = [] for _data, _weight in zip(data_list, weight_list): output_list.append( tf.nn.conv2d(input=_data.astype(Float), filters=_weight.astype(Float), strides=strides, padding='VALID', data_format=AwesomeDataLayout, dilations=dilations).numpy()) # Concatenate outputs along the channel dimension output = np.concatenate(output_list, axis=data_c_axis) return output def _normal_conv2d(data: np.ndarray, weight: np.ndarray, strides: AwesomeStrides, dilations: AwesomeDilation, padding: Optional[AwesomePad2D] = None, pad_value: Union[float, int] = 0) -> np.ndarray: if padding is not None: # Pad input with constant values data = np.pad(data, padding, 'constant', constant_values=pad_value) return tf.nn.conv2d(input=data.astype(Float), filters=weight.astype(Float), strides=strides, padding='VALID', data_format=AwesomeDataLayout, dilations=dilations).numpy() def _normal_conv3d(data: np.ndarray, weight: np.ndarray, strides: AwesomeStrides, dilations: AwesomeDilation, padding: Optional[AwesomePad3D] = None, pad_value: Union[float, int] = 0) -> np.ndarray: if padding is not None: # Pad input with constant values data = np.pad(data, padding, 'constant', constant_values=pad_value) return tf.nn.conv3d(input=data.astype(Float), filters=weight.astype(Float), strides=strides, padding='VALID', data_format=AwesomeDataLayout5D, dilations=dilations).numpy() def _map_batch(f: Callable[[np.array], np.array], a: np.array) -> np.array: """ Apply f elementwise over the first dimension of a and collect results into a new array. This is analogous to map over lists, where the first dimension of the input and output array play the role of the input and output list. :param f: Function to apply. Its input shape is a.shape[1:] and its output shape is _map_batch(f, a).shape[1:]. :param a: Array to transform. :return: Transformed array r, where r[i] = f(a[i]) for each i. """ r_list = [] for a_slice in a: r_list.append(f(a_slice)) return np.stack(r_list, axis=0)
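# A minimal usage sketch of _map_batch (illustrative only; the array values here are
# hypothetical example data): the callable is applied independently to each slice along
# the batch dimension and the results are stacked back together.
def _example_map_batch_usage() -> None:
    a = np.arange(6, dtype=np.float32).reshape(2, 3)
    doubled = _map_batch(lambda s: s * 2, a)
    assert doubled.shape == (2, 3)
    assert np.array_equal(doubled[1], a[1] * 2)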
def float_convolution(attrs: _afe_attrs.ConvAddActivationAttrs, data: np.ndarray,
                      mode: RunMode) -> np.ndarray:
    """
    Execute a floating-point convolution using an algorithm from ml_kernels.

    :param attrs: Attributes of the convolution operator
    :param data: Input activation data in NHWC layout
    :param mode: Mode of execution
    :return: Convolved result
    """
    assert attrs.conv_attrs.num_spatial_dimensions <= 3, "Only 2D/3D convolution is implemented"
    return _ml_kernels_convolution(attrs.conv_attrs, attrs.weights_attrs.data,
                                   attrs.bias_attrs.data if attrs.bias_attrs is not None else None,
                                   attrs.activ_attrs, get_id_requantization(data.dtype),
                                   False, 0, 0, data, mode)
def quantized_convolution(attrs: _afe_attrs.ConvQuantAttrs, data: np.ndarray,
                          mode: RunMode) -> np.ndarray:
    """
    Execute a quantized convolution using an algorithm from ml_kernels.

    :param attrs: Attributes of the convolution operator
    :param data: Input activation data in NHWC layout
    :param mode: Mode of execution
    :return: Convolved result
    """
    assert attrs.conv_attrs.num_spatial_dimensions <= 3, "Only 2D/3D convolution is implemented"
    return _ml_kernels_convolution(attrs.conv_attrs, attrs.weight_quant_data, attrs.bias_quant_data,
                                   attrs.activ_attrs, attrs.requant, attrs.msb_left_shift,
                                   attrs.input_zp, attrs.zero_point, data, mode)
@dataclass class _GeneralMLKernelsConvolutionParameters: """ Parameters used by the convolution functions in the ml_kernels package. These parameters are in the format expected by ML Kernels, which may be different from how they are stored in Conv2DQuantAttrs. :param data_shape: Shape of the input feature map in HWC layout :param weight: Weight tensor in OIW, OIHW, or OIDHW layout :param bias: Bias tensor or None :param strides: Strides in H and W dimensions :param padding: Padding on top, bottom, left, and right :param output_padding: Padding to apply to the output on top, bottom, left, and right. This padding is used for transposed convolution. It must be zero for other convolutions. :param dilations: Dilation in H and W dimensions :param int15_params: Parameters for the int15 convolution algorithm. It is used when the input feature map's scalar type is int16. :param activ: Activation function to compute as part of convolution. :param requant: Requantization to apply to the 32-bit result of convolution :param ifm_zero_point: Zero point of the quantized input :param ofm_zero_point: Zero point of the quantized output :param use_tf: Whether to call Tensorflow to compute the convolution """ ifm_shape: tuple[int, ...] weight: np.ndarray bias: np.ndarray | None strides: tuple[int, ...] padding: tuple[int, ...] output_padding: tuple[int, ...] dilations: tuple[int, ...] int15_params: ml_kernels.np_operators.ConvInt15Params activ: Activation requant: BaseRequantization[np.ndarray] ifm_zero_point: int ofm_zero_point: int use_tf: bool _ConvFunction = Callable[[np.ndarray], np.ndarray] def _make_conv3d_function(p: _GeneralMLKernelsConvolutionParameters, is_transposed: bool = False, output_shape: None | tuple[int] = None, groups: int = 1) -> _ConvFunction: def compute_conv3d(ifm): ifm = np.pad(ifm, [(p.padding[0], p.padding[1]), (p.padding[2], p.padding[3]), (p.padding[4], p.padding[5]), (0, 0)], 'constant', constant_values=p.ifm_zero_point) ifm = np.expand_dims(ifm, axis=0) # For tf.nn.conv3d, input data must be one of the following types: half, bfloat16, float32, float64 compute_type = np.float32 internal_compute_type = np.float64 conv_compute_type = np.float64 if p.int15_params is not None else compute_type if is_transposed: # Weight is MLA layout OIDHW, transpose to DHWOI for TF weight = p.weight.transpose((2, 3, 4, 0, 1)) ofm = tf.nn.conv3d_transpose(input=ifm.astype(conv_compute_type, copy=False), filters=weight.astype(conv_compute_type, copy=False), output_shape=output_shape, strides=(1,)+p.strides+(1,), padding="VALID", data_format="NDHWC", dilations=((1,)+p.dilations+(1,))).numpy().astype(conv_compute_type, copy=False)[0] elif groups > 1: assert p.output_padding == (0, 0, 0, 0, 0, 0) ofm = [] for g in range(groups): group_ifm = ifm[:, :, :, :, g:(g + 1)] # Weight is MLA layout OIDHW, slice and transpose to DHWIO for TF group_weight = p.weight[g:(g + 1)].transpose((2, 3, 4, 1, 0)) group_ofm = tf.nn.conv3d(input=group_ifm.astype(conv_compute_type, copy=False), filters=group_weight.astype(conv_compute_type, copy=False), strides=(1,)+p.strides+(1,), padding="VALID", data_format="NDHWC", dilations=((1,)+p.dilations+(1,))) \ .numpy().astype(conv_compute_type, copy=False)[0] ofm.append(group_ofm) ofm = np.concatenate(ofm, axis=3) else: assert p.output_padding == (0, 0, 0, 0, 0, 0) # Weight is MLA layout OIDHW, transpose to DHWIO for TF weight = p.weight.transpose((2, 3, 4, 1, 0)) ofm = tf.nn.conv3d(input=ifm.astype(conv_compute_type, copy=False), filters=weight.astype(conv_compute_type, 
copy=False), strides=(1,)+p.strides+(1,), padding="VALID", data_format="NDHWC", dilations=((1,)+p.dilations+(1,))).numpy().astype(conv_compute_type, copy=False)[0] if p.int15_params is not None: # Shift right and return to 32-bit precision if isinstance(p.int15_params.msb_left_shift, np.ndarray): int15_shift = np.array([-1 if s else -8 for s in p.int15_params.msb_left_shift]) else: int15_shift = -1 if p.int15_params.msb_left_shift else -8 ofm = np.ldexp(ofm, int15_shift).astype(compute_type, copy=False) if p.bias is not None: ofm = tf.math.add(ofm.astype(internal_compute_type, copy=False), p.bias.astype(internal_compute_type, copy=False)).numpy() ofm = ml_kernels.np_operators.normalize(ofm, compute_type) if ifm.dtype != bfloat16: ofm = ofm.astype(np.int32) ofm = ml_kernels.np_operators.requantize(ofm, p.requant) assert p.activ in (Activation.RELU, Activation.NONE), "Only support RELU activation for conv3d for now." if p.activ == Activation.RELU: ofm[np.where(ofm < p.ofm_zero_point)] = p.ofm_zero_point return ofm return compute_conv3d def _make_normal_convolution_function(p: _GeneralMLKernelsConvolutionParameters) -> _ConvFunction: assert all(pad == 0 for pad in p.output_padding) def compute_convolution(ifm): return ml_kernels.np_operators.ideal_convolution( ifm, p.weight, p.bias, p.strides, p.padding, p.dilations, p.requant, activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point, int15_params=p.int15_params, use_tf=p.use_tf ) return compute_convolution def _make_group_convolution_function( p: _GeneralMLKernelsConvolutionParameters, groups: int ) -> _ConvFunction: assert all(pad == 0 for pad in p.output_padding) def compute_convolution(ifm): return ml_kernels.np_operators.ideal_group_convolution( ifm, p.weight, p.bias, p.strides, p.padding, p.dilations, p.requant, activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point, groups=groups, int15_params=p.int15_params, use_tf=p.use_tf ) return compute_convolution def _make_depthwise_transposed_convolution_function( p: _GeneralMLKernelsConvolutionParameters, groups: int ) -> _ConvFunction: # Make a function that computes depthwise transposed convolution. # No such form of convolution is implemented in ml_kernels, so this function does # upscaling, padding, and depthwise conv2d. 
num_spatial_dims = len(p.ifm_shape) - 1 ofm_spatial_shape = tuple( (p.ifm_shape[i] - 1) * p.strides[i] + 1 for i in range(num_spatial_dims) ) ofm_shape = (*ofm_spatial_shape, p.ifm_shape[-1]) kernel_shape = p.weight.shape[2:] weight = np.rot90(p.weight, k=2, axes=(-2, -1)) # Calculate a padding parameter for np.pad assert all(x == 0 for x in p.output_padding[::2]) np_pad_shape = list() for i in range(num_spatial_dims): np_pad_shape.append( ( kernel_shape[i] - 1 - p.padding[2 * i], kernel_shape[i] - 1 - p.padding[2 * i + 1] + p.output_padding[2 * i + 1] ) ) np_pad_shape.append((0, 0)) def compute_convolution(ifm: np.ndarray): # Implement depthwise transposed conv2d using upscale, pad, and depthwise conv2d # Upscale pad_value = p.ifm_zero_point slices = [slice(None, None, p.strides[i]) for i in range(num_spatial_dims)] slices.append(slice(None)) data_ext = np.full(ofm_shape, pad_value, dtype=ifm.dtype) data_ext[tuple(slices)] = ifm # Apply padding data_ext = np.pad(data_ext, np_pad_shape, 'constant', constant_values=pad_value) # Convolution strides = (1, ) * num_spatial_dims padding = (0, ) * (num_spatial_dims * 2) return ml_kernels.np_operators.ideal_group_convolution( data_ext, weight, p.bias, strides, padding, p.dilations, p.requant, activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point, groups=groups, int15_params=p.int15_params, use_tf=p.use_tf ) return compute_convolution def _make_transposed_convolution_function( p: _GeneralMLKernelsConvolutionParameters, output_shape: None | tuple[int] = None ) -> _ConvFunction: assert all(x == 1 for x in p.dilations), "Transposed convolution only supports dilation equal to 1" assert all(x == 0 for x in p.output_padding[::2]), ( "Output padding on beginning of a spatial dimension is not supported" ) # Remove batch dimension from the computed output shape ofm_shape = output_shape[1:] def compute_convolution(ifm): return ml_kernels.np_operators.ideal_transpose_convolution( ifm, ofm_shape, p.weight, p.bias, p.strides, p.padding, p.requant, activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point, int15_params=p.int15_params, use_tf=p.use_tf ) return compute_convolution def _ml_kernels_convolution(conv_attrs: _afe_attrs.ConvAttrs, weight: np.ndarray, bias: Optional[np.ndarray], activ_attrs: Optional[Union[_afe_attrs.ACTIVATION_ATTRS, _afe_attrs.QUANT_ACTIVATION_ATTRS]], requant: BaseRequantization[np.ndarray], msb_left_shift: Union[bool, np.ndarray], input_zp: int, zero_point: int, data: np.ndarray, mode: RunMode) -> np.ndarray: """ Execute a convolution using an algorithm from ml_kernels. The function parameters other than "data" should be taken from the convolution's attributes. This function converts inputs and outputs as required to call ml_kernels. :param conv_attrs: Attributes of the convolution operator :param weight: Weight tensor :param bias: Optional bias tensor :param activ_attrs: Optional activation after convolution :param msb_left_shift: Has the same meaning as the msb_left_shift field of Conv2DQuantAttrs, if this is a convolution with int16 input. Ignored otherwise. :param input_zp: Zero point of quantized input. Ignored for floating-point. :param zero_point: Zero point of quantized output. Ignored for floating-point. :param data: Input activation data in NHWC or NDHWC layout. The type of array elements must be int8, int16, bfloat16, or bfloat32. This type determines what numerical precision to use in the convolution algorithm. 
:param mode: Mode of execution :return: Convolved result """ import afe.backends.mla.afe_to_n2a_compiler.n2a_compiler_utils as n2a_compiler_utils assert data.dtype in (np.int8, np.int16, bfloat16, np.float32) # Decide which convolution function in the ml_kernels package will be used make_conv_function: Callable[[_GeneralMLKernelsConvolutionParameters], _ConvFunction] if conv_attrs.is_transposed: if conv_attrs.groups == 1: # Normal transposed convolution output_shape = conv_attrs.output_shape make_conv_function = functools.partial( _make_transposed_convolution_function, output_shape=output_shape ) elif conv_attrs.is_depthwise_one_channel: # Depthwise transposed convolution make_conv_function = functools.partial( _make_depthwise_transposed_convolution_function, groups=conv_attrs.groups ) else: # Group transposed convolution raise NotImplementedError("group transposed convolution is not implemented") elif conv_attrs.groups == 1: # Normal convolution make_conv_function = _make_normal_convolution_function elif conv_attrs.groups > 1: # Group convolution make_conv_function = functools.partial( _make_group_convolution_function, groups=conv_attrs.groups ) else: raise RuntimeError(f"Unknown Conv2D with attributes {conv_attrs}") # Transpose tensors and attributes to satisfy ml_kernels requirements strides = conv_attrs.stride dilations = conv_attrs.dilation flattened_padding = sum(conv_attrs.padding, ()) flattened_output_padding = sum(conv_attrs.output_padding, ()) # Cast numeric types if data.dtype in (np.int8, np.int16): weight = weight.astype(np.int8, copy=False) assert bias is None or bias.dtype == np.int32 requant = afe.ir.quantization_utils.fix_requantization(requant) elif data.dtype == bfloat16: # For conv with bfloat16 input, bias is expected to be of float32 type. 
assert bias is None or bias.dtype == np.float32 if weight.dtype == np.float32: weight = weight.astype(bfloat16) assert is_identity_requantization(requant) else: weight = weight.astype(np.int8, copy=False) assert is_renormalization(requant) else: # np.float32 assert weight.dtype == np.float32 assert bias is None or bias.dtype == np.float32 assert is_identity_requantization(requant) weight = weight.reshape(weight.shape[:-2] + (-1,)) # Flatten the group and output dimensions tensor_layout = "HWIO" if len(weight.shape) == 4 else "DHWIO" mla_layout = "OIHW" if len(weight.shape) == 4 else "OIDHW" weight = utils.transpose_tensor_according_to_layout_strings(weight, tensor_layout, mla_layout) if data.dtype == np.int16: int15_params = ml_kernels.np_operators.ConvInt15Params(msb_left_shift=msb_left_shift) else: int15_params = None if bias is not None: assert len(bias.shape) == 1 if activ_attrs is None: activ = Activation.NONE elif isinstance(activ_attrs, _afe_attrs.ReluAttrs | _afe_attrs.ReluQuantAttrs): activ = Activation.RELU elif isinstance(activ_attrs, _afe_attrs.ClipAttrs | _afe_attrs.ClipQuantAttrs): # Clip will be processed after the main convolution algorithm activ = Activation.NONE else: raise TypeError("Unrecognized activation type") general_parameters = _GeneralMLKernelsConvolutionParameters( data.shape[1:], weight, bias, strides, flattened_padding, flattened_output_padding, dilations, int15_params, activ, requant, input_zp, zero_point, mode.is_fast_mode() ) ofm = _map_batch(make_conv_function(general_parameters), data) if isinstance(activ_attrs, _afe_attrs.ClipAttrs | _afe_attrs.ClipQuantAttrs): ofm = clip(activ_attrs, ofm) # Convert output to the expected layout return ofm def _expand_transposed_convolution_padding(attrs: _afe_attrs.ConvAttrs) -> tuple[tuple[int, int], ...]: """ Calculate how to pad the input tensor for calculating a transposed convolution using regular convolution. Args: attrs: Attributes of convolution Returns: Padding width to apply to the input tensor in each spatial dimension """ assert attrs.is_transposed ret = [] for kernel_size, padding, output_padding in zip(attrs.kernel_size, attrs.padding, attrs.output_padding): assert output_padding[0] == 0 pad_front = kernel_size - 1 - padding[0] pad_back = kernel_size - 1 - padding[1] + output_padding[1] ret.append((pad_front, pad_back)) return tuple(ret) def _expand_transposed_convolution_input( attrs: _afe_attrs.ConvAttrs, data: np.ndarray, ) -> np.ndarray: """ Expand data into a larger array for transposed convolution. Data is copied with a stride into a larger zero-filled array. Args: attrs: Attributes of the transposed convolution data: Input feature map """ assert attrs.is_transposed padding = _expand_transposed_convolution_padding(attrs) # Create array of zeros having the expanded size spatial_size = [(size - 1) * s + 1 + p[0] + p[1] for size, s, p in zip(attrs.input_spatial_shape, attrs.stride, padding)] data_ext = np.zeros((attrs.batch_size, *spatial_size, attrs.input_channels), dtype=np.float32) # Copy data into new input array. Padding determines the starting index. Stride determines the stride. spatial_index_pattern = [slice(p[0], p[0] + s * size, s) for size, s, p in zip(attrs.input_spatial_shape, attrs.stride, padding)] data_ext[(slice(None), *spatial_index_pattern, slice(None))] = data return data_ext
[docs] def conv_tensorflow(attrs: _afe_attrs.ConvAttrs, data: np.ndarray, weight: np.ndarray) -> np.ndarray: """ Compute a floating-point convolution by calling Tensorflow's convolution operator. This function may not exactly match MLA behavior. Args: attrs: Attributes of the convolution data: Input tensor weight: Weight tensor Returns: Convolved tensor """ assert data.shape == attrs.input_shape assert weight.shape == attrs.weight_shape if attrs.groups > 1: # Perform convolution on each group and concatenate the results data_groups = np.split(data, attrs.groups, axis=-1) weight_groups = np.split(weight, attrs.groups, axis=-2) group_attrs = dataclasses.replace(attrs, weight_shape=weight_groups[0].shape) return np.concatenate([conv_tensorflow(group_attrs, d, w) for d, w in zip(data_groups, weight_groups)], axis=-1) if attrs.is_transposed: # Convert transposed convolution to regular convolution assert all(d == 1 for d in attrs.dilation), \ "Transposed convolution with dilation greater than 1 is not supported" data = _expand_transposed_convolution_input(attrs, data) weight = np.flip(weight, axis=tuple(range(attrs.num_spatial_dimensions))) no_padding = ((0, 0),) * attrs.num_spatial_dimensions regular_attrs = dataclasses.replace(attrs, is_transposed=False, input_spatial_shape=data.shape[1:-1], stride=(1,) * attrs.num_spatial_dimensions, padding=no_padding, output_padding=no_padding) return conv_tensorflow(regular_attrs, data, weight) # Else, regular convolution weight = weight.squeeze(axis=-2) # Remove the groups axis weight = weight.astype(np.float32, copy=False) data = data.astype(np.float32, copy=False) if any(p[0] != 0 or p[1] != 0 for p in attrs.padding): numpy_padding = ((0, 0), *attrs.padding, (0, 0)) data = np.pad(data, numpy_padding) ret = tf.nn.convolution(data, weight, strides=attrs.stride, dilations=attrs.dilation) return np.array(ret, dtype=np.float32)
def add(lhs: np.ndarray, rhs: np.ndarray, axis: Optional[int] = None) -> np.ndarray:
    assert (lhs.dtype == rhs.dtype) and lhs.dtype in (np.float32, bfloat16, np.int32)
    if axis is not None:
        shape_len_minus_batch = len(lhs.shape) - 1
        if axis < 0:
            axis = axis + len(lhs.shape)
        for _ in range(shape_len_minus_batch - axis):
            rhs = np.expand_dims(rhs, axis=-1)
    if lhs.dtype == np.int32:
        output = lhs.astype(np.int64) + rhs
        np.clip(output, np.iinfo(np.int32).min, np.iinfo(np.int32).max, out=output)
    else:
        output = lhs + rhs
    return output
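# A minimal sketch (hypothetical example data) showing how the axis argument of add()
# broadcasts a per-channel vector: rhs is expanded with trailing singleton dimensions
# until it lines up with the requested axis of lhs.
def _example_add_with_axis() -> None:
    lhs = np.zeros((1, 2, 3), dtype=np.float32)
    rhs = np.array([1.0, 2.0], dtype=np.float32)  # one value per entry along axis 1
    out = add(lhs, rhs, axis=1)
    assert out.shape == (1, 2, 3)
    assert np.array_equal(out[0, :, 0], rhs)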
def relu(data: np.ndarray, zp: int = 0) -> np.ndarray:
    if data.dtype == bfloat16:
        # Convert zp to bfloat16, as numpy will fail trying to convert it in np.where
        zp = bfloat16(zp)
    data = np.where(data < zp, zp, data)
    return data
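# A minimal sketch (hypothetical example data): relu() clamps values below the zero
# point, which defaults to 0 for floating-point data.
def _example_relu_usage() -> None:
    x = np.array([[-3.0, 0.5, 2.0]], dtype=np.float32)
    assert np.array_equal(relu(x), np.array([[0.0, 0.5, 2.0]], dtype=np.float32))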
def clip(attrs: _afe_attrs.ClipAttrs | _afe_attrs.ClipQuantAttrs, data: np.ndarray) -> np.ndarray:
    outputs: List[np.ndarray] = list()
    a_min = np.asarray(attrs.a_min).astype(np.float32) if isinstance(attrs.a_min, float) else attrs.a_min
    a_max = np.asarray(attrs.a_max).astype(np.float32) if isinstance(attrs.a_max, float) else attrs.a_max
    for idx in range(data.shape[0]):
        output = ml_kernels.np_operators.ideal_clip(data[idx], a_min, a_max)
        outputs.append(np.expand_dims(output, axis=0))
    res = np.concatenate(outputs)
    return res
def prelu(data: np.ndarray, alpha: Union[np.ndarray, float, int], axis: Optional[int] = None,
          zp: int = 0) -> np.ndarray:
    assert data.dtype in (np.float32, np.int8, bfloat16)
    if axis is not None:
        data = utils.transpose_axis_to_the_last(data, axis)
    positive = relu(data, zp)
    negative = alpha * (data - positive)
    res = positive + negative
    if axis is not None:
        res = utils.transpose_axis_to_the_last(res, axis)
    if data.dtype == bfloat16:
        assert res.dtype == bfloat16
    return res
def elu(data: np.ndarray):
    return tf.nn.elu(data).numpy()


def leaky_relu(data: np.ndarray, alpha: Union[float, int]) -> np.ndarray:
    assert data.dtype == np.float32
    return prelu(data, alpha, axis=None).astype(np.float32)
[docs] def maxpool(attrs: _afe_attrs.PoolAttrs, data: np.ndarray, pad_value: Union[float, int], mode: RunMode = RunMode.MLA_MODE) -> np.ndarray: assert data.dtype in (np.float32, bfloat16, np.int8, np.int16) padding = attrs.padding # Explicit padding data = np.pad(data, padding, 'constant', constant_values=pad_value) # Pick an AwesomeDataLayout depending on the dimensionality awesome_data_layout = AwesomeDataLayout if len(attrs.layout) == 4 else AwesomeDataLayout5D assert len(attrs.layout) in (4, 5) # Transpose pool_size, strides, and data to accommodate tensorflow requirements pool_size = utils.transpose_attr_according_to_layout_strings(attrs.pool_size, attrs.layout, awesome_data_layout) strides = utils.transpose_attr_according_to_layout_strings(attrs.strides, attrs.layout, awesome_data_layout) data = utils.transpose_tensor_according_to_layout_strings(data, attrs.layout, awesome_data_layout) assert pool_size[0] == pool_size[-1] == 1 assert strides[0] == strides[-1] == 1 min_value = ml_kernels.math_helpers.get_dtype_min(data.dtype) outputs = [] for idx in range(data.shape[0]): outputs.append( ml_kernels.np_operators.ideal_pool( data[idx, ...], pool_size[1:-1], strides[1:-1], (0, ) * (data.ndim - 2) * 2, "max", get_id_requantization(data.dtype), pad_value=min_value, use_tf=mode.is_fast_mode() ) ) output = np.stack(outputs) # When attrs.out_layout is an empty string we keep it same as attrs.layout. output_layout = attrs.out_layout if attrs.out_layout else attrs.layout # Transpose data according to out_layout output = utils.transpose_tensor_according_to_layout_strings(output, awesome_data_layout, output_layout) return output.astype(data.dtype, copy=False)
def _avgpool(data: np.ndarray, data_layout: str, pool_size: AwesomePoolSize, strides: AwesomeStrides, out_layout: str, out_type: ScalarType, quantized: bool = False, rounding_type: Optional[RoundType] = None, requant: Optional[BaseRequantization] = None, mode: RunMode = RunMode.MLA_MODE) -> np.ndarray: """ Running avgpool on the given data """ # Pick an AwesomeDataLayout depending on the dimensionality awesome_data_layout = AwesomeDataLayout if len(data_layout) == 4 else AwesomeDataLayout5D assert len(data_layout) in (4, 5) # Required by ideal_pool # Transpose pool_size, strides, and data to accommodate awesome_data_layout pool_size = utils.transpose_attr_according_to_layout_strings(pool_size, data_layout, awesome_data_layout) strides = utils.transpose_attr_according_to_layout_strings(strides, data_layout, awesome_data_layout) data = utils.transpose_tensor_according_to_layout_strings(data, data_layout, awesome_data_layout) assert pool_size[0] == pool_size[-1] == 1 assert strides[0] == strides[-1] == 1 paddings = (0, ) * (data.ndim - 2) * 2 if quantized: assert data.dtype == np.int8 or data.dtype == np.int16 # Using quantized pooling kernels output = _avgpool_kernel( data, pool_size[1:-1], strides[1:-1], paddings=paddings, rounding_type=rounding_type, requant=requant, mode=mode ) else: if data.dtype == np.float32: # Make sure the result exactly matches ONNX's result by calling the tensorflow library output = tf.nn.avg_pool(data, pool_size, strides, 'VALID', awesome_data_layout).numpy() else: # Put the scaling factor for averaging into a Renormalization renormalization = Renormalization( np.float32(1.0) / math.prod(pool_size[1:-1]), utils.create_and_verify_narrowing(0, RoundType.TOEVEN, out_type.numpy_type()) ) outputs = [] for idx in range(data.shape[0]): outputs.append(ml_kernels.np_operators.ideal_pool( data[idx, ...], tuple(pool_size[1:-1]), strides[1:-1], paddings, "average", renormalization, pad_value=0, use_tf=mode.is_fast_mode()) ) output = np.stack(outputs) # Transpose data back to original layout output = utils.transpose_tensor_according_to_layout_strings(output, awesome_data_layout, out_layout) return output
[docs] def avgpool(attrs: _afe_attrs.PoolAttrs, data: np.ndarray, pad_value: Union[float, int], quantized: bool = False, rounding_type: Optional[RoundType] = None, requant: Optional[BaseRequantization] = None, mode: RunMode = RunMode.MLA_MODE, ) -> np.ndarray: # Explicit padding padding = attrs.padding data = np.pad(data, padding, 'constant', constant_values=pad_value) out_layout = attrs.out_layout if attrs.out_layout else attrs.layout return _avgpool(data, attrs.layout, attrs.pool_size, attrs.strides, out_layout, attrs.scalar_type, quantized=quantized, rounding_type=rounding_type, requant=requant, mode=mode)
[docs] def variance(data: np.ndarray, mean: np.ndarray, requant: BaseRequantization | None = None, requant_var: BaseRequantization | None = None): from ml_kernels.np_operators import ideal_variance if data.dtype == np.float32 or data.dtype == bfloat16: divisor = np.float32(1.0) / math.prod(data.shape[1:-1]) requant = Renormalization( divisor, utils.create_and_verify_narrowing(0, RoundType.TOEVEN, data.dtype) ) assert data.shape[0] == mean.shape[0] kernel_size = data.shape[1:-1] stride = (1, ) * len(kernel_size) padding = (0, ) * len(kernel_size) * 2 outputs = [ np.expand_dims( ideal_variance( ifm=data[i], mean=mean[i], kernel_size=kernel_size, stride=stride, padding=padding, requantization=requant, requant_var=requant_var ), axis=0 ) for i in range(data.shape[0]) ] out = np.concatenate(outputs, axis=0) return out
def _get_pool_size_from_shape(input_shape: InputShape, layout: str) -> AwesomePoolSize:
    """Use the input data shape to create a pooling window size"""
    c_dim = layout.index("C")
    pool_size = list(input_shape)
    pool_size[c_dim] = 1
    return tuple(pool_size)
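# A minimal sketch (hypothetical shape values): for adaptive (global) average pooling,
# the pooling window spans the full input shape with only the channel dimension set to 1,
# as used by adaptive_avgpool2d below.
def _example_global_pool_size() -> None:
    assert _get_pool_size_from_shape((1, 7, 7, 512), "NHWC") == (1, 7, 7, 1)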
[docs] def adaptive_avgpool2d(attrs: _afe_attrs.AdaptiveAvgPool2DAttrs, data: np.ndarray, quantized: bool = False, rounding_type: Optional[RoundType] = None, *args, **kwargs) -> np.ndarray: pool_size = _get_pool_size_from_shape(data.shape, attrs.layout) strides = (1, 1, 1, 1) out_layout = attrs.out_layout if attrs.out_layout else attrs.layout output = _avgpool(data, attrs.layout, pool_size, strides=strides, out_layout=out_layout, out_type=ScalarType.float32, quantized=quantized, rounding_type=rounding_type, mode=RunMode.MLA_MODE) return output
[docs] def broadcast_to(attrs: Union[_afe_attrs.BroadcastToAttrs, _afe_attrs.BroadcastToQuantAttrs], data: np.ndarray): return np.copy(np.broadcast_to(data, attrs.output_shape))
[docs] def multiply(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray: """Floating-point multiplication.""" assert lhs.dtype == rhs.dtype and lhs.dtype in (bfloat16, np.float32) return tf.math.multiply(lhs, rhs).numpy()
[docs] def quantized_multiply(attrs: _afe_attrs.MultiplyQuantAttrs, lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray: """Quantized multiplication.""" from ml_kernels.np_operators import ideal_mul # Process the batch dimension with a loop. ideal_mul does not handle the batch dimension. lhs, rhs = np.broadcast_arrays(lhs, rhs) products = [] for i in range(lhs.shape[0]): p = ideal_mul(lhs[i, ...], rhs[i, ...], attrs.lhs_zero_point, attrs.rhs_zero_point, attrs.requant, intrinsic_shift=attrs.intrinsic_shift) products.append(p) return np.stack(products)
[docs] def pad(attrs: _afe_attrs.PadAttrs, data: np.ndarray, pad_value: np.ndarray) -> np.ndarray: pad_value_scalar = np.ndarray.item(pad_value) assert data.dtype == np.float32 output = tf.pad(data, attrs.pad_width, mode='CONSTANT', constant_values=pad_value_scalar).numpy() return output.astype(data.dtype)
def mean(attrs: _afe_attrs.MeanAttrs, data: np.ndarray, quantized=False) -> np.ndarray:
    """
    When quantized is set to True, use avg_pool2d to compute the mean along axis =
    * (1)
    * (2)
    * (1, 2)

    Parameters
    ----------
    :param attrs: MeanAttrs. Attributes needed to execute the mean operation.
    :param data: np.ndarray. Input data to the mean operation.
    :param quantized: bool. Default is False. Set to True if the mean operation is
        executed in a quantization domain.

    Return
    ------
    :return: np.ndarray. Result of the mean operation.
    """
    # TODO: Add a mean operator in ml_kernels
    axis = attrs.axis
    if attrs.exclude:
        axis = utils.exclude_axes(len(data.shape), axis)
    if (
            quantized
            and 0 not in axis
            and data.ndim in (4, 5)
            and data.ndim - 1 not in axis
    ):
        assert data.dtype == np.int8
        # Using _avgpool_kernel to do mean
        pool_shape = tuple(
            x if i in axis else 1 for i, x in enumerate(data.shape[1:-1], start=1)
        )
        strides = (1, ) * len(pool_shape)
        paddings = (0, ) * len(pool_shape) * 2
        output = _avgpool_kernel(data, pool_shape, strides=strides, paddings=paddings)
        if not attrs.keepdims:
            output = np.squeeze(output, tuple(axis))
    else:
        output = tf.math.reduce_mean(data, axis=axis, keepdims=bool(attrs.keepdims)).numpy()
    return output
[docs] def squeeze(attrs: _afe_attrs.SqueezeAttrs, data: np.ndarray) -> np.ndarray: return tf.squeeze(data, axis=attrs.axis).numpy()
[docs] def argmax(attrs: _afe_attrs.ArgMaxAttrs, data: np.ndarray) -> np.ndarray: if attrs.input_scalar_type == ScalarType.float32: # Floating-point argmax. Use the numpy implementation. assert data.dtype == np.float32 assert attrs.result_scalar_type == ScalarType.int32, "Unsupported result type for the argmax operator" axis = attrs.axis if attrs.exclude: axis = utils.exclude_axes(len(data.shape), axis) assert len(axis) == 1, "Error Argmax does not support multiple axes" if attrs.select_last_index: data = np.flip(data, axis=axis[0]) res = np.argmax(data, axis[0], keepdims=bool(attrs.keepdims)).astype(np.int32) if attrs.select_last_index: res = data.shape[axis[0]] - 1 - res return res elif attrs.input_scalar_type in (ScalarType.int8, ScalarType.bfloat16): from ml_kernels.np_operators import ideal_arg_min_max # Integer argmax. Use the reference implementation in ml_kernels. assert data.dtype == attrs.input_scalar_type.numpy_type() assert attrs.result_scalar_type == ScalarType.int32, "Unsupported result type for the argmax operator" # Call the reference implementation without a batch dimension. outputs: List[np.ndarray] = list() for idx in range(data.shape[0]): output = ideal_arg_min_max( data[idx], ArgMinMaxOp.MAX, select_last_index=attrs.select_last_index ) outputs.append(np.expand_dims(output, axis=0)) res = np.concatenate(outputs) return res # else raise ValueError("Unexpected scalar type for argmax operator")
[docs] def softmax(attrs: Union[_afe_attrs.SoftmaxAttrs, _afe_attrs.SoftmaxQuantAttrs], data: np.ndarray) -> np.ndarray: if isinstance(attrs, _afe_attrs.SoftmaxQuantAttrs): from ml_kernels.requantization import FractionalZeroRequantization, Narrowing from ml_kernels.np_operators import ideal_softmax assert data.dtype in (np.int8, np.int16) axis = attrs.axis if attrs.axis == -1 else attrs.axis - 1 lut_input_pre_shift = attrs.lut_input_pre_shift output_pre_shift = attrs.output_pre_shift output = np.array([ideal_softmax(ifm, zp_exp=attrs.exp_zp, zp_rec=attrs.rec_zp, axis=axis, lut_exp=attrs.lookup_table_exp, lut_rec=attrs.lookup_table_rec, req_lut_input=attrs.requant_lut, req_output=attrs.requant_output, lut_input_pre_shift=lut_input_pre_shift, output_pre_shift=output_pre_shift) for ifm in data]) elif data.dtype == bfloat16: from ml_kernels.global_constants import compute_bf16_exp_lut, compute_bf16_reciprocal_lut from ml_kernels.np_operators import ideal_softmax axis = attrs.axis if attrs.axis == -1 else attrs.axis - 1 lut_exp = compute_bf16_exp_lut() lut_rec = compute_bf16_reciprocal_lut() output = np.array([ideal_softmax(ifm, axis=axis, lut_exp=lut_exp, lut_rec=lut_rec) for ifm in data]) else: output = _ev_transforms.softmax(data, attrs.axis) return output
[docs] def lrn(attrs: Union[_afe_attrs.LRNAttrs, _afe_attrs.LRNQuantAttrs], data: np.ndarray) -> np.ndarray: from ml_kernels.np_operators import ideal_lrn if attrs.axis == 1: # Transpose data to AwesomeDataLayout. We assume the data is formatted to NCHW data = utils.transpose_tensor_according_to_layout_strings(data, 'NCHW', AwesomeDataLayout) if isinstance(attrs, _afe_attrs.LRNAttrs): assert data.dtype == np.float32 # NOTES FOR TENSORFLOW # TVM defines size as size_tvm = (depth_radius_tf * 2) + 1 # TVM defines alpha as alpha_tvm = alpha_tf * size_tf alpha = attrs.alpha / attrs.size if attrs.size % 2 == 1: # Use tf for odd window size depth_radius = int((attrs.size - 1) / 2) output = tf.nn.local_response_normalization(input=data, depth_radius=depth_radius, bias=attrs.bias, alpha=alpha, beta=attrs.beta).numpy() else: # Use ml_kernels for even window size output = ideal_lrn(data[0], window_size=attrs.size, padding=attrs.size // 2, bias=attrs.bias, alpha=alpha, beta=attrs.beta) output = np.expand_dims(output, axis=0) else: assert isinstance(attrs, _afe_attrs.LRNQuantAttrs) assert data.dtype == np.int8 from ml_kernels.requantization import FractionalZeroRequantization, Narrowing requant_lut_input = FractionalZeroRequantization(attrs.lut_scale, attrs.lut_zp_corr, Narrowing(attrs.lut_sh, RoundType.UPWARD, data.dtype)) requant_output = FractionalZeroRequantization(attrs.output_scale, attrs.output_zp_corr, Narrowing(attrs.output_sh, RoundType.UPWARD, data.dtype)) output = ideal_lrn(data[0], window_size=attrs.size, padding=attrs.size // 2, lut=attrs.lookup_table.reshape((16, 16)), zp_input=attrs.input_zp, requant_lut_input=requant_lut_input, requant_output=requant_output) output = np.expand_dims(output, axis=0) if attrs.axis == 1: # Transpose data back to original layout output = utils.transpose_tensor_according_to_layout_strings(output, AwesomeDataLayout, 'NCHW') return output
[docs] def concatenate(attrs: _afe_attrs.ConcatenateAttrs, data_list: List[np.ndarray]) -> np.ndarray: return tf.concat(values=data_list, axis=attrs.axis).numpy()
[docs] def transpose(attrs: _afe_attrs.TransposeAttrs, data: np.ndarray) -> np.ndarray: if len(attrs.axes) < 1: perm = None else: perm = attrs.axes output = tf.transpose(a=data, perm=perm).numpy() return output
[docs] def depth_to_space(attrs: _afe_attrs.DepthToSpaceAttrs, data: np.ndarray) -> np.ndarray: return ml_kernels.np_operators.ideal_depth_to_space(data, attrs.block_size, attrs.mode)
[docs] def reshape(attrs: _afe_attrs.ReshapeAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.reshape(attrs.newshape, data)
[docs] def expand_dims(attrs: _afe_attrs.ExpandDimsAttrs, data: np.ndarray) -> np.ndarray: for _ in range(attrs.num_newaxis): data = np.expand_dims(data, attrs.axis) return data
def batch_flatten(data: np.ndarray) -> np.ndarray:
    """Flattens all the dimensions except for the batch dimension"""
    new_shape = (data.shape[0], -1)
    return np.reshape(data, new_shape)
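# A minimal sketch (hypothetical shape values): batch_flatten() keeps the batch
# dimension and collapses everything else into one dimension.
def _example_batch_flatten() -> None:
    x = np.zeros((2, 3, 4, 5), dtype=np.float32)
    assert batch_flatten(x).shape == (2, 60)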
[docs] def min_op(attrs: _afe_attrs.ExtmAttrs, data: np.ndarray) -> np.ndarray: assert data.dtype == np.float32 axis = attrs.axis if attrs.exclude: axis = utils.exclude_axes(len(data.shape), axis) assert len(axis) == 1, "Error min does not support multiple axes" _validate_reduce_operands(axis, data.shape) output = tf.math.reduce_min(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims)) return output.numpy()
[docs] def max_op(attrs: _afe_attrs.ExtmAttrs, data: np.ndarray) -> np.ndarray: assert data.dtype == np.float32 axis = attrs.axis if attrs.exclude: axis = utils.exclude_axes(len(data.shape), axis) assert len(axis) == 1, "Error max does not support multiple axes" _validate_reduce_operands(axis, data.shape) output = tf.math.reduce_max(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims)) return output.numpy()
[docs] def sum_op(attrs: _afe_attrs.SumAttrs, data: np.ndarray) -> np.ndarray: assert data.dtype == np.float32 axis = attrs.axis if attrs.exclude: axis = utils.exclude_axes(len(data.shape), axis) assert len(axis) == 1, "Error sum does not support multiple axes" _validate_reduce_operands(axis, data.shape) output = tf.math.reduce_sum(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims)) return output.numpy()
[docs] def prod(attrs: _afe_attrs.ProdAttrs, data: np.ndarray) -> np.ndarray: assert data.dtype == np.float32 axis = attrs.axis if attrs.exclude: axis = utils.exclude_axes(len(data.shape), axis) assert len(axis) == 1, "Error prod does not support multiple axes" _validate_reduce_operands(axis, data.shape) output = tf.math.reduce_prod(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims)) return output.numpy()
[docs] def tuple_get_item(attrs: _afe_attrs.TupleGetItemAttrs, t: tuple) -> np.ndarray: return t[attrs.index]
[docs] def exp(data: np.ndarray) -> np.ndarray: return tf.math.exp(data).numpy()
[docs] def sqrt(data: np.ndarray) -> np.ndarray: return tf.math.sqrt(data).numpy()
[docs] def sigmoid(data: np.ndarray) -> np.ndarray: if data.dtype == bfloat16: from ml_kernels.global_constants import compute_bf16_exp_lut, compute_bf16_reciprocal_lut from ml_kernels.np_operators import ideal_sigmoid lut_exp = compute_bf16_exp_lut() lut_rec = compute_bf16_reciprocal_lut() output = np.array([ideal_sigmoid(ifm, lut_exp=lut_exp, lut_rec=lut_rec) for ifm in data]) else: output = tf.math.sigmoid(data).numpy() return output
[docs] def swish(data: np.ndarray) -> np.ndarray: if data.dtype == bfloat16: from ml_kernels.global_constants import compute_bf16_exp_lut, compute_bf16_reciprocal_lut from ml_kernels.np_operators import ideal_sigmoid lut_exp = compute_bf16_exp_lut() lut_rec = compute_bf16_reciprocal_lut() output = np.array([multiply(ifm, ideal_sigmoid(ifm, lut_exp=lut_exp, lut_rec=lut_rec)) for ifm in data]) else: output = tf.math.multiply(data, tf.math.sigmoid(data)).numpy() return output
def hard_sigmoid(data: np.ndarray) -> np.ndarray:
    return np.clip((data / 6.0) + 0.5, a_min=0.0, a_max=1.0)


def hard_swish(data: np.ndarray) -> np.ndarray:
    return data * np.clip((data / 6.0) + 0.5, a_min=0.0, a_max=1.0)
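# A minimal sketch (hypothetical example data): hard_swish is the input multiplied by
# hard_sigmoid, matching the two definitions above.
def _example_hard_swish_identity() -> None:
    x = np.linspace(-4.0, 4.0, 9, dtype=np.float32)
    assert np.allclose(hard_swish(x), x * hard_sigmoid(x))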
[docs] def log(data: np.ndarray) -> np.ndarray: return np.log(data)
[docs] def softplus(data: np.ndarray) -> np.ndarray: return tf.math.softplus(data).numpy()
[docs] def erf(data: np.ndarray) -> np.ndarray: if data.dtype == np.dtype('bfloat16'): from ml_kernels.np_operators import ideal_erf outs = [] for i in range(data.shape[0]): out = ideal_erf(data[i]) out = np.expand_dims(out, axis=0) outs.append(out) output = np.concatenate(outs, axis=0) return output else: return tf.math.erf(data).numpy()
[docs] def gelu(x: np.ndarray) -> np.ndarray: return x * 0.5 * (1.0 + tf.math.erf(x / np.sqrt(2)).numpy())
[docs] def log2(data: np.ndarray) -> np.ndarray: return np.log2(data)
[docs] def log10(data: np.ndarray) -> np.ndarray: return np.log10(data)
[docs] def subtract(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray: assert (lhs.dtype == rhs.dtype) and lhs.dtype in (np.float32, bfloat16, np.int32) return tf.math.subtract(lhs, rhs).numpy()
[docs] def power(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray: return tf.math.pow(lhs, rhs).numpy()
[docs] def divide(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray: return np.asarray(tf.math.divide_no_nan(lhs, rhs))
[docs] def reciprocal(data: np.ndarray) -> np.ndarray: return tf.math.reciprocal(data).numpy()
[docs] def maximum(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray: assert (lhs.dtype == np.float32 and rhs.dtype == np.float32) return tf.math.maximum(lhs, rhs).numpy()
[docs] def minimum(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray: assert (lhs.dtype == np.float32 and rhs.dtype == np.float32) return tf.math.minimum(lhs, rhs).numpy()
[docs] def full(attrs: _afe_attrs.FullAttrs, fill_value: np.ndarray) -> np.ndarray: dtype = attrs.dtype if attrs.dtype else None return np.full(attrs.shape, fill_value, dtype=dtype)
[docs] def tile(attrs: _afe_attrs.TileAttrs, data: np.ndarray) -> np.ndarray: return np.tile(data, attrs.reps)
[docs] def split(attrs: _afe_attrs.SplitAttrs, data: np.ndarray) -> Tuple[np.ndarray, ...]: return tuple(np.split(data, attrs.indices_or_sections, attrs.axis))
def take(attrs: _afe_attrs.TakeAttrs, data: np.ndarray, indices: np.ndarray) -> np.ndarray:
    assert attrs.batch_dims == 0, "Take operation currently only supports batch_dims == 0"
    mode = attrs.mode
    if mode == 'fast':
        mode = 'raise'
    return np.take(data, indices, axis=attrs.axis, mode=mode)
def strided_slice(attrs: _afe_attrs.StridedSliceAttrs, data: np.ndarray) -> np.ndarray:
    begin = list(attrs.begin)
    end = list(attrs.end)
    strides = list(attrs.strides)
    if attrs.axes is not None:
        # Begin, end, strides and axes are constrained to the same length by TVM
        axes = list(attrs.axes)
        new_begin = len(data.shape) * [0]
        new_end = list(data.shape)
        new_strides = len(data.shape) * [1]
        for i, e in enumerate(axes):
            new_begin[e] = begin[i]
            new_end[e] = end[i]
            new_strides[e] = strides[i]
        begin = new_begin
        end = new_end
        strides = new_strides
    if attrs.slice_mode == 'size':
        for i, b_e in enumerate(zip(begin, end)):
            b, e = b_e
            if e == -1:
                end[i] = data.shape[i]
            else:
                end[i] = b + e
        strides = None
    return tf.strided_slice(data, begin=begin, end=end, strides=strides).numpy()
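# A minimal sketch of what slice_mode == 'size' means, using plain NumPy instead of
# constructing a StridedSliceAttrs object (its constructor is not shown here): each
# entry of `end` is a slice length, with -1 meaning "to the end of that dimension".
def _example_strided_slice_size_mode() -> None:
    data = np.arange(12).reshape(3, 4)
    begin, size = (1, 0), (2, -1)
    end = tuple(dim if s == -1 else b + s for b, s, dim in zip(begin, size, data.shape))
    assert end == (3, 4)
    assert np.array_equal(data[begin[0]:end[0], begin[1]:end[1]], data[1:3, :])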
[docs] def rsqrt(data: np.ndarray) -> np.ndarray: return tf.math.rsqrt(data).numpy()
[docs] def tanh(data: np.ndarray) -> np.ndarray: return tf.math.tanh(data).numpy()
def _resize2d(data: np.ndarray, size: Tuple[int, int], method: str,
              layout: str = AwesomeDataLayout, align_corners: bool = False,
              tf_ver: int = 1, rounding: Optional[RoundType] = None) -> np.ndarray:
    """
    Resize along the H and/or W dimension(s) using the Tensorflow V1 or V2 image.resize function.

    :param data: np.ndarray. Input data.
    :param size: Tuple[int, int]. Output size after resizing.
    :param method: str. Name of the resizing method. Currently supports "nearest" and "bilinear".
    :param layout: str. Default is AwesomeDataLayout. Layout of the input data.
    :param align_corners: bool. Default is False. Only applicable with tf_ver == 1.
    :param rounding: Rounding method for integer resize. If None, floating-point resize is performed.
    """
    dtype = data.dtype
    if rounding is None:
        assert dtype in ('float32', 'bfloat16')
    else:
        assert dtype in ('int8', 'int16')
    _SUPPORTED_METHODS = ["nearest", "bilinear"]
    assert method in _SUPPORTED_METHODS, f"Only support {_SUPPORTED_METHODS}, got {method}"
    assert len(data.shape) == len(layout) == 4, f"Only support 4-D tensor, got {data.shape}-D with layout = {layout}"

    data = utils.transpose_tensor_according_to_layout_strings(data, layout, AwesomeDataLayout)
    if tf_ver == 1:
        output = tf.compat.v1.image.resize(images=data, size=size, method=method,
                                           align_corners=align_corners).numpy()
    else:
        output = tf.image.resize(images=data, size=size, method=method).numpy()
    if rounding is not None:
        output = round_op(output, rounding)
    output = utils.transpose_tensor_according_to_layout_strings(output, AwesomeDataLayout, layout)
    return output.astype(dtype)
def image_resize2d(attrs: _afe_attrs.ImageResize2DAttrs, data: np.ndarray,
                   rounding: Optional[str] = None) -> np.ndarray:
    """
    AFE and the MLA do not have a way to support nearest_neighbor with the asymmetric
    coordinate transformation mode. However, the resulting error should be negligible
    during inference.
    """
    method = attrs.method
    if method == 'nearest_neighbor':
        method = 'nearest'
    elif method == 'linear':
        method = 'bilinear'
    elif method == 'cubic':
        method = 'bicubic'

    # Find out which version of Tensorflow to use.
    # TF v1 default is "asymmetric" while TF v2 default is "half_pixel".
    # TVM does not care about this; it always returns "asymmetric", which can introduce error.
    tf_ver = 2 if attrs.coordinate_transformation_mode in ['half_pixel', 'pytorch_half_pixel'] else 1
    align_corners = attrs.coordinate_transformation_mode == 'align_corners'
    output = _resize2d(data, attrs.size, method, attrs.layout, align_corners, tf_ver,
                       rounding=rounding)
    return output
def upsample(attrs: _afe_attrs.UpsamplingAttrs, data: np.ndarray,
             rounding: Optional[str] = None) -> np.ndarray:
    """
    Upsample the input tensor along the H and/or W dimension.
    """
    method = "nearest" if attrs.method == "nearest_neighbor" else attrs.method
    height_dim, width_dim = (data.shape[attrs.layout.index("H")], data.shape[attrs.layout.index("W")])
    size = (int(attrs.scale_h * height_dim), int(attrs.scale_w * width_dim))
    return _resize2d(data, size, method, attrs.layout, attrs.align_corners, tf_ver=1,
                     rounding=rounding)
[docs] def gridsample(attrs: _afe_attrs.GridSampleAttrs, data: np.ndarray, grid: np.ndarray) -> np.ndarray: """ Image interpolation through GridSample """ mode = "linear" if attrs.method == "bilinear" else attrs.method if data.dtype != bfloat16: return_dtype = data.dtype data = data.astype(bfloat16) grid = grid.astype(bfloat16) else: return_dtype = bfloat16 outputs = [] for idx in range(data.shape[0]): input_data = data[idx] grid_data = grid[idx] output = ml_kernels.np_operators.ideal_grid_sample( input_data, grid_data, mode, attrs.padding_mode, attrs.align_corners ) outputs.append(np.expand_dims(output, axis=0)) res = np.concatenate(outputs) return res.astype(return_dtype)
def _validate_reduce_operands(axis: Tuple[int, ...], shape: Tuple[int, ...]): if len(shape) == 4: # Since only NHWC is currently supported by AFE, NHWC layout is assumed if tensor has 4 dimensions assert set(axis) == set([3]), "Only channel dimension is supported for NHWC" elif len(shape) == 3: # Only support H, W or both HW dimension for NHW assert set(axis) in [set([1]), set([2]), set([1, 2])], "H, W or both HW dimensions are supported for NHW"
[docs] def layer_norm(attrs: _afe_attrs.LayerNormAttrs | _afe_attrs.LayerNormQuantAttrs, data: np.ndarray) -> np.ndarray: from ml_kernels.global_constants import compute_bf16_rsqrt_lut from ml_kernels.np_operators import ideal_layer_norm if isinstance(attrs, _afe_attrs.LayerNormAttrs): if data.dtype == np.float32: output = ideal_layer_norm(data[0], axis=attrs.axis, epsilon=attrs.epsilon) else: lut_rsqrt = compute_bf16_rsqrt_lut() output = ideal_layer_norm(data[0], axis=attrs.axis, epsilon=attrs.epsilon, lut_rsqrt=lut_rsqrt) else: assert isinstance(attrs, _afe_attrs.LayerNormQuantAttrs) assert data.dtype == np.int8 output = ideal_layer_norm( data[0], axis=attrs.axis, lut_rsqrt=attrs.lookup_table_rsqrt, zp_rsqrt=attrs.zp_rsqrt, req_mean=attrs.requant_mean, req_lut_input=attrs.requant_lut_input, req_output=attrs.requant_output) output = np.expand_dims(output, axis=0) return output
[docs] def rms_norm(data: np.ndarray, attrs: Union[_afe_attrs.RMSNormAttrs, _afe_attrs.RMSNormQuantAttrs]) -> np.ndarray: from ml_kernels.global_constants import compute_bf16_rsqrt_lut from ml_kernels.np_operators import ideal_rms_norm assert data.shape[0] == 1, f'Batch dimension should be 1, got {data.shape[0]}.' if isinstance(attrs, _afe_attrs.RMSNormAttrs): if data.dtype == np.float32: output = ideal_rms_norm(data[0], epsilon=attrs.epsilon) else: lookup_table_rsqrt = compute_bf16_rsqrt_lut() output = ideal_rms_norm(data[0], epsilon=attrs.epsilon, lut_rsqrt=lookup_table_rsqrt) else: output = ideal_rms_norm(data[0], zp_ifm=attrs.zp_ifm, lut_rsqrt=attrs.lookup_table_rsqrt, zp_rsqrt=attrs.zp_rsqrt, req_lut_input=attrs.requant_lut_input, req_output=attrs.requant_output, lut_input_pre_shift=attrs.lut_input_pre_shift, output_pre_shift=attrs.output_pre_shift) output = np.expand_dims(output, axis=0) return output
[docs] def instance_norm(data: np.ndarray, mean: np.ndarray, variance: np.ndarray, attrs: _afe_attrs.InstanceNormAttrs | _afe_attrs.InstanceNormQuantAttrs): from ml_kernels.np_operators import ideal_instance_norm assert data.shape[0] == mean.shape[0] == variance.shape[0] # TODO: add depth channel to 4D tensor and remove it after getting result back. if isinstance(attrs, _afe_attrs.InstanceNormAttrs): outputs = [np.expand_dims(ideal_instance_norm(data[i], variance[i], mean[i], epsilon=attrs.epsilon), axis=0) for i in range(data.shape[0])] else: outputs = [np.expand_dims(ideal_instance_norm(data[i], variance[i], mean[i], lut_rsqrt=attrs.lut_rsqrt, zp_rsqrt=attrs.zp_rsqrt, req_output=attrs.requant_out), axis=0) for i in range(data.shape[0])] output = np.concatenate(outputs, axis=0) return output
[docs] def layout_transform(attrs: _afe_attrs.LayoutTransformAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.layout_transform(data, attrs.src_layout, attrs.dst_layout)
[docs] def calculate_tessellated_tensor_shape( tensor_type: TensorType, slice_shape: Sequence[int], align_c16: bool ) -> tuple[int, int]: elem_size = np.dtype(tensor_type.scalar.numpy_type()).itemsize tile_slice_grids = [ _ev_transforms.calculate_slice_grid(x, y) for x, y in zip(tensor_type.shape[1:], slice_shape) ] # Calculate block sizes tile_sizes = _ev_transforms.calculate_slice_grid_sizes(tile_slice_grids, align_c16, elem_size) return tensor_type.shape[0], int(np.sum(tile_sizes))
[docs] def tessellation(attrs: _afe_attrs.TessellationTransformAttrs, data: np.ndarray) -> np.ndarray:
    """
    Input tensor is 4D NHWC, int8 only.
    Output tensor is a 2D array.
    """
    return _ev_transforms.tessellation(data, attrs.slice_shape, attrs.align_c16, attrs.cblock)

[docs] def detessellation(attrs: _afe_attrs.DetessellationTransformAttrs, data: np.ndarray) -> np.ndarray:
    """
    Input tensor is 2D.
    Output tensor is 4D: NHWC.
    """
    out_dtype = scalar_type_to_dtype(attrs.frame_type.scalar)
    return _ev_transforms.detessellation(
        data, attrs.slice_shape, out_dtype, attrs.frame_type.shape, attrs.align_c16, attrs.cblock
    )
def _size_aligned_to_multiple(n: int, m: int) -> int:
    """ Helper function for rounding a value up to the nearest multiple of m. """
    return ((n + m - 1) // m) * m
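# Worked example (illustrative only): aligning a size of 60 bytes to a 16-byte boundary
# rounds up to 64, while an already-aligned size is returned unchanged.
# _size_aligned_to_multiple(60, 16) == ((60 + 15) // 16) * 16 == 64
# _size_aligned_to_multiple(64, 16) == 64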
[docs] def get_channel_aligned_shape(tensor_shape: Sequence[int], elem_size: int) -> tuple[int, ...]: """ Helper function to get a tensor shape where channel is aligned based on the element size. """ assert elem_size in (1, 2, 4) channel_align = 16 // elem_size return *tensor_shape[:-1], _size_aligned_to_multiple(tensor_shape[-1], channel_align)
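# Worked example (illustrative only): with 1-byte elements, channels are aligned to 16,
# so an NHWC shape of (1, 4, 4, 3) becomes (1, 4, 4, 16); with 2-byte elements the
# alignment is 8, so (1, 4, 4, 3) becomes (1, 4, 4, 8).
# get_channel_aligned_shape((1, 4, 4, 3), elem_size=1) == (1, 4, 4, 16)
# get_channel_aligned_shape((1, 4, 4, 3), elem_size=2) == (1, 4, 4, 8)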
[docs] def get_mla_padded_2d_shape(tensor_shape: Sequence[int], elem_size: int) -> tuple[int, int]:
    """
    Compute the MLA 2D buffer shape (batch_size, data_size in bytes) for a tensor,
    where the per-batch data size is padded up to a multiple of 16 bytes.
    """
    tensor_shape = (*tensor_shape[:-1], tensor_shape[-1] * elem_size)
    return tensor_shape[0], _size_aligned_to_multiple(math.prod(tensor_shape[1:]), 16)
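# Worked example (illustrative only): an NHWC tensor of shape (1, 2, 3, 5) with 2-byte
# elements occupies 2 * 3 * 5 * 2 = 60 bytes per batch entry, which pads up to 64:
# get_mla_padded_2d_shape((1, 2, 3, 5), elem_size=2) == (1, 64)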
[docs] def reshape_to_mla_padded_2d_shape(tensor: np.ndarray) -> np.ndarray:
    """
    Reshape tensor to the MLA 2D buffer shape (batch_size, data_size (D * H * W * C)),
    where the data size must be a multiple of 16.
    """
    assert len(tensor.shape) in (4, 5), f'Expected 4D/5D tensor, got {len(tensor.shape)}D.'
    mla_2d_shape = get_mla_padded_2d_shape(
        tensor_shape=tensor.shape, elem_size=tensor.dtype.itemsize
    )
    tensor_2d = tensor.reshape(mla_2d_shape[0], -1).view(np.int8)
    tensor_2d = np.pad(tensor_2d, ((0, 0), (0, mla_2d_shape[1] - tensor_2d.shape[1])))
    assert tensor_2d.shape == mla_2d_shape, (
        f"Expected shape: {mla_2d_shape}, got {tensor_2d.shape}."
    )
    return tensor_2d
[docs] def reshape_from_mla_padded_2d_shape(
        tensor: np.ndarray, data_shape: Sequence[int], tensor_type: type
) -> np.ndarray:
    """
    Reshape tensor from MLA 2D shape to 4D/5D shape.

    :param tensor: 2D tensor.
    :param data_shape: 4D/5D tensor shape.
    :param tensor_type: Numpy scalar type of the output tensor's elements.
    :return: Reshaped 4D/5D tensor.
    """
    assert len(tensor.shape) == 2, f'Expected 2D tensor, got {len(tensor.shape)}D.'
    elem_size = np.dtype(tensor_type).itemsize
    return tensor[:, :math.prod(data_shape[1:]) * elem_size].view(tensor_type).reshape(data_shape)
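# Illustrative round trip (hypothetical shapes, not part of the original module): an int16
# NHWC tensor of shape (1, 2, 3, 5) is flattened and zero-padded to the MLA 2D buffer shape
# (1, 64), and the original tensor is recovered by dropping the padding and viewing the
# bytes as int16 again.
def _example_mla_2d_round_trip() -> None:
    original = np.arange(30, dtype=np.int16).reshape(1, 2, 3, 5)
    packed = reshape_to_mla_padded_2d_shape(original)          # shape (1, 64), dtype int8
    restored = reshape_from_mla_padded_2d_shape(packed, original.shape, np.int16)
    assert np.array_equal(original, restored)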
[docs] def pack(data: List[np.ndarray]) -> np.ndarray:
    """
    Multiple tensors are packed sequentially as a 2D array.

    Input data can be either a 2D tessellated tensor or a 4D/5D tensor that will be
    tessellated on the MLA. A 4D/5D tensor is first reshaped to the MLA 2D shape.
    """
    tensors = []
    for tensor in data:
        if len(tensor.shape) in (4, 5):
            tensor_2d = reshape_to_mla_padded_2d_shape(tensor)
            tensors.append(tensor_2d)
        else:
            tensors.append(tensor)
    return _ev_transforms.pack(tensors)
[docs] def unpack(attrs: _afe_attrs.UnpackTransformAttrs, data: np.ndarray) -> List[np.ndarray]: """ A 2D array is unpacked to produce multiple 2D arrays """ return _ev_transforms.unpack(data, [scalar_type_to_dtype(tt.scalar) for tt in attrs.tensor_types], [tt.shape for tt in attrs.tensor_types])
[docs] def normalization(attrs: _afe_attrs.NormalizationTransformAttrs, data: np.ndarray) -> np.ndarray:
    """
    Normalization performs the following three steps:
        1) Divide by a per-channel divisor
        2) Subtract per-channel mean values
        3) Divide by per-channel standard deviation values
    """
    N, H, W, C = data.shape
    # Per-channel params: list of (divisor, mean, sigma)
    channel_params = attrs.channel_params
    assert len(channel_params) == 1 or len(channel_params) == C, \
        "Channel param list must be of length 1 or the same as the number of channels"
    assert len(channel_params[0]) == 3, "Channel params must contain three values"
    # If channel params are given for only one channel, replicate them to all channels
    if len(channel_params) == 1:
        channel_params *= C
    return _ev_transforms.normalize(data, channel_params)
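# Minimal sketch (assumed semantics, not the _ev_transforms implementation) of the three
# normalization steps above, applied channel-wise to an NHWC tensor:
def _example_normalize(data: np.ndarray, channel_params: list) -> np.ndarray:
    divisor = np.array([p[0] for p in channel_params], dtype=np.float32)
    mean = np.array([p[1] for p in channel_params], dtype=np.float32)
    sigma = np.array([p[2] for p in channel_params], dtype=np.float32)
    # Broadcast over the channel (last) axis: divide, subtract the mean, divide by sigma
    return (data / divisor - mean) / sigma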
[docs] def ev_quantize(attrs: _afe_attrs.QuantizationTransformAttrs, data: np.ndarray) -> np.ndarray: """ Quantization transform. """ if data.dtype == bfloat16: assert len(attrs.channel_params) == 1, "Per-channel for quantize is not supported yet." scale, zp = attrs.channel_params[0] requant = FloatRequantization( sc_correction=scale, zp_correction=zp, out_dtype=attrs.output_data_type.numpy_type() ) return ml_kernels.np_operators.ideal_requantize(data, requant) else: return _ev_transforms.quantize(data, attrs.channel_params, attrs.num_bits)
[docs] def ev_dequantize(attrs: _afe_attrs.DequantizationTransformAttrs, data: np.ndarray) -> np.ndarray: """ Dequantization transform. """ if attrs.output_type == ScalarType.bfloat16: assert len(attrs.channel_params) == 1, "Per-channel for dequantize is not supported yet." scale, zp = attrs.channel_params[0] requant = FloatRequantization( sc_correction=scale, zp_correction=zp, out_dtype=attrs.output_type.numpy_type() ) return ml_kernels.np_operators.ideal_requantize(data, requant) else: return _ev_transforms.dequantize(data, attrs.channel_params)
[docs] def ev_resize(attrs: _afe_attrs.ResizeTransformAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.resize(data, attrs.target_width, attrs.target_height, attrs.keep_aspect, attrs.deposit_location.value, attrs.method.value)
[docs] def chroma_upsample(attrs: _afe_attrs.ChromaUpsampleTransformAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.chroma_upsample(data, attrs.frame_height, attrs.frame_width, attrs.yuv_sampling.value)
[docs] def yuv_rgb_conversion(attrs: _afe_attrs.YuvRgbConversionTransformAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.yuv_rgb_conversion(data, attrs.conversion.value, attrs.std.value)
[docs] def bgr_rgb_conversion(attrs: _afe_attrs.BgrRgbConversionTransformAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.bgr_rgb_conversion(data, attrs.conversion.value)
[docs] def ev_sigmoid(attrs: _afe_attrs.SigmoidTransformAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.sigmoid(data, attrs.save_int16)
[docs] def nms_maxpool(attrs: _afe_attrs.NmsMaxpoolTransformAttrs, data: np.ndarray) -> np.ndarray: return _ev_transforms.nms_maxpool(data, attrs.kernel)
[docs] def cast(attrs: _afe_attrs.CastAttrs, data: np.ndarray) -> np.ndarray: return np.array(data).astype(attrs.out_dtype)
[docs] def qnn_quantize(attrs: _afe_attrs.QNNQuantizeAttrs, data: np.ndarray, output_scale: np.ndarray,
                 output_zero_point: np.ndarray) -> np.ndarray:
    """
    For the rounding type used for this operator (away from 0), refer to:
    https://github.com/apache/tvm/pull/3512/commits/c089ebcdf4b13f98b776bb0213779f6783fa6743#diff-a47be721cf0f30d86d0f548a8cc5a1fe184d0827efd450c8446bfc05d962abf5R47
    """
    out_dtype = attrs.out_dtype
    axis = attrs.axis
    assert out_dtype in [QNNDtype.INT8, QNNDtype.UINT8, QNNDtype.INT32]
    axis_shape = data.shape[axis]
    scale_len = output_scale.size
    zp_len = output_zero_point.size
    assert (scale_len == 1 or scale_len == axis_shape) and (zp_len == 1 or zp_len == axis_shape)
    if zp_len == axis_shape or scale_len == axis_shape:
        # Per-channel quantization: broadcast any scalar parameter along the channel axis
        output_scale = np.repeat(output_scale, axis_shape) if scale_len == 1 else output_scale
        output_zero_point = np.repeat(output_zero_point, axis_shape) if zp_len == 1 else output_zero_point
        output_scale = np.squeeze(output_scale)
        output_zero_point = np.squeeze(output_zero_point)
        output = np.swapaxes(data, axis, -1)
        output = output / output_scale + output_zero_point
        output = np.swapaxes(output, axis, -1)
    else:
        output = (data / output_scale) + output_zero_point
    out_dtype_min, out_dtype_max = DTYPE_BOUNDS[out_dtype]
    return np.clip(round_op(output, RoundType.TONEAREST), out_dtype_min, out_dtype_max).astype(out_dtype)
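# Worked example (illustrative, scalar quantization parameters): with scale 0.1 and
# zero point 5, the value 1.27 quantizes to round(1.27 / 0.1 + 5) = round(17.7) = 18,
# and out-of-range results are clipped to the output dtype bounds:
# round(100.0 / 0.1 + 5) = 1005 -> clipped to 127 for int8 output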
[docs] def qnn_dequantize(attrs: _afe_attrs.QNNDequantizeAttrs, data: np.ndarray, input_scale: np.ndarray,
                   input_zero_point: np.ndarray) -> np.ndarray:
    axis = attrs.axis
    axis_shape = data.shape[axis]
    # Casting in order to avoid underflow/overflow of int8 and uint8 inputs.
    data = data.astype(Float)
    scale_len = input_scale.size
    zp_len = input_zero_point.size
    assert (scale_len == 1 or scale_len == axis_shape) and (zp_len == 1 or zp_len == axis_shape)
    if zp_len == axis_shape or scale_len == axis_shape:
        # Per-channel dequantization: broadcast any scalar parameter along the channel axis
        input_scale = np.repeat(input_scale, axis_shape) if scale_len == 1 else input_scale
        input_zero_point = np.repeat(input_zero_point, axis_shape) if zp_len == 1 else input_zero_point
        input_scale = np.squeeze(input_scale)
        input_zero_point = np.squeeze(input_zero_point)
        output = np.swapaxes(data, axis, -1)
        output = (output - input_zero_point) * input_scale
        output = np.swapaxes(output, axis, -1)
    else:
        output = (data - input_zero_point) * input_scale
    return output
def _qnn_requantize(data: np.ndarray, input_scale: float, input_zero_point: int, output_scale: float, output_zero_point: int, rounding: RoundType = RoundType.TOEVEN, out_dtype: QNNDtype = QNNDtype.INT32) -> np.ndarray: """ For the explanation of UPWARD and TONEAREST rounding types, refer to: https://github.com/apache/tvm/blob/main/include/tvm/relay/qnn/attrs.h#L47 """ if rounding is None or rounding == "None": rounding = RoundType.TOEVEN assert rounding in [RoundType.UPWARD, RoundType.TONEAREST, RoundType.TOEVEN] assert out_dtype in [QNNDtype.INT8, QNNDtype.UINT8, QNNDtype.INT32] # Casting in order to avoid underflow/overflow of int8 and uint8 inputs. data = data.astype(QNNDtype.INT32) output = output_zero_point + round_op((input_scale / output_scale) * (data - input_zero_point), rounding) out_dtype_min, out_dtype_max = DTYPE_BOUNDS[out_dtype] return np.clip(output, out_dtype_min, out_dtype_max)
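# Worked example (illustrative values): requantizing q_in = 20 from (scale 0.1, zero point 0)
# to (scale 0.2, zero point 10) gives
#     q_out = 10 + round(0.1 / 0.2 * (20 - 0)) = 10 + round(10.0) = 20,
# which represents the same real value: 20 * 0.1 = 2.0 == (20 - 10) * 0.2.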
[docs] def do_requantize(in_scale, out_scale, in_zp, out_zp) -> bool: return in_scale != out_scale or in_zp != out_zp
[docs] def qnn_add(attrs: _afe_attrs.AwesomeAttributes, lhs: np.ndarray, rhs: np.ndarray, lhs_scale: float, lhs_zero_point: int, rhs_scale: float, rhs_zero_point: int, output_scale: float, output_zero_point: int, op: str = "add") -> np.ndarray: # Requantize input tensors to the output parameters # See QnnAddCanonicalize in src/relay/qnn/op/add.cc assert op in ["add", "sub"] if do_requantize(lhs_scale, output_scale, lhs_zero_point, output_zero_point): lhs = _qnn_requantize(lhs, lhs_scale, lhs_zero_point, output_scale, output_zero_point) if do_requantize(rhs_scale, output_scale, rhs_zero_point, output_zero_point): rhs = _qnn_requantize(rhs, rhs_scale, rhs_zero_point, output_scale, output_zero_point) if op == "sub": output_zero_point = -output_zero_point rhs = -rhs output = lhs + rhs - output_zero_point return output
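# Worked example (illustrative values) of the canonicalization above for "add":
# with lhs represented at (scale 0.1, zp 0), rhs at (scale 0.2, zp 0) and the output
# at (scale 0.2, zp 0), lhs is first requantized by the factor 0.1 / 0.2 = 0.5, so
# lhs_q = 20 and rhs_q = 5 (real values 2.0 and 1.0) become 10 + 5 - 0 = 15,
# which is the correct representation of 3.0 at the output scale 0.2.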
[docs] def qnn_mul(attrs: _afe_attrs.AwesomeAttributes, lhs: np.ndarray, rhs: np.ndarray, lhs_scale: float, lhs_zero_point: int, rhs_scale: float, rhs_zero_point: int, output_scale: float, output_zero_point: int) -> np.ndarray: # See QnnMulCanonicalize in src/relay/qnn/op/mul.cc lhs_shifted = lhs.astype(QuantizedTensor) rhs_shifted = rhs.astype(QuantizedTensor) if lhs_zero_point != 0: lhs_shifted -= lhs_zero_point if rhs_zero_point != 0: rhs_shifted -= rhs_zero_point output = lhs_shifted * rhs_shifted new_input_scale = lhs_scale * rhs_scale new_input_zero_point = np.array(0) output = _qnn_requantize(output, new_input_scale, new_input_zero_point, output_scale, output_zero_point) return output
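# Worked example (illustrative values): with lhs at (scale 0.1, zp 2), rhs at (scale 0.5, zp 0)
# and output at (scale 0.25, zp 0), inputs lhs_q = 12 and rhs_q = 4 (real values 1.0 and 2.0)
# give (12 - 2) * (4 - 0) = 40 at the intermediate scale 0.1 * 0.5 = 0.05, and requantizing
# 40 by 0.05 / 0.25 = 0.2 yields 8, the correct representation of 2.0 at the output scale.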
######################### # PARTITIONING OPERATIONS ######################### def _run_mod(rt_mod: TVMGraphModule, input_dict: Dict[str, np.ndarray], num_outputs: int) -> List[np.ndarray]: rt_mod.set_input(**input_dict) rt_mod.run() return [np.asarray(np.nan_to_num(rt_mod.get_output(i).asnumpy())) for i in range(num_outputs)]
[docs] def external(attrs: _afe_attrs.ExternalAttrs, input_dict: Dict[str, np.ndarray]) \ -> Union[np.ndarray, Tuple[np.ndarray, ...]]: # Execute runtime module rt_mod = attrs.graph_module num_outputs = rt_mod.get_num_outputs() external_op_input_dict = {input_name: value for input_name, value in zip(attrs.external_input_list, input_dict.values())} outputs = _run_mod(rt_mod, external_op_input_dict, num_outputs) output = tuple(outputs) return output[0] if len(output) == 1 else output
################### # CUSTOM OPERATIONS ###################
[docs] def init_custom_op(attrs: _afe_attrs.CustomOpAttrs, input_dict: Dict[InputName, np.ndarray],
                   output_shape: Tuple[int, ...], force_compile: bool = True) -> None:
    """
    Initialize the custom op. Compile the custom op and put it into the CustomOpLibraryManager.
    Update the CustomOpAttrs with the generated argument list and function so they can be used
    at execution time.

    :param attrs: CustomOpAttrs
    :param input_dict: Dict[InputName, np.ndarray]. Input name to its tensor
    :param output_shape: Tuple[int, ...]. Output shape
    :param force_compile: bool. Default is True. If True, init_custom_op compiles the custom op
        regardless of whether the library is already available
    """
    from afe.ir.custom_operation.custom_operation import create_custom_op_function
    if force_compile or attrs.args_list is None or attrs.function is None:
        # Get the input shapes and types
        shapes: List[Tuple[int, ...]] = []
        types: List[str] = []
        for _input in input_dict.values():
            shapes.append(_input.shape)
            types.append(str(_input.dtype))
        # Only input tensors with the same dtype are supported
        assert all([dtype == types[0] for dtype in types]), \
            f"Only support input tensors with same dtypes. Got {types}"
        # Pass the attributes into CustomOpAttrs
        attrs.c_code_in_shapes = shapes
        attrs.c_code_in_dtypes = types
        # TODO: This assumes there is only 1 output. Check if there will be multiple outputs.
        args_list, function = create_custom_op_function(attrs, input_dict, output_shape)
        attrs.args_list = args_list
        attrs.function = function
[docs] def execute_custom_op(attrs: _afe_attrs.CustomOpAttrs, input_dict: Dict[InputName, np.ndarray]) -> np.ndarray: """ Execute the custom op :param attrs: CustomOpAttrs :param input_dict: Dict[InputName, np.ndarray]. Input name to its tensor :return: np.ndarray """ from afe.ir.custom_operation.custom_operation import execute_custom_op return execute_custom_op(attrs, input_dict)
[docs] def batch_matmul(lhs: np.ndarray, rhs: np.ndarray, attrs: Union[_afe_attrs.BatchMatmulAttrs, _afe_attrs.BatchMatmulQuantAttrs]) \ -> np.ndarray: """ Execute batch matmul operation. :param lhs: Tensor representing lhs value of batch matmul operation. :param rhs: Tensor representing rhs value of batch matmul operation. :param attrs: BatchMatmul operator's attributes. :return: The result of batch matmul operation. """ from ml_kernels.np_operators import ideal_batch_matmul if isinstance(attrs, _afe_attrs.BatchMatmulAttrs): assert lhs.dtype in ('float32', 'bfloat16') and rhs.dtype in ('float32', 'bfloat16') output = ideal_batch_matmul(lhs[0], rhs[0], transpose_b=attrs.transpose_b) return np.expand_dims(output, axis=0) else: assert lhs.dtype == np.int8 and rhs.dtype == np.int8 output = ideal_batch_matmul(lhs[0], rhs[0], transpose_b=attrs.attrs.transpose_b, zp_a=attrs.lhs_zp, zp_b=attrs.rhs_zp, requant_params=attrs.requant, intrinsic_shift=attrs.intrinsic_shift) return np.expand_dims(output, axis=0)