# Source code for afe.ir.serializer.visualizer

#########################################################
# Copyright (C) 2022 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Christopher Rodrigues
#########################################################
"""AwesomeNode to Netron conversion."""
import json
from dataclasses import dataclass
from typing import Dict, Any, Optional, Tuple, Union, List

import numpy as np

from afe.backends import BackendIR
from afe.common_utils import get_index_from_node_name
from afe.ir.attributes import AwesomeAttributes, AwesomeQuantAttrBase, ConstantAttrs, ConstantQuantAttrs
from afe.ir.defines import NodeName, Status, get_expected_tensor_value, data_value_elements
from afe.ir.net import AwesomeNet
from afe.ir.node import AwesomeNode, node_is_sima_ir, node_is_placeholder, node_is_external, node_is_constant, \
    node_is_awesomenet
from afe.ir.serializer.visualizer_defines import NNodeName, NArray, NLayerWeights, NInboundNode, \
    NLayer, NInterfaceLayer

# Attributes names

# Names of the node attributes that hold constant weight and bias arrays.
WEIGHT_ATTRS_NAME = 'weights_attrs'
BIAS_ATTRS_NAME = 'bias_attrs'

# Maps an attribute name to the suffix appended to a node's Netron name
# when naming the corresponding constant array.
netron_name_map = {
    WEIGHT_ATTRS_NAME: 'weights',
    BIAS_ATTRS_NAME: 'bias',
}
@dataclass(frozen=True)
class NetronOptions:
    """
    Options controlling how to produce a Netron graph from an AwesomeNet.

    The class currently declares no fields; it is frozen, so instances are
    immutable and can be shared safely (for example as a default argument).
    """
    pass
def _get_awesomenode_types_and_attrs_dict(node: AwesomeNode, netron_name: "NNodeName") -> Dict[str, Any]:
    """
    Collect the attributes, status, statistics, types and metadata of an AwesomeNode
    into a flat dictionary that is used as the node's configuration in Netron.

    :param node: AwesomeNode to describe. It must contain a SiMaIR.
    :param netron_name: Name used in Netron file for the node
    :return: Dictionary of displayed properties. Values that are themselves
        AwesomeAttributes or AwesomeQuantAttrBase objects are flattened one level
        into "<key>.<field>" entries.
    """
    # Only serialize AwesomeNode containing SiMaIR
    assert node_is_sima_ir(node), f"Expect node contains a SiMaIR, got {type(node.ir)} instead"

    node_info_dict: Dict[str, Any] = {}

    # AwesomeAttributes
    if node.ir.attrs is not None:
        node_info_dict.update(vars(node.ir.attrs))
    # Remove this field because the data may be very large, which causes a problem when loading from file
    node_info_dict.pop('irmod_str', None)

    # Node Status
    node_info_dict["_status"] = node.status

    # Layer Statistics
    # NOTE(review): the guard reads node.layer_stats while the value is taken from
    # node._layer_stats; presumably both refer to the same data -- confirm.
    if node.layer_stats is not None:
        node_info_dict["Layer Statistics"] = node._layer_stats

    # Show quant_attrs if the node is quantized
    if node.status == Status.SIMA_QUANTIZED and node.ir.quant_attrs is not None:
        node_info_dict.update(vars(node.ir.quant_attrs))

    # Input types
    # Only get input types for non-placeholder AwesomeNode
    node_type = node.get_type()
    is_placeholder = node_is_placeholder(node)
    if not is_placeholder:
        node_info_dict["_input_types"] = [data_value_elements(t) for t in node_type.inputs.values()]

    # Output types
    node_info_dict["_output_types"] = data_value_elements(node_type.output)

    # Metadata
    node_info_dict["_metadata"] = json.dumps(node.ir._metadata)

    config_dict: Dict[str, Any] = {}
    if is_placeholder:
        # Displays input shapes in Netron
        config_dict["batch_input_shape"] = list(get_expected_tensor_value(node_type.output).shape)

    for key, value in node_info_dict.items():
        if isinstance(value, (AwesomeAttributes, AwesomeQuantAttrBase)):
            # Flatten nested attribute objects into dotted keys
            for sub_key, sub_value in vars(value).items():
                config_dict[key + "." + sub_key] = sub_value
        else:
            config_dict[key] = value

    # Name used for identifying node in Netron
    config_dict["name"] = netron_name
    return config_dict


