#########################################################
# Copyright (C) 2022 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Christopher Rodrigues
#########################################################
"""AwesomeNode to Netron conversion."""
import json
from dataclasses import dataclass
from typing import Dict, Any, Optional, Tuple, Union, List
import numpy as np
from afe.backends import BackendIR
from afe.common_utils import get_index_from_node_name
from afe.ir.attributes import AwesomeAttributes, AwesomeQuantAttrBase, ConstantAttrs, ConstantQuantAttrs
from afe.ir.defines import NodeName, Status, get_expected_tensor_value, data_value_elements
from afe.ir.net import AwesomeNet
from afe.ir.node import AwesomeNode, node_is_sima_ir, node_is_placeholder, node_is_external, node_is_constant, \
node_is_awesomenet
from afe.ir.serializer.visualizer_defines import NNodeName, NArray, NLayerWeights, NInboundNode, \
NLayer, NInterfaceLayer
# Attribute names
# Name of the node attribute that holds a layer's weights constant
WEIGHT_ATTRS_NAME = 'weights_attrs'
# Name of the node attribute that holds a layer's bias constant
BIAS_ATTRS_NAME = 'bias_attrs'

# Maps the name of a constant-holding attribute to the suffix that is appended
# to the node name to form the constant's name in the Netron file.
netron_name_map = {
    WEIGHT_ATTRS_NAME: 'weights',
    BIAS_ATTRS_NAME: 'bias',
}
@dataclass(frozen=True)
class NetronOptions:
    """
    Options controlling how to produce a Netron graph from an AwesomeNet.

    The class currently declares no fields; it is an immutable placeholder
    that is passed through the graph-building functions.
    """
def _get_awesomenode_types_and_attrs_dict(node: AwesomeNode,
                                          netron_name: "NNodeName"
                                          ) -> Dict[str, Any]:
    """
    Collect the attributes, status, layer statistics, types and metadata of an
    AwesomeNode into the configuration dictionary of its Netron layer.

    Values that are AwesomeAttributes or AwesomeQuantAttrBase objects are flattened
    one level: each of their fields is stored under the key "<attribute>.<field>".

    :param node: AwesomeNode. It must contain a SiMaIR.
    :param netron_name: Name used in Netron file for the node
    :return: Dictionary of the node's properties. The Netron name is stored under "name".
    """
    # Only serialize AwesomeNode containing SiMaIR
    assert node_is_sima_ir(node), f"Expect node contains a SiMaIR, got {type(node.ir)} instead"

    ir = node.ir
    collected: Dict[str, Any] = {}

    # AwesomeAttributes
    if ir.attrs is not None:
        collected.update(ir.attrs.__dict__)

    # Drop this field because the data may be very large, which causes a problem when loading from file
    collected.pop('irmod_str', None)

    # Node status
    collected["_status"] = node.status

    # Layer statistics
    if node.layer_stats is not None:
        collected["Layer Statistics"] = node._layer_stats

    # Quantization attributes are shown only for a quantized node
    if node.status == Status.SIMA_QUANTIZED and ir.quant_attrs is not None:
        collected.update(ir.quant_attrs.__dict__)

    node_type = node.get_type()
    is_placeholder = node_is_placeholder(node)

    # Input types are reported only for non-placeholder nodes
    if not is_placeholder:
        collected["_input_types"] = [data_value_elements(t) for t in node_type.inputs.values()]

    # Output types
    collected["_output_types"] = data_value_elements(node_type.output)

    # Metadata
    collected["_metadata"] = json.dumps(ir._metadata)

    config: Dict[str, Any] = {}
    if is_placeholder:
        # Displays input shapes in Netron
        config["batch_input_shape"] = list(get_expected_tensor_value(node_type.output).shape)

    for key, value in collected.items():
        if not isinstance(value, (AwesomeAttributes, AwesomeQuantAttrBase)):
            config[key] = value
            continue
        # Flatten a nested attributes object into "<key>.<field>" entries
        for field_name, field_value in value.__dict__.items():
            config[key + "." + field_name] = field_value

    # Name used for identifying node in Netron
    config["name"] = netron_name
    return config
def _replace_node_name_with_operation_name(node: AwesomeNode) -> str:
    """
    Build a display name for a node from the class name of its operation.

    The prefix of the node name (everything up to and including the last "/")
    and the index obtained from the node name are kept. The text in between is
    replaced by the class name of node.ir.operation with its last two characters
    removed (presumably the "Op" suffix of AwesomeOperation class names).

    Example
    -------
    Assuming the node's operation class is named Conv2DOp:
    MLA_1/conv2d_10 -> MLA_1/Conv2D_10 where
      * prefix = 'MLA_1/'
      * index = 10
      * Relay expression name = 'conv2d'
      * AwesomeOperation class name = 'Conv2DOp'

    :param node: Node whose name is rewritten. The node itself is not modified.
    :return: The new name.
    """
    full_name = node.name

    # Get the index number from the node name
    index = get_index_from_node_name(full_name)
    operation_name = type(node.ir.operation).__name__[:-2]

    # rpartition gives ('', '', full_name) when there is no "/", so head + separator
    # is exactly the prefix, or the empty string when the name has no prefix.
    head, separator, _ = full_name.rpartition('/')
    return head + separator + operation_name + f"_{index}"
