Vai al contenuto principale
La maggior parte dei tool di sicurezza statici trova i bug in isolamento. Scansionano un file, elencano i problemi e proseguono. Il problema è che le vulnerabilità più dannose nelle codebase moderne raramente sono un singolo bug. Sono una catena: una chiave di firma hardcoded più un controllo di autorizzazione mancante più una SQL injection che, presi singolarmente, sembrano tutti gestibili. Insieme costituiscono un percorso di account-takeover. È esattamente il tipo di ragionamento trasversale in cui gli LLM eccellono, se gli dai la giusta struttura. In questo articolo costruiremo un revisore di sicurezza a due agenti usando Python e l’API di Venice AI. Alla fine avrai una CLI da puntare su qualsiasi codebase Python per produrre un report Markdown con findings atomici e catene di exploit. Ti interessa l’implementazione completa? Dai un’occhiata al repository GitHub. Prima di continuare, ti servirà una API key Venice. Esportala come variabile d’ambiente:
export VENICE_API_KEY=<my-key>

Cosa costruiremo

Il revisore è un piccolo progetto Python con poche parti ben definite:
ParteCosa fa
Modelli PydanticDefiniscono Evidence, Finding e Chain e ci forniscono un confine di validazione rigido tra l’LLM e il resto del programma
Client VeniceAvvolge l’SDK Python di OpenAI puntato all’endpoint compatibile con OpenAI di Venice
Mappa AST del repoAttraversa l’albero del target con il modulo ast di Python e costruisce una mappa deterministica dei simboli pubblici e degli archi di import di ogni modulo
Scanner agentLegge un file Python alla volta più una “vicinato” per-file della mappa del repo ed emette findings atomici di vulnerabilità con evidenza file:riga
Chainer agentLegge l’unione dei findings più una mappa completa condensata del repo ed emette catene di exploit che combinano due o più findings
Validatore di riferimentiScarta qualsiasi catena che faccia riferimento a un ID finding che lo Scanner non ha prodotto, o che nomini un file da cui nessuno dei findings referenziati proviene
Report MarkdownRenderizza findings e catene in un report leggibile
CLIMette tutto insieme con Typer
Il flusso è il seguente:
  1. Attraversa la directory target alla ricerca di file .py.
  2. Costruisce una mappa deterministica del repo (import, simboli pubblici, firme).
  3. Per ogni file, invia allo Scanner il suo sorgente più una porzione di vicinato della mappa e raccoglie i findings atomici.
  4. Invia l’unione dei findings più la mappa condensata del repo al Chainer e raccoglie le catene di exploit.
  5. Scarta qualsiasi catena che faccia riferimento a un ID finding non prodotto dallo Scanner, o che nomini un file da cui nessuno dei findings referenziati proviene davvero.
  6. Scrive un report Markdown.
Due scelte di design vanno evidenziate prima di scrivere il codice. La prima è perché due agenti invece di uno. Uno scanner single-agent che prova a fare tutto in un solo prompt deve bilanciare l’essere meticoloso sui bug per-file con l’essere brillante nel ragionamento combinatorio. Dividere il lavoro consente allo Scanner di essere implacabile e rumoroso, e al Chainer di essere selettivo e silenzioso. Aggiungere una sola chiamata LLM in più dedicata alla combinazione dei findings sblocca un’intera classe di bug con pochissimo codice extra. La seconda è perché una mappa del repo. Le codebase reali si distribuiscono su molti file. Un bug che consiste in “il validator viene eseguito ma non è applicato per iterazione nel fetcher, e la risposta del fetcher finisce nel renderer” è invisibile a uno scanner per-file. Prima di qualsiasi chiamata LLM, attraversiamo l’albero target con ast di Python e costruiamo una mappa strutturale. Lo Scanner vede un vicinato per file (chi importa da questo file, cosa importa questo file, firme di quei simboli esterni). Il Chainer vede una mappa condensata completa (ogni modulo, ogni simbolo pubblico, ogni arco di import, nessun sorgente). È la minima quantità di context engineering che abbiamo trovato e che permette al Chainer di costruire catene il cui flusso di dati attraversa i confini dei moduli, senza pagare il costo in token di infilare l’intera codebase in ogni prompt.

Prerequisiti

  • Python 3.12+
  • Una API key Venice da venice.ai
  • Familiarità di base con Pydantic, il modulo ast di Python e l’SDK Python di OpenAI
