Asynchronous Examples¶
Note
The examples require Python 3.7+ to run and use the asyncio
framework.
Remote Control¶
Get Current Status¶
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network
4
5
async def main():
    """Find one device on the network and print a one-off status summary.

    Prints the phone's IP address and battery level, the serial numbers of
    the connected glasses and scene camera, and the connection state and URL
    of the world and gaze sensors.
    """
    # Discovery is only needed until the first device has been reported.
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(found) as device:
        status = await device.get_status()

        phone = status.phone
        print(f"Device IP address: {phone.ip}")
        print(f"Battery level: {phone.battery_level} %")

        hardware = status.hardware
        print(f"Connected glasses: SN {hardware.glasses_serial}")
        print(f"Connected scene camera: SN {hardware.world_camera_serial}")

        world_sensor = status.direct_world_sensor()
        print(
            f"World sensor: connected={world_sensor.connected} "
            f"url={world_sensor.url}"
        )

        gaze_sensor = status.direct_gaze_sensor()
        print(
            f"Gaze sensor: connected={gaze_sensor.connected} "
            f"url={gaze_sensor.url}"
        )
27
28
29if __name__ == "__main__":
30 asyncio.run(main())
Status Updates¶
Wait for status updates from the device
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api import Device, Network
5
6
async def main():
    """Find one device and print every status update it emits.

    Runs until the update stream ends or the task is interrupted.
    """
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(found) as device:
        print("Waiting for status updates... hit ctrl-c to stop.")
        updates = device.status_updates()
        async for update in updates:
            print(update)
18
19
20if __name__ == "__main__":
21 with contextlib.suppress(KeyboardInterrupt):
22 asyncio.run(main())
Get a callback when there is a new status update
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
4
5
def print_component(component):
    """Status-update callback: write the received component to stdout."""
    print(component)
8
9
async def main():
    """Find one device and print its status updates for a fixed duration.

    A ``StatusUpdateNotifier`` forwards each update to ``print_component``
    for 20 seconds. The notifier is stopped in a ``finally`` clause so that
    it is also shut down when the wait is cancelled (e.g. ctrl-c) or raises.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        duration = 20
        print(f"Starting auto-update for {duration} seconds")
        # callbacks can be awaitable, too
        notifier = StatusUpdateNotifier(device, callbacks=[print_component])
        await notifier.receive_updates_start()
        try:
            await asyncio.sleep(duration)
        finally:
            # Always pair receive_updates_start() with receive_updates_stop().
            print("Stopping auto-update")
            await notifier.receive_updates_stop()
26
27
28if __name__ == "__main__":
29 asyncio.run(main())
Send Event¶
An event without an explicit timestamp will be timestamped on arrival at the Neon / Pupil Invisible Companion device.
1import asyncio
2import time
3
4from pupil_labs.realtime_api import Device, Network
5
6
async def main():
    """Find one device and send it two events, printing each response.

    The first event carries no timestamp; the second is stamped with the
    local clock via ``time.time_ns()``.
    """
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(found) as device:
        # send event without timestamp
        untimed = await device.send_event("test event")
        print(untimed)

        # send event with current timestamp
        timed = await device.send_event(
            "test event", event_timestamp_unix_ns=time.time_ns()
        )
        print(timed)
24
25
26if __name__ == "__main__":
27 asyncio.run(main())
Start, stop and save, and cancel recordings¶
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
4from pupil_labs.realtime_api.models import Recording, Sensor
5
6
async def on_status_update(component):
    """Status-update callback that reports recording and stream errors.

    ``Recording`` components are reported when their action is ``"ERROR"``;
    ``Sensor`` components are reported when ``stream_error`` is truthy.
    Any other component is ignored.
    """
    if isinstance(component, Recording):
        # A Recording is never treated as a Sensor, even without an error.
        if component.action == "ERROR":
            print(f"Error : {component.message}")
        return

    if isinstance(component, Sensor) and component.stream_error:
        print(f"Stream error in sensor {component.sensor}")
15
16
async def main():
    """Find one device, record for five seconds, then stop and save.

    A ``StatusUpdateNotifier`` with ``on_status_update`` runs alongside the
    recording so that recording and stream errors are printed. The notifier
    is stopped in a ``finally`` clause so it does not keep running when one
    of the recording calls raises or the task is cancelled.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        # get update when recording is fully started
        notifier = StatusUpdateNotifier(device, callbacks=[on_status_update])
        await notifier.receive_updates_start()
        try:
            recording_id = await device.recording_start()
            print(f"Initiated recording with id {recording_id}")
            await asyncio.sleep(5)
            print("Stopping recording")
            await device.recording_stop_and_save()
            # await device.recording_cancel()  # uncomment to cancel recording
            await asyncio.sleep(2)  # wait for confirmation via auto-update
        finally:
            await notifier.receive_updates_stop()
