Asynchronous Examples

Note

The examples require Python 3.7+ to run and use the asyncio framework.

Remote Control

Get Current Status

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network
 4
 5
 6async def main():
 7    async with Network() as network:
 8        dev_info = await network.wait_for_new_device(timeout_seconds=5)
 9    if dev_info is None:
10        print("No device could be found! Abort")
11        return
12
13    async with Device.from_discovered_device(dev_info) as device:
14        status = await device.get_status()
15
16        print(f"Device IP address: {status.phone.ip}")
17        print(f"Battery level: {status.phone.battery_level} %")
18
19        print(f"Connected glasses: SN {status.hardware.glasses_serial}")
20        print(f"Connected scene camera: SN {status.hardware.world_camera_serial}")
21
22        world = status.direct_world_sensor()
23        print(f"World sensor: connected={world.connected} url={world.url}")
24
25        gaze = status.direct_gaze_sensor()
26        print(f"Gaze sensor: connected={gaze.connected} url={gaze.url}")
27
28
29if __name__ == "__main__":
30    asyncio.run(main())

Status Updates

Wait for status updates from the device

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api import Device, Network
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        print("Waiting for status updates... hit ctrl-c to stop.")
16        async for changed in device.status_updates():
17            print(changed)
18
19
20if __name__ == "__main__":
21    with contextlib.suppress(KeyboardInterrupt):
22        asyncio.run(main())

Get a callback when there is a new status updates

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
 4
 5
 6def print_component(component):
 7    print(component)
 8
 9
10async def main():
11    async with Network() as network:
12        dev_info = await network.wait_for_new_device(timeout_seconds=5)
13    if dev_info is None:
14        print("No device could be found! Abort")
15        return
16
17    async with Device.from_discovered_device(dev_info) as device:
18        duration = 20
19        print(f"Starting auto-update for {duration} seconds")
20        # callbacks can be awaitable, too
21        notifier = StatusUpdateNotifier(device, callbacks=[print_component])
22        await notifier.receive_updates_start()
23        await asyncio.sleep(duration)
24        print("Stopping auto-update")
25        await notifier.receive_updates_stop()
26
27
28if __name__ == "__main__":
29    asyncio.run(main())

Send Event

An event without an explicit timestamp, will be timestamped on arrival at the Pupil Invisible Companion device.

 1import asyncio
 2import time
 3
 4from pupil_labs.realtime_api import Device, Network
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        # send event without timestamp
16        print(await device.send_event("test event"))
17
18        # send event with current timestamp
19        print(
20            await device.send_event(
21                "test event", event_timestamp_unix_ns=time.time_ns()
22            )
23        )
24
25
26if __name__ == "__main__":
27    asyncio.run(main())

Start, stop and save, and cancel recordings

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
 4from pupil_labs.realtime_api.models import Recording
 5
 6
 7async def print_recording(component):
 8    if isinstance(component, Recording):
 9        print(f"Update: {component.message}")
10
11
12async def main():
13    async with Network() as network:
14        dev_info = await network.wait_for_new_device(timeout_seconds=5)
15    if dev_info is None:
16        print("No device could be found! Abort")
17        return
18
19    async with Device.from_discovered_device(dev_info) as device:
20        # get update when recording is fully started
21        notifier = StatusUpdateNotifier(device, callbacks=[print_recording])
22        await notifier.receive_updates_start()
23        recording_id = await device.recording_start()
24        print(f"Initiated recording with id {recording_id}")
25        await asyncio.sleep(5)
26        print("Stopping recording")
27        await device.recording_stop_and_save()
28        # await control.recording_cancel()  # uncomment to cancel recording
29        await asyncio.sleep(2)  # wait for confirmation via auto-update
30        await notifier.receive_updates_stop()
31
32
33if __name__ == "__main__":
34    asyncio.run(main())

Streaming

Gaze Data

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api import Device, Network, receive_gaze_data
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        status = await device.get_status()
16        sensor_gaze = status.direct_gaze_sensor()
17        if not sensor_gaze.connected:
18            print(f"Gaze sensor is not connected to {device}")
19            return
20
21        restart_on_disconnect = True
22        async for gaze in receive_gaze_data(
23            sensor_gaze.url, run_loop=restart_on_disconnect
24        ):
25            print(gaze)
26
27
28if __name__ == "__main__":
29    with contextlib.suppress(KeyboardInterrupt):
30        asyncio.run(main())

