Source code for sima.gst_utils

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GstApp
import ctypes
import numpy as np
from typing import Union

# Name of the custom GstMeta used in this module: it is the key passed to
# Gst.Meta.register_custom() at import time and to
# Gst.Buffer.get_custom_meta() when reading metadata back from a buffer.
SIMAAI_META_STR = "GstSimaMeta"
class MetadataStruct(ctypes.Structure):
    """Fixed-size header of the packed metadata blob.

    Five 64-bit fields are followed by two 32-bit length fields. The two
    length fields describe the ``stream_id`` and ``buffer_name`` strings,
    which are not part of this structure; ``create_metadata_struct`` appends
    them after the header bytes.
    """

    _fields_ = [
        # 64-bit identifiers, timestamp and offset.
        ("buffer_id", ctypes.c_int64),
        ("frame_id", ctypes.c_int64),
        ("timestamp", ctypes.c_uint64),
        ("buffer_offset", ctypes.c_int64),
        ("pcie_buffer_id", ctypes.c_int64),
        # Lengths of the variable-size strings.
        ("stream_id_len", ctypes.c_uint32),    # Length of stream_id string
        ("buffer_name_len", ctypes.c_uint32),  # Length of buffer_name string
    ]
def extract_metadata(buffer: Gst.Buffer) -> dict:
    """Read the SiMa custom meta attached to ``buffer`` into a plain dict.

    Keys are the structure field names with ``-`` replaced by ``_``. A field
    is only included when the structure reports it as present (and, for the
    string fields, when the value is non-empty). When ``pcie-buffer-id`` is
    present, ``is_pcie`` is additionally set to ``True``.

    Returns:
        The metadata dictionary, or ``None`` (despite the ``dict``
        annotation) when the buffer carries no such meta/structure or when
        reading a field raises. In both failure cases a message is printed.
    """
    custom_meta = buffer.get_custom_meta(SIMAAI_META_STR)
    structure = custom_meta.get_structure() if custom_meta else None
    if not structure:
        print("No metadata structure found in buffer")
        return None

    fields = {}
    try:
        # PCIe information: presence of the id doubles as the PCIe flag.
        found, pcie_id = structure.get_int64("pcie-buffer-id")
        if found:
            fields['is_pcie'] = True
            fields['pcie_buffer_id'] = pcie_id

        # Signed 64-bit fields.
        for name in ('buffer-id', 'frame-id', 'buffer-offset'):
            found, number = structure.get_int64(name)
            if found:
                fields[name.replace('-', '_')] = number

        # Unsigned 64-bit field.
        found, stamp = structure.get_uint64('timestamp')
        if found:
            fields['timestamp'] = stamp

        # String fields.
        for name in ('stream-id', 'buffer-name'):
            text = structure.get_string(name)
            if text:
                fields[name.replace('-', '_')] = text
    except Exception as e:
        print(f"Error extracting metadata: {e}")
        return None

    return fields
def create_metadata_struct(meta_dict: dict) -> bytes:
    """Serialize a metadata dictionary into the packed binary layout.

    The result is the raw bytes of a ``MetadataStruct`` header immediately
    followed by the UTF-8 encoded ``stream_id`` and then the UTF-8 encoded
    ``buffer_name``.

    Args:
        meta_dict: Metadata values. Recognised keys are ``buffer_id``,
            ``frame_id``, ``timestamp``, ``buffer_offset``,
            ``pcie_buffer_id`` (each defaulting to ``0``) and ``stream_id``,
            ``buffer_name`` (each defaulting to ``''``). Other keys are
            ignored.

    Returns:
        The packed header and strings as a single ``bytes`` object.
    """
    # Encode first so the length fields can describe the bytes that are
    # actually appended. Counting characters instead would under-report the
    # size of any non-ASCII string.
    stream_id_bytes = meta_dict.get('stream_id', '').encode('utf-8')
    buffer_name_bytes = meta_dict.get('buffer_name', '').encode('utf-8')

    header = MetadataStruct()
    header.buffer_id = meta_dict.get('buffer_id', 0)
    header.frame_id = meta_dict.get('frame_id', 0)
    header.timestamp = meta_dict.get('timestamp', 0)
    header.buffer_offset = meta_dict.get('buffer_offset', 0)
    header.pcie_buffer_id = meta_dict.get('pcie_buffer_id', 0)
    header.stream_id_len = len(stream_id_bytes)
    header.buffer_name_len = len(buffer_name_bytes)

    return bytes(header) + stream_id_bytes + buffer_name_bytes
