Performance Tuning

This guide shows you how to optimize AgentWeave agents for high performance, including connection pooling, caching, async best practices, and monitoring.

Table of Contents

  1. Performance Tuning
    1. Connection Pool Configuration
      1. Default Configuration
      2. Sizing Connection Pools
      3. Monitoring Connection Pool Health
    2. Cache Tuning (OPA Decision Cache)
      1. Enable Decision Caching
      2. Cache Hit Ratio Monitoring
      3. Cache Invalidation Strategy
    3. Async Best Practices
      1. Use asyncio.gather for Parallel Calls
      2. Avoid Blocking I/O
      3. Use Semaphores for Concurrency Control
    4. Profiling Agents
      1. Using cProfile
      2. Using py-spy
      3. Custom Performance Metrics
    5. Metrics to Monitor
      1. Request Metrics
      2. System Metrics
      3. Authorization Metrics
    6. Common Bottlenecks
      1. 1. Sequential Agent Calls
      2. 2. OPA Policy Evaluation Overhead
      3. 3. Connection Pool Exhaustion
      4. 4. Blocking I/O in Event Loop
      5. 5. Large Payload Serialization
    7. Scaling Strategies
      1. Horizontal Scaling
      2. Vertical Scaling
      3. Auto-scaling
    8. Performance Testing
      1. Using Locust
    9. Related Guides
    10. External Resources

Connection Pool Configuration

Connection pooling reuses TCP connections to reduce overhead and improve throughput.

Default Configuration

1
2
3
4
5
6
7
# config.yaml
transport:
  connection_pool:
    max_connections: 100           # Maximum connections per destination
    idle_timeout_seconds: 60       # Close idle connections after 60s
    max_keepalive_seconds: 300     # Maximum connection lifetime
    connect_timeout_seconds: 10    # Connection establishment timeout

Sizing Connection Pools

Rule of thumb: max_connections = (expected_concurrent_requests × average_request_time) + buffer

Examples:

Low-traffic agent (< 10 req/s):

1
2
3
4
transport:
  connection_pool:
    max_connections: 10
    idle_timeout_seconds: 30

Medium-traffic agent (10-100 req/s):

1
2
3
4
transport:
  connection_pool:
    max_connections: 50
    idle_timeout_seconds: 60

High-traffic agent (> 100 req/s):

1
2
3
4
5
transport:
  connection_pool:
    max_connections: 200
    idle_timeout_seconds: 120
    max_keepalive_seconds: 600

Monitoring Connection Pool Health

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from agentweave import SecureAgent
from agentweave.observability.metrics import get_metrics_registry

class MonitoredAgent(SecureAgent):
    async def start(self):
        await super().start()

        # Start connection pool monitoring
        asyncio.create_task(self._monitor_connection_pool())

    async def _monitor_connection_pool(self):
        """Monitor connection pool metrics."""
        metrics = get_metrics_registry()

        while self.is_running:
            # Get pool stats from transport layer
            pool_stats = await self.transport.get_pool_stats()

            metrics.gauge("connection_pool.active", pool_stats.active)
            metrics.gauge("connection_pool.idle", pool_stats.idle)
            metrics.gauge("connection_pool.total", pool_stats.total)
            metrics.gauge("connection_pool.utilization",
                         pool_stats.active / pool_stats.max_connections)

            # Alert if pool is exhausted
            if pool_stats.active >= pool_stats.max_connections * 0.9:
                self.logger.warning(
                    "Connection pool near capacity",
                    extra={
                        "active": pool_stats.active,
                        "max": pool_stats.max_connections,
                        "utilization": pool_stats.active / pool_stats.max_connections
                    }
                )

            await asyncio.sleep(10)  # Check every 10 seconds

Cache Tuning (OPA Decision Cache)

Caching OPA authorization decisions can significantly improve performance.

Enable Decision Caching

1
2
3
4
5
6
7
8
9
10
11
12
# config.yaml
authorization:
  provider: opa
  opa_endpoint: http://localhost:8181
  policy_path: agentweave/authz

  # Decision cache configuration
  cache:
    enabled: true
    ttl_seconds: 300              # Cache for 5 minutes
    max_size: 10000               # Max 10k cached decisions
    eviction_policy: lru          # Least recently used

Cache Hit Ratio Monitoring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from agentweave.authz import OPAAuthzProvider

