跳转到主要内容
大多数静态安全工具都是孤立地发现 bug。它们扫描一个文件、列出问题,然后继续。问题在于现代代码库中最具破坏性的漏洞很少是单一的 bug。它们是一条链:硬编码的签名密钥加上缺失的授权检查加上 SQL 注入,单独来看似乎都可以处理。但合在一起就是一条账户接管路径。 这正是 LLM 擅长的跨切面推理,只要您给它正确的结构。在本文中,我们将使用 Python 和 Venice AI API 构建一个双 agent 安全代码审查器。最后,您将拥有一个 CLI,您可以将其指向任何 Python 代码库,以生成包含原子发现和漏洞利用链的 Markdown 报告。 对完整代码实现感兴趣?请查看 GitHub 仓库 在继续之前,您需要一个 Venice API 密钥。将其导出为环境变量:
export VENICE_API_KEY=<my-key>

我们要构建什么

审查器是一个小型 Python 项目,包含几个明确的部分:
部分功能
Pydantic 模型定义 EvidenceFindingChain,并在 LLM 和程序其余部分之间提供硬验证边界
Venice 客户端包装指向 Venice OpenAI 兼容端点的 OpenAI Python SDK
AST 仓库映射使用 Python 的 ast 模块遍历目标树,并构建每个模块的公共符号和导入边的确定性映射
扫描器 agent一次读取一个 Python 文件加上仓库映射的每文件邻域切片,并发出带有 file:line 证据的原子漏洞发现
链接器 agent读取发现的并集加上压缩的完整仓库映射,并发出组合两个或更多发现的漏洞利用链
引用验证器删除任何引用了扫描器未生成的发现 ID 或命名了其引用发现实际上都没有来自的文件的链
Markdown 报告将发现和链渲染为人类可读的报告
CLI使用 Typer 将所有内容连接在一起
流程如下:
  1. 遍历目标目录以查找 .py 文件。
  2. 构建确定性仓库映射(导入、公共符号、签名)。
  3. 对于每个文件,向扫描器发送其源代码加上映射的每文件邻域切片,并收集原子发现。
  4. 将发现的并集加上压缩的仓库映射发送给链接器,并收集漏洞利用链。
  5. 删除任何引用了扫描器未生成的发现 ID 的链,或命名了其引用发现实际上都没有来自的文件的链。
  6. 编写 Markdown 报告。
在开始编写代码之前,有两个设计决策值得标记。 第一个是为什么用两个 agent 而不是一个。一个尝试在一个 prompt 中完成所有工作的单 agent 扫描器必须在彻底处理每个文件的 bug 和聪明地进行组合推理之间取得平衡。拆分工作意味着扫描器可以毫不留情地嘈杂,而链接器可以选择性地安静。增加一个专门用于组合发现的额外 LLM 调用以非常少的额外代码解锁了整个类别的 bug。 第二个是为什么需要仓库映射。真实的代码库存在于许多文件中。一个 bug,比如”验证器运行但没有在 fetcher 中按迭代应用,而 fetcher 的响应最终进入渲染器”,对于按文件扫描器是不可见的。在任何 LLM 调用之前,我们使用 Python 的 ast 遍历目标树并构建结构映射。扫描器看到每个文件的邻域(谁从此文件导入、此文件导入什么、那些外部符号的签名)。链接器看到一个压缩的完整映射(每个模块、每个公共符号、每个导入边,无源代码)。这是我们发现的最少量的上下文工程,使链接器能够构造数据流跨越模块边界的链,而无需为将整个代码库塞入每个 prompt 而付出 token 成本。

先决条件

  • Python 3.12+
  • 来自 venice.ai 的 Venice API 密钥
  • 对 Pydantic、Python 的 ast 模块和 OpenAI Python SDK 的基本熟悉
参考仓库使用 uv 进行依赖管理,但常规虚拟环境同样可以工作。

设置项目

创建一个新项目并安装依赖项:
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
如果您更喜欢 pip,请创建虚拟环境:
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
为本地开发创建 .env 文件:
VENICE_API_KEY=your-venice-api-key-here
# Optional overrides
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
我们将源代码放在 src/venice_security_reviewer/ 下以保持其作为包可导入,prompts 放在仓库根目录的 prompts/ 下,以便它们可以像任何其他源工件一样进行审查和比较:
src/venice_security_reviewer/
  __init__.py
  models.py     # Pydantic models
  client.py     # Venice client factory
  repo_map.py   # AST-built repo map
  scanner.py    # Scanner agent
  chainer.py    # Chainer agent
  report.py     # Jinja2 Markdown rendering
  cli.py        # Typer CLI
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