def get_dummy_metadata():
    """Return a ``MetadataStruct`` whose fields are all set to zero."""
    placeholder = MetadataStruct()
    # Every declared field (ids, timestamp, offset and both string lengths)
    # is explicitly assigned 0.
    for field_name, _field_type in MetadataStruct._fields_:
        setattr(placeholder, field_name, 0)
    return placeholder
# Import-time setup: initialise GStreamer, then register the custom meta
# name so it can later be looked up with buffer.get_custom_meta().
Gst.init(None)

# NOTE(review): the arguments after the name are passed as "", None, None.
# The GstMeta documentation describes them as (tags, transform_func,
# user_data) -- confirm that an empty string is the intended "no tags"
# value for this binding.
Gst.Meta.register_custom(SIMAAI_META_STR, "", None, None)
class GstReader:
    """Pull buffers and their metadata out of a GStreamer pipeline.

    The pipeline is built from a launch string that must contain an
    ``appsink`` element, and is set to ``PLAYING`` by the constructor.

    Attributes:
        pipeline: The object returned by ``Gst.parse_launch``.
        appsink: The ``appsink`` element found inside ``pipeline``.
    """

    def __init__(self, pipeline_str):
        """Build and start the pipeline.

        Args:
            pipeline_str: GStreamer launch description containing ``appsink``.

        Raises:
            ValueError: If the string does not mention ``appsink``, the
                pipeline could not be created, or no ``appsink`` element is
                found in it.
        """
        if 'appsink' not in pipeline_str:
            raise ValueError("Pipeline string must contain 'appsink'")

        self.pipeline = Gst.parse_launch(pipeline_str)
        if self.pipeline is None:
            raise ValueError("Failed to create pipeline")

        self.appsink = self.get_appsink()
        if not self.appsink:
            raise ValueError("Pipeline string must contain an element named 'appsink'")

        self.pipeline.set_state(Gst.State.PLAYING)

    def __del__(self):
        # May run on a partially constructed object (when __init__ raised);
        # stop() tolerates a missing pipeline.
        self.stop()

    def get_appsink(self):
        """Return the first pipeline element created by the ``appsink``
        factory (whose class string also contains ``Sink``), or ``None``."""
        for element in self.pipeline.iterate_elements():
            factory = element.get_factory()
            if factory is None:
                # Nothing to compare against for this element.
                continue
            if 'Sink' in factory.get_klass() and factory.get_name() == 'appsink':
                return element
        return None

    def read(self, timeout_ns=1_000_000_000):
        """Pull one sample and return its payload and metadata.

        Args:
            timeout_ns: Value passed to ``try_pull_sample`` as the maximum
                wait, in nanoseconds. Defaults to one second.

        Returns:
            ``(data, meta_dict)`` where ``data`` is a ``bytes`` copy of the
            buffer contents and ``meta_dict`` is the result of
            ``extract_metadata`` (which may be ``None``). ``(None, None)``
            is returned, after a ``RuntimeWarning``, when no sample is
            available or the buffer cannot be mapped.
        """
        import warnings

        sample = self.appsink.try_pull_sample(timeout_ns)
        if not sample:
            warnings.warn("No sample available", RuntimeWarning, stacklevel=2)
            return None, None

        buffer = sample.get_buffer()
        meta_dict = extract_metadata(buffer)

        mapped, map_info = buffer.map(Gst.MapFlags.READ)
        if not mapped:
            # Keep the two-tuple shape so callers can always unpack.
            warnings.warn("Failed to map buffer for reading", RuntimeWarning,
                          stacklevel=2)
            return None, None

        try:
            # Copy out of the mapping before it is released.
            data = bytes(map_info.data)
        finally:
            buffer.unmap(map_info)
        return data, meta_dict

    def stop(self):
        """Set the pipeline to ``NULL``; a no-op if no pipeline was created."""
        pipeline = getattr(self, 'pipeline', None)
        if pipeline is not None:
            pipeline.set_state(Gst.State.NULL)
