Purple AI
LLM-guided analyst workflows and agentic assistance.
Python · FastAPI · LLM Systems · Production AI at SentinelOne
LLM-guided analyst workflows and agentic assistance.
Telemetry ingestion, fusion, and alert correlation at scale.
Agent-based protection and local detection surfaces.
Cloud workload telemetry and posture-aware analytics.
Identity risk, access anomalies, and response automation.
Clarify APIs, data contracts, failure modes, and blast radius before code exists.
Ship typed services, clear boundaries, and operationally safe defaults.
Exercise unit logic, integrations, and evaluation harnesses for AI behavior.
Use controlled rollout, health signals, and rollback-ready releases.
Read traces, errors, token costs, throughput, and analyst feedback.
Close the loop with metrics, PR review, on-call learning, and follow-up fixes.
def validate_threat_event(payload: dict) -> dict:
    """Validate and normalise a raw threat-event dict.

    Checks required fields, coerces ``risk_score`` to ``float`` in place,
    and returns the (mutated) payload.

    Raises:
        ValueError: for missing or malformed fields.
        TypeError: when ``event_id`` is not a string.
    """
    import ipaddress  # local import keeps this snippet self-contained

    if "event_id" not in payload:
        raise ValueError("missing event_id")
    if not isinstance(payload["event_id"], str):
        raise TypeError("event_id must be a string")
    if "severity" not in payload or payload["severity"] not in {"low", "medium", "high", "critical"}:
        raise ValueError("invalid severity")
    if "risk_score" not in payload:
        raise ValueError("missing risk_score")
    try:
        payload["risk_score"] = float(payload["risk_score"])
    except (TypeError, ValueError):
        # Surface a domain error instead of a raw conversion error.
        raise ValueError("invalid risk_score") from None
    if not 0 <= payload["risk_score"] <= 100:
        raise ValueError("risk_score out of range")
    # Real IPv4 parse instead of the dot-count heuristic, which accepted
    # garbage such as "999.999.999.999" or "a.b.c.d".
    source_ip = payload.get("source_ip")
    try:
        ipaddress.IPv4Address(source_ip if isinstance(source_ip, str) else "")
    except ipaddress.AddressValueError:
        raise ValueError("invalid source_ip") from None
    context = payload.get("context")
    if not isinstance(context, dict) or "tenant_id" not in context:
        raise ValueError("missing nested context.tenant_id")
    return payload
from pydantic import BaseModel, ConfigDict, field_validator
class ThreatContext(BaseModel):
    """Nested context carried by a threat event; identifies the tenant."""

    tenant_id: str
class ThreatEvent(BaseModel):
    """Boundary model for inbound threat events; unknown keys are rejected."""

    model_config = ConfigDict(extra="forbid")

    event_id: str
    severity: str
    risk_score: float
    source_ip: str
    context: ThreatContext

    @field_validator("severity")
    @classmethod
    def validate_severity(cls, value: str) -> str:
        """Restrict severity to the fixed operational scale."""
        allowed = {"low", "medium", "high", "critical"}
        if value not in allowed:
            raise ValueError("invalid severity")
        return value

    @field_validator("risk_score")
    @classmethod
    def validate_risk_score(cls, value: float) -> float:
        """Keep risk_score on the 0-100 scale used elsewhere in this file."""
        if not 0 <= value <= 100:
            raise ValueError("risk_score out of range")
        return value
model = ThreatEvent(**raw_data)
from datetime import datetime, timezone
from typing import Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict, IPvAnyAddress, field_validator
class AlertContext(BaseModel):
    """Tenant and source attribution attached to an alert."""

    tenant_id: str
    source: str
