import dataclasses
import datetime
import enum
import logging
import typing as T
try:
from typing import Literal
except ImportError:
# FIXME: Remove when dropping py3.7 support
from typing_extensions import Literal
logger = logging.getLogger(__name__)
[docs]
class APIPath(enum.Enum):
STATUS = "/status"
RECORDING_START = "/recording:start"
RECORDING_STOP_AND_SAVE = "/recording:stop_and_save"
RECORDING_CANCEL = "/recording:cancel"
EVENT = "/event"
CALIBRATION = "/../calibration.bin"
[docs]
def full_address(
self, address: str, port: int, protocol: str = "http", prefix: str = "/api"
) -> str:
return f"{protocol}://{address}:{port}" + prefix + self.value
[docs]
class DiscoveredDeviceInfo(T.NamedTuple):
name: str
"""Full mDNS service name.
Follows ``'PI monitor:<phone name>:<hardware id>._http._tcp.local.'`` naming pattern
"""
server: str
"e.g. ``'pi.local.'``"
port: int
"e.g. `8080`"
addresses: T.List[str]
"e.g. ``['192.168.0.2']``"
[docs]
class Event(T.NamedTuple):
name: T.Optional[str]
recording_id: T.Optional[str]
timestamp: int # unix epoch, in nanoseconds
[docs]
@classmethod
def from_dict(cls, dct: T.Dict[str, T.Any]) -> "Event":
return cls(
name=dct.get("name"),
recording_id=dct.get("recording_id"),
timestamp=dct["timestamp"],
)
@property
def datetime(self) -> datetime.datetime:
return datetime.datetime.fromtimestamp(self.timestamp / 1e9)
def __repr__(self) -> str:
return (
f"Event(name={self.name} "
f"recording_id={self.recording_id} "
f"timestamp_unix_ns={self.timestamp} "
f"datetime={self.datetime})"
)
[docs]
class Phone(T.NamedTuple):
battery_level: int
battery_state: Literal["OK", "LOW", "CRITICAL"]
device_id: str
device_name: str
ip: str
memory: int
memory_state: Literal["OK", "LOW", "CRITICAL"]
time_echo_port: T.Optional[int] = None
[docs]
class Hardware(T.NamedTuple):
version: str = "unknown"
glasses_serial: str = "unknown"
world_camera_serial: str = "unknown"
module_serial: str = "unknown"
[docs]
class NetworkDevice(T.NamedTuple):
"""Information about devices discovered by the host device, not the client.
.. note::
This class represents device information made available via the websocket update
connection by the host device (exposed via
:py:meth:`pupil_labs.realtime_api.device.Device.status_updates`). Devices
discovered directly by this library are represented as
:py:class:`.DiscoveredDeviceInfo` and returned by
:py:func:`pupil_labs.realtime_api.discovery.discover_devices` and
:py:class:`pupil_labs.realtime_api.discovery.Network`.
"""
ip: str
device_id: str
device_name: str
connected: bool
[docs]
class Sensor(T.NamedTuple):
sensor: str
conn_type: str
connected: bool = False
ip: T.Optional[str] = None
params: T.Optional[str] = None
port: T.Optional[int] = None
protocol: str = "rtsp"
@property
def url(self) -> T.Optional[str]:
if self.connected:
return f"{self.protocol}://{self.ip}:{self.port}/?{self.params}"
return None
[docs]
class Name(enum.Enum):
ANY = None
GAZE = "gaze"
WORLD = "world"
IMU = "imu"
EYES = "eyes"
[docs]
class Connection(enum.Enum):
ANY = None
WEBSOCKET = "WEBSOCKET"
DIRECT = "DIRECT"
[docs]
class Recording(T.NamedTuple):
action: str
id: str
message: str
rec_duration_ns: int
@property
def rec_duration_seconds(self) -> float:
return self.rec_duration_ns / 1e9
Component = T.Union[Phone, Hardware, Sensor, Recording, NetworkDevice]
"""Type annotation for :py:class:`Status <.Status>` components."""
ComponentRaw = T.Dict[str, T.Any]
"""Type annotation for json-parsed responses from the REST and Websocket API."""
_model_class_map: T.Dict[str, T.Type[Component]] = {
"Phone": Phone,
"Hardware": Hardware,
"Sensor": Sensor,
"Recording": Recording,
"Event": Event,
"NetworkDevice": NetworkDevice,
}
def _init_cls_with_annotated_fields_only(cls, d: T.Dict[str, T.Any]):
return cls(**{attr: d.get(attr, None) for attr in cls.__annotations__})
[docs]
class UnknownComponentError(ValueError):
pass
[docs]
def parse_component(raw: ComponentRaw) -> Component:
"""Initialize an explicitly modelled representation
(:py:obj:`pupil_labs.realtime_api.models.Component`) from the json-parsed dictionary
(:py:obj:`pupil_labs.realtime_api.models.ComponentRaw`) received from the API.
:raises UnknownComponentError: if the component name cannot be mapped to an
explicitly modelled class or the contained data does not fit the modelled
fields.
"""
model_name = raw["model"]
data = raw["data"]
try:
model_class = _model_class_map[model_name]
return _init_cls_with_annotated_fields_only(model_class, data)
except KeyError as err:
raise UnknownComponentError(
f"Could not generate component for {model_name} from {data}"
) from err
[docs]
@dataclasses.dataclass
class Status:
"Represents the Companion's full status"
phone: Phone
hardware: Hardware
sensors: T.List[Sensor]
recording: T.Optional[Recording]
[docs]
@classmethod
def from_dict(cls, status_json_result: T.List[ComponentRaw]) -> "Status":
phone = None # always present
recording = None # might not be present
hardware = Hardware() # won't be present if glasses are not connected
sensors = []
for dct in status_json_result:
try:
component = parse_component(dct)
except UnknownComponentError:
logger.warning(f"Dropping unknown component: {dct}")
continue
if isinstance(component, Phone):
phone = component
elif isinstance(component, Hardware):
hardware = component
elif isinstance(component, Sensor):
sensors.append(component)
elif isinstance(component, Recording):
recording = component
elif isinstance(component, NetworkDevice):
pass # no need to handle NetworkDevice updates here
else:
logger.warning(f"Unknown model class: {type(component).__name__}")
sensors.sort(key=lambda s: (not s.connected, s.conn_type, s.sensor))
return cls(phone, hardware, sensors, recording)
[docs]
def update(self, component: Component) -> None:
if isinstance(component, Phone):
self.phone = component
elif isinstance(component, Hardware):
self.hardware = component
elif isinstance(component, Recording):
self.recording = component
elif isinstance(component, Sensor):
for idx, sensor in enumerate(self.sensors):
if (
sensor.sensor == component.sensor
and sensor.conn_type == component.conn_type
):
self.sensors[idx] = component
break
[docs]
def matching_sensors(self, name: Sensor.Name, connection: Sensor.Connection):
for sensor in self.sensors:
if name is not Sensor.Name.ANY and sensor.sensor != name.value:
continue
if (
connection is not Sensor.Connection.ANY
and sensor.conn_type != connection.value
):
continue
yield sensor
[docs]
def direct_world_sensor(self) -> T.Optional[Sensor]:
return next(
self.matching_sensors(Sensor.Name.WORLD, Sensor.Connection.DIRECT),
Sensor(
sensor=Sensor.Name.WORLD.value, conn_type=Sensor.Connection.DIRECT.value
),
)
[docs]
def direct_gaze_sensor(self) -> T.Optional[Sensor]:
return next(
self.matching_sensors(Sensor.Name.GAZE, Sensor.Connection.DIRECT),
Sensor(
sensor=Sensor.Name.GAZE.value, conn_type=Sensor.Connection.DIRECT.value
),
)
[docs]
def direct_imu_sensor(self) -> T.Optional[Sensor]:
return next(
self.matching_sensors(Sensor.Name.IMU, Sensor.Connection.DIRECT),
Sensor(
sensor=Sensor.Name.IMU.value, conn_type=Sensor.Connection.DIRECT.value
),
)
[docs]
def direct_eyes_sensor(self) -> T.Optional[Sensor]:
return next(
self.matching_sensors(Sensor.Name.EYES, Sensor.Connection.DIRECT),
Sensor(
sensor=Sensor.Name.EYES.value, conn_type=Sensor.Connection.DIRECT.value
),
)