Core Framework

The versifai.core package provides the shared agentic infrastructure: the ReAct loop engine, LLM client, memory management, tool system, and configuration.

Agent Base

BaseAgent

BaseAgent(display: AgentDisplay, memory: AgentMemory, llm: LLMClient, registry: ToolRegistry)

Base class providing the ReAct loop and shared infrastructure.

Subclasses must:
  • Call super().__init__(display, memory, llm, registry)
  • Implement _register_tools()
  • Set self._system_prompt
(A minimal subclass sketch follows the source listing below.)
Source code in src/versifai/core/agent.py
def __init__(
    self,
    display: AgentDisplay,
    memory: AgentMemory,
    llm: LLMClient,
    registry: ToolRegistry,
) -> None:
    self._display = display
    self._memory = memory
    self._llm = llm
    self._registry = registry

    # Error tracking
    self._consecutive_errors = 0
    self._max_consecutive_errors = 5

    # Track repeated missing-parameter patterns (LLM output size limits)
    self._missing_param_tracker: dict[str, dict[str, int]] = {}

    # Subclasses must set this
    self._system_prompt: str = ""
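
A minimal subclass sketch, assuming the classes below are importable from versifai.core (the agent name, prompt, and tool registration are hypothetical):

from versifai.core import AgentDisplay, AgentMemory, LLMClient, ToolRegistry

class ProfilerAgent(BaseAgent):
    def __init__(
        self,
        display: AgentDisplay,
        memory: AgentMemory,
        llm: LLMClient,
        registry: ToolRegistry,
    ) -> None:
        super().__init__(display, memory, llm, registry)
        self._register_tools()  # one plausible place to wire up tools
        self._system_prompt = "You are a data-profiling agent."

    def _register_tools(self) -> None:
        # Register the tools this agent may call, e.g.:
        # self._registry.register(ProfileTableTool())
        pass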

LLM Client

LLMClient

LLMClient(model: str = 'claude-sonnet-4-6', max_tokens: int = 8192, api_key: str | None = None, api_base: str | None = None, retry_attempts: int = 3, retry_base_delay: float = 2.0, extended_context: bool = True)

Multi-provider LLM client for tool-use conversations.

Uses LiteLLM (https://docs.litellm.ai/) under the hood to support Anthropic Claude, OpenAI, Azure, Bedrock, Vertex, and 100+ other providers through a single unified API.

Provider is inferred from the model string:

  • "claude-sonnet-4-6" → Anthropic
  • "gpt-4o" → OpenAI
  • "bedrock/anthropic.claude-3-5-sonnet" → AWS Bedrock

API keys are resolved from environment variables automatically by LiteLLM (ANTHROPIC_API_KEY, OPENAI_API_KEY, etc.) or can be passed explicitly.

Example:

from versifai.core import LLMClient

# Anthropic Claude (default)
llm = LLMClient()

# OpenAI
llm = LLMClient(model="gpt-4o")

# Explicit key
llm = LLMClient(model="claude-sonnet-4-6", api_key="sk-...")
Source code in src/versifai/core/llm.py
def __init__(
    self,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 8192,
    api_key: str | None = None,
    api_base: str | None = None,
    retry_attempts: int = 3,
    retry_base_delay: float = 2.0,
    extended_context: bool = True,
) -> None:
    self._model = model
    self._max_tokens = max_tokens
    self._api_key = api_key
    self._api_base = self._normalize_api_base(model, api_base)
    self._retry_attempts = retry_attempts
    self._retry_base_delay = retry_base_delay
    self._extended_context = extended_context

    # Usage tracking
    self._total_input_tokens = 0
    self._total_output_tokens = 0
    self._total_cache_read_tokens = 0
    self._total_cache_creation_tokens = 0
    self._call_count = 0

send

send(messages: list[dict], system: str, tools: list[dict]) -> LLMResponse

Send a message with tool definitions and return the response.

For Anthropic models, applies prompt caching on the system prompt and tool definitions to reduce token costs.

Parameters

messages : list[dict]
    Conversation history (Anthropic or OpenAI format — auto-converted).
system : str
    System prompt text.
tools : list[dict]
    Tool definitions (Anthropic format — auto-converted for other providers by LiteLLM).

Returns

LLMResponse
    Normalized response with content blocks and usage stats.

Source code in src/versifai/core/llm.py
def send(
    self,
    messages: list[dict],
    system: str,
    tools: list[dict],
) -> LLMResponse:
    """
    Send a message with tool definitions and return the response.

    For Anthropic models, applies prompt caching on the system prompt
    and tool definitions to reduce token costs.

    Parameters
    ----------
    messages : list[dict]
        Conversation history (Anthropic or OpenAI format — auto-converted).
    system : str
        System prompt text.
    tools : list[dict]
        Tool definitions (Anthropic format — auto-converted for other
        providers by LiteLLM).

    Returns
    -------
    LLMResponse
        Normalized response with content blocks and usage stats.
    """
    # Convert messages from Anthropic-native to OpenAI format for LiteLLM
    converted_messages = self._convert_messages_for_litellm(messages)

    # Build kwargs for litellm.completion
    kwargs: dict[str, Any] = {
        "model": self._model,
        "max_tokens": self._max_tokens,
        "tools": tools or None,
    }

    if self._api_key:
        kwargs["api_key"] = self._api_key
    if self._api_base:
        kwargs["api_base"] = self._api_base

    # Provider-specific optimizations
    if self._is_direct_anthropic:
        # Anthropic prompt caching: mark system + last tool for caching
        kwargs["messages"] = [{"role": "system", "content": system}] + converted_messages
        if tools:
            # LiteLLM passes cache_control through for Anthropic
            cached_tools = list(tools)
            last_tool = {**cached_tools[-1], "cache_control": {"type": "ephemeral"}}
            cached_tools[-1] = last_tool
            kwargs["tools"] = cached_tools
        # Enable 1M context window (beta) — avoids 200K default rejection
        if self._extended_context:
            kwargs["extra_headers"] = {"anthropic-beta": "context-1m-2025-08-07"}
    else:
        # OpenAI and others: system goes in messages
        kwargs["messages"] = [{"role": "system", "content": system}] + converted_messages

    # Retry loop with exponential backoff
    last_error: Exception | None = None

    for attempt in range(self._retry_attempts):
        try:
            response = litellm.completion(**kwargs)
            self._call_count += 1

            # Track usage
            if hasattr(response, "usage") and response.usage:
                usage = response.usage
                self._total_input_tokens += getattr(usage, "prompt_tokens", 0) or 0
                self._total_output_tokens += getattr(usage, "completion_tokens", 0) or 0
                self._total_cache_read_tokens += (
                    getattr(usage, "cache_read_input_tokens", 0) or 0
                )
                self._total_cache_creation_tokens += (
                    getattr(usage, "cache_creation_input_tokens", 0) or 0
                )

            return self._normalize_response(response)

        except Exception as e:
            last_error = e
            err_str = str(e).lower()
            # Retry on rate limits, connection errors, server errors
            if any(kw in err_str for kw in ("rate", "429", "500", "502", "503", "connection")):
                delay = self._retry_base_delay * (2**attempt)
                logger.warning(
                    f"LLM error (attempt {attempt + 1}/{self._retry_attempts}): "
                    f"{type(e).__name__}: {e}. Retrying in {delay:.1f}s"
                )
                time.sleep(delay)
            else:
                raise

    raise RuntimeError(
        f"LLM call failed after {self._retry_attempts} attempts. Last error: {last_error}"
    )
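
A hedged usage sketch of send with one illustrative tool definition (the weather tool and its schema are made up for this example):

from versifai.core import LLMClient

llm = LLMClient(model="claude-sonnet-4-6")
tools = [{
    "name": "get_weather",
    "description": "Return current weather for a city.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}]
response = llm.send(
    messages=[{"role": "user", "content": "Weather in Oslo?"}],
    system="You are a helpful assistant.",
    tools=tools,
)
for action in LLMClient.extract_actions(response):
    if action["type"] == "tool_use":
        print(action["name"], action["input"])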

extract_actions staticmethod

extract_actions(response: LLMResponse | Any) -> list[dict]

Parse a response into a list of actions.

Each action is one of:
  • {"type": "text", "text": "..."}
  • {"type": "tool_use", "id": "...", "name": "...", "input": {...}}

Accepts both our LLMResponse and raw Anthropic Message objects for backward compatibility.

Source code in src/versifai/core/llm.py
@staticmethod
def extract_actions(response: LLMResponse | Any) -> list[dict]:
    """
    Parse a response into a list of actions.

    Each action is one of:
      - ``{"type": "text", "text": "..."}``
      - ``{"type": "tool_use", "id": "...", "name": "...", "input": {...}}``

    Accepts both our ``LLMResponse`` and raw Anthropic ``Message`` objects
    for backward compatibility.
    """
    # Handle our normalized LLMResponse
    if isinstance(response, LLMResponse):
        return list(response.content)

    # Backward compat: raw Anthropic Message objects
    actions = []
    for block in response.content:
        if block.type == "text":
            actions.append({"type": "text", "text": block.text})
        elif block.type == "tool_use":
            actions.append(
                {
                    "type": "tool_use",
                    "id": block.id,
                    "name": block.name,
                    "input": block.input,
                }
            )
    return actions

build_tool_result_message staticmethod

build_tool_result_message(tool_use_id: str, result: str, is_error: bool = False) -> dict

Build a tool_result message block for the conversation.

Source code in src/versifai/core/llm.py
@staticmethod
def build_tool_result_message(
    tool_use_id: str,
    result: str,
    is_error: bool = False,
) -> dict:
    """Build a tool_result message block for the conversation."""
    return {
        "role": "user",
        "content": [
            {
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": result,
                "is_error": is_error,
            }
        ],
    }
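
Putting extract_actions and build_tool_result_message together, one ReAct turn might look like the sketch below (llm, registry, and messages carry over from the sketches above):

for action in LLMClient.extract_actions(response):
    if action["type"] == "tool_use":
        # Run the requested tool, then feed the result back to the model.
        result = registry.execute(action["name"], **action["input"])
        messages.append(
            LLMClient.build_tool_result_message(
                tool_use_id=action["id"],
                result=result.to_content_str(),
                is_error=not result.success,
            )
        )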

LLMResponse dataclass

LLMResponse(content: list[dict] = list(), stop_reason: str = '', usage: dict = dict(), raw: Any = None)

Normalized response from any LLM provider.

Wraps the raw provider response into a consistent shape so the rest of the framework doesn't need to care which provider was used.

Memory

AgentMemory

AgentMemory()

Manages conversation history and decision logging for the agent.

Handles context window management by summarizing old messages when the history gets too long. Supports per-source resets to keep the context clean across data sources.

Source code in src/versifai/core/memory.py
def __init__(self) -> None:
    self._messages: list[dict] = []
    self._decisions: list[DecisionRecord] = []
    self._source_summaries: dict[str, str] = {}
    self._context_notes: list[str] = []

messages property

messages: list[dict]

Return the current conversation history.

add_user_message

add_user_message(content: str) -> None

Add a user message (or initial prompt) to the conversation.

Source code in src/versifai/core/memory.py
def add_user_message(self, content: str) -> None:
    """Add a user message (or initial prompt) to the conversation."""
    self._messages.append({"role": "user", "content": content})
    self._maybe_summarize()

add_assistant_message

add_assistant_message(content: list[dict]) -> None

Add an assistant message to the conversation.

Content is the raw content blocks from Claude's response.

Source code in src/versifai/core/memory.py
def add_assistant_message(self, content: list[dict]) -> None:
    """
    Add an assistant message to the conversation.

    Content is the raw content blocks from Claude's response.
    """
    self._messages.append({"role": "assistant", "content": content})
    self._maybe_summarize()

add_tool_result

add_tool_result(tool_use_id: str, result: str, is_error: bool = False, image_base64: str = '', image_media_type: str = 'image/png') -> None

Add a tool result message, optionally with an image.

When image_base64 is provided, the tool result content becomes a list of content blocks (text + image) so Claude can see chart output.

Source code in src/versifai/core/memory.py
def add_tool_result(
    self,
    tool_use_id: str,
    result: str,
    is_error: bool = False,
    image_base64: str = "",
    image_media_type: str = "image/png",
) -> None:
    """Add a tool result message, optionally with an image.

    When image_base64 is provided, the tool result content becomes a list
    of content blocks (text + image) so Claude can see chart output.
    """
    if image_base64:
        # Multi-block content: text + image for Claude vision
        content = [
            {"type": "text", "text": result},
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": image_media_type,
                    "data": image_base64,
                },
            },
        ]
    else:
        content = result  # type: ignore[assignment]

    self._messages.append(
        {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": content,
                    "is_error": is_error,
                }
            ],
        }
    )
    self._compress_old_tool_results()
    self._maybe_summarize()
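
For example, returning a chart to the model might look like this (the tool_use ID and chart_b64 are placeholders):

memory = AgentMemory()
memory.add_user_message("Plot enrollment by county.")
memory.add_tool_result(
    tool_use_id="toolu_01ABC",
    result="Chart saved to charts/enrollment.png",
    image_base64=chart_b64,  # placeholder: base64-encoded PNG from the tool
    image_media_type="image/png",
)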

add_context_note

add_context_note(note: str) -> None

Add a persistent context note that survives summarization.

Source code in src/versifai/core/memory.py
def add_context_note(self, note: str) -> None:
    """Add a persistent context note that survives summarization."""
    self._context_notes.append(note)

reset_for_new_source

reset_for_new_source(source_summary: str = '') -> None

Reset the conversation for a new data source.

Clears all messages but preserves decisions, source summaries, and context notes. Optionally logs a summary of the previous source.

Source code in src/versifai/core/memory.py
def reset_for_new_source(self, source_summary: str = "") -> None:
    """
    Reset the conversation for a new data source.

    Clears all messages but preserves decisions, source summaries,
    and context notes. Optionally logs a summary of the previous source.
    """
    if source_summary:
        # Auto-detect the source name from the summary or use a generic key
        self._context_notes.append(source_summary)

    self._messages.clear()

    logger.debug(
        f"Memory reset for new source. "
        f"Preserved: {len(self._decisions)} decisions, "
        f"{len(self._source_summaries)} source summaries, "
        f"{len(self._context_notes)} context notes."
    )

get_carryover_context

get_carryover_context() -> str

Build a context string of everything learned so far. Injected into the prompt for a new source so the agent retains cross-source knowledge (schema decisions, FIPS patterns, etc.).

Source code in src/versifai/core/memory.py
def get_carryover_context(self) -> str:
    """
    Build a context string of everything learned so far.
    Injected into the prompt for a new source so the agent retains
    cross-source knowledge (schema decisions, FIPS patterns, etc.).
    """
    parts = []

    if self._source_summaries:
        parts.append("## Sources Already Processed")
        for name, summ in self._source_summaries.items():
            parts.append(f"- **{name}**: {summ}")

    if self._context_notes:
        parts.append("\n## Key Notes From Prior Work")
        for note in self._context_notes[-10:]:  # Last 10 notes
            parts.append(f"- {note}")

    return "\n".join(parts) if parts else ""

strip_images

strip_images() -> int

Remove ALL base64 images from conversation history.

Returns the number of images stripped. Images are replaced with a text placeholder so the agent knows a chart was viewed.

Source code in src/versifai/core/memory.py
def strip_images(self) -> int:
    """Remove ALL base64 images from conversation history.

    Returns the number of images stripped. Images are replaced with a
    text placeholder so the agent knows a chart was viewed.
    """
    count = 0
    for msg in self._messages:
        if msg.get("role") != "user":
            continue
        content = msg.get("content")
        if not isinstance(content, list):
            continue
        for block in content:
            if not isinstance(block, dict) or block.get("type") != "tool_result":
                continue
            result_content = block.get("content")
            if isinstance(result_content, list):
                new_blocks = _strip_images_from_blocks(result_content)
                if len(new_blocks) != len(result_content):
                    count += len(result_content) - len(new_blocks)
                block["content"] = new_blocks
    return count

compress_all_tool_results

compress_all_tool_results() -> None

Force-compress ALL tool results regardless of age.

Uses a lower threshold (300 chars) than the normal compression. Called during emergency recalibration after context window overflow.

Source code in src/versifai/core/memory.py
def compress_all_tool_results(self) -> None:
    """Force-compress ALL tool results regardless of age.

    Uses a lower threshold (300 chars) than the normal compression.
    Called during emergency recalibration after context window overflow.
    """
    self._compress_tool_results_in_range(0, len(self._messages), char_threshold=300)

force_summarize

force_summarize() -> None

Summarize the conversation regardless of message count.

Used during emergency recalibration to reduce context size.

Source code in src/versifai/core/memory.py
def force_summarize(self) -> None:
    """Summarize the conversation regardless of message count.

    Used during emergency recalibration to reduce context size.
    """
    if len(self._messages) <= 3:
        return
    # Bypass the SUMMARY_TRIGGER message-count check and call the
    # summarization logic directly.
    self._force_summarize_impl()

recalibrate

recalibrate() -> None

Emergency context reduction pipeline.

Called when the LLM rejects a request due to context window overflow. Applies three progressive reduction steps:

  1. Strip all base64 images (biggest per-item savings)
  2. Compress all tool results aggressively (300 char threshold)
  3. Force-summarize old conversation turns

Source code in src/versifai/core/memory.py
def recalibrate(self) -> None:
    """Emergency context reduction pipeline.

    Called when the LLM rejects a request due to context window overflow.
    Applies three progressive reduction steps:
      1. Strip all base64 images (biggest per-item savings)
      2. Compress all tool results aggressively (300 char threshold)
      3. Force-summarize old conversation turns
    """
    images_stripped = self.strip_images()
    logger.info(f"Recalibrate step 1: stripped {images_stripped} images from memory.")

    self.compress_all_tool_results()
    logger.info("Recalibrate step 2: compressed all tool results.")

    self.force_summarize()
    logger.info(
        f"Recalibrate step 3: force-summarized. History now has {len(self._messages)} messages."
    )
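
A hedged sketch of how a caller might recover from overflow; the string check used to detect overflow is an assumption, not the framework's actual detection logic:

try:
    response = llm.send(memory.messages, system=system_prompt, tools=tool_defs)
except Exception as exc:
    if "context" in str(exc).lower():  # assumed overflow signature
        memory.recalibrate()
        response = llm.send(memory.messages, system=system_prompt, tools=tool_defs)
    else:
        raise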

log_decision

log_decision(source_name: str, decision: str, reasoning: str, tool_used: str | None = None) -> None

Log a decision made by the agent.

Source code in src/versifai/core/memory.py
def log_decision(
    self, source_name: str, decision: str, reasoning: str, tool_used: str | None = None
) -> None:
    """Log a decision made by the agent."""
    self._decisions.append(
        DecisionRecord(
            timestamp=datetime.now().isoformat(),
            source_name=source_name,
            decision=decision,
            reasoning=reasoning,
            tool_used=tool_used,
        )
    )

log_source_summary

log_source_summary(source_name: str, summary: str) -> None

Store a summary for a completed source (survives context compression).

Source code in src/versifai/core/memory.py
def log_source_summary(self, source_name: str, summary: str) -> None:
    """Store a summary for a completed source (survives context compression)."""
    self._source_summaries[source_name] = summary

Display

AgentDisplay

AgentDisplay(dbutils=None)

Clean chat-style display for the agent.

In Databricks, renders styled HTML bubbles via displayHTML; outside Databricks, prints clean formatted text with unicode icons.

Source code in src/versifai/core/display.py
def __init__(self, dbutils=None):
    self._dbutils = dbutils
    self._is_databricks = dbutils is not None or _in_databricks()
    # Rolling plain-text log of everything displayed
    self._log: list[str] = []
    # Counter for notebook display calls (for periodic truncation)
    self._display_count = 0
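
A brief usage sketch (the messages and paths are illustrative):

display = AgentDisplay()  # auto-detects Databricks vs. plain stdout
display.phase("Orientation")
display.tool_call("list_volume", {"path": "/Volumes/main/raw"})
display.tool_result("list_volume", "12 files found")
display.success("Orientation complete")
display.dump_progress("/tmp/agent_progress.txt")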

dump_progress

dump_progress(path: str) -> None

Overwrite path with the full plain-text log so far.

Builds the full string first and writes in a single call — Databricks FUSE mounts don't support append and can behave unpredictably with multiple writes within one open().

Source code in src/versifai/core/display.py
def dump_progress(self, path: str) -> None:
    """Overwrite *path* with the full plain-text log so far.

    Builds the full string first and writes in a single call —
    Databricks FUSE mounts don't support append and can behave
    unpredictably with multiple writes within one open().
    """
    os.makedirs(os.path.dirname(path), exist_ok=True)
    content = (
        f"Progress dump — {datetime.now():%Y-%m-%d %H:%M:%S}\n"
        + "=" * 72
        + "\n\n"
        + "\n".join(self._log)
        + "\n"
    )
    with open(path, "w") as f:
        f.write(content)

phase

phase(title: str) -> None

Display a major phase header.

Source code in src/versifai/core/display.py
def phase(self, title: str) -> None:
    """Display a major phase header."""
    self._append_log(f"\n{'=' * 72}\n  {title}\n{'=' * 72}")
    if self._is_databricks:
        self._display_html(f'<div class="agent-phase">{html.escape(title)}</div>')
    else:
        print(f"\n{'=' * 60}")
        print(f"  {title}")
        print(f"{'=' * 60}")
    logger.debug(f"PHASE: {title}")

step

step(message: str) -> None

Display a minor step (turn counter, etc). Kept minimal.

Source code in src/versifai/core/display.py
def step(self, message: str) -> None:
    """Display a minor step (turn counter, etc). Kept minimal."""
    self._append_log(f"  {message}")
    if self._is_databricks:
        self._display_html(f'<div class="agent-step">{html.escape(message)}</div>')
    else:
        print(f"  {message}")
    logger.debug(f"STEP: {message}")

thinking

thinking(thought: str) -> None

Display the agent's reasoning as a chat bubble.

Source code in src/versifai/core/display.py
def thinking(self, thought: str) -> None:
    """Display the agent's reasoning as a chat bubble."""
    self._append_log(f"\n  [Reasoning]\n  {thought[:2000]}")
    # Truncate very long thoughts for display (full text stays in memory)
    display_text = thought[:1200] + ("..." if len(thought) > 1200 else "")
    escaped = html.escape(display_text).replace("\n", "<br>")
    if self._is_databricks:
        self._display_html(
            f'<div class="agent-bubble thinking">'
            f'<div class="label">Agent Reasoning</div>'
            f"{escaped}</div>"
        )
    else:
        wrapped = textwrap.fill(
            display_text, width=88, initial_indent="  ", subsequent_indent="  "
        )
        print("\n  [Reasoning]")
        print(wrapped)
    logger.debug(f"THINKING: {thought[:200]}")

tool_call

tool_call(tool_name: str, params: dict) -> None

Display a tool invocation.

Source code in src/versifai/core/display.py
def tool_call(self, tool_name: str, params: dict) -> None:
    """Display a tool invocation."""
    params_plain = ", ".join(f"{k}={str(v)[:80]}" for k, v in params.items())
    self._append_log(f"\n  > {tool_name}({params_plain})")

    # Show params concisely — truncate long values
    param_parts = []
    for k, v in params.items():
        val_str = str(v)
        if len(val_str) > 80:
            val_str = val_str[:77] + "..."
        param_parts.append(f"<b>{html.escape(k)}</b>={html.escape(val_str)}")
    params_html = ", ".join(param_parts)

    if self._is_databricks:
        self._display_html(
            f'<div class="agent-bubble tool">'
            f'<div class="label">Tool Call</div>'
            f'<code style="color:#93c5fd;">{html.escape(tool_name)}</code>'
            f'<span style="color:#64748b;font-size:12px;margin-left:8px;">'
            f"({params_html})</span></div>"
        )
    else:
        print(f"\n  > {tool_name}({params_plain})")
    logger.debug(f"TOOL: {tool_name}")

tool_result

tool_result(tool_name: str, result_preview: str, is_error: bool = False) -> None

Display a tool result.

Source code in src/versifai/core/display.py
def tool_result(self, tool_name: str, result_preview: str, is_error: bool = False) -> None:
    """Display a tool result."""
    preview = result_preview[:600]
    icon = "x" if is_error else "+"
    self._append_log(f"  [{icon}] {tool_name}: {preview[:300]}")

    escaped = html.escape(preview).replace("\n", "<br>")
    err_class = " error" if is_error else ""
    label = "Error" if is_error else "Result"

    if self._is_databricks:
        self._display_html(
            f'<div class="agent-bubble result{err_class}">'
            f'<div class="label">{html.escape(tool_name)} - {label}</div>'
            f"{escaped}</div>"
        )
    else:
        # Keep result output compact
        lines = preview.split("\n")
        compact = "\n    ".join(lines[:8])
        if len(lines) > 8:
            compact += f"\n    ... ({len(lines) - 8} more lines)"
        print(f"  [{icon}] {tool_name}: {compact}")

    if is_error:
        logger.debug(f"TOOL ERROR [{tool_name}]: {preview[:120]}")
    else:
        logger.debug(f"TOOL OK [{tool_name}]")

success

success(message: str) -> None

Display a success message.

Source code in src/versifai/core/display.py
def success(self, message: str) -> None:
    """Display a success message."""
    self._append_log(f"\n  [OK] {message}")
    if self._is_databricks:
        self._display_html(
            f'<div class="agent-bubble success">'
            f'<div class="label">Success</div>'
            f"{html.escape(message)}</div>"
        )
    else:
        print(f"\n  [OK] {message}")
    logger.info(f"SUCCESS: {message}")

warning

warning(message: str) -> None

Display a warning.

Source code in src/versifai/core/display.py
def warning(self, message: str) -> None:
    """Display a warning."""
    self._append_log(f"\n  [!] {message}")
    if self._is_databricks:
        self._display_html(
            f'<div class="agent-bubble warn">'
            f'<div class="label">Warning</div>'
            f"{html.escape(message)}</div>"
        )
    else:
        print(f"\n  [!] {message}")
    logger.warning(f"WARNING: {message}")

error

error(message: str) -> None

Display an error.

Source code in src/versifai/core/display.py
def error(self, message: str) -> None:
    """Display an error."""
    self._append_log(f"\n  [X] {message}")
    if self._is_databricks:
        self._display_html(
            f'<div class="agent-bubble err">'
            f'<div class="label">Error</div>'
            f"{html.escape(message)}</div>"
        )
    else:
        print(f"\n  [X] {message}")
    logger.error(f"ERROR: {message}")

summary_table

summary_table(title: str, rows: list[dict]) -> None

Display a summary table.

Source code in src/versifai/core/display.py
def summary_table(self, title: str, rows: list[dict]) -> None:
    """Display a summary table."""
    if not rows:
        return
    headers = list(rows[0].keys())

    if self._is_databricks:
        header_row = "".join(f"<th>{html.escape(h)}</th>" for h in headers)
        body_rows = ""
        for row in rows:
            cells = "".join(f"<td>{html.escape(str(row.get(h, '')))}</td>" for h in headers)
            body_rows += f"<tr>{cells}</tr>"

        self._display_html(
            f'<div class="agent-bubble">'
            f'<div class="label">{html.escape(title)}</div>'
            f'<table class="agent-table">'
            f"<thead><tr>{header_row}</tr></thead>"
            f"<tbody>{body_rows}</tbody></table></div>"
        )
    else:
        print(f"\n  {title}")
        print(f"  {'-' * 60}")
        for row in rows:
            line = " | ".join(f"{k}: {v}" for k, v in row.items())
            print(f"    {line}")

ask_human

ask_human(question: str, context: str = '', options: list[str] | None = None) -> str

Pause the agent and ask the human operator a question.

Always uses Python input() for reliability — Databricks widgets don't work well in all notebook execution contexts. Displays the question with HTML formatting if in Databricks, then collects the answer via stdin.

Source code in src/versifai/core/display.py
def ask_human(
    self,
    question: str,
    context: str = "",
    options: list[str] | None = None,
) -> str:
    """
    Pause the agent and ask the human operator a question.

    Always uses Python ``input()`` for reliability — Databricks
    widgets don't work well in all notebook execution contexts.
    Displays the question with HTML formatting if in Databricks,
    then collects the answer via stdin.
    """
    # Show formatted question in notebook if possible
    if self._is_databricks:
        options_html = ""
        if options:
            items = "".join(
                f'<li style="margin:4px 0;color:#c4b5fd;">{html.escape(o)}</li>'
                for o in options
            )
            options_html = f'<ul style="margin:8px 0;padding-left:20px;">{items}</ul>'

        context_html = ""
        if context:
            context_html = (
                f'<div style="color:#94a3b8;font-size:12px;margin-top:8px;'
                f"padding:8px;background:#0f172a;border-radius:6px;"
                f'font-family:monospace;white-space:pre-wrap;">'
                f"{html.escape(context)}</div>"
            )

        self._display_html(
            f'<div class="agent-bubble human">'
            f'<div class="label">Agent Needs Your Input</div>'
            f'<div style="font-size:15px;margin:8px 0;">{html.escape(question)}</div>'
            f"{context_html}{options_html}"
            f'<div style="color:#64748b;font-size:11px;margin-top:8px;">'
            f"Type your answer below.</div></div>"
        )

    # Always print to stdout so the question is visible in all contexts
    print(f"\n{'=' * 60}")
    print("  AGENT NEEDS YOUR INPUT")
    print(f"{'=' * 60}")
    print(f"  {question}")
    if context:
        print(f"\n  Context: {context[:500]}")
    if options:
        print()
        for i, opt in enumerate(options, 1):
            print(f"    {i}. {opt}")
    print()

    # Collect input via Python input() — blocks until user responds
    answer = input("  Your answer> ")

    # Show the response as a styled bubble so it's visible in the output
    if self._is_databricks and answer:
        self._display_html(
            f'<div class="agent-bubble" style="border-left:3px solid #8b5cf6;'
            f'background:#1e1b4b;">'
            f'<div class="label" style="color:#a78bfa;">Human Response</div>'
            f"{html.escape(answer)}</div>"
        )
    print(f"  Human responded: {answer}")
    return answer

Configuration

CatalogConfig dataclass

CatalogConfig(catalog: str = '', schema: str = '', volume_path: str = '', staging_path: str = '')

Shared Databricks Unity Catalog connection settings.

Used by all agent families that need to read from or write to Unity Catalog tables and volumes.

databricks_host property

databricks_host: str

Databricks workspace host, read from environment.

databricks_token property

databricks_token: str

Databricks access token, read from environment.

AgentSettings dataclass

AgentSettings(max_agent_turns: int = 200, max_turns_per_source: int = 120, max_acceptance_iterations: int = 3, sample_rows: int = 10, profile_sample_size: int = 500)

Tunable parameters for agent behaviour.

Sensible defaults are provided. Override as needed when constructing agent instances.
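
An illustrative construction (catalog names, paths, and overrides are placeholders):

catalog_cfg = CatalogConfig(
    catalog="main",
    schema="health_data",
    volume_path="/Volumes/main/health_data/raw",
    staging_path="/Volumes/main/health_data/staging",
)
settings = AgentSettings(max_agent_turns=100, sample_rows=20)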

Tool System

BaseTool

Bases: ABC

Abstract base class for agent tools.

Subclasses must implement:
  • name (property)
  • description (property)
  • parameters_schema (property) — JSON Schema dict
  • _execute(**kwargs) -> ToolResult

name abstractmethod property

name: str

Unique tool name used in Claude tool_use calls.

description abstractmethod property

description: str

Tool description shown to Claude.

parameters_schema abstractmethod property

parameters_schema: dict

JSON Schema describing the tool's input parameters.

Example

{ "type": "object", "properties": { "path": {"type": "string", "description": "Volume path to explore"} }, "required": ["path"] }

execute

execute(**kwargs) -> ToolResult

Public entry point. Wraps _execute with error handling so that any exception is captured and returned as an error ToolResult instead of crashing the agent loop.

Special handling for TypeError (missing/wrong parameters): instead of a raw traceback, returns the tool's parameter schema so the agent can self-correct in one retry.

Source code in src/versifai/core/tools/base.py
def execute(self, **kwargs) -> ToolResult:
    """
    Public entry point.  Wraps _execute with error handling so that
    any exception is captured and returned as an error ToolResult
    instead of crashing the agent loop.

    Special handling for TypeError (missing/wrong parameters): instead
    of a raw traceback, returns the tool's parameter schema so the
    agent can self-correct in one retry.
    """
    try:
        return self._execute(**kwargs)
    except TypeError as exc:
        # Almost always means missing or mistyped parameters.
        # Give the agent the exact schema it needs to fix the call.
        schema = self.parameters_schema
        required = schema.get("required", [])
        props = schema.get("properties", {})
        param_hints = []
        for name, spec in props.items():
            req = " (REQUIRED)" if name in required else ""
            desc = spec.get("description", "")
            ptype = spec.get("type", "any")
            param_hints.append(f"  - {name}: {ptype}{req}{desc}")
        param_block = "\n".join(param_hints) if param_hints else "  (see tool description)"

        return ToolResult(
            success=False,
            error=(
                f"Parameter error calling '{self.name}': {exc}\n\n"
                f"You provided: {list(kwargs.keys())}\n\n"
                f"Expected parameters:\n{param_block}\n\n"
                f"Fix: ensure all REQUIRED parameters are included with correct types."
            ),
            summary=f"Tool '{self.name}' called with wrong parameters: {exc}",
        )
    except Exception as exc:
        tb = traceback.format_exc()
        return ToolResult(
            success=False,
            error=f"{type(exc).__name__}: {exc}\n\nTraceback:\n{tb}",
            summary=f"Tool '{self.name}' failed: {exc}",
        )

to_claude_tool_definition

to_claude_tool_definition() -> dict

Return the tool definition in the format expected by the Anthropic API's tool_use feature.

Source code in src/versifai/core/tools/base.py
def to_claude_tool_definition(self) -> dict:
    """
    Return the tool definition in the format expected by
    the Anthropic API's tool_use feature.
    """
    return {
        "name": self.name,
        "description": self.description,
        "input_schema": self.parameters_schema,
    }

ToolResult dataclass

ToolResult(success: bool, data: Any = None, error: str = '', summary: str = '', image_path: str = '')

Structured result returned by every tool invocation.

to_content_str

to_content_str() -> str

Serialize to a string suitable for returning to the LLM.

Source code in src/versifai/core/tools/base.py
def to_content_str(self) -> str:
    """Serialize to a string suitable for returning to the LLM."""
    if self.success:
        if isinstance(self.data, (dict, list)):
            return json.dumps(self.data, indent=2, default=str)
        return str(self.data)
    else:
        return f"ERROR: {self.error}"

ToolRegistry

ToolRegistry()

Central registry of all tools available to the agent.

Source code in src/versifai/core/tools/registry.py
def __init__(self) -> None:
    self._tools: dict[str, BaseTool] = {}

register

register(tool: BaseTool) -> None

Register a tool instance.

Source code in src/versifai/core/tools/registry.py
def register(self, tool: BaseTool) -> None:
    """Register a tool instance."""
    if tool.name in self._tools:
        raise ValueError(f"Tool '{tool.name}' is already registered.")
    self._tools[tool.name] = tool

get

get(name: str) -> BaseTool | None

Retrieve a tool by name.

Source code in src/versifai/core/tools/registry.py
def get(self, name: str) -> BaseTool | None:
    """Retrieve a tool by name."""
    return self._tools.get(name)

execute

execute(tool_name: str, /, **kwargs) -> ToolResult

Look up a tool by name and execute it.

The / makes tool_name positional-only so it can never collide with a keyword argument in **kwargs — even if a tool's own input schema includes a tool_name parameter (e.g. create_custom_tool).

Source code in src/versifai/core/tools/registry.py
def execute(self, tool_name: str, /, **kwargs) -> ToolResult:
    """
    Look up a tool by name and execute it.

    The ``/`` makes ``tool_name`` positional-only so it can never
    collide with a keyword argument in ``**kwargs`` — even if a
    tool's own input schema includes a ``tool_name`` parameter
    (e.g. ``create_custom_tool``).
    """
    tool = self._tools.get(tool_name)
    if tool is None:
        return ToolResult(
            success=False,
            error=f"Unknown tool: '{tool_name}'. Available tools: {list(self._tools.keys())}",
        )
    return tool.execute(**kwargs)

to_claude_tools

to_claude_tools() -> list[dict]

Return all tool definitions in Anthropic API format.

Source code in src/versifai/core/tools/registry.py
def to_claude_tools(self) -> list[dict]:
    """Return all tool definitions in Anthropic API format."""
    return [t.to_claude_tool_definition() for t in self._tools.values()]
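
Wiring it together (ListVolumeTool is the hypothetical tool sketched under BaseTool):

registry = ToolRegistry()
registry.register(ListVolumeTool())

tool_defs = registry.to_claude_tools()  # pass these to LLMClient.send(...)
result = registry.execute("list_volume", path="/Volumes/main/raw")
print(result.to_content_str())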

Run Management

RunState dataclass

RunState(status: str = 'running', entry_point: str = 'run', current_phase: str = '', current_item: str = '', completed_phases: list[str] = list(), completed_items: dict[str, list[str]] = dict(), error: str = '', updated_at: str = '')

Tracks execution state of an agent run for stop/resume support.

Persisted inside run_metadata.json under the "state" key so there is a single file to read on resume.

Typical lifecycle:

state = RunState(entry_point="run")
state.mark_phase_start("orientation")
state.mark_phase_complete("orientation")
state.mark_phase_start("silver")
state.mark_item_complete("silver", "silver_county_master")
...
state.mark_completed()      # or mark_interrupted() / mark_failed()

AgentDependency dataclass

AgentDependency(agent_type: str, config_name: str, run_id: str = '', base_path: str = '', outputs: list[str] = list())

Declares that a config consumes outputs from a previous agent run.

Used by orchestrators to resolve the concrete path to a prior run's outputs (findings, charts, tables, notes) at startup time.

Example:

AgentDependency(
    agent_type="scientist",
    config_name="geographic_disparity",
    base_path="/Volumes/.../research_results/geographic_disparity",
)

generate_run_id

generate_run_id() -> str

Generate a unique run ID: YYYYMMDD_HHMMSS_XXXX.

Format: timestamp (to second) + 4 random hex chars for uniqueness. Lexicographic sort = chronological sort.

Source code in src/versifai/core/run_manager.py
def generate_run_id() -> str:
    """Generate a unique run ID: YYYYMMDD_HHMMSS_XXXX.

    Format: timestamp (to second) + 4 random hex chars for uniqueness.
    Lexicographic sort = chronological sort.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    suffix = secrets.token_hex(2)  # 4 hex chars
    return f"{ts}_{suffix}"

init_run_directory

init_run_directory(base_path: str, run_id: str) -> str

Create a run-isolated output directory.

Creates base_path/runs/{run_id}/ with standard subdirectories: charts/, tables/, notes/, models/.

Returns the full run directory path.

Source code in src/versifai/core/run_manager.py
def init_run_directory(base_path: str, run_id: str) -> str:
    """Create a run-isolated output directory.

    Creates ``base_path/runs/{run_id}/`` with standard subdirectories:
    charts/, tables/, notes/, models/.

    Returns the full run directory path.
    """
    run_path = os.path.join(base_path, "runs", run_id)
    for subdir in ("charts", "tables", "notes", "models"):
        os.makedirs(os.path.join(run_path, subdir), exist_ok=True)
    return run_path
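
Together these give the standard run-setup sequence (the base path is a placeholder):

run_id = generate_run_id()  # e.g. "20250101_120000_a1b2"
run_path = init_run_directory("/Volumes/main/research_results", run_id)
# run_path now ends in runs/20250101_120000_a1b2/ and contains
# charts/, tables/, notes/, and models/ subdirectories.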

resolve_dependency

resolve_dependency(dep: AgentDependency) -> str

Resolve a dependency to its concrete run directory path.

If dep.run_id is set, returns dep.base_path/runs/{dep.run_id}/. If empty, finds the latest run under dep.base_path/runs/. If no runs directory exists, returns dep.base_path directly (backward compatibility with pre-run-isolation outputs).

Source code in src/versifai/core/run_manager.py
def resolve_dependency(dep: AgentDependency) -> str:
    """Resolve a dependency to its concrete run directory path.

    If ``dep.run_id`` is set, returns ``dep.base_path/runs/{dep.run_id}/``.
    If empty, finds the latest run under ``dep.base_path/runs/``.
    If no runs directory exists, returns ``dep.base_path`` directly
    (backward compatibility with pre-run-isolation outputs).
    """
    if not dep.base_path:
        raise ValueError(f"AgentDependency for '{dep.config_name}' has no base_path set.")

    # If a specific run_id is requested
    if dep.run_id:
        run_path = os.path.join(dep.base_path, "runs", dep.run_id)
        if os.path.isdir(run_path):
            return run_path
        raise FileNotFoundError(f"Run '{dep.run_id}' not found at {run_path}")

    # Try to find the latest run
    runs_dir = os.path.join(dep.base_path, "runs")
    if os.path.isdir(runs_dir):
        run_ids = sorted(
            d for d in os.listdir(runs_dir) if os.path.isdir(os.path.join(runs_dir, d))
        )
        if run_ids:
            return os.path.join(runs_dir, run_ids[-1])

    # No runs directory — fall back to base_path (pre-isolation outputs)
    return dep.base_path
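
For example, resolving the latest run of a prior scientist agent (reusing the AgentDependency example above):

dep = AgentDependency(
    agent_type="scientist",
    config_name="geographic_disparity",
    base_path="/Volumes/.../research_results/geographic_disparity",
)
prior_run = resolve_dependency(dep)  # latest run, since run_id is empty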

resolve_run_path

resolve_run_path(base_path: str, run_id: str | None = None) -> str

Resolve a run directory path.

If run_id is given, returns base_path/runs/{run_id}/. If None, finds the most recent run by lexicographic sort of run IDs.

Raises FileNotFoundError if no runs exist.

Source code in src/versifai/core/run_manager.py
def resolve_run_path(base_path: str, run_id: str | None = None) -> str:
    """Resolve a run directory path.

    If ``run_id`` is given, returns ``base_path/runs/{run_id}/``.
    If None, finds the most recent run by lexicographic sort of run IDs.

    Raises FileNotFoundError if no runs exist.
    """
    if run_id:
        path = os.path.join(base_path, "runs", run_id)
        if not os.path.isdir(path):
            raise FileNotFoundError(f"Run directory not found: {path}")
        return path

    runs_dir = os.path.join(base_path, "runs")
    if not os.path.isdir(runs_dir):
        raise FileNotFoundError(f"No runs directory found at {runs_dir}")

    run_ids = sorted(d for d in os.listdir(runs_dir) if os.path.isdir(os.path.join(runs_dir, d)))
    if not run_ids:
        raise FileNotFoundError(f"No runs found in {runs_dir}")

    return os.path.join(runs_dir, run_ids[-1])

list_runs

list_runs(base_path: str) -> list[dict]

List all runs under base_path/runs/.

Returns a list of dicts: [{run_id, path, metadata}].

Source code in src/versifai/core/run_manager.py
def list_runs(base_path: str) -> list[dict]:
    """List all runs under ``base_path/runs/``.

    Returns a list of dicts: [{run_id, path, metadata}].
    """
    runs_dir = os.path.join(base_path, "runs")
    if not os.path.isdir(runs_dir):
        return []

    results = []
    for run_id in sorted(os.listdir(runs_dir)):
        run_path = os.path.join(runs_dir, run_id)
        if not os.path.isdir(run_path):
            continue

        entry: dict = {
            "run_id": run_id,
            "path": run_path,
        }

        # Try to read metadata
        meta_path = os.path.join(run_path, "run_metadata.json")
        if os.path.isfile(meta_path):
            try:
                with open(meta_path) as f:
                    entry["metadata"] = json.load(f)
            except (json.JSONDecodeError, OSError):
                pass

        results.append(entry)

    return results