class ThreatEvent(BaseModel):
    """Immutable, strictly-typed threat event (tightened second iteration)."""

    model_config = ConfigDict(frozen=True, extra="forbid")

    event_id: UUID
    severity: Literal["low", "medium", "high", "critical"]
    timestamp: datetime
    source_ip: IPvAnyAddress
    context: AlertContext

    @field_validator("timestamp")
    @classmethod
    def must_be_timezone_aware(cls, value: datetime) -> datetime:
        """Reject naive timestamps, then normalise to UTC."""
        # datetime.utcoffset() is None exactly when tzinfo is absent or
        # yields no offset — same condition as the two-clause check.
        if value.utcoffset() is None:
            raise ValueError("timestamp must include timezone")
        return value.astimezone(timezone.utc)
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, ConfigDict, Field
class OSType(str, Enum):
    """Endpoint operating systems; str-valued so members serialise as plain strings."""

    linux = "linux"
    windows = "windows"
    macos = "macos"
class EndpointAgent(BaseModel):
    """Immutable snapshot of a managed endpoint agent."""

    model_config = ConfigDict(frozen=True)

    agent_id: str
    hostname: str
    os_type: OSType
    # Bounded score, consistent with the 0-100 scale used elsewhere in the file.
    threat_score: float = Field(ge=0, le=100)
    # default_factory instead of a bare [] — avoids the shared-mutable-default
    # idiom and matches standard Pydantic practice for container defaults.
    tags: list[str] = Field(default_factory=list)
    last_seen: datetime
class AgentHeartbeat(BaseModel):
    """Immutable heartbeat envelope carrying a full agent snapshot."""

    model_config = ConfigDict(frozen=True)

    heartbeat_id: str
    status: str  # free-form here — NOTE(review): consider a Literal/Enum; confirm allowed values
    agent: EndpointAgent
    received_at: datetime
from typing import Literal
from pydantic import BaseModel
from pydantic_ai import Agent
class ThreatSummary(BaseModel):
    """Structured LLM output: severity, affected scope, and one recommended action."""

    severity: Literal["low", "medium", "high", "critical"]
    affected_hosts: list[str]
    # Closed action space keeps agent suggestions machine-actionable.
    recommended_action: Literal[
        "monitor", "isolate-host", "rotate-credentials", "open-investigation"
    ]
# Agent bound to a structured result type: outputs must parse as ThreatSummary.
triage_agent = Agent(
    "openai:gpt-4.1",
    result_type=ThreatSummary,
    system_prompt="Summarise threat telemetry for SOC analysts.",
)
# NOTE(review): `await` is only valid inside an async function / running event loop.
result = await triage_agent.run("Summarise this endpoint incident")
summary = result.output
| Option | Best for | Why |
|---|---|---|
| Dataclasses | Internal compute objects, speed-critical loops | No boundary validation overhead, simple memory shape. |
| Pydantic BaseModel | API requests, responses, config, LLM outputs | Rich validation, serialization, schema, and error reporting. |
| Pydantic dataclass | Middle ground with lightweight validation | Useful when dataclass ergonomics matter but some validation is needed. |
# Enable the Pydantic mypy plugin so model fields and validators type-check.
[mypy]
plugins = pydantic.mypy
class AuditSession:
    """Class-based context manager around an audit channel."""

    def __enter__(self):
        # Acquire on entry and hand the raw connection to the with-body.
        self.conn = open_audit_channel()
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        self.conn.close()
        # Returning False propagates any exception raised in the with-body.
        return False
from contextlib import contextmanager
@contextmanager
def audit_session():
    """Yield an open audit channel, guaranteeing close on exit."""
    channel = open_audit_channel()
    try:
        yield channel
    finally:
        # Runs whether the with-body returned or raised.
        channel.close()
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open shared clients for the app's lifetime; close them on shutdown."""
    # AsyncClient is closed automatically when this context exits.
    async with httpx.AsyncClient(timeout=10.0) as client:
        app.state.http_client = client
        # build_llm_client wraps the shared HTTP client — TODO confirm semantics.
        app.state.llm_client = build_llm_client(client)
        yield


app = FastAPI(lifespan=lifespan)
from fastapi.responses import StreamingResponse
async def stream_llm_tokens(prompt: str):
    """Yield LLM chunks formatted as Server-Sent Events (SSE)."""
    async for chunk in llm_client.stream(prompt):
        # SSE framing: a "data: ..." line terminated by a blank line.
        yield f"data: {chunk.text}\n\n"


