الانتقال إلى المحتوى الرئيسي
تعثر معظم أدوات الأمان الثابتة على العلل بمعزل. تفحص ملفًا واحدًا، تُدرج المشكلات، ثم تنتقل. والمشكلة أن أكثر الثغرات تدميرًا في قواعد الشيفرة الحديثة نادرًا ما تكون عللاً فردية. إنها سلسلة: مفتاح توقيع مضمَّن في الشيفرة، إضافة إلى فحص ترخيص مفقود، إضافة إلى حقن SQL — كلٌّ منها بمفرده يبدو قابلًا للإدارة. لكنها مجتمعة طريق للاستيلاء على الحساب. هذا بالضبط نوع التفكير المتشابك الذي تتقنه نماذج اللغة الكبيرة، إن أعطيتها البنية المناسبة. في هذا المقال، سنبني مراجع أمان لشيفرة من وكيلين باستخدام Python و Venice AI API. بنهاية المقال، ستحصل على CLI يمكنك توجيهه إلى أي قاعدة شيفرة Python لإنتاج تقرير Markdown يحوي نتائج ذرّية وسلاسل استغلال. هل تهتم بالتنفيذ الكامل للكود؟ راجع مستودع GitHub. قبل المتابعة، ستحتاج إلى مفتاح Venice API. صدّره كمتغيّر بيئة:
export VENICE_API_KEY=<my-key>

ما الذي سنبنيه

المراجع مشروع Python صغير ببضعة أجزاء واضحة:
الجزءما يفعله
نماذج Pydanticتعرّف Evidence وFinding وChain، وتمنحنا حدود تحقق صارمة بين LLM وبقية البرنامج
عميل Veniceيلفّ OpenAI Python SDK مع توجيهه إلى نقطة نهاية Venice المتوافقة مع OpenAI
خريطة المستودع بـ ASTيمشي شجرة الهدف بوحدة ast في Python ويبني خريطة محدّدة لكل رموز كل وحدة العامة وحواف الاستيراد
وكيل الفحص (Scanner)يقرأ ملف Python واحدًا في كل مرة بالإضافة إلى شريحة الحيّ (neighborhood) من خريطة المستودع، ويُصدر نتائج ثغرات ذرّية مع أدلة file:line
وكيل التسلسل (Chainer)يقرأ اتحاد النتائج بالإضافة إلى خريطة مستودع مكثّفة، ويُصدر سلاسل استغلال تجمع نتيجتين أو أكثر
محقّق المراجعيُسقط أي سلسلة تُشير إلى معرّف نتيجة لم يُنتجه Scanner، أو تُسمي ملفًا لم يأتِ منه أي من نتائجها المُشار إليها
تقرير Markdownيعرض النتائج والسلاسل في تقرير قابل للقراءة من البشر
CLIيربط كل شيء معًا باستخدام Typer
يبدو التدفق هكذا:
  1. امشِ في دليل الهدف لملفات .py.
  2. ابنِ خريطة مستودع محدّدة (الاستيرادات، الرموز العامة، التواقيع).
  3. لكل ملف، أرسل إلى Scanner مصدره بالإضافة إلى شريحة الحيّ من الخريطة، واجمع النتائج الذرّية.
  4. أرسل اتحاد النتائج بالإضافة إلى خريطة المستودع المكثّفة إلى Chainer واجمع سلاسل الاستغلال.
  5. أسقط أي سلسلة تُشير إلى معرّف نتيجة لم يُنتجه Scanner، أو تُسمي ملفًا لم يأتِ منه أي من نتائجها المُشار إليها.
  6. اكتب تقرير Markdown.
قراران تصميميان يستحقّان الإشارة قبل أن نبدأ بكتابة الشيفرة. الأول هو لماذا وكيلان بدلًا من واحد. فاحص بوكيل واحد يحاول فعل كل شيء في تعليمة واحدة عليه الموازنة بين التعمّق في علل كل ملف وبين الذكاء في التفكير التوافقي. تقسيم العمل يعني أن Scanner يمكنه أن يكون لا يكلّ وضوضائيًا، وأن Chainer يمكنه أن يكون انتقائيًا وهادئًا. إضافة استدعاء LLM واحد إضافي مكرّس لدمج النتائج تفتح فئة كاملة من العلل بشيفرة إضافية ضئيلة. الثاني هو لماذا خريطة مستودع. قواعد الشيفرة الحقيقية تعيش عبر ملفات كثيرة. علّة تتكوّن من «المتحقّق يعمل لكنه لا يُطبَّق لكل تكرار في الجالب، وردّ الجالب ينتهي في العارض» غير مرئية لفاحص لكل ملف. قبل أي استدعاء LLM، نمشي شجرة الهدف بـast في Python ونبني خريطة بنيوية. يرى Scanner حيًّا لكل ملف (مَن يستورد من هذا الملف، ما يستورده هذا الملف، تواقيع تلك الرموز الخارجية). يرى Chainer خريطة كاملة مكثّفة (كل وحدة، كل رمز عام، كل حافة استيراد، بدون مصدر). هذا أقل قدر من هندسة السياق الذي وجدنا أنه يسمح لـ Chainer ببناء سلاسل يعبر تدفق بياناتها حدود الوحدات، دون دفع تكلفة الرموز لحشو قاعدة الشيفرة بالكامل في كل تعليمة.

المتطلبات المسبقة

  • Python 3.12+
  • مفتاح Venice API من venice.ai
  • إلمام أساسي بـ Pydantic، ووحدة ast في Python، و OpenAI Python SDK
يستخدم المستودع المرجعي uv لإدارة التبعيات، لكن بيئة افتراضية عادية تعمل بنفس الكفاءة.

إعداد المشروع

أنشئ مشروعًا جديدًا وثبّت التبعيات:
mkdir venice-security-reviewer
cd venice-security-reviewer
uv init
uv add "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
إذا فضّلت pip، أنشئ بيئة افتراضية بدلًا من ذلك:
python -m venv .venv
source .venv/bin/activate
pip install "openai>=1.54" "pydantic>=2.9" "typer>=0.12" "jinja2>=3.1" "python-dotenv>=1.0" "rich>=13.0"
أنشئ ملف .env للتطوير المحلي:
VENICE_API_KEY=your-venice-api-key-here
# تجاوزات اختيارية
# VENICE_BASE_URL=https://api.venice.ai/api/v1
# VENICE_MODEL=zai-org-glm-5
سنضع المصدر تحت src/venice_security_reviewer/ لإبقائه قابلًا للاستيراد كحزمة، مع التعليمات تحت prompts/ في جذر المستودع كي يمكن مراجعتها ومقارنتها كأي مصدر آخر:
src/venice_security_reviewer/
  __init__.py
  models.py     # نماذج Pydantic
  client.py     # مصنع عميل Venice
  repo_map.py   # خريطة مستودع مبنية بـ AST
  scanner.py    # وكيل Scanner
  chainer.py    # وكيل Chainer
  report.py     # عرض Markdown بـ Jinja2
  cli.py        # واجهة Typer
  templates/
    report.md.j2
prompts/
  scanner.md
  chainer.md
tests/
  test_models.py
  test_cross_file_chain.py

إعداد عميل Venice

Venice متوافق مع OpenAI، لذا يمكننا استخدام OpenAI Python SDK الرسمي وتوجيه base_url إلى Venice. تركيز إنشاء العميل في ملف واحد يعني أن باقي الشيفرة لا تحتاج إلى معرفة المزوّد الذي تخاطبه: تبديل الخلفية لا يلامس إلا هذه الوحدة الوحيدة. أنشئ src/venice_security_reviewer/client.py:
from __future__ import annotations

