Passer au contenu principal
La plupart des outils de sécurité statiques détectent les bugs de manière isolée. Ils analysent un fichier, listent les problèmes et passent au suivant. Le problème, c’est que les vulnérabilités les plus dommageables dans les bases de code modernes sont rarement un bug unique. Ce sont une chaîne : une clé de signature codée en dur plus une vérification d’autorisation manquante plus une injection SQL qui, prises isolément, semblent toutes gérables. Ensemble, elles forment un chemin de prise de contrôle de compte. C’est exactement le type de raisonnement transversal pour lequel les LLM sont efficaces, à condition de leur fournir la bonne structure. Dans cet article, nous allons construire un réviseur de sécurité de code à deux agents en utilisant Python et l’API Venice AI. À la fin, vous disposerez d’une CLI que vous pourrez pointer sur n’importe quelle base de code Python afin de produire un rapport Markdown avec des résultats atomiques et des chaînes d’exploitation. Intéressé par l’implémentation complète du code ? Consultez le dépôt GitHub. Avant de continuer, vous aurez besoin d’une clé API Venice. Exportez-la en tant que variable d’environnement :
export VENICE_API_KEY=<my-key>

Ce que nous allons construire

Le réviseur est un petit projet Python comportant quelques parties bien distinctes :
PartieCe qu’elle fait
Modèles PydanticDéfinissent Evidence, Finding et Chain, et nous donnent une frontière de validation stricte entre le LLM et le reste du programme
Client VeniceEncapsule le SDK Python d’OpenAI pointé vers le point de terminaison compatible OpenAI de Venice
Carte de dépôt ASTParcourt l’arborescence cible avec le module ast de Python et construit une carte déterministe des symboles publics et des arêtes d’import de chaque module
Agent ScannerLit un fichier Python à la fois plus une tranche de voisinage par fichier de la carte du dépôt, et émet des résultats de vulnérabilités atomiques avec des preuves file:line
Agent ChainerLit l’union des résultats plus une carte complète condensée du dépôt, et émet des chaînes d’exploitation qui combinent deux ou plusieurs résultats
Validateur de référencesSupprime toute chaîne qui référence un ID de résultat que le Scanner n’a pas produit, ou qui nomme un fichier dont aucun des résultats référencés ne provient réellement
Rapport MarkdownRestitue les résultats et les chaînes dans un rapport lisible par un humain
CLIConnecte tout avec Typer
Le flux ressemble à ceci :
  1. Parcourir le répertoire cible à la recherche de fichiers .py.
  2. Construire une carte de dépôt déterministe (imports, symboles publics, signatures).
  3. Pour chaque fichier, envoyer au Scanner sa source plus une tranche de voisinage par fichier de la carte et collecter les résultats atomiques.
  4. Envoyer l’union des résultats plus la carte de dépôt condensée au Chainer et collecter les chaînes d’exploitation.
  5. Supprimer toute chaîne qui référence un ID de résultat que le Scanner n’a pas produit, ou qui nomme un fichier dont aucun des résultats référencés ne provient réellement.
  6. Écrire un rapport Markdown.
Deux décisions de conception méritent d’être soulignées avant de commencer à écrire du code. La première est pourquoi deux agents au lieu d’un. Un scanner à agent unique qui tente de tout faire en un seul prompt doit trouver un équilibre entre être exhaustif sur les bugs par fichier et être astucieux sur le raisonnement combinatoire. Diviser le travail signifie que le Scanner peut être impitoyable et bruyant, et que le Chainer peut être sélectif et discret. Ajouter un appel LLM supplémentaire dédié à la combinaison des résultats déverrouille toute une catégorie de bugs pour très peu de code supplémentaire. La seconde est pourquoi une carte de dépôt. Les bases de code réelles s’étendent sur de nombreux fichiers. Un bug du type « le validateur s’exécute mais ne s’applique pas par itération dans le fetcher, et la réponse du fetcher se retrouve dans le rendu » est invisible pour un scanner par fichier. Avant tout appel LLM, nous parcourons l’arborescence cible avec ast de Python et construisons une carte structurelle. Le Scanner voit un voisinage par fichier (qui importe depuis ce fichier, ce que ce fichier importe, les signatures de ces symboles externes). Le Chainer voit une carte complète condensée (chaque module, chaque symbole public, chaque arête d’import, sans source). C’est la plus petite quantité d’ingénierie de contexte que nous ayons trouvée qui permet au Chainer de construire des chaînes dont le flux de données traverse les frontières de modules, sans payer le coût en tokens consistant à insérer la totalité de la base de code dans chaque prompt.