@router.get("/analysis/stream")
async def stream_analysis(prompt: str):
    """Stream analysis tokens to the client as they are generated."""
    return StreamingResponse(
        stream_llm_tokens(prompt),
        media_type="text/event-stream",
    )
def read_events(lines):
    """Lazily decode an iterable of JSON lines into Python objects."""
    yield from map(json.loads, lines)
def parse_events(records):
    """Validate raw records into ThreatEvent models, one at a time."""
    yield from (ThreatEvent.model_validate(record) for record in records)
def enrich_with_context(events):
    """Pair each event with its loaded investigation context."""
    for item in events:
        context = load_context(item.event_id)
        yield item, context
batch_write(enrich_with_context(parse_events(read_events(stream))))
AsyncIO is about hiding wait time, not making CPU work magically parallel.
async def fetch_signal(signal_id: str) -> dict:
    """Fetch one signal by id from the upstream API (awaitable I/O)."""
    return await api_client.get(signal_id)
async def fetch_all(ids: list[str]) -> list[dict]:
    """Fetch all signals, returning results in input order.

    The first id is awaited directly; the rest run concurrently via gather.
    An empty id list returns [] instead of crashing on ids[0].
    """
    if not ids:
        return []
    first = await fetch_signal(ids[0])
    rest = await asyncio.gather(*(fetch_signal(i) for i in ids[1:]))
    # Keep a reference: an un-referenced create_task() result may be
    # garbage-collected before it finishes and its exceptions lost.
    # NOTE(review): a longer-lived owner (e.g. a module-level set) is
    # needed if the audit write must outlive this frame — confirm upstream.
    _audit_task = asyncio.create_task(write_audit_log(ids))
    return [first, *rest]
Created when async function is called, executed when awaited or scheduled.
Suspends current coroutine until awaited work completes.
Run many awaitables concurrently and aggregate results.
Schedule work independently, but track lifecycle deliberately.
# gather: run all calls concurrently; results come back in input order.
summaries = await asyncio.gather(
    *(call_llm(event) for event in events)
)


# TaskGroup (3.11+): structured concurrency — every task finishes (or all are
# cancelled together) before the block exits, so .result() below cannot block.
results = []
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(call_llm(event)) for event in events]
for task in tasks:
    results.append(task.result())
# Bound concurrency: at most 10 in-flight LLM calls at any moment.
semaphore = asyncio.Semaphore(10)


async def score_event(event: ThreatEvent) -> ThreatSummary:
    """Score one event while holding a semaphore slot for the call's duration."""
    async with semaphore:
        return await llm_gateway.score(event)


results = await asyncio.gather(*(score_event(event) for event in events))
from concurrent.futures import ThreadPoolExecutor

# Shared pool for CPU-heavy work so the event loop stays responsive.
executor = ThreadPoolExecutor(max_workers=4)


async def scan_binary(binary: bytes) -> ScanResult:
    """Run cpu_heavy_fn off the event loop and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        cpu_heavy_fn,
        binary,
    )
from fastapi import BackgroundTasks
@router.post("/alerts/{event_id}/notify")
async def notify(event_id: str, background_tasks: BackgroundTasks):
    """Queue a webhook notification; the response returns before it is sent."""
    event = await service.get_event(event_id)
    # Runs after the response has been sent, on the same event loop.
    background_tasks.add_task(send_alert_webhook, event)
    return {"status": "queued", "event_id": event_id}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create shared clients at startup; tear them down at shutdown."""
    async with httpx.AsyncClient() as http_client:
        # Distinct local name: assigning to `redis` would make it a local
        # variable and `redis.from_url` would raise UnboundLocalError
        # (the import would be shadowed before first use).
        redis_client = await redis.from_url(settings.redis_url)
        try:
            app.state.http_client = http_client
            app.state.redis = redis_client
            app.state.llm = build_llm_gateway(http_client)
            yield
        finally:
            # Close even when the app exits via an error.
            await redis_client.close()
def get_llm_client(request: Request) -> LLMGateway:
    """FastAPI dependency: fetch the shared LLM gateway from app state."""
    return request.app.state.llm
