#########################################################
# Copyright (C) 2021 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
from enum import Enum, auto
from dataclasses import dataclass
from typing import Tuple, Mapping, Type, List, Union
import numpy as np
from ml_kernels.types import bfloat16
from afe._tvm._defines import TVMTensorType
from afe.ir.defines import NodeAssociatedValue, DataValue, TensorValue, TupleValue, map_data_value
from afe.ir.utils import set_shape_batch_size
[docs]
UNRECOGNIZED_TYPE = "Unrecognized type {}"
[docs]
class TensorDirection(str, Enum):
[docs]
output_tensor = "output_tensor"
[docs]
class ScalarType(Enum):
"""
A scalar type used in quantized network arithmetic or tensors.
"""
@staticmethod
[docs]
def from_numpy(dtype: Union[np.dtype, Type[np.number]]) -> "ScalarType":
"""Make a ScalarType from a numpy numeric type."""
for np_type, sc_type in _NUMPY_TYPE_TO_SCALAR:
if np_type == dtype:
return sc_type
# Else, no type found
raise ValueError(UNRECOGNIZED_TYPE.format(dtype))
[docs]
def numpy_type(self) -> Type[np.number]:
"""Convert this scalar type to a numpy type."""
return _SCALAR_TYPE_TO_NUMPY[self]
# Association from numpy type objects to scalar types.
# We can't use a mapping because there are multiple different representations of types in numpy. They
# compare equal with == but have different object identity.
_NUMPY_TYPE_TO_SCALAR: List[Tuple[Type[np.number], ScalarType]] = [
(np.int8, ScalarType.int8),
(np.uint8, ScalarType.uint8),
(np.int16, ScalarType.int16),
(np.uint16, ScalarType.uint16),
(np.int32, ScalarType.int32),
(np.float32, ScalarType.float32),
(np.float64, ScalarType.float64),
(np.float16, ScalarType.float16),
(np.bool_, ScalarType.bool),
(np.uint32, ScalarType.uint32),
(np.uint64, ScalarType.uint64),
(np.int64, ScalarType.int64),
(bfloat16, ScalarType.bfloat16),
(np.dtype('bfloat16'), ScalarType.bfloat16),
]
# Mapping from scalar types to numpy type objects
_SCALAR_TYPE_TO_NUMPY: Mapping[ScalarType, Type[np.number]] = {
ScalarType.int8: np.int8,
ScalarType.uint8: np.uint8,
ScalarType.int16: np.int16,
ScalarType.uint16: np.uint16,
ScalarType.int32: np.int32,
ScalarType.float16: np.float16,
ScalarType.float32: np.float32,
ScalarType.float64: np.float64,
ScalarType.bool: np.bool_,
ScalarType.int64: np.int64,
ScalarType.uint32: np.uint32,
ScalarType.uint64: np.uint64,
ScalarType.bfloat16: bfloat16,
}
@dataclass(frozen=True)
[docs]
class TensorType:
"""
A type of a tensor in a network. It consists of the scalar type
and the shape of a multidimensional array.
"""
def __post_init__(self):
assert isinstance(self.scalar, ScalarType)
assert all(isinstance(x, int) for x in self.shape)
[docs]
def set_tensor_type_batch_size(t: TensorType, batch_size: int) -> TensorType:
return TensorType(t.scalar, set_shape_batch_size(t.shape, batch_size))
# A type of a node in a network.
[docs]
NodeType = NodeAssociatedValue[TensorType]
[docs]
def set_node_type_batch_size(n: NodeType, batch_size: int) -> NodeType:
return NodeType(
{
k: map_data_value(lambda x: set_tensor_type_batch_size(x, batch_size), v)
for k, v in n.inputs.items()
},
map_data_value(lambda x: set_tensor_type_batch_size(x, batch_size), n.output)
)
# All integral types. These types could be the quantized representation of a real number.
_INTEGRAL_TYPES = {
ScalarType.int8, ScalarType.uint8,
ScalarType.int16, ScalarType.uint16,
ScalarType.int32, ScalarType.uint32,
ScalarType.int64, ScalarType.uint64
}
# All floating-point types.
_FLOATING_TYPES = {
ScalarType.float32, ScalarType.bfloat16
}
[docs]
def scalar_is_integral(s: ScalarType) -> bool:
"""Return true if the scalar type is an integral type."""
return s in _INTEGRAL_TYPES
[docs]
def scalar_is_floating(s: ScalarType) -> bool:
"""Return true if the scalar type is a floating-point type."""
return s in _FLOATING_TYPES
[docs]
def scalar_byte_size(s: ScalarType) -> int:
"""
Calculate scalar byte size
:param s: Scalar type
:return: number of bytes
"""
if s in (ScalarType.int8, ScalarType.uint8):
num_of_bytes = 1
elif s in (ScalarType.int16, ScalarType.uint16, ScalarType.bfloat16):
num_of_bytes = 2
elif s in (ScalarType.int32, ScalarType.float32):
num_of_bytes = 4
elif s in (ScalarType.int64, ScalarType.uint64):
num_of_bytes = 8
else:
raise ValueError(f"Unsupported data type: {s}\n"
f"Currently supported data types: {ScalarType._member_names_}")
return num_of_bytes
[docs]
def tensor_byte_size(t: TensorType) -> int:
"""
Calculate tensor byte size
:param t: tensor type
:return: number of bytes
"""
assert t.shape is not None
# Find number of elements in tensor
number_of_elements = 1
for val in t.shape:
number_of_elements *= val
number_of_bytes = scalar_byte_size(t.scalar) * number_of_elements
return number_of_bytes
def _calculate_data_byte_size_and_offset(d: DataValue[TensorType], total_size: int = 0) \
-> Tuple[int, DataValue[int]]:
"""
Calculate memory size and offset of DataValue
:param d: data
:param total_size: memory size of all previous data blocks, if any
:return: memory size and offset value of the current data block
"""
if isinstance(d, TensorValue):
return tensor_byte_size(d.value), TensorValue(total_size)
elif isinstance(d, NodeAssociatedValue):
return tensor_byte_size(d.output.value), TensorValue(total_size)
else:
assert isinstance(d, TupleValue)
offset_list = list()
size = 0
for el in d.elements:
size += (-size) % 16
curr_size, curr_offset = _calculate_data_byte_size_and_offset(el, total_size + size)
offset_list.append(curr_offset)
size += curr_size
return size, TupleValue(offset_list)
def _calculate_data_byte_size(d: DataValue[TensorType]) -> int:
s, _ = _calculate_data_byte_size_and_offset(d)
return s
def _calculate_data_byte_offset(d: DataValue[TensorType]) -> DataValue[int]:
_, o = _calculate_data_byte_size_and_offset(d)
return o
[docs]
def data_byte_size(d: DataValue[TensorType]) -> int:
"""
Calculate data byte size
:param d: data
:return: number of bytes
"""
return _calculate_data_byte_size(d)
[docs]
def data_byte_offset(d: DataValue[TensorType]) -> DataValue[int]:
"""
Calculate data byte offset
:param d: data
:return: offset value
"""
return _calculate_data_byte_offset(d)
_SCALAR_TYPE_TO_DTYPE: Mapping[ScalarType, str] = {
ScalarType.int8: 'int8',
ScalarType.uint8: 'uint8',
ScalarType.int16: 'int16',
ScalarType.uint16: 'uint16',
ScalarType.int32: 'int32',
ScalarType.float32: 'float32',
ScalarType.float16: 'float16',
ScalarType.float64: 'float64',
ScalarType.bool: 'bool',
ScalarType.int64: 'int64',
ScalarType.uint64: 'uint64',
ScalarType.uint32: 'uint32',
ScalarType.bfloat16: 'bfloat16',
}
_DTYPE_TO_SCALAR_TYPE: Mapping[str, ScalarType] = {
'int8': ScalarType.int8,
'uint8': ScalarType.uint8,
'int16': ScalarType.int16,
'uint16': ScalarType.uint16,
'int32': ScalarType.int32,
'float32': ScalarType.float32,
'float16': ScalarType.float16,
'float64': ScalarType.float64,
'bool': ScalarType.bool,
'int64': ScalarType.int64,
'uint32': ScalarType.uint32,
'uint64': ScalarType.uint64,
'bfloat16': ScalarType.bfloat16,
}
[docs]
def scalar_type_from_dtype(dtype: str) -> ScalarType:
"""Make a ScalarType from a string name of a type used in TensorType."""
try:
return _DTYPE_TO_SCALAR_TYPE[dtype]
except KeyError:
raise ValueError(UNRECOGNIZED_TYPE.format(dtype))
[docs]
def scalar_type_to_dtype(scalar: ScalarType) -> str:
"""Make a string name from a ScalarType."""
try:
return _SCALAR_TYPE_TO_DTYPE[scalar]
except KeyError:
raise ValueError(UNRECOGNIZED_TYPE.format(scalar))
[docs]
def tensor_type_from_tvm_tensor_type(tensor_type: TVMTensorType) -> TensorType:
return TensorType(scalar=scalar_type_from_dtype(tensor_type.dtype),
shape=tensor_type.concrete_shape)