Zum Hauptinhalt springen
Die meisten statischen Security-Tools finden Bugs isoliert. Sie scannen eine Datei, listen Probleme auf und sind fertig. Das Problem: Die schädlichsten Schwachstellen in modernen Codebasen sind selten ein einzelner Bug. Sie sind eine Kette: ein hardcodierter Signing-Key plus eine fehlende Autorisierungsprüfung plus eine SQL-Injection, die einzeln alle handhabbar wirken. Zusammen sind sie ein Account-Takeover-Pfad. Genau diese Art von übergreifender Argumentation können LLMs gut – wenn man ihnen die richtige Struktur gibt. In diesem Artikel bauen wir einen Zwei-Agenten-Security-Code-Reviewer mit Python und der Venice-API. Am Ende hast du ein CLI, das du auf jede Python-Codebase richten kannst, um einen Markdown-Report mit atomaren Findings und Exploit-Chains zu erzeugen. Interesse an der vollständigen Implementierung? Schau in das GitHub-Repo. Bevor wir loslegen, brauchst du einen Venice-API-Schlüssel. Exportiere ihn als Umgebungsvariable:
export VENICE_API_KEY=<my-key>

Was wir bauen

Der Reviewer ist ein kleines Python-Projekt mit einigen klaren Teilen:
TeilWas es tut
Pydantic-ModelleDefinieren Evidence, Finding und Chain und schaffen eine harte Validierungsgrenze zwischen dem LLM und dem Rest des Programms
Venice-ClientUmschließt das OpenAI-Python-SDK, gerichtet auf Venices OpenAI-kompatiblen Endpoint
AST-Repo-MapLäuft mit Pythons ast-Modul durch den Zielbaum und baut eine deterministische Karte der öffentlichen Symbole und Import-Kanten jedes Moduls
Scanner-AgentLiest eine Python-Datei plus einen pro Datei zugeschnittenen Nachbarschafts-Slice der Repo-Map und liefert atomare Findings mit Datei:Zeile-Evidenz
Chainer-AgentLiest die Vereinigung der Findings plus eine kondensierte vollständige Repo-Map und liefert Exploit-Chains, die zwei oder mehr Findings kombinieren
Referenz-ValidatorVerwirft jede Chain, die eine Finding-ID referenziert, die der Scanner nicht erzeugt hat, oder eine Datei nennt, aus der keines ihrer referenzierten Findings tatsächlich stammt
Markdown-ReportRendert Findings und Chains in einen menschenlesbaren Bericht
CLIVerdrahtet alles mit Typer
Der Flow sieht so aus:
  1. Den Zielordner nach .py-Dateien durchwandern.
  2. Eine deterministische Repo-Map bauen (Imports, öffentliche Symbole, Signaturen).
  3. Für jede Datei dem Scanner ihren Quelltext plus einen pro Datei zugeschnittenen Nachbarschafts-Slice der Map senden und atomare Findings sammeln.
  4. Die Vereinigung der Findings plus die kondensierte Repo-Map dem Chainer senden und Exploit-Chains sammeln.
  5. Jede Chain verwerfen, die eine Finding-ID referenziert, die der Scanner nicht erzeugt hat, oder eine Datei nennt, aus der keines ihrer Findings stammt.
  6. Einen Markdown-Report schreiben.
Zwei Designentscheidungen lohnen sich vor dem Code zu nennen. Erstens: Warum zwei Agenten statt einem? Ein Einzelagent-Scanner, der in einem Prompt alles zu erledigen versucht, muss zwischen Gründlichkeit bei Per-Datei-Bugs und Cleverness bei kombinatorischer Argumentation balancieren. Die Aufteilung erlaubt dem Scanner, hartnäckig und laut zu sein, und dem Chainer, selektiv und ruhig. Ein zusätzlicher LLM-Aufruf, dediziert zum Kombinieren von Findings, schaltet für sehr wenig Extra-Code eine ganze Klasse von Bugs frei. Zweitens: Warum eine Repo-Map? Echte Codebasen verteilen sich über viele Dateien. Ein Bug, der lautet „der Validator läuft, wird aber nicht pro Iteration im Fetcher angewendet, und die Antwort des Fetchers landet im Renderer”, ist für einen reinen Per-Datei-Scanner unsichtbar. Vor jedem LLM-Aufruf laufen wir mit Pythons ast durch den Zielbaum und bauen eine strukturelle Map. Der Scanner sieht eine pro Datei zugeschnittene Nachbarschaft (wer von dieser Datei importiert, was diese Datei importiert, Signaturen dieser externen Symbole). Der Chainer sieht eine kondensierte vollständige Map (jedes Modul, jedes öffentliche Symbol, jede Import-Kante, kein Quelltext). Das ist das kleinste Context-Engineering, das wir gefunden haben, das es dem Chainer erlaubt, Chains zu bauen, deren Datenfluss Modulgrenzen überschreitet, ohne die Token-Kosten zu zahlen, die ganze Codebase in jeden Prompt zu stopfen.