@router.post("/triage")
async def triage(
    payload: ThreatEventRequest,
    db: AsyncSession = Depends(get_db_session),
    user: User = Depends(get_current_user),
    llm: LLMGateway = Depends(get_llm_client),
):
    """Thin route: validation and wiring here; business logic in the service."""
    return await service.triage(payload, db, user, llm)
# Extraction had fused three definitions onto shared lines; reconstructed
# here as three separate, valid definitions.
class ThreatEventRequest(BaseModel):
    """Inbound API shape: camelCase, all-optional fields from external callers."""

    eventId: str | None = None
    sourceIp: str | None = None
    hostName: str | None = None
    rawPayload: dict | None = None


class ThreatEvent(BaseModel):
    """Internal domain model: strict types, snake_case, immutable."""

    model_config = ConfigDict(frozen=True)

    event_id: UUID
    source_ip: IPvAnyAddress
    hostname: str
    payload: ThreatPayload


def to_domain_model(request: ThreatEventRequest) -> ThreatEvent:
    """Map the loose request shape onto the strict domain model.

    Pydantic coerces and validates each field; raises ValidationError when a
    required value is missing or malformed.
    """
    return ThreatEvent(
        event_id=request.eventId,
        source_ip=request.sourceIp,
        hostname=request.hostName,
        payload=request.rawPayload,
    )
class ErrorResponse(BaseModel):
    """Uniform error envelope returned by exception handlers."""

    code: str
    message: str
    trace_id: str
@app.exception_handler(ServiceUnavailableError)
async def service_unavailable_handler(request: Request, exc: ServiceUnavailableError):
    """Map upstream outages to a structured 503 carrying the request's trace id."""
    payload = ErrorResponse(
        code="upstream_unavailable",
        message=str(exc),
        trace_id=request.state.trace_id,
    )
    return JSONResponse(status_code=503, content=payload.model_dump())
raise HTTPException(status_code=404, detail={"code": "not_found", "entity": "event"})
# Request-scoped trace id readable anywhere (e.g. log formatters) without
# threading the Request object through every call.
trace_id_var: ContextVar[str] = ContextVar("trace_id")


@app.middleware("http")
async def inject_trace_id(request: Request, call_next):
    """Adopt the caller's x-trace-id or mint one; echo it on the response."""
    trace_id = request.headers.get("x-trace-id", str(uuid4()))
    request.state.trace_id = trace_id
    token = trace_id_var.set(trace_id)
    try:
        response = await call_next(request)
        response.headers["x-trace-id"] = trace_id
        return response
    finally:
        # Always restore the ContextVar, even if the handler raised.
        trace_id_var.reset(token)
Mock dependencies and assert business decisions, branching, and mapping logic.
Use TestClient and overrides against real adapters such as Dockerized Postgres or Redis.
Validate request and response shapes against OpenAPI and consumer expectations.
# Swap the real DB dependency for a test double, then exercise the route.
app.dependency_overrides[get_db] = get_test_db
client = TestClient(app)
response = client.post("/triage", json=payload)
assert response.status_code == 200
System prompt and tool definitions.
RAG context and short memory window.
Trade off recall, recency, and action space.
@agent.tool
async def lookup_cve(cve_id: str) -> CVERecord:
    """Tool: resolve a CVE id to its record."""
    return await cve_client.fetch(cve_id)


@agent.tool
async def search_endpoint(hostname: str) -> EndpointRecord:
    """Tool: look up an endpoint by hostname."""
    return await graph_client.lookup_host(hostname)


# The agent chooses which registered tools to call while answering.
result = await agent.run("Investigate suspicious PowerShell activity")
@tool
async def list_compromised_hosts(campaign_id: str) -> list[str]:
    """Tool: hosts associated with a campaign id."""
    return await singularity_client.list_hosts(campaign_id)


@tool
async def fetch_indicator(indicator: str) -> IndicatorRecord:
    """Tool: fetch details for a single threat indicator."""
    return await singularity_client.get_indicator(indicator)