class CachedAuthzAgent(SecureAgent):
    def __init__(self):
        super().__init__()
        self.cache_hits = 0
        self.cache_misses = 0

    async def _check_authorization_with_metrics(
        self,
        caller_id: str,
        action: str
    ) -> bool:
        """Check authorization and track cache performance."""
        # Check if decision is cached
        cache_key = f"{caller_id}:{action}"
        cached_decision = await self.authz_provider.cache.get(cache_key)

        if cached_decision is not None:
            self.cache_hits += 1
            return cached_decision.allowed
        else:
            self.cache_misses += 1

        # Make OPA decision
        decision = await self.authz_provider.check_inbound(caller_id, action)

        # Log cache performance
        total = self.cache_hits + self.cache_misses
        hit_ratio = self.cache_hits / total if total > 0 else 0

        if total % 100 == 0:  # Log every 100 requests
            self.logger.info(
                f"Authorization cache hit ratio: {hit_ratio:.2%}",
                extra={
                    "hits": self.cache_hits,
                    "misses": self.cache_misses,
                    "ratio": hit_ratio
                }
            )

        return decision.allowed

Cache Invalidation Strategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from agentweave.authz.cache import DecisionCache

class SmartCacheAgent(SecureAgent):
    def __init__(self):
        super().__init__()
        self.decision_cache = DecisionCache(
            ttl_seconds=300,
            max_size=10000
        )

    async def invalidate_cache_for_agent(self, spiffe_id: str):
        """Invalidate all cached decisions involving an agent."""
        # When agent permissions change, clear related cache entries
        await self.decision_cache.invalidate_pattern(f"*{spiffe_id}*")

        self.logger.info(
            f"Invalidated cache for {spiffe_id}",
            extra={"spiffe_id": spiffe_id}
        )

    async def on_policy_update(self, policy_version: str):
        """Clear all cached decisions when policy updates."""
        await self.decision_cache.clear()

        self.logger.info(
            "Cleared authorization cache due to policy update",
            extra={"policy_version": policy_version}
        )

Async Best Practices

Maximize concurrency and avoid blocking the event loop.

Use asyncio.gather for Parallel Calls

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
from agentweave import SecureAgent, capability

class ParallelAgent(SecureAgent):
    @capability("aggregate")
    async def aggregate(self, query: dict) -> dict:
        # BAD: Sequential calls (slow)
        # search_results = await self.call_agent("search", "search", query)
        # index_results = await self.call_agent("indexer", "query", query)
        # storage_results = await self.call_agent("storage", "retrieve", query)

        # GOOD: Parallel calls (fast)
        search_task = self.call_agent(
            "spiffe://yourdomain.com/agent/search",
            "search",
            query
        )
        index_task = self.call_agent(
            "spiffe://yourdomain.com/agent/indexer",
            "query",
            query
        )
        storage_task = self.call_agent(
            "spiffe://yourdomain.com/agent/storage",
            "retrieve",
            query
        )

        # Wait for all to complete
        search_results, index_results, storage_results = await asyncio.gather(
            search_task,
            index_task,
            storage_task,
            return_exceptions=True  # Don't fail if one fails
        )

        return {
            "search": search_results if not isinstance(search_results, Exception) else None,
            "index": index_results if not isinstance(index_results, Exception) else None,
            "storage": storage_results if not isinstance(storage_results, Exception) else None,
        }

Avoid Blocking I/O

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import aiofiles
from agentweave import SecureAgent, capability

class NonBlockingAgent(SecureAgent):
    @capability("process_file")
    async def process_file(self, filepath: str) -> dict:
        # BAD: Blocking I/O
        # with open(filepath, 'r') as f:
        #     data = f.read()  # Blocks event loop!

        # GOOD: Async I/O
        async with aiofiles.open(filepath, 'r') as f:
            data = await f.read()  # Non-blocking

        # Process data
        result = await self._process_data(data)
        return result

    async def _process_data(self, data: str) -> dict:
        # If you must use blocking code, run in executor
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            None,  # Uses default ThreadPoolExecutor
            self._blocking_computation,
            data
        )
        return result

    def _blocking_computation(self, data: str) -> dict:
        """CPU-intensive computation that would block event loop."""
        # Heavy computation here
        return {"processed": len(data)}

Use Semaphores for Concurrency Control

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
from agentweave import SecureAgent

class ThrottledAgent(SecureAgent):
    def __init__(self):
        super().__init__()
        # Limit to 10 concurrent outbound calls
        self.call_semaphore = asyncio.Semaphore(10)

    async def call_with_throttle(
        self,
        callee_id: str,
        action: str,
        payload: dict
    ) -> dict:
        """Make call with concurrency limiting."""
        async with self.call_semaphore:
            return await self.call_agent(callee_id, action, payload)

    @capability("batch_process")
    async def batch_process(self, items: list) -> list:
        """Process many items with concurrency limit."""
        tasks = [
            self.call_with_throttle(
                "spiffe://yourdomain.com/agent/processor",
                "process",
                {"item": item}
            )
            for item in items
        ]

        # Process all, but only 10 concurrent at a time
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

