#########################################################
# Copyright (C) 2024-25 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################
import cv2
import os
import numpy as np
from spy import _spy
from spy.logger import Logger, LogLevel
from typing import Tuple
from .gst_utils import GstReader, GstWriter
from abc import ABC, abstractmethod
from spy.pcie_utils import PCIeUtils
class Reader(ABC):
    """Abstract interface implemented by the frame readers in this module.

    Every method below is abstract: a subclass that leaves one of them
    unimplemented cannot be instantiated.
    """

    @abstractmethod
    def read(self) -> Tuple[int, np.ndarray]:
        """Fetch the next frame from the source.

        Returns:
            A two-element tuple. The concrete readers in this module do not
            all use the same layout (``(status, frame)`` versus
            ``(data, meta)``), so consult the concrete reader.
        """

    @abstractmethod
    def isOpened(self) -> bool:
        """Report whether the source is currently usable."""

    @abstractmethod
    def get_cam_resolution(self) -> Tuple[int, int]:
        """Return the frame size as ``(width, height)``."""

    @abstractmethod
    def release(self):
        """Release whatever resources the reader holds."""
class Writer(ABC):
    """Abstract interface implemented by the frame writers in this module.

    Both methods are abstract: a subclass that leaves one of them
    unimplemented cannot be instantiated.
    """

    @abstractmethod
    def write(self, frame, meta=None):
        """Send one frame, optionally with metadata, to the sink.

        Args:
            frame: Frame payload to emit.
            meta: Optional metadata accompanying the frame.
        """

    @abstractmethod
    def release(self):
        """Release whatever resources the writer holds."""
class RTSPReader(Reader):
    """Read frames from an RTSP source through an OpenCV/GStreamer pipeline."""

    # Number of frames successfully returned by read(). The class-level
    # default guarantees the counter exists before the first increment.
    frame_num: int = 0

    def __init__(self, cam_source: str):
        """Open the stream and record its frame size.

        Args:
            cam_source: Value placed in the ``rtspsrc location=`` property.

        Raises:
            RuntimeError: If OpenCV cannot open the GStreamer pipeline.
        """
        # These assignments overwrite any value already present in the
        # process environment.
        os.environ['GST_DEBUG'] = "3"
        os.environ['GST_PLUGIN_PATH'] = '/usr/local/lib/gstreamer-1.0'
        os.environ['LD_LIBRARY_PATH'] = '/usr/local/lib/gstreamer-1.0'

        self.frame_num = 0
        self.app_source = (
            f"rtspsrc location={cam_source} latency=0 ! rtph264depay wait-for-keyframe=true ! "
            "h264parse ! video/x-h264,parsed=true,stream-format=(string)byte-stream,alignment=(string)au ! "
            "simaaidecoder name=simaaidecoder1 op-buff-name=simaaidecoder1 dec-fmt=NV12 ! "
            "appsink sync=false max-buffers=100 drop=true"
        )
        self.cap = cv2.VideoCapture(self.app_source, cv2.CAP_GSTREAMER)

        # Fail fast: the resolution queried below is meaningless when the
        # pipeline did not open.
        if not self.cap.isOpened():
            raise RuntimeError(
                f"Failed to initialize RTSPReader with cam_source={cam_source}.")

        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        os.environ['GST_SIMAAI_ALLEGRO_CAPS'] = f'type=video/x-raw,width={self.frame_width},height={self.frame_height},format=YUV420'

        # NOTE(review): `logger` is not defined in the visible part of this
        # module (only Logger/LogLevel are imported) - confirm it exists.
        logger.log(LogLevel.INFO, "RTSP Reader initialised.")

    def get_cam_resolution(self) -> Tuple[int, int]:
        """
        Get frame width and frame height
        """
        return self.frame_width, self.frame_height

    def read(self) -> Tuple[bool, np.ndarray]:
        """Read the next frame and return it flattened to one dimension.

        Returns:
            ``(True, flat_frame)`` on success, ``(False, None)`` when the
            capture delivers no frame.
        """
        ret, frame = self.cap.read()
        if not ret or frame is None:
            return False, None
        self.frame_num += 1
        return ret, frame.flatten()

    def isOpened(self) -> bool:
        """Report whether the underlying capture is open."""
        return self.cap.isOpened()

    def release(self):
        """Release the underlying capture."""
        self.cap.release()