Prevent infinite loops and exploding latency.
Reduce ambiguity when the action surface is safety-sensitive.
Gate host isolation or high-severity actions behind review.
Trace LLM calls, tool invocations, tokens, and retries end to end.
Golden datasets and automated scoring to anchor improvements.
Critic models can scale comparative review with clear prompts.
Run on every deploy to catch prompt, retriever, or tool drift.
Feed analyst labels back into prioritization and fine-tuning loops.
class EvalResult(BaseModel):
    """One scored evaluation row for a query."""

    query_id: str
    grounded: bool  # NOTE(review): presumably "answer judged grounded in evidence" — confirm rubric
    score: float


result = EvalResult(query_id="q-102", grounded=True, score=0.91)
| Metric | Why it matters | Tooling |
|---|---|---|
| Latency percentiles | Measure user-visible delay per model and route. | OpenTelemetry, Grafana |
| Token input and output | Track cost, prompt bloat, and response size. | LangSmith, vendor telemetry |
| Cost per request | Prevent silent spend expansion. | Billing tags, warehouse exports |
| Tool call success | Catch broken integrations quickly. | Trace spans, structured logs |
| Retrieval precision@k | Evaluate grounding quality. | Arize Phoenix, internal eval harness |
| Human feedback score | Link output quality to analyst reality. | Feedback pipeline |
Retry, backoff, timeout, and circuit breaker for straightforward APIs.
High-throughput internal services where schema control and efficiency matter.
Event-driven pipelines for telemetry fan-in and asynchronous processing.
| Pattern | Use when |
|---|---|
| REST | Human-readable APIs, moderate volume, broad compatibility. |
| gRPC | Strict contracts and high-throughput internal RPC. |
| Kafka | Loose coupling, replay, and streaming workloads. |
| Store | Best for | Singularity use case |
|---|---|---|
| Postgres | Structured, transactional data | Agent state, audits, workflow records |
| DynamoDB / Mongo | Flexible schemas, high write throughput | Event metadata and enrichment caches |
| Vector DB | Semantic retrieval | Threat intel RAG |
| Redis | Cache, rate limiting, session state | Hot lookups and guardrails |
| Kafka | Streaming ingestion | Telemetry pipelines |
# ── INGESTION (runs continuously) ──────────────
consumer = KafkaConsumer(
    topics=["endpoint.events.*"],
    group_id="telemetry-ingest",
)


async def ingest_loop():
    """Continuously drain Kafka batches into ClickHouse and the vector DB."""
    async for batch in consumer.stream(
        max_batch=10_000,
        timeout_ms=500,
    ):
        # Validate & normalise at stream edge
        events = [
            TelemetryEvent.model_validate(raw)
            for raw in batch
        ]
        normalized = await enrich_pipeline(events)
        # Dual-write: structured + semantic
        await clickhouse.bulk_insert(
            table="telemetry",
            rows=normalized,
        )
        embeddings = await embed(normalized)
        await vector_db.upsert(embeddings)
        # Commit only after both writes complete (at-least-once delivery).
        await consumer.commit()
# ── PURPLE AI QUERY (on-demand) ────────────────
@app.post("/purple/query")
async def purple_query(
    req: QueryRequest,
    ctx: AppContext = Depends(get_context),
) -> StreamingResponse:
    """Answer an analyst query grounded in telemetry and RAG evidence, streamed via SSE."""
    # 1. Retrieve grounding evidence
    tel_rows = await clickhouse.query(
        sql=build_telemetry_sql(req),
        time_window="24h",
        limit=200,
    )
    rag_hits = await vector_db.search(
        query=req.query,
        top_k=50,
    )
    # Cap the assembled context so the prompt stays within budget.
    context = build_context(
        tel_rows, rag_hits, max_tokens=8_192
    )
    # 2. Build grounded prompt
    messages = [
        SystemMessage(PURPLE_SYSTEM_PROMPT),
        UserMessage(req.query, context),
    ]

    # 3. Stream tokens back via SSE
    async def token_stream():
        async for chunk in ctx.llm.stream(
            messages, model="purple-ai-v2"
        ):
            yield f"data: {chunk.json()}\n\n"

    return StreamingResponse(
        token_stream(),
        media_type="text/event-stream",
    )
