Saltar al contenido principal
Los agentes de investigación resultan útiles cuando se necesita más que un único resultado de búsqueda o una respuesta rápida de un modelo. Un buen agente de investigación puede convertir un tema amplio en consultas de búsqueda, recopilar fuentes, extraer la evidencia importante, hacer un seguimiento de las lagunas y escribir un informe con citas que puedas revisar después. En este tutorial, construiremos un agente de investigación privado utilizando Python y la API de Venice. Al final tendrás una CLI capaz de investigar un tema, extraer páginas públicas a Markdown, resumir fragmentos de las fuentes, ejecutar pases de investigación adicionales conscientes de las lagunas y generar un informe con citas y, opcionalmente, artefactos JSONL locales. ¿Te interesa la implementación completa del código? Échale un vistazo al repositorio de GitHub. Antes de continuar, necesitarás una clave API de Venice:
export VENICE_API_KEY=<my-key>

Qué vamos a construir

La implementación de referencia es un pequeño proyecto en Python con varias partes claras:
ParteQué hace
CLIAcepta un tema de investigación, modelo, proveedores, ajustes de profundidad, ruta de salida y directorio de artefactos
Cliente de VeniceLlama a chat completions, chat completions con streaming y POST /augment/scrape
Capa de búsquedaBusca en DuckDuckGo por defecto, con descubrimiento opcional de artículos en arXiv
Modelos de datosRealiza un seguimiento de URL de fuentes, URL canónicas, fragmentos, evidencias, notas, errores e informes
Agente de investigaciónPlanifica búsquedas, lee fuentes, extrae evidencias, analiza lagunas, genera consultas de seguimiento y escribe el informe final
Escritor de artefactosAlmacena registros JSONL auditables de consultas, lagunas de investigación, resultados, descargas, fragmentos, notas de fuentes, borradores de informe, errores e informes
El flujo es así: Tubería del agente de investigación privado
  1. Pide a Venice que genere consultas de búsqueda diversas para el tema.
  2. Busca en la web con uno o más proveedores.
  3. Elimina URL duplicadas antes de leerlas.
  4. Usa el endpoint de scrape de Venice para convertir cada página fuente pública en Markdown.
  5. Divide las páginas largas en fragmentos.
  6. Pide a Venice que extraiga evidencias de cada fragmento.
  7. Pide a Venice que convierta las evidencias de los fragmentos en notas de la fuente.
  8. Identifica lagunas de investigación y problemas de equilibrio entre fuentes antes de generar consultas de seguimiento.
  9. Pide a Venice que sintetice el informe final con citas estilo notas al pie.
Esto es “privado” en el sentido práctico de que el agente mantiene la orquestación, las notas de fuente, los artefactos y los informes finales en tu máquina. Venice se encarga de las llamadas al modelo y del scraping a través de su API. La implementación de referencia por defecto sigue enviando las consultas de búsqueda a DuckDuckGo o arXiv, así que considera la elección del proveedor como parte de tu diseño de privacidad.

Configurando el proyecto

El proyecto de referencia usa Python 3.13 y uv, pero el mismo código funciona con un entorno virtual normal. Crea un nuevo proyecto:
mkdir venice-research-agent
cd venice-research-agent
uv init
Instala las dependencias:
uv add httpx beautifulsoup4 python-dotenv
Si prefieres pip, crea un entorno virtual e instala los mismos paquetes:
python -m venv .venv
source .venv/bin/activate
pip install "httpx>=0.28.0" "beautifulsoup4>=4.13.0" "python-dotenv>=1.0.0"
Crea un archivo .env para el desarrollo local:
VENICE_API_KEY=your_venice_api_key_here
VENICE_MODEL=openai-gpt-55
Usamos VENICE_MODEL para que puedas cambiar el modelo sin editar el código. La implementación de referencia actualmente usa openai-gpt-55 por defecto, pero puedes cambiarlo por otro modelo de chat disponible en tu cuenta de Venice.

Creando los modelos de datos

Antes de escribir la lógica del agente, definiremos los objetos que viajan por la tubería. Estos modelos hacen que el resto del código sea más fácil de razonar porque cada fuente lleva su procedencia: de dónde proviene, qué consulta la encontró, cuándo se recuperó y cómo se fragmentó. Crea research_agent/models.py:
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from datetime import UTC, datetime
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

TRACKING_PARAMS = {
    "fbclid",
    "gclid",
    "igshid",
    "mc_cid",
    "mc_eid",
    "msclkid",
    "ref",
    "ref_src",
}


@dataclass(frozen=True)
class SearchResult:
    title: str
    url: str
    snippet: str
    query: str = ""
    rank: int = 0
    provider: str = "duckduckgo"
    canonical_url: str = ""

    def __post_init__(self) -> None:
        if not self.canonical_url:
            object.__setattr__(self, "canonical_url", canonicalize_url(self.url))


@dataclass(frozen=True)
class ScrapeResult:
    url: str
    content: str
    title: str = ""
    final_url: str = ""
    content_type: str = "text/markdown"


@dataclass(frozen=True)
class TextChunk:
    chunk_id: str
    text: str
    start: int
    end: int
    content_hash: str


@dataclass(frozen=True)
class WebPage:
    title: str
    url: str
    text: str
    final_url: str = ""
    canonical_url: str = ""
    content_type: str = ""
    retrieved_at: str = ""
    content_hash: str = ""
    chunks: tuple[TextChunk, ...] = field(default_factory=tuple)

    def __post_init__(self) -> None:
        final_url = self.final_url or self.url
        object.__setattr__(self, "final_url", final_url)
        if not self.canonical_url:
            object.__setattr__(self, "canonical_url", canonicalize_url(final_url))
        if not self.retrieved_at:
            object.__setattr__(self, "retrieved_at", utc_now())
        if not self.content_hash:
            object.__setattr__(self, "content_hash", content_hash(self.text))


@dataclass(frozen=True)
class EvidenceChunk:
    chunk_id: str
    text: str
    summary: str
    quotes: tuple[str, ...] = field(default_factory=tuple)


@dataclass(frozen=True)
class SourceNote:
    source_id: str
    title: str
    url: str
    query: str
    summary: str
    canonical_url: str = ""
    final_url: str = ""
    rank: int = 0
    snippet: str = ""
    provider: str = "duckduckgo"
    retrieved_at: str = ""
    content_type: str = ""
    content_hash: str = ""
    chunks: tuple[EvidenceChunk, ...] = field(default_factory=tuple)
