Source code for pupil_labs.realtime_api.streaming.eye_events

import datetime
import logging
import struct
import typing as T

from .base import RTSPData, RTSPRawStreamer

logger = logging.getLogger(__name__)


[docs] class BlinkEventData(T.NamedTuple): event_type: int start_time_ns: int end_time_ns: int rtp_ts_unix_seconds: float
[docs] @classmethod def from_raw(cls, data: RTSPData) -> "BlinkEventData": ( event_type, start_time_ns, end_time_ns, ) = struct.unpack("!iqq", data.raw) return cls(event_type, start_time_ns, end_time_ns, data.timestamp_unix_seconds)
@property def datetime(self): return datetime.datetime.fromtimestamp(self.rtp_ts_unix_seconds) @property def timestamp_unix_ns(self): return int(self.rtp_ts_unix_seconds * 1e9)
[docs] class FixationEventData(T.NamedTuple): event_type: int # 0: Saccade, 1: Fixation start_time_ns: int end_time_ns: int start_gaze_x: float start_gaze_y: float end_gaze_x: float end_gaze_y: float mean_gaze_x: float mean_gaze_y: float amplitude_pixels: float amplitude_angle_deg: float mean_velocity: float max_velocity: float rtp_ts_unix_seconds: float
[docs] @classmethod def from_raw(cls, data: RTSPData) -> "FixationEventData": ( event_type, start_time_ns, end_time_ns, start_gaze_x, start_gaze_y, end_gaze_x, end_gaze_y, mean_gaze_x, mean_gaze_y, amplitude_pixels, amplitude_angle_deg, mean_velocity, max_velocity, ) = struct.unpack("!iqqffffffffff", data.raw) return cls( event_type, start_time_ns, end_time_ns, start_gaze_x, start_gaze_y, end_gaze_x, end_gaze_y, mean_gaze_x, mean_gaze_y, amplitude_pixels, amplitude_angle_deg, mean_velocity, max_velocity, data.timestamp_unix_seconds, )
@property def datetime(self): return datetime.datetime.fromtimestamp(self.rtp_ts_unix_seconds) @property def timestamp_unix_ns(self): return int(self.rtp_ts_unix_seconds * 1e9)
[docs] class FixationOnsetEventData(T.NamedTuple): event_type: int # 0: Saccade, 1: Fixation start_time_ns: int rtp_ts_unix_seconds: float
[docs] @classmethod def from_raw(cls, data: RTSPData) -> "FixationOnsetEventData": ( event_type, start_time_ns, ) = struct.unpack("!iq", data.raw) return cls(event_type, start_time_ns, data.timestamp_unix_seconds)
@property def datetime(self): return datetime.datetime.fromtimestamp(self.rtp_ts_unix_seconds) @property def timestamp_unix_ns(self): return int(self.rtp_ts_unix_seconds * 1e9)
[docs] async def receive_eye_events_data( url, *args, **kwargs ) -> T.AsyncIterator[T.Union[FixationEventData]]: async with RTSPEyeEventStreamer(url, *args, **kwargs) as streamer: async for datum in streamer.receive(): yield datum
[docs] class RTSPEyeEventStreamer(RTSPRawStreamer):
[docs] async def receive( self, ) -> T.AsyncIterator[T.Union[FixationEventData, BlinkEventData]]: data_class_by_type = { 0: FixationEventData, 1: FixationEventData, 2: FixationOnsetEventData, 3: FixationOnsetEventData, 4: BlinkEventData, 5: None, # KEEPALIVE MSG, SKIP } async for data in super().receive(): try: event_type = struct.unpack_from("!i", data.raw)[0] cls = data_class_by_type[event_type] if cls is not None: yield cls.from_raw(data) except KeyError: logger.exception(f"Raw eye event data has unexpected type: {data}") raise except Exception: logger.exception(f"Unable to parse eye event data {data}") raise