| Protocol | Layer | Role in enterprise MCP |
|---|---|---|
| W3C Trace Context | Observability | traceparent / tracestate headers propagated across every tool call hop |
| OpenTelemetry (OTEL) | Telemetry SDK | Spans, metrics, and logs emitted from every MCP tool invocation |
| OAuth 2.0 / OIDC | Identity | Bearer token scoping which tools and resources an agent may call |
| mTLS / X.509 | Transport auth | Mutual TLS for verified service-to-service MCP identities |
| HMAC-SHA256 signing | Integrity | Tool call payload signing for non-repudiation and tamper detection |
| CEF / JSON audit logs | Compliance | Tamper-evident structured events for SIEM ingestion and SOC 2 |
| IETF Rate-Limit headers | Quota | X-RateLimit-* transparency so tool consumers can self-throttle |
# W3C Trace Context header layout.
# traceparent: {version}-{trace-id}-{parent-id}-{flags}
# ─────────────────────────────────────────────────────────
# version : 00 (W3C spec version, always 00)
# trace-id : 4bf92f3577b34da6a3ce929d0e0e4736 (128-bit, IMMUTABLE across all hops)
# parent-id: 00f067aa0ba902b7 (64-bit, NEW value at every hop)
# flags : 01 (01 = sampled, 00 = not sampled)
traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
tracestate = "s1=purple-ai,sentinel=v2"  # vendor-specific extensions
# Inject + propagate in FastAPI middleware
from opentelemetry.propagate import extract, inject
class TraceContextMiddleware(BaseHTTPMiddleware):
    """Propagate W3C trace context across each MCP tool-call hop."""

    async def dispatch(self, request: Request, call_next):
        ctx = extract(request.headers)  # read inbound traceparent
        with tracer.start_as_current_span(
            "mcp.tool.call",
            context=ctx,
        ) as span:
            span.set_attribute("mcp.tool", request.url.path)
            span.set_attribute("tenant.id", request.headers.get("x-tenant-id", ""))
            outbound: dict[str, str] = {}
            inject(outbound)  # write NEW traceparent (same trace-id, new parent-id)
            # Handlers can forward these headers on their own outbound calls.
            request.state.trace_headers = outbound
            return await call_next(request)
model_validate() rejects bad input before Depends or the handler execute — no wasted I/O. The finally block is the implicit context manager. response_model serialisation means the schema closes the loop at both ends.

from typing import Literal
from fastapi import Depends
from pydantic import BaseModel
# ── Schema: BOTH input gate (stage 5) AND output gate (stage 8) ──────────
class ThreatRequest(BaseModel):  # stage 5 — validated before handler runs
    tenant_id: str
    severity: Literal["high", "critical"]


class ThreatResponse(BaseModel):  # stage 8 — serialised after handler returns
    alert_id: str
    risk_score: float
# ── Context Manager inside Depends: yields into handler, cleans up after send ──
async def get_db_session():
    """Generator dependency: lends a DB session for the request's lifetime."""
    session = await db_pool.acquire()  # stage 6: resource acquired
    try:
        yield session  # <── handler body runs between yield and finally
    finally:
        await session.close()  # stage 8: cleanup AFTER response is already on wire
# ── Route: response_model= closes the schema loop ────────────────────────
@app.post("/threats", response_model=ThreatResponse)
async def create_threat(
    req: ThreatRequest,  # Pydantic validates at stage 5
    db: Session = Depends(get_db_session),  # generator injected at stage 6
) -> ThreatResponse:
    """Persist a validated threat and return the serialisable result."""
    result = await db.save_threat(req)
    return ThreatResponse(alert_id=result.id, risk_score=result.score)
The finally block is the Context Manager — it runs after the response bytes are already on the wire, freeing the DB connection without blocking the client.

Test your recall across Pydantic, Context Managers, AsyncIO, FastAPI, LLM Systems, RAG & Evals, Integrations, and Cloud & Ops.
Here is your breakdown by topic. Focus revision on any section below 80%.