Technical Revision Deck
Senior AI Software Engineer - Interview Revision
Slide 01
SentinelOne Revision

Senior AI Software Engineer - Interview Revision

Python · FastAPI · LLM Systems · Production AI at SentinelOne

This deck ties backend Python, service architecture, evaluation, and operational ownership into one interview narrative.
AI

Purple AI

LLM-guided analyst workflows and agentic assistance.

SI

AI-SIEM

Telemetry ingestion, fusion, and alert correlation at scale.

EP

Singularity Endpoint

Agent-based protection and local detection surfaces.

CL

Singularity Cloud

Cloud workload telemetry and posture-aware analytics.

ID

Singularity Identity

Identity risk, access anomalies, and response automation.

Agenda

What we are covering today

Slide 02
1 · Slides 05-11
Pydantic & Data Models
Contracts
2 · Slides 12-15
Context Managers & Generators
Resource control
3 · Slides 16-22
AsyncIO & Concurrency
Throughput
4 · Slides 23-29
FastAPI Production
Service design
5 · Slides 30-34
LLM & Agentic Systems
Reliability
6 · Slides 35-38
RAG & Eval Pipelines
Grounding
7 · Slides 39-41
Service Integrations
Interfaces
8 · Slides 42-46
Cloud & Observability
Operations
9 · Slides 47-48
MCP Audit Protocols
W3C · OTEL · OAuth
10 · Slides 49-50
FastAPI Request Lifecycle
8 stages
SentinelOne Context

Why architecture choices matter here

Slide 03
flowchart LR purple[Purple AI\nLLM layer] -->|reasoning requests| siem[AI-SIEM\npetabytes of telemetry] endpoint[Singularity Endpoint] -->|detections| siem cloud[Singularity Cloud] -->|runtime signals| siem identity[Singularity Identity] -->|identity anomalies| siem siem -->|sub-second response SLAs| purple purple --> analyst[Security analyst workflows]
This is the system you are building services for.
  • Inference is only one layer. Data quality, rate control, and latency shape the final user experience.
  • Every service sits between huge telemetry volumes and analyst-facing response surfaces.
  • Architecture mistakes amplify fast when downstream alerts, streaming, and response automation depend on them.
PB+
Telemetry scale
<1s
Response pressure
24x7
Operational reality
Engineering Scope

End-to-end ownership at SentinelOne

Slide 04

Design Doc

Clarify APIs, data contracts, failure modes, and blast radius before code exists.

Build

Ship typed services, clear boundaries, and operationally safe defaults.

Test

Exercise unit logic, integrations, and evaluation harnesses for AI behavior.

Deploy

Use controlled rollout, health signals, and rollback-ready releases.

Observe

Read traces, errors, token costs, throughput, and analyst feedback.

Iterate

Close the loop with metrics, PR review, on-call learning, and follow-up fixes.

Design docs · PR reviews · Eval harnesses · Deployment ownership · On-call response
Section B · Pydantic

Why Pydantic: the problem it actually solves

Slide 05
Without Pydantic - the old way
Python
def validate_threat_event(payload: dict) -> dict:
    if "event_id" not in payload:
        raise ValueError("missing event_id")
    if not isinstance(payload["event_id"], str):
        raise TypeError("event_id must be a string")
    if "severity" not in payload or payload["severity"] not in {"low", "medium", "high", "critical"}:
        raise ValueError("invalid severity")
    if "risk_score" not in payload:
        raise ValueError("missing risk_score")
    payload["risk_score"] = float(payload["risk_score"])
    if not 0 <= payload["risk_score"] <= 100:
        raise ValueError("risk_score out of range")
    if "source_ip" not in payload or payload["source_ip"].count(".") != 3:
        raise ValueError("invalid source_ip")
    if "context" not in payload or "tenant_id" not in payload["context"]:
        raise ValueError("missing nested context.tenant_id")
    return payload
Every new field = more if-blocks. No consistency. No reuse. Breaks silently.
With Pydantic
Python
from pydantic import BaseModel, ConfigDict, field_validator

class ThreatContext(BaseModel):
    tenant_id: str

class ThreatEvent(BaseModel):
    model_config = ConfigDict(extra="forbid")
    event_id: str
    severity: str
    risk_score: float
    source_ip: str
    context: ThreatContext

    @field_validator("severity")
    @classmethod
    def validate_severity(cls, value: str) -> str:
        allowed = {"low", "medium", "high", "critical"}
        if value not in allowed:
            raise ValueError("invalid severity")
        return value

model = ThreatEvent(**raw_data)
Define your structure once using familiar Python syntax. Pydantic validates, coerces, and returns structured errors without isinstance chains.
Pydantic does not replace your logic - it removes the boilerplate that surrounds it.
Validation at boundaries

Pydantic: the contract layer for AI services

Slide 06
  • Runtime validation catches malformed inputs before business logic or LLM prompts see them.
  • Serialization keeps service responses predictable across internal consumers.
  • Schema generation feeds OpenAPI, config validation, and typed contracts for downstream teams.
  • Structured LLM output parsing converts vague text into typed response objects.
Every field validated before it reaches your LLM.
flowchart LR raw[Inbound threat event JSON] --> model[Pydantic model] model --> typed[Validated Python object] typed --> service[Downstream service] service --> llm[Prompt / tool layer]
Pydantic v2

BaseModel, field validators, and Config

Slide 07
Python
from datetime import datetime, timezone
from typing import Literal
from uuid import UUID

from pydantic import BaseModel, ConfigDict, IPvAnyAddress, field_validator

class AlertContext(BaseModel):
    tenant_id: str
    source: str

class ThreatEvent(BaseModel):
    model_config = ConfigDict(frozen=True, extra="forbid")

    event_id: UUID
    severity: Literal["low", "medium", "high", "critical"]
    timestamp: datetime
    source_ip: IPvAnyAddress
    context: AlertContext

    @field_validator("timestamp")
    @classmethod
    def must_be_timezone_aware(cls, value: datetime) -> datetime:
        if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
            raise ValueError("timestamp must include timezone")
        return value.astimezone(timezone.utc)
UUID gives a strict event identifier contract.
Literal constrains severity to known levels.
IPvAnyAddress parses and validates IPv4 or IPv6 input.
Nested submodels keep context typed instead of raw dicts.
ConfigDict(frozen=True) makes records immutable after validation.
Domain models

Modelling Singularity domain objects

Slide 08
Python
from datetime import datetime
from enum import Enum

from pydantic import BaseModel, ConfigDict, Field

class OSType(str, Enum):
    linux = "linux"
    windows = "windows"
    macos = "macos"

class EndpointAgent(BaseModel):
    model_config = ConfigDict(frozen=True)
    agent_id: str
    hostname: str
    os_type: OSType
    threat_score: float = Field(ge=0, le=100)
    tags: list[str] = []
    last_seen: datetime

class AgentHeartbeat(BaseModel):
    model_config = ConfigDict(frozen=True)
    heartbeat_id: str
    status: str
    agent: EndpointAgent
    received_at: datetime
Frozen models = immutable telemetry records.
  • Enums express limited operating system choices directly in type signatures.
  • Field constraints make risk score boundaries explicit and testable.
  • Nested references let one validated domain object travel through several services unchanged.
Pydantic AI

Structured LLM responses with Pydantic AI

Slide 09
Python
from typing import Literal

from pydantic import BaseModel
from pydantic_ai import Agent

class ThreatSummary(BaseModel):
    severity: Literal["low", "medium", "high", "critical"]
    affected_hosts: list[str]
    recommended_action: Literal[
        "monitor", "isolate-host", "rotate-credentials", "open-investigation"
    ]

triage_agent = Agent(
    "openai:gpt-4.1",
    output_type=ThreatSummary,
    system_prompt="Summarise threat telemetry for SOC analysts.",
)

result = await triage_agent.run("Summarise this endpoint incident")
summary = result.output
The agent guarantees the LLM returns valid structured data - no manual JSON parsing.
  • Reduce fragile regex or schema-guessing code.
  • Push malformed output into validation errors you can log and retry safely.
  • Keep downstream action routing typed and auditable.
Tradeoffs

Choosing the right data container

Slide 10
Option · Best for · Why
Dataclasses · Internal compute objects, speed-critical loops · No boundary validation overhead, simple memory shape.
Pydantic BaseModel · API requests, responses, config, LLM outputs · Rich validation, serialization, schema, and error reporting.
Pydantic dataclass · Middle ground with lightweight validation · Useful when dataclass ergonomics matter but some validation is needed.
Verdict: Pydantic at boundaries, dataclasses internally.

Critical: Pydantic dataclasses are NOT a replacement for BaseModel.

  • Initialisation hooks differ: Pydantic dataclasses use __post_init__ rather than BaseModel's model_post_init, and validator inheritance behaves differently.
  • JSON serialisation is not equivalent: there is no model_dump_json method, and schema generation has gaps compared to BaseModel.
  • Mypy only understands Pydantic constructors with the pydantic.mypy plugin enabled; without it, incorrect types can pass static checks.
mypy.ini
[mypy]
plugins = pydantic.mypy
Add this to CI or type-safety is false confidence.
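The verdict above can be shown in a few lines: the same raw payload passes through a plain dataclass untouched, while a BaseModel validates and coerces it at the boundary. Field names here are illustrative.

```python
from dataclasses import dataclass
from pydantic import BaseModel

@dataclass
class HotLoopEvent:          # internal: no validation cost per instantiation
    event_id: str
    risk_score: float

class ApiEvent(BaseModel):   # boundary: validates and coerces on construction
    event_id: str
    risk_score: float

raw = {"event_id": "e-1", "risk_score": "55"}

internal = HotLoopEvent(**raw)   # accepted as-is: risk_score stays a str
boundary = ApiEvent(**raw)       # coerced: risk_score becomes 55.0

assert isinstance(internal.risk_score, str)
assert isinstance(boundary.risk_score, float)
```

This is exactly why dataclasses belong in validated interior code and BaseModel belongs where untrusted data enters.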
Boundary to runtime

Validated models flowing into managed resources

Slide 11
flowchart LR req[HTTP request] --> validate[Pydantic RequestModel validation] validate --> di[FastAPI dependency injection] di --> ctx[Context manager opens DB or LLM session] ctx --> handler[Handler receives typed object] handler --> resp[Pydantic ResponseModel serialization]
Section C · Resource Patterns

Context managers: deterministic resource control

Slide 12
Class-based
class AuditSession:
    def __enter__(self):
        self.conn = open_audit_channel()
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        self.conn.close()
        return False
Generator-based
from contextlib import contextmanager

@contextmanager
def audit_session():
    conn = open_audit_channel()
    try:
        yield conn
    finally:
        conn.close()
asynccontextmanager for async resources - critical in FastAPI.
Async resources

Managing LLM client sessions and DB connections

Slide 13
Python
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with httpx.AsyncClient(timeout=10.0) as client:
        app.state.http_client = client
        app.state.llm_client = build_llm_client(client)
        yield

app = FastAPI(lifespan=lifespan)
lifespan creates client once at startup, yields app state, tears down on shutdown - not per-request.
  • Preserves TCP connection reuse.
  • Stops repeated auth and TLS overhead.
  • Provides one composition root for shared infra clients.
Streaming

Generators power streaming AI output

Slide 14
Python
from fastapi.responses import StreamingResponse

async def stream_llm_tokens(prompt: str):
    async for chunk in llm_client.stream(prompt):
        yield f"data: {chunk.text}\n\n"

@router.get("/analysis/stream")
async def stream_analysis(prompt: str):
    return StreamingResponse(
        stream_llm_tokens(prompt),
        media_type="text/event-stream",
    )
flowchart LR llm[LLM stream] --> gen[Async generator] gen --> sse[StreamingResponse] sse --> client[SSE client]
Purple AI uses streaming to deliver real-time threat analysis - you must own this pattern.
Pipelines

Memory-efficient event processing with generators

Slide 15
Python
import json

def read_events(lines):
    for line in lines:
        yield json.loads(line)

def parse_events(records):
    for record in records:
        yield ThreatEvent.model_validate(record)

def enrich_with_context(events):
    for event in events:
        yield event, load_context(event.event_id)

batch_write(enrich_with_context(parse_events(read_events(stream))))
Processes billions of endpoint events without loading all into memory.
  • Each pipeline stage stays single-purpose and testable.
  • Backpressure is easier to reason about than with giant in-memory lists.
  • Pairs well with batch writers, chunked uploads, and telemetry fan-in.
Section D · AsyncIO

How Python asyncio works

Slide 16
flowchart TD loop((Event loop)) --> task1[Task A] loop --> task2[Task B] loop --> io[I/O callback] task1 --> await1[await socket read] task2 --> await2[await http response] await1 --> loop await2 --> loop io --> loop
Coroutines are cooperative.
I/O yields control back to the loop.
No GIL contention for I/O-bound work on the same thread.

AsyncIO is about hiding wait time, not making CPU work magically parallel.
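A tiny demonstration of that point: three simulated I/O waits of 0.2 s each complete in roughly 0.2 s total, because each `await` hands control back to the loop rather than blocking the thread.

```python
import asyncio
import time

async def io_call(name: str, delay: float) -> str:
    # Simulated I/O: await yields control back to the event loop.
    await asyncio.sleep(delay)
    return name

async def main() -> float:
    start = time.perf_counter()
    # All three waits overlap on a single thread.
    await asyncio.gather(io_call("a", 0.2), io_call("b", 0.2), io_call("c", 0.2))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
# Wall time is ~0.2s, not the 0.6s a sequential version would take.
assert elapsed < 0.5
```

Swap `asyncio.sleep` for a CPU-bound loop and the speedup vanishes: that is the "hiding wait time, not parallelising CPU work" distinction.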

Syntax essentials

async def, await, and coroutine objects

Slide 17
Python
async def fetch_signal(signal_id: str) -> dict:
    return await api_client.get(signal_id)

async def fetch_all(ids: list[str]) -> list[dict]:
    first = await fetch_signal(ids[0])
    rest = await asyncio.gather(*(fetch_signal(i) for i in ids[1:]))
    asyncio.create_task(write_audit_log(ids))
    return [first, *rest]

Coroutine object

Created when async function is called, executed when awaited or scheduled.

await

Suspends current coroutine until awaited work completes.

gather

Run many awaitables concurrently and aggregate results.

create_task

Schedule work independently, but track lifecycle deliberately.

Concurrency patterns

Running concurrent AI calls

Slide 18
asyncio.gather
summaries = await asyncio.gather(
    *(call_llm(event) for event in events)
)
TaskGroup
results = []
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(call_llm(event)) for event in events]

for task in tasks:
    results.append(task.result())
TaskGroup cancels all siblings on first failure - safer for production workflows with shared context.
Rate control

Protecting LLM API rate limits

Slide 19
Python
semaphore = asyncio.Semaphore(10)

async def score_event(event: ThreatEvent) -> ThreatSummary:
    async with semaphore:
        return await llm_gateway.score(event)

results = await asyncio.gather(*(score_event(event) for event in events))
flowchart LR q[Request queue] --> gate[Semaphore gate\nmax_concurrent = 10] gate --> api[LLM API]
SentinelOne processes millions of events; you cannot hammer the LLM API.
CPU-bound escapes

Mixing sync and async: loop.run_in_executor

Slide 20
Python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def scan_binary(binary: bytes) -> ScanResult:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor,
        cpu_heavy_fn,
        binary,
    )
ML inference, regex scanning on raw binaries, and legacy SDK calls should not block the event loop.
Deferred side effects

Fire-and-forget with BackgroundTasks

Slide 21
Python
from fastapi import BackgroundTasks

@router.post("/alerts/{event_id}/notify")
async def notify(event_id: str, background_tasks: BackgroundTasks):
    event = await service.get_event(event_id)
    background_tasks.add_task(send_alert_webhook, event)
    return {"status": "queued", "event_id": event_id}
Response returns immediately; webhook fires after. Use for non-critical side-effects only.
AsyncIO in context

Concurrency in a threat-detection pipeline

Slide 22
flowchart LR telemetry[Inbound endpoint telemetry] --> consumer[Async Kafka consumer] consumer --> score[await LLM threat scoring] consumer --> rules[await rule-engine check] consumer --> graph[await graph DB enrichment] score --> join[asyncio.gather] rules --> join graph --> join join --> aggregate[Aggregate result] aggregate --> alert[Alert dispatch]
Section E · FastAPI

Organising a production FastAPI service

Slide 23
flowchart TD app[app/] --> main[main.py] app --> api[api/routes/] app --> core[core/config.py] app --> domain[models/domain.py] app --> api_models[models/api.py] app --> services[services/] app --> deps[dependencies.py] app --> life[lifespan.py]
Separation of domain models from API schemas is non-negotiable.
  • Keep transport concerns away from internal business objects.
  • Move config and dependency builders out of route modules.
  • Preserve a clean testing seam around service functions.
Startup and shutdown

lifespan: managing shared resources

Slide 24
Python
import redis.asyncio as aioredis

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with httpx.AsyncClient() as http_client:
        # Avoid shadowing the redis module; from_url is synchronous in redis-py asyncio.
        redis_client = aioredis.from_url(settings.redis_url)
        llm = build_llm_gateway(http_client)
        app.state.http_client = http_client
        app.state.redis = redis_client
        app.state.llm = llm
        yield
        await redis_client.aclose()
Never create clients per-request - connection overhead will kill throughput.
Dependency injection

FastAPI Depends(): the composition root

Slide 25
Python
def get_llm_client(request: Request) -> LLMGateway:
    return request.app.state.llm

@router.post("/triage")
async def triage(
    payload: ThreatEventRequest,
    db: AsyncSession = Depends(get_db_session),
    user: User = Depends(get_current_user),
    llm: LLMGateway = Depends(get_llm_client),
):
    return await service.triage(payload, db, user, llm)
flowchart TD route[Route handler] --> dep1[get_db_session] route --> dep2[get_current_user] route --> dep3[get_llm_client] dep3 --> state[app.state.llm]
Testability is the main benefit - swap real clients for mocks in test overrides.
Schemas

API schema vs domain model separation

Slide 26
ThreatEventRequest
API schema
class ThreatEventRequest(BaseModel):
    eventId: str | None = None
    sourceIp: str | None = None
    hostName: str | None = None
    rawPayload: dict | None = None
ThreatEvent
Domain model
class ThreatEvent(BaseModel):
    model_config = ConfigDict(frozen=True)
    event_id: UUID
    source_ip: IPvAnyAddress
    hostname: str
    payload: ThreatPayload
Mapper
def to_domain_model(request: ThreatEventRequest) -> ThreatEvent:
    return ThreatEvent(
        event_id=request.eventId,
        source_ip=request.sourceIp,
        hostname=request.hostName,
        payload=request.rawPayload,
    )
API shape and business logic shape evolve independently.
Error handling

Structured error responses at scale

Slide 27
Python
class ErrorResponse(BaseModel):
    code: str
    message: str
    trace_id: str

@app.exception_handler(ServiceUnavailableError)
async def service_unavailable_handler(request: Request, exc: ServiceUnavailableError):
    payload = ErrorResponse(
        code="upstream_unavailable",
        message=str(exc),
        trace_id=request.state.trace_id,
    )
    return JSONResponse(status_code=503, content=payload.model_dump())

raise HTTPException(status_code=404, detail={"code": "not_found", "entity": "event"})
Clients, including internal services, must be able to parse your errors programmatically.
Middleware

Middleware stack in a SentinelOne service

Slide 28
flowchart TD req[Request] --> auth[Auth middleware\nJWT validation] auth --> trace[Trace middleware\ninject trace_id] trace --> log[Logging middleware\nstructured JSON log] log --> rate[Rate-limit middleware] rate --> route[Route handler] route --> resp[Response]
Python
from contextvars import ContextVar
from uuid import uuid4

trace_id_var: ContextVar[str] = ContextVar("trace_id")

@app.middleware("http")
async def inject_trace_id(request: Request, call_next):
    trace_id = request.headers.get("x-trace-id", str(uuid4()))
    request.state.trace_id = trace_id
    token = trace_id_var.set(trace_id)
    try:
        response = await call_next(request)
        response.headers["x-trace-id"] = trace_id
        return response
    finally:
        trace_id_var.reset(token)
Testing

Test strategy: unit, integration, contract

Slide 29

Unit tests

Mock dependencies and assert business decisions, branching, and mapping logic.

Integration tests

Use TestClient and overrides against real adapters such as Dockerized Postgres or Redis.

Contract tests

Validate request and response shapes against OpenAPI and consumer expectations.

Python
app.dependency_overrides[get_db] = get_test_db
client = TestClient(app)

response = client.post("/triage", json=payload)
assert response.status_code == 200
Section F · LLM Systems

Wrapping LLMs for production reliability

Slide 30
flowchart LR service[FastAPI service] --> gateway[LLM gateway\nrate limit + routing + cost tracking] gateway --> openai[OpenAI] gateway --> anthropic[Anthropic] gateway --> internal[Internal model]
Timeout · Retry with backoff · Fallback model · Structured output validation
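One way the gateway's timeout, retry, and fallback behaviours can compose, sketched with hypothetical `primary`/`fallback` callables standing in for provider clients:

```python
import asyncio
import random

class LLMUnavailable(Exception):
    """Stand-in for a provider rate-limit or outage error."""

async def call_with_fallback(
    prompt: str,
    primary,                 # async callable: prompt -> str
    fallback,                # async callable: prompt -> str
    max_retries: int = 3,
    base_delay: float = 0.05,
) -> str:
    for attempt in range(max_retries):
        try:
            # Hard timeout so a hung provider cannot stall the request.
            return await asyncio.wait_for(primary(prompt), timeout=10.0)
        except (LLMUnavailable, asyncio.TimeoutError):
            # Exponential backoff with jitter before retrying the primary.
            await asyncio.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))
    # Primary exhausted: route to the fallback model instead of failing.
    return await fallback(prompt)

async def flaky_primary(prompt: str) -> str:
    raise LLMUnavailable("rate limited")

async def stable_fallback(prompt: str) -> str:
    return f"fallback:{prompt}"

assert asyncio.run(call_with_fallback("triage", flaky_primary, stable_fallback)) == "fallback:triage"
```

Structured output validation then sits on top: parse the returned text into a Pydantic model and treat validation failure as one more retryable error class.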
Context engineering

What goes in the context window matters

Slide 31
System prompt · Retrieved context · Conversation history · User query · Tools

Fixed

System prompt and tool definitions.

Dynamic

RAG context and short memory window.

Budget

Trade off recall, recency, and action space.

Context engineering is the main lever for LLM output quality.
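The fixed/dynamic/budget split above can be sketched as a priority-ordered packer: earlier sections are guaranteed, later ones are dropped when the budget runs out. The 4-chars-per-token estimate is a deliberate simplification; a real service would use the model's tokenizer.

```python
def build_context(sections: list[tuple[str, str]], budget_tokens: int) -> str:
    # Crude token estimate (~4 chars/token); replace with the model tokenizer.
    used = 0
    kept: list[str] = []
    for name, text in sections:   # earlier sections have higher priority
        cost = max(1, len(text) // 4)
        if used + cost > budget_tokens:
            continue              # drop the section rather than truncate mid-thought
        used += cost
        kept.append(f"## {name}\n{text}")
    return "\n\n".join(kept)

prompt = build_context(
    [
        ("system", "You are a SOC triage assistant."),
        ("retrieved", "CVE-2024-0001 affects host web-01. " * 50),
        ("history", "analyst: what changed overnight?"),
    ],
    budget_tokens=60,
)
# The oversized retrieval block is sacrificed; the fixed sections survive.
assert "system" in prompt and "history" in prompt
assert "CVE-2024" not in prompt
```

The interesting design decision is the drop policy: skipping whole sections keeps each surviving section coherent, at the cost of recall.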
Agentic loop

Building a threat-analysis agent

Slide 32
flowchart LR q[User query] --> agent[Agent] agent --> llm[LLM] llm --> tool[Tool call] tool --> db[Threat DB / CVE / Endpoint graph] db --> agent agent --> llm llm --> final[Structured Pydantic response]
Python
@agent.tool
async def lookup_cve(cve_id: str) -> CVERecord:
    return await cve_client.fetch(cve_id)

@agent.tool
async def search_endpoint(hostname: str) -> EndpointRecord:
    return await graph_client.lookup_host(hostname)

result = await agent.run("Investigate suspicious PowerShell activity")
MCP integration

Connecting agents to SentinelOne APIs via MCP

Slide 33
flowchart LR purple[Purple AI agent] --> mcp[MCP server] mcp --> endpoint[Endpoint data API] mcp --> intel[Threat intelligence API] mcp --> vuln[Vulnerability management API]
Python
@tool
async def list_compromised_hosts(campaign_id: str) -> list[str]:
    return await singularity_client.list_hosts(campaign_id)

@tool
async def fetch_indicator(indicator: str) -> IndicatorRecord:
    return await singularity_client.get_indicator(indicator)
MCP is explicitly mentioned in the job spec - know it.
Reliability

Making agents production-grade

Slide 34

Max iteration limits

Prevent infinite loops and exploding latency.

Deterministic tool routing

Reduce ambiguity when the action surface is safety-sensitive.

Human checkpoints

Gate host isolation or high-severity actions behind review.

Full observability

Trace LLM calls, tool invocations, tokens, and retries end to end.
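A minimal sketch of the max-iteration guard, with `llm_step` as a hypothetical stand-in for one reason-act round trip: the loop either terminates on a final answer or hits the hard stop and escalates, never spinning forever.

```python
def run_agent(llm_step, max_iterations: int = 5) -> str:
    """Hypothetical agent loop: llm_step returns a final answer or a tool request."""
    history: list[str] = []
    for _ in range(max_iterations):
        action = llm_step(history)
        if action["type"] == "final":
            return action["answer"]
        # Would execute the tool here and append the observation.
        history.append(f"tool:{action['tool']}")
    # Hard stop: a controlled failure path instead of unbounded latency and cost.
    return "escalate-to-human"

def looping_llm(history):
    # Pathological model that never terminates on its own.
    return {"type": "tool", "tool": "lookup_cve"}

assert run_agent(looping_llm) == "escalate-to-human"
```

The same hook is where human checkpoints fit: high-severity actions check a policy before the tool call executes rather than after.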

Section G · RAG & Evals

Retrieval-augmented generation for threat intel

Slide 35
flowchart LR docs[Threat intel docs] --> chunk[Chunking strategy] chunk --> embed[Embedding model] embed --> vdb[Vector DB] question[User question] --> qembed[Embed query] qembed --> ann[ANN search] vdb --> ann ann --> topk[Top-k chunks] topk --> llm[Inject into LLM context] llm --> grounded[Grounded answer]
Chunking and embeddings

Getting retrieval right

Slide 36

Chunking options

  • Fixed-size chunks are simple and cheap, but often split meaning.
  • Sentence-boundary chunks preserve local coherence.
  • Semantic chunking improves recall when documents are structurally uneven.

Embedding choices

  • OpenAI for strong default quality and operational simplicity.
  • Cohere for high-quality retrieval-focused embeddings.
  • Local models when cost, data residency, or control dominate.
Garbage in, garbage out - bad chunking is the most common RAG failure mode.
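The first two chunking options can be contrasted in a few lines; sizes and the sentence regex here are illustrative defaults, not tuned values.

```python
import re

def fixed_chunks(text: str, size: int = 200, overlap: int = 40) -> list[str]:
    # Simple and cheap, but may split a sentence (and its meaning) in half.
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]

def sentence_chunks(text: str, max_chars: int = 200) -> list[str]:
    # Pack whole sentences into each chunk to preserve local coherence.
    chunks: list[str] = []
    current = ""
    for sentence in re.split(r"(?<=[.!?])\s+", text.strip()):
        if current and len(current) + len(sentence) + 1 > max_chars:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}".strip()
    if current:
        chunks.append(current)
    return chunks

doc = "APT29 targeted healthcare. Initial access used phishing. Lateral movement via SMB. " * 5
assert all(len(c) <= 200 for c in sentence_chunks(doc))
```

The trade-off shows up at retrieval time: fixed chunks are uniform for the embedding model, sentence chunks keep each embedded unit semantically whole.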
Evaluation

How do you know your AI feature works?

Slide 37

Offline evals

Golden datasets and automated scoring to anchor improvements.

LLM-as-judge

Critic models can scale comparative review with clear prompts.

Regression tests

Run on every deploy to catch prompt, retriever, or tool drift.

Human feedback

Feed analyst labels back into prioritization and fine-tuning loops.

Python
class EvalResult(BaseModel):
    query_id: str
    grounded: bool
    score: float

result = EvalResult(query_id="q-102", grounded=True, score=0.91)
Observability

What to instrument in an AI service

Slide 38
Metric · Why it matters · Tooling
Latency percentiles · Measure user-visible delay per model and route. · OpenTelemetry, Grafana
Token input and output · Track cost, prompt bloat, and response size. · LangSmith, vendor telemetry
Cost per request · Prevent silent spend expansion. · Billing tags, warehouse exports
Tool call success · Catch broken integrations quickly. · Trace spans, structured logs
Retrieval precision@k · Evaluate grounding quality. · Arize Phoenix, internal eval harness
Human feedback score · Link output quality to analyst reality. · Feedback pipeline
You cannot improve what you cannot measure.
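A stand-in for how latency and token metrics get captured per call; in production this would emit OTEL spans rather than append to a list, and the field names are assumptions.

```python
import functools
import time

METRICS: list[dict] = []

def instrumented(route: str):
    """Record latency and token counts per call; a toy stand-in for OTEL spans."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            METRICS.append({
                "route": route,
                "latency_ms": (time.perf_counter() - start) * 1000,
                "tokens_in": result.get("tokens_in", 0),
                "tokens_out": result.get("tokens_out", 0),
            })
            return result
        return wrapper
    return decorator

@instrumented("/triage")
def fake_llm_call(prompt: str) -> dict:
    # Hypothetical LLM wrapper returning text plus its own usage numbers.
    return {"text": "isolate host", "tokens_in": len(prompt) // 4, "tokens_out": 3}

fake_llm_call("summarise this endpoint incident")
assert METRICS[0]["route"] == "/triage" and METRICS[0]["tokens_out"] == 3
```

The key point is that tokens and latency are captured at the same place, so cost per request is a join away rather than a separate pipeline.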
Section H · Integrations

Resilient inter-service communication

Slide 39

REST

Retry, backoff, timeout, and circuit breaker for straightforward APIs.

gRPC

High-throughput internal services where schema control and efficiency matter.

Kafka

Event-driven pipelines for telemetry fan-in and asynchronous processing.

Pattern · Use when
REST · Human-readable APIs, moderate volume, broad compatibility.
gRPC · Strict contracts and high-throughput internal RPC.
Kafka · Loose coupling, replay, and streaming workloads.
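Of the resilience patterns listed for REST, the circuit breaker is the least obvious, so here is a minimal sketch: open after N consecutive failures, reject calls during a cooldown, then let a single probe through. Thresholds are illustrative.

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; reject until cooldown passes."""

    def __init__(self, threshold: int = 3, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown:
            # Half-open: let one probe request through to test recovery.
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record(self, success: bool) -> None:
        if success:
            self.failures = 0
            return
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker(threshold=2, cooldown=60.0)
breaker.record(success=False)
breaker.record(success=False)
assert breaker.allow() is False   # circuit is now open: fail fast, spare the upstream
```

Paired with retry and backoff, this stops a struggling downstream service from being hammered by every caller at once.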
Data layers

Picking the right datastore

Slide 40
Store · Best for · Singularity use case
Postgres · Structured, transactional data · Agent state, audits, workflow records
DynamoDB / Mongo · Flexible schemas, high write throughput · Event metadata and enrichment caches
Vector DB · Semantic retrieval · Threat intel RAG
Redis · Cache, rate limiting, session state · Hot lookups and guardrails
Kafka · Streaming ingestion · Telemetry pipelines
CI/CD

From code to production at SentinelOne

Slide 41
flowchart LR pr[Pull request] --> gha[GitHub Actions\nlint + test + build] gha --> ecr[Push image to ECR] ecr --> argo[ArgoCD detects image] argo --> k8s[Kubernetes rolling deploy] k8s --> health[Health check] health --> infra[Terraform-managed infrastructure]
Docker multi-stage build · Helm chart · ArgoCD app-of-apps
Cloud patterns

AWS managed services for AI workloads

Slide 42
flowchart LR api[API Gateway] --> eks[EKS\nFastAPI pods] eks --> sqs[SQS queue] sqs --> lambda[Lambda enrichment] lambda --> model[Bedrock or SageMaker] eks --> rds[RDS Postgres] eks --> redis[ElastiCache Redis] eks --> search[OpenSearch vector]
API Gateway for edge control · EKS for long-lived services · Managed stateful services where ops burden should stay low
Closing architecture

How it all connects

Slide 43
flowchart LR telemetry[Inbound telemetry] --> kafka[Async Kafka consumer] kafka --> validate[Pydantic validation] validate --> fastapi[FastAPI service\ndepends + lifespan + background tasks] fastapi --> agent[LLM agent\nPydantic AI + tools + MCP] agent --> rag[RAG retrieval\nvector DB] rag --> response[Structured Pydantic response] response --> observe[Observability layer] observe --> alert[Alert dispatch]
You own this end to end.
Industry Architecture

Petabyte telemetry ingestion to Purple AI streaming

Slide 44
flowchart TD subgraph src[" Data Sources "] ep[Singularity Endpoint\n50k+ agents] cl[Singularity Cloud\nruntime signals] id[Singularity Identity\nanomalies] end kafka["Kafka\n~1M events / sec · multi-partition"] flink["Flink\nnormalise · enrich · dedupe · validate"] subgraph store[" Petabyte Store "] chdb[(ClickHouse\ntime-series telemetry)] vdb[(Vector DB\nRAG embeddings)] end gateway["Purple AI Gateway\nFastAPI orchestrator"] llm["LLM Inference\nGPT-4o / fine-tuned"] analyst["Security Analyst\nbrowser / SIEM console"] ep -->|detections & events| kafka cl -->|runtime signals| kafka id -->|identity anomalies| kafka kafka --> flink flink -->|structured rows| chdb flink -->|chunk embeddings| vdb analyst -->|natural-language query| gateway gateway -->|telemetry context lookup| chdb gateway -->|semantic retrieval| vdb gateway -->|grounded prompt| llm llm -->|SSE token stream| analyst
Two parallel tracks: continuous ingestion and on-demand reasoning share the same petabyte store.
  • Kafka decouples endpoint agents from processing — back-pressure never reaches the source.
  • Flink enforces Pydantic-compatible schemas in the stream before rows ever land in storage.
  • ClickHouse serves sub-second aggregation queries over billions of telemetry rows.
  • Vector DB enables Purple AI to retrieve semantically-relevant evidence for the LLM context window.
  • SSE streams first token in <800 ms even while the full reasoning chain is still building.
PB+
Storage scale
1M/s
Ingest rate
<800ms
First token
Pseudo Code

Purple AI streaming pipeline — pseudo code

Slide 45
Ingestion pipeline
# ── INGESTION (runs continuously) ──────────────
consumer = KafkaConsumer(
    topics=["endpoint.events.*"],
    group_id="telemetry-ingest",
)

async def ingest_loop():
    async for batch in consumer.stream(
        max_batch=10_000,
        timeout_ms=500,
    ):
        # Validate & normalise at stream edge
        events = [
            TelemetryEvent.model_validate(raw)
            for raw in batch
        ]
        normalized = await enrich_pipeline(events)

        # Dual-write: structured + semantic
        await clickhouse.bulk_insert(
            table="telemetry",
            rows=normalized,
        )
        embeddings = await embed(normalized)
        await vector_db.upsert(embeddings)

        await consumer.commit()
Purple AI streaming query handler
# ── PURPLE AI QUERY (on-demand) ────────────────
@app.post("/purple/query")
async def purple_query(
    req: QueryRequest,
    ctx: AppContext = Depends(get_context),
) -> StreamingResponse:

    # 1. Retrieve grounding evidence
    tel_rows  = await clickhouse.query(
        sql=build_telemetry_sql(req),
        time_window="24h",
        limit=200,
    )
    rag_hits  = await vector_db.search(
        query=req.query,
        top_k=50,
    )
    context = build_context(
        tel_rows, rag_hits, max_tokens=8_192
    )

    # 2. Build grounded prompt
    messages = [
        SystemMessage(PURPLE_SYSTEM_PROMPT),
        UserMessage(req.query, context),
    ]

    # 3. Stream tokens back via SSE
    async def token_stream():
        async for chunk in ctx.llm.stream(
            messages, model="purple-ai-v2"
        ):
            yield f"data: {chunk.json()}\n\n"

    return StreamingResponse(
        token_stream(),
        media_type="text/event-stream",
    )
Ingestion and query paths are fully decoupled — the stream never blocks analyst responses and the query path never stalls ingest.
Data Architecture

Telemetry pipeline across data architectures

Slide 46
flowchart TD SRC["Endpoint · Cloud · Identity · Network"] ING["Kafka / OTEL · 1M events / sec"] PROC["Flink — validate · enrich · dedupe"] subgraph LAMBDA["Lambda Architecture"] SPD[("ClickHouse\nspeed layer")] --> SRV["Serving Layer\nquery router"] BAT[("S3 Parquet\nbatch layer")] --> SRV end subgraph KAPPA["Kappa Architecture"] KS["Replayable Kafka stream\nFlink — single processing path"] end subgraph LAKE["Data Lakehouse"] RZ[("Raw Zone · S3")] --> CZ[("Curated · Delta / Iceberg")] --> DM["Athena / DuckDB"] end subgraph VECAI["AI / Vector Architecture"] EMB["Embed events"] --> VS[("Vector Store\nPinecone / pgvector")] --> LLM["RAG → Purple AI"] end SRC --> ING --> PROC PROC -->|"sub-second alerts"| LAMBDA PROC -->|"stream-only path"| KAPPA PROC -->|"durable archival"| LAKE PROC -->|"semantic embeddings"| VECAI
  • Lambda — dual-track: a speed layer answers sub-second SIEM queries while a batch layer rebuilds full history for ML re-training.
  • Kappa — stream-only: every event is replayable from Kafka, removing batch complexity at the cost of longer retention storage.
  • Lakehouse — raw events land in S3, curated into Delta/Iceberg tables, then queried via Athena or DuckDB for compliance and ML feature stores.
  • AI / Vector — events are embedded at ingest time; vector search enables RAG grounding for Purple AI context windows.
All four paths share the same Kafka + Flink ingest core — architecture diverges only at the storage write.
4
Architecture patterns
1
Shared ingest core
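The "diverge only at the storage write" claim can be sketched as a fan-out after the shared Flink stage: one processed event, one writer per architecture path. The sink names and the `fan_out` helper are illustrative, not a real SentinelOne API; real writers would target ClickHouse, a replayable Kafka topic, S3, and an embedding service.

```python
from typing import Callable

def fan_out(event: dict, sinks: dict[str, Callable[[dict], None]]) -> None:
    """Deliver one validated, enriched event to every architecture path."""
    for write in sinks.values():
        write(event)

# List appends stand in for the real storage writes
written: list[str] = []
sinks: dict[str, Callable[[dict], None]] = {
    "lambda_speed":  lambda e: written.append("clickhouse"),
    "kappa_stream":  lambda e: written.append("kafka-replay"),
    "lakehouse_raw": lambda e: written.append("s3-raw"),
    "vector_embed":  lambda e: written.append("pgvector"),
}
fan_out({"event": "process_start", "severity": "high"}, sinks)
```

Because every sink receives the same post-Flink event, adding a fifth architecture path is a one-line dict entry rather than a new ingest pipeline.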
Section I · MCP Protocols

Enterprise MCP: auditing protocol standards

fade-up 0.3sSlide 47
Protocol · Layer · Role in enterprise MCP
W3C Trace Context · Observability · traceparent / tracestate headers propagated across every tool call hop
OpenTelemetry (OTEL) · Telemetry SDK · Spans, metrics, and logs emitted from every MCP tool invocation
OAuth 2.0 / OIDC · Identity · Bearer token scoping which tools and resources an agent may call
mTLS / X.509 · Transport auth · Mutual TLS for verified service-to-service MCP identities
HMAC-SHA256 signing · Integrity · Tool call payload signing for non-repudiation and tamper detection
CEF / JSON audit logs · Compliance · Tamper-evident structured events for SIEM ingestion and SOC 2
IETF Rate-Limit headers · Quota · X-RateLimit-* transparency so tool consumers can self-throttle
flowchart LR A["LLM Agent"] -->|"1. traceparent injected"| B["OTEL Middleware"] B -->|"2. Bearer verified"| C["OAuth 2.0 Gate"] C -->|"3. client cert checked"| D["mTLS Layer"] D -->|"4. payload signed"| E["HMAC-SHA256"] E -->|"5. tool runs"| F["MCP Tool"] F -->|"6. audit event"| G["CEF Log\nSIEM"] F -->|"7. span closed"| H["OTEL Collector\nJaeger / Tempo"]
Every enterprise MCP call must be traceable (W3C), authenticated (OAuth / mTLS), integrity-checked (HMAC), and auditable (CEF) before it reaches production.
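The integrity step (4 in the flow above) is plain stdlib `hmac`. A minimal sketch, assuming a pre-shared key and a JSON tool-call payload; key distribution and rotation are out of scope, and the function names are illustrative:

```python
import hashlib
import hmac
import json

def sign_tool_call(payload: dict, key: bytes) -> str:
    """Canonicalise the MCP tool-call payload, then HMAC-SHA256 it.

    sort_keys + compact separators give a stable byte representation,
    so signer and verifier hash identical bytes regardless of dict order.
    """
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hmac.new(key, canonical.encode(), hashlib.sha256).hexdigest()

def verify_tool_call(payload: dict, key: bytes, signature: str) -> bool:
    # compare_digest is constant-time, avoiding timing side channels
    return hmac.compare_digest(sign_tool_call(payload, key), signature)

sig = sign_tool_call({"tool": "get_threats", "args": {"severity": "high"}}, b"shared-key")
```

A tampered payload (or a replayed signature against different args) fails `verify_tool_call`, which is what gives the audit log its non-repudiation property.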
W3C Trace Context · RFC

W3C Trace Context — traceparent through MCP hops

fade-up 0.3sSlide 48
W3C traceparent anatomy + FastAPI propagation
# traceparent: {version}-{trace-id}-{parent-id}-{flags}
# ─────────────────────────────────────────────────────────
# version  : 00   (W3C spec version, always 00)
# trace-id : 4bf92f3577b34da6a3ce929d0e0e4736  (128-bit, IMMUTABLE across all hops)
# parent-id: 00f067aa0ba902b7  (64-bit, NEW value at every hop)
# flags    : 01   (01 = sampled, 00 = not sampled)

traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
tracestate  = "s1=purple-ai,sentinel=v2"   # vendor-specific extensions

# Inject + propagate in FastAPI middleware
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

tracer = trace.get_tracer("purple-ai.mcp")

class TraceContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        ctx = extract(request.headers)          # read inbound traceparent
        with tracer.start_as_current_span(
            "mcp.tool.call",
            context=ctx,
        ) as span:
            span.set_attribute("mcp.tool", request.url.path)
            span.set_attribute("tenant.id", request.headers.get("x-tenant-id", ""))
            outbound: dict[str, str] = {}
            inject(outbound)                    # write NEW traceparent (same trace-id, new parent-id)
            request.state.trace_headers = outbound
            return await call_next(request)
flowchart LR browser["Analyst\nbrowser"] -->|"traceparent hop 1"| gw["Purple AI\nGateway"] gw -->|"same trace-id\nnew span-id"| mcp["MCP Server"] mcp -->|"traceparent hop 3"| api1["Endpoint API"] mcp -->|"traceparent hop 4"| api2["Threat Intel API"] gw --> otel["OTEL Collector"] mcp --> otel otel --> jaeger["Jaeger / Tempo\nfull trace tree"]
trace-id is immutable across all hops — one Jaeger query reconstructs every service involved in a single analyst request.
128-bit trace-id: immutable · 64-bit span-id: new per hop · flags byte: sampling decision
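The fixed field widths in the anatomy above (2 / 32 / 16 / 2 hex chars) make the header trivially checkable by hand. A stdlib-only sketch; in production you would let the OTEL propagator do this, as in the middleware above, and `parse_traceparent` is a hypothetical helper:

```python
def parse_traceparent(header: str) -> dict[str, object]:
    """Split a W3C traceparent header into its four fields and check
    the spec's fixed hex widths: version 2, trace-id 32, parent-id 16, flags 2."""
    version, trace_id, parent_id, flags = header.split("-")
    assert (len(version), len(trace_id), len(parent_id), len(flags)) == (2, 32, 16, 2)
    return {
        "version": version,
        "trace_id": trace_id,                    # immutable across all hops
        "parent_id": parent_id,                  # rewritten at every hop
        "sampled": bool(int(flags, 16) & 0x01),  # low bit of the flags byte
    }

ctx = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
```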
Section J · Request Lifecycle

FastAPI request lifecycle — 8 stages

fade-up 0.3sSlide 49
flowchart TD S1["Stage 1: ASGI Receive\nTCP bytes arrive · scope dict created\nmethod · path · headers · body stream"] S2["Stage 2: Middleware Stack\nCORS · auth verification · OTEL\ntraceparent injected into request state"] S3["Stage 3: Route Match\nmethod + path trie lookup\n/threats/{id} → path params extracted"] S4["Stage 4: Request Parse\nbody stream decoded · Content-Type checked\nJSON deserialised to raw dict"] S5["Stage 5: Schema Validation\nPydantic model_validate()\npath · query · headers · body → typed objects"] S6["Stage 6: Depends Resolution\nasync DAG walked · generators yield\nDB session · auth user · rate-limit token injected"] S7["Stage 7: Handler Execution\nawait service.process(validated_params)\nbusiness logic · DB I/O · LLM calls"] S8["Stage 8: Response + Cleanup\nresponse_model.model_dump() → JSON bytes\ngenerator finally blocks · background tasks · ASGI send"] S1 --> S2 --> S3 --> S4 --> S5 --> S6 --> S7 --> S8 S5 -.->|"response_model schema reused"| S8 S6 -.->|"generator teardown after send"| S8
Two hidden back-connections define correctness: the Pydantic schema gates both input (stage 5) and output (stage 8). The Depends generators clean up in the same stage that sends the response.
  • Stage 3 — Route Match: wrong HTTP method returns 405 before any parsing or validation runs.
  • Stage 5 — Schema: model_validate() rejects bad input before Depends or handler execute — no wasted I/O.
  • Stage 6 — Depends: async generators yield once into the handler; the finally block is the implicit context manager.
  • Stage 8 — Cleanup: the same Pydantic model used for input validation also drives response_model serialisation — schema closes the loop at both ends.
8
Pipeline stages
2
Back-connections
405
Before schema runs
Deep Dive

Schema → Depends → Context Manager: the full contract

fade-up 0.3sSlide 50
Python — the three-way contract in one route
from typing import Literal
from fastapi import Depends, FastAPI
from pydantic import BaseModel

app = FastAPI()

# ── Schema: BOTH input gate (stage 5) AND output gate (stage 8) ──────────
class ThreatRequest(BaseModel):          # stage 5 — validated before handler runs
    tenant_id: str
    severity: Literal["high", "critical"]

class ThreatResponse(BaseModel):         # stage 8 — serialised after handler returns
    alert_id: str
    risk_score: float

# ── Context Manager inside Depends: yields into handler, cleans up after send ──
async def get_db_session():
    session = await db_pool.acquire()    # stage 6: resource acquired
    try:
        yield session                    # <── handler body runs between yield and finally
    finally:
        await session.close()            # stage 8: cleanup AFTER response is already on wire

# ── Route: response_model= closes the schema loop ────────────────────────
@app.post("/threats", response_model=ThreatResponse)
async def create_threat(
    req: ThreatRequest,                  # Pydantic validates at stage 5
    db = Depends(get_db_session),        # generator injected at stage 6
) -> ThreatResponse:
    result = await db.save_threat(req)
    return ThreatResponse(alert_id=result.id, risk_score=result.score)
flowchart TD SCHI["ThreatRequest\nPydantic input schema\nstage 5 — validates request"] DEP["get_db_session\nasync generator Depends\nstage 6 — yields resource"] H["Handler body\nawait db.save_threat(req)"] SCHO["ThreatResponse\nresponse_model\nstage 8 — serialises output"] CTX["finally: session.close\nContext Manager teardown\nstage 8 — after ASGI send"] SCHI -->|"validated req obj"| H DEP -->|"db session injected"| H H -->|"return value"| SCHO SCHO --> CTX DEP -.->|"generator resumes"| CTX SCHI -.->|"response_model= reuses class"| SCHO
The Pydantic schema is both the front door (input validation at stage 5) and the back door (output serialisation at stage 8). The Depends generator's finally block is the Context Manager — it runs after the response bytes are already on the wire, freeing the DB connection without blocking the client.
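The teardown ordering can be shown with nothing but a plain generator, since FastAPI drives a `Depends` generator exactly like a context manager: resume once into the handler, resume again after the response is sent. A stdlib-only sketch of that acquire → handler → send → close sequence (names illustrative):

```python
events: list[str] = []

def get_db_session():
    events.append("acquire")          # stage 6: resource acquired
    try:
        yield "session"               # handler runs between yield and finally
    finally:
        events.append("close")        # stage 8: runs only when the framework resumes us

# Drive the generator by hand, the way the framework does
gen = get_db_session()
session = next(gen)                   # stage 6: dependency resolved
events.append("handler")              # stage 7: business logic
events.append("response sent")        # stage 8: bytes already on the wire
gen.close()                           # stage 8: GeneratorExit -> finally -> close

assert events == ["acquire", "handler", "response sent", "close"]
```

The assertion is the whole point: `close` lands after `response sent`, so the client never waits on connection cleanup.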
Quiz · All Topics

Interactive Revision Quiz

40 questions · 9 sections Slide 51

40 Questions — 9 Topics

Test your recall across Pydantic, Context Managers, AsyncIO, FastAPI, LLM Systems, RAG & Evals, Integrations, Cloud & Ops, and MCP & Protocols.

Pydantic Context Managers AsyncIO FastAPI LLM Systems RAG & Evals Integrations Cloud & Ops MCP & Protocols