Authorization Module
The agentweave.authz module provides policy-based authorization using Open Policy Agent (OPA). All agent-to-agent communication is subject to authorization checks to enforce fine-grained access control.
Module Overview
    from agentweave.authz import (
        AuthorizationProvider,  # Abstract base class
        AuthzDecision,          # Authorization decision dataclass
        OPAProvider,            # OPA implementation
        AuthorizationError,     # Base exception
        PolicyEvaluationError,  # Policy evaluation errors
        CircuitBreakerError,    # Circuit breaker errors
    )
Classes
AuthorizationProvider
Abstract base class that defines the interface for all authorization providers.
    from abc import ABC, abstractmethod
    from typing import Optional

    class AuthorizationProvider(ABC):
        """Abstract base class for authorization providers."""
Implementations must enforce fine-grained access control based on:
- Caller identity (SPIFFE ID)
- Resource being accessed
- Action being performed
- Additional context (request metadata, environment, etc.)
Methods
check()
Check if a caller is authorized to perform an action on a resource.
    async def check(
        self,
        caller_id: str,
        resource: str,
        action: str,
        context: Optional[dict] = None
    ) -> AuthzDecision
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| caller_id | str | SPIFFE ID of the caller | Required |
| resource | str | Resource being accessed (e.g., SPIFFE ID of target agent) | Required |
| action | str | Action being performed (e.g., "search", "process") | Required |
| context | Optional[dict] | Additional context for policy evaluation | None |
Returns:
AuthzDecision: Authorization decision with result and metadata
Raises:
AuthorizationError: If the authorization check fails
Example:
    decision = await provider.check(
        caller_id="spiffe://agentweave.io/agent/frontend",
        resource="spiffe://agentweave.io/agent/database",
        action="query",
        context={"request_id": "123", "environment": "production"}
    )

    if decision.allowed:
        # Proceed with request
        pass
    else:
        # Deny request. PermissionDenied is not among the agentweave.authz
        # exports listed above; it is assumed to be application-defined.
        raise PermissionDenied(decision.reason)
health_check()
Check if the authorization provider is healthy and reachable.
    async def health_check(self) -> bool
Returns:
bool: True if healthy, False otherwise
Example:
    if await provider.health_check():
        print("Authorization provider is healthy")
    else:
        print("Authorization provider is unhealthy")
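To show what implementing this interface involves, here is a minimal sketch of a custom provider. It assumes that check() and health_check() are the abstract methods a subclass must supply. The AuthorizationProvider and AuthzDecision definitions in the block are stand-ins that mirror the documented signatures so the sketch runs without agentweave installed, and StaticAllowListProvider is a hypothetical name, not part of the library.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Set, Tuple


# Stand-ins mirroring the documented signatures. In real code, import
# AuthorizationProvider and AuthzDecision from agentweave.authz instead.
@dataclass(frozen=True)
class AuthzDecision:
    allowed: bool
    reason: str
    policy_id: Optional[str] = None
    audit_id: str = ""


class AuthorizationProvider(ABC):
    @abstractmethod
    async def check(self, caller_id: str, resource: str, action: str,
                    context: Optional[dict] = None) -> AuthzDecision: ...

    @abstractmethod
    async def health_check(self) -> bool: ...


class StaticAllowListProvider(AuthorizationProvider):
    """Hypothetical provider: permits only triples listed up front."""

    def __init__(self, allowed: Set[Tuple[str, str, str]]) -> None:
        self._allowed = allowed

    async def check(self, caller_id: str, resource: str, action: str,
                    context: Optional[dict] = None) -> AuthzDecision:
        if (caller_id, resource, action) in self._allowed:
            return AuthzDecision(True, "Triple is on the allow list",
                                 "static-allow-list")
        # Anything not explicitly listed is denied (default deny).
        return AuthzDecision(False, "Triple is not on the allow list",
                             "static-allow-list")

    async def health_check(self) -> bool:
        # No external dependency, so this provider is always reachable.
        return True
```

A provider like this can be handy in unit tests, where running an OPA server is unnecessary.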
AuthzDecision
Authorization decision result dataclass.
    from dataclasses import dataclass
    from typing import Optional

    @dataclass(frozen=True)
    class AuthzDecision:
        """Authorization decision result."""
        allowed: bool
        reason: str
        policy_id: Optional[str] = None
        audit_id: str = ""
This immutable dataclass represents the result of an authorization check.
Attributes
| Attribute | Type | Description |
|---|---|---|
| allowed | bool | Whether the action is permitted |
| reason | str | Human-readable explanation for the decision |
| policy_id | Optional[str] | ID of the policy that made the decision (if applicable) |
| audit_id | str | Unique identifier for audit trail correlation (auto-generated if not provided) |
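The declaration above gives audit_id an empty-string default, while the table says it is auto-generated. One common way to reconcile the two in a frozen dataclass is a __post_init__ hook, sketched below. This is an assumption about the mechanism, not the library's confirmed implementation; AuthzDecisionSketch is a hypothetical stand-in, and the UUID format is chosen only for illustration.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AuthzDecisionSketch:
    """Illustrative stand-in, not the library's AuthzDecision."""
    allowed: bool
    reason: str
    policy_id: Optional[str] = None
    audit_id: str = ""

    def __post_init__(self) -> None:
        if not self.audit_id:
            # Frozen dataclasses reject normal attribute assignment, so the
            # generated ID is set once through object.__setattr__.
            object.__setattr__(self, "audit_id", str(uuid.uuid4()))
```

An explicitly supplied audit_id is kept as-is, so callers can still correlate a decision with an ID issued elsewhere.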
Usage Examples
    # Allow decision
    decision = AuthzDecision(
        allowed=True,
        reason="Caller has admin role",
        policy_id="admin-access-policy"
    )

    # Deny decision
    decision = AuthzDecision(
        allowed=False,
        reason="Caller not in allowed trust domain",
        policy_id="trust-domain-policy"
    )

    # Access decision fields
    if decision.allowed:
        print(f"Access granted: {decision.reason}")
        print(f"Audit ID: {decision.audit_id}")
    else:
        print(f"Access denied: {decision.reason}")
OPAProvider
OPA-based authorization provider with circuit breaker and caching.
    class OPAProvider(AuthorizationProvider):
        """OPA-based authorization provider."""
Features:
- REST API integration with OPA
- Circuit breaker for OPA failures
- Decision caching with TTL
- Automatic audit logging
- Default deny in production mode
- SPIFFE ID aware policy context
Constructor
    def __init__(
        self,
        endpoint: str = "http://localhost:8181",
        policy_path: str = "agentweave/authz/allow",
        default_deny: bool = True,
        cache_ttl: float = 60.0,
        cache_size: int = 1000,
        timeout: float = 5.0,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_timeout: float = 30.0
    )
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| endpoint | str | OPA server base URL | "http://localhost:8181" |
| policy_path | str | Path to the policy decision endpoint (relative to /v1/data/) | "agentweave/authz/allow" |
| default_deny | bool | If True, deny requests when OPA is unavailable. Should always be True in production. | True |
| cache_ttl | float | Cache TTL in seconds | 60.0 |
| cache_size | int | Maximum number of cached decisions (LRU eviction) | 1000 |
| timeout | float | Request timeout in seconds | 5.0 |
| circuit_breaker_threshold | int | Number of consecutive failures before opening circuit | 5 |
| circuit_breaker_timeout | float | Seconds before attempting recovery from OPEN state | 30.0 |
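Because policy_path is described as relative to /v1/data/ (the prefix of OPA's Data API), the URL that gets queried is presumably the endpoint and the path joined under that prefix. The helper below is a small sketch of that composition; decision_url is a hypothetical name, and the exact joining logic inside OPAProvider is an assumption.

```python
def decision_url(endpoint: str, policy_path: str) -> str:
    """Join an OPA base URL and a policy path into a Data API URL."""
    # Strip stray slashes so that "http://host:8181/" and "/a/b" still
    # produce a single, well-formed URL.
    return f"{endpoint.rstrip('/')}/v1/data/{policy_path.strip('/')}"
```

With the defaults above this gives http://localhost:8181/v1/data/agentweave/authz/allow, which corresponds to the allow rule in the agentweave.authz Rego package.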
Example:
    from agentweave.authz import OPAProvider

    # Default configuration
    provider = OPAProvider()

    # Production configuration
    provider = OPAProvider(
        endpoint="http://opa.internal:8181",
        policy_path="agentweave/authz/allow",
        default_deny=True,  # Always deny when OPA unavailable
        cache_ttl=120.0,    # Cache for 2 minutes
        timeout=10.0,       # 10 second timeout
    )

    # Development configuration (NOT for production)
    provider = OPAProvider(
        endpoint="http://localhost:8181",
        default_deny=False,  # Allow when OPA unavailable (dev only!)
        cache_ttl=10.0,
    )
Methods
check()
Check authorization via OPA.
    async def check(
        self,
        caller_id: str,
        resource: str,
        action: str,
        context: Optional[dict] = None
    ) -> AuthzDecision
This method:
- Checks the decision cache first
- Builds OPA input document with SPIFFE context
- Queries OPA via circuit breaker protection
- Caches the decision
- Audit logs the decision
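The steps listed above can be sketched as a single coroutine. This is an illustration under stated assumptions, not the provider's actual code: check_sketch, QueryFn, and the plain dict and list used for the cache and audit log are all hypothetical, the circuit breaker is left out for brevity, and whether cache hits are also audit-logged is not stated in the source (this sketch logs only fresh decisions).

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical type: an async callable that sends the input document to OPA
# and returns True (allow) or False (deny).
QueryFn = Callable[[dict], Awaitable[bool]]


async def check_sketch(
    cache: dict,
    query_opa: QueryFn,
    audit_log: list,
    caller_id: str,
    resource: str,
    action: str,
    context: Optional[dict] = None,
) -> bool:
    ctx = context or {}
    key = (caller_id, resource, action, repr(sorted(ctx.items())))

    # 1. Check the decision cache first.
    if key in cache:
        return cache[key]

    # 2. Build the OPA input document.
    document = {
        "caller_spiffe_id": caller_id,
        "resource_spiffe_id": resource,
        "action": action,
        "context": ctx,
    }

    # 3. Query OPA (the real provider wraps this call in a circuit breaker).
    allowed = await query_opa(document)

    # 4. Cache the decision.
    cache[key] = allowed

    # 5. Audit log the decision.
    audit_log.append((caller_id, resource, action, allowed))
    return allowed
```

Passing the query function in as a parameter keeps the flow testable without a running OPA server.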
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| caller_id | str | SPIFFE ID of the caller | Required |
| resource | str | Resource being accessed (typically target SPIFFE ID) | Required |
| action | str | Action being performed | Required |
| context | Optional[dict] | Additional context for policy evaluation | None |
Returns:
AuthzDecision: The authorization decision
Example:
    decision = await provider.check(
        caller_id="spiffe://agentweave.io/agent/search/prod",
        resource="spiffe://agentweave.io/agent/database/prod",
        action="read",
        context={
            "query_type": "customer_data",
            "environment": "production",
            "request_id": "req-12345"
        }
    )

    if decision.allowed:
        # Execute the action
        result = await perform_action()
    else:
        # Log denial and reject
        logger.warning(
            f"Access denied: {decision.reason} "
            f"(audit_id={decision.audit_id})"
        )
        raise PermissionDenied(decision.reason)
OPA Input Document Format:
The provider builds an input document with the following structure:
    {
        "caller_spiffe_id": "spiffe://agentweave.io/agent/search/prod",
        "resource_spiffe_id": "spiffe://agentweave.io/agent/database/prod",
        "action": "read",
        "timestamp": "2025-12-07T12:34:56.789Z",
        "caller_trust_domain": "agentweave.io",
        "resource_trust_domain": "agentweave.io",
        "context": {
            "query_type": "customer_data",
            "environment": "production",
            "request_id": "req-12345"
        }
    }
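A document of this shape could be assembled as in the sketch below, which derives each trust domain from the authority part of the SPIFFE ID. The function names build_input and trust_domain are hypothetical, and the sketch assumes both caller and resource are well-formed SPIFFE IDs; how the provider handles a resource that is not a SPIFFE ID is not covered by the source.

```python
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlparse


def trust_domain(spiffe_id: str) -> str:
    """Return the trust domain (authority part) of a SPIFFE ID."""
    parsed = urlparse(spiffe_id)
    if parsed.scheme != "spiffe" or not parsed.netloc:
        raise ValueError(f"not a SPIFFE ID: {spiffe_id!r}")
    return parsed.netloc


def build_input(caller_id: str, resource: str, action: str,
                context: Optional[dict] = None) -> dict:
    """Assemble an input document in the structure shown above."""
    now = datetime.now(timezone.utc)
    return {
        "caller_spiffe_id": caller_id,
        "resource_spiffe_id": resource,
        "action": action,
        # Millisecond-precision UTC timestamp with a trailing "Z".
        "timestamp": now.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        "caller_trust_domain": trust_domain(caller_id),
        "resource_trust_domain": trust_domain(resource),
        "context": context or {},
    }
```

Precomputing the trust domains lets Rego policies compare input.caller_trust_domain and input.resource_trust_domain directly instead of parsing the IDs.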
OPA Response Format:
OPA can return either a boolean or an object:
    // Boolean response
    {"result": true}

    // Object response
    {
        "result": {
            "allow": true,
            "reason": "Caller has read permission",
            "policy_id": "database-read-policy"
        }
    }
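Handling both response shapes might look like the following sketch. parse_opa_result is a hypothetical helper, the default reason strings are invented for illustration, and two behaviours are assumptions: a body without a "result" key (which OPA returns when the queried rule is undefined) is treated as a deny, and an unexpected result type raises ValueError as a stand-in for PolicyEvaluationError.

```python
from typing import Any, Optional, Tuple


def parse_opa_result(body: dict) -> Tuple[bool, str, Optional[str]]:
    """Return (allowed, reason, policy_id) from a decoded OPA response body."""
    if "result" not in body:
        # Undefined decision: fail closed.
        return False, "No decision returned", None

    result: Any = body["result"]

    if isinstance(result, bool):
        reason = "Allowed by policy" if result else "Denied by policy"
        return result, reason, None

    if isinstance(result, dict):
        # Only a literal true counts as an allow; anything else is a deny.
        allowed = result.get("allow") is True
        return allowed, str(result.get("reason", "")), result.get("policy_id")

    raise ValueError(f"unexpected OPA result type: {type(result).__name__}")
```

Comparing with `is True` rather than relying on truthiness avoids treating values such as the string "false" as an allow.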
health_check()
Check if OPA is healthy and reachable.
    async def health_check(self) -> bool
Sends a GET request to the OPA /health endpoint.
Returns:
bool: True if OPA is healthy (HTTP 200), False otherwise
Example:
    if await provider.health_check():
        await agent.start()
    else:
        logger.error("OPA is unhealthy, cannot start agent")
        sys.exit(1)
close()
Close the HTTP client.
    async def close(self) -> None
Call this to clean up the internal HTTP client when shutting down.
Example:
    try:
        # Use provider
        await provider.check(...)
    finally:
        await provider.close()
CircuitBreaker
Circuit breaker for protecting against OPA failures.
    class CircuitBreaker:
        """Circuit breaker for OPA failures."""
The circuit breaker has three states:
| State | Description | Behavior |
|---|---|---|
| CLOSED | Normal operation | All requests pass through to OPA |
| OPEN | Too many failures | Requests fail immediately without calling OPA |
| HALF_OPEN | Testing recovery | Limited requests pass through to test if OPA recovered |
Constructor
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 2
    )
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| failure_threshold | int | Number of consecutive failures before opening circuit | 5 |
| recovery_timeout | float | Seconds to wait in OPEN state before entering HALF_OPEN | 30.0 |
| success_threshold | int | Number of consecutive successes in HALF_OPEN to close circuit | 2 |
State Transitions
    CLOSED --[failure_threshold failures]--> OPEN
    OPEN --[recovery_timeout elapsed]--> HALF_OPEN
    HALF_OPEN --[success_threshold successes]--> CLOSED
    HALF_OPEN --[any failure]--> OPEN
Behavior
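- CLOSED: Normal operation; each failure increments the counter, and failure_threshold consecutive failures open the circuit
- OPEN: Immediately raises CircuitBreakerError without calling OPA
- HALF_OPEN: Allows requests through; success_threshold consecutive successes close the circuit, and any failure reopens it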
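The state machine described above can be sketched in a few dozen lines. This is an illustrative model rather than the library's CircuitBreaker: the class and method names (CircuitBreakerSketch, before_call, record_success, record_failure), the CircuitOpenError stand-in, and the injectable clock parameter are all assumptions made so the transitions can be exercised deterministically.

```python
import time
from typing import Callable


class CircuitOpenError(Exception):
    """Stand-in for CircuitBreakerError."""


class CircuitBreakerSketch:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 success_threshold: int = 2,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._clock = clock
        self.state = "CLOSED"
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def before_call(self) -> None:
        """Raise while OPEN; move to HALF_OPEN once recovery_timeout elapsed."""
        if self.state == "OPEN":
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit is open")
            self.state = "HALF_OPEN"
            self._successes = 0

    def record_success(self) -> None:
        if self.state == "HALF_OPEN":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "CLOSED"
                self._failures = 0
        else:
            # A success breaks any run of consecutive failures.
            self._failures = 0

    def record_failure(self) -> None:
        if self.state == "HALF_OPEN":
            self._open()  # any failure while probing reopens the circuit
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._open()

    def _open(self) -> None:
        self.state = "OPEN"
        self._opened_at = self._clock()
        self._failures = 0
```

A caller would invoke before_call() ahead of each OPA request and then record_success() or record_failure() depending on the outcome.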
DecisionCache
LRU cache for authorization decisions with TTL.
    class DecisionCache:
        """LRU cache for authorization decisions with TTL."""
Caches authorization decisions to reduce load on OPA and improve performance.
Constructor
    def __init__(
        self,
        max_size: int = 1000,
        ttl_seconds: float = 60.0
    )
Parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| max_size | int | Maximum number of cached decisions | 1000 |
| ttl_seconds | float | Time-to-live for cached decisions in seconds | 60.0 |
Behavior
- LRU Eviction: When cache is full, least recently used entries are evicted
- TTL Expiration: Entries older than TTL are automatically expired on access
- Cache Key: Hash of (caller_id, resource, action, context)
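The three behaviours above fit naturally on top of an ordered dictionary. The sketch below is one possible shape, not the library's DecisionCache: the get/put method names, the cache_key helper, the SHA-256-over-JSON key format, and the injectable clock are assumptions made for illustration.

```python
import hashlib
import json
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


def cache_key(caller_id: str, resource: str, action: str,
              context: Optional[dict] = None) -> str:
    """Hash the four inputs; sort_keys makes dict ordering irrelevant."""
    payload = json.dumps([caller_id, resource, action, context or {}],
                         sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class DecisionCacheSketch:
    def __init__(self, max_size: int = 1000, ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: "OrderedDict[str, tuple]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, decision = entry
        if self._clock() - stored_at > self._ttl:
            # TTL expiration happens lazily, on access.
            del self._entries[key]
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return decision

    def put(self, key: str, decision: Any) -> None:
        self._entries[key] = (self._clock(), decision)
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_size:
            self._entries.popitem(last=False)  # evict least recently used
```

Since the context is part of the key, two calls that differ only in a per-request field such as a request ID produce different keys and therefore never share a cache entry.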
Usage Examples
Basic Usage
    from agentweave.authz import OPAProvider

    # Create provider
    provider = OPAProvider(
        endpoint="http://opa:8181",
        policy_path="agentweave/authz/allow",
        default_deny=True
    )

    # Check authorization
    decision = await provider.check(
        caller_id="spiffe://agentweave.io/agent/api",
        resource="spiffe://agentweave.io/agent/database",
        action="write"
    )

    if decision.allowed:
        await database.write(data)
    else:
        raise PermissionDenied(decision.reason)

    # Cleanup
    await provider.close()
With Context
    decision = await provider.check(
        caller_id="spiffe://agentweave.io/agent/frontend",
        resource="spiffe://agentweave.io/agent/payment",
        action="process_payment",
        context={
            "amount": 1000.00,
            "currency": "USD",
            "user_id": "user-12345",
            "environment": "production",
            "request_id": "req-abc123"
        }
    )

    if not decision.allowed:
        logger.warning(
            f"Payment processing denied: {decision.reason} "
            f"(audit_id={decision.audit_id})"
        )
Custom Configuration
    from agentweave.authz import OPAProvider

    provider = OPAProvider(
        endpoint="https://opa.production.internal:8181",
        policy_path="myorg/agent_authz/allow",
        default_deny=True,             # Always deny when OPA unavailable
        cache_ttl=300.0,               # Cache for 5 minutes
        cache_size=5000,               # Large cache
        timeout=10.0,                  # 10 second timeout
        circuit_breaker_threshold=3,   # Open after 3 failures
        circuit_breaker_timeout=60.0   # Wait 60s before retry
    )
Health Monitoring
    import asyncio

    from agentweave.authz import OPAProvider

    async def monitor_opa_health(provider: OPAProvider):
        """Monitor OPA health and alert on failures."""
        while True:
            is_healthy = await provider.health_check()
            if not is_healthy:
                logger.error("OPA health check failed!")
                # Send alert, trigger runbook, etc.
            await asyncio.sleep(30)  # Check every 30 seconds

    provider = OPAProvider()
    # create_task() needs a running event loop, so call it from inside a
    # coroutine, and keep a reference so the task is not garbage-collected.
    monitor_task = asyncio.create_task(monitor_opa_health(provider))
Error Handling
    from agentweave.authz import (
        OPAProvider,
        AuthzDecision,  # needed for the fallback decisions below
        AuthorizationError,
        PolicyEvaluationError,
        CircuitBreakerError
    )

    provider = OPAProvider(default_deny=True)

    try:
        decision = await provider.check(
            caller_id="spiffe://agentweave.io/agent/test",
            resource="spiffe://agentweave.io/agent/prod",
            action="delete"
        )

    except CircuitBreakerError:
        logger.error("Circuit breaker is open, OPA unavailable")
        # Fall back to default deny
        decision = AuthzDecision(
            allowed=False,
            reason="Circuit breaker open",
            policy_id="circuit-breaker"
        )

    except PolicyEvaluationError as e:
        logger.error(f"Policy evaluation failed: {e}")
        # Fall back to default deny
        decision = AuthzDecision(
            allowed=False,
            reason=f"Policy evaluation error: {e}",
            policy_id="error-fallback"
        )

    except AuthorizationError as e:
        logger.error(f"Authorization check failed: {e}")
        raise
Cache Usage
    from agentweave.authz import OPAProvider

    # Configure aggressive caching
    provider = OPAProvider(
        cache_ttl=600.0,  # 10 minute TTL
        cache_size=10000  # Large cache
    )

    # First call - cache miss, queries OPA
    decision1 = await provider.check(
        caller_id="spiffe://agentweave.io/agent/a",
        resource="spiffe://agentweave.io/agent/b",
        action="read"
    )

    # Second call - cache hit, no OPA query
    decision2 = await provider.check(
        caller_id="spiffe://agentweave.io/agent/a",
        resource="spiffe://agentweave.io/agent/b",
        action="read"
    )

    # Different context - cache miss
    decision3 = await provider.check(
        caller_id="spiffe://agentweave.io/agent/a",
        resource="spiffe://agentweave.io/agent/b",
        action="read",
        context={"user": "different"}  # Different cache key
    )
Exceptions
AuthorizationError
Base exception for authorization-related errors.
    class AuthorizationError(Exception):
        """Raised when authorization check fails."""
Usage:
    from agentweave.authz import AuthorizationError

    try:
        decision = await provider.check(...)
    except AuthorizationError as e:
        logger.error(f"Authorization failed: {e}")
PolicyEvaluationError
Raised when OPA policy evaluation fails.
    class PolicyEvaluationError(AuthorizationError):
        """Raised when OPA policy evaluation fails."""
This is a specific type of AuthorizationError that occurs when there are technical issues evaluating the policy (not policy denials).
Examples:
- OPA returned malformed response
- Policy evaluation timeout
- OPA server error (500)
- Invalid policy input document
Usage:
    from agentweave.authz import PolicyEvaluationError

    try:
        decision = await provider.check(...)
    except PolicyEvaluationError as e:
        logger.error(f"Policy evaluation failed: {e}")
        # Fall back to default deny
CircuitBreakerError
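Raised when the circuit breaker is open. As declared below, this class derives directly from Exception rather than AuthorizationError, so an except AuthorizationError clause does not catch it.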
    class CircuitBreakerError(Exception):
        """Raised when circuit breaker is open."""
Usage:
    from agentweave.authz import CircuitBreakerError

    try:
        decision = await provider.check(...)
    except CircuitBreakerError:
        logger.warning("Circuit breaker open, OPA unavailable")
        # Apply default policy
OPA Policy Examples
Basic Allow Policy
    package agentweave.authz

    default allow = false

    # Allow all agents in same trust domain
    allow {
        input.caller_trust_domain == input.resource_trust_domain
    }
Role-Based Access Control
    package agentweave.authz

    default allow = {
        "allow": false,
        "reason": "Default deny"
    }

    # Admin agents can do anything
    allow = {
        "allow": true,
        "reason": "Caller has admin role",
        "policy_id": "admin-access"
    } {
        contains(input.caller_spiffe_id, "/admin/")
    }

    # Read-only agents can only read
    allow = {
        "allow": true,
        "reason": "Caller has read role",
        "policy_id": "read-access"
    } {
        contains(input.caller_spiffe_id, "/reader/")
        input.action == "read"
    }
Environment-Based Access
    package agentweave.authz

    default allow = false

    # Production agents can only access production
    allow {
        contains(input.caller_spiffe_id, "/prod/")
        contains(input.resource_spiffe_id, "/prod/")
    }

    # Dev agents can only access dev
    allow {
        contains(input.caller_spiffe_id, "/dev/")
        contains(input.resource_spiffe_id, "/dev/")
    }
Best Practices
1. Always Use Default Deny in Production
    # GOOD - Production
    provider = OPAProvider(default_deny=True)

    # BAD - Never in production!
    provider = OPAProvider(default_deny=False)
2. Match Cache TTL to Policy Change Frequency
    # For frequently changing policies - short TTL
    provider = OPAProvider(cache_ttl=10.0)

    # For stable policies - longer TTL
    provider = OPAProvider(cache_ttl=300.0)
3. Monitor Circuit Breaker State
    if provider._circuit_breaker.state == "OPEN":
        logger.error("Circuit breaker is OPEN!")
        # Alert operations team
4. Include Rich Context
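    from datetime import datetime, timezone

    # Note: the documented cache key includes the context, so per-request
    # values (request_id, timestamp) likely make each call a cache miss.
    decision = await provider.check(
        caller_id=caller,
        resource=resource,
        action=action,
        context={
            "request_id": request_id,
            "environment": environment,
            "user_id": user_id,
            # datetime.utcnow() is deprecated as of Python 3.12
            "timestamp": datetime.now(timezone.utc).isoformat(),
            # Include any data needed for policy decisions
        }
    )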
5. Log Denials for Security Auditing
    decision = await provider.check(...)

    if not decision.allowed:
        logger.warning(
            f"AUTHZ_DENY: {decision.audit_id} | "
            f"caller={caller_id} | "
            f"resource={resource} | "
            f"action={action} | "
            f"reason={decision.reason}"
        )
6. Use Health Checks in Readiness Probes
    # Kubernetes readiness probe
    async def readiness():
        return await authz_provider.health_check()
See Also