Identity Module

The agentweave.identity module provides cryptographic identity management using SPIFFE/SPIRE. All agents must have a verified identity before they can communicate with other agents.

Module Overview

1
2
3
4
5
6
7
8
from agentweave.identity import (
    IdentityProvider,          # Abstract base class
    SPIFFEIdentityProvider,    # SPIFFE/SPIRE implementation
    IdentityError,             # Base exception
    TrustDomainError,         # Trust domain errors
    SVIDExpiredError,         # SVID expiration errors
    ConnectionError,          # Connection errors
)

Classes

IdentityProvider

Abstract base class that defines the interface for all identity providers.

1
2
3
4
5
6
7
from abc import ABC, abstractmethod
import ssl
from typing import Optional
from spiffe import X509Svid, X509Bundle

class IdentityProvider(ABC):
    """Abstract base class for identity providers."""

Methods

get_identity()

Get the SPIFFE ID of this workload.

1
async def get_identity(self) -> str

Returns:

  • str: The SPIFFE ID in the format spiffe://trust-domain/path

Raises:

  • IdentityError: If identity cannot be determined

Example:

1
2
identity = await provider.get_identity()
print(identity)  # spiffe://agentweave.io/agent/search/prod

get_svid()

Get the current X.509 SVID for this workload.

1
async def get_svid(self) -> X509Svid

Returns a cached SVID if available and valid, or fetches a new one if needed. The SVID contains the certificate chain and private key needed for mTLS.

Returns:

  • X509Svid: The X.509 SVID containing certificate and private key

Raises:

  • IdentityError: If SVID cannot be obtained

Example:

1
2
3
svid = await provider.get_svid()
print(f"SVID expires: {svid.leaf.not_valid_after_utc}")
print(f"SPIFFE ID: {svid.spiffe_id}")

get_trust_bundle()

Get the trust bundle for verifying peer SVIDs.

1
async def get_trust_bundle(self, trust_domain: Optional[str] = None) -> X509Bundle

The trust bundle contains the CA certificates needed to verify SVIDs from peers in the specified trust domain.

Parameters:

Parameter Type Description Default
trust_domain Optional[str] The trust domain to get the bundle for. If None, returns the bundle for this workload's trust domain. None

Returns:

  • X509Bundle: The trust bundle containing CA certificates

Raises:

  • IdentityError: If trust bundle cannot be obtained
  • TrustDomainError: If the trust domain is not recognized

Example:

1
2
3
4
5
# Get our own trust bundle
bundle = await provider.get_trust_bundle()

# Get trust bundle for another domain
partner_bundle = await provider.get_trust_bundle("partner.com")

create_tls_context()

Create an SSL context configured for mTLS.

1
async def create_tls_context(self, server: bool = False) -> ssl.SSLContext

The SSL context will be configured with:

  • Current SVID certificate and private key
  • Trust bundle for peer verification
  • TLS 1.3 minimum version (or as configured)
  • Mutual authentication enabled
  • Hostname checking disabled (SPIFFE uses SPIFFE ID verification)

Parameters:

Parameter Type Description Default
server bool If True, create a server-side SSL context. If False, create a client-side context. False

Returns:

  • ssl.SSLContext: Configured SSL context ready for use

Raises:

  • IdentityError: If SSL context cannot be created

Example:

1
2
3
4
5
# Client context
client_ctx = await provider.create_tls_context(server=False)

# Server context
server_ctx = await provider.create_tls_context(server=True)

health_check()

Check if the identity provider is healthy.

1
async def health_check(self) -> bool

This method verifies that the provider can obtain identity and trust bundles. It's useful for readiness probes.

Returns:

  • bool: True if healthy, False otherwise

Example:

1
2
3
4
if await provider.health_check():
    print("Identity provider is healthy")
else:
    print("Identity provider is unhealthy")

SPIFFEIdentityProvider

SPIFFE Workload API-based identity provider implementation.

1
2
class SPIFFEIdentityProvider(IdentityProvider):
    """SPIFFE Workload API-based identity provider."""

This provider connects to a SPIRE agent (or other SPIFFE Workload API implementation) to obtain X.509 SVIDs for workload identity. It handles:

  • Automatic SVID fetching and caching
  • Certificate rotation with callbacks
  • Trust bundle management for multiple trust domains
  • mTLS SSL context creation

The provider will automatically watch for SVID updates and invoke registered callbacks when rotation occurs.

Constructor

1
2
3
4
5
def __init__(
    self,
    endpoint: Optional[str] = None,
    tls_min_version: ssl.TLSVersion = ssl.TLSVersion.TLSv1_3,
)

Parameters:

Parameter Type Description Default
endpoint Optional[str] The SPIFFE Workload API endpoint. If None, will use the SPIFFE_ENDPOINT_SOCKET environment variable, or default to unix:///run/spire/sockets/agent.sock None
tls_min_version ssl.TLSVersion Minimum TLS version for SSL contexts. ssl.TLSVersion.TLSv1_3

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import ssl
from agentweave.identity import SPIFFEIdentityProvider

