Event Listener
pyportainer includes a built-in PortainerEventListener that maintains a streaming connection to the Docker events endpoint. Unlike the image watcher, which polls on a fixed interval, the event listener reacts in real time as events occur: container starts, stops, crashes, image pulls, network changes, and more.
How it works
- On start(), a background asyncio task is created.
- The task resolves which endpoints to listen to (all, or a specific one).
- One persistent HTTP streaming connection is opened per endpoint, concurrently.
- Each incoming Docker event is parsed and delivered to registered callbacks immediately.
- If a connection drops (network error, server restart), it is automatically re-established after reconnect_interval.
- Authentication errors are treated as fatal for that endpoint; no retry is attempted.
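The reconnect behaviour described above can be pictured with a small, library-independent sketch. It is not pyportainer's actual implementation: `AuthError`, `listen_with_reconnect`, and the `open_stream` factory are hypothetical names used only for illustration, and treating a clean end of stream like a dropped connection is an assumption.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from datetime import timedelta


class AuthError(Exception):
    """Hypothetical stand-in for an authentication failure."""


async def listen_with_reconnect(
    open_stream: Callable[[], AsyncIterator[dict]],
    handle: Callable[[dict], None],
    reconnect_interval: timedelta,
) -> None:
    """Consume one endpoint's event stream, reopening it after a drop."""
    while True:
        try:
            # One persistent stream; events are handled as they arrive.
            async for event in open_stream():
                handle(event)
        except AuthError:
            # Fatal for this endpoint: give up instead of retrying.
            return
        except ConnectionError:
            # Dropped connection: fall through to the reconnect delay.
            pass
        # Assumption: a stream that ends cleanly is also reopened.
        await asyncio.sleep(reconnect_interval.total_seconds())
```

In a real listener one such coroutine would run per endpoint, for example under `asyncio.gather`, and stopping the listener would cancel those tasks.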
Basic usage
import asyncio
from datetime import timedelta

from pyportainer import Portainer, PortainerEventListener
from pyportainer.listener import PortainerEventListenerResult


async def on_event(result: PortainerEventListenerResult) -> None:
    """Handle an incoming Docker event."""
    print(f"[endpoint {result.endpoint_id}] {result.event.type} {result.event.action}")


async def main() -> None:
    async with Portainer(
        api_url="http://localhost:9000",
        api_key="YOUR_API_KEY",
    ) as portainer:
        listener = PortainerEventListener(portainer)
        listener.register_callback(on_event)
        listener.start()

        await asyncio.sleep(60)  # Listen for a while
        listener.stop()


if __name__ == "__main__":
    asyncio.run(main())
Listening to a specific endpoint
Pass endpoint_id to open a stream to a single Portainer endpoint instead of all:
listener = PortainerEventListener(portainer, endpoint_id=1)
Filtering by event type
Use event_types to receive only the Docker event categories you care about. Accepted values match the Docker API's type filter: container, image, volume, network, daemon, plugin, node, service, secret, config.
listener = PortainerEventListener(
    portainer,
    event_types=["container", "image"],
)
Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| portainer | Portainer | (required) | The Portainer client instance |
| endpoint_id | int \| None | None | Endpoint to listen to. None listens to all endpoints |
| event_types | list[str] \| None | None | Docker event types to filter on. None means all types |
| reconnect_interval | timedelta | 5 seconds | How long to wait before reconnecting after a dropped connection |
| debug | bool | False | Enable debug-level logging |
Event data
Each callback receives a PortainerEventListenerResult:
| Field | Type | Description |
|---|---|---|
| endpoint_id | int | The endpoint the event came from |
| event | DockerEvent | The Docker event payload |
DockerEvent fields:
| Field | Type | Description |
|---|---|---|
| type | str \| None | Event category: container, image, etc. |
| action | str \| None | What happened: start, stop, die, etc. |
| actor | DockerEventActor \| None | The object the event is about |
| scope | str \| None | local or swarm |
| time | int \| None | Unix timestamp (seconds) |
| time_nano | int \| None | Unix timestamp (nanoseconds) |
DockerEventActor fields:
| Field | Type | Description |
|---|---|---|
| id | str \| None | ID of the object (container ID, image name, etc.) |
| attributes | dict[str, str] \| None | Extra metadata (image name, container name, etc.) |
Callbacks
Register a callback to be invoked for every event received. Both sync and async callables are supported.
Registering a callback
from pyportainer import PortainerEventListener
from pyportainer.listener import PortainerEventListenerResult
def on_event(result: PortainerEventListenerResult) -> None:
    print(f"{result.event.type} {result.event.action} - {result.event.actor.id}")

listener = PortainerEventListener(portainer, endpoint_id=1)
listener.register_callback(on_event)
listener.start()
Async callbacks
async def on_event(result: PortainerEventListenerResult) -> None:
    if result.event.action == "die":
        # alert() stands for your own notification coroutine
        await alert(f"Container {result.event.actor.id} has stopped unexpectedly")

listener.register_callback(on_event)
Filtering inside a callback
Callbacks receive every event that passes the event_types filter. Add further logic inside the callback:
def on_event(result: PortainerEventListenerResult) -> None:
    if result.event.action not in ("start", "die"):
        return
    print(f"Container {result.event.actor.id}: {result.event.action}")
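When several callbacks need the same kind of check, the filter can be factored into a small decorator. This is a sketch for synchronous callbacks only, and `only_actions` is a hypothetical helper, not part of pyportainer.

```python
from collections.abc import Callable
from functools import wraps
from typing import Any

Callback = Callable[[Any], None]


def only_actions(*actions: str) -> Callable[[Callback], Callback]:
    """Build a decorator that drops events whose action is not listed."""
    wanted = frozenset(actions)

    def decorator(callback: Callback) -> Callback:
        @wraps(callback)
        def wrapper(result: Any) -> None:
            # Forward the event only when its action is one we care about.
            if result.event.action in wanted:
                callback(result)

        return wrapper

    return decorator


@only_actions("start", "die")
def on_event(result: Any) -> None:
    print(f"Container event: {result.event.action}")
```

The decorated function is a new callable, so the same object must be passed to both register_callback and unregister_callback.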
Unregistering a callback
listener.unregister_callback(on_event)
Notes
- Registering the same callable twice is silently ignored; it is only called once per event.
- Exceptions raised inside a callback are logged but do not stop the listener or prevent other callbacks from running.
- The EventListenerCallback type alias is exported from pyportainer for type annotations: from pyportainer import EventListenerCallback.
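The rules in these notes (duplicates ignored, sync and async callables both accepted, exceptions logged without stopping other callbacks) can be sketched as a small registry. This is an illustration of the described behaviour under stated assumptions, not the library's code; `CallbackRegistry` is a hypothetical name.

```python
import asyncio
import inspect
import logging

_LOGGER = logging.getLogger(__name__)


class CallbackRegistry:
    """Minimal sketch of a registry with the semantics described above."""

    def __init__(self) -> None:
        self._callbacks: list = []

    def register(self, callback) -> None:
        # Registering the same callable twice is silently ignored.
        if callback not in self._callbacks:
            self._callbacks.append(callback)

    def unregister(self, callback) -> None:
        # list.remove raises ValueError when the callback is absent.
        self._callbacks.remove(callback)

    async def dispatch(self, result) -> None:
        # Iterate over a copy so callbacks may unregister themselves.
        for callback in list(self._callbacks):
            try:
                outcome = callback(result)
                if inspect.isawaitable(outcome):
                    await outcome  # Async callbacks are awaited in turn.
            except Exception:
                # A failing callback is logged and the others still run.
                _LOGGER.exception("Callback %r failed", callback)
```

Awaiting callbacks one after another keeps delivery ordered, at the cost that a slow async callback delays the ones after it.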
Runtime control
Stopping and restarting
listener.stop() # Cancels all streaming connections
listener.start() # Reconnects and starts listening again
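Since start() and stop() are plain synchronous calls, pairing them in a context manager helps ensure the streams are cancelled even if the code in between raises. The helper below is a sketch; `running` and the `Startable` protocol are hypothetical names, not part of pyportainer.

```python
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Protocol


class Startable(Protocol):
    """Anything exposing the start()/stop() pair described above."""

    def start(self) -> None: ...
    def stop(self) -> None: ...


@contextmanager
def running(listener: Startable) -> Iterator[Startable]:
    """Start the listener and always stop it on exit."""
    listener.start()
    try:
        yield listener
    finally:
        # Runs on normal exit and when the body raises.
        listener.stop()
```

Inside a coroutine this would read `with running(listener): await asyncio.sleep(60)`, which satisfies the requirement that start() be called from a running event loop.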
Changing the reconnect interval
from datetime import timedelta
listener._reconnect_interval = timedelta(seconds=30)
Querying events directly
The underlying get_events and get_recent_events methods on the Portainer client are also available directly, without using PortainerEventListener.
Stream events in real time
get_events is an async generator that keeps the connection open and yields events as they arrive:
async for event in portainer.get_events(endpoint_id=1):
    print(event.type, event.action)
Pass since to replay events from a specific point, or until to close the stream automatically at a timestamp. The filters argument narrows the stream, here to container start and die events:
from datetime import UTC, datetime, timedelta
async for event in portainer.get_events(
    endpoint_id=1,
    since=datetime.now(UTC) - timedelta(hours=1),
    filters={"type": ["container"], "event": ["start", "die"]},
):
    print(event.action, event.actor.id)
Fetch a bounded list of past events
get_recent_events collects all events in a time window into a list and returns once the window is exhausted:
from datetime import UTC, datetime, timedelta
events = await portainer.get_recent_events(
    endpoint_id=1,
    since=datetime.now(UTC) - timedelta(hours=1),
)
for event in events:
    print(event.type, event.action)
until defaults to now, so the connection closes automatically. You can also pass an explicit end time:
events = await portainer.get_recent_events(
    endpoint_id=1,
    since=datetime(2024, 1, 1, tzinfo=UTC),
    until=datetime(2024, 1, 2, tzinfo=UTC),
    filters={"type": ["image"]},
)
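A bounded list like this is convenient for quick summaries. The sketch below counts events per (type, action) pair. It uses SimpleNamespace stand-ins in place of real DockerEvent objects so it runs on its own, and `summarize` is a hypothetical helper.

```python
from collections import Counter
from types import SimpleNamespace


def summarize(events) -> list[tuple[tuple[str, str], int]]:
    """Count events per (type, action) pair, most frequent first."""
    counts = Counter((event.type, event.action) for event in events)
    return counts.most_common()


# Stand-ins for the objects returned by get_recent_events.
sample = [
    SimpleNamespace(type="container", action="start"),
    SimpleNamespace(type="container", action="die"),
    SimpleNamespace(type="container", action="start"),
    SimpleNamespace(type="image", action="pull"),
]

for (event_type, action), count in summarize(sample):
    print(f"{event_type} {action}: {count}")
```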
API reference
Background Docker event listener.
PortainerEventListener
Maintains persistent streaming connections to Docker event endpoints.
One streaming connection is opened per endpoint. Events are delivered to
registered callbacks as they arrive, in real time. If a connection drops,
it is automatically re-established after reconnect_interval.
Source code in src/pyportainer/listener.py
__init__(portainer, endpoint_id=None, *, event_types=None, reconnect_interval=timedelta(seconds=5), debug=False)
Initialize the PortainerEventListener.
- portainer: An authenticated Portainer client instance.
- endpoint_id: The ID of the endpoint to listen to. If None, all endpoints are monitored concurrently.
- event_types: Docker event types to filter on, e.g. ["container", "image"]. If None, all event types are delivered.
- reconnect_interval: How long to wait before reconnecting after a dropped connection. Defaults to 5 seconds.
- debug: Enable debug logging.
register_callback(callback)
Register a callback to be invoked for every Docker event received.
Both synchronous and async callables are supported. The callback receives a single PortainerEventListenerResult argument.
Each unique callable is only registered once; duplicates are ignored.
- callback: A sync or async callable that accepts a PortainerEventListenerResult.
start()
Start listening for Docker events.
Opens streaming connections immediately. Must be called from within a running asyncio event loop.
stop()
Stop all streaming connections.
unregister_callback(callback)
Remove a previously registered callback.
- callback: The callable to remove. Raises ValueError if it was not registered.
PortainerEventListenerResult (dataclass)
Represents a single Docker event received from an endpoint.