# Source code for main

#########################################################
# Copyright (C) 2024-25 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################

import numpy as np
import sima
import yaml
from pathlib import Path
import cv2
from helper import YoloHelpers, merge_levels, effad_postproc, load_mean_std, postprocess
from constants import Constants


# Load the pipeline configuration: input source, UDP sink, and the model list.
with open("project.yaml", "r") as file:
    external_params = yaml.safe_load(file)

# Video reader: pulls frames from the configured source at the pipeline's
# fixed frame size; looping keeps the pipeline running continuously.
reader = sima.VideoReader(
    external_params["source"],
    frame_width=Constants.frame_width,
    frame_height=Constants.frame_height,
)
reader.set_loop(True)

# Video writer: streams the rendered output to the configured UDP host/port
# at the reader's native frame size.
writer = sima.VideoWriter(
    external_params["source"],
    external_params["udp_host"],
    external_params["port"],
    reader.frame_width,
    reader.frame_height,
)
# Configure YOLO MLSoC session (segmentation model, 640x640 input).
yolo_model_params = external_params["Models"][0]
yolo_session = sima.MLSoCSession(
    yolo_model_params["targz"],
    pipeline=external_params["pipeline"]["name"],
    session_name=yolo_model_params["name"],
    frame_width=640,
    frame_height=640,
)
yolo_session.configure(yolo_model_params)
# yolo_session.set_log_level(sima.LogLevel.DEBUG)

# Configure Teacher MLSoC session (EfficientAD teacher, 256x256 input).
teacher_model_params = external_params["Models"][1]
teacher_session = sima.MLSoCSession(
    teacher_model_params["targz"],
    pipeline=external_params["pipeline"]["name"],
    session_name=teacher_model_params["name"],
    frame_width=256,
    frame_height=256,
)
teacher_session.configure(teacher_model_params)

# Configure Student MLSoC session (EfficientAD student, 256x256 input).
student_model_params = external_params["Models"][2]
student_session = sima.MLSoCSession(
    student_model_params["targz"],
    pipeline=external_params["pipeline"]["name"],
    session_name=student_model_params["name"],
    frame_width=256,
    frame_height=256,
)
student_session.configure(student_model_params)

# Configure Autoencoder MLSoC session (EfficientAD autoencoder, 256x256 input).
# NOTE: its configure() call follows immediately after this block.
autoencoder_model_params = external_params["Models"][3]
autoencoder_session = sima.MLSoCSession(
    autoencoder_model_params["targz"],
    pipeline=external_params["pipeline"]["name"],
    session_name=autoencoder_model_params["name"],
    frame_width=256,
    frame_height=256,
)
autoencoder_session.configure(autoencoder_model_params)

# tvm_obj = spy.apu_helper(SO_PATH, '')

# Other constants: teacher-feature normalization stats and the quantile
# bounds used to rescale the student/autoencoder anomaly maps.
teacher_mean, teacher_std = load_mean_std(Path("mean_std.json"))
q_st_start, q_st_end, q_ae_start, q_ae_end = (
    Constants.q_st_start,
    Constants.q_st_end,
    Constants.q_ae_start,
    Constants.q_ae_end,
)

# Pipeline execution
while reader.isOpened():
    ret, frame = reader.read()
    if not ret:
        # Failed/empty read; the reader loops, so skip and retry.
        continue
    print(f"Processing frame--> {reader.frame_num}", end="\r")
    resized = cv2.resize(frame, (Constants.resize_width, Constants.resize_height))

    # YOLO inference. run_model returns NHWC tensors; transpose to NCHW,
    # merge the three pyramid levels per head, and keep the mask protos
    # (yolo_out[9]) alongside for mask reconstruction.
    yolo_out = yolo_session.run_model(resized)
    yolo_out = [item.transpose(0, 3, 1, 2) for item in yolo_out]
    model_output = (
        np.concatenate(
            [merge_levels(yolo_out[3 * i: 3 * (i + 1)]) for i in range(3)], 1
        ),
        yolo_out[9],
    )
    boxes, scores, class_ids, mask_maps = postprocess(
        model_output, Constants.frame_height, Constants.frame_width
    )
    # Validate the single-mask invariant BEFORE indexing mask_maps[0]
    # (the original asserted after the first use).
    assert len(mask_maps) == 1, f'Too many masks {len(mask_maps)}'
    seg_seat, cropped_mask, (x_min, y_min), (x_max, y_max) = YoloHelpers.bridge(
        frame, mask_maps[0]
    )
    yolo_result = {
        "seg_seat": seg_seat,
        "class_idx": class_ids[0],
        "mask": mask_maps[0],
        "cropped_mask": cropped_mask,
        "x_y_min": (x_min, y_min),
        "x_y_max": (x_max, y_max),
    }

    # SimaEfficientAdRunner: run the segmented seat crop through the
    # teacher/student/autoencoder trio for anomaly detection.
    tensor = cv2.resize(yolo_result["seg_seat"], tuple(Constants.effdet_tensor_dim))
    teacher_out = teacher_session.run_model(tensor)[0].transpose(0, 3, 1, 2)
    student_out = student_session.run_model(tensor)[0].transpose(0, 3, 1, 2)
    autoencoder_out = autoencoder_session.run_model(tensor)[0]
    # Upsample the autoencoder output to the student/teacher spatial size
    # (56x56) before comparing feature maps.
    resized_array = cv2.resize(
        autoencoder_out[0], (56, 56), interpolation=cv2.INTER_LINEAR
    )
    resized_array = np.expand_dims(resized_array, axis=0)
    autoencoder_out = resized_array.transpose(0, 3, 1, 2)

    # Normalize teacher features, then build per-pixel squared-error maps:
    # student-vs-teacher (map_st) and student-vs-autoencoder (map_ae).
    teacher_out = (teacher_out - teacher_mean) / teacher_std
    map_st = np.mean(
        (teacher_out - student_out[:, : Constants.out_channels]) ** 2,
        axis=1,
        keepdims=True,
    )
    map_ae = np.mean(
        (autoencoder_out - student_out[:, Constants.out_channels:]) ** 2,
        axis=1,
        keepdims=True,
    )
    # Rescale each map by its precomputed quantile bounds when available.
    if q_st_start is not None:
        map_st = 0.1 * (map_st - q_st_start) / (q_st_end - q_st_start)
    if q_ae_start is not None:
        map_ae = 0.1 * (map_ae - q_ae_start) / (q_ae_end - q_ae_start)
    map_combined = 0.5 * map_st + 0.5 * map_ae

    # Post-process, render the prediction mask as an 8-bit grayscale image,
    # convert to NV12, and stream it out.
    results = effad_postproc(yolo_result, map_combined, Constants.cls_threshold)
    resized = cv2.resize(
        results["pred"] * 255, (Constants.output_width, Constants.output_height)
    ).astype(np.uint8)
    bgr_image = cv2.cvtColor(resized, cv2.COLOR_GRAY2BGR)  # already uint8
    nv12 = sima.cvtColor(
        bgr_image,
        Constants.output_width,
        Constants.output_height,
        sima.COLOR_BGRTONV12,
    )
    writer.write(nv12)