Data Processing Pipeline Example

Complexity: Intermediate Time to Complete: 40 minutes Prerequisites: Understanding of ETL patterns, basic data processing

This example demonstrates a real-world data processing pipeline using the agent pattern. Three specialized agents (Ingester, Processor, Storage) work together to process streaming data with security and observability built-in.

What You'll Learn

  • Pipeline pattern with agent orchestration
  • Stream processing with agents
  • Error handling and retry logic
  • Data validation and transformation
  • Monitoring pipeline health
  • Backpressure handling

Use Case

Scenario: Process customer event data from various sources

  • Ingester Agent: Receives events from external sources, validates, normalizes
  • Processor Agent: Enriches data, applies business rules, filters
  • Storage Agent: Persists to database, sends to data warehouse

Architecture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
┌─────────────────────────────────────────────────────────────────┐
│                         Data Sources                            │
│                  (APIs, Webhooks, Message Queues)               │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         │ Raw Events
                         │
                         ▼
              ┌──────────────────────┐
              │   Ingester Agent     │
              │                      │
              │  • Validate schema   │
              │  • Normalize format  │
              │  • Rate limiting     │
              │  • Deduplication     │
              └──────────┬───────────┘
                         │
                         │ Validated Events
                         │
                         ▼
              ┌──────────────────────┐
              │  Processor Agent     │
              │                      │
              │  • Enrich data       │
              │  • Apply rules       │
              │  • Filter/transform  │
              │  • Aggregate         │
              └──────────┬───────────┘
                         │
                         │ Processed Events
                         │
                         ▼
              ┌──────────────────────┐
              │   Storage Agent      │
              │                      │
              │  • Save to DB        │
              │  • Send to warehouse │
              │  • Update indexes    │
              │  • Trigger analytics │
              └──────────────────────┘
                         │
                         ▼
              ┌──────────────────────┐
              │  Data Stores         │
              │  • PostgreSQL        │
              │  • S3/Data Lake      │
              │  • Search Index      │
              └──────────────────────┘

All communication secured with:
- SPIFFE identity
- mTLS transport
- OPA authorization
- Full audit trail

Data Flow Diagram

1
2
3
4
5
6
7
8
9
10
11
External Event → Ingester → Processor → Storage → Database
                     │           │           │
                     │           │           └─→ Data Warehouse
                     │           └─→ Metrics
                     └─→ Dead Letter Queue (if invalid)

Each arrow represents:
- A2A protocol message
- mTLS connection
- OPA policy check
- Distributed trace span

Complete Code

Ingester Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# ingester_agent.py
"""
Ingester Agent - Receives and validates incoming events.

Responsibilities:
- Accept events from external sources
- Validate schema and format
- Normalize data structure
- Rate limiting
- Send to processor
"""

import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from pydantic import BaseModel, ValidationError

from agentweave import SecureAgent, capability
from agentweave.types import TaskResult, Message, DataPart
from agentweave.exceptions import AgentCallError


class Event(BaseModel):
    """Event schema."""
    event_id: str
    event_type: str
    timestamp: datetime
    user_id: str
    properties: Dict[str, Any]
    source: str