Prérequis

  • Python 3.12+
  • Une clé API Venice depuis venice.ai
  • Une familiarité de base avec Pydantic, le module ast de Python et le SDK Python d’OpenAI
Le dépôt de référence utilise uv pour la gestion des dépendances, mais un environnement virtuel classique fonctionne tout aussi bien.

Mise en place du projet

Créez un nouveau projet et installez les dépendances :
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Si vous préférez pip, créez plutôt un environnement virtuel :
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Créez un fichier .env pour le développement local :
VENICE_API_KEY=your-venice-api-key-here
# Surcharges facultatives
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
Nous allons disposer le code source sous src/venice_security_reviewer/ pour qu’il puisse être importé comme un package, avec les prompts sous prompts/ à la racine du dépôt afin qu’ils puissent être révisés et comparés comme tout autre artefact source :
src/venice_security_reviewer/
  __init__.py
  models.py     # Modèles Pydantic
  client.py     # Fabrique de client Venice
  repo_map.py   # Carte de dépôt construite par AST
  scanner.py    # Agent Scanner
  chainer.py    # Agent Chainer
  report.py     # Rendu Markdown via Jinja2
  cli.py        # CLI Typer
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

Mise en place du client Venice

Venice est compatible OpenAI, nous pouvons donc utiliser le SDK Python officiel d’OpenAI et pointer son base_url vers Venice. Centraliser la construction du client dans un seul fichier signifie que le reste du code n’a jamais à savoir avec quel fournisseur il communique : changer de backend ne toucherait que ce module. Créez src/venice_security_reviewer/client.py :
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
Quelques points à noter :
  • Nous utilisons par défaut zai-org-glm-5 parce que c’est un modèle Venice polyvalent et performant, mais vous pouvez le remplacer via la variable d’environnement VENICE_MODEL. Pour des bases de code plus volumineuses ou plus nuancées, choisir un modèle plus puissant peut rendre le Chainer nettement meilleur en qualité narrative.
  • build_client retourne le client et l’ID du modèle, afin que les appelants n’aient pas à lire les variables d’environnement eux-mêmes et que les tests puissent injecter une fausse configuration sans monkeypatching.

Définition des modèles de données

Tout l’intérêt d’utiliser Pydantic ici, plutôt que de faire circuler des dicts bruts, est que nous obtenons une frontière de validation stricte entre le LLM et le reste du programme. Si le modèle renvoie du JSON malformé ou invente un ID de résultat qui n’existe pas, le parsing échoue bruyamment et nous ne propageons jamais l’hallucination dans le rapport. Créez src/venice_security_reviewer/models.py :
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
Les contraintes font ici un vrai travail :
  • Finding.id et Chain.id sont contraints à une regex du type F-001, C-001. Si le modèle fait preuve de créativité sur le format, la validation échoue.
  • Chain.findings exige au moins deux entrées : une « chaîne » d’un seul résultat n’est qu’un résultat.
  • Chain.severity est restreint à high ou critical. Une combinaison de résultats qui n’élève pas l’impact au-dessus de la sévérité individuelle la plus élevée n’est pas une chaîne digne d’être rapportée.
  • Evidence impose que end_line >= start_line afin que le modèle ne puisse pas retourner des plages de lignes absurdes.
C’est la validation de forme. Nous avons également besoin d’une validation de références croisées : une chaîne qui référence un ID de résultat que le Scanner n’a jamais produit n’a aucun sens. Ajoutez cette fonction à models.py :
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
C’est le garde-fou déterministe qui maintient la sincérité du Chainer. Il ne peut référencer que les résultats que le Scanner a réellement produits, et il ne peut prétendre que des fichiers impliqués dans la chaîne d’où provient effectivement l’un de ces résultats. Renvoyer les chaînes supprimées plutôt que de les filtrer silencieusement permet à la CLI d’émettre un avertissement quand le modèle essaie d’inventer quelque chose.