Voraussetzungen

  • Python 3.12+
  • Ein Venice-API-Schlüssel von venice.ai
  • Grundverständnis von Pydantic, Pythons ast-Modul und dem OpenAI-Python-SDK
Die Referenz nutzt uv für Dependency-Management; eine reguläre Virtual Environment funktioniert genauso.

Projekt einrichten

Neues Projekt anlegen und Abhängigkeiten installieren:
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Wer lieber pip nutzt, legt stattdessen eine Virtual Environment an:
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Lege eine .env-Datei für die lokale Entwicklung an:
VENICE_API_KEY=your-venice-api-key-here
# Optionale Overrides
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
Wir legen den Quellcode unter src/venice_security_reviewer/ ab, damit er als Paket importierbar ist, und die Prompts unter prompts/ im Repo-Root, damit sie wie jede andere Quelldatei reviewbar und diff-bar sind:
src/venice_security_reviewer/
  __init__.py
  models.py     # Pydantic-Modelle
  client.py     # Venice-Client-Factory
  repo_map.py   # AST-erstellte Repo-Map
  scanner.py    # Scanner-Agent
  chainer.py    # Chainer-Agent
  report.py     # Jinja2-Markdown-Rendering
  cli.py        # Typer-CLI
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

Venice-Client aufsetzen

Venice ist OpenAI-kompatibel, also können wir das offizielle OpenAI-Python-SDK nutzen und nur die base_url auf Venice zeigen lassen. Die Client-Erstellung in einer Datei zu zentralisieren bedeutet, dass der Rest des Codes nie wissen muss, mit welchem Provider er spricht: Ein Backend-Wechsel betrifft nur dieses Modul. Erstelle src/venice_security_reviewer/client.py:
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
Ein paar Hinweise:
  • Wir nehmen standardmäßig zai-org-glm-5, weil es ein starkes generisches Venice-Modell ist; per Umgebungsvariable VENICE_MODEL kannst du das überschreiben. Für größere oder feinere Codebasen kann ein stärkeres Modell den Chainer in der erzählerischen Qualität spürbar besser machen.
  • build_client gibt Client und Modell-ID zurück, sodass Caller keine Env-Vars selbst lesen müssen und Tests eine Fake-Config ohne Monkey-Patching injizieren können.

Datenmodelle definieren

Der ganze Sinn, hier Pydantic statt rohe Dicts zu nutzen, ist eine harte Validierungsgrenze zwischen LLM und dem Rest des Programms. Liefert das Modell fehlerhaftes JSON oder erfindet eine Finding-ID, schlägt das Parsen laut fehl und wir propagieren die Halluzination nie in den Report. Erstelle src/venice_security_reviewer/models.py:
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
Die Constraints leisten hier echte Arbeit:
  • Finding.id und Chain.id sind auf eine Regex wie F-001, C-001 festgelegt. Wird das Modell kreativ, scheitert die Validierung.
  • Chain.findings verlangt mindestens zwei Einträge: Eine „Chain” mit einem Finding ist nur ein Finding.
  • Chain.severity ist auf high oder critical beschränkt. Eine Kombination, die den Impact nicht über die höchste Einzel-Severity hebt, ist keine berichtenswerte Chain.
  • Evidence erzwingt end_line >= start_line, sodass das Modell keine unsinnigen Zeilenbereiche liefern kann.
