Source code for sima.gst_utils

import ctypes
import time
import warnings
from typing import Optional, Union

import gi
import numpy as np

gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import Gst, GstApp, GstVideo

from spy.logger import Logger, LogLevel

# Name of the custom Gst meta that this module registers and later looks up
# on buffers.
SIMAAI_META_STR = "GstSimaMeta"

# Module-level logger instance.
logger = Logger()
class MetadataStruct(ctypes.Structure):
    """Fixed-size binary header carrying the numeric metadata of one buffer.

    The two trailing ``*_len`` fields hold the lengths of the ``stream_id``
    and ``buffer_name`` strings, which are not part of the structure itself.
    """

    _fields_ = [
        # Signed 64-bit identifiers and offsets.
        ("buffer_id", ctypes.c_int64),
        ("frame_id", ctypes.c_int64),
        # Unsigned 64-bit timestamp.
        ("timestamp", ctypes.c_uint64),
        ("buffer_offset", ctypes.c_int64),
        ("pcie_buffer_id", ctypes.c_int64),
        # Length of stream_id string
        ("stream_id_len", ctypes.c_uint32),
        # Length of buffer_name string
        ("buffer_name_len", ctypes.c_uint32),
    ]
def extract_metadata(buffer: "Gst.Buffer") -> Optional[dict]:
    """Extract the custom metadata attached to a Gst.Buffer as a dictionary.

    The custom meta registered under ``SIMAAI_META_STR`` is looked up on the
    buffer and its structure fields are copied into a plain ``dict``. Only
    fields that are actually present end up in the result. Possible keys:
    ``is_pcie``, ``pcie_buffer_id``, ``buffer_id``, ``frame_id``,
    ``buffer_offset``, ``timestamp``, ``stream_id`` and ``buffer_name``.

    Args:
        buffer: Buffer to inspect. ``None`` is tolerated and treated like a
            buffer without metadata.

    Returns:
        The metadata dictionary, or ``None`` when the buffer carries no
        metadata structure or reading it failed.
    """
    # A sample may not carry a buffer at all; treat that the same as a buffer
    # without metadata instead of raising AttributeError.
    if buffer is None:
        print("No metadata structure found in buffer")
        return None

    meta = buffer.get_custom_meta(SIMAAI_META_STR)
    structure = meta.get_structure() if meta else None
    if not structure:
        print("No metadata structure found in buffer")
        return None

    metadata = {}
    try:
        # Extract PCIE info: the presence of the field marks the buffer as PCIe.
        found, pcie_buffer_id = structure.get_int64("pcie-buffer-id")
        if found:
            metadata['is_pcie'] = True
            metadata['pcie_buffer_id'] = pcie_buffer_id

        # Extract int64 fields (dashes in field names become underscores).
        for field in ('buffer-id', 'frame-id', 'buffer-offset'):
            found, value = structure.get_int64(field)
            if found:
                metadata[field.replace('-', '_')] = value

        # Extract uint64 fields
        found, timestamp = structure.get_uint64('timestamp')
        if found:
            metadata['timestamp'] = timestamp

        # Extract string fields; missing or empty strings are skipped.
        for field in ('stream-id', 'buffer-name'):
            value = structure.get_string(field)
            if value:
                metadata[field.replace('-', '_')] = value
    except Exception as e:
        # Deliberately best-effort: a malformed structure must not break the
        # caller's read loop, so report it and signal "no metadata".
        print(f"Error extracting metadata: {e}")
        return None
    return metadata
def create_metadata_struct(meta_dict: dict) -> bytes:
    """Serialise a metadata dictionary into a packed byte string.

    The result is the raw bytes of a ``MetadataStruct`` header followed by
    the UTF-8 encoded ``stream_id`` and then the UTF-8 encoded
    ``buffer_name``. Missing numeric keys default to ``0`` and missing (or
    ``None``) strings to the empty string.

    Args:
        meta_dict: Dictionary with the optional keys ``buffer_id``,
            ``frame_id``, ``timestamp``, ``buffer_offset``,
            ``pcie_buffer_id``, ``stream_id`` and ``buffer_name``.

    Returns:
        Header bytes + stream_id bytes + buffer_name bytes.
    """
    # Encode first: the length fields must describe the bytes that are
    # actually appended, which differs from len(str) for non-ASCII text.
    stream_id_bytes = (meta_dict.get('stream_id') or '').encode('utf-8')
    buffer_name_bytes = (meta_dict.get('buffer_name') or '').encode('utf-8')

    meta = MetadataStruct()
    meta.buffer_id = meta_dict.get('buffer_id', 0)
    meta.frame_id = meta_dict.get('frame_id', 0)
    meta.timestamp = meta_dict.get('timestamp', 0)
    meta.buffer_offset = meta_dict.get('buffer_offset', 0)
    meta.pcie_buffer_id = meta_dict.get('pcie_buffer_id', 0)
    meta.stream_id_len = len(stream_id_bytes)
    meta.buffer_name_len = len(buffer_name_bytes)

    return bytes(meta) + stream_id_bytes + buffer_name_bytes