36
37
38if __name__ == "__main__":
39 asyncio.run(main())
Templates¶
You can programmatically fill the template. This allows you to also define the recording name if the template is created correctly.
1import asyncio
2import os
3
4import beaupy
5
6from pupil_labs.realtime_api import Device, Network
7from pupil_labs.realtime_api.models import InvalidTemplateAnswersError, TemplateItem
8
# Horizontal rule spanning the terminal width. os.get_terminal_size() raises
# OSError when stdout is not attached to a terminal (pipe, file, CI), so fall
# back to a conventional 80 columns instead of failing at import time.
try:
    _terminal_columns = os.get_terminal_size().columns
except OSError:
    _terminal_columns = 80
LINE = "\u2500" * (_terminal_columns if _terminal_columns > 0 else 80)
# ANSI escape sequences: switch to red text / reset all attributes.
RED = "\033[31m"
RESET = "\033[0m"
12
13
def prompt_checkbox_answer(item: TemplateItem, current_value):
    """Ask for a multiple-choice answer, pre-ticking the current selection.

    Args:
        item: Template item whose ``choices`` are offered.
        current_value: Previously stored answers, or a falsy value when
            there are none. It is only read, never modified, so the caller
            can prompt again with the same preselection after a failed
            validation.

    Returns:
        Whatever ``beaupy.select_multiple`` returns for the user's selection.
    """
    preselected = current_value or []
    ticked = [
        index for index, choice in enumerate(item.choices) if choice in preselected
    ]
    return beaupy.select_multiple(
        item.choices,
        ticked_indices=ticked,
    )
26
27
def prompt_radio_answer(item: TemplateItem, current_value):
    """Ask for a single-choice answer and return it wrapped in a list.

    The cursor starts on the first entry of ``current_value`` when that entry
    is one of ``item.choices``, otherwise on the first choice. Returns an
    empty list when ``beaupy.select`` yields ``None``.
    """
    start_index = 0
    if current_value and current_value[0] in item.choices:
        start_index = item.choices.index(current_value[0])

    picked = beaupy.select(item.choices, cursor_index=start_index)
    return [] if picked is None else [picked]
38
39
def prompt_string_answer(item: TemplateItem, current_value):
    """Ask for a free-text answer, pre-filled with the current value.

    When ``current_value`` is falsy or equals ``[""]``, the item's help text
    (if any) is used as the initial value instead; with neither available the
    prompt starts empty.
    """
    has_help_text = item.help_text and item.help_text != [""]
    placeholder = item.help_text if has_help_text else None

    if not current_value or current_value == [""]:
        current_value = placeholder

    initial_value = "" if current_value is None else str(current_value)
    return beaupy.prompt(
        f"Enter value for '{item.title}': ",
        initial_value=initial_value,
    )
49
50
async def main():  # noqa: C901
    """Interactively fill in the device's current template and upload it.

    Prints the pre-filled template data, prompts for every template item
    except section headers and page breaks, re-prompts until the answer
    validates, posts the collected answers, and finally prints the data as
    stored on the device.
    """
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    # Widgets with dedicated prompts; every other widget gets a text prompt.
    choice_prompts = {
        "CHECKBOX_LIST": prompt_checkbox_answer,
        "RADIO_LIST": prompt_radio_answer,
    }

    async with Device.from_discovered_device(found) as device:
        # Fetch current template definition
        template = await device.get_template()
        # Fetch data filled on the template
        data = await device.get_template_data(format="simple")

        print(f"[{template.name}] Data pre-filled:")
        print(LINE)
        print("\n".join(f"{k}\t{v}" for k, v in data.items()))

        # Filling a template
        questionnaire = {}
        if template:
            # The try wraps the whole loop: ctrl-c ends the questionnaire and
            # keeps the answers collected so far.
            try:
                for item in template.items:
                    if item.widget_type in ("SECTION_HEADER", "PAGE_BREAK"):
                        continue
                    print(LINE)
                    required_marker = "* " if item.required else ""
                    print(
                        f"{required_marker}ID: {item.id} - Title: {item.title} "
                        f"- Input Type: {item.input_type}"
                    )
                    item_key = str(item.id)
                    current_value = data.get(item_key)
                    while True:
                        question = template.get_question_by_id(item.id)
                        ask = choice_prompts.get(
                            item.widget_type, prompt_string_answer
                        )
                        template_input = ask(item, current_value)

                        try:
                            print(template_input)
                            errors = question.validate_answer(template_input)
                        except InvalidTemplateAnswersError as e:
                            print(f"{RED}Validation failed for: {template_input}")
                            for error in e.errors:
                                print(f" {error['msg']}")
                            print(LINE + RESET)
                        else:
                            if errors:
                                print(f"Errors: {errors}")
                                continue
                            questionnaire[item_key] = template_input
                            break
            except KeyboardInterrupt:
                print("\nKeyboardInterrupt detected. Skipping line.")

        print(LINE)

        # Sending the template
        if questionnaire:
            await device.post_template_data(questionnaire)

        # Fetch new data filled on the template
        data = await device.get_template_data(format="api")

        # Iterate to check filled data
        print(f"[{template.name}] Data post:")
        print(LINE)
        print("\n".join(f"{k}\t{v}" for k, v in data.items()))
