Pular para o conteúdo principal
A maioria das ferramentas de segurança estática encontra bugs isoladamente. Elas escaneiam um arquivo, listam os problemas e seguem em frente. O problema é que as vulnerabilidades mais danosas em bases de código modernas raramente são um único bug. Elas são uma cadeia: uma chave de assinatura hardcoded mais uma verificação de autorização ausente mais uma SQL injection que, sozinhas, parecem todas gerenciáveis. Juntas, são um caminho de tomada de conta. Esse é exatamente o tipo de raciocínio cruzado em que LLMs são bons, se você der a estrutura certa. Neste artigo, construiremos um revisor de segurança de código com dois agentes usando Python e a API Venice AI. Ao final, você terá um CLI que pode apontar para qualquer base de código Python para produzir um relatório Markdown com findings atômicos e exploit chains. Interessado na implementação completa do código? Confira o repositório no GitHub. Antes de continuarmos, você precisará de uma chave de API Venice. Exporte-a como variável de ambiente:
export VENICE_API_KEY=<my-key>

O que vamos construir

O revisor é um pequeno projeto Python com algumas partes claras:
ParteO que faz
Modelos PydanticDefinem Evidence, Finding e Chain, e nos dão um limite duro de validação entre o LLM e o resto do programa
Cliente VeniceEnvolve o SDK Python da OpenAI apontado para o endpoint compatível com OpenAI da Venice
Repo map ASTCaminha pela árvore alvo com o módulo ast do Python e constrói um mapa determinístico de cada símbolo público de módulo e arestas de import
Agente ScannerLê um arquivo Python por vez mais uma fatia de vizinhança do repo map por arquivo, e emite findings atômicos de vulnerabilidade com evidência file:line
Agente ChainerLê a união de findings mais um repo map completo condensado, e emite exploit chains que combinam dois ou mais findings
Validador de referênciaDescarta qualquer chain que faça referência a um ID de finding que o Scanner não produziu, ou nomeie um arquivo do qual nenhum dos findings referenciados realmente veio
Relatório MarkdownRenderiza findings e chains em um relatório legível
CLIConecta tudo com Typer
O fluxo é assim:
  1. Caminhar pelo diretório alvo procurando arquivos .py.
  2. Construir um repo map determinístico (imports, símbolos públicos, assinaturas).
  3. Para cada arquivo, enviar ao Scanner seu código-fonte mais uma fatia de vizinhança do mapa por arquivo e coletar findings atômicos.
  4. Enviar a união dos findings mais o repo map condensado ao Chainer e coletar exploit chains.
  5. Descartar qualquer chain que referencie um ID de finding que o Scanner não produziu, ou que nomeie um arquivo do qual nenhum dos findings referenciados realmente veio.
  6. Escrever um relatório Markdown.
Duas decisões de design valem destacar antes de começarmos a escrever código. A primeira é por que dois agentes em vez de um. Um scanner de agente único que tenta fazer tudo em um prompt tem que equilibrar ser minucioso sobre bugs por arquivo e ser inteligente sobre raciocínio combinatório. Dividir o trabalho significa que o Scanner pode ser implacável e barulhento, e o Chainer pode ser seletivo e silencioso. Adicionar uma chamada LLM extra dedicada a combinar findings desbloqueia uma classe inteira de bugs por muito pouco código extra. A segunda é por que um repo map. Bases de código reais vivem em muitos arquivos. Um bug que consiste em “o validator roda mas não aplica por iteração no fetcher, e a resposta do fetcher acaba no renderer” é invisível para um scanner por arquivo. Antes de qualquer chamada LLM, caminhamos pela árvore alvo com o ast do Python e construímos um mapa estrutural. O Scanner vê uma vizinhança por arquivo (quem importa deste arquivo, o que este arquivo importa, assinaturas desses símbolos externos). O Chainer vê um mapa completo condensado (cada módulo, cada símbolo público, cada aresta de import, sem código-fonte). Essa é a menor quantidade de engenharia de contexto que encontramos que permite ao Chainer construir chains cujo fluxo de dados atravessa limites de módulo, sem pagar o custo de tokens de empilhar a base de código inteira em cada prompt.

Pré-requisitos

  • Python 3.12+
  • Uma chave de API Venice de venice.ai
  • Familiaridade básica com Pydantic, o módulo ast do Python e o SDK Python da OpenAI
