Source code for pupil_labs.realtime_api.streaming.gaze

import datetime
import logging
import struct
import typing as T

from .base import RTSPData, RTSPRawStreamer

logger = logging.getLogger(__name__)


[docs] class Point(T.NamedTuple): x: float y: float
[docs] class GazeData(T.NamedTuple): x: float y: float worn: bool timestamp_unix_seconds: float
[docs] @classmethod def from_raw(cls, data: RTSPData) -> "GazeData": x, y, worn = struct.unpack("!ffB", data.raw) return cls(x, y, worn == 255, data.timestamp_unix_seconds)
@property def datetime(self): return datetime.datetime.fromtimestamp(self.timestamp_unix_seconds) @property def timestamp_unix_ns(self): return int(self.timestamp_unix_seconds * 1e9)
class DualMonocularGazeData(T.NamedTuple): """EXPERIMENTAL CLASS""" left: Point right: Point worn: bool timestamp_unix_seconds: float @classmethod def from_raw(cls, data: RTSPData) -> "GazeData": x1, y1, worn, x2, y2 = struct.unpack("!ffBff", data.raw) return cls( Point(x1, y1), Point(x2, y2), worn == 255, data.timestamp_unix_seconds ) @property def datetime(self): return datetime.datetime.fromtimestamp(self.timestamp_unix_seconds) @property def timestamp_unix_ns(self): return int(self.timestamp_unix_seconds * 1e9)
[docs] class EyestateGazeData(T.NamedTuple): x: float y: float worn: bool pupil_diameter_left: float eyeball_center_left_x: float eyeball_center_left_y: float eyeball_center_left_z: float optical_axis_left_x: float optical_axis_left_y: float optical_axis_left_z: float pupil_diameter_right: float eyeball_center_right_x: float eyeball_center_right_y: float eyeball_center_right_z: float optical_axis_right_x: float optical_axis_right_y: float optical_axis_right_z: float timestamp_unix_seconds: float
[docs] @classmethod def from_raw(cls, data: RTSPData) -> "EyestateGazeData": ( x, y, worn, pupil_diameter_left, eyeball_center_left_x, eyeball_center_left_y, eyeball_center_left_z, optical_axis_left_x, optical_axis_left_y, optical_axis_left_z, pupil_diam_right, eyeball_center_right_x, eyeball_center_right_y, eyeball_center_right_z, optical_axis_right_x, optical_axis_right_y, optical_axis_right_z, ) = struct.unpack("!ffBffffffffffffff", data.raw) return cls( x, y, worn == 255, pupil_diameter_left, eyeball_center_left_x, eyeball_center_left_y, eyeball_center_left_z, optical_axis_left_x, optical_axis_left_y, optical_axis_left_z, pupil_diam_right, eyeball_center_right_x, eyeball_center_right_y, eyeball_center_right_z, optical_axis_right_x, optical_axis_right_y, optical_axis_right_z, data.timestamp_unix_seconds, )
@property def datetime(self): return datetime.datetime.fromtimestamp(self.timestamp_unix_seconds) @property def timestamp_unix_ns(self): return int(self.timestamp_unix_seconds * 1e9)
[docs] async def receive_gaze_data( url, *args, **kwargs ) -> T.AsyncIterator[T.Union[GazeData, DualMonocularGazeData, EyestateGazeData]]: async with RTSPGazeStreamer(url, *args, **kwargs) as streamer: async for datum in streamer.receive(): yield datum
[docs] class RTSPGazeStreamer(RTSPRawStreamer):
[docs] async def receive( self, ) -> T.AsyncIterator[T.Union[GazeData, DualMonocularGazeData, EyestateGazeData]]: data_class_by_raw_len = { 9: GazeData, 17: DualMonocularGazeData, 65: EyestateGazeData, } async for data in super().receive(): try: cls = data_class_by_raw_len[len(data.raw)] yield cls.from_raw(data) except KeyError: logger.exception(f"Raw gaze data has unexpected length: {data}") raise except Exception: logger.exception(f"Unable to parse gaze data {data}") raise