Simple Examples

Code examples that use the Simple API.

Note

The examples require Python 3.9+ to run.

Find one or more devices

"""Find one device, then list every device found within a fixed search window."""

from pupil_labs.realtime_api.simple import discover_devices, discover_one_device

# Look for devices. Returns as soon as it has found the first device.
print("Looking for the next best device...\n\t", end="")
device = discover_one_device(max_search_duration_seconds=10.0)
print(device)  # prints None if no device was found in time
if device is not None:
    device.close()  # explicitly stop auto-update

# List all devices that could be found within 10 seconds
print("Starting 10 second search...\n\t", end="")
devices = discover_devices(search_duration_seconds=10.0)
print(devices)
for device in devices:
    device.close()  # explicitly stop auto-update

Remote control devices

Get current status

 1from pupil_labs.realtime_api.simple import discover_one_device
 2
 3# Look for devices. Returns as soon as it has found the first device.
 4print("Looking for the next best device...")
 5device = discover_one_device(max_search_duration_seconds=10)
 6if device is None:
 7    print("No device found.")
 8    raise SystemExit(-1)
 9
10# Device status is fetched on initialization and kept up-to-date in the background
11
12print(f"Phone IP address: {device.phone_ip}")
13print(f"Phone name: {device.phone_name}")
14print(f"Phone unique ID: {device.phone_id}")
15
16print(f"Battery level: {device.battery_level_percent}%")
17print(f"Battery state: {device.battery_state}")
18
19print(f"Free storage: {device.memory_num_free_bytes / 1024**3}GB")
20print(f"Storage level: {device.memory_state}")
21
22print(f"Connected glasses: SN {device.serial_number_glasses}")
23print(f"Connected scene camera: SN {device.serial_number_scene_cam}")
24
25device.close()  # explicitly stop auto-update

Automatic status updates

The Device class monitors a Neon / Pupil Invisible Companion device in the background and mirrors its state accordingly.

 1from pupil_labs.realtime_api.simple import discover_one_device
 2
 3# Look for devices. Returns as soon as it has found the first device.
 4print("Looking for the next best device...")
 5device = discover_one_device(max_search_duration_seconds=10)
 6if device is None:
 7    print("No device found.")
 8    raise SystemExit(-1)
 9
10scene_camera = device.world_sensor()
11connected = False if scene_camera is None else scene_camera.connected
12print("Scene camera connected:", connected)
13
14input("(Dis)connect the scene camera and hit enter...")
15
16scene_camera = device.world_sensor()
17connected = False if scene_camera is None else scene_camera.connected
18print("Scene camera connected:", connected)
19
20device.close()

Send event

An event without an explicit timestamp will be timestamped on arrival at the Neon / Pupil Invisible Companion device.

 1import time
 2
 3from pupil_labs.realtime_api.simple import discover_one_device
 4
 5# Look for devices. Returns as soon as it has found the first device.
 6print("Looking for the next best device...")
 7device = discover_one_device(max_search_duration_seconds=10)
 8if device is None:
 9    print("No device found.")
10    raise SystemExit(-1)
11
12print(device.send_event("test event; timestamped at arrival"))
13
14# send event with current timestamp
15print(
16    device.send_event(
17        "test event; timestamped by the client, relying on NTP for sync",
18        event_timestamp_unix_ns=time.time_ns(),
19    )
20)
21
22# Estimate clock offset between Companion device and client script
23# (only needs to be done once)
24estimate = device.estimate_time_offset()
25clock_offset_ns = round(estimate.time_offset_ms.mean * 1_000_000)
26print(f"Clock offset: {clock_offset_ns:_d} ns")
27
28# send event with current timestamp, but correct it manual for possible clock offset
29current_time_ns_in_client_clock = time.time_ns()
30current_time_ns_in_companion_clock = current_time_ns_in_client_clock - clock_offset_ns
31print(
32    device.send_event(
33        "test event; timestamped by the client, manual clock offset correction",
34        event_timestamp_unix_ns=current_time_ns_in_companion_clock,
35    )
36)
37
38
39device.close()

Start, stop and save, and cancel recordings

 1import time
 2
 3from pupil_labs.realtime_api.simple import discover_one_device
 4
 5# Look for devices. Returns as soon as it has found the first device.
 6print("Looking for the next best device...")
 7device = discover_one_device(max_search_duration_seconds=10)
 8if device is None:
 9    print("No device found.")