Construction de la carte de dépôt AST

La carte de dépôt est le squelette structurel d’une base de code Python : la surface publique de chaque module, chaque arête d’import et un index inverse de « module M » vers « modules qui importent depuis M ». Elle est construite une fois par exécution de scan avec ast de Python, jamais par exécution, donc il est sûr de l’exécuter sur du code adversarial : le parseur n’importe ni n’invoque rien depuis l’arbre analysé. Nous consommerons la carte sous deux formes. Le Scanner reçoit une tranche de voisinage par fichier afin que ses prompts restent de taille limitée. Le Chainer reçoit une carte complète condensée afin qu’il puisse construire des chaînes à travers les fichiers. Créez src/venice_security_reviewer/repo_map.py et commencez par les modèles Pydantic qui décrivent la carte :
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
Maintenant, l’aide qui parcourt l’arborescence et saute les répertoires que nous ne devrions pas indexer :
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
Pour chaque fichier, nous voulons trois choses de l’AST : les symboles de niveau supérieur qu’il définit, les arêtes d’import et une liste __all__ explicite si elle est présente. Les signatures de fonctions et les en-têtes de classes sont rendus sous forme de chaînes compactes que le LLM peut lire directement :
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
Le _SIGNATURE_CHAR_CAP de 200 préserve les signatures réelles typiques (y compris les hints de type) tout en empêchant les cas pathologiques comme une union typée de 200 lignes qui ferait exploser le prompt. Ensuite, l’extracteur qui tire les données structurelles d’un module parsé. Nous gérons ast.FunctionDef, ast.ClassDef, les ast.Assign et ast.AnnAssign de niveau supérieur pour les constantes, et à la fois ast.Import et ast.ImportFrom pour les arêtes d’import. Les imports relatifs sont résolus dans leur forme pointée absolue afin que le Chainer puisse les apparier ultérieurement aux noms de modules :
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
La logique d’extraction complète parcourt tree.body et émet des entrées SymbolDef et ImportEdge pour chaque nœud de niveau supérieur. La fonction _extract du dépôt de référence dans repo_map.py couvre l’implémentation complète. La forme qui en ressort est une liste d’objets ModuleEntry, un par fichier. La partie intéressante est ce que nous faisons avec ces entrées. Encapsulez-les dans une RepoMap avec deux méthodes orientées consommateur :
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path) est ce que le Scanner appelle pour chaque fichier. Cela retourne un objet ModuleNeighborhood contenant le module lui-même, tout autre module qui importe depuis lui, et chaque symbole intra-dépôt qu’il importe d’ailleurs (avec leurs signatures résolues). Cela donne au Scanner suffisamment de contexte pour signaler des résultats qui ne sont évidents que dans un contexte transversal aux fichiers, sans traîner toute la base de code dans le prompt. condensed_dict() est ce que le Chainer reçoit. Les extraits et les signatures sont supprimés ; seuls les chemins, les noms de modules, les exports publics et les arêtes d’import demeurent. C’est la plus petite représentation qui permette encore au Chainer de raisonner sur le flux de données entre modules. Enfin, le point d’entrée qui construit le tout :
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
Les fichiers que nous ne pouvons pas lire ou qui ne parviennent pas à être parsés sont journalisés et sautés. Nous retournons une carte partielle plutôt que de faire échouer toute l’exécution ; au pire, un appel au Scanner ne verra aucun voisinage pour un fichier, ce qui reste un scan fonctionnel.

Écriture de l’agent Scanner

