Asynchronous Examples#
Note
The examples require Python 3.7+ to run and use the asyncio
framework.
Remote Control#
Get Current Status#
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network
4
5
async def main():
    """Discover a device and print a one-off snapshot of its status.

    Waits up to five seconds for a device announcement. If none arrives, a
    message is printed and the coroutine returns. Otherwise the status is
    requested once and the phone, hardware, and sensor details are printed.
    """
    async with Network() as network:
        discovered = await network.wait_for_new_device(timeout_seconds=5)
    if discovered is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(discovered) as device:
        status = await device.get_status()

        print(f"Device IP address: {status.phone.ip}")
        print(f"Battery level: {status.phone.battery_level} %")

        print(f"Connected glasses: SN {status.hardware.glasses_serial}")
        print(f"Connected scene camera: SN {status.hardware.world_camera_serial}")

        # Look up and report one sensor at a time, world first, then gaze.
        for label, lookup in (
            ("World", status.direct_world_sensor),
            ("Gaze", status.direct_gaze_sensor),
        ):
            sensor = lookup()
            print(f"{label} sensor: connected={sensor.connected} url={sensor.url}")
27
28
29if __name__ == "__main__":
30 asyncio.run(main())
Status Updates#
Wait for status updates from the device
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api import Device, Network
5
6
async def main():
    """Print every status change reported by the first discovered device.

    Runs until the update stream ends or the surrounding program is
    interrupted. Returns early with a message if no device is found within
    five seconds.
    """
    async with Network() as network:
        discovered = await network.wait_for_new_device(timeout_seconds=5)
    if discovered is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(discovered) as device:
        print("Waiting for status updates... hit ctrl-c to stop.")
        updates = device.status_updates()
        async for component in updates:
            print(component)
18
19
20if __name__ == "__main__":
21 with contextlib.suppress(KeyboardInterrupt):
22 asyncio.run(main())
Get a callback when there is a new status update
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
4
5
def print_component(component):
    """Status-update callback: write the changed component to stdout."""
    print(component)
8
9
async def main():
    """Receive status updates through a callback for a fixed duration.

    Discovers a device (five second timeout), registers ``print_component``
    as the update callback, keeps the notifier running for ``duration``
    seconds, and then stops it. The notifier is stopped in a ``finally``
    clause so it is also shut down if the wait is interrupted.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        duration = 20
        print(f"Starting auto-update for {duration} seconds")
        # callbacks can be awaitable, too
        notifier = StatusUpdateNotifier(device, callbacks=[print_component])
        await notifier.receive_updates_start()
        try:
            await asyncio.sleep(duration)
        finally:
            # Always stop the notifier, even on cancellation or ctrl-c.
            print("Stopping auto-update")
            await notifier.receive_updates_stop()
26
27
28if __name__ == "__main__":
29 asyncio.run(main())
Send Event#
An event without an explicit timestamp will be timestamped on arrival at the Pupil Invisible Companion device.
1import asyncio
2import time
3
4from pupil_labs.realtime_api import Device, Network
5
6
async def main():
    """Send two test events to the first discovered device.

    The first event carries no timestamp; the second is stamped with the
    local clock (``time.time_ns()``). The response to each call is printed.
    Returns early with a message if no device is found within five seconds.
    """
    async with Network() as network:
        discovered = await network.wait_for_new_device(timeout_seconds=5)
    if discovered is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(discovered) as device:
        # send event without timestamp
        untimed_event = await device.send_event("test event")
        print(untimed_event)

        # send event with current timestamp
        now_ns = time.time_ns()
        timed_event = await device.send_event(
            "test event", event_timestamp_unix_ns=now_ns
        )
        print(timed_event)
24
25
26if __name__ == "__main__":
27 asyncio.run(main())
Start, stop and save, and cancel recordings#
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
4from pupil_labs.realtime_api.models import Recording
5
6
async def print_recording(component):
    """Awaitable status callback that reports ``Recording`` updates only.

    Any other component type is ignored silently.
    """
    if not isinstance(component, Recording):
        return
    print(f"Update: {component.message}")
10
11
async def main():
    """Start a recording, let it run for five seconds, then stop and save it.

    A ``StatusUpdateNotifier`` with ``print_recording`` as its callback runs
    alongside so that recording-related status messages are printed while
    the recording is started and stopped. The notifier is stopped in a
    ``finally`` clause so it is shut down even if one of the recording calls
    raises. Returns early with a message if no device is found within five
    seconds.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        # get update when recording is fully started
        notifier = StatusUpdateNotifier(device, callbacks=[print_recording])
        await notifier.receive_updates_start()
        try:
            recording_id = await device.recording_start()
            print(f"Initiated recording with id {recording_id}")
            await asyncio.sleep(5)
            print("Stopping recording")
            await device.recording_stop_and_save()
            # await device.recording_cancel()  # uncomment to cancel recording
            await asyncio.sleep(2)  # wait for confirmation via auto-update
        finally:
            await notifier.receive_updates_stop()
