Saltar al contenido principal
La mayoría de las herramientas de seguridad estáticas encuentran bugs de forma aislada. Escanean un archivo, listan los problemas y siguen adelante. El problema es que las vulnerabilidades más dañinas en las bases de código modernas rara vez son un único bug. Son una cadena: una clave de firma codificada en duro más una falta de comprobación de autorización más una inyección SQL que, por separado, parecen manejables. Juntas son una ruta de toma de control de cuenta. Este es exactamente el tipo de razonamiento transversal en el que los LLM son buenos, si les das la estructura adecuada. En este artículo, construiremos un revisor de seguridad de código de dos agentes usando Python y la API de Venice AI. Al final tendrás una CLI que podrás apuntar a cualquier base de código en Python para producir un informe Markdown con hallazgos atómicos y cadenas de explotación. ¿Te interesa la implementación completa del código? Échale un vistazo al repositorio de GitHub. Antes de continuar, necesitarás una clave API de Venice. Expórtala como una variable de entorno:
export VENICE_API_KEY=<my-key>

Qué vamos a construir

El revisor es un pequeño proyecto en Python con unas pocas partes claras:
ParteQué hace
Modelos PydanticDefinen Evidence, Finding y Chain, y nos dan un límite de validación estricto entre el LLM y el resto del programa
Cliente de VeniceEnvuelve el SDK de Python de OpenAI apuntado al endpoint de Venice compatible con OpenAI
Mapa AST del repoRecorre el árbol objetivo con el módulo ast de Python y construye un mapa determinista de los símbolos públicos e importaciones de cada módulo
Agente ScannerLee un archivo Python a la vez junto con un corte de vecindad por archivo del mapa del repo, y emite hallazgos atómicos de vulnerabilidades con evidencia archivo:línea
Agente ChainerLee la unión de los hallazgos junto con un mapa condensado completo del repo, y emite cadenas de explotación que combinan dos o más hallazgos
Validador de referenciasDescarta cualquier cadena que referencie un ID de hallazgo que el Scanner no produjo, o que nombre un archivo del que no provino ninguno de sus hallazgos referenciados
Informe MarkdownRenderiza los hallazgos y las cadenas en un informe legible
CLIConecta todo con Typer
El flujo es así:
  1. Recorrer el directorio objetivo en busca de archivos .py.
  2. Construir un mapa determinista del repo (importaciones, símbolos públicos, firmas).
  3. Para cada archivo, enviar al Scanner su código fuente junto con un corte de vecindad del mapa por archivo y recopilar los hallazgos atómicos.
  4. Enviar la unión de los hallazgos junto con el mapa condensado del repo al Chainer y recopilar las cadenas de explotación.
  5. Descartar cualquier cadena que referencie un ID de hallazgo que el Scanner no produjo, o que nombre un archivo del que no provenga ninguno de sus hallazgos referenciados.
  6. Escribir un informe Markdown.
Dos decisiones de diseño merecen señalarse antes de empezar a escribir código. La primera es por qué dos agentes en vez de uno. Un escáner de un solo agente que intente hacerlo todo en un único prompt tiene que equilibrar ser minucioso con los bugs por archivo y ser hábil con el razonamiento combinatorio. Dividir el trabajo permite al Scanner ser implacable y ruidoso, y al Chainer ser selectivo y silencioso. Añadir una llamada adicional al LLM dedicada a combinar hallazgos desbloquea toda una clase de bug por muy poco código extra. La segunda es por qué un mapa del repo. Las bases de código reales viven en muchos archivos. Un bug que consiste en “el validador se ejecuta pero no se aplica por iteración en el fetcher, y la respuesta del fetcher acaba en el renderer” es invisible para un escáner por archivo. Antes de cualquier llamada al LLM, recorremos el árbol objetivo con el ast de Python y construimos un mapa estructural. El Scanner ve una vecindad por archivo (qué otros módulos importan desde este archivo, qué importa este archivo, firmas de esos símbolos externos). El Chainer ve un mapa condensado completo (cada módulo, cada símbolo público, cada arista de importación, sin código fuente). Esa es la mínima cantidad de ingeniería de contexto que hemos encontrado que permite al Chainer construir cadenas cuyo flujo de datos cruza los límites entre módulos, sin pagar el coste en tokens de meter toda la base de código en cada prompt.

