Skip to content

Event Listener

pyportainer includes a built-in PortainerEventListener that maintains a streaming connection to the Docker events endpoint. Unlike the image watcher, which polls on a fixed interval, the event listener reacts in real time as events occur; container starts, stops, crashes, image pulls, network changes, and more.

How it works

  1. On start(), a background asyncio task is created.
  2. The task resolves which endpoints to listen to (all, or a specific one).
  3. One persistent HTTP streaming connection is opened per endpoint, concurrently.
  4. Each incoming Docker event is parsed and delivered to registered callbacks immediately.
  5. If a connection drops (network error, server restart), it is automatically re-established after reconnect_interval.
  6. Authentication errors are treated as fatal for that endpoint — no retry is attempted.

Basic usage

import asyncio
from datetime import timedelta

from pyportainer import Portainer, PortainerEventListener
from pyportainer.listener import PortainerEventListenerResult


async def on_event(result: PortainerEventListenerResult) -> None:
    """Handle an incoming Docker event."""
    print(f"[endpoint {result.endpoint_id}] {result.event.type} {result.event.action}")


async def main() -> None:
    async with Portainer(
        api_url="http://localhost:9000",
        api_key="YOUR_API_KEY",
    ) as portainer:
        listener = PortainerEventListener(portainer)
        listener.register_callback(on_event)
        listener.start()

        await asyncio.sleep(60)  # Listen for a while

        listener.stop()


if __name__ == "__main__":
    asyncio.run(main())

Listening to a specific endpoint

Pass endpoint_id to open a stream to a single Portainer endpoint instead of all:

listener = PortainerEventListener(portainer, endpoint_id=1)

Filtering by event type

Use event_types to receive only the Docker event categories you care about. Accepted values match the Docker API's type filter: container, image, volume, network, daemon, plugin, node, service, secret, config.

listener = PortainerEventListener(
    portainer,
    event_types=["container", "image"],
)

Configuration

Parameter Type Default Description
portainer Portainer The Portainer client instance
endpoint_id int \| None None Endpoint to listen to. None listens to all endpoints
event_types list[str] \| None None Docker event types to filter on. None means all types
reconnect_interval timedelta 5 seconds How long to wait before reconnecting after a dropped connection
debug bool False Enable debug-level logging

Event data

Each callback receives a PortainerEventListenerResult:

Field Type Description
endpoint_id int The endpoint the event came from
event DockerEvent The Docker event payload

DockerEvent fields:

Field Type Description
type str \| None Event category: container, image, etc.
action str \| None What happened: start, stop, die, etc.
actor DockerEventActor \| None The object the event is about
scope str \| None local or swarm
time int \| None Unix timestamp (seconds)
time_nano int \| None Unix timestamp (nanoseconds)

DockerEventActor fields:

Field Type Description
id str \| None ID of the object (container ID, image name, etc.)
attributes dict[str, str] \| None Extra metadata (image name, container name, etc.)

Callbacks

Register a callback to be invoked for every event received. Both sync and async callables are supported.

Registering a callback

from pyportainer import PortainerEventListener
from pyportainer.listener import PortainerEventListenerResult


def on_event(result: PortainerEventListenerResult) -> None:
    print(f"{result.event.type} {result.event.action}{result.event.actor.id}")


listener = PortainerEventListener(portainer, endpoint_id=1)
listener.register_callback(on_event)
listener.start()

Async callbacks

async def on_event(result: PortainerEventListenerResult) -> None:
    if result.event.action == "die":
        await alert(f"Container {result.event.actor.id} has stopped unexpectedly")


listener.register_callback(on_event)

Filtering inside a callback

Callbacks receive every event that passes the event_types filter. Add further logic inside the callback:

def on_event(result: PortainerEventListenerResult) -> None:
    if result.event.action not in ("start", "die"):
        return
    print(f"Container {result.event.actor.id}: {result.event.action}")

Unregistering a callback

listener.unregister_callback(on_event)

Notes

  • Registering the same callable twice is silently ignored; it is only called once per event.
  • Exceptions raised inside a callback are logged but do not stop the listener or prevent other callbacks from running.
  • The EventListenerCallback type alias is exported from pyportainer for type annotations: from pyportainer import EventListenerCallback.

Runtime control

Stopping and restarting

listener.stop()   # Cancels all streaming connections
listener.start()  # Reconnects and starts listening again

Changing the reconnect interval

from datetime import timedelta

listener._reconnect_interval = timedelta(seconds=30)

Querying events directly

The underlying get_events and get_recent_events methods on the Portainer client are also available directly, without using PortainerEventListener.

Stream events in real time

get_events is an async generator that keeps the connection open and yields events as they arrive:

async for event in portainer.get_events(endpoint_id=1):
    print(event.type, event.action)

Pass since to replay events from a specific point, or until to close the stream automatically at a timestamp:

from datetime import UTC, datetime, timedelta

async for event in portainer.get_events(
    endpoint_id=1,
    since=datetime.now(UTC) - timedelta(hours=1),
    filters={"type": ["container"], "event": ["start", "die"]},
):
    print(event.action, event.actor.id)