class IngesterAgent(SecureAgent):
    """Ingests and validates events from external sources."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.processor_id = "spiffe://agentweave.io/agent/processor"

        # Metrics
        self._events_received = 0
        self._events_validated = 0
        self._events_rejected = 0

    @capability("ingest_event")
    async def ingest_event(self, event_data: Dict[str, Any]) -> TaskResult:
        """
        Ingest a single event.

        Process:
        1. Validate event schema
        2. Normalize format
        3. Check for duplicates
        4. Send to processor
        """
        self._events_received += 1

        try:
            # Validate event schema
            event = Event(**event_data)

            # Check for duplicates (simplified)
            if await self._is_duplicate(event.event_id):
                self.logger.warning(
                    "Duplicate event detected",
                    extra={"event_id": event.event_id}
                )
                return TaskResult(
                    status="completed",
                    messages=[Message(
                        role="assistant",
                        parts=[DataPart(data={
                            "status": "duplicate",
                            "event_id": event.event_id
                        })]
                    )]
                )

            # Normalize event
            normalized = await self._normalize_event(event)

            # Send to processor
            result = await self.call_agent(
                target=self.processor_id,
                task_type="process_event",
                payload={
                    "event": normalized,
                    "ingested_at": datetime.utcnow().isoformat(),
                    "ingester_id": str(self.spiffe_id)
                },
                timeout=30.0
            )

            if result.status == "completed":
                self._events_validated += 1
                self.logger.info(
                    "Event ingested successfully",
                    extra={
                        "event_id": event.event_id,
                        "event_type": event.event_type
                    }
                )
            else:
                raise AgentCallError(f"Processor failed: {result.error}")

            return TaskResult(
                status="completed",
                messages=[Message(
                    role="assistant",
                    parts=[DataPart(data={
                        "status": "ingested",
                        "event_id": event.event_id,
                        "processed": True
                    })]
                )]
            )

        except ValidationError as e:
            self._events_rejected += 1
            self.logger.error(
                "Event validation failed",
                extra={"errors": str(e), "event_data": event_data}
            )

            # Send to dead letter queue
            await self._send_to_dlq(event_data, str(e))

            return TaskResult(
                status="failed",
                error=f"Invalid event schema: {e}"
            )

        except AgentCallError as e:
            self.logger.error(f"Failed to send to processor: {e}")
            # Could implement retry logic here
            return TaskResult(
                status="failed",
                error=f"Processing failed: {e}"
            )

    @capability("ingest_batch")
    async def ingest_batch(
        self,
        events: List[Dict[str, Any]]
    ) -> TaskResult:
        """
        Ingest a batch of events.

        Processes events in parallel with backpressure control.
        """
        self.logger.info(f"Ingesting batch of {len(events)} events")

        # Process events with concurrency limit
        semaphore = asyncio.Semaphore(10)  # Max 10 concurrent

        async def process_one(event_data):
            async with semaphore:
                return await self.ingest_event(event_data)

        results = await asyncio.gather(
            *[process_one(e) for e in events],
            return_exceptions=True
        )

        # Aggregate results
        succeeded = sum(1 for r in results if isinstance(r, TaskResult) and r.status == "completed")
        failed = len(results) - succeeded

        return TaskResult(
            status="completed",
            messages=[Message(
                role="assistant",
                parts=[DataPart(data={
                    "total": len(events),
                    "succeeded": succeeded,
                    "failed": failed
                })]
            )]
        )

    @capability("health")
    async def health(self) -> TaskResult:
        """Get ingester health metrics."""
        return TaskResult(
            status="completed",
            messages=[Message(
                role="assistant",
                parts=[DataPart(data={
                    "agent": "ingester",
                    "status": "healthy",
                    "metrics": {
                        "events_received": self._events_received,
                        "events_validated": self._events_validated,
                        "events_rejected": self._events_rejected,
                        "rejection_rate": self._events_rejected / max(self._events_received, 1)
                    }
                })]
            )]
        )

    async def _is_duplicate(self, event_id: str) -> bool:
        """Check if event was already processed."""
        # In production, check Redis or database
        return False

    async def _normalize_event(self, event: Event) -> Dict[str, Any]:
        """Normalize event format."""
        return {
            "event_id": event.event_id,
            "event_type": event.event_type,
            "timestamp": event.timestamp.isoformat(),
            "user_id": event.user_id,
            "properties": event.properties,
            "source": event.source,
            "version": "1.0"
        }

    async def _send_to_dlq(self, event_data: Dict[str, Any], error: str):
        """Send invalid event to dead letter queue."""
        self.logger.warning(
            "Sending to DLQ",
            extra={"event": event_data, "error": error}
        )
        # In production, send to SQS, Kafka, etc.


async def main():
    agent = IngesterAgent.from_config("config/ingester.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())

Processor Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# processor_agent.py
"""
Processor Agent - Enriches and transforms events.