Los campos importantes aquí son canonical_url, content_hash y chunks. canonical_url permite al agente evitar leer la misma fuente repetidamente cuando los resultados de búsqueda solo difieren en parámetros de seguimiento o fragmentos. content_hash ayuda a detectar páginas duplicadas aunque vivan en URL distintas. chunks permite resumir páginas largas en piezas más pequeñas en lugar de perder evidencias útiles por los límites de contexto. Añade las funciones auxiliares debajo de los dataclasses:
def utc_now() -> str:
    return datetime.now(UTC).isoformat()


def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def canonicalize_url(raw_url: str) -> str:
    if not raw_url:
        return ""

    parsed = urlparse(raw_url.strip())
    if parsed.scheme not in {"http", "https"} or not parsed.netloc:
        return ""

    scheme = parsed.scheme.lower()
    netloc = parsed.netloc.lower()
    path = parsed.path or "/"
    if path != "/":
        path = path.rstrip("/")

    query_pairs = [
        (key, value)
        for key, value in parse_qsl(parsed.query, keep_blank_values=True)
        if not _is_tracking_param(key)
    ]
    query = urlencode(sorted(query_pairs), doseq=True)
    return urlunparse((scheme, netloc, path, "", query, ""))


def chunk_text(text: str, *, chunk_chars: int = 3000, overlap: int = 250) -> tuple[TextChunk, ...]:
    clean = text.strip()
    if not clean:
        return ()
    if chunk_chars <= 0:
        raise ValueError("chunk_chars must be greater than 0")
    if overlap < 0 or overlap >= chunk_chars:
        raise ValueError("overlap must be at least 0 and smaller than chunk_chars")

    chunks: list[TextChunk] = []
    start = 0
    index = 1
    while start < len(clean):
        end = min(len(clean), start + chunk_chars)
        chunk = clean[start:end].strip()
        if chunk:
            chunks.append(
                TextChunk(
                    chunk_id=f"C{index}",
                    text=chunk,
                    start=start,
                    end=end,
                    content_hash=content_hash(chunk),
                )
            )
            index += 1
        if end == len(clean):
            break
        start = end - overlap

    return tuple(chunks)


def _is_tracking_param(key: str) -> bool:
    lowered = key.lower()
    return lowered.startswith("utm_") or lowered in TRACKING_PARAMS
El fragmentado aquí es deliberadamente simple: fragmentos de tamaño fijo en caracteres con superposición. Es suficiente para un agente de investigación de demostración porque el endpoint de scrape de Venice devuelve Markdown, que suele estar mucho más limpio que el HTML en bruto. Para investigación en producción sobre documentos técnicos largos, puedes mejorarlo dividiendo por encabezados, párrafos o cantidad de tokens.

Construyendo el cliente de Venice

A continuación, crearemos un pequeño cliente de Venice. Podrías utilizar el SDK de Python de OpenAI para las chat completions porque Venice es compatible con OpenAI, pero la implementación de referencia usa httpx directamente para que el mismo cliente pueda llamar al endpoint POST /augment/scrape de Venice. Crea research_agent/venice.py:
from __future__ import annotations

import json
import os
import time
from dataclasses import dataclass
from typing import Any

import httpx

from .models import ScrapeResult

DEFAULT_BASE_URL = "https://api.venice.ai/api/v1"
DEFAULT_MODEL = "openai-gpt-55"
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


class VeniceError(RuntimeError):
    """Raised when the Venice API returns an unusable response."""


@dataclass(frozen=True)
class VeniceClient:
    api_key: str
    model: str = DEFAULT_MODEL
    base_url: str = DEFAULT_BASE_URL
    timeout: float = 60.0
    max_retries: int = 2
    backoff_seconds: float = 1.0

    @classmethod
    def from_env(cls, model: str | None = None, *, max_retries: int = 2) -> "VeniceClient":
        api_key = os.getenv("VENICE_API_KEY")
        if not api_key:
            raise VeniceError("VENICE_API_KEY is required.")

        return cls(
            api_key=api_key,
            model=model or os.getenv("VENICE_MODEL", DEFAULT_MODEL),
            base_url=os.getenv("VENICE_BASE_URL", DEFAULT_BASE_URL).rstrip("/"),
            max_retries=max_retries,
        )
El helper from_env() mantiene los secretos fuera del código fuente. También resulta cómodo para el desarrollo local porque python-dotenv puede cargar VENICE_API_KEY y VENICE_MODEL desde .env. Ahora añade las chat completions:
    def chat(
        self,
        messages: list[dict[str, str]],
        *,
        temperature: float = 0.2,
        max_tokens: int = 1600,
    ) -> str:
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        data = self._post_json("/chat/completions", payload)
        try:
            return data["choices"][0]["message"]["content"].strip()
        except (KeyError, IndexError, TypeError) as exc:
            raise VeniceError(f"Unexpected Venice API response: {data}") from exc
Para el informe final, queremos usar streaming porque los informes profundos pueden tardar bastante más (al producir mucho más texto). Esto puede provocar problemas de timeout en solicitudes donde puede tardar muchísimo tiempo en producirse la salida final. Al usar streaming, podemos eliminar este problema y hacer que la solicitud sea más resistente a los fallos por timeout:
    def chat_stream(
        self,
        messages: list[dict[str, str]],
        *,
        temperature: float = 0.2,
        max_tokens: int = 1600,
    ) -> str:
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": True,
        }
        return self._post_chat_stream("/chat/completions", payload).strip()
Luego añade el scraping:
    def scrape(self, url: str) -> ScrapeResult:
        data = self._post_json("/augment/scrape", {"url": url})
        content = _first_string(data, "content", "markdown", "text")
        if not content:
            raise VeniceError(f"Unexpected Venice scrape response: {data}")

        return ScrapeResult(
            url=url,
            final_url=_first_string(data, "final_url", "url", "source_url") or url,
            title=_first_string(data, "title"),
            content=content,
            content_type="text/markdown",
        )
El endpoint de scrape de Venice acepta una URL accesible públicamente y devuelve la página como Markdown. Eso significa que el modelo no necesita parsear HTML en bruto, y tus prompts de extracción pueden trabajar con texto más limpio. El helper restante gestiona los reintentos y el parseo de respuestas:
    def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        for attempt in range(self.max_retries + 1):
            try:
                response = httpx.post(
                    f"{self.base_url}{path}",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                    timeout=self.timeout,
                )
                if response.status_code in RETRYABLE_STATUS_CODES and attempt < self.max_retries:
                    time.sleep(self.backoff_seconds * (2**attempt))
                    continue
                response.raise_for_status()
                data = response.json()
                if not isinstance(data, dict):
                    raise VeniceError(f"Unexpected Venice API response: {data}")
                return data
            except httpx.HTTPError as exc:
                if attempt < self.max_retries:
                    time.sleep(self.backoff_seconds * (2**attempt))
                    continue
                raise VeniceError(f"Could not reach Venice API: {exc}") from exc

        raise VeniceError("Could not reach Venice API")