设置 Venice 客户端

Venice 与 OpenAI 兼容,因此我们可以使用官方 OpenAI Python SDK 并将其 base_url 指向 Venice。将客户端构建集中在一个文件中意味着代码的其余部分不需要知道它在与哪个提供商通信:交换后端只会影响这一个模块。 创建 src/venice_security_reviewer/client.py
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
有几点值得注意:
  • 我们默认使用 zai-org-glm-5,因为它是一个强大的通用 Venice 模型,但您可以使用 VENICE_MODEL 环境变量覆盖它。对于更大或更微妙的代码库,换用更强的模型可以使链接器在叙述质量上明显更好。
  • build_client 返回客户端模型 ID,因此调用者不必自己读取环境变量,测试可以注入假配置而无需 monkeypatching。

定义数据模型

在这里使用 Pydantic 而不是传递原始字典的全部意义在于,我们在 LLM 和程序的其余部分之间获得了一个硬验证边界。如果模型返回格式错误的 JSON 或发明了一个不存在的发现 ID,解析会大声失败,我们永远不会将幻觉传播到报告中。 创建 src/venice_security_reviewer/models.py
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
这些约束在这里发挥着真正的作用:
  • Finding.idChain.id 被约束为像 F-001C-001 这样的正则表达式。如果模型对格式发挥创意,验证就会失败。
  • Chain.findings 需要至少两个条目:一个发现的”链”只是一个发现。
  • Chain.severity 被限制为 highcritical。不会将影响提高到最高单个严重程度之上的发现组合不是值得报告的链。
  • Evidence 强制 end_line >= start_line,因此模型不能返回无意义的行范围。
这是形状验证。我们还需要交叉引用验证:引用了扫描器从未生成的发现 ID 的链是无意义的。将此函数添加到 models.py
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
这是保持链接器诚实的确定性护栏。它只能引用扫描器实际生成的发现,并且它只能声称涉及链中的文件,这些文件之一来自这些发现之一。返回丢弃的链而不是悄悄过滤它们让 CLI 在模型尝试发明某些东西时显示警告。

构建 AST 仓库映射

仓库映射是 Python 代码库的结构骨架:每个模块的公共表面、每个导入边,以及从”模块 M”到”从 M 导入的模块”的反向索引。它使用 Python 的 ast 在每次扫描运行中构建一次,从不通过执行构建,因此在敌对代码上运行是安全的:解析器不会从被扫描的树中导入或调用任何东西。 我们将以两种形状消费映射。扫描器获得每个文件的邻域切片,因此其 prompt 大小保持有限。链接器获得一个压缩的完整映射,因此它可以跨文件构造链。 创建 src/venice_security_reviewer/repo_map.py 并从描述映射的 Pydantic 模型开始:
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
现在是遍历树并跳过我们不应该索引的目录的辅助函数:
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
对于每个文件,我们希望从 AST 中获得三样东西:它定义的顶级符号、导入边以及(如果存在)显式 __all__ 列表。函数签名和类标头被渲染为 LLM 可以直接读取的紧凑字符串:
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
200 的 _SIGNATURE_CHAR_CAP 保留了典型的真实签名(包括类型提示),同时防止像 200 行类型联合那样的病态情况炸毁 prompt。 接下来是提取器,它从解析的模块中提取结构数据。我们处理 ast.FunctionDefast.ClassDef、顶级 ast.Assignast.AnnAssign(用于常量),以及 ast.Importast.ImportFrom(用于导入边)。相对导入被解析为它们的绝对点分形式,以便链接器稍后可以将它们与模块名匹配:
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
完整的提取逻辑遍历 tree.body 并为每个顶级节点发出 SymbolDefImportEdge 条目。参考仓库的 repo_map.py 中的 _extract 函数涵盖了完整的实现。出来的形状是 ModuleEntry 对象的列表,每个文件一个。 有趣的部分是我们对这些条目所做的事情。将它们包装在一个 RepoMap 中,带有两个面向消费者的方法:
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path) 是扫描器为每个文件调用的内容。它返回一个 ModuleNeighborhood 对象,包含模块本身、从中导入的每个其他模块以及它从其他地方导入的每个仓库内符号(带有它们解析后的签名)。这为扫描器提供了足够的上下文来标记仅在跨文件上下文中明显的发现,而无需将整个代码库拖入 prompt。 condensed_dict() 是链接器获得的内容。代码片段和签名被丢弃;只剩下路径、模块名、公共导出和导入边。这是仍然让链接器能够推理跨模块数据流的最小表示。 最后,构建整个内容的入口点:
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
我们无法读取或解析失败的文件会被记录并跳过。我们返回部分映射而不是失败整个运行;最坏的情况是扫描器调用对一个文件看不到邻域,这仍然是一个工作的扫描。