class FolderImageReader:
    """Serve the image files of a folder, in sorted name order, as frames."""

    # File-name suffixes (compared case-insensitively) treated as images.
    _IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg')

    # Class-level defaults so read()/isOpened() never hit a missing
    # attribute, even before set_loop() has been called.
    loop: bool = False        # restart from the first image when exhausted
    current_index: int = 0    # position of the next image to serve
    frame_num: int = 0        # number of frames successfully returned

    def __init__(self, folder_path: str, frame_width: int = 1280, frame_height: int = 720):
        """Index the images found directly inside ``folder_path``.

        Args:
            folder_path: Directory to scan (not recursive).
            frame_width: Width reported by ``get_cam_resolution``.
            frame_height: Height reported by ``get_cam_resolution``.

        Raises:
            RuntimeError: If the folder contains no matching image file.
        """
        self.folder_path = folder_path
        # Match on the real extension (with the dot) so that a name such as
        # "notes_png" is not mistaken for an image.
        self.image_files = sorted(
            name for name in os.listdir(folder_path)
            if name.lower().endswith(self._IMAGE_SUFFIXES)
        )
        if not self.image_files:
            raise RuntimeError(f"No images found in folder: {folder_path}")

        self.total_images = len(self.image_files)
        self.frame_width, self.frame_height = frame_width, frame_height
        self.loop = False
        self.current_index = 0
        self.frame_num = 0

        # NOTE(review): `logger` is not defined in the visible part of this
        # module (only Logger/LogLevel are imported) - confirm it exists.
        logger.log(
            LogLevel.INFO, f"Folder Image Reader initialized with: {self.total_images} images.")

    def set_loop(self, val):
        """Enable or disable wrapping around to the first image."""
        self.loop = val

    def get_cam_resolution(self) -> Tuple[int, int]:
        """ Get frame width and frame height """
        return self.frame_width, self.frame_height

    def read(self) -> Tuple[int, np.ndarray]:
        """Load the next image.

        Returns:
            ``(True, frame)`` on success. ``(False, None)`` when all images
            have been served and looping is off, or when the current file
            cannot be decoded (the file is skipped on the next call).
        """
        if self.current_index >= self.total_images:
            if not self.loop:
                return False, None  # Stop reading if not looping
            self.current_index = 0  # Restart from the first image

        image_path = os.path.join(
            self.folder_path, self.image_files[self.current_index])
        self.current_index += 1

        frame = cv2.imread(image_path)
        if frame is None:
            # Do not report success with an empty frame.
            logger.log(LogLevel.WARNING, f"Failed to read image: {image_path}")
            return False, None

        self.frame_num += 1
        return True, frame

    def isOpened(self) -> bool:
        """Return True while looping is on or images remain to be served."""
        return self.loop or self.current_index < self.total_images

    def release(self):
        """Announce the release; this reader holds no resource to free."""
        print("Folder Image Reader released.")
class PCIeReader(Reader):
    """Read raw frame buffers from a ``simaaipciesrc`` GStreamer pipeline."""

    # Number of buffers successfully returned by read(). The class-level
    # default guarantees the counter exists before the first increment.
    frame_num: int = 0

    def __init__(self, frame_width: int = 1280, frame_height: int = 720, data_buf_name: str = "in-img-source"):
        """Build the pipeline and the reader that pulls from it.

        Args:
            frame_width: Frame width used to size the source buffer.
            frame_height: Frame height used to size the source buffer.
            data_buf_name: Accepted for interface compatibility; it is not
                used when building the pipeline.
        """
        # 1.5 bytes per pixel - presumably a 4:2:0 frame layout; confirm
        # against the producer on the other side of the PCIe link.
        data_buf_size = int(frame_width * frame_height * 1.5)
        self.app_source = f"simaaipciesrc buffer-size={data_buf_size} ! appsink sync=true max-buffers=100 drop=true"
        self.data_reader = GstReader(self.app_source)
        self.frame_width, self.frame_height = frame_width, frame_height
        self.frame_num = 0

        # NOTE(review): `logger` is not defined in the visible part of this
        # module (only Logger/LogLevel are imported) - confirm it exists.
        logger.log(
            LogLevel.INFO, f"PCIE Reader initialised with data buf size: {data_buf_size}.")

    def get_cam_resolution(self) -> Tuple[int, int]:
        """
        Get frame width and frame height
        """
        return self.frame_width, self.frame_height

    def read(self) -> Tuple[int, np.ndarray]:
        """Pull the next buffer from the pipeline.

        Returns:
            ``(data, meta)`` where ``data`` is a one-dimensional ``uint8``
            view of the buffer, or ``(None, None)`` when no buffer is
            available or the reader has been released. Note that this layout
            differs from the ``(status, frame)`` tuple of the other readers.
        """
        if self.data_reader is None:
            # release() has been called; there is nothing to read from.
            return None, None
        data, meta = self.data_reader.read()
        if data is None:
            return None, None
        self.frame_num += 1
        return np.frombuffer(data, dtype=np.uint8), meta

    def isOpened(self) -> bool:
        """Return True until ``release`` has been called."""
        return self.data_reader is not None

    def release(self):
        """Drop the reference to the underlying pipeline reader."""
        self.data_reader = None