import os
from dataclasses import dataclass

from dotenv import load_dotenv
from openai import OpenAI

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "zai-org-glm-5"


class VeniceConfigError(RuntimeError):
    """Raised when Venice client config is missing or invalid."""


@dataclass(frozen=True, slots=True)
class VeniceConfig:
    api_key: str
    base_url: str
    model: str

    @classmethod
    def from_env(cls) -> "VeniceConfig":
        load_dotenv()
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceConfigError(
                "VENICE_API_KEY is not set. Add it to your .env file, "
                "or export VENICE_API_KEY in your shell."
            )
        return cls(
            api_key=api_key,
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL),
            model=os.getenv("VENICE_MODEL", DEFAULT_MODEL),
        )


def build_client(config: VeniceConfig | None = None) -> tuple[OpenAI, str]:
    cfg = config or VeniceConfig.from_env()
    client = OpenAI(api_key=cfg.api_key, base_url=cfg.base_url)
    return client, cfg.model
بعض الأمور الجديرة بالملاحظة:
  • نلجأ افتراضيًا إلى zai-org-glm-5 لأنه نموذج Venice قوي عام الغرض، لكن يمكنك تجاوزه عبر متغير البيئة VENICE_MODEL. لقواعد شيفرة أكبر أو أكثر دقّة، تبديل نموذج أقوى يمكن أن يجعل Chainer أفضل بشكل ملحوظ في جودة السرد.
  • يُرجع build_client العميل ومعرّف النموذج، لذا لا يضطر المتصلون لقراءة متغيرات البيئة بأنفسهم ويمكن للاختبارات حقن تكوين وهمي دون monkeypatching.

تعريف نماذج البيانات

الهدف الكامل من استخدام Pydantic هنا، بدلًا من تمرير قواميس خام، هو الحصول على حدود تحقق صارمة بين LLM وبقية البرنامج. إذا أعاد النموذج JSON مشوّهًا أو اخترع معرّف نتيجة غير موجود، يفشل التحليل بصخب ولا نُمرّر الهلوسة إلى التقرير أبدًا. أنشئ src/venice_security_reviewer/models.py:
from __future__ import annotations

from pathlib import Path
from typing import Literal, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

Severity = Literal["low", "medium", "high", "critical"]
ChainSeverity = Literal["high", "critical"]


class Evidence(BaseModel):
    """A concrete code span that justifies a finding."""

    model_config = ConfigDict(frozen=True)

    file: Path
    start_line: int = Field(ge=1)
    end_line: int = Field(ge=1)
    snippet: str

    @model_validator(mode="after")
    def _check_line_range(self) -> Self:
        if self.end_line < self.start_line:
            raise ValueError(
                f"end_line ({self.end_line}) must be >= start_line ({self.start_line})"
            )
        return self


