메인 콘텐츠로 건너뛰기
대부분의 정적 보안 도구는 격리된 환경에서 버그를 찾습니다. 한 파일을 스캔하고, 이슈를 나열하고, 다음으로 넘어갑니다. 문제는 현대 코드베이스에서 가장 큰 피해를 주는 취약점은 거의 하나의 버그가 아니라는 것입니다. 그것들은 사슬입니다: 하드코딩된 서명 키 + 누락된 권한 확인 + SQL injection이 각각 따로 보면 다 관리 가능해 보이지만 함께라면 계정 탈취 경로가 됩니다. 이것이 바로 LLM이 잘하는 종류의 cross-cutting 추론입니다. 올바른 구조를 제공한다면 말이죠. 이 글에서는 Python과 Venice AI API로 두 에이전트 보안 코드 리뷰어를 만듭니다. 마지막에는 어떤 Python 코드베이스든 가리키면 원자적 발견과 익스플로잇 체인이 포함된 Markdown 보고서를 생성하는 CLI가 갖춰집니다. 전체 코드 구현이 궁금하다면 GitHub 레포를 확인하세요. 계속하기 전에 Venice API 키가 필요합니다. 환경 변수로 export하세요:
export VENICE_API_KEY=<my-key>

무엇을 만드나요

리뷰어는 몇 가지 명확한 부분으로 구성된 작은 Python 프로젝트입니다:
PartWhat it does
Pydantic 모델Evidence, Finding, Chain 정의, LLM과 나머지 프로그램 사이에 단단한 검증 경계 제공
Venice 클라이언트Venice의 OpenAI 호환 endpoint를 가리키는 OpenAI Python SDK를 래핑
AST 레포 맵Python의 ast 모듈로 타겟 트리를 순회해 모든 모듈의 공개 심볼과 import 엣지의 결정적 맵을 빌드
Scanner 에이전트한 번에 한 Python 파일과 레포 맵의 파일별 neighborhood 슬라이스를 읽고, file:line 증거가 있는 원자적 취약점 발견을 출력
Chainer 에이전트발견의 합집합과 압축된 전체 레포 맵을 읽고, 둘 이상의 발견을 결합한 익스플로잇 체인을 출력
Reference validatorScanner가 생성하지 않은 finding ID를 참조하거나, 참조된 findings 중 어디서도 오지 않은 파일을 명명하는 체인을 모두 폐기
Markdown 보고서findings와 chains를 사람이 읽기 쉬운 보고서로 렌더링
CLITyper로 모든 것을 연결
흐름은 다음과 같습니다:
  1. 타겟 디렉터리에서 .py 파일 순회.
  2. 결정적 레포 맵 빌드(imports, 공개 심볼, 시그니처).
  3. 각 파일에 대해 Scanner에 소스와 맵의 파일별 neighborhood 슬라이스를 전송해 원자적 발견 수집.
  4. 발견의 합집합과 압축된 레포 맵을 Chainer에 전송하고 익스플로잇 체인 수집.
  5. Scanner가 생성하지 않은 finding ID를 참조하거나, 참조된 findings 중 어디서도 오지 않은 파일을 명명하는 체인을 모두 폐기.
  6. Markdown 보고서 작성.
코드를 작성하기 전에 짚어둘 두 가지 설계 결정이 있습니다. 첫 번째는 왜 하나가 아닌 두 에이전트인가입니다. 모든 것을 하나의 prompt에서 하려는 단일 에이전트 스캐너는 파일별 버그에 철저한 것과 조합적 추론에 영리한 것 사이에서 균형을 잡아야 합니다. 작업을 분리하면 Scanner는 끈질기고 시끄러울 수 있고, Chainer는 선별적이고 조용할 수 있습니다. findings를 결합하는 데 전념하는 LLM 호출 하나를 추가하면 매우 적은 추가 코드로 한 가지 버그 클래스 전체가 열립니다. 두 번째는 왜 레포 맵인가입니다. 실제 코드베이스는 많은 파일에 걸쳐 있습니다. “validator가 실행되지만 fetcher에서 반복마다 적용되지 않고, fetcher의 응답이 renderer에서 끝나는” 버그는 파일별 스캐너에게는 보이지 않습니다. 어떤 LLM 호출 전에 Python의 ast로 타겟 트리를 순회하고 구조 맵을 빌드합니다. Scanner는 파일별 neighborhood를 봅니다(이 파일에서 import하는 사람, 이 파일이 import하는 것, 그 외부 심볼의 시그니처). Chainer는 압축된 전체 맵을 봅니다(모든 모듈, 모든 공개 심볼, 모든 import 엣지, 소스 없음). 이것이 전체 코드베이스를 모든 prompt에 채우는 토큰 비용을 지불하지 않고도 Chainer가 모듈 경계를 넘는 데이터 흐름의 체인을 구성할 수 있게 하는 최소량의 context 엔지니어링입니다.