Requisitos previos

  • Python 3.12+
  • Una clave API de Venice de venice.ai
  • Familiaridad básica con Pydantic, el módulo ast de Python y el SDK de Python de OpenAI
El repositorio de referencia usa uv para la gestión de dependencias, pero un entorno virtual normal funciona igualmente bien.

Configurando el proyecto

Crea un nuevo proyecto e instala las dependencias:
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Si prefieres pip, crea un entorno virtual en su lugar:
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Crea un archivo .env para el desarrollo local:
VENICE_API_KEY=your-venice-api-key-here
# Anulaciones opcionales
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
Pondremos el código en src/venice_security_reviewer/ para mantenerlo importable como paquete, con los prompts en prompts/ en la raíz del repo para que se puedan revisar y diffear como cualquier otro artefacto de código:
src/venice_security_reviewer/
  __init__.py
  models.py     # Modelos Pydantic
  client.py     # Factory del cliente de Venice
  repo_map.py   # Mapa del repo construido con AST
  scanner.py    # Agente Scanner
  chainer.py    # Agente Chainer
  report.py     # Renderizado Markdown con Jinja2
  cli.py        # CLI con Typer
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

Configurando el cliente de Venice

Venice es compatible con OpenAI, así que podemos usar el SDK oficial de Python de OpenAI y simplemente apuntar su base_url a Venice. Centralizar la construcción del cliente en un único archivo significa que el resto del código nunca tiene que saber con qué proveedor está hablando: cambiar de backend solo tocaría este módulo. Crea src/venice_security_reviewer/client.py:
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
Algunos puntos a destacar:
  • Usamos zai-org-glm-5 por defecto porque es un modelo de Venice potente y de uso general, pero puedes anularlo con la variable de entorno VENICE_MODEL. Para bases de código más grandes o con más matices, cambiar a un modelo más potente puede hacer al Chainer notablemente mejor en la calidad narrativa.
  • build_client devuelve el cliente y el id del modelo, de modo que los llamadores no tienen que leer las variables de entorno por su cuenta y los tests pueden inyectar una configuración falsa sin monkeypatching.

Definiendo los modelos de datos

El punto de usar Pydantic aquí, en lugar de pasar dicts crudos, es que obtenemos un límite de validación estricto entre el LLM y el resto del programa. Si el modelo devuelve JSON mal formado o inventa un ID de hallazgo que no existe, el parseo falla ruidosamente y nunca propagamos la alucinación al informe. Crea src/venice_security_reviewer/models.py:
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
Las restricciones están haciendo un trabajo real aquí:
  • Finding.id y Chain.id están restringidos a una regex como F-001, C-001. Si el modelo se pone creativo con el formato, la validación falla.
  • Chain.findings requiere al menos dos entradas: una “cadena” de un único hallazgo es simplemente un hallazgo.
  • Chain.severity está restringido a high o critical. Una combinación de hallazgos que no eleve el impacto por encima de la mayor severidad individual no es una cadena que merezca la pena reportar.
  • Evidence impone que end_line >= start_line para que el modelo no pueda devolver rangos de línea sin sentido.
Esa es la validación de forma. También necesitamos validación de referencias cruzadas: una cadena que referencia un ID de hallazgo que el Scanner nunca produjo no tiene sentido. Añade esta función a models.py:
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
Este es el guardrail determinista que mantiene honesto al Chainer. Solo puede referenciar hallazgos que el Scanner realmente produjo, y solo puede reclamar como archivos involucrados en la cadena aquellos de los que uno de esos hallazgos realmente provino. Devolver las cadenas descartadas en lugar de filtrarlas silenciosamente permite a la CLI mostrar una advertencia cuando el modelo intenta inventar algo.