O repo de referência usa uv para gerenciamento de dependências, mas um ambiente virtual normal funciona igualmente bem.

Configurando o projeto

Crie um novo projeto e instale as dependências:
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Se preferir pip, crie um ambiente virtual:
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
Crie um arquivo .env para desenvolvimento local:
VENICE_API_KEY=your-venice-api-key-here
# Sobrescritas opcionais
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
Vamos colocar o código-fonte em src/venice_security_reviewer/ para mantê-lo importável como pacote, com os prompts em prompts/ na raiz do repo para que possam ser revisados e diferenciados como qualquer outro artefato de código:
src/venice_security_reviewer/
  __init__.py
  models.py     # Modelos Pydantic
  client.py     # Fábrica do cliente Venice
  repo_map.py   # Repo map construído por AST
  scanner.py    # Agente Scanner
  chainer.py    # Agente Chainer
  report.py     # Renderização Markdown com Jinja2
  cli.py        # CLI com Typer
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

Configurando o cliente Venice

A Venice é compatível com OpenAI, então podemos usar o SDK Python oficial da OpenAI e apenas apontar seu base_url para a Venice. Centralizar a construção do cliente em um arquivo significa que o resto do código nunca precisa saber com qual provedor está falando: trocar backends afetaria apenas este módulo. Crie src/venice_security_reviewer/client.py:
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
Algumas coisas valem nota:
  • Por padrão usamos zai-org-glm-5 porque é um modelo Venice forte de uso geral, mas você pode sobrescrever com a variável de ambiente VENICE_MODEL. Para bases de código maiores ou mais nuançadas, trocar para um modelo mais forte pode tornar o Chainer notavelmente melhor em qualidade narrativa.
  • build_client retorna o cliente e o id do modelo, então os chamadores não precisam ler variáveis de ambiente eles mesmos e testes podem injetar uma configuração falsa sem monkeypatching.

Definindo os modelos de dados

O ponto de usar Pydantic aqui, em vez de passar dicts crus, é que temos um limite duro de validação entre o LLM e o resto do programa. Se o modelo retornar JSON malformado ou inventar um ID de finding que não existe, o parsing falha alto e nunca propagamos a alucinação para o relatório. Crie src/venice_security_reviewer/models.py:
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
As restrições estão fazendo trabalho real aqui:
  • Finding.id e Chain.id são restritos a uma regex como F-001, C-001. Se o modelo for criativo com o formato, a validação falha.
  • Chain.findings requer pelo menos duas entradas: uma “chain” de um único finding é só um finding.
  • Chain.severity é restrito a high ou critical. Uma combinação de findings que não eleva o impacto acima da maior severidade individual não é uma chain que vale a pena reportar.
  • Evidence impõe que end_line >= start_line para que o modelo não retorne faixas de linha sem sentido.
Essa é a validação de forma. Também precisamos de validação de referência cruzada: uma chain que referencia um ID de finding que o Scanner nunca produziu é sem sentido. Adicione esta função ao models.py:
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
Este é o guardrail determinístico que mantém o Chainer honesto. Ele só pode referenciar findings que o Scanner de fato produziu, e só pode reivindicar arquivos envolvidos na chain dos quais um dos findings de fato veio. Retornar as chains descartadas em vez de filtrá-las silenciosamente permite ao CLI sinalizar um aviso quando o modelo tenta inventar algo.

Construindo o repo map AST

