Asynchronous Examples

Note

The examples require Python 3.7+ to run and use the asyncio framework.

Remote Control

Get Current Status

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network
 4
 5
 6async def main():
 7    async with Network() as network:
 8        dev_info = await network.wait_for_new_device(timeout_seconds=5)
 9    if dev_info is None:
10        print("No device could be found! Abort")
11        return
12
13    async with Device.from_discovered_device(dev_info) as device:
14        status = await device.get_status()
15
16        print(f"Device IP address: {status.phone.ip}")
17        print(f"Battery level: {status.phone.battery_level} %")
18
19        print(f"Connected glasses: SN {status.hardware.glasses_serial}")
20        print(f"Connected scene camera: SN {status.hardware.world_camera_serial}")
21
22        world = status.direct_world_sensor()
23        print(f"World sensor: connected={world.connected} url={world.url}")
24
25        gaze = status.direct_gaze_sensor()
26        print(f"Gaze sensor: connected={gaze.connected} url={gaze.url}")
27
28
29if __name__ == "__main__":
30    asyncio.run(main())

Status Updates

Wait for status updates from the device

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api import Device, Network
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        print("Waiting for status updates... hit ctrl-c to stop.")
16        async for changed in device.status_updates():
17            print(changed)
18
19
20if __name__ == "__main__":
21    with contextlib.suppress(KeyboardInterrupt):
22        asyncio.run(main())

Get a callback whenever there is a new status update

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
 4
 5
 6def print_component(component):
 7    print(component)
 8
 9