编写扫描器 Agent

扫描器遍历目标路径、获取 Python 源文件,并要求 Venice 一次识别一个文件的原子漏洞。按文件扫描保持 prompt 小并使故障隔离:一个坏文件不会杀死整个运行。 我们将 prompt 本身保存在单独的文件中,以便它可以像任何其他源工件一样进行审查和比较。创建 prompts/scanner.md
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
参考仓库 中的完整 prompt 还包含一个”What to look for”部分,列出常见的漏洞类别(硬编码密钥、SQL 注入、命令注入、SSRF、不安全的反序列化等)和一个”How to use the neighborhood”部分,说明模型应如何使用跨文件上下文。 一些 prompt 设计说明:
  • 我们告诉模型只发出 JSON,不带散文或栅栏。OpenAI SDK 支持一个 response_format={"type": "json_object"} 参数在 API 端强制执行此项,但在 prompt 中重申会减少边缘情况。
  • 我们明确禁止扫描器生成跨文件链。链是链接器的工作,要求扫描器同时执行两者会模糊责任。
  • 我们要求逐字复制片段。这意味着报告可以引用模型声称看到的确切字节,审查员可以通过将片段与源进行比较来抽查发现。
现在是 agent 代码。创建 src/venice_security_reviewer/scanner.py 并从文件遍历器和 prompt 加载器开始:
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES 是一个安全上限。超过 ~200 KB 时,我们跳过而不是发送一个可能既昂贵又质量低的巨大 prompt。 下一部分是 prompt 构建器。模板使用 {filename}{source}{neighborhood} 作为占位符;我们使用 str.replace 而不是 .format(),因为模板包含带文字大括号的 JSON 示例:
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
现在是解析器。我们反序列化 JSON、通过 Pydantic 验证每个发现,并丢弃单个格式错误的发现,而不是使整个文件失败。一个坏发现不应让我们失去好的发现:
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
扫描器每个文件发出像 F-001 的 ID,但链接器需要在整个仓库中引用发现。我们针对单调计数器重新发布 ID,因此它们是全局唯一的:
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
单文件扫描调用结合了所有这些。我们读取文件、构建 prompt、用 response_format={"type": "json_object"} 和低温度将其发送到 Venice,并解析结果:
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
值得强调的两个细节:
  • 我们在解析之后将证据文件路径修补为相对于 repo_root 的路径,因为模型会回显我们给它的任何文件名,但我们希望整个报告中有一个规范形式。
  • temperature=0.1 故意低。我们希望扫描器在运行之间保持保守和一致;创造力是链接器的工作。
最后,扫描根目录下每个符合条件文件的协调器:
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
仓库映射由调用者构建一次,并为每个文件复用,因此即使单个文件解析失败或被跳过,扫描器也能看到一致的全局结构。

编写链接器 Agent

链接器接收扫描器发现的并集加上压缩的仓库映射,并询问 Venice 任何发现是否组合成真实的漏洞利用链。两个确定性护栏位于 LLM 和报告之间:
  1. 每个链必须只引用扫描器生成的发现 ID。
  2. 每个链必须只声明至少一个引用发现的证据涉及的文件。
