Simple Examples¶
Code examples that use the Simple API.
Note
The examples require Python 3.9+ to run.
Find one or more devices¶
1from pupil_labs.realtime_api.simple import discover_devices, discover_one_device
2
# Quick search: returns as soon as the first device has been found.
print("Looking for the next best device...\n\t", end="")
first_device = discover_one_device(max_search_duration_seconds=10.0)
print(first_device)

# Full search: list every device that could be found within 10 seconds.
print("Starting 10 second search...\n\t", end="")
found_devices = discover_devices(search_duration_seconds=10.0)
print(found_devices)
Remote control devices¶
Get current status¶
1from pupil_labs.realtime_api.simple import discover_one_device
2
# Search for a device; the call returns as soon as the first one is found.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# Device status is fetched on initialization and kept up-to-date in the background

# Phone identity
phone_ip = device.phone_ip
print(f"Phone IP address: {phone_ip}")
phone_name = device.phone_name
print(f"Phone name: {phone_name}")
phone_id = device.phone_id
print(f"Phone unique ID: {phone_id}")

# Battery
battery_percent = device.battery_level_percent
print(f"Battery level: {battery_percent}%")
battery_state = device.battery_state
print(f"Battery state: {battery_state}")

# Storage: the free byte count is divided by 1024**3 before printing
free_storage = device.memory_num_free_bytes / 1024**3
print(f"Free storage: {free_storage}GB")
storage_state = device.memory_state
print(f"Storage level: {storage_state}")

# Connected hardware serial numbers
glasses_serial = device.serial_number_glasses
print(f"Connected glasses: SN {glasses_serial}")
scene_cam_serial = device.serial_number_scene_cam
print(f"Connected scene camera: SN {scene_cam_serial}")

device.close()  # explicitly stop auto-update
Automatic status updates¶
The Device class monitors a Neon / Pupil Invisible Companion device in the background and mirrors its state accordingly.
1from pupil_labs.realtime_api.simple import discover_one_device
2
# Search for a device; the call returns as soon as the first one is found.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)


def scene_camera_connected():
    """Return the scene camera's ``connected`` value, or False if there is no sensor."""
    sensor = device.world_sensor()
    if sensor is None:
        return False
    return sensor.connected


# State before the user changes anything
print("Scene camera connected:", scene_camera_connected())

input("(Dis)connect the scene camera and hit enter...")

# Query again: the device object is expected to mirror the new state
print("Scene camera connected:", scene_camera_connected())

device.close()
Send event¶
An event without an explicit timestamp will be timestamped on arrival at the Neon / Pupil Invisible Companion device.
1import time
2
3from pupil_labs.realtime_api.simple import discover_one_device
4
# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# No explicit timestamp: the event is timestamped on arrival at the Companion device
print(device.send_event("test event; timestamped at arrival"))

# send event with current timestamp
print(
    device.send_event(
        "test event; timestamped by the client, relying on NTP for sync",
        event_timestamp_unix_ns=time.time_ns(),
    )
)

# Estimate clock offset between Companion device and client script
# (only needs to be done once)
estimate = device.estimate_time_offset()
if estimate is None:
    # No estimate is available. Bail out cleanly instead of failing with an
    # AttributeError on `estimate.time_offset_ms` below.
    device.close()
    raise SystemExit("Pupil Companion app is too old")

# The estimate is reported in milliseconds; event timestamps are in nanoseconds
clock_offset_ns = round(estimate.time_offset_ms.mean * 1_000_000)
print(f"Clock offset: {clock_offset_ns:_d} ns")

# send event with current timestamp, but correct it manually for possible clock offset
current_time_ns_in_client_clock = time.time_ns()
current_time_ns_in_companion_clock = current_time_ns_in_client_clock - clock_offset_ns
print(
    device.send_event(
        "test event; timestamped by the client, manual clock offset correction",
        event_timestamp_unix_ns=current_time_ns_in_companion_clock,
    )
)


device.close()
Start, stop and save, and cancel recordings¶
1import time
2
3from pupil_labs.realtime_api.simple import discover_one_device
4
# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# try/finally makes sure the device is closed even if one of the recording
# calls raises or the script is interrupted while sleeping.
try:
    print("Starting recording")
    recording_id = device.recording_start()
    print(f"Started recording with id {recording_id}")

    # Record for five seconds
    time.sleep(5)

    device.recording_stop_and_save()

    print("Recording stopped and saved")
    # device.recording_cancel()  # uncomment to cancel recording
finally:
    device.close()
Templates¶
You can programmatically fill the template. This allows you to also define the recording name if the template is created correctly.
1import os
2
3import beaupy
4
5from pupil_labs.realtime_api.models import InvalidTemplateAnswersError, TemplateItem
6from pupil_labs.realtime_api.simple import discover_one_device
7
# handle KeyboardInterrupts ourselves
beaupy.Config.raise_on_interrupt = True

# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# Fetch current template definition
template = device.get_template()

# Fetch data filled on the template
data = device.get_template_data()

try:
    terminal_columns = os.get_terminal_size().columns
except OSError:
    # stdout is not attached to a terminal (e.g. output is piped or redirected)
    terminal_columns = 80

LINE = "\u2500" * terminal_columns
RED = "\033[31m"
RESET = "\033[0m"

# The script treats `template` as possibly empty further down (`if template:`),
# so do not assume it has a name here either.
template_name = template.name if template else "no template"
print(f"[{template_name}] Data pre-filled:")
print(LINE)
print("\n".join(f"{k}\t{v}" for k, v in data.items()))
31
32
def preselected_indices(choices, current_value) -> list[int]:
    """Return the indices of ``choices`` that should start out ticked.

    Each entry of ``current_value`` ticks at most one choice (the first one that
    is still unmatched). ``current_value`` may be ``None`` or empty and is never
    modified; matching works on a private copy.
    """
    remaining = list(current_value or [])
    ticked = []
    for index, choice in enumerate(choices):
        if choice in remaining:
            # consume the match so a duplicated choice is not ticked twice
            remaining.remove(choice)
            ticked.append(index)
    return ticked


def prompt_checkbox_answer(item: TemplateItem, current_value):
    """Ask the user to tick any number of ``item.choices``.

    Choices found in ``current_value`` are pre-ticked. The caller's list is left
    untouched, so the same pre-selection is shown again when the caller re-prompts
    after a failed validation.

    Returns whatever ``beaupy.select_multiple`` returns for the selection.
    """
    return beaupy.select_multiple(
        item.choices,
        ticked_indices=preselected_indices(item.choices, current_value),
    )
45
46
def prompt_radio_answer(item: TemplateItem, current_value):
    """Ask the user to pick exactly one of ``item.choices``.

    The cursor starts on the first entry of ``current_value`` when that entry is
    one of the choices, otherwise on the first choice.

    Returns a one-element list with the selected choice, or an empty list when
    ``beaupy.select`` returns ``None``.
    """
    start_index = 0
    if current_value:
        previous = current_value[0]
        if previous in item.choices:
            start_index = item.choices.index(previous)

    selected = beaupy.select(item.choices, cursor_index=start_index)
    return [] if selected is None else [selected]
57
58
def prompt_string_answer(item: TemplateItem, current_value):
    """Ask the user for a free-text answer to ``item``.

    The prompt is pre-filled with ``str(current_value)``. When there is no usable
    current value (falsy or ``[""]``), the item's help text is used instead, and
    an empty string when there is no usable help text either.
    """
    help_text = item.help_text
    if not current_value or current_value == [""]:
        # no previous answer: fall back to the help text as a placeholder
        current_value = help_text if help_text and help_text != [""] else None

    if current_value is None:
        initial = ""
    else:
        initial = str(current_value)

    return beaupy.prompt(f"Enter value for '{item.title}': ", initial_value=initial)
68
69
# Filling a template
# Answers are collected per item id and only posted once the loop is done.
questionnaire = {}
if template:
    try:
        for item in template.items:
            # Layout-only widgets carry no answer
            if item.widget_type in ("SECTION_HEADER", "PAGE_BREAK"):
                continue
            print(LINE)
            print(
                f"{'* ' if item.required else ''}"
                f"ID: {item.id} - Title: {item.title} "
                f"- Input Type: {item.input_type}"
            )
            current_value = data.get(str(item.id))
            # The question belongs to the item, so look it up once per item
            # rather than on every retry.
            question = template.get_question_by_id(item.id)

            # Keep prompting until the answer passes validation
            while True:
                if item.widget_type == "CHECKBOX_LIST":
                    template_input = prompt_checkbox_answer(item, current_value)
                elif item.widget_type == "RADIO_LIST":
                    template_input = prompt_radio_answer(item, current_value)
                else:
                    template_input = prompt_string_answer(item, current_value)

                try:
                    print(template_input)
                    errors = question.validate_answer(template_input)
                    if not errors:
                        questionnaire[str(item.id)] = template_input
                        break
                    print(f"Errors: {errors}")
                except InvalidTemplateAnswersError as e:
                    print(f"{RED}Validation failed for: {template_input}")
                    for error in e.errors:
                        print(f"    {error['msg']}")
                    print(LINE + RESET)
    except KeyboardInterrupt:
        # Answers collected so far are kept and still posted below
        print("\nKeyboardInterrupt detected. Skipping the rest of the template")

print(LINE)
# Sending the template
if questionnaire:
    device.post_template_data(questionnaire)