10    raise SystemExit(-1)
11
12print(f"Starting recording")
13recording_id = device.recording_start()
14print(f"Started recording with id {recording_id}")
15
16time.sleep(5)
17
18device.recording_stop_and_save()
19
20print("Recording stopped and saved")
21# device.recording_cancel()  # uncomment to cancel recording
22
23device.close()

Templates

You can fill in the template programmatically. If the template is set up accordingly, this also lets you define the recording name.

  1import os
  2
  3import beaupy
  4
  5from pupil_labs.realtime_api.models import InvalidTemplateAnswersError, TemplateItem
  6from pupil_labs.realtime_api.simple import discover_one_device
  7
  8# handle KeyboardInterrupts ourselves
  9beaupy.Config.raise_on_interrupt = True
 10
 11# Look for devices. Returns as soon as it has found the first device.
 12print("Looking for the next best device...")
 13device = discover_one_device(max_search_duration_seconds=10)
 14if device is None:
 15    print("No device found.")
 16    raise SystemExit(-1)
 17
 18# Fetch current template definition
 19template = device.get_template()
 20
 21# Fetch data filled on the template
 22data = device.get_template_data()
 23
 24LINE = "\u2500" * os.get_terminal_size().columns
 25RED = "\033[31m"
 26RESET = "\033[0m"
 27
 28print(f"[{template.name}] Data pre-filled:")
 29print(LINE)
 30print("\n".join(f"{k}\t{v}" for k, v in data.items()))
 31
 32
 33def prompt_checkbox_answer(item: TemplateItem, current_value):
 34    ticked = []
 35    for i, choice in enumerate(item.choices):
 36        current_value: list
 37        if choice in (current_value or []):
 38            current_value.remove(choice)
 39            ticked.append(i)
 40    choices = beaupy.select_multiple(
 41        item.choices,
 42        ticked_indices=ticked,
 43    )
 44    return choices
 45
 46
 47def prompt_radio_answer(item: TemplateItem, current_value):
 48    cursor_index = 0
 49    if current_value and current_value[0] in item.choices:
 50        cursor_index = item.choices.index(current_value[0])
 51
 52    choice = beaupy.select(item.choices, cursor_index=cursor_index)
 53    template_input = []
 54    if choice is not None:
 55        template_input = [choice]
 56    return template_input
 57
 58
 59def prompt_string_answer(item: TemplateItem, current_value):
 60    placeholder = item.help_text if item.help_text and item.help_text != [""] else None
 61    current_value = (
 62        placeholder if not current_value or current_value == [""] else current_value
 63    )
 64    return beaupy.prompt(
 65        f"Enter value for '{item.title}': ",
 66        initial_value="" if current_value is None else str(current_value),
 67    )
 68
 69
 70# Filling a template
 71questionnaire = {}
 72if template:
 73    try:
 74        for item in template.items:
 75            if item.widget_type in ("SECTION_HEADER", "PAGE_BREAK"):
 76                continue
 77            print(LINE)
 78            print(
 79                f"{'* ' if item.required else ''}"
 80                + f"ID: {item.id} - Title: {item.title} "
 81                + f"- Input Type: {item.input_type}"
 82            )
 83            current_value = data.get(str(item.id))
 84            while True:
 85                question = template.get_question_by_id(item.id)
 86                if item.widget_type == "CHECKBOX_LIST":
 87                    template_input = prompt_checkbox_answer(item, current_value)
 88                elif item.widget_type == "RADIO_LIST":
 89                    template_input = prompt_radio_answer(item, current_value)
 90                else:
 91                    template_input = prompt_string_answer(item, current_value)
 92
 93                try:
 94                    print(template_input)
 95                    errors = question.validate_answer(template_input)
 96                    if not errors:
 97                        questionnaire[str(item.id)] = template_input
 98                        break
 99                    else:
100                        print(f"Errors: {errors}")
101                except InvalidTemplateAnswersError as e:
102                    print(f"{RED}Validation failed for: {template_input}")
103                    for error in e.errors:
104                        print(f"    {error['msg']}")
105                    print(LINE + RESET)
106    except KeyboardInterrupt:
107        print("\nKeyboardInterrupt detected. Skipping the rest of the template")
108
109print(LINE)
110# Sending the template
111if questionnaire:
112    device.post_template_data(questionnaire)
113
114# Fetch new data filled on the template
115data = device.get_template_data()
116
117# Iterate to check filled data
118print(f"[{template.name}] Data post:")
119print(LINE)
120print("\n".join(f"{k}\t{v}" for k, v in data.items()))
121
122device.close()

Streaming

