API Reference

Guard(policy, *, logger_sink=None, metrics=None, obligation_checker=None, role_resolver=None, relationship_checker=None, cache=None, cache_ttl=300, strict_types=False)

Policy evaluation engine.

Holds a policy or a policy set and evaluates access decisions.

Design
  • Single async core _evaluate_core_async (one source of truth).
  • Sync API wraps the async core; if a loop is already running, uses a class-level ThreadPoolExecutor (created lazily, shared across all Guard instances) to avoid the overhead of spawning a new thread pool on every call.
  • DI (resolver/obligations/metrics/logger) can be sync or async; both supported via maybe_await.
  • CPU-bound evaluation is offloaded to a thread via asyncio.to_thread.
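
The design points above can be illustrated with a minimal, standard-library-only sketch. It is not the Guard implementation: `maybe_await`, `evaluate_core`, `evaluate_sync`, and the `check` callable are hypothetical stand-ins used only to show how one async core can serve both sync and async dependencies, and how a sync wrapper can detect a running loop.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

# Hypothetical module-level executor: created lazily and then reused.
_executor: Optional[ThreadPoolExecutor] = None


async def maybe_await(value: Any) -> Any:
    """Await the value if it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


async def evaluate_core(check: Callable[[str], Any], user: str) -> bool:
    # `check` stands in for an injected dependency; it may be sync or async.
    return bool(await maybe_await(check(user)))


def evaluate_sync(check: Callable[[str], Any], user: str) -> bool:
    """Run the async core from sync code, with or without a running loop."""
    global _executor
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(evaluate_core(check, user))
    # A loop is already running: give the coroutine its own loop on a worker thread.
    if _executor is None:
        _executor = ThreadPoolExecutor(max_workers=4)
    return _executor.submit(asyncio.run, evaluate_core(check, user)).result()
```

Note that in the second branch the calling thread blocks until the worker finishes, which is the usual trade-off of calling a sync API from inside an async framework.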

clear_cache()

Clear the decision cache if configured.

This is safe to call at any time. Errors are swallowed to avoid interfering with decision flow.

evaluate_async(subject, action, resource, context=None, *, explain=False) async

True async API for ASGI frameworks.

Parameters:

  • explain (bool, default False): when True, populate Decision.trace with a per-rule evaluation log. Has no effect on the decision itself.

evaluate_batch_async(requests, *, explain=False, timeout=None) async

Evaluate multiple access requests concurrently, preserving order.

Runs all requests concurrently via asyncio.gather. The returned list has exactly one Decision per input tuple, in the same order. If any individual evaluation raises an exception, the whole batch propagates that exception (fail-fast semantics).

Parameters:

  • requests (Sequence[tuple[Subject, Action, Resource, Context | None]], required): sequence of (subject, action, resource, context) tuples. context may be None.
  • explain (bool, default False): when True, populate Decision.trace on every returned Decision.
  • timeout (float | None, default None): optional wall-clock deadline in seconds for the entire batch. When exceeded, asyncio.TimeoutError is raised and no partial results are returned. None means no deadline. Useful when individual checks may call a slow ReBAC provider (e.g. SpiceDB, OpenFGA) and you need a bound on total latency.

Returns:

  • list[Decision]: list of Decision objects, one per request, preserving input order.

Raises:

  • TimeoutError: when timeout is set and the batch does not complete within the allotted time.

Example:

decisions = await guard.evaluate_batch_async([
    (subject, Action("read"),   resource1, ctx),
    (subject, Action("write"),  resource1, ctx),
    (subject, Action("delete"), resource2, None),
], timeout=2.0)

evaluate_batch_sync(requests, *, explain=False, timeout=None)

Synchronous wrapper for evaluate_batch_async.

Uses the same loop-detection strategy as evaluate_sync: runs directly via asyncio.run when no event loop is active, or submits to the class-level ThreadPoolExecutor when called from within a running loop.

Parameters:

  • requests (Sequence[tuple[Subject, Action, Resource, Context | None]], required): sequence of (subject, action, resource, context) tuples. context may be None.
  • explain (bool, default False): when True, populate Decision.trace on every returned Decision.
  • timeout (float | None, default None): optional wall-clock deadline in seconds passed through to evaluate_batch_async. None means no deadline.

Returns:

  • list[Decision]: list of Decision objects, one per request, preserving input order.

Raises:

  • TimeoutError: when timeout is set and the batch does not complete within the allotted time.

evaluate_sync(subject, action, resource, context=None, *, explain=False)

Synchronous wrapper for the async core.

  • If no running loop in this thread: use asyncio.run() directly.
  • If a loop is running (e.g. called from sync code inside an async framework): submit to the class-level ThreadPoolExecutor so the worker thread gets its own event loop via asyncio.run(). The executor is created lazily and reused across calls to avoid the overhead of spawning a new thread pool on every invocation.

Parameters:

  • explain (bool, default False): when True, populate Decision.trace with a per-rule evaluation log. Has no effect on the decision itself.

set_policy(policy)

Replace the active policy or policy set.

Thread-safe: acquires _policy_lock so that concurrent readers in _decide_async always see a consistent triple of (policy, policy_etag, _compiled).

update_policy(policy)

Alias kept for backward-compatibility.



ConditionDepthError

Bases: Exception

Raised when a condition tree exceeds the maximum nesting depth.

This is a security guard against DoS via deeply nested and/or/not chains in policies loaded from untrusted external sources (HTTP, S3, etc.). The engine treats this as a failed condition (fail-closed) and records reason = "condition_depth_exceeded" in the decision.

ConditionTypeError

Bases: Exception

Raised when a condition compares incompatible types.

eval_condition(cond, env, _depth=0)

Evaluate condition dict safely.

Raises ConditionTypeError on type mismatches and ConditionDepthError when the and/or/not nesting exceeds MAX_CONDITION_DEPTH (default 50). The depth guard prevents a maliciously crafted policy loaded from an external source (HTTP, S3, …) from triggering a Python RecursionError and crashing the process.

_depth is an internal parameter — callers must not pass it.
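
The depth guard can be sketched as follows. This is an illustrative evaluator, not the rbacx one: `eval_cond`, `DepthError`, `MAX_DEPTH`, and the `["key", value]` form of the `==` operator are assumptions chosen to keep the example small.

```python
from typing import Any, Mapping

MAX_DEPTH = 50  # mirrors the documented default; the name is illustrative


class DepthError(Exception):
    """Stand-in for ConditionDepthError in this sketch."""


def eval_cond(cond: Mapping[str, Any], env: Mapping[str, Any], depth: int = 0) -> bool:
    # Refuse to descend past the limit instead of risking a RecursionError.
    if depth > MAX_DEPTH:
        raise DepthError("condition nesting too deep")
    if "and" in cond:
        return all(eval_cond(c, env, depth + 1) for c in cond["and"])
    if "or" in cond:
        return any(eval_cond(c, env, depth + 1) for c in cond["or"])
    if "not" in cond:
        return not eval_cond(cond["not"], env, depth + 1)
    if "==" in cond:
        # Hypothetical leaf operator: compare env[key] with a literal.
        key, expected = cond["=="]
        return env.get(key) == expected
    raise ValueError(f"unknown operator in {cond!r}")
```

A caller that wants fail-closed behaviour would catch the depth error and treat the condition as not satisfied.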

resolve(token, env)

Resolve a token; supports {"attr": "a.b.c"} lookups in env.
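
A dotted-path lookup of this kind might look like the sketch below. `resolve_token` is a hypothetical name, and returning None for a missing path is an assumption; the library's behaviour for missing attributes is not stated here.

```python
from collections.abc import Mapping
from typing import Any


def resolve_token(token: Any, env: Mapping) -> Any:
    """Return literals unchanged; walk {"attr": "a.b.c"} through nested mappings."""
    if isinstance(token, Mapping) and "attr" in token:
        current: Any = env
        for part in str(token["attr"]).split("."):
            if not isinstance(current, Mapping) or part not in current:
                return None  # assumption: a missing path resolves to None
            current = current[part]
        return current
    return token
```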


decide(policyset, env)

Evaluate a policy set with combining algorithm over its child policies.


RoleResolver

Bases: Protocol

expand(roles)

Return roles including inherited/derived ones.



DecisionLogger(*, sample_rate=1.0, redactions=None, logger_name='rbacx.audit', as_json=False, level=logging.INFO, redact_in_place=False, use_default_redactions=False, smart_sampling=False, category_sampling_rates=None, max_env_bytes=None)

Bases: DecisionLogSink

Minimal, framework-agnostic audit logger for PDP decisions.

Backwards-compatible defaults
  • No redactions are applied unless explicitly configured.
  • sample_rate controls probabilistic logging: 0.0 → drop all, 1.0 → log all.
  • Smart sampling is disabled by default.
  • No env size limit by default.
Opt-in features
  • use_default_redactions=True enables DEFAULT_REDACTIONS when redactions is not provided.
  • smart_sampling=True enables category-aware sampling (deny and permit-with-obligations can be forced to 1.0).
  • max_env_bytes truncates the (redacted) env if the serialized JSON exceeds the threshold.


BasicObligationChecker

Bases: ObligationChecker

Validate common obligations carried by a decision.

Design goals (documented for contributors):
  • Fail-closed semantics are preserved for legacy callers:
      • If the legacy string key decision is present and not equal to "permit" -> (False, None).
      • If the legacy key is absent, derive the effect from effect/allowed; any non-"permit" -> (False, None).
  • Support obligations targeting the current effect (on: "permit" | "deny"). This allows, for example, an explicit http_challenge on deny to still surface a challenge.
  • Do not mutate the incoming decision; return an (ok, challenge) tuple for the Guard to consume.
  • An unknown obligation type is ignored (treated as advice/no-op).

Supported type values:
  • require_mfa -> challenge "mfa"
  • require_level (attrs.min) -> "step_up"
  • http_challenge (attrs.scheme in Basic/Bearer/Digest) -> "http_basic" / "http_bearer" / "http_digest"; else "http_auth"
  • require_consent (attrs.key or any consent) -> "consent"
  • require_terms_accept -> "tos"
  • require_captcha -> "captcha"
  • require_reauth (attrs.max_age vs context.reauth_age_seconds) -> "reauth"
  • require_age_verified -> "age_verification"

check(decision, context)

Check obligations attached to a raw decision.

Parameters

  • decision: mapping-like (dict). Legacy callers may pass the string key decision ("permit" | "deny"); the modern shape may include effect/allowed. obligations is a list of mappings.
  • context: object whose .attrs is a dict (or context itself is a dict).

Returns

  • (ok, challenge): a bool and an optional machine-readable challenge string.

ObligationCheckResult(ok, challenge=None, reason=None) dataclass

Small DTO kept for backwards-compatibility if needed by contributors.


AbstractCache

Bases: Protocol

Minimal cache interface for dependency inversion.

Implementations MUST be safe to call from multiple threads in-process or be clearly documented otherwise.

get should return None if a key doesn't exist or is expired. set may accept an optional TTL in seconds.

DefaultInMemoryCache(maxsize=2048)

Bases: AbstractCache

Thread-safe in-memory LRU cache with optional per-key TTL.

Notes

  • Uses time.monotonic() for TTL to avoid wall clock changes.
  • Designed for single process scenarios. For multi-process/multi-host, inject a distributed cache implementation that conforms to AbstractCache.
  • Values are stored as-is; callers are responsible for storing immutable or copy-safe data if necessary.
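
The notes above describe a fairly standard construction. The following is a minimal sketch of a lock-protected LRU with per-key TTL based on time.monotonic(); `TinyLRUCache` is a hypothetical class written for illustration and is not the DefaultInMemoryCache source.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Hashable, Optional


class TinyLRUCache:
    def __init__(self, maxsize: int = 2048) -> None:
        self._maxsize = maxsize
        self._data: "OrderedDict[Hashable, tuple]" = OrderedDict()
        self._lock = threading.Lock()

    def get(self, key: Hashable) -> Optional[Any]:
        with self._lock:
            item = self._data.get(key)
            if item is None:
                return None
            value, expires_at = item
            # Monotonic time is immune to wall-clock adjustments.
            if expires_at is not None and time.monotonic() >= expires_at:
                del self._data[key]
                return None
            self._data.move_to_end(key)  # mark as most recently used
            return value

    def set(self, key: Hashable, value: Any, ttl: Optional[float] = None) -> None:
        expires_at = None if ttl is None else time.monotonic() + ttl
        with self._lock:
            self._data[key] = (value, expires_at)
            self._data.move_to_end(key)
            while len(self._data) > self._maxsize:
                self._data.popitem(last=False)  # evict least recently used
```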

StaticRoleResolver(graph=None)

Bases: RoleResolver

Simple in-memory role resolver with inheritance.

graph: {role: [parent_role, ...]}

expand(['manager']) -> ['manager', 'employee', 'user', ...]
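
Role expansion over such a graph can be sketched as a breadth-first walk. `expand_roles` is a hypothetical helper, and the output ordering (input roles first, then parents in discovery order) is an assumption rather than a documented guarantee.

```python
from collections import deque
from typing import Dict, Iterable, List


def expand_roles(roles: Iterable[str], graph: Dict[str, List[str]]) -> List[str]:
    """Return the given roles plus every role reachable through parent links."""
    result: List[str] = []
    seen = set()
    queue = deque(roles)
    while queue:
        role = queue.popleft()
        if role in seen:
            continue  # also protects against cycles in the graph
        seen.add(role)
        result.append(role)
        queue.extend(graph.get(role, []))
    return result
```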


HotReloader(guard, source, *, initial_load=False, poll_interval=5.0, backoff_min=2.0, backoff_max=30.0, jitter_ratio=0.15, thread_daemon=True)

Unified, production-grade policy reloader.

Features
  • ETag-first logic: call source.etag() and only load/apply when it changes.
  • Error suppression with exponential backoff + jitter to avoid log/IO storms.
  • Optional background polling loop with clean start/stop.
  • Backwards-compatible one-shot API aliases: refresh_if_needed()/poll_once().
Notes
  • If source.etag() returns None, we will attempt to load() and let the source decide.
  • Guard.set_policy(policy) is called only after a successful load().
  • This class is thread-safe for concurrent check_and_reload() calls.
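
The ETag-first logic can be sketched in a few lines. `EtagReloader` and its `apply` callback are hypothetical; the sketch omits backoff, jitter, locking, and the polling thread, and it behaves like the unprimed case (the first check loads the policy).

```python
from typing import Any, Callable, Optional


class EtagReloader:
    def __init__(self, source: Any, apply: Callable[[Any], None]) -> None:
        self._source = source      # object exposing etag() and load()
        self._apply = apply        # e.g. a guard's set_policy
        self._last_etag: Optional[str] = None

    def check_and_reload(self, *, force: bool = False) -> bool:
        etag = self._source.etag()
        # Skip the load when the marker is known and unchanged.
        if not force and etag is not None and etag == self._last_etag:
            return False
        policy = self._source.load()
        self._apply(policy)        # only reached after a successful load
        self._last_etag = etag
        return True
```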

Parameters:

  • initial_load (bool, default False): controls startup behavior.
      • False (default): prime the ETag at construction time; the first check is a no-op unless the policy changes. (Backwards-compatible with previous versions.)
      • True: do not prime the ETag; the first check loads the current policy.

check_and_reload(*, force=False)

Perform a single reload check (sync wrapper over the async core).

Parameters:

  • force (bool, default False): if True, load/apply the policy regardless of ETag state.

Returns:

  • bool: True if a new policy was loaded and applied; otherwise False.

check_and_reload_async(*, force=False) async

Async-aware reload check:
  • supports sync or async PolicySource.etag()/load() via _maybe_await
  • never holds the thread lock while awaiting

start(interval=None, *, initial_load=None, force_initial=False)

Start the background polling thread.

Parameters:

  • interval (float | None, default None): seconds between checks; if None, uses self.poll_interval (or a 5.0 fallback).
  • initial_load (bool | None, default None): overrides the constructor's initial_load just for this start(). If True, perform a synchronous load/check before starting the thread. If False, skip any initial load. If None, inherit the constructor setting.
  • force_initial (bool, default False): if True and an initial load is requested, bypass the ETag check for that initial load (equivalent to check_and_reload(force=True)).

stop(timeout=1.0)

Signal the polling thread to stop and optionally wait for it.


ComputedUserset(relation) dataclass

Follow another relation on the SAME object.

InMemoryRelationshipStore()

Minimal tuple store with indexes by (resource, relation) and (subject, relation). Suitable for tests/dev. For production, implement the same interface on top of a DB.

LocalRelationshipChecker(store, *, rules=None, caveat_registry=None, max_depth=8, max_nodes=10000, deadline_ms=50)

Bases: RelationshipChecker

In-process ReBAC implementation based on a userset-rewrite graph
  • primitives: union (list), This, ComputedUserset, TupleToUserset
  • safety limits: max_depth, max_nodes, deadline_ms
  • conditional tuples via a caveat registry (predicate by name)

This() dataclass

Direct relation: subject --relation--> resource (aka 'this').

TupleToUserset(tupleset, computed_userset) dataclass

Traverse an object->object edge first (tupleset) and then evaluate a relation ('computed_userset') on the TARGET object.



OpenFGAChecker(config, *, client=None, async_client=None)

Bases: RelationshipChecker

ReBAC provider backed by OpenFGA HTTP API.

  • Uses /stores/{store_id}/check and /stores/{store_id}/batch-check.
  • For conditions, forwards context (OpenFGA merges persisted and request contexts).
  • If both clients are provided, AsyncClient takes precedence (methods return awaitables).

batch_check(triples, *, context=None, authorization_model_id=None)

Check multiple (subject, relation, resource) triples via OpenFGA /batch-check — a single HTTP round-trip for all triples.

Uses correlation_id per check to reassemble results in input order, since the OpenFGA API does not guarantee response ordering. On any HTTP error all results resolve to False (fail-closed).
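
The reassembly step can be sketched without any HTTP client. The payload shape below (a `tuple_key` with `user`/`relation`/`object`, and a response mapping keyed by correlation id with an `allowed` flag) is an assumption about the OpenFGA batch-check format; `build_checks` and `reassemble` are hypothetical helpers.

```python
import uuid
from typing import Dict, List, Sequence, Tuple

Triple = Tuple[str, str, str]  # (subject, relation, resource)


def build_checks(triples: Sequence[Triple]) -> Tuple[List[dict], List[str]]:
    """Attach a fresh correlation id to each triple and remember input order."""
    checks: List[dict] = []
    order: List[str] = []
    for subject, relation, resource in triples:
        cid = str(uuid.uuid4())
        order.append(cid)
        checks.append({
            "tuple_key": {"user": subject, "relation": relation, "object": resource},
            "correlation_id": cid,
        })
    return checks, order


def reassemble(order: Sequence[str], result: Dict[str, dict]) -> List[bool]:
    """Map an unordered result back to input order; missing ids become False."""
    return [bool(result.get(cid, {}).get("allowed", False)) for cid in order]
```

Treating a missing correlation id as False keeps the sketch fail-closed, in line with the behaviour described above for HTTP errors.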

OpenFGAConfig(api_url, store_id, authorization_model_id=None, api_token=None, timeout_seconds=2.0) dataclass

Minimal configuration for OpenFGA HTTP client.


SpiceDBChecker(config, *, async_mode=False)

Bases: RelationshipChecker

ReBAC provider backed by the SpiceDB/Authzed gRPC API.

  • Single checks use CheckPermission.
  • Batch checks use BulkCheckPermissions (one gRPC call for N triples) in async mode; falls back to sequential sync calls otherwise.
  • Consistency: ZedToken (at_least_as_fresh) or fully_consistent.
  • Caveats: pass context as google.protobuf.Struct.

batch_check(triples, *, context=None, zed_token=None)

Check multiple (subject, relation, resource) triples in one call.

Async mode uses BulkCheckPermissions when available (authzed ≥ 0.9) — a single gRPC round-trip for all triples, preserving order. Falls back to concurrent CheckPermission calls when the bulk RPC is absent.

Sync mode falls back to sequential CheckPermission calls (the SpiceDB sync gRPC client does not expose a bulk endpoint).

On any RPC error the affected item resolves to False (fail-closed).

SpiceDBConfig(endpoint, token=None, insecure=False, prefer_fully_consistent=False, timeout_seconds=2.0) dataclass

Minimal configuration for the SpiceDB/Authzed gRPC client.


FilePolicySource(path, *, validate_schema=False, include_mtime_in_etag=False, chunk_size=512 * 1024)

Bases: PolicySource

Policy source backed by a local JSON file.

ETag semantics
  • By default, ETag = SHA-256 of file content.
  • If include_mtime_in_etag=True, the ETag also includes mtime (ns), so a simple "touch" (metadata-only change) will trigger a reload.

The class caches the last SHA by (size, mtime_ns) to avoid unnecessary hashing.
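
A content-hash ETag of this kind can be sketched as follows. `file_etag` is a hypothetical function; the `sha:mtime_ns` string format used when mtime is included is an assumption, and the (size, mtime_ns) cache is left out for brevity.

```python
import hashlib
import os
import tempfile


def file_etag(path: str, *, chunk_size: int = 512 * 1024, include_mtime: bool = False) -> str:
    """SHA-256 of the file content, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    etag = digest.hexdigest()
    if include_mtime:
        # Metadata-only changes (e.g. touch) now alter the marker too.
        etag = f"{etag}:{os.stat(path).st_mtime_ns}"
    return etag


# Usage on a throwaway file.
with tempfile.TemporaryDirectory() as tmp:
    policy_path = os.path.join(tmp, "policy.json")
    with open(policy_path, "wb") as fh:
        fh.write(b'{"rules": []}')
    etag = file_etag(policy_path)
    etag_with_mtime = file_etag(policy_path, include_mtime=True)
```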

atomic_write(path, data, *, encoding='utf-8')

Write data atomically to path using a temp file + os.replace().
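
The temp-file-plus-replace pattern can be sketched like this. `atomic_write_sketch` is a hypothetical re-implementation for illustration; details such as the fsync call and the temp-file prefix are assumptions and may differ from the library function.

```python
import os
import tempfile


def atomic_write_sketch(path: str, data: str, *, encoding: str = "utf-8") -> None:
    # Create the temp file in the target directory so os.replace stays on one filesystem.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w", encoding=encoding) as fh:
            fh.write(data)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, path)  # readers see the old or the new file, never a partial one
    except BaseException:
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise


# Usage: overwrite a file twice and confirm no temp files are left behind.
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "policy.json")
    atomic_write_sketch(target, '{"rules": []}')
    atomic_write_sketch(target, '{"rules": [1]}')
    with open(target, encoding="utf-8") as fh:
        written = fh.read()
    leftovers = [name for name in os.listdir(tmp) if name != "policy.json"]
```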


S3PolicySource(url, *, client=None, session=None, config=None, client_extra=None, validate_schema=True, change_detector='etag', prefer_checksum='sha256')

Bases: PolicySource

Policy source backed by Amazon S3.

Change detection strategies (choose one via change_detector):
  • "etag": HeadObject ETag (default).
  • "version_id": HeadObject VersionId (requires bucket versioning).
  • "checksum": GetObjectAttributes(..., ObjectAttributes=['Checksum']) if available.

Networking defaults are production-friendly (timeouts + retries) and can be overridden via a custom botocore Config or client parameters.

etag()

Return the current change marker according to change_detector.

load()

Download and parse the policy document from S3.

The format (JSON vs YAML) is auto-detected using the object key (filename) and/or content.


HTTPPolicySource(url, *, headers=None, validate_schema=False, verify_ssl=True, timeout=5.0, allow_redirects=True, allowed_schemes=('http', 'https'), block_private_ips=False)

Bases: PolicySource

HTTP policy source using requests with ETag support.

Security parameters

  • verify_ssl (bool): passed as verify to requests.get. Defaults to True (certificate verification enabled). Set to False only in development environments where you control the server.
  • timeout (float): request timeout in seconds. Defaults to 5.0.
  • allow_redirects (bool): passed as allow_redirects to requests.get. Defaults to True. Set to False to prevent open-redirect abuse.
  • allowed_schemes (tuple[str, ...]): URL schemes that are permitted. Defaults to ("http", "https"). To restrict to HTTPS only, pass ("https",).
  • block_private_ips (bool): when True, raises ValueError if the URL's host is a numeric IP address in a private, loopback, or link-local range (SSRF guard). Hostname literals (e.g. "localhost") are not blocked by this flag because they require DNS resolution; use network-level controls for hostname-based SSRF protection. Defaults to False to preserve backward compatibility.
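
The scheme and IP-literal checks described above can be approximated with the standard library alone. `check_policy_url` is a hypothetical helper, not the HTTPPolicySource code; it only illustrates why a hostname such as "localhost" passes while a private IP literal does not.

```python
import ipaddress
from typing import Tuple
from urllib.parse import urlsplit


def check_policy_url(
    url: str,
    *,
    allowed_schemes: Tuple[str, ...] = ("http", "https"),
    block_private_ips: bool = False,
) -> str:
    parts = urlsplit(url)
    if parts.scheme not in allowed_schemes:
        raise ValueError(f"scheme {parts.scheme!r} is not allowed")
    if block_private_ips:
        host = parts.hostname or ""
        try:
            ip = ipaddress.ip_address(host)
        except ValueError:
            return url  # a hostname, not an IP literal: not checked here
        if ip.is_private or ip.is_loopback or ip.is_link_local:
            raise ValueError(f"host {host!r} is in a blocked address range")
    return url
```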

Extra: rbacx[http]


RbacxMiddleware(app, *, guard, mode='enforce', build_env=None, add_headers=False)

Framework-agnostic ASGI middleware.

Modes
  • "inject": only injects guard into scope.
  • "enforce": evaluates access for HTTP requests when build_env is provided.
Security
  • Does not leak denial reasons in the response body.
  • If add_headers=True, attaches X-RBACX-* headers on deny.



require_access(guard, build_env, *, add_headers=False)

Decorator for Flask view functions to enforce access.


AsyncRbacxDjangoMiddleware(get_response)

Async-capable variant of RbacxDjangoMiddleware for Django 4.1+ ASGI applications.

Inject a Guard instance onto each Django request as request.rbacx_guard without blocking the event loop.

Config
  • settings.RBACX_GUARD_FACTORY: dotted path to a zero-arg callable returning a Guard.

Django detects async middleware via the _is_coroutine marker attribute (set in __init__ when get_response is itself a coroutine function) and the async_capable / sync_capable flags.

__call__(request) async

Attach the guard to the request, then await the next middleware.

RbacxDjangoMiddleware(get_response)

Inject a Guard instance onto each Django request as request.rbacx_guard.

Config
  • settings.RBACX_GUARD_FACTORY: dotted path to a zero-arg callable returning a Guard.
Notes
  • Middleware __init__(get_response) runs once at startup; the guard is created once.
  • __call__(request) runs per request; the same guard is attached to each request.

AsyncTraceIdMiddleware(get_response)

Async-capable variant of TraceIdMiddleware for Django 4.1+ ASGI.

Injects a trace / request-id into the logging context for the duration of the request and echoes it back in the X-Request-ID response header.

Accepts X-Request-ID and traceparent (W3C Trace Context) request headers, generating a UUID when neither is present.
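
The header handling can be sketched as below. `pick_trace_id` is a hypothetical helper; preferring X-Request-ID over traceparent and lower-casing header names are assumptions, since the precedence is not stated here. The traceparent layout (version, trace-id, parent-id, flags separated by hyphens) follows the W3C Trace Context format.

```python
import uuid
from typing import Mapping


def pick_trace_id(headers: Mapping[str, str]) -> str:
    """Choose a trace id from request headers (keys assumed lower-cased)."""
    request_id = headers.get("x-request-id")
    if request_id:
        return request_id
    parts = headers.get("traceparent", "").split("-")
    # traceparent: version-traceid-parentid-flags, with a 32-hex-digit trace id
    if len(parts) == 4 and len(parts[1]) == 32:
        return parts[1]
    return str(uuid.uuid4())
```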


RBACXMiddleware(app, *, guard, build_env, add_headers=False)

Bases: AbstractMiddleware

Litestar middleware that checks access using RBACX Guard.

  • Prefers litestar.middleware.ASGIMiddleware (Litestar >= 2.15).
  • Falls back to litestar.middleware.AbstractMiddleware when needed.
  • Uses Guard.evaluate_async.

Decision explanation / trace

Pass explain=True to any evaluation method to get a per-rule evaluation log attached to the returned Decision.

d = guard.evaluate_sync(subject, action, resource, context, explain=True)

for entry in d.trace:
    status = "matched" if entry.matched else f"skipped ({entry.skip_reason})"
    print(f"  rule {entry.rule_id!r} [{entry.effect}] → {status}")

When explain=False (the default) Decision.trace is None — there is no overhead on the hot path.

RuleTrace fields

Field Type Description
rule_id str The id field of the rule as declared in the policy
effect str Declared effect: "permit" or "deny"
matched bool True when the rule fully matched; False when skipped
skip_reason str | None Why the rule was skipped, or None when matched=True

Possible skip_reason values: "action_mismatch", "resource_mismatch", "condition_mismatch", "condition_type_mismatch", "condition_depth_exceeded".

Algorithm-specific trace behaviour

  • deny-overrides — trace includes every rule up to and including the first matching deny (the loop breaks there). When only permits fire, all rules are present.
  • permit-overrides — trace up to and including the first matching permit.
  • first-applicable — trace up to and including the first match; subsequent rules are absent.
  • No match — every rule appears in the trace with matched=False.
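
To make the deny-overrides case concrete, here is a small sketch of an evaluation loop that records a trace and stops at the first matching deny. The rule shape (`id`, `effect`, `actions`), `TraceEntry`, and `deny_overrides` are hypothetical and only model action matching; they are not the rbacx engine.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class TraceEntry:
    rule_id: str
    effect: str
    matched: bool
    skip_reason: Optional[str] = None


def deny_overrides(rules: List[dict], action: str) -> Tuple[str, List[TraceEntry]]:
    trace: List[TraceEntry] = []
    permitted = False
    for rule in rules:
        if action not in rule["actions"]:
            trace.append(TraceEntry(rule["id"], rule["effect"], False, "action_mismatch"))
            continue
        trace.append(TraceEntry(rule["id"], rule["effect"], True))
        if rule["effect"] == "deny":
            return "deny", trace  # loop breaks here; later rules never reach the trace
        permitted = True
    return ("permit" if permitted else "deny"), trace
```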

explain=True is supported on all four evaluation methods:

# Single request
d = guard.evaluate_sync(..., explain=True)
d = await guard.evaluate_async(..., explain=True)

# Batch — explain applies to every request in the batch
decisions = guard.evaluate_batch_sync([...], explain=True)
decisions = await guard.evaluate_batch_async([...], explain=True)

RuleTrace is importable directly from the root package:

from rbacx import RuleTrace

Batch evaluation

Guard exposes two methods for evaluating multiple access requests in a single call — useful for populating UIs that need to know which buttons/tabs/actions to show for a given user.

from rbacx import Guard, Subject, Action, Resource, Context

guard = Guard(policy)
subject = Subject(id="u1", roles=["editor"])
resource = Resource(type="document", id="doc-42")
ctx = Context(attrs={"mfa": True})

# Async (preferred in ASGI applications)
decisions = await guard.evaluate_batch_async([
    (subject, Action("read"),   resource, ctx),
    (subject, Action("write"),  resource, ctx),
    (subject, Action("delete"), resource, ctx),
])

# Sync (works everywhere, including inside a running event loop)
decisions = guard.evaluate_batch_sync([
    (subject, Action("read"),   resource, ctx),
    (subject, Action("write"),  resource, ctx),
    (subject, Action("delete"), resource, ctx),
])

for action_name, decision in zip(["read", "write", "delete"], decisions):
    print(action_name, "→", "allow" if decision.allowed else "deny")

Signature

async def evaluate_batch_async(
    self,
    requests: Sequence[tuple[Subject, Action, Resource, Context | None]],
    *,
    explain: bool = False,
    timeout: float | None = None,
) -> list[Decision]: ...

def evaluate_batch_sync(
    self,
    requests: Sequence[tuple[Subject, Action, Resource, Context | None]],
    *,
    explain: bool = False,
    timeout: float | None = None,
) -> list[Decision]: ...

Guarantees

  • Results are returned in the same order as the input sequence.
  • Requests are evaluated concurrently via asyncio.gather — wall-clock time grows with the slowest single request rather than the total count.
  • An empty input list returns [] immediately without any evaluation.
  • timeout (seconds) bounds the total wall-clock time for the batch. asyncio.TimeoutError is raised if the deadline is exceeded. None (default) means no deadline.
  • Context may be None for any individual request.
  • All DI hooks (metrics, logger, obligation checker, role resolver, cache) are invoked per request, exactly as with evaluate_async / evaluate_sync.
  • If any individual request raises an exception the entire batch propagates that exception (fail-fast semantics, consistent with asyncio.gather).
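
The ordering, timeout, and fail-fast guarantees follow from how asyncio.gather and asyncio.wait_for behave. The sketch below shows that combination in isolation; `evaluate_batch` and the `evaluate` callable are hypothetical stand-ins, not the Guard methods.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Sequence


async def evaluate_batch(
    requests: Sequence[Any],
    evaluate: Callable[[Any], Awaitable[Any]],
    timeout: Optional[float] = None,
) -> List[Any]:
    if not requests:
        return []  # nothing to evaluate
    # gather() returns results in argument order, regardless of completion order.
    gathered = asyncio.gather(*(evaluate(r) for r in requests))
    # wait_for() cancels the whole gather and raises on timeout: no partial results.
    return list(await asyncio.wait_for(gathered, timeout=timeout))
```

An exception raised by any single `evaluate` call propagates out of the awaited gather, which is what gives the batch its fail-fast behaviour.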

Decision object

Fields returned by Guard.evaluate*:

  • allowed: bool
  • effect: "permit" | "deny"
  • obligations: List[Dict[str, Any]]
  • challenge: Optional[str]
  • rule_id: Optional[str]
  • policy_id: Optional[str]
  • reason: Optional[str]
  • trace: Optional[List[RuleTrace]] — populated when explain=True; None by default

require_batch_access (FastAPI)

FastAPI dependency that evaluates multiple (action, resource_type) pairs in one evaluate_batch_async call and returns a list[Decision].

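from fastapi import Depends, FastAPI, Request

from rbacx import Subject
from rbacx.adapters.fastapi import require_batch_access

app = FastAPI()
# `guard` is assumed to be a Guard instance configured elsewhere.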

def build_subject(request: Request) -> Subject:
    return Subject(id="user", roles=[request.headers.get("X-Role", "viewer")])

@app.get("/ui-state")
async def ui_state(
    decisions=Depends(
        require_batch_access(
            guard,
            [("read", "document"), ("write", "document"), ("delete", "document")],
            build_subject,
            timeout=2.0,
        )
    )
):
    return {
        "can_read":   decisions[0].allowed,
        "can_write":  decisions[1].allowed,
        "can_delete": decisions[2].allowed,
    }

AI Policy Authoring

rbacx AI Policy Authoring System.

Optional module. Install with:

pip install rbacx[ai]

Provides AI-assisted policy generation, iterative refinement, and human-readable decision explanations using any OpenAI-compatible LLM.

Quick start:

from rbacx.ai import AIPolicy

ai = AIPolicy(api_key="sk-...", model="gpt-4o")

# Generate a policy from an OpenAPI schema
result = await ai.from_schema("openapi.json", context="SaaS B2B")

# Use the policy with Guard
from rbacx.core.engine import Guard
guard = Guard(result.dsl)

# Refine iteratively
result2 = await ai.refine_policy("deny delete for viewer role")

# Explain a specific decision
expl = await ai.explain_decision(
    policy=result.dsl,
    input={
        "subject": {"id": "u1", "roles": ["viewer"]},
        "action": "delete",
        "resource": {"type": "doc", "id": "d1"},
    },
)
print(expl.decision.allowed)  # False
print(expl.human)             # plain-English explanation

AIPolicy(api_key, model, *, base_url=None, timeout=60.0)

AI Policy Authoring System — main entry point.

Creates one LLMClient instance that is reused across all operations. After from_schema is called, a RefinementSession is created internally and can be driven forward via refine_policy.

Supports any OpenAI-compatible provider through the base_url parameter — standard OpenAI, OpenRouter, Ollama, Azure OpenAI, etc.

Parameters:

  • api_key (str, required): API key for the LLM provider.
  • model (str, required): model identifier, e.g. "gpt-4o", "anthropic/claude-3-5-sonnet" (OpenRouter), "llama3" (Ollama).
  • base_url (str | None, default None): optional base URL override. None uses the standard OpenAI endpoint. Examples: "https://openrouter.ai/api/v1", "http://localhost:11434/v1" (Ollama).
  • timeout (float, default 60.0): HTTP request timeout in seconds passed to the SDK.

Example:

ai = AIPolicy(api_key="sk-...", model="gpt-4o")
result = await ai.from_schema("openapi.json", context="SaaS B2B")
result2 = await ai.refine_policy(feedback="deny delete for viewers")
expl = await ai.explain_decision(
    policy=result.dsl,
    input={
        "subject": {"id": "u1", "roles": ["viewer"]},
        "action": "delete",
        "resource": {"type": "doc", "id": "d1"},
    },
)

explain_decision(policy, input) async

Explain a specific access decision using Guard + LLM.

The decision is evaluated deterministically by a minimal Guard instance; the LLM is never asked to decide allow/deny. The LLM only produces the human-readable explanation of why.

Parameters:

  • policy (dict[str, Any], required): valid rbacx policy dict.
  • input (dict[str, Any], required): access-request dict:

    {
        "subject": {
            "id": str,
            "roles": list[str],   # optional
            "attrs": dict,        # optional
        },
        "action": str,
        "resource": {
            "type": str,
            "id": str | None,     # optional
            "attrs": dict,        # optional
        },
    }

Returns:

  • DecisionExplanation: a DecisionExplanation with the authoritative Decision and a plain-English explanation.

Raises:

  • PolicyGenerationError: input is malformed or missing required fields.

from_schema(schema, *, context='', safe_mode=True, compile=False, explain=False, raw=False) async

Generate an rbacx policy from an API schema.

Resets any existing RefinementSession and creates a new one seeded with the generated policy and the conversation messages used during generation.

Parameters:

  • schema (Path | str | dict[str, Any], required): OpenAPI 3.x or 2.0 schema supplied as one of:
      • a pathlib.Path to a .json or .yaml file,
      • a file-path string,
      • a raw JSON string, or
      • a pre-loaded dict.
  • context (str, default ''): free-form domain description to guide generation, e.g. "SaaS B2B, tenant isolation, admins per-org".
  • safe_mode (bool, default True): run the validate → retry on failure → lint pipeline.
  • compile (bool, default False): compile the policy via the rbacx compiler and include the result in PolicyResult.compiled. Raises PolicyGenerationError if the compiler is unavailable.
  • explain (bool, default False): request per-rule human explanations (one extra LLM call). The result is in PolicyResult.explanation.
  • raw (bool, default False): include the raw LLM output string in PolicyResult.raw for debugging.

Returns:

  • PolicyResult: a PolicyResult instance.

Raises:

  • SchemaParseError: unrecognised schema format or unreadable file.
  • PolicyGenerationError: JSON parse failure, or compile=True with an unavailable compiler.
  • ValidationRetryError: safe_mode=True and both validation attempts fail.

refine_policy(feedback, *, policy=None, compile=False) async

Refine a policy with natural-language feedback.

If policy is provided the current session is reset to that policy as the new starting point before applying feedback. If policy is None the existing session is continued.

Always runs the safe_mode validation pipeline internally.

Parameters:

  • feedback (str, required): natural-language refinement instruction, e.g. "deny delete for viewer role".
  • policy (dict[str, Any] | None, default None): optional policy dict to reset the session to before refining. If None and no session exists, RuntimeError is raised.
  • compile (bool, default False): compile the refined policy. Raises PolicyGenerationError if the compiler is unavailable.

Returns:

  • PolicyResult: a PolicyResult with the refined policy.

Raises:

  • RuntimeError: called before from_schema with no policy argument.
  • ValidationRetryError: validation fails after retry.
  • PolicyGenerationError: JSON parse error or compiler unavailable.

DecisionExplanation(decision, human) dataclass

The outcome of an explain_decision call.

The access decision is evaluated deterministically by a minimal Guard instance — the LLM is never asked to decide allow/deny. The LLM only produces the human-readable narrative of why the decision was reached.

Attributes:

  • decision (Decision): the authoritative Decision from rbacx.core.
  • human (str): a plain-English explanation of why the decision was made, generated by the configured LLM.

PolicyGenerationError(message, cause=None)

Bases: Exception

Raised for general generation failures.

Examples of situations that trigger this exception:

  • The LLM response is not parseable JSON.
  • The LLM returned an empty response.
  • compile=True was requested but the rbacx compiler is unavailable.
  • The input dict passed to explain_decision is missing required fields.

Attributes:

  • cause (Exception | None): the original exception that triggered this error, if available.

PolicyResult(dsl, warnings=list(), compiled=None, explanation=None, raw=None) dataclass

The outcome of a policy generation or refinement operation.

Attributes:

  • dsl (dict[str, Any]): the generated policy dict, ready to pass directly to Guard(policy_result.dsl).
  • warnings (list[Issue]): lint issues returned by analyze_policy. An empty list means the policy passed all semantic checks.
  • compiled (Any | None): the compiled policy returned by rbacx.core.compiler when compile=True was requested, otherwise None.
  • explanation (dict[str, str] | None): per-rule human-readable explanations keyed by rule_id when explain=True was requested, otherwise None.
  • raw (str | None): the raw LLM output string when raw=True was requested, useful for debugging prompt/response pairs. None otherwise.

SchemaParseError(message, format_hint=None)

Bases: Exception

Raised when the input schema format cannot be recognised or parsed.

Attributes:

  • format_hint (str | None): detected or expected format string for diagnostics (e.g. "openapi3", "openapi2"). May be None when the format is completely unrecognised.

ValidationRetryError(message, raw, validation_errors)

Bases: Exception

Raised when both validation attempts fail in the safe_mode pipeline.

The generator tried to fix the policy once after the initial validation failure; this exception means the second attempt also produced an invalid policy.

Attributes:

  • raw (str): raw LLM output from the last generation attempt.
  • validation_errors (list[str]): list of human-readable jsonschema error messages collected from the final failed validation.


YAML policies

All built-in policy sources accept JSON and, with the optional rbacx[yaml] extra, YAML.

  • File: detected by extension .yaml / .yml.
  • HTTP: detected by Content-Type (e.g., application/yaml, application/x-yaml, text/yaml) or URL suffix.
  • S3: detected by key suffix .yaml / .yml.

Internally YAML is parsed and validated against the same JSON Schema as JSON.