사전 요구사항

  • Python 3.12+
  • venice.ai의 Venice API 키
  • Pydantic, Python의 ast 모듈, OpenAI Python SDK에 대한 기본 친숙도
레퍼런스 레포는 의존성 관리에 uv를 사용하지만, 일반 가상 환경도 동등하게 동작합니다.

프로젝트 설정

새 프로젝트 생성 및 의존성 설치:
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
pip를 선호한다면 가상 환경을 만드세요:
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
로컬 개발용 .env 파일 생성:
VENICE_API_KEY=your-venice-api-key-here
# 선택적 override
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
소스는 패키지로 import 가능하도록 src/venice_security_reviewer/ 아래에 두고, prompt는 레포 루트의 prompts/ 아래에 두어 다른 소스 아티팩트처럼 리뷰하고 diff할 수 있게 합니다:
src/venice_security_reviewer/
  __init__.py
  models.py     # Pydantic 모델
  client.py     # Venice 클라이언트 팩토리
  repo_map.py   # AST로 빌드된 레포 맵
  scanner.py    # Scanner 에이전트
  chainer.py    # Chainer 에이전트
  report.py     # Jinja2 Markdown 렌더링
  cli.py        # Typer CLI
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

Venice 클라이언트 설정

Venice는 OpenAI 호환이므로 공식 OpenAI Python SDK를 사용하고 base_url을 Venice로 가리키기만 하면 됩니다. 클라이언트 생성을 한 파일에 중앙화하면 나머지 코드는 어떤 공급자와 통신하는지 알 필요가 없습니다: 백엔드 교체는 이 모듈만 건드리면 됩니다. src/venice_security_reviewer/client.py 생성:
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
몇 가지 짚어볼 점:
  • 강력한 범용 Venice 모델이라서 zai-org-glm-5를 기본값으로 사용하지만, VENICE_MODEL 환경 변수로 override할 수 있습니다. 더 크거나 더 미묘한 코드베이스에서는 더 강력한 모델로 교체하면 Chainer의 서술 품질이 눈에 띄게 개선될 수 있습니다.
  • build_client는 클라이언트 모델 ID를 반환해, 호출자가 직접 환경 변수를 읽을 필요가 없고 테스트는 monkeypatching 없이 가짜 config를 주입할 수 있습니다.

데이터 모델 정의

raw dict를 주고받는 대신 Pydantic을 사용하는 핵심은 LLM과 나머지 프로그램 사이에 단단한 검증 경계를 얻는 것입니다. 모델이 잘못된 JSON을 반환하거나 존재하지 않는 finding ID를 만들어내면 파싱이 시끄럽게 실패하고 환각을 보고서로 전파하지 않습니다. src/venice_security_reviewer/models.py 생성:
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
여기 제약 조건들은 실제 일을 합니다:
  • Finding.idChain.idF-001, C-001 같은 정규식으로 제한됩니다. 모델이 포맷을 창의적으로 만들면 검증이 실패합니다.
  • Chain.findings는 최소 두 개의 항목을 요구합니다: 하나의 finding으로 만든 “체인”은 그냥 finding입니다.
  • Chain.severityhigh 또는 critical로 제한됩니다. 가장 높은 개별 severity 위로 영향을 올리지 않는 findings 조합은 보고할 가치가 있는 체인이 아닙니다.
  • Evidenceend_line >= start_line을 강제해 모델이 무의미한 라인 범위를 반환할 수 없게 합니다.
그것이 모양 검증입니다. cross-reference 검증도 필요합니다: Scanner가 생성하지 않은 finding ID를 참조하는 체인은 무의미합니다. models.py에 다음 함수를 추가하세요:
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
이것이 Chainer를 정직하게 유지하는 결정적 가드레일입니다. Scanner가 실제로 생성한 findings만 참조할 수 있고, 그 findings 중 적어도 하나가 실제로 온 파일만 체인에 관련된 파일로 주장할 수 있습니다. 폐기된 체인을 조용히 필터링하지 않고 반환하면 모델이 무언가를 지어내려 할 때 CLI가 경고를 표시할 수 있습니다.

