Image Update Watcher

pyportainer comes with a built-in background watcher: PortainerImageWatcher. The watcher can continuously monitor your running Docker containers for available image updates.

It polls Portainer at a configurable interval, compares the local image digest of each running container against the digest in the registry, and stores the results so your application can react to available updates without blocking.

How it works

  1. On start(), a background asyncio task is created.
  2. The first check runs immediately, then repeats after each configured interval.
  3. For every endpoint (or a specific one), the watcher fetches all running containers, deduplicates by image, and concurrently calls container_image_status() for each unique image.
  4. Results are stored internally and exposed via watcher.results.
  5. Errors for individual containers or endpoints are logged but never stop the polling loop.
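
The deduplicate-then-check-concurrently step can be sketched without a Portainer instance. This is only an illustration under stated assumptions: `fake_image_status`, `check_endpoint`, and the `(container_id, image, state)` tuples are hypothetical stand-ins, not part of pyportainer.

```python
import asyncio
from collections import defaultdict

# Hypothetical stand-in for a container record: (container_id, image, state).
Container = tuple[str, str, str]


async def fake_image_status(image: str) -> bool:
    """Stand-in for a registry lookup: pretend only nginx has an update."""
    await asyncio.sleep(0)  # simulate I/O
    if image == "broken:latest":
        raise RuntimeError("registry unreachable")
    return image.startswith("nginx")


async def check_endpoint(containers: list[Container]) -> dict[str, bool]:
    # Group running containers by image so each image is checked only once.
    image_containers: dict[str, list[str]] = defaultdict(list)
    for container_id, image, state in containers:
        if image and state == "running":
            image_containers[image].append(container_id)

    # One concurrent check per unique image; failures come back as values.
    statuses = await asyncio.gather(
        *(fake_image_status(image) for image in image_containers),
        return_exceptions=True,
    )

    results: dict[str, bool] = {}
    for image, status in zip(image_containers, statuses):
        if isinstance(status, BaseException):
            continue  # a failing image is skipped; the others still complete
        for container_id in image_containers[image]:
            results[container_id] = status
    return results


CONTAINERS = [
    ("c1", "nginx:1.25", "running"),
    ("c2", "nginx:1.25", "running"),
    ("c3", "redis:7", "running"),
    ("c4", "redis:7", "exited"),
    ("c5", "broken:latest", "running"),
]

if __name__ == "__main__":
    print(asyncio.run(check_endpoint(CONTAINERS)))
```

Two containers sharing an image cost one lookup, and the stopped container and the failing image simply do not appear in the result.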

Installation

No extra dependencies are needed: the watcher is part of the core pyportainer package and runs on asyncio.

Basic usage

import asyncio
from datetime import timedelta

from pyportainer import Portainer, PortainerImageWatcher


async def main() -> None:
    async with Portainer(
        api_url="http://localhost:9000",
        api_key="YOUR_API_KEY",
    ) as portainer:
        watcher = PortainerImageWatcher(
            portainer,
            interval=timedelta(hours=6),
        )

        watcher.start()

        # Depending on the registries, the first check may take some time to complete,
        # so wait a bit before accessing the results.
        await asyncio.sleep(timedelta(minutes=1).total_seconds())
        for (endpoint_id, container_id), result in watcher.results.items():
            if result.status and result.status.update_available:
                print(f"Update available for container {container_id} on endpoint {endpoint_id}")

        watcher.stop()


if __name__ == "__main__":
    asyncio.run(main())

Watching a specific endpoint

Pass endpoint_id to limit monitoring to a single Portainer endpoint:

watcher = PortainerImageWatcher(
    portainer,
    endpoint_id=1,
    interval=timedelta(hours=12),
)

Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| portainer | Portainer | (required) | The Portainer client instance |
| endpoint_id | int \| None | None | Endpoint to monitor. None watches all endpoints |
| interval | timedelta | 12 hours | How often to poll for updates |
| debug | bool | False | Enable debug-level logging |

Results

watcher.results returns a copy of the current results as a dictionary:

dict[tuple[int, str], PortainerImageWatcherResult]  # keyed by (endpoint_id, container_id)

Each PortainerImageWatcherResult contains:

| Field | Type | Description |
| --- | --- | --- |
| endpoint_id | int \| None | The endpoint the container belongs to |
| container_id | str \| None | The container ID |
| status | PortainerImageUpdateStatus \| None | The image update check result |

PortainerImageUpdateStatus fields:

| Field | Type | Description |
| --- | --- | --- |
| update_available | bool | True if a newer image is available in the registry |
| local_digest | str \| None | Digest of the locally running image |
| registry_digest | str \| None | Digest of the latest image in the registry |
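
As a rough illustration of working with this structure, the sketch below filters a results dictionary down to the containers that have an update. The two dataclasses are simplified stand-ins that mirror the documented fields so the example runs on its own; `containers_with_updates` is a hypothetical helper, not a pyportainer function.

```python
from __future__ import annotations

from dataclasses import dataclass


# Stand-ins mirroring the documented fields; real code would use the
# classes provided by pyportainer instead.
@dataclass(frozen=True)
class ImageUpdateStatus:
    update_available: bool
    local_digest: str | None = None
    registry_digest: str | None = None


@dataclass(frozen=True)
class WatcherResult:
    endpoint_id: int | None = None
    container_id: str | None = None
    status: ImageUpdateStatus | None = None


def containers_with_updates(
    results: dict[tuple[int, str], WatcherResult],
) -> list[tuple[int, str]]:
    """Return the sorted (endpoint_id, container_id) keys that have an update."""
    return sorted(
        key
        for key, result in results.items()
        # status may be None, so check it before reading update_available
        if result.status is not None and result.status.update_available
    )


sample = {
    (1, "abc"): WatcherResult(1, "abc", ImageUpdateStatus(True, "sha256:aaa", "sha256:bbb")),
    (1, "def"): WatcherResult(1, "def", ImageUpdateStatus(False, "sha256:ccc", "sha256:ccc")),
    (2, "ghi"): WatcherResult(2, "ghi", None),
}

print(containers_with_updates(sample))
```

Because status is optional, the None check comes first; the same guard appears in the usage examples in this page.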

Callbacks

Instead of polling watcher.results yourself, you can register callbacks that are invoked automatically after each poll cycle, once per container result. Both sync and async callables are supported.

Registering a callback

from datetime import timedelta

from pyportainer import Portainer, PortainerImageWatcher
from pyportainer.watcher import PortainerImageWatcherResult


def on_result(result: PortainerImageWatcherResult) -> None:
    if result.status and result.status.update_available:
        print(f"Update available for container {result.container_id}")


# `portainer` is an already-connected Portainer client (see Basic usage).
watcher = PortainerImageWatcher(portainer, interval=timedelta(hours=6))
watcher.register_callback(on_result)
watcher.start()

Async callbacks

async def on_result(result: PortainerImageWatcherResult) -> None:
    if result.status and result.status.update_available:
        # Follow up with an async action, e.g. send a notification.
        # notify() is a placeholder for your own coroutine.
        await notify(result.container_id)

watcher.register_callback(on_result)

Filtering for updates only

Callbacks receive every result, including containers where no update is available. Filter inside the callback:

def on_result(result: PortainerImageWatcherResult) -> None:
    if not result.status or not result.status.update_available:
        return
    # Handle update ...

Unregistering a callback

watcher.unregister_callback(on_result)

Notes

  • Registering the same callable twice is a no-op; it will only be called once per result.
  • Exceptions raised inside a callback are logged but do not stop the watcher or prevent other callbacks from running.
  • The WatcherCallback type alias is exported from pyportainer for type annotations: from pyportainer import WatcherCallback.
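
The callback rules above (duplicates ignored, sync and async both accepted, exceptions isolated) can be sketched in a few lines. `CallbackRegistry` is a hypothetical helper written for this illustration; it collects errors in a list where the watcher would log them.

```python
import asyncio
from typing import Any, Callable


class CallbackRegistry:
    """Hypothetical helper illustrating the callback rules listed above."""

    def __init__(self) -> None:
        self._callbacks: list[Callable[[Any], Any]] = []
        self.errors: list[Exception] = []

    def register(self, callback: Callable[[Any], Any]) -> None:
        # Registering the same callable twice has no effect.
        if callback not in self._callbacks:
            self._callbacks.append(callback)

    async def fire(self, result: Any) -> None:
        for callback in list(self._callbacks):
            try:
                ret = callback(result)
                if asyncio.iscoroutine(ret):
                    await ret  # async callbacks are awaited
            except Exception as err:
                # Record the failure and carry on with the next callback.
                self.errors.append(err)


async def demo() -> list[str]:
    seen: list[str] = []

    def sync_cb(result: str) -> None:
        seen.append(f"sync:{result}")

    def failing_cb(result: str) -> None:
        raise ValueError("boom")

    async def async_cb(result: str) -> None:
        seen.append(f"async:{result}")

    registry = CallbackRegistry()
    registry.register(sync_cb)
    registry.register(sync_cb)  # duplicate, ignored
    registry.register(failing_cb)
    registry.register(async_cb)
    await registry.fire("c1")
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The failing callback sits between the other two, yet both of them still run, and the duplicate registration produces only one call.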

Runtime control

Changing the interval

The polling interval can be updated at any time. The new value takes effect after the next completed check:

from datetime import timedelta

watcher.interval = timedelta(hours=1)

However, avoid hammering the registry with overly frequent checks, especially if you have many containers or endpoints. Registries may rate-limit requests, so an interval between 6 and 24 hours is usually reasonable, depending on how often your images are updated.

Checking when the last poll ran

import datetime

if watcher.last_check:
    last = datetime.datetime.fromtimestamp(watcher.last_check)
    print(f"Last checked at: {last}")
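
Since last_check is a Unix timestamp (the API reference below shows it being set from time.time()), it can also be rendered as a timezone-aware value with an age. The following is a small sketch; `describe_last_check` is a hypothetical helper, and the optional `now` argument exists only to make the example deterministic.

```python
from __future__ import annotations

import time
from datetime import datetime, timezone


def describe_last_check(last_check: float | None, now: float | None = None) -> str:
    """Format a Unix timestamp as UTC, plus how many minutes ago it was."""
    if last_check is None:
        return "no check completed yet"
    now = time.time() if now is None else now
    when = datetime.fromtimestamp(last_check, tz=timezone.utc)
    age_minutes = int((now - last_check) // 60)
    return f"{when.isoformat()} ({age_minutes} min ago)"


print(describe_last_check(None))
print(describe_last_check(0.0, now=90.0))
```

Passing tz=timezone.utc avoids the ambiguity of a naive local-time datetime when the value ends up in logs or notifications.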

Stopping the watcher

watcher.stop()

Calling stop() cancels the background task. Call start() again to restart it.
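
One detail worth keeping in mind when restarting: in asyncio, cancel() only requests cancellation, and the task is not done until the event loop has had a chance to deliver it. The sketch below uses a hypothetical `TinyWatcher` that copies just the start()/stop() logic shown in the API reference, so it is an illustration of asyncio behaviour rather than of the library itself.

```python
from __future__ import annotations

import asyncio


class TinyWatcher:
    """Hypothetical stand-in that copies only the start()/stop() logic."""

    def __init__(self) -> None:
        self._task: asyncio.Task[None] | None = None

    async def _run(self) -> None:
        while True:
            await asyncio.sleep(3600)

    def start(self) -> None:
        # Only creates a task if none is active.
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    def stop(self) -> None:
        if self._task and not self._task.done():
            self._task.cancel()

    @property
    def running(self) -> bool:
        return self._task is not None and not self._task.done()


async def demo() -> tuple[bool, bool, bool]:
    watcher = TinyWatcher()
    watcher.start()
    await asyncio.sleep(0)  # let the background task start
    watcher.stop()
    right_after_stop = watcher.running  # cancellation only requested so far
    await asyncio.sleep(0.01)  # give the loop a moment to deliver it
    after_yield = watcher.running
    watcher.start()  # the old task is done, so a fresh one is created
    restarted = watcher.running
    watcher.stop()
    return right_after_stop, after_yield, restarted


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under this logic, a start() issued in the same breath as stop() would find the old task still pending and do nothing, so yielding to the event loop once between the two calls is the safer pattern.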

API reference

Background image update watcher.

PortainerImageWatcher

Periodically checks all containers on an endpoint for image updates.

Results are stored and accessible via the results property after each check.

Source code in src/pyportainer/watcher.py
class PortainerImageWatcher:
    """Periodically checks all containers on an endpoint for image updates.

    Results are stored and accessible via the `results` property after each check.
    """

    def __init__(
        self,
        portainer: Portainer,
        endpoint_id: int | None = None,
        interval: timedelta = timedelta(hours=12),
        *,
        debug: bool = False,
    ) -> None:
        """Initialize the PortainerImageWatcher.

        Args:
        ----
            portainer: An authenticated Portainer client instance.
            endpoint_id: The ID of the endpoint whose containers to monitor. If None, all endpoints are monitored.
            interval: How often to poll for updates. Defaults to 12 hours.

        """
        self._portainer = portainer
        self._endpoint_id = endpoint_id
        self._interval = interval
        self._results: dict[tuple[int, str], PortainerImageWatcherResult] = {}
        self._task: asyncio.Task[None] | None = None
        self._last_check: float | None = None
        self._callbacks: list[WatcherCallback] = []

        _LOGGER.setLevel(logging.DEBUG if debug else logging.INFO)

    @property
    def interval(self) -> timedelta:
        """Polling interval."""
        return self._interval

    @interval.setter
    def interval(self, value: timedelta) -> None:
        """Update the polling interval. Takes effect after the next check."""
        self._interval = value

    @property
    def last_check(self) -> float | None:
        """Timestamp of the last completed check, or None if no checks have completed yet."""
        return self._last_check

    @property
    def results(self) -> dict[tuple[int, str], PortainerImageWatcherResult]:
        """Latest update status as of the last check."""
        return self._results.copy()

    def start(self) -> None:
        """Start the background polling loop.

        The first check runs immediately; subsequent checks run after each interval.
        Must be called from within a running asyncio event loop.
        """
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._run())

    def stop(self) -> None:
        """Cancel the background polling loop."""
        if self._task and not self._task.done():
            self._task.cancel()

    def register_callback(self, callback: WatcherCallback) -> None:
        """Register a callback to be invoked for every result after each poll cycle.

        Both synchronous and async callables are supported. The callback receives a
        single :class:`PortainerImageWatcherResult` argument. Each unique callable
        is only registered once; duplicate registrations are silently ignored.

        Args:
        ----
            callback: A sync or async callable that accepts a
                :class:`PortainerImageWatcherResult`.

        """
        if callback not in self._callbacks:
            self._callbacks.append(callback)

    def unregister_callback(self, callback: WatcherCallback) -> None:
        """Remove a previously registered callback.

        Args:
        ----
            callback: The callable to remove. Raises :exc:`ValueError` if it was not registered.

        """
        self._callbacks.remove(callback)

    async def _fire_callbacks(self, result: PortainerImageWatcherResult) -> None:
        """Invoke all registered callbacks for a single result.

        Exceptions raised by individual callbacks are logged but not blocking.
        """
        for callback in list(self._callbacks):
            try:
                ret = callback(result)
                if asyncio.iscoroutine(ret):
                    await ret
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception("Callback raised an exception for container %s", result.container_id)

    async def _run(self) -> None:
        """Loop that checks immediately, then sleep for the interval, then repeat.

        Errors during checks are logged but don't stop the watcher, allowing recovery from transient issues.
        """
        while True:
            try:
                await self._check_all()
            except PortainerTimeoutError:
                _LOGGER.exception("Timeout during image check")
            except PortainerConnectionError:
                _LOGGER.exception("Connection error during image check")
            except PortainerAuthenticationError:
                _LOGGER.exception("Authentication error during image check")
            except PortainerError:
                _LOGGER.exception("Error during image check")
            finally:
                self._last_check = time.time()

            await asyncio.sleep(self._interval.total_seconds())

    async def _check_all(self) -> None:
        """Fetch all containers and check each unique image concurrently.

        Errors for individual images are logged but silently skipped so one
        failing image does not prevent the rest from being checked.
        """
        if self._endpoint_id is not None:
            endpoint_ids: list[int] = [self._endpoint_id]
        else:
            _LOGGER.debug("No endpoint_id specified, fetching all endpoints to check.")
            endpoints = await self._portainer.get_endpoints()
            endpoint_ids = [endpoint.id for endpoint in endpoints]

        fresh: dict[tuple[int, str], PortainerImageWatcherResult] = {}

        for endpoint_id in endpoint_ids:
            try:
                containers = await self._portainer.get_containers(endpoint_id)
            except PortainerError:
                _LOGGER.warning("Failed to fetch containers for endpoint %s, skipping", endpoint_id)
                continue

            image_containers = defaultdict(list)
            for container in containers:
                if container.image and container.state == "running":
                    image_containers[container.image].append(container.id)

            _LOGGER.debug("Checking %d unique images for endpoint %s...", len(image_containers), endpoint_id)

            statuses = await asyncio.gather(
                *(self._portainer.container_image_status(endpoint_id, image) for image in image_containers),
                return_exceptions=True,
            )
            for image, status in zip(image_containers, statuses, strict=False):
                if isinstance(status, BaseException):
                    _LOGGER.warning("Failed to check image %s on endpoint %s: %s", image, endpoint_id, status)
                    continue
                for container_id in image_containers[image]:
                    fresh[(endpoint_id, container_id)] = PortainerImageWatcherResult(
                        endpoint_id=endpoint_id,
                        container_id=container_id,
                        status=status,
                    )

                    _LOGGER.debug("Checked image %s on endpoint %s for container %s", image, endpoint_id, container_id)

        self._results = fresh

        if self._callbacks and fresh:
            await asyncio.gather(*(self._fire_callbacks(result) for result in fresh.values()))

interval property writable

Polling interval.

last_check property

Timestamp of the last completed check, or None if no checks have completed yet.

results property

Latest update status as of the last check.

__init__(portainer, endpoint_id=None, interval=timedelta(hours=12), *, debug=False)

Initialize the PortainerImageWatcher.


portainer: An authenticated Portainer client instance.
endpoint_id: The ID of the endpoint whose containers to monitor. If None, all endpoints are monitored.
interval: How often to poll for updates. Defaults to 12 hours.
Source code in src/pyportainer/watcher.py
def __init__(
    self,
    portainer: Portainer,
    endpoint_id: int | None = None,
    interval: timedelta = timedelta(hours=12),
    *,
    debug: bool = False,
) -> None:
    """Initialize the PortainerImageWatcher.

    Args:
    ----
        portainer: An authenticated Portainer client instance.
        endpoint_id: The ID of the endpoint whose containers to monitor. If None, all endpoints are monitored.
        interval: How often to poll for updates. Defaults to 12 hours.

    """
    self._portainer = portainer
    self._endpoint_id = endpoint_id
    self._interval = interval
    self._results: dict[tuple[int, str], PortainerImageWatcherResult] = {}
    self._task: asyncio.Task[None] | None = None
    self._last_check: float | None = None
    self._callbacks: list[WatcherCallback] = []

    _LOGGER.setLevel(logging.DEBUG if debug else logging.INFO)

register_callback(callback)

Register a callback to be invoked for every result after each poll cycle.

Both synchronous and async callables are supported. The callback receives a single PortainerImageWatcherResult argument. Each unique callable is only registered once; duplicate registrations are silently ignored.


callback: A sync or async callable that accepts a PortainerImageWatcherResult.
Source code in src/pyportainer/watcher.py
def register_callback(self, callback: WatcherCallback) -> None:
    """Register a callback to be invoked for every result after each poll cycle.

    Both synchronous and async callables are supported. The callback receives a
    single :class:`PortainerImageWatcherResult` argument. Each unique callable
    is only registered once; duplicate registrations are silently ignored.

    Args:
    ----
        callback: A sync or async callable that accepts a
            :class:`PortainerImageWatcherResult`.

    """
    if callback not in self._callbacks:
        self._callbacks.append(callback)

start()

Start the background polling loop.

The first check runs immediately; subsequent checks run after each interval. Must be called from within a running asyncio event loop.

Source code in src/pyportainer/watcher.py
def start(self) -> None:
    """Start the background polling loop.

    The first check runs immediately; subsequent checks run after each interval.
    Must be called from within a running asyncio event loop.
    """
    if self._task is None or self._task.done():
        self._task = asyncio.create_task(self._run())

stop()

Cancel the background polling loop.

Source code in src/pyportainer/watcher.py
def stop(self) -> None:
    """Cancel the background polling loop."""
    if self._task and not self._task.done():
        self._task.cancel()

unregister_callback(callback)

Remove a previously registered callback.


callback: The callable to remove. Raises ValueError if it was not registered.
Source code in src/pyportainer/watcher.py
def unregister_callback(self, callback: WatcherCallback) -> None:
    """Remove a previously registered callback.

    Args:
    ----
        callback: The callable to remove. Raises :exc:`ValueError` if it was not registered.

    """
    self._callbacks.remove(callback)

PortainerImageWatcherResult dataclass

Represents the status of an image watcher.

Source code in src/pyportainer/watcher.py
@dataclass(frozen=True)
class PortainerImageWatcherResult:
    """Represents the status of an image watcher."""

    endpoint_id: int | None = None
    container_id: str | None = None
    status: PortainerImageUpdateStatus | None = None