Le Scanner parcourt un chemin cible, récupère les fichiers source Python et demande à Venice d’identifier les vulnérabilités atomiques un fichier à la fois. L’analyse par fichier maintient le prompt à une taille réduite et isole les défaillances : un mauvais fichier ne fait pas tomber toute l’exécution. Nous garderons le prompt lui-même dans un fichier séparé afin qu’il puisse être révisé et comparé comme tout autre artefact source. Créez prompts/scanner.md :
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
Le prompt complet dans le dépôt de référence contient également une section « What to look for » listant les classes courantes de vulnérabilités (secrets codés en dur, injection SQL, injection de commandes, SSRF, désérialisation non sécurisée, etc.) et une section « How to use the neighborhood » expliquant comment le modèle doit consommer le contexte transversal aux fichiers. Quelques notes sur la conception du prompt :
  • Nous indiquons au modèle d’émettre uniquement du JSON, sans prose ni clôtures. Le SDK d’OpenAI prend en charge un paramètre response_format={"type": "json_object"} qui impose cela côté API, mais le renforcer dans le prompt réduit les cas limites.
  • Nous interdisons explicitement au Scanner de produire des chaînes transversales aux fichiers. Les chaînes sont la mission du Chainer, et demander au Scanner de faire les deux brouille la responsabilité.
  • Nous exigeons que l’extrait soit copié textuellement. Cela signifie que le rapport peut citer les octets exacts que le modèle prétend avoir vus, et qu’un réviseur peut vérifier ponctuellement un résultat en comparant l’extrait à la source.
Maintenant, le code de l’agent. Créez src/venice_security_reviewer/scanner.py et commencez par le parcoureur de fichiers et le chargeur de prompt :
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES est un plafond de sécurité. Au-delà d’environ 200 Ko, nous sautons plutôt que d’envoyer un prompt énorme qui sera probablement à la fois coûteux et de faible qualité. Le morceau suivant est le constructeur de prompt. Le modèle utilise {filename}, {source} et {neighborhood} comme espaces réservés ; nous utilisons str.replace plutôt que .format() parce que le modèle contient des exemples JSON avec des accolades littérales :
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
Maintenant le parseur. Nous désérialisons le JSON, validons chaque résultat via Pydantic, et écartons les résultats malformés individuels plutôt que de faire échouer tout le fichier. Un mauvais résultat ne devrait pas nous faire perdre les bons :
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
Le Scanner émet des IDs comme F-001 par fichier, mais le Chainer a besoin de référencer les résultats à travers tout le dépôt. Nous réattribuons les IDs par rapport à un compteur monotone afin qu’ils soient globalement uniques :
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
L’appel de scan d’un seul fichier combine tout cela. Nous lisons le fichier, construisons le prompt, l’envoyons à Venice avec response_format={"type": "json_object"} et une température basse, et parsons le résultat :
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
Deux détails méritent d’être soulignés :
  • Nous corrigeons le chemin du fichier de preuve pour qu’il soit relatif à repo_root après le parsing, puisque le modèle renvoie en écho le nom de fichier que nous lui avons donné, mais nous voulons une forme canonique unique dans tout le rapport.
  • temperature=0.1 est intentionnellement basse. Nous voulons que le Scanner soit conservateur et cohérent entre les exécutions ; la créativité est la mission du Chainer.
Enfin, l’orchestrateur qui scanne chaque fichier éligible sous la racine :
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
La carte du dépôt est construite une seule fois par l’appelant et réutilisée pour chaque fichier, de sorte que le Scanner voit une structure globale cohérente même lorsque des fichiers individuels échouent à être parsés ou sont sautés.

Écriture de l’agent Chainer

Le Chainer prend l’union des résultats du Scanner plus la carte de dépôt condensée et demande à Venice si l’un des sous-ensembles de résultats se combine en une véritable chaîne d’exploitation. Deux garde-fous déterministes se trouvent entre le LLM et le rapport :
  1. Chaque chaîne doit référencer uniquement des IDs de résultats que le Scanner a produits.
  2. Chaque chaîne doit revendiquer uniquement les fichiers que la preuve d’au moins un résultat référencé touche.