AST 레포 맵 빌드

레포 맵은 Python 코드베이스의 구조적 골격입니다: 모든 모듈의 공개 표면, 모든 import 엣지, “모듈 M”에서 “M에서 import하는 모듈들”로의 역방향 인덱스. 실행이 아닌 Python의 ast로 스캔 실행당 한 번 빌드되므로 적대적 코드에도 안전하게 실행할 수 있습니다: 파서는 스캔된 트리에서 어떤 것도 import하거나 호출하지 않습니다. 맵을 두 가지 모양으로 소비합니다. Scanner는 prompt 크기가 제한되도록 파일별 neighborhood 슬라이스를 받습니다. Chainer는 파일 간 체인을 구성할 수 있도록 압축된 전체 맵을 받습니다. src/venice_security_reviewer/repo_map.py를 생성하고 맵을 설명하는 Pydantic 모델로 시작하세요:
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
이제 트리를 순회하고 인덱싱해서는 안 되는 디렉터리를 건너뛰는 헬퍼:
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
각 파일에 대해 AST에서 세 가지를 원합니다: 정의하는 최상위 심볼, import 엣지, 존재한다면 명시적 __all__ 목록. 함수 시그니처와 클래스 헤더는 LLM이 직접 읽을 수 있는 컴팩트한 문자열로 렌더링됩니다:
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
200의 _SIGNATURE_CHAR_CAP은 (타입 힌트를 포함한) 일반적인 실제 시그니처를 보존하면서, 200줄짜리 타이프된 union 같은 병적인 경우가 prompt를 폭발시키는 것을 막습니다. 다음으로, 파싱된 모듈에서 구조 데이터를 뽑아내는 추출기. ast.FunctionDef, ast.ClassDef, 상수에 대한 최상위 ast.Assignast.AnnAssign, import 엣지에 대한 ast.Importast.ImportFrom을 모두 처리합니다. 상대 import는 절대 점 표기 형태로 해결되어 Chainer가 나중에 모듈 이름과 매칭할 수 있게 합니다:
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
전체 추출 로직은 tree.body를 순회하고 각 최상위 노드에 대해 SymbolDefImportEdge 항목을 출력합니다. 레퍼런스 레포의 repo_map.py에 있는 _extract 함수가 전체 구현을 다룹니다. 결과 모양은 파일당 하나의 ModuleEntry 객체 목록입니다. 흥미로운 부분은 그 항목들로 무엇을 하느냐입니다. 두 개의 소비자 측 메서드가 있는 RepoMap으로 감싸세요:
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path)가 Scanner가 각 파일에 대해 호출하는 것입니다. 모듈 자체, 그것에서 import하는 모든 다른 모듈, 그것이 다른 곳에서 import하는 모든 in-repo 심볼(해결된 시그니처와 함께)을 포함하는 ModuleNeighborhood 객체를 반환합니다. 그러면 Scanner는 전체 코드베이스를 prompt에 끌어들이지 않고도 cross-file context에서만 명확한 findings를 표시할 충분한 context를 갖습니다. condensed_dict()가 Chainer가 받는 것입니다. snippet과 시그니처는 떨어집니다. 경로, 모듈 이름, 공개 exports, import 엣지만 남습니다. 그것이 Chainer가 모듈 간 데이터 흐름에 대해 추론할 수 있게 하는 가장 작은 표현입니다. 마지막으로, 전체를 빌드하는 진입점:
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
읽을 수 없거나 파싱에 실패하는 파일은 로그되고 건너뛰어집니다. 전체 실행을 실패시키는 대신 부분 맵을 반환합니다. 최악의 경우는 Scanner 호출이 한 파일에 대해 neighborhood가 없다는 것을 보는 것이며, 이는 여전히 동작하는 스캔입니다.

Scanner 에이전트 작성

