Multi-Agent Orchestration Example
Complexity: Intermediate
Time to Complete: 30 minutes
Prerequisites: Complete the Simple Agent example first
This example demonstrates the orchestrator pattern: a coordinator agent that delegates work to multiple specialized worker agents. This is one of the most common multi-agent patterns.
What You'll Learn
- How to make agent-to-agent calls using call_agent()
- Orchestrator pattern with worker agents
- Different authorization policies for different agents
- Running multiple agents in a system
- Error handling and fault tolerance
Architecture
┌──────────────────────────────────────────────────────────────┐
│                            Client                            │
└───────────────────────┬──────────────────────────────────────┘
                        │
                        │ "Process this document"
                        ▼
          ┌─────────────────────────────┐
          │     Orchestrator Agent      │
          │                             │
          │  Coordinates workflow:      │
          │  1. Call Analyzer           │
          │  2. Call Summarizer         │
          │  3. Combine results         │
          └──────┬────────────┬─────────┘
                 │            │
        ┌────────┘            └────────┐
        │                              │
        ▼                              ▼
┌──────────────────┐          ┌──────────────────┐
│  Analyzer Agent  │          │ Summarizer Agent │
│                  │          │                  │
│  Analyzes:       │          │  Summarizes:     │
│  - Sentiment     │          │  - Key points    │
│  - Topics        │          │  - Action items  │
│  - Entities      │          │  - Summary       │
└──────────────────┘          └──────────────────┘
All communication uses:
- mTLS with SPIFFE identity
- OPA policy enforcement
- Audit logging
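Every agent in this example is addressed by a SPIFFE ID such as spiffe://agentweave.io/agent/analyzer, which is a URI made of a trust domain and a workload path. The following is a minimal sketch of splitting such an ID with the Python standard library. The helper names parse_spiffe_id and same_trust_domain are hypothetical and are not part of the AgentWeave SDK.

```python
from typing import Tuple
from urllib.parse import urlsplit


def parse_spiffe_id(spiffe_id: str) -> Tuple[str, str]:
    """Split a SPIFFE ID into (trust_domain, workload_path)."""
    parts = urlsplit(spiffe_id)
    if parts.scheme != "spiffe" or not parts.netloc:
        raise ValueError(f"not a SPIFFE ID: {spiffe_id!r}")
    return parts.netloc, parts.path


def same_trust_domain(a: str, b: str) -> bool:
    """True when both SPIFFE IDs belong to the same trust domain."""
    return parse_spiffe_id(a)[0] == parse_spiffe_id(b)[0]


trust_domain, path = parse_spiffe_id("spiffe://agentweave.io/agent/analyzer")
print(trust_domain, path)
```

All three agents here share the agentweave.io trust domain, which is why each configuration lists only that domain under allowed_trust_domains.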
Sequence Diagram
Client              Orchestrator            Analyzer          Summarizer
  │                       │                     │                  │
  │──Process Document────►│                     │                  │
  │                       │                     │                  │
  │                       │──Analyze───────────►│                  │
  │                       │                     │                  │
  │                       │ (mTLS + OPA check)  │                  │
  │                       │                     │                  │
  │                       │◄─Analysis Results───│                  │
  │                       │                     │                  │
  │                       │──Summarize──────────┼─────────────────►│
  │                       │                     │                  │
  │                       │ (mTLS + OPA check)  │                  │
  │                       │                     │                  │
  │                       │◄─Summary────────────┼──────────────────│
  │                       │                     │                  │
  │                       │ (Combine results)   │                  │
  │                       │                     │                  │
  │◄─Combined Report──────│                     │                  │
  │                       │                     │                  │
Complete Code
Orchestrator Agent
# orchestrator_agent.py
"""
Orchestrator Agent - Coordinates document processing workflow.

This agent demonstrates:
- Making calls to other agents
- Error handling
- Combining results from multiple agents
- Workflow orchestration
"""

import asyncio
from typing import Dict, Any, List, Optional

from agentweave import SecureAgent, capability
from agentweave.types import TaskResult, Message, TextPart, DataPart
from agentweave.exceptions import AgentCallError, AuthorizationError


class OrchestratorAgent(SecureAgent):
    """
    Orchestrates document processing across multiple agents.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Configuration: SPIFFE IDs of worker agents
        self.analyzer_id = "spiffe://agentweave.io/agent/analyzer"
        self.summarizer_id = "spiffe://agentweave.io/agent/summarizer"

    @capability("process_document")
    async def process_document(
        self,
        document: str,
        options: Optional[Dict[str, Any]] = None
    ) -> TaskResult:
        """
        Process a document through analysis and summarization.

        Workflow:
        1. Call Analyzer agent to extract insights
        2. Call Summarizer agent to create summary
        3. Combine results into comprehensive report

        Args:
            document: The document text to process
            options: Processing options (analyze_sentiment, etc.)

        Returns:
            TaskResult with combined analysis and summary
        """
        options = options or {}

        self.logger.info(
            "Starting document processing workflow",
            extra={
                "document_length": len(document),
                "options": options
            }
        )

        try:
            # Step 1: Analyze document
            analysis = await self._analyze_document(document, options)

            # Step 2: Summarize document
            summary = await self._summarize_document(document, options)

            # Step 3: Combine results
            report = self._create_report(document, analysis, summary)

            self.logger.info("Document processing completed successfully")

            return TaskResult(
                status="completed",
                messages=[
                    Message(
                        role="assistant",
                        parts=[
                            TextPart(text=report["text"]),
                            DataPart(data=report["data"])
                        ]
                    )
                ],
                artifacts=[
                    {
                        "type": "analysis",
                        "data": analysis
                    },
                    {
                        "type": "summary",
                        "data": summary
                    }
                ]
            )

        except AuthorizationError as e:
            self.logger.error(f"Authorization failed: {e}")
            return TaskResult(
                status="failed",
                error=f"Not authorized to call required agents: {e}"
            )

        except AgentCallError as e:
            self.logger.error(f"Agent call failed: {e}")
            return TaskResult(
                status="failed",
                error=f"Failed to process document: {e}"
            )

    async def _analyze_document(
        self,
        document: str,
        options: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Call Analyzer agent to analyze document.

        The SDK automatically:
        - Verifies our identity with SPIRE
        - Establishes mTLS with analyzer
        - Checks OPA policy allows this call
        - Logs the request
        """
        self.logger.debug(f"Calling analyzer: {self.analyzer_id}")

        result = await self.call_agent(
            target=self.analyzer_id,
            task_type="analyze",
            payload={
                "text": document,
                "analyze_sentiment": options.get("analyze_sentiment", True),
                "extract_topics": options.get("extract_topics", True),
                "extract_entities": options.get("extract_entities", True)
            },
            timeout=30.0
        )

        if result.status != "completed":
            raise AgentCallError(f"Analyzer failed: {result.error}")

        return result.artifacts[0]["data"]

    async def _summarize_document(
        self,
        document: str,
        options: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Call Summarizer agent to create summary."""
        self.logger.debug(f"Calling summarizer: {self.summarizer_id}")

        result = await self.call_agent(
            target=self.summarizer_id,
            task_type="summarize",
            payload={
                "text": document,
                "max_length": options.get("summary_length", 200),
                "include_key_points": True,
                "include_action_items": True
            },
            timeout=30.0
        )

        if result.status != "completed":
            raise AgentCallError(f"Summarizer failed: {result.error}")

        return result.artifacts[0]["data"]

    def _create_report(
        self,
        document: str,
        analysis: Dict[str, Any],
        summary: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Combine analysis and summary into final report."""
        # The analyzer omits a key when that analysis is disabled via options,
        # so fall back to neutral defaults instead of raising KeyError.
        sentiment = analysis.get("sentiment") or {"overall": "n/a", "score": 0.0}
        topics = analysis.get("topics", [])
        entities = analysis.get("entities", {})

        report_text = f"""
Document Processing Report
==========================

Summary
-------
{summary['summary']}

Key Points
----------
{self._format_list(summary.get('key_points', []))}

Sentiment Analysis
------------------
Overall: {sentiment['overall']} ({sentiment['score']:.2f})

Topics Identified
-----------------
{self._format_list(topics)}

Entities Extracted
------------------
{self._format_entities(entities)}

Action Items
------------
{self._format_list(summary.get('action_items', []))}
"""

        return {
            "text": report_text.strip(),
            "data": {
                "document_length": len(document),
                "analysis": analysis,
                "summary": summary,
                "processed_at": self.context.request_time.isoformat()
            }
        }

    @staticmethod
    def _format_list(items: List[str]) -> str:
        """Format list items with bullets."""
        return "\n".join(f"• {item}" for item in items)

    @staticmethod
    def _format_entities(entities: Dict[str, List[str]]) -> str:
        """Format entity dictionary."""
        lines = []
        for entity_type, items in entities.items():
            lines.append(f"\n{entity_type.title()}:")
            lines.extend(f"  • {item}" for item in items)
        return "\n".join(lines)


async def main():
    agent = OrchestratorAgent.from_config("config/orchestrator.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
Analyzer Agent
# analyzer_agent.py
"""
Analyzer Agent - Analyzes document content.

This worker agent performs:
- Sentiment analysis
- Topic extraction
- Named entity recognition
"""

import asyncio
from typing import Dict, Any, List

from agentweave import SecureAgent, capability, requires_peer
from agentweave.types import TaskResult, Message, DataPart


class AnalyzerAgent(SecureAgent):
    """Analyzes document content for insights."""

    @capability("analyze")
    @requires_peer("spiffe://agentweave.io/agent/orchestrator")
    async def analyze(
        self,
        text: str,
        analyze_sentiment: bool = True,
        extract_topics: bool = True,
        extract_entities: bool = True
    ) -> TaskResult:
        """
        Analyze document content.

        Note: The @requires_peer decorator ensures only the orchestrator can
        call this. The SDK enforces this before the method runs.
        """
        self.logger.info(
            "Analyzing document",
            extra={"text_length": len(text)}
        )

        result = {}

        if analyze_sentiment:
            result["sentiment"] = await self._analyze_sentiment(text)

        if extract_topics:
            result["topics"] = await self._extract_topics(text)

        if extract_entities:
            result["entities"] = await self._extract_entities(text)

        return TaskResult(
            status="completed",
            messages=[
                Message(
                    role="assistant",
                    parts=[
                        DataPart(data={
                            "analysis": result,
                            "analyzer_version": "1.0.0"
                        })
                    ]
                )
            ],
            artifacts=[
                {
                    "type": "analysis_results",
                    "data": result
                }
            ]
        )

    async def _analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment.

        In production, this would use an LLM or ML model.
        For demo, we'll use a simple heuristic.
        """
        # Simple keyword-based sentiment (replace with real model)
        positive_words = ["good", "great", "excellent", "positive", "success"]
        negative_words = ["bad", "poor", "negative", "fail", "problem"]

        text_lower = text.lower()
        pos_count = sum(word in text_lower for word in positive_words)
        neg_count = sum(word in text_lower for word in negative_words)

        total = pos_count + neg_count
        if total == 0:
            return {"overall": "neutral", "score": 0.0}

        score = (pos_count - neg_count) / total

        if score > 0.2:
            overall = "positive"
        elif score < -0.2:
            overall = "negative"
        else:
            overall = "neutral"

        return {
            "overall": overall,
            "score": score,
            "positive_indicators": pos_count,
            "negative_indicators": neg_count
        }

    async def _extract_topics(self, text: str) -> List[str]:
        """Extract main topics (simplified for demo)."""
        # In production, use topic modeling (LDA, etc.)
        topics = []

        keywords = {
            "technology": ["software", "hardware", "computer", "digital"],
            "business": ["revenue", "profit", "market", "customer"],
            "science": ["research", "study", "experiment", "data"],
        }

        text_lower = text.lower()
        for topic, words in keywords.items():
            if any(word in text_lower for word in words):
                topics.append(topic)

        return topics or ["general"]

    async def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract named entities (simplified for demo)."""
        # In production, use NER model (spaCy, transformers, etc.)
        return {
            "organizations": ["AgentWeave", "ACME Corp"],  # Placeholder
            "locations": ["San Francisco", "New York"],
            "technologies": ["Python", "Kubernetes"]
        }


async def main():
    agent = AnalyzerAgent.from_config("config/analyzer.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
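The keyword heuristic in _analyze_sentiment computes score = (pos - neg) / (pos + neg), where each keyword counts at most once because the code tests membership rather than counting occurrences. The standalone sketch below reproduces that arithmetic outside the agent so the thresholds are easy to check. The function names sentiment_score and sentiment_label are introduced here for illustration only.

```python
POSITIVE_WORDS = ["good", "great", "excellent", "positive", "success"]
NEGATIVE_WORDS = ["bad", "poor", "negative", "fail", "problem"]


def sentiment_score(text: str) -> float:
    """Return (pos - neg) / (pos + neg), or 0.0 when no keyword matches."""
    text_lower = text.lower()
    # Substring membership: each keyword contributes 0 or 1, however often it appears.
    pos = sum(word in text_lower for word in POSITIVE_WORDS)
    neg = sum(word in text_lower for word in NEGATIVE_WORDS)
    total = pos + neg
    return 0.0 if total == 0 else (pos - neg) / total


def sentiment_label(score: float) -> str:
    """Map a score in [-1, 1] to a label using the same +/-0.2 thresholds."""
    if score > 0.2:
        return "positive"
    if score < -0.2:
        return "negative"
    return "neutral"


for sample in (
    "Excellent and positive results",         # 2 positive, 0 negative
    "Good, great, but a bad bug",             # 2 positive, 1 negative
    "Great launch, but one problem remains",  # 1 positive, 1 negative
):
    score = sentiment_score(sample)
    print(f"{score:+.2f} {sentiment_label(score)}")
```

The three samples score 1.00, 0.33, and 0.00 respectively, so the first two are labeled positive and the third neutral.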
Summarizer Agent
# summarizer_agent.py
"""
Summarizer Agent - Creates document summaries.
"""

import asyncio
from typing import Dict, Any, List

from agentweave import SecureAgent, capability, requires_peer
from agentweave.types import TaskResult, Message, TextPart, DataPart


class SummarizerAgent(SecureAgent):
    """Creates concise document summaries."""

    @capability("summarize")
    @requires_peer("spiffe://agentweave.io/agent/orchestrator")
    async def summarize(
        self,
        text: str,
        max_length: int = 200,
        include_key_points: bool = True,
        include_action_items: bool = True
    ) -> TaskResult:
        """Create document summary."""
        self.logger.info(
            "Summarizing document",
            extra={
                "text_length": len(text),
                "max_length": max_length
            }
        )

        # Generate summary (simplified for demo)
        summary_text = await self._generate_summary(text, max_length)

        result = {
            "summary": summary_text
        }

        if include_key_points:
            result["key_points"] = await self._extract_key_points(text)

        if include_action_items:
            result["action_items"] = await self._extract_action_items(text)

        return TaskResult(
            status="completed",
            messages=[
                Message(
                    role="assistant",
                    parts=[
                        TextPart(text=summary_text),
                        DataPart(data=result)
                    ]
                )
            ],
            artifacts=[
                {
                    "type": "summary_results",
                    "data": result
                }
            ]
        )

    async def _generate_summary(self, text: str, max_length: int) -> str:
        """
        Generate summary.

        In production, use LLM for summarization.
        """
        # Simple extractive summary (first sentences)
        sentences = text.split(". ")
        summary = sentences[0]

        for sentence in sentences[1:]:
            if len(summary) + len(sentence) > max_length:
                break
            summary += ". " + sentence

        return summary.strip()

    async def _extract_key_points(self, text: str) -> List[str]:
        """Extract key points (simplified)."""
        # In production, use LLM to identify key points
        sentences = text.split(". ")
        return sentences[:3] if len(sentences) >= 3 else sentences

    async def _extract_action_items(self, text: str) -> List[str]:
        """Extract action items (simplified)."""
        # In production, use LLM to identify action items
        action_verbs = ["implement", "create", "develop", "deploy", "test"]
        action_items = []

        for sentence in text.split(". "):
            if any(verb in sentence.lower() for verb in action_verbs):
                action_items.append(sentence.strip())

        return action_items


async def main():
    agent = SummarizerAgent.from_config("config/summarizer.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())
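One caveat about the demo heuristic in _extract_action_items: it uses substring matching, so a sentence containing "latest" matches the verb "test". The sketch below shows a whole-word alternative using the standard re module. It is one possible refinement rather than the example's actual behavior, and has_action_verb is a hypothetical helper name.

```python
import re

ACTION_VERBS = {"implement", "create", "develop", "deploy", "test"}


def has_action_verb(sentence: str) -> bool:
    """True when any whole word in the sentence is one of the action verbs."""
    words = re.findall(r"[a-z]+", sentence.lower())
    return any(word in ACTION_VERBS for word in words)


sentence = "Our latest software release has been excellent"
print("test" in sentence.lower())  # substring check used by the demo
print(has_action_verb(sentence))   # whole-word check
print(has_action_verb("We need to implement additional security measures"))
```

The trade-off is that whole-word matching misses inflected forms such as "deployed", which is one reason the code comments suggest an LLM for production use.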
Configuration Files
Orchestrator Configuration
# config/orchestrator.yaml

agent:
  name: "orchestrator"
  trust_domain: "agentweave.io"
  description: "Orchestrates document processing workflow"
  capabilities:
    - name: "process_document"
      description: "Process document through analysis and summarization"
      input_modes: ["text/plain", "application/json"]
      output_modes: ["text/plain", "application/json"]

identity:
  provider: "spiffe"
  spiffe_endpoint: "unix:///run/spire/sockets/agent.sock"
  allowed_trust_domains:
    - "agentweave.io"

authorization:
  provider: "opa"
  opa_endpoint: "http://opa:8181"
  policy_path: "agentweave/authz/orchestrator"
  default_action: "deny"
  audit:
    enabled: true
    destination: "stdout"

transport:
  tls_min_version: "1.3"
  peer_verification: "strict"
  connection_pool:
    max_connections: 20
    idle_timeout_seconds: 60

server:
  host: "0.0.0.0"
  port: 8443
  protocol: "a2a"

observability:
  metrics:
    enabled: true
    port: 9090
  logging:
    level: "INFO"
    format: "json"
Analyzer Configuration
# config/analyzer.yaml

agent:
  name: "analyzer"
  trust_domain: "agentweave.io"
  description: "Analyzes document content for insights"
  capabilities:
    - name: "analyze"
      description: "Perform sentiment, topic, and entity analysis"
      input_modes: ["text/plain"]
      output_modes: ["application/json"]

identity:
  provider: "spiffe"
  spiffe_endpoint: "unix:///run/spire/sockets/agent.sock"
  allowed_trust_domains:
    - "agentweave.io"

authorization:
  provider: "opa"
  opa_endpoint: "http://opa:8181"
  policy_path: "agentweave/authz/analyzer"
  default_action: "deny"

server:
  host: "0.0.0.0"
  port: 8444
  protocol: "a2a"
Summarizer Configuration
# config/summarizer.yaml

agent:
  name: "summarizer"
  trust_domain: "agentweave.io"
  description: "Creates document summaries"
  capabilities:
    - name: "summarize"
      description: "Generate summary with key points and action items"
      input_modes: ["text/plain"]
      output_modes: ["text/plain", "application/json"]

identity:
  provider: "spiffe"
  spiffe_endpoint: "unix:///run/spire/sockets/agent.sock"
  allowed_trust_domains:
    - "agentweave.io"

authorization:
  provider: "opa"
  opa_endpoint: "http://opa:8181"
  policy_path: "agentweave/authz/summarizer"
  default_action: "deny"

server:
  host: "0.0.0.0"
  port: 8445
  protocol: "a2a"
Authorization Policies
# config/policies/orchestrator_authz.rego
package agentweave.authz.orchestrator

import rego.v1

default allow := false

# Orchestrator can call analyzer
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/orchestrator"
    input.callee_spiffe_id == "spiffe://agentweave.io/agent/analyzer"
    input.action == "analyze"
}

# Orchestrator can call summarizer
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/orchestrator"
    input.callee_spiffe_id == "spiffe://agentweave.io/agent/summarizer"
    input.action == "summarize"
}

# Clients can call orchestrator
allow if {
    startswith(input.caller_spiffe_id, "spiffe://agentweave.io/client/")
    input.action == "process_document"
}

# config/policies/analyzer_authz.rego
package agentweave.authz.analyzer

import rego.v1

default allow := false

# Only orchestrator can call analyzer
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/orchestrator"
    input.action == "analyze"
}

# config/policies/summarizer_authz.rego
package agentweave.authz.summarizer

import rego.v1

default allow := false

# Only orchestrator can call summarizer
allow if {
    input.caller_spiffe_id == "spiffe://agentweave.io/agent/orchestrator"
    input.action == "summarize"
}
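To check a policy decision by hand, you can query OPA's Data API directly: a POST to /v1/data/<package path>/<rule> with an {"input": ...} body returns the rule's value under "result". The sketch below builds such a request with the standard library. It assumes OPA is reachable at http://localhost:8181 (the port published in the Compose file) and that the input document carries only the fields the policies above read; the exact input the SDK sends is not shown in this example. The helper names build_opa_query and query_opa are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict, Tuple


def build_opa_query(
    base_url: str, package: str, rule: str, input_doc: Dict[str, Any]
) -> Tuple[str, bytes]:
    """Return the Data API URL and JSON body for evaluating one rule."""
    url = f"{base_url.rstrip('/')}/v1/data/{package.replace('.', '/')}/{rule}"
    body = json.dumps({"input": input_doc}).encode("utf-8")
    return url, body


def query_opa(base_url: str, package: str, rule: str, input_doc: Dict[str, Any]) -> bool:
    """POST the query to a running OPA server and return the boolean decision."""
    url, body = build_opa_query(base_url, package, rule, input_doc)
    request = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        decision = json.load(response)
    # OPA omits "result" when the rule is undefined; treat that as deny.
    return decision.get("result", False) is True


url, body = build_opa_query(
    "http://localhost:8181",
    "agentweave.authz.analyzer",
    "allow",
    {
        "caller_spiffe_id": "spiffe://agentweave.io/agent/orchestrator",
        "action": "analyze",
    },
)
print(url)
```

With the OPA container running, calling query_opa with the same arguments should return True for the orchestrator and False for any other caller, given the analyzer policy above.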
Docker Compose Setup
# docker-compose.yaml
version: '3.8'

services:
  spire-server:
    image: ghcr.io/spiffe/spire-server:1.9.0
    hostname: spire-server
    volumes:
      - ./spire/server.conf:/opt/spire/conf/server/server.conf:ro
      - spire-server-data:/opt/spire/data
    command: ["-config", "/opt/spire/conf/server/server.conf"]
    networks:
      - agentweave

  spire-agent:
    image: ghcr.io/spiffe/spire-agent:1.9.0
    hostname: spire-agent
    depends_on:
      - spire-server
    volumes:
      - ./spire/agent.conf:/opt/spire/conf/agent/agent.conf:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - spire-agent-socket:/run/spire/sockets
    command: ["-config", "/opt/spire/conf/agent/agent.conf"]
    networks:
      - agentweave

  opa:
    image: openpolicyagent/opa:0.62.0
    hostname: opa
    volumes:
      - ./config/policies:/policies:ro
    command:
      - "run"
      - "--server"
      - "--addr=0.0.0.0:8181"
      - "/policies"
    ports:
      - "8181:8181"
    networks:
      - agentweave

  orchestrator:
    build:
      context: .
      dockerfile: Dockerfile.orchestrator
    depends_on:
      - spire-agent
      - opa
      - analyzer
      - summarizer
    volumes:
      - spire-agent-socket:/run/spire/sockets:ro
      - ./config:/etc/agentweave:ro
    ports:
      - "8443:8443"
      - "9090:9090"
    networks:
      - agentweave

  analyzer:
    build:
      context: .
      dockerfile: Dockerfile.analyzer
    depends_on:
      - spire-agent
      - opa
    volumes:
      - spire-agent-socket:/run/spire/sockets:ro
      - ./config:/etc/agentweave:ro
    ports:
      - "8444:8444"
    networks:
      - agentweave

  summarizer:
    build:
      context: .
      dockerfile: Dockerfile.summarizer
    depends_on:
      - spire-agent
      - opa
    volumes:
      - spire-agent-socket:/run/spire/sockets:ro
      - ./config:/etc/agentweave:ro
    ports:
      - "8445:8445"
    networks:
      - agentweave

volumes:
  spire-server-data:
  spire-agent-socket:

networks:
  agentweave:
    driver: bridge
Running the Example
Step 1: Register Workloads with SPIRE
# Start infrastructure
docker-compose up -d spire-server spire-agent opa

# Register orchestrator
docker-compose exec spire-server \
  /opt/spire/bin/spire-server entry create \
  -spiffeID spiffe://agentweave.io/agent/orchestrator \
  -parentID spiffe://agentweave.io/agent/spire-agent \
  -selector docker:label:com.docker.compose.service:orchestrator

# Register analyzer
docker-compose exec spire-server \
  /opt/spire/bin/spire-server entry create \
  -spiffeID spiffe://agentweave.io/agent/analyzer \
  -parentID spiffe://agentweave.io/agent/spire-agent \
  -selector docker:label:com.docker.compose.service:analyzer

# Register summarizer
docker-compose exec spire-server \
  /opt/spire/bin/spire-server entry create \
  -spiffeID spiffe://agentweave.io/agent/summarizer \
  -parentID spiffe://agentweave.io/agent/spire-agent \
  -selector docker:label:com.docker.compose.service:summarizer
Step 2: Start All Agents
docker-compose up -d
Step 3: Test the System
# Process a document
agentweave call \
  --target spiffe://agentweave.io/agent/orchestrator \
  --capability process_document \
  --data '{
    "document": "Our latest software release has been excellent. Customers report positive experiences with the new features. We need to implement additional security measures and develop the mobile app. The team should deploy to production next week.",
    "options": {
      "analyze_sentiment": true,
      "extract_topics": true,
      "summary_length": 150
    }
  }'
Expected Output
{
  "status": "completed",
  "messages": [
    {
      "role": "assistant",
      "parts": [
        {
          "type": "text",
          "text": "Document Processing Report\n==========================\n\nSummary\n-------\nOur latest software release has been excellent. Customers report positive experiences with the new features.\n\nKey Points\n----------\n• Our latest software release has been excellent\n• Customers report positive experiences with the new features\n• We need to implement additional security measures and develop the mobile app\n\nSentiment Analysis\n------------------\nOverall: positive (0.67)\n\nTopics Identified\n-----------------\n• technology\n• business\n\nEntities Extracted\n------------------\nOrganizations:\n  • AgentWeave\n  • ACME Corp\n\nAction Items\n------------\n• implement additional security measures\n• develop the mobile app\n• deploy to production next week"
        }
      ]
    }
  ]
}
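If you capture the response as JSON, the report lives in the parts whose type is "text". The sketch below pulls those parts out using only the standard library. It assumes the response shape shown above, and the helper name text_parts is hypothetical.

```python
import json
from typing import Any, Dict, List


def text_parts(result: Dict[str, Any]) -> List[str]:
    """Collect the text of every message part whose type is "text"."""
    return [
        part["text"]
        for message in result.get("messages", [])
        for part in message.get("parts", [])
        if part.get("type") == "text"
    ]


# A shortened response with the same structure as the output above.
raw = """
{
  "status": "completed",
  "messages": [
    {"role": "assistant",
     "parts": [{"type": "text", "text": "Document Processing Report"}]}
  ]
}
"""

result = json.loads(raw)
if result["status"] == "completed":
    print("\n".join(text_parts(result)))
```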
Key Takeaways
Inter-Agent Communication
The orchestrator calls workers with call_agent():
result = await self.call_agent(
    target="spiffe://agentweave.io/agent/analyzer",
    task_type="analyze",
    payload={...}
)
The SDK automatically handles:
- mTLS connection establishment
- OPA policy check (can orchestrator call analyzer?)
- Request serialization (A2A protocol)
- Response deserialization
- Error propagation
Access Control
Each agent has different policies:
- Orchestrator: Can call analyzer and summarizer
- Analyzer: Only orchestrator can call
- Summarizer: Only orchestrator can call
- Clients: Can call orchestrator
OPA enforces these rules before any capability handler runs.
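As a mental model, the three Rego policies amount to a default-deny table of caller and action pairs. The sketch below restates that table as a plain Python function so the rules can be exercised without running OPA. It is a simplification that ignores the callee check in the orchestrator policy, and the client ID used in the usage lines is a made-up example.

```python
ORCHESTRATOR = "spiffe://agentweave.io/agent/orchestrator"
CLIENT_PREFIX = "spiffe://agentweave.io/client/"


def is_allowed(caller: str, action: str) -> bool:
    """Default deny; allow only the caller/action pairs listed above."""
    # Workers accept calls from the orchestrator only.
    if caller == ORCHESTRATOR and action in {"analyze", "summarize"}:
        return True
    # Any client under the trust domain may start the workflow.
    if caller.startswith(CLIENT_PREFIX) and action == "process_document":
        return True
    return False


print(is_allowed(ORCHESTRATOR, "analyze"))
print(is_allowed("spiffe://agentweave.io/client/cli", "process_document"))
print(is_allowed("spiffe://agentweave.io/client/cli", "analyze"))
```

The last call is denied: clients can reach the workers only through the orchestrator.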
Error Handling
try:
    result = await self.call_agent(...)
except AuthorizationError:
    # Caller not allowed by policy
    ...
except AgentCallError:
    # Call failed or timed out
    ...
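The orchestrator in this example catches these errors but does not retry. If transient failures are likely, one option is to wrap the call in a small retry helper with exponential backoff, as sketched below. The AgentCallError class here is a local stand-in so the snippet runs without the SDK, and call_with_retry is a hypothetical helper, not an AgentWeave API.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class AgentCallError(Exception):
    """Local stand-in for agentweave.exceptions.AgentCallError."""


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Await make_call(), retrying on AgentCallError with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await make_call()
        except AgentCallError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


async def demo() -> str:
    calls = {"count": 0}

    async def flaky() -> str:
        # Fails twice, then succeeds, to simulate a transient outage.
        calls["count"] += 1
        if calls["count"] < 3:
            raise AgentCallError("transient failure")
        return "ok"

    return await call_with_retry(flaky, attempts=3, base_delay=0.01)


print(asyncio.run(demo()))
```

Only AgentCallError is retried. An authorization failure is a policy decision, so repeating the same call would not be expected to change the outcome.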
Next Steps
- Add More Workers: Create specialized agents for different tasks
- Parallel Execution: Call multiple agents concurrently with asyncio.gather()
- Workflow Engine: See Data Pipeline Example for complex workflows
- Federation: See Federated Example for cross-domain orchestration
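For the Parallel Execution item, note that the analyzer and summarizer calls both take only the document as input, so neither depends on the other's result. The sketch below shows the asyncio.gather() shape with stub coroutines standing in for the two call_agent() requests. The stubs and their return values are placeholders, not real agent output.

```python
import asyncio
from typing import Any, Dict


async def analyze(document: str) -> Dict[str, Any]:
    """Stub for the Analyzer call; sleeps to simulate network latency."""
    await asyncio.sleep(0.01)
    return {"topics": ["general"]}


async def summarize(document: str) -> Dict[str, Any]:
    """Stub for the Summarizer call; sleeps to simulate network latency."""
    await asyncio.sleep(0.01)
    return {"summary": document[:20]}


async def process(document: str) -> Dict[str, Any]:
    # Run both worker calls concurrently; results come back in argument order.
    analysis, summary = await asyncio.gather(
        analyze(document),
        summarize(document),
    )
    return {"analysis": analysis, "summary": summary}


report = asyncio.run(process("Our latest software release has been excellent."))
print(report)
```

By default gather() raises the first exception it sees, which fits the existing try/except in process_document. Passing return_exceptions=True instead returns exceptions as results, so one worker's failure can be handled without discarding the other's output.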
Complete Code: GitHub Repository