120
121
122if __name__ == "__main__":
123 asyncio.run(main())
Streaming¶
Gaze Data¶
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api import Device, Network, receive_gaze_data
5
6
async def main():
    """Find one device and print its gaze data as it arrives.

    Returns early when no device is found or the gaze sensor is not
    connected.
    """
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(found) as device:
        status = await device.get_status()
        gaze_sensor = status.direct_gaze_sensor()
        if not gaze_sensor.connected:
            print(f"Gaze sensor is not connected to {device}")
            return

        restart_on_disconnect = True
        gaze_stream = receive_gaze_data(
            gaze_sensor.url, run_loop=restart_on_disconnect
        )
        async for sample in gaze_stream:
            print(sample)
26
27
28if __name__ == "__main__":
29 with contextlib.suppress(KeyboardInterrupt):
30 asyncio.run(main())
Scene Camera Video¶
1import asyncio
2import contextlib
3
4import cv2
5import numpy as np
6
7# Workaround for https://github.com/opencv/opencv/issues/21952
8cv2.imshow("cv/av bug", np.zeros(1))
9cv2.destroyAllWindows()
10
11from pupil_labs.realtime_api import Device, Network, receive_video_frames # noqa
12
13
async def main():
    """Find one device and preview its scene camera until ESC is pressed.

    Every frame is stamped with its datetime and shown in an OpenCV window.
    Returns early when no device is found or the scene camera is not
    connected.
    """
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(found) as device:
        status = await device.get_status()
        world_sensor = status.direct_world_sensor()
        if not world_sensor.connected:
            print(f"Scene camera is not connected to {device}")
            return

        restart_on_disconnect = True
        frames = receive_video_frames(
            world_sensor.url, run_loop=restart_on_disconnect
        )
        async for frame in frames:
            image = frame.bgr_buffer()
            draw_time(image, frame.datetime)
            cv2.imshow("Scene Camera - Press ESC to quit", image)
            pressed = cv2.waitKey(1) & 0xFF
            if pressed == 27:  # ESC
                return
37
38
def draw_time(frame, time):
    """Draw ``str(time)`` in white onto ``frame`` at pixel position (20, 50).

    The image is modified in place via ``cv2.putText``; nothing is returned.
    """
    label = str(time)
    cv2.putText(
        frame,
        label,
        (20, 50),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,  # font scale
        (255, 255, 255),
        thickness=1,
        lineType=cv2.LINE_8,
    )