# Fetch new data filled on the template
data = device.get_template_data()

# Iterate to check filled data
# `template` may be empty (see the `if template:` guard above), so do not
# assume it has a name.
template_name = template.name if template else "no template"
print(f"[{template_name}] Data post:")
print(LINE)
print("\n".join(f"{k}\t{v}" for k, v in data.items()))

device.close()
Streaming¶
Gaze data¶
1from pupil_labs.realtime_api.simple import discover_one_device
2
# Search for a device; the call returns as soon as the first one is found.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# device.streaming_start()  # optional, if not called, stream is started on-demand

try:
    # Print gaze data until interrupted with Ctrl-C
    while True:
        gaze_datum = device.receive_gaze_datum()
        print(gaze_datum)
except KeyboardInterrupt:
    pass
finally:
    print("Stopping...")
    # device.streaming_stop()  # optional, if not called, stream is stopped on close
    device.close()  # explicitly stop auto-update
Scene camera video¶
1import cv2
2import numpy as np
3
4# Workaround for https://github.com/opencv/opencv/issues/21952
5cv2.imshow("cv/av bug", np.zeros(1))
6cv2.destroyAllWindows()
7
8from pupil_labs.realtime_api.simple import discover_one_device # noqa
9
10
def main():
    """Show the live scene camera video until ESC is pressed or Ctrl-C is hit."""
    esc_key = 27
    window_title = "Scene Camera - Press ESC to quit"

    # Search for a device; the call returns as soon as the first one is found.
    print("Looking for the next best device...")
    device = discover_one_device(max_search_duration_seconds=10)
    if device is None:
        print("No device found.")
        raise SystemExit(-1)

    print(f"Connecting to {device}...")

    try:
        while True:
            pixels, timestamp = device.receive_scene_video_frame()
            draw_time(pixels, timestamp)
            cv2.imshow(window_title, pixels)
            pressed_key = cv2.waitKey(1) & 0xFF
            if pressed_key == esc_key:
                break
    except KeyboardInterrupt:
        pass
    finally:
        print("Stopping...")
        device.close()  # explicitly stop auto-update
33
34
def draw_time(frame, time):
    """Draw ``str(time)`` onto ``frame`` as white text anchored at (20, 50)."""
    # The label is the frame's time, rendered via str()
    label = str(time)
    text_origin = (20, 50)
    white = (255, 255, 255)

    cv2.putText(
        frame,
        label,
        text_origin,
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        white,
        thickness=1,
        lineType=cv2.LINE_8,
    )
53
54
55if __name__ == "__main__":
56 main()
Eyes camera video¶
Note
Only available when connecting to a Neon Companion app
1import cv2
2import numpy as np
3
4# Workaround for https://github.com/opencv/opencv/issues/21952
5cv2.imshow("cv/av bug", np.zeros(1))
6cv2.destroyAllWindows()
7
8from pupil_labs.realtime_api.simple import discover_one_device # noqa
9
10
def main():
    """Show the live eyes camera video until ESC is pressed or Ctrl-C is hit."""
    esc_key = 27
    window_title = "Eyes Camera - Press ESC to quit"

    # Search for a device; the call returns as soon as the first one is found.
    print("Looking for the next best device...")
    device = discover_one_device(max_search_duration_seconds=10)
    if device is None:
        print("No device found.")
        raise SystemExit(-1)

    print(f"Connecting to {device}...")

    try:
        while True:
            pixels, timestamp = device.receive_eyes_video_frame()
            draw_time(pixels, timestamp)
            cv2.imshow(window_title, pixels)
            pressed_key = cv2.waitKey(1) & 0xFF
            if pressed_key == esc_key:
                break
    except KeyboardInterrupt:
        pass
    finally:
        print("Stopping...")
        device.close()  # explicitly stop auto-update
33
34
def draw_time(frame, time):
    """Draw ``str(time)`` onto ``frame`` as white text anchored at (20, 50)."""
    # The label is the frame's time, rendered via str()
    label = str(time)
    text_origin = (20, 50)
    white = (255, 255, 255)

    cv2.putText(
        frame,
        label,
        text_origin,
        cv2.FONT_HERSHEY_SIMPLEX,
        1.0,
        white,
        thickness=1,
        lineType=cv2.LINE_8,
    )
53
54
55if __name__ == "__main__":
56 main()
Camera calibration¶
Note
Only available when connecting to a Neon Companion app
1from pupil_labs.realtime_api.simple import discover_one_device
2
# Search for a device; the call returns as soon as the first one is found.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found.")
    raise SystemExit(-1)

# Device status is fetched on initialization and kept up-to-date in the background

calibration = device.get_calibration()