Construyendo el mapa AST del repo

El mapa del repo es el esqueleto estructural de una base de código Python: la superficie pública de cada módulo, cada arista de importación y un índice inverso desde “el módulo M” a “los módulos que importan desde M”. Se construye una vez por ejecución de scan con el ast de Python, nunca mediante ejecución, así que es seguro ejecutarlo sobre código adversario: el parser no importa ni invoca nada del árbol escaneado. Consumiremos el mapa en dos formas. El Scanner obtiene un corte de vecindad por archivo para que sus prompts mantengan un tamaño acotado. El Chainer obtiene un mapa condensado completo para que pueda construir cadenas entre archivos. Crea src/venice_security_reviewer/repo_map.py y comienza con los modelos Pydantic que describen el mapa:
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
Ahora el helper que recorre el árbol y omite los directorios que no debemos indexar:
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
Para cada archivo queremos tres cosas del AST: los símbolos de nivel superior que define, las aristas de importación y una lista __all__ explícita si está presente. Las firmas de funciones y las cabeceras de clase se renderizan como cadenas compactas que el LLM puede leer directamente:
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
El _SIGNATURE_CHAR_CAP de 200 preserva las firmas reales típicas (incluidos los type hints) a la vez que evita casos patológicos como una unión tipada de 200 líneas que haría explotar el prompt. A continuación, el extractor que saca los datos estructurales de un módulo parseado. Manejamos ast.FunctionDef, ast.ClassDef, ast.Assign y ast.AnnAssign de nivel superior para las constantes, y tanto ast.Import como ast.ImportFrom para las aristas de importación. Las importaciones relativas se resuelven en su forma punteada absoluta para que el Chainer pueda compararlas con los nombres de módulo más adelante:
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
La lógica completa de extracción recorre tree.body y emite entradas SymbolDef e ImportEdge por cada nodo de nivel superior. La función _extract del repositorio de referencia en repo_map.py cubre la implementación completa. La forma que sale es una lista de objetos ModuleEntry, uno por archivo. La parte interesante es lo que hacemos con esas entradas. Las envolvemos en un RepoMap con dos métodos orientados al consumidor:
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path) es lo que el Scanner llama por cada archivo. Devuelve un objeto ModuleNeighborhood que contiene el propio módulo, cada otro módulo que importa desde él, y cada símbolo dentro del repo que importa de otros lugares (con sus firmas resueltas). Eso le da al Scanner suficiente contexto para señalar hallazgos que solo son evidentes en contexto entre archivos, sin arrastrar toda la base de código al prompt. condensed_dict() es lo que recibe el Chainer. Los snippets y las firmas se descartan; solo quedan rutas, nombres de módulo, exportaciones públicas y aristas de importación. Esa es la representación más pequeña que aún permite al Chainer razonar sobre el flujo de datos entre módulos. Finalmente, el punto de entrada que construye todo:
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
Los archivos que no podemos leer o que no parsean correctamente quedan registrados y se omiten. Devolvemos un mapa parcial en lugar de hacer que falle toda la ejecución; en el peor caso, una llamada al Scanner no verá vecindad para un archivo, lo que sigue siendo un scan que funciona.

Escribiendo el agente Scanner