Gaze data

 1from pupil_labs.realtime_api.simple import discover_one_device
 2
 3# Look for devices. Returns as soon as it has found the first device.
 4print("Looking for the next best device...")
 5device = discover_one_device(max_search_duration_seconds=10)
 6if device is None:
 7    print("No device found.")
 8    raise SystemExit(-1)
 9
10# device.streaming_start()  # optional, if not called, stream is started on-demand
11
12try:
13    while True:
14        print(device.receive_gaze_datum())
15except KeyboardInterrupt:
16    pass
17finally:
18    print("Stopping...")
19    # device.streaming_stop()  # optional, if not called, stream is stopped on close
20    device.close()  # explicitly stop auto-update

Scene camera video

 1import cv2
 2import numpy as np
 3
 4# Workaround for https://github.com/opencv/opencv/issues/21952
 5cv2.imshow("cv/av bug", np.zeros(1))
 6cv2.destroyAllWindows()
 7
 8from pupil_labs.realtime_api.simple import discover_one_device  # noqa
 9
10
11def main():
12    # Look for devices. Returns as soon as it has found the first device.
13    print("Looking for the next best device...")
14    device = discover_one_device(max_search_duration_seconds=10)
15    if device is None:
16        print("No device found.")
17        raise SystemExit(-1)
18
19    print(f"Connecting to {device}...")
20
21    try:
22        while True:
23            bgr_pixels, frame_datetime = device.receive_scene_video_frame()
24            draw_time(bgr_pixels, frame_datetime)
25            cv2.imshow("Scene Camera - Press ESC to quit", bgr_pixels)
26            if cv2.waitKey(1) & 0xFF == 27:
27                break
28    except KeyboardInterrupt:
29        pass
30    finally:
31        print("Stopping...")
32        device.close()  # explicitly stop auto-update
33
34
35def draw_time(frame, time):
36    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
37    frame_txt_font_scale = 1.0
38    frame_txt_thickness = 1
39
40    # first line: frame index
41    frame_txt = str(time)
42
43    cv2.putText(
44        frame,
45        frame_txt,
46        (20, 50),
47        frame_txt_font_name,
48        frame_txt_font_scale,
49        (255, 255, 255),
50        thickness=frame_txt_thickness,
51        lineType=cv2.LINE_8,
52    )
53
54
55if __name__ == "__main__":
56    main()

Eyes camera video

Note

Only available when connecting to a Neon Companion app

 1import cv2
 2import numpy as np
 3
 4# Workaround for https://github.com/opencv/opencv/issues/21952
 5cv2.imshow("cv/av bug", np.zeros(1))
 6cv2.destroyAllWindows()
 7
 8from pupil_labs.realtime_api.simple import discover_one_device  # noqa
 9
10
11def main():
12    # Look for devices. Returns as soon as it has found the first device.
13    print("Looking for the next best device...")
14    device = discover_one_device(max_search_duration_seconds=10)
15    if device is None:
16        print("No device found.")
17        raise SystemExit(-1)
18
19    print(f"Connecting to {device}...")
20
21    try:
22        while True:
23            bgr_pixels, frame_datetime = device.receive_eyes_video_frame()
24            draw_time(bgr_pixels, frame_datetime)
25            cv2.imshow("Eyes Camera - Press ESC to quit", bgr_pixels)
26            if cv2.waitKey(1) & 0xFF == 27:
27                break
28    except KeyboardInterrupt:
29        pass
30    finally:
31        print("Stopping...")
32        device.close()  # explicitly stop auto-update
33
34
35def draw_time(frame, time):
36    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
37    frame_txt_font_scale = 1.0
38    frame_txt_thickness = 1
39
40    # first line: frame index
41    frame_txt = str(time)
42
43    cv2.putText(
44        frame,
45        frame_txt,
46        (20, 50),
47        frame_txt_font_name,
48        frame_txt_font_scale,
49        (255, 255, 255),
50        thickness=frame_txt_thickness,
51        lineType=cv2.LINE_8,
52    )
53
54
55if __name__ == "__main__":
56    main()

Camera calibration

Note

Only available when connecting to a Neon Companion app

 1from pupil_labs.realtime_api.simple import discover_one_device
 2
 3# Look for devices. Returns as soon as it has found the first device.
 4print("Looking for the next best device...")
 5device = discover_one_device(max_search_duration_seconds=10)
 6if device is None:
 7    print("No device found.")
 8    raise SystemExit(-1)
 9