def get_dummy_metadata():
    """Return a ``MetadataStruct`` whose fields are all set to zero."""
    return MetadataStruct(
        buffer_id=0,
        frame_id=0,
        timestamp=0,
        buffer_offset=0,
        pcie_buffer_id=0,
        stream_id_len=0,
        buffer_name_len=0,
    )
# Initialise GStreamer at import time.
Gst.init(None)

# Register the custom meta under SIMAAI_META_STR at import time; this is the
# same name extract_metadata() passes to Gst.Buffer.get_custom_meta().
# NOTE(review): the GStreamer reference for gst_meta_register_custom lists the
# positional arguments as (name, tags, transform_func, user_data), so the
# second argument looks like the tag list rather than a description, and the
# two None values like "no transform function" / "no user data" -- confirm
# against the installed GStreamer version.
Gst.Meta.register_custom(
    SIMAAI_META_STR,
    "",  # presumably an empty tag list (see note above)
    None,  # presumably the transform function (none)
    None  # presumably the user data (none)
)
class GstReader:
    """Read raw buffers and their metadata from the ``appsink`` of a pipeline.

    The pipeline is built from a launch string, set to PLAYING in the
    constructor and set to NULL by :meth:`stop` / :meth:`release` (also
    invoked when the object is garbage collected).

    Attributes:
        pipeline: The element returned by ``Gst.parse_launch``.
        appsink: The ``appsink`` element found inside the pipeline.
        frame_width: Negotiated frame width, ``0`` until known.
        frame_height: Negotiated frame height, ``0`` until known.
        fps: Negotiated frame rate, ``0`` until known.
    """

    # One second in nanoseconds, kept as an int because the appsink timeout
    # is an integer nanosecond count (GstClockTime).
    _PULL_TIMEOUT_NS = 1_000_000_000

    def __init__(self, pipeline_str, resolution_timeout: float = 50.0, isPcie: bool = False):
        """Build the pipeline, start it and (unless PCIe) wait for the caps.

        Args:
            pipeline_str: Launch string; must contain an ``appsink`` element.
            resolution_timeout: Seconds to wait for caps negotiation on the
                appsink before giving up.
            isPcie: When true, the resolution/fps discovery is skipped and
                the corresponding attributes stay ``0``.

        Raises:
            ValueError: If the string lacks ``appsink``, the pipeline could
                not be created, or no appsink element was found in it.
            RuntimeError: If caps negotiation times out or the caps cannot
                be parsed as video caps (non-PCIe only).
        """
        if 'appsink' not in pipeline_str:
            raise ValueError("Pipeline string must contain 'appsink'")

        self.pipeline = Gst.parse_launch(pipeline_str)
        if self.pipeline is None:
            raise ValueError("Failed to create pipeline")

        self.appsink = self.get_appsink()
        if not self.appsink:
            raise ValueError("Pipeline string must contain an element named 'appsink'")

        self.pipeline.set_state(Gst.State.PLAYING)

        self.frame_width = 0
        self.frame_height = 0
        self.fps = 0

        if not isPcie:
            # Extract resolution from negotiated caps
            self._get_video_info(timeout=resolution_timeout)

    def __del__(self):
        self.stop()

    def get_appsink(self):
        """Return the first ``appsink`` element of the pipeline, or ``None``."""
        for element in self.pipeline.iterate_elements():
            factory = element.get_factory()
            if factory.get_klass().find('Sink') != -1 and factory.get_name() == 'appsink':
                return element
        return None

    def _get_video_info(self, timeout: float = 5.0) -> bool:
        """Poll the appsink's sink pad for caps and record width/height/fps.

        Args:
            timeout: Maximum number of seconds to poll for negotiated caps.

        Returns:
            ``True`` once the video information has been stored.

        Raises:
            RuntimeError: If no caps were negotiated within ``timeout`` or
                the caps could not be parsed as video caps.
        """
        pad = self.appsink.get_static_pad('sink')
        start = time.time()
        caps = None
        while time.time() - start < timeout:
            caps = pad.get_current_caps()
            if caps and caps.get_size() > 0:
                break
            time.sleep(0.01)

        if not caps or caps.get_size() == 0:
            raise RuntimeError("Timed out waiting for caps negotiation on appsink")

        video_info = GstVideo.VideoInfo.new_from_caps(caps)
        if not video_info:
            raise RuntimeError("Failed to parse video caps for resolution")

        self.video_info = video_info
        self.frame_width = video_info.width
        self.frame_height = video_info.height

        # The frame rate is the fraction fps_n / fps_d. Using the numerator
        # alone reports e.g. 30000 for 30000/1001. Integral rates keep their
        # previous int value so existing callers see no type change.
        fps_n, fps_d = video_info.fps_n, video_info.fps_d
        if fps_d in (0, 1):
            self.fps = fps_n
        elif fps_n % fps_d == 0:
            self.fps = fps_n // fps_d
        else:
            self.fps = fps_n / fps_d
        return True

    def isOpened(self) -> bool:
        """Return whether the pipeline's current state is PLAYING."""
        _, state, _ = self.pipeline.get_state(0)
        return state == Gst.State.PLAYING

    def release(self):
        """Alias of :meth:`stop`."""
        self.stop()

    def read(self):
        """Pull one sample from the appsink, waiting up to one second.

        Returns:
            ``(data, meta_dict)`` where ``data`` is a ``bytes`` copy of the
            buffer contents and ``meta_dict`` is the result of
            ``extract_metadata``; ``(None, None)`` when no sample arrived in
            time, the sample has no buffer, or the buffer could not be mapped.
        """
        sample = self.appsink.try_pull_sample(self._PULL_TIMEOUT_NS)
        if not sample:
            # Actually emit the warning; merely constructing a RuntimeWarning
            # instance has no effect.
            warnings.warn("No sample available", RuntimeWarning, stacklevel=2)
            return None, None

        buffer = sample.get_buffer()
        if buffer is None:
            return None, None

        meta_dict = extract_metadata(buffer)
        mapped, map_info = buffer.map(Gst.MapFlags.READ)
        if not mapped:
            return None, None
        try:
            data = bytes(map_info.data)
        finally:
            # Always release the mapping, even if the copy fails.
            buffer.unmap(map_info)
        return data, meta_dict

    def stop(self):
        """Set the pipeline to NULL; a no-op if no pipeline was ever created."""
        # __del__ also runs for objects whose __init__ raised before
        # self.pipeline was assigned, so the attribute may be missing.
        pipeline = getattr(self, 'pipeline', None)
        if pipeline is not None:
            pipeline.set_state(Gst.State.NULL)