O repo map é o esqueleto estrutural de uma base de código Python: a superfície pública de cada módulo, cada aresta de import e um índice reverso de “módulo M” para “módulos que importam de M”. Ele é construído uma vez por execução de scan com o ast do Python, nunca via execução, então é seguro rodar em código adversarial: o parser não importa nem invoca nada da árvore escaneada. Vamos consumir o mapa em duas formas. O Scanner recebe uma fatia de vizinhança por arquivo para que seus prompts permaneçam de tamanho limitado. O Chainer recebe um mapa completo condensado para que possa construir chains entre arquivos. Crie src/venice_security_reviewer/repo_map.py e comece com os modelos Pydantic que descrevem o mapa:
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
Agora o helper que caminha pela árvore e pula diretórios que não devemos indexar:
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
Para cada arquivo queremos três coisas do AST: os símbolos top-level que ele define, as arestas de import e uma lista __all__ explícita se uma estiver presente. Assinaturas de função e cabeçalhos de classe são renderizados como strings compactas que o LLM pode ler diretamente:
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
O _SIGNATURE_CHAR_CAP de 200 preserva assinaturas reais típicas (incluindo type hints) e evita casos patológicos como uma união tipada de 200 linhas estourando o prompt. A seguir, o extractor que puxa os dados estruturais de um módulo parseado. Lidamos com ast.FunctionDef, ast.ClassDef, ast.Assign top-level e ast.AnnAssign para constantes, e tanto ast.Import quanto ast.ImportFrom para as arestas de import. Imports relativos são resolvidos para sua forma pontuada absoluta para que o Chainer possa correspondê-los a nomes de módulo depois:
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
A lógica completa de extração caminha por tree.body e emite entradas SymbolDef e ImportEdge para cada nó top-level. A função _extract no repo_map.py do repo de referência cobre a implementação completa. A forma que sai é uma lista de objetos ModuleEntry, um por arquivo. A parte interessante é o que fazemos com essas entradas. Envolva-as em um RepoMap com dois métodos voltados ao consumidor:
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path) é o que o Scanner chama para cada arquivo. Retorna um objeto ModuleNeighborhood contendo o próprio módulo, cada outro módulo que importa dele e cada símbolo do repo que ele importa de outro lugar (com suas assinaturas resolvidas). Isso dá ao Scanner contexto suficiente para sinalizar findings que só são óbvios em contexto cross-file, sem arrastar toda a base de código para o prompt. condensed_dict() é o que o Chainer recebe. Snippets e assinaturas são descartados; só permanecem caminhos, nomes de módulo, exports públicos e arestas de import. Essa é a menor representação que ainda permite ao Chainer raciocinar sobre fluxo de dados cross-module. Finalmente, o ponto de entrada que constrói tudo:
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
Arquivos que não conseguimos ler ou que falham no parse são registrados e pulados. Retornamos um mapa parcial em vez de falhar a execução inteira; o pior caso é que uma chamada do Scanner veja nenhuma vizinhança para um arquivo, o que ainda é um scan funcional.

Escrevendo o agente Scanner

O Scanner caminha por um caminho alvo, pega arquivos de código-fonte Python e pede à Venice para identificar vulnerabilidades atômicas, um arquivo por vez. Scanning por arquivo mantém o prompt pequeno e torna as falhas isoladas: um arquivo ruim não mata a execução inteira. Manteremos o prompt em si em um arquivo separado para que possa ser revisado e diferenciado como qualquer outro artefato de código. Crie prompts/scanner.md:
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
O prompt completo no repo de referência também contém uma seção “What to look for” listando classes comuns de vulnerabilidade (segredos hardcoded, SQL injection, command injection, SSRF, deserialização insegura, etc.) e uma seção “How to use the neighborhood” explicando como o modelo deve consumir o contexto cross-file. Algumas notas de design de prompt:
  • Dizemos ao modelo para emitir apenas JSON, sem prosa ou cercas. O SDK da OpenAI suporta um parâmetro response_format={"type": "json_object"} que reforça isso no lado da API, mas reforçá-lo no prompt reduz casos extremos.
  • Explicitamente proibimos o Scanner de produzir chains cross-file. Chains são trabalho do Chainer, e pedir ao Scanner para fazer ambos turva a responsabilidade.
  • Exigimos que o snippet seja copiado palavra por palavra. Isso significa que o relatório pode citar os bytes exatos que o modelo alega ter visto, e um revisor pode verificar pontualmente um finding comparando o snippet ao código-fonte.