Scanner는 타겟 경로를 순회하고, Python 소스 파일을 가져와 한 번에 한 파일씩 원자적 취약점을 식별하도록 Venice에 요청합니다. 파일별 스캐닝은 prompt를 작게 유지하고 실패를 격리합니다: 잘못된 파일 하나가 전체 실행을 죽이지 않습니다. prompt 자체를 별도 파일에 두어 다른 소스 아티팩트처럼 리뷰하고 diff할 수 있게 합니다. prompts/scanner.md 생성:
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
레퍼런스 레포의 전체 prompt에는 일반적인 취약점 클래스(하드코딩된 시크릿, SQL injection, command injection, SSRF, 안전하지 않은 역직렬화 등)를 나열하는 “What to look for” 섹션과 모델이 cross-file context를 어떻게 소비해야 하는지 설명하는 “How to use the neighborhood” 섹션도 포함됩니다. prompt 설계 노트 몇 가지:
  • 모델에게 prose나 fence 없이 JSON만 출력하라고 말합니다. OpenAI SDK는 API 측에서 이를 강제하는 response_format={"type": "json_object"} 파라미터를 지원하지만, prompt에서 다시 강화하면 엣지 케이스를 줄입니다.
  • Scanner가 cross-file 체인을 생성하는 것을 명시적으로 금지합니다. 체인은 Chainer의 일이며, Scanner에게 둘 다 하라고 요청하면 책임이 흐려집니다.
  • snippet을 verbatim으로 복사하도록 요구합니다. 그러면 보고서는 모델이 봤다고 주장하는 정확한 바이트를 인용할 수 있고, 리뷰어는 snippet을 소스와 비교해 finding을 spot-check할 수 있습니다.
이제 에이전트 코드. src/venice_security_reviewer/scanner.py를 생성하고 파일 walker와 prompt 로더로 시작하세요:
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES는 안전 상한입니다. 약 200 KB를 넘으면 비싸고 품질도 낮을 가능성이 높은 거대한 prompt를 보내는 대신 건너뜁니다. 다음 부품은 prompt 빌더입니다. 템플릿은 {filename}, {source}, {neighborhood}를 자리표시자로 사용합니다. 템플릿이 리터럴 중괄호가 있는 JSON 예시를 포함하므로 .format() 대신 str.replace를 사용합니다:
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
이제 파서. JSON을 역직렬화하고, 각 finding을 Pydantic으로 검증하고, 전체 파일을 실패시키는 대신 개별 잘못된 findings를 드롭합니다. 잘못된 finding 하나가 좋은 것들을 잃게 해서는 안 됩니다:
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
Scanner는 파일당 F-001 같은 ID를 출력하지만, Chainer는 전체 레포에 걸쳐 findings를 참조해야 합니다. 전역적으로 고유하도록 단조 증가 카운터에 대해 ID를 재발급합니다:
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
단일 파일 스캔 호출은 이 모든 것을 결합합니다. 파일을 읽고, prompt를 빌드하고, response_format={"type": "json_object"}와 낮은 temperature로 Venice에 보내고, 결과를 파싱합니다:
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
강조할 만한 두 가지 세부사항:
  • 파싱 증거 파일 경로를 repo_root에 상대적으로 패치합니다. 모델은 우리가 준 파일 이름을 그대로 돌려주지만 보고서 전체에 걸쳐 하나의 표준 형식을 원하기 때문입니다.
  • temperature=0.1은 의도적으로 낮습니다. Scanner가 실행 간 보수적이고 일관되기를 원합니다. 창의성은 Chainer의 일입니다.
마지막으로, 루트 아래의 모든 적격 파일을 스캔하는 오케스트레이터:
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
레포 맵은 호출자에 의해 한 번 빌드되고 모든 파일에 재사용되어, 개별 파일이 파싱에 실패하거나 건너뛰어지더라도 Scanner가 일관된 전역 구조를 봅니다.

Chainer 에이전트 작성

Chainer는 Scanner findings의 합집합과 압축된 레포 맵을 받아 Venice에 findings 중 어떤 것이 실제 익스플로잇 체인으로 결합되는지 묻습니다. 두 개의 결정적 가드레일이 LLM과 보고서 사이에 있습니다:
  1. 모든 체인은 Scanner가 생성한 finding ID만 참조해야 합니다.
  2. 모든 체인은 최소 하나의 참조된 finding 증거가 닿는 파일만 주장해야 합니다.