31
32
33if __name__ == "__main__":
34 asyncio.run(main())
Streaming#
Gaze Data#
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api import Device, Network, receive_gaze_data
5
6
async def main():
    """Stream gaze data from the first discovered device and print each datum.

    Returns early with a message if no device is found within five seconds
    or if the device reports that its gaze sensor is not connected.
    """
    async with Network() as network:
        discovered = await network.wait_for_new_device(timeout_seconds=5)
    if discovered is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(discovered) as device:
        status = await device.get_status()
        gaze_sensor = status.direct_gaze_sensor()
        if not gaze_sensor.connected:
            print(f"Gaze sensor is not connected to {device}")
            return

        restart_on_disconnect = True
        gaze_stream = receive_gaze_data(
            gaze_sensor.url, run_loop=restart_on_disconnect
        )
        async for datum in gaze_stream:
            print(datum)
26
27
28if __name__ == "__main__":
29 with contextlib.suppress(KeyboardInterrupt):
30 asyncio.run(main())
Scene Camera Video#
1import asyncio
2import contextlib
3
4import cv2
5
6from pupil_labs.realtime_api import Device, Network, receive_video_frames
7
8
async def main():
    """Preview the scene camera stream in an OpenCV window until ESC is hit.

    Each received frame is converted to a BGR buffer, stamped with its
    timestamp via ``draw_time``, and shown. Returns early with a message if
    no device is found within five seconds or if the scene camera is not
    connected. The preview window is closed on every exit path.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        status = await device.get_status()
        sensor_world = status.direct_world_sensor()
        if not sensor_world.connected:
            print(f"Scene camera is not connected to {device}")
            return

        restart_on_disconnect = True
        try:
            async for frame in receive_video_frames(
                sensor_world.url, run_loop=restart_on_disconnect
            ):
                bgr_buffer = frame.bgr_buffer()
                draw_time(bgr_buffer, frame.datetime)
                cv2.imshow("Scene Camera - Press ESC to quit", bgr_buffer)
                # 27 is the key code for ESC
                if cv2.waitKey(1) & 0xFF == 27:
                    return
        finally:
            # Close the preview window on ESC, ctrl-c, or a stream error.
            cv2.destroyAllWindows()
32
33
def draw_time(frame, time):
    """Draw ``str(time)`` onto *frame* in white at pixel position (20, 50).

    The text is rendered with ``cv2.putText`` directly on the given image
    buffer; nothing is returned.
    """
    cv2.putText(
        frame,
        str(time),
        (20, 50),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        (255, 255, 255),
        thickness=1,
        lineType=cv2.LINE_8,
    )
52
53
54if __name__ == "__main__":
55 with contextlib.suppress(KeyboardInterrupt):
56 asyncio.run(main())
Eyes Camera Video#
1import asyncio
2import contextlib
3import time
4
5import cv2
6
7from pupil_labs.realtime_api import Device, Network, receive_video_frames
8
9
async def main(preview_frame_rate=30):
    """Preview the eye camera stream in an OpenCV window until ESC is hit.

    Every received frame is stamped with its timestamp and handed to
    ``cv2.imshow``, but ``cv2.waitKey`` (which also polls for ESC) is called
    at most ``preview_frame_rate`` times per second.

    Args:
        preview_frame_rate: Upper bound, in Hz, on how often ``cv2.waitKey``
            is called.

    Returns early with a message if no device is found within five seconds
    or if the eye cameras are not connected. The preview window is closed on
    every exit path.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        status = await device.get_status()
        sensor_eyes = status.direct_eyes_sensor()
        if not sensor_eyes.connected:
            print(f"Eyes camera is not connected to {device}")
            return

        restart_on_disconnect = True
        # Loop-invariant: minimum number of seconds between waitKey calls.
        min_update_interval = 1 / preview_frame_rate
        _last_update = time.perf_counter()
        try:
            async for frame in receive_video_frames(
                sensor_eyes.url, run_loop=restart_on_disconnect
            ):
                bgr_buffer = frame.bgr_buffer()
                draw_time(bgr_buffer, frame.datetime)
                cv2.imshow("Eye Cameras - Press ESC to quit", bgr_buffer)

                time_since_last_update = time.perf_counter() - _last_update
                if time_since_last_update > min_update_interval:
                    # 27 is the key code for ESC
                    if cv2.waitKey(1) & 0xFF == 27:
                        return
                    _last_update = time.perf_counter()
        finally:
            # Close the preview window on ESC, ctrl-c, or a stream error.
            cv2.destroyAllWindows()
