Asynchronous Examples#

Note

The examples require Python 3.7+ to run and use the asyncio framework.

Remote Control#

Get Current Status#

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network
 4
 5
 6async def main():
 7    async with Network() as network:
 8        dev_info = await network.wait_for_new_device(timeout_seconds=5)
 9    if dev_info is None:
10        print("No device could be found! Abort")
11        return
12
13    async with Device.from_discovered_device(dev_info) as device:
14        status = await device.get_status()
15
16        print(f"Device IP address: {status.phone.ip}")
17        print(f"Battery level: {status.phone.battery_level} %")
18
19        print(f"Connected glasses: SN {status.hardware.glasses_serial}")
20        print(f"Connected scene camera: SN {status.hardware.world_camera_serial}")
21
22        world = status.direct_world_sensor()
23        print(f"World sensor: connected={world.connected} url={world.url}")
24
25        gaze = status.direct_gaze_sensor()
26        print(f"Gaze sensor: connected={gaze.connected} url={gaze.url}")
27
28
29if __name__ == "__main__":
30    asyncio.run(main())

Status Updates#

Wait for status updates from the device

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api import Device, Network
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        print("Waiting for status updates... hit ctrl-c to stop.")
16        async for changed in device.status_updates():
17            print(changed)
18
19
20if __name__ == "__main__":
21    with contextlib.suppress(KeyboardInterrupt):
22        asyncio.run(main())

Get a callback when there is a new status updates

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
 4
 5
 6def print_component(component):
 7    print(component)
 8
 9
10async def main():
11    async with Network() as network:
12        dev_info = await network.wait_for_new_device(timeout_seconds=5)
13    if dev_info is None:
14        print("No device could be found! Abort")
15        return
16
17    async with Device.from_discovered_device(dev_info) as device:
18        duration = 20
19        print(f"Starting auto-update for {duration} seconds")
20        # callbacks can be awaitable, too
21        notifier = StatusUpdateNotifier(device, callbacks=[print_component])
22        await notifier.receive_updates_start()
23        await asyncio.sleep(duration)
24        print("Stopping auto-update")
25        await notifier.receive_updates_stop()
26
27
28if __name__ == "__main__":
29    asyncio.run(main())

Send Event#

An event without an explicit timestamp, will be timestamped on arrival at the Pupil Invisible Companion device.

 1import asyncio
 2import time
 3
 4from pupil_labs.realtime_api import Device, Network
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        # send event without timestamp
16        print(await device.send_event("test event"))
17
18        # send event with current timestamp
19        print(
20            await device.send_event(
21                "test event", event_timestamp_unix_ns=time.time_ns()
22            )
23        )
24
25
26if __name__ == "__main__":
27    asyncio.run(main())

Start, stop and save, and cancel recordings#

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network, StatusUpdateNotifier
 4from pupil_labs.realtime_api.models import Recording
 5
 6
 7async def print_recording(component):
 8    if isinstance(component, Recording):
 9        print(f"Update: {component.message}")
10
11
12async def main():
13    async with Network() as network:
14        dev_info = await network.wait_for_new_device(timeout_seconds=5)
15    if dev_info is None:
16        print("No device could be found! Abort")
17        return
18
19    async with Device.from_discovered_device(dev_info) as device:
20        # get update when recording is fully started
21        notifier = StatusUpdateNotifier(device, callbacks=[print_recording])
22        await notifier.receive_updates_start()
23        recording_id = await device.recording_start()
24        print(f"Initiated recording with id {recording_id}")
25        await asyncio.sleep(5)
26        print("Stopping recording")
27        await device.recording_stop_and_save()
28        # await control.recording_cancel()  # uncomment to cancel recording
29        await asyncio.sleep(2)  # wait for confirmation via auto-update
30        await notifier.receive_updates_stop()
31
32
33if __name__ == "__main__":
34    asyncio.run(main())

Streaming#

Gaze Data#

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api import Device, Network, receive_gaze_data
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        status = await device.get_status()
16        sensor_gaze = status.direct_gaze_sensor()
17        if not sensor_gaze.connected:
18            print(f"Gaze sensor is not connected to {device}")
19            return
20
21        restart_on_disconnect = True
22        async for gaze in receive_gaze_data(
23            sensor_gaze.url, run_loop=restart_on_disconnect
24        ):
25            print(gaze)
26
27
28if __name__ == "__main__":
29    with contextlib.suppress(KeyboardInterrupt):
30        asyncio.run(main())

Scene Camera Video#

 1import asyncio
 2import contextlib
 3
 4import cv2
 5
 6from pupil_labs.realtime_api import Device, Network, receive_video_frames
 7
 8
 9async def main():