async def main():
    """Receive status updates through a callback for a fixed duration.

    Discovers a device (five-second timeout), registers ``print_component``
    as a status-update callback, lets updates arrive for ``duration`` seconds,
    and then stops the notifier.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        duration = 20
        print(f"Starting auto-update for {duration} seconds")
        # callbacks can be awaitable, too
        notifier = StatusUpdateNotifier(device, callbacks=[print_component])
        await notifier.receive_updates_start()
        try:
            await asyncio.sleep(duration)
        finally:
            # Always stop the notifier, even if the sleep is cancelled or
            # interrupted, so updates are not left running.
            print("Stopping auto-update")
            await notifier.receive_updates_stop()
26
27
28if __name__ == "__main__":
29    asyncio.run(main())

Send Event

An event sent without an explicit timestamp will be timestamped on arrival at the Neon / Pupil Invisible Companion device.

 1import asyncio
 2import time
 3
 4from pupil_labs.realtime_api import Device, Network
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        # send event without timestamp
16        print(await device.send_event("test event"))
17
18        # send event with current timestamp
19        print(
20            await device.send_event(
21                "test event", event_timestamp_unix_ns=time.time_ns()
22            )
23        )
24
25
26if __name__ == "__main__":
27    asyncio.run(main())

Start, stop and save, and cancel recordings

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
 4from pupil_labs.realtime_api.models import Recording, Sensor
 5
 6
 7async def on_status_update(component):
 8    if isinstance(component, Recording):
 9        if component.action == "ERROR":
10            print(f"Error : {component.message}")
11
12    elif isinstance(component, Sensor):
13        if component.stream_error:
14            print(f"Stream error in sensor {component.sensor}")
15
16
async def main():
    """Start a recording, let it run for five seconds, then stop and save it.

    Status updates are routed to ``on_status_update`` while the recording is
    active so that recording or sensor errors are printed as they arrive.
    """
    async with Network() as network:
        dev_info = await network.wait_for_new_device(timeout_seconds=5)
    if dev_info is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(dev_info) as device:
        # get update when recording is fully started
        notifier = StatusUpdateNotifier(device, callbacks=[on_status_update])
        await notifier.receive_updates_start()
        try:
            recording_id = await device.recording_start()
            print(f"Initiated recording with id {recording_id}")
            await asyncio.sleep(5)
            print("Stopping recording")
            await device.recording_stop_and_save()
            # To discard the recording instead of saving it, replace the line
            # above with:
            # await device.recording_cancel()
            await asyncio.sleep(2)  # wait for confirmation via auto-update
        finally:
            # Stop the notifier even if starting or stopping the recording fails.
            await notifier.receive_updates_stop()
36
37
38if __name__ == "__main__":
39    asyncio.run(main())

Templates

You can fill in the template programmatically. If the template is set up accordingly, this also lets you define the recording name.

import asyncio
import os
import shutil

import beaupy

from pupil_labs.realtime_api import Device, Network
from pupil_labs.realtime_api.models import InvalidTemplateAnswersError, TemplateItem
  8
  9LINE = "\u2500" * os.get_terminal_size().columns
 10RED = "\033[31m"
 11RESET = "\033[0m"
 12
 13
 14def prompt_checkbox_answer(item: TemplateItem, current_value):
 15    ticked = []
 16    for i, choice in enumerate(item.choices):
 17        current_value: list
 18        if choice in (current_value or []):
 19            current_value.remove(choice)
 20            ticked.append(i)
 21    choices = beaupy.select_multiple(
 22        item.choices,
 23        ticked_indices=ticked,
 24    )
 25    return choices
 26
 27
 28def prompt_radio_answer(item: TemplateItem, current_value):
 29    cursor_index = 0
 30    if current_value and current_value[0] in item.choices:
 31        cursor_index = item.choices.index(current_value[0])
 32
 33    choice = beaupy.select(item.choices, cursor_index=cursor_index)
 34    template_input = []
 35    if choice is not None:
 36        template_input = [choice]
 37    return template_input
 38
 39
 40def prompt_string_answer(item: TemplateItem, current_value):
 41    placeholder = item.help_text if item.help_text and item.help_text != [""] else None
 42    current_value = (
 43        placeholder if not current_value or current_value == [""] else current_value
 44    )
 45    return beaupy.prompt(
 46        f"Enter value for '{item.title}': ",
 47        initial_value="" if current_value is None else str(current_value),
 48    )
 49
 50
 51async def main():  # noqa: C901
 52    async with Network() as network:
 53        dev_info = await network.wait_for_new_device(timeout_seconds=5)
 54    if dev_info is None:
 55        print("No device could be found! Abort")
 56        return
 57
 58    async with Device.from_discovered_device(dev_info) as device:
 59        # Fetch current template definition
 60        template = await device.get_template()
 61        # Fetch data filled on the template
 62        data = await device.get_template_data(format="simple")
 63
 64        print(f"[{template.name}] Data pre-filled:")
 65        print(LINE)
 66        print("\n".join(f"{k}\t{v}" for k, v in data.items()))
 67
 68        # Filling a template
 69        questionnaire = {}
 70        if template:
 71            try:
 72                for item in template.items:
 73                    if item.widget_type in ("SECTION_HEADER", "PAGE_BREAK"):
 74                        continue
 75                    print(LINE)
 76                    print(
 77                        f"{'* ' if item.required else ''}"
 78                        + f"ID: {item.id} - Title: {item.title} "
 79                        + f"- Input Type: {item.input_type}"
 80                    )
 81                    current_value = data.get(str(item.id))
 82                    while True:
 83                        question = template.get_question_by_id(item.id)
 84                        if item.widget_type == "CHECKBOX_LIST":
 85                            template_input = prompt_checkbox_answer(item, current_value)
 86                        elif item.widget_type == "RADIO_LIST":
 87                            template_input = prompt_radio_answer(item, current_value)
 88                        else:
 89                            template_input = prompt_string_answer(item, current_value)
 90
 91                        try:
 92                            print(template_input)
 93                            errors = question.validate_answer(template_input)
 94                            if not errors:
 95                                questionnaire[str(item.id)] = template_input
 96                                break
 97                            else:
 98                                print(f"Errors: {errors}")
 99                        except InvalidTemplateAnswersError as e:
100                            print(f"{RED}Validation failed for: {template_input}")
101                            for error in e.errors:
102                                print(f"    {error['msg']}")
103                            print(LINE + RESET)
104            except KeyboardInterrupt:
105                print("\nKeyboardInterrupt detected. Skipping line.")
106
107        print(LINE)
108
109        # Sending the template
110        if questionnaire:
111            await device.post_template_data(questionnaire)
112
113        # Fetch new data filled on the template
114        data = await device.get_template_data(format="api")
115
116        # Iterate to check filled data
117        print(f"[{template.name}] Data post:")
118        print(LINE)
119        print("\n".join(f"{k}\t{v}" for k, v in data.items()))
120
121
122if __name__ == "__main__":
123    asyncio.run(main())

Streaming

Gaze Data

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api import Device, Network, receive_gaze_data
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        status = await device.get_status()
16        sensor_gaze = status.direct_gaze_sensor()
17        if not sensor_gaze.connected:
18            print(f"Gaze sensor is not connected to {device}")
19            return
20
21        restart_on_disconnect = True
22        async for gaze in receive_gaze_data(
23            sensor_gaze.url, run_loop=restart_on_disconnect
24        ):
25            print(gaze)
26
27
28if __name__ == "__main__":
29    with contextlib.suppress(KeyboardInterrupt):
30        asyncio.run(main())

Scene Camera Video

 1import asyncio
 2import contextlib
 3
 4import cv2
 5import numpy as np
 6
 7# Workaround for https://github.com/opencv/opencv/issues/21952
 8cv2.imshow("cv/av bug", np.zeros(1))
 9cv2.destroyAllWindows()
10
11from pupil_labs.realtime_api import Device, Network, receive_video_frames  # noqa
12
13
async def main():
    """Preview the scene camera stream of the first discovered device.

    Every received frame is stamped with its datetime and shown in an OpenCV
    window. Pressing ESC (key code 27) ends the preview.
    """
    async with Network() as network:
        discovered = await network.wait_for_new_device(timeout_seconds=5)

    if discovered is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(discovered) as device:
        status = await device.get_status()
        scene_sensor = status.direct_world_sensor()
        if not scene_sensor.connected:
            print(f"Scene camera is not connected to {device}")
            return

        # run_loop=True is the original "restart on disconnect" setting
        frames = receive_video_frames(scene_sensor.url, run_loop=True)
        async for frame in frames:
            image = frame.bgr_buffer()
            draw_time(image, frame.datetime)
            cv2.imshow("Scene Camera - Press ESC to quit", image)

            pressed = cv2.waitKey(1) & 0xFF
            if pressed == 27:
                return
37
38
def draw_time(frame, time):
    """Write ``str(time)`` onto ``frame`` at pixel position (20, 50).

    The text is drawn in place with ``cv2.putText`` using the Hershey simplex
    font at scale 1.0, colour (255, 255, 255) and a thickness of 1.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    label = str(time)
    cv2.putText(
        frame,
        label,
        (20, 50),
        font,
        1.0,
        (255, 255, 255),
        thickness=1,
        lineType=cv2.LINE_8,
    )
