#########################################################
# Copyright (C) 2024-25 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
import numpy as np
import sima
import yaml
from pathlib import Path
import cv2
from helper import YoloHelpers, merge_levels, effad_postproc, load_mean_std, postprocess
from constants import Constants
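# Pipeline overview: YOLO segmentation isolates the seat region in each frame,
# an EfficientAD-style teacher/student/autoencoder trio scores the crop for
# anomalies, and the combined anomaly map is streamed out as NV12 video.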
with open("project.yaml", "r") as file:
    external_params = yaml.safe_load(file)
# Get reader
reader = sima.VideoReader(
    external_params["source"], frame_width=Constants.frame_width, frame_height=Constants.frame_height)
reader.set_loop(True)
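# Get writer: streams the processed frames to the configured UDP host and port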
writer = sima.VideoWriter(external_params["source"], external_params["udp_host"], external_params["port"],
                          reader.frame_width, reader.frame_height)
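# Each MLSoCSession below loads a compiled model archive ("targz") from the
# Models list in project.yaml and binds it to the pipeline named there.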
# Configure YOLO MLSoC Session
yolo_model_params = external_params["Models"][0]
yolo_session = sima.MLSoCSession(yolo_model_params["targz"], pipeline=external_params["pipeline"]["name"],
                                 session_name=yolo_model_params["name"], frame_width=640, frame_height=640)
yolo_session.configure(yolo_model_params)
# yolo_session.set_log_level(sima.LogLevel.DEBUG)
# Configure Teacher MLSoC Session
teacher_model_params = external_params["Models"][1]
teacher_session = sima.MLSoCSession(teacher_model_params["targz"], pipeline=external_params["pipeline"]["name"],
                                    session_name=teacher_model_params["name"], frame_width=256, frame_height=256)
teacher_session.configure(teacher_model_params)
# Configure Student MLSoC Session
student_model_params = external_params["Models"][2]
student_session = sima.MLSoCSession(student_model_params["targz"], pipeline=external_params["pipeline"]["name"],
                                    session_name=student_model_params["name"], frame_width=256, frame_height=256)
student_session.configure(student_model_params)
# Configure Autoencoder MLSoC Session
autoencoder_model_params = external_params["Models"][3]
autoencoder_session = sima.MLSoCSession(
    autoencoder_model_params["targz"], pipeline=external_params["pipeline"]["name"],
    session_name=autoencoder_model_params["name"], frame_width=256, frame_height=256)
autoencoder_session.configure(autoencoder_model_params)
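# Teacher, student and autoencoder together form the EfficientAD-style anomaly
# detector; their outputs are compared per pixel inside the frame loop below.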
# Other constants
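# Teacher feature statistics plus the quantile ranges used to normalize the
# student-teacher and autoencoder anomaly maps.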
teacher_mean, teacher_std = load_mean_std(Path("mean_std.json"))
q_st_start, q_st_end = Constants.q_st_start, Constants.q_st_end
q_ae_start, q_ae_end = Constants.q_ae_start, Constants.q_ae_end
# Pipeline execution
while reader.isOpened():
    ret, frame = reader.read()
    if not ret:
        # No frame returned; stop processing
        break
    print(f"Processing frame--> {reader.frame_num}", end="\r")
    # Resize to the YOLO input resolution and run segmentation on the MLSoC
    resized = cv2.resize(frame, (Constants.resize_width, Constants.resize_height))
    yolo_out = yolo_session.run_model(resized)
    # Outputs come back NHWC; transpose to NCHW for postprocessing
    yolo_out = [item.transpose(0, 3, 1, 2) for item in yolo_out]
    # Regroup the per-level detection outputs and pair them with the mask output
    model_output = (
        np.concatenate([merge_levels(yolo_out[3 * i: 3 * (i + 1)]) for i in range(3)], axis=1),
        yolo_out[9],
    )
    boxes, scores, class_ids, mask_maps = postprocess(model_output, Constants.frame_height, Constants.frame_width)
    assert len(mask_maps) == 1, f'Expected exactly one mask, got {len(mask_maps)}'
    # Crop the segmented seat region out of the full frame
    seg_seat, cropped_mask, (x_min, y_min), (x_max, y_max) = YoloHelpers.bridge(frame, mask_maps[0])
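    # Bundle the segmentation results the anomaly stage needs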
    yolo_result = {
        "seg_seat": seg_seat,
        "class_idx": class_ids[0],
        "mask": mask_maps[0],
        "cropped_mask": cropped_mask,
        "x_y_min": (x_min, y_min),
        "x_y_max": (x_max, y_max)}
    # SimaEfficientAdRunner: run the seat crop through teacher, student and autoencoder
    tensor = cv2.resize(yolo_result['seg_seat'], tuple(Constants.effdet_tensor_dim))
    teacher_out = teacher_session.run_model(tensor)[0].transpose(0, 3, 1, 2)
    student_out = student_session.run_model(tensor)[0].transpose(0, 3, 1, 2)
    autoencoder_out = autoencoder_session.run_model(tensor)[0]
    # Upsample the autoencoder output to 56x56 to match the student feature maps, then move to NCHW
    resized_array = cv2.resize(
        autoencoder_out[0], (56, 56), interpolation=cv2.INTER_LINEAR)
    resized_array = np.expand_dims(resized_array, axis=0)
    autoencoder_out = resized_array.transpose(0, 3, 1, 2)
    # Normalize teacher features, then compute the student-teacher and
    # autoencoder-student squared-error anomaly maps
    teacher_out = (teacher_out - teacher_mean) / teacher_std
    map_st = np.mean(
        (teacher_out - student_out[:, :Constants.out_channels]) ** 2, axis=1, keepdims=True)
    map_ae = np.mean(
        (autoencoder_out - student_out[:, Constants.out_channels:]) ** 2, axis=1, keepdims=True)
    # Rescale both maps with the precomputed quantile ranges before combining them
    if q_st_start is not None:
        map_st = 0.1 * (map_st - q_st_start) / (q_st_end - q_st_start)
    if q_ae_start is not None:
        map_ae = 0.1 * (map_ae - q_ae_start) / (q_ae_end - q_ae_start)
    map_combined = 0.5 * map_st + 0.5 * map_ae
    results = effad_postproc(yolo_result, map_combined, Constants.cls_threshold)
    # Scale the anomaly prediction to 8-bit, convert to BGR then NV12, and stream it out
    resized = cv2.resize(results["pred"] * 255, (Constants.output_width, Constants.output_height)).astype(np.uint8)
    bgr_image = cv2.cvtColor(resized, cv2.COLOR_GRAY2BGR)
    nv12 = sima.cvtColor(bgr_image, Constants.output_width, Constants.output_height, sima.COLOR_BGRTONV12)
    writer.write(nv12)