def _constant_array(attrs: Union[ConstantAttrs, ConstantQuantAttrs], node_name: NNodeName, const_name: str) -> NArray:
    """
    Describe a constant array parameter of a node as an NArray.

    The array is read from attrs.quant_data when attrs is a ConstantQuantAttrs and
    from attrs.data otherwise. Only its dtype and shape are recorded, together with
    the name "<node_name>/<suffix>", where the suffix is looked up in netron_name_map.

    :param attrs: Constant array as an attributes object.
    :param node_name: Name of the node in the Netron file.
    :param const_name: Name of the node's attribute that contains the constant array.
        It must be a key of netron_name_map.
    :return: Array to use in Netron file.
    """
    if isinstance(attrs, ConstantQuantAttrs):
        array = attrs.quant_data
    else:
        array = attrs.data
    assert isinstance(array, np.ndarray), "Error: Constant node data has not been initialized"

    array_name = node_name + "/" + netron_name_map[const_name]
    dtype_name = str(array.dtype)
    dimensions = list(array.shape)
    return NArray(dtype_name, dimensions, array_name)
def _node_graph_elements(options: NetronOptions, name_overrides: Dict[NodeName, str],
                         node: AwesomeNode, inputs: Dict[NodeName, Tuple[NNodeName, AwesomeNode]]) \
        -> Tuple[NNodeName, NLayer, NLayerWeights]:
    """
    Format information about an AwesomeNode to put in the Netron format for display.

    :param options: Customization of Netron output. It is not read by this function.
    :param name_overrides: Node names to display instead of the internal node name
    :param node: Node to produce graph elements from. It must not be a constant node.
    :param inputs: Node's inputs. For each input, the name used in Netron and the node are given.
    :return: The node's Netron name, its layer, and its layer weights. The layer weights
        are None when the node has no weights or bias constant.
    """
    assert not node_is_constant(node)  # Constant nodes should not be inserted into graph

    is_sima_ir = node_is_sima_ir(node)

    # Choose the class name that is displayed for the layer
    if node_is_placeholder(node):
        # Placeholder nodes have an input that does not refer to a real node; ignore the input.
        inputs = {}
        class_name = "InputLayer"
    elif node_is_external(node):
        class_name = node.name
    elif is_sima_ir or node_is_awesomenet(node):
        class_name = _replace_node_name_with_operation_name(node)
    else:
        assert isinstance(node.ir, BackendIR)
        class_name = "CompiledProcedure"

    # Use the overriding display name if one was given for this node
    node_name = NNodeName(name_overrides.get(node.name, node.name))

    # Constant nodes are excluded from display. The recorded position still counts
    # every input, including the constant ones that are skipped.
    inbound_nodes = []
    for position, (input_netron_name, input_node) in enumerate(inputs.values()):
        if node_is_constant(input_node):
            continue
        inbound_nodes.append(NInboundNode(input_netron_name, position))

    attributes = _get_awesomenode_types_and_attrs_dict(node, node_name) if is_sima_ir else {}

    # Handle the AwesomeNode with conv2d/dense/conv2d_transpose related composite SiMaIR
    constant_arrays = []
    if is_sima_ir and node.ir.attrs is not None:
        attrs_by_name = node.ir.attrs.__dict__
        for attr_name in (WEIGHT_ATTRS_NAME, BIAS_ATTRS_NAME):
            constant_attrs = attrs_by_name.get(attr_name)
            if constant_attrs is None:
                continue
            assert isinstance(constant_attrs, (ConstantAttrs, ConstantQuantAttrs))
            constant_arrays.append(_constant_array(constant_attrs, node_name, attr_name))

    layer = NLayer(class_name, node_name, inbound_nodes, attributes)
    layer_weights = NLayerWeights(constant_arrays) if constant_arrays else None
    return node_name, layer, layer_weights
class _NetronDictBuilder:
"""
Holds layers and constants that are created for a Netron file.
After all layers and constants are collected, they will be written to two different sections in the file.
"""
def __init__(self):
self._layers = []
self._constants = []
def add_layer(self, layer: NLayer, constants: Optional[NLayerWeights]) -> None:
self._layers.append(layer)
if constants is not None:
self._constants.append(constants)
def finish(self) -> Tuple[List[NLayer], List[NLayerWeights]]:
return self._layers, self._constants
@dataclass
class _BuildGraphData:
    """
    State shared by all nodes while one model graph is processed.

    Fields:
      options: Global Netron output options.
      name_overrides: Names to display for nodes. When a node's name is a key of this
        dictionary, the mapped string is displayed instead of the node's own name.
      builder: Collects the output data structures.
    """
    options: NetronOptions
    name_overrides: Dict[NodeName, str]
    builder: _NetronDictBuilder