57
58
59if __name__ == "__main__":
60    with contextlib.suppress(KeyboardInterrupt):
61        asyncio.run(main())

Eyes Camera Video

 1import asyncio
 2import contextlib
 3import time
 4
 5import cv2
 6import numpy as np
 7
 8# Workaround for https://github.com/opencv/opencv/issues/21952
 9cv2.imshow("cv/av bug", np.zeros(1))
10cv2.destroyAllWindows()
11
12from pupil_labs.realtime_api import Device, Network, receive_video_frames  # noqa
13
14
async def main(preview_frame_rate=30):
    """Preview the eye camera stream of the first discovered device.

    Every frame is stamped and handed to ``cv2.imshow``. ``cv2.waitKey`` is
    called, and ESC checked, only once more than ``1 / preview_frame_rate``
    seconds have passed since the previous ``cv2.waitKey`` call.
    """
    async with Network() as network:
        discovered = await network.wait_for_new_device(timeout_seconds=5)

    if discovered is None:
        print("No device could be found! Abort")
        return

    async with Device.from_discovered_device(discovered) as device:
        status = await device.get_status()
        eyes_sensor = status.direct_eyes_sensor()
        if not eyes_sensor.connected:
            print(f"Eyes camera is not connected to {device}")
            return

        last_poll = time.perf_counter()
        # run_loop=True is the original "restart on disconnect" setting
        frames = receive_video_frames(eyes_sensor.url, run_loop=True)
        async for frame in frames:
            image = frame.bgr_buffer()
            draw_time(image, frame.datetime)
            cv2.imshow("Eye Cameras - Press ESC to quit", image)

            elapsed = time.perf_counter() - last_poll
            if not elapsed > 1 / preview_frame_rate:
                continue
            if cv2.waitKey(1) & 0xFF == 27:
                return
            last_poll = time.perf_counter()