Agora o código do agente. Crie src/venice_security_reviewer/scanner.py e comece com o caminhador de arquivos e o carregador de prompt:
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES é um teto de segurança. Acima de ~200 KB pulamos em vez de enviar um prompt enorme que provavelmente será caro e de baixa qualidade. A próxima peça é o construtor de prompt. O template usa {filename}, {source} e {neighborhood} como placeholders; usamos str.replace em vez de .format() porque o template contém exemplos de JSON com chaves literais:
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
Agora o parser. Desserializamos o JSON, validamos cada finding através do Pydantic e descartamos findings individuais malformados em vez de falhar o arquivo inteiro. Um finding ruim não deve nos custar os bons:
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
O Scanner emite IDs como F-001 por arquivo, mas o Chainer precisa referenciar findings em toda a repo. Reemitimos os IDs contra um contador monotônico para que sejam globalmente únicos:
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
A chamada de scan de arquivo único combina tudo isso. Lemos o arquivo, construímos o prompt, enviamos à Venice com response_format={"type": "json_object"} e baixa temperatura, e parseamos o resultado:
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
Dois detalhes vale destacar:
  • Ajustamos o caminho do arquivo de evidência para ser relativo a repo_root após o parsing, já que o modelo ecoa de volta qualquer nome de arquivo que demos a ele, mas queremos uma única forma canônica em todo o relatório.
  • temperature=0.1 é intencionalmente baixa. Queremos que o Scanner seja conservador e consistente entre execuções; criatividade é trabalho do Chainer.
Finalmente, o orquestrador que escaneia cada arquivo elegível sob o root:
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
O repo map é construído uma vez pelo chamador e reutilizado para cada arquivo, então o Scanner vê uma estrutura global consistente mesmo quando arquivos individuais falham no parse ou são pulados.

Escrevendo o agente Chainer

O Chainer toma a união de findings do Scanner mais o repo map condensado e pergunta à Venice se algum dos findings se combina em uma exploit chain real. Dois guardrails determinísticos ficam entre o LLM e o relatório:
  1. Cada chain deve referenciar apenas IDs de finding que o Scanner produziu.
  2. Cada chain deve reivindicar apenas arquivos que a evidência de pelo menos um finding referenciado toca.
Chains que violam qualquer regra são descartadas no momento do parse. Isso impede o modelo de alucinar chains “para garantir” e de reivindicar que uma chain se estende por arquivos para os quais não tem evidência. O prompt do Chainer vive em prompts/chainer.md. O núcleo dele parece com isto:
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
O prompt completo no repo de referência também explica como ler o repo map, como decidir o que vai em files_involved e, crucialmente, quando não encadear. Dizer ao modelo “it is correct and expected for many codebases to have findings that do not chain” é o que o impede de inventar chains para parecer produtivo. Agora o código do agente. Crie src/venice_security_reviewer/chainer.py:
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000 é um teto suave para o bloco do repo map renderizado em JSON no prompt do Chainer. A aproximadamente 4 caracteres por token, isso são ~2000 tokens, o que cabe confortavelmente em qualquer janela de contexto de modelo Venice mesmo com findings e o orçamento da narrativa por cima. Serializamos findings em um bloco JSON compacto. Note que removemos o snippet da evidência aqui de propósito: o Chainer não precisa dos bytes crus para decidir se dois findings combinam, e incluí-los aproximadamente dobra o custo de tokens em bases de código reais:
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
Para bases de código maiores, o repo map completo condensado pode estourar nosso orçamento de caracteres. Quando isso acontece, podamos para módulos que carregam findings mais seus vizinhos diretos. Isso preserva estrutura suficiente para o Chainer raciocinar sobre chains para as quais temos evidência, e descarta o resto:
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
A estratégia de poda é intencionalmente simples: mantenha os módulos em que nossos findings vivem e mantenha seus vizinhos diretos do grafo de imports. Qualquer coisa mais distante não tem papel plausível em uma chain para a qual temos evidência, então pode ser descartada sem perder poder de raciocínio. Também anotamos o payload com marcadores _pruned, _kept e _total, para que o prompt do Chainer possa avisar o modelo quando o mapa foi aparado. Parsear a resposta tem a mesma forma do Scanner: desserializar, validar cada chain via Pydantic, descartar entradas malformadas:
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
Depois o próprio agente:
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
Algumas coisas vale apontar:
  • Saímos antes de chamar o modelo quando há menos de dois findings. Você não pode encadear um único finding, e pular a chamada significa que não queimamos tokens em um resultado garantido vazio.
  • temperature=0.2 é um pouco maior que 0.1 do Scanner. O Chainer se beneficia de um toque mais de criatividade para detectar combinações não óbvias, mas ainda queremos que ele esteja ancorado nos findings e mapa que lhe foram dados.
  • Após parsear, validate_chain_references executa a verificação determinística de referência cruzada que escrevemos antes. Qualquer coisa que sobrevive é segura de renderizar; qualquer coisa que não sobrevive é registrada para que o operador saiba que o modelo tentou inventar algo.
