#########################################################
# Copyright (C) 2021 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
import numpy as np
import copy
from typing import List, Tuple, Union, TypeVar, Any, Optional, Callable, Sequence, Dict, Set
import afe.ir.defines as types
from ml_kernels.math_helpers import RoundType
from ml_kernels.types import is_supported_type, is_integer_type
from ml_kernels import requantization
from sima_utils.logging.sima_logger import UserFacingException
T = TypeVar("T")
R = TypeVar("R", types.AwesomePad, types.AwesomeStrides, types.AwesomeDilation, types.AwesomePoolSize)
def afe_warn(msg: str) -> bool:
"""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
"""
from warnings import warn
warn("\033[93m" + msg + "\033[0m")
return False
def exclude_axes(shape_len: int, axis: List[int]) -> List[int]:
"""Used when you want a list of axes not in the current axis list"""
new_axes = list(range(shape_len))
new_axes = [x for x in new_axes if x not in axis]
return new_axes
def get_transpose_indices_according_to_layout_strings(current_layout: str, desired_layout: str) -> List[int]:
"""
Returns a list of indices from layout strings, so that transpose with this list of indices would be equivalent.
EG: NHWC NCHW -> [0, 3, 1, 2]
"""
transpose_arr = []
for s in desired_layout:
transpose_arr.append(current_layout.index(s))
return transpose_arr
def transpose_tensor_according_to_layout_strings(input_tensor: np.ndarray, current_layout: str,
desired_layout: str) -> np.ndarray:
"""
returns a np.ndarray transposed in the same fashion that the layout moves from the current to the desired layout
Examples: HWIO -> OIHW, NHWC -> NCHW, NCDHW -> NDHWC
"""
if current_layout != desired_layout:
transpose_arr = [current_layout.index(s) for s in desired_layout]
new_tensor = np.transpose(input_tensor, transpose_arr)
return new_tensor
else:
return input_tensor
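# Illustrative sketch added for documentation (not part of the original module); the
# shape and layouts are assumed example values. It shows how the two layout helpers
# above compose for an NHWC -> NCHW conversion.
def _example_layout_transpose() -> None:
    x = np.ones((1, 8, 8, 3))  # NHWC tensor
    # Transpose indices that take NHWC to NCHW
    assert get_transpose_indices_according_to_layout_strings("NHWC", "NCHW") == [0, 3, 1, 2]
    y = transpose_tensor_according_to_layout_strings(x, "NHWC", "NCHW")
    assert y.shape == (1, 3, 8, 8)  # NCHW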
def transpose_attr_according_to_layout_strings(attr: R, current_layout: str, desired_layout: str) -> List[Any]:
"""
Transpose and prune the data in the same fashion that the layout moves from the current to the desired layout
EG: attr = [0,1,2,3] current_layout = 'NHWC' desired_layout = 'WH' => ouput_list = [2,1]
"""
assert len(current_layout) >= len(desired_layout), "Error: Desired Layout exceeds Current Layout"
if current_layout == desired_layout:
return list(attr)
transpose_axes = [(current_layout.index(desired_layout[i])) for i in range(len(desired_layout))]
transposed = [attr[transpose_axes[i]] for i in range(len(desired_layout))]
return transposed
def insert_according_to_layout_strings(target: Tuple[T, ...], source: Sequence[T],
target_layout: str, source_layout: str) -> Tuple[T, ...]:
"""
Transpose and merge elements from source into target. Wherever a label appears
in source_layout and target_layout, the element from source at that index in source_layout
is inserted into target at that index in target_layout, overwriting what was there.
For example, `insert_according_to_layout_strings((0.5, 0.7, 0.9), (-2.5, -4.5), "ABC", "BD")`
returns (0.5, -2.5, 0.9). The label 'B' is present in both layouts, and the
value at that label's position is copied from source to target.
:param target: Values to combine. A copy of this tuple is created, and some
tuple fields are replaced by data from source.
:param source: Values to combine. Items from this tuple are inserted into
a copy of target.
:param target_layout: A string with one character for each tuple element in target.
The character is a label associated with the tuple element.
:param source_layout: A string with one character for each tuple element in source.
The character is a label associated with the tuple element.
:return: A new tuple holding the merged data.
"""
assert len(target) == len(target_layout)
assert len(source) == len(source_layout)
ret = list(target)
for source_i, label in enumerate(source_layout):
try:
target_i = target_layout.index(label)
except ValueError:
continue
ret[target_i] = source[source_i]
return tuple(ret)
def compare_outputs(out1: np.ndarray, out2: np.ndarray, tolerance: float = 0.001) -> None:
"""Compares outputs of 2 tensors. Raises an error arrays are not element wise equal within a tolerance"""
assert isinstance(out1, np.ndarray) and isinstance(out2, np.ndarray), \
"Error: Expected type for both outputs is np.ndarray"
assert np.array_equal(out1.shape, out2.shape), \
"Error: shapes of tensors do not match {} vs {}".format(out1.shape, out2.shape)
if not np.allclose(out1, out2, atol=tolerance):
max_l1_err = np.max(abs(out1.astype(np.float32) - out2.astype(np.float32)))
raise Exception(f"Error: Outputs do not match within tolerance = {tolerance}. Got {max_l1_err}")
def reverse_transpose(original: Union[List[T], Tuple[T, ...]], transpose_axes: Tuple[int, ...]) -> Tuple[T, ...]:
"""
Reverse the list of element given the applied transpose axes.
"""
assert len(original) == len(transpose_axes)
res = list(copy.deepcopy(original))
for i, d in zip(transpose_axes, original):
res[i] = d
return tuple(res)
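# Illustrative sketch added for documentation (not part of the original module); the
# shape values are assumed. An NHWC shape transposed with axes (0, 3, 1, 2) becomes
# NCHW; reverse_transpose recovers the NHWC ordering.
def _example_reverse_transpose() -> None:
    assert reverse_transpose((1, 3, 8, 8), (0, 3, 1, 2)) == (1, 8, 8, 3)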
def is_depthwise_conv(in_channels: int,
out_channels: int,
groups: int) -> bool:
"""
Return True if the parameters designate a depthwise convolution.
Depthwise convolution is a special case of grouped convolution where
the number of groups is equal to the number of input channels.
When there is a single input channel, it is ambiguous, and we treat it as a regular convolution.
Parameters
----------
:param in_channels: int. Number of input channels.
:param out_channels: Number of output channels.
:param groups: int. Number of convolution groups.
Return
------
:return: bool. Return True if the convolution is a depthwise convolution.
"""
return in_channels == groups == out_channels and groups > 1
def is_depthwise_conv_with_channel_mul(in_channels: int,
out_channels: int,
groups: int) -> bool:
"""
Return True if the parameters designate a depthwise convolution with channel multiplier.
Depthwise convolution is a special case of grouped convolution where
the number of groups is equal to the number of input channels and
number of output channels is equal to: input_channels * channel_multiplier.
Parameters
----------
:param in_channels: int. Number of input channels.
:param out_channels: Number of output channels.
:param groups: int. Number of convolution groups.
Return
------
:return: bool. Return True if the convolution is a depthwise convolution with channel multiplier.
"""
channel_mul = out_channels // in_channels
return in_channels == groups and channel_mul > 1 and groups > 1
def is_group_conv(in_channels: int,
out_channels: int,
groups: int) -> bool:
"""
Return True if the parameters designate a grouped convolution that is not a depthwise convolution.
Parameters
----------
:param in_channels: Number of input channels.
:param out_channels: Number of output channels.
:param groups: int. Number of convolution groups.
Return
------
:return: bool. Return True if the convolution is a group convolution
"""
# Make sure it is not a depthwise convolution
return not is_depthwise_conv(in_channels, out_channels, groups) and groups > 1
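# Illustrative sketch added for documentation (not part of the original module); the
# channel/group counts are assumed examples showing how the three predicates above
# classify common convolution configurations.
def _example_conv_grouping_predicates() -> None:
    assert is_depthwise_conv(in_channels=32, out_channels=32, groups=32)
    assert is_depthwise_conv_with_channel_mul(in_channels=32, out_channels=64, groups=32)
    assert is_group_conv(in_channels=32, out_channels=64, groups=4)
    assert not is_depthwise_conv(in_channels=32, out_channels=64, groups=1)  # regular conv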
def transpose_axis_to_the_last(data: np.ndarray, axis: int) -> np.ndarray:
"""
Transpose axis in the data to the last dimension.
Parameters
----------
:param data: np.ndarray
:param axis: int
Return
------
:return: np.ndarray. Transposed data
"""
axes = [i for i in range(data.ndim)]
axes[axis], axes[-1] = axes[-1], axes[axis]
return data.transpose(axes)
def with_axis_last(data: np.ndarray, axis: int, f: Callable[[np.ndarray], np.ndarray]) -> np.ndarray:
"""
Apply a function to a transposed view of an array and reverse the transposition on the function's result.
The function must return an array of the same shape as its input.
:param data: Array to transform
:param axis: Index of axis that will be transposed to the last axis
:param f: Function to apply on the transposed array
:return: Result of f with the axis transposed back to its original position
"""
x = transpose_axis_to_the_last(data, axis)
y = f(x)
assert x.shape == y.shape, "Function did not return an array of the same shape"
return transpose_axis_to_the_last(y, axis)
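# Illustrative sketch added for documentation (not part of the original module); the
# shape and the normalization function are assumed examples. It applies a shape-preserving
# function along the channel axis of an NCHW tensor by temporarily moving that axis last.
def _example_with_axis_last() -> None:
    x = np.ones((1, 3, 8, 8))  # NCHW
    # Normalize along the channel axis (axis 1)
    y = with_axis_last(x, 1, lambda t: t / t.sum(axis=-1, keepdims=True))
    assert y.shape == (1, 3, 8, 8)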
def convert_transpose_conv2d_to_conv2d_paddings(weight_shape: Tuple[int, int, int, int],
weight_layout: str,
data_layout: str,
padding: types.AwesomePad,
output_padding: Optional[types.AwesomePad] = None
) -> types.AwesomePad:
"""
Convert a transpose conv2d padding to a conv2d padding.
Parameters
----------
:param weight_shape: Tuple[int, int, int, int]. Shape of the 4-D weight
:param weight_layout: str. Weight layout
:param data_layout: str. Data layout
:param padding: AwesomePad. Padding of the transpose conv2d
:param output_padding: AwesomePad. Output padding of the transpose conv2d
Return
------
:return: AwesomePad. A transformed padding for regular conv2d
"""
R, S = weight_shape[weight_layout.index("H")], weight_shape[weight_layout.index("W")]
h_ind, w_ind = data_layout.index("H"), data_layout.index("W")
pad_top = R - 1 - padding[h_ind][0]
pad_bottom = R - 1 - padding[h_ind][1]
pad_left = S - 1 - padding[w_ind][0]
pad_right = S - 1 - padding[w_ind][1]
if output_padding:
pad_bottom += output_padding[h_ind][1]
pad_right += output_padding[w_ind][1]
return ((0, 0), (pad_top, pad_bottom), (pad_left, pad_right), (0, 0))
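# Illustrative sketch added for documentation (not part of the original module); the
# weight shape, layouts and padding are assumed examples. A 3x3 transposed-convolution
# kernel with symmetric padding of 1 maps to a regular conv2d padding of
# kernel - 1 - transpose_pad = 3 - 1 - 1 = 1 on each spatial side.
def _example_transpose_conv2d_padding() -> None:
    pad = convert_transpose_conv2d_to_conv2d_paddings(
        weight_shape=(3, 3, 16, 8), weight_layout="HWIO", data_layout="NHWC",
        padding=((0, 0), (1, 1), (1, 1), (0, 0)))
    assert pad == ((0, 0), (1, 1), (1, 1), (0, 0))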
def set_shape_batch_size(shape: Tuple[int, ...], batch_size: int) -> Tuple[int, ...]:
"""
Given the Tuple representing the shape of a tensor, return the Tuple
corresponding to the same tensor shape with the given batch size.
Warning - This is a hack until we have a general solution for all dimensions:
1D: Override of the length is not reversible.
2D: Override of the first dimension is not reversible if it is not meant for batch.
4D: Assume batch is the first dimension.
Others: No change.
:param shape: Tuple representing the shape of a tensor.
:param batch_size: Integer value representing the batch size value.
:return: Tuple corresponding to the shape of a tensor with a given batch size and input shape.
"""
if len(shape) == 1:
return (batch_size * shape[0],)
elif len(shape) == 4 or len(shape) == 2:
return (batch_size, *shape[1:])
else:
return shape
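# Illustrative sketch added for documentation (not part of the original module); the
# shapes and batch size are assumed examples of the rank-dependent behavior above.
def _example_set_shape_batch_size() -> None:
    assert set_shape_batch_size((1, 224, 224, 3), 8) == (8, 224, 224, 3)  # 4D: first dim is batch
    assert set_shape_batch_size((10,), 8) == (80,)                        # 1D: length is scaled
    assert set_shape_batch_size((2, 3, 4), 8) == (2, 3, 4)                # 3D: unchanged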
def transpose_conv_output_shape(input_shape: Sequence[int], kernel_shape: Sequence[int],
padding: Sequence[Tuple[int, int]],
output_padding: Sequence[Tuple[int, int]],
stride: Sequence[int], dilation: Sequence[int]) -> Tuple[int, ...]:
"""
Calculate the shape of the output tensor of a transposed convolution
in spatial dimensions.
All parameters sequences must have the same length as the number of
spatial dimensions: two for 2D convolution, three for 3D convolution.
:param input_shape: Shape of the input feature map
:param kernel_shape: Shape of the convolution kernel
:param padding: Padding applied to the input
:param output_padding: Padding applied to the output. Only the second component,
that is the padding at the end, is used.
:param stride: Stride of the convolution
:param dilation: Dilation of the convolution
:return: Shape of the output feature map in the spatial dimensions
"""
ret = []
for n, k, (p_lo, p_hi), o_p, s, d in zip(input_shape, kernel_shape, padding, output_padding, stride, dilation):
o = (n - 1)*s - p_lo - p_hi + d * (k - 1) + o_p[1] + 1
ret.append(o)
return tuple(ret)
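# Illustrative sketch added for documentation (not part of the original module); the
# sizes are assumed examples. For a 4x4 input, 3x3 kernel, stride 2, padding 1, no output
# padding and dilation 1, the formula (n - 1)*s - p_lo - p_hi + d*(k - 1) + o_p + 1 gives 7.
def _example_transpose_conv_output_shape() -> None:
    out = transpose_conv_output_shape(
        input_shape=(4, 4), kernel_shape=(3, 3),
        padding=((1, 1), (1, 1)), output_padding=((0, 0), (0, 0)),
        stride=(2, 2), dilation=(1, 1))
    assert out == (7, 7)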
def create_and_verify_narrowing(shift: Union[int, np.ndarray], round_type: RoundType,
out_dtype: type) -> requantization.Narrowing:
"""
Validate requantization narrowing parameters and construct a Narrowing from them.
For integer output types the scalar or per-channel shift must lie in the range [0, 31].
For non-integer output types the shift must be 0 and rounding must be RoundType.TOEVEN.
:param shift: Right-shift amount, either a scalar or a per-channel array.
:param round_type: Rounding mode applied with the shift.
:param out_dtype: Output data type after narrowing.
:return: A requantization.Narrowing constructed from the validated parameters.
:raises UserFacingException: If out_dtype is unsupported or the shift is out of range.
"""
if not is_supported_type(out_dtype):
raise UserFacingException(f"Unsupported out type encountered during requantization: {out_dtype}")
if is_integer_type(out_dtype):
# Although shift tensor must also be in the 32-bit int range,
# we can't check it in generic code.
if isinstance(shift, int):
if not 0 <= shift <= 31:
raise UserFacingException(f"Quantization failure: Shift value ({shift}) out of range (0, 31). "
"This can occur for certain unexpected values during "
"calibration/quantization process. "
"Please try using different calibration and/or quantization schemes. ")
elif isinstance(shift, np.ndarray):
if not (np.all(0 <= shift) and np.all(shift <= 31)):
raise UserFacingException(f"Quantization failed: Shift array has values that are out of range (0, 31). "
"This can occur for certain unexpected values during "
"calibration/quantization process. "
"Please try using different calibration and/or quantization schemes. ")
else:
assert isinstance(shift, int) and shift == 0
assert round_type == RoundType.TOEVEN
return requantization.Narrowing(shift, round_type, out_dtype)
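# Minimal sketch added for documentation (not part of the original module). It assumes
# np.int8 is accepted by ml_kernels' is_supported_type/is_integer_type, which is not
# verified here; the shift value is an arbitrary in-range example.
def _example_create_narrowing() -> requantization.Narrowing:
    return create_and_verify_narrowing(4, RoundType.TOEVEN, np.int8)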
def split_einsum_equation(equation: str) -> Tuple[List[str], List[str]]:
"""
Separate the inputs and outputs parts of the einsum equation, returning list of strings
representing specs of each tensor in equation. Note that empty list is returned if there is no
output spec in an equation. Also, space characters are removed if present in an equation
string.
:param equation: Einsum equation string, i.e "nchw,nqhc->nqhw".
:return: Tuple containing lists of input and output tensor spec strings.
"""
eq_strings = equation.replace(" ", "").split('->')
assert len(eq_strings) in (1, 2), f"Illegal einsum equation: {equation}"
input_strings = eq_strings[0].split(',')
output_strings = eq_strings[1].split(',') if len(eq_strings) > 1 else list()
return input_strings, output_strings
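# Illustrative sketch added for documentation (not part of the original module); the
# equations are assumed examples of the splitting behavior, including the no-output case.
def _example_split_einsum_equation() -> None:
    assert split_einsum_equation("nchw,nqhc->nqhw") == (["nchw", "nqhc"], ["nqhw"])
    assert split_einsum_equation("ij,jk") == (["ij", "jk"], [])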
def mla_supports_einsum_equation_strings(input_strings: List[str], output_strings: List[str]) \
-> bool:
"""
Checks if the Einsum equation can be supported on MLA. For Einsum equation to be supported on
MLA, number of specs for input tensors must be equal to two, number of specs for output tensors
must be equal to one and all the specs must contain exactly four letters.
This check is used when deciding whether Einsum operator's layout is going to be changed during
the ConvertLayout pass. We cannot base the decision on the contents of the equation here, since
MLAChecker (which is coming to action in the latter, GraphPartitioning, pass) could use the
wrong layout to come to partitioning decision for the Einsum operator.
:param input_strings: The list of spec string defining input tensors.
:param output_strings: The list of spec string defining output tensors.
:return: True if input and output strings meet requirements to be supported on MLA, otherwise
False.
"""
# There must be exactly two inputs and one output
if len(input_strings) != 2 or len(output_strings) != 1:
return False
# Each tensor must be specified with four characters
if not all([len(in_str) == 4 for in_str in input_strings]) or len(output_strings[0]) != 4:
return False
return True
def is_mla_supported_einsum_equation(equation: str, data_layout: str) -> bool:
"""
Checks if the einsum equation is supported on MLA for the given data layout.
Supported equations:
- nchw,nchq->nqhw and nchw,nqhc->nqhw for NCHW data layout
- nhwc,nhqc->nhwq and nhwc,nhcq->nhwq for NHWC data layout
Note that the naming of the axes and the presence of whitespace are irrelevant to the decision.
"""
input_strings, output_strings = split_einsum_equation(equation)
if mla_supports_einsum_equation_strings(input_strings, output_strings):
# Convert to NHWC if data_layout == NCHW
if data_layout == "NCHW":
def _transpose_string_to_nhwc(s: str) -> str:
return s[0] + s[2:] + s[1]
input_strings = [_transpose_string_to_nhwc(in_str) for in_str in input_strings]
output_strings = [_transpose_string_to_nhwc(out_str) for out_str in output_strings]
# First MLA supported case: nhwc,nhqc->nhwq
is_mla_1 = (input_strings[0][0:3] + input_strings[1][2] == output_strings[0]
and
input_strings[0][0:2] + input_strings[0][3] ==
input_strings[1][0:2] + input_strings[1][3])
# Second MLA supported case: nhwc,nhcq->nhwq
is_mla_2 = (input_strings[0][0:3] + input_strings[1][3] == output_strings[0]
and
input_strings[0][0:2] + input_strings[0][3] == input_strings[1][0:3])
return is_mla_1 or is_mla_2
return False
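# Illustrative sketch added for documentation (not part of the original module); it checks
# the equations listed in the docstring above plus a non-4D case that is rejected.
def _example_is_mla_supported_einsum_equation() -> None:
    assert is_mla_supported_einsum_equation("nchw,nqhc->nqhw", "NCHW")
    assert is_mla_supported_einsum_equation("nhwc,nhqc->nhwq", "NHWC")
    assert not is_mla_supported_einsum_equation("ij,jk->ik", "NHWC")  # specs are not 4-letter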