Il repo di riferimento usa uv per la gestione delle dipendenze, ma un normale virtual environment funziona altrettanto bene.

Configurare il progetto

Crea un nuovo progetto e installa le dipendenze:
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Se preferisci pip, crea invece un virtual environment:
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Crea un file .env per lo sviluppo locale:
VENICE_API_KEY=your-venice-api-key-here
# Override opzionali
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
Distribuiremo i sorgenti sotto src/venice_security_reviewer/ per mantenerli importabili come package, con i prompt sotto prompts/ nella root del repo in modo che possano essere revisionati e diff-ati come qualsiasi altro artefatto sorgente:
src/venice_security_reviewer/
  __init__.py
  models.py     # Modelli Pydantic
  client.py     # Factory del client Venice
  repo_map.py   # Mappa del repo costruita via AST
  scanner.py    # Scanner agent
  chainer.py    # Chainer agent
  report.py     # Rendering Markdown con Jinja2
  cli.py        # CLI Typer
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

Configurare il client Venice

Venice è compatibile con OpenAI, quindi possiamo usare l’SDK Python ufficiale di OpenAI puntando semplicemente il suo base_url a Venice. Centralizzando la costruzione del client in un unico file, il resto del codice non deve mai sapere con quale provider sta parlando: sostituire il backend toccherebbe solo questo modulo. Crea src/venice_security_reviewer/client.py:
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
Alcuni punti da notare:
  • Usiamo zai-org-glm-5 come default perché è un solido modello Venice general-purpose, ma puoi sovrascriverlo con la variabile d’ambiente VENICE_MODEL. Per codebase più grandi o più sfumate, passare a un modello più potente può rendere il Chainer notevolmente migliore in termini di qualità narrativa.
  • build_client restituisce il client e il model id, così i chiamanti non devono leggere variabili d’ambiente e i test possono iniettare una config fake senza monkeypatching.

Definire i data model

Tutto il senso dell’usare Pydantic qui, anziché passare dict raw, è ottenere un confine di validazione rigido tra l’LLM e il resto del programma. Se il modello restituisce JSON malformato o inventa un ID finding inesistente, il parsing fallisce rumorosamente e non propaghiamo mai l’allucinazione nel report. Crea src/venice_security_reviewer/models.py:
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
I vincoli qui fanno un lavoro concreto:
  • Finding.id e Chain.id sono vincolati a un regex tipo F-001, C-001. Se il modello diventa creativo sul formato, la validazione fallisce.
  • Chain.findings richiede almeno due voci: una “catena” di un solo finding è solo un finding.
  • Chain.severity è limitato a high o critical. Una combinazione di findings che non eleva l’impatto sopra la severità individuale più alta non è una catena degna di essere segnalata.
  • Evidence impone che end_line >= start_line così il modello non può restituire intervalli di riga senza senso.
Questa è la validazione della forma. Ci serve anche la validazione di cross-reference: una catena che fa riferimento a un ID finding mai prodotto dallo Scanner è priva di significato. Aggiungi questa funzione a models.py:
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
Questo è il guardrail deterministico che mantiene il Chainer onesto. Può fare riferimento solo a findings effettivamente prodotti dallo Scanner e può rivendicare solo file coinvolti nella catena da cui uno di quei findings proviene davvero. Restituire le catene scartate invece di filtrarle silenziosamente permette alla CLI di emettere un warning quando il modello prova a inventare qualcosa.

Costruire la mappa AST del repo