def _replace_node_name_with_operation_name(node: AwesomeNode) -> str:
    """
    Replace the Relay expression name in the node_name with the name of its
    corresponding AwesomeOperation class, without the trailing "Op".
    Keep the prefix and the index.

    Example
    -------
    MLA_1/conv2d_10 -> MLA_1/Conv2D_10
    where
        * prefix = 'MLA_1/'
        * index = 10
        * Relay expression name = 'conv2d'
        * AwesomeOperation name = 'Conv2DOp'

    :param node: Node whose name is rewritten. Its IR must have an ``operation`` attribute.
    :return: The display name built from prefix, operation name and index.
    """
    node_name = node.name
    # Get the prefix. The prefix is section before the last "/" (empty if there is no "/")
    prefix = node_name[:node_name.rfind('/') + 1]
    # Get the index number from the node name
    index = get_index_from_node_name(node_name)

    # Drop the "Op" suffix only when it is actually present, so that a class
    # name without the suffix is not truncated by two arbitrary characters.
    operation_name = type(node.ir.operation).__name__
    if operation_name.endswith("Op"):
        operation_name = operation_name[:-2]

    return f"{prefix}{operation_name}_{index}"


def _constant_array(attrs: Union[ConstantAttrs, ConstantQuantAttrs], node_name: NNodeName, const_name: str) -> NArray:
    """
    Create an NArray to represent a constant array parameter of a node.

    :param attrs: Constant array as an attributes object.
    :param node_name: Name of the node in the Netron file.
    :param const_name: Name of the node's attribute that contains the constant array.
        It must be a key of netron_name_map.
    :return: Array to use in Netron file.
    """
    # Quantized constants carry their array in quant_data, others in data
    data = attrs.quant_data if isinstance(attrs, ConstantQuantAttrs) else attrs.data
    assert isinstance(data, np.ndarray), "Error: Constant node data has not been initialized"

    netron_constant_name = node_name + "/" + netron_name_map[const_name]
    return NArray(str(data.dtype), list(data.shape), netron_constant_name)


def _node_graph_elements(options: NetronOptions, name_overrides: Dict[NodeName, str], node: AwesomeNode,
                         inputs: Dict[NodeName, Tuple[NNodeName, AwesomeNode]]) \
        -> Tuple[NNodeName, NLayer, Optional[NLayerWeights]]:
    """
    Format information about an AwesomeNode to put in the Netron format for display.

    :param options: Customization of Netron output
    :param name_overrides: Node names to display instead of the internal node name
    :param node: Node to produce graph elements from. Must not be a constant node.
    :param inputs: Node's inputs. For each input, the name used in Netron and the node are given.
    :return: Graph elements to include in output: the node's Netron name, its layer,
        and its constant arrays (None if the node has no constant arrays).
    """
    assert not node_is_constant(node)  # Constant nodes should not be inserted into graph

    if node_is_placeholder(node):
        # Placeholder nodes have an input that does not refer to a real node; ignore the input.
        inputs = {}
        class_name = "InputLayer"
    elif node_is_external(node):
        class_name = node.name
    elif node_is_sima_ir(node) or node_is_awesomenet(node):
        class_name = _replace_node_name_with_operation_name(node)
    else:
        assert isinstance(node.ir, BackendIR)
        class_name = "CompiledProcedure"

    node_name = NNodeName(name_overrides.get(node.name, node.name))

    # Constant nodes are excluded from display.
    # The index is the input's position among all inputs, including the excluded constants.
    inbound_nodes = [NInboundNode(input_name, position)
                     for position, (input_name, input_node) in enumerate(inputs.values())
                     if not node_is_constant(input_node)]

    has_sima_ir = node_is_sima_ir(node)
    attributes = _get_awesomenode_types_and_attrs_dict(node, node_name) if has_sima_ir else {}

    # Handle the AwesomeNode with conv2d/dense/conv2d_transpose related composite SiMaIR
    constant_arrays = []
    if has_sima_ir and node.ir.attrs is not None:
        node_attrs_dict = vars(node.ir.attrs)
        for attr_name in (WEIGHT_ATTRS_NAME, BIAS_ATTRS_NAME):
            attrs = node_attrs_dict.get(attr_name)
            if attrs is None:
                continue
            assert isinstance(attrs, (ConstantAttrs, ConstantQuantAttrs))
            constant_arrays.append(_constant_array(attrs, node_name, attr_name))

    layer = NLayer(class_name, node_name, inbound_nodes, attributes)
    layer_weights = NLayerWeights(constant_arrays) if constant_arrays else None
    return node_name, layer, layer_weights


class _NetronDictBuilder:
    """
    Holds layers and constants that are created for a Netron file.
    After all layers and constants are collected, they will be written to
    two different sections in the file.
    """

    def __init__(self):
        self._layers = []
        self._constants = []

    def add_layer(self, layer: NLayer, constants: Optional[NLayerWeights]) -> None:
        """
        Record a layer and, if given, its constant arrays.

        :param layer: Layer to append to the collected layers.
        :param constants: Constant arrays of the layer, or None if it has none.
        """
        self._layers.append(layer)
        if constants is not None:
            self._constants.append(constants)

    def finish(self) -> Tuple[List[NLayer], List[NLayerWeights]]:
        """
        :return: The collected layers and the collected constants, in insertion order.
        """
        return self._layers, self._constants