Responsibilities:
- Enrich event data (lookup user info, etc.)
- Apply business rules
- Filter and transform
- Send to storage
"""

import asyncio
from typing import Dict, Any
from datetime import datetime

from agentweave import SecureAgent, capability, requires_peer
from agentweave.types import TaskResult, Message, DataPart
from agentweave.exceptions import AgentCallError


class ProcessorAgent(SecureAgent):
    """Processes and enriches events."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.storage_id = "spiffe://agentweave.io/agent/storage"

        # Metrics
        self._events_processed = 0
        self._events_filtered = 0

    @capability("process_event")
    @requires_peer("spiffe://agentweave.io/agent/ingester")
    async def process_event(
        self,
        event: Dict[str, Any],
        ingested_at: str,
        ingester_id: str
    ) -> TaskResult:
        """
        Process an event.

        Processing pipeline:
        1. Enrich with additional data
        2. Apply business rules
        3. Transform format
        4. Send to storage
        """
        self.logger.info(
            "Processing event",
            extra={
                "event_id": event["event_id"],
                "event_type": event["event_type"]
            }
        )

        # Step 1: Enrich event
        enriched = await self._enrich_event(event)

        # Step 2: Apply business rules
        if not await self._should_process(enriched):
            self._events_filtered += 1
            self.logger.debug(
                "Event filtered by business rules",
                extra={"event_id": event["event_id"]}
            )
            return TaskResult(
                status="completed",
                messages=[Message(
                    role="assistant",
                    parts=[DataPart(data={"status": "filtered"})]
                )]
            )

        # Step 3: Transform
        transformed = await self._transform_event(enriched)

        # Step 4: Send to storage
        try:
            result = await self.call_agent(
                target=self.storage_id,
                task_type="store_event",
                payload={
                    "event": transformed,
                    "processed_at": datetime.utcnow().isoformat(),
                    "processor_id": str(self.spiffe_id)
                },
                timeout=30.0
            )

            if result.status != "completed":
                raise AgentCallError(f"Storage failed: {result.error}")

            self._events_processed += 1

            return TaskResult(
                status="completed",
                messages=[Message(
                    role="assistant",
                    parts=[DataPart(data={
                        "status": "processed",
                        "event_id": event["event_id"]
                    })]
                )]
            )

        except AgentCallError as e:
            self.logger.error(f"Failed to store event: {e}")
            return TaskResult(
                status="failed",
                error=f"Storage failed: {e}"
            )

    async def _enrich_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """
        Enrich event with additional data.

        In production:
        - Lookup user profile
        - Geo-location from IP
        - Product catalog data
        """
        enriched = event.copy()

        # Simulate user lookup
        user_data = await self._lookup_user(event["user_id"])
        enriched["user"] = user_data

        # Add computed fields
        enriched["computed"] = {
            "day_of_week": datetime.fromisoformat(event["timestamp"]).strftime("%A"),
            "hour": datetime.fromisoformat(event["timestamp"]).hour
        }

        return enriched

    async def _should_process(self, event: Dict[str, Any]) -> bool:
        """
        Apply business rules to determine if event should be processed.

        Examples:
        - Filter test users
        - Skip certain event types
        - Apply sampling
        """
        # Skip test users
        if event.get("user", {}).get("is_test", False):
            return False

        # Skip certain event types
        skip_types = ["heartbeat", "debug"]
        if event["event_type"] in skip_types:
            return False

        return True

    async def _transform_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform event to storage format.

        - Flatten nested structures
        - Convert types
        - Add metadata
        """
        return {
            "id": event["event_id"],
            "type": event["event_type"],
            "timestamp": event["timestamp"],
            "user_id": event["user_id"],
            "user_name": event.get("user", {}).get("name"),
            "user_tier": event.get("user", {}).get("tier", "free"),
            "properties": event["properties"],
            "source": event["source"],
            "day_of_week": event["computed"]["day_of_week"],
            "hour": event["computed"]["hour"],
            "version": event["version"]
        }

    async def _lookup_user(self, user_id: str) -> Dict[str, Any]:
        """Lookup user data (mock)."""
        # In production, query user service or database
        return {
            "id": user_id,
            "name": f"User {user_id}",
            "tier": "premium",
            "is_test": False
        }


async def main():
    agent = ProcessorAgent.from_config("config/processor.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())

Storage Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# storage_agent.py
"""
Storage Agent - Persists events to data stores.

