Communications Module API Reference
Communications Module API Reference
The agentweave.comms module provides A2A (Agent-to-Agent) protocol implementation based on JSON-RPC 2.0, enabling secure, standardized communication between agents.
Key Features:
- JSON-RPC 2.0 based protocol
- Agent capability advertisement via Agent Cards
- Task lifecycle management
- Server-Sent Events (SSE) for streaming updates
- Discovery mechanisms for finding agents
- mTLS authentication integration
A2A Protocol Components
AgentCard
Import: from agentweave.comms.a2a import AgentCard
Agent Card for capability advertisement, served at /.well-known/agent.json endpoint.
Constructor
1
2
3
4
5
6
7
8
9
AgentCard(
name: str,
description: str,
url: str,
version: str = "1.0.0",
capabilities: list[Capability] = [],
authentication: dict[str, Any] = {"schemes": []},
extensions: dict[str, Any] = {}
)
Parameters:
name(str): Agent namedescription(str): Agent descriptionurl(str): Agent base URL (must start with http:// or https://)version(str): Agent version (default: "1.0.0")capabilities(list[Capability]): List of agent capabilitiesauthentication(dict): Authentication configurationextensions(dict): Custom extensions (e.g., SPIFFE ID)
Raises:
ValueError: If URL format is invalid
Class Methods
from_config
1
2
3
4
5
6
7
8
9
10
11
@classmethod
def from_config(
cls,
name: str,
description: str,
url: str,
spiffe_id: str,
version: str = "1.0.0",
capabilities: list[Capability] | None = None,
auth_schemes: list[AuthScheme] | None = None
) -> AgentCard
Create an AgentCard from agent configuration with automatic SPIFFE authentication setup.
Parameters:
name(str): Agent namedescription(str): Agent descriptionurl(str): Agent base URLspiffe_id(str): SPIFFE ID for this agentversion(str): Agent versioncapabilities(list[Capability], optional): List of capabilitiesauth_schemes(list[AuthScheme], optional): Authentication schemes (defaults to SPIFFE)
Returns: AgentCard instance
from_json
1
2
@classmethod
def from_json(cls, json_str: str) -> AgentCard
Deserialize AgentCard from JSON string.
Parameters:
json_str(str): JSON string representation
Returns: AgentCard instance
from_dict
1
2
@classmethod
def from_dict(cls, data: dict[str, Any]) -> AgentCard
Create AgentCard from dictionary.
Parameters:
data(dict): Dictionary representation
Returns: AgentCard instance
Instance Methods
to_json
1
def to_json() -> str
Serialize AgentCard to JSON string.
Returns: JSON string representation
to_dict
1
def to_dict() -> dict[str, Any]
Convert AgentCard to dictionary.
Returns: Dictionary representation suitable for JSON serialization
get_spiffe_id
1
def get_spiffe_id() -> str | None
Extract SPIFFE ID from extensions.
Returns: SPIFFE ID if present, None otherwise
has_capability
1
def has_capability(capability_name: str) -> bool
Check if agent has a specific capability.
Parameters:
capability_name(str): Name of the capability to check
Returns: True if capability exists, False otherwise
get_capability
1
def get_capability(capability_name: str) -> Capability | None
Get a capability by name.
Parameters:
capability_name(str): Name of the capability to retrieve
Returns: Capability if found, None otherwise
Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from agentweave.comms.a2a import AgentCard, Capability
# Create agent card
card = AgentCard.from_config(
name="search-agent",
description="Semantic search agent",
url="https://search.example.com",
spiffe_id="spiffe://example.com/search-agent",
capabilities=[
Capability(
name="search",
description="Perform semantic search",
input_modes=["application/json"],
output_modes=["application/json"]
)
]
)
# Serialize to JSON
json_str = card.to_json()
# Check capabilities
if card.has_capability("search"):
cap = card.get_capability("search")
Capability
Import: from agentweave.comms.a2a import Capability
Represents an agent capability as per A2A protocol.
Constructor
1
2
3
4
5
6
7
Capability(
name: str,
description: str,
input_modes: list[str] = ["application/json"],
output_modes: list[str] = ["application/json"],
parameters: dict[str, Any] | None = None
)
Parameters:
name(str): Capability identifierdescription(str): Human-readable descriptioninput_modes(list[str]): Supported input content typesoutput_modes(list[str]): Supported output content typesparameters(dict, optional): JSON Schema for capability parameters
Methods
to_dict
1
def to_dict() -> dict[str, Any]
Convert to dictionary for JSON serialization.
AuthScheme
Import: from agentweave.comms.a2a import AuthScheme
Authentication scheme specification.
Constructor
1
2
3
4
5
AuthScheme(
type: str,
description: str | None = None,
metadata: dict[str, Any] = {}
)
Parameters:
type(str): Auth type (e.g., "spiffe", "oauth2", "api_key")description(str, optional): Scheme descriptionmetadata(dict): Scheme-specific metadata
A2AClient
Import: from agentweave.comms.a2a import A2AClient
Client for A2A protocol communication with remote agents.
Constructor
1
2
3
4
5
6
A2AClient(
identity_provider=None,
authz_enforcer=None,
timeout: float = 30.0,
max_retries: int = 3
)
Parameters:
identity_provider(optional): Identity provider for mTLSauthz_enforcer(optional): Authorization enforcertimeout(float): Default request timeout in seconds (default: 30.0)max_retries(int): Maximum number of retry attempts (default: 3)
Methods
discover_agent
1
async def discover_agent(url: str) -> AgentCard
Discover agent by fetching its agent card from /.well-known/agent.json.
Parameters:
url(str): Base URL of the agent
Returns: AgentCard for the discovered agent
Raises:
DiscoveryError: If agent card cannot be retrieved
send_task
1
2
3
4
5
6
7
8
async def send_task(
target_url: str,
task_type: str,
payload: dict[str, Any],
messages: list | None = None,
metadata: dict[str, Any] | None = None,
timeout: float | None = None
) -> Task
Send a task to a remote agent.
Parameters:
target_url(str): Base URL of target agenttask_type(str): Type of task/capability to invokepayload(dict): Task payload datamessages(list, optional): Optional message historymetadata(dict, optional): Optional task metadatatimeout(float, optional): Optional timeout override
Returns: Task with initial response
Raises:
TaskSubmissionError: If task submission fails
get_task_status
1
2
3
4
5
async def get_task_status(
target_url: str,
task_id: str,
timeout: float | None = None
) -> Task
Get the status of a task.
Parameters:
target_url(str): Base URL of target agenttask_id(str): ID of the tasktimeout(float, optional): Optional timeout override
Returns: Task with current status
Raises:
TaskStatusError: If status retrieval fails
stream_task_updates
1
2
3
4
5
async def stream_task_updates(
target_url: str,
task_id: str,
timeout: float | None = None
) -> AsyncIterator[Task]
Stream task updates via Server-Sent Events (SSE).
Parameters:
target_url(str): Base URL of target agenttask_id(str): ID of the tasktimeout(float, optional): Optional timeout override
Yields: Task updates as they occur
Raises:
TaskStatusError: If streaming fails
poll_until_complete
1
2
3
4
5
6
async def poll_until_complete(
target_url: str,
task_id: str,
poll_interval: float = 1.0,
max_wait: float | None = None
) -> Task
Poll task status until completion.
Parameters:
target_url(str): Base URL of target agenttask_id(str): ID of the taskpoll_interval(float): Seconds between polls (default: 1.0)max_wait(float, optional): Maximum seconds to wait (None = unlimited)
Returns: Completed task
Raises:
TaskStatusError: If polling failsTimeoutError: If max_wait is exceeded
cancel_task
1
2
3
4
5
async def cancel_task(
target_url: str,
task_id: str,
timeout: float | None = None
) -> Task
Cancel a running task.
Parameters:
target_url(str): Base URL of target agenttask_id(str): ID of the task to canceltimeout(float, optional): Optional timeout override
Returns: Cancelled task
Raises:
TaskStatusError: If cancellation fails
close
1
async def close() -> None
Close HTTP client and cleanup resources.
Context Manager Support
1
2
3
async with A2AClient() as client:
card = await client.discover_agent("https://agent.example.com")
task = await client.send_task(url, "search", {"query": "example"})
Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from agentweave.comms.a2a import A2AClient
async with A2AClient(timeout=30.0) as client:
# Discover agent
card = await client.discover_agent("https://search.example.com")
print(f"Found agent: {card.name}")
# Send task
task = await client.send_task(
target_url="https://search.example.com",
task_type="search",
payload={"query": "AI agents"}
)
# Poll until complete
completed = await client.poll_until_complete(
target_url="https://search.example.com",
task_id=task.id,
max_wait=60.0
)
print(f"Result: {completed.result}")
A2AServer
Import: from agentweave.comms.a2a import A2AServer
FastAPI-based A2A protocol server with JSON-RPC endpoints and SSE streaming.
Constructor
1
2
3
4
5
6
A2AServer(
agent_card: AgentCard,
task_manager: TaskManager | None = None,
authz_enforcer=None,
enable_cors: bool = True
)
Parameters:
agent_card(AgentCard): Agent card to servetask_manager(TaskManager, optional): Task manager instance (creates new if None)authz_enforcer(optional): Authorization enforcerenable_cors(bool): Enable CORS middleware (default: True)
Properties
app
1
2
@property
def app(self) -> FastAPI
Get FastAPI application instance.
Returns: FastAPI app
Methods
register_task_handler
1
2
3
4
def register_task_handler(
task_type: str,
handler: Callable[[Task], Awaitable[Task]]
) -> None
Register a handler for a task type.
Parameters:
task_type(str): Type of task (matches capability name)handler(Callable): Async function that takes a Task and returns updated Task
get_app
1
def get_app() -> FastAPI
Get FastAPI application instance.
Returns: FastAPI app for use with ASGI server
start
1
async def start(host: str = "0.0.0.0", port: int = 8443) -> None
Start the server (convenience method, use uvicorn in production).
Parameters:
host(str): Host to bind to (default: "0.0.0.0")port(int): Port to bind to (default: 8443)
Endpoints
The server automatically provides these endpoints:
GET /.well-known/agent.json- Serve agent cardPOST /rpc- Handle JSON-RPC 2.0 requeststask.send- Submit new tasktask.status- Get task statustask.cancel- Cancel task
GET /tasks/{task_id}/stream- Stream task updates via SSEGET /health- Health check endpoint
Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from agentweave.comms.a2a import A2AServer, AgentCard
from agentweave.comms.a2a import Task, TaskState
# Create agent card
card = AgentCard.from_config(
name="processor",
description="Data processor agent",
url="https://processor.example.com",
spiffe_id="spiffe://example.com/processor"
)
# Create server
server = A2AServer(agent_card=card)
# Register task handler
async def process_handler(task: Task) -> Task:
# Process task
result = process_data(task.payload)
task.mark_completed(result=result)
return task
server.register_task_handler("process", process_handler)
# Get FastAPI app for use with uvicorn
app = server.get_app()
# Run with: uvicorn main:app --host 0.0.0.0 --port 8443
Task
Import: from agentweave.comms.a2a import Task
A2A Task representing a unit of work with lifecycle management.
Constructor
1
2
3
4
5
6
7
8
9
10
11
12
13
Task(
id: str = <auto-generated>,
type: str,
state: TaskState = TaskState.PENDING,
payload: dict[str, Any] = {},
messages: list[Message] = [],
result: Any | None = None,
artifacts: list[Artifact] = [],
error: str | None = None,
created_at: datetime = <now>,
updated_at: datetime = <now>,
metadata: dict[str, Any] = {}
)
Parameters:
id(str): Unique task ID (auto-generated if not provided)type(str): Task type/capability namestate(TaskState): Current task state (default: PENDING)payload(dict): Task input payloadmessages(list[Message]): Message historyresult(Any, optional): Task resultartifacts(list[Artifact]): Output artifactserror(str, optional): Error message if failedcreated_at(datetime): Task creation timestampupdated_at(datetime): Last update timestampmetadata(dict): Additional task metadata
Methods
update_state
1
def update_state(new_state: TaskState, error: str | None = None) -> None
Update task state and timestamp.
Parameters:
new_state(TaskState): New task stateerror(str, optional): Error message if state is FAILED
add_message
1
def add_message(role: str, parts: list[MessagePart]) -> None
Add a message to the task.
Parameters:
role(str): Message role (user, assistant, system)parts(list[MessagePart]): Message parts
add_artifact
1
2
3
4
5
def add_artifact(
artifact_type: str,
data: Any,
metadata: dict[str, Any] | None = None
) -> None
Add an artifact to the task.
Parameters:
artifact_type(str): Type of artifactdata(Any): Artifact datametadata(dict, optional): Optional metadata
mark_running
1
def mark_running() -> None
Mark task as running.
mark_completed
1
def mark_completed(result: Any = None) -> None
Mark task as completed.
Parameters:
result(Any, optional): Task result
mark_failed
1
def mark_failed(error: str) -> None
Mark task as failed.
Parameters:
error(str): Error message
mark_cancelled
1
def mark_cancelled() -> None
Mark task as cancelled.
is_terminal
1
def is_terminal() -> bool
Check if task is in a terminal state.
Returns: True if state is COMPLETED, FAILED, or CANCELLED
to_jsonrpc
1
def to_jsonrpc(method: str = "task.send") -> dict[str, Any]
Convert task to JSON-RPC 2.0 request format.
Parameters:
method(str): JSON-RPC method name (default: "task.send")
Returns: JSON-RPC request dictionary
to_dict
1
def to_dict() -> dict[str, Any]
Convert task to dictionary.
Returns: Dictionary representation
TaskState
Import: from agentweave.comms.a2a import TaskState
Task lifecycle states enumeration.
1
2
3
4
5
6
class TaskState(str, Enum):
PENDING = "pending" # Task created, not started
RUNNING = "running" # Task is being processed
COMPLETED = "completed" # Task completed successfully
FAILED = "failed" # Task failed with error
CANCELLED = "cancelled" # Task was cancelled
Method
1
def is_terminal() -> bool
Check if this is a terminal state.
Returns: True for COMPLETED, FAILED, CANCELLED
TaskManager
Import: from agentweave.comms.a2a import TaskManager
Manages task lifecycle and status tracking for long-running tasks.
Constructor
1
TaskManager()
Methods
create_task
1
2
3
4
5
6
async def create_task(
task_type: str,
payload: dict[str, Any] | None = None,
messages: list[Message] | None = None,
metadata: dict[str, Any] | None = None
) -> Task
Create a new task.
Parameters:
task_type(str): Type of taskpayload(dict, optional): Task payloadmessages(list[Message], optional): Initial messagesmetadata(dict, optional): Task metadata
Returns: Created task
get_task
1
async def get_task(task_id: str) -> Task | None
Retrieve a task by ID.
Parameters:
task_id(str): Task ID
Returns: Task if found, None otherwise
update_task
1
2
3
4
5
6
async def update_task(
task_id: str,
state: TaskState | None = None,
result: Any | None = None,
error: str | None = None
) -> Task | None
Update task state and data.
Parameters:
task_id(str): Task IDstate(TaskState, optional): New stateresult(Any, optional): Task resulterror(str, optional): Error message
Returns: Updated task if found, None otherwise
delete_task
1
async def delete_task(task_id: str) -> bool
Delete a task.
Parameters:
task_id(str): Task ID
Returns: True if deleted, False if not found
list_tasks
1
2
3
4
async def list_tasks(
state: TaskState | None = None,
task_type: str | None = None
) -> list[Task]
List tasks with optional filtering.
Parameters:
state(TaskState, optional): Filter by statetask_type(str, optional): Filter by type
Returns: List of matching tasks
wait_for_completion
1
2
3
4
async def wait_for_completion(
task_id: str,
timeout: float | None = None
) -> Task | None
Wait for a task to reach a terminal state.
Parameters:
task_id(str): Task IDtimeout(float, optional): Optional timeout in seconds
Returns: Completed task if found, None otherwise
Raises:
asyncio.TimeoutError: If timeout is reached
cleanup_completed_tasks
1
async def cleanup_completed_tasks(max_age_seconds: int = 3600) -> int
Clean up old completed tasks.
Parameters:
max_age_seconds(int): Maximum age for completed tasks (default: 3600)
Returns: Number of tasks cleaned up
Discovery
DiscoveryClient
Import: from agentweave.comms import DiscoveryClient
Client for discovering agents via well-known endpoints with caching.
Constructor
1
2
3
4
5
DiscoveryClient(
cache_ttl: int = 300,
timeout: float = 10.0,
enable_cache: bool = True
)
Parameters:
cache_ttl(int): Cache TTL in seconds (default: 300)timeout(float): Request timeout in seconds (default: 10.0)enable_cache(bool): Enable agent card caching (default: True)
Methods
discover_agent
1
2
3
4
async def discover_agent(
url: str,
force_refresh: bool = False
) -> AgentCard
Discover agent by URL.
Parameters:
url(str): Base URL of the agentforce_refresh(bool): Force cache refresh (default: False)
Returns: AgentCard for the discovered agent
Raises:
DiscoveryError: If discovery fails
discover_by_spiffe_id
1
2
3
4
async def discover_by_spiffe_id(
spiffe_id: str,
service_mesh_resolver: callable | None = None
) -> AgentCard
Discover agent by SPIFFE ID using service mesh resolver.
Parameters:
spiffe_id(str): SPIFFE ID of the agentservice_mesh_resolver(callable, optional): Function to resolve SPIFFE ID to URL
Returns: AgentCard for the agent
Raises:
DiscoveryError: If discovery failsValueError: If service_mesh_resolver not provided
discover_multiple
1
2
3
4
async def discover_multiple(
urls: list[str],
ignore_errors: bool = False
) -> list[AgentCard]
Discover multiple agents concurrently.
Parameters:
urls(list[str]): List of agent URLsignore_errors(bool): Continue on errors (default: False)
Returns: List of discovered agent cards
Raises:
DiscoveryError: If any discovery fails and ignore_errors=False
verify_agent_capability
1
2
3
4
async def verify_agent_capability(
url: str,
capability_name: str
) -> bool
Verify that an agent has a specific capability.
Parameters:
url(str): Agent URLcapability_name(str): Name of capability to check
Returns: True if agent has capability, False otherwise
Raises:
DiscoveryError: If discovery fails
find_agents_with_capability
1
2
3
4
async def find_agents_with_capability(
urls: list[str],
capability_name: str
) -> list[AgentCard]
Find all agents with a specific capability.
Parameters:
urls(list[str]): List of agent URLs to checkcapability_name(str): Capability to search for
Returns: List of agent cards with the capability
clear_cache
1
async def clear_cache(url: str | None = None) -> None
Clear agent card cache.
Parameters:
url(str, optional): Specific URL to clear (None = clear all)
get_cache_stats
1
def get_cache_stats() -> dict[str, int]
Get cache statistics.
Returns: Dictionary with cache stats (total_entries, expired_entries, active_entries)
close
1
async def close() -> None
Close HTTP client and cleanup resources.
Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from agentweave.comms import DiscoveryClient
async with DiscoveryClient(cache_ttl=300) as discovery:
# Discover single agent
card = await discovery.discover_agent("https://agent.example.com")
# Find agents with specific capability
agents = await discovery.find_agents_with_capability(
urls=["https://agent1.com", "https://agent2.com"],
capability_name="search"
)
# Get cache stats
stats = discovery.get_cache_stats()
Exceptions
A2AClientError
Base exception for A2A client errors.
TaskSubmissionError
Error submitting task to remote agent.
TaskStatusError
Error retrieving task status.
DiscoveryError
Error discovering agent card.
Complete Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from agentweave.comms.a2a import A2AClient, A2AServer, AgentCard, Task, TaskState
from agentweave.comms import DiscoveryClient
# Server side
card = AgentCard.from_config(
name="data-processor",
description="Process data",
url="https://processor.example.com",
spiffe_id="spiffe://example.com/processor"
)
server = A2AServer(agent_card=card)
async def process_data_handler(task: Task) -> Task:
data = task.payload.get("data")
result = {"processed": data.upper()}
task.mark_completed(result=result)
return task
server.register_task_handler("process", process_data_handler)
app = server.get_app() # Use with uvicorn
# Client side
async with A2AClient() as client:
# Discover agent
card = await client.discover_agent("https://processor.example.com")
# Send task
task = await client.send_task(
target_url="https://processor.example.com",
task_type="process",
payload={"data": "hello world"}
)
# Wait for completion
completed = await client.poll_until_complete(
target_url="https://processor.example.com",
task_id=task.id
)
print(f"Result: {completed.result}")