Das ist die Form-Validierung. Wir brauchen außerdem Cross-Reference-Validierung: Eine Chain, die eine Finding-ID referenziert, die der Scanner nie erzeugt hat, ist sinnlos. Ergänze in models.py:
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
Das ist die deterministische Leitplanke, die den Chainer ehrlich hält. Er darf nur Findings referenzieren, die der Scanner tatsächlich erzeugt hat, und er darf nur Dateien als beteiligt nennen, aus denen eines dieser Findings stammt. Die verworfenen Chains zurückzugeben statt sie still zu filtern, erlaubt es dem CLI, eine Warnung anzuzeigen, wenn das Modell versucht, etwas zu erfinden.

Die AST-Repo-Map bauen

Die Repo-Map ist das strukturelle Skelett einer Python-Codebase: jede öffentliche Oberfläche eines Moduls, jede Import-Kante und ein Reverse-Index von „Modul M” zu „Modulen, die von M importieren”. Sie wird einmal pro Scan-Lauf mit Pythons ast gebaut, nie per Ausführung – sicher auch bei adversärem Code, weil der Parser nichts aus dem gescannten Baum importiert oder ausführt. Wir konsumieren die Map in zwei Formen. Der Scanner bekommt einen pro Datei zugeschnittenen Nachbarschafts-Slice, damit die Prompts in der Größe begrenzt bleiben. Der Chainer bekommt eine kondensierte vollständige Map, um Chains über Dateien hinweg zu konstruieren. Erstelle src/venice_security_reviewer/repo_map.py und beginne mit den Pydantic-Modellen für die Map:
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
Jetzt der Helper, der durch den Baum läuft und Verzeichnisse überspringt, die wir nicht indizieren sollten:
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
Pro Datei wollen wir drei Dinge aus dem AST: die top-level definierten Symbole, die Import-Kanten und – falls vorhanden – eine explizite __all__-Liste. Funktions-Signaturen und Class-Header werden zu kompakten Strings gerendert, die das LLM direkt lesen kann:
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
Das _SIGNATURE_CHAR_CAP von 200 erhält typische echte Signaturen (inkl. Type-Hints) und verhindert pathologische Fälle wie eine 200-zeilige typisierte Union, die den Prompt sprengen würde. Als Nächstes der Extraktor, der die strukturellen Daten aus einem geparsten Modul herauszieht. Wir behandeln ast.FunctionDef, ast.ClassDef, top-level ast.Assign und ast.AnnAssign für Konstanten und sowohl ast.Import als auch ast.ImportFrom für die Import-Kanten. Relative Imports werden in ihre absolute, punktierte Form aufgelöst, damit der Chainer sie später gegen Modulnamen matchen kann:
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
Die vollständige Extraktionslogik läuft durch tree.body und gibt pro top-level Knoten SymbolDef- und ImportEdge-Einträge aus. Die _extract-Funktion in repo_map.py des Referenz-Repos zeigt die vollständige Implementierung. Heraus kommt eine Liste von ModuleEntry-Objekten, eines pro Datei. Der interessante Teil ist, was wir mit diesen Einträgen tun. Wickeln wir sie in eine RepoMap mit zwei Consumer-Methoden:
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path) ruft der Scanner pro Datei auf. Es liefert ein ModuleNeighborhood-Objekt mit dem Modul selbst, jedem anderen Modul, das von ihm importiert, und jedem in-repo-Symbol, das es anderswo importiert (mit aufgelösten Signaturen). Damit hat der Scanner genug Kontext, um Findings zu flaggen, die nur im Cross-File-Kontext offensichtlich sind, ohne die ganze Codebase in den Prompt zu ziehen. condensed_dict() bekommt der Chainer. Snippets und Signaturen entfallen; nur Pfade, Modulnamen, öffentliche Exports und Import-Kanten bleiben. Das ist die kleinste Darstellung, mit der der Chainer trotzdem über modulübergreifenden Datenfluss räsonieren kann. Schließlich der Entry Point, der das Ganze baut:
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
Dateien, die wir nicht lesen können oder die nicht parsen, werden geloggt und übersprungen. Wir geben eine partielle Map zurück, statt den ganzen Lauf scheitern zu lassen; im schlimmsten Fall sieht ein Scanner-Aufruf für eine Datei keine Nachbarschaft, was immer noch ein funktionierender Scan ist.