10# Device status is fetched on initialization and kept up-to-date in the background
11
12calibration = device.get_calibration()
13
14print("Scene camera matrix:")
15print(calibration["scene_camera_matrix"][0])
16print("\nScene distortion coefficients:")
17print(calibration["scene_distortion_coefficients"][0])
18
19print("\nRight camera matrix:")
20print(calibration["right_camera_matrix"][0])
21print("\nRight distortion coefficients:")
22print(calibration["right_distortion_coefficients"][0])
23
24print("\nLeft camera matrix:")
25print(calibration["left_camera_matrix"][0])
26print("\nLeft distortion coefficients:")
27print(calibration["left_distortion_coefficients"][0])
28
29device.close()

Scene camera video with overlaid gaze

 1import cv2
 2import numpy as np
 3
 4# Workaround for https://github.com/opencv/opencv/issues/21952
 5cv2.imshow("cv/av bug", np.zeros(1))
 6cv2.destroyAllWindows()
 7
 8from pupil_labs.realtime_api.simple import discover_one_device  # noqa
 9
10
11def main():
12    # Look for devices. Returns as soon as it has found the first device.
13    print("Looking for the next best device...")
14    device = discover_one_device(max_search_duration_seconds=10)
15    if device is None:
16        print("No device found.")
17        raise SystemExit(-1)
18
19    print(f"Connecting to {device}...")
20
21    try:
22        while True:
23            frame, gaze = device.receive_matched_scene_video_frame_and_gaze()
24            cv2.circle(
25                frame.bgr_pixels,
26                (int(gaze.x), int(gaze.y)),
27                radius=80,
28                color=(0, 0, 255),
29                thickness=15,
30            )
31
32            cv2.imshow("Scene camera with gaze overlay", frame.bgr_pixels)
33            if cv2.waitKey(1) & 0xFF == 27:
34                break
35    except KeyboardInterrupt:
36        pass
37    finally:
38        print("Stopping...")
39        device.close()  # explicitly stop auto-update
40
41
42if __name__ == "__main__":
43    main()

Scene camera video with overlaid eyes video and gaze circle

Note

Only available when connecting to a Neon Companion app

 1import cv2
 2import numpy as np
 3
 4# Workaround for https://github.com/opencv/opencv/issues/21952
 5cv2.imshow("cv/av bug", np.zeros(1))
 6cv2.destroyAllWindows()
 7
 8from pupil_labs.realtime_api.simple import discover_one_device  # noqa
 9
10
11def main():
12    # Look for devices. Returns as soon as it has found the first device.
13    print("Looking for the next best device...")
14    device = discover_one_device(max_search_duration_seconds=10)
15    if device is None:
16        print("No device found.")
17        raise SystemExit(-1)
18
19    print(f"Connecting to {device}...")
20
21    try:
22        while True:
23            matched = device.receive_matched_scene_and_eyes_video_frames_and_gaze()
24            if not matched:
25                print(
26                    "Not able to find a match! Note: Pupil Invisible does not support "
27                    "streaming eyes video"
28                )
29                continue
30
31            cv2.circle(
32                matched.scene.bgr_pixels,
33                (int(matched.gaze.x), int(matched.gaze.y)),
34                radius=80,
35                color=(0, 0, 255),
36                thickness=15,
37            )
38
39            # Render eyes video into the scene video
40            height, width, _ = matched.eyes.bgr_pixels.shape
41            matched.scene.bgr_pixels[:height, :width, :] = matched.eyes.bgr_pixels
42
43            cv2.imshow(
44                "Scene camera with eyes and gaze overlay", matched.scene.bgr_pixels
45            )
46            if cv2.waitKey(1) & 0xFF == 27:
47                break
48    except KeyboardInterrupt:
49        pass
50    finally:
51        print("Stopping...")
52        device.close()  # explicitly stop auto-update
53
54
55if __name__ == "__main__":
56    main()

Time Offset Estimation

See pupil_labs.realtime_api.time_echo for details.

 1from pupil_labs.realtime_api.simple import discover_one_device
 2
 3# Look for devices. Returns as soon as it has found the first device.
 4print("Looking for the next best device...")
 5device = discover_one_device(max_search_duration_seconds=10)
 6if device is None:
 7    raise SystemExit("No device found.")
 8
 9estimate = device.estimate_time_offset()
10if estimate is None:
11    device.close()
12    raise SystemExit("Pupil Companion app is too old")
13
14print(f"Mean time offset: {estimate.time_offset_ms.mean} ms")
15print(f"Mean roundtrip duration: {estimate.roundtrip_duration_ms.mean} ms")
16
17device.close()