违反任一规则的链在解析时被丢弃。这阻止模型”以防万一”幻想链,并阻止它声称链跨越其没有证据的文件。 链接器 prompt 位于 prompts/chainer.md。它的核心如下:
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
参考仓库 中的完整 prompt 还解释了如何读取仓库映射、如何决定 files_involved 中包含什么,以及关键的何时链接。告诉模型”许多代码库的发现不会链接是正确且预期的”阻止它发明链以看起来富有成效。 现在是 agent 代码。创建 src/venice_security_reviewer/chainer.py
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000 是链接器 prompt 中 JSON 渲染的仓库映射块的软上限。按每个 token 大约 4 个字符计算,这是约 2000 个 token,即使在发现和叙述预算之上,也舒适地适合任何 Venice 模型的上下文窗口。 我们将发现序列化为紧凑的 JSON 块。注意我们在这里故意从证据中剥离 snippet:链接器不需要原始字节来决定两个发现是否结合,包含它们大约会使真实代码库的 token 成本加倍:
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
对于较大的代码库,完整的压缩仓库映射可能超过我们的字符预算。当发生这种情况时,我们修剪为带发现的模块加上它们的直接邻居。这保留了足够的结构供链接器推理我们有证据的链,并丢弃其余部分:
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
修剪策略故意简单:保留我们发现所在的模块,并保留它们的直接导入图邻居。任何更远的东西在我们当前有证据的链中没有合理的作用,因此可以丢弃而不损失推理能力。我们还用 _pruned_kept_total 标记注释 payload,以便当映射被修剪时,链接器 prompt 可以警告模型。 解析响应的形状与扫描器相同:反序列化、通过 Pydantic 验证每个链、丢弃格式错误的条目:
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
然后是 agent 本身:
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
值得指出的几件事:
  • 当少于两个发现时,我们在调用模型之前退出。您无法链接单个发现,跳过调用意味着我们不会在保证为空的结果上消耗 token。
  • temperature=0.2 略高于扫描器的 0.1。链接器从稍多的创造力中受益以发现非显而易见的组合,但我们仍希望它扎根于给定的发现和映射。
  • 解析后,validate_chain_references 运行我们之前编写的确定性交叉引用检查。幸存下来的任何内容都可以安全地渲染;没有的内容被记录,因此操作员知道模型试图发明某些东西。
那个交叉引用检查是整个项目最重要的部分。它是”有用的安全工具”和”偶尔自信地错误的 AI 报告”之间的边界。有了它,即使模型幻想,错误的链也永远不会到达报告。

渲染 Markdown 报告

将渲染与 agent 逻辑分离意味着相同的 FindingChain 对象稍后可以被馈送到其他格式(JSON、SARIF、HTML),而无需触摸扫描器或链接器。 我们将使用 Jinja2 加一个小模板文件。创建 src/venice_security_reviewer/templates/report.md.j2
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
然后是 src/venice_security_reviewer/report.py 中的渲染器:
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
Markdown 模板关闭了 autoescape(Markdown 不是 HTML),但我们通过扩展为任何未来的 .html 模板启用它。

连接 CLI

CLI 是协调器:构建仓库映射、扫描、链接、渲染。我们将使用 Typer 处理参数解析,使用 Rich 打印漂亮的摘要表。 创建 src/venice_security_reviewer/cli.py
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
将脚本入口点添加到 pyproject.toml
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
这就是整个流水线的连接。

测试护栏

在整个构建过程中,我们一直紧紧依赖一个想法:确定性护栏是有用的安全工具与自信地错误的安全工具之间的分隔。这个声明只有在我们能证明护栏实际成立时才值得说,所以这个项目中最有价值的测试根本不调用 Venice。它们锁定 Pydantic 边界和 prompt 装配管道,这意味着它们在毫秒内离线运行,无需 API 密钥,无 token 成本。 首先添加开发依赖项:
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
值得测试的第一件事是模型边界本身。这些测试断言格式错误的发现和链在构造时被拒绝,在它们到达报告之前。创建 tests/test_models.py
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
每一个都反映了我们之前对模型施加的约束:颠倒的行范围、不匹配 F-### 模式的 ID 和单个发现的”链”。如果它们中的任何一个停止抛出异常,整个类别的幻觉将悄悄再次成为可能。 最重要的测试涵盖了交叉引用验证器,因为这是实际丢弃发明链的函数:
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
F-999 从未由扫描器生成,因此引用它的链落在 dropped 中,永远不会到达报告。参考仓库中的伴随测试 test_validate_chain_references_drops_unknown_files 对声称没有发现来自的文件的链执行相同操作。 值得测试的第二件事是馈送链接器的管道。重构 prompt 装配并悄悄停止传递跨文件上下文很容易,此时链接器将继续工作,但悄悄变得更糟。此测试构建一个双模块固件、渲染 prompt 并断言跨文件信息确实存在,再次无需 Venice 往返。创建 tests/test_cross_file_chain.py
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
如果此测试通过,链接器被传递包含两个发现、两个文件路径以及它们之间的导入边的 prompt。模型是否很好地使用该信息是一个单独的带外评估;此测试仅保护首先将信息放入 prompt 的管道。 使用以下命令运行整个测试套件,加上 linter 和类型检查器:
uv run pytest          # offline tests, no live Venice calls
uv run ruff check .
uv run mypy src/
因为这些测试都不接触网络,所以它们在每次提交和 CI 中运行都很安全,无需消耗 token 或需要 Venice 密钥。参考仓库还包括 tests/test_scanner_parse.pytests/test_chainer_parse.pytests/test_repo_map.py,它们覆盖 JSON 解析边缘情况(格式错误的条目被丢弃而不是使运行崩溃)和 AST 仓库映射构建器。

