import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
gi.require_version('GstVideo', '1.0')
from gi.repository import Gst, GstApp, GstVideo
import ctypes
import time
import numpy as np
from typing import Union
from spy.logger import Logger, LogLevel
Gst.init(None)
Gst.Meta.register_custom(
SIMAAI_META_STR,
"", # Optional description
None, # Default init function
None # Default free function
)
[docs]
class GstReader:
def __init__(self, pipeline_str, resolution_timeout: float = 50.0, isPcie: bool=False):
if 'appsink' not in pipeline_str:
raise ValueError("Pipeline string must contain 'appsink'")
[docs]
self.pipeline = Gst.parse_launch(pipeline_str)
if self.pipeline is None:
raise ValueError("Failed to create pipeline")
[docs]
self.appsink = self.get_appsink()
if not self.appsink:
raise ValueError("Pipeline string must contain an element named 'appsink'")
self.pipeline.set_state(Gst.State.PLAYING)
if not isPcie:
# Extract resolution from negotiated caps
self._get_video_info(timeout=resolution_timeout)
def __del__(self):
self.stop()
[docs]
def get_appsink(self):
for element in self.pipeline.iterate_elements():
if element.get_factory().get_klass().find('Sink') != -1:
if element.get_factory().get_name() == 'appsink':
return element
return None
def _get_video_info(self, timeout: float = 5.0) -> bool:
pad = self.appsink.get_static_pad('sink')
start = time.time()
caps = None
while time.time() - start < timeout:
caps = pad.get_current_caps()
if caps and caps.get_size() > 0:
break
time.sleep(0.01)
if not caps or caps.get_size() == 0:
raise RuntimeError("Timed out waiting for caps negotiation on appsink")
video_info = GstVideo.VideoInfo.new_from_caps(caps)
if not video_info:
raise RuntimeError("Failed to parse video caps for resolution")
self.video_info = video_info
self.fps = video_info.fps_n
self.frame_width = video_info.width
self.frame_height = video_info.height
return True
[docs]
def isOpened(self) -> bool:
_, state, _ = self.pipeline.get_state(0)
return state == Gst.State.PLAYING
[docs]
def release(self):
self.stop()
[docs]
def read(self):
sample = self.appsink.try_pull_sample(1e9)
if sample:
buffer = sample.get_buffer()
meta_dict = extract_metadata(buffer)
result, map_info = buffer.map(Gst.MapFlags.READ)
if result:
data = bytes(map_info.data)
buffer.unmap(map_info)
return data, meta_dict
else:
RuntimeWarning("No sample available")
return None, None
[docs]
def stop(self):
self.pipeline.set_state(Gst.State.NULL)
[docs]
class GstWriter:
def __init__(self, pipeline_str, isPcie=False):
if 'appsrc' not in pipeline_str:
raise ValueError("Pipeline string must contain 'appsrc'")
[docs]
self.pipeline = Gst.parse_launch(pipeline_str)
if not self.pipeline:
raise ValueError("Failed to create pipeline")
[docs]
self.appsrc = self.get_appsrc()
if not self.appsrc:
raise ValueError("Pipeline string must contain an element named 'appsrc'")
self.pipeline.set_state(Gst.State.PLAYING)
def __del__(self):
self.stop()
[docs]
def write(self, data: Union[bytes, np.ndarray], meta: dict = None):
if isinstance(data, np.ndarray):
data = data.tobytes()
# PCIe sink requires legitimate metadata. Hence the functionality to
#include it, via the metaparser plugin.
# We have to use metaparser because python cannot embed metadata into the buffer.
if self.isPcie:
if meta is not None:
meta_bytes = create_metadata_struct(meta)
total_size = 4 + len(meta_bytes) + len(data)
buffer = Gst.Buffer.new_allocate(None, total_size, None)
meta_size_bytes = len(meta_bytes).to_bytes(4, byteorder='little')
data = meta_size_bytes + meta_bytes + data
else:
buffer = Gst.Buffer.new_allocate(None, len(data), None)
else:
buffer = Gst.Buffer.new_allocate(None, len(data), None)
buffer.fill(0, data)
self.appsrc.emit('push-buffer', buffer)
[docs]
def isOpened(self) -> bool:
_, state, _ = self.pipeline.get_state(0)
return state == Gst.State.PLAYING
[docs]
def stop(self):
self.pipeline.set_state(Gst.State.NULL)
[docs]
def get_appsrc(self):
for element in self.pipeline.iterate_elements():
if element.get_factory().get_name() == 'appsrc':
return element
return None