Source code for afe.ir.utils

#########################################################
# Copyright (C) 2021 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
import numpy as np
import copy
from typing import List, Tuple, Union, TypeVar, Any, Optional, Callable, Sequence, Dict, Set
import afe.ir.defines as types

from ml_kernels.math_helpers import RoundType
from ml_kernels.types import is_supported_type, is_integer_type
from ml_kernels import requantization
from sima_utils.logging.sima_logger import UserFacingException

T = TypeVar("T")
R = TypeVar("R", types.AwesomePad, types.AwesomeStrides, types.AwesomeDilation, types.AwesomePoolSize)
def afe_warn(msg: str) -> bool:
    """
    Emit a warning message highlighted in yellow using ANSI escape codes:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    """
    from warnings import warn
    warn("\033[93m" + msg + "\033[0m")
    return False
def exclude_axes(shape_len: int, axis: List[int]) -> List[int]:
    """Return the list of axes in range(shape_len) that are not in the given axis list."""
    new_axes = list(range(shape_len))
    new_axes = [x for x in new_axes if x not in axis]
    return new_axes
def get_transpose_indices_according_to_layout_strings(current_layout: str, desired_layout: str) -> List[int]:
    """
    Returns the list of indices such that transposing a tensor in current_layout with these
    indices produces desired_layout.
    EG: NHWC NCHW -> [0, 3, 1, 2]
    """
    transpose_arr = []
    for s in desired_layout:
        transpose_arr.append(current_layout.index(s))
    return transpose_arr
def transpose_tensor_according_to_layout_strings(input_tensor: np.ndarray, current_layout: str,
                                                 desired_layout: str) -> np.ndarray:
    """
    Returns a np.ndarray transposed in the same fashion that the layout moves from the current
    to the desired layout.
    Examples: HWIO -> OIHW, NHWC -> NCHW, NCDHW -> NDHWC
    """
    if current_layout != desired_layout:
        transpose_arr = [current_layout.index(s) for s in desired_layout]
        new_tensor = np.transpose(input_tensor, transpose_arr)
        return new_tensor
    else:
        return input_tensor
def transpose_attr_according_to_layout_strings(attr: R, current_layout: str, desired_layout: str) -> List[Any]:
    """
    Transpose and prune the data in the same fashion that the layout moves from the current
    to the desired layout.
    EG: attr = [0, 1, 2, 3], current_layout = 'NHWC', desired_layout = 'WH'
        => output_list = [2, 1]
    """
    assert len(current_layout) >= len(desired_layout), "Error: Desired Layout exceeds Current Layout"
    if current_layout == desired_layout:
        return list(attr)
    transpose_axes = [current_layout.index(desired_layout[i]) for i in range(len(desired_layout))]
    transposed = [attr[transpose_axes[i]] for i in range(len(desired_layout))]
    return transposed
def insert_according_to_layout_strings(target: Tuple[T], source: Sequence[T],
                                       target_layout: str, source_layout: str) -> Tuple[T]:
    """
    Transpose and merge elements from source into target.

    Wherever a label appears in both source_layout and target_layout, the element from source
    at that label's index in source_layout is inserted into target at that label's index in
    target_layout, overwriting what was there.

    For example, `insert_according_to_layout_strings((0.5, 0.7, 0.9), (-2.5, -4.5), "ABC", "BD")`
    returns (0.5, -2.5, 0.9). The label 'B' is present in both layouts, and the value at that
    label's position is copied from source to target.

    :param target: Values to combine. A copy of this tuple is created, and some tuple fields
        are replaced by data from source.
    :param source: Values to combine. Items from this tuple are inserted into a copy of target.
    :param target_layout: A string with one character for each tuple element in target.
        The character is a label associated with the tuple element.
    :param source_layout: A string with one character for each tuple element in source.
        The character is a label associated with the tuple element.
    :return: A new tuple holding the merged data.
    """
    assert len(target) == len(target_layout)
    assert len(source) == len(source_layout)
    ret = list(target)
    for source_i, label in enumerate(source_layout):
        try:
            target_i = target_layout.index(label)
        except ValueError:
            continue
        ret[target_i] = source[source_i]
    return tuple(ret)
def compare_outputs(out1: np.ndarray, out2: np.ndarray, tolerance: float = 0.001) -> None:
    """Compare two output tensors. Raise an error if the arrays are not element-wise equal within a tolerance."""
    assert isinstance(out1, np.ndarray) and isinstance(out2, np.ndarray), \
        "Error: Expected type for both outputs is np.ndarray"
    assert np.array_equal(out1.shape, out2.shape), \
        "Error: shapes of tensors do not match {} vs {}".format(out1.shape, out2.shape)
    if not np.allclose(out1, out2, atol=tolerance):
        max_l1_err = np.max(abs(out1.astype(np.float32) - out2.astype(np.float32)))
        raise Exception(f"Error: Outputs do not match within tolerance = {tolerance}. Got {max_l1_err}")
def reverse_transpose(original: Union[List[T], Tuple[T, ...]], transpose_axes: Tuple[int, ...]) -> Tuple[T, ...]:
    """
    Reverse the given transposition: return the elements of original in the order they had
    before transpose_axes was applied.
    """
    assert len(original) == len(transpose_axes)
    res = list(copy.deepcopy(original))
    for i, d in zip(transpose_axes, original):
        res[i] = d
    return tuple(res)
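# Illustrative example (not part of the original module): undoing a permutation.
# If ('N', 'H', 'W') was permuted with axes (1, 2, 0) to give ('H', 'W', 'N'), then
# >>> reverse_transpose(('H', 'W', 'N'), (1, 2, 0))
# ('N', 'H', 'W')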
def is_depthwise_conv(in_channels: int, out_channels: int, groups: int) -> bool:
    """
    Return True if the parameters designate a depthwise convolution.

    Depthwise convolution is a special case of grouped convolution where the number of groups
    is equal to the number of input channels. When there is a single input channel, it is
    ambiguous, and we treat it as a regular convolution.

    Parameters
    ----------
    :param in_channels: int. Number of input channels.
    :param out_channels: int. Number of output channels.
    :param groups: int. Number of convolution groups.

    Return
    ------
    :return: bool. Return True if the convolution is a depthwise convolution.
    """
    return in_channels == groups == out_channels and groups > 1
def is_depthwise_conv_with_channel_mul(in_channels: int, out_channels: int, groups: int) -> bool:
    """
    Return True if the parameters designate a depthwise convolution with a channel multiplier.

    Depthwise convolution is a special case of grouped convolution where the number of groups
    is equal to the number of input channels and the number of output channels is equal to
    input_channels * channel_multiplier.

    Parameters
    ----------
    :param in_channels: int. Number of input channels.
    :param out_channels: int. Number of output channels.
    :param groups: int. Number of convolution groups.

    Return
    ------
    :return: bool. Return True if the convolution is a depthwise convolution with a channel
        multiplier.
    """
    channel_mul = out_channels // in_channels
    return in_channels == groups and channel_mul > 1 and groups > 1
def is_group_conv(in_channels: int, out_channels: int, groups: int) -> bool:
    """
    Return True if the parameters designate a grouped convolution that is not a depthwise
    convolution.

    Parameters
    ----------
    :param in_channels: int. Number of input channels.
    :param out_channels: int. Number of output channels.
    :param groups: int. Number of convolution groups.

    Return
    ------
    :return: bool. Return True if the convolution is a group convolution.
    """
    # Make sure it is not a depthwise convolution
    return not is_depthwise_conv(in_channels, out_channels, groups) and groups > 1
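# Illustrative examples (not part of the original module) for the three convolution predicates:
# >>> is_depthwise_conv(32, 32, 32)                    # groups == in_channels == out_channels
# True
# >>> is_depthwise_conv_with_channel_mul(32, 64, 32)   # channel multiplier of 2
# True
# >>> is_group_conv(32, 64, 4)                         # grouped, but not depthwise
# True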
def transpose_axis_to_the_last(data: np.ndarray, axis: int) -> np.ndarray:
    """
    Transpose the given axis of the data to the last dimension.

    Parameters
    ----------
    :param data: np.ndarray
    :param axis: int

    Return
    ------
    :return: np.ndarray. Transposed data
    """
    axes = [i for i in range(data.ndim)]
    axes[axis], axes[-1] = axes[-1], axes[axis]
    return data.transpose(axes)
def with_axis_last(data: np.ndarray, axis: int, f: Callable[[np.ndarray], np.ndarray]) -> np.ndarray:
    """
    Apply a function to a transposed view of an array and reverse the transposition on the
    function's result. The function must return an array of the same shape as its input.

    :param data: Array to transform
    :param axis: Index of axis that will be transposed to the last axis
    :param f: Function to apply on the transposed array
    :return: Result of f with the axis transposed back to its original position
    """
    x = transpose_axis_to_the_last(data, axis)
    y = f(x)
    assert x.shape == y.shape, "Function did not return an array of the same shape"
    return transpose_axis_to_the_last(y, axis)
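# Illustrative example (not part of the original module): normalizing along axis 1 by
# temporarily moving that axis to the last position.
# >>> x = np.arange(1.0, 7.0).reshape(2, 3, 1)
# >>> y = with_axis_last(x, 1, lambda a: a / a.sum(axis=-1, keepdims=True))
# >>> y.shape
# (2, 3, 1)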
def convert_transpose_conv2d_to_conv2d_paddings(weight_shape: Tuple[int, int, int, int],
                                                weight_layout: str,
                                                data_layout: str,
                                                padding: types.AwesomePad,
                                                output_padding: Optional[types.AwesomePad] = None
                                                ) -> types.AwesomePad:
    """
    Convert a transpose conv2d padding to a conv2d padding.

    Parameters
    ----------
    :param weight_shape: Tuple[int, int, int, int]. Shape of the 4-D weight
    :param weight_layout: str. Weight layout
    :param data_layout: str. Data layout
    :param padding: AwesomePad. Padding of the transpose conv2d
    :param output_padding: AwesomePad. Output padding of the transpose conv2d

    Return
    ------
    :return: AwesomePad. A transformed padding for regular conv2d
    """
    R, S = weight_shape[weight_layout.index("H")], weight_shape[weight_layout.index("W")]
    h_ind, w_ind = data_layout.index("H"), data_layout.index("W")
    pad_top = R - 1 - padding[h_ind][0]
    pad_bottom = R - 1 - padding[h_ind][1]
    pad_left = S - 1 - padding[w_ind][0]
    pad_right = S - 1 - padding[w_ind][1]
    if output_padding:
        pad_bottom += output_padding[h_ind][1]
        pad_right += output_padding[w_ind][1]
    return ((0, 0), (pad_top, pad_bottom), (pad_left, pad_right), (0, 0))
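# Illustrative example (not part of the original module): a 3x3 kernel in HWIO layout with
# NHWC data and padding of 1 on each spatial side maps to a conv2d padding of 1 on each side,
# since R - 1 - 1 = 1 and S - 1 - 1 = 1:
# >>> convert_transpose_conv2d_to_conv2d_paddings(
# ...     (3, 3, 16, 32), "HWIO", "NHWC", ((0, 0), (1, 1), (1, 1), (0, 0)))
# ((0, 0), (1, 1), (1, 1), (0, 0))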
def set_shape_batch_size(shape: Tuple[int, ...], batch_size: int) -> Tuple[int, ...]:
    """
    Given the Tuple representing the shape of a tensor, return the Tuple corresponding to the
    same tensor shape with a given batch size.

    Warning - This is a hack before we have a general solution for all dimensions:
        1D: Override of the length is not reversible.
        2D: Override of the first dimension is not reversible if it is not meant for batch.
        4D: Assume batch is the first dimension.
        Others: No change.

    :param shape: Tuple representing the shape of a tensor.
    :param batch_size: Integer value representing the batch size value.
    :return: Tuple corresponding to the shape of a tensor with a given batch size and input shape.
    """
    if len(shape) == 1:
        return (batch_size * shape[0],)
    elif len(shape) == 4 or len(shape) == 2:
        return (batch_size, *shape[1:])
    else:
        return shape
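# Illustrative examples (not part of the original module):
# >>> set_shape_batch_size((1, 224, 224, 3), 8)   # 4D: the batch dimension is overridden
# (8, 224, 224, 3)
# >>> set_shape_batch_size((1000,), 8)            # 1D: the length is multiplied by the batch size
# (8000,)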
def get_input_dict_batch_size(input_dict: Dict[str, np.ndarray]) -> int:
    """
    Analyzes the input dict and returns its batch size. All inputs should have matching batch
    sizes.

    :param input_dict: Dictionary of input values whose batch size should be returned.
    :return: The value of the batch size.
    """
    input_batch_sizes: Set[int] = set()
    for i in input_dict.values():
        input_batch_sizes.add(i.shape[0])
    assert len(input_batch_sizes) == 1, "Inputs have different batch sizes."
    batch_size = input_batch_sizes.pop()
    return batch_size
def unbatch_input_dict(input_dict: Dict[str, np.ndarray], batch_size: int) -> List[Dict[str, np.ndarray]]:
    """
    Create a list of dictionaries from a dictionary of inputs containing numpy arrays whose
    first dimension is the batch size. The length of the returned list is equal to the batch
    size, and the size of the first dimension of all arrays in the returned list is equal to 1.

    :param input_dict: Dictionary of input numpy arrays. Each array needs to be in shape
        (batch_size, H, W, C) or (batch_size, C, H, W) depending on the layout.
    :param batch_size: The batch size of the input dictionary. The user needs to make sure that
        it corresponds to the size of the first dimension of all arrays in the input_dict.
    :return: The list of dictionaries of input arrays. The length of the list is batch_size and
        array shapes are (1, H, W, C) or (1, C, H, W) depending on the layout.
    """
    outputs: List[Dict[str, np.ndarray]] = list()
    for idx in range(batch_size):
        batch_dict = {input_name: np.expand_dims(input_value[idx], axis=0)
                      for input_name, input_value in input_dict.items()}
        outputs.append(batch_dict)
    return outputs
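# Illustrative example (not part of the original module): splitting a batch of 2 NHWC inputs.
# >>> batched = {"x": np.zeros((2, 4, 4, 3))}
# >>> parts = unbatch_input_dict(batched, get_input_dict_batch_size(batched))
# >>> [p["x"].shape for p in parts]
# [(1, 4, 4, 3), (1, 4, 4, 3)]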
def transpose_conv_output_shape(input_shape: Sequence[int], kernel_shape: Sequence[int],
                                padding: Sequence[Tuple[int, int]],
                                output_padding: Sequence[Tuple[int, int]],
                                stride: Sequence[int], dilation: Sequence[int]) -> Tuple[int]:
    """
    Calculate the shape of the output tensor of a transposed convolution in the spatial
    dimensions. All parameter sequences must have the same length as the number of spatial
    dimensions: two for 2D convolution, three for 3D convolution.

    :param input_shape: Shape of the input feature map
    :param kernel_shape: Shape of the convolution kernel
    :param padding: Padding applied to the input
    :param output_padding: Padding applied to the output. Only the second component, that is
        the padding at the end, is used.
    :param stride: Stride of the convolution
    :param dilation: Dilation of the convolution
    :return: Shape of the output feature map in the spatial dimensions
    """
    ret = []
    for n, k, (p_lo, p_hi), o_p, s, d in zip(input_shape, kernel_shape, padding, output_padding,
                                             stride, dilation):
        o = (n - 1) * s - p_lo - p_hi + d * (k - 1) + o_p[1] + 1
        ret.append(o)
    return tuple(ret)
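# Illustrative example (not part of the original module): a 2D transposed convolution over a
# 4x4 input with a 3x3 kernel, stride 2, padding 1, output padding 1 at the end, dilation 1;
# per dimension: (4 - 1)*2 - 1 - 1 + 1*(3 - 1) + 1 + 1 = 8.
# >>> transpose_conv_output_shape((4, 4), (3, 3), ((1, 1), (1, 1)), ((0, 1), (0, 1)), (2, 2), (1, 1))
# (8, 8)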
def create_and_verify_narrowing(shift: Union[int, np.ndarray], round_type: RoundType,
                                out_dtype: type) -> requantization.Narrowing:
    """
    Build a requantization.Narrowing after validating the shift value(s) and output type.
    """
    if not is_supported_type(out_dtype):
        raise UserFacingException(f"Unsupported out type encountered during requantization: {out_dtype}")
    if is_integer_type(out_dtype):
        # Although shift tensor must also be in the 32-bit int range,
        # we can't check it in generic code.
        if isinstance(shift, int):
            if not 0 <= shift <= 31:
                raise UserFacingException(f"Quantization failure: Shift value ({shift}) out of range (0, 31). "
                                          "This can occur for certain unexpected values during "
                                          "calibration/quantization process. "
                                          "Please try using different calibration and/or quantization schemes. ")
        elif isinstance(shift, np.ndarray):
            if not (np.all(0 <= shift) and np.all(shift <= 31)):
                raise UserFacingException("Quantization failed: Shift array has values that are out of range (0, 31). "
                                          "This can occur for certain unexpected values during "
                                          "calibration/quantization process. "
                                          "Please try using different calibration and/or quantization schemes. ")
    else:
        assert isinstance(shift, int) and shift == 0
        assert round_type == RoundType.TOEVEN
    return requantization.Narrowing(shift, round_type, out_dtype)
def split_einsum_equation(equation: str) -> Tuple[List[str], List[str]]:
    """
    Separate the input and output parts of an einsum equation, returning lists of strings
    representing the specs of each tensor in the equation. Note that an empty list is returned
    if there is no output spec in the equation. Also, space characters are removed if present
    in the equation string.

    :param equation: Einsum equation string, i.e. "nchw,nqhc->nqhw".
    :return: Tuple containing lists of input and output tensor spec strings.
    """
    eq_strings = equation.replace(" ", "").split('->')
    assert len(eq_strings) in (1, 2), f"Illegal einsum equation: {equation}"
    input_strings = eq_strings[0].split(',')
    output_strings = eq_strings[1].split(',') if len(eq_strings) > 1 else list()
    return input_strings, output_strings
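# Illustrative example (not part of the original module):
# >>> split_einsum_equation("nchw,nqhc->nqhw")
# (['nchw', 'nqhc'], ['nqhw'])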
def mla_supports_einsum_equation_strings(input_strings: List[str], output_strings: List[str]) \
        -> bool:
    """
    Checks if the einsum equation can be supported on the MLA. For an einsum equation to be
    supported on the MLA, the number of specs for input tensors must be equal to two, the
    number of specs for output tensors must be equal to one, and all the specs must contain
    exactly four letters.

    This check is used when deciding whether the Einsum operator's layout is going to be
    changed during the ConvertLayout pass. We cannot base the decision on the contents of the
    equation here, since the MLAChecker (which comes into action in the later GraphPartitioning
    pass) could use the wrong layout to come to a partitioning decision for the Einsum operator.

    :param input_strings: The list of spec strings defining input tensors.
    :param output_strings: The list of spec strings defining output tensors.
    :return: True if input and output strings meet the requirements to be supported on the MLA,
        otherwise False.
    """
    # There must be exactly two inputs and one output
    if len(input_strings) != 2 or len(output_strings) != 1:
        return False
    # Each tensor must be specified with four characters
    if not all([len(in_str) == 4 for in_str in input_strings]) or len(output_strings[0]) != 4:
        return False
    return True
def is_mla_supported_einsum_equation(equation: str, data_layout: str) -> bool:
    """
    Checks if the einsum equation is supported on the MLA for the given data layout.
    Supported equations:
      - nchw,nchq->nqhw and nchw,nqhc->nqhw for NCHW data layout
      - nhwc,nhqc->nhwq and nhwc,nhcq->nhwq for NHWC data layout
    Note that the naming of the axes and the presence of whitespace are irrelevant for the
    decision.
    """
    input_strings, output_strings = split_einsum_equation(equation)
    if mla_supports_einsum_equation_strings(input_strings, output_strings):
        # Convert to NHWC if data_layout == NCHW
        if data_layout == "NCHW":
            def _transpose_string_to_nhwc(s: str) -> str:
                return s[0] + s[2:] + s[1]
            input_strings = [_transpose_string_to_nhwc(in_str) for in_str in input_strings]
            output_strings = [_transpose_string_to_nhwc(out_str) for out_str in output_strings]
        # First MLA supported case: nhwc,nhqc->nhwq
        is_mla_1 = (input_strings[0][0:3] + input_strings[1][2] == output_strings[0]
                    and input_strings[0][0:2] + input_strings[0][3]
                    == input_strings[1][0:2] + input_strings[1][3])
        # Second MLA supported case: nhwc,nhcq->nhwq
        is_mla_2 = (input_strings[0][0:3] + input_strings[1][3] == output_strings[0]
                    and input_strings[0][0:2] + input_strings[0][3] == input_strings[1][0:3])
        return is_mla_1 or is_mla_2
    return False
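# Illustrative examples (not part of the original module):
# >>> is_mla_supported_einsum_equation("nhwc,nhqc->nhwq", "NHWC")
# True
# >>> is_mla_supported_einsum_equation("nchw,nqhc->nqhw", "NCHW")
# True
# >>> is_mla_supported_einsum_equation("ij,jk->ik", "NHWC")   # not a 4-D equation
# False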