10    async with Network() as network:
11        dev_info = await network.wait_for_new_device(timeout_seconds=5)
12    if dev_info is None:
13        print("No device could be found! Abort")
14        return
15
16    async with Device.from_discovered_device(dev_info) as device:
17        status = await device.get_status()
18        sensor_world = status.direct_world_sensor()
19        if not sensor_world.connected:
20            print(f"Scene camera is not connected to {device}")
21            return
22
23        restart_on_disconnect = True
24        async for frame in receive_video_frames(
25            sensor_world.url, run_loop=restart_on_disconnect
26        ):
27            bgr_buffer = frame.bgr_buffer()
28            draw_time(bgr_buffer, frame.datetime)
29            cv2.imshow("Scene Camera - Press ESC to quit", bgr_buffer)
30            if cv2.waitKey(1) & 0xFF == 27:
31                return
32
33
34def draw_time(frame, time):
35    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
36    frame_txt_font_scale = 1.0
37    frame_txt_thickness = 1
38
39    # first line: frame index
40    frame_txt = str(time)
41
42    cv2.putText(
43        frame,
44        frame_txt,
45        (20, 50),
46        frame_txt_font_name,
47        frame_txt_font_scale,
48        (255, 255, 255),
49        thickness=frame_txt_thickness,
50        lineType=cv2.LINE_8,
51    )
52
53
54if __name__ == "__main__":
55    with contextlib.suppress(KeyboardInterrupt):
56        asyncio.run(main())

Eyes Camera Video#

 1import asyncio
 2import contextlib
 3import time
 4
 5import cv2
 6
 7from pupil_labs.realtime_api import Device, Network, receive_video_frames
 8
 9
10async def main(preview_frame_rate=30):
11    async with Network() as network:
12        dev_info = await network.wait_for_new_device(timeout_seconds=5)
13    if dev_info is None:
14        print("No device could be found! Abort")
15        return
16
17    async with Device.from_discovered_device(dev_info) as device:
18        status = await device.get_status()
19        sensor_eyes = status.direct_eyes_sensor()
20        if not sensor_eyes.connected:
21            print(f"Eyes camera is not connected to {device}")
22            return
23
24        restart_on_disconnect = True
25        _last_update = time.perf_counter()
26        async for frame in receive_video_frames(
27            sensor_eyes.url, run_loop=restart_on_disconnect
28        ):
29            bgr_buffer = frame.bgr_buffer()
30            draw_time(bgr_buffer, frame.datetime)
31            cv2.imshow("Eye Cameras - Press ESC to quit", bgr_buffer)
32
33            time_since_last_update = time.perf_counter() - _last_update
34            if time_since_last_update > 1 / preview_frame_rate:
35                if cv2.waitKey(1) & 0xFF == 27:
36                    return
37                _last_update = time.perf_counter()
38
39
40def draw_time(frame, time):
41    frame_txt_font_name = cv2.FONT_HERSHEY_SIMPLEX
42    frame_txt_font_scale = 0.5
43    frame_txt_thickness = 1
44
45    # first line: frame index
46    frame_txt = str(time)
47
48    cv2.putText(
49        frame,
50        frame_txt,
51        (20, 50),
52        frame_txt_font_name,
53        frame_txt_font_scale,
54        (255, 255, 255),
55        thickness=frame_txt_thickness,
56        lineType=cv2.LINE_8,
57    )
58
59
60if __name__ == "__main__":
61    with contextlib.suppress(KeyboardInterrupt):
62        asyncio.run(main())

Scene Camera Video With Overlayed Gaze#