38
39
def draw_time(frame, time):
    """Draw ``str(time)`` onto *frame* in white at pixel position (20, 50).

    The text is rendered at half scale with ``cv2.putText`` directly on the
    given image buffer; nothing is returned.
    """
    cv2.putText(
        frame,
        str(time),
        (20, 50),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.5,
        (255, 255, 255),
        thickness=1,
        lineType=cv2.LINE_8,
    )
58
59
60if __name__ == "__main__":
61 with contextlib.suppress(KeyboardInterrupt):
62 asyncio.run(main())
Scene Camera Video With Overlaid Gaze#
This example processes two streams (video and gaze data) at the same time, matches each video frame with its temporally closest gaze point, and previews both in a window.
1import asyncio
2import contextlib
3import typing as T
4
5import cv2
6
7from pupil_labs.realtime_api import (
8 Device,
9 Network,
10 receive_gaze_data,
11 receive_video_frames,
12)
13
14
async def main():
    """Show the scene camera with the matching gaze point drawn on top.

    Two background tasks feed the video and gaze streams into separate
    queues while ``match_and_draw`` consumes both. The tasks are cancelled
    when ``match_and_draw`` finishes or raises. Returns early with a message
    if no device is found within five seconds or if either sensor is not
    connected.
    """
    async with Network() as network:
        discovered = await network.wait_for_new_device(timeout_seconds=5)
    if discovered is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(discovered) as device:
        print(f"Getting status information from {device}")
        status = await device.get_status()

        sensor_gaze = status.direct_gaze_sensor()
        if not sensor_gaze.connected:
            print(f"Gaze sensor is not connected to {device}")
            return

        sensor_world = status.direct_world_sensor()
        if not sensor_world.connected:
            print(f"Scene camera is not connected to {device}")
            return

        restart_on_disconnect = True

        queue_video = asyncio.Queue()
        queue_gaze = asyncio.Queue()

        # One producer task per stream: video first, then gaze.
        producers = []
        for receive, sensor, target_queue in (
            (receive_video_frames, sensor_world, queue_video),
            (receive_gaze_data, sensor_gaze, queue_gaze),
        ):
            stream = receive(sensor.url, run_loop=restart_on_disconnect)
            producers.append(
                asyncio.create_task(enqueue_sensor_data(stream, target_queue))
            )

        try:
            await match_and_draw(queue_video, queue_gaze)
        finally:
            for task in producers:
                task.cancel()
58
59
async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
    """Copy every datum from *sensor* into *queue* as ``(datetime, datum)``.

    Items are added without waiting. If the queue is full, the datum is
    dropped and a message is printed instead.
    """
    async for datum in sensor:
        entry = (datum.datetime, datum)
        try:
            queue.put_nowait(entry)
        except asyncio.QueueFull:
            print(f"Queue is full, dropping {datum}")