# (heading to print, calibration field) pairs, in output order
sections = (
    ("Scene camera matrix:", "scene_camera_matrix"),
    ("\nScene distortion coefficients:", "scene_distortion_coefficients"),
    ("\nRight camera matrix:", "right_camera_matrix"),
    ("\nRight distortion coefficients:", "right_distortion_coefficients"),
    ("\nLeft camera matrix:", "left_camera_matrix"),
    ("\nLeft distortion coefficients:", "left_distortion_coefficients"),
)
for heading, field in sections:
    print(heading)
    # each field is indexed with [0] before printing
    print(calibration[field][0])

device.close()
Scene camera video with overlaid gaze¶
1import cv2
2import numpy as np
3
4# Workaround for https://github.com/opencv/opencv/issues/21952
5cv2.imshow("cv/av bug", np.zeros(1))
6cv2.destroyAllWindows()
7
8from pupil_labs.realtime_api.simple import discover_one_device # noqa
9
10
def main():
    """Show the scene video with a gaze circle until ESC is pressed or Ctrl-C is hit."""
    esc_key = 27
    red_bgr = (0, 0, 255)

    # Search for a device; the call returns as soon as the first one is found.
    print("Looking for the next best device...")
    device = discover_one_device(max_search_duration_seconds=10)
    if device is None:
        print("No device found.")
        raise SystemExit(-1)

    print(f"Connecting to {device}...")

    try:
        while True:
            frame, gaze = device.receive_matched_scene_video_frame_and_gaze()
            # circle centre: the gaze coordinates truncated to integers
            gaze_point = (int(gaze.x), int(gaze.y))
            cv2.circle(
                frame.bgr_pixels,
                gaze_point,
                radius=80,
                color=red_bgr,
                thickness=15,
            )

            cv2.imshow("Scene camera with gaze overlay", frame.bgr_pixels)
            pressed_key = cv2.waitKey(1) & 0xFF
            if pressed_key == esc_key:
                break
    except KeyboardInterrupt:
        pass
    finally:
        print("Stopping...")
        device.close()  # explicitly stop auto-update
40
41
42if __name__ == "__main__":
43 main()
Scene camera video with overlaid eyes video and gaze circle¶
Note
Only available when connecting to a Neon Companion app
1import cv2
2import numpy as np
3
4# Workaround for https://github.com/opencv/opencv/issues/21952
5cv2.imshow("cv/av bug", np.zeros(1))
6cv2.destroyAllWindows()
7
8from pupil_labs.realtime_api.simple import discover_one_device # noqa
9
10
def main():
    """Show scene video with eyes video and gaze circle until ESC or Ctrl-C."""
    esc_key = 27
    red_bgr = (0, 0, 255)

    # Search for a device; the call returns as soon as the first one is found.
    print("Looking for the next best device...")
    device = discover_one_device(max_search_duration_seconds=10)
    if device is None:
        print("No device found.")
        raise SystemExit(-1)

    print(f"Connecting to {device}...")

    try:
        while True:
            matched = device.receive_matched_scene_and_eyes_video_frames_and_gaze()
            if not matched:
                print(
                    "Not able to find a match! Note: Pupil Invisible does not support "
                    "streaming eyes video"
                )
                continue

            scene_pixels = matched.scene.bgr_pixels
            eyes_pixels = matched.eyes.bgr_pixels

            gaze_point = (int(matched.gaze.x), int(matched.gaze.y))
            cv2.circle(
                scene_pixels,
                gaze_point,
                radius=80,
                color=red_bgr,
                thickness=15,
            )

            # Render eyes video into the scene video (top-left corner)
            eyes_height, eyes_width, _ = eyes_pixels.shape
            scene_pixels[:eyes_height, :eyes_width, :] = eyes_pixels

            cv2.imshow("Scene camera with eyes and gaze overlay", scene_pixels)
            pressed_key = cv2.waitKey(1) & 0xFF
            if pressed_key == esc_key:
                break
    except KeyboardInterrupt:
        pass
    finally:
        print("Stopping...")
        device.close()  # explicitly stop auto-update
53
54
55if __name__ == "__main__":
56 main()
Time Offset Estimation¶
See pupil_labs.realtime_api.time_echo for details.
1from pupil_labs.realtime_api.simple import discover_one_device
2
# Search for a device; the call returns as soon as the first one is found.
print("Looking for the next best device...")
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    raise SystemExit("No device found.")

estimate = device.estimate_time_offset()
if estimate is None:
    # No estimate available: release the device before exiting
    device.close()
    raise SystemExit("Pupil Companion app is too old")

mean_offset_ms = estimate.time_offset_ms.mean
print(f"Mean time offset: {mean_offset_ms} ms")
mean_roundtrip_ms = estimate.roundtrip_duration_ms.mean
print(f"Mean roundtrip duration: {mean_roundtrip_ms} ms")

device.close()