Les chaînes qui violent l’une ou l’autre règle sont écartées au moment du parsing. Cela empêche le modèle d’halluciner des chaînes « au cas où » et de prétendre qu’une chaîne s’étend sur des fichiers pour lesquels il n’a aucune preuve. Le prompt du Chainer se trouve dans prompts/chainer.md. Son cœur ressemble à ceci :
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
Le prompt complet dans le dépôt de référence explique également comment lire la carte du dépôt, comment décider ce qui figure dans files_involved, et surtout, quand ne pas enchaîner. Dire au modèle « il est correct et attendu pour de nombreuses bases de code d’avoir des résultats qui ne s’enchaînent pas » est ce qui l’empêche d’inventer des chaînes pour paraître productif. Maintenant le code de l’agent. Créez src/venice_security_reviewer/chainer.py :
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000 est un plafond souple pour le bloc de carte de dépôt rendu en JSON dans le prompt du Chainer. À environ 4 caractères par token, cela représente environ 2 000 tokens, ce qui tient confortablement dans la fenêtre de contexte de n’importe quel modèle Venice, même avec les résultats et le budget narratif en sus. Nous sérialisons les résultats dans un bloc JSON compact. Notez que nous supprimons délibérément le snippet des preuves ici : le Chainer n’a pas besoin des octets bruts pour décider si deux résultats se combinent, et les inclure double approximativement le coût en tokens sur de vraies bases de code :
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
Pour les bases de code plus volumineuses, la carte de dépôt condensée complète peut dépasser notre budget de caractères. Lorsque cela se produit, nous élaguons pour ne conserver que les modules porteurs de résultats plus leurs voisins directs. Cela préserve suffisamment de structure pour que le Chainer puisse raisonner sur les chaînes pour lesquelles nous avons des preuves, et écarte le reste :
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
La stratégie d’élagage est intentionnellement simple : on conserve les modules dans lesquels vivent nos résultats, et on conserve leurs voisins directs dans le graphe d’imports. Tout ce qui est plus éloigné n’a aucun rôle plausible dans une chaîne pour laquelle nous avons actuellement des preuves, et peut donc être écarté sans perdre de puissance de raisonnement. Nous annotons aussi le payload avec les marqueurs _pruned, _kept et _total, afin que le prompt du Chainer puisse avertir le modèle quand la carte a été réduite. Le parsing de la réponse a la même forme que celui du Scanner : désérialisation, validation de chaque chaîne via Pydantic, suppression des entrées malformées :
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
Puis l’agent lui-même :
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
Quelques points à souligner :
  • Nous abandonnons avant d’appeler le modèle lorsqu’il y a moins de deux résultats. On ne peut pas enchaîner un seul résultat, et éviter l’appel signifie que nous ne brûlons pas de tokens sur un résultat garanti vide.
  • temperature=0.2 est légèrement plus élevée que les 0.1 du Scanner. Le Chainer bénéficie d’un peu plus de créativité pour repérer des combinaisons non évidentes, mais nous voulons toujours qu’il soit ancré dans les résultats et la carte qui lui ont été donnés.
  • Après le parsing, validate_chain_references exécute la vérification déterministe des références croisées que nous avons écrite plus tôt. Tout ce qui survit peut être rendu en toute sécurité ; tout ce qui ne survit pas est journalisé afin que l’opérateur sache que le modèle a essayé d’inventer quelque chose.
Cette vérification des références croisées est la pièce la plus importante de tout le projet. C’est la frontière entre « outil de sécurité utile » et « rapport d’IA parfois confiant à tort ». Avec cette vérification en place, même si le modèle hallucine, la mauvaise chaîne n’atteint jamais le rapport.

Rendu du rapport Markdown

Garder le rendu séparé de la logique de l’agent signifie que les mêmes objets Finding et Chain peuvent ensuite être alimentés dans d’autres formats (JSON, SARIF, HTML) sans toucher au Scanner ou au Chainer. Nous utiliserons Jinja2 avec un petit fichier de modèle. Créez src/venice_security_reviewer/templates/report.md.j2 :
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
Puis le renderer dans src/venice_security_reviewer/report.py :
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
L’autoescape reste désactivé pour le modèle Markdown (Markdown n’est pas du HTML), mais nous le laissons activé pour tous les futurs modèles .html par extension.

Câblage de la CLI

La CLI est l’orchestrateur : construire la carte du dépôt, scanner, enchaîner, restituer. Nous utiliserons Typer pour gérer le parsing des arguments et Rich pour afficher un joli tableau de résumé. Créez src/venice_security_reviewer/cli.py :
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
Ajoutez le point d’entrée du script à pyproject.toml :
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
C’est l’ensemble du pipeline câblé.