Scene Camera Video

 1import asyncio
 2import contextlib
 3
 4import cv2
 5
 6from pupil_labs.realtime_api import Device, Network, receive_video_frames
 7
 8
 9async def main():
10    async with Network() as network:
11        dev_info = await network.wait_for_new_device(timeout_seconds=5)
12    if dev_info is None:
13        print("No device could be found! Abort")
14        return
15
16    async with Device.from_discovered_device(dev_info) as device:
17        status = await device.get_status()
18        sensor_world = status.direct_world_sensor()
19        if not sensor_world.connected:
20            print(f"Scene camera is not connected to {device}")
21            return
22
23        restart_on_disconnect = True
24        async for frame in receive_video_frames(
25            sensor_world.url, run_loop=restart_on_disconnect
26        ):
27            bgr_buffer = frame.bgr_buffer()
28            draw_time(bgr_buffer, frame.datetime)
29            cv2.imshow("Scene Camera - Press ESC to quit", bgr_buffer)
30            if cv2.waitKey(1) & 0xFF == 27:
31                return
32
33
34def draw_time(frame, time):
35    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
36    frame_txt_font_scale = 1.0
37    frame_txt_thickness = 1
38
39    # first line: frame index
40    frame_txt = str(time)
41
42    cv2.putText(
43        frame,
44        frame_txt,
45        (20, 50),
46        frame_txt_font_name,
47        frame_txt_font_scale,
48        (255, 255, 255),
49        thickness=frame_txt_thickness,
50        lineType=cv2.LINE_8,
51    )
52
53
54if __name__ == "__main__":
55    with contextlib.suppress(KeyboardInterrupt):
56        asyncio.run(main())

Eyes Camera Video

 1import asyncio
 2import contextlib
 3import time
 4
 5import cv2
 6
 7from pupil_labs.realtime_api import Device, Network, receive_video_frames
 8
 9
10async def main(preview_frame_rate=30):
11    async with Network() as network:
12        dev_info = await network.wait_for_new_device(timeout_seconds=5)
13    if dev_info is None:
14        print("No device could be found! Abort")
15        return
16
17    async with Device.from_discovered_device(dev_info) as device:
18        status = await device.get_status()
19        sensor_eyes = status.direct_eyes_sensor()
20        if not sensor_eyes.connected:
21            print(f"Eyes camera is not connected to {device}")
22            return
23
24        restart_on_disconnect = True
25        _last_update = time.perf_counter()
26        async for frame in receive_video_frames(
27            sensor_eyes.url, run_loop=restart_on_disconnect
28        ):
29            bgr_buffer = frame.bgr_buffer()
30            draw_time(bgr_buffer, frame.datetime)
31            cv2.imshow("Eye Cameras - Press ESC to quit", bgr_buffer)
32
33            time_since_last_update = time.perf_counter() - _last_update
34            if time_since_last_update > 1 / preview_frame_rate:
35                if cv2.waitKey(1) & 0xFF == 27:
36                    return
37                _last_update = time.perf_counter()
38
39
40def draw_time(frame, time):
41    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
42    frame_txt_font_scale = 0.5
43    frame_txt_thickness = 1
44
45    # first line: frame index
46    frame_txt = str(time)
47
48    cv2.putText(
49        frame,
50        frame_txt,
51        (20, 50),
52        frame_txt_font_name,
53        frame_txt_font_scale,
54        (255, 255, 255),
55        thickness=frame_txt_thickness,
56        lineType=cv2.LINE_8,
57    )
58
59
60if __name__ == "__main__":
61    with contextlib.suppress(KeyboardInterrupt):
62        asyncio.run(main())

Scene Camera Video With Overlayed Gaze