La mappa del repo è lo scheletro strutturale di una codebase Python: la superficie pubblica di ogni modulo, ogni arco di import e un indice inverso da “modulo M” ai “moduli che importano da M”. Si costruisce una volta per run di scansione con ast di Python, mai tramite esecuzione, quindi è sicura da eseguire su codice avversariale: il parser non importa né invoca nulla dall’albero scansionato. Consumeremo la mappa in due forme. Lo Scanner riceve una slice di vicinato per file in modo che i suoi prompt restino di dimensioni limitate. Il Chainer riceve una mappa completa condensata per poter costruire catene attraverso i file. Crea src/venice_security_reviewer/repo_map.py e inizia con i modelli Pydantic che descrivono la mappa:
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
Ora l’helper che attraversa l’albero e salta le directory che non vogliamo indicizzare:
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
Per ogni file vogliamo tre cose dall’AST: i simboli di top-level che definisce, gli archi di import e una lista __all__ esplicita se presente. Le firme delle funzioni e gli header delle classi vengono renderizzati come stringhe compatte che l’LLM può leggere direttamente:
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
Il _SIGNATURE_CHAR_CAP di 200 preserva firme reali tipiche (inclusi i type hint) prevenendo casi patologici come un union typed da 200 righe che farebbe esplodere il prompt. Poi, l’extractor che tira fuori i dati strutturali da un modulo parsato. Gestiamo ast.FunctionDef, ast.ClassDef, top-level ast.Assign e ast.AnnAssign per le costanti, e sia ast.Import sia ast.ImportFrom per gli archi di import. Gli import relativi vengono risolti nella loro forma assoluta dotted in modo che il Chainer possa abbinarli ai nomi dei moduli più avanti:
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
La logica di estrazione completa attraversa tree.body ed emette voci SymbolDef e ImportEdge per ogni nodo di top-level. La funzione _extract del repo di riferimento in repo_map.py copre l’implementazione completa. La forma che ne esce è una lista di oggetti ModuleEntry, uno per file. La parte interessante è cosa facciamo con queste voci. Le avvolgiamo in una RepoMap con due metodi destinati ai consumer:
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path) è ciò che lo Scanner chiama per ogni file. Restituisce un oggetto ModuleNeighborhood contenente il modulo stesso, ogni altro modulo che importa da esso e ogni simbolo in-repo che importa da altrove (con le loro firme risolte). Questo dà allo Scanner abbastanza contesto per segnalare findings che sono evidenti solo in contesto cross-file, senza trascinare l’intera codebase nel prompt. condensed_dict() è ciò che riceve il Chainer. Snippet e firme vengono eliminati; restano solo path, nomi dei moduli, export pubblici e archi di import. È la rappresentazione più piccola che consenta comunque al Chainer di ragionare sul flusso di dati cross-module. Infine, l’entry point che costruisce il tutto:
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
I file che non possiamo leggere o che non si parsano vengono loggati e saltati. Restituiamo una mappa parziale invece di far fallire l’intero run; il caso peggiore è che una chiamata Scanner non veda alcun vicinato per un file, il che è comunque una scansione funzionante.

Scrivere lo Scanner agent

Lo Scanner attraversa un path target, raccoglie i file sorgente Python e chiede a Venice di identificare vulnerabilità atomiche un file alla volta. La scansione per-file mantiene il prompt piccolo e isola i fallimenti: un file errato non distrugge l’intero run. Manterremo il prompt in un file separato così può essere revisionato e diff-ato come qualsiasi altro artefatto sorgente. Crea prompts/scanner.md:
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
Il prompt completo nel repo di riferimento contiene anche una sezione “What to look for” che elenca le classi di vulnerabilità comuni (secret hardcoded, SQL injection, command injection, SSRF, deserializzazione insicura, ecc.) e una sezione “How to use the neighborhood” che spiega come il modello deve consumare il contesto cross-file. Alcune note sul design del prompt:
  • Diciamo al modello di emettere solo JSON, senza prose o code fence. L’SDK OpenAI supporta un parametro response_format={"type": "json_object"} che lo impone lato API, ma rinforzarlo nel prompt riduce i casi limite.
  • Vietiamo esplicitamente allo Scanner di produrre catene cross-file. Le catene sono lavoro del Chainer e chiedere allo Scanner di fare entrambe le cose offusca le responsabilità.
  • Richiediamo che lo snippet sia copiato verbatim. Significa che il report può citare gli esatti byte che il modello dichiara di aver visto, e un revisore può verificare a campione un finding confrontando lo snippet con il sorgente.
Ora il codice dell’agente. Crea src/venice_security_reviewer/scanner.py e inizia con il walker dei file e il prompt loader:
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES è un tetto di sicurezza. Oltre ~200 KB saltiamo invece di inviare un prompt enorme che probabilmente sarebbe sia costoso sia di bassa qualità. Il pezzo successivo è il builder del prompt. Il template usa {filename}, {source} e {neighborhood} come segnaposto; usiamo str.replace invece di .format() perché il template contiene esempi JSON con graffe letterali:
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
Ora il parser. Deserializziamo il JSON, validiamo ogni finding attraverso Pydantic e scartiamo i singoli findings malformati anziché far fallire l’intero file. Un finding errato non deve farci perdere quelli buoni:
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
Lo Scanner emette ID come F-001 per file, ma il Chainer deve poter fare riferimento ai findings nell’intero repo. Rinumeriamo gli ID con un contatore monotono in modo che siano globalmente unici:
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
La chiamata di scansione del singolo file combina tutto questo. Leggiamo il file, costruiamo il prompt, lo inviamo a Venice con response_format={"type": "json_object"} e una temperatura bassa, e parsiamo il risultato:
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
Due dettagli da evidenziare:
  • Patchamo il path del file in evidence per essere relativo a repo_root dopo il parsing, perché il modello rispedisce il nome del file che gli abbiamo dato ma vogliamo una sola forma canonica in tutto il report.
  • temperature=0.1 è intenzionalmente basso. Vogliamo che lo Scanner sia conservativo e coerente tra run; la creatività è lavoro del Chainer.