66
67
async def match_and_draw(queue_video, queue_gaze):
    """Endlessly pair the newest video frame with a gaze datum and show both.

    For each iteration the most recent queued frame is taken, a gaze datum
    is selected for the frame's timestamp via ``get_closest_item``, and a
    red circle is drawn at the gaze position before the frame is displayed.
    This coroutine never returns on its own.
    """
    window_title = "Scene camera with gaze overlay"
    while True:
        frame_time, frame = await get_most_recent_item(queue_video)
        _, gaze = await get_closest_item(queue_gaze, frame_time)

        image = frame.to_ndarray(format="bgr24")
        gaze_center = (int(gaze.x), int(gaze.y))

        # Colour is given in BGR order, so (0, 0, 255) is red.
        cv2.circle(image, gaze_center, radius=80, color=(0, 0, 255), thickness=15)

        cv2.imshow(window_title, image)
        cv2.waitKey(1)
85
86
async def get_most_recent_item(queue):
    """Wait for at least one item, then drain *queue* and return the last one.

    All older items that were already queued are discarded.
    """
    latest = await queue.get()
    while not queue.empty():
        latest = queue.get_nowait()
    return latest
96
97
async def get_closest_item(queue, timestamp):
    """Pop ``(ts, item)`` pairs from *queue* until one is newer than *timestamp*.

    Waits for the first pair, then keeps consuming already-queued pairs
    without waiting. Returns the first pair whose ``ts`` is greater than
    *timestamp*. If the queue runs empty before such a pair shows up, the
    last pair consumed is returned instead. Earlier pairs are discarded.
    """
    # assumes monotonically increasing timestamps
    item_ts, item = await queue.get()
    while not item_ts > timestamp and not queue.empty():
        item_ts, item = queue.get_nowait()
    return item_ts, item
112
113
114if __name__ == "__main__":
115 with contextlib.suppress(KeyboardInterrupt):
116 asyncio.run(main())
Device Discovery#
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api.discovery import Network, discover_devices
5
6
async def main():
    """Demonstrate three ways of discovering devices on the network.

    1. Wait up to five seconds for the next device and print it.
    2. Keep searching for five more seconds and print all devices found.
    3. Start a fresh search with ``discover_devices`` and print each device
       as it is reported, until the search ends or the program is
       interrupted.
    """
    async with Network() as network:
        print("Looking for the next best device...\n\t", end="")
        print(await network.wait_for_new_device(timeout_seconds=5))

        print("---")
        print("All devices after searching for additional 5 seconds:")
        await asyncio.sleep(5)
        print(network.devices)

    print("---")
    print("Starting new, indefinite search... hit ctrl-c to stop.")
    # optionally set timeout_seconds argument to limit search duration
    async for device_info in discover_devices():
        print(f"\t{device_info}")
22
23
24if __name__ == "__main__":
25 with contextlib.suppress(KeyboardInterrupt):
26 asyncio.run(main())
Time Offset Estimation#
See pupil_labs.realtime_api.time_echo
for details.
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network
4from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator
5
6
async def main():
    """Estimate the clock offset to the first discovered device.

    Prints the device's IP address and Time Echo port, runs a
    ``TimeOffsetEstimator`` against them, and prints the mean time offset
    and mean roundtrip duration in milliseconds. Returns early with a
    message if no device is found within five seconds, if the device reports
    no Time Echo port, or if no estimate could be obtained.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        status = await device.get_status()

        print(f"Device IP address: {status.phone.ip}")
        print(f"Device Time Echo port: {status.phone.time_echo_port}")

        if status.phone.time_echo_port is None:
            print(
                "Your Pupil Invisible Companion app is out-of-date and does not yet "
                "support the Time Echo protocol. Upgrade to version 1.4.28 or newer."
            )
            return

        time_offset_estimator = TimeOffsetEstimator(
            status.phone.ip, status.phone.time_echo_port
        )
        estimated_offset = await time_offset_estimator.estimate()
        # NOTE(review): guards against estimate() yielding no result (e.g. on
        # a failed connection) - confirm against the time_echo API reference.
        if estimated_offset is None:
            print("Time offset estimation failed! Abort")
            return

        print(f"Mean time offset: {estimated_offset.time_offset_ms.mean} ms")
        print(
            "Mean roundtrip duration: "
            f"{estimated_offset.roundtrip_duration_ms.mean} ms"
        )
36
37
38if __name__ == "__main__":
39 asyncio.run(main())