두 규칙 중 하나를 위반하는 체인은 파싱 시점에 폐기됩니다. 이는 모델이 “혹시 모르니” 체인을 환각하고 증거가 없는 파일에 걸친다고 주장하는 것을 막습니다. Chainer prompt는 prompts/chainer.md에 있습니다. 핵심은 다음과 같습니다:
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
레퍼런스 레포의 전체 prompt에는 레포 맵을 읽는 방법, files_involved에 무엇이 들어가는지 결정하는 방법, 결정적으로 체인하지 않을 때를 설명합니다. 모델에게 “많은 코드베이스가 체인되지 않는 findings를 갖는 것은 옳고 기대되는 일”이라고 말하는 것이 모델이 생산적으로 보이려고 체인을 발명하는 것을 막습니다. 이제 에이전트 코드. src/venice_security_reviewer/chainer.py 생성:
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000은 Chainer prompt의 JSON 렌더링된 레포 맵 블록에 대한 소프트 상한입니다. 대략 토큰당 4자로, 약 2000 토큰이며, findings와 narrative 예산이 위에 있어도 모든 Venice 모델의 context 윈도우 안에 편안하게 들어갑니다. findings를 컴팩트한 JSON 블록으로 직렬화합니다. 여기서 의도적으로 evidence에서 snippet을 제거합니다: Chainer는 두 findings가 결합되는지 결정하기 위해 raw 바이트가 필요 없고, 그것들을 포함시키면 실제 코드베이스에서 토큰 비용이 대략 두 배가 됩니다:
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
더 큰 코드베이스의 경우 전체 압축된 레포 맵이 문자 예산을 초과할 수 있습니다. 그럴 때 finding을 가진 모듈과 그들의 직접 이웃으로 가지치기합니다. 이는 증거가 있는 체인에 대해 Chainer가 추론할 수 있는 충분한 구조를 보존하고 나머지는 폐기합니다:
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
가지치기 전략은 의도적으로 단순합니다: findings가 사는 모듈을 유지하고, 그들의 직접 import-그래프 이웃을 유지합니다. 더 먼 것은 현재 증거가 있는 체인에서 그럴듯한 역할이 없으므로 추론력을 잃지 않고 떨어뜨릴 수 있습니다. 또한 payload에 _pruned, _kept, _total 마커로 주석을 달아 맵이 트림된 경우 Chainer prompt가 모델에 경고할 수 있도록 합니다. 응답 파싱은 Scanner와 같은 모양입니다: 역직렬화, 각 체인을 Pydantic으로 검증, 잘못된 항목 드롭:
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
그런 다음 에이전트 자체:
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
짚어볼 만한 몇 가지:
  • findings가 2개 미만일 때는 모델을 호출하기 전에 빠져나옵니다. 단일 finding으로는 체인을 만들 수 없고, 호출을 건너뛰면 보장된 빈 결과에 토큰을 태우지 않습니다.
  • temperature=0.2는 Scanner의 0.1보다 약간 높습니다. Chainer는 명백하지 않은 조합을 발견하는 데 약간 더 많은 창의성의 혜택을 받지만, 여전히 받은 findings와 맵에 근거하기를 원합니다.
  • 파싱 후 우리가 앞서 작성한 결정적 cross-reference 검사인 validate_chain_references가 실행됩니다. 살아남는 것은 렌더링하기에 안전하고, 그렇지 않은 것은 운영자가 모델이 무언가를 발명하려 했다는 것을 알 수 있도록 로그됩니다.
그 cross-reference 검사가 전체 프로젝트에서 가장 중요한 부분입니다. 그것이 “유용한 보안 도구”와 “가끔 자신 있게 틀린 AI 보고서”의 경계입니다. 그것이 있으면 모델이 환각하더라도 잘못된 체인이 보고서에 도달하지 않습니다.

Markdown 보고서 렌더링

렌더링을 에이전트 로직과 분리하면 같은 FindingChain 객체가 나중에 Scanner나 Chainer를 건드리지 않고 다른 형식(JSON, SARIF, HTML)에 공급될 수 있습니다. 작은 템플릿 파일과 함께 Jinja2를 사용합니다. src/venice_security_reviewer/templates/report.md.j2 생성:
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
그런 다음 src/venice_security_reviewer/report.py의 렌더러:
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
Markdown 템플릿(Markdown은 HTML이 아니므로)에는 autoescape를 꺼두지만, 확장자별로 향후 .html 템플릿에는 활성화된 상태로 둡니다.

CLI 연결

CLI는 오케스트레이터입니다: 레포 맵 빌드, 스캔, 체인, 렌더링. 인수 파싱은 Typer, 멋진 요약 테이블 출력은 Rich를 사용합니다. src/venice_security_reviewer/cli.py 생성:
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
pyproject.toml에 스크립트 진입점 추가:
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
그것이 전체 파이프라인의 연결입니다.