El Scanner recorre una ruta objetivo, recoge archivos de código fuente Python y pide a Venice que identifique vulnerabilidades atómicas un archivo a la vez. Escanear por archivo mantiene el prompt pequeño y hace que los fallos queden aislados: un archivo malo no mata toda la ejecución. Mantendremos el prompt en sí en un archivo aparte para que pueda revisarse y diffearse como cualquier otro artefacto de código. Crea prompts/scanner.md:
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
El prompt completo en el repositorio de referencia también contiene una sección “What to look for” que enumera clases comunes de vulnerabilidades (secretos codificados en duro, inyección SQL, inyección de comandos, SSRF, deserialización insegura, etc.) y una sección “How to use the neighborhood” que explica cómo el modelo debería consumir el contexto entre archivos. Algunas notas sobre el diseño del prompt:
  • Le decimos al modelo que emita solo JSON, sin prosa ni vallas. El SDK de OpenAI admite un parámetro response_format={"type": "json_object"} que impone esto del lado de la API, pero reforzarlo en el prompt reduce los casos límite.
  • Prohibimos explícitamente al Scanner producir cadenas entre archivos. Las cadenas son trabajo del Chainer, y pedir al Scanner que haga ambas cosas difumina la responsabilidad.
  • Exigimos que el snippet se copie palabra por palabra. Esto significa que el informe puede citar los bytes exactos que el modelo afirma haber visto, y un revisor puede verificar un hallazgo comparando el snippet con el código fuente.
Ahora el código del agente. Crea src/venice_security_reviewer/scanner.py y empieza con el recorredor de archivos y el cargador de prompts:
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES es un límite de seguridad. Por encima de ~200 KB omitimos en lugar de enviar un prompt enorme que probablemente sea caro y de baja calidad. La siguiente pieza es el constructor del prompt. La plantilla usa {filename}, {source} y {neighborhood} como marcadores de posición; usamos str.replace en lugar de .format() porque la plantilla contiene ejemplos JSON con llaves literales:
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
Ahora el parser. Deserializamos el JSON, validamos cada hallazgo mediante Pydantic y descartamos los hallazgos mal formados individualmente en lugar de hacer que falle todo el archivo. Un hallazgo malo no debería hacernos perder los buenos:
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
El Scanner emite IDs como F-001 por archivo, pero el Chainer necesita referenciar hallazgos en todo el repo. Reasignamos los IDs frente a un contador monótono para que sean únicos globalmente:
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
La llamada de scan de un solo archivo combina todo esto. Leemos el archivo, construimos el prompt, lo enviamos a Venice con response_format={"type": "json_object"} y una temperature baja, y parseamos el resultado:
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
Dos detalles que merece la pena destacar:
  • Parcheamos la ruta de archivo de la evidencia para que sea relativa a repo_root después del parseo, ya que el modelo nos devuelve el nombre de archivo que le pasamos, pero queremos una única forma canónica en todo el informe.
  • temperature=0.1 es intencionadamente bajo. Queremos que el Scanner sea conservador y consistente entre ejecuciones; la creatividad es trabajo del Chainer.
Por último, el orquestador que escanea cada archivo elegible bajo la raíz:
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
El mapa del repo lo construye el llamador una sola vez y se reutiliza para cada archivo, así que el Scanner ve una estructura global consistente incluso cuando archivos individuales fallan al parsear o se omiten.

Escribiendo el agente Chainer

El Chainer toma la unión de los hallazgos del Scanner más el mapa condensado del repo y le pregunta a Venice si alguno de los hallazgos se combina en una cadena de explotación real. Hay dos guardrails deterministas entre el LLM y el informe:
  1. Cada cadena debe referenciar solo IDs de hallazgo que el Scanner haya producido.
  2. Cada cadena debe reclamar solo archivos que la evidencia de al menos uno de los hallazgos referenciados toque.
