Policy loading (hot reload)¶
RBACX supports hot-reloading policies from external sources via a production-grade reloader.
- A policy source implements the
PolicySource
protocol (sync or async):load() -> Dict[str, Any] | Awaitable[Dict[str, Any]]
andetag() -> Optional[str] | Awaitable[Optional[str]]
. - The
HotReloader
watches a source: when its ETag changes, it loads the new policy and applies it to a runningGuard
. (Ifetag()
returnsNone
, the reloader will attempt a load and let the source decide change detection.) ETag is a standard content version identifier in HTTP and storage systems.
When to use¶
- You want changes to JSON or dict policies to be picked up without restarting your application.
- You use file/HTTP/S3 (or any custom) policy storage and want automatic or manual checks for updates.
- You want resilient reloads with exponential backoff + jitter after errors to avoid thundering herds.
Quick examples¶
1) Safe startup (recommended)¶
Ensures a valid policy is loaded at boot, then enables background polling.
from rbacx import Guard
from rbacx.store import FilePolicySource
from rbacx import HotReloader
guard = Guard(policy={})
source = FilePolicySource("policy.json")
reloader = HotReloader(guard, source, initial_load=True, poll_interval=2.0)
# Synchronously load whatever is in policy.json right now
reloader.check_and_reload() # returns True on first load
# Then keep watching for changes in the background
reloader.start()
# ...
reloader.stop()
2) Legacy behavior (no initial load)¶
First check is a NO-OP unless the policy changes (backwards-compatible default).
reloader = HotReloader(guard, source, initial_load=False)
reloader.check_and_reload() # likely False until ETag changes
3) Force a one-time load (ignore ETag)¶
Useful for bootstrap/migrations.
reloader = HotReloader(guard, source)
reloader.check_and_reload(force=True)
4) Force initial load via start()
¶
Performs a synchronous load before the thread starts.
reloader = HotReloader(guard, source, initial_load=False)
reloader.start(initial_load=True, force_initial=True)
Steps:
1) Start with an initial or empty policy
2) Choose source (Local filesystem in examples)
3) Create the reloader (optionally enable initial_load
)
4) Optional: force a one-time check at startup
5) Optional: run background polling
6) Your app runs between start()
and stop()
HotReloader API¶
from rbacx import HotReloader
Constructor parameters¶
Parameter | Description |
---|---|
guard |
The rbacx.core.engine.Guard instance to update. |
source |
Any PolicySource (File, HTTP, S3, custom, …). |
initial_load: bool = False |
If True , do not prime the ETag so the first check_and_reload() will load the current policy. If False (default), the first check is a NO-OP unless the ETag changed (legacy behavior). |
poll_interval: float = 5.0 |
Default polling interval (seconds) used by start() . |
Methods¶
-
check_and_reload(*, force: bool = False) -> bool
Synchronously checks the source’s ETag; if changed, loads and applies the policy. Ifforce=True
, loads and applies regardless of ETag. ReturnsTrue
if a reload occurred. -
start(interval: float | None = None, *, initial_load: bool | None = None, force_initial: bool = False) -> None
Starts background polling. interval
overrides the constructor’spoll_interval
.initial_load
overrides the constructor’s flag just for this start.-
If
initial_load
is truthy andforce_initial=True
, performs a synchronous load before starting the thread (ETag ignored for that initial load). -
stop(timeout: float | None = None) -> None
Stops background polling; optionally waits up totimeout
seconds for the current check.
Diagnostics / properties¶
last_etag
— most recently seen ETag from the source.last_reload_at
— timestamp of the last successful reload.last_error
— the last exception encountered (if any).suppressed_until
— time until which further attempts are delayed after errors (exponential backoff with jitter).
Typical reload cycle¶
- Ask the
PolicySource
for its current ETag. - If the ETag is new (or
etag()
isNone
), callload()
to fetch the policy. - Validate (if the source performs schema checks).
- Apply the policy to
guard
only after a successful load. - On errors (parse, network, permissions), keep the previous working policy, log the error, and schedule the next attempt using exponential backoff with jitter.
Integration with web frameworks¶
ASGI middleware & on-request checks¶
Use HotReloader
with your middleware to check for changes before handling requests, or rely solely on background polling.
from rbacx.adapters.asgi import RbacxMiddleware
from rbacx import Guard
from rbacx.store import FilePolicySource
from rbacx import HotReloader
from litestar import Litestar, get
from litestar.middleware import DefineMiddleware
guard = Guard(policy={})
reloader = HotReloader(guard, FilePolicySource("policy.json"), initial_load=True, poll_interval=2.0)
reloader.check_and_reload() # ensure policy is present at startup
@get("/secure")
def secure() -> dict:
# guard used automatically via middleware or dependency
return {"ok": True}
app = Litestar(
route_handlers=[secure],
middleware=[DefineMiddleware(RbacxMiddleware, guard=guard, policy_reloader=reloader)],
)
If you need ultra-low detection latency, call reloader.check_and_reload()
at the beginning of request handling (cheap ETag check), or keep background polling short.
Supported PolicySource
types¶
Out of the box, RBACX provides:
- FilePolicySource — local JSON file or dict snapshot.
- HTTPPolicySource — HTTP/HTTPS endpoint (ideal with ETag or Last-Modified validators).
- S3PolicySource — Amazon S3 objects with ETag-based change detection.
Any custom source that implements load()
and etag()
is supported.
Operational guidance¶
- Atomic writes (file sources): write to a temp file and
rename
to avoid readers seeing partial content. - Backoff & jitter: on repeated failures, use exponential backoff with jitter; this avoids synchronized retries and thundering herds. RBACX’s reloader applies jitter by default.
- Observability: export metrics/counters for reload successes/failures and
last_reload_at
. - Fail-safe policy: keep the last known good policy if a new load fails.
- Security defaults: default-deny policies are recommended until the first valid policy is loaded.
Sync vs Async usage¶
HotReloader
exposes both a synchronous and an asynchronous API:
check_and_reload(...)
— sync wrapper over the async core. Safe to call in synchronous apps (Flask/CLI) and even inside a running event loop; the reloader will delegate work to a helper thread.check_and_reload_async(...)
— async-native method for ASGI/async tasks.
Examples:
Sync (Flask/CLI/Celery)
reloader = HotReloader(guard, source)
changed = reloader.check_and_reload()
Async (FastAPI background task / asyncio)
reloader = HotReloader(guard, source)
changed = await reloader.check_and_reload_async()
Deprecated API¶
ReloadingPolicyManager
is deprecated and kept only for compatibility. Constructing it emits aDeprecationWarning
and a log warning; it delegates toHotReloader
with legacy semantics (initial_load=False
). Please migrate toHotReloader
.PolicyManager
fromrbacx.store.manager
is deprecated; useHotReloader
(plus aPolicySource
such asFilePolicySource
) instead.
Changelog (excerpt)¶
HotReloader(..., initial_load: bool = False)
— new flag to control startup behavior.check_and_reload(force: bool = False)
— newforce
parameter to bypass ETag.start(..., initial_load: bool | None = None, force_initial: bool = False)
— optional synchronous load before the polling thread starts.ReloadingPolicyManager
andrbacx.store.manager.PolicyManager
— deprecated; useHotReloader
.