Source code for pupil_labs.realtime_api.streaming.video
importbase64importdatetimeimportloggingimporttypingasTimportavimportnumpyasnpimportnumpy.typingasnptfrom.baseimportRTSPRawStreamer,SDPDataNotAvailableErrorfrom.nal_unitimportextract_payload_from_nal_unitlogger=logging.getLogger(__name__)BGRBuffer=npt.NDArray[np.uint8]"""Type annotation for raw BGR image buffers of the scene camera"""
[docs]asyncdefreceive(self)->T.AsyncIterator[VideoFrame]:codec=Noneframe_timestamp=Noneasyncfordatainsuper().receive():ifnotcodec:try:codec=av.CodecContext.create(self.encoding,"r")forparaminself.sprop_parameter_set_payloads:codec.parse(param)exceptSDPDataNotAvailableErroraserr:logger.debug(f"Session description protocol data not available yet: {err}")continue# if pkt is the start of a new fragmented frame, parse will return a packet# containing the data from the previous fragmentsforpacketincodec.parse(extract_payload_from_nal_unit(data.raw)):# use timestamp of previous packetsforav_frameincodec.decode(packet):yieldVideoFrame(av_frame,frame_timestamp)frame_timestamp=data.timestamp_unix_seconds
@propertydefsprop_parameter_set_payloads(self)->T.Optional[T.List[T.ByteString]]:""":raises pupil_labs.realtime_api.streaming.base.SDPDataNotAvailableError:"""ifself._sprop_parameter_set_payloadsisNone:try:attributes=self.reader.session.sdp["medias"][0]["attributes"]sprop_parameter_sets=attributes["fmtp"]["sprop-parameter-sets"]params=(base64.b64decode(param)forparaminsprop_parameter_sets.split(","))self._sprop_parameter_set_payloads=[extract_payload_from_nal_unit(param)forparaminparams]except(IndexError,KeyError)aserr:raiseSDPDataNotAvailableError(f"SDP data is missing {err} field")fromerrreturnself._sprop_parameter_set_payloads