Research agents are useful when you want more than a single search result or a quick model answer. A good research agent can turn a broad topic into search queries, collect sources, extract the important evidence, follow up on gaps, and write a cited briefing that you can inspect afterward.
In this tutorial, we’ll build a private research agent using Python and the Venice API. By the end, you’ll have a CLI that can research a topic, scrape public pages into Markdown, summarize source chunks, run gap-aware follow-up research passes, and generate a cited report with optional local JSONL artifacts.
Interested in the full code implementation? Check out the GitHub repo.
Before we continue, you’ll need a Venice API key:
export VENICE_API_KEY=<my-key>
What We’re Building
The reference implementation is a small Python project with a few clear parts:
| Part | What it does |
|---|---|
| CLI | Accepts a research topic, model, providers, depth settings, output path, and artifact directory |
| Venice client | Calls chat completions, streaming chat completions, and POST /augment/scrape |
| Search layer | Searches DuckDuckGo by default, with optional arXiv paper discovery |
| Data models | Tracks source URLs, canonical URLs, chunks, evidence, notes, errors, and reports |
| Research agent | Plans searches, reads sources, extracts evidence, analyzes gaps, generates follow-up queries, and writes the final report |
| Artifact writer | Stores auditable JSONL records for queries, research gaps, results, fetches, chunks, source notes, report drafts, errors, and reports |
The flow looks like this:
- Ask Venice to generate diverse search queries for the topic.
- Search the web with one or more providers.
- Deduplicate URLs before reading them.
- Use Venice’s scrape endpoint to turn each public source page into Markdown.
- Split long pages into chunks.
- Ask Venice to extract evidence from each chunk.
- Ask Venice to turn chunk evidence into source notes.
- Identify research gaps and source-balance issues before generating follow-up queries.
- Ask Venice to synthesize the final report with footnote-style citations.
This is “private” in the practical sense that the agent keeps the orchestration, source notes, artifacts, and final reports on your machine. Venice handles the model calls and scraping through its API. The default reference implementation still sends search queries to DuckDuckGo or arXiv, so treat provider choice as part of your privacy design.
Setting Up the Project
The reference project uses Python 3.13 and uv, but the same code works with a normal virtual environment too.
Create a new project:
mkdir venice-research-agent
cd venice-research-agent
uv init
Install the dependencies:
uv add httpx beautifulsoup4 python-dotenv
If you prefer pip, create a virtual environment and install the same packages:
python -m venv .venv
source .venv/bin/activate
pip install "httpx>=0.28.0" "beautifulsoup4>=4.13.0" "python-dotenv>=1.0.0"
Create a .env file for local development:
VENICE_API_KEY=your_venice_api_key_here
VENICE_MODEL=openai-gpt-55
We use VENICE_MODEL so you can change the model without editing code. The reference implementation currently defaults to openai-gpt-55, but you can swap it for another chat model available to your Venice account.
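To confirm those values load before writing any agent code, a quick check script works (a hypothetical helper, not part of the reference repo):

import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current directory
print("key set:", bool(os.getenv("VENICE_API_KEY")))
print("model:", os.getenv("VENICE_MODEL", "openai-gpt-55"))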
Creating the Data Models
Before writing the agent logic, we’ll define the objects that move through the pipeline. These models keep the rest of the code easier to reason about because every source carries provenance: where it came from, which query found it, when it was retrieved, and how it was chunked.
Create research_agent/models.py:
from __future__ import annotations
import hashlib
from dataclasses import dataclass, field
from datetime import UTC, datetime
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
TRACKING_PARAMS = {
"fbclid",
"gclid",
"igshid",
"mc_cid",
"mc_eid",
"msclkid",
"ref",
"ref_src",
}
@dataclass(frozen=True)
class SearchResult:
title: str
url: str
snippet: str
query: str = ""
rank: int = 0
provider: str = "duckduckgo"
canonical_url: str = ""
def __post_init__(self) -> None:
if not self.canonical_url:
object.__setattr__(self, "canonical_url", canonicalize_url(self.url))
@dataclass(frozen=True)
class ScrapeResult:
url: str
content: str
title: str = ""
final_url: str = ""
content_type: str = "text/markdown"
@dataclass(frozen=True)
class TextChunk:
chunk_id: str
text: str
start: int
end: int
content_hash: str
@dataclass(frozen=True)
class WebPage:
title: str
url: str
text: str
final_url: str = ""
canonical_url: str = ""
content_type: str = ""
retrieved_at: str = ""
content_hash: str = ""
chunks: tuple[TextChunk, ...] = field(default_factory=tuple)
def __post_init__(self) -> None:
final_url = self.final_url or self.url
object.__setattr__(self, "final_url", final_url)
if not self.canonical_url:
object.__setattr__(self, "canonical_url", canonicalize_url(final_url))
if not self.retrieved_at:
object.__setattr__(self, "retrieved_at", utc_now())
if not self.content_hash:
object.__setattr__(self, "content_hash", content_hash(self.text))
@dataclass(frozen=True)
class EvidenceChunk:
chunk_id: str
text: str
summary: str
quotes: tuple[str, ...] = field(default_factory=tuple)
@dataclass(frozen=True)
class SourceNote:
source_id: str
title: str
url: str
query: str
summary: str
canonical_url: str = ""
final_url: str = ""
rank: int = 0
snippet: str = ""
provider: str = "duckduckgo"
retrieved_at: str = ""
content_type: str = ""
content_hash: str = ""
chunks: tuple[EvidenceChunk, ...] = field(default_factory=tuple)
The important fields here are canonical_url, content_hash, and chunks.
canonical_url lets the agent avoid reading the same source repeatedly when search results differ only by tracking parameters or fragments. content_hash helps catch duplicate pages even when they live at different URLs. chunks lets us summarize long pages in smaller pieces instead of losing useful evidence to context limits.
Add the helper functions below the dataclasses:
def utc_now() -> str:
return datetime.now(UTC).isoformat()
def content_hash(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def canonicalize_url(raw_url: str) -> str:
if not raw_url:
return ""
parsed = urlparse(raw_url.strip())
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
return ""
scheme = parsed.scheme.lower()
netloc = parsed.netloc.lower()
path = parsed.path or "/"
if path != "/":
path = path.rstrip("/")
query_pairs = [
(key, value)
for key, value in parse_qsl(parsed.query, keep_blank_values=True)
if not _is_tracking_param(key)
]
query = urlencode(sorted(query_pairs), doseq=True)
return urlunparse((scheme, netloc, path, "", query, ""))
def chunk_text(text: str, *, chunk_chars: int = 3000, overlap: int = 250) -> tuple[TextChunk, ...]:
clean = text.strip()
if not clean:
return ()
if chunk_chars <= 0:
raise ValueError("chunk_chars must be greater than 0")
if overlap < 0 or overlap >= chunk_chars:
raise ValueError("overlap must be at least 0 and smaller than chunk_chars")
chunks: list[TextChunk] = []
start = 0
index = 1
while start < len(clean):
end = min(len(clean), start + chunk_chars)
chunk = clean[start:end].strip()
if chunk:
chunks.append(
TextChunk(
chunk_id=f"C{index}",
text=chunk,
start=start,
end=end,
content_hash=content_hash(chunk),
)
)
index += 1
if end == len(clean):
break
start = end - overlap
return tuple(chunks)
def _is_tracking_param(key: str) -> bool:
lowered = key.lower()
return lowered.startswith("utm_") or lowered in TRACKING_PARAMS
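For example, two result URLs that differ only by tracking parameters, host case, or a trailing slash collapse to the same canonical key, which is exactly what the dedupe sets rely on later:

# Tracking params are stripped, the host is lowercased, the trailing slash removed.
assert canonicalize_url(
    "https://Example.com/post/?utm_source=x&ref=feed"
) == canonicalize_url("https://example.com/post")

# Non-tracking query parameters are kept, sorted for stability.
assert canonicalize_url("https://example.com/search?b=2&a=1") == (
    "https://example.com/search?a=1&b=2"
)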
Chunking is deliberately simple here: fixed-size character chunks with overlap. That is enough for a demo research agent because Venice’s scrape endpoint returns Markdown, which is usually much cleaner than raw HTML. For production research on long technical documents, you can improve this by splitting on headings, paragraphs, or token counts.
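If you want to try that, here is a hedged sketch of a paragraph-aware splitter (not part of the reference implementation; add it alongside chunk_text in models.py). It packs whole paragraphs up to the size limit; the start/end offsets are approximate because splitting discards the exact separator positions:

def chunk_by_paragraphs(text: str, *, chunk_chars: int = 3000) -> tuple[TextChunk, ...]:
    # Greedy packing: keep whole paragraphs together up to roughly chunk_chars.
    paragraphs = [p for p in text.strip().split("\n\n") if p.strip()]
    chunks: list[TextChunk] = []
    buffer: list[str] = []
    offset = 0       # approximate offset into the cleaned text
    chunk_start = 0  # offset where the current buffer began

    def flush() -> None:
        nonlocal buffer, chunk_start
        if buffer:
            joined = "\n\n".join(buffer)
            chunks.append(
                TextChunk(
                    chunk_id=f"C{len(chunks) + 1}",
                    text=joined,
                    start=chunk_start,
                    end=chunk_start + len(joined),
                    content_hash=content_hash(joined),
                )
            )
            buffer = []
        chunk_start = offset

    for paragraph in paragraphs:
        if buffer and (offset - chunk_start) + len(paragraph) > chunk_chars:
            flush()
        buffer.append(paragraph)
        offset += len(paragraph) + 2  # account for the "\n\n" separator
    flush()
    return tuple(chunks)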
Building the Venice Client
Next, we’ll create a small Venice client. You could use the OpenAI Python SDK for chat completions because Venice is OpenAI-compatible, but the reference implementation uses httpx directly so the same client can call Venice’s POST /augment/scrape endpoint.
Create research_agent/venice.py:
from __future__ import annotations
import json
import os
import time
from dataclasses import dataclass
from typing import Any
import httpx
from .models import ScrapeResult
DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "openai-gpt-55"
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}
class VeniceError(RuntimeError):
"""Raised when the Venice API returns an unusable response."""
@dataclass(frozen=True)
class VeniceClient:
api_key: str
model: str = DEFAULT_MODEL
base_url: str = DEFAULT_BASE_URL
timeout: float = 60.0
max_retries: int = 2
backoff_seconds: float = 1.0
@classmethod
def from_env(cls, model: str | None = None, *, max_retries: int = 2) -> "VeniceClient":
api_key = os.getenv("VENICE_API_KEY")
if not api_key:
raise VeniceError("VENICE_API_KEY is required.")
return cls(
api_key=api_key,
model=model or os.getenv("VENICE_MODEL", DEFAULT_MODEL),
base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL).rstrip("/"),
max_retries=max_retries,
)
The from_env() helper keeps secrets out of your source code. It also makes local development convenient because python-dotenv can load VENICE_API_KEY and VENICE_MODEL from .env.
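Usage is then a one-liner; the model override below is a hypothetical name, shown only to illustrate the parameter:

client = VeniceClient.from_env()                       # honors VENICE_MODEL / VENICE_BASE_URL
custom = VeniceClient.from_env(model="another-model")  # hypothetical model name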
Now add chat completions:
def chat(
self,
messages: list[dict[str, str]],
*,
temperature: float = 0.2,
max_tokens: int = 1600,
) -> str:
payload: dict[str, Any] = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
}
data = self._post_json("/chat/completions", payload)
try:
return data["choices"][0]["message"]["content"].strip()
except (KeyError, IndexError, TypeError) as exc:
raise VeniceError(f"Unexpected Venice API response: {data}") from exc
For the final report, we use streaming. Deep reports take significantly longer to generate because the model produces far more text, and a single non-streaming request can time out before the full output arrives. Streaming keeps the connection active for the whole generation, which makes the request much more resistant to timeout failures:
def chat_stream(
self,
messages: list[dict[str, str]],
*,
temperature: float = 0.2,
max_tokens: int = 1600,
) -> str:
payload: dict[str, Any] = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": True,
}
return self._post_chat_stream("/chat/completions", payload).strip()
Then add scraping:
def scrape(self, url: str) -> ScrapeResult:
data = self._post_json("/augment/scrape", {"url": url})
content = _first_string(data, "content", "markdown", "text")
if not content:
raise VeniceError(f"Unexpected Venice scrape response: {data}")
return ScrapeResult(
url=url,
final_url=_first_string(data, "final_url", "url", "source_url") or url,
title=_first_string(data, "title"),
content=content,
content_type="text/markdown",
)
Venice’s scrape endpoint accepts a publicly accessible URL and returns the page as Markdown. That means the model does not need to parse raw HTML, and your source extraction prompts can work with cleaner text.
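A quick way to sanity-check the endpoint before building the rest of the agent (assumes a valid key in your environment):

client = VeniceClient.from_env()
page = client.scrape("https://example.com")
print(page.title or "(no title)")
print(page.content[:200])  # first part of the scraped Markdown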
The remaining helper handles retries and response parsing:
def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
for attempt in range(self.max_retries + 1):
try:
response = httpx.post(
f"{self.base_url}{path}",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json=payload,
timeout=self.timeout,
)
if response.status_code in RETRYABLE_STATUS_CODES and attempt < self.max_retries:
time.sleep(self.backoff_seconds * (2**attempt))
continue
response.raise_for_status()
data = response.json()
if not isinstance(data, dict):
raise VeniceError(f"Unexpected Venice API response: {data}")
return data
except httpx.HTTPError as exc:
if attempt < self.max_retries:
time.sleep(self.backoff_seconds * (2**attempt))
continue
raise VeniceError(f"Could not reach Venice API: {exc}") from exc
raise VeniceError("Could not reach Venice API")
def _first_string(data: dict[str, Any], *keys: str) -> str:
for key in keys:
value = data.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
for nested_key in ("data", "result", "scrape"):
nested = data.get(nested_key)
if isinstance(nested, dict):
value = _first_string(nested, *keys)
if value:
return value
return ""
The complete repo also includes a robust _post_chat_stream() helper that reads server-sent events from streaming chat completions. You can start without streaming, then add it once the rest of the research flow works.
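If you want streaming from the start, here is a minimal sketch of that VeniceClient method, assuming the standard OpenAI-compatible SSE format (data: lines carrying choices[0].delta.content). The repo's version adds retries and better error reporting:

def _post_chat_stream(self, path: str, payload: dict[str, Any]) -> str:
    # Minimal SSE reader for streaming chat completions. Assumes the
    # OpenAI-compatible format: "data: {json}" lines, terminated by [DONE].
    parts: list[str] = []
    with httpx.stream(
        "POST",
        f"{self.base_url}{path}",
        headers={
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=self.timeout,
    ) as response:
        response.raise_for_status()
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data.strip() == "[DONE]":
                break
            try:
                event = json.loads(data)
            except json.JSONDecodeError:
                continue  # skip keep-alives and partial frames
            choices = event.get("choices") or [{}]
            content = choices[0].get("delta", {}).get("content")
            if content:
                parts.append(content)
    return "".join(parts)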
Adding Search Providers
The search layer has two jobs: find source URLs and fetch those URLs through the Venice scraper. The reference implementation uses DuckDuckGo’s HTML endpoint for general web search and arXiv’s Atom API for papers.
Create research_agent/web.py:
from __future__ import annotations
import re
import xml.etree.ElementTree as ET
from collections.abc import Callable, Iterable
from urllib.parse import parse_qs, unquote, urlparse
import httpx
from bs4 import BeautifulSoup
from .models import ScrapeResult, SearchResult, TextChunk, WebPage, canonicalize_url, chunk_text, content_hash, utc_now
USER_AGENT = "venice-research-agent-demo/0.1 (+https://venice.ai)"
class SearchProvider:
name = "provider"
def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
raise NotImplementedError
Now add DuckDuckGo:
class DuckDuckGoProvider(SearchProvider):
name = "duckduckgo"
def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
response = web.get("https://duckduckgo.com/html/", params={"q": query})
soup = BeautifulSoup(response.text, "html.parser")
results: list[SearchResult] = []
seen_urls: set[str] = set()
for node in soup.select(".result"):
link = node.select_one(".result__a")
if link is None:
continue
url = _normalize_duckduckgo_url(link.get("href", ""))
canonical_url = canonicalize_url(url)
if not canonical_url or canonical_url in seen_urls:
continue
snippet = node.select_one(".result__snippet")
results.append(
SearchResult(
title=_clean_text(link.get_text(" ", strip=True)),
url=url,
snippet=_clean_text(snippet.get_text(" ", strip=True) if snippet else ""),
query=query,
rank=len(results) + 1,
provider=self.name,
canonical_url=canonical_url,
)
)
seen_urls.add(canonical_url)
if len(results) >= limit:
break
return results
And arXiv:
class ArxivProvider(SearchProvider):
name = "arxiv"
def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
response = web.get(
"https://export.arxiv.org/api/query",
params={
"search_query": f"all:{query}",
"start": 0,
"max_results": limit,
"sortBy": "relevance",
},
)
namespace = {"atom": "http://www.w3.org/2005/Atom"}
root = ET.fromstring(response.text)
results: list[SearchResult] = []
for entry in root.findall("atom:entry", namespace):
title = _clean_text(_xml_text(entry.find("atom:title", namespace)))
summary = _clean_text(_xml_text(entry.find("atom:summary", namespace)))
url = _xml_text(entry.find("atom:id", namespace)).strip()
canonical_url = canonicalize_url(url)
if not url or not canonical_url:
continue
results.append(
SearchResult(
title=title or url,
url=url,
snippet=summary,
query=query,
rank=len(results) + 1,
provider=self.name,
canonical_url=canonical_url,
)
)
if len(results) >= limit:
break
return results
The WebSearch class coordinates providers and fetches pages:
class WebSearch:
def __init__(
self,
timeout: float = 15.0,
*,
providers: Iterable[SearchProvider] | None = None,
chunk_chars: int = 3000,
scraper: Callable[[str], ScrapeResult] | None = None,
) -> None:
self._client = httpx.Client(
timeout=timeout,
follow_redirects=True,
headers={"User-Agent": USER_AGENT},
)
self.providers = tuple(providers or (DuckDuckGoProvider(),))
self.chunk_chars = chunk_chars
self.scraper = scraper
@classmethod
def from_provider_names(cls, provider_names: Iterable[str], **kwargs: object) -> "WebSearch":
providers = [_provider_from_name(name) for name in provider_names]
return cls(providers=providers, **kwargs)
def search(self, query: str, limit: int = 5) -> list[SearchResult]:
results: list[SearchResult] = []
seen_urls: set[str] = set()
for provider in self.providers:
for result in provider.search(self, query, limit):
if result.canonical_url in seen_urls:
continue
results.append(result)
seen_urls.add(result.canonical_url)
return results
def fetch(self, result: SearchResult) -> WebPage:
if self.scraper is None:
raise RuntimeError("WebSearch.fetch requires a Venice scrape function.")
scraped = self.scraper(result.url)
text = scraped.content.strip() or result.snippet
chunks = self._chunk_text(text)
return WebPage(
title=scraped.title or result.title,
url=result.url,
final_url=scraped.final_url or scraped.url or result.url,
canonical_url=canonicalize_url(scraped.final_url or result.url),
text=text,
content_type=scraped.content_type or "text/markdown",
retrieved_at=utc_now(),
content_hash=content_hash(text),
chunks=chunks,
)
def get(self, url: str, *, params: dict[str, object] | None = None) -> httpx.Response:
response = self._client.get(url, params=params)
response.raise_for_status()
return response
def close(self) -> None:
self._client.close()
def __enter__(self) -> "WebSearch":
return self
def __exit__(self, *_: object) -> None:
self.close()
def _chunk_text(self, text: str) -> tuple[TextChunk, ...]:
overlap = min(250, max(0, self.chunk_chars // 10))
return chunk_text(text, chunk_chars=self.chunk_chars, overlap=overlap)
The full reference implementation adds retries, host-level request delays, and friendlier errors. Those are worth keeping because research agents spend a lot of time dealing with pages that block automation, redirect unexpectedly, or return transient errors.
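As one example, a per-host delay takes only a few lines. This is a hedged sketch; the reference repo exposes the value as a --request-delay option rather than a hard-coded default:

import time
from urllib.parse import urlparse

_last_request_at: dict[str, float] = {}

def wait_for_host(url: str, delay_seconds: float = 1.0) -> None:
    # Sleep until at least delay_seconds have passed since the last
    # request to this host, then record the new request time.
    host = urlparse(url).netloc.lower()
    wait = _last_request_at.get(host, 0.0) + delay_seconds - time.monotonic()
    if wait > 0:
        time.sleep(wait)
    _last_request_at[host] = time.monotonic()

Call wait_for_host(url) at the top of WebSearch.get before issuing the request.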
Add the small provider helpers at the bottom:
def _normalize_duckduckgo_url(raw_url: str) -> str:
if not raw_url:
return ""
parsed = urlparse(raw_url)
if parsed.netloc.endswith("duckduckgo.com") and parsed.path == "/l/":
target = parse_qs(parsed.query).get("uddg", [""])[0]
return unquote(target)
if parsed.scheme in {"http", "https"}:
return raw_url
return ""
def _provider_from_name(name: str) -> SearchProvider:
normalized = name.strip().lower()
if normalized in {"duckduckgo", "ddg", "web"}:
return DuckDuckGoProvider()
if normalized == "arxiv":
return ArxivProvider()
raise ValueError(f"Unknown source provider: {name}")
def _clean_text(value: str) -> str:
return re.sub(r"\s+", " ", value).strip()
def _xml_text(node: ET.Element | None) -> str:
return "" if node is None or node.text is None else node.text
Writing Local Artifacts
For research workflows, auditability matters. If the final report says something surprising, you should be able to inspect which source led to it.
Create research_agent/artifacts.py:
from __future__ import annotations
import json
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any
class ArtifactWriter:
def __init__(self, root: Path | None = None) -> None:
self.root = root
if self.root is not None:
self.root.mkdir(parents=True, exist_ok=True)
@property
def enabled(self) -> bool:
return self.root is not None
def write(self, kind: str, record: object) -> None:
if self.root is None:
return
path = self.root / f"{kind}.jsonl"
payload = json.dumps(_to_jsonable(record), ensure_ascii=False, sort_keys=True)
with path.open("a", encoding="utf-8") as file:
file.write(f"{payload}\n")
def _to_jsonable(value: object) -> Any:
if is_dataclass(value):
return _to_jsonable(asdict(value))
if isinstance(value, Path):
return str(value)
if isinstance(value, dict):
return {str(key): _to_jsonable(item) for key, item in value.items()}
if isinstance(value, (list, tuple)):
return [_to_jsonable(item) for item in value]
return value
This writes one JSON object per line, which makes the artifacts easy to append, inspect, and process with command-line tools later.
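For example, replaying every recorded error later takes a few lines of Python (the run directory below is hypothetical):

import json
from pathlib import Path

for line in Path("runs/example/errors.jsonl").read_text(encoding="utf-8").splitlines():
    record = json.loads(line)
    print(record["stage"], record.get("url") or record.get("query"))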
Building the Research Agent
Now that we have Venice, search, models, and artifacts, we can build the actual agent.
Create research_agent/agent.py:
from __future__ import annotations
import json
from collections.abc import Callable
from textwrap import dedent
from .artifacts import ArtifactWriter
from .models import CollectionError, EvidenceChunk, ResearchReport, SearchResult, SourceNote, WebPage, utc_now
from .venice import VeniceClient, VeniceError
from .web import WebSearch
SYSTEM_PROMPT = """You are a careful research assistant.
Use the supplied source material only when making factual claims.
Flag uncertainty, contradictions, and missing context instead of filling gaps."""
ProgressCallback = Callable[[str], None]
DEFAULT_ITERATIONS = 3
DEFAULT_QUERY_COUNT = 6
DEFAULT_RESULTS_PER_QUERY = 4
DEFAULT_MAX_SOURCES = 40
DEFAULT_MAX_CHUNKS_PER_SOURCE = 6
DEFAULT_REPORT_STYLE = "standard"
The system prompt is the core behavioral guardrail. We don’t want the model to produce an impressive-sounding report from memory. We want it to use the source material and call out uncertainty when the evidence is thin.
The agent imports two more dataclasses from models.py; add them now if you have not already:
@dataclass(frozen=True)
class CollectionError:
stage: str
message: str
query: str = ""
url: str = ""
source_id: str = ""
provider: str = ""
@dataclass(frozen=True)
class ResearchReport:
topic: str
markdown: str
sources: list[SourceNote]
artifacts_dir: str | None = None
Next, define the ResearchAgent:
class ResearchAgent:
def __init__(
self,
venice: VeniceClient,
web: WebSearch | None = None,
artifacts: ArtifactWriter | None = None,
progress: ProgressCallback | None = None,
max_sources: int | None = DEFAULT_MAX_SOURCES,
        max_chunks_per_source: int = DEFAULT_MAX_CHUNKS_PER_SOURCE,
        report_style: str = DEFAULT_REPORT_STYLE,
) -> None:
self.venice = venice
self.web = web or WebSearch(scraper=venice.scrape)
self.artifacts = artifacts or ArtifactWriter()
self.progress = progress or (lambda _: None)
self.max_sources = max_sources
        self.max_chunks_per_source = max_chunks_per_source
        self.report_style = report_style
The run() method coordinates the research passes:
def run(
self,
topic: str,
*,
iterations: int = DEFAULT_ITERATIONS,
query_count: int = DEFAULT_QUERY_COUNT,
results_per_query: int = DEFAULT_RESULTS_PER_QUERY,
) -> ResearchReport:
notes: list[SourceNote] = []
seen_source_keys: set[str] = set()
seen_content_hashes: set[str] = set()
queries = self._initial_queries(topic, query_count)
self.artifacts.write("queries", {"stage": "initial", "topic": topic, "queries": queries})
for iteration in range(1, iterations + 1):
self.progress(f"Research pass {iteration}/{iterations}: {', '.join(queries)}")
self._collect_notes(
topic,
queries,
results_per_query,
seen_source_keys,
seen_content_hashes,
notes,
iteration,
)
if iteration < iterations:
gaps, queries = self._gap_follow_up_queries(topic, notes, query_count)
self.artifacts.write(
"research_gaps",
{
"topic": topic,
"after_iteration": iteration,
"source_balance": _source_cluster_counts(notes),
"gaps": gaps,
"queries": queries,
},
)
self.artifacts.write(
"queries",
{
"stage": "follow_up",
"topic": topic,
"iteration": iteration + 1,
"gap_count": len(gaps),
"queries": queries,
},
)
report = self._write_report(topic, notes)
self.artifacts.write(
"reports",
{
"topic": topic,
"source_count": len(notes),
"generated_at": utc_now(),
"markdown": report,
},
)
return ResearchReport(
topic=topic,
markdown=report,
sources=notes,
artifacts_dir=str(self.artifacts.root) if self.artifacts.root is not None else None,
)
The two seen_* sets are what keep the agent from wasting time on duplicate sources. URL dedupe catches repeated links. Content hash dedupe catches mirrors, syndicated posts, and pages that redirect to the same final content.
Planning Initial and Follow-up Searches
The first model call turns the topic into search queries:
def _initial_queries(self, topic: str, count: int) -> list[str]:
prompt = dedent(
f"""
Create {count} diverse web search queries for researching this topic:
{topic}
Cover background, recent developments, primary sources, criticism, and data.
Include at least one query likely to find primary sources or datasets.
Return JSON only in this shape: {{"queries": ["..."]}}
"""
).strip()
return self._query_list(prompt, count, fallback=[topic])
After each research pass, the updated agent does a more deliberate gap-analysis step. It looks at the current notes, counts source clusters by domain, asks Venice what coverage is missing, writes those gaps to artifacts, and then uses the resulting queries for the next pass.
Start by tracking source balance:
from urllib.parse import urlparse
def _source_cluster_counts(notes: list[SourceNote]) -> list[dict[str, object]]:
total = len(notes)
if total == 0:
return []
clusters: dict[str, list[str]] = {}
for note in notes:
cluster = _source_cluster(note)
clusters.setdefault(cluster, []).append(note.source_id)
return [
{
"cluster": cluster,
"source_count": len(source_ids),
"source_share": round(len(source_ids) / total, 3),
"source_ids": source_ids,
}
for cluster, source_ids in sorted(
clusters.items(), key=lambda item: (-len(item[1]), item[0])
)
]
def _source_cluster(note: SourceNote) -> str:
url = note.canonical_url or note.final_url or note.url
host = urlparse(url).netloc.lower()
if host.startswith("www."):
host = host[4:]
return host or "unknown"
def _source_balance_digest(notes: list[SourceNote], limit: int = 8) -> str:
clusters = _source_cluster_counts(notes)
if not clusters:
return "No source clusters yet."
total = len(notes)
lines = [
f"- {cluster['cluster']}: {cluster['source_count']}/{total} sources "
f"({cluster['source_share']:.0%}); IDs: {', '.join(cluster['source_ids'])}"
for cluster in clusters[:limit]
]
return "\n".join(lines)
This gives the agent a simple way to notice source-cluster capture. If every source is coming from one company, one framework, or one domain, follow-up queries should deliberately broaden the source set instead of collecting more of the same.
Now use that balance information when creating follow-up searches:
def _follow_up_queries(self, topic: str, notes: list[SourceNote], count: int) -> list[str]:
digest = _source_digest(notes, max_chars=9000)
source_balance = _source_balance_digest(notes)
prompt = dedent(
f"""
We are researching: {topic}
Current notes:
{digest}
Source balance:
{source_balance}
Create {count} follow-up web search queries that fill gaps, verify important claims,
find primary evidence, and look for dissenting evidence.
If one source domain, vendor, framework, product, or perspective is overrepresented,
deliberately broaden beyond it unless the topic explicitly asks for that focus.
Return JSON only in this shape: {{"queries": ["..."]}}
"""
).strip()
return self._query_list(prompt, count, fallback=[topic])
The newer reference implementation replaces this with _gap_follow_up_queries(), which asks Venice to return both gap records and queries:
def _gap_follow_up_queries(
self, topic: str, notes: list[SourceNote], count: int
) -> tuple[list[dict[str, str]], list[str]]:
if not notes:
return [], [topic]
digest = _source_digest(notes, max_chars=12000)
source_balance = _source_balance_digest(notes)
prompt = dedent(
f"""
Identify coverage gaps before the next research pass.
Research topic:
{topic}
Current source notes:
{digest}
Source balance:
{source_balance}
Find important missing coverage that would improve a deep research report.
Look specifically for primary sources, technical concepts, dissenting views,
overrepresented source clusters, and claims that need verification.
Return JSON only in this shape:
{{"gaps": [{{"missing": "...", "why_it_matters": "...", "query": "..."}}],
"queries": ["targeted web search query"]}}
"""
).strip()
response = self.venice.chat(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.3,
max_tokens=900,
)
        try:
            data = json.loads(response)
        except json.JSONDecodeError:
            return [], [topic]
        gaps = _clean_gap_records(data.get("gaps"))
        queries = _clean_string_list(data.get("queries"))
        if not queries:
            queries = [gap["query"] for gap in gaps if gap.get("query")]
        return gaps, queries[:count]
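The two _clean_* helpers used above are small defensive parsers. The full repo defines them; minimal module-level versions might look like this:

def _clean_string_list(value: object) -> list[str]:
    # Keep only non-empty strings; tolerate None or a wrong type entirely.
    if not isinstance(value, list):
        return []
    return [item.strip() for item in value if isinstance(item, str) and item.strip()]

def _clean_gap_records(value: object) -> list[dict[str, str]]:
    # Keep only dict records and coerce the expected keys to stripped strings.
    if not isinstance(value, list):
        return []
    records: list[dict[str, str]] = []
    for item in value:
        if not isinstance(item, dict):
            continue
        record = {
            key: str(item.get(key, "")).strip()
            for key in ("missing", "why_it_matters", "query")
        }
        if any(record.values()):
            records.append(record)
    return records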
When --artifacts is enabled, these records are written to research_gaps.jsonl. That gives you a useful audit trail for why the agent searched for a particular second-pass query.
The parser should be forgiving. If the model returns malformed JSON, the agent falls back to the original topic:
def _query_list(self, prompt: str, count: int, fallback: list[str]) -> list[str]:
response = self.venice.chat(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.4,
max_tokens=500,
)
try:
data = json.loads(response)
queries = data.get("queries", [])
except (json.JSONDecodeError, AttributeError):
queries = []
clean_queries = [
query.strip()
for query in queries
if isinstance(query, str) and query.strip()
]
return (clean_queries or fallback)[:count]
This pattern is worth using throughout agent code: ask for structured output, parse it, and provide a simple fallback when the output is not usable.
Reading and Summarizing Sources
Now we collect source notes. The agent searches each query, fetches each result through Venice scrape, chunks the Markdown, and summarizes the useful evidence.
def _collect_notes(
self,
topic: str,
queries: list[str],
results_per_query: int,
seen_source_keys: set[str],
seen_content_hashes: set[str],
notes: list[SourceNote],
iteration: int,
) -> None:
for query in queries:
if self.max_sources is not None and len(notes) >= self.max_sources:
return
self.progress(f"Searching: {query}")
try:
results = self.web.search(query, limit=results_per_query)
except Exception as exc:
self._record_error("search", exc, query=query)
continue
self.artifacts.write(
"search_results",
{"iteration": iteration, "query": query, "results": results},
)
for result in results:
if self.max_sources is not None and len(notes) >= self.max_sources:
return
source_key = result.canonical_url or result.url
if source_key in seen_source_keys:
self.artifacts.write("dedupe", {"reason": "canonical_url", "url": result.url})
continue
seen_source_keys.add(source_key)
source_id = f"S{len(notes) + 1}"
note = self._read_source(topic, query, source_id, result, seen_source_keys, seen_content_hashes)
if note is not None:
notes.append(note)
Individual search and fetch failures should not stop the whole run. The public web is messy. Some pages block scraping, some return PDFs, some are down, and some redirect to unexpected places. A research agent should keep moving and record what failed.
Here is the source-reading method:
def _read_source(
self,
topic: str,
query: str,
source_id: str,
result: SearchResult,
seen_source_keys: set[str],
seen_content_hashes: set[str],
) -> SourceNote | None:
self.progress(f"Reading {source_id}: {result.title}")
try:
page = self.web.fetch(result)
except Exception as exc:
self._record_error("fetch", exc, query=query, url=result.url, source_id=source_id)
return None
if page.content_hash in seen_content_hashes:
self.artifacts.write(
"dedupe",
{"reason": "content_hash", "source_id": source_id, "url": result.url},
)
return None
seen_content_hashes.add(page.content_hash)
chunks = self._summarize_chunks(topic, query, source_id, page)
if not chunks:
self._record_error("summarize_chunk", VeniceError("no chunks could be summarized"), url=result.url)
return None
summary = self._summarize_source(topic, query, source_id, page, chunks)
note = SourceNote(
source_id=source_id,
title=page.title,
url=result.url,
canonical_url=page.canonical_url,
final_url=page.final_url,
query=query,
rank=result.rank,
snippet=result.snippet,
provider=result.provider,
retrieved_at=page.retrieved_at,
content_type=page.content_type,
content_hash=page.content_hash,
chunks=chunks,
summary=summary,
)
self.artifacts.write("source_notes", note)
return note
For each source chunk, ask Venice for a short evidence summary and exact quotes:
def _summarize_chunks(
self,
topic: str,
query: str,
source_id: str,
page: WebPage,
) -> tuple[EvidenceChunk, ...]:
evidence: list[EvidenceChunk] = []
for chunk in page.chunks[: self.max_chunks_per_source]:
prompt = dedent(
f"""
Topic: {topic}
Search query: {query}
Source ID: {source_id}
Chunk ID: {chunk.chunk_id}
Source title: {page.title}
Source URL: {page.final_url}
Source chunk:
{chunk.text}
Extract only evidence relevant to the topic.
Return JSON only in this shape:
{{"summary": "...", "quotes": ["short exact quote", "..."]}}
"""
).strip()
try:
response = self.venice.chat(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.1,
max_tokens=600,
)
data = json.loads(response)
evidence.append(
EvidenceChunk(
chunk_id=chunk.chunk_id,
text=chunk.text,
summary=str(data.get("summary", "")).strip(),
quotes=tuple(
quote.strip()
for quote in data.get("quotes", [])
if isinstance(quote, str) and quote.strip()
),
)
)
except Exception as exc:
self._record_error("summarize_chunk", exc, query=query, url=page.final_url, source_id=source_id)
continue
return tuple(evidence)
Then collapse the chunk summaries into a source note:
def _summarize_source(
self,
topic: str,
query: str,
source_id: str,
page: WebPage,
chunks: tuple[EvidenceChunk, ...],
) -> str:
chunk_digest = _chunk_digest(chunks, max_chars=9000)
prompt = dedent(
f"""
Topic: {topic}
Search query: {query}
Source ID: {source_id}
Source title: {page.title}
Source URL: {page.final_url}
Chunk evidence:
{chunk_digest}
Synthesize a source note using only the chunk evidence. Include:
- key facts with dates/numbers where present
- any limitations or bias in the source
- useful exact wording from quotes if it is short
Keep the note under 180 words and refer to the source as [{source_id}].
"""
).strip()
return self.venice.chat(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.1,
max_tokens=500,
)
This two-step summarization is the part that makes the agent feel more reliable than a basic “summarize these URLs” script. The model reads source chunks first, then writes a source-level note from those extracted pieces of evidence.
Writing the Final Report
Once the agent has source notes, it can write the report. Start with a single-pass report writer:
def _write_report(self, topic: str, notes: list[SourceNote]) -> str:
if not notes:
return (
f"# Research report: {topic}\n\n"
"No usable web sources were collected. Check your network connection or try a narrower topic."
)
prompt = dedent(
f"""
Research topic:
{topic}
Source notes:
{_source_digest(notes, max_chars=45000)}
Write a detailed source-backed Markdown research survey.
Requirements:
- Start with a precise H1 title.
- Open with "## Overview".
- Use topic-specific sections.
- Use footnote-style citation markers like [^1] and [^2].
- Do not cite with internal source IDs like [S1] in the report body.
- Do not include uncited factual claims.
- Avoid source-cluster capture from one vendor, domain, framework, or viewpoint.
- Include uncertainty, contradictions, and missing context where relevant.
- End with "## References" as a numbered list ordered by first citation.
"""
).strip()
return self.venice.chat_stream(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.2,
max_tokens=7000,
)
The reference implementation goes further for deep reports: it asks Venice for an outline, drafts each report section separately, then asks a final editor pass to assemble the finished report and convert internal source IDs into footnote-style citations.
That staged approach is useful when you want long-form research output because one giant prompt often compresses too much. The updated prompts also push the report toward a broad, source-backed survey instead of a thin decision guide. If the source base is skewed toward one cluster, the editor prompt tells Venice to acknowledge that skew and avoid presenting it as representative of the whole field.
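A condensed sketch of that staged flow follows; it is hedged because the repo's prompts and artifact writes are richer. _write_report can dispatch to it when self.report_style == "deep", and it reuses _clean_string_list from earlier:

def _write_deep_report(self, topic: str, notes: list[SourceNote]) -> str:
    digest = _source_digest(notes, max_chars=45000)
    outline_json = self.venice.chat(
        [
            {"role": "system", "content": SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    f"Topic: {topic}\n\nSource notes:\n{digest}\n\n"
                    'Propose 4-7 section titles. Return JSON only: {"sections": ["..."]}'
                ),
            },
        ],
        temperature=0.3,
        max_tokens=400,
    )
    try:
        sections = _clean_string_list(json.loads(outline_json).get("sections"))
    except json.JSONDecodeError:
        sections = []
    if not sections:
        return self._write_report(topic, notes)  # fall back to the single pass
    drafts = [
        self.venice.chat(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": (
                        f"Topic: {topic}\nSection: {section}\n\n"
                        f"Source notes:\n{digest}\n\n"
                        "Draft this section in Markdown, citing sources as [S1], [S2], ..."
                    ),
                },
            ],
            temperature=0.2,
            max_tokens=1200,
        )
        for section in sections
    ]
    editor_prompt = (
        f"Assemble a final Markdown research report on: {topic}\n\n"
        "Section drafts:\n\n" + "\n\n".join(drafts) + "\n\n"
        "Convert [S#] citations to footnote markers like [^1] and end with a "
        "'## References' list ordered by first citation."
    )
    return self.venice.chat_stream(
        [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": editor_prompt},
        ],
        temperature=0.2,
        max_tokens=7000,
    )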
Add the digest helpers:
def _chunk_digest(chunks: tuple[EvidenceChunk, ...], max_chars: int) -> str:
parts = []
for chunk in chunks:
quote_text = "; ".join(chunk.quotes)
parts.append(
f"{chunk.chunk_id}: {chunk.summary}"
+ (f"\nQuotes: {quote_text}" if quote_text else "")
)
return "\n\n".join(parts)[:max_chars]
def _source_digest(notes: list[SourceNote], max_chars: int) -> str:
chunks = [
"\n".join(
[
f"[{note.source_id}] {note.title}",
f"URL: {note.final_url or note.url}",
f"Canonical URL: {note.canonical_url}",
f"Found via: {note.query}",
f"Provider/rank: {note.provider}/{note.rank}",
f"Retrieved: {note.retrieved_at}",
f"Content hash: {note.content_hash}",
f"Note: {note.summary}",
f"Chunk evidence: {_chunk_digest(note.chunks, max_chars=1000)}",
]
)
for note in notes
]
return "\n\n".join(chunks)[:max_chars]
Finally, add error recording:
def _record_error(
self,
stage: str,
exc: Exception,
*,
query: str = "",
url: str = "",
source_id: str = "",
provider: str = "",
) -> None:
message = str(exc)
self.progress(f"{stage.replace('_', ' ').title()} failed: {message}")
self.artifacts.write(
"errors",
CollectionError(
stage=stage,
message=message,
query=query,
url=url,
source_id=source_id,
provider=provider,
),
)
At this point, the core research loop is in place.
Adding the CLI
Now we need a command-line entry point. Create main.py:
from __future__ import annotations
import argparse
from pathlib import Path
from dotenv import load_dotenv
from research_agent.agent import (
DEFAULT_ITERATIONS,
DEFAULT_MAX_CHUNKS_PER_SOURCE,
DEFAULT_MAX_SOURCES,
DEFAULT_QUERY_COUNT,
DEFAULT_REPORT_STYLE,
DEFAULT_RESULTS_PER_QUERY,
ResearchAgent,
)
from research_agent.artifacts import ArtifactWriter
from research_agent.venice import VeniceClient, VeniceError
from research_agent.web import WebSearch
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run a minimal deep research agent powered by Venice AI.",
)
parser.add_argument("topic", nargs="+", help="Research topic, wrapped in quotes for best results.")
parser.add_argument("--model", help="Venice model name. Defaults to VENICE_MODEL or openai-gpt-55.")
parser.add_argument("--iterations", type=int, default=DEFAULT_ITERATIONS)
parser.add_argument("--queries", type=int, default=DEFAULT_QUERY_COUNT)
parser.add_argument("--results", type=int, default=DEFAULT_RESULTS_PER_QUERY)
parser.add_argument("--output", "--markdown-output", dest="output", type=Path)
parser.add_argument("--artifacts", type=Path, help="Optional directory for JSONL research artifacts.")
parser.add_argument("--providers", default="duckduckgo", help="Comma-separated providers: duckduckgo, arxiv.")
parser.add_argument("--max-sources", type=int, default=DEFAULT_MAX_SOURCES)
parser.add_argument("--chunk-chars", type=int, default=3000)
parser.add_argument("--max-chunks-per-source", type=int, default=DEFAULT_MAX_CHUNKS_PER_SOURCE)
parser.add_argument(
"--report-style",
choices=["brief", "standard", "deep"],
default=DEFAULT_REPORT_STYLE,
help=f"Final report depth. Default: {DEFAULT_REPORT_STYLE}.",
)
parser.add_argument("--quiet", action="store_true", help="Hide progress messages.")
return parser.parse_args()
The CLI exposes the knobs you’ll actually tune during research:
| Option | What it controls |
|---|---|
| --iterations | Number of research passes |
| --queries | Search queries generated per pass |
| --results | Results read per provider for each query |
| --providers | Search providers, such as duckduckgo or duckduckgo,arxiv |
| --max-sources | Maximum usable sources to collect |
| --chunk-chars | Approximate chunk size before source evidence extraction |
| --max-chunks-per-source | Number of chunks summarized per source |
| --report-style | Final report depth: brief, standard, or deep |
| --artifacts | Directory for JSONL audit records |
| --output | Path for the final Markdown report |
Now wire everything together:
def main() -> int:
load_dotenv()
args = parse_args()
topic = " ".join(args.topic)
try:
venice = VeniceClient.from_env(model=args.model)
progress = None if args.quiet else lambda message: print(f"[agent] {message}")
provider_names = [name.strip() for name in args.providers.split(",") if name.strip()]
with WebSearch.from_provider_names(
provider_names,
chunk_chars=args.chunk_chars,
scraper=venice.scrape,
) as web:
agent = ResearchAgent(
venice=venice,
web=web,
artifacts=ArtifactWriter(args.artifacts),
progress=progress,
max_sources=args.max_sources,
max_chunks_per_source=args.max_chunks_per_source,
report_style=args.report_style,
)
report = agent.run(
topic,
iterations=args.iterations,
query_count=args.queries,
results_per_query=args.results,
)
except ValueError as exc:
print(f"Configuration error: {exc}")
return 1
except VeniceError as exc:
print(f"Venice API error: {exc}")
return 1
if args.output:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(report.markdown, encoding="utf-8")
print(f"\nSaved report to {args.output}")
else:
print()
print(report.markdown)
if report.artifacts_dir:
print(f"Saved research artifacts to {report.artifacts_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
This gives us a working local research CLI.
Running the Agent
Run a quick research pass:
uv run python main.py "How are AI agents changing software engineering workflows?"
Write the report to a Markdown file:
uv run python main.py "state of open source LLM inference in 2026" \
--output reports/inference.md
Use more sources and multiple providers:
uv run python main.py "agentic coding research" \
--providers duckduckgo,arxiv \
--iterations 3 \
--queries 5 \
--results 4 \
--max-sources 12
Choose the final report style:
uv run python main.py "AI agents in software engineering" --report-style deep
Use brief for a concise source-backed briefing, standard for a fuller survey, and deep for the staged outline/section/editor workflow.
Save auditable artifacts:
uv run python main.py "privacy tradeoffs in hosted LLM APIs" \
--output reports/privacy.md \
--artifacts runs/privacy
When artifacts are enabled, you’ll see files like:
runs/privacy/
queries.jsonl
research_gaps.jsonl
search_results.jsonl
fetches.jsonl
source_chunks.jsonl
chunk_summaries.jsonl
source_notes.jsonl
dedupe.jsonl
errors.jsonl
report_outline.jsonl
report_sections.jsonl
report_editor.jsonl
reports.jsonl
These files are useful when you want to understand how the agent reached a conclusion. For example, source_notes.jsonl shows the summarized source evidence, research_gaps.jsonl shows why follow-up searches were generated, and errors.jsonl shows pages that failed during search, scraping, or summarization.
Privacy and Reliability Notes
A research agent touches several systems, so it helps to be precise about what goes where:
| Layer | What sees the data |
|---|---|
| Local CLI | Topic, configuration, source notes, artifacts, and final reports stay on your machine |
| Search provider | Search queries are sent to the provider you choose, such as DuckDuckGo or arXiv |
| Venice scrape | Public source URLs are sent to Venice’s scrape endpoint |
| Venice chat completions | Prompts, source chunks, source notes, and report-generation instructions are sent to Venice |
| Output files | Markdown reports and JSONL artifacts are written locally |
If you want to keep more of the search path inside Venice, you can adapt the provider layer to call Venice’s POST /augment/search endpoint instead of querying DuckDuckGo directly. The reference implementation uses lightweight public providers so the demo stays easy to run and understand.
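A hedged sketch of what that provider could look like in web.py; the request payload and response keys here are assumptions, so check the Venice docs for the actual /augment/search schema:

from .venice import VeniceClient

class VeniceSearchProvider(SearchProvider):
    name = "venice"

    def __init__(self, venice: VeniceClient) -> None:
        self.venice = venice

    def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
        # Assumed payload and response shape; adjust to the documented schema.
        data = self.venice._post_json("/augment/search", {"query": query})
        results: list[SearchResult] = []
        for rank, item in enumerate(data.get("results", []), start=1):
            url = str(item.get("url", ""))
            if not canonicalize_url(url):
                continue
            results.append(
                SearchResult(
                    title=str(item.get("title", "")) or url,
                    url=url,
                    snippet=str(item.get("snippet", "")),
                    query=query,
                    rank=rank,
                    provider=self.name,
                )
            )
            if len(results) >= limit:
                break
        return results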
For reliability, keep these defaults conservative:
- Use retries for Venice calls and web requests.
- Add a small --request-delay if you are reading many pages from the same host.
- Cap --max-sources so broad topics do not run indefinitely.
- Save --artifacts for important reports so you can audit the final output.
- Treat the report as a briefing, not ground truth. Follow citations back to the original source when accuracy matters.
Testing the Pieces
You do not need live web requests or Venice calls to test most of the system. The reference repo uses fake Venice and fake web classes to test the research loop, dedupe behavior, artifacts, and report prompts.
A useful first test is URL canonicalization:
from research_agent.models import canonicalize_url
def test_canonicalize_url_removes_tracking_params():
url = "https://example.com/post?utm_source=x&b=2&a=1#section"
assert canonicalize_url(url) == "https://example.com/post?a=1&b=2"
Then test that duplicate content gets skipped:
from research_agent.models import SearchResult, WebPage, chunk_text
class FakeWeb:
def search(self, query: str, limit: int = 5) -> list[SearchResult]:
return [
SearchResult(title="First source", url="https://example.com/a", snippet="snippet"),
SearchResult(title="Mirror", url="https://example.com/b", snippet="snippet"),
]
def fetch(self, result: SearchResult) -> WebPage:
text = "This page contains relevant evidence. " * 5
return WebPage(
title=result.title,
url=result.url,
final_url=result.url,
text=text,
content_hash="same-content",
chunks=chunk_text(text, chunk_chars=80, overlap=10),
)
Fakes make agent tests much faster and less flaky. You can verify the orchestration logic without relying on live search results, network conditions, or model output.
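To exercise the whole loop, pair FakeWeb with a duck-typed fake Venice client. The sketch below returns canned JSON so every model call parses, then asserts that the mirrored page is skipped:

from research_agent.agent import ResearchAgent

class FakeVenice:
    def chat(self, messages, **kwargs):
        return '{"summary": "evidence", "quotes": []}'

    def chat_stream(self, messages, **kwargs):
        return "# Report"

    def scrape(self, url):
        raise NotImplementedError  # never called: FakeWeb does the fetching

def test_duplicate_content_is_skipped():
    agent = ResearchAgent(venice=FakeVenice(), web=FakeWeb())
    report = agent.run("test topic", iterations=1, query_count=1, results_per_query=2)
    assert len(report.sources) == 1  # the mirror was removed by content-hash dedupe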
Benchmarking
Many AI providers now have their own deep research workflows, so the reference repo includes a simple benchmark against Perplexity’s Deep Research tool. Both agents were asked to write a report on AI agent framework architecture, then the generated reports were checked into the GitHub repo.
This is not meant to be a formal benchmark. It is a practical way to inspect report structure, source coverage, citation quality, and whether the agent over-focuses on one source cluster. That is also why the updated implementation tracks research_gaps.jsonl and source balance before follow-up searches.
Extending This Example
Once the baseline agent works, here are practical ways to improve it:
- Add a Venice search provider using POST /augment/search (a sketch appears in the privacy notes above).
- Store reports and artifacts in a small SQLite database instead of JSONL files.
- Add source allowlists or blocklists for trusted research domains.
- Add PDF support by combining Venice scrape with document parsing for sources that do not expose clean HTML.
- Add an evaluation set of topics and expected source types so you can compare research quality after prompt changes.
- Add a review step that asks Venice to find unsupported claims in the final report before saving it.
The biggest upgrade is usually better source selection. Query generation helps, but you can also improve quality by preferring primary sources, standards documents, official docs, papers, changelogs, and dataset pages over low-signal summaries.
Finishing Up
Thanks for reading! Hopefully this helped you build a practical private research agent with Python and the Venice API.
The useful pattern here is not just “ask a model to research something.” It is breaking research into auditable steps: plan searches, collect sources, extract evidence, write source notes, follow up on gaps, and synthesize with citations. By keeping those steps explicit, we get a research workflow that is easier to inspect, test, and improve over time.