def _first_string(data: dict[str, Any], *keys: str) -> str:
    for key in keys:
        value = data.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()

    for nested_key in ("data", "result", "scrape"):
        nested = data.get(nested_key)
        if isinstance(nested, dict):
            value = _first_string(nested, *keys)
            if value:
                return value

    return ""
El repositorio completo también incluye un helper robusto _post_chat_stream() que lee server-sent events de las chat completions en streaming. Puedes empezar sin streaming y añadirlo una vez que funcione el resto del flujo de investigación.

Añadiendo proveedores de búsqueda

La capa de búsqueda tiene dos tareas: encontrar URL de fuentes y obtener esas URL a través del scraper de Venice. La implementación de referencia usa el endpoint HTML de DuckDuckGo para búsquedas web generales y la API Atom de arXiv para artículos. Crea research_agent/web.py:
from __future__ import annotations

import re
import xml.etree.ElementTree as ET
from collections.abc import Callable, Iterable
from urllib.parse import parse_qs, unquote, urlparse

import httpx
from bs4 import BeautifulSoup

from .models import ScrapeResult, SearchResult, TextChunk, WebPage, canonicalize_url, chunk_text, content_hash, utc_now

USER_AGENT = "venice-research-agent-demo/0.1 (+https://venice.ai)"


class SearchProvider:
    name = "provider"

    def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
        raise NotImplementedError
Ahora añade DuckDuckGo:
class DuckDuckGoProvider(SearchProvider):
    name = "duckduckgo"

    def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
        response = web.get("https://duckduckgo.com/html/", params={"q": query})
        soup = BeautifulSoup(response.text, "html.parser")
        results: list[SearchResult] = []
        seen_urls: set[str] = set()

        for node in soup.select(".result"):
            link = node.select_one(".result__a")
            if link is None:
                continue

            url = _normalize_duckduckgo_url(link.get("href", ""))
            canonical_url = canonicalize_url(url)
            if not canonical_url or canonical_url in seen_urls:
                continue

            snippet = node.select_one(".result__snippet")
            results.append(
                SearchResult(
                    title=_clean_text(link.get_text(" ", strip=True)),
                    url=url,
                    snippet=_clean_text(snippet.get_text(" ", strip=True) if snippet else ""),
                    query=query,
                    rank=len(results) + 1,
                    provider=self.name,
                    canonical_url=canonical_url,
                )
            )
            seen_urls.add(canonical_url)

            if len(results) >= limit:
                break

        return results
Y arXiv:
class ArxivProvider(SearchProvider):
    name = "arxiv"

    def search(self, web: "WebSearch", query: str, limit: int) -> list[SearchResult]:
        response = web.get(
            "https://export.arxiv.org/api/query",
            params={
                "search_query": f"all:{query}",
                "start": 0,
                "max_results": limit,
                "sortBy": "relevance",
            },
        )
        namespace = {"atom": "http://www.w3.org/2005/Atom"}
        root = ET.fromstring(response.text)
        results: list[SearchResult] = []

        for entry in root.findall("atom:entry", namespace):
            title = _clean_text(_xml_text(entry.find("atom:title", namespace)))
            summary = _clean_text(_xml_text(entry.find("atom:summary", namespace)))
            url = _xml_text(entry.find("atom:id", namespace)).strip()
            canonical_url = canonicalize_url(url)
            if not url or not canonical_url:
                continue

            results.append(
                SearchResult(
                    title=title or url,
                    url=url,
                    snippet=summary,
                    query=query,
                    rank=len(results) + 1,
                    provider=self.name,
                    canonical_url=canonical_url,
                )
            )

            if len(results) >= limit:
                break

        return results