class RTSPWriter(Writer):
    """Encode frames and send them as RTP/H.264 over UDP via GStreamer."""

    def __init__(self, host_ip: str, port: int, width: int, height: int):
        """Build the encoding pipeline and open it through OpenCV.

        Args:
            host_ip: Destination host given to ``udpsink``.
            port: Destination port given to ``udpsink``.
            width: Frame width in pixels.
            height: Frame height in pixels.

        Raises:
            RuntimeError: If OpenCV cannot open the GStreamer pipeline.
        """
        # These assignments overwrite any value already present in the
        # process environment.
        os.environ['GST_DEBUG'] = "3"
        os.environ['GST_PLUGIN_PATH'] = '/usr/local/lib/gstreamer-1.0'
        os.environ['LD_LIBRARY_PATH'] = '/usr/local/lib/gstreamer-1.0'
        os.environ['GST_SIMAAI_ALLEGRO_CAPS'] = f'type=video/x-raw,width={width},height={height},format=YUV420'

        self.app_sink = (
            f"appsrc ! video/x-raw,format=NV12,width={width},height={height},framerate=30/1 ! "
            f"simaaiencoder enc-width={width} enc-height={height} enc-bitrate=4000 name=simaaiencoder1 ! "
            f"h264parse ! rtph264pay ! udpsink host={host_ip} port={port}"
        )
        # fourcc 0 and isColor=False: frames are handed to the pipeline as
        # they are; 30.0 matches the framerate declared in the caps above.
        self.writer = cv2.VideoWriter(
            self.app_sink, cv2.CAP_GSTREAMER, 0, 30.0, (width, height), False)
        if not self.writer.isOpened():
            raise RuntimeError("Failed to initialize RTSPWriter.")

    def write(self, frame, meta=None):
        """Push one frame into the pipeline; ``meta`` is ignored."""
        self.writer.write(frame)

    def release(self):
        """Release the underlying OpenCV writer."""
        self.writer.release()
class PCIeWriter(Writer):
    """Send frames and metadata through a ``simaaipciesink`` pipeline."""

    def __init__(self, data_buf_name: str = "overlay", frame_width: int = 1280, frame_height: int = 720):
        """Build the pipeline and the writer that feeds it.

        Args:
            data_buf_name: Value of the sink's ``data-buf-name`` property.
            frame_width: Frame width used to size the data buffer.
            frame_height: Frame height used to size the data buffer.
        """
        # 1.5 bytes per pixel - presumably a 4:2:0 frame layout; confirm
        # against the consumer on the other side of the PCIe link.
        self.buf_size = int(frame_width * frame_height * 1.5)
        self.app_sink = f"appsrc ! simaaimetaparser dump-data=false ! simaaipciesink data-buffer-size={self.buf_size} use-multi-buffers=0 data-buf-name={data_buf_name}"
        self.data_writer = GstWriter(self.app_sink, isPcie=True)

    def write(self, frame, meta=None):
        """Forward one frame and its optional metadata to the pipeline."""
        self.data_writer.write(frame, meta=meta)

    def release(self):
        """Drop the reference to the underlying pipeline writer."""
        self.data_writer = None
[docs]
class VideoReader():
"""
A class for reading video from various sources, including ``pcie``, ``rtspsrc`` and ``filesrc``.
"""
def __init__(self, source: str = None, frame_width: int = 1280, frame_height: int = 720):
source_type = source["name"]
match source_type:
case "pcie":
self.reader = PCIeReader(frame_width, frame_height)
case "rtspsrc":
self.reader = RTSPReader(source["value"])
case "filesrc":
self.reader = FolderImageReader(source["value"])
case _:
raise ValueError(f"Unsupported image source: {source_type}")
self.frame_width, self.frame_height = self.get_cam_resolution()
[docs]
def read(self) -> Tuple[int, np.ndarray]:
"""
Reads a frame from the video source.
"""
return self.reader.read()
[docs]
def isOpened(self) -> bool:
"""
Check whether the video source has been successfully opened.
This method verifies that the underlying video reader or stream is properly initialized
and ready for reading frames.
Returns:
bool: True if the video source is open and accessible; False otherwise.
"""
return self.reader.isOpened()
[docs]
def get_cam_resolution(self) -> Tuple[int, int]:
"""
Get frame width and frame height
"""
return self.reader.get_cam_resolution()
[docs]
def release(self):
"""
Releases the VideoReader resources.
"""
logger.log(LogLevel.INFO, "VideoReader released.")
return self.reader.release()
[docs]
def set_loop(self, val):
if hasattr(self.reader, "set_loop"):
return self.reader.set_loop(val)
else:
logger.log(LogLevel.WARNING, "Looping is only applicable for FolderImageReader.")
@property
[docs]
def frame_num(self):
return self.reader.frame_num
[docs]
class VideoWriter:
"""
A class for writing video stream to various destinations, including ``pcie``, ``rtspsrc`` and ``filesrc``.
"""
def __init__(self, source: str = None, host_ip: str = None, port: int = None, frame_width: int = None, frame_height: int = None):
source_type = source["name"]
match source_type:
case "pcie":
self.writer = PCIeWriter(
frame_width=frame_width, frame_height=frame_height)
case "rtspsrc" | "filesrc":
self.writer = RTSPWriter(
host_ip, port, frame_width, frame_height)
case _:
raise ValueError(f"Unsupported cam_source: {source_type}")
[docs]
def write(self, frame, meta=None):
"""
Writes a frame to the video sink.
"""
self.writer.write(frame, meta)
[docs]
def release(self):
"""
Releases the VideoWriter resources.
"""
self.writer.release()