Profiling Agents

Identify performance bottlenecks in your agents.

Using cProfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import cProfile
import pstats
from agentweave import SecureAgent

class ProfiledAgent(SecureAgent):
    @capability("profile_test")
    async def profile_test(self, data: dict) -> dict:
        """Capability that can be profiled."""
        profiler = cProfile.Profile()
        profiler.enable()

        try:
            result = await self._do_work(data)
            return result
        finally:
            profiler.disable()

            # Print top 20 time-consuming functions
            stats = pstats.Stats(profiler)
            stats.sort_stats('cumulative')
            stats.print_stats(20)

Using py-spy

1
2
3
4
5
6
7
8
9
10
11
# Install py-spy
pip install py-spy

# Profile running agent
py-spy top --pid $(pgrep -f "python.*agent.py")

# Generate flamegraph
py-spy record --pid $(pgrep -f "python.*agent.py") --output profile.svg

# Profile for 60 seconds
py-spy record --pid $(pgrep -f "python.*agent.py") --duration 60 --output profile.svg

Custom Performance Metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
from functools import wraps
from agentweave import SecureAgent, capability
from agentweave.observability.metrics import get_metrics_registry

def measure_time(metric_name: str):
    """Decorator to measure execution time."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                duration = time.perf_counter() - start
                metrics = get_metrics_registry()
                metrics.histogram(metric_name, duration)
        return wrapper
    return decorator


class MetricsAgent(SecureAgent):
    @capability("search")
    @measure_time("capability.search.duration")
    async def search(self, query: str) -> dict:
        """Search capability with automatic timing."""
        results = await self._perform_search(query)
        return {"results": results}

    @measure_time("search.database.query")
    async def _perform_search(self, query: str) -> list:
        """Database search (timed separately)."""
        # Database query here
        return []

Metrics to Monitor

Key performance indicators for AgentWeave agents.

Request Metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from agentweave.observability.metrics import get_metrics_registry

class MonitoredAgent(SecureAgent):
    def __init__(self):
        super().__init__()
        self.metrics = get_metrics_registry()

    @capability("process")
    async def process(self, data: dict) -> dict:
        # Track request count
        self.metrics.counter("requests.total").inc()
        self.metrics.counter("requests.by_capability", {"capability": "process"}).inc()

        start_time = time.perf_counter()

        try:
            result = await self._do_processing(data)

            # Track success
            self.metrics.counter("requests.success").inc()

            # Track duration
            duration = time.perf_counter() - start_time
            self.metrics.histogram("request.duration", duration, {"capability": "process"})

            # Track payload size
            self.metrics.histogram("request.payload_size", len(str(data)))

            return result

        except Exception as e:
            # Track errors
            self.metrics.counter("requests.error", {"error_type": type(e).__name__}).inc()
            raise

System Metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import psutil
import asyncio

class SystemMetricsAgent(SecureAgent):
    async def start(self):
        await super().start()
        asyncio.create_task(self._collect_system_metrics())

    async def _collect_system_metrics(self):
        """Collect system-level metrics."""
        metrics = get_metrics_registry()

        while self.is_running:
            # CPU usage
            cpu_percent = psutil.cpu_percent(interval=1)
            metrics.gauge("system.cpu.percent", cpu_percent)

            # Memory usage
            memory = psutil.virtual_memory()
            metrics.gauge("system.memory.percent", memory.percent)
            metrics.gauge("system.memory.available_mb", memory.available / 1024 / 1024)

            # Disk usage
            disk = psutil.disk_usage('/')
            metrics.gauge("system.disk.percent", disk.percent)

            # Network I/O
            net_io = psutil.net_io_counters()
            metrics.counter("system.network.bytes_sent", net_io.bytes_sent)
            metrics.counter("system.network.bytes_recv", net_io.bytes_recv)

            await asyncio.sleep(30)  # Collect every 30 seconds

Authorization Metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class AuthzMetricsAgent(SecureAgent):
    async def _check_authorization_with_metrics(
        self,
        caller_id: str,
        action: str
    ):
        """Check authorization and emit metrics."""
        metrics = get_metrics_registry()

        start = time.perf_counter()

        try:
            decision = await self.authz_provider.check_inbound(caller_id, action)

            # Track decision time
            duration = time.perf_counter() - start
            metrics.histogram("authz.check.duration", duration)

            # Track decision outcome
            if decision.allowed:
                metrics.counter("authz.decisions.allowed").inc()
            else:
                metrics.counter("authz.decisions.denied").inc()

            # Track by action
            metrics.counter(
                "authz.decisions.by_action",
                {"action": action, "result": "allowed" if decision.allowed else "denied"}
            ).inc()

            return decision

        except Exception as e:
            metrics.counter("authz.check.errors", {"error_type": type(e).__name__}).inc()
            raise

Common Bottlenecks

1. Sequential Agent Calls

Problem: Calling agents one after another

1
2
3
4
5
# SLOW: Sequential (total time = sum of all calls)
result1 = await agent.call_agent("agent1", "action1", {})
result2 = await agent.call_agent("agent2", "action2", {})
result3 = await agent.call_agent("agent3", "action3", {})
# Total: 300ms + 200ms + 150ms = 650ms

Solution: Parallel calls with asyncio.gather

1
2
3
4
5
6
7
# FAST: Parallel (total time = max of all calls)
results = await asyncio.gather(
    agent.call_agent("agent1", "action1", {}),
    agent.call_agent("agent2", "action2", {}),
    agent.call_agent("agent3", "action3", {}),
)
# Total: max(300ms, 200ms, 150ms) = 300ms

2. OPA Policy Evaluation Overhead

Problem: Every request hits OPA

Solution: Enable decision caching (see Cache Tuning section)

3. Connection Pool Exhaustion

Problem: Too many concurrent requests, not enough connections

Symptoms:

  • Requests waiting for connections
  • High request latency
  • Timeout errors

Solution: Increase connection pool size or add concurrency limits

1
2
3
transport:
  connection_pool:
    max_connections: 200  # Increase from default 100

4. Blocking I/O in Event Loop

Problem: Synchronous I/O blocks all requests

Solution: Use async I/O libraries (aiofiles, aiomysql, aioredis, etc.)

5. Large Payload Serialization

Problem: JSON serialization/deserialization is slow for large payloads

Solution: Use streaming or chunked transfer

1
2
3
4
5
6
7
8
9
10
11
class StreamingAgent(SecureAgent):
    @capability("process_large_data")
    async def process_large_data(self, data_url: str) -> dict:
        # Instead of loading entire payload
        # BAD: data = await download_all(data_url)

        # Stream and process in chunks
        async for chunk in self._stream_download(data_url):
            await self._process_chunk(chunk)

        return {"status": "completed"}

Scaling Strategies

Horizontal Scaling

Deploy multiple instances of your agent behind a load balancer.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: search-agent
spec:
  replicas: 5  # Run 5 instances
  selector:
    matchLabels:
      app: search-agent
  template:
    metadata:
      labels:
        app: search-agent
    spec:
      containers:
      - name: search-agent
        image: myorg/search-agent:v1
        resources:
          requests:
            cpu: 500m
            memory: 512Mi
          limits:
            cpu: 1000m
            memory: 1Gi

Vertical Scaling

Increase resources for a single instance.

1
2
3
4
5
6
7
8
# Increase CPU/memory limits
resources:
  requests:
    cpu: 2000m    # 2 cores
    memory: 4Gi   # 4 GB
  limits:
    cpu: 4000m    # 4 cores
    memory: 8Gi   # 8 GB

Auto-scaling

Automatically scale based on metrics.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: search-agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: search-agent
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: requests_per_second
      target:
        type: AverageValue
        averageValue: "100"

Performance Testing

Load test your agents before production.

Using Locust

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# locustfile.py
from locust import HttpUser, task, between
import json

class AgentUser(HttpUser):
    wait_time = between(1, 3)

    @task
    def search(self):
        """Simulate search request."""
        payload = {
            "action": "search",
            "query": "test query",
            "limit": 10
        }
        self.client.post(
            "/task",
            json=payload,
            headers={"Content-Type": "application/json"}
        )

    @task(2)  # 2x weight
    def process(self):
        """Simulate process request."""
        payload = {
            "action": "process",
            "data": {"key": "value"}
        }
        self.client.post("/task", json=payload)

Run load test:

1
2
3
4
5
6
7
8
9
10
# Install locust
pip install locust

# Run test
locust -f locustfile.py --host https://agent.yourdomain.com --users 100 --spawn-rate 10

# Results show:
# - Requests per second
# - Response times (p50, p95, p99)
# - Failure rate


External Resources