def _build_graph_node(build_data: _BuildGraphData,
                      node: AwesomeNode, inputs: Dict[NodeName, Tuple[NNodeName, AwesomeNode]]) \
        -> Tuple[NNodeName, AwesomeNode]:
    """
    Build Netron graph elements for one graph node.

    A node containing an AwesomeNet is expanded into the elements of that subgraph.
    A constant node produces no graph element. Any other node is turned into one
    layer, which is added to build_data.builder.

    :param build_data: Global data for the processing of the model graph
    :param node: Node to transform into graph elements
    :param inputs: Properties of node's inputs for graph building
    :return: Properties of node's output for graph building
    """
    ir = node.ir
    if isinstance(ir, AwesomeNet):
        # Embed the subgraph into the graph
        return _build_awesomenet_graph_nodes(build_data, ir, inputs)

    if node_is_constant(node):
        # Do not show constant nodes in the graph
        return NNodeName(node.name), node

    netron_name, layer, weights = _node_graph_elements(
        build_data.options, build_data.name_overrides, node, inputs)
    build_data.builder.add_layer(layer, weights)
    return netron_name, node
def _build_awesomenet_graph_nodes(build_data: _BuildGraphData, net: AwesomeNet,
                                  net_inputs: Optional[Dict[NodeName, Tuple[NNodeName, AwesomeNode]]]) \
        -> Tuple[NNodeName, AwesomeNode]:
    """
    Build Netron graph elements from a network.

    The net's nodes are visited in net.execution_order. The net's input nodes are not
    processed; the values given in net_inputs stand in for them.

    :param build_data: Global data for the processing of the model graph
    :param net: Net to transform into graph elements. The net and any subnetworks must have
        had their topological order computed.
    :param net_inputs: Properties of the net's input for graph building. Its keys must be
        the net's input node names, in the same order. None is treated as no inputs.
        The dictionary is not modified.
    :return: Properties of the net's output for graph building
    """
    # The parameter is declared Optional; treat None as an empty set of inputs
    # instead of failing on the attribute access below.
    if net_inputs is None:
        net_inputs = {}

    # Use the provided inputs in place of placeholder nodes
    assert list(net_inputs.keys()) == net.input_node_names
    node_values = dict(net_inputs)

    # Build the lookup set once; the names are hashable because they are the keys of net_inputs.
    placeholder_names = set(net.input_node_names)

    # Handle nodes other than placeholders
    for node_name in net.execution_order:
        if node_name in placeholder_names:
            continue
        node = net.nodes[node_name]
        # Bind each parameter of the node to the value that was built for its argument node
        node_inputs = {param: node_values[arg] for param, arg in zip(node.input_names, node.input_node_names)}
        node_values[node_name] = _build_graph_node(build_data, node, node_inputs)

    return node_values[net.output_node_name]
def create_netron_graph(net: AwesomeNet, model_name: str, options: NetronOptions = NetronOptions()) -> Any:
    """
    Produce a Netron-readable representation of a network. The return value is a
    JSON-representable Python object, which can be read in Netron as a Keras JSON file.

    :param net: Network to convert to JSON
    :param model_name: Name of the model
    :param options: Representation options
    :return: JSON-representable Python object
    """
    # If the network's output is labeled, use that label as the name.
    # Several labels are combined into one parenthesized, comma-separated name.
    name_overrides: Dict[NodeName, str] = {}
    if net.output_labels is not None:
        if len(net.output_labels) == 1:
            output_label = net.output_labels[0]
        else:
            output_label = "(" + ", ".join(net.output_labels) + ")"
        name_overrides[net.output_node_name] = output_label

    builder = _NetronDictBuilder()
    build_data = _BuildGraphData(options, name_overrides, builder)

    # Create placeholder nodes to represent the inputs
    input_layers = []
    node_values: Dict[NodeName, Tuple[NNodeName, AwesomeNode]] = {}
    for idx, node_name in enumerate(net.input_node_names):
        node = net.nodes[node_name]
        nnode_name, node = _build_graph_node(build_data, node, {})
        input_layers.append(NInterfaceLayer(nnode_name, idx))
        node_values[node_name] = (nnode_name, node)

    # Build the graph
    output_name, _ = _build_awesomenet_graph_nodes(build_data, net, node_values)

    layers, layer_weights = builder.finish()
    output_layers = [NInterfaceLayer(output_name, 0)]

    # Convert data structure to JSON
    return {
        "modelTopology": {
            "model_config": {
                "class_name": "Model",
                "config": {
                    "layers": [layer.to_json() for layer in layers],
                    "input_layers": [layer.to_json() for layer in input_layers],
                    "name": model_name,
                    "output_layers": [layer.to_json() for layer in output_layers]
                }
            }
        },
        "weightsManifest": [weights.to_json() for weights in layer_weights]
    }