class Finding(BaseModel):
    """An atomic vulnerability surfaced by the Scanner agent."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^F-\d{3,}$")
    title: str = Field(min_length=1)
    severity: Severity
    description: str = Field(min_length=1)
    cwe: str | None = None
    evidence: Evidence


class Chain(BaseModel):
    """An exploit chain combining two or more atomic findings."""

    model_config = ConfigDict(frozen=True)

    id: str = Field(pattern=r"^C-\d{3,}$")
    findings: list[str] = Field(min_length=2)
    narrative: str = Field(min_length=1)
    severity: ChainSeverity
    files_involved: list[Path] = Field(min_length=1)
القيود تقوم بعمل فعلي هنا:
  • Finding.id وChain.id مقيّدتان بتعبير عادي مثل F-001، C-001. إذا حاول النموذج الإبداع في التنسيق، يفشل التحقق.
  • Chain.findings يتطلب مدخلَيْن على الأقل: «سلسلة» من نتيجة واحدة هي مجرد نتيجة.
  • Chain.severity مقتصرة على high أو critical. مزيج نتائج لا يرفع الأثر فوق أعلى شدّة فردية ليس سلسلة تستحق الإبلاغ عنها.
  • Evidence يفرض أن end_line >= start_line بحيث لا يستطيع النموذج إرجاع نطاقات أسطر بلا معنى.
هذا تحقق الشكل. نحتاج أيضًا إلى تحقق المرجع المتبادل: سلسلة تُشير إلى معرّف نتيجة لم ينتجه Scanner أبدًا لا معنى لها. أضف هذه الدالة إلى models.py:
def validate_chain_references(
    chains: list[Chain], findings: list[Finding]
) -> tuple[list[Chain], list[Chain]]:
    findings_by_id = {f.id: f for f in findings}
    valid: list[Chain] = []
    dropped: list[Chain] = []
    for chain in chains:
        if not all(ref in findings_by_id for ref in chain.findings):
            dropped.append(chain)
            continue
        chain_evidence_files = {
            findings_by_id[ref].evidence.file.as_posix() for ref in chain.findings
        }
        if not all(p.as_posix() in chain_evidence_files for p in chain.files_involved):
            dropped.append(chain)
            continue
        valid.append(chain)
    return valid, dropped
هذه هي الحماية المحدّدة التي تُبقي Chainer أمينًا. لا يستطيع الإشارة إلا إلى نتائج أنتجها Scanner فعلًا، ولا يستطيع ادّعاء ملفات متورّطة في السلسلة إلا تلك التي جاء منها فعلًا أي من نتائجه. إعادة السلاسل المُسقطة بدلًا من تصفيتها بصمت تتيح لـ CLI إظهار تحذير حين يحاول النموذج اختراع شيء.

بناء خريطة المستودع بـ AST

خريطة المستودع هي الهيكل البنيوي لقاعدة شيفرة Python: السطح العام لكل وحدة، كل حافة استيراد، وفهرس عكسي من «وحدة M» إلى «الوحدات التي تستورد من M». تُبنى مرة واحدة لكل تشغيل فحص باستخدام ast في Python، أبدًا عبر التنفيذ، لذا فهي آمنة للتشغيل على شيفرة عدائية: المحلّل لا يستورد أو يستدعي شيئًا من الشجرة المفحوصة. سنستهلك الخريطة بشكلين. يحصل Scanner على شريحة حيّ لكل ملف بحيث تبقى تعليماته محدودة الحجم. ويحصل Chainer على خريطة كاملة مكثّفة ليتمكن من بناء سلاسل عبر الملفات. أنشئ src/venice_security_reviewer/repo_map.py وابدأ بنماذج Pydantic التي تصف الخريطة:
from __future__ import annotations

import ast
import logging
from collections.abc import Iterable
from pathlib import Path
from typing import Literal

from pydantic import BaseModel, ConfigDict, Field

logger = logging.getLogger(__name__)

SymbolKind = Literal["function", "class", "constant"]
_SIGNATURE_CHAR_CAP = 200

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})


class SymbolDef(BaseModel):
    model_config = ConfigDict(frozen=True)
    name: str
    kind: SymbolKind
    line: int = Field(ge=1)
    signature: str | None = None


class ImportEdge(BaseModel):
    model_config = ConfigDict(frozen=True)
    from_module: str
    imported_names: list[str]
    line: int = Field(ge=1)


class ModuleEntry(BaseModel):
    model_config = ConfigDict(frozen=True)
    path: Path
    module_name: str
    defines: list[SymbolDef]
    imports: list[ImportEdge]
    exports: list[str]
الآن الدالة المساعدة التي تمشي الشجرة وتتخطى الأدلة التي يجب ألّا نفهرسها:
def _iter_python_files(root: Path) -> Iterable[Path]:
    for path in sorted(root.rglob("*.py")):
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        if path.is_file():
            yield path


def _path_to_module_name(path: Path, root: Path) -> str:
    rel = path.relative_to(root)
    parts = list(rel.with_suffix("").parts)
    if parts and parts[-1] == "__init__":
        parts = parts[:-1]
    return ".".join(parts)
لكل ملف نريد ثلاثة أشياء من AST: الرموز التي يعرّفها على المستوى الأعلى، حواف الاستيراد، وقائمة __all__ صريحة إن كانت موجودة. تواقيع الدوال وترويسات الفئات تُعرض كسلاسل مدمجة يستطيع LLM قراءتها مباشرة:
def _render_signature(node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
    try:
        prefix = "async def " if isinstance(node, ast.AsyncFunctionDef) else "def "
        args = ast.unparse(node.args)
        returns = f" -> {ast.unparse(node.returns)}" if node.returns is not None else ""
        sig = f"{prefix}{node.name}({args}){returns}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"{prefix}{node.name}(...)"
        return sig
    except Exception:
        return f"def {node.name}(...)"


def _render_class_header(node: ast.ClassDef) -> str:
    try:
        bases = [ast.unparse(b) for b in node.bases]
        sig = f"class {node.name}({', '.join(bases)})" if bases else f"class {node.name}"
        if len(sig) > _SIGNATURE_CHAR_CAP:
            return f"class {node.name}(...)"
        return sig
    except Exception:
        return f"class {node.name}"
يحافظ _SIGNATURE_CHAR_CAP البالغ 200 على التواقيع الواقعية النموذجية (بما فيها تلميحات الأنواع) مع منع الحالات المرضية كاتحاد مكتوب من 200 سطر يُفجّر التعليمة. بعد ذلك، المُستخرج الذي يسحب البيانات البنيوية من وحدة محلّلة. نتعامل مع ast.FunctionDef وast.ClassDef وast.Assign وast.AnnAssign على المستوى الأعلى للثوابت، وكل من ast.Import وast.ImportFrom لحواف الاستيراد. الاستيرادات النسبية تُحلّ إلى صيغتها المطلقة بنقاط بحيث يستطيع Chainer مطابقتها بأسماء الوحدات لاحقًا:
def _resolve_relative_package(
    *, importer_module: str, importer_is_init: bool, level: int
) -> str | None:
    if level <= 0:
        return None
    importer_parts = importer_module.split(".") if importer_module else []
    base_parts = list(importer_parts) if importer_is_init else importer_parts[:-1]
    steps_up = level - 1
    if steps_up > len(base_parts):
        return None
    package_parts = (
        base_parts[: len(base_parts) - steps_up] if steps_up else list(base_parts)
    )
    return ".".join(package_parts)
منطق الاستخراج الكامل يمشي tree.body ويُصدر مدخلات SymbolDef وImportEdge لكل عقدة من المستوى الأعلى. تغطي دالة _extract في repo_map.py للمستودع المرجعي التنفيذ الكامل. الشكل الذي يخرج هو قائمة من كائنات ModuleEntry، واحد لكل ملف. الجزء المثير هو ما نفعله بتلك المدخلات. لفّها في RepoMap بطريقتين موجّهتين للمستهلك:
class RepoMap(BaseModel):
    model_config = ConfigDict(frozen=True)
    root: Path
    modules: list[ModuleEntry]

    def by_module_name(self, module_name: str) -> ModuleEntry | None:
        for m in self.modules:
            if m.module_name == module_name:
                return m
        return None

    def importers_of(self, module_name: str) -> list["ImportingRef"]:
        refs: list["ImportingRef"] = []
        for m in self.modules:
            for edge in m.imports:
                if edge.from_module == module_name:
                    refs.append(
                        ImportingRef(
                            importer_path=m.path,
                            importer_module=m.module_name,
                            imported_names=list(edge.imported_names),
                            line=edge.line,
                        )
                    )
        return refs

    def neighborhood(self, path: Path) -> "ModuleNeighborhood | None":
        m = next((mod for mod in self.modules if mod.path == path), None)
        if m is None:
            return None
        return ModuleNeighborhood(
            this_module=m,
            imported_by=self.importers_of(m.module_name),
            imports_from_repo=self.resolve_imports_in_repo(m.module_name),
        )

    def condensed_dict(self) -> dict[str, object]:
        return {
            "modules": [
                {
                    "path": str(m.path),
                    "module": m.module_name,
                    "exports": list(m.exports),
                    "imports": [
                        {"from": e.from_module, "names": list(e.imported_names)}
                        for e in m.imports
                    ],
                }
                for m in self.modules
            ]
        }
neighborhood(path) هو ما يستدعيه Scanner لكل ملف. يُرجع كائن ModuleNeighborhood يحتوي الوحدة نفسها، وكل وحدة أخرى تستورد منها، وكل رمز داخل المستودع تستورده من غيرها (مع تواقيعها المُحلّلة). يمنح ذلك Scanner سياقًا كافيًا لرصد نتائج لا تكون واضحة إلا في سياق متعدد الملفات، دون جرّ قاعدة الشيفرة كاملة إلى التعليمة. condensed_dict() هو ما يحصل عليه Chainer. تُسقط المقاطع والتواقيع؛ ولا يبقى سوى المسارات وأسماء الوحدات والصادرات العامة وحواف الاستيراد. هذا أصغر تمثيل لا يزال يتيح لـ Chainer التفكير في تدفق البيانات بين الوحدات. أخيرًا، نقطة الدخول التي تبني الشيء بأكمله:
def build_repo_map(root: Path) -> RepoMap:
    root = root.resolve()
    modules: list[ModuleEntry] = []
    for path in _iter_python_files(root):
        rel = path.relative_to(root)
        module_name = _path_to_module_name(path, root)
        is_init = path.stem == "__init__"
        try:
            source = path.read_text(encoding="utf-8")
            tree = ast.parse(source)
        except (OSError, SyntaxError, UnicodeDecodeError) as exc:
            logger.warning("repo_map: skipping %s: %s", rel, exc)
            continue
        defines, imports, explicit_all = _extract(
            tree, importer_module=module_name, importer_is_init=is_init
        )
        exports = explicit_all or [s.name for s in defines if not s.name.startswith("_")]
        modules.append(
            ModuleEntry(
                path=rel,
                module_name=module_name,
                defines=defines,
                imports=imports,
                exports=exports,
            )
        )
    return RepoMap(root=root, modules=modules)
الملفات التي لا نستطيع قراءتها أو التي يفشل تحليلها تُسجّل وتُتخطّى. نُرجع خريطة جزئية بدلًا من فشل التشغيل كله؛ أسوأ حالة هي أن استدعاء Scanner لا يرى حيًّا لملف واحد، وهذا لا يزال فحصًا يعمل.

كتابة وكيل Scanner

يمشي Scanner مسار الهدف، ويلتقط ملفات مصدر Python، ويطلب من Venice تحديد ثغرات ذرّية ملفًا واحدًا في كل مرة. الفحص لكل ملف يبقي التعليمة صغيرة ويجعل الإخفاقات معزولة: ملف سيئ واحد لا يقتل التشغيل كله. سنبقي التعليمة نفسها في ملف منفصل بحيث يمكن مراجعتها ومقارنتها كأي مصدر آخر. أنشئ prompts/scanner.md:
You are a static security analyst reviewing a single Python source file for
vulnerabilities. You will be given the file path, its full contents, and a
*neighborhood* slice of the surrounding repo: which other modules import
from this file (and what symbols they pull), and which in-repo symbols this
file imports from elsewhere. You must respond with a JSON object that lists
every distinct vulnerability you can identify, with concrete file:line
evidence for each.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "findings": [
    {
      "id": "F-001",
      "title": "Short imperative title, e.g. 'Hardcoded session signing key'",
      "severity": "low | medium | high | critical",
      "description": "One to three sentences explaining the vulnerability and why it matters.",
      "cwe": "CWE-798 or null if not applicable",
      "evidence": {
        "file": "{filename}",
        "start_line": 12,
        "end_line": 14,
        "snippet": "the exact lines from the source, copied verbatim including whitespace"
      }
    }
  ]
}
```

3. Finding IDs must be sequential within this file: F-001, F-002, F-003, etc.
4. The `file` field in evidence must equal the filename you were given, exactly.
5. `start_line` and `end_line` must be 1-indexed line numbers from the source you were given.
6. The `snippet` must be the exact text of those lines, copied verbatim. Do not paraphrase. Do not truncate.
7. Do not invent vulnerabilities. If you are unsure, omit it. False positives waste the operator's time and erode trust in the tool.
8. Every finding's evidence must point at lines in THIS file. Do not produce findings whose evidence lives in a different file. The Chainer is the agent that reasons across files.
9. If the file contains no vulnerabilities, return `{"findings": []}`.
التعليمة الكاملة في المستودع المرجعي تحوي أيضًا قسم «What to look for» يسرد فئات الثغرات الشائعة (الأسرار المضمَّنة، حقن SQL، حقن الأوامر، SSRF، إلغاء تسلسل غير آمن، إلخ) وقسم «How to use the neighborhood» يشرح كيف ينبغي للنموذج استهلاك السياق متعدد الملفات. بعض ملاحظات تصميم التعليمة:
  • نُخبر النموذج بإصدار JSON فقط، بلا نثر أو أسوار. تدعم OpenAI SDK معامل response_format={"type": "json_object"} يفرض هذا على جانب الواجهة، لكن تعزيزه في التعليمة يقلّل الحالات الحدّية.
  • نمنع Scanner صراحة من إنتاج سلاسل بين الملفات. السلاسل وظيفة Chainer، وطلب Scanner القيام بكليهما يُشوّش المسؤولية.
  • نطلب نسخ المقطع حرفيًا. هذا يعني أن التقرير يستطيع اقتباس البايتات الدقيقة التي يدّعي النموذج رؤيتها، ويستطيع المراجع التحقق العشوائي من نتيجة بمقارنة المقطع بالمصدر.
الآن شيفرة الوكيل. أنشئ src/venice_security_reviewer/scanner.py وابدأ بمتنقّل الملفات ومحمّل التعليمة:
from __future__ import annotations

import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Finding
from .repo_map import ModuleNeighborhood, RepoMap

logger = logging.getLogger(__name__)

DEFAULT_SOURCE_EXTENSIONS: frozenset[str] = frozenset({".py"})

SKIP_DIR_NAMES: frozenset[str] = frozenset({
    ".git", ".venv", "venv", "env", "__pycache__", "node_modules",
    "dist", "build", ".mypy_cache", ".pytest_cache", ".ruff_cache",
    "site-packages",
})

MAX_FILE_BYTES = 200_000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")


def iter_source_files(
    root: Path, extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS
) -> Iterator[Path]:
    exts = {e.lower() for e in extensions}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        if any(part in SKIP_DIR_NAMES for part in path.parts):
            continue
        try:
            if path.stat().st_size > MAX_FILE_BYTES:
                logger.warning("skipping %s: exceeds %d bytes", path, MAX_FILE_BYTES)
                continue
        except OSError:
            continue
        yield path
MAX_FILE_BYTES حدّ سلامة. وراء ~200 كيلوبايت نتخطى بدلًا من إرسال تعليمة ضخمة من المرجح أنها مكلفة ومنخفضة الجودة. القطعة التالية هي باني التعليمة. يستخدم القالب {filename} و{source} و{neighborhood} كنائبات؛ نستخدم str.replace بدلًا من .format() لأن القالب يحوي أمثلة JSON بأقواس مجعّدة حرفية:
def _render_neighborhood(neighborhood: ModuleNeighborhood | None) -> str:
    if neighborhood is None:
        return "null"
    return neighborhood.model_dump_json(indent=2)


def _build_prompt(
    template: str, *, filename: str, source: str, neighborhood: ModuleNeighborhood | None
) -> str:
    return (
        template.replace("{filename}", filename)
        .replace("{source}", source)
        .replace("{neighborhood}", _render_neighborhood(neighborhood))
    )
الآن المحلّل. نفكّ JSON، ونحقّق كل نتيجة عبر Pydantic، ونُسقط النتائج المشوّهة فرديًا بدلًا من فشل الملف كله. نتيجة سيئة واحدة يجب ألّا تُضيّع علينا الجيدة:
def _parse_findings(raw: str, *, source_file: Path) -> list[Finding]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"model did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "findings" not in data:
        raise ValueError("model JSON missing 'findings' key")

    findings: list[Finding] = []
    for entry in data["findings"]:
        try:
            findings.append(Finding.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed finding from %s: %s", source_file, exc)
    return findings
يُصدر Scanner معرّفات مثل F-001 لكل ملف، لكن Chainer يحتاج إلى الإشارة إلى نتائج عبر المستودع كله. نعيد إصدار المعرّفات مقابل عدّاد رتيب بحيث تكون فريدة عالميًا:
def _renumber_findings(findings: list[Finding], offset: int) -> tuple[list[Finding], int]:
    renumbered: list[Finding] = []
    for i, f in enumerate(findings):
        new_id = f"F-{offset + i + 1:03d}"
        renumbered.append(f.model_copy(update={"id": new_id}))
    return renumbered, offset + len(findings)
استدعاء فحص الملف الواحد يجمع كل هذا. نقرأ الملف، نبني التعليمة، نرسلها إلى Venice مع response_format={"type": "json_object"} ودرجة حرارة منخفضة، ونحلّل النتيجة:
def scan_file(
    client: OpenAI,
    model: str,
    path: Path,
    *,
    prompt_template: str,
    repo_root: Path,
    repo_map: RepoMap,
    max_retries: int = 1,
) -> list[Finding]:
    try:
        source = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("could not read %s: %s", path, exc)
        return []

    rel = path.relative_to(repo_root)
    neighborhood = repo_map.neighborhood(rel)
    prompt = _build_prompt(
        prompt_template, filename=str(rel), source=source, neighborhood=neighborhood
    )

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a precise static security analyst. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.1,
            )
        except Exception as exc:
            logger.warning("Venice call failed for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            findings = _parse_findings(content, source_file=path)
        except ValueError as exc:
            logger.warning("parse failure for %s on attempt %d: %s", rel, attempt, exc)
            last_error = exc
            continue

        return [
            f.model_copy(update={"evidence": f.evidence.model_copy(update={"file": rel})})
            for f in findings
        ]

    logger.error("giving up on %s after %d attempts: %s", rel, max_retries + 1, last_error)
    return []
تفصيلتان جديرتان بالإبراز:
  • نُصلح مسار ملف الأدلة ليكون نسبيًا إلى repo_root بعد التحليل، لأن النموذج يردّد اسم الملف الذي أعطيناه له لكننا نريد شكلًا قانونيًا واحدًا عبر التقرير كله.
  • temperature=0.1 منخفضة عمدًا. نريد Scanner أن يكون محافظًا ومتسقًا عبر التشغيلات؛ الإبداع وظيفة Chainer.
أخيرًا، المنسّق الذي يفحص كل ملف مؤهل تحت الجذر:
def scan_path(
    client: OpenAI,
    model: str,
    root: Path,
    repo_map: RepoMap,
    *,
    extensions: Iterable[str] = DEFAULT_SOURCE_EXTENSIONS,
) -> list[Finding]:
    template = _load_prompt_template("scanner.md")
    all_findings: list[Finding] = []
    offset = 0
    for path in iter_source_files(root, extensions=extensions):
        logger.info("scanning %s", path.relative_to(root))
        findings = scan_file(
            client, model, path,
            prompt_template=template,
            repo_root=root,
            repo_map=repo_map,
        )
        renumbered, offset = _renumber_findings(findings, offset)
        all_findings.extend(renumbered)
    return all_findings
تُبنى خريطة المستودع مرة واحدة من المتصل وتُعاد استخدامها لكل ملف، بحيث يرى Scanner بنية عالمية متسقة حتى عندما تفشل ملفات فردية في التحليل أو يتم تخطّيها.

كتابة وكيل Chainer

يأخذ Chainer اتحاد نتائج Scanner بالإضافة إلى خريطة المستودع المكثّفة، ويسأل Venice ما إذا كانت أي من النتائج تتجمع في سلسلة استغلال حقيقية. توجد حمايتان محدّدتان بين LLM والتقرير:
  1. يجب أن تُشير كل سلسلة فقط إلى معرّفات نتائج أنتجها Scanner.
  2. يجب أن تدّعي كل سلسلة فقط ملفات تلامس أدلة نتيجة مشار إليها واحدة على الأقل.
السلاسل التي تنتهك أي قاعدة تُسقط وقت التحليل. هذا يمنع النموذج من هلوسة السلاسل «احتياطًا» ومن ادعاء أن سلسلة تمتد عبر ملفات لا أدلة له فيها. تعليمة Chainer في prompts/chainer.md. جوهرها يبدو هكذا:
You are a senior offensive security engineer. You are given a list of atomic
vulnerability findings discovered in a single codebase, plus a structural map
of that codebase showing every module's public symbols and import edges. Your
job is to identify whether any subset of the findings can be combined into a
real, end-to-end exploit chain.

# Rules

1. Output a single JSON object. No prose before or after. No markdown fences.
2. The object must match this schema exactly:

```json
{
  "chains": [
    {
      "id": "C-001",
      "findings": ["F-001", "F-003"],
      "narrative": "Step-by-step explanation of how an attacker combines these specific findings into a single exploit. Reference each finding by ID where it is used.",
      "severity": "high | critical",
      "files_involved": ["pkg/validators.py", "pkg/fetcher.py"]
    }
  ]
}
```

3. Chain IDs must be sequential: C-001, C-002, C-003, etc.
4. Every entry in `findings` MUST be the ID of a finding from the input list. You may NOT invent new finding IDs.
5. Every entry in `files_involved` MUST be the `evidence.file` of at least one of the findings you reference in this chain.
6. A chain must reference at least two distinct findings.
7. Chains are by definition severity high or critical. If a combination doesn't raise the impact above the highest individual severity, it is not a chain worth reporting.
8. If no real chain exists, return `{"chains": []}`. It is correct and expected for many codebases to have findings that do not chain.
التعليمة الكاملة في المستودع المرجعي تشرح أيضًا كيفية قراءة خريطة المستودع، وكيفية تقرير ما يدخل files_involved، وأهم شيء: متى لا نُسلسل. إخبار النموذج بأن «من الصحيح والمتوقع أن قواعد شيفرة كثيرة تحوي نتائج لا تتسلسل» هو ما يمنعه من اختراع سلاسل ليبدو منتجًا. الآن شيفرة الوكيل. أنشئ src/venice_security_reviewer/chainer.py:
from __future__ import annotations

import json
import logging
from pathlib import Path

from openai import OpenAI
from pydantic import ValidationError

from .models import Chain, Finding, validate_chain_references
from .repo_map import RepoMap

logger = logging.getLogger(__name__)

MAX_REPO_MAP_CHARS = 8000


def _load_prompt_template(name: str) -> str:
    here = Path(__file__).resolve()
    return (here.parents[2] / "prompts" / name).read_text(encoding="utf-8")
MAX_REPO_MAP_CHARS = 8000 سقف ناعم لكتلة خريطة المستودع المعروضة كـ JSON في تعليمة Chainer. عند نحو 4 أحرف لكل رمز، هذا ~2000 رمز، يجلس بشكل مريح داخل نافذة سياق أي نموذج Venice حتى مع وجود النتائج وميزانية السرد فوقها. نُسلسل النتائج إلى كتلة JSON مدمجة. لاحظ أننا نُجرّد snippet من الأدلة هنا عمدًا: لا يحتاج Chainer إلى بايتات خام ليُقرّر ما إذا كانت نتيجتان تتجمعان، وتضمينها يضاعف تقريبًا تكلفة الرموز على قواعد الشيفرة الواقعية:
def _findings_to_input_json(findings: list[Finding]) -> str:
    payload = [
        {
            "id": f.id,
            "title": f.title,
            "severity": f.severity,
            "description": f.description,
            "cwe": f.cwe,
            "evidence": {
                "file": str(f.evidence.file),
                "start_line": f.evidence.start_line,
                "end_line": f.evidence.end_line,
            },
        }
        for f in findings
    ]
    return json.dumps(payload, indent=2)
لقواعد الشيفرة الأكبر يمكن أن تتجاوز خريطة المستودع المكثّفة الكاملة ميزانيتنا الحرفية. عندما يحدث ذلك، نقصّ إلى الوحدات الحاملة للنتائج بالإضافة إلى جيرانها المباشرين. هذا يحفظ بنية كافية لـ Chainer للتفكير في سلاسل لدينا أدلة عليها، ويتخلى عن الباقي:
def _prune_for_budget(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int
) -> dict[str, object]:
    full = repo_map.condensed_dict()
    if len(json.dumps(full)) <= char_budget:
        return full

    finding_files = {f.evidence.file for f in findings}
    keep_modules = {
        m.module_name for m in repo_map.modules if m.path in finding_files
    }
    if not keep_modules:
        return full

    neighbours: set[str] = set()
    for m in repo_map.modules:
        if m.module_name in keep_modules:
            for edge in m.imports:
                neighbours.add(edge.from_module)
        for edge in m.imports:
            if edge.from_module in keep_modules:
                neighbours.add(m.module_name)
    keep_modules.update(neighbours)

    pruned_modules = [
        {
            "path": str(m.path),
            "module": m.module_name,
            "exports": list(m.exports),
            "imports": [
                {"from": e.from_module, "names": list(e.imported_names)}
                for e in m.imports
            ],
        }
        for m in repo_map.modules
        if m.module_name in keep_modules
    ]
    return {
        "modules": pruned_modules,
        "_pruned": True,
        "_kept": len(pruned_modules),
        "_total": len(repo_map.modules),
    }


def _render_repo_map(
    repo_map: RepoMap, findings: list[Finding], *, char_budget: int = MAX_REPO_MAP_CHARS
) -> str:
    payload = _prune_for_budget(repo_map, findings, char_budget=char_budget)
    if payload.get("_pruned"):
        logger.info(
            "chainer: repo map pruned for token budget (kept %s of %s modules)",
            payload.get("_kept"),
            payload.get("_total"),
        )
    return json.dumps(payload, indent=2)
استراتيجية القصّ بسيطة عمدًا: احتفظ بالوحدات التي تعيش فيها نتائجنا، واحتفظ بجيرانها المباشرين في رسم الاستيراد. ما هو أبعد من ذلك ليس له دور معقول في سلسلة لدينا أدلة عليها حاليًا، فيمكن إسقاطه دون فقدان قوة التفكير. نُعلِّم أيضًا الحمولة بـ_pruned و_kept و_total، بحيث تستطيع تعليمة Chainer تحذير النموذج عندما تكون الخريطة قد قُصّت. تحليل الاستجابة بنفس شكل Scanner: فكّ، حقّق كل سلسلة عبر Pydantic، أسقط المدخلات المشوّهة:
def _parse_chains(raw: str) -> list[Chain]:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"chainer did not return valid JSON: {exc}") from exc

    if not isinstance(data, dict) or "chains" not in data:
        raise ValueError("chainer JSON missing 'chains' key")

    chains: list[Chain] = []
    for entry in data["chains"]:
        try:
            chains.append(Chain.model_validate(entry))
        except ValidationError as exc:
            logger.warning("dropping malformed chain: %s", exc)
    return chains
ثم الوكيل نفسه:
def find_chains(
    client: OpenAI,
    model: str,
    findings: list[Finding],
    repo_map: RepoMap,
    *,
    max_retries: int = 1,
) -> tuple[list[Chain], list[Chain]]:
    if len(findings) < 2:
        return [], []

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(repo_map, findings))

    last_error: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You are a senior offensive security engineer. You respond "
                            "only with valid JSON matching the schema in the user prompt."
                        ),
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
        except Exception as exc:
            logger.warning("Venice call failed on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        content = response.choices[0].message.content or ""
        try:
            chains = _parse_chains(content)
        except ValueError as exc:
            logger.warning("chainer parse failure on attempt %d: %s", attempt, exc)
            last_error = exc
            continue

        valid, dropped = validate_chain_references(chains, findings)
        if dropped:
            logger.warning(
                "chainer referenced %d unknown finding id(s) or file(s); chains dropped: %s",
                len(dropped),
                [c.id for c in dropped],
            )
        return valid, dropped

    logger.error("giving up on chainer after %d attempts: %s", max_retries + 1, last_error)
    return [], []
أمران يستحقّان الإشارة:
  • نخرج قبل استدعاء النموذج عندما يكون هناك أقل من نتيجتين. لا يمكنك تسلسل نتيجة واحدة، وتخطّي الاستدعاء يعني أننا لا نُحرق رموزًا على نتيجة فارغة مضمونة.
  • temperature=0.2 أعلى قليلًا من 0.1 لـ Scanner. يستفيد Chainer من لمسة إبداع إضافية لرصد التجمعات غير الواضحة، لكننا لا نزال نريده مرتكزًا على النتائج والخريطة التي أُعطي إياها.
  • بعد التحليل، يعمل validate_chain_references التحقق المتقاطع المحدّد الذي كتبناه سابقًا. كل ما ينجو آمن للعرض؛ وكل ما لا ينجو يُسجّل لذا يعرف المشغّل أن النموذج حاول اختراع شيء.
التحقق المتقاطع هذا هو القطعة الأهم في كامل المشروع. إنه الحدّ بين «أداة أمان مفيدة» و«تقرير ذكاء اصطناعي خاطئ بثقة أحيانًا». بوجوده، حتى لو هلوس النموذج، لن تصل السلسلة الخاطئة إلى التقرير أبدًا.

عرض تقرير Markdown

إبقاء العرض منفصلًا عن منطق الوكيل يعني أن كائنات Finding وChain نفسها يمكن تغذيتها لاحقًا إلى صيغ أخرى (JSON، SARIF، HTML) دون لمس Scanner أو Chainer. سنستخدم Jinja2 مع ملف قالب صغير. أنشئ src/venice_security_reviewer/templates/report.md.j2:
# Security Review Report

**Target:** `{{ target }}`
**Scanned at:** {{ scanned_at }}
**Model:** `{{ model }}`

---

## Summary

- **Atomic findings:** {{ findings | length }}
- **Exploit chains:** {{ chains | length }}
{%- if dropped_chains %}
- **Dropped chains (referenced unknown findings):** {{ dropped_chains | length }}
{%- endif %}

---

## Exploit Chains

{% if not chains %}
_No exploit chains were identified by the Chainer agent._
{% else %}
{% for c in chains %}
### {{ c.id }}{{ c.severity | upper }}

**Findings combined:** {{ c.findings | join(', ') }}
**Files involved:** {{ c.files_involved | map('string') | join(', ') }}

{{ c.narrative }}

{% endfor %}
{% endif %}

---

## Atomic Findings

{% for f in findings %}
### {{ f.id }}{{ f.title }}

- **Severity:** {{ f.severity }}
{%- if f.cwe %}
- **CWE:** {{ f.cwe }}
{%- endif %}
- **Location:** `{{ f.evidence.file }}:{{ f.evidence.start_line }}-{{ f.evidence.end_line }}`

{{ f.description }}

```
{{ f.evidence.snippet }}
```

{% endfor %}
ثم العارض في src/venice_security_reviewer/report.py:
from __future__ import annotations

from datetime import UTC, datetime
from pathlib import Path

from jinja2 import Environment, PackageLoader, select_autoescape

from .models import Chain, Finding


def _build_env() -> Environment:
    return Environment(
        loader=PackageLoader("venice_security_reviewer", "templates"),
        autoescape=select_autoescape(enabled_extensions=("html",)),
        keep_trailing_newline=True,
    )


def render_report(
    *,
    target: Path,
    model: str,
    findings: list[Finding],
    chains: list[Chain],
    dropped_chains: list[Chain] | None = None,
) -> str:
    env = _build_env()
    template = env.get_template("report.md.j2")
    return template.render(
        target=str(target),
        scanned_at=datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
        model=model,
        findings=findings,
        chains=chains,
        dropped_chains=dropped_chains or [],
    )
يبقى Autoescape مُعطّلًا لقالب Markdown (Markdown ليس HTML)، لكن نتركه مُمكّنًا لأي قوالب .html مستقبلية حسب الامتداد.

ربط واجهة CLI

CLI هو المنسّق: ابنِ خريطة المستودع، افحص، سلسل، اعرض. سنستخدم Typer لتولّي تحليل الوسائط و Rich لطباعة جدول ملخص جميل. أنشئ src/venice_security_reviewer/cli.py:
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Annotated

import typer
from rich.console import Console
from rich.table import Table

from .chainer import find_chains
from .client import VeniceConfigError, build_client
from .models import Chain, Finding
from .repo_map import build_repo_map
from .report import render_report
from .scanner import scan_path

app = typer.Typer(
    add_completion=False,
    help="Two-agent security code reviewer powered by Venice AI.",
    no_args_is_help=True,
)
console = Console()


@app.callback()
def _root() -> None:
    """Force Typer to keep `scan` as a named subcommand."""


def _configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )


def _print_summary(
    findings: list[Finding], chains: list[Chain], dropped: list[Chain]
) -> None:
    table = Table(title="Scan summary", show_header=True, header_style="bold")
    table.add_column("Metric")
    table.add_column("Count", justify="right")
    table.add_row("Atomic findings", str(len(findings)))
    table.add_row("Exploit chains", str(len(chains)))
    if dropped:
        table.add_row("Chains dropped (bad refs)", str(len(dropped)))
    console.print(table)


@app.command()
def scan(
    path: Annotated[
        Path,
        typer.Argument(
            exists=True, file_okay=False, dir_okay=True, readable=True, resolve_path=True,
            help="Path to the codebase to scan.",
        ),
    ],
    out: Annotated[
        Path, typer.Option("--out", "-o", help="Where to write the Markdown report.")
    ] = Path("report.md"),
    verbose: Annotated[
        bool, typer.Option("--verbose", "-v", help="Enable debug logging.")
    ] = False,
) -> None:
    """Scan a codebase for vulnerabilities and exploit chains."""
    _configure_logging(verbose)

    try:
        client, model = build_client()
    except VeniceConfigError as exc:
        console.print(f"[red]error:[/red] {exc}")
        raise typer.Exit(code=2) from exc

    console.print(f"[bold]Indexing[/bold] {path} (AST repo map)...")
    repo_map = build_repo_map(path)
    edge_count = sum(len(m.imports) for m in repo_map.modules)
    console.print(
        f"Repo map: [bold]{len(repo_map.modules)}[/bold] module(s), "
        f"[bold]{edge_count}[/bold] import edge(s)."
    )

    console.print(f"[bold]Scanning[/bold] {path} with model [cyan]{model}[/cyan]...")
    findings = scan_path(client, model, path, repo_map)
    console.print(f"Scanner produced [bold]{len(findings)}[/bold] finding(s).")

    console.print("[bold]Chaining[/bold] findings...")
    chains, dropped = find_chains(client, model, findings, repo_map)
    console.print(f"Chainer produced [bold]{len(chains)}[/bold] chain(s).")

    report = render_report(
        target=path, model=model,
        findings=findings, chains=chains, dropped_chains=dropped,
    )
    out.write_text(report, encoding="utf-8")
    console.print(f"Report written to [green]{out}[/green]")
    _print_summary(findings, chains, dropped)


def main() -> None:
    app()


if __name__ == "__main__":
    main()
أضف نقطة دخول السكربت إلى pyproject.toml:
[project.scripts]
venice-security-reviewer = "venice_security_reviewer.cli:app"
هذا هو خط الأنابيب بأكمله موصولًا.

اختبار الحمايات

اعتمدنا بشدّة على فكرة واحدة طوال هذا البناء: الحمايات المحدّدة هي ما يفصل أداة أمان مفيدة عن أداة خاطئة بثقة. هذا الادعاء يستحق القول فقط إذا استطعنا إثبات أن الحمايات تصمد فعلًا، لذا فإن أكثر الاختبارات قيمة في هذا المشروع لا تستدعي Venice أبدًا. إنها تُحكم حدود Pydantic وأنابيب تجميع التعليمات، مما يعني أنها تعمل خارج الاتصال، في أجزاء من الألفية، بلا مفتاح API وبلا تكلفة رموز. أضف تبعيات التطوير أولًا:
uv add --dev "pytest>=8.3" "ruff>=0.7" "mypy>=1.13"
أول شيء يستحق الاختبار هو حدود النموذج نفسها. تؤكّد هذه الاختبارات أن النتائج والسلاسل المشوّهة تُرفض وقت الإنشاء، قبل أن تتمكّن أبدًا من الوصول إلى تقرير. أنشئ tests/test_models.py:
from __future__ import annotations

from pathlib import Path

import pytest
from pydantic import ValidationError

from venice_security_reviewer.models import (
    Chain,
    Evidence,
    Finding,
    validate_chain_references,
)


def _finding(fid: str) -> Finding:
    return Finding(
        id=fid,
        title="t",
        severity="medium",
        description="d",
        evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
    )


def test_evidence_rejects_inverted_line_range() -> None:
    with pytest.raises(ValidationError):
        Evidence(file=Path("a.py"), start_line=10, end_line=5, snippet="x")


def test_finding_id_pattern_enforced() -> None:
    with pytest.raises(ValidationError):
        Finding(
            id="not-an-id",
            title="t",
            severity="medium",
            description="d",
            evidence=Evidence(file=Path("a.py"), start_line=1, end_line=2, snippet="x"),
        )


def test_chain_requires_two_findings() -> None:
    with pytest.raises(ValidationError):
        Chain(
            id="C-001",
            findings=["F-001"],
            narrative="n",
            severity="high",
            files_involved=[Path("a.py")],
        )
كل واحد من هذه يعكس قيدًا فرضناه على النماذج سابقًا: نطاق أسطر مقلوب، معرّف لا يطابق نمط F-###، و«سلسلة» من نتيجة واحدة. إذا توقّف أي منها عن إطلاق استثناء، فإن فئة كاملة من الهلوسة قد أصبحت ممكنة مرة أخرى بهدوء. أهم اختبار يغطي محقّق المرجع المتبادل، لأنها الدالة التي تُسقط فعلًا السلاسل المخترعة:
def test_validate_chain_references_drops_unknown_ids() -> None:
    findings = [_finding("F-001"), _finding("F-002")]
    good = Chain(
        id="C-001",
        findings=["F-001", "F-002"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    bad = Chain(
        id="C-002",
        findings=["F-001", "F-999"],
        narrative="n",
        severity="critical",
        files_involved=[Path("a.py")],
    )
    valid, dropped = validate_chain_references([good, bad], findings)
    assert [c.id for c in valid] == ["C-001"]
    assert [c.id for c in dropped] == ["C-002"]
F-999 لم يُنتجها Scanner أبدًا، لذا فالسلسلة التي تُشير إليها تنتهي في dropped ولا تصل أبدًا إلى التقرير. الاختبار المرافق في المستودع المرجعي، test_validate_chain_references_drops_unknown_files، يفعل الشيء نفسه لسلسلة تدّعي ملفًا لم يأتِ منه أي من نتائجها. الشيء الثاني الذي يستحق الاختبار هو الأنابيب التي تُغذّي Chainer. من السهل إعادة هيكلة تجميع التعليمة والتوقف بصمت عن تمرير السياق المتعدد الملفات، عند هذه النقطة سيستمر Chainer في العمل لكنه سيصبح أسوأ بهدوء. يبني هذا الاختبار ثبت وحدتين، يعرض التعليمة، ويؤكّد أن المعلومات بين الملفات موجودة فعلًا، مرة أخرى بلا جولة إلى Venice. أنشئ tests/test_cross_file_chain.py:
from __future__ import annotations

from pathlib import Path

from venice_security_reviewer.chainer import (
    _findings_to_input_json,
    _load_prompt_template,
    _render_repo_map,
)
from venice_security_reviewer.models import Evidence, Finding
from venice_security_reviewer.repo_map import build_repo_map


def _write(root: Path, rel: str, content: str) -> None:
    path = root / rel
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def test_chainer_prompt_carries_cross_file_context(tmp_path: Path) -> None:
    _write(tmp_path, "validators.py", "def is_safe_url(url: str) -> bool:\n    return True")
    _write(
        tmp_path,
        "fetcher.py",
        "from .validators import is_safe_url\n\ndef fetch(url: str) -> bytes:\n    return b''",
    )

    rmap = build_repo_map(tmp_path)
    findings = [
        Finding(
            id="F-001",
            title="Validator returns True unconditionally",
            severity="low",
            description="The validator always returns True.",
            evidence=Evidence(
                file=Path("validators.py"), start_line=1, end_line=2, snippet="..."
            ),
        ),
        Finding(
            id="F-002",
            title="Fetcher trusts a stub validator",
            severity="low",
            description="The fetcher gates network access on is_safe_url.",
            evidence=Evidence(
                file=Path("fetcher.py"), start_line=1, end_line=1, snippet="..."
            ),
        ),
    ]

    template = _load_prompt_template("chainer.md")
    prompt = template.replace(
        "{findings_json}", _findings_to_input_json(findings)
    ).replace("{repo_map}", _render_repo_map(rmap, findings))

    assert "{findings_json}" not in prompt and "{repo_map}" not in prompt
    assert "F-001" in prompt and "F-002" in prompt
    assert "validators.py" in prompt and "fetcher.py" in prompt
    assert "is_safe_url" in prompt
إذا نجح هذا الاختبار، فإن Chainer يُسلَّم تعليمة تحتوي على كلتا النتيجتين، وكلا مسارَي الملفين، وحافة الاستيراد بينهما. ما إذا كان النموذج يستخدم تلك المعلومات بشكل جيد هو تقييم منفصل خارج النطاق؛ يحرس هذا الاختبار فقط الأنابيب التي تُدخل المعلومات إلى التعليمة في المقام الأول. شغّل مجموعة الاختبارات كلها، مع المُدقّق ومُتحقّق الأنواع، بـ:
uv run pytest          # اختبارات بلا اتصال، بلا استدعاءات Venice حية
uv run ruff check .
uv run mypy src/
ولأن أيًا من هذه الاختبارات لا يلامس الشبكة، فهي آمنة للتشغيل عند كل التزام وفي CI دون حرق رموز أو الحاجة إلى مفتاح Venice. يتضمن المستودع المرجعي أيضًا tests/test_scanner_parse.py و tests/test_chainer_parse.py و tests/test_repo_map.py، التي تغطي حالات حدّية لتحليل JSON (المدخلات المشوّهة تُسقط بدلًا من إعطاب التشغيل) وباني خريطة المستودع بـ AST.

تشغيل المشروع

لتجربته على قاعدة شيفرة حقيقية، وجّه CLI إلى دليل مصدر Python:
uv run venice-security-reviewer scan path/to/your/code
أو ثبّته في بيئتك الافتراضية بـ pip install -e . ثم نفّذ venice-security-reviewer scan path/to/your/code. يبدو الإخراج هكذا تقريبًا:
Indexing /path/to/code (AST repo map)...
Repo map: 6 module(s), 14 import edge(s).
Scanning /path/to/code with model zai-org-glm-5...
Scanner produced 4 finding(s).
Chaining findings...
Chainer produced 1 chain(s).
Report written to report.md
                Scan summary
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┓
┃ Metric                    ┃ Count ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━┩
│ Atomic findings           │     4 │
│ Exploit chains            │     1 │
└───────────────────────────┴───────┘
يُظهر تقرير Markdown كل سلسلة في الأعلى مع سردها، ثم كل نتيجة فردية تحتها مع الشدّة و CWE وموقع الملف والوصف والمقطع الحرفي الذي يدّعي النموذج قراءته. يُرفق المستودع المرجعي أيضًا بأربعة أهداف تجريبية مجمّعة كلّها تُمارس شكلًا مختلفًا من التفكير الذي يجب على Chainer القيام به:
  • examples/vulnerable_app — تطبيق Flask متعدد الملفات بثلاث نتائج «منخفضة»، تتجمع اثنتان منها في سلسلة تصعيد امتيازات حرجة عبر الملفات. يختبر ما إذا كان Chainer انتقائيًا فيما يجمعه.
  • examples/url_preview — جالب URL متعدد الملفات بقائمة سماح دفاعية لا تُطبّق لكل تكرار. يختبر التفكير بتدفق البيانات بين الملفات مدمجًا مع طوبولوجيا النشر (عناوين IP المحلية الرابط هي بوابات اعتماد سحابية).
  • examples/csv_query — مصفاة CSV من ملف واحد بهروب صندوق eval عبر __class__.__base__.__subclasses__(). يختبر التفكير على مستوى اللغة بدلًا من تدفق HTTP.
  • examples/webhook_handler — متحقّق HMAC من ملف واحد بثغرة تفاضلية في محلل JSON. يختبر التفكير عبر مواصفات متعددة.
جرّبها بـ:
uv run venice-security-reviewer scan examples/vulnerable_app
uv run venice-security-reviewer scan examples/csv_query
إذا رأيت CLI يسجّل chainer referenced N unknown finding id(s) or file(s); chains dropped، فهذا محقّق المرجع المتبادل يُمسك النموذج متلبسًا باختراع سلسلة. السلاسل المُسقطة لا تصل أبدًا إلى التقرير؛ تحصل فقط على تحذير يمكنك استخدامه لتعديل التعليمة أو أخذ عينات تشغيلات Chainer إضافية.

توسيع هذا المثال

شكل الوكيلين يعمّم بشكل جيد. بعض الاتجاهات الجديرة بالاستكشاف:
  • لغات أكثر. Scanner لا يتقيّد بلغة على مستوى التعليمة؛ باني AST هو ما يتقيّد بـ Python. بدّل إلى tree-sitter ويمكنك بناء نفس أشكال الحيّ/الخريطة المكثّفة لـ TypeScript أو Go أو Rust.
  • وكيل ثالث للإصلاحات. بمجرد أن تكون لديك سلسلة، طلب وكيل Patcher صياغة diff موحّد يُعطّل إحدى النتائج المكوّنة هو خطوة صغيرة. حقّق الـ diff عبر Pydantic مقابل نفس مجموعة ملفات الأدلة وستحصل على نفس حارس الهلوسة مجانًا.
  • صيغ الإخراج. render_report هو المكان الوحيد الذي يعرف عن Markdown. أضف عارض SARIF ويمكن للنتائج نفسها أن تُسقط في فحص شيفرة GitHub. أضف عارض JSON ويمكنك توجيه النتائج إلى نظام مصبّ.
  • التخزين المؤقت بهاش الملف. استدعاءات Scanner لكل ملف مستقلة ومتعادية. التخزين المؤقت بـ (file_hash, prompt_hash, model) يعني أن إعادة فحص مستودع تغيّر فيه ملف واحد تعيد تشغيل Scanner فقط على ذلك الملف.
  • أخذ عينات لـ Chainer. للتشغيلات عالية المخاطر، استدعِ Chainer N مرة بدرجة حرارة أعلى قليلًا واقطع النتائج. السلاسل التي يجدها النموذج باستمرار من المرجح أن تكون حقيقية؛ السلاسل التي يجدها مرة ولا يجدها مرة أخرى من المرجح أنها ضوضاء.
  • نماذج أقوى. zai-org-glm-5 هو الافتراضي لأنه يحقّق توازنًا جيدًا بين التكلفة والجودة للتفكير التوافقي، لكن لقواعد الشيفرة الأصعب يمكن أن يجعل تبديل نموذج Venice أقوى (عبر VENICE_MODEL) سرديات Chainer أكثر إحكامًا بشكل ملحوظ.

الختام

شكرًا لقراءتك! آمل أن يكون هذا قد ساعدك على فهم كيفية بناء أداة أمان بالذكاء الاصطناعي جديرة بالثقة فعلًا. النمط الذي استخدمناه هنا يعمّم خارج الأمان أيضًا: في أي وقت تريد فيه LLM التفكير عبر الملفات بطريقة يجب أن ترتكز على أدلة حقيقية، الوصفة هي نفسها. ابنِ خريطة بنيوية محدّدة، سلّم النموذج شريحة منها تتسع للسياق، حقّق مراجع النموذج مقابل البنية، وأسقط أي شيء لا يستطيع ارتكازه. باستخدام Python مع Venice AI API، يمكننا بناء وكلاء يجمعون بين تفكير LLM وحدود تحقق صارمة، ونشحن شيئًا يعطي إجابة مفيدة بدلًا من إجابة تبدو واثقة.