Las cadenas que violan cualquiera de las dos reglas se descartan en el momento del parseo. Esto evita que el modelo alucine cadenas “por si acaso” y que reclame que una cadena se extiende por archivos para los que no tiene evidencia. El prompt del Chainer vive en prompts/chainer.md. El núcleo se ve así:
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
El prompt completo en el repositorio de referencia también explica cómo leer el mapa del repo, cómo decidir qué entra en files_involved y, lo más importante, cuándo no encadenar. Decirle al modelo “es correcto y esperable que muchas bases de código tengan hallazgos que no encadenen” es lo que evita que invente cadenas para parecer productivo. Ahora el código del agente. Crea src/venice_security_reviewer/chainer.py:
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000 es un techo suave para el bloque del mapa del repo renderizado en JSON en el prompt del Chainer. A aproximadamente 4 caracteres por token, eso son unos ~2000 tokens, lo que cabe cómodamente dentro de la ventana de contexto de cualquier modelo de Venice, incluso con los hallazgos y el presupuesto para la narrativa por encima. Serializamos los hallazgos en un bloque JSON compacto. Ten en cuenta que aquí quitamos el snippet de la evidencia a propósito: el Chainer no necesita los bytes en bruto para decidir si dos hallazgos se combinan, e incluirlos aproximadamente duplica el coste de tokens en bases de código reales:
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
Para bases de código más grandes, el mapa condensado completo del repo puede pasar muy por encima de nuestro presupuesto de caracteres. Cuando eso sucede, podamos hasta los módulos que contienen hallazgos más sus vecinos directos. Eso conserva suficiente estructura para que el Chainer razone sobre cadenas para las que tenemos evidencia, y descarta el resto:
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
La estrategia de poda es intencionadamente simple: mantener los módulos en los que viven nuestros hallazgos y mantener sus vecinos directos en el grafo de importaciones. Cualquier cosa más allá no tiene un papel plausible en una cadena para la que tengamos evidencia actualmente, así que se puede descartar sin perder capacidad de razonamiento. También anotamos el payload con marcadores _pruned, _kept y _total, para que el prompt del Chainer pueda advertir al modelo cuando el mapa ha sido recortado. Parsear la respuesta tiene la misma forma que el Scanner: deserializar, validar cada cadena mediante Pydantic, descartar las entradas mal formadas:
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
Luego el agente en sí:
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
Un par de cosas dignas de mención:
  • Salimos antes de llamar al modelo cuando hay menos de dos hallazgos. No puedes encadenar un solo hallazgo, y saltarse la llamada significa que no quemamos tokens en un resultado garantizado vacío.
  • temperature=0.2 es ligeramente más alta que el 0.1 del Scanner. El Chainer se beneficia de un poco más de creatividad para detectar combinaciones no obvias, pero aún queremos que esté anclado en los hallazgos y el mapa que se le ha dado.
  • Tras el parseo, validate_chain_references ejecuta la comprobación determinista de referencias cruzadas que escribimos antes. Cualquier cosa que sobreviva es segura para renderizar; cualquier cosa que no, se registra para que el operador sepa que el modelo intentó inventar algo.
Esa comprobación de referencias cruzadas es la pieza más importante de todo el proyecto. Es la frontera entre “herramienta de seguridad útil” e “informe de IA ocasionalmente equivocado con confianza”. Con ella en su lugar, incluso si el modelo alucina, la cadena incorrecta nunca llega al informe.

Renderizando el informe Markdown

Mantener el renderizado separado de la lógica del agente significa que los mismos objetos Finding y Chain pueden alimentarse luego a otros formatos (JSON, SARIF, HTML) sin tocar el Scanner ni el Chainer. Usaremos Jinja2 con un pequeño archivo de plantilla. Crea src/venice_security_reviewer/templates/report.md.j2:
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
Luego el renderer en src/venice_security_reviewer/report.py:
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
El autoescape se queda desactivado para la plantilla Markdown (Markdown no es HTML), pero lo dejamos habilitado para cualquier futura plantilla .html por extensión.

Conectando la CLI

La CLI es el orquestador: construye el mapa del repo, escanea, encadena, renderiza. Usaremos Typer para gestionar el parseo de argumentos y Rich para imprimir una tabla resumen agradable. Crea src/venice_security_reviewer/cli.py:
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
Añade el punto de entrada del script al pyproject.toml:
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
Eso es toda la tubería conectada.