Infine, l’orchestratore che scansiona ogni file idoneo sotto la root:
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
La mappa del repo viene costruita una sola volta dal chiamante e riutilizzata per ogni file, così lo Scanner vede una struttura globale coerente anche quando singoli file non si parsano o vengono saltati.

Scrivere il Chainer agent

Il Chainer prende l’unione dei findings dello Scanner più la mappa condensata del repo e chiede a Venice se uno qualsiasi dei findings si combini in una catena di exploit reale. Due guardrail deterministici si trovano tra l’LLM e il report:
  1. Ogni catena deve fare riferimento solo a ID finding prodotti dallo Scanner.
  2. Ogni catena deve rivendicare solo file che almeno una evidence dei findings referenziati tocca.
Le catene che violano una delle due regole vengono scartate in fase di parsing. Questo impedisce al modello di allucinare catene “tanto per” e di rivendicare che una catena copra file per i quali non ha alcuna evidenza. Il prompt del Chainer risiede in prompts/chainer.md. Il suo nucleo è il seguente:
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
Il prompt completo nel repo di riferimento spiega anche come leggere la mappa del repo, come decidere cosa va in files_involved e, soprattutto, quando non concatenare. Dire al modello “è corretto e atteso che molte codebase abbiano findings che non si concatenano” è ciò che gli impedisce di inventare catene per sembrare produttivo. Ora il codice dell’agente. Crea src/venice_security_reviewer/chainer.py:
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000 è un tetto morbido per il blocco di mappa repo renderizzata in JSON nel prompt del Chainer. A circa 4 caratteri per token, sono ~2000 token, che rientrano comodamente in qualsiasi finestra di contesto dei modelli Venice anche con findings e budget narrativo in cima. Serializziamo i findings in un blocco JSON compatto. Nota che togliamo apposta lo snippet dall’evidence qui: il Chainer non ha bisogno dei byte raw per decidere se due findings si combinano, e includerli circa raddoppia il costo in token su codebase reali:
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
Per codebase più grandi la mappa condensata completa del repo può sforare il nostro budget di caratteri. Quando succede, sfrondiamo limitandoci ai moduli che contengono findings più i loro vicini diretti. Questo preserva struttura sufficiente per consentire al Chainer di ragionare su catene per cui abbiamo evidenza, e scarta il resto:
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
La strategia di pruning è volutamente semplice: tieni i moduli in cui vivono i nostri findings e tieni i loro vicini diretti nel grafo di import. Qualsiasi cosa più lontana non ha un ruolo plausibile in una catena per la quale attualmente abbiamo evidenza, quindi può essere scartata senza perdere potere di ragionamento. Annotiamo anche il payload con i marker _pruned, _kept e _total, così il prompt del Chainer può avvisare il modello quando la mappa è stata sfrondata. Il parsing della risposta ha la stessa forma di quello dello Scanner: deserializza, valida ogni catena tramite Pydantic, scarta le voci malformate:
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
Poi l’agente stesso:
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
Un paio di cose da segnalare:
  • Usciamo prima di chiamare il modello quando ci sono meno di due findings. Non puoi concatenare un singolo finding, e saltare la chiamata significa non bruciare token su un risultato garantito-vuoto.
  • temperature=0.2 è leggermente più alta di 0.1 dello Scanner. Il Chainer beneficia di un tocco di creatività in più per individuare combinazioni non ovvie, ma vogliamo comunque che resti ancorato ai findings e alla mappa che gli sono stati forniti.
  • Dopo il parsing, validate_chain_references esegue il controllo cross-reference deterministico che abbiamo scritto prima. Tutto ciò che sopravvive è sicuro da renderizzare; tutto ciò che non sopravvive viene loggato così l’operatore sa che il modello ha provato a inventarsi qualcosa.
