Source code for afe.apis.transform

#########################################################
# Copyright (C) 2022 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Christopher Rodrigues
#########################################################
"""
Tensor transformations that can be applied to a model's
input or output.
"""
import dataclasses
import functools
from typing import Callable, Sequence

import numpy as np

from afe.apis.compilation_job_base import Tensor
from afe.apis.defines import (
    ChromaSampling, ColorConversion, ColorSpaceStandard, ResizeDepositLocation, ResizeMethod
)
from afe.backends import Backend
from afe.core.configs import QuantizationConfigs
import afe.ir.operations as _afe_op
import afe.ir.attributes as _afe_attr
from afe.ir.defines import NodeName
from afe.ir.node import AwesomeNode as _AwesomeNode
from afe.ir.sima_ir import SiMaIR as _SimaIR
from afe.ir.sima_ir import SiMaIRMetadata as _SimaIRMetadata
from afe.ir.tensor_type import (
    TensorType, ScalarType, TensorValue, TupleValue, data_byte_size, scalar_type_to_dtype
)
import afe.ir.operation_functions as _op_fn
from ml_kernels.math_helpers import RoundType as _RoundType

from ev_transforms import transforms as ev_transforms


def _format_call_string(name: str, *args, **kwargs) -> str:
    """
    Create a function call expression for display.
    Convert arguments and keyword arguments to strings and display them as
    arguments or keyword arguments, respectively, in the displayed expression.
    """
    call_args = [str(a) for a in args] + [kw + "=" + str(a) for kw, a in kwargs.items()]
    return name + "(" + ",".join(call_args) + ")"
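# Example (illustrative, derived from the code above):
# _format_call_string('resize', 3, keep_aspect=True) evaluates to
# "resize(3,keep_aspect=True)".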


class Transform:
    """
    A transformation on a tensor.
    """

    def apply(self, t: Tensor) -> Tensor:
        """
        Apply this transform to a tensor. Raise an exception if the tensor's type
        is not suitable for this transform.
        """
        raise NotImplementedError("Method is abstract")

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        """
        Get the type of this transform's output when it is applied to input of the
        given type. Raise an exception if the transform cannot be applied to a
        tensor having the given type.
        """
        raise NotImplementedError("Method is abstract")

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_base_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        """
        Extract the internal representation of this transform.
        """
        raise NotImplementedError("Method is abstract")

    def __str__(self) -> str:
        """
        Return a human-readable text representation of this transform.
        """
        raise NotImplementedError("Method is abstract")

    def _build_ir_for_single_operator(
            self, operation: _afe_op.AwesomeOperation, operation_mnemonic: str,
            attrs: _afe_attr.AwesomeAttributes, input_type: TensorType,
            input_name: NodeName, output_base_name: str
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        """
        Build the IR for a transform that corresponds to a SiMa IR operator.
        Subclasses' extract_ir methods should call this method when the subclass
        translates to a single operator and the other preconditions are met.

        :param operation: Operation to put into IR. The operation must have a static
            input list in its input_list field.
        :param operation_mnemonic: Name of operation to use in the IR node's name.
        :param attrs: Attributes of the operation.
        :param input_type: This node's input tensor type.
        :param input_name: The SiMa IR name of this node's input node.
        :param output_base_name: String distinguishing this node in the graph, used
            for making a unique name.
        :return: Output type and a 1-element list containing the created node.
        """
        output_type = self.get_result_type(input_type)
        node_name = NodeName(f"{output_base_name}_{operation_mnemonic}")
        node = _AwesomeNode(
            name=node_name,
            input_names=operation.input_list,
            input_node_names=[input_name],
            ir=_SimaIR(operation=operation,
                       _attrs=attrs,
                       calib_attrs=_afe_attr.AwesomeCalibAttrs(),
                       _quant_attrs=None,
                       quant_config=QuantizationConfigs(),
                       backend=Backend.EV,  # Currently all transforms go to EV
                       _metadata=_SimaIRMetadata("")))
        return output_type, [node]
class _ArgmaxTransform(Transform):
    def __init__(self, *, axis: int):
        self._axis = axis

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) <= self._axis:
            raise ValueError("Tensor does not have the selected axis")
        if t.dtype != np.float32:
            raise ValueError("Only 32-bit floating point data is supported for argmax")
        return ev_transforms.argmax(t, axis=self._axis)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) <= self._axis:
            raise ValueError("Tensor does not have the selected axis")
        if parameter_type.scalar.numpy_type() != np.float32:
            raise ValueError("Only 32-bit floating point data is supported for argmax")
        new_shape = tuple(v for i, v in enumerate(parameter_type.shape) if i != self._axis)
        return TensorType(scalar=ScalarType.int32, shape=new_shape)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        attrs = _afe_attr.ArgMaxAttrs(
            [self._axis], False, False, input_type.shape, False, ScalarType.int32,
            input_type.scalar
        )
        return self._build_ir_for_single_operator(_afe_op.ArgMaxOp(), "argmax", attrs,
                                                  input_type, input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('argmax', axis=self._axis)
def argmax(*, axis: int) -> Transform:
    """
    The argmax operation applied over a single axis of a tensor.

    :param axis: Axis to reduce
    :return: The argmax transformation
    """
    return _ArgmaxTransform(axis=axis)
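# Illustrative usage sketch; the shapes and values below are hypothetical.
# Argmax over axis 1 of a float32 tensor removes that axis and yields int32 indices:
#
#     t = argmax(axis=1)
#     scores = np.zeros((1, 10, 4), dtype=np.float32)
#     indices = t.apply(scores)  # shape (1, 4), dtype int32
#     t.get_result_type(TensorType(scalar=ScalarType.float32, shape=(1, 10, 4)))
#     # -> TensorType(scalar=ScalarType.int32, shape=(1, 4))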
class _ReshapeTransform(Transform):
    def __init__(self, *, newshape: list[int]):
        self._newshape = newshape

    def apply(self, t: Tensor) -> Tensor:
        return ev_transforms.reshape(self._newshape, t)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        new_shape = ev_transforms.calculate_new_shape_after_reshape(parameter_type.shape,
                                                                    self._newshape)
        return TensorType(scalar=parameter_type.scalar, shape=tuple(new_shape))

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        attrs = _afe_attr.ReshapeAttrs(input_type.shape, input_type.scalar, self._newshape)
        return self._build_ir_for_single_operator(_afe_op.ReshapeOp(), "reshape_transform",
                                                  attrs, input_type, input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('reshape_transform', newshape=self._newshape)
def reshape_transform(*, newshape: list[int]) -> Transform:
    """
    The reshape_transform operation.

    :param newshape: New shape after the reshape transform
    :return: The reshape transformation
    """
    return _ReshapeTransform(newshape=newshape)
class _LayoutTransform(Transform):
    """
    Layout transform

    Used to test the infrastructure related to EV plugins until EV plugins are implemented.
    """
    def __init__(self, *, src_layout: str, dst_layout: str):
        self._src_layout = src_layout
        self._dst_layout = dst_layout
        if len(self._src_layout) != len(self._dst_layout):
            raise ValueError("Source and destination layouts incompatible.")
        for s in self._src_layout:
            assert s in self._dst_layout, "Source and destination layouts incompatible."

    def apply(self, t: Tensor) -> Tensor:
        return ev_transforms.layout_transform(t, self._src_layout, self._dst_layout)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(self._src_layout) != len(parameter_type.shape):
            raise ValueError("Input type shape not compatible with source layout.")
        output_shape = tuple([parameter_type.shape[self._src_layout.index(c)]
                              for c in self._dst_layout])
        assert len(self._dst_layout) == len(output_shape), \
            "Output type shape not compatible with source layout."
        return dataclasses.replace(parameter_type, shape=output_shape)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        # Use the implicitly_removable flag to prevent removal of this operator
        attrs = _afe_attr.LayoutTransformAttrs(input_type, self._src_layout, self._dst_layout,
                                               implicitly_removable=False)
        return self._build_ir_for_single_operator(_afe_op.LayoutTransformOp(),
                                                  "layout_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('layout_transform', src_layout=self._src_layout,
                                   dst_layout=self._dst_layout)
def layout_transform(*, src_layout: str, dst_layout: str) -> Transform:
    """
    The layout_transform operation.

    :param src_layout: Layout of the input tensor
    :param dst_layout: Layout of the output tensor
    :return: The layout_transform transformation
    """
    return _LayoutTransform(src_layout=src_layout, dst_layout=dst_layout)
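# Illustrative usage sketch (hypothetical shapes): converting NHWC to NCHW
# permutes the dimensions of the input tensor.
#
#     t = layout_transform(src_layout="NHWC", dst_layout="NCHW")
#     t.get_result_type(TensorType(scalar=ScalarType.float32, shape=(1, 224, 224, 3)))
#     # -> TensorType with shape (1, 3, 224, 224)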
class _TessellationTransform(Transform):
    """
    Tessellation transform

    Used to convert a tensor from frame-based storage format to slice (or tile)
    based storage format.
    """
    _name: str = 'tessellation_transform'

    def __init__(self, *, slice_shape: Sequence[int], align_c16: bool, cblock: bool):
        self._slice_shape = slice_shape
        self._align_c16 = align_c16
        self._cblock = cblock
        if not self._align_c16 and self._cblock:
            raise ValueError("Channels need to be aligned to 16 to be blocked.")

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) not in (4, 5):
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D/5D is expected.")
        if len(t.shape) != len(self._slice_shape) + 1:
            raise ValueError(
                f"Input tensor is {len(t.shape)}D, but slice shape is"
                f" {len(self._slice_shape)}D; the slice shape must have one fewer"
                f" dimension than the tensor."
            )
        if any(x > y for x, y in zip(self._slice_shape, t.shape[1:])):
            raise ValueError(
                f"Slice shape {self._slice_shape} is bigger than frame shape {t.shape[1:]}"
            )
        if self._slice_shape[-1] < t.shape[-1] and self._slice_shape[-1] % 16 != 0:
            raise ValueError(
                f"Unsupported slice channel size {self._slice_shape[-1]} for frame channel size"
                f" {t.shape[-1]}"
            )
        return ev_transforms.tessellation(t, self._slice_shape, self._align_c16, self._cblock)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) not in (4, 5):
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D/5D is expected")
        if any(x > y for x, y in zip(self._slice_shape, parameter_type.shape[1:])):
            raise ValueError(
                f"Slice shape {self._slice_shape} is bigger than frame shape"
                f" {parameter_type.shape[1:]}"
            )
        if self._slice_shape[-1] < parameter_type.shape[-1] and self._slice_shape[-1] % 16 != 0:
            raise ValueError(
                f"Unsupported slice channel size {self._slice_shape[-1]} for frame channel size"
                f" {parameter_type.shape[-1]}"
            )
        # Calculate the output tensor shape
        output_shape = _op_fn.calculate_tessellated_tensor_shape(
            parameter_type, self._slice_shape, self._align_c16
        )
        return dataclasses.replace(parameter_type, shape=output_shape)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) not in (4, 5):
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D/5D is expected")
        if any(x > y for x, y in zip(self._slice_shape, input_type.shape[1:])):
            raise ValueError(
                f"Slice shape {self._slice_shape} is bigger than frame shape"
                f" {input_type.shape[1:]}"
            )
        attrs = _afe_attr.TessellationTransformAttrs(
            self._slice_shape, self._align_c16, self._cblock, input_type
        )
        return self._build_ir_for_single_operator(
            _afe_op.TessellationTransformOp(), "tessellation_transform", attrs, input_type,
            input_name, output_name
        )

    def __str__(self) -> str:
        return _format_call_string(
            self._name, slice_shape=self._slice_shape, align_c16=self._align_c16,
            cblock=self._cblock
        )
def tessellation_transform(
        *, slice_shape: Sequence[int], align_c16: bool, cblock: bool
) -> Transform:
    """
    The tessellation_transform operation.

    :param slice_shape: Shape of slice to tessellate
    :param align_c16: True to align channels to 16 in a tessellated slice
    :param cblock: True to interleave the channel blocks in a tessellated slice
    :return: The tessellation_transform transformation
    """
    return _TessellationTransform(slice_shape=slice_shape, align_c16=align_c16, cblock=cblock)
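# Illustrative usage sketch (hypothetical shapes): split a 4D int8 frame into
# 14x14x32 slices stored in a 2D slice-based layout. The exact output size is
# computed by _op_fn.calculate_tessellated_tensor_shape.
#
#     t = tessellation_transform(slice_shape=[14, 14, 32], align_c16=False, cblock=False)
#     frame = np.zeros((1, 28, 28, 32), dtype=np.int8)
#     slices = t.apply(frame)  # 2D int8 tensor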
class _DetessellationTransform(Transform):
    """
    Detessellation transform

    Used to convert a tensor from slice-based storage format to frame-based storage format.
    """
    _name: str = 'detessellation_transform'

    def __init__(
            self, *, slice_shape: Sequence[int], align_c16: bool, cblock: bool,
            frame_type: TensorType
    ):
        self._slice_shape = slice_shape
        self._align_c16 = align_c16
        self._cblock = cblock
        self._frame_type = frame_type
        if len(self._frame_type.shape) not in (4, 5):
            raise ValueError(
                f"Frame shape is {len(self._frame_type.shape)}D, but 4D/5D is expected"
            )
        if any(x > y for x, y in zip(self._slice_shape, self._frame_type.shape[1:])):
            raise ValueError(
                f"Slice shape {self._slice_shape} is bigger than frame shape"
                f" {self._frame_type.shape[1:]}"
            )
        if self._slice_shape[-1] < self._frame_type.shape[-1] and self._slice_shape[-1] % 16 != 0:
            raise ValueError(
                f"Slice channel count ({self._slice_shape[-1]}) is not a multiple of 16"
            )
        if not self._align_c16 and self._cblock:
            raise ValueError("Channels need to be aligned to 16 to be blocked.")

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 2:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 2D is expected")
        if t.dtype != np.int8:
            raise ValueError(f"Tensor type is {t.dtype}, but int8 is expected")
        return ev_transforms.detessellation(
            t, self._slice_shape, scalar_type_to_dtype(self._frame_type.scalar),
            self._frame_type.shape, self._align_c16, self._cblock
        )

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 2:
            raise ValueError(f"Input tensor shape is {len(parameter_type.shape)}D, but 2D is expected")
        if parameter_type.scalar != ScalarType.int8:
            raise ValueError(f"Input tensor type is {parameter_type.scalar}, but int8 is expected")
        return self._frame_type

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 2:
            raise ValueError(f"Input shape is {len(input_type.shape)}D, but 2D is expected")
        if input_type.scalar != ScalarType.int8:
            raise ValueError(f"Input type is {input_type.scalar}, but int8 is expected")
        attrs = _afe_attr.DetessellationTransformAttrs(
            self._slice_shape, self._align_c16, self._cblock, self._frame_type,
            input_type.shape
        )
        return self._build_ir_for_single_operator(
            _afe_op.DetessellationTransformOp(), "detessellation_transform", attrs,
            input_type, input_name, output_name
        )

    def __str__(self) -> str:
        return _format_call_string(
            self._name, slice_shape=self._slice_shape, align_c16=self._align_c16,
            cblock=self._cblock, frame_type=self._frame_type
        )
def detessellation_transform(
        *, slice_shape: Sequence[int], align_c16: bool, cblock: bool, frame_type: TensorType
) -> Transform:
    """
    The detessellation_transform operation.

    :param slice_shape: Shape of slice to detessellate
    :param align_c16: True to indicate that slice channels are aligned to 16
    :param cblock: True to indicate that channel blocks are interleaved in a slice
    :param frame_type: Tensor type of the de-tessellated frame
    :return: The detessellation_transform transformation
    """
    return _DetessellationTransform(
        slice_shape=slice_shape, align_c16=align_c16, cblock=cblock, frame_type=frame_type
    )
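# Illustrative round trip (continuing the sketch above): detessellation with the
# matching parameters restores the original frame layout.
#
#     frame_type = TensorType(scalar=ScalarType.int8, shape=(1, 28, 28, 32))
#     d = detessellation_transform(slice_shape=[14, 14, 32], align_c16=False,
#                                  cblock=False, frame_type=frame_type)
#     restored = d.apply(slices)  # shape (1, 28, 28, 32), dtype int8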
class _PackTransform(Transform):
    """
    Pack transform

    Used to pack multiple tensors into a single 2D array.
    """
    def __init__(self):
        # No parameters are required besides the tensor data to transform
        pass

    def apply(self, t: list[Tensor]) -> Tensor:
        if len(t) <= 1:
            raise ValueError(f"Multiple tensors are required in pack transform, but got {len(t)}")
        return _op_fn.pack(t)

    def get_packed_result_type(self, parameter_types: list[TensorType]) -> TensorType:
        if len(parameter_types) <= 1:
            raise ValueError(f"Multiple tensors are required in pack transform,"
                             f" but got {len(parameter_types)}")
        # The output tensor is 2D int8 with padding
        batch_size = parameter_types[0].shape[0]
        data_types = _afe_op.get_pack_input_types(parameter_types)
        tensor_values = [TensorValue(TensorType(pt.scalar, pt.shape[1:])) for pt in data_types]
        tuple_value = TupleValue(tensor_values)
        total_size = data_byte_size(tuple_value)
        return TensorType(ScalarType.int8, (batch_size, total_size))

    def extract_pack_ir(
            self, input_types: list[TensorType], input_name_list: list[NodeName],
            output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_types) <= 1:
            raise ValueError(f"Multiple tensors are required in pack transform,"
                             f" but got {len(input_types)}")
        operation = _afe_op.PackTransformOp()
        attrs = _afe_attr.PackTransformAttrs(input_types, ScalarType.int8)
        output_type = self.get_packed_result_type(input_types)
        node_name = NodeName(f"{output_name}_pack_transform")
        node = _AwesomeNode(
            name=node_name,
            input_names=[f"input_{idx}" for idx in range(len(input_types))],
            input_node_names=input_name_list,
            ir=_SimaIR(operation=operation,
                       _attrs=attrs,
                       calib_attrs=_afe_attr.AwesomeCalibAttrs(),
                       _quant_attrs=None,
                       quant_config=QuantizationConfigs(),
                       backend=Backend.EV,
                       _metadata=_SimaIRMetadata("")))
        return output_type, [node]

    def __str__(self) -> str:
        return _format_call_string('pack_transform')
def pack_transform() -> _PackTransform:
    """
    The pack_transform operation.

    :return: The pack_transform transformation
    """
    return _PackTransform()
class _UnpackTransform(Transform):
    """
    Unpack transform

    Used to unpack a 2D array to multiple N-D arrays according to the target tensor types.
    """
    def __init__(self, *, tensor_types: list[TensorType]):
        self._tensor_types = tensor_types

    def apply(self, t: Tensor) -> list[Tensor]:
        if len(t.shape) != 2:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 2D is expected")
        if t.dtype != np.int8:
            raise ValueError(f"Tensor type is {t.dtype}, but int8 is expected")
        return ev_transforms.unpack(t,
                                    [scalar_type_to_dtype(tt.scalar) for tt in self._tensor_types],
                                    [tt.shape for tt in self._tensor_types])

    def get_unpacked_result_type(self, parameter_type: TensorType) -> list[TensorType]:
        if len(parameter_type.shape) != 2:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 2D is expected")
        if parameter_type.scalar != ScalarType.int8:
            raise ValueError(f"Tensor type is {parameter_type.scalar}, but int8 is expected")
        # The output is determined by the target tensor types
        return self._tensor_types

    def extract_unpack_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[list[TensorType], list[_AwesomeNode]]:
        if len(input_type.shape) != 2:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 2D is expected")
        if input_type.scalar != ScalarType.int8:
            raise ValueError(f"Tensor type is {input_type.scalar}, but int8 is expected")
        operation = _afe_op.UnpackTransformOp()
        attrs = _afe_attr.UnpackTransformAttrs(input_type.shape, self._tensor_types)
        output_types = self.get_unpacked_result_type(input_type)
        node_name = NodeName(f"{output_name}_unpack_transform")
        node = _AwesomeNode(
            name=node_name,
            input_names=operation.input_list,
            input_node_names=[input_name],
            ir=_SimaIR(operation=operation,
                       _attrs=attrs,
                       calib_attrs=_afe_attr.AwesomeCalibAttrs(),
                       _quant_attrs=None,
                       quant_config=QuantizationConfigs(),
                       backend=Backend.EV,  # Currently all transforms go to EV
                       _metadata=_SimaIRMetadata("")))
        return output_types, [node]

    def __str__(self) -> str:
        return _format_call_string('unpack_transform', tensor_types=self._tensor_types)
def unpack_transform(*, tensor_types: list[TensorType]) -> _UnpackTransform:
    """
    The unpack_transform operation.

    :param tensor_types: List of target tensor types after unpack
    :return: The unpack_transform transformation
    """
    return _UnpackTransform(tensor_types=tensor_types)
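# Illustrative pack/unpack sketch (hypothetical shapes): pack flattens several
# tensors into one 2D int8 buffer with padding; unpack reverses it given the
# target tensor types.
#
#     types = [TensorType(scalar=ScalarType.int8, shape=(1, 8, 8, 16)),
#              TensorType(scalar=ScalarType.int8, shape=(1, 4, 4, 32))]
#     packed = pack_transform().apply([np.zeros(tt.shape, dtype=np.int8) for tt in types])
#     tensors = unpack_transform(tensor_types=types).apply(packed)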
class _NormalizationTransform(Transform):
    """
    Normalization transform

    Used to normalize a tensor.
    """
    def __init__(self, *, channel_params: list[tuple[float, float, float]]):
        self._channel_params = channel_params
        if len(self._channel_params[0]) != 3:
            raise ValueError(f"Number of parameters must be 3, but got {len(self._channel_params[0])}")

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        return ev_transforms.normalize(t, self._channel_params)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        output_type = ScalarType.float32
        return dataclasses.replace(parameter_type, scalar=output_type)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        attrs = _afe_attr.NormalizationTransformAttrs(self._channel_params, input_type)
        return self._build_ir_for_single_operator(_afe_op.NormalizationTransformOp(),
                                                  "normalization_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('normalization_transform', channel_params=self._channel_params)
def normalization_transform(*, channel_params: list[tuple[float, float, float]]) -> Transform:
    """
    The normalization_transform operation.

    :param channel_params: The list of per-channel (divisor, mean, standard deviation) tuples
    :return: The normalization_transform transformation
    """
    return _NormalizationTransform(channel_params=channel_params)
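# Illustrative usage sketch; the per-channel (divisor, mean, standard deviation)
# constants below are the commonly used ImageNet values, shown only as an example.
#
#     t = normalization_transform(channel_params=[(255.0, 0.485, 0.229),
#                                                 (255.0, 0.456, 0.224),
#                                                 (255.0, 0.406, 0.225)])
#     out = t.apply(np.zeros((1, 224, 224, 3), dtype=np.uint8))  # float32 output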
class _QuantizationTransform(Transform):
    """
    Quantization transform

    Used to quantize a tensor with given per-channel zero point and scale values.
    """
    def __init__(self, *, channel_params: list[tuple[float, int]], num_bits: int,
                 rounding: _RoundType = _RoundType.TONEAREST):
        self._channel_params = channel_params
        self._num_bits = num_bits
        self._rounding = rounding
        if len(self._channel_params[0]) != 2:
            raise ValueError(f"Number of parameters must be 2, but got {len(self._channel_params[0])}")
        if self._num_bits != 8 and self._num_bits != 32:
            raise ValueError(f"Number of bits for quantization must be 8 or 32, but got {self._num_bits}")
        if self._rounding != _RoundType.TONEAREST:
            raise ValueError(f"Rounding type must be TONEAREST, but got {self._rounding}")

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        if t.dtype != np.float32:
            raise ValueError(f"Input tensor type is {t.dtype}, but float32 is expected")
        return ev_transforms.quantize(t, self._channel_params, self._num_bits)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        if parameter_type.scalar != ScalarType.float32:
            raise ValueError(f"Tensor type is {parameter_type.scalar}, but float32 is expected")
        result_type = ScalarType.int8 if self._num_bits == 8 else ScalarType.int32
        return dataclasses.replace(parameter_type, scalar=result_type)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        if input_type.scalar != ScalarType.float32:
            raise ValueError(f"Tensor type is {input_type.scalar}, but float32 is expected")
        output_data_type = ScalarType.int8 if self._num_bits == 8 else ScalarType.int32
        attrs = _afe_attr.QuantizationTransformAttrs(self._channel_params, input_type.shape,
                                                     self._num_bits, self._rounding,
                                                     output_data_type)
        return self._build_ir_for_single_operator(_afe_op.QuantizationTransformOp(),
                                                  "quantization_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('quantization_transform', channel_params=self._channel_params,
                                   num_bits=self._num_bits, rounding=self._rounding)
def quantization_transform(
        *, channel_params: list[tuple[float, int]], num_bits: int,
        rounding: _RoundType = _RoundType.TONEAREST
) -> Transform:
    """
    The quantization_transform operation.

    :param channel_params: The list of per-channel (quant_scale, zero_point) tuples
    :param num_bits: The number of bits used for quantization
    :param rounding: The rounding type for quantization
    :return: The quantization_transform transformation
    """
    return _QuantizationTransform(channel_params=channel_params, num_bits=num_bits,
                                  rounding=rounding)
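# Illustrative usage sketch (hypothetical parameters): quantize a float32 tensor
# to int8 with one (quant_scale, zero_point) pair per channel.
#
#     q = quantization_transform(channel_params=[(0.5, 0), (0.25, 3), (1.0, -2)],
#                                num_bits=8)
#     quantized = q.apply(np.zeros((1, 16, 16, 3), dtype=np.float32))  # int8 output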
class _DequantizationTransform(Transform):
    """
    Dequantization transform

    Used to dequantize a tensor with given per-channel zero point and quantization scale values.
    """
    def __init__(self, *, channel_params: list[tuple[float, int]]):
        self._channel_params = channel_params
        if len(self._channel_params[0]) != 2:
            raise ValueError(f"Number of parameters must be 2, but got {len(self._channel_params[0])}")

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        if t.dtype != np.int8 and t.dtype != np.int32:
            raise ValueError(f"Input tensor type is {t.dtype}, but int8 or int32 is expected")
        return ev_transforms.dequantize(t, self._channel_params)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        if parameter_type.scalar != ScalarType.int8 and parameter_type.scalar != ScalarType.int32:
            raise ValueError(f"Tensor type is {parameter_type.scalar}, but int8 or int32 is expected")
        output_type = ScalarType.float32
        return dataclasses.replace(parameter_type, scalar=output_type)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        if input_type.scalar != ScalarType.int8 and input_type.scalar != ScalarType.int32:
            raise ValueError(f"Tensor type is {input_type.scalar}, but int8 or int32 is expected")
        attrs = _afe_attr.DequantizationTransformAttrs(self._channel_params, input_type)
        return self._build_ir_for_single_operator(_afe_op.DequantizationTransformOp(),
                                                  "dequantization_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('dequantization_transform', channel_params=self._channel_params)
def dequantization_transform(*, channel_params: list[tuple[float, int]]) -> Transform:
    """
    The dequantization_transform operation.

    :param channel_params: The list of per-channel (quant_scale, zero_point) tuples
    :return: The dequantization_transform transformation
    """
    return _DequantizationTransform(channel_params=channel_params)
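# Illustrative inverse sketch (continuing the example above): dequantization with
# the same channel_params maps the int8 tensor back to float32.
#
#     d = dequantization_transform(channel_params=[(0.5, 0), (0.25, 3), (1.0, -2)])
#     restored = d.apply(quantized)  # float32 output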
class _ResizeTransform(Transform):
    """
    Resize transform

    Used to resize a tensor.
    """
    def __init__(self, *, target_height: int, target_width: int, keep_aspect: bool,
                 deposit_location: ResizeDepositLocation, method: ResizeMethod):
        self._target_height = target_height
        self._target_width = target_width
        self._keep_aspect = keep_aspect
        self._deposit_location = deposit_location
        self._method = method

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        return ev_transforms.resize(t, self._target_width, self._target_height,
                                    self._keep_aspect, self._deposit_location.value,
                                    self._method.value)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        output_shape = (parameter_type.shape[0], self._target_height, self._target_width,
                        parameter_type.shape[3])
        return dataclasses.replace(parameter_type, shape=output_shape)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        attrs = _afe_attr.ResizeTransformAttrs(self._target_height, self._target_width,
                                               self._keep_aspect, self._deposit_location,
                                               self._method, input_type)
        return self._build_ir_for_single_operator(_afe_op.ResizeTransformOp(),
                                                  "resize_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('resize_transform', target_height=self._target_height,
                                   target_width=self._target_width, keep_aspect=self._keep_aspect,
                                   deposit_location=self._deposit_location, method=self._method)
def resize_transform(*, target_height: int, target_width: int, keep_aspect: bool,
                     deposit_location: ResizeDepositLocation, method: ResizeMethod) -> Transform:
    """
    The resize_transform operation.

    :param target_height: Target height of the resized tensor
    :param target_width: Target width of the resized tensor
    :param keep_aspect: Boolean flag to keep the aspect ratio
    :param deposit_location: Enum to indicate the deposit position of the resized image
    :param method: Enum to indicate the supported interpolation methods
    :return: The resize_transform transformation
    """
    return _ResizeTransform(target_height=target_height, target_width=target_width,
                            keep_aspect=keep_aspect, deposit_location=deposit_location,
                            method=method)
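# Illustrative usage sketch; the enum member names below are hypothetical (see
# afe.apis.defines for the actual ResizeDepositLocation and ResizeMethod values).
#
#     t = resize_transform(target_height=256, target_width=256, keep_aspect=False,
#                          deposit_location=ResizeDepositLocation.TOPLEFT,
#                          method=ResizeMethod.LINEAR)
#     t.get_result_type(TensorType(scalar=ScalarType.uint8, shape=(1, 480, 640, 3)))
#     # -> TensorType with shape (1, 256, 256, 3)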
class _ChromaUpsampleTransform(Transform):
    """
    Chroma upsample transform

    Used to interpolate subsampled YUV images.
    """
    def __init__(self, *, frame_height: int, frame_width: int, yuv_sampling: ChromaSampling):
        self._frame_height = frame_height
        self._frame_width = frame_width
        self._yuv_sampling = yuv_sampling

    def _expected_frame_size_from_subsampled_chroma(self) -> int:
        frame_size: int = self._frame_height * self._frame_width
        if self._yuv_sampling in (ChromaSampling.NV12, ChromaSampling.YUV420):
            frame_size = frame_size * 3 // 2
        elif self._yuv_sampling == ChromaSampling.YUV422:
            frame_size *= 2
        else:
            raise ValueError(f"Unsupported chroma sampling format {self._yuv_sampling}")
        return frame_size

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 2:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 2D is expected")
        expected_size = self._expected_frame_size_from_subsampled_chroma()
        if t.shape[1] != expected_size:
            raise ValueError(f"Input tensor size does not match {self._frame_height}H x"
                             f" {self._frame_width}W for chroma {self._yuv_sampling}")
        return ev_transforms.chroma_upsample(t, self._frame_height, self._frame_width,
                                             self._yuv_sampling.value)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 2:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 2D is expected")
        expected_size = self._expected_frame_size_from_subsampled_chroma()
        if parameter_type.shape[1] != expected_size:
            raise ValueError(f"Input tensor size does not match {self._frame_height}H x"
                             f" {self._frame_width}W for chroma {self._yuv_sampling}")
        output_shape = (parameter_type.shape[0], self._frame_height, self._frame_width, 3)
        return dataclasses.replace(parameter_type, shape=output_shape)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 2:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 2D is expected")
        expected_size = self._expected_frame_size_from_subsampled_chroma()
        if input_type.shape[1] != expected_size:
            raise ValueError(f"Input tensor size does not match {self._frame_height}H x"
                             f" {self._frame_width}W for chroma {self._yuv_sampling}")
        attrs = _afe_attr.ChromaUpsampleTransformAttrs(self._frame_height, self._frame_width,
                                                       self._yuv_sampling, input_type)
        return self._build_ir_for_single_operator(_afe_op.ChromaUpsampleTransformOp(),
                                                  "chroma_upsample_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('chroma_upsample_transform', frame_height=self._frame_height,
                                   frame_width=self._frame_width, yuv_sampling=self._yuv_sampling)
def chroma_upsample_transform(*, frame_height: int, frame_width: int,
                              yuv_sampling: ChromaSampling) -> Transform:
    """
    The chroma_upsample_transform operation.

    :param frame_height: Height of the fully sampled frame
    :param frame_width: Width of the fully sampled frame
    :param yuv_sampling: Chroma sampling enum
    :return: The chroma_upsample_transform transformation
    """
    return _ChromaUpsampleTransform(frame_height=frame_height, frame_width=frame_width,
                                    yuv_sampling=yuv_sampling)
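# Illustrative sketch: a 480x640 NV12 frame stores luma plus subsampled chroma as
# 480 * 640 * 3 // 2 = 460800 values per batch entry in a 2D tensor, and
# upsamples to a full (N, 480, 640, 3) YUV image.
#
#     t = chroma_upsample_transform(frame_height=480, frame_width=640,
#                                   yuv_sampling=ChromaSampling.NV12)
#     t.get_result_type(TensorType(scalar=ScalarType.uint8, shape=(1, 460800)))
#     # -> TensorType with shape (1, 480, 640, 3)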
class _YuvRgbConversionTransform(Transform):
    """
    YUV and RGB conversion transform

    Used to convert between YUV and RGB images.
    """
    def __init__(self, *, conversion: ColorConversion, std: ColorSpaceStandard):
        self._conversion = conversion
        self._std = std

    def apply(self, t: Tensor) -> Tensor:
        if t.dtype != np.uint8:
            raise ValueError(f"Tensor type is {str(t.dtype)}, but uint8 is expected.")
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        if t.shape[3] != 3:
            raise ValueError(f"Number of input channels is {t.shape[3]}, but 3 is expected")
        return ev_transforms.yuv_rgb_conversion(t, self._conversion.value, self._std.value)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if parameter_type.scalar != ScalarType.uint8:
            raise ValueError(f"Tensor type is {parameter_type.scalar.value}, but uint8 is expected.")
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        if parameter_type.shape[3] != 3:
            raise ValueError(f"Number of input channels is {parameter_type.shape[3]}, but 3 is expected")
        return parameter_type

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if input_type.scalar != ScalarType.uint8:
            raise ValueError(f"Tensor type is {input_type.scalar.value}, but uint8 is expected.")
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        if input_type.shape[3] != 3:
            raise ValueError(f"Number of input channels is {input_type.shape[3]}, but 3 is expected")
        attrs = _afe_attr.YuvRgbConversionTransformAttrs(self._conversion, self._std,
                                                         input_type.shape)
        return self._build_ir_for_single_operator(_afe_op.YuvRgbConversionTransformOp(),
                                                  "yuv_rgb_conversion_transform", attrs,
                                                  input_type, input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('yuv_rgb_conversion_transform', conversion=self._conversion,
                                   std=self._std)
def yuv_rgb_conversion_transform(*, conversion: ColorConversion,
                                 std: ColorSpaceStandard) -> Transform:
    """
    The yuv_rgb_conversion_transform operation.

    :param conversion: Direction of conversion between YUV and RGB
    :param std: Standard for color space conversion
    :return: The yuv_rgb_conversion_transform transformation
    """
    return _YuvRgbConversionTransform(conversion=conversion, std=std)
class _BgrRgbConversionTransform(Transform):
    """
    BGR and RGB conversion transform

    Used to convert between BGR and RGB images.
    """
    def __init__(self, *, conversion: ColorConversion):
        self._conversion = conversion

    def apply(self, t: Tensor) -> Tensor:
        if t.dtype != np.uint8:
            raise ValueError(f"Tensor type is {str(t.dtype)}, but uint8 is expected.")
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        if t.shape[3] != 3:
            raise ValueError(f"Number of input channels is {t.shape[3]}, but 3 is expected")
        return ev_transforms.bgr_rgb_conversion(t, self._conversion.value)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if parameter_type.scalar != ScalarType.uint8:
            raise ValueError(f"Tensor type is {parameter_type.scalar.value}, but uint8 is expected.")
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        if parameter_type.shape[3] != 3:
            raise ValueError(f"Number of input channels is {parameter_type.shape[3]}, but 3 is expected")
        return parameter_type

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if input_type.scalar != ScalarType.uint8:
            raise ValueError(f"Tensor type is {input_type.scalar.value}, but uint8 is expected.")
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        if input_type.shape[3] != 3:
            raise ValueError(f"Number of input channels is {input_type.shape[3]}, but 3 is expected")
        attrs = _afe_attr.BgrRgbConversionTransformAttrs(self._conversion, input_type.shape)
        return self._build_ir_for_single_operator(_afe_op.BgrRgbConversionTransformOp(),
                                                  "bgr_rgb_conversion_transform", attrs,
                                                  input_type, input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('bgr_rgb_conversion_transform', conversion=self._conversion)
def bgr_rgb_conversion_transform(*, conversion: ColorConversion) -> Transform:
    """
    The bgr_rgb_conversion_transform operation.

    :param conversion: Direction of conversion between BGR and RGB
    :return: The bgr_rgb_conversion_transform transformation
    """
    return _BgrRgbConversionTransform(conversion=conversion)
def _validate_crop_with_shape(input_shape: tuple[int, ...], bounding_box: list[tuple[int, int]]):
    """
    Validate the parameters of crop_transform against the shape of the input tensor.
    Raise an exception for invalid parameters. The caller must validate the individual
    parameters before this call.
    """
    assert bounding_box[0][0] >= 0 and bounding_box[0][1] >= 0, \
        "Bounding box origin must be non-negative in each dimension"
    assert bounding_box[1][0] <= input_shape[1] and bounding_box[1][1] <= input_shape[2], \
        "Bounding box endpoint must not be greater than input tensor shape"


def _convert_crop_to_slice_parameters(
        input_shape: tuple[int, ...], bounding_box: list[tuple[int, int]]
) -> tuple[list[int], list[int]]:
    """
    Convert the parameters of crop_transform to parameters of slice_transform,
    given the shape of the input tensor. The parameters are not validated.

    :param input_shape: Shape of the input tensor. Must be 4D.
    :param bounding_box: Bounding box of crop transform, as a list of two (x,y) coordinates.
    :return: Begin and end tuples for slice transform
    """
    begin = [0, bounding_box[0][0], bounding_box[0][1], 0]
    end = [input_shape[0], bounding_box[1][0], bounding_box[1][1], input_shape[3]]
    return begin, end


def _get_crop_output_shape(input_shape: tuple[int, ...],
                           bounding_box: list[tuple[int, int]]) -> list[int]:
    """
    Get the shape of the output of crop_transform, given the shape of the input tensor.
    The parameters are not validated.

    :param input_shape: Shape of the input tensor. Must be 4D.
    :param bounding_box: Bounding box of crop transform, as a list of two (x,y) coordinates.
    :return: Output shape
    """
    return [
        input_shape[0],
        bounding_box[1][0] - bounding_box[0][0],
        bounding_box[1][1] - bounding_box[0][1],
        input_shape[3]
    ]


class _CropTransform(Transform):
    """
    Crop transform

    Used to crop a rectangular region of a tensor.
    """
    def __init__(self, *, bounding_box: list[tuple[int, int]]):
        assert len(bounding_box) == 2, f"Incompatible bounding box length = {len(bounding_box)}"
        self._bounding_box = bounding_box

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        begin, end = _convert_crop_to_slice_parameters(t.shape, self._bounding_box)
        return ev_transforms.ev_slice(t, begin, end)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        return TensorType(parameter_type.scalar,
                          tuple(_get_crop_output_shape(parameter_type.shape, self._bounding_box)))

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        begin, end = _convert_crop_to_slice_parameters(input_type.shape, self._bounding_box)
        attrs = _afe_attr.StridedSliceAttrs(begin=begin, end=end, strides=[1, 1, 1, 1],
                                            axes=[0, 1, 2, 3], slice_mode='end',
                                            input_shape=input_type.shape,
                                            input_type=input_type.scalar)
        return self._build_ir_for_single_operator(_afe_op.StridedSliceOp(), "slice_transform",
                                                  attrs, input_type, input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('crop_transform', bounding_box=self._bounding_box)
def crop_transform(*, bounding_box: list[tuple[int, int]]) -> Transform:
    """
    The crop_transform operation.

    :param bounding_box: Rectangular area to crop, as a list of 2 items.
        bounding_box[0] is the (x,y) position of the cropped area's origin within the
        input area. bounding_box[1] is the (x,y) past-the-end position of the cropped
        area within the input area.
    :return: The crop_transform transformation
    """
    return _CropTransform(bounding_box=bounding_box)
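# Illustrative usage sketch (hypothetical coordinates): crop the region with
# origin (8, 12) and past-the-end corner (40, 60) out of a 4D NHWC tensor.
#
#     t = crop_transform(bounding_box=[(8, 12), (40, 60)])
#     t.get_result_type(TensorType(scalar=ScalarType.uint8, shape=(1, 64, 64, 3)))
#     # -> TensorType with shape (1, 32, 48, 3)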
class _SliceTransform(Transform):
    """
    Slice transform.

    Extracts a slice of a tensor.
    """
    def __init__(self, *, begin: Sequence[int], end: Sequence[int]):
        assert len(begin) in (4, 5), (
            f"Incompatible begin indices length = {len(begin)}, expected 4 or 5."
        )
        assert len(end) in (4, 5), f"Incompatible end indices length = {len(end)}, expected 4 or 5."
        self._begin = begin
        self._end = end

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) not in (4, 5):
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D/5D is expected")
        return ev_transforms.ev_slice(t, self._begin, self._end)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) not in (4, 5):
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D/5D is expected")
        # The slice from begin to end has extent (end - begin) in each dimension
        output_shape = tuple(e - b for b, e in zip(self._begin, self._end))
        return dataclasses.replace(parameter_type, shape=output_shape)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) not in (4, 5):
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D/5D is expected")
        attrs = _afe_attr.StridedSliceAttrs(
            begin=self._begin, end=self._end, strides=[1] * len(self._begin),
            axes=list(range(len(self._begin))), slice_mode='end',
            input_shape=input_type.shape, input_type=input_type.scalar
        )
        return self._build_ir_for_single_operator(
            _afe_op.StridedSliceOp(), "slice_transform", attrs, input_type, input_name,
            output_name
        )

    def __str__(self) -> str:
        return _format_call_string('slice_transform', begin=self._begin, end=self._end)
def slice_transform(*, begin: list[int], end: list[int]) -> Transform:
    """
    The slice_transform operation.

    :param begin: Begin indices for each dimension
    :param end: End indices for each dimension
    :return: The slice_transform transformation
    """
    return _SliceTransform(begin=begin, end=end)
class _SigmoidTransform(Transform):
    """
    Sigmoid transform

    Used to compute the sigmoid of a tensor.
    """
    def __init__(self, *, save_int16: bool):
        self._save_int16 = save_int16

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        return ev_transforms.sigmoid(t, self._save_int16)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        output_type = ScalarType.int16 if self._save_int16 else ScalarType.float32
        return dataclasses.replace(parameter_type, scalar=output_type)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        attrs = _afe_attr.SigmoidTransformAttrs(self._save_int16, input_type.shape)
        return self._build_ir_for_single_operator(_afe_op.SigmoidTransformOp(),
                                                  "sigmoid_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('sigmoid_transform', save_int16=self._save_int16)
def sigmoid_transform(*, save_int16: bool) -> Transform:
    """
    The sigmoid_transform operation.

    :param save_int16: Boolean flag to save the output as 16-bit fixed point
    :return: The sigmoid_transform transformation
    """
    return _SigmoidTransform(save_int16=save_int16)
class _NmsMaxpoolTransform(Transform):
    """
    NMS maxpool transform

    Used to compute maxpool with non-maximum suppression (NMS).
    """
    def __init__(self, *, kernel: int):
        self._kernel = kernel

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) != 4:
            raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
        return ev_transforms.nms_maxpool(t, self._kernel)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        return parameter_type

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        if len(input_type.shape) != 4:
            raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        attrs = _afe_attr.NmsMaxpoolTransformAttrs(self._kernel, input_type)
        return self._build_ir_for_single_operator(_afe_op.NmsMaxpoolTransformOp(),
                                                  "nms_maxpool_transform", attrs, input_type,
                                                  input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('nms_maxpool_transform', kernel=self._kernel)
def nms_maxpool_transform(*, kernel: int) -> Transform:
    """
    The nms_maxpool_transform operation.

    :param kernel: Size of the pooling kernel
    :return: The nms_maxpool_transform transformation
    """
    return _NmsMaxpoolTransform(kernel=kernel)
class _SoftmaxTransform(Transform):
    def __init__(self, *, axis: int):
        self._axis = axis

    def apply(self, t: Tensor) -> Tensor:
        if len(t.shape) <= self._axis:
            raise ValueError(f"{len(t.shape)}D tensor does not have axis {self._axis}")
        return ev_transforms.softmax(t, self._axis)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        if len(parameter_type.shape) <= self._axis:
            raise ValueError(f"Tensor does not have axis {self._axis}")
        return TensorType(scalar=ScalarType.float32, shape=parameter_type.shape)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        attrs = _afe_attr.SoftmaxAttrs(self._axis, input_type.shape)
        return self._build_ir_for_single_operator(_afe_op.SoftmaxOp(), "softmax_transform",
                                                  attrs, input_type, input_name, output_name)

    def __str__(self) -> str:
        return _format_call_string('softmax_transform', axis=self._axis)
def softmax_transform(*, axis: int) -> Transform:
    """
    The softmax transform operation.

    :param axis: Axis to sum over when normalizing
    :return: The softmax transform
    """
    return _SoftmaxTransform(axis=axis)
@dataclasses.dataclass(frozen=True)
class _FunctionalTransform(Transform):
    """
    _FunctionalTransform(apply_f, type_f, extract_f, description) constructs a tensor
    transform that is implemented by the given functions.
    """
    _apply_f: Callable[[Tensor], Tensor]
    _type_f: Callable[[TensorType], TensorType]
    _extract_f: Callable[[TensorType, NodeName, NodeName],
                         tuple[TensorType, list[_AwesomeNode]]]

    # Human-readable description for displaying this object
    _description: str

    def apply(self, t: Tensor) -> Tensor:
        return self._apply_f(t)

    def get_result_type(self, parameter_type: TensorType) -> TensorType:
        return self._type_f(parameter_type)

    def extract_ir(
            self, input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        return self._extract_f(input_type, input_name, output_name)

    def __str__(self):
        return self._description
def compose(transforms: list[Transform]) -> Transform:
    """
    Make a transform that is the composition of the given transforms.
    The composition performs transforms in the same order as they occur in the list:

        compose([x, y]).apply(t) == y.apply(x.apply(t))

    :param transforms: Transforms to compose
    :return: Their composition
    """
    def apply(t: Tensor) -> Tensor:
        return functools.reduce(lambda tensor, transform: transform.apply(tensor), transforms, t)

    def get_result_type(t: TensorType) -> TensorType:
        return functools.reduce(lambda ty, transform: transform.get_result_type(ty),
                                transforms, t)

    def extract_ir(
            input_type: TensorType, input_name: NodeName, output_name: NodeName
    ) -> tuple[TensorType, list[_AwesomeNode]]:
        outputs: list[_AwesomeNode] = list()
        intermediate_result_type = input_type
        for idx, tr in enumerate(transforms):
            output_node_name = f"{output_name}_{idx}"
            intermediate_result_type, out_nodes = tr.extract_ir(intermediate_result_type,
                                                                input_name, output_node_name)
            outputs.extend(out_nodes)
            # Keep the same input name if the transform did not generate any nodes
            input_name = out_nodes[-1].name if out_nodes else input_name
        return intermediate_result_type, outputs

    descriptions = '[' + ', '.join(str(t) for t in transforms) + ']'
    description = _format_call_string('compose', descriptions)
    return _FunctionalTransform(apply, get_result_type, extract_ir, description)
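# Illustrative pipeline sketch (hypothetical parameters): transforms are applied
# left to right, so the layout change runs before normalization.
#
#     pipeline = compose([
#         layout_transform(src_layout="NCHW", dst_layout="NHWC"),
#         normalization_transform(channel_params=[(255.0, 0.0, 1.0)] * 3),
#     ])
#     out = pipeline.apply(np.zeros((1, 3, 224, 224), dtype=np.uint8))  # float32, NHWC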
def identity() -> Transform:
    """An identity transformation on a tensor."""
    return compose([])