class GstWriter:
    """Push raw data into a GStreamer pipeline through an ``appsrc``.

    The pipeline is built from a launch string that must contain an
    ``appsrc`` element, and is set to ``PLAYING`` by the constructor.

    Attributes:
        isPcie: When true, ``write`` prepends packed metadata to the payload.
        pipeline: The object returned by ``Gst.parse_launch``.
        appsrc: The ``appsrc`` element found inside ``pipeline``.
    """

    def __init__(self, pipeline_str, isPcie=False):
        """Build and start the pipeline.

        Args:
            pipeline_str: GStreamer launch description containing ``appsrc``.
            isPcie: Enable the metadata-prefixed payload format in ``write``.

        Raises:
            ValueError: If the string does not mention ``appsrc``, the
                pipeline could not be created, or no ``appsrc`` element is
                found in it.
        """
        self.isPcie = isPcie

        if 'appsrc' not in pipeline_str:
            raise ValueError("Pipeline string must contain 'appsrc'")

        self.pipeline = Gst.parse_launch(pipeline_str)
        if not self.pipeline:
            raise ValueError("Failed to create pipeline")

        self.appsrc = self.get_appsrc()
        if not self.appsrc:
            raise ValueError("Pipeline string must contain an element named 'appsrc'")

        self.pipeline.set_state(Gst.State.PLAYING)

    def __del__(self):
        # May run on a partially constructed object (when __init__ raised);
        # stop() tolerates a missing pipeline.
        self.stop()

    def write(self, data: Union[bytes, np.ndarray], meta: dict = None):
        """Push one buffer into the pipeline.

        Args:
            data: Payload; a NumPy array is converted with ``tobytes()``.
            meta: Optional metadata dictionary. Only used when ``isPcie`` is
                true, in which case the pushed payload becomes a 4-byte
                little-endian metadata size, then the packed metadata from
                ``create_metadata_struct``, then ``data``.

        Returns:
            Whatever the ``push-buffer`` emit returns (a flow result per the
            appsrc documentation), so callers can detect a rejected buffer.
        """
        if isinstance(data, np.ndarray):
            data = data.tobytes()

        # PCIe sink requires legitimate metadata. Hence the functionality to
        # include it, via the metaparser plugin. We have to use metaparser
        # because python cannot embed metadata into the buffer.
        if self.isPcie and meta is not None:
            meta_bytes = create_metadata_struct(meta)
            meta_size_bytes = len(meta_bytes).to_bytes(4, byteorder='little')
            data = meta_size_bytes + meta_bytes + data

        # Single allocation sized for the final payload, with or without
        # the metadata prefix.
        buffer = Gst.Buffer.new_allocate(None, len(data), None)
        buffer.fill(0, data)
        return self.appsrc.emit('push-buffer', buffer)

    def stop(self):
        """Set the pipeline to ``NULL``; a no-op if no pipeline was created."""
        pipeline = getattr(self, 'pipeline', None)
        if pipeline is not None:
            pipeline.set_state(Gst.State.NULL)

    def get_appsrc(self):
        """Return the first pipeline element created by the ``appsrc``
        factory, or ``None`` if there is none."""
        for element in self.pipeline.iterate_elements():
            factory = element.get_factory()
            if factory is not None and factory.get_name() == 'appsrc':
                return element
        return None