#########################################################
# Copyright (C) 2022 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Christopher Rodrigues
#########################################################
"""
Tensor transformations that can be applied to a model's
input or output.
"""
import dataclasses
import functools
from typing import Callable, Sequence
import numpy as np
from afe.apis.compilation_job_base import Tensor
from afe.apis.defines import (
ChromaSampling, ColorConversion, ColorSpaceStandard, ResizeDepositLocation, ResizeMethod
)
from afe.backends import Backend
from afe.core.configs import QuantizationConfigs
import afe.ir.operations as _afe_op
import afe.ir.attributes as _afe_attr
from afe.ir.defines import NodeName
from afe.ir.node import AwesomeNode as _AwesomeNode
from afe.ir.sima_ir import SiMaIR as _SimaIR
from afe.ir.sima_ir import SiMaIRMetadata as _SimaIRMetadata
from afe.ir.tensor_type import (
TensorType, ScalarType, TensorValue, TupleValue, data_byte_size, scalar_type_to_dtype
)
import afe.ir.operation_functions as _op_fn
from ml_kernels.math_helpers import RoundType as _RoundType
from ev_transforms import transforms as ev_transforms
def _format_call_string(name: str, *args, **kwargs) -> str:
"""
Create a function call expression for display.
Convert arguments and keyword arguments to strings and display them as
arguments or keyword arguments, respectively, in the displayed expression.
"""
call_args = [str(a) for a in args] + [kw + "=" + str(a) for kw, a in kwargs.items()]
return name + "(" + ",".join(call_args) + ")"
class _ArgmaxTransform(Transform):
def __init__(self, *, axis: int):
self._axis = axis
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) <= self._axis:
raise ValueError("Tensor does not have the selected axis")
if t.dtype != np.float32:
raise ValueError("Only 32-bit floating point data is supported for argmax")
return ev_transforms.argmax(t, axis=self._axis)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) <= self._axis:
raise ValueError("Tensor does not have the selected axis")
if parameter_type.scalar.numpy_type() != np.float32:
raise ValueError("Only 32-bit floating point data is supported for argmax")
new_shape = tuple(v for i, v in enumerate(parameter_type.shape) if i != self._axis)
return TensorType(scalar=ScalarType.int32, shape=new_shape)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
attrs = _afe_attr.ArgMaxAttrs(
[self._axis], False, False, input_type.shape, False, ScalarType.int32, input_type.scalar
)
return self._build_ir_for_single_operator(_afe_op.ArgMaxOp(), "argmax",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('argmax', axis=self._axis)
def argmax(*, axis: int) -> Transform:
"""
The argmax operation applied over a single axis of a tensor.
:param axis: Axis to reduce
:return: The argmax transformation
"""
return _ArgmaxTransform(axis=axis)
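# Example (hypothetical shapes): argmax over the channel axis of an NHWC float32
# tensor removes that axis and produces int32 indices:
#
#     t = np.zeros((1, 8, 8, 10), dtype=np.float32)
#     argmax(axis=3).apply(t).shape   # expected: (1, 8, 8)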
class _ReshapeTransform(Transform):
def __init__(self, *, newshape: list[int]):
self._newshape = newshape
def apply(self, t: Tensor) -> Tensor:
return ev_transforms.reshape(self._newshape, t)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
new_shape = ev_transforms.calculate_new_shape_after_reshape(parameter_type.shape, self._newshape)
return TensorType(scalar=parameter_type.scalar, shape=tuple(new_shape))
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
attrs = _afe_attr.ReshapeAttrs(input_type.shape, input_type.scalar, self._newshape)
return self._build_ir_for_single_operator(_afe_op.ReshapeOp(), "reshape_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('reshape_transform', newshape=self._newshape)
class _LayoutTransform(Transform):
"""
Layout transform
Used to test the infrastructure related to EV plugins
    until EV plugins are implemented.
"""
def __init__(self, *, src_layout: str, dst_layout: str):
self._src_layout = src_layout
self._dst_layout = dst_layout
if len(self._src_layout) != len(self._dst_layout):
raise ValueError("Source and destination layouts incompatible.")
for s in self._src_layout:
assert s in self._dst_layout, "Source and destination layouts incompatible."
def apply(self, t: Tensor) -> Tensor:
return ev_transforms.layout_transform(t, self._src_layout, self._dst_layout)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(self._src_layout) != len(parameter_type.shape):
raise ValueError("Input type shape not compatible with source layout.")
output_shape = tuple([parameter_type.shape[self._src_layout.index(c)] for c in self._dst_layout])
assert len(self._dst_layout) == len(output_shape), "Output type shape not compatible with source layout."
return dataclasses.replace(parameter_type, shape=output_shape)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
# Use implicitly_removable flag to prevent removing this operator
attrs = _afe_attr.LayoutTransformAttrs(input_type, self._src_layout, self._dst_layout,
implicitly_removable=False)
return self._build_ir_for_single_operator(_afe_op.LayoutTransformOp(), "layout_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('layout_transform', src_layout=self._src_layout, dst_layout=self._dst_layout)
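# Example (hypothetical layouts) for the layout transform above: converting from
# "NHWC" to "NCHW" permutes the shape accordingly, e.g. (1, 224, 224, 3) becomes
# (1, 3, 224, 224).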
class _TessellationTransform(Transform):
"""
Tessellation transform
Used to convert a tensor from frame based storage format to slice (or tile) based storage format.
"""
_name: str = 'tessellation_transform'
def __init__(
self, *, slice_shape: Sequence[int], align_c16: bool, cblock: bool
):
self._slice_shape = slice_shape
self._align_c16 = align_c16
self._cblock = cblock
if not self._align_c16 and self._cblock:
raise ValueError("Channels need to be aligned to 16 to be blocked.")
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) not in (4, 5):
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D/5D is expected.")
        if len(t.shape) != len(self._slice_shape) + 1:
            raise ValueError(
                f"Input tensor is {len(t.shape)}D, but slice shape is {len(self._slice_shape)}D."
            )
if any(x > y for x, y in zip(self._slice_shape, t.shape[1:])):
raise ValueError(
f"Slice shape {self._slice_shape} is bigger than frame shape {t.shape[1:]}"
)
if self._slice_shape[-1] < t.shape[-1] and self._slice_shape[-1] % 16 != 0:
raise ValueError(
f"Unsupported slice channel size {self._slice_shape[-1]} for frame channel size"
f" {t.shape[-1]}"
)
return ev_transforms.tessellation(t, self._slice_shape, self._align_c16, self._cblock)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) not in (4, 5):
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D/5D is expected")
if any(x > y for x, y in zip(self._slice_shape, parameter_type.shape[1:])):
raise ValueError(
f"Slice shape {self._slice_shape} is bigger than frame shape"
f" {parameter_type.shape[1:]}"
)
if self._slice_shape[-1] < parameter_type.shape[-1] and self._slice_shape[-1] % 16 != 0:
raise ValueError(
f"Unsupported slice channel size {self._slice_shape[-1]} for frame channel size"
f" {parameter_type.shape[-1]}"
)
# Calculate output_tensor_shape
output_shape = _op_fn.calculate_tessellated_tensor_shape(
parameter_type, self._slice_shape, self._align_c16
)
return dataclasses.replace(parameter_type, shape=output_shape)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) not in (4, 5):
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D/5D is expected")
if any(x > y for x, y in zip(self._slice_shape, input_type.shape[1:])):
raise ValueError(
f"Slice shape {self._slice_shape} is bigger than frame shape"
f" {input_type.shape[1:]}"
)
attrs = _afe_attr.TessellationTransformAttrs(
self._slice_shape, self._align_c16, self._cblock, input_type
)
return self._build_ir_for_single_operator(
_afe_op.TessellationTransformOp(), "tessellation_transform", attrs, input_type,
input_name, output_name
)
def __str__(self) -> str:
return _format_call_string(
self._name, slice_shape=self._slice_shape, align_c16=self._align_c16,
cblock=self._cblock
)
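# Example (hypothetical shapes) for the tessellation transform above: a
# (1, 224, 224, 16) int8 frame with slice_shape (56, 56, 16) is split into
# 4 x 4 = 16 slices of 56 * 56 * 16 = 50176 values each. The tessellated result is
# the 2D layout that _DetessellationTransform expects back; its exact shape is
# computed by _op_fn.calculate_tessellated_tensor_shape and depends on the
# align_c16 and cblock padding options.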
class _DetessellationTransform(Transform):
"""
Detessellation transform
Used to convert a tensor from slice based storage format to frame based storage format.
"""
_name: str = 'detessellation_transform'
def __init__(
self, *, slice_shape: Sequence[int], align_c16: bool, cblock: bool, frame_type: TensorType
):
self._slice_shape = slice_shape
self._align_c16 = align_c16
self._cblock = cblock
self._frame_type = frame_type
if len(self._frame_type.shape) not in (4, 5):
raise ValueError(
f"Frame shape is {len(self._frame_type.shape)}D, but 4D/5D is expected"
)
if any(x > y for x, y in zip(self._slice_shape, self._frame_type.shape[1:])):
raise ValueError(
f"Slice shape {self._slice_shape} is bigger than frame shape"
f" {self._frame_type.shape[1:]}"
)
if self._slice_shape[-1] < self._frame_type.shape[-1] and self._slice_shape[-1] % 16 != 0:
raise ValueError(f"Slice channels ({self._slice_shape[-1]}) is not multiple of 16!")
if not self._align_c16 and self._cblock:
raise ValueError("Channels need to be aligned to 16 to be blocked.")
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 2:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 2D is expected")
if t.dtype != np.int8:
raise ValueError(f"Tensor type is {t.dtype}, but int8 is expected")
return ev_transforms.detessellation(
t, self._slice_shape,
scalar_type_to_dtype(self._frame_type.scalar), self._frame_type.shape,
self._align_c16, self._cblock
)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 2:
raise ValueError(f"Input tensor shape is {len(parameter_type.shape)}D, but 2D is expected")
if parameter_type.scalar != ScalarType.int8:
raise ValueError(f"Input tensor type is {parameter_type.scalar}, but int8 is expected")
return self._frame_type
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 2:
raise ValueError(f"Input shape is {len(input_type.shape)}D, but 2D is expected")
if input_type.scalar != ScalarType.int8:
raise ValueError(f"Input type is {input_type.scalar}, but int8 is expected")
attrs = _afe_attr.DetessellationTransformAttrs(
self._slice_shape, self._align_c16, self._cblock, self._frame_type, input_type.shape
)
return self._build_ir_for_single_operator(
_afe_op.DetessellationTransformOp(), "detessellation_transform", attrs, input_type,
input_name, output_name
)
def __str__(self) -> str:
return _format_call_string(
self._name, slice_shape=self._slice_shape, align_c16=self._align_c16,
cblock=self._cblock, frame_type=self._frame_type
)
class _PackTransform(Transform):
"""
Pack transform
Used to pack multiple tensors into a single 2D array
"""
def __init__(self):
"""
        No parameters are required other than the tensor data to transform.
"""
pass
def apply(self, t: list[Tensor]) -> Tensor:
if len(t) <= 1:
raise ValueError(f"Multiple tensors are required in pack transform, but got {len(t)}")
return _op_fn.pack(t)
def get_packed_result_type(self, parameter_types: list[TensorType]) -> TensorType:
if len(parameter_types) <= 1:
raise ValueError(f"Multiple tensors are required in pack transform, but got {len(parameter_types)}")
# Output tensor is 2D int8 with paddings
batch_size = parameter_types[0].shape[0]
data_types = _afe_op.get_pack_input_types(parameter_types)
tensor_values = [TensorValue(TensorType(pt.scalar, pt.shape[1:])) for pt in data_types]
tuple_value = TupleValue(tensor_values)
total_size = data_byte_size(tuple_value)
return TensorType(ScalarType.int8, (batch_size, total_size))
def extract_pack_ir(
self, input_types: list[TensorType], input_name_list: list[NodeName], output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_types) <= 1:
raise ValueError(f"Multiple tensors are required in pack transform, but got {len(input_types)}")
operation = _afe_op.PackTransformOp()
attrs = _afe_attr.PackTransformAttrs(input_types, ScalarType.int8)
output_type = self.get_packed_result_type(input_types)
node_name = NodeName(f"{output_name}_pack_transform")
node = _AwesomeNode(name=node_name, input_names=[f"input_{idx}" for idx in range(len(input_types))],
input_node_names=input_name_list,
ir=_SimaIR(operation=operation, _attrs=attrs, calib_attrs=_afe_attr.AwesomeCalibAttrs(),
_quant_attrs=None, quant_config=QuantizationConfigs(), backend=Backend.EV,
_metadata=_SimaIRMetadata("")))
return output_type, [node]
def __str__(self) -> str:
return _format_call_string('pack_transform')
class _UnpackTransform(Transform):
"""
Unpack transform
    Used to unpack a 2D array into multiple N-D arrays according to the target tensor types
"""
def __init__(self, *, tensor_types: list[TensorType]):
self._tensor_types = tensor_types
def apply(self, t: Tensor) -> list[Tensor]:
if len(t.shape) != 2:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 2D is expected")
if t.dtype != np.int8:
raise ValueError(f"Tensor type is {t.dtype}, but int8 is expected")
return ev_transforms.unpack(t, [scalar_type_to_dtype(tt.scalar) for tt in self._tensor_types],
[tt.shape for tt in self._tensor_types])
def get_unpacked_result_type(self, parameter_type: TensorType) -> list[TensorType]:
if len(parameter_type.shape) != 2:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 2D is expected")
if parameter_type.scalar != ScalarType.int8:
raise ValueError(f"Tensor type is {parameter_type.scalar}, but int8 is expected")
# Output is determined by the target tensor types
return self._tensor_types
def extract_unpack_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[list[TensorType], list[_AwesomeNode]]:
if len(input_type.shape) != 2:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 2D is expected")
if input_type.scalar != ScalarType.int8:
raise ValueError(f"Tensor type is {input_type.scalar}, but int8 is expected")
operation = _afe_op.UnpackTransformOp()
attrs = _afe_attr.UnpackTransformAttrs(input_type.shape, self._tensor_types)
output_types = self.get_unpacked_result_type(input_type)
node_name = NodeName(f"{output_name}_unpack_transform")
node = _AwesomeNode(name=node_name, input_names=operation.input_list, input_node_names=[input_name],
ir=_SimaIR(operation=operation, _attrs=attrs, calib_attrs=_afe_attr.AwesomeCalibAttrs(),
_quant_attrs=None, quant_config=QuantizationConfigs(),
backend=Backend.EV, # Currently all transforms go to EV
_metadata=_SimaIRMetadata("")))
return output_types, [node]
def __str__(self) -> str:
return _format_call_string('unpack_transform', tensor_types=self._tensor_types)
class _NormalizationTransform(Transform):
"""
Normalization transform
Used to normalize a tensor
"""
def __init__(self, *, channel_params: list[tuple[float, float, float]]):
self._channel_params = channel_params
if len(self._channel_params[0]) != 3:
raise ValueError(f"Number of parameters must be 3, but got {len(self._channel_params[0])}")
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
return ev_transforms.normalize(t, self._channel_params)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
output_type = ScalarType.float32
return dataclasses.replace(parameter_type, scalar=output_type)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
attrs = _afe_attr.NormalizationTransformAttrs(self._channel_params, input_type)
return self._build_ir_for_single_operator(_afe_op.NormalizationTransformOp(), "normalization_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('normalization_transform', channel_params=self._channel_params)
class _QuantizationTransform(Transform):
"""
Quantization transform
Used to quantize a tensor with given per-channel zero point and scale values
"""
def __init__(self, *, channel_params: list[tuple[float, int]], num_bits: int,
rounding: _RoundType = _RoundType.TONEAREST):
self._channel_params = channel_params
self._num_bits = num_bits
self._rounding = rounding
if len(self._channel_params[0]) != 2:
raise ValueError(f"Number of parameters must be 2, but got {len(self._channel_params[0])}")
if self._num_bits != 8 and self._num_bits != 32:
raise ValueError(f"Number of bits for quantiation must be 8 or 32, but got {self._num_bits}")
if self._rounding != _RoundType.TONEAREST:
raise ValueError(f"Rounding type must be TONEAREST, but got {self._rounding}")
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
if t.dtype != np.float32:
raise ValueError(f"Input tensor type is {t.dtype}, but float32 is expected")
return ev_transforms.quantize(t, self._channel_params, self._num_bits)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
if parameter_type.scalar != ScalarType.float32:
raise ValueError(f"Tensor type is {parameter_type.scalar}, but float32 is expected")
result_type = ScalarType.int8 if self._num_bits == 8 else ScalarType.int32
return dataclasses.replace(parameter_type, scalar=result_type)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
if input_type.scalar != ScalarType.float32:
raise ValueError(f"Tensor type is {input_type.scalar}, but float32 is expected")
output_data_type = ScalarType.int8 if self._num_bits == 8 else ScalarType.int32
attrs = _afe_attr.QuantizationTransformAttrs(self._channel_params, input_type.shape,
self._num_bits, self._rounding, output_data_type)
return self._build_ir_for_single_operator(_afe_op.QuantizationTransformOp(), "quantization_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('quantization_transform', channel_params=self._channel_params,
num_bits=self._num_bits, rounding=self._rounding)
class _DequantizationTransform(Transform):
"""
Dequantization transform
Used to de-quantize a tensor with given per-channel zero point and quantization scale values
"""
def __init__(self, *, channel_params: list[tuple[float, int]]):
self._channel_params = channel_params
if len(self._channel_params[0]) != 2:
raise ValueError(f"Number of parameters must be 2, but got {len(self._channel_params[0])}")
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
if t.dtype != np.int8 and t.dtype != np.int32:
raise ValueError(f"Input tensor type is {t.dtype}, but int8 or int32 is expected")
return ev_transforms.dequantize(t, self._channel_params)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
if parameter_type.scalar != ScalarType.int8 and parameter_type.scalar != ScalarType.int32:
raise ValueError(f"Tensor type is {parameter_type.scalar}, but int8 or int32 is expected")
output_type = ScalarType.float32
return dataclasses.replace(parameter_type, scalar=output_type)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
if input_type.scalar != ScalarType.int8 and input_type.scalar != ScalarType.int32:
raise ValueError(f"Tensor type is {input_type.scalar}, but int8 or int32 is expected")
attrs = _afe_attr.DequantizationTransformAttrs(self._channel_params, input_type)
return self._build_ir_for_single_operator(_afe_op.DequantizationTransformOp(), "dequantization_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('dequantization_transform', channel_params=self._channel_params)
class _ResizeTransform(Transform):
"""
Resize transform
Used to resize a tensor
"""
def __init__(self, *, target_height: int, target_width: int, keep_aspect: bool,
deposit_location: ResizeDepositLocation, method: ResizeMethod):
self._target_height = target_height
self._target_width = target_width
self._keep_aspect = keep_aspect
self._deposit_location = deposit_location
self._method = method
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
return ev_transforms.resize(t, self._target_width, self._target_height,
self._keep_aspect, self._deposit_location.value, self._method.value)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
output_shape = (parameter_type.shape[0], self._target_height, self._target_width, parameter_type.shape[3])
return dataclasses.replace(parameter_type, shape=output_shape)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
attrs = _afe_attr.ResizeTransformAttrs(self._target_height, self._target_width, self._keep_aspect,
self._deposit_location, self._method, input_type)
return self._build_ir_for_single_operator(_afe_op.ResizeTransformOp(), "resize_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('resize_transform', target_height=self._target_height,
target_width=self._target_width, keep_aspect=self._keep_aspect,
deposit_location=self._deposit_location, method=self._method)
class _ChromaUpsampleTransform(Transform):
"""
Chroma upsample transform
Used to interpolate subsampled YUV images
"""
def __init__(self, *, frame_height: int, frame_width: int, yuv_sampling: ChromaSampling):
self._frame_height = frame_height
self._frame_width = frame_width
self._yuv_sampling = yuv_sampling
def _expected_frame_size_from_subsampled_chroma(self) -> int:
frame_size: int = self._frame_height * self._frame_width
if self._yuv_sampling in (ChromaSampling.NV12, ChromaSampling.YUV420):
frame_size = frame_size * 3 // 2
elif self._yuv_sampling == ChromaSampling.YUV422:
frame_size *= 2
else:
raise ValueError(f"Not supported chroma sampling format {self._yuv_sampling}")
return frame_size
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 2:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 2D is expected")
expected_size = self._expected_frame_size_from_subsampled_chroma()
if t.shape[1] != expected_size:
raise ValueError(f"Input tensor size not match {self._frame_height}H x {self._frame_width}W "
f"for chroma {self._yuv_sampling}")
return ev_transforms.chroma_upsample(t, self._frame_height, self._frame_width,
self._yuv_sampling.value)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 2:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 2D is expected")
expected_size = self._expected_frame_size_from_subsampled_chroma()
if parameter_type.shape[1] != expected_size:
raise ValueError(f"Input tensor size not match {self._frame_height}H x {self._frame_width}W "
f"for chroma {self._yuv_sampling}")
output_shape = (parameter_type.shape[0], self._frame_height, self._frame_width, 3)
return dataclasses.replace(parameter_type, shape=output_shape)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 2:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 2D is expected")
expected_size = self._expected_frame_size_from_subsampled_chroma()
if input_type.shape[1] != expected_size:
raise ValueError(f"Input tensor size not match {self._frame_height}H x {self._frame_width}W "
f"for chroma {self._yuv_sampling}")
attrs = _afe_attr.ChromaUpsampleTransformAttrs(self._frame_height, self._frame_width,
self._yuv_sampling, input_type)
return self._build_ir_for_single_operator(_afe_op.ChromaUpsampleTransformOp(), "chroma_upsample_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('chroma_upsample_transform', frame_height=self._frame_height,
frame_width=self._frame_width, yuv_sampling=self._yuv_sampling)
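# Example (hypothetical frame size) for the chroma upsample transform above: a
# 720 x 1280 NV12 frame packs into 720 * 1280 * 3 // 2 = 1382400 values per batch
# entry, so the expected 2D input shape is (N, 1382400) and the upsampled output
# shape is (N, 720, 1280, 3).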
class _YuvRgbConversionTransform(Transform):
"""
YUV and RGB conversion transform
Used to convert between YUV and RGB images
"""
def __init__(self, *, conversion: ColorConversion, std: ColorSpaceStandard):
self._conversion = conversion
self._std = std
def apply(self, t: Tensor) -> Tensor:
if t.dtype != np.uint8:
raise ValueError(f"Tensor type is {str(t.dtype)}, but uint8 is expected.")
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
if t.shape[3] != 3:
raise ValueError(f"Number of input channels is {t.shape[3]}, but 3 is expected")
return ev_transforms.yuv_rgb_conversion(t, self._conversion.value, self._std.value)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if parameter_type.scalar != ScalarType.uint8:
raise ValueError(f"Tensor type is {parameter_type.scalar.value}, but uint8 is expected.")
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
if parameter_type.shape[3] != 3:
raise ValueError(f"Number of input channels is {parameter_type.shape[3]}, but 3 is expected")
return parameter_type
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if input_type.scalar != ScalarType.uint8:
raise ValueError(f"Tensor type is {input_type.scalar.value}, but int8 is expected.")
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
if input_type.shape[3] != 3:
raise ValueError(f"Number of input channels is {input_type.shape[3]}, but 3 is expected")
attrs = _afe_attr.YuvRgbConversionTransformAttrs(self._conversion, self._std, input_type.shape)
return self._build_ir_for_single_operator(_afe_op.YuvRgbConversionTransformOp(), "yuv_rgb_conversion_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('yuv_rgb_conversion_transform', conversion=self._conversion, std=self._std)
class _BgrRgbConversionTransform(Transform):
"""
BGR and RGB conversion transform
Used to convert between BGR and RGB images
"""
def __init__(self, *, conversion: ColorConversion):
self._conversion = conversion
def apply(self, t: Tensor) -> Tensor:
if t.dtype != np.uint8:
raise ValueError(f"Tensor type is {str(t.dtype)}, but uint8 is expected.")
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
if t.shape[3] != 3:
raise ValueError(f"Number of input channels is {t.shape[3]}, but 3 is expected")
return ev_transforms.bgr_rgb_conversion(t, self._conversion.value)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if parameter_type.scalar != ScalarType.uint8:
raise ValueError(f"Tensor type is {parameter_type.scalar.value}, but uint8 is expected.")
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
if parameter_type.shape[3] != 3:
raise ValueError(f"Number of input channels is {parameter_type.shape[3]}, but 3 is expected")
return parameter_type
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if input_type.scalar != ScalarType.uint8:
raise ValueError(f"Tensor type is {input_type.scalar.value}, but int8 is expected.")
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
if input_type.shape[3] != 3:
raise ValueError(f"Number of input channels is {input_type.shape[3]}, but 3 is expected")
attrs = _afe_attr.BgrRgbConversionTransformAttrs(self._conversion, input_type.shape)
return self._build_ir_for_single_operator(_afe_op.BgrRgbConversionTransformOp(), "bgr_rgb_conversion_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('bgr_rgb_conversion_transform', conversion=self._conversion)
def _validate_crop_with_shape(input_shape: tuple[int, ...], bounding_box: list[tuple[int, int]]):
"""
Validate the parameters of crop_transform against the shape of the input tensor.
Raise an exception for invalid parameters.
The caller must validate the individual parameters before this call.
"""
    assert bounding_box[0][0] >= 0 and bounding_box[0][1] >= 0, \
        "Bounding box origin must be non-negative in each dimension"
assert bounding_box[1][0] <= input_shape[1] and bounding_box[1][1] <= input_shape[2], \
"Bounding box endpoint must not be greater than input tensor shape"
def _convert_crop_to_slice_parameters(
input_shape: tuple[int, ...], bounding_box: list[tuple[int, int]]
) -> tuple[list[int], list[int]]:
"""
Convert the parameters of crop_transform to parameters of slice_transform, given the shape of the input tensor.
The parameters are not validated.
:param input_shape: Shape of the input tensor. Must be 4D.
:param bounding_box: Bounding box of crop transform, as a list of two (x,y) coordinates.
:return: Begin and end tuples for slice transform
"""
begin = [0, bounding_box[0][0], bounding_box[0][1], 0]
end = [input_shape[0], bounding_box[1][0], bounding_box[1][1], input_shape[3]]
return begin, end
def _get_crop_output_shape(input_shape: tuple[int, ...], bounding_box: list[tuple[int, int]]) -> list[int]:
"""
Get the shape of the output of crop_transform, given the shape of the input tensor.
The parameters are not validated.
:param input_shape: Shape of the input tensor. Must be 4D.
:param bounding_box: Bounding box of crop transform, as a list of two (x,y) coordinates.
:return: Output shape
"""
return [
input_shape[0],
bounding_box[1][0] - bounding_box[0][0],
bounding_box[1][1] - bounding_box[0][1],
input_shape[3]
]
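# Example (hypothetical values) for the two crop helpers above: with a 4D NHWC input
# of shape (1, 224, 224, 3) and bounding_box = [(10, 20), (110, 220)], the slice
# parameters are begin = [0, 10, 20, 0] and end = [1, 110, 220, 3], and the cropped
# output shape is [1, 100, 200, 3].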
class _CropTransform(Transform):
"""
Crop transform
Used to crop a rectangle region of a tensor
"""
def __init__(self, *, bounding_box: list[tuple[int, int]]):
assert len(bounding_box) == 2, f"Incompatible bounding box length = {len(bounding_box)}"
self._bounding_box = bounding_box
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
begin, end = _convert_crop_to_slice_parameters(t.shape, self._bounding_box)
return ev_transforms.ev_slice(t, begin, end)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
        return TensorType(
            parameter_type.scalar, tuple(_get_crop_output_shape(parameter_type.shape, self._bounding_box))
        )
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
        begin, end = _convert_crop_to_slice_parameters(input_type.shape, self._bounding_box)
attrs = _afe_attr.StridedSliceAttrs(begin=begin, end=end, strides=[1, 1, 1, 1],
axes=[0, 1, 2, 3], slice_mode='end', input_shape=input_type.shape,
input_type=input_type.scalar)
return self._build_ir_for_single_operator(_afe_op.StridedSliceOp(), "slice_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
        return _format_call_string('crop_transform', bounding_box=self._bounding_box)
class _SliceTransform(Transform):
"""
Slice transform.
Extracts a slice of a tensor.
"""
def __init__(self, *, begin: Sequence[int], end: Sequence[int]):
assert len(begin) in (4, 5), (
f"Incompatible begin indices length = {len(begin)}, expected 4 or 5."
)
assert len(end) in (4, 5), f"Incompatible end indices length = {len(end)}, expected 4 or 5."
self._begin = begin
self._end = end
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) not in (4, 5):
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D/5D is expected")
return ev_transforms.ev_slice(t, self._begin, self._end)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) not in (4, 5):
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D/5D is expected")
return parameter_type
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) not in (4, 5):
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D/5D is expected")
attrs = _afe_attr.StridedSliceAttrs(
begin=self._begin, end=self._end, strides=[1] * len(self._begin),
axes=list(range(len(self._begin))), slice_mode='end', input_shape=input_type.shape,
input_type=input_type.scalar
)
return self._build_ir_for_single_operator(
_afe_op.StridedSliceOp(), "slice_transform", attrs, input_type, input_name,
output_name
)
def __str__(self) -> str:
return _format_call_string('slice_transform', begin=self._begin, end=self._end)
class _SigmoidTransform(Transform):
"""
Sigmoid transform
Used to compute sigmoid of tensor
"""
def __init__(self, *, save_int16: bool):
self._save_int16 = save_int16
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
return ev_transforms.sigmoid(t, self._save_int16)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
output_type = ScalarType.int16 if self._save_int16 else ScalarType.float32
return dataclasses.replace(parameter_type, scalar=output_type)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
attrs = _afe_attr.SigmoidTransformAttrs(self._save_int16, input_type.shape)
return self._build_ir_for_single_operator(_afe_op.SigmoidTransformOp(), "sigmoid_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('sigmoid_transform', save_int16=self._save_int16)
class _NmsMaxpoolTransform(Transform):
"""
NMS maxpool transform
Used to compute maxpool with NMS
"""
def __init__(self, *, kernel: int):
self._kernel = kernel
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) != 4:
raise ValueError(f"Input tensor is {len(t.shape)}D, but 4D is expected")
return ev_transforms.nms_maxpool(t, self._kernel)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(parameter_type.shape)}D, but 4D is expected")
return parameter_type
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
if len(input_type.shape) != 4:
raise ValueError(f"Tensor shape is {len(input_type.shape)}D, but 4D is expected")
attrs = _afe_attr.NmsMaxpoolTransformAttrs(self._kernel, input_type)
return self._build_ir_for_single_operator(_afe_op.NmsMaxpoolTransformOp(), "nms_maxpool_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('nms_maxpool_transform', kernel=self._kernel)
class _SoftmaxTransform(Transform):
def __init__(self, *, axis: int):
self._axis = axis
def apply(self, t: Tensor) -> Tensor:
if len(t.shape) <= self._axis:
raise ValueError(f"{len(t.shape)}D tensor does not have axis {self._axis}")
return ev_transforms.softmax(t, self._axis)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
if len(parameter_type.shape) <= self._axis:
raise ValueError(f"Tensor does not have axis {self._axis}")
return TensorType(scalar=ScalarType.float32, shape=parameter_type.shape)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
attrs = _afe_attr.SoftmaxAttrs(self._axis, input_type.shape)
return self._build_ir_for_single_operator(_afe_op.SoftmaxOp(), "softmax_transform",
attrs, input_type, input_name, output_name)
def __str__(self) -> str:
return _format_call_string('softmax_transform', axis=self._axis)
@dataclasses.dataclass(frozen=True)
class _FunctionalTransform(Transform):
"""
    _FunctionalTransform(apply_f, type_f, extract_f, description) constructs a
    tensor transform that is implemented by the given functions.
"""
_apply_f: Callable[[Tensor], Tensor]
_type_f: Callable[[TensorType], TensorType]
    _extract_f: Callable[[TensorType, NodeName, NodeName],
                         tuple[TensorType, list[_AwesomeNode]]]
# Human-readable description for displaying this object
_description: str
def apply(self, t: Tensor) -> Tensor:
return self._apply_f(t)
def get_result_type(self, parameter_type: TensorType) -> TensorType:
return self._type_f(parameter_type)
def extract_ir(
self, input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
return self._extract_f(input_type, input_name, output_name)
def __str__(self):
return self._description
def compose(transforms: list[Transform]) -> Transform:
"""
Make a transform that is the composition of the given transforms.
The composition performs transforms in the same order as they occur
in the list:
compose([x, y]).apply(t) == y.apply(x.apply(t))
:param transforms: Transforms to compose
:return: Their composition
"""
def apply(t: Tensor) -> Tensor:
return functools.reduce(lambda tensor, transform: transform.apply(tensor), transforms, t)
def get_result_type(t: TensorType) -> TensorType:
return functools.reduce(lambda ty, transform: transform.get_result_type(ty), transforms, t)
def extract_ir(
input_type: TensorType, input_name: NodeName, output_name: NodeName
) -> tuple[TensorType, list[_AwesomeNode]]:
outputs: list[_AwesomeNode] = list()
intermediate_result_type = input_type
for idx, tr in enumerate(transforms):
output_node_name = f"{output_name}_{idx}"
input_node_type, out_nodes = tr.extract_ir(intermediate_result_type, input_name, output_node_name)
intermediate_result_type = input_node_type
outputs.extend(out_nodes)
# Keep the same input name if Transform does not generate any nodes.
input_name = out_nodes[-1].name if out_nodes else input_name
return intermediate_result_type, outputs
descriptions = '[' + ', '.join(str(t) for t in transforms) + ']'
description = _format_call_string('compose', descriptions)
return _FunctionalTransform(apply, get_result_type, extract_ir, description)
def identity() -> Transform:
"""An identity transformation on a tensor."""
return compose([])
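# Example (hypothetical pipeline): composing transforms applies them left to right,
# so here the reshape runs before the argmax:
#
#     pipeline = compose([_ReshapeTransform(newshape=[1, 64, 10]), argmax(axis=2)])
#     out = pipeline.apply(np.zeros((1, 8, 8, 10), dtype=np.float32))
#     out.shape   # expected: (1, 64)
#
# identity() is compose([]), so identity().apply(t) returns t unchanged.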