Den Scanner-Agenten schreiben

Der Scanner läuft einen Zielpfad ab, sammelt Python-Quelldateien und bittet Venice, atomare Schwachstellen Datei für Datei zu identifizieren. Per-Datei-Scanning hält den Prompt klein und macht Fehler lokal: Eine kaputte Datei kippt nicht den ganzen Lauf. Wir halten den Prompt in einer separaten Datei, damit er wie jede andere Quelldatei reviewbar und diff-bar ist. Erstelle prompts/scanner.md:
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
Der vollständige Prompt im Referenz-Repo enthält außerdem einen Abschnitt „What to look for” mit gängigen Schwachstellenklassen (hardcodierte Secrets, SQL-Injection, Command-Injection, SSRF, unsichere Deserialisierung usw.) und einen Abschnitt „How to use the neighborhood”, der erklärt, wie das Modell den Cross-File-Kontext konsumieren soll. Ein paar Prompt-Design-Hinweise:
  • Wir weisen das Modell an, ausschließlich JSON ohne Prosa oder Fences zu liefern. Das OpenAI-SDK unterstützt response_format={"type": "json_object"}, das das API-seitig erzwingt – die Verstärkung im Prompt reduziert aber Edge-Cases.
  • Wir verbieten dem Scanner ausdrücklich, Cross-File-Chains zu produzieren. Chains sind die Aufgabe des Chainers; beides verlangen würde die Verantwortung verwischen.
  • Das Snippet muss verbatim kopiert sein. So kann der Report die genauen Bytes zitieren, die das Modell gesehen haben will, und ein Reviewer kann ein Finding stichprobenartig prüfen.
Jetzt der Agent-Code. Erstelle src/venice_security_reviewer/scanner.py und beginne mit File-Walker und Prompt-Loader:
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES ist eine Sicherheitsobergrenze. Jenseits ~200 KB überspringen wir die Datei, statt einen riesigen, wahrscheinlich teuren und qualitativ schwachen Prompt zu senden. Als Nächstes der Prompt-Builder. Das Template nutzt {filename}, {source} und {neighborhood} als Platzhalter; wir verwenden str.replace statt .format(), weil das Template JSON-Beispiele mit literalen geschweiften Klammern enthält:
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
Jetzt der Parser. Wir deserialisieren das JSON, validieren jedes Finding über Pydantic und verwerfen einzelne fehlerhafte Findings, statt die Datei zu scheitern. Ein schlechtes Finding soll nicht die guten kosten:
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
Der Scanner vergibt IDs wie F-001 pro Datei, aber der Chainer muss Findings über das ganze Repo hinweg referenzieren. Wir vergeben die IDs gegen einen monotonen Zähler neu, damit sie global eindeutig sind:
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
Der Single-File-Scan kombiniert all das. Wir lesen die Datei, bauen den Prompt, schicken ihn mit response_format={"type": "json_object"} und niedriger Temperatur an Venice und parsen das Ergebnis:
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
Zwei Details lohnen sich:
  • Wir setzen den Evidence-Dateipfad nach dem Parsen relativ zu repo_root, weil das Modell den übergebenen Dateinamen zurückgibt, wir aber im ganzen Report eine kanonische Form wollen.
  • temperature=0.1 ist bewusst niedrig. Der Scanner soll konservativ und über Läufe konsistent sein; Kreativität ist die Aufgabe des Chainers.
Zuletzt der Orchestrator, der jede passende Datei unter dem Root scannt:
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
Die Repo-Map wird vom Caller einmal gebaut und für jede Datei wiederverwendet, sodass der Scanner eine konsistente globale Struktur sieht, auch wenn einzelne Dateien beim Parsen scheitern oder übersprungen werden.

Den Chainer-Agenten schreiben

Der Chainer nimmt die Vereinigung der Scanner-Findings plus die kondensierte Repo-Map und fragt Venice, ob eine Teilmenge der Findings zu einer realen Exploit-Chain kombiniert werden kann. Zwei deterministische Leitplanken stehen zwischen LLM und Report:
  1. Jede Chain darf nur Finding-IDs referenzieren, die der Scanner erzeugt hat.
  2. Jede Chain darf nur Dateien benennen, die mindestens ein referenziertes Finding tatsächlich berührt.