@dataclass
class _BuildGraphData:
    """
    Data that is global while processing the nodes in one model graph.
    """
    # Global netron output options
    options: NetronOptions
    # Names to display for nodes.
    # If name_overrides[nid] = name, the name of node nid will be displayed as name.
    name_overrides: Dict[NodeName, str]
    # Collects output data structures
    builder: _NetronDictBuilder


def _build_graph_node(build_data: _BuildGraphData, node: AwesomeNode,
                      inputs: Dict[NodeName, Tuple[NNodeName, AwesomeNode]]) \
        -> Tuple[NNodeName, AwesomeNode]:
    """
    Build Netron graph elements for one graph node.

    :param build_data: Global data for the processing of the model graph
    :param node: Node to transform into graph elements
    :param inputs: Properties of node's inputs for graph building
    :return: Properties of node's output for graph building
    """
    if isinstance(node.ir, AwesomeNet):
        # Embed the subgraph into the graph
        return _build_awesomenet_graph_nodes(build_data, node.ir, inputs)

    if node_is_constant(node):
        # Do not show constant nodes in the graph
        return NNodeName(node.name), node

    nnode_name, layer, layer_weights = _node_graph_elements(build_data.options, build_data.name_overrides,
                                                            node, inputs)
    build_data.builder.add_layer(layer, layer_weights)
    return nnode_name, node


def _build_awesomenet_graph_nodes(build_data: _BuildGraphData, net: AwesomeNet,
                                  net_inputs: Optional[Dict[NodeName, Tuple[NNodeName, AwesomeNode]]]) \
        -> Tuple[NNodeName, AwesomeNode]:
    """
    Build Netron graph elements from a network.

    :param build_data: Global data for the processing of the model graph
    :param net: Net to transform into graph elements. The net and any subnetworks
        must have had their topological order computed.
    :param net_inputs: Properties of the net's input for graph building. Its keys must
        equal net.input_node_names, in the same order.
    :return: Properties of the net's output for graph building
    """
    # Use the provided inputs in place of placeholder nodes.
    # NOTE(review): net_inputs is annotated Optional, but it is dereferenced
    # unconditionally here, so None is not actually accepted.
    assert list(net_inputs.keys()) == net.input_node_names
    node_values = net_inputs.copy()

    # Handle nodes other than placeholders
    for node_name in net.execution_order:
        if node_name in net.input_node_names:
            continue

        node = net.nodes[node_name]
        node_inputs = {param: node_values[arg]
                       for param, arg in zip(node.input_names, node.input_node_names)}
        node_values[node_name] = _build_graph_node(build_data, node, node_inputs)

    return node_values[net.output_node_name]
def create_netron_graph(net: AwesomeNet, model_name: str, options: NetronOptions = NetronOptions()) -> Any:
    """
    Produce a Netron-readable representation of a network. The return value
    is a JSON-representable Python object, which can be read in Netron as
    a Keras JSON file.

    :param net: Network to convert to JSON
    :param model_name: Name of the model
    :param options: Representation options
    :return: JSON-representable Python object with a "modelTopology" section holding
        the layers, input layers and output layers, and a "weightsManifest" section
        holding the constant arrays.
    """
    # If the network's output is labeled, use that label as the name.
    # Multiple labels are joined into a single "(a, b, ...)" name.
    name_overrides: Dict[NodeName, str] = {}
    if net.output_labels is not None:
        if len(net.output_labels) == 1:
            output_label = net.output_labels[0]
        else:
            output_label = "(" + ", ".join(net.output_labels) + ")"
        name_overrides[net.output_node_name] = output_label

    builder = _NetronDictBuilder()
    build_data = _BuildGraphData(options, name_overrides, builder)

    # Create placeholder nodes to represent the inputs
    input_layers = []
    node_values: Dict[NodeName, Tuple[NNodeName, AwesomeNode]] = {}
    for idx, node_name in enumerate(net.input_node_names):
        nnode_name, input_node = _build_graph_node(build_data, net.nodes[node_name], {})
        input_layers.append(NInterfaceLayer(nnode_name, idx))
        node_values[node_name] = (nnode_name, input_node)

    # Build the graph
    output_name, _ = _build_awesomenet_graph_nodes(build_data, net, node_values)
    layers, layer_weights = builder.finish()
    output_layers = [NInterfaceLayer(output_name, 0)]

    # Convert data structure to JSON
    return {
        "modelTopology": {
            "model_config": {
                "class_name": "Model",
                "config": {
                    "layers": [layer.to_json() for layer in layers],
                    "input_layers": [layer.to_json() for layer in input_layers],
                    "name": model_name,
                    "output_layers": [layer.to_json() for layer in output_layers]
                }
            }
        },
        "weightsManifest": [weights.to_json() for weights in layer_weights]
    }