57
58
59if __name__ == "__main__":
60 with contextlib.suppress(KeyboardInterrupt):
61 asyncio.run(main())
Eyes Camera Video¶
1import asyncio
2import contextlib
3import time
4
5import cv2
6import numpy as np
7
8# Workaround for https://github.com/opencv/opencv/issues/21952
9cv2.imshow("cv/av bug", np.zeros(1))
10cv2.destroyAllWindows()
11
12from pupil_labs.realtime_api import Device, Network, receive_video_frames # noqa
13
14
async def main(preview_frame_rate=30):
    """Find one device and preview its eye cameras until ESC is pressed.

    Every frame is stamped and handed to ``cv2.imshow``, but ``cv2.waitKey``
    (which also checks for ESC) is only called once more than
    ``1 / preview_frame_rate`` seconds have passed since the previous call.

    Args:
        preview_frame_rate: Upper bound, in calls per second, for how often
            ``cv2.waitKey`` is invoked.
    """
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(found) as device:
        status = await device.get_status()
        eyes_sensor = status.direct_eyes_sensor()
        if not eyes_sensor.connected:
            print(f"Eyes camera is not connected to {device}")
            return

        restart_on_disconnect = True
        last_poll = time.perf_counter()
        frames = receive_video_frames(
            eyes_sensor.url, run_loop=restart_on_disconnect
        )
        async for frame in frames:
            image = frame.bgr_buffer()
            draw_time(image, frame.datetime)
            cv2.imshow("Eye Cameras - Press ESC to quit", image)

            elapsed = time.perf_counter() - last_poll
            if not elapsed > 1 / preview_frame_rate:
                continue
            if cv2.waitKey(1) & 0xFF == 27:  # ESC
                return
            last_poll = time.perf_counter()
43
44
def draw_time(frame, time):
    """Draw ``str(time)`` in white onto ``frame`` at pixel position (20, 50).

    The image is modified in place via ``cv2.putText``; nothing is returned.
    """
    label = str(time)
    cv2.putText(
        frame,
        label,
        (20, 50),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.5,  # font scale
        (255, 255, 255),
        thickness=1,
        lineType=cv2.LINE_8,
    )
63
64
65if __name__ == "__main__":
66 with contextlib.suppress(KeyboardInterrupt):
67 asyncio.run(main())
Scene Camera Video With Overlaid Gaze¶
This example processes two streams (video and gaze data) at the same time, matches each video frame with its temporally closest gaze point, and previews both in a window.
1import asyncio
2import contextlib
3import typing as T
4
5import cv2
6import numpy as np
7
8# Workaround for https://github.com/opencv/opencv/issues/21952
9cv2.imshow("cv/av bug", np.zeros(1))
10cv2.destroyAllWindows()
11
12from pupil_labs.realtime_api import ( # noqa
13 Device,
14 Network,
15 receive_gaze_data,
16 receive_video_frames,
17)
18
19
async def main():
    """Find one device and preview its scene video with gaze drawn on top.

    Video frames and gaze data are received by two background tasks that
    feed separate queues; ``match_and_draw`` consumes both. The background
    tasks are cancelled when ``match_and_draw`` exits for any reason.
    """
    async with Network() as net:
        found = await net.wait_for_new_device(timeout_seconds=5)
    if found is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(found) as device:
        print(f"Getting status information from {device}")
        status = await device.get_status()

        gaze_sensor = status.direct_gaze_sensor()
        if not gaze_sensor.connected:
            print(f"Gaze sensor is not connected to {device}")
            return

        world_sensor = status.direct_world_sensor()
        if not world_sensor.connected:
            print(f"Scene camera is not connected to {device}")
            return

        restart_on_disconnect = True

        queue_video = asyncio.Queue()
        queue_gaze = asyncio.Queue()

        video_stream = receive_video_frames(
            world_sensor.url, run_loop=restart_on_disconnect
        )
        video_task = asyncio.create_task(
            enqueue_sensor_data(video_stream, queue_video)
        )
        gaze_stream = receive_gaze_data(
            gaze_sensor.url, run_loop=restart_on_disconnect
        )
        gaze_task = asyncio.create_task(enqueue_sensor_data(gaze_stream, queue_gaze))
        try:
            await match_and_draw(queue_video, queue_gaze)
        finally:
            video_task.cancel()
            gaze_task.cancel()
63
64
async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
    """Copy every datum from ``sensor`` into ``queue`` without blocking.

    Each datum is stored as a ``(datum.datetime, datum)`` tuple. When the
    queue is full the datum is dropped and a message is printed.
    """
    async for sample in sensor:
        entry = (sample.datetime, sample)
        try:
            queue.put_nowait(entry)
        except asyncio.QueueFull:
            print(f"Queue is full, dropping {sample}")
71
72
async def match_and_draw(queue_video, queue_gaze):
    """Endlessly pair the newest video frame with a gaze datum and show it.

    For each iteration the most recent queued frame is taken, a gaze datum
    is selected for that frame's datetime via ``get_closest_item``, and the
    frame is displayed with a red circle at the gaze position.
    """
    while True:
        frame_datetime, frame = await get_most_recent_item(queue_video)
        _, gaze = await get_closest_item(queue_gaze, frame_datetime)

        image = frame.to_ndarray(format="bgr24")
        gaze_point = (int(gaze.x), int(gaze.y))

        cv2.circle(
            image,
            gaze_point,
            radius=80,
            color=(0, 0, 255),
            thickness=15,
        )

        cv2.imshow("Scene camera with gaze overlay", image)
        cv2.waitKey(1)
