Source code for afe.ir.net

#########################################################
# Copyright (C) 2020 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
# Code owner: Joey Chou
#########################################################
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Callable, Iterator, Union
import pathvalidate

import numpy as np
from afe.apis.defines import gen1_target

from sima_utils.logging import sima_logger
from afe.backends import BackendIR, Backend
from afe.ir.attributes import get_data_value_quant_result_scale_with_dummy
from afe.ir.defines import Status, NodeName, InputName, data_value_elements
from afe.ir.node import AwesomeNode, node_is_awesomenet, node_is_external, get_node_inputs, node_is_placeholder
from afe.ir.quantization_utils import dequantize_tensor, quantization_data_value_to_output_list
from afe.ir.sima_ir import SiMaIR, SiMaIRTensorTypes
from afe.ir.tensor_type import scalar_is_integral
from afe.ir.utils import afe_warn, get_input_dict_batch_size
from sima_utils.common import Platform
[docs] NodeCallable = Callable[[AwesomeNode, dict[InputName, Any], dict[NodeName, SiMaIRTensorTypes]], None]
[docs] class AwesomeNet: """ This class is in charge of managing and executing a network contained which consists of a collection of AwesomeNodes """ def __init__(self, name: str, nodes: dict[NodeName, AwesomeNode], input_node_names: list[NodeName], output_node_name: NodeName, _status: Status = Status.RELAY, _execution_order: list[NodeName] | None = None, _prune_dict: dict[NodeName, list[NodeName]] | None = None, _float_node_list: list[NodeName] | None = None, _is_subgraph: bool = False, _backend: Backend = Backend.NONE, _target: Platform = gen1_target, _output_labels: list[str] | None = None, _model_path: str | None = None, _fp_input_range: dict[NodeName, list[float]] | None = None, ) -> None: """ Args: nodes: Dictionary of node names to the AwesomeNodes that make up the network input_node_names: List of name of the network's input nodes output_node_name: Name of the network's output node _status: Default is Status.RELAY. The status of the AwesomeNet _execution_order: The order the network nodes should be executed. Nodes sorted topologically. _prune_dict: Dictionary of consumer node names to lists of producer node names. When we execute the network up to a consumer node then the producer nodes linked to this node can have their data deleted. _float_node_list: List of node names. The listed nodes are fixed in float mode. The nodes will still be calibrated so the downstream nodes can have its zero point and scale. But it won't get quantized. The list can only contain names of nodes in self.nodes, not in subgraphs. _is_subgraph: bool. Default is False. It True, it menas the AwesomeNet is a sub-graph of another AwesomeNet _backend: If this net is a subgraph that is assigned to a backend, this is the backend. Otherwise, this is Backend.NONE. Only the top-level net assigned to a backend uses this field; any nodes nested within it use Backend.NONE. _target: A target platform that a model is compiled for. _output_labels: The names of the network's outputs in the original model. These are used when displaying the model to users. _model_path: Original model path. _fp_input_range: Input range of floating point tensors. """ # Check the model name because it may be used to make file names try: pathvalidate.validate_filename(name) except pathvalidate.ValidationError: raise sima_logger.UserFacingException("Model name {} is not usable as a filename.".format(repr(name)))
[docs] self.name = name
[docs] self.nodes = nodes
[docs] self.input_node_names = input_node_names
[docs] self.output_node_name = output_node_name
self._output_labels = _output_labels self._model_path = _model_path self._status = _status self._execution_order = _execution_order if _execution_order is not None else [] self._prune_dict = _prune_dict if _prune_dict is not None else {} # The node will not be quantized if the node name is in the list. The node # will still be calibrated to have zero point and scale for the downstream nodes. # But the node will only execute in float mode. self._float_node_list = _float_node_list if _float_node_list is not None else [] # If True, it menas the AwesomeNet is a sub-graph of another AwesomeNet self._is_subgraph = _is_subgraph self._backend = _backend self._batch_size = 1 self._target = _target self._fp_input_range = _fp_input_range # Do topological sort to have a correct node execution order self.topological_sort() def _prune_unneeded_activations_from_memory(self, node_outputs_dict: dict[NodeName, Any], consumer_name: NodeName, protected_names: list[NodeName] = None) -> None: """ Prunes producer node data from the data dict that will not be accessed after executing the consumer node. Args: node_outputs_dict: dictionary of node names to node output consumer_name: name of consumer that has been executed protected_names: list of node names who's data should not be deleted """ protected_names = [] if protected_names is None else protected_names if consumer_name in self._prune_dict: for producer_name in self._prune_dict[consumer_name]: if producer_name not in protected_names: del node_outputs_dict[producer_name] @property
[docs] def status(self) -> Status: return self._status
@property
[docs] def backend(self) -> Backend: return self._backend
@status.setter def status(self, _: Status): afe_warn("User is not allowed to change the AwesomeNet status. " "If you have to change the status, please use " "afe.ir.net.update_awesomenet_status function") @property
[docs] def sub_graph_names(self) -> list[NodeName]: """ Return list of sub-graph names :return: List[NodeName] """ _sub_graph_names: list[NodeName] = [] for node_name, node in self.nodes.items(): if node_is_awesomenet(node) or node_is_external(node): _sub_graph_names.append(node_name) return _sub_graph_names
[docs] def get_sub_graph(self, sub_graph_name: NodeName | str) -> Union["AwesomeNet", SiMaIR, None]: """ Given a sub-graph name, return the sub-graph object if the sub-graph name is in the AwesomeNet and the node is a sub-graph. Else return None Args: sub_graph_name: str Returns: Optional[Union[AwesomeNet, SiMaIR]] """ sub_graph_name = NodeName(sub_graph_name) if sub_graph_name not in self.nodes.keys(): afe_warn(f"Can't find '{sub_graph_name}' sub-graph in the AwesomeNet") elif sub_graph_name not in self.sub_graph_names: afe_warn(f"'{sub_graph_name}' is not a sub-graph") else: return self.nodes[sub_graph_name].ir return None
[docs] def topological_sort(self) -> None: """ self._execution_order: Sort the nodes topologically and store the order of NodeName in self._execution_order Set the self._prune_dict with new topologically sorted nodes at the end self._prune_dict: Fills out self._prune_dict which is a dictonary names of consumer nodes to lists of producer node names. The consumer node is the last node to access the producer nodes. We find these producers and consumers by mimicking network execution order, performing a depth first search on the network starting from the network's output node. """ topologically_sorted_nodes: list[NodeName] = list() # A map that links nodes that producer data to the last consumer node that uses their data during inference. producers_to_consumers: dict[NodeName, NodeName] = dict() visited_nodes: list[NodeName] = list() nodes_to_visit: list[NodeName] = list() nodes_to_visit.append(self.output_node_name) while len(nodes_to_visit) != 0: consumer = self.nodes[nodes_to_visit[-1]] visited_nodes.append(consumer.name) unvisited_producers = [producer for producer in consumer.input_node_names if producer not in visited_nodes] if len(unvisited_producers) == 0: topologically_sorted_nodes.append(consumer.name) nodes_to_visit.pop() for producer in consumer.input_node_names: producers_to_consumers[producer] = consumer.name else: nodes_to_visit.append(unvisited_producers[0]) prune_dict: dict[NodeName, list[NodeName]] = {} for producer, consumer in producers_to_consumers.items(): if consumer in prune_dict: prune_dict[consumer].append(producer) else: prune_dict[consumer] = [producer] self._execution_order = topologically_sorted_nodes.copy() self._prune_dict = prune_dict
[docs] def dequantize_outputs(self, outputs: list[np.ndarray]) -> list[np.ndarray]: # Dequantize the output when the output node are quantized output_node = self.nodes[self.output_node_name] if isinstance(output_node.ir, AwesomeNet): output_node = output_node.ir.nodes[output_node.ir.output_node_name] if isinstance(output_node.ir, SiMaIR): if output_node.status == Status.SIMA_QUANTIZED and _has_integer_output_type(output_node.ir): output_quantization = get_data_value_quant_result_scale_with_dummy(output_node.ir.calib_attrs.quant) scales, zps, _, _, _ = quantization_data_value_to_output_list(output_quantization) return dequantize_tensor(outputs, scales, zps) elif isinstance(output_node.ir, BackendIR): # This should never be needed. Before BackendIR is created, Dequantize # nodes are created wherever dequantization is required. raise RuntimeError("Cannot dequantize the output of BackendIR") return outputs
[docs] def run(self, inputs: dict[NodeName, np.ndarray], node_callable: NodeCallable | None = None, node_outputs: dict[NodeName, Any] | None = None, keep_intermediate_results: bool = False) -> list[Any]: """ Runs the entire network after inserting the network inputs into placeholder nodes. Execution of the network happens in the order that is preserved in _execution_order attribute. Node is executed using node_callable function that user needs to provide. After a node executes, it's output is inserted into internal_node_outputs dictionary so if another node depends on it, it can just reference the data in the dictionary. If data is no longer needed for execution of succeeding nodes, or does not need to be saved, it is pruned from the internal_node_outputs dictionary. Args: inputs: Dictionary of placeholder node names (str) to the input data. node_callable: Function used to execute each node of the AwesomeNet. node_outputs: An optional dictionary used for storing the outputs obtained by AwesomeNet's node execution. Needs to be provided if keep_intermediate_results parameter is True. keep_intermediate_results: Whether to store intermediate results obtained by AwesomeNet's node execution. If set to True, user needs to provide the node_outputs dictionary as an argument. Returns: The result of executing the output node. If requested, may return additional intermediate results inside node_outputs dictionary. """ if not self._is_subgraph: inputs = deepcopy(inputs) if keep_intermediate_results: assert isinstance(node_outputs, dict), \ ("Please assign a dictionary to the node_output input argument if keep_intermediate_results is True" f" Got {type(node_outputs)}") internal_node_outputs: dict[NodeName, Any] = dict(inputs) # Run through the nodes in the topological order determined by the graph data manager for node_name in self._execution_order: node = self.nodes[node_name] node_inputs = get_node_inputs(node, internal_node_outputs) if node_is_awesomenet(node): # Execute the sub-graph AwesomeNet and store the result in the shared dictionary sub_net = node.ir assert isinstance(sub_net, AwesomeNet) internal_node_outputs[node_name] = sub_net.run(node_inputs, node_callable=node_callable, node_outputs=node_outputs, keep_intermediate_results=keep_intermediate_results) else: # Execute the node and store the result in the shared dictionary assert node_callable is not None node_callable(node, node_inputs, internal_node_outputs) # Remove data in the internaly computed dictionary that is no longer needed. Data are not pruned iff all # intermediate results are kept. Otherwise, all data except model outputs and intermediate results that # need to be dumped, are pruned. if keep_intermediate_results: protected_names = [self.output_node_name] + list(node_outputs.keys()) self._prune_unneeded_activations_from_memory( internal_node_outputs, node_name, protected_names=protected_names) else: self._prune_unneeded_activations_from_memory( internal_node_outputs, node_name, protected_names=[self.output_node_name]) # Get the final outputs outputs = internal_node_outputs[self.output_node_name] # Iff keep_all_outputs is False, some of node_outputs are already populated. Otherwise, update them from # internal_node_outputs dictionary. if keep_intermediate_results: node_outputs.update({name: internal_node_outputs[name] for name in internal_node_outputs \ if name in node_outputs}) # Check if some of the requested nodes are not computed if self._is_subgraph and node_outputs is not None and \ any(layer_output is None for layer_output in node_outputs.values()): sima_logger.sima_log_warning("Some of provided names are not valid quantized model layer output names") if not self._is_subgraph: # Make sure the outputs is a list of np.ndarray outputs = list(outputs) if isinstance(outputs, tuple) else [outputs] return outputs
[docs] def run_batch(self, inputs: dict[NodeName, np.ndarray], node_callable: NodeCallable | None = None, node_outputs: dict[NodeName, Any] | None = None, keep_intermediate_results: bool = False) -> list[Any]: """ Runs the entire network taking into account the batch size of the inputs. Inputs' batch size is obtained from the inputs dictionary, asserting that all inputs have matching batch size. Since AwesomeNet obtained by translation from any framework uses batch size equal to 1, network is copied and its copy is modified in order to take into account the batch size of the inputs. Execution of the modified network is performed using standard run() method. Args: inputs: Dictionary of placeholder node names (str) to the input data. node_callable: A function used to execute nodes of the network. node_outputs: An optional dictionary used for storing the outputs obtained by AwesomeNet's node execution. Needs to be provided if keep_intermediate_results parameter is set to True. keep_intermediate_results: Whether to store results obtained by AwesomeNet's node execution. If set to True, user needs to provide the node_outputs dictionary as an argument. Returns: The result of executing the output node. """ batch_net = deepcopy(self) batch_size = get_input_dict_batch_size(inputs) if (batch_size != batch_net.get_batch_size()): batch_net.set_batch_size(batch_size) return batch_net.run(inputs, node_callable, node_outputs=node_outputs, keep_intermediate_results=keep_intermediate_results)
[docs] def set_batch_size(self, batch_size: int): """ Modifies AwesomeNet's internal parameters to accommodate for a given batch size. When creating an AwesomeNet, it's input's batch size should be equal to 1 in order to enable supporting different batch sizes while executing and compiling a model. The process of setting a batch size to a value different from 1 is irreversible, so user needs to preserve original AwesomeNet in order to execute APIs not supporting the variable batch size (i.e. calibration / quantization). Args: batch_size: Integer value representing the batch size of the inputs to the AwesomeNet. """ if batch_size == self._batch_size: return for node_name in self.execution_order: node = self.nodes[node_name] node.set_batch_size(batch_size) self._batch_size = batch_size
[docs] def get_batch_size(self) -> int: """ Returns batch size currently set """ return self._batch_size
@property
[docs] def execution_order(self): return self._execution_order
@property
[docs] def float_node_list(self): return self._float_node_list
@float_node_list.setter def float_node_list(self, node_list: list[str]): self._float_node_list = node_list
[docs] def extend_float_node_list(self, node_list: list[str]): self._float_node_list.extend(node_list)
@property
[docs] def fp_input_range(self): return self._fp_input_range
@fp_input_range.setter def fp_input_range(self, input_range: dict[NodeName, list[float]]): self._fp_input_range = input_range @property
[docs] def output_labels(self): return self._output_labels
@property
[docs] def model_path(self): return self._model_path
[docs] def has_mla_nodes(self): """ Check if net has MLA nodes. """ for node in self.nodes.values(): if node.ir.backend == Backend.MLA: return True return False
@property
[docs] def target(self): return self._target
[docs] def iter_nodes_recursive(self) -> Iterator[AwesomeNode]: """ Get an iterator over all nodes in the net, including nodes in subgraphs. Callers should not rely on the order of the iterator's contents. """ for node in self.nodes.values(): yield node if node_is_awesomenet(node): yield from node.ir.iter_nodes_recursive()
def _has_integer_output_type(ir: SiMaIR) -> bool: """ Return true if all the SiMaIR's output types are integer. This is used to detect quantized nodes. It is only reliable for networks that were quantized by AFE. Args: ir: Node to check Returns: Whether its output types are all integer """ return all(scalar_is_integral(t.scalar) for t in data_value_elements(ir.get_type().output)) @dataclass
[docs] class Renaming: """ A renaming on graph nodes. Each entry self.replacements[a] == b means that this renaming replaces references to node name a by references to node name b. Nodes that are not in self.replacements.keys() are not renamed. """
[docs] replacements: dict[NodeName, NodeName]
[docs] def rename_node_name(r: Renaming, n: NodeName) -> NodeName: """Rename a single node reference.""" return r.replacements.get(n, n)
[docs] def rename_mut_node(r: Renaming, node: AwesomeNode) -> None: """ Rename all node references inside an AwesomeNode. The AwesomeNode is mutated. Subgraphs are not examined. Subgraphs, if they are present, comprise a different scope where the renaming is not applicable. """ node.input_node_names = [rename_node_name(r, n) for n in node.input_node_names]
[docs] def rename_mut_awesomenet(r: Renaming, net: AwesomeNet) -> None: """ Rename all node references in the body of the AwesomeNet. Node definitions are not renamed. The AwesomeNet is mutated. """ for node in net.nodes.values(): rename_mut_node(r, node) net.input_node_names = [rename_node_name(r, n) for n in net.input_node_names] net.output_node_name = rename_node_name(r, net.output_node_name)
[docs] def rename_awesomenet_nodes(r: Renaming, net: AwesomeNet): """ Rename all nodes inside the AwesomeNet and their references. Used when composing multiple AwesomeNets in order to avoid duplicate names. """ rename_mut_awesomenet(r, net) new_nodes = {rename_node_name(r, n): v for (n, v) in net.nodes.items()} for node in new_nodes.values(): node.name = rename_node_name(r, node.name) net.nodes = new_nodes
[docs] def update_awesomenet_status(net: AwesomeNet, status: Status, force_update_status: bool = False) -> None: """ Update the AwesomeNet's and its sub-graphs' status to the given input status recursively. Args: net: AwesomeNet status: afe.ir.defines.Status """ if status == Status.SIMA_QUANTIZED: assert net.status == Status.CALIBRATED, \ (f"Please make sure you calibrate the AwesomeNet before updating the status to {status}." f" The current AwesomeNet status == {net.status}") if status == Status.BACKEND_IR_LOWERED and net.has_mla_nodes() and not force_update_status: assert net.status == Status.SIMA_QUANTIZED, \ (f"Please make sure AwesomeNet is quantized before updating the status to {status}." f" The current AwesomeNet status == {net.status}") if status == Status.BACKEND_IR_COMPILED: assert net.status == Status.BACKEND_IR_LOWERED, \ (f"Please make sure AwesomeNet is lowered to BackendIR using " f"translate_sub_awesome_net_to_modelgraph function before updating the status to {status}." f" The current AwesomeNet status == {net.status}") # Update sub-graphs' status for node in net.nodes.values(): if node_is_awesomenet(node): update_awesomenet_status(node.ir, status, force_update_status) # Update top-level graph status net._status = status
[docs] def is_one_mla_segment_net(net: AwesomeNet) -> bool: """ Given an AwesomeNet, traverse the nodes and count the number of sub-graphs, also check if some node can be translated to MLA backend IR. Args: net: AwesomeNet Returns: True if AwesomeNet is contained of one segment only and this segment is assigned to MLA. """ num_of_mla_sub_graph = 0 has_mla_node: bool = False for node in net.nodes.values(): if node_is_awesomenet(node): num_of_mla_sub_graph += 1 if node.ir.backend == Backend.MLA: has_mla_node = True return (num_of_mla_sub_graph == 0) and has_mla_node
def _redirect_subgraph_input_nodes(subgraph_node: AwesomeNode): """ Modifies the subgraph to refer to input nodes in the parent graph instead of the subgraph's placeholders. Args: subgraph_node: The subgraph node that is being inlined into the parent AwesomeNet. Returns: None. The subgraph's AwesomeNet is mutated and is in invalid state on return since it may reference nodes outside of the subgraph. """ assert isinstance(subgraph_node.ir, AwesomeNet) # Each parameter will be renamed to the corresponding argument renaming = Renaming(dict(zip(subgraph_node.input_names, subgraph_node.input_node_names))) rename_mut_awesomenet(renaming, subgraph_node.ir) def _redirect_subgraph_output_node(subgraph_node: AwesomeNode, parent_net: AwesomeNet): """ Modifies the parent net to refer to the result of the final node in the subgraph instead of referring to the result of the subgraph itself. Args: subgraph_node: The subgraph node that is being inlined into the parent AwesomeNet. parent_net: The parent AwesomeNet. Returns: None. The subgraph's and the parent AwesomeNet are being mutated and are in invalid state on return since they may reference each other's nodes. """ # In parent_net, uses of subgraph_node will be replaced with uses of subgraph_node's output node renaming = Renaming({subgraph_node.name: subgraph_node.ir.output_node_name}) rename_mut_awesomenet(renaming, parent_net) def _merge_subgraph_nodes_to_parent_net(subgraph_node: AwesomeNode, parent_net: AwesomeNet): """ Merges the AwesomeNodes from subgraph_node AwesomeNet into the parent AwesomeNet. This function should be called after the nodes in the parent net and subgraph are rewritten to directly refer to one another instead of going through the subgraph's placeholders and output. Args: subgraph_node: The subgraph AwesomeNode being inlined into the parent AwesomeNet. parent_net: The top level AwesomeNet. Returns: None. The parent AwesomeNet is mutated to include all the nodes from the subgraph_node's AwesomeNet and the subgraph_node is removed from the parent_net. The parent_net is in invalid state after return since it's execution_order is invalid. Returning AwesomeNet needs to be sorted topologically in order to reach the valid state. """ parent_net.nodes.pop(subgraph_node.name) # Update the nodes dict with non-placeholder nodes belonging to subgraph_node parent_net.nodes.update(dict(filter(lambda n: not node_is_placeholder(n[1]), subgraph_node.ir.nodes.items()))) def _inline_subgraph_node_into_parent_awesomenet(subgraph_node: AwesomeNode, parent_net: AwesomeNet): """ Inlines all the nodes that are encapsulated in a subgraph AwesomeNode into a parent AwesomeNet. Inlining process is divided into three stages, each of which is mutating subgraph AwesomeNet and parent AwesomeNet. The intermediate resulting AwesomeNet is in invalid state, due to remodeling of the connections between the consisting AwesomeNodes. Args: subgraph_node: The subgraph AwesomeNode which consisting AwesomeNodes are being inlined into the top level AwesomeNet. parent_net: Top level AwesomeNet. Returns: None. The input top-level AwesomeNet is mutated so that subgraph_node's AwesomeNodes are placed into it and subgraph_node is removed. """ _redirect_subgraph_input_nodes(subgraph_node) _redirect_subgraph_output_node(subgraph_node, parent_net) _merge_subgraph_nodes_to_parent_net(subgraph_node, parent_net) parent_net.topological_sort()
[docs] def inline_awesomenet_subgraphs(net: AwesomeNet, inline_criteria: Callable[[AwesomeNet], bool]): """ Inlines the nodes that are a part of subgraph AwesomeNet, given that the inline criteria is being met. For all the sub-graphs that pass the inline criteria, the consisting AwesomeNodes are placed in the top-level AwesomeNet. Args: net: Top-level AwesomeNet that may contain several subgraph AwesomeNodes which may be inlined depending on the inline criteria. inline_criteria: The function determining if a subgraph AwesomeNode should get inlined into a top-level AwesomeNet. Returns: None. The input AwesomeNet is mutated if any of the subgraph AwesomeNodes pass the inline criteria. """ # Nodes dict might be mutated if inlining occurs, so iterate over the original node names node_names = list(net.nodes.keys()) for node_name in node_names: node = net.nodes[node_name] if node_is_awesomenet(node) and inline_criteria(node.ir): _inline_subgraph_node_into_parent_awesomenet(node, net)
[docs] def inline_ev_subgraphs(net: AwesomeNet): """ Inlines the sub-graph AwesomeNode consisting of the EV operators into the top-level AwesomeNet. Args: net: The AwesomeNet which is mutated if it contains any EV sub-graphs. """ # Subgraphs that are not assigned to a backend should be inlined def _is_ev_subgraph(subnet: AwesomeNet) -> bool: return subnet.backend == Backend.NONE inline_awesomenet_subgraphs(net, _is_ev_subgraph)
[docs] def dispatch_backend(backend_callables: list[tuple[Callable[[AwesomeNode], bool], NodeCallable]]) -> NodeCallable: """ Process a node by dispatching to one of several functions depending on the node's properties. This is intended to be used as the node_callable parameter of AwesomeNet.run. Calling dispatch_backend(xs)(node, i, o) searches for and executes a matching callable. It finds the first item (f, g) in xs such that f(node) returns True, then executes g(node, i, o). It raises an exception if nothing matches. Args: backend_callables: A list of predicates and callables. Returns: Callable that dispatches to the given callables. """ def do_dispatch(node: AwesomeNode, inputs: dict[InputName, SiMaIRTensorTypes], node_outputs: dict[NodeName, np.ndarray | tuple[np.ndarray, ...]]): for match, act in backend_callables: if match(node): return act(node, inputs, node_outputs) raise TypeError("No function was provided for handling the given node") return do_dispatch