Essa verificação de referência cruzada é a peça mais importante do projeto inteiro. É o limite entre “ferramenta de segurança útil” e “relatório de IA ocasionalmente confiantemente errado”. Com isso no lugar, mesmo se o modelo alucina, a chain errada nunca alcança o relatório.

Renderizando o relatório Markdown

Manter a renderização separada da lógica do agente significa que os mesmos objetos Finding e Chain podem ser alimentados posteriormente em outros formatos (JSON, SARIF, HTML) sem tocar o Scanner ou Chainer. Usaremos Jinja2 com um pequeno arquivo de template. Crie src/venice_security_reviewer/templates/report.md.j2:
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
Depois o renderizador em src/venice_security_reviewer/report.py:
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
Autoescape fica desligado para o template Markdown (Markdown não é HTML), mas o deixamos habilitado para quaisquer futuros templates .html por extensão.

Conectando o CLI

O CLI é o orquestrador: constrói o repo map, escaneia, encadeia, renderiza. Usaremos Typer para tratar o parsing de argumentos e Rich para imprimir uma tabela de resumo bonita. Crie src/venice_security_reviewer/cli.py:
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
Adicione o ponto de entrada do script ao pyproject.toml:
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
Esse é o pipeline inteiro conectado.

Testando os guardrails

Nos apoiamos firmemente em uma ideia ao longo desta construção: os guardrails determinísticos são o que separam uma ferramenta de segurança útil de uma confiantemente errada. Essa afirmação só vale ser feita se conseguirmos provar que os guardrails realmente seguram, então os testes mais valiosos neste projeto não chamam a Venice de jeito nenhum. Eles travam o limite Pydantic e a tubulação de montagem de prompt, o que significa que rodam offline, em milissegundos, sem chave de API e sem custo de tokens. Adicione as dependências dev primeiro:
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
A primeira coisa que vale testar é o próprio limite do modelo. Esses testes afirmam que findings e chains malformados são rejeitados em tempo de construção, antes que possam alcançar um relatório. Crie tests/test_models.py:
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
Cada um deles espelha uma restrição que colocamos nos modelos anteriormente: uma faixa de linha invertida, um ID que não corresponde ao padrão F-### e uma “chain” de um único finding. Se algum deles parar de levantar exceção, uma classe inteira de alucinação silenciosamente se torna possível novamente. O teste mais importante cobre o validador de referência cruzada, já que essa é a função que de fato descarta chains inventadas:
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
F-999 nunca foi produzido pelo Scanner, então a chain que o referencia cai em dropped e nunca chega ao relatório. O teste companheiro no repo de referência, test_validate_chain_references_drops_unknown_files, faz o mesmo para uma chain que reivindica um arquivo do qual nenhum de seus findings veio. A segunda coisa que vale testar é a tubulação que alimenta o Chainer. É fácil refatorar a montagem do prompt e silenciosamente parar de passar contexto cross-file, momento em que o Chainer continuaria funcionando, mas silenciosamente ficaria pior. Este teste constrói um fixture de dois módulos, renderiza o prompt e afirma que a informação cross-file de fato está presente, novamente sem um round-trip à Venice. Crie tests/test_cross_file_chain.py:
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
Se este teste passa, o Chainer está recebendo um prompt que contém ambos os findings, ambos os caminhos de arquivo e a aresta de import entre eles. Se o modelo usa essa informação bem é uma avaliação separada, fora de banda; este teste só guarda a tubulação que coloca a informação no prompt em primeiro lugar. Execute a suíte inteira, mais o linter e o type checker, com:
uv run pytest          # testes offline, sem chamadas Venice ao vivo
uv run ruff check .
uv run mypy src/
Como nenhum desses testes toca a rede, é seguro rodá-los em cada commit e no CI sem queimar tokens ou precisar de chave Venice. O repo de referência também inclui tests/test_scanner_parse.py, tests/test_chainer_parse.py e tests/test_repo_map.py, que cobrem casos extremos de parsing JSON (entradas malformadas sendo descartadas em vez de derrubar a execução) e o construtor do repo map AST.

Executando o projeto

