export VENICE_API_KEY=<my-key>
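What we'll build
The reference implementation is a small Python project with a few well-defined parts:

| Part | What it does |
|---|---|
| CLI | Accepts the research topic, model, providers, depth settings, output path, and artifacts directory |
| Venice client | Calls chat completions, streaming chat completions, and POST /augment/scrape |
| Search layer | Searches DuckDuckGo by default, with optional arXiv paper discovery |
| Data models | Track source URLs, canonical URLs, chunks, evidence, notes, errors, and reports |
| Research agent | Plans searches, reads sources, extracts evidence, analyzes gaps, generates follow-up queries, and writes the final report |
| Artifact writer | Stores auditable JSONL records of queries, research gaps, results, fetches, chunks, source notes, report drafts, errors, and reports |

The agent works through these steps:

- Ask Venice to generate diverse search queries for the topic.
- Search the web with one or more providers.
- Deduplicate URLs before reading them.
- Use Venice's scrape endpoint to convert each public source page to Markdown.
- Split long pages into chunks.
- Ask Venice to extract evidence from each chunk.
- Ask Venice to turn the chunk evidence into source notes.
- Identify research gaps and source-balance problems before generating follow-up queries.
- Ask Venice to synthesize the final report with footnote-style citations.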
Set up the project
The reference project uses Python 3.13 and uv, but the same code also works with a regular virtual environment.
Create a new project:
mkdir venice-research-agent
cd venice-research-agent
uv init
uv add httpx beautifulsoup4 python-dotenv
If you prefer pip, create a virtual environment and install the same packages:
python -m venv .venv
source .venv/bin/activate
pip install "httpx>=0.28.0" "beautifulsoup4>=4.13.0" "python-dotenv>=1.0.0"
Create a .env file:
VENICE_API_KEY=your_venice_api_key_here
VENICE_MODEL=openai-gpt-55
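Set VENICE_MODEL so that you can change models without editing code. The reference implementation currently defaults to openai-gpt-55, but you can swap in another chat model that is available to your Venice account.
Create the data models
Before writing the agent logic, we'll define the objects that flow through the pipeline. These models make the rest of the code easier to reason about, because every source carries provenance: where it came from, which query found it, when it was fetched, and how it was chunked.

Create research_agent/models.py: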
from __future__ import annotations
import hashlib
from dataclasses import dataclass, field
from datetime import UTC, datetime
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
TRACKING_PARAMS = {
"fbclid",
"gclid",
"igshid",
"mc_cid",
"mc_eid",
"msclkid",
"ref",
"ref_src",
}
@dataclass(frozen=True)
class SearchResult:
title: str
url: str
snippet: str
query: str = ""
rank: int = 0
provider: str = "duckduckgo"
canonical_url: str = ""
def __post_init__(self) -> None:
if not self.canonical_url:
object.__setattr__(self, "canonical_url", canonicalize_url(self.url))
@dataclass(frozen=True)
class ScrapeResult:
url: str
content: str
title: str = ""
final_url: str = ""
content_type: str = "text/markdown"
@dataclass(frozen=True)
class TextChunk:
chunk_id: str
text: str
start: int
end: int
content_hash: str
@dataclass(frozen=True)
class WebPage:
title: str
url: str
text: str
final_url: str = ""
canonical_url: str = ""
content_type: str = ""
retrieved_at: str = ""
content_hash: str = ""
chunks: tuple[TextChunk, ...] = field(default_factory=tuple)
def __post_init__(self) -> None:
final_url = self.final_url or self.url
object.__setattr__(self, "final_url", final_url)
if not self.canonical_url:
object.__setattr__(self, "canonical_url", canonicalize_url(final_url))
if not self.retrieved_at:
object.__setattr__(self, "retrieved_at", utc_now())
if not self.content_hash:
object.__setattr__(self, "content_hash", content_hash(self.text))
@dataclass(frozen=True)
class EvidenceChunk:
chunk_id: str
text: str
summary: str
quotes: tuple[str, ...] = field(default_factory=tuple)
@dataclass(frozen=True)
class SourceNote:
source_id: str
title: str
url: str
query: str
summary: str
canonical_url: str = ""
final_url: str = ""
rank: int = 0
snippet: str = ""
provider: str = "duckduckgo"
retrieved_at: str = ""
content_type: str = ""
content_hash: str = ""
chunks: tuple[EvidenceChunk, ...] = field(default_factory=tuple)
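The important fields are canonical_url, content_hash, and chunks.
canonical_url lets the agent avoid reading the same source twice when search results differ only in tracking parameters or fragments. content_hash helps catch duplicate pages even when they live at different URLs. chunks let us summarize long pages in smaller pieces instead of losing useful evidence to context limits.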
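
The two deduplication keys can be sketched in isolation. The example below is a simplified, self-contained illustration and not the reference code: simple_canonical() and dedupe() are hypothetical names, and the canonicalization only strips utm_ parameters and fragments.

```python
import hashlib
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse


def simple_canonical(url: str) -> str:
    """Simplified canonical form: lowercase host, no fragment, no utm_* params."""
    parsed = urlparse(url)
    pairs = [
        (key, value)
        for key, value in parse_qsl(parsed.query)
        if not key.lower().startswith("utm_")
    ]
    path = parsed.path.rstrip("/") or "/"
    return urlunparse(
        (parsed.scheme.lower(), parsed.netloc.lower(), path, "", urlencode(sorted(pairs)), "")
    )


def dedupe(pages: list[tuple[str, str]]) -> list[str]:
    """Keep the first URL for each canonical URL and each distinct page text."""
    seen_urls: set[str] = set()
    seen_hashes: set[str] = set()
    kept: list[str] = []
    for url, text in pages:
        key = simple_canonical(url)
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key in seen_urls or digest in seen_hashes:
            continue  # same link, or same content under a different URL
        seen_urls.add(key)
        seen_hashes.add(digest)
        kept.append(url)
    return kept
```

A URL that differs only by a tracking parameter or fragment is dropped by the first check; a mirror with identical text on another host is dropped by the second.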
Add the helper functions below the dataclasses:

def utc_now() -> str:
    return datetime.now(UTC).isoformat()


def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def canonicalize_url(raw_url: str) -> str:
    if not raw_url:
        return ""
    parsed = urlparse(raw_url.strip())
    if parsed.scheme not in {"http", "https"} or not parsed.netloc:
        return ""
    scheme = parsed.scheme.lower()
    netloc = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/":
        path = path.rstrip("/")
    query_pairs = [
        (key, value)
        for key, value in parse_qsl(parsed.query, keep_blank_values=True)
        if not _is_tracking_param(key)
    ]
    query = urlencode(sorted(query_pairs), doseq=True)
    return urlunparse((scheme, netloc, path, "", query, ""))


def chunk_text(text: str, *, chunk_chars: int = 3000, overlap: int = 250) -> tuple[TextChunk, ...]:
    clean = text.strip()
    if not clean:
        return ()
    if chunk_chars <= 0:
        raise ValueError("chunk_chars must be greater than 0")
    if overlap < 0 or overlap >= chunk_chars:
        raise ValueError("overlap must be at least 0 and smaller than chunk_chars")
    chunks: list[TextChunk] = []
    start = 0
    index = 1
    while start < len(clean):
        end = min(len(clean), start + chunk_chars)
        chunk = clean[start:end].strip()
        if chunk:
            chunks.append(
                TextChunk(
                    chunk_id=f"C{index}",
                    text=chunk,
                    start=start,
                    end=end,
                    content_hash=content_hash(chunk),
                )
            )
            index += 1
        if end == len(clean):
            break
        start = end - overlap
    return tuple(chunks)


def _is_tracking_param(key: str) -> bool:
    lowered = key.lower()
    return lowered.startswith("utm_") or lowered in TRACKING_PARAMS
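
To see how the overlap moves the window, the sketch below reproduces only the index arithmetic from chunk_text(). The chunk_spans() name is hypothetical and exists just for illustration; it skips the whitespace stripping and hashing that the real helper performs.

```python
def chunk_spans(length: int, chunk_chars: int = 3000, overlap: int = 250) -> list[tuple[int, int]]:
    """Return the (start, end) offsets a chunk_text-style loop would visit."""
    if chunk_chars <= 0:
        raise ValueError("chunk_chars must be greater than 0")
    if overlap < 0 or overlap >= chunk_chars:
        raise ValueError("overlap must be at least 0 and smaller than chunk_chars")
    spans: list[tuple[int, int]] = []
    start = 0
    while start < length:
        end = min(length, start + chunk_chars)
        spans.append((start, end))
        if end == length:
            break
        # Step back by `overlap` so neighbouring chunks share some text.
        start = end - overlap
    return spans


print(chunk_spans(7000))  # [(0, 3000), (2750, 5750), (5500, 7000)]
```

Each chunk after the first re-reads the last 250 characters of the previous one, so a short sentence that straddles a boundary still appears whole in at least one chunk.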
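Build the Venice client
Next, we'll create a small Venice client. Because Venice is OpenAI-compatible, you could use the OpenAI Python SDK for chat completions, but the reference implementation uses httpx directly so that the same client can also call Venice's POST /augment/scrape endpoint.
Create research_agent/venice.py: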
from __future__ import annotations
import json
import os
import time
from dataclasses import dataclass
from typing import Any
import httpx
from .models import ScrapeResult
DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "openai-gpt-55"
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}
class VeniceError(RuntimeError):
"""Raised when the Venice API returns an unusable response."""
@dataclass(frozen=True)
class VeniceClient:
api_key: str
model: str = DEFAULT_MODEL
base_url: str = DEFAULT_BASE_URL
timeout: float = 60.0
max_retries: int = 2
backoff_seconds: float = 1.0
@classmethod
def from_env(cls, model: str | None = None, *, max_retries: int = 2) -> "VeniceClient":
api_key = os.getenv("VENICE_API_KEY")
if not api_key:
raise VeniceError("VENICE_API_KEY is required.")
return cls(
api_key=api_key,
model=model or os.getenv("VENICE_MODEL", DEFAULT_MODEL),
base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL).rstrip("/"),
max_retries=max_retries,
)
The from_env() helper keeps secrets out of source code. It also makes local development convenient, because python-dotenv can load VENICE_API_KEY and VENICE_MODEL from .env.
Now add chat completions:
def chat(
self,
messages: list[dict[str, str]],
*,
temperature: float = 0.2,
max_tokens: int = 1600,
) -> str:
payload: dict[str, Any] = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
}
data = self._post_json("/chat/completions", payload)
try:
return data["choices"][0]["message"]["content"].strip()
except (KeyError, IndexError, TypeError) as exc:
raise VeniceError(f"Unexpected Venice API response: {data}") from exc
def chat_stream(
self,
messages: list[dict[str, str]],
*,
temperature: float = 0.2,
max_tokens: int = 1600,
) -> str:
payload: dict[str, Any] = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": True,
}
return self._post_chat_stream("/chat/completions", payload).strip()
def scrape(self, url: str) -> ScrapeResult:
data = self._post_json("/augment/scrape", {"url": url})
content = _first_string(data, "content", "markdown", "text")
if not content:
raise VeniceError(f"Unexpected Venice scrape response: {data}")
return ScrapeResult(
url=url,
final_url=_first_string(data, "final_url", "url", "source_url") or url,
title=_first_string(data, "title"),
content=content,
content_type="text/markdown",
)
def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
for attempt in range(self.max_retries + 1):
try:
response = httpx.post(
f"{self.base_url}{path}",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json=payload,
timeout=self.timeout,
)
if response.status_code in RETRYABLE_STATUS_CODES and attempt < self.max_retries:
time.sleep(self.backoff_seconds * (2**attempt))
continue
response.raise_for_status()
data = response.json()
if not isinstance(data, dict):
raise VeniceError(f"Unexpected Venice API response: {data}")
return data
except httpx.HTTPError as exc:
if attempt < self.max_retries:
time.sleep(self.backoff_seconds * (2**attempt))
continue
raise VeniceError(f"Could not reach Venice API: {exc}") from exc
raise VeniceError("Could not reach Venice API")
def _first_string(data: dict[str, Any], *keys: str) -> str:
for key in keys:
value = data.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
for nested_key in ("data", "result", "scrape"):
nested = data.get(nested_key)
if isinstance(nested, dict):
value = _first_string(nested, *keys)
if value:
return value
return ""
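The reference implementation also includes a _post_chat_stream() helper that reads server-sent events from a streaming chat completion. You can start without streaming and add it once the rest of the research flow works.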
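
Since _post_chat_stream() is not shown here, the following is a minimal sketch of the parsing half only. It assumes the stream uses the OpenAI-compatible shape (lines of the form `data: {...}` with text under choices[0].delta.content, ending in `data: [DONE]`); collect_stream_text() is a hypothetical name, not the reference helper.

```python
import json
from collections.abc import Iterable


def collect_stream_text(lines: Iterable[str]) -> str:
    """Concatenate delta content from OpenAI-style server-sent event lines."""
    parts: list[str] = []
    for raw_line in lines:
        line = raw_line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators and SSE comments such as ": keep-alive"
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break  # assumed end-of-stream sentinel
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            continue  # ignore malformed events instead of aborting the report
        choices = event.get("choices") if isinstance(event, dict) else None
        if not choices:
            continue
        delta = choices[0].get("delta") or {}
        content = delta.get("content")
        if isinstance(content, str):
            parts.append(content)
    return "".join(parts)
```

With httpx, the lines would typically come from `response.iter_lines()` inside an `httpx.stream("POST", ...)` context manager; keeping the parser separate from the network call makes it easy to test with a plain list of strings.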
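Add search providers
The search layer has two jobs: finding source URLs and fetching those URLs through the Venice scraper. The reference implementation uses DuckDuckGo's HTML endpoint for general web search and arXiv's Atom API for paper search.

Create research_agent/web.py: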
from __future__ import annotations
import re
import xml.etree.ElementTree as ET
from collections.abc import Callable, Iterable
from urllib.parse import parse_qs, unquote, urlparse
import httpx
from bs4 import BeautifulSoup
from .models import ScrapeResult, SearchResult, TextChunk, WebPage, canonicalize_url, chunk_text, content_hash, utc_now
USER_AGENT = "venice-research-agent-demo/0.1 (+https://venice.ai)"
class SearchProvider:
name = "provider"
def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
raise NotImplementedError
class DuckDuckGoProvider(SearchProvider):
name = "duckduckgo"
def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
response = web.get("https://duckduckgo.com/html/", params={"q": query})
soup = BeautifulSoup(response.text, "html.parser")
results: list[SearchResult] = []
seen_urls: set[str] = set()
for node in soup.select(".result"):
link = node.select_one(".result__a")
if link is None:
continue
url = _normalize_duckduckgo_url(link.get("href", ""))
canonical_url = canonicalize_url(url)
if not canonical_url or canonical_url in seen_urls:
continue
snippet = node.select_one(".result__snippet")
results.append(
SearchResult(
title=_clean_text(link.get_text(" ", strip=True)),
url=url,
snippet=_clean_text(snippet.get_text(" ", strip=True) if snippet else ""),
query=query,
rank=len(results) + 1,
provider=self.name,
canonical_url=canonical_url,
)
)
seen_urls.add(canonical_url)
if len(results) >= limit:
break
return results
class ArxivProvider(SearchProvider):
name = "arxiv"
def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
response = web.get(
"https://export.arxiv.org/api/query",
params={
"search_query": f"all:{query}",
"start": 0,
"max_results": limit,
"sortBy": "relevance",
},
)
namespace = {"atom": "http://www.w3.org/2005/Atom"}
root = ET.fromstring(response.text)
results: list[SearchResult] = []
for entry in root.findall("atom:entry", namespace):
title = _clean_text(_xml_text(entry.find("atom:title", namespace)))
summary = _clean_text(_xml_text(entry.find("atom:summary", namespace)))
url = _xml_text(entry.find("atom:id", namespace)).strip()
canonical_url = canonicalize_url(url)
if not url or not canonical_url:
continue
results.append(
SearchResult(
title=title or url,
url=url,
snippet=summary,
query=query,
rank=len(results) + 1,
provider=self.name,
canonical_url=canonical_url,
)
)
if len(results) >= limit:
break
return results
The WebSearch class coordinates the providers and fetches pages:
class WebSearch:
def __init__(
self,
timeout: float = 15.0,
*,
providers: Iterable[SearchProvider] | None = None,
chunk_chars: int = 3000,
scraper: Callable[[str], ScrapeResult] | None = None,
) -> None:
self._client = httpx.Client(
timeout=timeout,
follow_redirects=True,
headers={"User-Agent": USER_AGENT},
)
self.providers = tuple(providers or (DuckDuckGoProvider(),))
self.chunk_chars = chunk_chars
self.scraper = scraper
@classmethod
def from_provider_names(cls, provider_names: Iterable[str], **kwargs: object) -> "WebSearch":
providers = [_provider_from_name(name) for name in provider_names]
return cls(providers=providers, **kwargs)
def search(self, query: str, limit: int = 5) -> list[SearchResult]:
results: list[SearchResult] = []
seen_urls: set[str] = set()
for provider in self.providers:
for result in provider.search(self, query, limit):
if result.canonical_url in seen_urls:
continue
results.append(result)
seen_urls.add(result.canonical_url)
return results
def fetch(self, result: SearchResult) -> WebPage:
if self.scraper is None:
raise RuntimeError("WebSearch.fetch requires a Venice scrape function.")
scraped = self.scraper(result.url)
text = scraped.content.strip() or result.snippet
chunks = self._chunk_text(text)
return WebPage(
title=scraped.title or result.title,
url=result.url,
final_url=scraped.final_url or scraped.url or result.url,
canonical_url=canonicalize_url(scraped.final_url or result.url),
text=text,
content_type=scraped.content_type or "text/markdown",
retrieved_at=utc_now(),
content_hash=content_hash(text),
chunks=chunks,
)
def get(self, url: str, *, params: dict[str, object] | None = None) -> httpx.Response:
response = self._client.get(url, params=params)
response.raise_for_status()
return response
def close(self) -> None:
self._client.close()
def __enter__(self) -> "WebSearch":
return self
def __exit__(self, *_: object) -> None:
self.close()
def _chunk_text(self, text: str) -> tuple[TextChunk, ...]:
overlap = min(250, max(0, self.chunk_chars // 10))
return chunk_text(text, chunk_chars=self.chunk_chars, overlap=overlap)
def _normalize_duckduckgo_url(raw_url: str) -> str:
if not raw_url:
return ""
parsed = urlparse(raw_url)
if parsed.netloc.endswith("duckduckgo.com") and parsed.path == "/l/":
target = parse_qs(parsed.query).get("uddg", [""])[0]
return unquote(target)
if parsed.scheme in {"http", "https"}:
return raw_url
return ""
def _provider_from_name(name: str) -> SearchProvider:
normalized = name.strip().lower()
if normalized in {"duckduckgo", "ddg", "web"}:
return DuckDuckGoProvider()
if normalized == "arxiv":
return ArxivProvider()
raise ValueError(f"Unknown source provider: {name}")
def _clean_text(value: str) -> str:
return re.sub(r"\s+", " ", value).strip()
def _xml_text(node: ET.Element | None) -> str:
return "" if node is None or node.text is None else node.text
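Write local artifacts
Auditability matters in a research workflow. If the final report says something surprising, you should be able to check which source led to it.

Create research_agent/artifacts.py: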
from __future__ import annotations
import json
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any
class ArtifactWriter:
def __init__(self, root: Path | None = None) -> None:
self.root = root
if self.root is not None:
self.root.mkdir(parents=True, exist_ok=True)
@property
def enabled(self) -> bool:
return self.root is not None
def write(self, kind: str, record: object) -> None:
if self.root is None:
return
path = self.root / f"{kind}.jsonl"
payload = json.dumps(_to_jsonable(record), ensure_ascii=False, sort_keys=True)
with path.open("a", encoding="utf-8") as file:
file.write(f"{payload}\n")
def _to_jsonable(value: object) -> Any:
if is_dataclass(value):
return _to_jsonable(asdict(value))
if isinstance(value, Path):
return str(value)
if isinstance(value, dict):
return {str(key): _to_jsonable(item) for key, item in value.items()}
if isinstance(value, (list, tuple)):
return [_to_jsonable(item) for item in value]
return value
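
Because each artifact file is plain JSONL, auditing a run needs very little code. The sketch below reads records back and counts errors by stage; read_jsonl() and error_stages() are hypothetical helpers that are not part of the reference project, and they assume the errors.jsonl records carry the stage field written by the agent.

```python
import json
from collections import Counter
from pathlib import Path


def read_jsonl(path: Path) -> list[dict]:
    """Load one JSON record per non-empty line."""
    records: list[dict] = []
    with path.open(encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records


def error_stages(artifacts_dir: Path) -> Counter:
    """Count error records by their 'stage' field; empty if no errors file exists."""
    path = artifacts_dir / "errors.jsonl"
    if not path.exists():
        return Counter()
    return Counter(record.get("stage", "unknown") for record in read_jsonl(path))
```

Calling error_stages(Path("runs/privacy")) after a run would show at a glance whether failures cluster in search, fetch, or summarization.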
Build the research agent
Now that we have the Venice client, search, models, and artifacts, we can build the actual agent.

Create research_agent/agent.py:
from __future__ import annotations
import json
from collections.abc import Callable
from textwrap import dedent
from .artifacts import ArtifactWriter
from .models import CollectionError, EvidenceChunk, ResearchReport, SearchResult, SourceNote, WebPage, utc_now
from .venice import VeniceClient, VeniceError
from .web import WebSearch
SYSTEM_PROMPT = """You are a careful research assistant.
Use the supplied source material only when making factual claims.
Flag uncertainty, contradictions, and missing context instead of filling gaps."""
ProgressCallback = Callable[[str], None]
DEFAULT_ITERATIONS = 3
DEFAULT_QUERY_COUNT = 6
DEFAULT_RESULTS_PER_QUERY = 4
DEFAULT_MAX_SOURCES = 40
DEFAULT_MAX_CHUNKS_PER_SOURCE = 6
Add two final dataclasses to models.py:

@dataclass(frozen=True)
class CollectionError:
    stage: str
    message: str
    query: str = ""
    url: str = ""
    source_id: str = ""
    provider: str = ""


@dataclass(frozen=True)
class ResearchReport:
    topic: str
    markdown: str
    sources: list[SourceNote]
    artifacts_dir: str | None = None
Back in agent.py, define ResearchAgent:
class ResearchAgent:
def __init__(
self,
venice: VeniceClient,
web: WebSearch | None = None,
artifacts: ArtifactWriter | None = None,
progress: ProgressCallback | None = None,
max_sources: int | None = DEFAULT_MAX_SOURCES,
max_chunks_per_source: int = DEFAULT_MAX_CHUNKS_PER_SOURCE,
) -> None:
self.venice = venice
self.web = web or WebSearch(scraper=venice.scrape)
self.artifacts = artifacts or ArtifactWriter()
self.progress = progress or (lambda _: None)
self.max_sources = max_sources
self.max_chunks_per_source = max_chunks_per_source
The run() method coordinates the research passes:
def run(
self,
topic: str,
*,
iterations: int = DEFAULT_ITERATIONS,
query_count: int = DEFAULT_QUERY_COUNT,
results_per_query: int = DEFAULT_RESULTS_PER_QUERY,
) -> ResearchReport:
notes: list[SourceNote] = []
seen_source_keys: set[str] = set()
seen_content_hashes: set[str] = set()
queries = self._initial_queries(topic, query_count)
self.artifacts.write("queries", {"stage": "initial", "topic": topic, "queries": queries})
for iteration in range(1, iterations + 1):
self.progress(f"Research pass {iteration}/{iterations}: {', '.join(queries)}")
self._collect_notes(
topic,
queries,
results_per_query,
seen_source_keys,
seen_content_hashes,
notes,
iteration,
)
if iteration < iterations:
gaps, queries = self._gap_follow_up_queries(topic, notes, query_count)
self.artifacts.write(
"research_gaps",
{
"topic": topic,
"after_iteration": iteration,
"source_balance": _source_cluster_counts(notes),
"gaps": gaps,
"queries": queries,
},
)
self.artifacts.write(
"queries",
{
"stage": "follow_up",
"topic": topic,
"iteration": iteration + 1,
"gap_count": len(gaps),
"queries": queries,
},
)
report = self._write_report(topic, notes)
self.artifacts.write(
"reports",
{
"topic": topic,
"source_count": len(notes),
"generated_at": utc_now(),
"markdown": report,
},
)
return ResearchReport(
topic=topic,
markdown=report,
sources=notes,
artifacts_dir=str(self.artifacts.root) if self.artifacts.root is not None else None,
)
The seen_* sets are how the agent avoids wasting time on duplicate sources. URL deduplication catches repeated links. Content-hash deduplication catches mirrors, syndicated posts, and pages that redirect to the same final content.
Plan the initial and follow-up searches
The first model call turns the topic into search queries:

    def _initial_queries(self, topic: str, count: int) -> list[str]:
        prompt = dedent(
            f"""
            Create {count} diverse web search queries for researching this topic:
            {topic}
            Cover background, recent developments, primary sources, criticism, and data.
            Include at least one query likely to find primary sources or datasets.
            Return JSON only in this shape: {{"queries": ["..."]}}
            """
        ).strip()
        return self._query_list(prompt, count, fallback=[topic])
from urllib.parse import urlparse
def _source_cluster_counts(notes: list[SourceNote]) -> list[dict[str, object]]:
total = len(notes)
if total == 0:
return []
clusters: dict[str, list[str]] = {}
for note in notes:
cluster = _source_cluster(note)
clusters.setdefault(cluster, []).append(note.source_id)
return [
{
"cluster": cluster,
"source_count": len(source_ids),
"source_share": round(len(source_ids) / total, 3),
"source_ids": source_ids,
}
for cluster, source_ids in sorted(
clusters.items(), key=lambda item: (-len(item[1]), item[0])
)
]
def _source_cluster(note: SourceNote) -> str:
url = note.canonical_url or note.final_url or note.url
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
return host or "unknown"
def _source_balance_digest(notes: list[SourceNote], limit: int = 8) -> str:
clusters = _source_cluster_counts(notes)
if not clusters:
return "No source clusters yet."
total = len(notes)
lines = [
f"- {cluster['cluster']}: {cluster['source_count']}/{total} sources "
f"({cluster['source_share']:.0%}); IDs: {', '.join(cluster['source_ids'])}"
for cluster in clusters[:limit]
]
return "\n".join(lines)
def _follow_up_queries(self, topic: str, notes: list[SourceNote], count: int) -> list[str]:
digest = _source_digest(notes, max_chars=9000)
source_balance = _source_balance_digest(notes)
prompt = dedent(
f"""
We are researching: {topic}
Current notes:
{digest}
Source balance:
{source_balance}
Create {count} follow-up web search queries that fill gaps, verify important claims,
find primary evidence, and look for dissenting evidence.
If one source domain, vendor, framework, product, or perspective is overrepresented,
deliberately broaden beyond it unless the topic explicitly asks for that focus.
Return JSON only in this shape: {{"queries": ["..."]}}
"""
).strip()
return self._query_list(prompt, count, fallback=[topic])
In _gap_follow_up_queries(), the agent asks Venice to return both gap records and queries:
def _gap_follow_up_queries(
self, topic: str, notes: list[SourceNote], count: int
) -> tuple[list[dict[str, str]], list[str]]:
if not notes:
return [], [topic]
digest = _source_digest(notes, max_chars=12000)
source_balance = _source_balance_digest(notes)
prompt = dedent(
f"""
Identify coverage gaps before the next research pass.
Research topic:
{topic}
Current source notes:
{digest}
Source balance:
{source_balance}
Find important missing coverage that would improve a deep research report.
Look specifically for primary sources, technical concepts, dissenting views,
overrepresented source clusters, and claims that need verification.
Return JSON only in this shape:
{{"gaps": [{{"missing": "...", "why_it_matters": "...", "query": "..."}}],
"queries": ["targeted web search query"]}}
"""
).strip()
response = self.venice.chat(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.3,
max_tokens=900,
)
data = json.loads(response)
gaps = _clean_gap_records(data.get("gaps"))
queries = _clean_string_list(data.get("queries"))
if not queries:
queries = [gap["query"] for gap in gaps if gap.get("query")]
return gaps, queries[:count]
When you pass --artifacts, these records are written to research_gaps.jsonl. That gives you a useful audit trail of why the agent searched for a particular second-pass query.
The parser should be forgiving. If the model returns malformed JSON, the agent falls back to the original topic:

    def _query_list(self, prompt: str, count: int, fallback: list[str]) -> list[str]:
        response = self.venice.chat(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.4,
            max_tokens=500,
        )
        try:
            data = json.loads(response)
            queries = data.get("queries", [])
        except (json.JSONDecodeError, AttributeError):
            queries = []
        clean_queries = [
            query.strip()
            for query in queries
            if isinstance(query, str) and query.strip()
        ]
        return (clean_queries or fallback)[:count]
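
Models sometimes wrap JSON in a Markdown fence or add a sentence around it, which a bare json.loads() rejects. As an optional hardening step, a tolerant parser could be shared by every JSON-returning prompt. This is a sketch only; parse_json_object() is a hypothetical helper that is not in the reference code.

```python
import json
from typing import Any


def parse_json_object(text: str) -> dict[str, Any]:
    """Parse a JSON object from model output, returning {} when none is usable."""
    candidate = text.strip()
    try:
        data = json.loads(candidate)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span, which skips code fences
        # and any prose the model added before or after the object.
        start = candidate.find("{")
        end = candidate.rfind("}")
        if start == -1 or end <= start:
            return {}
        try:
            data = json.loads(candidate[start : end + 1])
        except json.JSONDecodeError:
            return {}
    return data if isinstance(data, dict) else {}
```

Returning an empty dict on failure lets callers keep the existing `data.get("queries", [])` pattern and fall through to their fallback without a try/except at each call site.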
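Read and summarize sources
Now we collect source notes. The agent searches each query, fetches each result through Venice scrape, chunks the Markdown, and summarizes the useful evidence.

def _collect_notes(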
self,
topic: str,
queries: list[str],
results_per_query: int,
seen_source_keys: set[str],
seen_content_hashes: set[str],
notes: list[SourceNote],
iteration: int,
) -> None:
for query in queries:
if self.max_sources is not None and len(notes) >= self.max_sources:
return
self.progress(f"Searching: {query}")
try:
results = self.web.search(query, limit=results_per_query)
except Exception as exc:
self._record_error("search", exc, query=query)
continue
self.artifacts.write(
"search_results",
{"iteration": iteration, "query": query, "results": results},
)
for result in results:
if self.max_sources is not None and len(notes) >= self.max_sources:
return
source_key = result.canonical_url or result.url
if source_key in seen_source_keys:
self.artifacts.write("dedupe", {"reason": "canonical_url", "url": result.url})
continue
seen_source_keys.add(source_key)
source_id = f"S{len(notes) + 1}"
note = self._read_source(topic, query, source_id, result, seen_source_keys, seen_content_hashes)
if note is not None:
notes.append(note)
def _read_source(
self,
topic: str,
query: str,
source_id: str,
result: SearchResult,
seen_source_keys: set[str],
seen_content_hashes: set[str],
) -> SourceNote | None:
self.progress(f"Reading {source_id}: {result.title}")
try:
page = self.web.fetch(result)
except Exception as exc:
self._record_error("fetch", exc, query=query, url=result.url, source_id=source_id)
return None
if page.content_hash in seen_content_hashes:
self.artifacts.write(
"dedupe",
{"reason": "content_hash", "source_id": source_id, "url": result.url},
)
return None
seen_content_hashes.add(page.content_hash)
chunks = self._summarize_chunks(topic, query, source_id, page)
if not chunks:
self._record_error("summarize_chunk", VeniceError("no chunks could be summarized"), url=result.url)
return None
summary = self._summarize_source(topic, query, source_id, page, chunks)
note = SourceNote(
source_id=source_id,
title=page.title,
url=result.url,
canonical_url=page.canonical_url,
final_url=page.final_url,
query=query,
rank=result.rank,
snippet=result.snippet,
provider=result.provider,
retrieved_at=page.retrieved_at,
content_type=page.content_type,
content_hash=page.content_hash,
chunks=chunks,
summary=summary,
)
self.artifacts.write("source_notes", note)
return note
def _summarize_chunks(
self,
topic: str,
query: str,
source_id: str,
page: WebPage,
) -> tuple[EvidenceChunk, ...]:
evidence: list[EvidenceChunk] = []
for chunk in page.chunks[: self.max_chunks_per_source]:
prompt = dedent(
f"""
Topic: {topic}
Search query: {query}
Source ID: {source_id}
Chunk ID: {chunk.chunk_id}
Source title: {page.title}
Source URL: {page.final_url}
Source chunk:
{chunk.text}
Extract only evidence relevant to the topic.
Return JSON only in this shape:
{{"summary": "...", "quotes": ["short exact quote", "..."]}}
"""
).strip()
try:
response = self.venice.chat(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.1,
max_tokens=600,
)
data = json.loads(response)
evidence.append(
EvidenceChunk(
chunk_id=chunk.chunk_id,
text=chunk.text,
summary=str(data.get("summary", "")).strip(),
quotes=tuple(
quote.strip()
for quote in data.get("quotes", [])
if isinstance(quote, str) and quote.strip()
),
)
)
except Exception as exc:
self._record_error("summarize_chunk", exc, query=query, url=page.final_url, source_id=source_id)
continue
return tuple(evidence)
def _summarize_source(
self,
topic: str,
query: str,
source_id: str,
page: WebPage,
chunks: tuple[EvidenceChunk, ...],
) -> str:
chunk_digest = _chunk_digest(chunks, max_chars=9000)
prompt = dedent(
f"""
Topic: {topic}
Search query: {query}
Source ID: {source_id}
Source title: {page.title}
Source URL: {page.final_url}
Chunk evidence:
{chunk_digest}
Synthesize a source note using only the chunk evidence. Include:
- key facts with dates/numbers where present
- any limitations or bias in the source
- useful exact wording from quotes if it is short
Keep the note under 180 words and refer to the source as [{source_id}].
"""
).strip()
return self.venice.chat(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.1,
max_tokens=500,
)
Write the final report
Once the agent has source notes, it can write the report. Start with a single-pass report writer:

def _write_report(self, topic: str, notes: list[SourceNote]) -> str:
if not notes:
return (
f"# Research report: {topic}\n\n"
"No usable web sources were collected. Check your network connection or try a narrower topic."
)
prompt = dedent(
f"""
Research topic:
{topic}
Source notes:
{_source_digest(notes, max_chars=45000)}
Write a detailed source-backed Markdown research survey.
Requirements:
- Start with a precise H1 title.
- Open with "## Overview".
- Use topic-specific sections.
- Use footnote-style citation markers like [^1] and [^2].
- Do not cite with internal source IDs like [S1] in the report body.
- Do not include uncited factual claims.
- Avoid source-cluster capture from one vendor, domain, framework, or viewpoint.
- Include uncertainty, contradictions, and missing context where relevant.
- End with "## References" as a numbered list ordered by first citation.
"""
).strip()
return self.venice.chat_stream(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.2,
max_tokens=7000,
)
def _chunk_digest(chunks: tuple[EvidenceChunk, ...], max_chars: int) -> str:
parts = []
for chunk in chunks:
quote_text = "; ".join(chunk.quotes)
parts.append(
f"{chunk.chunk_id}: {chunk.summary}"
+ (f"\nQuotes: {quote_text}" if quote_text else "")
)
return "\n\n".join(parts)[:max_chars]
def _source_digest(notes: list[SourceNote], max_chars: int) -> str:
chunks = [
"\n".join(
[
f"[{note.source_id}] {note.title}",
f"URL: {note.final_url or note.url}",
f"Canonical URL: {note.canonical_url}",
f"Found via: {note.query}",
f"Provider/rank: {note.provider}/{note.rank}",
f"Retrieved: {note.retrieved_at}",
f"Content hash: {note.content_hash}",
f"Note: {note.summary}",
f"Chunk evidence: {_chunk_digest(note.chunks, max_chars=1000)}",
]
)
for note in notes
]
return "\n\n".join(chunks)[:max_chars]
def _record_error(
self,
stage: str,
exc: Exception,
*,
query: str = "",
url: str = "",
source_id: str = "",
provider: str = "",
) -> None:
message = str(exc)
self.progress(f"{stage.replace('_', ' ').title()} failed: {message}")
self.artifacts.write(
"errors",
CollectionError(
stage=stage,
message=message,
query=query,
url=url,
source_id=source_id,
provider=provider,
),
)
Add the CLI
Now we need a command-line entry point. Create main.py:
from __future__ import annotations
import argparse
from pathlib import Path
from dotenv import load_dotenv
from research_agent.agent import (
DEFAULT_ITERATIONS,
DEFAULT_MAX_CHUNKS_PER_SOURCE,
DEFAULT_MAX_SOURCES,
DEFAULT_QUERY_COUNT,
DEFAULT_REPORT_STYLE,
DEFAULT_RESULTS_PER_QUERY,
ResearchAgent,
)
from research_agent.artifacts import ArtifactWriter
from research_agent.venice import VeniceClient, VeniceError
from research_agent.web import WebSearch
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run a minimal deep research agent powered by Venice AI.",
)
parser.add_argument("topic", nargs="+", help="Research topic, wrapped in quotes for best results.")
parser.add_argument("--model", help="Venice model name. Defaults to VENICE_MODEL or openai-gpt-55.")
parser.add_argument("--iterations", type=int, default=DEFAULT_ITERATIONS)
parser.add_argument("--queries", type=int, default=DEFAULT_QUERY_COUNT)
parser.add_argument("--results", type=int, default=DEFAULT_RESULTS_PER_QUERY)
parser.add_argument("--output", "--markdown-output", dest="output", type=Path)
parser.add_argument("--artifacts", type=Path, help="Optional directory for JSONL research artifacts.")
parser.add_argument("--providers", default="duckduckgo", help="Comma-separated providers: duckduckgo, arxiv.")
parser.add_argument("--max-sources", type=int, default=DEFAULT_MAX_SOURCES)
parser.add_argument("--chunk-chars", type=int, default=3000)
parser.add_argument("--max-chunks-per-source", type=int, default=DEFAULT_MAX_CHUNKS_PER_SOURCE)
parser.add_argument(
"--report-style",
choices=["brief", "standard", "deep"],
default=DEFAULT_REPORT_STYLE,
help=f"Final report depth. Default: {DEFAULT_REPORT_STYLE}.",
)
parser.add_argument("--quiet", action="store_true", help="Hide progress messages.")
return parser.parse_args()
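| Option | What it controls |
|---|---|
| --iterations | Number of research passes |
| --queries | Number of search queries generated per pass |
| --results | Number of results read from each provider per query |
| --providers | Search providers, such as duckduckgo or duckduckgo,arxiv |
| --max-sources | Maximum number of usable sources to collect |
| --chunk-chars | Approximate chunk size before source evidence extraction |
| --max-chunks-per-source | Number of chunks summarized per source |
| --report-style | Final report depth: brief, standard, or deep |
| --artifacts | Directory for JSONL audit records |
| --output | Path for the final Markdown report |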
def main() -> int:
load_dotenv()
args = parse_args()
topic = " ".join(args.topic)
try:
venice = VeniceClient.from_env(model=args.model)
progress = None if args.quiet else lambda message: print(f"[agent] {message}")
provider_names = [name.strip() for name in args.providers.split(",") if name.strip()]
with WebSearch.from_provider_names(
provider_names,
chunk_chars=args.chunk_chars,
scraper=venice.scrape,
) as web:
agent = ResearchAgent(
venice=venice,
web=web,
artifacts=ArtifactWriter(args.artifacts),
progress=progress,
max_sources=args.max_sources,
max_chunks_per_source=args.max_chunks_per_source,
report_style=args.report_style,
)
report = agent.run(
topic,
iterations=args.iterations,
query_count=args.queries,
results_per_query=args.results,
)
except ValueError as exc:
print(f"Configuration error: {exc}")
return 1
except VeniceError as exc:
print(f"Venice API error: {exc}")
return 1
if args.output:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(report.markdown, encoding="utf-8")
print(f"\nSaved report to {args.output}")
else:
print()
print(report.markdown)
if report.artifacts_dir:
print(f"Saved research artifacts to {report.artifacts_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
Run the agent
Run a quick research pass:

uv run python main.py "How are AI agents changing software engineering workflows?"
uv run python main.py "state of open source LLM inference in 2026" \
--output reports/inference.md
uv run python main.py "agentic coding research" \
--providers duckduckgo,arxiv \
--iterations 3 \
--queries 5 \
--results 4 \
--max-sources 12
uv run python main.py "AI agents in software engineering" --report-style deep
Use brief for a concise source-backed briefing, standard for a fuller survey, and deep for a staged outline/section/editor workflow.
Save auditable artifacts:
uv run python main.py "privacy tradeoffs in hosted LLM APIs" \
--output reports/privacy.md \
--artifacts runs/privacy
runs/privacy/
queries.jsonl
research_gaps.jsonl
search_results.jsonl
fetches.jsonl
source_chunks.jsonl
chunk_summaries.jsonl
source_notes.jsonl
dedupe.jsonl
errors.jsonl
report_outline.jsonl
report_sections.jsonl
report_editor.jsonl
reports.jsonl
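source_notes.jsonl shows the summarized source evidence, research_gaps.jsonl shows why the follow-up searches were generated, and errors.jsonl shows pages that failed during search, scraping, or summarization.
Privacy and reliability notes
A research agent touches several systems, so it helps to be precise about where data goes:

| Layer | What sees the data |
|---|---|
| Local CLI | The topic, configuration, source notes, artifacts, and final report stay on your machine |
| Search provider | Search queries are sent to the provider you choose, such as DuckDuckGo or arXiv |
| Venice scrape | Public source URLs are sent to Venice's scrape endpoint |
| Venice chat completions | Prompts, source chunks, source notes, and report-writing instructions are sent to Venice |
| Output files | The Markdown report and JSONL artifacts are written locally |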
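
You can also route search through Venice's POST /augment/search endpoint instead of querying DuckDuckGo directly. The reference implementation uses lightweight public providers so that the demo is easy to run and understand.
For reliability, keep these defaults conservative:
- Use retries for Venice calls and web requests.
- If you read many pages from the same host, add a small --request-delay.
- Cap --max-sources so that broad topics don't run indefinitely.
- Save --artifacts for important reports so that you can audit the final output.
- Treat the report as a briefing, not ground truth. When accuracy matters, follow the citations back to the original sources.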
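
The CLI shown earlier does not define --request-delay, so the following is only a sketch of how a per-host delay could work. HostThrottle is a hypothetical class; the clock and sleep functions are injectable so that the behavior can be tested without actually waiting.

```python
import time
from collections.abc import Callable
from urllib.parse import urlparse


class HostThrottle:
    """Enforce a minimum gap between requests to the same host."""

    def __init__(
        self,
        delay_seconds: float,
        *,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.delay_seconds = delay_seconds
        self._clock = clock
        self._sleep = sleep
        self._last_request: dict[str, float] = {}

    def wait(self, url: str) -> float:
        """Sleep if the host was hit too recently; return the pause in seconds."""
        host = urlparse(url).netloc.lower()
        now = self._clock()
        last = self._last_request.get(host)
        pause = 0.0
        if last is not None:
            pause = max(0.0, self.delay_seconds - (now - last))
        if pause > 0:
            self._sleep(pause)
        # Record when this request is actually allowed to go out.
        self._last_request[host] = now + pause
        return pause
```

A caller would invoke throttle.wait(result.url) just before each fetch; requests to different hosts are never delayed by one another.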
Test the parts
You don't need live web requests or Venice calls to test most of the system. The reference repository uses fake Venice and fake web classes to test the research loop, deduplication behavior, artifacts, and report prompts.

A useful first test is URL canonicalization:

from research_agent.models import canonicalize_url

def test_canonicalize_url_removes_tracking_params():
    url = "https://example.com/post?utm_source=x&b=2&a=1#section"
    assert canonicalize_url(url) == "https://example.com/post?a=1&b=2"

A fake web class can then stand in for WebSearch. Both of its results share one content hash, so the second should be deduplicated:

from research_agent.models import SearchResult, WebPage, chunk_text

class FakeWeb:
    def search(self, query: str, limit: int = 5) -> list[SearchResult]:
        return [
            SearchResult(title="First source", url="https://example.com/a", snippet="snippet"),
            SearchResult(title="Mirror", url="https://example.com/b", snippet="snippet"),
        ]

    def fetch(self, result: SearchResult) -> WebPage:
        text = "This page contains relevant evidence. " * 5
        return WebPage(
            title=result.title,
            url=result.url,
            final_url=result.url,
            text=text,
            content_hash="same-content",
            chunks=chunk_text(text, chunk_chars=80, overlap=10),
        )
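
The fake Venice class is not shown in this section. One possible shape, offered as an assumption and not as the reference repository's version, is a stub that inspects the prompt and returns canned responses matching the JSON shapes the agent's prompts request:

```python
import json


class FakeVenice:
    """Stand-in for VeniceClient that returns canned responses and records prompts."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def chat(self, messages, *, temperature: float = 0.2, max_tokens: int = 1600) -> str:
        prompt = messages[-1]["content"]
        self.calls.append(prompt)
        if '"queries"' in prompt:
            # Query-planning and gap prompts both ask for a "queries" array.
            return json.dumps({"queries": ["fake query"], "gaps": []})
        if '"quotes"' in prompt:
            # Chunk-evidence prompts ask for a summary plus quotes.
            return json.dumps({"summary": "fake chunk summary", "quotes": ["relevant evidence"]})
        return "Fake source note [S1]."

    def chat_stream(self, messages, *, temperature: float = 0.2, max_tokens: int = 1600) -> str:
        self.calls.append(messages[-1]["content"])
        return "# Fake report\n\n## Overview\n\nFake finding.[^1]\n\n## References\n\n1. First source"
```

Passing FakeVenice() and FakeWeb() into ResearchAgent should let a test assert on the number of collected sources and on the prompts captured in calls, with no network access.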
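Benchmarking
Many AI providers now have their own deep research workflows, so the reference repository includes a simple benchmark against Perplexity's Deep Research tool. Both agents were asked to write a report on AI agent framework architectures, and the resulting reports were checked into the GitHub repository.

This is not a formal benchmark. It is a practical way to check report structure, source coverage, citation quality, and whether the agent over-focuses on one source cluster. It is also why the updated implementation tracks research_gaps.jsonl and source balance before follow-up searches.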
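Extend this example
Once the baseline agent works, here are practical ways to improve it:
- Add a Venice search provider using POST /augment/search.
- Store reports and artifacts in a small SQLite database instead of JSONL files.
- Add a source allowlist or blocklist for trusted research domains.
- Add PDF support for sources that don't expose clean HTML by combining Venice scrape with document parsing.
- Add an evaluation set of topics and expected source types so that you can compare research quality after prompt changes.
- Add a review step that asks Venice to look for unsupported claims in the final report before saving.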
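
As one example, the allowlist or blocklist idea could start as a small filter applied to search results before fetching. This is a sketch under assumptions: is_allowed() and its parameters are hypothetical, and matching on a domain plus its subdomains is just one reasonable policy.

```python
from collections.abc import Iterable
from urllib.parse import urlparse


def _host_matches(host: str, domain: str) -> bool:
    """True when host is the domain itself or one of its subdomains."""
    return host == domain or host.endswith("." + domain)


def is_allowed(url: str, *, allow: Iterable[str] = (), block: Iterable[str] = ()) -> bool:
    """Apply the blocklist first, then the allowlist if one is configured."""
    host = (urlparse(url).hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    if not host:
        return False
    if any(_host_matches(host, domain) for domain in block):
        return False
    allow_list = list(allow)
    if allow_list:
        return any(_host_matches(host, domain) for domain in allow_list)
    return True
```

In the agent, this check would sit next to the canonical-URL deduplication so that rejected sources never reach the scrape endpoint.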