A2A Protocol Reference
The Agent-to-Agent (A2A) Protocol is an open standard for AI agent communication. This guide explains how the AgentWeave SDK implements A2A.
Overview
A2A provides:
- Standardized communication: Agents from different frameworks can interoperate
- Task-based model: Request/response with long-running task support
- Capability discovery: Agents advertise what they can do via Agent Cards
- Framework agnostic: Works with any agent framework (LangGraph, CrewAI, ADK, etc.)
Specification: https://a2a-protocol.org/latest/
Core Concepts
Agent Card
An Agent Card is JSON metadata describing an agent's capabilities and endpoints.
Location: /.well-known/agent.json
Example:
{
  "name": "data-processor",
  "description": "Processes structured data",
  "url": "https://data-processor.agentweaves.svc.cluster.local:8443",
  "version": "1.0.0",
  "capabilities": [
    {
      "name": "process",
      "description": "Process data records",
      "input_modes": ["application/json"],
      "output_modes": ["application/json"],
      "parameters": {
        "type": "object",
        "properties": {
          "data": {
            "type": "array",
            "items": {"type": "object"}
          },
          "options": {
            "type": "object",
            "properties": {
              "validate": {"type": "boolean"},
              "transform": {"type": "string"}
            }
          }
        },
        "required": ["data"]
      }
    }
  ],
  "authentication": {
    "schemes": [
      {
        "type": "mtls",
        "description": "SPIFFE mTLS authentication"
      }
    ]
  },
  "extensions": {
    "spiffe_id": "spiffe://agentweave.io/agent/data-processor/prod"
  }
}
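As an illustration of how a caller might inspect a card like this one, here is a minimal sketch using only the Python standard library. It is not the SDK's API: the helper names `capability_names` and `find_capability` are hypothetical, and `CARD_JSON` is a trimmed copy of the example.

```python
import json
from typing import Optional

# Trimmed copy of the example Agent Card (illustrative only).
CARD_JSON = """
{
  "name": "data-processor",
  "version": "1.0.0",
  "capabilities": [
    {
      "name": "process",
      "input_modes": ["application/json"],
      "output_modes": ["application/json"],
      "parameters": {"type": "object", "required": ["data"]}
    }
  ],
  "authentication": {"schemes": [{"type": "mtls"}]}
}
"""


def capability_names(card: dict) -> list:
    """Return the names of all capabilities advertised in the card."""
    return [cap["name"] for cap in card.get("capabilities", [])]


def find_capability(card: dict, name: str) -> Optional[dict]:
    """Return the capability entry with the given name, or None."""
    for cap in card.get("capabilities", []):
        if cap["name"] == name:
            return cap
    return None


card = json.loads(CARD_JSON)
process_cap = find_capability(card, "process")
print(capability_names(card))
print(process_cap["parameters"]["required"])
```

A caller would typically run a check like this before submitting a task, so that an unsupported capability is caught locally rather than as a remote failure.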
Task
A task represents a unit of work with a lifecycle:
submitted → working → completed
↘ failed
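The lifecycle can be read as a small state machine. The sketch below is one way to encode it in plain Python; it assumes that `failed` is reached from `working`, which the diagram suggests but does not state outright, and the names `TRANSITIONS`, `advance`, and `is_terminal` are illustrative rather than part of the SDK.

```python
# Allowed transitions, read off the lifecycle diagram.
# Assumption: "failed" is only reachable from "working".
TRANSITIONS = {
    "submitted": {"working"},
    "working": {"completed", "failed"},
    "completed": set(),
    "failed": set(),
}


def advance(current: str, new: str) -> str:
    """Return the new state, or raise ValueError if the move is not allowed."""
    if new not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {new}")
    return new


def is_terminal(state: str) -> bool:
    """A state is terminal when it has no outgoing transitions."""
    return not TRANSITIONS[state]


state = advance("submitted", "working")
state = advance(state, "completed")
print(state, is_terminal(state))
```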
Task Structure:
{
  "id": "task-123",
  "type": "process",
  "state": "submitted",
  "messages": [
    {
      "role": "user",
      "parts": [
        {
          "type": "data",
          "data": {
            "records": [...]
          }
        }
      ]
    }
  ]
}
Message
Messages contain the actual data being exchanged. Each message has:
- Role: user (requester) or assistant (agent)
- Parts: Content pieces (text, data, files)
Part Types:
- text: Plain text or markdown
- data: Structured data (JSON)
- file: File reference with URI
- tool_use: Tool invocation
- tool_result: Tool execution result
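To make the role and part-type rules concrete, the following sketch builds a message as a plain dictionary and rejects unknown roles or part types. `make_message` is a hypothetical helper written for this guide, not an SDK function.

```python
# Roles and part types as listed in this section.
ROLES = {"user", "assistant"}
PART_TYPES = {"text", "data", "file", "tool_use", "tool_result"}


def make_message(role: str, parts: list) -> dict:
    """Build a message dict after checking the role and every part type."""
    if role not in ROLES:
        raise ValueError(f"unknown role: {role!r}")
    for part in parts:
        if part.get("type") not in PART_TYPES:
            raise ValueError(f"unknown part type: {part.get('type')!r}")
    return {"role": role, "parts": parts}


message = make_message(
    "user",
    [
        {"type": "text", "text": "Process these records"},
        {"type": "data", "data": {"records": [{"id": 1}]}},
    ],
)
print(message["role"], len(message["parts"]))
```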
Artifact
Artifacts are the outputs produced by task completion:
{
  "type": "processed_data",
  "name": "results.json",
  "data": {
    "processed": true,
    "record_count": 1000
  }
}
SDK Implementation
Defining Capabilities
Use the @capability decorator:
from typing import Optional

from agentweave import SecureAgent, capability
from agentweave.types import TaskResult

class DataProcessor(SecureAgent):
    @capability("process")
    async def process(self, data: list[dict], options: Optional[dict] = None) -> TaskResult:
        """
        Process data records.

        The decorator automatically:
        - Registers this in the Agent Card
        - Validates input against type hints
        - Wraps response in A2A format
        """
        # _transform is an application-specific helper (not shown here)
        processed = [self._transform(record) for record in data]

        return TaskResult(
            status="completed",
            artifacts=[{
                "type": "processed_data",
                "data": {"records": processed}
            }]
        )
The SDK automatically generates the Agent Card from your capabilities.
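The guide does not show how the SDK derives the card, so the following is only a sketch of the general technique: a decorator that records each function in a registry and builds a JSON Schema fragment from its type hints. Everything here (`_REGISTRY`, `_JSON_TYPES`, this stand-in `capability`) is hypothetical and independent of the real `agentweave` package.

```python
import inspect

# Hypothetical registry; the real SDK's internals are not shown in this guide.
_REGISTRY = {}

# Minimal mapping from Python annotations to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", list: "array", dict: "object"}


def capability(name: str):
    """Register the decorated function under `name` with a derived schema."""
    def decorator(func):
        properties, required = {}, []
        for param in inspect.signature(func).parameters.values():
            if param.name == "self":
                continue
            # Generics such as list[dict] expose their base type via __origin__.
            base = getattr(param.annotation, "__origin__", param.annotation)
            properties[param.name] = {"type": _JSON_TYPES.get(base, "object")}
            # Parameters without a default are treated as required.
            if param.default is inspect.Parameter.empty:
                required.append(param.name)
        _REGISTRY[name] = {
            "name": name,
            "description": (func.__doc__ or "").strip(),
            "parameters": {"type": "object", "properties": properties,
                           "required": required},
        }
        return func
    return decorator


@capability("process")
def process(data: list[dict], validate: bool = False):
    """Process data records."""
    return data


card = {"name": "data-processor", "capabilities": list(_REGISTRY.values())}
print(card["capabilities"][0]["parameters"])
```

Deriving the schema from the signature keeps the card and the code from drifting apart, which is presumably why the SDK takes a decorator-based approach.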
Calling Other Agents
from agentweave import SecureAgent

class OrchestratorAgent(SecureAgent):
    async def coordinate(self):
        # Discover processor agent
        processor_card = await self.discover_agent(
            "spiffe://agentweave.io/agent/data-processor/prod"
        )

        # Check if it has the capability we need
        if not processor_card.has_capability("process"):
            raise ValueError("Processor doesn't support 'process'")

        # Call the agent
        result = await self.call_agent(
            target="spiffe://agentweave.io/agent/data-processor/prod",
            task_type="process",
            payload={
                "data": [{"id": 1, "value": "test"}],
                "options": {"validate": True}
            }
        )

        # Extract result
        processed_data = result.artifacts[0]["data"]
        return processed_data
A2A Endpoints
The SDK automatically implements these A2A endpoints:
Discovery Endpoint
GET /.well-known/agent.json
Returns the Agent Card.
curl https://agent.example.com/.well-known/agent.json
Task Submission
POST /.well-known/a2a/tasks/send
Submit a new task for execution.
Request:
{
  "jsonrpc": "2.0",
  "id": "req-123",
  "method": "tasks.send",
  "params": {
    "task": {
      "type": "process",
      "messages": [
        {
          "role": "user",
          "parts": [
            {
              "type": "data",
              "data": {
                "data": [{"id": 1}],
                "options": {"validate": true}
              }
            }
          ]
        }
      ]
    }
  }
}
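For callers that build the envelope by hand, a request of this shape can be assembled with a few lines of standard-library Python. `build_send_request` is a hypothetical helper, not an SDK function; it simply mirrors the fields in the request above.

```python
import json
import uuid
from typing import Optional


def build_send_request(task_type: str, payload: dict,
                       request_id: Optional[str] = None) -> dict:
    """Wrap a payload in the JSON-RPC 2.0 envelope used for task submission."""
    return {
        "jsonrpc": "2.0",
        # Generate an id when the caller does not supply one.
        "id": request_id or f"req-{uuid.uuid4().hex[:8]}",
        "method": "tasks.send",
        "params": {
            "task": {
                "type": task_type,
                "messages": [
                    {"role": "user",
                     "parts": [{"type": "data", "data": payload}]}
                ],
            }
        },
    }


request = build_send_request(
    "process",
    {"data": [{"id": 1}], "options": {"validate": True}},
    request_id="req-123",
)
body = json.dumps(request)
print(body)
```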
Response (synchronous):
{
  "jsonrpc": "2.0",
  "id": "req-123",
  "result": {
    "task": {
      "id": "task-456",
      "type": "process",
      "state": "completed",
      "messages": [
        {
          "role": "user",
          "parts": [...]
        },
        {
          "role": "assistant",
          "parts": [
            {
              "type": "data",
              "data": {
                "processed": true
              }
            }
          ]
        }
      ],
      "artifacts": [
        {
          "type": "processed_data",
          "data": {"records": [...]}
        }
      ]
    }
  }
}
Task Status
GET /.well-known/a2a/tasks/{task_id}
Get status of a long-running task.
curl https://agent.example.com/.well-known/a2a/tasks/task-456
Response:
{
  "task": {
    "id": "task-456",
    "type": "process",
    "state": "working",
    "progress": 0.45,
    "messages": [...]
  }
}
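A client that cannot use streaming might poll this endpoint until the task reaches a terminal state. The sketch below shows one such loop; the HTTP call is replaced by an injected `fetch_status` callable so the example runs without a server, and `wait_for_task` and its parameters are assumptions made for illustration.

```python
import time
from typing import Callable

# States after which the task will not change again.
TERMINAL_STATES = {"completed", "failed"}


def wait_for_task(fetch_status: Callable[[], dict], interval: float = 1.0,
                  max_polls: int = 60) -> dict:
    """Call `fetch_status` until the task is terminal or `max_polls` is hit."""
    for _ in range(max_polls):
        task = fetch_status()["task"]
        if task["state"] in TERMINAL_STATES:
            return task
        time.sleep(interval)
    raise TimeoutError("task did not finish within the polling budget")


# Canned responses standing in for GET /.well-known/a2a/tasks/{task_id}.
_responses = iter([
    {"task": {"id": "task-456", "state": "working", "progress": 0.45}},
    {"task": {"id": "task-456", "state": "completed"}},
])
final = wait_for_task(lambda: next(_responses), interval=0.0)
print(final["state"])
```

In a real client, `fetch_status` would issue the GET request over mTLS and decode the JSON body.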
Task Streaming
GET /.well-known/a2a/tasks/{task_id}/stream
Stream task updates via Server-Sent Events (SSE).
curl -N https://agent.example.com/.well-known/a2a/tasks/task-456/stream
Response:
event: state_change
data: {"state": "working", "progress": 0.2}

event: state_change
data: {"state": "working", "progress": 0.5}

event: state_change
data: {"state": "completed"}

event: message
data: {"role": "assistant", "parts": [...]}

event: artifact
data: {"type": "processed_data", "data": {...}}
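On the client side, a stream like this has to be split back into events. In the SSE wire format each event ends with a blank line, so a minimal parser only needs to track the current `event:` name and accumulate `data:` lines. The `parse_sse` function below is an illustrative sketch that handles just these two fields, not a complete SSE implementation.

```python
import json


def parse_sse(lines):
    """Yield (event, data) pairs from an iterable of SSE text lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\n")
        if line == "":
            # A blank line ends the current event.
            if data:
                yield event, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())


stream = [
    "event: state_change",
    'data: {"state": "working", "progress": 0.2}',
    "",
    "event: state_change",
    'data: {"state": "completed"}',
    "",
]
events = list(parse_sse(stream))
for name, payload in events:
    print(name, payload)
```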
Long-Running Tasks
For tasks that take more than a few seconds, report progress from inside the capability so that callers can follow along:
from agentweave import SecureAgent, capability
from agentweave.types import TaskResult, TaskProgress

class BatchProcessor(SecureAgent):
    @capability("batch_process")
    async def batch_process(self, items: list[dict]) -> TaskResult:
        """Process a large batch with progress updates."""
        total = len(items)
        processed = []

        # Start counting at 1 so idx is the number of items finished so far
        for idx, item in enumerate(items, start=1):
            # Process item
            result = await self._process_item(item)
            processed.append(result)

            # Report progress
            await self.report_progress(
                TaskProgress(
                    state="working",
                    progress=idx / total,
                    message=f"Processed {idx}/{total} items"
                )
            )

        return TaskResult(
            status="completed",
            artifacts=[{
                "type": "batch_results",
                "data": {"items": processed}
            }]
        )
Callers can stream progress:
async for progress in orchestrator.call_agent_streaming(
    target="spiffe://agentweave.io/agent/batch-processor/prod",
    task_type="batch_process",
    payload={"items": large_batch}
):
    print(f"Progress: {progress.progress * 100:.0f}% - {progress.message}")

# Final result
result = progress.result
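The pattern of reading the final result from the last update can be tried without an agent by substituting a fake stream. In this sketch `Update`, `fake_stream`, and `consume` are stand-ins invented for the example; only the shape (a `progress` fraction, a `message`, and a `result` on the last update) follows the snippet above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


@dataclass
class Update:
    progress: float
    message: str
    result: Optional[Any] = None


async def fake_stream(items: list) -> AsyncIterator[Update]:
    """Stand-in for a streaming call: one update per item, result on the last."""
    total = len(items)
    for idx, _ in enumerate(items, start=1):
        await asyncio.sleep(0)  # yield control, as real I/O would
        done = idx == total
        yield Update(idx / total, f"Processed {idx}/{total} items",
                     result={"items": items} if done else None)


async def consume(items: list) -> Optional[Any]:
    """Drain the stream and return the result carried by the last update."""
    last = None
    async for update in fake_stream(items):
        last = update
    # Guard against an empty stream, where no update was ever received.
    return last.result if last else None


final = asyncio.run(consume([1, 2, 3, 4]))
print(final)
```

Keeping an explicit `last` variable avoids a `NameError` when the stream yields nothing, which the bare loop variable would not.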
Message Parts
Text Part
{
  "type": "text",
  "text": "Process these records with validation enabled"
}

In SDK:

TaskResult(
    artifacts=[{
        "type": "text",
        "text": "Processing completed successfully"
    }]
)
Data Part
{
  "type": "data",
  "data": {
    "records": [...],
    "metadata": {...}
  }
}

In SDK:

TaskResult(
    artifacts=[{
        "type": "data",
        "data": {"results": processed_records}
    }]
)
File Part
{
  "type": "file",
  "uri": "s3://bucket/results.csv",
  "mime_type": "text/csv",
  "size": 1024000
}

In SDK:

TaskResult(
    artifacts=[{
        "type": "file",
        "uri": "s3://results/output.csv",
        "mime_type": "text/csv"
    }]
)
Tool Use (Advanced)
For agents that use tools:
{
  "type": "tool_use",
  "tool_use_id": "tool-123",
  "name": "search_database",
  "input": {
    "query": "customer records",
    "limit": 100
  }
}

In SDK:

# Agent using a tool (method of a SecureAgent subclass)
@capability("search")
async def search(self, query: str) -> TaskResult:
    # Invoke tool
    tool_result = await self.use_tool(
        "search_database",
        {"query": query}
    )

    return TaskResult(
        status="completed",
        artifacts=[{
            "type": "tool_result",
            "tool_use_id": tool_result.id,
            "result": tool_result.data
        }]
    )
Error Handling
Error Response
{
  "jsonrpc": "2.0",
  "id": "req-123",
  "error": {
    "code": -32000,
    "message": "Task execution failed",
    "data": {
      "task": {
        "id": "task-456",
        "type": "process",
        "state": "failed",
        "error": {
          "type": "ValidationError",
          "message": "Invalid data format",
          "details": {...}
        }
      }
    }
  }
}
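A caller working with raw JSON-RPC responses has to tell this error envelope apart from a successful result. One possible approach, sketched here with a hypothetical `TaskFailed` exception and `unwrap` helper (neither is part of the SDK), is to raise on `error` and return the task otherwise.

```python
class TaskFailed(Exception):
    """Hypothetical client-side exception carrying the failed task's error."""

    def __init__(self, code: int, error_type: str, message: str):
        super().__init__(f"[{code}] {error_type}: {message}")
        self.code = code
        self.error_type = error_type


def unwrap(response: dict) -> dict:
    """Return the task from a success response, or raise TaskFailed."""
    if "error" in response:
        err = response["error"]
        # The task-level error is nested under error.data.task.error.
        task_error = err.get("data", {}).get("task", {}).get("error", {})
        raise TaskFailed(
            err["code"],
            task_error.get("type", "Unknown"),
            task_error.get("message", err["message"]),
        )
    return response["result"]["task"]


failure = {
    "jsonrpc": "2.0",
    "id": "req-123",
    "error": {
        "code": -32000,
        "message": "Task execution failed",
        "data": {"task": {"id": "task-456", "state": "failed",
                          "error": {"type": "ValidationError",
                                    "message": "Invalid data format"}}},
    },
}

try:
    unwrap(failure)
except TaskFailed as exc:
    caught = exc
    print(exc)
```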
In SDK
from agentweave import SecureAgent, capability
from agentweave.types import TaskResult, TaskError

# ValidationError is assumed to be an application-defined exception
# that exposes the offending field as `field`; its import is not shown.
class Processor(SecureAgent):
    @capability("process")
    async def process(self, data: list[dict]) -> TaskResult:
        try:
            result = await self._process(data)
            return TaskResult(
                status="completed",
                artifacts=[{"type": "data", "data": result}]
            )
        except ValidationError as e:
            return TaskResult(
                status="failed",
                error=TaskError(
                    type="ValidationError",
                    message=str(e),
                    details={"field": e.field}
                )
            )
Authentication
The AgentWeave SDK extends A2A with SPIFFE mTLS authentication.
Agent Card Extension:
{
  "authentication": {
    "schemes": [
      {
        "type": "mtls",
        "description": "SPIFFE mTLS authentication"
      }
    ]
  },
  "extensions": {
    "spiffe_id": "spiffe://agentweave.io/agent/data-processor/prod",
    "allowed_callers": [
      "spiffe://agentweave.io/agent/*"
    ]
  }
}
All A2A requests must:
- Use HTTPS with mTLS
- Present valid SPIFFE SVID
- Pass OPA authorization check
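To illustrate what a wildcard entry in `allowed_callers` means, the sketch below matches a caller's SPIFFE ID against such patterns with the standard-library `fnmatch` module. This is only an illustration of the pattern semantics, assuming shell-style `*` matching; per the list above, real enforcement goes through the OPA authorization check, and `is_allowed` is a hypothetical helper.

```python
from fnmatch import fnmatchcase

# Pattern taken from the Agent Card extension example.
ALLOWED_CALLERS = ["spiffe://agentweave.io/agent/*"]


def is_allowed(spiffe_id: str, patterns: list) -> bool:
    """Return True if the SPIFFE ID matches any allowed-caller pattern."""
    # fnmatchcase is case-sensitive, and "*" also matches "/" characters.
    return any(fnmatchcase(spiffe_id, pattern) for pattern in patterns)


print(is_allowed("spiffe://agentweave.io/agent/orchestrator/prod", ALLOWED_CALLERS))
print(is_allowed("spiffe://other.example/agent/orchestrator", ALLOWED_CALLERS))
```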
Interoperability
Calling Non-AgentWeave Agents
The SDK can call any A2A-compliant agent:
# Call Google ADK agent
result = await self.call_agent(
    target="https://adk-agent.example.com",
    task_type="summarize",
    payload={"text": "Long document..."},
    auth_scheme="oauth2",  # Instead of mTLS
    auth_token=oauth_token
)
Being Called by Non-AgentWeave Agents
Configure optional OAuth2 for external callers:
server:
  authentication:
    schemes:
      - type: "mtls"      # For AgentWeave agents
        required: true
      - type: "oauth2"    # For external agents
        required: false
        issuer: "https://auth.example.com"
        audience: "agentweaves"

In SDK:

class Processor(SecureAgent):
    @capability("process")
    @requires_auth("mtls", "oauth2")  # Accept either
    async def process(self, data: list[dict]) -> TaskResult:
        # Check caller type
        if self.current_request_context.auth_type == "oauth2":
            # Different authorization logic
            pass
Testing A2A Integration
Manual Testing
# Get Agent Card
curl https://agent.example.com/.well-known/agent.json

# Submit task (with mTLS)
curl -X POST https://agent.example.com/.well-known/a2a/tasks/send \
  --cert client.crt \
  --key client.key \
  --cacert ca.crt \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "test-1",
    "method": "tasks.send",
    "params": {
      "task": {
        "type": "process",
        "messages": [{
          "role": "user",
          "parts": [{
            "type": "data",
            "data": {"test": "data"}
          }]
        }]
      }
    }
  }'
Automated Testing
import pytest
import pytest_asyncio
from agentweave.testing import A2ATestClient

# Async fixtures and tests need an asyncio plugin; pytest-asyncio is assumed here.
@pytest_asyncio.fixture
async def agent_client():
    async with A2ATestClient("http://localhost:8443") as client:
        yield client

@pytest.mark.asyncio
async def test_process_capability(agent_client):
    # Get Agent Card
    card = await agent_client.get_agent_card()
    assert "process" in card.capability_names

    # Submit task
    result = await agent_client.send_task(
        task_type="process",
        payload={"data": [{"id": 1}]}
    )

    assert result.status == "completed"
    assert len(result.artifacts) > 0
A2A vs gRPC
The SDK supports both A2A (default) and gRPC:
A2A:
- JSON-based (human-readable)
- HTTP/2 with Server-Sent Events
- Easy debugging with curl
- Interoperable with any framework
gRPC:
- Protobuf-based (efficient)
- Native streaming
- Better performance
- Requires .proto definitions
Choose A2A for interoperability, gRPC for performance.
Reference Implementation
See examples/multi_agent/ for complete working examples of:
- Capability definition
- Agent-to-agent calls
- Long-running tasks with progress
- Error handling
- Discovery and invocation
Specification Compliance
The AgentWeave SDK implements:
- A2A Protocol v1.0
- Agent Card format
- Task lifecycle
- Message/Part/Artifact structures
- JSON-RPC 2.0 transport
Extensions:
- SPIFFE mTLS authentication
- OPA authorization
- Streaming progress updates