Chains, die eine Regel verletzen, werden beim Parsen verworfen. Das stoppt das Modell daran, Chains „nur sicherheitshalber” zu halluzinieren oder zu behaupten, eine Chain umfasse Dateien, für die es keine Evidenz hat. Der Chainer-Prompt liegt unter prompts/chainer.md. Der Kern davon:
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
Der vollständige Prompt im Referenz-Repo erklärt zudem, wie man die Repo-Map liest, wie man entscheidet, was in files_involved gehört, und vor allem, wann nicht zu chainen. Dem Modell zu sagen „it is correct and expected for many codebases to have findings that do not chain” hält es davon ab, Chains zu erfinden, um produktiv zu wirken. Jetzt der Agent-Code. Erstelle src/venice_security_reviewer/chainer.py:
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000 ist eine weiche Obergrenze für den als JSON gerenderten Repo-Map-Block im Chainer-Prompt. Bei rund 4 Zeichen pro Token entspricht das ~2000 Tokens und passt komfortabel in jedes Venice-Modellkontext, selbst mit Findings und Erzähl-Budget obendrauf. Wir serialisieren Findings in einen kompakten JSON-Block. Wir streichen das snippet aus Evidence bewusst: Der Chainer braucht keine rohen Bytes, um zu entscheiden, ob zwei Findings kombinieren, und ihre Aufnahme verdoppelt grob den Token-Kostenpunkt auf realen Codebasen:
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
Bei größeren Codebasen kann die vollständige kondensierte Repo-Map unser Zeichen-Budget sprengen. Dann beschneiden wir auf finding-haltige Module plus deren direkte Nachbarn. Das bewahrt genug Struktur, damit der Chainer über Chains mit Evidenz räsonieren kann, und verwirft den Rest:
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
Die Prune-Strategie ist bewusst einfach: Module behalten, in denen unsere Findings liegen, und deren direkte Nachbarn im Import-Graph. Alles weiter weg hat keine plausible Rolle in einer Chain, für die wir aktuell Evidenz haben, und kann verworfen werden, ohne Argumentationskraft zu verlieren. Wir versehen das Payload auch mit _pruned-, _kept- und _total-Markern, sodass der Chainer-Prompt das Modell warnen kann, wenn die Map beschnitten wurde. Das Response-Parsen ist wie beim Scanner: deserialisieren, jede Chain über Pydantic validieren, fehlerhafte verwerfen:
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
Dann der Agent selbst:
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
Ein paar Hinweise:
  • Wir brechen vor dem Modell-Aufruf ab, wenn es weniger als zwei Findings gibt. Eine Chain aus einem einzelnen Finding gibt es nicht, und so verbrennen wir keine Tokens auf garantiert leerem Ergebnis.
  • temperature=0.2 ist leicht höher als die 0.1 des Scanners. Der Chainer profitiert von etwas mehr Kreativität, um nicht-offensichtliche Kombinationen zu finden, soll aber in den gegebenen Findings und der Map verankert bleiben.
  • Nach dem Parsen läuft validate_chain_references durch – die deterministische Cross-Reference-Prüfung von oben. Was übrig bleibt, ist sicher zum Rendern; was nicht, wird geloggt, damit der Operator weiß, dass das Modell etwas erfinden wollte.
Diese Cross-Reference-Prüfung ist das wichtigste Stück des ganzen Projekts. Sie ist die Grenze zwischen „nützliches Security-Tool” und „gelegentlich selbstbewusst falscher AI-Report”. Mit ihr im Spiel landet selbst halluzinierter Inhalt nie im Report.

Den Markdown-Report rendern

Das Rendern von der Agent-Logik zu trennen heißt: Dieselben Finding- und Chain-Objekte können später in andere Formate (JSON, SARIF, HTML) fließen, ohne Scanner oder Chainer anzufassen. Wir nutzen Jinja2 mit einer kleinen Template-Datei. Erstelle src/venice_security_reviewer/templates/report.md.j2:
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
Dann der Renderer in src/venice_security_reviewer/report.py:
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
Autoescape bleibt für das Markdown-Template aus (Markdown ist kein HTML), per Extension aber für künftige .html-Templates aktiviert.

Das CLI verdrahten