class GstWriter:
    """Push raw buffers into the ``appsrc`` of a pipeline.

    The pipeline is built from a launch string, set to PLAYING in the
    constructor and set to NULL by :meth:`stop` (also invoked when the
    object is garbage collected).

    Attributes:
        isPcie: Whether metadata should be prepended to written buffers.
        pipeline: The element returned by ``Gst.parse_launch``.
        appsrc: The ``appsrc`` element found inside the pipeline.
    """

    def __init__(self, pipeline_str, isPcie=False):
        """Build the pipeline from ``pipeline_str`` and start it.

        Args:
            pipeline_str: Launch string; must contain an ``appsrc`` element.
            isPcie: When true, :meth:`write` prepends the serialised
                metadata (if given) to the payload.

        Raises:
            ValueError: If the string lacks ``appsrc``, the pipeline could
                not be created, or no appsrc element was found in it.
        """
        self.isPcie = isPcie

        if 'appsrc' not in pipeline_str:
            raise ValueError("Pipeline string must contain 'appsrc'")

        self.pipeline = Gst.parse_launch(pipeline_str)
        if not self.pipeline:
            raise ValueError("Failed to create pipeline")

        self.appsrc = self.get_appsrc()
        if not self.appsrc:
            raise ValueError("Pipeline string must contain an element named 'appsrc'")

        self.pipeline.set_state(Gst.State.PLAYING)

    def __del__(self):
        self.stop()

    def write(self, data: Union[bytes, np.ndarray], meta: Optional[dict] = None):
        """Push one buffer into the appsrc.

        Args:
            data: Payload, either raw bytes or a NumPy array (serialised
                with ``ndarray.tobytes()``).
            meta: Optional metadata dictionary. Only used when ``isPcie`` is
                set; it is then serialised with ``create_metadata_struct``
                and placed in front of the payload, preceded by its size as
                a 4-byte little-endian integer.
        """
        if isinstance(data, np.ndarray):
            data = data.tobytes()

        # PCIe sink requires legitimate metadata. Hence the functionality to
        # include it, via the metaparser plugin.
        # We have to use metaparser because python cannot embed metadata into
        # the buffer.
        if self.isPcie and meta is not None:
            meta_bytes = create_metadata_struct(meta)
            meta_size_bytes = len(meta_bytes).to_bytes(4, byteorder='little')
            data = meta_size_bytes + meta_bytes + data

        # Single allocation path: the buffer is always exactly as large as
        # the final payload.
        buffer = Gst.Buffer.new_allocate(None, len(data), None)
        buffer.fill(0, data)
        self.appsrc.emit('push-buffer', buffer)

    def isOpened(self) -> bool:
        """Return whether the pipeline's current state is PLAYING."""
        _, state, _ = self.pipeline.get_state(0)
        return state == Gst.State.PLAYING

    def stop(self):
        """Set the pipeline to NULL; a no-op if no pipeline was ever created."""
        # __del__ also runs for objects whose __init__ raised before
        # self.pipeline was assigned, so the attribute may be missing.
        pipeline = getattr(self, 'pipeline', None)
        if pipeline is not None:
            pipeline.set_state(Gst.State.NULL)

    def get_appsrc(self):
        """Return the first ``appsrc`` element of the pipeline, or ``None``."""
        for element in self.pipeline.iterate_elements():
            if element.get_factory().get_name() == 'appsrc':
                return element
        return None