Tester les garde-fous

Nous avons largement insisté sur une idée tout au long de cette construction : les garde-fous déterministes sont ce qui distingue un outil de sécurité utile d’un outil confiant à tort. Cette affirmation ne vaut la peine d’être faite que si nous pouvons prouver que les garde-fous tiennent réellement, donc les tests les plus précieux dans ce projet n’appellent pas Venice du tout. Ils verrouillent la frontière Pydantic et la plomberie d’assemblage du prompt, ce qui signifie qu’ils s’exécutent hors ligne, en quelques millisecondes, sans clé API et sans coût en tokens. Ajoutez d’abord les dépendances de développement :
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
La première chose qu’il vaut la peine de tester est la frontière du modèle elle-même. Ces tests affirment que les résultats et chaînes malformés sont rejetés au moment de la construction, avant qu’ils ne puissent atteindre un rapport. Créez tests/test_models.py :
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
Chacun de ces tests reflète une contrainte que nous avons placée sur les modèles plus tôt : une plage de lignes inversée, un ID qui ne correspond pas au pattern F-### et une « chaîne » d’un seul résultat. Si l’un d’eux cesse de lever une exception, toute une catégorie d’hallucinations est silencieusement redevenue possible. Le test le plus important couvre le validateur de références croisées, puisque c’est la fonction qui supprime réellement les chaînes inventées :
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
F-999 n’a jamais été produit par le Scanner, donc la chaîne qui le référence se retrouve dans dropped et n’atteint jamais le rapport. Le test compagnon dans le dépôt de référence, test_validate_chain_references_drops_unknown_files, fait la même chose pour une chaîne qui prétend qu’un fichier ne provient d’aucun de ses résultats. La deuxième chose qu’il vaut la peine de tester est la plomberie qui alimente le Chainer. Il est facile de refactoriser l’assemblage du prompt et d’arrêter silencieusement de passer le contexte transversal aux fichiers, après quoi le Chainer continuerait de fonctionner mais s’aggraverait silencieusement. Ce test construit un fixture à deux modules, rend le prompt et affirme que les informations transversales aux fichiers sont effectivement présentes, là encore sans aller-retour Venice. Créez tests/test_cross_file_chain.py :
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
Si ce test passe, le Chainer reçoit un prompt qui contient à la fois les deux résultats, les deux chemins de fichiers et l’arête d’import entre eux. Que le modèle utilise bien cette information est une évaluation séparée et hors-bande ; ce test ne protège que la plomberie qui fait entrer l’information dans le prompt en premier lieu. Exécutez l’ensemble de la suite, plus le linter et le vérificateur de types, avec :
uv run pytest          # tests hors ligne, aucun appel Venice en direct
uv run ruff check .
uv run mypy src/
Comme aucun de ces tests ne touche au réseau, ils peuvent être exécutés sans risque à chaque commit et en CI sans brûler de tokens ni avoir besoin d’une clé Venice. Le dépôt de référence inclut également tests/test_scanner_parse.py, tests/test_chainer_parse.py et tests/test_repo_map.py, qui couvrent les cas limites de parsing JSON (les entrées malformées sont écartées plutôt que de faire planter l’exécution) et le constructeur de carte de dépôt AST.

Exécution du projet

