API Reference¶
Guard(policy, *, logger_sink=None, metrics=None, obligation_checker=None, role_resolver=None, relationship_checker=None, cache=None, cache_ttl=300, strict_types=False)
¶
Policy evaluation engine.
Holds a policy or a policy set and evaluates access decisions.
Design
- Single async core `_evaluate_core_async` (one source of truth).
- Sync API wraps the async core; if a loop is already running, uses a class-level ThreadPoolExecutor (created lazily, shared across all Guard instances) to avoid the overhead of spawning a new thread pool on every call.
- DI (resolver/obligations/metrics/logger) can be sync or async; both are supported via `maybe_await`.
- CPU-bound evaluation is offloaded to a thread via `asyncio.to_thread`.
clear_cache()
¶
Clear the decision cache if configured.
This is safe to call at any time. Errors are swallowed to avoid interfering with decision flow.
evaluate_async(subject, action, resource, context=None, *, explain=False)
async
¶
True async API for ASGI frameworks.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `explain` | `bool` | when `True`, attach a per-rule evaluation trace to the returned `Decision`. | `False` |
evaluate_batch_async(requests, *, explain=False, timeout=None)
async
¶
Evaluate multiple access requests concurrently, preserving order.
Runs all requests in parallel via :func:asyncio.gather. The
returned list has exactly one :class:Decision per input tuple, in
the same order. If any individual evaluation raises an exception the
whole batch propagates that exception (fail-fast semantics).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `requests` | `Sequence[tuple[Subject, Action, Resource, Context \| None]]` | sequence of `(subject, action, resource, context)` tuples to evaluate. | required |
| `explain` | `bool` | when `True`, attach a per-rule evaluation trace to each returned `Decision`. | `False` |
| `timeout` | `float \| None` | optional wall-clock deadline in seconds for the entire batch. When exceeded, `TimeoutError` is raised. | `None` |
Returns:

| Type | Description |
|---|---|
| `list[Decision]` | List of `Decision` objects, exactly one per input tuple, in input order. |
Raises:

| Type | Description |
|---|---|
| `TimeoutError` | when `timeout` is set and the batch does not complete within the allotted time. |
Example::
decisions = await guard.evaluate_batch_async([
(subject, Action("read"), resource1, ctx),
(subject, Action("write"), resource1, ctx),
(subject, Action("delete"), resource2, None),
], timeout=2.0)
evaluate_batch_sync(requests, *, explain=False, timeout=None)
¶
Synchronous wrapper for :meth:evaluate_batch_async.
Uses the same loop-detection strategy as :meth:evaluate_sync: runs
directly via :func:asyncio.run when no event loop is active, or
submits to the class-level :class:~concurrent.futures.ThreadPoolExecutor
when called from within a running loop.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `requests` | `Sequence[tuple[Subject, Action, Resource, Context \| None]]` | sequence of `(subject, action, resource, context)` tuples to evaluate. | required |
| `explain` | `bool` | when `True`, attach a per-rule evaluation trace to each returned `Decision`. | `False` |
| `timeout` | `float \| None` | optional wall-clock deadline in seconds passed through to `evaluate_batch_async`. | `None` |
Returns:

| Type | Description |
|---|---|
| `list[Decision]` | List of `Decision` objects, exactly one per input tuple, in input order. |
Raises:

| Type | Description |
|---|---|
| `TimeoutError` | when `timeout` is set and the batch does not complete within the allotted time. |
evaluate_sync(subject, action, resource, context=None, *, explain=False)
¶
Synchronous wrapper for the async core.
- If no running loop in this thread: use asyncio.run() directly.
- If a loop is running (e.g. called from sync code inside an async framework): submit to the class-level ThreadPoolExecutor so the worker thread gets its own event loop via asyncio.run(). The executor is created lazily and reused across calls to avoid the overhead of spawning a new thread pool on every invocation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `explain` | `bool` | when `True`, attach a per-rule evaluation trace to the returned `Decision`. | `False` |
set_policy(policy)
¶
Replace policy/policyset.
Thread-safe: acquires _policy_lock so that concurrent readers in
_decide_async always see a consistent triple of
(policy, policy_etag, _compiled).
update_policy(policy)
¶
Alias kept for backward-compatibility.
ConditionDepthError
¶
Bases: Exception
Raised when a condition tree exceeds the maximum nesting depth.
This is a security guard against DoS via deeply nested and/or/not
chains in policies loaded from untrusted external sources (HTTP, S3, etc.).
The engine treats this as a failed condition (fail-closed) and records
reason = "condition_depth_exceeded" in the decision.
ConditionTypeError
¶
Bases: Exception
Raised when a condition compares incompatible types.
eval_condition(cond, env, _depth=0)
¶
Evaluate condition dict safely.
Raises ConditionTypeError on type mismatches and
ConditionDepthError when the and/or/not nesting exceeds
MAX_CONDITION_DEPTH (default 50). The depth guard prevents a
maliciously crafted policy loaded from an external source (HTTP, S3, …)
from triggering a Python RecursionError and crashing the process.
_depth is an internal parameter — callers must not pass it.
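A minimal sketch of the depth guard follows. This is not the library's evaluator; only `not`, `and`, and a literal `value` leaf are modeled, and `eval_sketch` is a hypothetical name:

```python
MAX_CONDITION_DEPTH = 50  # documented default

class ConditionDepthError(Exception):
    pass

def eval_sketch(cond: dict, env: dict, _depth: int = 0) -> bool:
    # Fail long before Python's own recursion limit can be reached.
    if _depth > MAX_CONDITION_DEPTH:
        raise ConditionDepthError("condition_depth_exceeded")
    if "and" in cond:
        return all(eval_sketch(c, env, _depth + 1) for c in cond["and"])
    if "not" in cond:
        return not eval_sketch(cond["not"], env, _depth + 1)
    return bool(cond.get("value"))

# A maliciously deep policy: 200 nested "not" wrappers.
deep: dict = {"value": True}
for _ in range(200):
    deep = {"not": deep}

try:
    eval_sketch(deep, {})
except ConditionDepthError:
    print("condition_depth_exceeded")  # the engine records this reason and fails closed
```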
resolve(token, env)
¶
Resolve a token; supports {"attr": "a.b.c"} lookups in env.
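The documented `{"attr": "a.b.c"}` lookup can be illustrated like this (a hypothetical re-implementation for clarity, not the library function):

```python
from typing import Any

def resolve_sketch(token: Any, env: dict) -> Any:
    # {"attr": "a.b.c"} walks nested dicts in env; anything else passes through.
    if isinstance(token, dict) and "attr" in token:
        node: Any = env
        for part in token["attr"].split("."):
            if not isinstance(node, dict):
                return None          # path broken: resolve to nothing
            node = node.get(part)
        return node
    return token                     # literals are returned unchanged

env = {"subject": {"attrs": {"dept": "sales"}}}
print(resolve_sketch({"attr": "subject.attrs.dept"}, env))  # sales
print(resolve_sketch(42, env))                              # 42
```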
decide(policyset, env)
¶
Evaluate a policy set with combining algorithm over its child policies.
DecisionLogger(*, sample_rate=1.0, redactions=None, logger_name='rbacx.audit', as_json=False, level=logging.INFO, redact_in_place=False, use_default_redactions=False, smart_sampling=False, category_sampling_rates=None, max_env_bytes=None)
¶
Bases: DecisionLogSink
Minimal, framework-agnostic audit logger for PDP decisions.
Backwards-compatible defaults
- No redactions are applied unless explicitly configured.
- `sample_rate` controls probabilistic logging: 0.0 → drop all, 1.0 → log all.
- Smart sampling is disabled by default.
- No env size limit by default.
Opt-in features
- `use_default_redactions=True` enables `DEFAULT_REDACTIONS` when `redactions` is not provided.
- `smart_sampling=True` enables category-aware sampling (deny and permit-with-obligations can be forced to 1.0).
- `max_env_bytes` truncates the (redacted) env if the serialized JSON exceeds the threshold.
BasicObligationChecker
¶
Bases: ObligationChecker
Validate common obligations carried by a decision.
Design goals (documented for contributors):
- Fail-closed semantics preserved for legacy callers:
* If legacy string key decision is present and not equal to "permit" -> (False, None).
* If legacy key absent, derive effect from effect/allowed; any non-"permit" -> (False, None).
- Support obligations targeting the current effect (on: "permit" | "deny").
This allows, for example, an explicit http_challenge on deny to still surface a challenge.
- Do not mutate the incoming decision; return a (ok, challenge) tuple for the Guard to consume.
- Unknown obligation type is ignored (treated as advice/no-op).
Supported type values:
- require_mfa -> challenge "mfa"
- require_level (attrs.min) -> "step_up"
- http_challenge (attrs.scheme in Basic/Bearer/Digest) -> "http_basic" / "http_bearer" / "http_digest"; else "http_auth"
- require_consent (attrs.key or any consent) -> "consent"
- require_terms_accept -> "tos"
- require_captcha -> "captcha"
- require_reauth (attrs.max_age vs context.reauth_age_seconds) -> "reauth"
- require_age_verified -> "age_verification"
check(decision, context)
¶
Check obligations attached to a raw decision.
Parameters¶
decision: Mapping-like (dict). Legacy callers may pass string key decision ("permit"|"deny");
modern shape may include effect/allowed. obligations is a list of mappings.
context : Object whose .attrs is a dict (or context itself is a dict).
Returns¶
(ok, challenge): bool and optional machine-readable challenge string.
ObligationCheckResult(ok, challenge=None, reason=None)
dataclass
¶
Small DTO kept for backwards-compatibility if needed by contributors.
AbstractCache
¶
Bases: Protocol
Minimal cache interface for dependency inversion.
Implementations MUST be safe to call from multiple threads in-process or be clearly documented otherwise.
get should return None if a key doesn't exist or is expired. set may accept an optional TTL in seconds.
DefaultInMemoryCache(maxsize=2048)
¶
Bases: AbstractCache
Thread-safe in-memory LRU cache with optional per-key TTL.
Notes¶
- Uses time.monotonic() for TTL to avoid wall clock changes.
- Designed for single process scenarios. For multi-process/multi-host, inject a distributed cache implementation that conforms to AbstractCache.
- Values are stored as-is; callers are responsible for storing immutable or copy-safe data if necessary.
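The notes above can be made concrete with a small LRU+TTL sketch. This is illustrative only (`TTLCacheSketch` is a hypothetical class); the real `DefaultInMemoryCache` additionally guards every operation with a lock for thread safety:

```python
import time
from collections import OrderedDict

class TTLCacheSketch:
    def __init__(self, maxsize: int = 2048) -> None:
        self.maxsize = maxsize
        self._d = OrderedDict()      # key -> (monotonic expiry or None, value)

    def set(self, key, value, ttl=None) -> None:
        expires = time.monotonic() + ttl if ttl is not None else None
        self._d[key] = (expires, value)
        self._d.move_to_end(key)     # mark as most recently used
        while len(self._d) > self.maxsize:
            self._d.popitem(last=False)  # evict least recently used

    def get(self, key):
        item = self._d.get(key)
        if item is None:
            return None              # missing
        expires, value = item
        if expires is not None and time.monotonic() >= expires:
            del self._d[key]
            return None              # expired: behaves like a miss
        self._d.move_to_end(key)
        return value

c = TTLCacheSketch(maxsize=2)
c.set("a", 1); c.set("b", 2); c.set("c", 3)  # "a" is evicted (LRU)
print(c.get("a"), c.get("c"))                # None 3
```

`time.monotonic()` is used for expiry so that wall-clock adjustments (NTP, DST) cannot extend or shorten a key's lifetime.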
StaticRoleResolver(graph=None)
¶
Bases: RoleResolver
Simple in-memory role resolver with inheritance.
graph: {role: [parent_role, ...]} expand(['manager']) -> ['manager', 'employee', 'user', ...]
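The expansion can be sketched as a breadth-first walk over the parent graph (illustrative; `expand` here is a free function, not the resolver method):

```python
def expand(graph: dict, roles: list) -> list:
    # Walk role -> parents transitively; "seen" also protects against cycles.
    seen: list = []
    queue = list(roles)
    while queue:
        role = queue.pop(0)
        if role in seen:
            continue
        seen.append(role)
        queue.extend(graph.get(role, []))
    return seen

graph = {"manager": ["employee"], "employee": ["user"]}
print(expand(graph, ["manager"]))  # ['manager', 'employee', 'user']
```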
HotReloader(guard, source, *, initial_load=False, poll_interval=5.0, backoff_min=2.0, backoff_max=30.0, jitter_ratio=0.15, thread_daemon=True)
¶
Unified, production-grade policy reloader.
Features
- ETag-first logic: call source.etag() and only load/apply when it changes.
- Error suppression with exponential backoff + jitter to avoid log/IO storms.
- Optional background polling loop with clean start/stop.
- Backwards-compatible one-shot API aliases: refresh_if_needed()/poll_once().
Notes
- If source.etag() returns None, we will attempt to load() and let the source decide.
- Guard.set_policy(policy) is called only after a successful load().
- This class is thread-safe for concurrent check_and_reload() calls.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `initial_load` | `bool` | Controls startup behavior. `False` (default): prime the ETag at construction time; the first check will no-op unless the policy changes (backwards-compatible with previous versions). `True`: do not prime the ETag; the first check will load the current policy. | `False` |
check_and_reload(*, force=False)
¶
Perform a single reload check (sync wrapper over the async core).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `force` | `bool` | If `True`, load/apply the policy regardless of ETag state. | `False` |
Returns:

| Type | Description |
|---|---|
| `bool` | `True` if a new policy was loaded and applied; otherwise `False`. |
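The ETag-first flow reduces to a few lines. `ReloadSketch` and `FakeSource` are hypothetical stand-ins; unlike the real class (which primes the ETag at construction when `initial_load=False`), this sketch always loads on the first check:

```python
class ReloadSketch:
    def __init__(self, source, apply_policy):
        self.source = source
        self.apply_policy = apply_policy   # Guard.set_policy in the real reloader
        self._etag = None

    def check_and_reload(self, *, force: bool = False) -> bool:
        etag = self.source.etag()
        if not force and etag is not None and etag == self._etag:
            return False                   # unchanged: no-op
        policy = self.source.load()        # load first; apply only on success
        self.apply_policy(policy)
        self._etag = etag
        return True

class FakeSource:
    # Hypothetical PolicySource stand-in exposing etag()/load() as documented.
    def etag(self):
        return "v1"
    def load(self):
        return {"rules": []}

applied = []
reloader = ReloadSketch(FakeSource(), applied.append)
print(reloader.check_and_reload(), reloader.check_and_reload())  # True False
```

Note that if `load()` raises, `_etag` is never updated, so the next check retries; the real reloader adds backoff and jitter around exactly this retry.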
check_and_reload_async(*, force=False)
async
¶
Async-aware reload check
- supports sync/async PolicySource.etag()/load() via _maybe_await
- never holds the thread lock while awaiting
start(interval=None, *, initial_load=None, force_initial=False)
¶
Start the background polling thread.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `interval` | `float \| None` | seconds between checks; if `None`, uses `self.poll_interval` (or 5.0 fallback). | `None` |
| `initial_load` | `bool \| None` | override the constructor's `initial_load` just for this `start()`. If `True`, perform a synchronous load/check before starting the thread. If `False`, skip any initial load. If `None`, inherit the constructor setting. | `None` |
| `force_initial` | `bool` | if `True` and an initial load is requested, bypass the ETag check for that initial load (equivalent to `check_and_reload(force=True)`). | `False` |
stop(timeout=1.0)
¶
Signal the polling thread to stop and optionally wait for it.
ComputedUserset(relation)
dataclass
¶
Follow another relation on the SAME object.
InMemoryRelationshipStore()
¶
Minimal tuple store with indexes by (resource, relation) and (subject, relation). Suitable for tests/dev. For production, implement the same interface on top of a DB.
LocalRelationshipChecker(store, *, rules=None, caveat_registry=None, max_depth=8, max_nodes=10000, deadline_ms=50)
¶
Bases: RelationshipChecker
In-process ReBAC implementation based on a userset-rewrite graph
- primitives: union (list), This, ComputedUserset, TupleToUserset
- safety limits: max_depth, max_nodes, deadline_ms
- conditional tuples via a caveat registry (predicate by name)
This()
dataclass
¶
Direct relation: subject --relation--> resource (aka 'this').
TupleToUserset(tupleset, computed_userset)
dataclass
¶
Traverse an object->object edge first (tupleset) and then evaluate a relation ('computed_userset') on the TARGET object.
OpenFGAChecker(config, *, client=None, async_client=None)
¶
Bases: RelationshipChecker
ReBAC provider backed by the OpenFGA HTTP API.
- Uses /stores/{store_id}/check and /stores/{store_id}/batch-check.
- For conditions, forwards `context` (OpenFGA merges persisted and request contexts).
- If both clients are provided, AsyncClient takes precedence (methods return awaitables).
batch_check(triples, *, context=None, authorization_model_id=None)
¶
Check multiple (subject, relation, resource) triples via OpenFGA
/batch-check — a single HTTP round-trip for all triples.
Uses correlation_id per check to reassemble results in input order,
since the OpenFGA API does not guarantee response ordering.
On any HTTP error all results resolve to False (fail-closed).
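The correlation-id reassembly and the fail-closed behaviour can be sketched as follows; `reassemble` and `send_batch` are hypothetical names for the transport callable, not the OpenFGA SDK:

```python
import uuid

def reassemble(triples, send_batch):
    # One correlation_id per item lets us restore input order from an
    # unordered batch response.
    items = [{"correlation_id": uuid.uuid4().hex, "triple": t} for t in triples]
    try:
        response = send_batch(items)       # hypothetical HTTP round-trip
    except Exception:
        return [False] * len(triples)      # fail-closed on any HTTP error
    by_id = {r["correlation_id"]: r["allowed"] for r in response}
    return [by_id.get(i["correlation_id"], False) for i in items]

def fake_send(items):
    # Simulates a server that answers out of order and allows only "viewer".
    return [
        {"correlation_id": i["correlation_id"], "allowed": i["triple"][1] == "viewer"}
        for i in reversed(items)
    ]

print(reassemble([("u1", "viewer", "doc:1"), ("u1", "editor", "doc:1")], fake_send))
# [True, False] — input order restored despite the reversed response
```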
OpenFGAConfig(api_url, store_id, authorization_model_id=None, api_token=None, timeout_seconds=2.0)
dataclass
¶
Minimal configuration for OpenFGA HTTP client.
SpiceDBChecker(config, *, async_mode=False)
¶
Bases: RelationshipChecker
ReBAC provider backed by the SpiceDB/Authzed gRPC API.
- Single checks use `CheckPermission`.
- Batch checks use `BulkCheckPermissions` (one gRPC call for N triples) in async mode; falls back to sequential sync calls otherwise.
- Consistency: ZedToken (`at_least_as_fresh`) or `fully_consistent`.
- Caveats: pass `context` as `google.protobuf.Struct`.
batch_check(triples, *, context=None, zed_token=None)
¶
Check multiple (subject, relation, resource) triples in one call.
Async mode uses BulkCheckPermissions when available (authzed ≥ 0.9)
— a single gRPC round-trip for all triples, preserving order. Falls
back to concurrent CheckPermission calls when the bulk RPC is absent.
Sync mode falls back to sequential CheckPermission calls (the
SpiceDB sync gRPC client does not expose a bulk endpoint).
On any RPC error the affected item resolves to False (fail-closed).
SpiceDBConfig(endpoint, token=None, insecure=False, prefer_fully_consistent=False, timeout_seconds=2.0)
dataclass
¶
Minimal configuration for the SpiceDB/Authzed gRPC client.
FilePolicySource(path, *, validate_schema=False, include_mtime_in_etag=False, chunk_size=512 * 1024)
¶
Bases: PolicySource
Policy source backed by a local JSON file.
ETag semantics
- By default, ETag = SHA-256 of file content.
- If include_mtime_in_etag=True, the ETag also includes mtime (ns), so a simple "touch" (metadata-only change) will trigger a reload.
The class caches the last SHA by (size, mtime_ns) to avoid unnecessary hashing.
atomic_write(path, data, *, encoding='utf-8')
¶
Write data atomically to path using a temp file + os.replace().
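A portable sketch of the temp-file + `os.replace()` pattern (illustrative; `atomic_write_sketch` is a hypothetical name mirroring the documented `atomic_write(path, data, *, encoding='utf-8')` signature):

```python
import os
import tempfile

def atomic_write_sketch(path: str, data: str, *, encoding: str = "utf-8") -> None:
    # The temp file must live in the same directory so os.replace() stays
    # a same-filesystem rename — atomic on both POSIX and Windows.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w", encoding=encoding) as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())     # ensure bytes hit disk before the rename
        os.replace(tmp, path)        # readers see either the old or the new file
    except BaseException:
        os.unlink(tmp)               # clean up the orphaned temp file
        raise
```

A concurrent reader (such as FilePolicySource computing an ETag) can therefore never observe a half-written policy file.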
S3PolicySource(url, *, client=None, session=None, config=None, client_extra=None, validate_schema=True, change_detector='etag', prefer_checksum='sha256')
¶
Bases: PolicySource
Policy source backed by Amazon S3.
Change detection strategies (choose one via change_detector):
- "etag" : HeadObject ETag (default).
- "version_id" : HeadObject VersionId (requires bucket versioning).
- "checksum" : GetObjectAttributes(..., ObjectAttributes=['Checksum']) if available.
Networking defaults are production-friendly (timeouts + retries) and can be overridden via a custom botocore Config or client parameters.
HTTPPolicySource(url, *, headers=None, validate_schema=False, verify_ssl=True, timeout=5.0, allow_redirects=True, allowed_schemes=('http', 'https'), block_private_ips=False)
¶
Bases: PolicySource
HTTP policy source using requests with ETag support.
Security parameters¶
verify_ssl : bool
Passed as verify to requests.get. Defaults to True
(certificate verification enabled). Set to False only in
development environments where you control the server.
timeout : float
Request timeout in seconds. Defaults to 5.0.
allow_redirects : bool
Passed as allow_redirects to requests.get. Defaults to
True. Set to False to prevent open-redirect abuse.
allowed_schemes : tuple[str, ...]
URL schemes that are permitted. Defaults to ("http", "https").
To restrict to HTTPS only, pass ("https",).
block_private_ips : bool
When True, raises ValueError if the URL's host is a numeric
IP address in a private, loopback, or link-local range (SSRF guard).
Hostname literals (e.g. "localhost") are not blocked by this
flag because they require DNS resolution; use network-level controls
for hostname-based SSRF protection. Defaults to False to preserve
backward compatibility.
Extra: rbacx[http]
RbacxMiddleware(app, *, guard, mode='enforce', build_env=None, add_headers=False)
¶
Framework-agnostic ASGI middleware.
Modes
- "inject": only injects guard into scope.
- "enforce": evaluates access for HTTP requests when build_env is provided.
Security
- Does not leak denial reasons in the response body.
- If `add_headers=True`, attaches `X-RBACX-*` headers on deny.
require_access(guard, build_env, *, add_headers=False)
¶
Decorator for Flask view functions to enforce access.
AsyncRbacxDjangoMiddleware(get_response)
¶
Async-capable variant of :class:RbacxDjangoMiddleware for Django 4.1+
ASGI applications.
Inject a Guard instance onto each Django request as request.rbacx_guard
without blocking the event loop.
Config
- `settings.RBACX_GUARD_FACTORY`: dotted path to a zero-arg callable returning a `Guard`.
Django detects async middleware via the _is_coroutine marker attribute
(set in __init__ when get_response is itself a coroutine function)
and the async_capable / sync_capable flags.
__call__(request)
async
¶
Attach the guard to the request, then await the next middleware.
RbacxDjangoMiddleware(get_response)
¶
Inject a Guard instance onto each Django request as request.rbacx_guard.
Config
- settings.RBACX_GUARD_FACTORY: dotted path to a zero-arg callable returning a Guard.
Notes
- Middleware init(get_response) runs once at startup; guard is created once.
- call(request) runs per-request; we attach the same guard to each request.
AsyncTraceIdMiddleware(get_response)
¶
Async-capable variant of :class:TraceIdMiddleware for Django 4.1+ ASGI.
Injects a trace / request-id into the logging context for the duration of
the request and echoes it back in the X-Request-ID response header.
Accepts X-Request-ID and traceparent (W3C Trace Context) request
headers, generating a UUID when neither is present.
RBACXMiddleware(app, *, guard, build_env, add_headers=False)
¶
Bases: AbstractMiddleware
Litestar middleware that checks access using RBACX Guard.
- Prefers `litestar.middleware.ASGIMiddleware` (Litestar >= 2.15).
- Falls back to `litestar.middleware.AbstractMiddleware` when needed.
- Uses `Guard.evaluate_async`.
Decision explanation / trace¶
Pass explain=True to any evaluation method to get a per-rule evaluation log
attached to the returned Decision.
d = guard.evaluate_sync(subject, action, resource, context, explain=True)
for entry in d.trace:
status = "matched" if entry.matched else f"skipped ({entry.skip_reason})"
print(f" rule {entry.rule_id!r} [{entry.effect}] → {status}")
When explain=False (the default) Decision.trace is None — there is no
overhead on the hot path.
RuleTrace fields
| Field | Type | Description |
|---|---|---|
| `rule_id` | `str` | The `id` field of the rule as declared in the policy |
| `effect` | `str` | Declared effect: `"permit"` or `"deny"` |
| `matched` | `bool` | `True` when the rule fully matched; `False` when skipped |
| `skip_reason` | `str \| None` | Why the rule was skipped, or `None` when `matched=True` |
Possible skip_reason values: "action_mismatch", "resource_mismatch",
"condition_mismatch", "condition_type_mismatch", "condition_depth_exceeded".
Algorithm-specific trace behaviour
- `deny-overrides` — trace includes every rule up to and including the first matching deny (the loop breaks there). When only permits fire, all rules are present.
- `permit-overrides` — trace up to and including the first matching permit.
- `first-applicable` — trace up to and including the first match; subsequent rules are absent.
- No match — every rule appears in the trace with `matched=False`.
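The deny-overrides early exit can be sketched with a hypothetical helper (not the engine's combiner); `matches` stands in for the full action/resource/condition matching:

```python
def deny_overrides(rules, matches):
    trace = []
    for rule in rules:
        matched = matches(rule)
        trace.append({"rule_id": rule["id"], "effect": rule["effect"], "matched": matched})
        if matched and rule["effect"] == "deny":
            return "deny", trace       # loop breaks at the first matching deny
    allowed = any(t["matched"] for t in trace)
    return ("permit" if allowed else "deny"), trace

rules = [
    {"id": "r1", "effect": "permit"},
    {"id": "r2", "effect": "deny"},
    {"id": "r3", "effect": "permit"},
]
effect, trace = deny_overrides(rules, lambda r: True)
print(effect, [t["rule_id"] for t in trace])  # deny ['r1', 'r2'] — r3 is never evaluated
```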
explain=True is supported on all four evaluation methods:
# Single request
d = guard.evaluate_sync(..., explain=True)
d = await guard.evaluate_async(..., explain=True)
# Batch — explain applies to every request in the batch
decisions = guard.evaluate_batch_sync([...], explain=True)
decisions = await guard.evaluate_batch_async([...], explain=True)
RuleTrace is importable directly from the root package:
from rbacx import RuleTrace
Batch evaluation¶
Guard exposes two methods for evaluating multiple access requests in a single
call — useful for populating UIs that need to know which buttons/tabs/actions
to show for a given user.
from rbacx import Guard, Subject, Action, Resource, Context
guard = Guard(policy)
subject = Subject(id="u1", roles=["editor"])
resource = Resource(type="document", id="doc-42")
ctx = Context(attrs={"mfa": True})
# Async (preferred in ASGI applications)
decisions = await guard.evaluate_batch_async([
(subject, Action("read"), resource, ctx),
(subject, Action("write"), resource, ctx),
(subject, Action("delete"), resource, ctx),
])
# Sync (works everywhere, including inside a running event loop)
decisions = guard.evaluate_batch_sync([
(subject, Action("read"), resource, ctx),
(subject, Action("write"), resource, ctx),
(subject, Action("delete"), resource, ctx),
])
for action_name, decision in zip(["read", "write", "delete"], decisions):
print(action_name, "→", "allow" if decision.allowed else "deny")
Signature
async def evaluate_batch_async(
self,
requests: Sequence[tuple[Subject, Action, Resource, Context | None]],
*,
explain: bool = False,
timeout: float | None = None,
) -> list[Decision]: ...
def evaluate_batch_sync(
self,
requests: Sequence[tuple[Subject, Action, Resource, Context | None]],
*,
explain: bool = False,
timeout: float | None = None,
) -> list[Decision]: ...
Guarantees
- Results are returned in the same order as the input sequence.
- Requests are evaluated concurrently via `asyncio.gather` — wall-clock time grows with the slowest single request rather than the total count.
- An empty input list returns `[]` immediately without any evaluation.
- `timeout` (seconds) bounds the total wall-clock time for the batch; `asyncio.TimeoutError` is raised if the deadline is exceeded. `None` (default) means no deadline.
- `Context` may be `None` for any individual request.
- All DI hooks (metrics, logger, obligation checker, role resolver, cache) are invoked per request, exactly as with `evaluate_async`/`evaluate_sync`.
- If any individual request raises an exception the entire batch propagates that exception (fail-fast semantics, consistent with `asyncio.gather`).
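The ordering, concurrency, and timeout guarantees follow directly from `asyncio.gather` plus `asyncio.wait_for`; a self-contained sketch where the `check` coroutine stands in for a single evaluation:

```python
import asyncio

async def check(name: str, delay: float) -> str:
    await asyncio.sleep(delay)           # stand-in for one policy evaluation
    return name

async def batch(requests, timeout=None):
    # gather() resolves in input order, regardless of completion order.
    gathered = asyncio.gather(*(check(n, d) for n, d in requests))
    if timeout is None:
        return await gathered
    return await asyncio.wait_for(gathered, timeout)

# "write" finishes first, but results come back in input order.
print(asyncio.run(batch([("read", 0.02), ("write", 0.01)])))  # ['read', 'write']

try:
    asyncio.run(batch([("slow", 0.5)], timeout=0.05))
except asyncio.TimeoutError:
    print("TimeoutError")                # whole batch fails when the deadline passes
```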
Decision object¶
Fields returned by Guard.evaluate*:
- `allowed: bool`
- `effect: "permit" | "deny"`
- `obligations: List[Dict[str, Any]]`
- `challenge: Optional[str]`
- `rule_id: Optional[str]`
- `policy_id: Optional[str]`
- `reason: Optional[str]`
- `trace: Optional[List[RuleTrace]]` — populated when `explain=True`; `None` by default
require_batch_access (FastAPI)¶
FastAPI dependency that evaluates multiple (action, resource_type) pairs in
one evaluate_batch_async call and returns a list[Decision].
from fastapi import Depends, FastAPI, Request

from rbacx import Subject
from rbacx.adapters.fastapi import require_batch_access

app = FastAPI()  # a configured Guard instance named `guard` is assumed, as in the sections above
def build_subject(request: Request) -> Subject:
return Subject(id="user", roles=[request.headers.get("X-Role", "viewer")])
@app.get("/ui-state")
async def ui_state(
decisions=Depends(
require_batch_access(
guard,
[("read", "document"), ("write", "document"), ("delete", "document")],
build_subject,
timeout=2.0,
)
)
):
return {
"can_read": decisions[0].allowed,
"can_write": decisions[1].allowed,
"can_delete": decisions[2].allowed,
}
AI Policy Authoring¶
rbacx AI Policy Authoring System.
Optional module — install with::
pip install rbacx[ai]
Provides AI-assisted policy generation, iterative refinement, and human-readable decision explanations using any OpenAI-compatible LLM.
Quick start::
from rbacx.ai import AIPolicy
ai = AIPolicy(api_key="sk-...", model="gpt-4o")
# Generate a policy from an OpenAPI schema
result = await ai.from_schema("openapi.json", context="SaaS B2B")
# Use the policy with Guard
from rbacx.core.engine import Guard
guard = Guard(result.dsl)
# Refine iteratively
result2 = await ai.refine_policy("deny delete for viewer role")
# Explain a specific decision
expl = await ai.explain_decision(
policy=result.dsl,
input={
"subject": {"id": "u1", "roles": ["viewer"]},
"action": "delete",
"resource": {"type": "doc", "id": "d1"},
},
)
print(expl.decision.allowed) # False
print(expl.human) # plain-English explanation
AIPolicy(api_key, model, *, base_url=None, timeout=60.0)
¶
AI Policy Authoring System — main entry point.
Creates one :class:~rbacx.ai._client.LLMClient instance that is
reused across all operations. After :meth:from_schema is called a
:class:~rbacx.ai._refinement.RefinementSession is created internally
and can be driven forward via :meth:refine_policy.
Supports any OpenAI-compatible provider through the base_url
parameter — standard OpenAI, OpenRouter, Ollama, Azure OpenAI, etc.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `api_key` | `str` | API key for the LLM provider. | required |
| `model` | `str` | model identifier, e.g. `"gpt-4o"`. | required |
| `base_url` | `str \| None` | optional base URL override. | `None` |
| `timeout` | `float` | HTTP request timeout in seconds passed to the SDK. | `60.0` |
Example::
ai = AIPolicy(api_key="sk-...", model="gpt-4o")
result = await ai.from_schema("openapi.json", context="SaaS B2B")
result2 = await ai.refine_policy(feedback="deny delete for viewers")
expl = await ai.explain_decision(
policy=result.dsl,
input={
"subject": {"id": "u1", "roles": ["viewer"]},
"action": "delete",
"resource": {"type": "doc", "id": "d1"},
},
)
explain_decision(policy, input)
async
¶
Explain a specific access decision using Guard + LLM.
The decision is evaluated deterministically by a minimal
:class:~rbacx.core.engine.Guard instance — the LLM is never asked
to decide allow/deny. The LLM only produces the human-readable
explanation of why.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `policy` | `dict[str, Any]` | valid rbacx policy dict. | required |
| `input` | `dict[str, Any]` | access-request dict: `{"subject": {"id": str, "roles": list[str], "attrs": dict}, "action": str, "resource": {"type": str, "id": str \| None, "attrs": dict}}` — `roles`, `attrs`, and `resource.id` are optional. | required |
Returns:

| Type | Description |
|---|---|
| `DecisionExplanation` | the authoritative `Decision` plus a plain-English explanation of why it was reached. |
Raises:

| Type | Description |
|---|---|
| `PolicyGenerationError` | `input` is malformed or missing required fields. |
from_schema(schema, *, context='', safe_mode=True, compile=False, explain=False, raw=False)
async
¶
Generate an rbacx policy from an API schema.
Resets any existing :class:~rbacx.ai._refinement.RefinementSession
and creates a new one seeded with the generated policy and the
conversation messages used during generation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `schema` | `Path \| str \| dict[str, Any]` | OpenAPI 3.x or 2.0 schema supplied as a file path, a raw string, or an already-parsed dict. | required |
| `context` | `str` | free-form domain description to guide generation, e.g. `"SaaS B2B"`. | `''` |
| `safe_mode` | `bool` | run the validate → retry on failure → lint pipeline. | `True` |
| `compile` | `bool` | compile the policy via the rbacx compiler and include the result in `PolicyResult.compiled`. | `False` |
| `explain` | `bool` | request per-rule human explanations (one extra LLM call). Result is in `PolicyResult.explanation`. | `False` |
| `raw` | `bool` | include the raw LLM output string in `PolicyResult.raw`. | `False` |
Returns:

| Type | Description |
|---|---|
| `PolicyResult` | the generated policy plus warnings and the optional compiled/explanation/raw payloads. |
Raises:

| Type | Description |
|---|---|
| `SchemaParseError` | unrecognised schema format or unreadable file. |
| `PolicyGenerationError` | JSON parse failure or empty LLM response. |
| `ValidationRetryError` | both validation attempts fail in the safe_mode pipeline. |
refine_policy(feedback, *, policy=None, compile=False)
async
¶
Refine a policy with natural-language feedback.
If policy is provided the current session is reset to that policy
as the new starting point before applying feedback. If policy
is None the existing session is continued.
Always runs the safe_mode validation pipeline internally.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `feedback` | `str` | natural-language refinement instruction, e.g. `"deny delete for viewers"`. | required |
| `policy` | `dict[str, Any] \| None` | optional policy dict to reset the session to before refining. If `None`, the existing session is continued. | `None` |
| `compile` | `bool` | compile the refined policy. Raises `PolicyGenerationError` if the compiler is unavailable. | `False` |
Returns:

| Type | Description |
|---|---|
| `PolicyResult` | the refined policy plus warnings and the optional compiled payload. |
Raises:

| Type | Description |
|---|---|
| `RuntimeError` | called before `from_schema` when no `policy` argument is provided. |
| `ValidationRetryError` | validation fails after retry. |
| `PolicyGenerationError` | JSON parse error or compiler unavailable. |
DecisionExplanation(decision, human)
dataclass
¶
The outcome of an explain_decision call.
The access decision is evaluated deterministically by a minimal
Guard instance — the LLM is never asked to decide allow/deny.
The LLM only produces the human-readable narrative of why the decision
was reached.
Attributes:

| Name | Type | Description |
|---|---|---|
| `decision` | `Decision` | the authoritative `Decision` evaluated by the Guard. |
| `human` | `str` | a plain-English explanation of why the decision was made, generated by the configured LLM. |
PolicyGenerationError(message, cause=None)
¶
Bases: Exception
Raised for general generation failures.
Examples of situations that trigger this exception:
- The LLM response is not parseable JSON.
- The LLM returned an empty response.
- `compile=True` was requested but the rbacx compiler is unavailable.
- `input_attrs` passed to `explain_decision` is missing required fields.
Attributes:

| Name | Type | Description |
|---|---|---|
| `cause` | `Exception \| None` | the original exception that triggered this error, if available. |
PolicyResult(dsl, warnings=list(), compiled=None, explanation=None, raw=None)
dataclass
¶
The outcome of a policy generation or refinement operation.
Attributes:

| Name | Type | Description |
|---|---|---|
| `dsl` | `dict[str, Any]` | the generated policy dict, ready to pass directly to `Guard`. |
| `warnings` | `list[Issue]` | lint issues reported by the rbacx linter when `safe_mode=True`. |
| `compiled` | `Any \| None` | the compiled policy returned by the rbacx compiler when `compile=True`. |
| `explanation` | `dict[str, str] \| None` | per-rule human-readable explanations keyed by rule id, present when `explain=True`. |
| `raw` | `str \| None` | the raw LLM output string when `raw=True`. |
SchemaParseError(message, format_hint=None)
¶
Bases: Exception
Raised when the input schema format cannot be recognised or parsed.
Attributes:

| Name | Type | Description |
|---|---|---|
| `format_hint` | `str \| None` | detected or expected format string for diagnostics. |
ValidationRetryError(message, raw, validation_errors)
¶
Bases: Exception
Raised when both validation attempts fail in the safe_mode pipeline.
The generator tried to fix the policy once after the initial validation failure; this exception means the second attempt also produced an invalid policy.
Attributes:

| Name | Type | Description |
|---|---|---|
| `raw` | `str` | raw LLM output from the last generation attempt. |
| `validation_errors` | `list[str]` | list of human-readable jsonschema error messages collected from the final failed validation. |
YAML policies¶
All built-in policy sources accept JSON and, with the optional rbacx[yaml] extra, YAML.
- File: detected by extension `.yaml`/`.yml`.
- HTTP: detected by `Content-Type` (e.g., `application/yaml`, `application/x-yaml`, `text/yaml`) or URL suffix.
- S3: detected by key suffix `.yaml`/`.yml`.
Internally YAML is parsed and validated against the same JSON Schema as JSON.