Quel controllo di cross-reference è il pezzo più importante dell’intero progetto. È il confine tra “tool di sicurezza utile” e “report AI occasionalmente convintamente sbagliato”. Con esso a posto, anche se il modello allucina, la catena sbagliata non raggiunge mai il report.

Renderizzare il report Markdown

Tenere il rendering separato dalla logica degli agenti significa che gli stessi oggetti Finding e Chain potranno in seguito essere alimentati in altri formati (JSON, SARIF, HTML) senza toccare lo Scanner o il Chainer. Useremo Jinja2 con un piccolo file di template. Crea src/venice_security_reviewer/templates/report.md.j2:
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
Poi il renderer in src/venice_security_reviewer/report.py:
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
L’autoescape resta disattivato per il template Markdown (Markdown non è HTML), ma lo lasciamo abilitato per qualsiasi futuro template .html per estensione.

Cablare la CLI

La CLI è l’orchestratore: costruisce la mappa del repo, scansiona, concatena, renderizza. Useremo Typer per gestire il parsing degli argomenti e Rich per stampare una bella tabella di sintesi. Crea src/venice_security_reviewer/cli.py:
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
Aggiungi il script entry point al pyproject.toml:
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
Ecco l’intera pipeline cablata.

Testare i guardrail

Ci siamo appoggiati pesantemente a un’idea per tutto questo build: i guardrail deterministici sono ciò che separa un tool di sicurezza utile da uno convintamente sbagliato. Quell’affermazione vale la pena di farla solo se possiamo dimostrare che i guardrail tengono davvero, quindi i test più preziosi di questo progetto non chiamano affatto Venice. Bloccano il confine Pydantic e il plumbing di assemblaggio del prompt, il che significa che girano offline, in millisecondi, senza API key e senza costo in token. Aggiungi prima le dipendenze di sviluppo:
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
La prima cosa che vale la pena testare è il confine del modello stesso. Questi test verificano che findings e catene malformati vengano rifiutati al momento della costruzione, prima ancora di poter raggiungere un report. Crea tests/test_models.py:
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
Ognuno di questi rispecchia un vincolo che abbiamo messo prima sui modelli: un intervallo di righe invertito, un ID che non rispetta il pattern F-### e una “catena” di un singolo finding. Se uno di essi smettesse di sollevare, un’intera classe di allucinazione tornerebbe silenziosamente possibile. Il test più importante copre il validatore di cross-reference, perché è la funzione che effettivamente scarta le catene inventate:
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
F-999 non è mai stato prodotto dallo Scanner, quindi la catena che lo referenzia finisce in dropped e non raggiunge mai il report. Il test compagno nel repo di riferimento, test_validate_chain_references_drops_unknown_files, fa lo stesso per una catena che rivendica un file da cui nessuno dei suoi findings proviene. La seconda cosa che vale la pena testare è il plumbing che alimenta il Chainer. È facile fare il refactoring dell’assemblaggio del prompt e smettere silenziosamente di passare il contesto cross-file, momento in cui il Chainer continuerebbe a funzionare ma peggiorerebbe silenziosamente. Questo test costruisce una fixture a due moduli, renderizza il prompt e verifica che l’informazione cross-file sia effettivamente presente, di nuovo senza un round-trip su Venice. Crea tests/test_cross_file_chain.py:
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
Se questo test passa, al Chainer viene passato un prompt che contiene entrambi i findings, entrambi i path dei file e l’arco di import tra loro. Se il modello utilizza bene quell’informazione è una valutazione separata, fuori banda; questo test salvaguarda solo il plumbing che porta l’informazione nel prompt in primo luogo. Esegui l’intera suite, più il linter e il type checker, con:
uv run pytest          # offline tests, no live Venice calls
uv run ruff check .
uv run mypy src/
Poiché nessuno di questi test tocca la rete, sono sicuri da eseguire a ogni commit e in CI senza bruciare token o aver bisogno di una key Venice. Il repo di riferimento include anche tests/test_scanner_parse.py, tests/test_chainer_parse.py e tests/test_repo_map.py, che coprono casi limite di parsing JSON (voci malformate che vengono scartate invece di far crashare il run) e il builder della mappa AST del repo.

Eseguire il progetto