Probando los guardrails

Hemos insistido mucho en una idea a lo largo de esta construcción: los guardrails deterministas son lo que separa una herramienta de seguridad útil de una equivocada con confianza. Esa afirmación solo merece la pena hacerla si podemos demostrar que los guardrails realmente se mantienen, así que las pruebas más valiosas de este proyecto no llaman a Venice en absoluto. Bloquean el límite de Pydantic y la fontanería de ensamblaje de prompts, lo que significa que se ejecutan offline, en milisegundos, sin clave API y sin coste de tokens. Añade primero las dependencias de desarrollo:
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
Lo primero que merece la pena probar es el propio límite de los modelos. Estas pruebas afirman que los hallazgos y cadenas mal formados se rechazan en el momento de la construcción, antes de que puedan llegar nunca a un informe. Crea tests/test_models.py:
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
Cada una de estas refleja una restricción que pusimos antes en los modelos: un rango de líneas invertido, un ID que no coincide con el patrón F-### y una “cadena” de un único hallazgo. Si alguna deja de lanzar excepción, toda una clase de alucinación se vuelve silenciosamente posible de nuevo. La prueba más importante cubre el validador de referencias cruzadas, ya que esa es la función que realmente descarta las cadenas inventadas:
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
El Scanner nunca produjo F-999, así que la cadena que lo referencia acaba en dropped y nunca llega al informe. La prueba complementaria en el repo de referencia, test_validate_chain_references_drops_unknown_files, hace lo mismo para una cadena que reclama un archivo del que no provino ninguno de sus hallazgos. La segunda cosa que merece la pena probar es la fontanería que alimenta al Chainer. Es fácil refactorizar el ensamblaje del prompt y dejar silenciosamente de pasar el contexto entre archivos, momento en el que el Chainer seguiría funcionando pero empeoraría silenciosamente. Esta prueba construye un fixture de dos módulos, renderiza el prompt y afirma que la información entre archivos realmente está presente, todo ello sin un viaje de ida y vuelta a Venice. Crea tests/test_cross_file_chain.py:
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
Si esta prueba pasa, al Chainer se le está entregando un prompt que contiene los dos hallazgos, las dos rutas de archivo y la arista de importación entre ellos. Si el modelo usa bien esa información es una evaluación separada y fuera de banda; esta prueba solo protege la fontanería que mete la información en el prompt en primer lugar. Ejecuta toda la suite, además del linter y el comprobador de tipos, con:
uv run pytest          # pruebas offline, sin llamadas en vivo a Venice
uv run ruff check .
uv run mypy src/
Como ninguna de estas pruebas toca la red, son seguras de ejecutar en cada commit y en CI sin quemar tokens ni necesitar una clave de Venice. El repo de referencia también incluye tests/test_scanner_parse.py, tests/test_chainer_parse.py y tests/test_repo_map.py, que cubren casos límite del parseo JSON (entradas mal formadas que se descartan en vez de hacer que la ejecución se rompa) y el constructor del mapa AST del repo.

Ejecutando el proyecto