Fetch a bounded list of past events

get_recent_events collects all events in a time window into a list and returns once the window is exhausted:

from datetime import UTC, datetime, timedelta

events = await portainer.get_recent_events(
    endpoint_id=1,
    since=datetime.now(UTC) - timedelta(hours=1),
)
for event in events:
    print(event.type, event.action)

until defaults to now, so the connection closes automatically. You can also pass an explicit end time:

events = await portainer.get_recent_events(
    endpoint_id=1,
    since=datetime(2024, 1, 1, tzinfo=UTC),
    until=datetime(2024, 1, 2, tzinfo=UTC),
    filters={"type": ["image"]},
)

API reference

Background Docker event listener.

PortainerEventListener

Maintains persistent streaming connections to Docker event endpoints.

One streaming connection is opened per endpoint. Events are delivered to registered callbacks as they arrive, in real time. If a connection drops, it is automatically re-established after reconnect_interval.

Source code in src/pyportainer/listener.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class PortainerEventListener:
    """Maintains persistent streaming connections to Docker event endpoints.

    One streaming connection is opened per endpoint. Events are delivered to
    registered callbacks as they arrive, in real time. If a connection drops,
    it is automatically re-established after ``reconnect_interval``.
    """

    def __init__(  # pylint: disable=too-many-arguments,too-many-instance-attributes
        self,
        portainer: Portainer,
        endpoint_id: int | None = None,
        *,
        event_types: list[str] | None = None,
        reconnect_interval: timedelta = timedelta(seconds=5),
        debug: bool = False,
    ) -> None:
        """Initialize the PortainerEventListener.

        Args:
        ----
            portainer: An authenticated Portainer client instance.
            endpoint_id: The ID of the endpoint to listen to. If None, all
                endpoints are monitored concurrently.
            event_types: Docker event types to filter on, e.g.
                ``["container", "image"]``. If None, all event types are
                delivered.
            reconnect_interval: How long to wait before reconnecting after a
                dropped connection. Defaults to 5 seconds.
            debug: Enable debug logging.

        """
        self._portainer = portainer
        self._endpoint_id = endpoint_id
        self._event_types = event_types
        self._reconnect_interval = reconnect_interval
        self._task: asyncio.Task[None] | None = None
        self._callbacks: list[EventListenerCallback] = []

        _LOGGER.setLevel(logging.DEBUG if debug else logging.INFO)

    def start(self) -> None:
        """Start listening for Docker events.

        Opens streaming connections immediately. Must be called from within a
        running asyncio event loop.
        """
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    def stop(self) -> None:
        """Stop all streaming connections."""
        if self._task and not self._task.done():
            self._task.cancel()

    def register_callback(self, callback: EventListenerCallback) -> None:
        """Register a callback to be invoked for every Docker event received.

        Both synchronous and async callables are supported. The callback
        receives a single :class:`PortainerEventListenerResult` argument.
        Each unique callable is only registered once; duplicates are ignored.

        Args:
        ----
            callback: A sync or async callable that accepts a
                :class:`PortainerEventListenerResult`.

        """
        if callback not in self._callbacks:
            self._callbacks.append(callback)

    def unregister_callback(self, callback: EventListenerCallback) -> None:
        """Remove a previously registered callback.

        Args:
        ----
            callback: The callable to remove. Raises :exc:`ValueError` if it
                was not registered.

        """
        self._callbacks.remove(callback)

    async def _fire_callbacks(self, result: PortainerEventListenerResult) -> None:
        """Invoke all registered callbacks for a single event.

        Exceptions raised by individual callbacks are logged but do not stop
        the listener.
        """
        for callback in list(self._callbacks):
            try:
                ret = callback(result)
                if asyncio.iscoroutine(ret):
                    await ret
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    "Callback raised an exception for event %s on endpoint %s",
                    result.event.action,
                    result.endpoint_id,
                )

    async def _listen(self, endpoint_id: int) -> None:
        """Stream events from a single endpoint and fire callbacks for each.

        Args:
        ----
            endpoint_id: The endpoint to stream events from.

        """
        filters = {"type": self._event_types} if self._event_types else None
        async for event in self._portainer.get_events(endpoint_id, filters=filters):
            result = PortainerEventListenerResult(endpoint_id=endpoint_id, event=event)
            await self._fire_callbacks(result)

    async def _listen_with_reconnect(self, endpoint_id: int) -> None:
        """Stream events from an endpoint, reconnecting on transient errors.

        Authentication errors are treated as fatal and stop the listener for
        that endpoint. All other :class:`~pyportainer.exceptions.PortainerError`
        subclasses trigger a reconnect after ``reconnect_interval``.

        Args:
        ----
            endpoint_id: The endpoint to stream events from.

        """
        while True:
            try:
                await self._listen(endpoint_id)
            except PortainerAuthenticationError:
                _LOGGER.exception(
                    "Authentication error for endpoint %s, stopping listener",
                    endpoint_id,
                )
                return
            except PortainerTimeoutError:
                _LOGGER.warning(
                    "Timeout on endpoint %s, reconnecting in %ss",
                    endpoint_id,
                    self._reconnect_interval.total_seconds(),
                )
            except PortainerConnectionError:
                _LOGGER.warning(
                    "Connection lost on endpoint %s, reconnecting in %ss",
                    endpoint_id,
                    self._reconnect_interval.total_seconds(),
                )
            except PortainerError:
                _LOGGER.exception(
                    "Error on endpoint %s, reconnecting in %ss",
                    endpoint_id,
                    self._reconnect_interval.total_seconds(),
                )

            await asyncio.sleep(self._reconnect_interval.total_seconds())

    async def _run(self) -> None:
        """Resolve endpoints and open a streaming connection to each.

        Runs all per-endpoint listeners concurrently via :func:`asyncio.gather`.
        """
        if self._endpoint_id is not None:
            endpoint_ids: list[int] = [self._endpoint_id]
        else:
            _LOGGER.debug("No endpoint_id specified, fetching all endpoints to listen to.")
            endpoints = await self._portainer.get_endpoints()
            endpoint_ids = [endpoint.id for endpoint in endpoints]

        await asyncio.gather(*(self._listen_with_reconnect(ep_id) for ep_id in endpoint_ids))

