Skip to main content

Cloud & Design Automation

RevitPy includes a cloud layer for running Revit workloads through the Autodesk Platform Services (APS) Design Automation API. The revitpy.cloud module provides OAuth2 authentication, job submission and monitoring, parallel batch processing with retry, CI/CD pipeline generation, and webhook event handling.

Install the cloud extras with:

pip install revitpy[cloud]

Quick Start

For simple one-shot operations, use the convenience functions at module level:

from revitpy.cloud import submit_job, batch_process, generate_ci_config
from revitpy.cloud import ApsCredentials, JobConfig, BatchConfig

credentials = ApsCredentials(
    client_id="your-client-id",
    client_secret="your-client-secret",
)

# Submit a single job
job_id = await submit_job(
    credentials,
    JobConfig(
        activity_id="RevitPy.Validate+prod",
        input_file="https://bucket.s3.amazonaws.com/model.rvt",
    ),
)

# Batch-process multiple jobs
configs = [
    JobConfig(activity_id="RevitPy.Validate+prod", input_file=f)
    for f in rvt_file_urls
]
result = await batch_process(
    credentials,
    configs,
    batch_config=BatchConfig(max_parallel=5, retry_count=2),
)
print(f"{result.completed}/{result.total_jobs} succeeded")

# Generate a CI pipeline
yaml = generate_ci_config(provider="github", script_path="validate.py")

ApsCredentials

ApsCredentials holds the OAuth2 client credentials used throughout the cloud module.

Field Type Default Description
client_id str APS application client ID
client_secret str APS application client secret
region CloudRegion CloudRegion.US Target cloud region
from revitpy.cloud import ApsCredentials, CloudRegion

credentials = ApsCredentials(
    client_id="your-client-id",
    client_secret="your-client-secret",
    region=CloudRegion.EMEA,
)

CloudRegion Enum

Value Description
US United States region
EMEA Europe, Middle East, and Africa region

ApsAuthenticator

ApsAuthenticator implements the OAuth2 client-credentials flow against the APS token endpoint. It caches the issued token and transparently refreshes it when it nears expiry (with a 60-second buffer).

from revitpy.cloud import ApsAuthenticator, ApsCredentials

credentials = ApsCredentials(
    client_id="your-client-id",
    client_secret="your-client-secret",
)
auth = ApsAuthenticator(credentials)

# Perform a fresh authentication
token = await auth.authenticate()
print(token.access_token, token.expires_in, token.scope)

# Get a cached token (auto-refreshes if expired)
token = await auth.get_token()

# Check validity manually
if auth.is_token_valid():
    print("Token is still valid")

ApsToken Dataclass

Field Type Default Description
access_token str OAuth2 access token string
token_type str "Bearer" Token type
expires_in int 3600 Token lifetime in seconds
scope str "" Granted OAuth2 scopes
issued_at float time.time() Unix timestamp when the token was issued

The is_expired property returns True when the current time is within 60 seconds of expiry:

if token.is_expired:
    token = await auth.authenticate()

AuthMethod Enum

Value Description
CLIENT_CREDENTIALS Service-to-service OAuth2 flow (used by ApsAuthenticator)
AUTHORIZATION_CODE User-interactive three-legged OAuth2 flow
DEVICE_CODE Device authorization grant for headless environments

ApsClient

ApsClient is the base HTTP client for all APS API requests. It wraps httpx.AsyncClient and provides automatic Bearer-token injection, sliding-window rate limiting (20 requests per second), and exponential-backoff retry on transient failures.

Creating a Client

from revitpy.cloud import ApsAuthenticator, ApsClient, CloudRegion

auth = ApsAuthenticator(credentials)
client = ApsClient(auth, region=CloudRegion.US)

Making Requests

The client exposes get, post, and delete convenience methods, plus a general request method. All return parsed JSON as a dict:

# GET request
data = await client.get("/da/us-east/v3/workitems/abc123")

# POST request
data = await client.post("/da/us-east/v3/workitems", json=payload)

# DELETE request
data = await client.delete("/da/us-east/v3/workitems/abc123")

# General request
data = await client.request("PATCH", "/some/endpoint", json=body)

Rate Limiting and Retry

ApsClient enforces a sliding-window rate limit of 20 requests per second. When the limit is reached, subsequent requests are delayed until a slot opens.

Transient HTTP errors (status codes 429, 500, 502, 503) and connection errors are retried up to 3 times with exponential backoff starting at 1 second. Non-retryable errors raise ApsApiError immediately.

Setting Value
Max requests per second 20
Retryable status codes 429, 500, 502, 503
Max retries 3
Initial backoff 1.0 second
Backoff multiplier 2x per attempt

JobManager

JobManager wraps the APS Design Automation v3 WorkItems API and provides high-level operations for submitting, polling, downloading, and cancelling cloud-based Revit processing jobs.