Para probarlo en una base de código real, apunta la CLI a un directorio de código fuente Python:
uv run venice-security-reviewer scan path/to/your/code
O instálalo en tu virtualenv con pip install -e . y ejecuta venice-security-reviewer scan path/to/your/code. La salida se ve más o menos así:
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
El informe Markdown muestra cada cadena en la parte superior con su narrativa, luego cada hallazgo individual debajo con la severidad, CWE, ubicación del archivo, descripción y el snippet textual que el modelo afirma haber leído. El repo de referencia también incluye cuatro objetivos de demo empaquetados que ejercitan cada uno una forma distinta de razonamiento que el Chainer tiene que hacer:
  • examples/vulnerable_app — una app Flask multiarchivo con tres hallazgos “low”, dos de los cuales se combinan en una cadena crítica de escalada de privilegios entre archivos. Prueba si el Chainer es selectivo sobre qué combina.
  • examples/url_preview — un fetcher de URL multiarchivo con una allowlist defensiva que no se aplica por iteración. Prueba el razonamiento de flujo de datos entre archivos combinado con topología de despliegue (las IPs link-local son puertas a credenciales en la nube).
  • examples/csv_query — un filtro CSV de un solo archivo con un escape del sandbox de eval mediante __class__.__base__.__subclasses__(). Prueba el razonamiento a nivel de lenguaje en lugar de flujo HTTP.
  • examples/webhook_handler — un verificador HMAC de un solo archivo con una vulnerabilidad de diferencia entre parsers JSON. Prueba el razonamiento entre múltiples especificaciones.
Pruébalos con:
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
Si alguna vez ves que la CLI registra chainer referenced N unknown finding id(s) or file(s); chains dropped, ese es el validador de referencias cruzadas pillando al modelo en el acto de inventar una cadena. Las cadenas descartadas nunca llegan al informe; solo recibes una advertencia que puedes usar para ajustar el prompt o muestrear ejecuciones adicionales del Chainer.

Extendiendo este ejemplo

La forma de dos agentes se generaliza bien. Algunas direcciones que merece la pena explorar:
  • Más lenguajes. El Scanner es agnóstico al lenguaje a nivel de prompt; el constructor de AST es lo específico de Python. Cambia a tree-sitter y podrás construir las mismas formas de vecindad/mapa-condensado para TypeScript, Go o Rust.
  • Un tercer agente para arreglos. Una vez que tienes una cadena, pedir a un agente Patcher que redacte un diff unificado que neutralice uno de los hallazgos constituyentes es un pequeño paso. Validar el diff con Pydantic contra el mismo conjunto de archivos-evidencia te da gratis la misma protección frente a alucinaciones.
  • Formatos de salida. render_report es el único lugar que conoce el Markdown. Añade un renderer SARIF y los mismos hallazgos pueden caer en code scanning de GitHub. Añade un renderer JSON y podrás canalizar los resultados a un sistema descendente.
  • Caché por hash de archivo. Las llamadas por archivo del Scanner son independientes e idempotentes. Cachear por (file_hash, prompt_hash, model) significa que re-escanear un repo donde cambió un archivo solo vuelve a ejecutar el Scanner sobre ese archivo.
  • Muestreo para el Chainer. Para ejecuciones de alto riesgo, llama al Chainer N veces con una temperature ligeramente más alta e intersecta los resultados. Las cadenas que el modelo encuentra de forma consistente tienen más probabilidades de ser reales; las que encuentra una vez y nunca más son probablemente ruido.
  • Modelos más potentes. zai-org-glm-5 es el predeterminado porque logra un buen equilibrio entre coste y calidad para el razonamiento combinatorio, pero para bases de código más difíciles, cambiar a un modelo de Venice más potente (configurado mediante VENICE_MODEL) puede hacer que las narrativas del Chainer sean notablemente más ajustadas.

Para terminar

¡Gracias por leer! Esperamos que esto te haya ayudado a entender cómo estructurar una herramienta de seguridad con IA que sea realmente confiable. El patrón que hemos usado aquí se generaliza más allá de la seguridad: cada vez que quieras que un LLM razone entre archivos de un modo que tenga que aterrizar en evidencia real, la receta es la misma. Construye un mapa estructural determinista, entrega al modelo un corte del mismo que quepa en el contexto, valida las referencias del modelo contra la estructura y descarta cualquier cosa que no pueda aterrizar. Al utilizar Python con la API de Venice AI, podemos construir agentes que combinen el razonamiento del LLM con límites de validación estrictos, y entregar algo que dé una respuesta útil en lugar de una que suena confiada.