가드레일 테스트

이 빌드 전반에 걸쳐 한 가지 아이디어에 강하게 의존했습니다: 결정적 가드레일이 유용한 보안 도구와 자신 있게 틀린 것을 분리합니다. 그 주장은 가드레일이 실제로 유지된다는 것을 증명할 수 있을 때만 가치가 있으므로, 이 프로젝트에서 가장 가치 있는 테스트는 Venice를 전혀 호출하지 않습니다. Pydantic 경계와 prompt 조립 배관을 잠그며, 그것은 API 키와 토큰 비용 없이 오프라인에서 밀리초 단위로 실행된다는 의미입니다. 먼저 dev 의존성 추가:
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
테스트할 가치가 있는 첫 번째 것은 모델 경계 자체입니다. 이 테스트는 잘못된 findings와 chains가 보고서에 도달하기 전 구성 시점에 거부되는 것을 주장합니다. tests/test_models.py 생성:
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
이들 각각은 우리가 앞서 모델에 둔 제약을 반영합니다: 뒤집힌 라인 범위, F-### 패턴에 맞지 않는 ID, 단일 finding의 “체인”. 그것들 중 어느 것이라도 raise를 멈추면 환각 클래스 하나 전체가 조용히 다시 가능해진 것입니다. 가장 중요한 테스트는 cross-reference validator를 다룹니다. 이는 실제로 발명된 체인을 떨어뜨리는 함수이기 때문입니다:
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
F-999는 Scanner가 결코 생성하지 않았으므로 그것을 참조하는 체인은 dropped에 들어가고 보고서에 도달하지 않습니다. 레퍼런스 레포의 동반 테스트인 test_validate_chain_references_drops_unknown_files는 findings 중 어디서도 오지 않은 파일을 주장하는 체인에 대해 같은 일을 합니다. 테스트할 가치가 있는 두 번째는 Chainer에 공급하는 배관입니다. prompt 조립을 리팩토링하고 조용히 cross-file context 전달을 멈추기 쉬우며, 그러면 Chainer는 계속 작동하지만 조용히 나빠집니다. 이 테스트는 2-모듈 fixture를 빌드하고, prompt를 렌더링하고, cross-file 정보가 실제로 존재하는지 다시 한 번 Venice 왕복 없이 주장합니다. tests/test_cross_file_chain.py 생성:
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
이 테스트가 통과한다면 Chainer는 두 findings, 두 파일 경로, 그것들 사이의 import 엣지를 모두 포함하는 prompt를 받고 있는 것입니다. 모델이 그 정보를 잘 사용하는지는 별도의 out-of-band 평가입니다. 이 테스트는 정보를 prompt에 넣는 배관만 보호합니다. 다음 명령으로 전체 스위트와 linter, 타입 체커를 실행하세요:
uv run pytest          # 오프라인 테스트, 라이브 Venice 호출 없음
uv run ruff check .
uv run mypy src/
이 테스트들은 네트워크에 닿지 않으므로 토큰을 태우거나 Venice 키 없이 모든 커밋과 CI에서 안전하게 실행할 수 있습니다. 레퍼런스 레포에는 또한 tests/test_scanner_parse.py, tests/test_chainer_parse.py, tests/test_repo_map.py가 포함되어 있으며, JSON 파싱 엣지 케이스(잘못된 항목이 실행을 충돌시키는 대신 드롭됨)와 AST 레포 맵 빌더를 다룹니다.

프로젝트 실행

