Source code for pupil_labs.realtime_api.streaming.nal_unit

import struct
import typing as T


[docs] def extract_payload_from_nal_unit(unit: T.ByteString) -> T.ByteString: """ - prepend NAL unit start code to payload if necessary - handle fragmented units (of type FU-A) Inspired by https://github.com/runtheops/rtsp-rtp/blob/master/transport/primitives/nal_unit.py Rewritten due to license incompatibility. """ start_code = b"\x00\x00\x00\x01" offset = 0 # slice to keep ByteString type; indexing would return int in native byte order first_byte = unit[:1] # ensure network order for conversion to uint8 first_byte = struct.unpack("!B", first_byte)[0] is_first_bit_one = first_byte & 0b10000000 if is_first_bit_one: # See section 1.3 of https://www.ietf.org/rfc/rfc3984.txt raise ValueError("First bit must be zero (forbidden_zero_bit)") nal_type = first_byte & 0b00011111 if nal_type == 28: # Fragmentation unit FU-A # https://www.ietf.org/rfc/rfc3984.txt # Section 5.8. fu_header = unit[1:2] # get second byte while retaining ByteString type fu_header = struct.unpack("!B", fu_header)[0] offset = 2 # skip first two bytes is_fu_start_bit_one = fu_header & 0b10000000 if is_fu_start_bit_one: # reconstruct header of a non-fragmented NAL unit first_byte_bits_1_to_3 = first_byte & 0b11100000 # NAL type of non-fragmented NAL unit: fu_header_bits_4_to_8 = fu_header & 0b00011111 reconstructed_header = first_byte_bits_1_to_3 + fu_header_bits_4_to_8 start_code += bytes((reconstructed_header,)) # convert int to ByteString else: # do not prepend start code to payload since we are in the middle of a # fragmented unit start_code = b"" return start_code + unit[offset:]