Das CLI ist der Orchestrator: Repo-Map bauen, scannen, chainen, rendern. Wir nutzen Typer für Argumente und Rich für eine schöne Summary-Tabelle. Erstelle src/venice_security_reviewer/cli.py:
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
Den Skript-Entry-Point in pyproject.toml ergänzen:
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
Damit ist die gesamte Pipeline verdrahtet.

Die Guardrails testen

Wir haben uns durchgehend auf eine Idee gestützt: Die deterministischen Leitplanken trennen ein nützliches Security-Tool von einem selbstbewusst-falschen. Diese Behauptung ist nur wertvoll, wenn wir nachweisen können, dass die Leitplanken halten – deshalb rufen die wertvollsten Tests dieses Projekts gar nicht Venice auf. Sie zurren die Pydantic-Grenze und das Prompt-Assembly-Plumbing fest, laufen offline in Millisekunden, ohne API-Key und ohne Token-Kosten. Zuerst Dev-Abhängigkeiten:
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
Als Erstes lohnt es, die Modell-Grenze selbst zu testen. Diese Tests behaupten, dass fehlerhafte Findings und Chains zur Konstruktionszeit abgelehnt werden, bevor sie in einen Report gelangen können. Erstelle tests/test_models.py:
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
Jeder dieser Tests spiegelt ein Constraint, das wir den Modellen gegeben haben: ein invertierter Zeilenbereich, eine ID, die nicht zum F-###-Muster passt, und eine „Chain” aus einem einzelnen Finding. Falls einer jemals aufhört zu raisen, ist eine ganze Halluzinationsklasse stillschweigend wieder möglich. Der wichtigste Test deckt den Cross-Reference-Validator ab, weil das die Funktion ist, die erfundene Chains wirklich verwirft:
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
F-999 wurde vom Scanner nie erzeugt, daher landet die Chain, die das referenziert, in dropped und erreicht den Report nie. Der zugehörige Test im Referenz-Repo, test_validate_chain_references_drops_unknown_files, macht dasselbe für eine Chain, die eine Datei behauptet, aus der keines ihrer Findings stammt. Als Zweites lohnt es, das Plumbing zu testen, das den Chainer füttert. Es ist leicht, das Prompt-Assembly zu refaktorieren und still aufzuhören, Cross-File-Kontext mitzugeben – woraufhin der Chainer weiterläuft, aber leise schlechter wird. Dieser Test baut ein Zwei-Modul-Fixture, rendert den Prompt und behauptet, dass die Cross-File-Information tatsächlich vorhanden ist – ohne Venice-Round-Trip. Erstelle tests/test_cross_file_chain.py:
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
Geht dieser Test durch, bekommt der Chainer einen Prompt mit beiden Findings, beiden Dateipfaden und der Import-Kante dazwischen. Ob das Modell diese Information gut nutzt, ist eine separate, out-of-band-Auswertung; dieser Test schützt nur das Plumbing, das die Information in den Prompt bringt. Die ganze Suite plus Linter und Type-Checker:
uv run pytest          # Offline-Tests, keine Live-Venice-Aufrufe
uv run ruff check .
uv run mypy src/
Da keiner dieser Tests Netzwerk berührt, kannst du sie bei jedem Commit und in CI laufen lassen, ohne Tokens zu verbrennen oder einen Venice-Key zu brauchen. Das Referenz-Repo enthält zudem tests/test_scanner_parse.py, tests/test_chainer_parse.py und tests/test_repo_map.py, die JSON-Parsing-Edge-Cases (fehlerhafte Einträge verwerfen statt zu crashen) und den AST-Repo-Map-Builder abdecken.

Das Projekt ausführen