실제 코드베이스에서 시도하려면 CLI를 Python 소스 디렉터리에 가리키세요:
uv run venice-security-reviewer scan path/to/your/code
또는 pip install -e .로 virtualenv에 설치하고 venice-security-reviewer scan path/to/your/code를 실행하세요. 출력은 대략 다음과 같이 보입니다:
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
Markdown 보고서는 각 체인을 상단에 narrative와 함께 표시하고, 그 아래에 severity, CWE, 파일 위치, 설명, 모델이 읽었다고 주장하는 verbatim snippet과 함께 모든 개별 finding을 표시합니다. 레퍼런스 레포는 또한 Chainer가 해야 하는 추론의 다른 모양을 각각 운동시키는 4개의 번들된 데모 타겟과 함께 출시됩니다:
  • examples/vulnerable_app — 3개의 “low” findings가 있는 다중 파일 Flask 앱으로, 그 중 2개가 파일 간 critical 권한 상승 체인으로 결합됩니다. Chainer가 결합할 것에 대해 선별적인지 테스트합니다.
  • examples/url_preview — 반복마다 적용되지 않는 방어적 allowlist가 있는 다중 파일 URL fetcher. 배포 토폴로지와 결합된 cross-file 데이터 흐름 추론을 테스트합니다(link-local IP는 클라우드 자격 증명 게이트웨이).
  • examples/csv_query__class__.__base__.__subclasses__()를 통한 eval 샌드박스 탈출이 있는 단일 파일 CSV 필터. HTTP 흐름보다는 언어 수준 추론을 테스트합니다.
  • examples/webhook_handler — JSON 파서 차분 취약점이 있는 단일 파일 HMAC verifier. 여러 명세에 걸친 추론을 테스트합니다.
다음으로 시도해 보세요:
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
CLI가 chainer referenced N unknown finding id(s) or file(s); chains dropped를 로그하는 것을 보면, 그것은 모델이 체인을 발명하는 현장을 cross-reference validator가 잡은 것입니다. 폐기된 체인은 보고서에 도달하지 않습니다. prompt를 조정하거나 추가 Chainer 실행을 샘플링하는 데 사용할 수 있는 경고만 받습니다.

이 예제 확장

두 에이전트 모양은 잘 일반화됩니다. 탐색할 가치가 있는 몇 가지 방향:
  • 더 많은 언어. Scanner는 prompt 수준에서 언어 독립적입니다. AST 빌더가 Python 전용입니다. tree-sitter로 교체하면 TypeScript, Go, Rust에 대해 같은 neighborhood/압축 맵 모양을 빌드할 수 있습니다.
  • 수정을 위한 세 번째 에이전트. 체인을 얻으면 Patcher 에이전트에 그 구성 findings 중 하나를 무력화하는 통합 diff를 초안하도록 요청하는 것이 작은 단계입니다. 같은 evidence-file 집합에 대해 diff를 Pydantic-검증하면 같은 환각 가드를 공짜로 얻습니다.
  • 출력 형식. render_report는 Markdown에 대해 아는 유일한 곳입니다. SARIF 렌더러를 추가하면 같은 findings가 GitHub code scanning에 들어갈 수 있습니다. JSON 렌더러를 추가하면 결과를 다운스트림 시스템에 파이프할 수 있습니다.
  • 파일 해시로 캐싱. Scanner의 파일별 호출은 독립적이고 멱등입니다. (file_hash, prompt_hash, model)로 캐싱하면 한 파일이 변경된 레포를 다시 스캔할 때 그 한 파일에서만 Scanner가 재실행됩니다.
  • Chainer를 위한 샘플링. 고위험 실행의 경우 약간 더 높은 temperature에서 Chainer를 N번 호출하고 결과를 교차시키세요. 모델이 일관되게 찾는 체인은 실제일 가능성이 더 높고, 한 번 찾고 다시 못 찾는 체인은 노이즈일 가능성이 높습니다.
  • 더 강력한 모델. zai-org-glm-5가 기본값인 이유는 조합적 추론에 대해 비용과 품질 사이의 좋은 균형을 이루기 때문이지만, 더 어려운 코드베이스의 경우 더 강력한 Venice 모델(VENICE_MODEL로 설정)로 교체하면 Chainer의 narrative가 눈에 띄게 더 타이트해질 수 있습니다.

마무리

읽어 주셔서 감사합니다! 이것이 실제로 신뢰할 수 있는 AI 보안 도구를 어떻게 구조화하는지 이해하는 데 도움이 되었기를 바랍니다. 여기서 사용한 패턴은 보안 외에도 일반화됩니다: LLM이 실제 증거에 ground되어야 하는 방식으로 파일 간 추론을 하기를 원할 때마다 레시피는 같습니다. 결정적 구조 맵을 빌드하고, context에 맞는 그것의 슬라이스를 모델에 주고, 모델의 참조를 구조에 대해 다시 검증하고, ground할 수 없는 모든 것을 떨어뜨리세요. Python과 Venice AI API를 사용함으로써 LLM 추론을 단단한 검증 경계와 결합하는 에이전트를 만들 수 있고, 자신 있게 들리는 답변 대신 유용한 답변을 주는 것을 출하할 수 있습니다.