Creating a Job Manager

from revitpy.cloud import ApsAuthenticator, ApsClient, JobManager

auth = ApsAuthenticator(credentials)
client = ApsClient(auth)
manager = JobManager(client)

Submitting a Job

from revitpy.cloud import JobConfig

config = JobConfig(
    activity_id="RevitPy.Validate+prod",
    input_file="https://bucket.s3.amazonaws.com/model.rvt",
    output_file="https://bucket.s3.amazonaws.com/result.json",
    script_path="https://bucket.s3.amazonaws.com/validate.py",
    parameters={"version": "2024"},
    timeout=600.0,
)

job_id = await manager.submit(config)
print(f"Submitted: {job_id}")

JobConfig Dataclass

Field Type Default Description
activity_id str Design Automation activity identifier
input_file str \| Path URL or path to the input Revit file
output_file str \| Path \| None None URL or path for the output file
script_path str \| Path \| None None URL or path to the processing script
parameters dict[str, Any] {} Additional parameters passed to the activity
timeout float 600.0 Maximum job duration in seconds

Polling for Completion

# Check status once
status = await manager.get_status(job_id)
print(status)  # JobStatus.RUNNING

# Poll until terminal state (blocks up to timeout)
result = await manager.wait_for_completion(
    job_id,
    timeout=600.0,
    poll_interval=5.0,
)
print(result.status, result.duration_ms)

wait_for_completion raises JobExecutionError if the job fails or times out. On success, it returns a JobResult.

Downloading Results

from pathlib import Path

downloaded = await manager.download_results(
    job_id,
    output_dir=Path("./results"),
)
for path in downloaded:
    print(f"Downloaded: {path}")

Cancelling a Job

success = await manager.cancel(job_id)
if success:
    print("Job cancelled")

Retrieving Logs

logs = await manager.get_logs(job_id)
print(logs)

JobResult Dataclass

Field Type Default Description
job_id str Work-item identifier
status JobStatus Final status of the job
output_files list[str] [] URLs of output files
logs str "" URL to the execution report
duration_ms float 0.0 Total execution time in milliseconds
error str \| None None Error message if the job failed

JobStatus Enum

Value Description
PENDING Job has been accepted but not yet queued
QUEUED Job is waiting for a processing slot
RUNNING Job is actively executing
COMPLETED Job finished successfully
FAILED Job encountered an error
CANCELLED Job was cancelled by the caller
TIMED_OUT Job exceeded its timeout

BatchProcessor

BatchProcessor runs multiple JobConfig items in parallel with bounded concurrency, automatic retry on failure, and optional progress and cancellation callbacks.

Creating a Batch Processor

from revitpy.cloud import JobManager, BatchProcessor, BatchConfig

processor = BatchProcessor(
    job_manager=manager,
    config=BatchConfig(
        max_parallel=5,
        retry_count=2,
        retry_delay=30.0,
        continue_on_error=True,
    ),
)

BatchConfig Dataclass

Field Type Default Description
max_parallel int 5 Maximum concurrent jobs
retry_count int 2 Number of retries per failed job
retry_delay float 30.0 Seconds to wait between retries
continue_on_error bool True Continue processing remaining jobs when one fails

Processing a List of Jobs

import asyncio

configs = [
    JobConfig(activity_id="RevitPy.Validate+prod", input_file=url)
    for url in file_urls
]

# Optional progress callback
def on_progress(completed: int, total: int):
    print(f"Progress: {completed}/{total}")

# Optional cancellation event
cancel = asyncio.Event()

result = await processor.process(
    configs,
    progress=on_progress,
    cancel=cancel,
)

print(f"Total: {result.total_jobs}")
print(f"Completed: {result.completed}")
print(f"Failed: {result.failed}")
print(f"Cancelled: {result.cancelled}")
print(f"Duration: {result.total_duration_ms:.0f}ms")

# Inspect individual results
for job_result in result.results:
    print(job_result.job_id, job_result.status.value)

When continue_on_error is False, the processor sets the cancel event on the first failure, preventing new jobs from starting.

Processing a Directory of .rvt Files

process_directory is a convenience method that discovers all .rvt files in a directory, creates a JobConfig for each, and processes them:

from pathlib import Path

result = await processor.process_directory(
    input_dir=Path("./models"),
    script_path=Path("./scripts/validate.py"),
    activity_id="RevitPy.Validate+prod",
)
print(f"Processed {result.total_jobs} Revit files")

BatchResult Dataclass

Field Type Default Description
total_jobs int 0 Total number of jobs submitted
completed int 0 Number of successfully completed jobs
failed int 0 Number of failed jobs
cancelled int 0 Number of cancelled jobs
results list[JobResult] [] Per-job results
total_duration_ms float 0.0 Total batch processing time in milliseconds

CIHelper

