#########################################################
# Copyright (C) 2020 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
import enum
import functools
import dataclasses
import math
import numpy as np
import tensorflow as tf
from typing import Tuple, Optional, Union, List, Dict, Callable, Sequence
import afe.ir.quantization_utils
from afe._tvm._defines import TVMGraphModule
import afe.ir.attributes as _afe_attrs
from afe.ir.attributes import ConvAttrs, convolution_output_shape
from afe.ir.defines import (
AwesomeConvWeightLayout5D, Float, InputShape, AwesomePad2D,
AwesomeConvWeightLayout, AwesomeDepthwiseConvWeightLayout, AwesomeDataLayout,
AwesomeDataLayout5D, QuantizedTensor, AwesomeStrides, AwesomeDilation, AwesomeStrides3D,
AwesomeDilation3D, AwesomePad3D, AwesomePoolSize, InputName
)
from afe.ir.quantization_utils import round_op, QNNDtype, DTYPE_BOUNDS
import afe.ir.utils as utils
from afe.ir.tensor_type import scalar_type_to_dtype, ScalarType, TensorType
from dataclasses import dataclass
from enum import Enum
from ml_kernels.math_helpers import RoundType, ArgMinMaxOp, Activation, bfloat16
from ev_transforms import transforms as _ev_transforms
from ml_kernels.requantization import BaseRequantization, Renormalization, get_id_requantization, \
is_identity_requantization, FloatRequantization, is_renormalization
import ml_kernels.np_operators
import ml_kernels.math_helpers
class RunMode(Enum):
"""
Supported run modes.
MLA_MODE : use an implementation that exactly matches execution on the MLA.
FAST_MODE : use a fast execution implementation
"""
    # MLA_MODE and FAST_MODE are referenced throughout this module; the exact
    # numeric values assigned here are assumed.
    MLA_MODE = 1
    FAST_MODE = 2
    FAST_MODE_MODEL_SDK = 3
def is_fast_mode(self):
return self.value == RunMode.FAST_MODE.value
"""
Network functions that are executed within AwesomeOperations
While SiMa.ir attributes (AwesomeAttributes) essentially inherit from the attributes of the operations in TVM,
not all of these attributes are necessarily fed into the functions SiMa uses for network inference. The
AwesomeOperations are a useful layer of abstraction. For methods like run and run_quant, AwesomeOperations let
the user simply pass in input dictionaries and AwesomeAttributes and get back the output tensors. Internally,
these methods preprocess the inputs and AwesomeAttributes before passing them to the actual functions. An added
benefit of defining these operations as class attributes external to AwesomeOperations is that they can be
readily swapped out for operations with the same function signature (useful when we define our own suite of
numpy operations).
"""
############################
# Quantized Kernel functions
############################
def _avgpool_kernel(
data: np.ndarray,
pool_size: tuple[int, ...],
strides: tuple[int, ...],
paddings: tuple[int, ...],
pad_value: int = 0,
requant: Optional[BaseRequantization] = None,
rounding_type: RoundType = RoundType.TRUNC,
mode: RunMode = RunMode.MLA_MODE,
) -> np.ndarray:
"""
    Execute a quantized average pool using the backend-provided kernel. The input
    data layout must be NHWC (or NDHWC for 3D pooling) and the output will have
    the same layout.
    Note: Dilation is not supported.
Parameters
----------
:param data: np.ndarray. Input data with NHWC layout.
:param pool_size: Tuple[int, int]. Pooling window size in a Tuple format.
The order of the dimension must be in [height, width].
:param strides: Tuple[int, int]. Pooling window strides in a Tuple format.
The order of the dimension must be in [height, width].
:param paddings: Tuple[int, int, int, int]. Padding for each side of
height and width. The order is in:
[pad_top, pad_bottom, pad_left, pad_right]
    :param pad_value: int. Default is 0. The value used for padding.
    :param requant: Optional[BaseRequantization]. Requantization applied to the pooled
        result. If None, a default pool requantization is derived from pool_size.
    :param rounding_type: RoundType. Rounding method of requantization.
    :param mode: RunMode. Mode of execution.
Return
------
    :return: np.ndarray. Average pool output in the same data layout as the input.
"""
from ml_kernels.np_operators import ideal_pool, pool_requantization
op = 'average'
# TODO: Remove astype() casts once all kernels use consistent array formats and set correct formats
requantization = pool_requantization(pool_size, op, rounding_type=rounding_type) if requant is None\
else requant
outputs: list[np.ndarray] = list()
for idx in range(data.shape[0]):
output = ideal_pool(
ifm=data[idx],
kernel_size=pool_size,
stride=strides,
padding=paddings,
op=op,
requantization=requantization,
use_tf=mode.is_fast_mode()
)
outputs.append(np.expand_dims(output, axis=0))
res = np.concatenate(outputs)
return res
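# Minimal usage sketch (illustrative only; the helper and shapes below are hypothetical):
# quantized 2x2, stride-2 average pooling of an int8 NHWC tensor with the kernel above.
def _example_avgpool_kernel_usage() -> np.ndarray:
    example_in = np.zeros((1, 4, 4, 8), dtype=np.int8)  # NHWC
    # Stride-2 pooling of a 4x4 input yields an output of shape (1, 2, 2, 8).
    return _avgpool_kernel(example_in, pool_size=(2, 2), strides=(2, 2),
                           paddings=(0, 0, 0, 0))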
def _not_used_avgpool3d_kernel(data: np.ndarray,
pool_size: Tuple[int, int, int],
strides: Tuple[int, int, int],
paddings: Tuple[int, int, int, int, int, int],
pad_value: int = 0,
rounding_type: RoundType = RoundType.TRUNC,
mode: RunMode = RunMode.MLA_MODE,
) -> np.ndarray:
"""
    Execute a quantized 3D average pool using the backend-provided kernel. The input
    data layout must be NDHWC and the output will have the same NDHWC layout.
    Note: Dilation is not supported.
Parameters
----------
:param data: np.ndarray. Input data with NDHWC layout.
:param pool_size: Tuple[int, int, int]. Pooling window size in a Tuple format.
The order of the dimension must be in [depth, height, width].
:param strides: Tuple[int, int, int]. Pooling window strides in a Tuple format.
The order of the dimension must be in [depth, height, width].
:param paddings: Tuple[int, int, int, int, int, int]. Padding for each side of
depth, height and width. The order is in:
[pad_depth_front, pad_depth_back, pad_top, pad_bottom, pad_left, pad_right]
:param pad_value: int. Default is 0. The value of paddings.
:param rounding_type: RoundType. Rounding method of requantization.
Return
------
:return: np.ndarray. 3D Average pool output in NDHWC data layout.
"""
# Pad the data along the depth dimension
depth_paddings = ([(0, 0) for _ in range(5)]) # NDHWC
depth_paddings[1] = (paddings[0:2])
padded_data = np.pad(data, depth_paddings, 'constant', constant_values=pad_value)
input_batch, input_d, input_h, input_w, input_c = padded_data.shape
pool_d, pool_h, pool_w = pool_size
stride_d, stride_h, stride_w = strides
# Lower the depth dimension to the height dimension
new_pool_h = pool_d * pool_h
# Modify the stride along the height dimension to make sure the pooling
# can jump to the right height dimension after lowering depth dimension
# into the height dimension.
new_stride_h = stride_h * pool_d
# Swap the depth and height dimension before lowering the depth dimension into
# the height dimension to make sure the new_stride_h can slice out the correct
# partial tensor.
transposed_data = padded_data.transpose((0, 2, 1, 3, 4)) # NHDWC
output = []
for start_d in range(0, input_d - pool_d + 1, stride_d):
end_d = start_d + pool_d
# Slice out the target depth dimension. The transposed_data
# has NHDWC data layout
partial_data = transposed_data[:, :, start_d:end_d, :, :]
# Lower the depth dimension into the height dimension and modify the stride
# along the height dimension to accommodate the depth dimension.
partial_data = partial_data.reshape((input_batch, pool_d * input_h, input_w, input_c))
partial_output = _avgpool_kernel(
partial_data, (new_pool_h, pool_w), (new_stride_h, stride_w), paddings=paddings[2:],
rounding_type=rounding_type,
mode=mode)
output.append(partial_output.astype(data.dtype))
# Concatenate along depth dimension using np.array(output).
return np.array(output).transpose((1, 0, 2, 3, 4))
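# Sketch of the depth-to-height lowering used above (illustrative only; the helper and
# shapes are hypothetical): a pool_d-deep NHDWC slice is flattened into the height
# dimension so that the 2D pooling kernel can process it.
def _example_depth_lowering_shape() -> tuple[int, ...]:
    batch, h, pool_d, w, c = 1, 3, 2, 4, 8
    block = np.zeros((batch, h, pool_d, w, c), dtype=np.int8)  # NHDWC slice
    lowered = block.reshape((batch, pool_d * h, w, c))         # NHWC for the 2D kernel
    return lowered.shape  # (1, 6, 4, 8)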
################
# Functions APIs
################
def placeholder(data: np.ndarray) -> np.ndarray:
return data
def constant(data: np.ndarray) -> np.ndarray:
return data
def _depthwise_conv3d(data: np.ndarray,
weight: np.ndarray,
strides: AwesomeStrides3D,
dilations: AwesomeDilation3D,
padding: Optional[AwesomePad3D] = None,
pad_value: Union[float, int] = 0) -> np.ndarray:
""" Support both depthwise_conv3d and depthwise_conv3d with channel_multiplier > 1.
"""
if padding is not None:
# Pad input with constant values
data = np.pad(data, padding, 'constant', constant_values=pad_value)
# DEBUG(Joey): Find a way to do int/int32 depthwise_conv3d to see the accuracy difference.
return tf.nn.depthwise_conv3d(input=data.astype(Float), # TF does not support int/int32 dtype
filter=weight.astype(Float), # TF does not support int/int32 dtype
strides=strides,
padding='VALID',
data_format=AwesomeDataLayout5D,
dilations=dilations).numpy()
def _group_conv2d(data: np.ndarray,
weight: np.ndarray,
groups: int,
strides: AwesomeStrides,
dilations: AwesomeDilation,
padding: Optional[AwesomePad2D] = None,
pad_value: Union[float, int] = 0) -> np.ndarray:
if padding is not None:
# Pad input with constant values
data = np.pad(data, padding, 'constant', constant_values=pad_value)
# Split weight along output channel dimension
weight_k_axis = AwesomeConvWeightLayout.index("O")
weight_list = np.split(weight, indices_or_sections=groups, axis=weight_k_axis)
# Split data along input channel dimension
data_c_axis = AwesomeDataLayout.index("C")
data_list = np.split(data, indices_or_sections=groups, axis=data_c_axis)
# Outputs
output_list = []
for _data, _weight in zip(data_list, weight_list):
output_list.append(
tf.nn.conv2d(input=_data.astype(Float),
filters=_weight.astype(Float),
strides=strides,
padding='VALID',
data_format=AwesomeDataLayout,
dilations=dilations).numpy())
# Concatenate outputs along the channel dimension
output = np.concatenate(output_list, axis=data_c_axis)
return output
def _normal_conv2d(data: np.ndarray,
weight: np.ndarray,
strides: AwesomeStrides,
dilations: AwesomeDilation,
padding: Optional[AwesomePad2D] = None,
pad_value: Union[float, int] = 0) -> np.ndarray:
if padding is not None:
# Pad input with constant values
data = np.pad(data, padding, 'constant', constant_values=pad_value)
return tf.nn.conv2d(input=data.astype(Float),
filters=weight.astype(Float),
strides=strides,
padding='VALID',
data_format=AwesomeDataLayout,
dilations=dilations).numpy()
def _normal_conv3d(data: np.ndarray,
weight: np.ndarray,
strides: AwesomeStrides,
dilations: AwesomeDilation,
padding: Optional[AwesomePad3D] = None,
pad_value: Union[float, int] = 0) -> np.ndarray:
if padding is not None:
# Pad input with constant values
data = np.pad(data, padding, 'constant', constant_values=pad_value)
return tf.nn.conv3d(input=data.astype(Float),
filters=weight.astype(Float),
strides=strides,
padding='VALID',
data_format=AwesomeDataLayout5D,
dilations=dilations).numpy()
def _map_batch(f: Callable[[np.array], np.array], a: np.array) -> np.array:
"""
Apply f elementwise over the first dimension of a and collect results into
a new array.
This is analogous to map over lists, where the first dimension
of the input and output array play the role of the input and output list.
:param f: Function to apply. Its input shape is a.shape[1:] and its
output shape is _map_batch(f, a).shape[1:].
:param a: Array to transform.
:return: Transformed array r, where r[i] = f(a[i]) for each i.
"""
r_list = []
for a_slice in a:
r_list.append(f(a_slice))
return np.stack(r_list, axis=0)
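# Minimal usage sketch for _map_batch (illustrative only; the helper below is
# hypothetical): apply a per-sample reduction over the batch dimension.
def _example_map_batch() -> np.ndarray:
    a = np.arange(2 * 3 * 3, dtype=np.float32).reshape((2, 3, 3))
    # Each slice a[i] has shape (3, 3); summing it gives a scalar, so the stacked
    # result has shape (2,).
    return _map_batch(lambda s: np.sum(s), a)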
def float_convolution(attrs: _afe_attrs.ConvAddActivationAttrs, data: np.ndarray, mode: RunMode) -> np.ndarray:
"""
Execute a floating-point convolution using an algorithm from ml_kernels.
:param attrs: Attributes of the convolution operator
:param data: Input activation data in NHWC layout
:param mode: Mode of execution
:return: Convolved result
"""
assert attrs.conv_attrs.num_spatial_dimensions <= 3, "Only 2D/3D convolution is implemented"
return _ml_kernels_convolution(attrs.conv_attrs, attrs.weights_attrs.data,
attrs.bias_attrs.data if attrs.bias_attrs is not None else None,
attrs.activ_attrs, get_id_requantization(data.dtype), False, 0, 0, data, mode)
def quantized_convolution(attrs: _afe_attrs.ConvQuantAttrs, data: np.ndarray, mode: RunMode) -> np.ndarray:
"""
Execute a quantized convolution using an algorithm from ml_kernels.
:param attrs: Attributes of the convolution operator
:param data: Input activation data in NHWC layout
:param mode: Mode of execution
:return: Convolved result
"""
assert attrs.conv_attrs.num_spatial_dimensions <= 3, "Only 2D/3D convolution is implemented"
return _ml_kernels_convolution(attrs.conv_attrs, attrs.weight_quant_data,
attrs.bias_quant_data, attrs.activ_attrs, attrs.requant,
attrs.msb_left_shift, attrs.input_zp, attrs.zero_point, data, mode)
@dataclass
class _GeneralMLKernelsConvolutionParameters:
"""
Parameters used by the convolution functions in the ml_kernels package.
These parameters are in the format expected by ML Kernels, which may be
different from how they are stored in Conv2DQuantAttrs.
:param data_shape: Shape of the input feature map in HWC layout
:param weight: Weight tensor in OIW, OIHW, or OIDHW layout
:param bias: Bias tensor or None
:param strides: Strides in H and W dimensions
:param padding: Padding on top, bottom, left, and right
:param output_padding: Padding to apply to the output on top, bottom, left, and right.
This padding is used for transposed convolution. It must be zero for other convolutions.
:param dilations: Dilation in H and W dimensions
:param int15_params: Parameters for the int15 convolution algorithm. It is used when
the input feature map's scalar type is int16.
:param activ: Activation function to compute as part of convolution.
:param requant: Requantization to apply to the 32-bit result of convolution
:param ifm_zero_point: Zero point of the quantized input
:param ofm_zero_point: Zero point of the quantized output
:param use_tf: Whether to call Tensorflow to compute the convolution
"""
ifm_shape: tuple[int, ...]
weight: np.ndarray
bias: np.ndarray | None
strides: tuple[int, ...]
padding: tuple[int, ...]
output_padding: tuple[int, ...]
dilations: tuple[int, ...]
int15_params: ml_kernels.np_operators.ConvInt15Params
activ: Activation
requant: BaseRequantization[np.ndarray]
ifm_zero_point: int
ofm_zero_point: int
use_tf: bool
_ConvFunction = Callable[[np.ndarray], np.ndarray]
def _make_conv3d_function(p: _GeneralMLKernelsConvolutionParameters,
is_transposed: bool = False,
                          output_shape: None | tuple[int, ...] = None,
groups: int = 1) -> _ConvFunction:
def compute_conv3d(ifm):
ifm = np.pad(ifm,
[(p.padding[0], p.padding[1]), (p.padding[2], p.padding[3]), (p.padding[4], p.padding[5]),
(0, 0)],
'constant',
constant_values=p.ifm_zero_point)
ifm = np.expand_dims(ifm, axis=0)
# For tf.nn.conv3d, input data must be one of the following types: half, bfloat16, float32, float64
compute_type = np.float32
internal_compute_type = np.float64
conv_compute_type = np.float64 if p.int15_params is not None else compute_type
if is_transposed:
# Weight is MLA layout OIDHW, transpose to DHWOI for TF
weight = p.weight.transpose((2, 3, 4, 0, 1))
ofm = tf.nn.conv3d_transpose(input=ifm.astype(conv_compute_type, copy=False),
filters=weight.astype(conv_compute_type, copy=False),
output_shape=output_shape,
strides=(1,)+p.strides+(1,), padding="VALID", data_format="NDHWC",
dilations=((1,)+p.dilations+(1,))).numpy().astype(conv_compute_type, copy=False)[0]
elif groups > 1:
assert p.output_padding == (0, 0, 0, 0, 0, 0)
ofm = []
for g in range(groups):
group_ifm = ifm[:, :, :, :, g:(g + 1)]
# Weight is MLA layout OIDHW, slice and transpose to DHWIO for TF
group_weight = p.weight[g:(g + 1)].transpose((2, 3, 4, 1, 0))
group_ofm = tf.nn.conv3d(input=group_ifm.astype(conv_compute_type, copy=False),
filters=group_weight.astype(conv_compute_type, copy=False),
strides=(1,)+p.strides+(1,), padding="VALID", data_format="NDHWC",
dilations=((1,)+p.dilations+(1,))) \
.numpy().astype(conv_compute_type, copy=False)[0]
ofm.append(group_ofm)
ofm = np.concatenate(ofm, axis=3)
else:
assert p.output_padding == (0, 0, 0, 0, 0, 0)
# Weight is MLA layout OIDHW, transpose to DHWIO for TF
weight = p.weight.transpose((2, 3, 4, 1, 0))
ofm = tf.nn.conv3d(input=ifm.astype(conv_compute_type, copy=False),
filters=weight.astype(conv_compute_type, copy=False),
strides=(1,)+p.strides+(1,), padding="VALID", data_format="NDHWC",
dilations=((1,)+p.dilations+(1,))).numpy().astype(conv_compute_type, copy=False)[0]
if p.int15_params is not None:
# Shift right and return to 32-bit precision
if isinstance(p.int15_params.msb_left_shift, np.ndarray):
int15_shift = np.array([-1 if s else -8 for s in p.int15_params.msb_left_shift])
else:
int15_shift = -1 if p.int15_params.msb_left_shift else -8
ofm = np.ldexp(ofm, int15_shift).astype(compute_type, copy=False)
if p.bias is not None:
ofm = tf.math.add(ofm.astype(internal_compute_type, copy=False),
p.bias.astype(internal_compute_type, copy=False)).numpy()
ofm = ml_kernels.np_operators.normalize(ofm, compute_type)
if ifm.dtype != bfloat16:
ofm = ofm.astype(np.int32)
ofm = ml_kernels.np_operators.requantize(ofm, p.requant)
assert p.activ in (Activation.RELU, Activation.NONE), "Only support RELU activation for conv3d for now."
if p.activ == Activation.RELU:
ofm[np.where(ofm < p.ofm_zero_point)] = p.ofm_zero_point
return ofm
return compute_conv3d
def _make_normal_convolution_function(p: _GeneralMLKernelsConvolutionParameters) -> _ConvFunction:
assert all(pad == 0 for pad in p.output_padding)
def compute_convolution(ifm):
return ml_kernels.np_operators.ideal_convolution(
ifm, p.weight, p.bias, p.strides, p.padding, p.dilations, p.requant,
activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point,
int15_params=p.int15_params, use_tf=p.use_tf
)
return compute_convolution
def _make_group_convolution_function(
p: _GeneralMLKernelsConvolutionParameters, groups: int
) -> _ConvFunction:
assert all(pad == 0 for pad in p.output_padding)
def compute_convolution(ifm):
return ml_kernels.np_operators.ideal_group_convolution(
ifm, p.weight, p.bias, p.strides, p.padding, p.dilations, p.requant,
activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point,
groups=groups, int15_params=p.int15_params, use_tf=p.use_tf
)
return compute_convolution
def _make_depthwise_transposed_convolution_function(
p: _GeneralMLKernelsConvolutionParameters, groups: int
) -> _ConvFunction:
# Make a function that computes depthwise transposed convolution.
# No such form of convolution is implemented in ml_kernels, so this function does
# upscaling, padding, and depthwise conv2d.
num_spatial_dims = len(p.ifm_shape) - 1
ofm_spatial_shape = tuple(
(p.ifm_shape[i] - 1) * p.strides[i] + 1 for i in range(num_spatial_dims)
)
ofm_shape = (*ofm_spatial_shape, p.ifm_shape[-1])
kernel_shape = p.weight.shape[2:]
weight = np.rot90(p.weight, k=2, axes=(-2, -1))
# Calculate a padding parameter for np.pad
assert all(x == 0 for x in p.output_padding[::2])
np_pad_shape = list()
for i in range(num_spatial_dims):
np_pad_shape.append(
(
kernel_shape[i] - 1 - p.padding[2 * i],
kernel_shape[i] - 1 - p.padding[2 * i + 1] + p.output_padding[2 * i + 1]
)
)
np_pad_shape.append((0, 0))
def compute_convolution(ifm: np.ndarray):
# Implement depthwise transposed conv2d using upscale, pad, and depthwise conv2d
# Upscale
pad_value = p.ifm_zero_point
slices = [slice(None, None, p.strides[i]) for i in range(num_spatial_dims)]
slices.append(slice(None))
data_ext = np.full(ofm_shape, pad_value, dtype=ifm.dtype)
data_ext[tuple(slices)] = ifm
# Apply padding
data_ext = np.pad(data_ext, np_pad_shape, 'constant', constant_values=pad_value)
# Convolution
strides = (1, ) * num_spatial_dims
padding = (0, ) * (num_spatial_dims * 2)
return ml_kernels.np_operators.ideal_group_convolution(
data_ext, weight, p.bias, strides, padding, p.dilations, p.requant,
activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point,
groups=groups, int15_params=p.int15_params, use_tf=p.use_tf
)
return compute_convolution
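# Sketch of the "upscale" step used above (illustrative only; the helper and shapes are
# hypothetical): input values are written with a stride into a zero-point-filled array,
# after which a regular depthwise convolution over the expanded, padded input reproduces
# the transposed convolution.
def _example_transposed_upscale() -> np.ndarray:
    ifm = np.arange(4, dtype=np.int8).reshape((2, 2, 1))  # HWC input
    stride, zero_point = 2, 0
    expanded_hw = (2 - 1) * stride + 1                     # 3 along each spatial dim
    out = np.full((expanded_hw, expanded_hw, 1), zero_point, dtype=np.int8)
    out[::stride, ::stride, :] = ifm                       # inputs at strided positions
    return out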
def _make_transposed_convolution_function(
p: _GeneralMLKernelsConvolutionParameters,
    output_shape: None | tuple[int, ...] = None
) -> _ConvFunction:
assert all(x == 1 for x in p.dilations), "Transposed convolution only supports dilation equal to 1"
assert all(x == 0 for x in p.output_padding[::2]), (
"Output padding on beginning of a spatial dimension is not supported"
)
# Remove batch dimension from the computed output shape
ofm_shape = output_shape[1:]
def compute_convolution(ifm):
return ml_kernels.np_operators.ideal_transpose_convolution(
ifm, ofm_shape, p.weight, p.bias, p.strides, p.padding, p.requant,
activ=p.activ, ifm_zero_point=p.ifm_zero_point, ofm_zero_point=p.ofm_zero_point,
int15_params=p.int15_params, use_tf=p.use_tf
)
return compute_convolution
def _ml_kernels_convolution(conv_attrs: _afe_attrs.ConvAttrs,
weight: np.ndarray,
bias: Optional[np.ndarray],
activ_attrs: Optional[Union[_afe_attrs.ACTIVATION_ATTRS, _afe_attrs.QUANT_ACTIVATION_ATTRS]],
requant: BaseRequantization[np.ndarray],
msb_left_shift: Union[bool, np.ndarray],
input_zp: int,
zero_point: int,
data: np.ndarray,
mode: RunMode) -> np.ndarray:
"""
Execute a convolution using an algorithm from ml_kernels. The function parameters other than "data"
should be taken from the convolution's attributes.
This function converts inputs and outputs as required to call ml_kernels.
:param conv_attrs: Attributes of the convolution operator
:param weight: Weight tensor
:param bias: Optional bias tensor
:param activ_attrs: Optional activation after convolution
:param msb_left_shift: Has the same meaning as the msb_left_shift field of Conv2DQuantAttrs,
if this is a convolution with int16 input. Ignored otherwise.
:param input_zp: Zero point of quantized input. Ignored for floating-point.
:param zero_point: Zero point of quantized output. Ignored for floating-point.
:param data: Input activation data in NHWC or NDHWC layout. The type of array elements
must be int8, int16, bfloat16, or bfloat32. This type determines what
numerical precision to use in the convolution algorithm.
:param mode: Mode of execution
:return: Convolved result
"""
import afe.backends.mla.afe_to_n2a_compiler.n2a_compiler_utils as n2a_compiler_utils
assert data.dtype in (np.int8, np.int16, bfloat16, np.float32)
# Decide which convolution function in the ml_kernels package will be used
make_conv_function: Callable[[_GeneralMLKernelsConvolutionParameters], _ConvFunction]
if conv_attrs.is_transposed:
if conv_attrs.groups == 1:
# Normal transposed convolution
output_shape = conv_attrs.output_shape
make_conv_function = functools.partial(
_make_transposed_convolution_function, output_shape=output_shape
)
elif conv_attrs.is_depthwise_one_channel:
# Depthwise transposed convolution
make_conv_function = functools.partial(
_make_depthwise_transposed_convolution_function, groups=conv_attrs.groups
)
else:
# Group transposed convolution
raise NotImplementedError("group transposed convolution is not implemented")
elif conv_attrs.groups == 1:
# Normal convolution
make_conv_function = _make_normal_convolution_function
elif conv_attrs.groups > 1:
# Group convolution
make_conv_function = functools.partial(
_make_group_convolution_function, groups=conv_attrs.groups
)
else:
raise RuntimeError(f"Unknown Conv2D with attributes {conv_attrs}")
# Transpose tensors and attributes to satisfy ml_kernels requirements
strides = conv_attrs.stride
dilations = conv_attrs.dilation
flattened_padding = sum(conv_attrs.padding, ())
flattened_output_padding = sum(conv_attrs.output_padding, ())
# Cast numeric types
if data.dtype in (np.int8, np.int16):
weight = weight.astype(np.int8, copy=False)
assert bias is None or bias.dtype == np.int32
requant = afe.ir.quantization_utils.fix_requantization(requant)
elif data.dtype == bfloat16:
# For conv with bfloat16 input, bias is expected to be of float32 type.
assert bias is None or bias.dtype == np.float32
if weight.dtype == np.float32:
weight = weight.astype(bfloat16)
assert is_identity_requantization(requant)
else:
weight = weight.astype(np.int8, copy=False)
assert is_renormalization(requant)
else: # np.float32
assert weight.dtype == np.float32
assert bias is None or bias.dtype == np.float32
assert is_identity_requantization(requant)
weight = weight.reshape(weight.shape[:-2] + (-1,)) # Flatten the group and output dimensions
tensor_layout = "HWIO" if len(weight.shape) == 4 else "DHWIO"
mla_layout = "OIHW" if len(weight.shape) == 4 else "OIDHW"
weight = utils.transpose_tensor_according_to_layout_strings(weight, tensor_layout, mla_layout)
if data.dtype == np.int16:
int15_params = ml_kernels.np_operators.ConvInt15Params(msb_left_shift=msb_left_shift)
else:
int15_params = None
if bias is not None:
assert len(bias.shape) == 1
if activ_attrs is None:
activ = Activation.NONE
elif isinstance(activ_attrs, _afe_attrs.ReluAttrs | _afe_attrs.ReluQuantAttrs):
activ = Activation.RELU
elif isinstance(activ_attrs, _afe_attrs.ClipAttrs | _afe_attrs.ClipQuantAttrs):
# Clip will be processed after the main convolution algorithm
activ = Activation.NONE
else:
raise TypeError("Unrecognized activation type")
general_parameters = _GeneralMLKernelsConvolutionParameters(
data.shape[1:], weight, bias, strides, flattened_padding, flattened_output_padding, dilations, int15_params,
activ, requant, input_zp, zero_point, mode.is_fast_mode()
)
ofm = _map_batch(make_conv_function(general_parameters), data)
if isinstance(activ_attrs, _afe_attrs.ClipAttrs | _afe_attrs.ClipQuantAttrs):
ofm = clip(activ_attrs, ofm)
# Convert output to the expected layout
return ofm
def _expand_transposed_convolution_padding(attrs: _afe_attrs.ConvAttrs) -> tuple[tuple[int, int], ...]:
"""
Calculate how to pad the input tensor for calculating a transposed convolution using
regular convolution.
Args:
attrs: Attributes of convolution
Returns:
Padding width to apply to the input tensor in each spatial dimension
"""
assert attrs.is_transposed
ret = []
for kernel_size, padding, output_padding in zip(attrs.kernel_size, attrs.padding, attrs.output_padding):
assert output_padding[0] == 0
pad_front = kernel_size - 1 - padding[0]
pad_back = kernel_size - 1 - padding[1] + output_padding[1]
ret.append((pad_front, pad_back))
return tuple(ret)
def _expand_transposed_convolution_input(
attrs: _afe_attrs.ConvAttrs,
data: np.ndarray,
) -> np.ndarray:
"""
Expand data into a larger array for transposed convolution.
Data is copied with a stride into a larger zero-filled array.
Args:
attrs: Attributes of the transposed convolution
data: Input feature map
"""
assert attrs.is_transposed
padding = _expand_transposed_convolution_padding(attrs)
# Create array of zeros having the expanded size
spatial_size = [(size - 1) * s + 1 + p[0] + p[1]
for size, s, p in zip(attrs.input_spatial_shape, attrs.stride, padding)]
data_ext = np.zeros((attrs.batch_size, *spatial_size, attrs.input_channels), dtype=np.float32)
# Copy data into new input array. Padding determines the starting index. Stride determines the stride.
spatial_index_pattern = [slice(p[0], p[0] + s * size, s)
for size, s, p in zip(attrs.input_spatial_shape, attrs.stride, padding)]
data_ext[(slice(None), *spatial_index_pattern, slice(None))] = data
return data_ext
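# Worked example for the expansion above (illustrative numbers): with kernel_size=3,
# padding=(1, 1), output_padding=(0, 1) and stride=2 on a length-4 spatial dimension,
# the copied values occupy (4 - 1) * 2 + 1 = 7 positions, pad_front = 3 - 1 - 1 = 1 and
# pad_back = 3 - 1 - 1 + 1 = 2, so the expanded input has length 1 + 7 + 2 = 10.
# A stride-1 convolution of the flipped kernel over it then yields the transposed
# convolution output of length 10 - 3 + 1 = 8.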
def conv_tensorflow(attrs: _afe_attrs.ConvAttrs, data: np.ndarray, weight: np.ndarray) -> np.ndarray:
"""
Compute a floating-point convolution by calling Tensorflow's convolution operator.
This function may not exactly match MLA behavior.
Args:
attrs: Attributes of the convolution
data: Input tensor
weight: Weight tensor
Returns: Convolved tensor
"""
assert data.shape == attrs.input_shape
assert weight.shape == attrs.weight_shape
if attrs.groups > 1:
# Perform convolution on each group and concatenate the results
data_groups = np.split(data, attrs.groups, axis=-1)
weight_groups = np.split(weight, attrs.groups, axis=-2)
group_attrs = dataclasses.replace(attrs, weight_shape=weight_groups[0].shape)
return np.concatenate([conv_tensorflow(group_attrs, d, w) for d, w in zip(data_groups, weight_groups)], axis=-1)
if attrs.is_transposed:
# Convert transposed convolution to regular convolution
assert all(d == 1 for d in attrs.dilation), \
"Transposed convolution with dilation greater than 1 is not supported"
data = _expand_transposed_convolution_input(attrs, data)
weight = np.flip(weight, axis=tuple(range(attrs.num_spatial_dimensions)))
no_padding = ((0, 0),) * attrs.num_spatial_dimensions
regular_attrs = dataclasses.replace(attrs, is_transposed=False,
input_spatial_shape=data.shape[1:-1],
stride=(1,) * attrs.num_spatial_dimensions,
padding=no_padding, output_padding=no_padding)
return conv_tensorflow(regular_attrs, data, weight)
# Else, regular convolution
weight = weight.squeeze(axis=-2) # Remove the groups axis
weight = weight.astype(np.float32, copy=False)
data = data.astype(np.float32, copy=False)
if any(p[0] != 0 or p[1] != 0 for p in attrs.padding):
numpy_padding = ((0, 0), *attrs.padding, (0, 0))
data = np.pad(data, numpy_padding)
ret = tf.nn.convolution(data, weight, strides=attrs.stride, dilations=attrs.dilation)
return np.array(ret, dtype=np.float32)
def add(lhs: np.ndarray, rhs: np.ndarray, axis: Optional[int] = None) -> np.ndarray:
assert (lhs.dtype == rhs.dtype) and lhs.dtype in (np.float32, bfloat16, np.int32)
if axis is not None:
shape_len_minus_batch = len(lhs.shape) - 1
if axis < 0:
axis = axis + len(lhs.shape)
for _ in range(shape_len_minus_batch - axis):
rhs = np.expand_dims(rhs, axis=-1)
if lhs.dtype == np.int32:
output = lhs.astype(np.int64) + rhs
np.clip(output, np.iinfo(np.int32).min, np.iinfo(np.int32).max, out=output)
else:
output = lhs + rhs
return output
def relu(data: np.ndarray, zp: int = 0) -> np.ndarray:
if data.dtype == bfloat16:
# Convert zp to bfloat16, as numpy will fail trying to convert it in np.where
zp = bfloat16(zp)
data = np.where(data < zp, zp, data)
return data
def clip(attrs: _afe_attrs.ClipAttrs | _afe_attrs.ClipQuantAttrs, data: np.ndarray) -> np.ndarray:
outputs: List[np.ndarray] = list()
a_min = np.asarray(attrs.a_min).astype(np.float32) if isinstance(attrs.a_min, float) else attrs.a_min
a_max = np.asarray(attrs.a_max).astype(np.float32) if isinstance(attrs.a_max, float) else attrs.a_max
for idx in range(data.shape[0]):
output = ml_kernels.np_operators.ideal_clip(data[idx], a_min, a_max)
outputs.append(np.expand_dims(output, axis=0))
res = np.concatenate(outputs)
return res
def prelu(data: np.ndarray,
alpha: Union[np.ndarray, float, int],
axis: Optional[int] = None,
zp: int = 0) -> np.ndarray:
assert data.dtype in (np.float32, np.int8, bfloat16)
if axis is not None:
data = utils.transpose_axis_to_the_last(data, axis)
positive = relu(data, zp)
negative = alpha * (data - positive)
res = positive + negative
if axis is not None:
res = utils.transpose_axis_to_the_last(res, axis)
if data.dtype == bfloat16:
assert res.dtype == bfloat16
return res
def elu(data: np.ndarray):
return tf.nn.elu(data).numpy()
def leaky_relu(data: np.ndarray, alpha: Union[float, int]) -> np.ndarray:
assert data.dtype == np.float32
return prelu(data, alpha, axis=None).astype(np.float32)
def maxpool(attrs: _afe_attrs.PoolAttrs, data: np.ndarray, pad_value: Union[float, int],
mode: RunMode = RunMode.MLA_MODE) -> np.ndarray:
assert data.dtype in (np.float32, bfloat16, np.int8, np.int16)
padding = attrs.padding
# Explicit padding
data = np.pad(data, padding, 'constant', constant_values=pad_value)
# Pick an AwesomeDataLayout depending on the dimensionality
awesome_data_layout = AwesomeDataLayout if len(attrs.layout) == 4 else AwesomeDataLayout5D
assert len(attrs.layout) in (4, 5)
# Transpose pool_size, strides, and data to accommodate tensorflow requirements
pool_size = utils.transpose_attr_according_to_layout_strings(attrs.pool_size, attrs.layout, awesome_data_layout)
strides = utils.transpose_attr_according_to_layout_strings(attrs.strides, attrs.layout, awesome_data_layout)
data = utils.transpose_tensor_according_to_layout_strings(data, attrs.layout, awesome_data_layout)
assert pool_size[0] == pool_size[-1] == 1
assert strides[0] == strides[-1] == 1
min_value = ml_kernels.math_helpers.get_dtype_min(data.dtype)
outputs = []
for idx in range(data.shape[0]):
outputs.append(
ml_kernels.np_operators.ideal_pool(
data[idx, ...], pool_size[1:-1], strides[1:-1], (0, ) * (data.ndim - 2) * 2, "max",
get_id_requantization(data.dtype), pad_value=min_value,
use_tf=mode.is_fast_mode()
)
)
output = np.stack(outputs)
    # When attrs.out_layout is an empty string, we keep it the same as attrs.layout.
output_layout = attrs.out_layout if attrs.out_layout else attrs.layout
# Transpose data according to out_layout
output = utils.transpose_tensor_according_to_layout_strings(output, awesome_data_layout, output_layout)
return output.astype(data.dtype, copy=False)
def _avgpool(data: np.ndarray, data_layout: str,
pool_size: AwesomePoolSize, strides: AwesomeStrides,
out_layout: str, out_type: ScalarType, quantized: bool = False, rounding_type: Optional[RoundType] = None,
requant: Optional[BaseRequantization] = None,
mode: RunMode = RunMode.MLA_MODE) -> np.ndarray:
"""
Running avgpool on the given data
"""
# Pick an AwesomeDataLayout depending on the dimensionality
awesome_data_layout = AwesomeDataLayout if len(data_layout) == 4 else AwesomeDataLayout5D
assert len(data_layout) in (4, 5) # Required by ideal_pool
# Transpose pool_size, strides, and data to accommodate awesome_data_layout
pool_size = utils.transpose_attr_according_to_layout_strings(pool_size, data_layout, awesome_data_layout)
strides = utils.transpose_attr_according_to_layout_strings(strides, data_layout, awesome_data_layout)
data = utils.transpose_tensor_according_to_layout_strings(data, data_layout, awesome_data_layout)
assert pool_size[0] == pool_size[-1] == 1
assert strides[0] == strides[-1] == 1
paddings = (0, ) * (data.ndim - 2) * 2
if quantized:
assert data.dtype == np.int8 or data.dtype == np.int16
# Using quantized pooling kernels
output = _avgpool_kernel(
data, pool_size[1:-1], strides[1:-1], paddings=paddings, rounding_type=rounding_type,
requant=requant, mode=mode
)
else:
if data.dtype == np.float32:
# Make sure the result exactly matches ONNX's result by calling the tensorflow library
output = tf.nn.avg_pool(data, pool_size, strides, 'VALID', awesome_data_layout).numpy()
else:
# Put the scaling factor for averaging into a Renormalization
renormalization = Renormalization(
np.float32(1.0) / math.prod(pool_size[1:-1]),
utils.create_and_verify_narrowing(0, RoundType.TOEVEN, out_type.numpy_type())
)
outputs = []
for idx in range(data.shape[0]):
outputs.append(ml_kernels.np_operators.ideal_pool(
data[idx, ...], tuple(pool_size[1:-1]), strides[1:-1], paddings, "average",
renormalization, pad_value=0, use_tf=mode.is_fast_mode())
)
output = np.stack(outputs)
# Transpose data back to original layout
output = utils.transpose_tensor_according_to_layout_strings(output, awesome_data_layout, out_layout)
return output
def avgpool(attrs: _afe_attrs.PoolAttrs, data: np.ndarray,
pad_value: Union[float, int], quantized: bool = False,
rounding_type: Optional[RoundType] = None,
requant: Optional[BaseRequantization] = None,
mode: RunMode = RunMode.MLA_MODE,
) -> np.ndarray:
# Explicit padding
padding = attrs.padding
data = np.pad(data, padding, 'constant', constant_values=pad_value)
out_layout = attrs.out_layout if attrs.out_layout else attrs.layout
return _avgpool(data, attrs.layout, attrs.pool_size, attrs.strides, out_layout, attrs.scalar_type,
quantized=quantized, rounding_type=rounding_type, requant=requant, mode=mode)
def variance(data: np.ndarray, mean: np.ndarray, requant: BaseRequantization | None = None,
requant_var: BaseRequantization | None = None):
from ml_kernels.np_operators import ideal_variance
if data.dtype == np.float32 or data.dtype == bfloat16:
divisor = np.float32(1.0) / math.prod(data.shape[1:-1])
requant = Renormalization(
divisor, utils.create_and_verify_narrowing(0, RoundType.TOEVEN, data.dtype)
)
assert data.shape[0] == mean.shape[0]
kernel_size = data.shape[1:-1]
stride = (1, ) * len(kernel_size)
padding = (0, ) * len(kernel_size) * 2
outputs = [
np.expand_dims(
ideal_variance(
ifm=data[i], mean=mean[i], kernel_size=kernel_size, stride=stride,
padding=padding, requantization=requant, requant_var=requant_var
),
axis=0
)
for i in range(data.shape[0])
]
out = np.concatenate(outputs, axis=0)
return out
def _get_pool_size_from_shape(input_shape: InputShape, layout: str) -> AwesomePoolSize:
"""Use the input data shape to create a pooling window size"""
c_dim = layout.index("C")
pool_size = list(input_shape)
pool_size[c_dim] = 1
return tuple(pool_size)
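# Example (illustrative shapes): for an NHWC input of shape (1, 7, 7, 512),
# _get_pool_size_from_shape((1, 7, 7, 512), "NHWC") returns (1, 7, 7, 1), i.e. a window
# covering the full spatial extent with the channel dimension left at 1.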
def adaptive_avgpool2d(attrs: _afe_attrs.AdaptiveAvgPool2DAttrs, data: np.ndarray,
quantized: bool = False, rounding_type: Optional[RoundType] = None,
*args, **kwargs) -> np.ndarray:
pool_size = _get_pool_size_from_shape(data.shape, attrs.layout)
strides = (1, 1, 1, 1)
out_layout = attrs.out_layout if attrs.out_layout else attrs.layout
output = _avgpool(data, attrs.layout, pool_size, strides=strides, out_layout=out_layout,
out_type=ScalarType.float32, quantized=quantized, rounding_type=rounding_type,
mode=RunMode.MLA_MODE)
return output
def broadcast_to(attrs: Union[_afe_attrs.BroadcastToAttrs, _afe_attrs.BroadcastToQuantAttrs], data: np.ndarray):
return np.copy(np.broadcast_to(data, attrs.output_shape))
def multiply(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray:
"""Floating-point multiplication."""
assert lhs.dtype == rhs.dtype and lhs.dtype in (bfloat16, np.float32)
return tf.math.multiply(lhs, rhs).numpy()
def quantized_multiply(attrs: _afe_attrs.MultiplyQuantAttrs, lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray:
"""Quantized multiplication."""
from ml_kernels.np_operators import ideal_mul
# Process the batch dimension with a loop. ideal_mul does not handle the batch dimension.
lhs, rhs = np.broadcast_arrays(lhs, rhs)
products = []
for i in range(lhs.shape[0]):
p = ideal_mul(lhs[i, ...], rhs[i, ...], attrs.lhs_zero_point, attrs.rhs_zero_point, attrs.requant,
intrinsic_shift=attrs.intrinsic_shift)
products.append(p)
return np.stack(products)
def pad(attrs: _afe_attrs.PadAttrs, data: np.ndarray, pad_value: np.ndarray) -> np.ndarray:
pad_value_scalar = np.ndarray.item(pad_value)
assert data.dtype == np.float32
output = tf.pad(data, attrs.pad_width, mode='CONSTANT', constant_values=pad_value_scalar).numpy()
return output.astype(data.dtype)
def mean(attrs: _afe_attrs.MeanAttrs, data: np.ndarray, quantized=False) -> np.ndarray:
"""
    When quantized is set to True, the quantized average-pool kernel is used to compute the mean along axis =
* (1)
* (2)
* (1, 2)
Parameters
----------
:param attrs: MeanAttrs. Attributes needed to execute the mean operation.
:param data: np.ndarray. Input data to the mean operation.
:param quantized: bool. Default is False. Set to True if the mean operation
is executed in a quantization domain.
Return
------
:return: np.ndarray. Result of the mean operation.
"""
    # TODO: Add a mean operator in ml_kernels
axis = attrs.axis
if attrs.exclude:
axis = utils.exclude_axes(len(data.shape), axis)
if (
quantized
and 0 not in axis
and data.ndim in (4, 5)
and data.ndim - 1 not in axis
):
assert data.dtype == np.int8
# Using _avgpool_kernel to do mean
pool_shape = tuple(
x if i in axis else 1
for i, x in enumerate(data.shape[1:-1], start=1)
)
strides = (1, ) * len(pool_shape)
paddings = (0, ) * len(pool_shape) * 2
output = _avgpool_kernel(data, pool_shape, strides=strides, paddings=paddings)
if not attrs.keepdims:
output = np.squeeze(output, tuple(axis))
else:
output = tf.math.reduce_mean(data, axis=axis, keepdims=bool(attrs.keepdims)).numpy()
return output
def squeeze(attrs: _afe_attrs.SqueezeAttrs, data: np.ndarray) -> np.ndarray:
return tf.squeeze(data, axis=attrs.axis).numpy()
def argmax(attrs: _afe_attrs.ArgMaxAttrs, data: np.ndarray) -> np.ndarray:
if attrs.input_scalar_type == ScalarType.float32:
# Floating-point argmax. Use the numpy implementation.
assert data.dtype == np.float32
assert attrs.result_scalar_type == ScalarType.int32, "Unsupported result type for the argmax operator"
axis = attrs.axis
if attrs.exclude:
axis = utils.exclude_axes(len(data.shape), axis)
assert len(axis) == 1, "Error Argmax does not support multiple axes"
if attrs.select_last_index:
data = np.flip(data, axis=axis[0])
res = np.argmax(data, axis[0], keepdims=bool(attrs.keepdims)).astype(np.int32)
if attrs.select_last_index:
res = data.shape[axis[0]] - 1 - res
return res
elif attrs.input_scalar_type in (ScalarType.int8, ScalarType.bfloat16):
from ml_kernels.np_operators import ideal_arg_min_max
# Integer argmax. Use the reference implementation in ml_kernels.
assert data.dtype == attrs.input_scalar_type.numpy_type()
assert attrs.result_scalar_type == ScalarType.int32, "Unsupported result type for the argmax operator"
# Call the reference implementation without a batch dimension.
outputs: List[np.ndarray] = list()
for idx in range(data.shape[0]):
output = ideal_arg_min_max(
data[idx], ArgMinMaxOp.MAX, select_last_index=attrs.select_last_index
)
outputs.append(np.expand_dims(output, axis=0))
res = np.concatenate(outputs)
return res
# else
raise ValueError("Unexpected scalar type for argmax operator")
def softmax(attrs: Union[_afe_attrs.SoftmaxAttrs, _afe_attrs.SoftmaxQuantAttrs], data: np.ndarray) -> np.ndarray:
if isinstance(attrs, _afe_attrs.SoftmaxQuantAttrs):
from ml_kernels.requantization import FractionalZeroRequantization, Narrowing
from ml_kernels.np_operators import ideal_softmax
assert data.dtype in (np.int8, np.int16)
axis = attrs.axis if attrs.axis == -1 else attrs.axis - 1
lut_input_pre_shift = attrs.lut_input_pre_shift
output_pre_shift = attrs.output_pre_shift
output = np.array([ideal_softmax(ifm, zp_exp=attrs.exp_zp,
zp_rec=attrs.rec_zp, axis=axis,
lut_exp=attrs.lookup_table_exp,
lut_rec=attrs.lookup_table_rec,
req_lut_input=attrs.requant_lut,
req_output=attrs.requant_output,
lut_input_pre_shift=lut_input_pre_shift,
output_pre_shift=output_pre_shift) for ifm in data])
elif data.dtype == bfloat16:
from ml_kernels.global_constants import compute_bf16_exp_lut, compute_bf16_reciprocal_lut
from ml_kernels.np_operators import ideal_softmax
axis = attrs.axis if attrs.axis == -1 else attrs.axis - 1
lut_exp = compute_bf16_exp_lut()
lut_rec = compute_bf16_reciprocal_lut()
output = np.array([ideal_softmax(ifm, axis=axis,
lut_exp=lut_exp,
lut_rec=lut_rec) for ifm in data])
else:
output = _ev_transforms.softmax(data, attrs.axis)
return output
def lrn(attrs: Union[_afe_attrs.LRNAttrs, _afe_attrs.LRNQuantAttrs], data: np.ndarray) -> np.ndarray:
from ml_kernels.np_operators import ideal_lrn
if attrs.axis == 1:
# Transpose data to AwesomeDataLayout. We assume the data is formatted to NCHW
data = utils.transpose_tensor_according_to_layout_strings(data, 'NCHW', AwesomeDataLayout)
if isinstance(attrs, _afe_attrs.LRNAttrs):
assert data.dtype == np.float32
# NOTES FOR TENSORFLOW
# TVM defines size as size_tvm = (depth_radius_tf * 2) + 1
    # TVM defines alpha as alpha_tvm = alpha_tf * size_tvm
alpha = attrs.alpha / attrs.size
if attrs.size % 2 == 1:
# Use tf for odd window size
depth_radius = int((attrs.size - 1) / 2)
output = tf.nn.local_response_normalization(input=data, depth_radius=depth_radius,
bias=attrs.bias, alpha=alpha, beta=attrs.beta).numpy()
else:
# Use ml_kernels for even window size
output = ideal_lrn(data[0], window_size=attrs.size, padding=attrs.size // 2,
bias=attrs.bias, alpha=alpha, beta=attrs.beta)
output = np.expand_dims(output, axis=0)
else:
assert isinstance(attrs, _afe_attrs.LRNQuantAttrs)
assert data.dtype == np.int8
from ml_kernels.requantization import FractionalZeroRequantization, Narrowing
requant_lut_input = FractionalZeroRequantization(attrs.lut_scale,
attrs.lut_zp_corr,
Narrowing(attrs.lut_sh, RoundType.UPWARD, data.dtype))
requant_output = FractionalZeroRequantization(attrs.output_scale,
attrs.output_zp_corr,
Narrowing(attrs.output_sh, RoundType.UPWARD, data.dtype))
output = ideal_lrn(data[0], window_size=attrs.size, padding=attrs.size // 2,
lut=attrs.lookup_table.reshape((16, 16)),
zp_input=attrs.input_zp,
requant_lut_input=requant_lut_input,
requant_output=requant_output)
output = np.expand_dims(output, axis=0)
if attrs.axis == 1:
# Transpose data back to original layout
output = utils.transpose_tensor_according_to_layout_strings(output, AwesomeDataLayout, 'NCHW')
return output
def concatenate(attrs: _afe_attrs.ConcatenateAttrs, data_list: List[np.ndarray]) -> np.ndarray:
return tf.concat(values=data_list, axis=attrs.axis).numpy()
def transpose(attrs: _afe_attrs.TransposeAttrs, data: np.ndarray) -> np.ndarray:
if len(attrs.axes) < 1:
perm = None
else:
perm = attrs.axes
output = tf.transpose(a=data, perm=perm).numpy()
return output
def depth_to_space(attrs: _afe_attrs.DepthToSpaceAttrs, data: np.ndarray) -> np.ndarray:
return ml_kernels.np_operators.ideal_depth_to_space(data, attrs.block_size, attrs.mode)
def reshape(attrs: _afe_attrs.ReshapeAttrs, data: np.ndarray) -> np.ndarray:
return _ev_transforms.reshape(attrs.newshape, data)
def expand_dims(attrs: _afe_attrs.ExpandDimsAttrs, data: np.ndarray) -> np.ndarray:
for _ in range(attrs.num_newaxis):
data = np.expand_dims(data, attrs.axis)
return data
def batch_flatten(data: np.ndarray) -> np.ndarray:
"""Flattens all the dimensions except for the batch dimension"""
new_shape = (data.shape[0], -1)
return np.reshape(data, new_shape)
def min_op(attrs: _afe_attrs.ExtmAttrs, data: np.ndarray) -> np.ndarray:
assert data.dtype == np.float32
axis = attrs.axis
if attrs.exclude:
axis = utils.exclude_axes(len(data.shape), axis)
assert len(axis) == 1, "Error min does not support multiple axes"
_validate_reduce_operands(axis, data.shape)
output = tf.math.reduce_min(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims))
return output.numpy()
def max_op(attrs: _afe_attrs.ExtmAttrs, data: np.ndarray) -> np.ndarray:
assert data.dtype == np.float32
axis = attrs.axis
if attrs.exclude:
axis = utils.exclude_axes(len(data.shape), axis)
assert len(axis) == 1, "Error max does not support multiple axes"
_validate_reduce_operands(axis, data.shape)
output = tf.math.reduce_max(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims))
return output.numpy()
def sum_op(attrs: _afe_attrs.SumAttrs, data: np.ndarray) -> np.ndarray:
assert data.dtype == np.float32
axis = attrs.axis
if attrs.exclude:
axis = utils.exclude_axes(len(data.shape), axis)
assert len(axis) == 1, "Error sum does not support multiple axes"
_validate_reduce_operands(axis, data.shape)
output = tf.math.reduce_sum(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims))
return output.numpy()
def prod(attrs: _afe_attrs.ProdAttrs, data: np.ndarray) -> np.ndarray:
assert data.dtype == np.float32
axis = attrs.axis
if attrs.exclude:
axis = utils.exclude_axes(len(data.shape), axis)
assert len(axis) == 1, "Error prod does not support multiple axes"
_validate_reduce_operands(axis, data.shape)
output = tf.math.reduce_prod(input_tensor=data, axis=axis[0], keepdims=bool(attrs.keepdims))
return output.numpy()
def tuple_get_item(attrs: _afe_attrs.TupleGetItemAttrs, t: tuple) -> np.ndarray:
return t[attrs.index]
def exp(data: np.ndarray) -> np.ndarray:
return tf.math.exp(data).numpy()
def sqrt(data: np.ndarray) -> np.ndarray:
return tf.math.sqrt(data).numpy()
def sigmoid(data: np.ndarray) -> np.ndarray:
if data.dtype == bfloat16:
from ml_kernels.global_constants import compute_bf16_exp_lut, compute_bf16_reciprocal_lut
from ml_kernels.np_operators import ideal_sigmoid
lut_exp = compute_bf16_exp_lut()
lut_rec = compute_bf16_reciprocal_lut()
output = np.array([ideal_sigmoid(ifm, lut_exp=lut_exp, lut_rec=lut_rec) for ifm in data])
else:
output = tf.math.sigmoid(data).numpy()
return output
def swish(data: np.ndarray) -> np.ndarray:
if data.dtype == bfloat16:
from ml_kernels.global_constants import compute_bf16_exp_lut, compute_bf16_reciprocal_lut
from ml_kernels.np_operators import ideal_sigmoid
lut_exp = compute_bf16_exp_lut()
lut_rec = compute_bf16_reciprocal_lut()
output = np.array([multiply(ifm, ideal_sigmoid(ifm, lut_exp=lut_exp, lut_rec=lut_rec)) for ifm in data])
else:
output = tf.math.multiply(data, tf.math.sigmoid(data)).numpy()
return output
def hard_sigmoid(data: np.ndarray) -> np.ndarray:
return np.clip((data / 6.0) + 0.5, a_min=0.0, a_max=1.0)
def hard_swish(data: np.ndarray) -> np.ndarray:
return data * np.clip((data / 6.0) + 0.5, a_min=0.0, a_max=1.0)
def log(data: np.ndarray) -> np.ndarray:
return np.log(data)
def softplus(data: np.ndarray) -> np.ndarray:
return tf.math.softplus(data).numpy()
def erf(data: np.ndarray) -> np.ndarray:
if data.dtype == np.dtype('bfloat16'):
from ml_kernels.np_operators import ideal_erf
outs = []
for i in range(data.shape[0]):
out = ideal_erf(data[i])
out = np.expand_dims(out, axis=0)
outs.append(out)
output = np.concatenate(outs, axis=0)
return output
else:
return tf.math.erf(data).numpy()
def gelu(x: np.ndarray) -> np.ndarray:
return x * 0.5 * (1.0 + tf.math.erf(x / np.sqrt(2)).numpy())
def log2(data: np.ndarray) -> np.ndarray:
return np.log2(data)
def log10(data: np.ndarray) -> np.ndarray:
return np.log10(data)
def subtract(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray:
assert (lhs.dtype == rhs.dtype) and lhs.dtype in (np.float32, bfloat16, np.int32)
return tf.math.subtract(lhs, rhs).numpy()
def power(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray:
return tf.math.pow(lhs, rhs).numpy()
def divide(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray:
return np.asarray(tf.math.divide_no_nan(lhs, rhs))
def reciprocal(data: np.ndarray) -> np.ndarray:
return tf.math.reciprocal(data).numpy()
def maximum(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray:
assert (lhs.dtype == np.float32 and rhs.dtype == np.float32)
return tf.math.maximum(lhs, rhs).numpy()
def minimum(lhs: np.ndarray, rhs: np.ndarray) -> np.ndarray:
assert (lhs.dtype == np.float32 and rhs.dtype == np.float32)
return tf.math.minimum(lhs, rhs).numpy()
def full(attrs: _afe_attrs.FullAttrs, fill_value: np.ndarray) -> np.ndarray:
dtype = attrs.dtype if attrs.dtype else None
return np.full(attrs.shape, fill_value, dtype=dtype)
def tile(attrs: _afe_attrs.TileAttrs, data: np.ndarray) -> np.ndarray:
return np.tile(data, attrs.reps)
def split(attrs: _afe_attrs.SplitAttrs, data: np.ndarray) -> Tuple[np.ndarray, ...]:
return tuple(np.split(data, attrs.indices_or_sections, attrs.axis))
def take(attrs: _afe_attrs.TakeAttrs, data: np.ndarray, indices: np.ndarray) -> np.ndarray:
    assert attrs.batch_dims == 0, "Take operation currently only supports batch_dims == 0"
mode = attrs.mode
if mode == 'fast':
mode = 'raise'
return np.take(data, indices, axis=attrs.axis, mode=mode)
def strided_slice(attrs: _afe_attrs.StridedSliceAttrs, data: np.ndarray) -> np.ndarray:
begin = list(attrs.begin)
end = list(attrs.end)
strides = list(attrs.strides)
if attrs.axes is not None:
# Begin, end, strides and axes are constrained to the same length by TVM
axes = list(attrs.axes)
new_begin = len(data.shape) * [0]
new_end = list(data.shape)
new_strides = len(data.shape) * [1]
for i, e in enumerate(axes):
new_begin[e] = begin[i]
new_end[e] = end[i]
new_strides[e] = strides[i]
begin = new_begin
end = new_end
strides = new_strides
if attrs.slice_mode == 'size':
for i, b_e in enumerate(zip(begin, end)):
b, e = b_e
if e == -1:
end[i] = data.shape[i]
else:
end[i] = b + e
strides = None
return tf.strided_slice(data, begin=begin, end=end, strides=strides).numpy()
def rsqrt(data: np.ndarray) -> np.ndarray:
return tf.math.rsqrt(data).numpy()
def tanh(data: np.ndarray) -> np.ndarray:
return tf.math.tanh(data).numpy()
def _resize2d(data: np.ndarray, size: Tuple[int, int],
method: str, layout: str = AwesomeDataLayout,
align_corners: bool = False,
tf_ver: int = 1,
rounding: Optional[RoundType] = None) -> np.ndarray:
"""
Resize along H and/or W dimension(s) using Tensorflow V1 or V2 image.resize function.
    :param data: np.ndarray. Input data.
    :param size: Tuple[int, int]. Output size after resizing.
    :param method: str. Name of the resizing method. Currently supports "nearest" and "bilinear".
    :param layout: str. Default is AwesomeDataLayout. Layout of the input data.
    :param align_corners: bool. Default is False. Only applicable when tf_ver == 1.
:param rounding: Rounding method for integer resize. If None, floating-point resize is performed.
"""
dtype = data.dtype
if rounding is None:
assert dtype in ('float32', 'bfloat16')
else:
assert dtype in ('int8', 'int16')
_SUPPORTED_METHODS = ["nearest", "bilinear"]
assert method in _SUPPORTED_METHODS, f"Only support {_SUPPORTED_METHODS}, got {method}"
assert len(data.shape) == len(layout) == 4, f"Only support 4-D tensor, got {data.shape}-D with layout = {layout}"
data = utils.transpose_tensor_according_to_layout_strings(data, layout, AwesomeDataLayout)
if tf_ver == 1:
output = tf.compat.v1.image.resize(images=data,
size=size,
method=method,
align_corners=align_corners).numpy()
else:
output = tf.image.resize(images=data,
size=size,
method=method).numpy()
if rounding is not None:
output = round_op(output, rounding)
output = utils.transpose_tensor_according_to_layout_strings(output, AwesomeDataLayout, layout)
return output.astype(dtype)
def image_resize2d(attrs: _afe_attrs.ImageResize2DAttrs, data: np.ndarray, rounding: Optional[str] = None) -> np.ndarray:
"""
    AFE and the MLA do not have a way to support nearest_neighbor with the asymmetric
    coordinate transformation mode. However, the resulting error should be negligible during inference.
"""
method = attrs.method
if method == 'nearest_neighbor':
method = 'nearest'
elif method == 'linear':
method = 'bilinear'
elif method == 'cubic':
method = 'bicubic'
# Find out which version of Tensorflow
# TF v1 default is "asymmetric" while TF v2 default is "half_pixel"
    # TVM does not care about this; it always returns "asymmetric", which can introduce error
tf_ver = 2 if attrs.coordinate_transformation_mode in ['half_pixel', 'pytorch_half_pixel'] else 1
align_corners = attrs.coordinate_transformation_mode == 'align_corners'
output = _resize2d(data, attrs.size, method, attrs.layout, align_corners, tf_ver, rounding=rounding)
return output
def upsample(attrs: _afe_attrs.UpsamplingAttrs, data: np.ndarray, rounding: Optional[str] = None) -> np.ndarray:
"""
    Upsample the input tensor along the H and/or W dimensions.
"""
method = "nearest" if attrs.method == "nearest_neighbor" else attrs.method
    height_dim, width_dim = (data.shape[attrs.layout.index("H")],
                             data.shape[attrs.layout.index("W")])
    size = (int(attrs.scale_h * height_dim), int(attrs.scale_w * width_dim))
return _resize2d(data, size, method, attrs.layout, attrs.align_corners, tf_ver=1, rounding=rounding)
def gridsample(attrs: _afe_attrs.GridSampleAttrs,
data: np.ndarray, grid: np.ndarray) -> np.ndarray:
"""
Image interpolation through GridSample
"""
mode = "linear" if attrs.method == "bilinear" else attrs.method
if data.dtype != bfloat16:
return_dtype = data.dtype
data = data.astype(bfloat16)
grid = grid.astype(bfloat16)
else:
return_dtype = bfloat16
outputs = []
for idx in range(data.shape[0]):
input_data = data[idx]
grid_data = grid[idx]
output = ml_kernels.np_operators.ideal_grid_sample(
input_data, grid_data, mode, attrs.padding_mode, attrs.align_corners
)
outputs.append(np.expand_dims(output, axis=0))
res = np.concatenate(outputs)
return res.astype(return_dtype)
def _validate_reduce_operands(axis: Tuple[int, ...], shape: Tuple[int, ...]):
if len(shape) == 4:
# Since only NHWC is currently supported by AFE, NHWC layout is assumed if tensor has 4 dimensions
assert set(axis) == set([3]), "Only channel dimension is supported for NHWC"
elif len(shape) == 3:
# Only support H, W or both HW dimension for NHW
assert set(axis) in [set([1]), set([2]), set([1, 2])], "H, W or both HW dimensions are supported for NHW"
def layer_norm(attrs: _afe_attrs.LayerNormAttrs | _afe_attrs.LayerNormQuantAttrs,
data: np.ndarray) -> np.ndarray:
from ml_kernels.global_constants import compute_bf16_rsqrt_lut
from ml_kernels.np_operators import ideal_layer_norm
if isinstance(attrs, _afe_attrs.LayerNormAttrs):
if data.dtype == np.float32:
output = ideal_layer_norm(data[0], axis=attrs.axis, epsilon=attrs.epsilon)
else:
lut_rsqrt = compute_bf16_rsqrt_lut()
output = ideal_layer_norm(data[0], axis=attrs.axis, epsilon=attrs.epsilon, lut_rsqrt=lut_rsqrt)
else:
assert isinstance(attrs, _afe_attrs.LayerNormQuantAttrs)
assert data.dtype == np.int8
output = ideal_layer_norm(
data[0], axis=attrs.axis, lut_rsqrt=attrs.lookup_table_rsqrt, zp_rsqrt=attrs.zp_rsqrt,
req_mean=attrs.requant_mean, req_lut_input=attrs.requant_lut_input,
req_output=attrs.requant_output)
output = np.expand_dims(output, axis=0)
return output
def rms_norm(data: np.ndarray, attrs: Union[_afe_attrs.RMSNormAttrs, _afe_attrs.RMSNormQuantAttrs]) -> np.ndarray:
from ml_kernels.global_constants import compute_bf16_rsqrt_lut
from ml_kernels.np_operators import ideal_rms_norm
assert data.shape[0] == 1, f'Batch dimension should be 1, got {data.shape[0]}.'
if isinstance(attrs, _afe_attrs.RMSNormAttrs):
if data.dtype == np.float32:
output = ideal_rms_norm(data[0], epsilon=attrs.epsilon)
else:
lookup_table_rsqrt = compute_bf16_rsqrt_lut()
output = ideal_rms_norm(data[0], epsilon=attrs.epsilon, lut_rsqrt=lookup_table_rsqrt)
else:
output = ideal_rms_norm(data[0], zp_ifm=attrs.zp_ifm, lut_rsqrt=attrs.lookup_table_rsqrt,
zp_rsqrt=attrs.zp_rsqrt, req_lut_input=attrs.requant_lut_input,
req_output=attrs.requant_output, lut_input_pre_shift=attrs.lut_input_pre_shift,
output_pre_shift=attrs.output_pre_shift)
output = np.expand_dims(output, axis=0)
return output
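# The helper below is an illustrative sketch only (hypothetical, not used by the module). It
# assumes ideal_rms_norm follows the standard RMSNorm definition on the floating-point path.
def _rms_norm_float_reference(x: np.ndarray, epsilon: float) -> np.ndarray:
    # y = x / sqrt(mean(x ** 2) + epsilon), normalizing over the last axis.
    return x / np.sqrt(np.mean(np.square(x), axis=-1, keepdims=True) + epsilon)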
[docs]
def instance_norm(data: np.ndarray, mean: np.ndarray, variance: np.ndarray,
attrs: _afe_attrs.InstanceNormAttrs | _afe_attrs.InstanceNormQuantAttrs):
from ml_kernels.np_operators import ideal_instance_norm
assert data.shape[0] == mean.shape[0] == variance.shape[0]
# TODO: add depth channel to 4D tensor and remove it after getting result back.
if isinstance(attrs, _afe_attrs.InstanceNormAttrs):
outputs = [np.expand_dims(ideal_instance_norm(data[i], variance[i], mean[i],
epsilon=attrs.epsilon), axis=0) for i in range(data.shape[0])]
else:
outputs = [np.expand_dims(ideal_instance_norm(data[i], variance[i], mean[i],
lut_rsqrt=attrs.lut_rsqrt,
zp_rsqrt=attrs.zp_rsqrt,
req_output=attrs.requant_out), axis=0) for i in range(data.shape[0])]
output = np.concatenate(outputs, axis=0)
return output
[docs]
def calculate_tessellated_tensor_shape(
tensor_type: TensorType, slice_shape: Sequence[int], align_c16: bool
) -> tuple[int, int]:
elem_size = np.dtype(tensor_type.scalar.numpy_type()).itemsize
tile_slice_grids = [
_ev_transforms.calculate_slice_grid(x, y)
for x, y in zip(tensor_type.shape[1:], slice_shape)
]
# Calculate block sizes
tile_sizes = _ev_transforms.calculate_slice_grid_sizes(tile_slice_grids, align_c16, elem_size)
return tensor_type.shape[0], int(np.sum(tile_sizes))
[docs]
def tessellation(attrs: _afe_attrs.TessellationTransformAttrs, data: np.ndarray) -> np.ndarray:
"""
Input tensor is 4D NHWC, int8 only
Output tensor is 2D array
"""
return _ev_transforms.tessellation(data, attrs.slice_shape, attrs.align_c16, attrs.cblock)
[docs]
def detessellation(attrs: _afe_attrs.DetessellationTransformAttrs, data: np.ndarray) -> np.ndarray:
"""
Input tensor is 2D
Output tensor is 4D: NHWC
"""
out_dtype = scalar_type_to_dtype(attrs.frame_type.scalar)
return _ev_transforms.detessellation(
data, attrs.slice_shape, out_dtype, attrs.frame_type.shape, attrs.align_c16, attrs.cblock
)
def _size_aligned_to_multiple(n: int, m: int) -> int:
"""
Helper function for rounding n up to the nearest multiple of m.
"""
return ((n + m - 1) // m) * m
[docs]
def get_channel_aligned_shape(tensor_shape: Sequence[int], elem_size: int) -> tuple[int, ...]:
"""
Helper function to get a tensor shape where channel is aligned based on the element size.
"""
assert elem_size in (1, 2, 4)
channel_align = 16 // elem_size
return *tensor_shape[:-1], _size_aligned_to_multiple(tensor_shape[-1], channel_align)
[docs]
def get_mla_padded_2d_shape(tensor_shape: Sequence[int], elem_size: int) -> tuple[int, int]:
tensor_shape = (*tensor_shape[:-1], tensor_shape[-1] * elem_size)
return tensor_shape[0], _size_aligned_to_multiple(math.prod(tensor_shape[1:]), 16)
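# The helper below is an illustrative sketch only (hypothetical, not used by the module): worked
# examples, with assumed shapes, for the alignment helpers above.
def _example_mla_alignment() -> None:
    # 30 rounded up to the nearest multiple of 16 is 32.
    assert _size_aligned_to_multiple(30, 16) == 32
    # With elem_size == 1 the channel dimension is aligned to 16, so 3 channels pad to 16.
    assert get_channel_aligned_shape((1, 5, 5, 3), elem_size=1) == (1, 5, 5, 16)
    # The flattened per-batch size 5 * 5 * 3 = 75 bytes is padded up to the next multiple of 16.
    assert get_mla_padded_2d_shape((1, 5, 5, 3), elem_size=1) == (1, 80)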
[docs]
def reshape_to_mla_padded_2d_shape(tensor: np.ndarray) -> np.ndarray:
"""
Reshape a tensor to the MLA 2D buffer shape (batch_size, data_size), where data_size is
D * H * W * C in bytes, padded up to a multiple of 16.
"""
assert len(tensor.shape) in (4, 5), f'Expected 4D/5D tensor, got {len(tensor.shape)}D.'
mla_2d_shape = get_mla_padded_2d_shape(
tensor_shape=tensor.shape, elem_size=tensor.dtype.itemsize
)
tensor_2d = tensor.reshape(mla_2d_shape[0], -1).view(np.int8)
tensor_2d = np.pad(tensor_2d, ((0, 0), (0, mla_2d_shape[1] - tensor_2d.shape[1])))
assert tensor_2d.shape == mla_2d_shape, (
f"Expected shape: {mla_2d_shape}, got {tensor_2d.shape}."
)
return tensor_2d
[docs]
def reshape_from_mla_padded_2d_shape(
tensor: np.ndarray, data_shape: Sequence[int], tensor_type: type
) -> np.ndarray:
"""
Reshape tensor from MLA 2D shape to 4D/5D shape.
:param tensor: 2D tensor.
:param data_shape: 4D/5D tensor shape.
:param tensor_type: Numpy scalar type used to interpret the tensor's elements.
:return: Reshaped 4D/5D tensor.
"""
assert len(tensor.shape) == 2, f'Expected 2D tensor, got {len(tensor.shape)}D.'
elem_size = np.dtype(tensor_type).itemsize
return tensor[:, :math.prod(data_shape[1:]) * elem_size].view(tensor_type).reshape(data_shape)
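# The helper below is an illustrative sketch only (hypothetical, not used by the module): a
# round trip of a small int8 NHWC tensor through the MLA 2D layout and back.
def _example_mla_2d_round_trip() -> None:
    x = np.arange(30, dtype=np.int8).reshape(1, 2, 3, 5)
    x_2d = reshape_to_mla_padded_2d_shape(x)  # shape (1, 32): 30 data bytes plus 2 bytes of padding
    y = reshape_from_mla_padded_2d_shape(x_2d, x.shape, np.int8)
    assert np.array_equal(x, y)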
[docs]
def pack(data: List[np.ndarray]) -> np.ndarray:
"""
Multiple tensors are packed sequentially into a single 2D array.
Each input can be either a 2D tessellated tensor or a 4D/5D tensor
that will be tessellated on the MLA; any 4D/5D tensor is first
reshaped to the MLA 2D shape.
"""
tensors = []
for tensor in data:
if len(tensor.shape) in (4, 5):
tensor_2d = reshape_to_mla_padded_2d_shape(tensor)
tensors.append(tensor_2d)
else:
tensors.append(tensor)
return _ev_transforms.pack(tensors)
[docs]
def unpack(attrs: _afe_attrs.UnpackTransformAttrs, data: np.ndarray) -> List[np.ndarray]:
"""
A 2D array is unpacked to produce multiple 2D arrays
"""
return _ev_transforms.unpack(data,
[scalar_type_to_dtype(tt.scalar) for tt in attrs.tensor_types],
[tt.shape for tt in attrs.tensor_types])
[docs]
def normalization(attrs: _afe_attrs.NormalizationTransformAttrs, data: np.ndarray) -> np.ndarray:
"""
Normalization performs the following three steps:
1) Divide by a per-channel divisor
2) Subtract per-channel mean values
3) Divide by per-channel standard deviation values
"""
N, H, W, C = data.shape
# Per channel params: List of (divisor, mean, sigma)
channel_params = attrs.channel_params
assert len(channel_params) == 1 or len(channel_params) == C,\
"Channel param list must be of length 1 or same as number of channels"
assert len(channel_params[0]) == 3, "Channel params must contain three values"
# If channel params are given for a single channel, replicate them across all channels
if len(channel_params) == 1:
channel_params *= C
return _ev_transforms.normalize(data, channel_params)
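# The helper below is an illustrative reference only (hypothetical, with assumed semantics of
# _ev_transforms.normalize): each channel c is mapped to (data[..., c] / divisor_c - mean_c) / sigma_c.
def _normalization_reference(data: np.ndarray,
                             channel_params: Sequence[Tuple[float, float, float]]) -> np.ndarray:
    divisor, mean, sigma = (np.asarray(p, dtype=np.float32) for p in zip(*channel_params))
    return (data / divisor - mean) / sigma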
[docs]
def ev_quantize(attrs: _afe_attrs.QuantizationTransformAttrs, data: np.ndarray) -> np.ndarray:
"""
Quantization transform.
"""
if data.dtype == bfloat16:
assert len(attrs.channel_params) == 1, "Per-channel for quantize is not supported yet."
scale, zp = attrs.channel_params[0]
requant = FloatRequantization(
sc_correction=scale, zp_correction=zp, out_dtype=attrs.output_data_type.numpy_type()
)
return ml_kernels.np_operators.ideal_requantize(data, requant)
else:
return _ev_transforms.quantize(data, attrs.channel_params, attrs.num_bits)
[docs]
def ev_dequantize(attrs: _afe_attrs.DequantizationTransformAttrs, data: np.ndarray) -> np.ndarray:
"""
Dequantization transform.
"""
if attrs.output_type == ScalarType.bfloat16:
assert len(attrs.channel_params) == 1, "Per-channel for dequantize is not supported yet."
scale, zp = attrs.channel_params[0]
requant = FloatRequantization(
sc_correction=scale, zp_correction=zp, out_dtype=attrs.output_type.numpy_type()
)
return ml_kernels.np_operators.ideal_requantize(data, requant)
else:
return _ev_transforms.dequantize(data, attrs.channel_params)
[docs]
def ev_resize(attrs: _afe_attrs.ResizeTransformAttrs, data: np.ndarray) -> np.ndarray:
return _ev_transforms.resize(data, attrs.target_width, attrs.target_height,
attrs.keep_aspect, attrs.deposit_location.value, attrs.method.value)
[docs]
def chroma_upsample(attrs: _afe_attrs.ChromaUpsampleTransformAttrs, data: np.ndarray) -> np.ndarray:
return _ev_transforms.chroma_upsample(data, attrs.frame_height, attrs.frame_width,
attrs.yuv_sampling.value)
[docs]
def yuv_rgb_conversion(attrs: _afe_attrs.YuvRgbConversionTransformAttrs, data: np.ndarray) -> np.ndarray:
return _ev_transforms.yuv_rgb_conversion(data, attrs.conversion.value, attrs.std.value)
[docs]
def bgr_rgb_conversion(attrs: _afe_attrs.BgrRgbConversionTransformAttrs, data: np.ndarray) -> np.ndarray:
return _ev_transforms.bgr_rgb_conversion(data, attrs.conversion.value)
[docs]
def ev_sigmoid(attrs: _afe_attrs.SigmoidTransformAttrs, data: np.ndarray) -> np.ndarray:
return _ev_transforms.sigmoid(data, attrs.save_int16)
[docs]
def nms_maxpool(attrs: _afe_attrs.NmsMaxpoolTransformAttrs, data: np.ndarray) -> np.ndarray:
return _ev_transforms.nms_maxpool(data, attrs.kernel)
[docs]
def cast(attrs: _afe_attrs.CastAttrs, data: np.ndarray) -> np.ndarray:
return np.array(data).astype(attrs.out_dtype)
[docs]
def qnn_quantize(attrs: _afe_attrs.QNNQuantizeAttrs, data: np.ndarray,
output_scale: np.ndarray, output_zero_point: np.ndarray) -> np.ndarray:
"""
For the rounding type used for this operator (away from 0), refer to:
https://github.com/apache/tvm/pull/3512/commits/c089ebcdf4b13f98b776bb0213779f6783fa6743#diff-a47be721cf0f30d86d0f548a8cc5a1fe184d0827efd450c8446bfc05d962abf5R47
"""
out_dtype = attrs.out_dtype
axis = attrs.axis
assert out_dtype in [QNNDtype.INT8, QNNDtype.UINT8, QNNDtype.INT32]
axis_shape = data.shape[axis]
scale_len = output_scale.size
zp_len = output_zero_point.size
assert (scale_len == 1 or scale_len == axis_shape) and (zp_len == 1 or zp_len == axis_shape)
if zp_len == axis_shape or scale_len == axis_shape:
output_scale = np.repeat(output_scale, axis_shape) if scale_len == 1 else output_scale
output_zero_point = np.repeat(output_zero_point, axis_shape) if zp_len == 1 else output_zero_point
output_scale = np.squeeze(output_scale)
output_zero_point = np.squeeze(output_zero_point)
output = np.swapaxes(data, axis, -1)
output = output / output_scale + output_zero_point
output = np.swapaxes(output, axis, -1)
else:
output = (data / output_scale) + output_zero_point
out_dtype_min, out_dtype_max = DTYPE_BOUNDS[out_dtype]
return np.clip(round_op(output, RoundType.TONEAREST),
out_dtype_min,
out_dtype_max).astype(out_dtype)
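# The helper below is an illustrative per-tensor reference only (hypothetical, mirroring the
# scalar branch above for an int8 output): q = clip(round_to_nearest(x / scale + zero_point)).
def _qnn_quantize_reference(data: np.ndarray, scale: float, zero_point: int) -> np.ndarray:
    quantized = round_op(data / scale + zero_point, RoundType.TONEAREST)
    low, high = DTYPE_BOUNDS[QNNDtype.INT8]
    return np.clip(quantized, low, high).astype(QNNDtype.INT8)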
[docs]
def qnn_dequantize(attrs: _afe_attrs.QNNDequantizeAttrs, data: np.ndarray,
input_scale: np.ndarray, input_zero_point: np.ndarray) -> np.ndarray:
axis = attrs.axis
axis_shape = data.shape[axis]
# Casting in order to avoid underflow/overflow of int8 and uint8 inputs.
data = data.astype(Float)
scale_len = input_scale.size
zp_len = input_zero_point.size
assert (scale_len == 1 or scale_len == axis_shape) and (zp_len == 1 or zp_len == axis_shape)
if zp_len == axis_shape or scale_len == axis_shape:
input_scale = np.repeat(input_scale, axis_shape) if scale_len == 1 else input_scale
input_zero_point = np.repeat(input_zero_point, axis_shape) if zp_len == 1 else input_zero_point
input_scale = np.squeeze(input_scale)
input_zero_point = np.squeeze(input_zero_point)
output = np.swapaxes(data, axis, -1)
output = (output - input_zero_point) * input_scale
output = np.swapaxes(output, axis, -1)
else:
output = (data - input_zero_point) * input_scale
return output
def _qnn_requantize(data: np.ndarray,
input_scale: float, input_zero_point: int,
output_scale: float, output_zero_point: int,
rounding: RoundType = RoundType.TOEVEN,
out_dtype: QNNDtype = QNNDtype.INT32) -> np.ndarray:
"""
For the explanation of UPWARD and TONEAREST rounding types, refer to:
https://github.com/apache/tvm/blob/main/include/tvm/relay/qnn/attrs.h#L47
"""
if rounding is None or rounding == "None":
rounding = RoundType.TOEVEN
assert rounding in [RoundType.UPWARD, RoundType.TONEAREST, RoundType.TOEVEN]
assert out_dtype in [QNNDtype.INT8, QNNDtype.UINT8, QNNDtype.INT32]
# Casting in order to avoid underflow/overflow of int8 and uint8 inputs.
data = data.astype(QNNDtype.INT32)
output = output_zero_point + round_op((input_scale / output_scale) * (data - input_zero_point), rounding)
out_dtype_min, out_dtype_max = DTYPE_BOUNDS[out_dtype]
return np.clip(output, out_dtype_min, out_dtype_max)
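# Illustrative check only (hypothetical, with assumed values): requantizing q = 100 from
# (scale=0.05, zero_point=0) to (scale=0.1, zero_point=10) gives 10 + round(0.5 * 100) = 60.
def _example_qnn_requantize() -> None:
    result = _qnn_requantize(np.array([100]), input_scale=0.05, input_zero_point=0,
                             output_scale=0.1, output_zero_point=10, out_dtype=QNNDtype.INT8)
    assert result[0] == 60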
[docs]
def do_requantize(in_scale, out_scale, in_zp, out_zp) -> bool:
return in_scale != out_scale or in_zp != out_zp
[docs]
def qnn_add(attrs: _afe_attrs.AwesomeAttributes, lhs: np.ndarray, rhs: np.ndarray,
lhs_scale: float, lhs_zero_point: int,
rhs_scale: float, rhs_zero_point: int,
output_scale: float, output_zero_point: int, op: str = "add") -> np.ndarray:
# Requantize input tensors to the output parameters
# See QnnAddCanonicalize in src/relay/qnn/op/add.cc
assert op in ["add", "sub"]
if do_requantize(lhs_scale, output_scale, lhs_zero_point, output_zero_point):
lhs = _qnn_requantize(lhs, lhs_scale, lhs_zero_point, output_scale, output_zero_point)
if do_requantize(rhs_scale, output_scale, rhs_zero_point, output_zero_point):
rhs = _qnn_requantize(rhs, rhs_scale, rhs_zero_point, output_scale, output_zero_point)
if op == "sub":
output_zero_point = -output_zero_point
rhs = -rhs
output = lhs + rhs - output_zero_point
return output
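# Illustrative only (assumed values): with output_zero_point = 10 and both inputs already in the
# output quantization, adding q_lhs = 15 and q_rhs = 20 gives 15 + 20 - 10 = 25, so the shared
# zero point is counted exactly once in the result.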
[docs]
def qnn_mul(attrs: _afe_attrs.AwesomeAttributes, lhs: np.ndarray, rhs: np.ndarray,
lhs_scale: float, lhs_zero_point: int,
rhs_scale: float, rhs_zero_point: int,
output_scale: float, output_zero_point: int) -> np.ndarray:
# See QnnMulCanonicalize in src/relay/qnn/op/mul.cc
lhs_shifted = lhs.astype(QuantizedTensor)
rhs_shifted = rhs.astype(QuantizedTensor)
if lhs_zero_point != 0:
lhs_shifted -= lhs_zero_point
if rhs_zero_point != 0:
rhs_shifted -= rhs_zero_point
output = lhs_shifted * rhs_shifted
new_input_scale = lhs_scale * rhs_scale
new_input_zero_point = np.array(0)
output = _qnn_requantize(output, new_input_scale, new_input_zero_point,
output_scale, output_zero_point)
return output
#########################
# PARTITIONING OPERATIONS
#########################
def _run_mod(rt_mod: TVMGraphModule, input_dict: Dict[str, np.ndarray], num_outputs: int) -> List[np.ndarray]:
rt_mod.set_input(**input_dict)
rt_mod.run()
return [np.asarray(np.nan_to_num(rt_mod.get_output(i).asnumpy())) for i in range(num_outputs)]
[docs]
def external(attrs: _afe_attrs.ExternalAttrs, input_dict: Dict[str, np.ndarray]) \
-> Union[np.ndarray, Tuple[np.ndarray, ...]]:
# Execute runtime module
rt_mod = attrs.graph_module
num_outputs = rt_mod.get_num_outputs()
external_op_input_dict = {input_name: value
for input_name, value in zip(attrs.external_input_list, input_dict.values())}
outputs = _run_mod(rt_mod, external_op_input_dict, num_outputs)
output = tuple(outputs)
return output[0] if len(output) == 1 else output
###################
# CUSTOM OPERATIONS
###################
[docs]
def init_custom_op(attrs: _afe_attrs.CustomOpAttrs,
input_dict: Dict[InputName, np.ndarray],
output_shape: Tuple[int, ...],
force_compile: bool = True) -> None:
"""
Initialize the custom op: compile it and register it with the
CustomOpLibraryManager, then update the CustomOpAttrs with the generated
argument list and function so they can be used at execution time.
:param attrs: CustomOpAttrs
:param input_dict: Dict[InputName, np.ndarray]. Input name to its tensor
:param output_shape: Tuple[int, ...]. Output shape
:param force_compile: bool. Default is True. If True, init_custom_op compiles
the custom op regardless of whether the library is already available.
"""
from afe.ir.custom_operation.custom_operation import create_custom_op_function
if force_compile or attrs.args_list is None or attrs.function is None:
# Get the input shapes and types
shapes: List[Tuple[int, ...]] = []
types: List[str] = []
for _input in input_dict.values():
shapes.append(_input.shape)
types.append(str(_input.dtype))
# Only support same dtype for all input tensors
assert all(dtype == types[0] for dtype in types), \
f"All input tensors must have the same dtype. Got {types}"
# Pass the attributes into CustomOpAttrs
attrs.c_code_in_shapes = shapes
attrs.c_code_in_dtypes = types
# TODO: This assumes there is only 1 output. Check whether multiple outputs are possible.
args_list, function = create_custom_op_function(attrs, input_dict, output_shape)
attrs.args_list = args_list
attrs.function = function
[docs]
def execute_custom_op(attrs: _afe_attrs.CustomOpAttrs,
input_dict: Dict[InputName, np.ndarray]) -> np.ndarray:
"""
Execute the custom op
:param attrs: CustomOpAttrs
:param input_dict: Dict[InputName, np.ndarray]. Input name to its tensor
:return: np.ndarray
"""
from afe.ir.custom_operation.custom_operation import execute_custom_op
return execute_custom_op(attrs, input_dict)
[docs]
def batch_matmul(lhs: np.ndarray, rhs: np.ndarray,
attrs: Union[_afe_attrs.BatchMatmulAttrs, _afe_attrs.BatchMatmulQuantAttrs]) \
-> np.ndarray:
"""
Execute batch matmul operation.
:param lhs: Tensor representing lhs value of batch matmul operation.
:param rhs: Tensor representing rhs value of batch matmul operation.
:param attrs: BatchMatmul operator's attributes.
:return: The result of batch matmul operation.
"""
from ml_kernels.np_operators import ideal_batch_matmul
if isinstance(attrs, _afe_attrs.BatchMatmulAttrs):
assert lhs.dtype in ('float32', 'bfloat16') and rhs.dtype in ('float32', 'bfloat16')
output = ideal_batch_matmul(lhs[0], rhs[0], transpose_b=attrs.transpose_b)
return np.expand_dims(output, axis=0)
else:
assert lhs.dtype == np.int8 and rhs.dtype == np.int8
output = ideal_batch_matmul(lhs[0], rhs[0], transpose_b=attrs.attrs.transpose_b,
zp_a=attrs.lhs_zp, zp_b=attrs.rhs_zp,
requant_params=attrs.requant, intrinsic_shift=attrs.intrinsic_shift)
return np.expand_dims(output, axis=0)
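# Illustrative note only (assumed semantics of ml_kernels.np_operators.ideal_batch_matmul): the
# leading batch dimension of size 1 is stripped before the kernel call and re-added afterwards,
# transpose_b=True is assumed to treat the rhs operand as transposed on its last two axes, and the
# quantized path additionally applies the zero points, requantization and intrinsic shift carried
# in BatchMatmulQuantAttrs.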