运行项目

要在真实代码库上尝试它,将 CLI 指向 Python 源目录:
uv run venice-security-reviewer scan path/to/your/code
或使用 pip install -e . 安装到您的虚拟环境,并运行 venice-security-reviewer scan path/to/your/code 输出大致如下:
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
Markdown 报告在顶部显示每个链及其叙述,然后下方显示每个单独的发现及其严重程度、CWE、文件位置、描述和模型声称已读取的逐字片段。 参考仓库还附带四个捆绑的演示目标,每个都行使链接器必须执行的不同形状的推理:
  • examples/vulnerable_app — 一个多文件 Flask 应用,有三个”低”发现,其中两个跨文件组合成关键的特权升级链。测试链接器是否对其组合内容有选择性。
  • examples/url_preview — 一个多文件 URL 获取器,具有不按迭代应用的防御性允许列表。测试结合部署拓扑(链接本地 IP 是云凭证网关)的跨文件数据流推理。
  • examples/csv_query — 一个单文件 CSV 过滤器,通过 __class__.__base__.__subclasses__()eval 沙盒逃逸。测试语言级推理而不是 HTTP 流。
  • examples/webhook_handler — 一个单文件 HMAC 验证器,具有 JSON 解析器差异漏洞。测试跨多个规范的推理。
使用以下命令试一下:
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
如果您看到 CLI 日志 chainer referenced N unknown finding id(s) or file(s); chains dropped,那是交叉引用验证器捕获模型发明链的行为。丢弃的链永远不会进入报告;您只会收到警告,可用于调整 prompt 或采样额外的链接器运行。

扩展此示例

双 agent 形状很好地泛化。一些值得探索的方向:
  • 更多语言。 扫描器在 prompt 级别是语言无关的;AST 构建器是 Python 特有的。换用 tree-sitter,您可以为 TypeScript、Go 或 Rust 构建相同的邻域/压缩映射形状。
  • 第三个 agent 用于修复。 一旦您有一条链,要求 Patcher agent 起草一个统一 diff 来解除一个组成发现的武装是一个小步骤。针对相同的证据文件集进行 Pydantic 验证 diff,您可以免费获得相同的幻觉保护。
  • 输出格式。 render_report 是唯一了解 Markdown 的地方。添加 SARIF 渲染器,相同的发现可以放入 GitHub 代码扫描。添加 JSON 渲染器,您可以将结果传递到下游系统。
  • 按文件哈希缓存。 扫描器的按文件调用是独立且幂等的。按 (file_hash, prompt_hash, model) 缓存意味着重新扫描一个文件已更改的仓库只会重新运行该一个文件的扫描器。
  • 链接器采样。 对于高风险运行,以稍高的温度调用链接器 N 次,并交集结果。模型一致发现的链更可能是真实的;它发现一次再也找不到的链可能是噪声。
  • 更强的模型。 zai-org-glm-5 是默认值,因为它在组合推理的成本和质量之间取得了良好的平衡,但对于更难的代码库,换用更强的 Venice 模型(通过 VENICE_MODEL 设置)可以使链接器的叙述明显更紧凑。

收尾

感谢阅读!希望这帮助您了解如何构建一个真正值得信赖的 AI 安全工具。我们在这里使用的模式也超越了安全:每当您希望 LLM 跨文件推理时,必须以真实证据为基础,配方都是相同的。构建确定性结构映射、将适合上下文的切片交给模型、根据结构验证模型的引用,并丢弃它无法落地的任何内容。 通过使用 Python 和 Venice AI API,我们可以构建结合 LLM 推理与硬验证边界的 agent,并交付一个给出有用答案的东西,而不是一个听起来自信的东西。