This example processes two streams (video and gaze data) at the same time, matches each video frame with its temporally closest gaze point, and previews both in a window.

  1import asyncio
  2import contextlib
  3import typing as T
  4
  5import cv2
  6
  7from pupil_labs.realtime_api import (
  8    Device,
  9    Network,
 10    receive_gaze_data,
 11    receive_video_frames,
 12)
 13
 14
 15async def main():
 16    async with Network() as network:
 17        dev_info = await network.wait_for_new_device(timeout_seconds=5)
 18    if dev_info is None:
 19        print("No device could be found! Abort")
 20        return
 21
 22    async with Device.from_discovered_device(dev_info) as device:
 23        print(f"Getting status information from {device}")
 24        status = await device.get_status()
 25
 26        sensor_gaze = status.direct_gaze_sensor()
 27        if not sensor_gaze.connected:
 28            print(f"Gaze sensor is not connected to {device}")
 29            return
 30
 31        sensor_world = status.direct_world_sensor()
 32        if not sensor_world.connected:
 33            print(f"Scene camera is not connected to {device}")
 34            return
 35
 36        restart_on_disconnect = True
 37
 38        queue_video = asyncio.Queue()
 39        queue_gaze = asyncio.Queue()
 40
 41        process_video = asyncio.create_task(
 42            enqueue_sensor_data(
 43                receive_video_frames(sensor_world.url, run_loop=restart_on_disconnect),
 44                queue_video,
 45            )
 46        )
 47        process_gaze = asyncio.create_task(
 48            enqueue_sensor_data(
 49                receive_gaze_data(sensor_gaze.url, run_loop=restart_on_disconnect),
 50                queue_gaze,
 51            )
 52        )
 53        try:
 54            await match_and_draw(queue_video, queue_gaze)
 55        finally:
 56            process_video.cancel()
 57            process_gaze.cancel()
 58
 59
 60async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
 61    async for datum in sensor:
 62        try:
 63            queue.put_nowait((datum.datetime, datum))
 64        except asyncio.QueueFull:
 65            print(f"Queue is full, dropping {datum}")
 66
 67
 68async def match_and_draw(queue_video, queue_gaze):
 69    while True:
 70        video_datetime, video_frame = await get_most_recent_item(queue_video)
 71        _, gaze_datum = await get_closest_item(queue_gaze, video_datetime)
 72
 73        bgr_buffer = video_frame.to_ndarray(format="bgr24")
 74
 75        cv2.circle(
 76            bgr_buffer,
 77            (int(gaze_datum.x), int(gaze_datum.y)),
 78            radius=80,
 79            color=(0, 0, 255),
 80            thickness=15,
 81        )
 82
 83        cv2.imshow("Scene camera with gaze overlay", bgr_buffer)
 84        cv2.waitKey(1)
 85
 86
 87async def get_most_recent_item(queue):
 88    item = await queue.get()
 89    while True:
 90        try:
 91            next_item = queue.get_nowait()
 92        except asyncio.QueueEmpty:
 93            return item
 94        else:
 95            item = next_item
 96
 97
 98async def get_closest_item(queue, timestamp):
 99    item_ts, item = await queue.get()
100    # assumes monotonically increasing timestamps
101    if item_ts > timestamp:
102        return item_ts, item
103    while True:
104        try:
105            next_item_ts, next_item = queue.get_nowait()
106        except asyncio.QueueEmpty:
107            return item_ts, item
108        else:
109            if next_item_ts > timestamp:
110                return next_item_ts, next_item
111            item_ts, item = next_item_ts, next_item
112
113
114if __name__ == "__main__":
115    with contextlib.suppress(KeyboardInterrupt):
116        asyncio.run(main())

Device Discovery

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api.discovery import Network, discover_devices
 5
 6
 7async def main():
 8    async with Network() as network:
 9        print("Looking for the next best device...\n\t", end="")
10        print(await network.wait_for_new_device(timeout_seconds=5))
11
12        print("---")
13        print("All devices after searching for additional 5 seconds:")
14        await asyncio.sleep(5)
15        print(network.devices)
16
17    print("---")
18    print("Starting new, indefinitive search... hit ctrl-c to stop.")
19    # optionally set timeout_seconds argument to limit search duration
20    async for device_info in discover_devices():
21        print(f"\t{device_info}")
22
23
24if __name__ == "__main__":
25    with contextlib.suppress(KeyboardInterrupt):
26        asyncio.run(main())

Time Offset Estimation

See pupil_labs.realtime_api.time_echo for details.

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network
 4from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        status = await device.get_status()
16
17        print(f"Device IP address: {status.phone.ip}")
18        print(f"Device Time Echo port: {status.phone.time_echo_port}")
19
20        if status.phone.time_echo_port is None:
21            print(
22                "You Pupil Invisible Companion app is out-of-date and does not yet "
23                "support the Time Echo protocol. Upgrade to version 1.4.28 or newer."
24            )
25            return
26
27        time_offset_estimator = TimeOffsetEstimator(
28            status.phone.ip, status.phone.time_echo_port
29        )
30        estimated_offset = await time_offset_estimator.estimate()
31        print(f"Mean time offset: {estimated_offset.time_offset_ms.mean} ms")
32        print(
33            "Mean roundtrip duration: "
34            f"{estimated_offset.roundtrip_duration_ms.mean} ms"
35        )
36
37
38if __name__ == "__main__":
39    asyncio.run(main())