__init__(portainer, endpoint_id=None, *, event_types=None, reconnect_interval=timedelta(seconds=5), debug=False)

Initialize the PortainerEventListener.


portainer: An authenticated Portainer client instance.
endpoint_id: The ID of the endpoint to listen to. If None, all
    endpoints are monitored concurrently.
event_types: Docker event types to filter on, e.g.
    ``["container", "image"]``. If None, all event types are
    delivered.
reconnect_interval: How long to wait before reconnecting after a
    dropped connection. Defaults to 5 seconds.
debug: Enable debug logging.
Source code in src/pyportainer/listener.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(  # pylint: disable=too-many-arguments,too-many-instance-attributes
    self,
    portainer: Portainer,
    endpoint_id: int | None = None,
    *,
    event_types: list[str] | None = None,
    reconnect_interval: timedelta = timedelta(seconds=5),
    debug: bool = False,
) -> None:
    """Initialize the PortainerEventListener.

    Args:
    ----
        portainer: An authenticated Portainer client instance.
        endpoint_id: The ID of the endpoint to listen to. If None, all
            endpoints are monitored concurrently.
        event_types: Docker event types to filter on, e.g.
            ``["container", "image"]``. If None, all event types are
            delivered.
        reconnect_interval: How long to wait before reconnecting after a
            dropped connection. Defaults to 5 seconds.
        debug: Enable debug logging.

    """
    self._portainer = portainer
    self._endpoint_id = endpoint_id
    self._event_types = event_types
    self._reconnect_interval = reconnect_interval
    self._task: asyncio.Task[None] | None = None
    self._callbacks: list[EventListenerCallback] = []

    _LOGGER.setLevel(logging.DEBUG if debug else logging.INFO)

register_callback(callback)

Register a callback to be invoked for every Docker event received.

Both synchronous and async callables are supported. The callback receives a single :class:PortainerEventListenerResult argument. Each unique callable is only registered once; duplicates are ignored.


callback: A sync or async callable that accepts a
    :class:`PortainerEventListenerResult`.
Source code in src/pyportainer/listener.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def register_callback(self, callback: EventListenerCallback) -> None:
    """Register a callback to be invoked for every Docker event received.

    Both synchronous and async callables are supported. The callback
    receives a single :class:`PortainerEventListenerResult` argument.
    Each unique callable is only registered once; duplicates are ignored.

    Args:
    ----
        callback: A sync or async callable that accepts a
            :class:`PortainerEventListenerResult`.

    """
    if callback not in self._callbacks:
        self._callbacks.append(callback)

start()

Start listening for Docker events.

Opens streaming connections immediately. Must be called from within a running asyncio event loop.

Source code in src/pyportainer/listener.py
73
74
75
76
77
78
79
80
def start(self) -> None:
    """Start listening for Docker events.

    Opens streaming connections immediately. Must be called from within a
    running asyncio event loop.
    """
    if self._task is None or self._task.done():
        self._task = asyncio.create_task(self._run())

stop()

Stop all streaming connections.

Source code in src/pyportainer/listener.py
82
83
84
85
def stop(self) -> None:
    """Stop all streaming connections."""
    if self._task and not self._task.done():
        self._task.cancel()

unregister_callback(callback)

Remove a previously registered callback.


callback: The callable to remove. Raises :exc:`ValueError` if it
    was not registered.
Source code in src/pyportainer/listener.py
103
104
105
106
107
108
109
110
111
112
def unregister_callback(self, callback: EventListenerCallback) -> None:
    """Remove a previously registered callback.

    Args:
    ----
        callback: The callable to remove. Raises :exc:`ValueError` if it
            was not registered.

    """
    self._callbacks.remove(callback)

PortainerEventListenerResult dataclass

Represents a single Docker event received from an endpoint.

Source code in src/pyportainer/listener.py
24
25
26
27
28
29
@dataclass(frozen=True)
class PortainerEventListenerResult:
    """Represents a single Docker event received from an endpoint."""

    endpoint_id: int
    event: DockerEvent