# Use default endpoint
provider = SPIFFEIdentityProvider()

# Use custom endpoint
provider = SPIFFEIdentityProvider(
    endpoint="unix:///custom/path/agent.sock"
)

# Use TLS 1.2 minimum
provider = SPIFFEIdentityProvider(
    tls_min_version=ssl.TLSVersion.TLSv1_2
)

Methods

initialize()

Initialize the connection to the SPIFFE Workload API.

1
async def initialize(self) -> None

This method must be called before using the provider. It establishes the connection to the SPIRE agent and fetches the initial SVID.

Raises:

  • ConnectionError: If connection to Workload API fails
  • IdentityError: If initial SVID cannot be fetched

Example:

1
2
provider = SPIFFEIdentityProvider()
await provider.initialize()

shutdown()

Shutdown the identity provider and cleanup resources.

1
async def shutdown(self) -> None

This cancels the SVID watch task and cleans up temporary files.

Example:

1
await provider.shutdown()

register_rotation_callback()

Register a callback to be invoked when SVID rotates.

1
2
3
4
def register_rotation_callback(
    self,
    callback: Callable[[X509Svid], Awaitable[None]]
) -> None

The callback will be called with the new SVID whenever automatic rotation occurs. This is useful for updating SSL contexts or notifying other components.

Parameters:

Parameter Type Description
callback Callable[[X509Svid], Awaitable[None]] Async function that takes an X509Svid parameter

Example:

1
2
3
4
5
6
async def on_svid_rotation(new_svid: X509Svid):
    print(f"SVID rotated: {new_svid.spiffe_id}")
    print(f"New expiration: {new_svid.leaf.not_valid_after_utc}")
    # Update TLS contexts, etc.

provider.register_rotation_callback(on_svid_rotation)

get_identity()

Get the SPIFFE ID of this workload.

1
async def get_identity(self) -> str

Returns:

  • str: The SPIFFE ID in the format spiffe://trust-domain/path

Raises:

  • IdentityError: If identity cannot be determined or provider not initialized

Example:

1
2
identity = await provider.get_identity()
print(identity)  # spiffe://agentweave.io/agent/search/prod

get_svid()

Get the current X.509 SVID for this workload.

1
async def get_svid(self) -> X509Svid

Returns a cached SVID if available, or fetches a new one. The SVID is automatically rotated when it approaches expiration.

Returns:

  • X509Svid: The X.509 SVID containing certificate and private key

Raises:

  • IdentityError: If SVID cannot be obtained or provider not initialized

Example:

1
2
svid = await provider.get_svid()
print(f"Certificate chain length: {len(svid.cert_chain())}")

get_trust_bundle()

Get the trust bundle for verifying peer SVIDs.

1
async def get_trust_bundle(self, trust_domain: Optional[str] = None) -> X509Bundle

Parameters:

Parameter Type Description Default
trust_domain Optional[str] The trust domain to get the bundle for. If None, returns the bundle for this workload's trust domain. None

Returns:

  • X509Bundle: The trust bundle containing CA certificates

Raises:

  • IdentityError: If trust bundle cannot be obtained or provider not initialized
  • TrustDomainError: If the trust domain is not recognized

Example:

1
2
3
4
5
# Get our own trust bundle
bundle = await provider.get_trust_bundle()

# Get trust bundle for federated domain
partner_bundle = await provider.get_trust_bundle("partner.agentweave.io")

create_tls_context()

Create an SSL context configured for mTLS.

1
async def create_tls_context(self, server: bool = False) -> ssl.SSLContext

The context is configured with:

  • Current SVID certificate and private key
  • Trust bundle for peer verification
  • TLS 1.3 minimum version (configurable)
  • Mutual authentication enabled
  • Hostname checking disabled (SPIFFE uses SPIFFE ID verification)

Parameters:

Parameter Type Description Default
server bool If True, create a server-side SSL context. If False, create a client-side context. False

Returns:

  • ssl.SSLContext: Configured SSL context ready for use

Raises:

  • IdentityError: If SSL context cannot be created or provider not initialized

Example:

1
2
3
4
5
# Create client context for making requests
client_ctx = await provider.create_tls_context(server=False)

# Create server context for accepting connections
server_ctx = await provider.create_tls_context(server=True)

health_check()

Check if the identity provider is healthy.

1
async def health_check(self) -> bool

Returns:

  • bool: True if healthy and initialized, False otherwise

Example:

1
2
3
4
if await provider.health_check():
    await agent.start()
else:
    logger.error("Identity provider unhealthy, cannot start agent")

Usage Examples

Basic Usage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from agentweave.identity import SPIFFEIdentityProvider

# Create and initialize provider
provider = SPIFFEIdentityProvider()
await provider.initialize()

try:
    # Get identity information
    identity = await provider.get_identity()
    print(f"My identity: {identity}")

    # Get SVID for certificate details
    svid = await provider.get_svid()
    print(f"Certificate expires: {svid.leaf.not_valid_after_utc}")

    # Create TLS context for secure communication
    tls_ctx = await provider.create_tls_context(server=False)