90
91
async def get_most_recent_item(queue):
    """Wait for at least one item, then drain the queue and return the last.

    All older items that were already queued are discarded.
    """
    newest = await queue.get()
    # Keep pulling without waiting; QueueEmpty marks the end of the backlog.
    with contextlib.suppress(asyncio.QueueEmpty):
        while True:
            newest = queue.get_nowait()
    return newest
101
102
async def get_closest_item(queue, timestamp):
    """Return the queued ``(timestamp, item)`` pair nearest to ``timestamp``.

    Waits for at least one entry, then consumes already-queued entries
    without waiting until one lies after ``timestamp`` or the queue is
    empty. When the target is straddled by two consecutive entries, the
    nearer one is returned (the later one on a tie); previously the later
    entry was always returned even if the earlier one was closer.

    Args:
        queue: Queue of ``(timestamp, item)`` tuples with monotonically
            increasing timestamps.
        timestamp: Target timestamp; must support comparison with and
            subtraction from the queued timestamps.

    Returns:
        The selected ``(timestamp, item)`` tuple. All entries consumed
        while searching are removed from the queue.
    """
    item_ts, item = await queue.get()
    # assumes monotonically increasing timestamps
    if item_ts > timestamp:
        # Nothing earlier is available to compare against.
        return item_ts, item
    while True:
        try:
            next_item_ts, next_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return item_ts, item
        if next_item_ts > timestamp:
            # item_ts <= timestamp < next_item_ts: pick the nearer neighbour.
            if timestamp - item_ts < next_item_ts - timestamp:
                return item_ts, item
            return next_item_ts, next_item
        item_ts, item = next_item_ts, next_item
117
118
119if __name__ == "__main__":
120 with contextlib.suppress(KeyboardInterrupt):
121 asyncio.run(main())
Device Discovery¶
1import asyncio
2import contextlib
3
4from pupil_labs.realtime_api.discovery import Network, discover_devices
5
6
async def main():
    """Demonstrate three ways of discovering devices on the network.

    1. Wait up to five seconds for the first device and print it.
    2. Keep the same search running for five more seconds and print
       everything collected in ``network.devices``.
    3. Start a fresh, open-ended search with ``discover_devices`` and print
       each device as it is reported.
    """
    async with Network() as network:
        print("Looking for the next best device...\n\t", end="")
        print(await network.wait_for_new_device(timeout_seconds=5))

        print("---")
        print("All devices after searching for additional 5 seconds:")
        await asyncio.sleep(5)
        print(network.devices)

    print("---")
    print("Starting new, indefinite search... hit ctrl-c to stop.")
    # optionally set timeout_seconds argument to limit search duration
    async for device_info in discover_devices():
        print(f"\t{device_info}")
22
23
24if __name__ == "__main__":
25 with contextlib.suppress(KeyboardInterrupt):
26 asyncio.run(main())
Time Offset Estimation¶
See pupil_labs.realtime_api.time_echo
for details.
1import asyncio
2
3from pupil_labs.realtime_api import Device, Network
4from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator
5
6
async def main():
    """Find one device and estimate the clock offset between it and this host.

    Prints the device's IP address and Time Echo port, then the mean time
    offset and mean roundtrip duration reported by ``TimeOffsetEstimator``.
    Returns early when no device is found or the device reports no Time Echo
    port.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        status = await device.get_status()

        print(f"Device IP address: {status.phone.ip}")
        print(f"Device Time Echo port: {status.phone.time_echo_port}")

        if status.phone.time_echo_port is None:
            print(
                "Your Pupil Invisible Companion app is out-of-date and does not yet "
                "support the Time Echo protocol. Upgrade to version 1.4.28 or newer."
            )
            return

        time_offset_estimator = TimeOffsetEstimator(
            status.phone.ip, status.phone.time_echo_port
        )
        estimated_offset = await time_offset_estimator.estimate()
        print(f"Mean time offset: {estimated_offset.time_offset_ms.mean} ms")
        print(
            "Mean roundtrip duration: "
            f"{estimated_offset.roundtrip_duration_ms.mean} ms"
        )
36
37
38if __name__ == "__main__":
39 asyncio.run(main())