Microservices Pattern Example

Complexity: Intermediate
Time to Complete: 45 minutes
Prerequisites: Understanding of microservices architecture

This example demonstrates converting traditional HTTP/REST microservices to secure AgentWeave agents. We'll build an e-commerce system with User, Order, and Payment services, comparing the traditional approach with the AgentWeave pattern.

What You'll Learn

  • Converting microservices to secure agents
  • Service-to-service communication patterns
  • Comparison with service mesh (Istio, Linkerd)
  • API gateway agent pattern
  • Service discovery
  • Benefits of agent-based architecture

Traditional vs AgentWeave Architecture

Traditional Microservices (Without Service Mesh)

┌─────────────┐
│   Client    │
└──────┬──────┘
       │ HTTPS
       ▼
┌─────────────┐
│ API Gateway │
└──────┬──────┘
       │ HTTP (insecure)
       ├────────────┬───────────┬─────────────┐
       │            │           │             │
       ▼            ▼           ▼             ▼
  ┌────────┐  ┌─────────┐  ┌────────┐  ┌──────────┐
  │  User  │  │  Order  │  │Payment │  │Inventory │
  │Service │  │ Service │  │Service │  │ Service  │
  └────────┘  └─────────┘  └────────┘  └──────────┘

Issues:
- No mutual TLS between services
- Manual auth token passing
- No fine-grained authorization
- Poor observability
- Complex service discovery

With Service Mesh (Istio)

┌─────────────┐
│   Client    │
└──────┬──────┘
       │ HTTPS
       ▼
┌─────────────┐
│ API Gateway │
└──────┬──────┘
       │
       ├────────────┬───────────┬─────────────┐
       │            │           │             │
       ▼            ▼           ▼             ▼
  ┌────────┐  ┌─────────┐  ┌────────┐  ┌──────────┐
  │  User  │  │  Order  │  │Payment │  │Inventory │
  │+Envoy  │  │ +Envoy  │  │+Envoy  │  │  +Envoy  │
  └────────┘  └─────────┘  └────────┘  └──────────┘

Better but:
- Complex Envoy configuration
- Separate control plane (Istiod)
- Policy in Envoy config, not code
- Still application-level auth issues

With AgentWeave

┌─────────────┐
│   Client    │
└──────┬──────┘
       │ mTLS (SPIFFE)
       ▼
┌─────────────────┐
│  Gateway Agent  │
│  (SPIFFE ID)    │
└────────┬────────┘
         │ All calls use:
         │ - mTLS with SPIFFE
         │ - OPA authorization
         │ - A2A protocol
         ├────────────┬───────────┬─────────────┐
         │            │           │             │
         ▼            ▼           ▼             ▼
    ┌────────┐  ┌─────────┐  ┌────────┐  ┌──────────┐
    │  User  │  │  Order  │  │Payment │  │Inventory │
    │ Agent  │  │  Agent  │  │ Agent  │  │  Agent   │
    └────────┘  └─────────┘  └────────┘  └──────────┘

Benefits:
- No sidecar needed
- Identity + authorization in SDK
- Policy as code (Rego)
- Built-in observability
- Simple service discovery
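
Every agent in this example is addressed by a SPIFFE ID such as spiffe://agentweave.io/agent/user. As a rough illustration of how such an ID splits into a trust domain and a path, here is a minimal standard-library sketch. The names `SpiffeId` and `parse_spiffe_id` are hypothetical helpers made up for this page, not part of the AgentWeave SDK, and the checks cover only a simplified subset of the SPIFFE ID rules.

```python
from dataclasses import dataclass
from urllib.parse import urlsplit


@dataclass(frozen=True)
class SpiffeId:
    """Hypothetical container for the two parts of a SPIFFE ID."""
    trust_domain: str
    path: str


def parse_spiffe_id(value: str) -> SpiffeId:
    """Split a SPIFFE ID into trust domain and path (simplified checks)."""
    parts = urlsplit(value)
    if parts.scheme != "spiffe":
        raise ValueError(f"not a SPIFFE ID: {value!r}")
    if not parts.netloc:
        raise ValueError(f"missing trust domain: {value!r}")
    if parts.query or parts.fragment:
        raise ValueError(f"query or fragment not allowed: {value!r}")
    return SpiffeId(trust_domain=parts.netloc, path=parts.path)


user = parse_spiffe_id("spiffe://agentweave.io/agent/user")
print(user.trust_domain)  # agentweave.io
print(user.path)          # /agent/user
```

In the diagrams above, all agents share the trust domain agentweave.io and differ only in the path, which is what the authorization policies later match on.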

Scenario: E-Commerce System

Services (Agents):

  • User Agent: User registration, authentication, profile management
  • Order Agent: Create/manage orders, order history
  • Payment Agent: Process payments, refunds
  • Inventory Agent: Stock management, availability checks
  • Gateway Agent: Public API, request routing

Complete Code

User Agent

# user_agent.py
"""
User Agent - Manages user accounts and authentication.

Traditional Equivalent: User Microservice
Port: 8080 (HTTP) → 8443 (HTTPS + mTLS)
"""

import asyncio
from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr

from agentweave import SecureAgent, capability
from agentweave.types import TaskResult, Message, DataPart


class User(BaseModel):
    """User model."""
    user_id: str
    email: EmailStr
    name: str
    tier: str = "free"  # free, premium, enterprise


class UserAgent(SecureAgent):
    """Manages user accounts."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._users: Dict[str, User] = {}  # In production, use database

    @capability("create_user")
    async def create_user(
        self,
        email: str,
        name: str,
        tier: str = "free"
    ) -> TaskResult:
        """
        Create new user account.

        In traditional microservice:
        - POST /api/v1/users
        - Manual JWT validation
        - Application-level authorization

        With AgentWeave:
        - Caller identity verified by SDK (SPIFFE)
        - OPA policy checked automatically
        - No manual auth code needed
        """
        import uuid

        user_id = str(uuid.uuid4())
        user = User(
            user_id=user_id,
            email=email,
            name=name,
            tier=tier
        )

        self._users[user_id] = user

        self.logger.info(
            "User created",
            extra={
                "user_id": user_id,
                "caller": self.context.caller_spiffe_id
            }
        )

        return TaskResult(
            status="completed",
            messages=[Message(
                role="assistant",
                parts=[DataPart(data=user.dict())]
            )]
        )

    @capability("get_user")
    async def get_user(self, user_id: str) -> TaskResult:
        """
        Get user by ID.

        Traditional: GET /api/v1/users/{user_id}
        """
        user = self._users.get(user_id)

        if not user:
            return TaskResult(
                status="failed",
                error=f"User {user_id} not found"
            )

        return TaskResult(
            status="completed",
            messages=[Message(
                role="assistant",
                parts=[DataPart(data=user.dict())]
            )]
        )

    @capability("update_tier")
    async def update_tier(
        self,
        user_id: str,
        new_tier: str
    ) -> TaskResult:
        """
        Update user tier (admin only).

        Traditional: PATCH /api/v1/users/{user_id}/tier
        Required: Admin JWT token

        AgentWeave: OPA policy ensures only admin agents can call this
        """
        user = self._users.get(user_id)

        if not user:
            return TaskResult(
                status="failed",
                error=f"User {user_id} not found"
            )

        user.tier = new_tier
        self._users[user_id] = user

        self.logger.info(
            "User tier updated",
            extra={
                "user_id": user_id,
                "new_tier": new_tier,
                "caller": self.context.caller_spiffe_id
            }
        )

        return TaskResult(
            status="completed",
            messages=[Message(
                role="assistant",
                parts=[DataPart(data=user.dict())]
            )]
        )


async def main():
    agent = UserAgent.from_config("config/user.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
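
The `User` model above documents three tiers in a comment (free, premium, enterprise) but accepts any string. One possible way to enforce the list before storing a user is sketched below. `ALLOWED_TIERS` and `validate_tier` are hypothetical names, and the trimming and lower-casing step is an assumption about what input should be tolerated, not behaviour of the original agent.

```python
ALLOWED_TIERS = ("free", "premium", "enterprise")


def validate_tier(tier: str) -> str:
    """Return the normalized tier, or raise ValueError for unknown tiers."""
    normalized = tier.strip().lower()  # assumption: tolerate case and whitespace
    if normalized not in ALLOWED_TIERS:
        raise ValueError(
            f"unknown tier {tier!r}; expected one of {', '.join(ALLOWED_TIERS)}"
        )
    return normalized


print(validate_tier("Premium"))  # premium

try:
    validate_tier("gold")
except ValueError as exc:
    print(exc)  # unknown tier 'gold'; expected one of free, premium, enterprise
```

A check like this could run at the top of `create_user` and `update_tier`, returning a failed `TaskResult` instead of raising.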

Order Agent

# order_agent.py
"""
Order Agent - Manages customer orders.

Calls: User Agent, Payment Agent, Inventory Agent
"""

import asyncio
from typing import Dict, Any, List
from decimal import Decimal
from datetime import datetime

from agentweave import SecureAgent, capability
from agentweave.types import TaskResult, Message, DataPart
from agentweave.exceptions import AgentCallError


class OrderAgent(SecureAgent):
    """Manages customer orders."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Agent dependencies
        self.user_agent = "spiffe://agentweave.io/agent/user"
        self.payment_agent = "spiffe://agentweave.io/agent/payment"
        self.inventory_agent = "spiffe://agentweave.io/agent/inventory"

        self._orders: Dict[str, Dict] = {}

    @capability("create_order")
    async def create_order(
        self,
        user_id: str,
        items: List[Dict[str, Any]],
        payment_method: str
    ) -> TaskResult:
        """
        Create new order.

        Workflow:
        1. Verify user exists (call User Agent)
        2. Check inventory (call Inventory Agent)
        3. Calculate the order total
        4. Process payment (call Payment Agent)
        5. Reserve inventory (call Inventory Agent)
        6. Create the order record

        Traditional microservices:
        - 4 HTTP calls with manual auth
        - Complex error handling
        - No built-in retries

        AgentWeave:
        - 4 agent calls with automatic mTLS
        - OPA policies enforce permissions
        - Built-in retries, circuit breakers
        - Distributed tracing included
        """
        import uuid

        order_id = str(uuid.uuid4())

        self.logger.info(
            "Creating order",
            extra={
                "order_id": order_id,
                "user_id": user_id,
                "items_count": len(items)
            }
        )

        try:
            # Step 1: Verify user
            user_result = await self.call_agent(
                target=self.user_agent,
                task_type="get_user",
                payload={"user_id": user_id}
            )

            if user_result.status != "completed":
                raise AgentCallError(f"User not found: {user_id}")

            user = user_result.artifacts[0]["data"]

            # Step 2: Check inventory availability
            inventory_result = await self.call_agent(
                target=self.inventory_agent,
                task_type="check_availability",
                payload={"items": items}
            )

            if inventory_result.status != "completed":
                raise AgentCallError("Items not available")

            # Step 3: Calculate total
            total = self._calculate_total(items)

            # Step 4: Process payment
            payment_result = await self.call_agent(
                target=self.payment_agent,
                task_type="process_payment",
                payload={
                    "user_id": user_id,
                    "amount": str(total),
                    "payment_method": payment_method,
                    "order_id": order_id
                }
            )

            if payment_result.status != "completed":
                raise AgentCallError("Payment failed")

            payment_data = payment_result.artifacts[0]["data"]

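            # Step 5: Reserve inventory
            reserve_result = await self.call_agent(
                target=self.inventory_agent,
                task_type="reserve_items",
                payload={"order_id": order_id, "items": items}
            )

            if reserve_result.status != "completed":
                # Payment has already been taken at this point; a real
                # system would refund it here (compensating transaction).
                raise AgentCallError("Inventory reservation failed")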

            # Step 6: Create order record
            order = {
                "order_id": order_id,
                "user_id": user_id,
                "items": items,
                "total": str(total),
                "payment_method": payment_method,
                "transaction_id": payment_data["transaction_id"],
                "status": "confirmed",
                "created_at": datetime.utcnow().isoformat()
            }

            self._orders[order_id] = order

            return TaskResult(
                status="completed",
                messages=[Message(
                    role="assistant",
                    parts=[DataPart(data=order)]
                )]
            )

        except AgentCallError as e:
            self.logger.error(f"Order creation failed: {e}")
            # In production, implement compensating transactions
            return TaskResult(
                status="failed",
                error=f"Failed to create order: {e}"
            )

    @capability("get_order")
    async def get_order(self, order_id: str) -> TaskResult:
        """Get order by ID."""
        order = self._orders.get(order_id)

        if not order:
            return TaskResult(
                status="failed",
                error=f"Order {order_id} not found"
            )

        return TaskResult(
            status="completed",
            messages=[Message(
                role="assistant",
                parts=[DataPart(data=order)]
            )]
        )

    def _calculate_total(self, items: List[Dict[str, Any]]) -> Decimal:
        """Calculate order total."""
        total = Decimal("0")
        for item in items:
            price = Decimal(str(item["price"]))
            quantity = item["quantity"]
            total += price * quantity
        return total


async def main():
    agent = OrderAgent.from_config("config/order.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
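
The `create_order` handler notes that production code should implement compensating transactions: if a later step fails after payment has been taken, the earlier steps need to be undone. A minimal sketch of that idea follows, using only asyncio. `run_saga` and the stub steps (`charge`, `refund`, `reserve`, `release`) are hypothetical stand-ins for the agent calls, not AgentWeave APIs.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Step = Callable[[], Awaitable[None]]


async def run_saga(steps: List[Tuple[str, Step, Step]]) -> List[str]:
    """Run (name, action, compensation) steps in order.

    If an action raises, the compensations of the steps that already
    succeeded are run newest first, then the original error is re-raised.
    Returns the names of the completed steps when everything succeeds.
    """
    completed: List[Tuple[str, Step]] = []
    for name, action, compensate in steps:
        try:
            await action()
        except Exception:
            for _, undo in reversed(completed):
                await undo()
            raise
        completed.append((name, compensate))
    return [name for name, _ in completed]


log: List[str] = []


async def charge() -> None:
    log.append("charge")


async def refund() -> None:
    log.append("refund")


async def reserve() -> None:
    raise RuntimeError("out of stock")


async def release() -> None:
    log.append("release")


async def demo() -> None:
    try:
        await run_saga([
            ("payment", charge, refund),
            ("inventory", reserve, release),
        ])
    except RuntimeError as exc:
        log.append(f"failed: {exc}")


asyncio.run(demo())
print(log)  # ['charge', 'refund', 'failed: out of stock']
```

In the demo the payment succeeds, the reservation fails, and the refund runs before the error reaches the caller. This sketch does not handle a compensation that itself fails, which a real implementation would need to log and retry.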

Gateway Agent

# gateway_agent.py
"""
Gateway Agent - Public-facing API gateway.

Traditional: Kong, NGINX, AWS API Gateway
AgentWeave: Secure agent with routing logic
"""

import asyncio
from typing import Dict, Any

from agentweave import SecureAgent, capability
from agentweave.types import TaskResult, Message, DataPart
from agentweave.exceptions import AgentCallError


class GatewayAgent(SecureAgent):
    """
    API Gateway implemented as an agent.

    Benefits over traditional gateway:
    - No separate gateway configuration
    - Routing logic in Python (testable)
    - Same security model as other agents
    - Built-in observability
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Service registry
        self.services = {
            "user": "spiffe://agentweave.io/agent/user",
            "order": "spiffe://agentweave.io/agent/order",
            "payment": "spiffe://agentweave.io/agent/payment",
            "inventory": "spiffe://agentweave.io/agent/inventory"
        }

    @capability("route")
    async def route(
        self,
        service: str,
        action: str,
        payload: Dict[str, Any]
    ) -> TaskResult:
        """
        Route request to appropriate service.

        Traditional API Gateway:
        - Complex YAML/JSON configuration
        - Limited routing logic
        - Manual auth integration

        AgentWeave Gateway:
        - Python routing logic
        - Automatic mTLS to services
        - Type-safe payloads
        """
        target = self.services.get(service)

        if not target:
            return TaskResult(
                status="failed",
                error=f"Service {service} not found"
            )

        self.logger.info(
            "Routing request",
            extra={
                "service": service,
                "action": action,
                "target": target
            }
        )

        try:
            result = await self.call_agent(
                target=target,
                task_type=action,
                payload=payload,
                timeout=30.0
            )

            return result

        except AgentCallError as e:
            self.logger.error(f"Routing failed: {e}")
            return TaskResult(
                status="failed",
                error=f"Service call failed: {e}"
            )

    @capability("health")
    async def health(self) -> TaskResult:
        """
        Health check all services.

        Returns status of all backend services.
        """
        results = {}

        for service_name, service_id in self.services.items():
            try:
                # Try to call health endpoint with short timeout
                result = await self.call_agent(
                    target=service_id,
                    task_type="health",
                    payload={},
                    timeout=5.0
                )
                results[service_name] = "healthy" if result.status == "completed" else "unhealthy"
            except Exception as e:
                results[service_name] = f"unhealthy: {e}"

        all_healthy = all(status == "healthy" for status in results.values())

        return TaskResult(
            status="completed" if all_healthy else "degraded",
            messages=[Message(
                role="assistant",
                parts=[DataPart(data={
                    "gateway": "healthy",
                    "services": results
                })]
            )]
        )


async def main():
    agent = GatewayAgent.from_config("config/gateway.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
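
The `health` capability above checks the backends one after another, so in the worst case the total wait is the sum of the individual timeouts. A possible variation is to run the checks concurrently with `asyncio.gather`. The sketch below uses stub coroutines in place of `call_agent`; `check_all` and the stub names are hypothetical and only illustrate the pattern.

```python
import asyncio
from typing import Awaitable, Callable, Dict

HealthCheck = Callable[[], Awaitable[bool]]


async def check_all(checks: Dict[str, HealthCheck], timeout: float) -> Dict[str, str]:
    """Run every health check concurrently and report a status per service."""

    async def run_one(check: HealthCheck) -> str:
        try:
            ok = await asyncio.wait_for(check(), timeout=timeout)
            return "healthy" if ok else "unhealthy"
        except asyncio.TimeoutError:
            return "unhealthy: timeout"
        except Exception as exc:
            return f"unhealthy: {exc}"

    names = list(checks)
    statuses = await asyncio.gather(*(run_one(checks[name]) for name in names))
    return dict(zip(names, statuses))


async def fast_ok() -> bool:
    return True


async def too_slow() -> bool:
    await asyncio.sleep(1.0)  # longer than the timeout used below
    return True


async def broken() -> bool:
    raise ConnectionError("connection refused")


results = asyncio.run(
    check_all({"user": fast_ok, "order": too_slow, "payment": broken}, timeout=0.05)
)
print(results)
# {'user': 'healthy', 'order': 'unhealthy: timeout', 'payment': 'unhealthy: connection refused'}
```

With this shape the whole health check takes roughly as long as the slowest single check rather than the sum of all of them.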

Authorization Policies

Gateway Policy

# policies/gateway_authz.rego
package gateway.authz

import rego.v1

default allow := false

# Gateway can call any internal service
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/gateway"
    startswith(input.callee_spiffe_id, "spiffe://agentweave.io/agent/")
}

# External clients can call gateway
allow if {
    startswith(input.caller_spiffe_id, "spiffe://agentweave.io/client/")
    input.callee_spiffe_id == "spiffe://agentweave.io/agent/gateway"
    input.action in ["route", "health"]
}
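
To try a policy like this outside the SDK, one option is OPA's Data API, which evaluates a rule when a document is POSTed to /v1/data/<package path>/<rule> under an "input" key. The sketch below only builds such a request with the standard library and does not send it. The address in `OPA_URL` is an assumption (OPA's default listen port), `build_opa_query` is a hypothetical helper, and how the AgentWeave SDK itself talks to OPA is not shown in this example.

```python
import json
from urllib.request import Request

OPA_URL = "http://localhost:8181"  # assumption: OPA running locally on its default port


def build_opa_query(package: str, rule: str, caller: str, callee: str, action: str) -> Request:
    """Build a POST request asking OPA to evaluate package.rule for one call."""
    path = "/".join(package.split(".") + [rule])
    body = {
        "input": {
            "caller_spiffe_id": caller,
            "callee_spiffe_id": callee,
            "action": action,
        }
    }
    return Request(
        url=f"{OPA_URL}/v1/data/{path}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_opa_query(
    package="gateway.authz",
    rule="allow",
    caller="spiffe://agentweave.io/client/web",  # hypothetical client ID
    callee="spiffe://agentweave.io/agent/gateway",
    action="route",
)
print(req.get_method())  # POST
print(req.full_url)      # http://localhost:8181/v1/data/gateway/authz/allow
```

Passing the request to `urllib.request.urlopen` against a running OPA instance with the policy loaded would return a JSON document whose "result" field carries the decision.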

Order Agent Policy

# policies/order_authz.rego
package order.authz

import rego.v1

default allow := false

# Gateway can call order agent
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/gateway"
    input.callee_spiffe_id == "spiffe://agentweave.io/agent/order"
    input.action in ["create_order", "get_order"]
}

# Order agent can call user, payment, inventory
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/order"
    input.callee_spiffe_id in [
        "spiffe://agentweave.io/agent/user",
        "spiffe://agentweave.io/agent/payment",
        "spiffe://agentweave.io/agent/inventory"
    ]
}

User Agent Policy

# policies/user_authz.rego
package user.authz

import rego.v1

default allow := false

# Gateway can call user agent for create/get
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/gateway"
    input.action in ["create_user", "get_user"]
}

# Order agent can get users
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/order"
    input.action == "get_user"
}

# Only admin agent can update tiers
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/admin"
    input.action == "update_tier"
}
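
To make the intent of the three rules above easy to check at a glance, here is a plain-Python mirror of the user policy with a small table of expected decisions. This is only a reading aid under the assumption that the rules mean what their comments say. It is not how OPA evaluates Rego, and `user_allow` is a hypothetical function.

```python
GATEWAY = "spiffe://agentweave.io/agent/gateway"
ORDER = "spiffe://agentweave.io/agent/order"
ADMIN = "spiffe://agentweave.io/agent/admin"
PAYMENT = "spiffe://agentweave.io/agent/payment"


def user_allow(caller: str, action: str) -> bool:
    """Mirror of user.authz: default deny, three allow rules."""
    if caller == GATEWAY and action in ("create_user", "get_user"):
        return True
    if caller == ORDER and action == "get_user":
        return True
    if caller == ADMIN and action == "update_tier":
        return True
    return False


cases = [
    (GATEWAY, "create_user", True),
    (GATEWAY, "get_user", True),
    (GATEWAY, "update_tier", False),  # gateway cannot change tiers
    (ORDER, "get_user", True),
    (ORDER, "create_user", False),
    (ADMIN, "update_tier", True),
    (PAYMENT, "get_user", False),     # no rule mentions the payment agent
]

for caller, action, expected in cases:
    assert user_allow(caller, action) is expected

print("all", len(cases), "cases match")  # all 7 cases match
```

The same table could be turned into Rego unit tests so that the real policy is exercised rather than a mirror of it.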

Docker Compose

# docker-compose.yaml
version: '3.8'

services:
  # Infrastructure (SPIRE, OPA)
  spire-server:
    # ... (same as other examples)

  spire-agent:
    # ... (same as other examples)

  opa:
    # ... (same as other examples)

  # Database
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: ecommerce
      POSTGRES_USER: app
      POSTGRES_PASSWORD: app

  # Service Agents
  gateway:
    build: .
    command: python gateway_agent.py
    ports:
      - "8443:8443"
    environment:
      - AGENTWEAVE_CONFIG=/config/gateway.yaml

  user:
    build: .
    command: python user_agent.py
    ports:
      - "8444:8443"
    environment:
      - AGENTWEAVE_CONFIG=/config/user.yaml

  order:
    build: .
    command: python order_agent.py
    ports:
      - "8445:8443"
    environment:
      - AGENTWEAVE_CONFIG=/config/order.yaml

  payment:
    build: .
    command: python payment_agent.py
    ports:
      - "8446:8443"
    environment:
      - AGENTWEAVE_CONFIG=/config/payment.yaml

  inventory:
    build: .
    command: python inventory_agent.py
    ports:
      - "8447:8443"
    environment:
      - AGENTWEAVE_CONFIG=/config/inventory.yaml

Comparison Table

Aspect            | Traditional               | Service Mesh            | AgentWeave
------------------+---------------------------+-------------------------+------------------------
mTLS              | Manual (cert management)  | Automatic (via Envoy)   | Automatic (SPIFFE SDK)
Authorization     | Application code + JWT    | Envoy + external authz  | OPA + SDK (built-in)
Service Discovery | DNS, Consul, etc.         | K8s service + Envoy     | SPIFFE ID (explicit)
Observability     | Manual instrumentation    | Envoy metrics + sidecar | SDK built-in
Configuration     | Per-service               | Centralized YAML        | Code + Config
Sidecars          | None (insecure) or manual | Required (Envoy)        | None (SDK handles it)
Policy Language   | Application code          | Envoy config            | Rego (testable)
Complexity        | Low (but insecure)        | High (Istio/Linkerd)    | Medium (SDK)

Running the Example

# Start all services
docker-compose up -d

# Register all agents with SPIRE
for svc in gateway user order payment inventory; do
    docker-compose exec spire-server \
        /opt/spire/bin/spire-server entry create \
        -spiffeID spiffe://agentweave.io/agent/$svc \
        -parentID spiffe://agentweave.io/agent/spire-agent \
        -selector docker:label:com.docker.compose.service:$svc
done

# Test: Create user
agentweave call \
    --target spiffe://agentweave.io/agent/gateway \
    --capability route \
    --data '{
        "service": "user",
        "action": "create_user",
        "payload": {
            "email": "user@example.com",
            "name": "John Doe",
            "tier": "premium"
        }
    }'

# Test: Create order (calls user, payment, inventory)
agentweave call \
    --target spiffe://agentweave.io/agent/gateway \
    --capability route \
    --data '{
        "service": "order",
        "action": "create_order",
        "payload": {
            "user_id": "<user-id-from-above>",
            "items": [
                {"sku": "WIDGET-001", "quantity": 2, "price": "19.99"}
            ],
            "payment_method": "credit_card"
        }
    }'
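
The test order above sends the price as the string "19.99", and the Order Agent's `_calculate_total` converts it to `Decimal` before multiplying. The short sketch below repeats that arithmetic outside the agent to show the expected total and why `Decimal` is preferred over floats for money. It reuses only the payload from the test call.

```python
from decimal import Decimal

items = [{"sku": "WIDGET-001", "quantity": 2, "price": "19.99"}]

# Same arithmetic as _calculate_total: exact decimal price times quantity.
total = sum(
    (Decimal(str(item["price"])) * item["quantity"] for item in items),
    Decimal("0"),
)
print(total)  # 39.98

# Binary floats cannot represent most decimal fractions exactly.
print(0.1 + 0.2 == 0.3)                                   # False
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True
```

This is also why the agent sends the amount to the Payment Agent as `str(total)` rather than as a float.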

Key Takeaways

No Sidecar Required

Traditional service mesh:

┌──────────────────┐
│  Order Service   │
│  ┌────────────┐  │
│  │   App      │  │
│  └─────┬──────┘  │
│        │         │
│  ┌─────▼──────┐  │
│  │   Envoy    │◄─┼─ Complexity
│  └────────────┘  │
└──────────────────┘

AgentWeave:

┌──────────────────┐
│  Order Agent     │
│  (SDK handles    │
│   mTLS, authz)   │
└──────────────────┘

Policy as Code

Service mesh (YAML):

apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: order-policy
spec:
  selector:
    matchLabels:
      app: order
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/default/sa/gateway"]

AgentWeave (Rego):

allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/gateway"
    input.action == "create_order"
}

Built-in Observability

No manual instrumentation needed:

# This automatically creates traces, metrics, logs
result = await self.call_agent(
    target=self.payment_agent,
    task_type="process_payment",
    payload={...}
)
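
To give a feel for what "built-in" instrumentation can mean, here is a generic sketch of a decorator that records the duration and outcome of an async call. It is an illustration of the general technique only: `traced` and the stub `process_payment` are hypothetical, and nothing here describes how the AgentWeave SDK is actually implemented.

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger("sketch.tracing")


def traced(func):
    """Log the name, outcome, and duration of every call to an async function."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        status = "error"
        try:
            result = await func(*args, **kwargs)
            status = "ok"
            return result
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            logger.info(
                "call=%s status=%s duration_ms=%.1f",
                func.__name__, status, elapsed_ms,
            )

    return wrapper


@traced
async def process_payment(amount: str) -> dict:
    await asyncio.sleep(0.01)  # stand-in for the remote call
    return {"amount": amount, "status": "completed"}


logging.basicConfig(level=logging.INFO)
print(asyncio.run(process_payment("39.98")))
# {'amount': '39.98', 'status': 'completed'}
```

An SDK that wraps every outgoing call in something like this can emit traces and metrics without each handler repeating the timing code.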

Next Steps

  • Production Deployment: See Kubernetes Guide
  • Advanced Patterns: Saga pattern for distributed transactions
  • Monitoring: Set up Grafana dashboards for agent metrics
  • Security: Learn Security Best Practices

Complete Code: GitHub Repository