This example processes two streams (video and gaze data) at the same time, matches each video frame with its temporally closest gaze point, and previews both in a window.

  1import asyncio
  2import contextlib
  3import typing as T
  4
  5import cv2
  6
  7from pupil_labs.realtime_api import (
  8    Device,
  9    Network,
 10    receive_gaze_data,
 11    receive_video_frames,
 12)
 13
 14
 15async def main():
 16    async with Network() as network:
 17        dev_info = await network.wait_for_new_device(timeout_seconds=5)
 18    if dev_info is None:
 19        print("No device could be found! Abort")
 20        return
 21
 22    async with Device.from_discovered_device(dev_info) as device:
 23        print(f"Getting status information from {device}")
 24        status = await device.get_status()
 25
 26        sensor_gaze = status.direct_gaze_sensor()
 27        if not sensor_gaze.connected:
 28            print(f"Gaze sensor is not connected to {device}")
 29            return
 30
 31        sensor_world = status.direct_world_sensor()
 32        if not sensor_world.connected:
 33            print(f"Scene camera is not connected to {device}")
 34            return
 35
 36        restart_on_disconnect = True
 37
 38        queue_video = asyncio.Queue()
 39        queue_gaze = asyncio.Queue()
 40
 41        process_video = asyncio.create_task(
 42            enqueue_sensor_data(
 43                receive_video_frames(sensor_world.url, run_loop=restart_on_disconnect),
 44                queue_video,
 45            )
 46        )
 47        process_gaze = asyncio.create_task(
 48            enqueue_sensor_data(
 49                receive_gaze_data(sensor_gaze.url, run_loop=restart_on_disconnect),
 50                queue_gaze,
 51            )
 52        )
 53        try:
 54            await match_and_draw(queue_video, queue_gaze)
 55        finally:
 56            process_video.cancel()
 57            process_gaze.cancel()
 58
 59
 60async def enqueue_sensor_data(sensor: T.AsyncIterator, queue: asyncio.Queue) -> None:
 61    async for datum in sensor:
 62        try:
 63            queue.put_nowait((datum.datetime, datum))
 64        except asyncio.QueueFull:
 65            print(f"Queue is full, dropping {datum}")
 66
 67
 68async def match_and_draw(queue_video, queue_gaze):
 69    while True:
 70        video_datetime, video_frame = await get_most_recent_item(queue_video)
 71        _, gaze_datum = await get_closest_item(queue_gaze, video_datetime)
 72
 73        bgr_buffer = video_frame.to_ndarray(format="bgr24")
 74
 75        cv2.circle(
 76            bgr_buffer,
 77            (int(gaze_datum.x), int(gaze_datum.y)),
 78            radius=80,
 79            color=(0, 0, 255),
 80            thickness=15,
 81        )
 82
 83        cv2.imshow("Scene camera with gaze overlay", bgr_buffer)
 84        cv2.waitKey(1)
 85
 86
 87async def get_most_recent_item(queue):
 88    item = await queue.get()
 89    while True:
 90        try:
 91            next_item = queue.get_nowait()
 92        except asyncio.QueueEmpty:
 93            return item
 94        else:
 95            item = next_item
 96
 97
 98async def get_closest_item(queue, timestamp):
 99    item_ts, item = await queue.get()
100    # assumes monotonically increasing timestamps
101    if item_ts > timestamp:
102        return item_ts, item
103    while True:
104        try:
105            next_item_ts, next_item = queue.get_nowait()
106        except asyncio.QueueEmpty:
107            return item_ts, item
108        else:
109            if next_item_ts > timestamp:
110                return next_item_ts, next_item
111            item_ts, item = next_item_ts, next_item
112
113
114if __name__ == "__main__":
115    with contextlib.suppress(KeyboardInterrupt):
116        asyncio.run(main())

Device Discovery#

 1import asyncio
 2import contextlib
 3
 4from pupil_labs.realtime_api.discovery import Network, discover_devices
 5
 6
 7async def main():
 8    async with Network() as network:
 9        print("Looking for the next best device...\n\t", end="")
10        print(await network.wait_for_new_device(timeout_seconds=5))
11
12        print("---")
13        print("All devices after searching for additional 5 seconds:")
14        await asyncio.sleep(5)
15        print(network.devices)
16
17    print("---")
18    print("Starting new, indefinitive search... hit ctrl-c to stop.")
19    # optionally set timeout_seconds argument to limit search duration
20    async for device_info in discover_devices():
21        print(f"\t{device_info}")
22
23
24if __name__ == "__main__":
25    with contextlib.suppress(KeyboardInterrupt):
26        asyncio.run(main())

Time Offset Estimation#

See pupil_labs.realtime_api.time_echo for details.

 1import asyncio
 2
 3from pupil_labs.realtime_api import Device, Network
 4from pupil_labs.realtime_api.time_echo import TimeOffsetEstimator
 5
 6
 7async def main():
 8    async with Network() as network:
 9        dev_info = await network.wait_for_new_device(timeout_seconds=5)
10    if dev_info is None:
11        print("No device could be found! Abort")
12        return
13
14    async with Device.from_discovered_device(dev_info) as device:
15        status = await device.get_status()
16
17        print(f"Device IP address: {status.phone.ip}")
18        print(f"Device Time Echo port: {status.phone.time_echo_port}")
19
20        if status.phone.time_echo_port is None:
21            print(
22                "You Pupil Invisible Companion app is out-of-date and does not yet "
23                "support the Time Echo protocol. Upgrade to version 1.4.28 or newer."
24            )
25            return
26
27        time_offset_estimator = TimeOffsetEstimator(
28            status.phone.ip, status.phone.time_echo_port
29        )
30        estimated_offset = await time_offset_estimator.estimate()
31        print(f"Mean time offset: {estimated_offset.time_offset_ms.mean} ms")
32        print(
33            "Mean roundtrip duration: "
34            f"{estimated_offset.roundtrip_duration_ms.mean} ms"
35        )
36
37
38if __name__ == "__main__":
39    asyncio.run(main())