Asynchronous Examples¶
Note
The examples require Python 3.7+ to run and use the asyncio
framework.
Remote Control¶
Get Current Status¶
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network
4
5
6async def main():
7 async with Network() as network:
8 dev_info = await network.wait_for_new_device(timeout_seconds=5)
9 if dev_info is None:
10 print("No device could be found! Abort")
11 return
12
13 async with Device.from_discovered_device(dev_info) as device:
14 status = await device.get_status()
15
16 print(f"Device IP address: {status.phone.ip}")
17 print(f"Battery level: {status.phone.battery_level} %")
18
19 print(f"Connected glasses: SN {status.hardware.glasses_serial}")
20 print(f"Connected scene camera: SN {status.hardware.world_camera_serial}")
21
22 world = status.direct_world_sensor()
23 print(f"World sensor: connected={world.connected} url={world.url}")
24
25 gaze = status.direct_gaze_sensor()
26 print(f"Gaze sensor: connected={gaze.connected} url={gaze.url}")
27
28
29if __name__ == "__main__":
30 asyncio.run(main())
Status Updates¶
Wait for status updates from the device
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api import Device, Network
5
6
7async def main():
8 async with Network() as network:
9 dev_info = await network.wait_for_new_device(timeout_seconds=5)
10 if dev_info is None:
11 print("No device could be found! Abort")
12 return
13
14 async with Device.from_discovered_device(dev_info) as device:
15 print("Waiting for status updates... hit ctrl-c to stop.")
16 async for changed in device.status_updates():
17 print(changed)
18
19
20if __name__ == "__main__":
21 with contextlib.suppress(KeyboardInterrupt):
22 asyncio.run(main())
Get a callback when there is a new status update
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
4
5
6def print_component(component):
7 print(component)
8
9
10async def main():
11 async with Network() as network:
12 dev_info = await network.wait_for_new_device(timeout_seconds=5)
13 if dev_info is None:
14 print("No device could be found! Abort")
15 return
16
17 async with Device.from_discovered_device(dev_info) as device:
18 duration = 20
19 print(f"Starting auto-update for {duration} seconds")
20 # callbacks can be awaitable, too
21 notifier = StatusUpdateNotifier(device, callbacks=[print_component])
22 await notifier.receive_updates_start()
23 await asyncio.sleep(duration)
24 print("Stopping auto-update")
25 await notifier.receive_updates_stop()
26
27
28if __name__ == "__main__":
29 asyncio.run(main())
Send Event¶
An event without an explicit timestamp will be timestamped on arrival at the Pupil Invisible Companion device.
1import asyncio
2import time
3
4from pupil_labs.realtime_api import Device, Network
5
6
7async def main():
8 async with Network() as network:
9 dev_info = await network.wait_for_new_device(timeout_seconds=5)
10 if dev_info is None:
11 print("No device could be found! Abort")
12 return
13
14 async with Device.from_discovered_device(dev_info) as device:
15 # send event without timestamp
16 print(await device.send_event("test event"))
17
18 # send event with current timestamp
19 print(
20 await device.send_event(
21 "test event", event_timestamp_unix_ns=time.time_ns()
22 )
23 )
24
25
26if __name__ == "__main__":
27 asyncio.run(main())
Start, stop and save, and cancel recordings¶
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
4from pupil_labs.realtime_api.models import Recording
5
6
7async def print_recording(component):
8 if isinstance(component, Recording):
9 print(f"Update: {component.message}")
10
11
12async def main():
13 async with Network() as network:
14 dev_info = await network.wait_for_new_device(timeout_seconds=5)
15 if dev_info is None:
16 print("No device could be found! Abort")
17 return
18
19 async with Device.from_discovered_device(dev_info) as device:
20 # get update when recording is fully started
21 notifier = StatusUpdateNotifier(device, callbacks=[print_recording])
22 await notifier.receive_updates_start()
23 recording_id = await device.recording_start()
24 print(f"Initiated recording with id {recording_id}")
25 await asyncio.sleep(5)
26 print("Stopping recording")
27 await device.recording_stop_and_save()
28 # await device.recording_cancel() # uncomment to cancel recording
29 await asyncio.sleep(2) # wait for confirmation via auto-update
30 await notifier.receive_updates_stop()
31
32
33if __name__ == "__main__":
34 asyncio.run(main())
Streaming¶
Gaze Data¶
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api import Device, Network, receive_gaze_data
5
6
7async def main():
8 async with Network() as network:
9 dev_info = await network.wait_for_new_device(timeout_seconds=5)
10 if dev_info is None:
11 print("No device could be found! Abort")
12 return
13
14 async with Device.from_discovered_device(dev_info) as device:
15 status = await device.get_status()
16 sensor_gaze = status.direct_gaze_sensor()
17 if not sensor_gaze.connected:
18 print(f"Gaze sensor is not connected to {device}")
19 return
20
21 restart_on_disconnect = True
22 async for gaze in receive_gaze_data(
23 sensor_gaze.url, run_loop=restart_on_disconnect
24 ):
25 print(gaze)
26
27
28if __name__ == "__main__":
29 with contextlib.suppress(KeyboardInterrupt):
30 asyncio.run(main())
Scene Camera Video¶
1import asyncio
2import contextlib
3
4import cv2
5
6from pupil_labs.realtime_api import Device, Network, receive_video_frames
7
8
9async def main():
10 async with Network() as network:
11 dev_info = await network.wait_for_new_device(timeout_seconds=5)
12 if dev_info is None:
13 print("No device could be found! Abort")
14 return
15
16 async with Device.from_discovered_device(dev_info) as device:
17 status = await device.get_status()
18 sensor_world = status.direct_world_sensor()
19 if not sensor_world.connected:
20 print(f"Scene camera is not connected to {device}")
21 return
22
23 restart_on_disconnect = True
24 async for frame in receive_video_frames(
25 sensor_world.url, run_loop=restart_on_disconnect
26 ):
27 bgr_buffer = frame.bgr_buffer()
28 draw_time(bgr_buffer, frame.datetime)
29 cv2.imshow("Scene Camera - Press ESC to quit", bgr_buffer)
30 if cv2.waitKey(1) & 0xFF == 27:
31 return
32
33
34def draw_time(frame, time):
35 frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
36 frame_txt_font_scale = 1.0
37 frame_txt_thickness = 1
38
39 # first line: frame timestamp
40 frame_txt = str(time)
41
42 cv2.putText(
43 frame,
44 frame_txt,
45 (20, 50),
46 frame_txt_font_name,
47 frame_txt_font_scale,
48 (255, 255, 255),
49 thickness=frame_txt_thickness,
50 lineType=cv2.LINE_8,
51 )
52
53
54if __name__ == "__main__":
55 with contextlib.suppress(KeyboardInterrupt):
56 asyncio.run(main())
Eyes Camera Video¶
1import asyncio
2import contextlib
3import time
4
5import cv2
6
7from pupil_labs.realtime_api import Device, Network, receive_video_frames
8
9
10async def main(preview_frame_rate=30):
11 async with Network() as network:
12 dev_info = await network.wait_for_new_device(timeout_seconds=5)
13 if dev_info is None:
14 print("No device could be found! Abort")
15 return
16
17 async with Device.from_discovered_device(dev_info) as device:
18 status = await device.get_status()
19 sensor_eyes = status.direct_eyes_sensor()
20 if not sensor_eyes.connected:
21 print(f"Eyes camera is not connected to {device}")
22 return
23
24 restart_on_disconnect = True
25 _last_update = time.perf_counter()
26 async for frame in receive_video_frames(
27 sensor_eyes.url, run_loop=restart_on_disconnect
28 ):
29 bgr_buffer = frame.bgr_buffer()
30 draw_time(bgr_buffer, frame.datetime)
31 cv2.imshow("Eye Cameras - Press ESC to quit", bgr_buffer)
32
33 time_since_last_update = time.perf_counter() - _last_update
34 if time_since_last_update > 1 / preview_frame_rate:
35 if cv2.waitKey(1) & 0xFF == 27:
36 return
37 _last_update = time.perf_counter()
38
39
40def draw_time(frame, time):
41 frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
42 frame_txt_font_scale = 0.5
43 frame_txt_thickness = 1
44
45 # first line: frame timestamp
46 frame_txt = str(time)
47
48 cv2.putText(
49 frame,
50 frame_txt,
51 (20, 50),
52 frame_txt_font_name,
53 frame_txt_font_scale,
54 (255, 255, 255),
55 thickness=frame_txt_thickness,
56 lineType=cv2.LINE_8,
57 )
58
59
60if __name__ == "__main__":
61 with contextlib.suppress(KeyboardInterrupt):
62 asyncio.run(main())
Scene Camera Video With Overlaid Gaze¶
This example processes two streams (video and gaze data) at the same time, matches each video frame with its temporally closest gaze point, and previews both in a window.
1import asyncio
2import contextlib
3import typing as T
4
5import cv2
6
7from pupil_labs.realtime_api import (
8 Device,
9 Network,
10 receive_gaze_data,
11 receive_video_frames,
12)
13
14
15async def main():
16 async with Network() as network:
17 dev_info = await network.wait_for_new_device(timeout_seconds=5)
18 if dev_info is None:
19 print("No device could be found! Abort")
20 return
21
22 async with Device.from_discovered_device(dev_info) as device:
23 print(f"Getting status information from {device}")
24 status = await device.get_status()
25
26 sensor_gaze = status.direct_gaze_sensor()
27 if not sensor_gaze.connected:
28 print(f"Gaze sensor is not connected to {device}")
29 return
30
31 sensor_world = status.direct_world_sensor()
32 if not sensor_world.connected:
33 print(f"Scene camera is not connected to {device}")
34 return
35
36 restart_on_disconnect = True
37
38 queue_video = asyncio.Queue()
39 queue_gaze = asyncio.Queue()
40
41 process_video = asyncio.create_task(
42 enqueue_sensor_data(
43 receive_video_frames(sensor_world.url, run_loop=restart_on_disconnect),
44 queue_video,
45 )
46 )
47 process_gaze = asyncio.create_task(
48 enqueue_sensor_data(
49 receive_gaze_data(sensor_gaze.url, run_loop=restart_on_disconnect),
50 queue_gaze,
51 )
52 )
53 try:
54 await match_and_draw(queue_video, queue_gaze)
55 finally:
56 process_video.cancel()
57 process_gaze.cancel()
58
59
60async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
61 async for datum in sensor:
62 try:
63 queue.put_nowait((datum.datetime, datum))
64 except asyncio.QueueFull:
65 print(f"Queue is full, dropping {datum}")
66
67
68async def match_and_draw(queue_video, queue_gaze):
69 while True:
70 video_datetime, video_frame = await get_most_recent_item(queue_video)
71 _, gaze_datum = await get_closest_item(queue_gaze, video_datetime)
72
73 bgr_buffer = video_frame.to_ndarray(format="bgr24")
74
75 cv2.circle(
76 bgr_buffer,
77 (int(gaze_datum.x), int(gaze_datum.y)),
78 radius=80,
79 color=(0, 0, 255),
80 thickness=15,
81 )
82
83 cv2.imshow("Scene camera with gaze overlay", bgr_buffer)
84 cv2.waitKey(1)
85
86
87async def get_most_recent_item(queue):
88 item = await queue.get()
89 while True:
90 try:
91 next_item = queue.get_nowait()
92 except asyncio.QueueEmpty:
93 return item
94 else:
95 item = next_item
96
97
98async def get_closest_item(queue, timestamp):
99 item_ts, item = await queue.get()
100 # assumes monotonically increasing timestamps
101 if item_ts > timestamp:
102 return item_ts, item
103 while True:
104 try:
105 next_item_ts, next_item = queue.get_nowait()
106 except asyncio.QueueEmpty:
107 return item_ts, item
108 else:
109 if next_item_ts > timestamp:
110 return next_item_ts, next_item
111 item_ts, item = next_item_ts, next_item
112
113
114if __name__ == "__main__":
115 with contextlib.suppress(KeyboardInterrupt):
116 asyncio.run(main())
Device Discovery¶
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api.discovery import Network, discover_devices
5
6
7async def main():
8 async with Network() as network:
9 print("Looking for the next best device...\n\t", end="")
10 print(await network.wait_for_new_device(timeout_seconds=5))
11
12 print("---")
13 print("All devices after searching for additional 5 seconds:")
14 await asyncio.sleep(5)
15 print(network.devices)
16
17 print("---")
18 print("Starting new, indefinite search... hit ctrl-c to stop.")
19 # optionally set timeout_seconds argument to limit search duration
20 async for device_info in discover_devices():
21 print(f"\t{device_info}")
22
23
24if __name__ == "__main__":
25 with contextlib.suppress(KeyboardInterrupt):
26 asyncio.run(main())
Time Offset Estimation¶
See pupil_labs.realtime_api.time_echo
for details.
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network
4from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator
5
6
7async def main():
8 async with Network() as network:
9 dev_info = await network.wait_for_new_device(timeout_seconds=5)
10 if dev_info is None:
11 print("No device could be found! Abort")
12 return
13
14 async with Device.from_discovered_device(dev_info) as device:
15 status = await device.get_status()
16
17 print(f"Device IP address: {status.phone.ip}")
18 print(f"Device Time Echo port: {status.phone.time_echo_port}")
19
20 if status.phone.time_echo_port is None:
21 print(
22 "Your Pupil Invisible Companion app is out-of-date and does not yet "
23 "support the Time Echo protocol. Upgrade to version 1.4.28 or newer."
24 )
25 return
26
27 time_offset_estimator = TimeOffsetEstimator(
28 status.phone.ip, status.phone.time_echo_port
29 )
30 estimated_offset = await time_offset_estimator.estimate()
31 print(f"Mean time offset: {estimated_offset.time_offset_ms.mean} ms")
32 print(
33 "Mean roundtrip duration: "
34 f"{estimated_offset.roundtrip_duration_ms.mean} ms"
35 )
36
37
38if __name__ == "__main__":
39 asyncio.run(main())