Responsibilities:
- Save to database
- Send to data warehouse
- Update search indexes
- Trigger downstream analytics
"""

import asyncio
from typing import Dict, Any
from datetime import datetime

from agentweave import SecureAgent, capability, requires_peer
from agentweave.types import TaskResult, Message, DataPart


class StorageAgent(SecureAgent):
    """Stores events in multiple data stores."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Metrics
        self._events_stored = 0
        self._storage_errors = 0

    @capability("store_event")
    @requires_peer("spiffe://agentweave.io/agent/processor")
    async def store_event(
        self,
        event: Dict[str, Any],
        processed_at: str,
        processor_id: str
    ) -> TaskResult:
        """
        Store event in multiple destinations.

        Destinations:
        1. Primary database (PostgreSQL)
        2. Data warehouse (S3/Parquet)
        3. Search index (Elasticsearch)
        """
        self.logger.info(
            "Storing event",
            extra={"event_id": event["id"], "event_type": event["type"]}
        )

        results = {}

        try:
            # Store in parallel
            db_task = self._store_in_database(event)
            warehouse_task = self._store_in_warehouse(event)
            search_task = self._update_search_index(event)

            db_result, warehouse_result, search_result = await asyncio.gather(
                db_task, warehouse_task, search_task,
                return_exceptions=True
            )

            results["database"] = "success" if not isinstance(db_result, Exception) else str(db_result)
            results["warehouse"] = "success" if not isinstance(warehouse_result, Exception) else str(warehouse_result)
            results["search"] = "success" if not isinstance(search_result, Exception) else str(search_result)

            # Check if critical storage (database) succeeded
            if isinstance(db_result, Exception):
                self._storage_errors += 1
                raise db_result

            self._events_stored += 1

            # Trigger downstream (async, don't wait)
            asyncio.create_task(self._trigger_analytics(event))

            return TaskResult(
                status="completed",
                messages=[Message(
                    role="assistant",
                    parts=[DataPart(data={
                        "status": "stored",
                        "event_id": event["id"],
                        "destinations": results
                    })]
                )]
            )

        except Exception as e:
            self.logger.error(
                f"Storage failed: {e}",
                extra={"event_id": event["id"]}
            )
            return TaskResult(
                status="failed",
                error=f"Storage failed: {e}"
            )

    async def _store_in_database(self, event: Dict[str, Any]):
        """Store in primary database."""
        self.logger.debug(f"Storing event {event['id']} in database")

        # In production, use async database client
        # await db.events.insert_one(event)

        # Simulate database write
        await asyncio.sleep(0.01)

    async def _store_in_warehouse(self, event: Dict[str, Any]):
        """Store in data warehouse."""
        self.logger.debug(f"Storing event {event['id']} in warehouse")

        # In production:
        # - Buffer events
        # - Write to S3 as Parquet
        # - Partition by date
        # await s3_client.write_parquet(event)

        await asyncio.sleep(0.005)

    async def _update_search_index(self, event: Dict[str, Any]):
        """Update search index."""
        self.logger.debug(f"Indexing event {event['id']} in search")

        # In production, use Elasticsearch client
        # await es_client.index(index="events", document=event)

        await asyncio.sleep(0.005)

    async def _trigger_analytics(self, event: Dict[str, Any]):
        """Trigger downstream analytics (fire and forget)."""
        self.logger.debug(f"Triggering analytics for event {event['id']}")

        # In production:
        # - Send to Kafka topic
        # - Trigger Lambda function
        # - Update real-time dashboards

        await asyncio.sleep(0.01)


async def main():
    agent = StorageAgent.from_config("config/storage.yaml")
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())

Configuration Files

Ingester Configuration

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# config/ingester.yaml
agent:
  name: "ingester"
  trust_domain: "agentweave.io"
  description: "Event ingestion and validation"

  capabilities:
    - name: "ingest_event"
      description: "Ingest single event"
    - name: "ingest_batch"
      description: "Ingest batch of events"
    - name: "health"
      description: "Health check"

identity:
  provider: "spiffe"
  spiffe_endpoint: "unix:///run/spire/sockets/agent.sock"
  allowed_trust_domains:
    - "agentweave.io"

authorization:
  provider: "opa"
  opa_endpoint: "http://opa:8181"
  policy_path: "pipeline/authz/ingester"
  default_action: "deny"

server:
  host: "0.0.0.0"
  port: 8443

observability:
  metrics:
    enabled: true
    port: 9090
  tracing:
    enabled: true
    exporter: "otlp"
    endpoint: "http://jaeger:4317"
  logging:
    level: "INFO"

Docker Compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# docker-compose.yaml
version: '3.8'