Per provarlo su una codebase reale, punta la CLI a una directory di sorgenti Python:
uv run venice-security-reviewer scan path/to/your/code
Oppure installalo nel tuo virtualenv con pip install -e . ed esegui venice-security-reviewer scan path/to/your/code. L’output ha più o meno questo aspetto:
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
Il report Markdown mostra ogni catena in cima con la sua narrazione, poi ogni singolo finding sotto con severità, CWE, posizione del file, descrizione e lo snippet verbatim che il modello dichiara di aver letto. Il repo di riferimento include anche quattro target demo bundled, ognuno dei quali mette alla prova una forma diversa di ragionamento che il Chainer deve fare:
  • examples/vulnerable_app — una app Flask multi-file con tre findings “low”, due dei quali si combinano in una catena critica di privilege-escalation cross-file. Verifica se il Chainer è selettivo su cosa combina.
  • examples/url_preview — un URL-fetcher multi-file con un’allowlist difensiva che non si applica per iterazione. Verifica il ragionamento sul flusso di dati cross-file combinato con la topologia di deployment (gli IP link-local sono gateway per le credenziali cloud).
  • examples/csv_query — un filtro CSV single-file con un sandbox escape via eval attraverso __class__.__base__.__subclasses__(). Verifica ragionamento a livello di linguaggio piuttosto che flusso HTTP.
  • examples/webhook_handler — un verificatore HMAC single-file con una vulnerabilità parser-differential JSON. Verifica il ragionamento su più specifiche.
Provali con:
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
Se vedi mai la CLI loggare chainer referenced N unknown finding id(s) or file(s); chains dropped, è il validatore di cross-reference che becca il modello sul fatto mentre inventa una catena. Le catene scartate non finiscono mai nel report; ottieni solo un warning che puoi usare per aggiustare il prompt o campionare run aggiuntivi del Chainer.

Estendere questo esempio

La forma a due agenti generalizza bene. Alcune direzioni che vale la pena esplorare:
  • Altri linguaggi. Lo Scanner è agnostico al linguaggio a livello di prompt; il builder dell’AST è ciò che è specifico per Python. Sostituiscilo con tree-sitter e puoi costruire le stesse forme di vicinato/mappa condensata per TypeScript, Go o Rust.
  • Un terzo agente per le fix. Una volta che hai una catena, chiedere a un agente Patcher di abbozzare una diff unificata che disinneschi uno dei findings costituenti è un piccolo passo. Pydantic-valida la diff contro lo stesso set di file di evidence e ottieni la stessa protezione contro le allucinazioni gratis.
  • Formati di output. render_report è l’unico posto che conosce Markdown. Aggiungi un renderer SARIF e gli stessi findings possono finire in GitHub code scanning. Aggiungi un renderer JSON e puoi convogliare i risultati in un sistema downstream.
  • Caching per hash del file. Le chiamate per-file dello Scanner sono indipendenti e idempotenti. Cachare per (file_hash, prompt_hash, model) significa che riscansionare un repo in cui è cambiato un file rieseguirà lo Scanner solo su quel singolo file.
  • Sampling per il Chainer. Per run ad alto rischio, chiama il Chainer N volte a temperatura leggermente più alta e interseca i risultati. Le catene che il modello trova in modo consistente sono più probabilmente reali; quelle che trova una volta e mai più sono probabilmente rumore.
  • Modelli più potenti. zai-org-glm-5 è il default perché raggiunge un buon equilibrio tra costo e qualità per il ragionamento combinatorio, ma per codebase più difficili passare a un modello Venice più potente (impostato via VENICE_MODEL) può rendere le narrazioni del Chainer notevolmente più strette.

Per concludere

Grazie per aver letto! Speriamo che ti abbia aiutato a capire come strutturare un tool di sicurezza AI di cui ci si possa effettivamente fidare. Il pattern che abbiamo usato qui generalizza oltre la sicurezza: ogni volta che vuoi un LLM che ragioni attraverso file in un modo che deve ancorarsi a evidenza reale, la ricetta è la stessa. Costruisci una mappa strutturale deterministica, dai al modello una slice che entri nel contesto, valida i riferimenti del modello contro la struttura e scarta tutto ciò che non riesce a fondare. Usando Python con l’API di Venice AI, possiamo costruire agenti che combinano il ragionamento LLM con confini di validazione rigidi, e rilasciare qualcosa che dia una risposta utile invece di una che suoni solo sicura.