finally:
    await provider.shutdown()

Context Manager Pattern

1
2
3
4
5
6
7
8
9
10
11
12
13
from agentweave.identity import SPIFFEIdentityProvider

async def main():
    provider = SPIFFEIdentityProvider(
        endpoint="unix:///run/spire/sockets/agent.sock"
    )

    await provider.initialize()
    try:
        identity = await provider.get_identity()
        print(f"Identity: {identity}")
    finally:
        await provider.shutdown()

SVID Rotation Handling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from agentweave.identity import SPIFFEIdentityProvider
from spiffe import X509Svid
import logging

logger = logging.getLogger(__name__)

async def handle_rotation(new_svid: X509Svid):
    """Handle SVID rotation by updating SSL contexts."""
    logger.info(f"SVID rotated for {new_svid.spiffe_id}")
    logger.info(f"New expiration: {new_svid.leaf.not_valid_after_utc}")

    # Update TLS contexts, reconnect clients, etc.
    await update_tls_contexts(new_svid)

# Register callback
provider = SPIFFEIdentityProvider()
provider.register_rotation_callback(handle_rotation)
await provider.initialize()

Multi-Trust Domain Setup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from agentweave.identity import SPIFFEIdentityProvider

provider = SPIFFEIdentityProvider()
await provider.initialize()

# Get our own trust domain bundle
our_bundle = await provider.get_trust_bundle()

# Get federated trust domain bundles
partner_bundle = await provider.get_trust_bundle("partner.agentweave.io")
customer_bundle = await provider.get_trust_bundle("customer.example.com")

print(f"Our trust domain: {our_bundle.trust_domain}")
print(f"Partner trust domain: {partner_bundle.trust_domain}")

Health Monitoring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from agentweave.identity import SPIFFEIdentityProvider
import asyncio

async def monitor_health(provider: SPIFFEIdentityProvider):
    """Periodically check identity provider health."""
    while True:
        is_healthy = await provider.health_check()
        if not is_healthy:
            logger.error("Identity provider unhealthy!")
            # Trigger alerts, circuit breakers, etc.

        await asyncio.sleep(30)  # Check every 30 seconds

provider = SPIFFEIdentityProvider()
await provider.initialize()

# Start health monitoring in background
asyncio.create_task(monitor_health(provider))

Exceptions

IdentityError

Base exception for identity-related errors.

1
2
class IdentityError(Exception):
    """Base exception for identity-related errors."""

Usage:

1
2
3
4
5
6
from agentweave.identity import IdentityError

try:
    svid = await provider.get_svid()
except IdentityError as e:
    logger.error(f"Failed to get SVID: {e}")

TrustDomainError

Exception raised when a trust domain is not recognized or trusted.

1
2
class TrustDomainError(IdentityError):
    """Exception raised when a trust domain is not recognized or trusted."""

Usage:

1
2
3
4
5
6
from agentweave.identity import TrustDomainError

try:
    bundle = await provider.get_trust_bundle("unknown-domain.com")
except TrustDomainError as e:
    logger.error(f"Trust domain not found: {e}")

SVIDExpiredError

Exception raised when an SVID has expired and cannot be renewed.

1
2
class SVIDExpiredError(IdentityError):
    """Exception raised when an SVID has expired and cannot be renewed."""

Usage:

1
2
3
4
5
6
7
from agentweave.identity import SVIDExpiredError

try:
    svid = await provider.get_svid()
except SVIDExpiredError as e:
    logger.critical(f"SVID expired and rotation failed: {e}")
    # Trigger emergency shutdown

ConnectionError

Exception raised when connection to identity provider fails.

1
2
class ConnectionError(IdentityError):
    """Exception raised when connection to identity provider fails."""

Usage:

1
2
3
4
5
6
7
from agentweave.identity import ConnectionError

try:
    await provider.initialize()
except ConnectionError as e:
    logger.error(f"Cannot connect to SPIRE agent: {e}")
    # Check that SPIRE agent is running

Best Practices

1. Always Initialize Before Use

1
2
provider = SPIFFEIdentityProvider()
await provider.initialize()  # Required!

2. Handle Cleanup Properly

1
2
3
4
5
try:
    await provider.initialize()
    # Use provider
finally:
    await provider.shutdown()

3. Monitor SVID Expiration

1
2
3
4
svid = await provider.get_svid()
time_until_expiry = svid.leaf.not_valid_after_utc - datetime.utcnow()
if time_until_expiry < timedelta(hours=1):
    logger.warning("SVID expires soon!")

4. Register Rotation Callbacks Early

1
2
3
provider = SPIFFEIdentityProvider()
provider.register_rotation_callback(handle_rotation)
await provider.initialize()

5. Use Health Checks

1
2
3
# In Kubernetes readiness probe
async def readiness():
    return await identity_provider.health_check()

See Also