Auf einer echten Codebase: das CLI auf ein Verzeichnis mit Python-Quellcode richten:
uv run venice-security-reviewer scan path/to/your/code
Oder mit pip install -e . ins Virtualenv installieren und venice-security-reviewer scan path/to/your/code ausführen. Der Output sieht ungefähr so aus:
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
Der Markdown-Report zeigt oben jede Chain mit ihrem Narrativ, darunter alle Einzel-Findings mit Severity, CWE, Dateiposition, Beschreibung und dem verbatim Snippet, das das Modell gelesen haben will. Das Referenz-Repo liefert vier mitgelieferte Demo-Ziele, die je eine andere Argumentationsform testen:
  • examples/vulnerable_app — eine Multi-File-Flask-App mit drei „low”-Findings, von denen zwei zu einer kritischen Privilege-Escalation-Chain über Dateien hinweg kombinieren. Testet, ob der Chainer selektiv kombiniert.
  • examples/url_preview — ein Multi-File-URL-Fetcher mit einer defensiven Allowlist, die nicht pro Iteration angewendet wird. Testet Cross-File-Datenfluss kombiniert mit Deployment-Topologie (Link-Local-IPs sind Cloud-Credential-Gateways).
  • examples/csv_query — ein Single-File-CSV-Filter mit eval-Sandbox-Escape via __class__.__base__.__subclasses__(). Testet Sprach-Level-Argumentation statt HTTP-Flow.
  • examples/webhook_handler — ein Single-File-HMAC-Verifier mit einer JSON-Parser-Differential-Schwachstelle. Testet Argumentation über mehrere Spezifikationen hinweg.
Probier sie aus:
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
Wenn das CLI jemals chainer referenced N unknown finding id(s) or file(s); chains dropped loggt, hat der Cross-Reference-Validator das Modell beim Erfinden einer Chain erwischt. Die verworfenen Chains schaffen es nie in den Report; du bekommst nur eine Warnung, mit der du den Prompt anpassen oder weitere Chainer-Läufe sampeln kannst.

Dieses Beispiel erweitern

Die Zwei-Agenten-Form lässt sich gut verallgemeinern. Ein paar lohnenswerte Richtungen:
  • Mehr Sprachen. Der Scanner ist auf Prompt-Ebene sprachagnostisch; nur der AST-Builder ist Python-spezifisch. Tausche ihn gegen tree-sitter aus und du bekommst dieselbe Nachbarschafts-/Kondensiert-Map-Form für TypeScript, Go oder Rust.
  • Ein dritter Agent für Fixes. Ist eine Chain einmal da, kann ein Patcher-Agent einen Unified Diff vorschlagen, der eines der konstituierenden Findings entschärft. Den Diff per Pydantic gegen denselben Evidence-File-Satz validieren und du bekommst denselben Halluzinationsschutz gratis.
  • Output-Formate. Nur render_report kennt Markdown. Ergänze einen SARIF-Renderer und dieselben Findings fließen in GitHub Code Scanning. Ein JSON-Renderer pipest Ergebnisse in ein nachgelagertes System.
  • Caching per Datei-Hash. Die Per-Datei-Aufrufe des Scanners sind unabhängig und idempotent. Caching nach (file_hash, prompt_hash, model) bedeutet, dass ein Rescan eines Repos, in dem nur eine Datei geändert wurde, nur den Scanner für diese eine Datei erneut ausführt.
  • Sampling für den Chainer. Bei kritischen Läufen den Chainer N-mal bei leicht höherer Temperatur aufrufen und die Ergebnisse schneiden. Chains, die das Modell konsistent findet, sind eher real; Chains, die einmal auftauchen und nie wieder, sind eher Rauschen.
  • Stärkere Modelle. zai-org-glm-5 ist der Default, weil es ein gutes Preis-/Qualitätsverhältnis bei kombinatorischer Argumentation bietet. Für härtere Codebasen kann ein stärkeres Venice-Modell (über VENICE_MODEL) die Narrative des Chainers merklich verdichten.

Abschluss

Danke fürs Lesen! Hoffentlich hilft das, ein KI-Security-Tool zu strukturieren, das tatsächlich vertrauenswürdig ist. Das Muster verallgemeinert auch über Security hinaus: Immer wenn ein LLM dateiübergreifend argumentieren soll und sich dabei auf reale Evidenz erden muss, ist das Rezept dasselbe. Eine deterministische strukturelle Map bauen, dem Modell einen kontextfähigen Slice davon geben, die Modell-Referenzen gegen die Struktur zurückvalidieren und alles verwerfen, was es nicht erden kann. Mit Python und der Venice-API können wir Agenten bauen, die LLM-Argumentation mit harten Validierungsgrenzen kombinieren, und liefern etwas, das eine nützliche Antwort gibt, statt nur selbstbewusst zu klingen.