Pour l’essayer sur une vraie base de code, pointez la CLI sur un répertoire de code source Python :
uv run venice-security-reviewer scan path/to/your/code
Ou installez-le dans votre virtualenv avec pip install -e . et exécutez venice-security-reviewer scan path/to/your/code. La sortie ressemble approximativement à ceci :
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
Le rapport Markdown affiche chaque chaîne en haut avec son récit, puis chaque résultat individuel en dessous avec la sévérité, le CWE, l’emplacement du fichier, la description et l’extrait textuel que le modèle prétend avoir lu. Le dépôt de référence est aussi livré avec quatre cibles de démonstration incluses qui exercent chacune une forme différente de raisonnement que le Chainer doit effectuer :
  • examples/vulnerable_app — une application Flask multi-fichiers avec trois résultats « low », dont deux se combinent en une chaîne critique d’élévation de privilèges à travers les fichiers. Teste si le Chainer est sélectif sur ce qu’il combine.
  • examples/url_preview — un récupérateur d’URL multi-fichiers avec une liste blanche défensive qui ne s’applique pas par itération. Teste le raisonnement sur le flux de données transversal aux fichiers combiné à la topologie de déploiement (les IP link-local sont des passerelles vers les identifiants cloud).
  • examples/csv_query — un filtre CSV à fichier unique avec une évasion de bac à sable eval via __class__.__base__.__subclasses__(). Teste le raisonnement au niveau du langage plutôt que le flux HTTP.
  • examples/webhook_handler — un vérificateur HMAC à fichier unique avec une vulnérabilité de différentielle d’analyseur JSON. Teste le raisonnement sur plusieurs spécifications.
Essayez-les avec :
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
Si vous voyez un jour la CLI journaliser chainer referenced N unknown finding id(s) or file(s); chains dropped, c’est le validateur de références croisées qui prend le modèle en flagrant délit d’invention d’une chaîne. Les chaînes supprimées n’arrivent jamais dans le rapport ; vous obtenez simplement un avertissement que vous pouvez utiliser pour ajuster le prompt ou échantillonner des exécutions supplémentaires du Chainer.

Étendre cet exemple

La forme à deux agents se généralise bien. Quelques directions qu’il vaut la peine d’explorer :
  • Plus de langages. Le Scanner est indépendant du langage au niveau du prompt ; c’est le constructeur d’AST qui est spécifique à Python. Remplacez-le par tree-sitter et vous pouvez construire les mêmes formes de voisinage/carte condensée pour TypeScript, Go ou Rust.
  • Un troisième agent pour les correctifs. Une fois que vous avez une chaîne, demander à un agent Patcher de rédiger un diff unifié qui désamorce l’un des résultats constitutifs est un petit pas. Validez le diff via Pydantic par rapport au même ensemble de fichiers de preuve et vous obtenez gratuitement la même protection contre les hallucinations.
  • Formats de sortie. render_report est le seul endroit qui connaît Markdown. Ajoutez un renderer SARIF et les mêmes résultats peuvent être déversés dans le code scanning de GitHub. Ajoutez un renderer JSON et vous pouvez acheminer les résultats dans un système en aval.
  • Mise en cache par hachage de fichier. Les appels par fichier du Scanner sont indépendants et idempotents. Mettre en cache par (file_hash, prompt_hash, model) signifie que re-scanner un dépôt dans lequel un fichier a changé ne réexécute le Scanner que sur ce seul fichier.
  • Échantillonnage pour le Chainer. Pour les exécutions à enjeux élevés, appelez le Chainer N fois à une température légèrement plus élevée et croisez les résultats. Les chaînes que le modèle trouve de manière cohérente sont plus susceptibles d’être réelles ; les chaînes qu’il trouve une fois et jamais plus sont probablement du bruit.
  • Modèles plus puissants. zai-org-glm-5 est la valeur par défaut parce qu’il offre un bon équilibre entre coût et qualité pour le raisonnement combinatoire, mais pour des bases de code plus difficiles, choisir un modèle Venice plus puissant (défini via VENICE_MODEL) peut rendre les récits du Chainer notablement plus serrés.

Pour conclure

Merci de votre lecture ! Espérons que cela vous a aidé à comprendre comment structurer un outil de sécurité IA réellement digne de confiance. Le motif que nous avons utilisé ici se généralise au-delà de la sécurité aussi : chaque fois que vous voulez qu’un LLM raisonne à travers des fichiers d’une manière qui doit s’ancrer dans des preuves réelles, la recette est la même. Construisez une carte structurelle déterministe, fournissez au modèle une tranche qui tient dans le contexte, validez les références du modèle par rapport à la structure et supprimez tout ce qu’il ne peut pas ancrer. En utilisant Python avec l’API Venice AI, nous pouvons construire des agents qui combinent le raisonnement LLM avec des frontières de validation strictes, et livrer quelque chose qui donne une réponse utile au lieu d’une réponse qui sonne juste avec assurance.