Para experimentá-lo em uma base de código real, aponte o CLI para um diretório de código-fonte Python:
uv run venice-security-reviewer scan path/to/your/code
Ou instale-o no seu virtualenv com pip install -e . e execute venice-security-reviewer scan path/to/your/code. A saída parece aproximadamente com isto:
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
O relatório Markdown mostra cada chain no topo com sua narrativa, depois cada finding individual abaixo com severidade, CWE, localização do arquivo, descrição e o snippet palavra por palavra que o modelo alega ter lido. O repo de referência também vem com quatro alvos demo empacotados que cada um exercita uma forma diferente de raciocínio que o Chainer tem que fazer:
  • examples/vulnerable_app — um app Flask multi-arquivo com três findings “low”, dois dos quais combinam em uma chain crítica de escalada de privilégio entre arquivos. Testa se o Chainer é seletivo sobre o que combina.
  • examples/url_preview — um fetcher de URL multi-arquivo com uma allowlist defensiva que não se aplica por iteração. Testa raciocínio de fluxo de dados cross-file combinado com topologia de deployment (IPs link-local são gateways de credenciais cloud).
  • examples/csv_query — um filtro CSV single-file com uma sandbox de eval que escapa via __class__.__base__.__subclasses__(). Testa raciocínio em nível de linguagem em vez de fluxo HTTP.
  • examples/webhook_handler — um verificador HMAC single-file com uma vulnerabilidade diferencial de parser JSON. Testa raciocínio entre múltiplas especificações.
Experimente-os com:
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
Se você ver o CLI registrar chainer referenced N unknown finding id(s) or file(s); chains dropped, esse é o validador de referência cruzada pegando o modelo no ato de inventar uma chain. As chains descartadas nunca chegam ao relatório; você só recebe um aviso que pode usar para ajustar o prompt ou amostrar execuções adicionais do Chainer.

Estendendo este exemplo

A forma de dois agentes generaliza bem. Algumas direções que vale explorar:
  • Mais linguagens. O Scanner é agnóstico de linguagem no nível do prompt; o construtor AST é o que é específico de Python. Troque por tree-sitter e você pode construir as mesmas formas vizinhança/mapa condensado para TypeScript, Go ou Rust.
  • Um terceiro agente para correções. Quando você tem uma chain, pedir a um agente Patcher para esboçar um diff unificado que neutraliza um dos findings constituintes é um passo pequeno. Validar o diff via Pydantic contra o mesmo conjunto de arquivos de evidência e você obtém a mesma proteção contra alucinação de graça.
  • Formatos de saída. render_report é o único lugar que sabe de Markdown. Adicione um renderizador SARIF e os mesmos findings podem cair no code scanning do GitHub. Adicione um renderizador JSON e você pode encaminhar resultados para um sistema downstream.
  • Cache por hash de arquivo. As chamadas por arquivo do Scanner são independentes e idempotentes. Caching por (file_hash, prompt_hash, model) significa que rescanear uma repo onde um arquivo mudou só re-executa o Scanner naquele arquivo.
  • Sampling para o Chainer. Para execuções de alto risco, chame o Chainer N vezes com temperatura ligeiramente mais alta e cruze os resultados. Chains que o modelo encontra consistentemente são mais prováveis de serem reais; chains que ele encontra uma vez e nunca mais provavelmente são ruído.
  • Modelos mais fortes. zai-org-glm-5 é o padrão porque atinge um bom equilíbrio entre custo e qualidade para raciocínio combinatório, mas para bases de código mais difíceis trocar por um modelo Venice mais forte (definido via VENICE_MODEL) pode tornar as narrativas do Chainer perceptivelmente mais apertadas.

Encerrando

Obrigado por ler! Espero que isso tenha ajudado você a entender como estruturar uma ferramenta de segurança de IA que é de fato confiável. O padrão que usamos aqui generaliza para além da segurança também: sempre que você quiser que um LLM raciocine entre arquivos de uma forma que precisa se ancorar em evidência real, a receita é a mesma. Construa um mapa estrutural determinístico, entregue ao modelo uma fatia dele que cabe no contexto, valide as referências do modelo de volta contra a estrutura e descarte qualquer coisa que ele não consiga ancorar. Usando Python com a API Venice AI, podemos construir agentes que combinam raciocínio LLM com limites duros de validação e entregar algo que dá uma resposta útil em vez de uma que apenas soa confiante.