Source code for sima.video_reader

#########################################################
# Copyright (C) 2024-25 SiMa Technologies, Inc.
#
# This material is SiMa proprietary and confidential.
#
# This material may not be copied or distributed without
# the express prior written permission of SiMa.
#
# All rights reserved.
#########################################################

import cv2
import os
import numpy as np
from spy import _spy
from spy.logger import Logger, LogLevel
from typing import Tuple
from .gst_utils import GstReader, GstWriter
from abc import ABC, abstractmethod
from spy.pcie_utils import PCIeUtils
[docs] logger = Logger()
[docs] class Reader(ABC): @abstractmethod
[docs] def read(self) -> Tuple[int, np.ndarray]: pass
@abstractmethod
[docs] def isOpened(self) -> bool: pass
@abstractmethod
[docs] def get_cam_resolution(self) -> Tuple[int, int]: pass
@abstractmethod
[docs] def release(self): pass
[docs] class Writer(ABC): @abstractmethod
[docs] def write(self, frame, meta=None): pass
@abstractmethod
[docs] def release(self): pass
[docs] class RTSPReader(Reader): def __init__(self, cam_source: str):
[docs] self.frame_num = 0
os.environ['GST_DEBUG'] = "3" os.environ['GST_PLUGIN_PATH'] = '/usr/local/lib/gstreamer-1.0' os.environ['LD_LIBRARY_PATH'] = '/usr/local/lib/gstreamer-1.0'
[docs] self.app_source = ( f"rtspsrc location={cam_source} latency=0 ! rtph264depay wait-for-keyframe=true ! " f"h264parse ! video/x-h264,parsed=true,stream-format=(string)byte-stream,alignment=(string)au ! " f"simaaidecoder name=simaaidecoder1 op-buff-name=simaaidecoder1 dec-fmt=NV12 ! " f"appsink sync=false max-buffers=100 drop=true" )
[docs] self.cap = cv2.VideoCapture(self.app_source, cv2.CAP_GSTREAMER)
self.frame_width, self.frame_height = int(self.cap.get( cv2.CAP_PROP_FRAME_WIDTH)), int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) if not self.cap.isOpened(): raise RuntimeError( f"Failed to initialize RTSPReader with cam_source={cam_source}.") os.environ['GST_SIMAAI_ALLEGRO_CAPS'] = f'type=video/x-raw,width={self.frame_width},height={self.frame_height},format=YUV420' logger.log(LogLevel.INFO, "RTSP Reader initialised.")
[docs] def get_cam_resolution(self) -> Tuple[int, int]: """ Get frame width and frame height """ return self.frame_width, self.frame_height
[docs] def read(self): ret, frame = self.cap.read() if ret: self.frame_num += 1 return ret, frame.flatten() return False, None
[docs] def isOpened(self) -> bool: return self.cap.isOpened()
[docs] def release(self): self.cap.release()
[docs] class FolderImageReader: def __init__(self, folder_path: str, frame_width: int = 1280, frame_height: int = 720):
[docs] self.folder_path = folder_path
[docs] self.loop = False
[docs] self.image_files = sorted( [f for f in os.listdir(folder_path) if f.lower().endswith( ('png', 'jpg', 'jpeg'))] )
[docs] self.frame_num = 0
if not self.image_files: raise RuntimeError(f"No images found in folder: {folder_path}")
[docs] self.total_images = len(self.image_files)
[docs] self.current_index = 0
self.frame_width, self.frame_height = frame_width, frame_height logger.log( LogLevel.INFO, f"Folder Image Reader initialized with: {self.total_images} images.")
[docs] def set_loop(self, val): self.loop = val
[docs] def get_cam_resolution(self) -> Tuple[int, int]: """ Get frame width and frame height """ return self.frame_width, self.frame_height
[docs] def read(self) -> Tuple[int, np.ndarray]: if self.current_index >= self.total_images: if self.loop: self.current_index = 0 # Restart from the first image else: return False, None # Stop reading if not looping image_path = os.path.join( self.folder_path, self.image_files[self.current_index]) frame = cv2.imread(image_path) self.frame_num += 1 self.current_index += 1 return True, frame
[docs] def isOpened(self) -> bool: return self.loop or self.current_index < self.total_images
[docs] def release(self): print("Folder Image Reader released.")
[docs] class PCIeReader(Reader): def __init__(self, frame_width: int = 1280, frame_height: int = 720, data_buf_name: str = "in-img-source"): data_buf_size = int(frame_width * frame_height * 1.5)
[docs] self.app_source = f"simaaipciesrc buffer-size={data_buf_size} ! appsink sync=true max-buffers=100 drop=true"
[docs] self.data_reader = GstReader(self.app_source)
[docs] self.frame_num = 0
self.frame_width, self.frame_height = frame_width, frame_height logger.log( LogLevel.INFO, f"PCIE Reader initialised with data buf size: {data_buf_size}.")
[docs] def get_cam_resolution(self) -> Tuple[int, int]: """ Get frame width and frame height """ return self.frame_width, self.frame_height
[docs] def read(self) -> Tuple[int, np.ndarray]: data, meta = self.data_reader.read() if data is not None: self.frame_num += 1 return np.frombuffer(data, dtype=np.uint8), meta return None, None
[docs] def isOpened(self) -> bool: return self.data_reader is not None
[docs] def release(self): self.data_reader = None
[docs] class RTSPWriter(Writer): def __init__(self, host_ip: str, port: int, width: int, height: int): os.environ['GST_DEBUG'] = "3" os.environ['GST_PLUGIN_PATH'] = '/usr/local/lib/gstreamer-1.0' os.environ['LD_LIBRARY_PATH'] = '/usr/local/lib/gstreamer-1.0' os.environ['GST_SIMAAI_ALLEGRO_CAPS'] = f'type=video/x-raw,width={width},height={height},format=YUV420'
[docs] self.app_sink = ( f"appsrc ! video/x-raw,format=NV12,width={width},height={height},framerate=30/1 ! " f"simaaiencoder enc-width={width} enc-height={height} enc-bitrate=4000 name=simaaiencoder1 ! " f"h264parse ! rtph264pay ! udpsink host={host_ip} port={port}" )
[docs] self.writer = cv2.VideoWriter( self.app_sink, cv2.CAP_GSTREAMER, 0, 30.0, (width, height), False)
if not self.writer.isOpened(): raise RuntimeError("Failed to initialize RTSPWriter.")
[docs] def write(self, frame, meta=None): self.writer.write(frame)
[docs] def release(self): self.writer.release()
[docs] class PCIeWriter(Writer): def __init__(self, data_buf_name: str = "overlay", frame_width: int = 1280, frame_height: int = 720):
[docs] self.buf_size = int(frame_width * frame_height * 1.5)
[docs] self.app_sink = f"appsrc ! simaaimetaparser dump-data=false ! simaaipciesink data-buffer-size={self.buf_size} use-multi-buffers=0 data-buf-name={data_buf_name}"
[docs] self.data_writer = GstWriter(self.app_sink, isPcie=True)
[docs] def write(self, frame, meta=None): self.data_writer.write(frame, meta=meta)
[docs] def release(self): self.data_writer = None
[docs] class VideoReader(): """ A class for reading video from various sources, including ``pcie``, ``rtspsrc`` and ``filesrc``. """ def __init__(self, source: str = None, frame_width: int = 1280, frame_height: int = 720): source_type = source["name"] match source_type: case "pcie": self.reader = PCIeReader(frame_width, frame_height) case "rtspsrc": self.reader = RTSPReader(source["value"]) case "filesrc": self.reader = FolderImageReader(source["value"]) case _: raise ValueError(f"Unsupported image source: {source_type}") self.frame_width, self.frame_height = self.get_cam_resolution()
[docs] def read(self) -> Tuple[int, np.ndarray]: """ Reads a frame from the video source. """ return self.reader.read()
[docs] def isOpened(self) -> bool: """ Check whether the video source has been successfully opened. This method verifies that the underlying video reader or stream is properly initialized and ready for reading frames. Returns: bool: True if the video source is open and accessible; False otherwise. """ return self.reader.isOpened()
[docs] def get_cam_resolution(self) -> Tuple[int, int]: """ Get frame width and frame height """ return self.reader.get_cam_resolution()
[docs] def release(self): """ Releases the VideoReader resources. """ logger.log(LogLevel.INFO, "VideoReader released.") return self.reader.release()
[docs] def set_loop(self, val): if hasattr(self.reader, "set_loop"): return self.reader.set_loop(val) else: logger.log(LogLevel.WARNING, "Looping is only applicable for FolderImageReader.")
@property
[docs] def frame_num(self): return self.reader.frame_num
[docs] class VideoWriter: """ A class for writing video stream to various destinations, including ``pcie``, ``rtspsrc`` and ``filesrc``. """ def __init__(self, source: str = None, host_ip: str = None, port: int = None, frame_width: int = None, frame_height: int = None): source_type = source["name"] match source_type: case "pcie": self.writer = PCIeWriter( frame_width=frame_width, frame_height=frame_height) case "rtspsrc" | "filesrc": self.writer = RTSPWriter( host_ip, port, frame_width, frame_height) case _: raise ValueError(f"Unsupported cam_source: {source_type}")
[docs] def write(self, frame, meta=None): """ Writes a frame to the video sink. """ self.writer.write(frame, meta)
[docs] def release(self): """ Releases the VideoWriter resources. """ self.writer.release()