43
44
def draw_time(frame, time):
    """Write ``str(time)`` onto ``frame`` at pixel position (20, 50).

    The text is drawn in place with ``cv2.putText`` using the Hershey simplex
    font at scale 0.5, colour (255, 255, 255) and a thickness of 1.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    label = str(time)
    cv2.putText(
        frame,
        label,
        (20, 50),
        font,
        0.5,
        (255, 255, 255),
        thickness=1,
        lineType=cv2.LINE_8,
    )
63
64
65if __name__ == "__main__":
66    with contextlib.suppress(KeyboardInterrupt):
67        asyncio.run(main())

Scene Camera Video With Overlaid Gaze

This example processes two streams (video and gaze data) at the same time, matches each video frame with its temporally closest gaze point, and previews both in a window.

  1import asyncio
  2import contextlib
  3import typing as T
  4
  5import cv2
  6import numpy as np
  7
  8# Workaround for https://github.com/opencv/opencv/issues/21952
  9cv2.imshow("cv/av bug", np.zeros(1))
 10cv2.destroyAllWindows()
 11
 12from pupil_labs.realtime_api import (  # noqa
 13    Device,
 14    Network,
 15    receive_gaze_data,
 16    receive_video_frames,
 17)
 18
 19
 20async def main():
 21    async with Network() as network:
 22        dev_info = await network.wait_for_new_device(timeout_seconds=5)
 23    if dev_info is None:
 24        print("No device could be found! Abort")
 25        return
 26
 27    async with Device.from_discovered_device(dev_info) as device:
 28        print(f"Getting status information from {device}")
 29        status = await device.get_status()
 30
 31        sensor_gaze = status.direct_gaze_sensor()
 32        if not sensor_gaze.connected:
 33            print(f"Gaze sensor is not connected to {device}")
 34            return
 35
 36        sensor_world = status.direct_world_sensor()
 37        if not sensor_world.connected:
 38            print(f"Scene camera is not connected to {device}")
 39            return
 40
 41        restart_on_disconnect = True
 42
 43        queue_video = asyncio.Queue()
 44        queue_gaze = asyncio.Queue()
 45
 46        process_video = asyncio.create_task(
 47            enqueue_sensor_data(
 48                receive_video_frames(sensor_world.url, run_loop=restart_on_disconnect),
 49                queue_video,
 50            )
 51        )
 52        process_gaze = asyncio.create_task(
 53            enqueue_sensor_data(
 54                receive_gaze_data(sensor_gaze.url, run_loop=restart_on_disconnect),
 55                queue_gaze,
 56            )
 57        )
 58        try:
 59            await match_and_draw(queue_video, queue_gaze)
 60        finally:
 61            process_video.cancel()
 62            process_gaze.cancel()
 63
 64
 65async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
 66    async for datum in sensor:
 67        try:
 68            queue.put_nowait((datum.datetime, datum))
 69        except asyncio.QueueFull:
 70            print(f"Queue is full, dropping {datum}")
 71
 72
 73async def match_and_draw(queue_video, queue_gaze):
 74    while True:
 75        video_datetime, video_frame = await get_most_recent_item(queue_video)
 76        _, gaze_datum = await get_closest_item(queue_gaze, video_datetime)
 77
 78        bgr_buffer = video_frame.to_ndarray(format="bgr24")
 79
 80        cv2.circle(
 81            bgr_buffer,
 82            (int(gaze_datum.x), int(gaze_datum.y)),
 83            radius=80,
 84            color=(0, 0, 255),
 85            thickness=15,
 86        )
 87
 88        cv2.imshow("Scene camera with gaze overlay", bgr_buffer)
 89        cv2.waitKey(1)
 90
 91
 92async def get_most_recent_item(queue):
 93    item = await queue.get()
 94    while True:
 95        try:
 96            next_item = queue.get_nowait()
 97        except asyncio.QueueEmpty:
 98            return item
 99        else:
100            item = next_item
101
102
103async def get_closest_item(queue, timestamp):
104    item_ts, item = await queue.get()
105    # assumes monotonically increasing timestamps
106    if item_ts > timestamp:
107        return item_ts, item
108    while True:
109        try:
110            next_item_ts, next_item = queue.get_nowait()
111        except asyncio.QueueEmpty:
112            return item_ts, item
113        else:
114            if next_item_ts > timestamp:
115                return next_item_ts, next_item
116            item_ts, item = next_item_ts, next_item
117
118
119if __name__ == "__main__":
120    with contextlib.suppress(KeyboardInterrupt):
121        asyncio.run(main())

Device Discovery

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api.discovery import Network, discover_devices
 5
 6
 7async def main():
 8    async with Network() as network:
 9        print("Looking for the next best device...\n\t", end="")
10        print(await network.wait_for_new_device(timeout_seconds=5))
11
12        print("---")
13        print("All devices after searching for additional 5 seconds:")
14        await asyncio.sleep(5)
15        print(network.devices)
16
17    print("---")
18    print("Starting new, indefinitive search... hit ctrl-c to stop.")
19    # optionally set timeout_seconds argument to limit search duration
20    async for device_info in discover_devices():
21        print(f"\t{device_info}")
22
23
24if __name__ == "__main__":
25    with contextlib.suppress(KeyboardInterrupt):
26        asyncio.run(main())

Time Offset Estimation

See pupil_labs.realtime_api.time_echo for details.

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network
 4from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        status = await device.get_status()
16
17        print(f"Device IP address: {status.phone.ip}")
18        print(f"Device Time Echo port: {status.phone.time_echo_port}")
19
20        if status.phone.time_echo_port is None:
21            print(
22                "You Pupil Invisible Companion app is out-of-date and does not yet "
23                "support the Time Echo protocol. Upgrade to version 1.4.28 or newer."
24            )
25            return
26
27        time_offset_estimator = TimeOffsetEstimator(
28            status.phone.ip, status.phone.time_echo_port
29        )
30        estimated_offset = await time_offset_estimator.estimate()
31        print(f"Mean time offset: {estimated_offset.time_offset_ms.mean} ms")
32        print(
33            "Mean roundtrip duration: "
34            f"{estimated_offset.roundtrip_duration_ms.mean} ms"
35        )
36
37
38if __name__ == "__main__":
39    asyncio.run(main())