CIHelper generates CI/CD pipeline configurations for GitHub Actions and GitLab CI that automate Revit model validation using Design Automation.

Generating a GitHub Actions Workflow

from revitpy.cloud import CIHelper

helper = CIHelper()

yaml = helper.generate_github_workflow(
    name="revitpy-validation",
    script_path="validate.py",
    revit_version="2024",
    branches="main",
    runner="ubuntu-latest",
    python_version="3.11",
)
print(yaml)
Parameter Type Default Description
name str "revitpy-validation" Workflow name
script_path str "validate.py" Path to the validation script
revit_version str "2024" Target Revit version
branches str "main" Comma-separated branch triggers
runner str "ubuntu-latest" GitHub Actions runner label
python_version str "3.11" Python version

Generating a GitLab CI Pipeline

yaml = helper.generate_gitlab_ci(
    name="revitpy-validation",
    script_path="validate.py",
    revit_version="2024",
    python_version="3.11",
)
print(yaml)
Parameter Type Default Description
name str "revitpy-validation" Job name
script_path str "validate.py" Path to the validation script
revit_version str "2024" Target Revit version
python_version str "3.11" Python version for the Docker image

Using the Convenience Function

The module-level generate_ci_config function dispatches to the appropriate generator:

from revitpy.cloud import generate_ci_config

github_yaml = generate_ci_config(provider="github", script_path="validate.py")
gitlab_yaml = generate_ci_config(provider="gitlab", revit_version="2025")

Raises ValueError if provider is not "github" or "gitlab".

Saving to Disk

from pathlib import Path

path = helper.save_workflow(
    content=yaml,
    output_path=".github/workflows/revitpy-validation.yml",
)
print(f"Saved to {path}")

save_workflow creates parent directories automatically and returns the resolved Path.

WebhookHandler

WebhookHandler receives, verifies, and routes incoming APS webhook events. It supports HMAC-SHA256 signature verification and event-type-based callback dispatch.

Setting Up a Handler

from revitpy.cloud import WebhookHandler, WebhookConfig

handler = WebhookHandler(
    config=WebhookConfig(
        url="https://myapp.example.com/webhooks/aps",
        secret="your-webhook-secret",
        events=["job.completed", "job.failed"],
    ),
)

WebhookConfig Dataclass

Field Type Default Description
url str Webhook listener URL
secret str Shared secret for HMAC verification
events list[str] [] Event types to listen for

Verifying Signatures

is_valid = handler.verify_signature(
    payload=request.body,       # raw bytes
    signature=request.headers["X-Signature"],  # hex HMAC-SHA256
)
if not is_valid:
    raise ValueError("Invalid webhook signature")

Raises WebhookError if no secret is configured.

Handling Events

event = handler.handle_event(event_data={
    "eventType": "job.completed",
    "jobId": "abc123",
    "status": "completed",
    "timestamp": "2025-01-15T10:30:00Z",
})
print(event.event_type, event.job_id, event.status)

handle_event parses the payload into a WebhookEvent, dispatches registered callbacks, and returns the event. Raises WebhookError if required fields (like eventType) are missing.

WebhookEvent Dataclass

Field Type Default Description
event_type str Event type identifier
job_id str Associated job identifier
status JobStatus Job status at the time of the event
timestamp str ISO timestamp of the event
payload dict[str, Any] {} Full raw event payload

Registering Callbacks

Register callbacks for specific event types. Callbacks receive a WebhookEvent argument. Use "*" as the event type to listen for all events:

def on_completed(event):
    print(f"Job {event.job_id} completed")

def on_failed(event):
    print(f"Job {event.job_id} failed")

def on_any(event):
    print(f"Event: {event.event_type}")

handler.register_callback("job.completed", on_completed)
handler.register_callback("job.failed", on_failed)
handler.register_callback("*", on_any)  # wildcard listener

Multiple callbacks can be registered for the same event type. They are invoked in registration order.

Error Handling

All cloud errors inherit from CloudError. Specific exception types let you handle different failure modes:

Exception Description
CloudError Base exception for all cloud errors
AuthenticationError OAuth2 authentication failed
ApsApiError APS API returned an error response
JobSubmissionError Work-item submission was rejected
JobExecutionError Job failed during execution or timed out
WebhookError Webhook verification or event parsing failed
from revitpy.cloud import (
    AuthenticationError,
    ApsApiError,
    JobSubmissionError,
    JobExecutionError,
    WebhookError,
)

try:
    job_id = await manager.submit(config)
    result = await manager.wait_for_completion(job_id)
except AuthenticationError as exc:
    print(f"Auth failed: {exc}")
except JobSubmissionError as exc:
    print(f"Submission rejected: {exc}")
except JobExecutionError as exc:
    print(f"Job failed: {exc}")
except ApsApiError as exc:
    print(f"API error: {exc}")