Source code for sima.gst_utils
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GstApp
import ctypes
import numpy as np
from typing import Union
[docs]
class MetadataStruct(ctypes.Structure):
_fields_ = [
("buffer_id", ctypes.c_int64),
("frame_id", ctypes.c_int64),
("timestamp", ctypes.c_uint64),
("buffer_offset", ctypes.c_int64),
("pcie_buffer_id", ctypes.c_int64),
("stream_id_len", ctypes.c_uint32), # Length of stream_id string
("buffer_name_len", ctypes.c_uint32), # Length of buffer_name string
]
[docs]
def extract_metadata(buffer: Gst.Buffer) -> dict:
"""Extract metadata from a Gst.Buffer and return it as a dictionary."""
meta = buffer.get_custom_meta(SIMAAI_META_STR)
if not meta or not (s := meta.get_structure()):
print("No metadata structure found in buffer")
return None
metadata = {}
try:
# Extract PCIE info
ret, pcie_buffer_id = s.get_int64("pcie-buffer-id")
if ret:
metadata['is_pcie'] = True
metadata['pcie_buffer_id'] = pcie_buffer_id
# Extract int64 fields
for field in ['buffer-id', 'frame-id', 'buffer-offset']:
ret, value = s.get_int64(field)
if ret:
metadata[field.replace('-', '_')] = value
# Extract uint64 fields
ret, timestamp = s.get_uint64('timestamp')
if ret:
metadata['timestamp'] = timestamp
# Extract string fields
for field in ['stream-id', 'buffer-name']:
value = s.get_string(field)
if value:
metadata[field.replace('-', '_')] = value
return metadata
except Exception as e:
print(f"Error extracting metadata: {e}")
return None
[docs]
def create_metadata_struct(meta_dict: dict) -> MetadataStruct:
meta = MetadataStruct()
meta.buffer_id = meta_dict.get('buffer_id', 0)
meta.frame_id = meta_dict.get('frame_id', 0)
meta.timestamp = meta_dict.get('timestamp', 0)
meta.buffer_offset = meta_dict.get('buffer_offset', 0)
meta.pcie_buffer_id = meta_dict.get('pcie_buffer_id', 0)
meta.stream_id_len = len(meta_dict.get('stream_id', ''))
meta.buffer_name_len = len(meta_dict.get('buffer_name', ''))
stream_id_bytes = meta_dict.get('stream_id', '').encode('utf-8')
buffer_name_bytes = meta_dict.get('buffer_name', '').encode('utf-8')
packed_metadata_bytes = bytes(meta) + stream_id_bytes + buffer_name_bytes
return packed_metadata_bytes
[docs]
def get_dummy_metadata():
meta = MetadataStruct()
meta.buffer_id = 0
meta.frame_id = 0
meta.timestamp = 0
meta.buffer_offset = 0
meta.pcie_buffer_id = 0
meta.stream_id_len = 0
meta.buffer_name_len = 0
return meta
Gst.init(None)
Gst.Meta.register_custom(
SIMAAI_META_STR,
"", # Optional description
None, # Default init function
None # Default free function
)
[docs]
class GstReader:
def __init__(self, pipeline_str):
if 'appsink' not in pipeline_str:
raise ValueError("Pipeline string must contain 'appsink'")
if self.pipeline is None:
raise ValueError("Failed to create pipeline")
if not self.appsink:
raise ValueError("Pipeline string must contain an element named 'appsink'")
self.pipeline.set_state(Gst.State.PLAYING)
def __del__(self):
self.stop()
[docs]
def get_appsink(self):
for element in self.pipeline.iterate_elements():
if element.get_factory().get_klass().find('Sink') != -1:
if element.get_factory().get_name() == 'appsink':
return element
return None
[docs]
def read(self):
sample = self.appsink.try_pull_sample(1e9)
if sample:
buffer = sample.get_buffer()
meta_dict = extract_metadata(buffer)
result, map_info = buffer.map(Gst.MapFlags.READ)
if result:
data = bytes(map_info.data)
buffer.unmap(map_info)
return data, meta_dict
else:
RuntimeWarning("No sample available")
return None, None
[docs]
class GstWriter:
def __init__(self, pipeline_str, isPcie=False):
if 'appsrc' not in pipeline_str:
raise ValueError("Pipeline string must contain 'appsrc'")
if not self.pipeline:
raise ValueError("Failed to create pipeline")
if not self.appsrc:
raise ValueError("Pipeline string must contain an element named 'appsrc'")
self.pipeline.set_state(Gst.State.PLAYING)
def __del__(self):
self.stop()
[docs]
def write(self, data: Union[bytes, np.ndarray], meta: dict = None):
if isinstance(data, np.ndarray):
data = data.tobytes()
#PCIe sink requires legitimate metadata. Hence the functionality to
#include it, via the metaparser plugin.
# We have to use metaparser because python cannot embed metadata into the buffer.
if self.isPcie:
if meta is not None:
meta_bytes = create_metadata_struct(meta)
total_size = 4 + len(meta_bytes) + len(data)
buffer = Gst.Buffer.new_allocate(None, total_size, None)
meta_size_bytes = len(meta_bytes).to_bytes(4, byteorder='little')
data = meta_size_bytes + meta_bytes + data
else:
buffer = Gst.Buffer.new_allocate(None, len(data), None)
else:
buffer = Gst.Buffer.new_allocate(None, len(data), None)
buffer.fill(0, data)
self.appsrc.emit('push-buffer', buffer)
[docs]
def get_appsrc(self):
for element in self.pipeline.iterate_elements():
if element.get_factory().get_name() == 'appsrc':
return element
return None