import asyncio
import inspect
import json
import logging
import types
import typing as T
import aiohttp
import numpy as np
import websockets
import pupil_labs # noqa: F401
from .base import DeviceBase
from .models import (
APIPath,
Component,
Event,
Status,
UnknownComponentError,
parse_component,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Callback signatures accepted by StatusUpdateNotifier. The component type is
# spelled as a string forward reference instead of the imported ``Component``.
# A synchronous callback takes one component and returns nothing.
UpdateCallbackSync = T.Callable[["pupil_labs.realtime_api.models.Component"], None]
"""Type annotation for synchronous update callbacks"""
# An asynchronous callback takes one component and returns an awaitable;
# StatusUpdateNotifier awaits any awaitable result a callback hands back.
UpdateCallbackAsync = T.Callable[
["pupil_labs.realtime_api.models.Component"], T.Awaitable[None]
]
"""Type annotation for asynchronous update callbacks"""
# Either flavour of callback may be registered.
UpdateCallback = T.Union[UpdateCallbackSync, UpdateCallbackAsync]
"""Type annotation for synchronous and asynchronous callbacks"""
[docs]
class DeviceError(Exception):
    """Error raised when a request to the device does not succeed.

    Within this module it is raised with two positional arguments: the HTTP
    status code of the response and a human-readable message.
    """
[docs]
class Device(DeviceBase):
    """Asynchronous client for the device's HTTP and websocket API.

    The instance owns an :class:`aiohttp.ClientSession` stored in
    ``self.session``. Use it as an async context manager, or call
    :meth:`close` explicitly, to release the session.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Forward all arguments to the base class and open a client session."""
        super().__init__(*args, **kwargs)
        self._create_client_session()

    async def get_status(self) -> Status:
        """Request the current status from the device.

        :return: the ``result`` payload of the response, parsed via
            ``Status.from_dict``
        :raises pupil_labs.realtime_api.device.DeviceError: if the request fails
        """
        async with self.session.get(self.api_url(APIPath.STATUS)) as response:
            confirmation = await response.json()
            if response.status != 200:
                raise DeviceError(response.status, confirmation["message"])
            result = confirmation["result"]
            logger.debug(f"[{self}.get_status] Received status: {result}")
            return Status.from_dict(result)

    async def status_updates(self) -> T.AsyncIterator[Component]:
        """Yield status components as they arrive over the status websocket.

        The connection is re-established whenever it is closed. Messages whose
        component type is unknown are logged and skipped. Iteration ends when
        the consuming task is cancelled.
        """
        # Auto-reconnect, see
        # https://websockets.readthedocs.io/en/stable/reference/client.html#websockets.client.connect
        websocket_status_endpoint = self.api_url(APIPath.STATUS, protocol="ws")
        async for websocket in websockets.connect(websocket_status_endpoint):
            try:
                async for message_raw in websocket:
                    message_json = json.loads(message_raw)
                    try:
                        component = parse_component(message_json)
                    except UnknownComponentError:
                        # Log the raw message: `component` was never assigned
                        # for this message because parsing failed.
                        logger.warning(
                            f"Dropping unknown component: {message_json}"
                        )
                        continue
                    yield component
            except websockets.ConnectionClosed:
                logger.debug("Websocket connection closed. Reconnecting...")
                continue
            except asyncio.CancelledError:
                # Cancellation is absorbed here: the generator finishes
                # instead of re-raising to its consumer.
                logger.debug("status_updates() cancelled")
                break

    async def recording_start(self) -> str:
        """Start a recording and return its id.

        :raises pupil_labs.realtime_api.device.DeviceError:
            if the recording could not be started. Possible reasons include

            - Recording already running
            - Template has required fields
            - Low battery
            - Low storage
            - No wearer selected
            - No workspace selected
            - Setup bottom sheets not completed
        """
        async with self.session.post(
            self.api_url(APIPath.RECORDING_START)
        ) as response:
            confirmation = await response.json()
            logger.debug(
                f"[{self}.recording_start] Received response: {confirmation}"
            )
            if response.status != 200:
                raise DeviceError(response.status, confirmation["message"])
            return confirmation["result"]["id"]

    async def recording_stop_and_save(self) -> None:
        """Stop the running recording and save it.

        :raises pupil_labs.realtime_api.device.DeviceError:
            if the recording could not be stopped.
            Possible reasons include

            - Recording not running
            - Template has required fields
        """
        async with self.session.post(
            self.api_url(APIPath.RECORDING_STOP_AND_SAVE)
        ) as response:
            confirmation = await response.json()
            logger.debug(
                f"[{self}.recording_stop_and_save] "
                f"Received response: {confirmation}"
            )
            if response.status != 200:
                raise DeviceError(response.status, confirmation["message"])

    async def recording_cancel(self) -> None:
        """Cancel the running recording.

        :raises pupil_labs.realtime_api.device.DeviceError:
            if the recording could not be cancelled.
            Possible reasons include

            - Recording not running
        """
        async with self.session.post(
            self.api_url(APIPath.RECORDING_CANCEL)
        ) as response:
            confirmation = await response.json()
            logger.debug(
                f"[{self}.recording_cancel] Received response: {confirmation}"
            )
            if response.status != 200:
                raise DeviceError(response.status, confirmation["message"])

    async def send_event(
        self, event_name: str, event_timestamp_unix_ns: T.Optional[int] = None
    ) -> Event:
        """Send a named event to the device.

        :param event_name: value sent under the ``name`` key
        :param event_timestamp_unix_ns: optional value sent under the
            ``timestamp`` key; the key is omitted when this is ``None``
        :return: the ``result`` payload of the response, parsed via
            ``Event.from_dict``
        :raises pupil_labs.realtime_api.device.DeviceError: if sending the event fails
        """
        event: T.Dict[str, T.Any] = {"name": event_name}
        if event_timestamp_unix_ns is not None:
            event["timestamp"] = event_timestamp_unix_ns

        async with self.session.post(
            self.api_url(APIPath.EVENT), json=event
        ) as response:
            confirmation = await response.json()
            logger.debug(f"[{self}.send_event] Received response: {confirmation}")
            if response.status != 200:
                raise DeviceError(response.status, confirmation["message"])
            return Event.from_dict(confirmation["result"])

    async def close(self) -> None:
        """Close the client session, if one is open.

        Safe to call more than once: a second call (for example ``__aexit__``
        running after an explicit ``close()``) is a no-op.
        """
        if self.session is None:
            return
        await self.session.close()
        self.session = None

    async def __aenter__(self) -> "Device":
        """Ensure a client session exists and return this device."""
        if self.session is None:
            self._create_client_session()
        return self

    async def __aexit__(
        self,
        exc_type: T.Optional[T.Type[BaseException]],
        exc_val: T.Optional[BaseException],
        exc_tb: T.Optional[types.TracebackType],
    ) -> None:
        """Close the client session when leaving the ``async with`` block."""
        await self.close()

    def _create_client_session(self) -> None:
        """Create a fresh ``aiohttp.ClientSession`` and store it on the instance."""
        self.session = aiohttp.ClientSession()

    async def get_calibration(self) -> np.ndarray:
        """Download the calibration data and decode it as a structured array.

        The raw response body is interpreted with a fixed structured dtype
        holding a version byte, a serial field, camera matrix / distortion
        coefficients / extrinsics for the scene, right and left cameras, and a
        trailing CRC field.

        :return: the result of ``np.frombuffer`` over the response bytes
        :raises pupil_labs.realtime_api.device.DeviceError: if the request fails
        """
        async with self.session.get(self.api_url(APIPath.CALIBRATION)) as response:
            if response.status != 200:
                raise DeviceError(response.status, "Failed to fetch calibration")

            raw_data = await response.read()
            return np.frombuffer(
                raw_data,
                np.dtype(
                    [
                        ("version", "u1"),
                        ("serial", "6a"),
                        ("scene_camera_matrix", "(3,3)d"),
                        ("scene_distortion_coefficients", "8d"),
                        ("scene_extrinsics_affine_matrix", "(4,4)d"),
                        ("right_camera_matrix", "(3,3)d"),
                        ("right_distortion_coefficients", "8d"),
                        ("right_extrinsics_affine_matrix", "(4,4)d"),
                        ("left_camera_matrix", "(3,3)d"),
                        ("left_distortion_coefficients", "8d"),
                        ("left_extrinsics_affine_matrix", "(4,4)d"),
                        ("crc", "u4"),
                    ]
                ),
            )
[docs]
class StatusUpdateNotifier:
    """Run callbacks in a background task for every status update of a device.

    Can be driven explicitly via :meth:`receive_updates_start` /
    :meth:`receive_updates_stop`, or used as an async context manager.
    """

    def __init__(
        self, device: "Device", callbacks: "T.List[UpdateCallback]"
    ) -> None:
        """Store the device and callbacks; no task is started yet.

        :param device: object whose ``status_updates()`` async iterator is consumed
        :param callbacks: callables invoked, in order, with each update; a
            callback may return an awaitable, which is then awaited
        """
        self._auto_update_task: T.Optional[asyncio.Task] = None
        self._device = device
        self._callbacks = callbacks

    async def receive_updates_start(self) -> None:
        """Start the background update task unless one is already set."""
        if self._auto_update_task is not None:
            logger.debug("Auto-update already started!")
            return
        self._auto_update_task = asyncio.create_task(self._auto_update())

    async def receive_updates_stop(self) -> None:
        """Cancel the background update task and wait for it to finish.

        Any exception other than cancellation that the task ended with is
        re-raised, but the notifier is reset either way so it can be started
        again.
        """
        if self._auto_update_task is None:
            logger.debug("Auto-update is not running!")
            return
        self._auto_update_task.cancel()
        try:
            # wait for the task to be cancelled
            await self._auto_update_task
        except asyncio.CancelledError:
            pass  # task has been successfully cancelled
        finally:
            # Reset even when the task failed, so a restart stays possible.
            self._auto_update_task = None

    async def __aenter__(self) -> "StatusUpdateNotifier":
        """Start receiving updates and return this notifier."""
        await self.receive_updates_start()
        return self

    async def __aexit__(
        self,
        exc_type: T.Optional[T.Type[BaseException]],
        exc_val: T.Optional[BaseException],
        exc_tb: T.Optional[types.TracebackType],
    ) -> None:
        """Stop receiving updates when leaving the ``async with`` block."""
        await self.receive_updates_stop()

    async def _auto_update(self) -> None:
        """Feed every status update to each callback, awaiting async results."""
        async for changed in self._device.status_updates():
            for callback in self._callbacks:
                result = callback(changed)
                if inspect.isawaitable(result):
                    await result