services:
  spire-server:
    image: ghcr.io/spiffe/spire-server:1.9.0
    # ... (same as other examples)

  spire-agent:
    image: ghcr.io/spiffe/spire-agent:1.9.0
    # ... (same as other examples)

  opa:
    image: openpolicyagent/opa:0.62.0
    # ... (same as other examples)

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: events
      POSTGRES_USER: pipeline
      POSTGRES_PASSWORD: pipeline
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql/data

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # UI
      - "4317:4317"    # OTLP gRPC

  ingester:
    build:
      context: .
      dockerfile: Dockerfile.ingester
    depends_on:
      - spire-agent
      - opa
    ports:
      - "8443:8443"
      - "9090:9090"

  processor:
    build:
      context: .
      dockerfile: Dockerfile.processor
    depends_on:
      - spire-agent
      - opa
      - ingester
    ports:
      - "8444:8443"
      - "9091:9090"

  storage:
    build:
      context: .
      dockerfile: Dockerfile.storage
    depends_on:
      - spire-agent
      - opa
      - processor
      - postgres
    ports:
      - "8445:8443"
      - "9092:9090"

volumes:
  postgres-data:

Running the Pipeline

Step 1: Start Infrastructure

1
docker-compose up -d spire-server spire-agent opa postgres jaeger

Step 2: Register Agents

1
2
3
4
5
6
7
8
# Register all three agents
for agent in ingester processor storage; do
    docker-compose exec spire-server \
        /opt/spire/bin/spire-server entry create \
        -spiffeID spiffe://agentweave.io/agent/$agent \
        -parentID spiffe://agentweave.io/agent/spire-agent \
        -selector docker:label:com.docker.compose.service:$agent
done

Step 3: Start Pipeline

1
docker-compose up -d ingester processor storage

Step 4: Send Test Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Single event
agentweave call \
    --target spiffe://agentweave.io/agent/ingester \
    --capability ingest_event \
    --data '{
        "event_data": {
            "event_id": "evt-001",
            "event_type": "user_signup",
            "timestamp": "2025-12-07T10:00:00Z",
            "user_id": "user-123",
            "source": "web",
            "properties": {
                "plan": "premium",
                "referral_code": "FRIEND20"
            }
        }
    }'

# Batch
agentweave call \
    --target spiffe://agentweave.io/agent/ingester \
    --capability ingest_batch \
    --data @sample_events.json

Monitoring

View Traces in Jaeger

1
2
3
4
5
6
# Open Jaeger UI
open http://localhost:16686

# Search for traces:
# Service: ingester, processor, storage
# See complete pipeline flow

Check Metrics

1
2
3
4
5
6
7
8
# Ingester metrics
curl http://localhost:9090/metrics

# Processor metrics
curl http://localhost:9091/metrics

# Storage metrics
curl http://localhost:9092/metrics

Pipeline Health

1
2
3
4
# Check ingester health
agentweave call \
    --target spiffe://agentweave.io/agent/ingester \
    --capability health

Key Takeaways

Pipeline as Agents

Traditional pipeline services become secure agents:

  • Each stage is an independent agent
  • Communication is secured (mTLS, OPA)
  • Observability is built-in (traces, metrics)
  • Resilience through retries, circuit breakers

Backpressure Handling

1
2
3
4
5
6
# Ingester uses semaphore for concurrency control
semaphore = asyncio.Semaphore(10)

async def process_one(event):
    async with semaphore:
        return await self.ingest_event(event)

Distributed Tracing

Every agent call creates a trace span:

1
2
3
4
5
6
7
8
9
Trace: process-event-evt-001
├── Span: ingester.ingest_event (2ms)
├── Span: processor.process_event (15ms)
│   ├── Span: enrich_event (8ms)
│   └── Span: transform_event (2ms)
└── Span: storage.store_event (25ms)
    ├── Span: database_write (10ms)
    ├── Span: warehouse_write (8ms)
    └── Span: search_index (7ms)

Next Steps

  • Stream Processing: Integrate with Kafka for high-volume events
  • Advanced Patterns: See Microservices Example
  • Production: Add retries, dead letter queues, monitoring alerts
  • Scaling: Deploy on Kubernetes with horizontal pod autoscaling

Complete Code: GitHub Repository