La clase WebSearch coordina los proveedores y obtiene las páginas:
class WebSearch:
    def __init__(
        self,
        timeout: float = 15.0,
        *,
        providers: Iterable[SearchProvider] | None = None,
        chunk_chars: int = 3000,
        scraper: Callable[[str], ScrapeResult] | None = None,
    ) -> None:
        self._client = httpx.Client(
            timeout=timeout,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        )
        self.providers = tuple(providers or (DuckDuckGoProvider(),))
        self.chunk_chars = chunk_chars
        self.scraper = scraper

    @classmethod
    def from_provider_names(cls, provider_names: Iterable[str], **kwargs: object) -> "WebSearch":
        providers = [_provider_from_name(name) for name in provider_names]
        return cls(providers=providers, **kwargs)

    def search(self, query: str, limit: int = 5) -> list[SearchResult]:
        results: list[SearchResult] = []
        seen_urls: set[str] = set()

        for provider in self.providers:
            for result in provider.search(self, query, limit):
                if result.canonical_url in seen_urls:
                    continue
                results.append(result)
                seen_urls.add(result.canonical_url)

        return results

    def fetch(self, result: SearchResult) -> WebPage:
        if self.scraper is None:
            raise RuntimeError("WebSearch.fetch requires a Venice scrape function.")

        scraped = self.scraper(result.url)
        text = scraped.content.strip() or result.snippet
        chunks = self._chunk_text(text)
        return WebPage(
            title=scraped.title or result.title,
            url=result.url,
            final_url=scraped.final_url or scraped.url or result.url,
            canonical_url=canonicalize_url(scraped.final_url or result.url),
            text=text,
            content_type=scraped.content_type or "text/markdown",
            retrieved_at=utc_now(),
            content_hash=content_hash(text),
            chunks=chunks,
        )

    def get(self, url: str, *, params: dict[str, object] | None = None) -> httpx.Response:
        response = self._client.get(url, params=params)
        response.raise_for_status()
        return response

    def close(self) -> None:
        self._client.close()

    def __enter__(self) -> "WebSearch":
        return self

    def __exit__(self, *_: object) -> None:
        self.close()

    def _chunk_text(self, text: str) -> tuple[TextChunk, ...]:
        overlap = min(250, max(0, self.chunk_chars // 10))
        return chunk_text(text, chunk_chars=self.chunk_chars, overlap=overlap)
La implementación de referencia completa añade reintentos, retrasos de solicitud por host y errores más amigables. Vale la pena conservarlos porque los agentes de investigación pasan mucho tiempo lidiando con páginas que bloquean la automatización, redirigen de forma inesperada o devuelven errores transitorios. Añade los pequeños helpers de proveedor al final:
def _normalize_duckduckgo_url(raw_url: str) -> str:
    if not raw_url:
        return ""

    parsed = urlparse(raw_url)
    if parsed.netloc.endswith("duckduckgo.com") and parsed.path == "/l/":
        target = parse_qs(parsed.query).get("uddg", [""])[0]
        return unquote(target)

    if parsed.scheme in {"http", "https"}:
        return raw_url

    return ""


def _provider_from_name(name: str) -> SearchProvider:
    normalized = name.strip().lower()
    if normalized in {"duckduckgo", "ddg", "web"}:
        return DuckDuckGoProvider()
    if normalized == "arxiv":
        return ArxivProvider()
    raise ValueError(f"Unknown source provider: {name}")


def _clean_text(value: str) -> str:
    return re.sub(r"\s+", " ", value).strip()


def _xml_text(node: ET.Element | None) -> str:
    return "" if node is None or node.text is None else node.text

Escribiendo artefactos locales

Para los flujos de investigación, la auditabilidad importa. Si el informe final dice algo sorprendente, deberías poder inspeccionar qué fuente llevó a ello. Crea research_agent/artifacts.py:
from __future__ import annotations

import json
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any


class ArtifactWriter:
    def __init__(self, root: Path | None = None) -> None:
        self.root = root
        if self.root is not None:
            self.root.mkdir(parents=True, exist_ok=True)

    @property
    def enabled(self) -> bool:
        return self.root is not None

    def write(self, kind: str, record: object) -> None:
        if self.root is None:
            return

        path = self.root / f"{kind}.jsonl"
        payload = json.dumps(_to_jsonable(record), ensure_ascii=False, sort_keys=True)
        with path.open("a", encoding="utf-8") as file:
            file.write(f"{payload}\n")


def _to_jsonable(value: object) -> Any:
    if is_dataclass(value):
        return _to_jsonable(asdict(value))
    if isinstance(value, Path):
        return str(value)
    if isinstance(value, dict):
        return {str(key): _to_jsonable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_jsonable(item) for item in value]
    return value
Esto escribe un objeto JSON por línea, lo que facilita añadir, inspeccionar y procesar los artefactos con herramientas de línea de comandos más adelante.

Construyendo el agente de investigación

Ahora que tenemos Venice, búsqueda, modelos y artefactos, podemos construir el agente propiamente dicho. Crea research_agent/agent.py:
from __future__ import annotations

import json
from collections.abc import Callable
from textwrap import dedent

from .artifacts import ArtifactWriter
from .models import CollectionError, EvidenceChunk, ResearchReport, SearchResult, SourceNote, WebPage, utc_now
from .venice import VeniceClient, VeniceError
from .web import WebSearch

SYSTEM_PROMPT = """You are a careful research assistant.
Use the supplied source material only when making factual claims.
Flag uncertainty, contradictions, and missing context instead of filling gaps."""

ProgressCallback = Callable[[str], None]

DEFAULT_ITERATIONS = 3
DEFAULT_QUERY_COUNT = 6
DEFAULT_RESULTS_PER_QUERY = 4
DEFAULT_MAX_SOURCES = 40
DEFAULT_MAX_CHUNKS_PER_SOURCE = 6
El system prompt es la barrera de comportamiento principal. No queremos que el modelo produzca un informe que suene impresionante a partir de su memoria. Queremos que utilice el material fuente y que señale la incertidumbre cuando la evidencia es escasa. También necesitamos dos dataclasses finales en models.py si todavía no las has añadido:
@dataclass(frozen=True)
class CollectionError:
    stage: str
    message: str
    query: str = ""
    url: str = ""
    source_id: str = ""
    provider: str = ""


@dataclass(frozen=True)
class ResearchReport:
    topic: str
    markdown: str
    sources: list[SourceNote]
    artifacts_dir: str | None = None
A continuación, define ResearchAgent:
class ResearchAgent:
    def __init__(
        self,
        venice: VeniceClient,
        web: WebSearch | None = None,
        artifacts: ArtifactWriter | None = None,
        progress: ProgressCallback | None = None,
        max_sources: int | None = DEFAULT_MAX_SOURCES,
        max_chunks_per_source: int = DEFAULT_MAX_CHUNKS_PER_SOURCE,
    ) -> None:
        self.venice = venice
        self.web = web or WebSearch(scraper=venice.scrape)
        self.artifacts = artifacts or ArtifactWriter()
        self.progress = progress or (lambda _: None)
        self.max_sources = max_sources
        self.max_chunks_per_source = max_chunks_per_source
El método run() coordina los pases de investigación:
    def run(
        self,
        topic: str,
        *,
        iterations: int = DEFAULT_ITERATIONS,
        query_count: int = DEFAULT_QUERY_COUNT,
        results_per_query: int = DEFAULT_RESULTS_PER_QUERY,
    ) -> ResearchReport:
        notes: list[SourceNote] = []
        seen_source_keys: set[str] = set()
        seen_content_hashes: set[str] = set()
        queries = self._initial_queries(topic, query_count)

        self.artifacts.write("queries", {"stage": "initial", "topic": topic, "queries": queries})

        for iteration in range(1, iterations + 1):
            self.progress(f"Research pass {iteration}/{iterations}: {', '.join(queries)}")
            self._collect_notes(
                topic,
                queries,
                results_per_query,
                seen_source_keys,
                seen_content_hashes,
                notes,
                iteration,
            )

            if iteration < iterations:
                gaps, queries = self._gap_follow_up_queries(topic, notes, query_count)
                self.artifacts.write(
                    "research_gaps",
                    {
                        "topic": topic,
                        "after_iteration": iteration,
                        "source_balance": _source_cluster_counts(notes),
                        "gaps": gaps,
                        "queries": queries,
                    },
                )
                self.artifacts.write(
                    "queries",
                    {
                        "stage": "follow_up",
                        "topic": topic,
                        "iteration": iteration + 1,
                        "gap_count": len(gaps),
                        "queries": queries,
                    },
                )

        report = self._write_report(topic, notes)
        self.artifacts.write(
            "reports",
            {
                "topic": topic,
                "source_count": len(notes),
                "generated_at": utc_now(),
                "markdown": report,
            },
        )

        return ResearchReport(
            topic=topic,
            markdown=report,
            sources=notes,
            artifacts_dir=str(self.artifacts.root) if self.artifacts.root is not None else None,
        )
Los dos conjuntos seen_* son los que impiden que el agente pierda tiempo con fuentes duplicadas. La deduplicación por URL detecta enlaces repetidos. La deduplicación por hash de contenido detecta mirrors, publicaciones sindicadas y páginas que redirigen al mismo contenido final.

Planificando búsquedas iniciales y de seguimiento

La primera llamada al modelo convierte el tema en consultas de búsqueda:
    def _initial_queries(self, topic: str, count: int) -> list[str]:
        prompt = dedent(
            f"""
            Create {count} diverse web search queries for researching this topic:
            {topic}

            Cover background, recent developments, primary sources, criticism, and data.
            Include at least one query likely to find primary sources or datasets.
            Return JSON only in this shape: {{"queries": ["..."]}}
            """
        ).strip()
        return self._query_list(prompt, count, fallback=[topic])
Después de cada pase de investigación, el agente actualizado realiza un paso de análisis de lagunas más deliberado. Examina las notas actuales, cuenta los clusters de fuentes por dominio, pregunta a Venice qué cobertura falta, escribe esas lagunas en los artefactos y luego utiliza las consultas resultantes para el siguiente pase. Bucle de análisis de lagunas Empieza haciendo un seguimiento del equilibrio entre fuentes:
from urllib.parse import urlparse


def _source_cluster_counts(notes: list[SourceNote]) -> list[dict[str, object]]:
    total = len(notes)
    if total == 0:
        return []

    clusters: dict[str, list[str]] = {}
    for note in notes:
        cluster = _source_cluster(note)
        clusters.setdefault(cluster, []).append(note.source_id)

    return [
        {
            "cluster": cluster,
            "source_count": len(source_ids),
            "source_share": round(len(source_ids) / total, 3),
            "source_ids": source_ids,
        }
        for cluster, source_ids in sorted(
            clusters.items(), key=lambda item: (-len(item[1]), item[0])
        )
    ]


def _source_cluster(note: SourceNote) -> str:
    url = note.canonical_url or note.final_url or note.url
    host = urlparse(url).netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    return host or "unknown"


def _source_balance_digest(notes: list[SourceNote], limit: int = 8) -> str:
    clusters = _source_cluster_counts(notes)
    if not clusters:
        return "No source clusters yet."

    total = len(notes)
    lines = [
        f"- {cluster['cluster']}: {cluster['source_count']}/{total} sources "
        f"({cluster['source_share']:.0%}); IDs: {', '.join(cluster['source_ids'])}"
        for cluster in clusters[:limit]
    ]
    return "\n".join(lines)
Esto le da al agente una forma sencilla de detectar la captura por un cluster de fuentes. Si cada fuente proviene de una sola empresa, un solo framework o un solo dominio, las consultas de seguimiento deberían ampliar deliberadamente el conjunto de fuentes en lugar de recopilar más de lo mismo. Ahora usa esa información de equilibrio al crear las búsquedas de seguimiento:
    def _follow_up_queries(self, topic: str, notes: list[SourceNote], count: int) -> list[str]:
        digest = _source_digest(notes, max_chars=9000)
        source_balance = _source_balance_digest(notes)
        prompt = dedent(
            f"""
            We are researching: {topic}

            Current notes:
            {digest}

            Source balance:
            {source_balance}

            Create {count} follow-up web search queries that fill gaps, verify important claims,
            find primary evidence, and look for dissenting evidence.
            If one source domain, vendor, framework, product, or perspective is overrepresented,
            deliberately broaden beyond it unless the topic explicitly asks for that focus.
            Return JSON only in this shape: {{"queries": ["..."]}}
            """
        ).strip()
        return self._query_list(prompt, count, fallback=[topic])
La implementación de referencia más reciente envuelve esto en _gap_follow_up_queries(), que pide a Venice que devuelva tanto los registros de lagunas como las consultas:
    def _gap_follow_up_queries(
        self, topic: str, notes: list[SourceNote], count: int
    ) -> tuple[list[dict[str, str]], list[str]]:
        if not notes:
            return [], [topic]

        digest = _source_digest(notes, max_chars=12000)
        source_balance = _source_balance_digest(notes)
        prompt = dedent(
            f"""
            Identify coverage gaps before the next research pass.

            Research topic:
            {topic}

            Current source notes:
            {digest}

            Source balance:
            {source_balance}

            Find important missing coverage that would improve a deep research report.
            Look specifically for primary sources, technical concepts, dissenting views,
            overrepresented source clusters, and claims that need verification.

            Return JSON only in this shape:
            {{"gaps": [{{"missing": "...", "why_it_matters": "...", "query": "..."}}],
              "queries": ["targeted web search query"]}}
            """
        ).strip()
        response = self.venice.chat(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.3,
            max_tokens=900,
        )

        data = json.loads(response)
        gaps = _clean_gap_records(data.get("gaps"))
        queries = _clean_string_list(data.get("queries"))
        if not queries:
            queries = [gap["query"] for gap in gaps if gap.get("query")]
        return gaps, queries[:count]
Cuando --artifacts está habilitado, estos registros se escriben en research_gaps.jsonl. Esto te da una pista de auditoría útil sobre por qué el agente buscó una consulta concreta en el segundo pase. El parser debería ser tolerante. Si el modelo devuelve JSON mal formado, el agente recurre al tema original como respaldo:
    def _query_list(self, prompt: str, count: int, fallback: list[str]) -> list[str]:
        response = self.venice.chat(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.4,
            max_tokens=500,
        )
        try:
            data = json.loads(response)
            queries = data.get("queries", [])
        except (json.JSONDecodeError, AttributeError):
            queries = []

        clean_queries = [
            query.strip()
            for query in queries
            if isinstance(query, str) and query.strip()
        ]
        return (clean_queries or fallback)[:count]
Vale la pena usar este patrón en todo el código del agente: pedir salida estructurada, parsearla y proporcionar un fallback simple cuando la salida no sea utilizable.

Lectura y resumen de fuentes

Ahora recopilamos notas de fuente. El agente busca cada consulta, obtiene cada resultado a través de Venice scrape, fragmenta el Markdown y resume las evidencias útiles.
    def _collect_notes(
        self,
        topic: str,
        queries: list[str],
        results_per_query: int,
        seen_source_keys: set[str],
        seen_content_hashes: set[str],
        notes: list[SourceNote],
        iteration: int,
    ) -> None:
        for query in queries:
            if self.max_sources is not None and len(notes) >= self.max_sources:
                return

            self.progress(f"Searching: {query}")
            try:
                results = self.web.search(query, limit=results_per_query)
            except Exception as exc:
                self._record_error("search", exc, query=query)
                continue

            self.artifacts.write(
                "search_results",
                {"iteration": iteration, "query": query, "results": results},
            )

            for result in results:
                if self.max_sources is not None and len(notes) >= self.max_sources:
                    return

                source_key = result.canonical_url or result.url
                if source_key in seen_source_keys:
                    self.artifacts.write("dedupe", {"reason": "canonical_url", "url": result.url})
                    continue

                seen_source_keys.add(source_key)
                source_id = f"S{len(notes) + 1}"
                note = self._read_source(topic, query, source_id, result, seen_source_keys, seen_content_hashes)
                if note is not None:
                    notes.append(note)
Los fallos individuales en búsqueda y obtención no deberían detener toda la ejecución. La web pública es caótica. Algunas páginas bloquean el scraping, otras devuelven PDFs, otras están caídas y otras redirigen a lugares inesperados. Un agente de investigación debe seguir adelante y registrar lo que falló. Aquí está el método de lectura de fuentes:
    def _read_source(
        self,
        topic: str,
        query: str,
        source_id: str,
        result: SearchResult,
        seen_source_keys: set[str],
        seen_content_hashes: set[str],
    ) -> SourceNote | None:
        self.progress(f"Reading {source_id}: {result.title}")
        try:
            page = self.web.fetch(result)
        except Exception as exc:
            self._record_error("fetch", exc, query=query, url=result.url, source_id=source_id)
            return None

        if page.content_hash in seen_content_hashes:
            self.artifacts.write(
                "dedupe",
                {"reason": "content_hash", "source_id": source_id, "url": result.url},
            )
            return None
        seen_content_hashes.add(page.content_hash)

        chunks = self._summarize_chunks(topic, query, source_id, page)
        if not chunks:
            self._record_error("summarize_chunk", VeniceError("no chunks could be summarized"), url=result.url)
            return None

        summary = self._summarize_source(topic, query, source_id, page, chunks)
        note = SourceNote(
            source_id=source_id,
            title=page.title,
            url=result.url,
            canonical_url=page.canonical_url,
            final_url=page.final_url,
            query=query,
            rank=result.rank,
            snippet=result.snippet,
            provider=result.provider,
            retrieved_at=page.retrieved_at,
            content_type=page.content_type,
            content_hash=page.content_hash,
            chunks=chunks,
            summary=summary,
        )
        self.artifacts.write("source_notes", note)
        return note
Para cada fragmento de la fuente, pide a Venice un resumen corto de la evidencia y citas exactas:
    def _summarize_chunks(
        self,
        topic: str,
        query: str,
        source_id: str,
        page: WebPage,
    ) -> tuple[EvidenceChunk, ...]:
        evidence: list[EvidenceChunk] = []
        for chunk in page.chunks[: self.max_chunks_per_source]:
            prompt = dedent(
                f"""
                Topic: {topic}
                Search query: {query}
                Source ID: {source_id}
                Chunk ID: {chunk.chunk_id}
                Source title: {page.title}
                Source URL: {page.final_url}

                Source chunk:
                {chunk.text}

                Extract only evidence relevant to the topic.
                Return JSON only in this shape:
                {{"summary": "...", "quotes": ["short exact quote", "..."]}}
                """
            ).strip()

            try:
                response = self.venice.chat(
                    [
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": prompt},
                    ],
                    temperature=0.1,
                    max_tokens=600,
                )
                data = json.loads(response)
                evidence.append(
                    EvidenceChunk(
                        chunk_id=chunk.chunk_id,
                        text=chunk.text,
                        summary=str(data.get("summary", "")).strip(),
                        quotes=tuple(
                            quote.strip()
                            for quote in data.get("quotes", [])
                            if isinstance(quote, str) and quote.strip()
                        ),
                    )
                )
            except Exception as exc:
                self._record_error("summarize_chunk", exc, query=query, url=page.final_url, source_id=source_id)
                continue

        return tuple(evidence)
Luego colapsa los resúmenes de los fragmentos en una nota de fuente:
    def _summarize_source(
        self,
        topic: str,
        query: str,
        source_id: str,
        page: WebPage,
        chunks: tuple[EvidenceChunk, ...],
    ) -> str:
        chunk_digest = _chunk_digest(chunks, max_chars=9000)
        prompt = dedent(
            f"""
            Topic: {topic}
            Search query: {query}
            Source ID: {source_id}
            Source title: {page.title}
            Source URL: {page.final_url}

            Chunk evidence:
            {chunk_digest}

            Synthesize a source note using only the chunk evidence. Include:
            - key facts with dates/numbers where present
            - any limitations or bias in the source
            - useful exact wording from quotes if it is short

            Keep the note under 180 words and refer to the source as [{source_id}].
            """
        ).strip()
        return self.venice.chat(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.1,
            max_tokens=500,
        )
Este resumen en dos pasos es la parte que hace que el agente parezca más fiable que un script básico de “resumir estas URL”. El modelo lee primero los fragmentos de la fuente y después escribe una nota a nivel de fuente a partir de esas piezas de evidencia extraídas.

Escribiendo el informe final

Una vez que el agente tiene las notas de las fuentes, puede escribir el informe. Empieza con un escritor de informes de una sola pasada:
    def _write_report(self, topic: str, notes: list[SourceNote]) -> str:
        if not notes:
            return (
                f"# Research report: {topic}\n\n"
                "No usable web sources were collected. Check your network connection or try a narrower topic."
            )

        prompt = dedent(
            f"""
            Research topic:
            {topic}

            Source notes:
            {_source_digest(notes, max_chars=45000)}

            Write a detailed source-backed Markdown research survey.

            Requirements:
            - Start with a precise H1 title.
            - Open with "## Overview".
            - Use topic-specific sections.
            - Use footnote-style citation markers like [^1] and [^2].
            - Do not cite with internal source IDs like [S1] in the report body.
            - Do not include uncited factual claims.
            - Avoid source-cluster capture from one vendor, domain, framework, or viewpoint.
            - Include uncertainty, contradictions, and missing context where relevant.
            - End with "## References" as a numbered list ordered by first citation.
            """
        ).strip()

        return self.venice.chat_stream(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
            max_tokens=7000,
        )
La implementación de referencia va más allá para los informes profundos: pide a Venice un esquema, redacta cada sección del informe por separado, y luego pide una pasada final de editor para ensamblar el informe terminado y convertir los IDs internos de fuentes en citas estilo notas al pie. Ese enfoque por etapas resulta útil cuando se desea una salida de investigación de formato largo, porque un solo prompt gigante a menudo comprime demasiado. Los prompts actualizados también empujan el informe hacia un panorama amplio y respaldado por fuentes en lugar de una guía de decisión superficial. Si la base de fuentes está sesgada hacia un cluster, el prompt del editor le indica a Venice que reconozca ese sesgo y evite presentarlo como representativo de todo el campo. Añade los helpers de resumen:
def _chunk_digest(chunks: tuple[EvidenceChunk, ...], max_chars: int) -> str:
    parts = []
    for chunk in chunks:
        quote_text = "; ".join(chunk.quotes)
        parts.append(
            f"{chunk.chunk_id}: {chunk.summary}"
            + (f"\nQuotes: {quote_text}" if quote_text else "")
        )
    return "\n\n".join(parts)[:max_chars]


def _source_digest(notes: list[SourceNote], max_chars: int) -> str:
    chunks = [
        "\n".join(
            [
                f"[{note.source_id}] {note.title}",
                f"URL: {note.final_url or note.url}",
                f"Canonical URL: {note.canonical_url}",
                f"Found via: {note.query}",
                f"Provider/rank: {note.provider}/{note.rank}",
                f"Retrieved: {note.retrieved_at}",
                f"Content hash: {note.content_hash}",
                f"Note: {note.summary}",
                f"Chunk evidence: {_chunk_digest(note.chunks, max_chars=1000)}",
            ]
        )
        for note in notes
    ]
    return "\n\n".join(chunks)[:max_chars]
Por último, añade el registro de errores:
    def _record_error(
        self,
        stage: str,
        exc: Exception,
        *,
        query: str = "",
        url: str = "",
        source_id: str = "",
        provider: str = "",
    ) -> None:
        message = str(exc)
        self.progress(f"{stage.replace('_', ' ').title()} failed: {message}")
        self.artifacts.write(
            "errors",
            CollectionError(
                stage=stage,
                message=message,
                query=query,
                url=url,
                source_id=source_id,
                provider=provider,
            ),
        )
En este punto, el bucle principal de investigación está listo.

Añadiendo la CLI

Ahora necesitamos un punto de entrada por línea de comandos. Crea main.py:
from __future__ import annotations

import argparse
from pathlib import Path

from dotenv import load_dotenv

from research_agent.agent import (
    DEFAULT_ITERATIONS,
    DEFAULT_MAX_CHUNKS_PER_SOURCE,
    DEFAULT_MAX_SOURCES,
    DEFAULT_QUERY_COUNT,
    DEFAULT_REPORT_STYLE,
    DEFAULT_RESULTS_PER_QUERY,
    ResearchAgent,
)
from research_agent.artifacts import ArtifactWriter
from research_agent.venice import VeniceClient, VeniceError
from research_agent.web import WebSearch


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Run a minimal deep research agent powered by Venice AI.",
    )
    parser.add_argument("topic", nargs="+", help="Research topic, wrapped in quotes for best results.")
    parser.add_argument("--model", help="Venice model name. Defaults to VENICE_MODEL or openai-gpt-55.")
    parser.add_argument("--iterations", type=int, default=DEFAULT_ITERATIONS)
    parser.add_argument("--queries", type=int, default=DEFAULT_QUERY_COUNT)
    parser.add_argument("--results", type=int, default=DEFAULT_RESULTS_PER_QUERY)
    parser.add_argument("--output", "--markdown-output", dest="output", type=Path)
    parser.add_argument("--artifacts", type=Path, help="Optional directory for JSONL research artifacts.")
    parser.add_argument("--providers", default="duckduckgo", help="Comma-separated providers: duckduckgo, arxiv.")
    parser.add_argument("--max-sources", type=int, default=DEFAULT_MAX_SOURCES)
    parser.add_argument("--chunk-chars", type=int, default=3000)
    parser.add_argument("--max-chunks-per-source", type=int, default=DEFAULT_MAX_CHUNKS_PER_SOURCE)
    parser.add_argument(
        "--report-style",
        choices=["brief", "standard", "deep"],
        default=DEFAULT_REPORT_STYLE,
        help=f"Final report depth. Default: {DEFAULT_REPORT_STYLE}.",
    )
    parser.add_argument("--quiet", action="store_true", help="Hide progress messages.")
    return parser.parse_args()
La CLI expone los parámetros que realmente vas a ajustar durante la investigación:
OpciónQué controla
--iterationsNúmero de pases de investigación
--queriesConsultas de búsqueda generadas por pase
--resultsResultados leídos por proveedor para cada consulta
--providersProveedores de búsqueda, como duckduckgo o duckduckgo,arxiv
--max-sourcesMáximo de fuentes utilizables a recopilar
--chunk-charsTamaño aproximado de fragmento antes de la extracción de evidencias
--max-chunks-per-sourceNúmero de fragmentos resumidos por fuente
--report-styleProfundidad del informe final: brief, standard o deep
--artifactsDirectorio para los registros JSONL de auditoría
--outputRuta para el informe Markdown final
Ahora conecta todo:
def main() -> int:
    load_dotenv()
    args = parse_args()
    topic = " ".join(args.topic)

    try:
        venice = VeniceClient.from_env(model=args.model)
        progress = None if args.quiet else lambda message: print(f"[agent] {message}")
        provider_names = [name.strip() for name in args.providers.split(",") if name.strip()]

        with WebSearch.from_provider_names(
            provider_names,
            chunk_chars=args.chunk_chars,
            scraper=venice.scrape,
        ) as web:
            agent = ResearchAgent(
                venice=venice,
                web=web,
                artifacts=ArtifactWriter(args.artifacts),
                progress=progress,
                max_sources=args.max_sources,
                max_chunks_per_source=args.max_chunks_per_source,
                report_style=args.report_style,
            )
            report = agent.run(
                topic,
                iterations=args.iterations,
                query_count=args.queries,
                results_per_query=args.results,
            )
    except ValueError as exc:
        print(f"Configuration error: {exc}")
        return 1
    except VeniceError as exc:
        print(f"Venice API error: {exc}")
        return 1

    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        args.output.write_text(report.markdown, encoding="utf-8")
        print(f"\nSaved report to {args.output}")
    else:
        print()
        print(report.markdown)

    if report.artifacts_dir:
        print(f"Saved research artifacts to {report.artifacts_dir}")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Esto nos da una CLI de investigación local funcional.

Ejecutando el agente

Ejecuta un pase de investigación rápido:
uv run python main.py "How are AI agents changing software engineering workflows?"
Escribe el informe en un archivo Markdown:
uv run python main.py "state of open source LLM inference in 2026" \
  --output reports/inference.md
Usa más fuentes y varios proveedores:
uv run python main.py "agentic coding research" \
  --providers duckduckgo,arxiv \
  --iterations 3 \
  --queries 5 \
  --results 4 \
  --max-sources 12
Elige el estilo del informe final:
uv run python main.py "AI agents in software engineering" --report-style deep
Usa brief para un informe conciso respaldado por fuentes, standard para un panorama más completo y deep para el flujo por etapas de esquema/sección/editor. Guarda artefactos auditables:
uv run python main.py "privacy tradeoffs in hosted LLM APIs" \
  --output reports/privacy.md \
  --artifacts runs/privacy
Cuando los artefactos están habilitados, verás archivos como:
runs/privacy/
  queries.jsonl
  research_gaps.jsonl
  search_results.jsonl
  fetches.jsonl
  source_chunks.jsonl
  chunk_summaries.jsonl
  source_notes.jsonl
  dedupe.jsonl
  errors.jsonl
  report_outline.jsonl
  report_sections.jsonl
  report_editor.jsonl
  reports.jsonl
Estos archivos resultan útiles cuando quieres entender cómo el agente llegó a una conclusión. Por ejemplo, source_notes.jsonl muestra la evidencia resumida de la fuente, research_gaps.jsonl muestra por qué se generaron las búsquedas de seguimiento, y errors.jsonl muestra las páginas que fallaron durante la búsqueda, el scraping o el resumen.

Notas sobre privacidad y fiabilidad

Un agente de investigación toca varios sistemas, por lo que conviene ser preciso sobre qué información va a dónde: Límites de datos del agente de investigación privado
CapaQué ve los datos
CLI localEl tema, la configuración, las notas de fuente, los artefactos y los informes finales permanecen en tu máquina
Proveedor de búsquedaLas consultas de búsqueda se envían al proveedor que elijas, como DuckDuckGo o arXiv
Venice scrapeLas URL públicas de las fuentes se envían al endpoint de scrape de Venice
Venice chat completionsLos prompts, los fragmentos de fuente, las notas de fuente y las instrucciones de generación del informe se envían a Venice
Archivos de salidaLos informes Markdown y los artefactos JSONL se escriben localmente
Si deseas mantener más del recorrido de búsqueda dentro de Venice, puedes adaptar la capa de proveedores para que llame al endpoint POST /augment/search de Venice en lugar de consultar directamente a DuckDuckGo. La implementación de referencia utiliza proveedores públicos ligeros para que la demo siga siendo fácil de ejecutar y comprender. Por motivos de fiabilidad, mantén estos valores predeterminados conservadores:
  • Usa reintentos para las llamadas a Venice y las solicitudes web.
  • Añade un pequeño --request-delay si vas a leer muchas páginas del mismo host.
  • Limita --max-sources para que los temas amplios no se ejecuten indefinidamente.
  • Guarda --artifacts para los informes importantes y poder auditar la salida final.
  • Considera el informe como una nota informativa, no como verdad absoluta. Sigue las citas hasta la fuente original cuando la precisión importe.

Probando las piezas

No necesitas solicitudes web en vivo ni llamadas a Venice para probar la mayor parte del sistema. El repositorio de referencia usa clases falsas de Venice y de web para probar el bucle de investigación, el comportamiento de deduplicación, los artefactos y los prompts del informe. Una primera prueba útil es la canonicalización de URL:
from research_agent.models import canonicalize_url


def test_canonicalize_url_removes_tracking_params():
    url = "https://example.com/post?utm_source=x&b=2&a=1#section"
    assert canonicalize_url(url) == "https://example.com/post?a=1&b=2"
Luego prueba que el contenido duplicado se omita:
from research_agent.models import SearchResult, WebPage, chunk_text


class FakeWeb:
    def search(self, query: str, limit: int = 5) -> list[SearchResult]:
        return [
            SearchResult(title="First source", url="https://example.com/a", snippet="snippet"),
            SearchResult(title="Mirror", url="https://example.com/b", snippet="snippet"),
        ]

    def fetch(self, result: SearchResult) -> WebPage:
        text = "This page contains relevant evidence. " * 5
        return WebPage(
            title=result.title,
            url=result.url,
            final_url=result.url,
            text=text,
            content_hash="same-content",
            chunks=chunk_text(text, chunk_chars=80, overlap=10),
        )
Los fakes hacen que las pruebas del agente sean mucho más rápidas y menos inestables. Puedes verificar la lógica de orquestación sin depender de resultados de búsqueda en vivo, condiciones de red o salida del modelo.

Benchmarking

Muchos proveedores de IA ahora tienen sus propios flujos de deep research, así que el repositorio de referencia incluye un sencillo benchmark frente a la herramienta Deep Research de Perplexity. Se pidió a ambos agentes que escribieran un informe sobre la arquitectura de frameworks de agentes de IA, y los informes generados se han subido al repositorio de GitHub. No pretende ser un benchmark formal. Es una forma práctica de inspeccionar la estructura del informe, la cobertura de fuentes, la calidad de las citas y si el agente se centra demasiado en un cluster de fuentes. Esa es también la razón por la que la implementación actualizada hace seguimiento de research_gaps.jsonl y del equilibrio entre fuentes antes de las búsquedas de seguimiento.

Extendiendo este ejemplo

Una vez que el agente base funciona, aquí tienes formas prácticas de mejorarlo:
  • Añadir un proveedor de búsqueda de Venice utilizando POST /augment/search.
  • Almacenar los informes y artefactos en una pequeña base de datos SQLite en lugar de archivos JSONL.
  • Añadir allowlists o blocklists de fuentes para dominios de investigación de confianza.
  • Añadir soporte de PDF combinando Venice scrape con el parseo de documentos para fuentes que no expongan HTML limpio.
  • Añadir un conjunto de evaluación de temas y tipos de fuente esperados para poder comparar la calidad de la investigación tras cambios en los prompts.
  • Añadir un paso de revisión que pida a Venice encontrar afirmaciones sin respaldo en el informe final antes de guardarlo.
La mayor mejora suele ser una mejor selección de fuentes. La generación de consultas ayuda, pero también puedes mejorar la calidad prefiriendo fuentes primarias, documentos de estándares, documentación oficial, papers, changelogs y páginas de datasets frente a resúmenes de baja señal.

Para terminar

¡Gracias por leer! Esperamos que esto te haya ayudado a construir un agente de investigación privado y práctico con Python y la API de Venice. El patrón útil aquí no es solo “pedir a un modelo que investigue algo”. Es desglosar la investigación en pasos auditables: planificar búsquedas, recopilar fuentes, extraer evidencias, escribir notas de fuente, hacer seguimiento de las lagunas y sintetizar con citas. Al mantener esos pasos explícitos, obtenemos un flujo de investigación que es más fácil de inspeccionar, probar y mejorar con el tiempo.