Data Agents

The versifai.data_agents package provides agents for data engineering and quality validation.

DataEngineerAgent

DataEngineerAgent

DataEngineerAgent(cfg: ProjectConfig | None = None, dbutils=None, volume_path: str | None = None)

Autonomous data engineering agent powered by Claude.

The agent processes data sources one at a time, resetting its conversation memory between sources to stay within context limits. Cross-source knowledge (schemas, decisions) is carried forward via the memory's carryover context.

Usage in a Databricks notebook::

from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()  # or customize for your project
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
agent.run()
Source code in src/versifai/data_agents/engineer/agent.py
def __init__(
    self,
    cfg: ProjectConfig | None = None,
    dbutils=None,
    volume_path: str | None = None,
) -> None:
    if cfg is None:
        raise ValueError(
            "cfg is required. Pass a ProjectConfig instance. "
            "See examples/ for sample configurations."
        )
    self._cfg = cfg

    self._volume_path = volume_path or cfg.volume_path
    self._display = AgentDisplay(dbutils=dbutils)
    self._state = AgentState(started_at=datetime.now())
    self._memory = AgentMemory()
    self._llm = LLMClient(
        model=cfg.llm.model,
        max_tokens=cfg.llm.max_tokens,
        api_key=cfg.llm.api_key or None,
        api_base=cfg.llm.api_base or None,
        extended_context=cfg.llm.extended_context,
    )

    # Build the tool registry with inter-tool dependencies
    self._schema_tool = SchemaDesignerTool(cfg=cfg)
    self._transformer_tool = DataTransformerTool(
        schema_designer_tool=self._schema_tool,
        cfg=cfg,
    )
    self._catalog_writer = CatalogWriterTool(transformer_tool=self._transformer_tool)
    # Back-reference so transformer can auto-flush to catalog
    self._transformer_tool.set_catalog_writer(self._catalog_writer)

    self._registry = ToolRegistry()
    self._register_tools()

    # Human-in-the-loop state
    self._dbutils = dbutils

    # Error tracking for self-healing
    self._error_tracker: dict[str, list[dict]] = {}
    self._consecutive_errors = 0
    self._max_consecutive_errors = 5

    # Track repeated missing-parameter patterns (LLM output size limits)
    self._missing_param_tracker: dict[str, dict[str, int]] = {}

    # Discovery results — populated in Phase 1
    self._discovered_sources: list[dict] = []

    # Pre-build prompts from config
    self._system_prompt = build_system_prompt(cfg)

run

run(force_full: bool = False, source_path: str | None = None) -> dict

Run the full agent pipeline:
  1. Discovery — crawl the volume, identify all data sources
  1.5. Incremental detection — check existing tables for loaded files
  2. Per-source processing — process each source independently
  3. Acceptance loop — analyst validates, engineer fixes, repeat

Parameters:

    force_full (bool, default False):
        If True, skip incremental detection and reprocess all sources from
        scratch (overwrite existing tables).

    source_path (str | None, default None):
        If provided, skip discovery and process only this directory path
        directly. Useful for re-processing a single source or adding a new
        source without re-crawling everything.

Returns a summary dict with results and statistics.
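
A typical notebook session might look like the sketch below; the volume path is illustrative, and the "tables" key mirrors the final summary shown in the source code.

# Incremental run over every discovered source
summary = agent.run()

# Reprocess everything from scratch, overwriting existing tables
summary = agent.run(force_full=True)

# Re-run a single source directory without re-crawling the volume
summary = agent.run(source_path="/Volumes/main/raw/scc_enrollment")
print(summary["tables"])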

Source code in src/versifai/data_agents/engineer/agent.py
def run(
    self,
    force_full: bool = False,
    source_path: str | None = None,
) -> dict:
    """
    Run the full agent pipeline:
      1. Discovery — crawl the volume, identify all data sources
      1.5. Incremental detection — check existing tables for loaded files
      2. Per-source processing — process each source independently
      3. Acceptance loop — analyst validates, engineer fixes, repeat

    Args:
        force_full: If True, skip incremental detection and reprocess
            all sources from scratch (overwrite existing tables).
        source_path: If provided, skip discovery and process only this
            directory path directly. Useful for re-processing a single
            source or adding a new source without re-crawling everything.

    Returns a summary dict with results and statistics.
    """
    cfg = self._cfg
    self._display.phase("AGENTIC DATA PROFILER STARTING")
    self._display.step(f"Project: {cfg.name}")
    self._display.step(f"Volume: {self._volume_path}")
    self._display.step(f"Target: {cfg.full_schema}")
    self._display.step(f"Tools: {self._registry.tool_names + ['ask_human']}")

    try:
        if source_path:
            # ── Shortcut: Skip discovery, process a single directory ──
            import os

            source_name = os.path.basename(source_path.rstrip("/"))
            file_count = 0
            if os.path.isdir(source_path):
                file_count = len(
                    [
                        f
                        for f in os.listdir(source_path)
                        if os.path.isfile(os.path.join(source_path, f))
                    ]
                )

            self._display.phase(f"Direct mode: {source_name} ({file_count} files)")
            self._display.step("Skipping discovery — processing target directory only.")

            self._discovered_sources = [
                {
                    "name": source_name,
                    "path": source_path,
                    "file_count": file_count,
                    "files": "",
                    "mode": "full" if force_full else "check_incremental",
                }
            ]

            if not force_full:
                self._display.phase("Incremental Status Check")
                self._detect_incremental_status()

        else:
            # ── Phase 1: Discovery ──────────────────────────────────
            self._display.phase("Phase 1: Discovery")
            self._run_phase(
                prompt=build_discovery_prompt(cfg),
                max_turns=MAX_TURNS_PER_SOURCE,
            )

            # Parse the discovery results to find sources
            self._discovered_sources = self._parse_discovery_results()

            # Always merge with filesystem fallback to catch missed directories
            fallback_sources = self._fallback_discover_sources()
            existing_paths = {s["path"] for s in self._discovered_sources}
            for fb_source in fallback_sources:
                if fb_source["path"] not in existing_paths:
                    self._discovered_sources.append(fb_source)
                    self._display.step(f"  Added from fallback: {fb_source['name']}")

            self._display.step(
                f"Discovered {len(self._discovered_sources)} data sources to process"
            )

            if not self._discovered_sources:
                self._display.warning(
                    "No data sources discovered. Check the volume path and directory structure."
                )

            # ── Phase 1.5: Incremental Detection ────────────────────
            if not force_full:
                self._display.phase("Phase 1.5: Incremental Status Check")
                self._detect_incremental_status()
            else:
                self._display.step("Force full reload — skipping incremental detection.")
                for source in self._discovered_sources:
                    source["mode"] = "full"

        # ── Phase 2: Process each source ────────────────────────
        self._display.phase("Phase 2: Source-by-Source Processing")

        for i, source in enumerate(self._discovered_sources, 1):
            source_name = source["name"]
            source_path = source["path"]
            file_count = source.get("file_count", "?")
            mode = source.get("mode", "full")

            # Calculate turn budget — multi-table sources get more
            hint = cfg.get_source_hint(source_name)
            if hint and hint.multi_table:
                source_max_turns = MAX_TURNS_PER_SOURCE + (len(hint.files) * 15)
            else:
                source_max_turns = MAX_TURNS_PER_SOURCE

            self._display.phase(
                f"Source {i}/{len(self._discovered_sources)}: {source_name} "
                f"({file_count} files) [{mode.upper()}] "
                f"[budget: {source_max_turns} turns]"
            )

            # Reset memory for this source, keeping cross-source context
            carryover = self._memory.get_carryover_context()
            self._memory.reset_for_new_source()
            self._error_tracker.clear()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            # Register auto-flush target so large staging buffers
            # get written to Unity Catalog before running out of memory
            table_name = f"{cfg.full_schema}.{source_name}"
            self._transformer_tool.register_flush_target(source_name, table_name)

            # Route to the right prompt based on mode
            if mode == "check_incremental":
                source_prompt = build_incremental_source_prompt(
                    cfg=cfg,
                    source_name=source_name,
                    source_path=source_path,
                    existing_table=source["existing_table"],
                    loaded_files=source["loaded_files"],
                )
            else:
                # mode == "full" or unknown — full processing
                source_prompt = build_source_processing_prompt(
                    cfg=cfg,
                    source_name=source_name,
                    source_path=source_path,
                    file_list=source.get("files", "See directory listing"),
                    suggested_table=source_name,
                )

            # Prepend carryover context if we have it
            if carryover:
                source_prompt = (
                    f"## Context From Previous Sources\n{carryover}\n\n---\n\n{source_prompt}"
                )

            # Process this source
            self._run_phase(
                prompt=source_prompt,
                max_turns=source_max_turns,
            )

            # Log source completion
            self._memory.log_source_summary(
                source_name, f"Processed from {source_path} ({file_count} files) [mode={mode}]"
            )
            self._display.success(f"Completed source: {source_name}")

            # ── Per-source completeness check (multi-table retry) ──
            # For multi-table sources, verify all expected tables were
            # created. If any are missing, force the agent back in with
            # a focused prompt to finish the job.
            if hint and hint.multi_table:
                missing = self._check_source_tables(source_name)
                retry_attempts = 0
                max_retries = 2

                while missing and retry_attempts < max_retries:
                    retry_attempts += 1
                    self._display.warning(
                        f"Multi-table source {source_name} INCOMPLETE: "
                        f"missing {len(missing)} table(s): "
                        f"{', '.join(missing)}"
                    )

                    # Reset error state for the retry
                    self._error_tracker.clear()
                    self._consecutive_errors = 0
                    self._missing_param_tracker.clear()

                    retry_prompt = self._build_retry_prompt(
                        source_name=source_name,
                        source_path=source_path,
                        missing_tables=missing,
                        hint=hint,
                    )
                    retry_turns = max(30, len(missing) * 20)
                    self._display.phase(
                        f"RETRY {retry_attempts}/{max_retries}: "
                        f"{source_name}{len(missing)} missing table(s) "
                        f"[budget: {retry_turns} turns]"
                    )
                    self._run_phase(
                        prompt=retry_prompt,
                        max_turns=retry_turns,
                    )

                    # Re-check
                    missing = self._check_source_tables(source_name)

                if missing:
                    self._display.warning(
                        f"After {retry_attempts} retries, still missing: {', '.join(missing)}"
                    )
                else:
                    self._display.success(
                        f"All {len(hint.files)} tables created for {source_name}."
                    )

        # ── Phase 3: Table Completeness Check ─────────────────
        self._display.phase("Phase 3: Table Completeness Check")
        processed_names = [s["name"] for s in self._discovered_sources]
        self._check_table_completeness(source_names=processed_names)

    except KeyboardInterrupt:
        self._display.warning("Agent interrupted by user.")
    except Exception as e:
        self._display.error(f"Agent failed: {e}")
        logger.exception("Agent top-level failure")

    # Final summary
    summary = self._build_summary()
    self._display.phase("PIPELINE COMPLETE")
    self._display.summary_table("Final Results", summary["tables"])
    self._display.step(f"Engineer LLM Usage: {self._llm.usage_summary}")

    return summary

run_rename

run_rename() -> dict

Run the retroactive column rename phase.

Reads documentation for each existing table in Unity Catalog, builds a mapping from cryptic column names to descriptive names, and applies ALTER TABLE RENAME COLUMN statements.

This is safe to run at any time — it's metadata-only on Delta tables and doesn't rewrite data.

Usage in a Databricks notebook::

from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
agent.run_rename()
Source code in src/versifai/data_agents/engineer/agent.py
def run_rename(self) -> dict:
    """
    Run the retroactive column rename phase.

    Reads documentation for each existing table in Unity Catalog,
    builds a mapping from cryptic column names to descriptive names,
    and applies ALTER TABLE RENAME COLUMN statements.

    This is safe to run at any time — it's metadata-only on Delta
    tables and doesn't rewrite data.

    Usage in a Databricks notebook::

        from versifai.data_agents.engineer.agent import DataEngineerAgent
        from versifai.data_agents.engineer.config import ProjectConfig

        cfg = ProjectConfig()
        agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
        agent.run_rename()
    """
    cfg = self._cfg
    self._display.phase("RETROACTIVE COLUMN RENAME")
    self._display.step(f"Target schema: {cfg.full_schema}")
    self._display.step(f"Documentation source: {cfg.volume_path}")
    self._display.step(f"Tools: {self._registry.tool_names + ['ask_human']}")

    try:
        self._memory.reset_for_new_source()
        self._error_tracker.clear()
        self._consecutive_errors = 0
        self._missing_param_tracker.clear()

        prompt = build_rename_prompt(cfg)
        self._run_phase(prompt=prompt, max_turns=MAX_TURNS_PER_SOURCE)

        self._display.success("Column rename phase complete.")

    except KeyboardInterrupt:
        self._display.warning("Rename interrupted by user.")
    except Exception as e:
        self._display.error(f"Rename failed: {e}")
        logger.exception("Rename phase failure")

    summary = {
        "phase": "rename",
        "total_turns": self._state.total_turns,
        "llm_usage": self._llm.usage_summary,
    }
    self._display.phase("RENAME COMPLETE")
    self._display.step(f"LLM Usage: {self._llm.usage_summary}")
    return summary

run_catalog

run_catalog() -> dict

Build or update the data_catalog table documenting every column in every table.

Uses delta-fill: detects which tables are new or changed since the last run and only documents those. Existing, unchanged entries are preserved.

Hard constraint: every table in Unity Catalog must be documented. The agent will verify 100% coverage before finishing.

Safe to run at any time — incrementally updates the data_catalog table.

Usage in a Databricks notebook::

from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
agent.run_catalog()
Source code in src/versifai/data_agents/engineer/agent.py
def run_catalog(self) -> dict:
    """
    Build or update the data_catalog table documenting every column in every table.

    Uses delta-fill: detects which tables are new or changed since the last
    run and only documents those. Existing, unchanged entries are preserved.

    Hard constraint: every table in Unity Catalog must be documented.
    The agent will verify 100% coverage before finishing.

    Safe to run at any time — incrementally updates the data_catalog table.

    Usage in a Databricks notebook::

        from versifai.data_agents.engineer.agent import DataEngineerAgent
        from versifai.data_agents.engineer.config import ProjectConfig

        cfg = ProjectConfig()
        agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
        agent.run_catalog()
    """
    cfg = self._cfg
    self._display.phase("DATA CATALOG BUILDER")
    self._display.step(f"Target schema: {cfg.full_schema}")
    self._display.step(f"Catalog table: {cfg.full_schema}.data_catalog")
    self._display.step(f"Documentation source: {cfg.volume_path}")
    self._display.step(f"Tools: {self._registry.tool_names + ['ask_human']}")

    try:
        self._memory.reset_for_new_source()
        self._error_tracker.clear()
        self._consecutive_errors = 0
        self._missing_param_tracker.clear()

        prompt = build_catalog_prompt(cfg)
        self._run_phase(prompt=prompt, max_turns=MAX_TURNS_PER_SOURCE)

        self._display.success("Data catalog built.")

    except KeyboardInterrupt:
        self._display.warning("Catalog build interrupted by user.")
    except Exception as e:
        self._display.error(f"Catalog build failed: {e}")
        logger.exception("Catalog build failure")

    summary = {
        "phase": "catalog",
        "total_turns": self._state.total_turns,
        "llm_usage": self._llm.usage_summary,
    }
    self._display.phase("CATALOG BUILD COMPLETE")
    self._display.step(f"LLM Usage: {self._llm.usage_summary}")
    return summary

run_quality_check

run_quality_check(tables: list[str] | None = None, hints: str = '') -> dict

Run acceptance testing on tables in Unity Catalog.

Uses the data_catalog table (if it exists) to ground the analyst in real table names, real column names, and real descriptions. This prevents hallucination and ensures all validation queries reference actual schema objects.

Should be called AFTER run_catalog() so the data_catalog is available.

Parameters:

    tables (list[str] | None, default None):
        Optional list of specific table names to validate. If None, validates
        ALL tables. Use this to focus on a specific dataset at a time
        (e.g., ["scc_enrollment"]).

    hints (str, default ''):
        Optional user instructions injected into the analyst prompt. Use this
        to tell the analyst what to focus on or what specific issues you want
        checked. Example: "Check that enrollment has no suppressed values.
        Star ratings should have 2022-2026 coverage."

Usage in a Databricks notebook::

from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)

# Check everything:
agent.run_quality_check()

# Check specific tables with guidance:
agent.run_quality_check(
    tables=["scc_enrollment", "star_ratings_summary"],
    hints="Verify enrollment suppression handling. "
          "Star ratings should cover 2022-2026."
)
Source code in src/versifai/data_agents/engineer/agent.py
def run_quality_check(
    self,
    tables: list[str] | None = None,
    hints: str = "",
) -> dict:
    """
    Run acceptance testing on tables in Unity Catalog.

    Uses the data_catalog table (if it exists) to ground the analyst in
    real table names, real column names, and real descriptions. This
    prevents hallucination and ensures all validation queries reference
    actual schema objects.

    Should be called AFTER run_catalog() so the data_catalog is available.

    Args:
        tables: Optional list of specific table names to validate.
            If None, validates ALL tables. Use this to focus on a
            specific dataset at a time (e.g., ["scc_enrollment"]).
        hints: Optional user instructions injected into the analyst
            prompt. Use this to tell the analyst what to focus on
            or what specific issues you want checked.
            Example: "Check that enrollment has no suppressed values.
            Star ratings should have 2022-2026 coverage."

    Usage in a Databricks notebook::

        from versifai.data_agents.engineer.agent import DataEngineerAgent
        from versifai.data_agents.engineer.config import ProjectConfig

        cfg = ProjectConfig()
        agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)

        # Check everything:
        agent.run_quality_check()

        # Check specific tables with guidance:
        agent.run_quality_check(
            tables=["scc_enrollment", "star_ratings_summary"],
            hints="Verify enrollment suppression handling. "
                  "Star ratings should cover 2022-2026."
        )
    """
    cfg = self._cfg
    self._display.phase("DATA QUALITY CHECK")
    self._display.step(f"Target schema: {cfg.full_schema}")
    if tables:
        self._display.step(f"Focused on: {', '.join(tables)}")

    try:
        # Pre-build the table inventory from Unity Catalog
        table_inventory = self._build_table_inventory()

        if not table_inventory:
            self._display.error("No tables found in Unity Catalog. Nothing to validate.")
            return {"overall_status": "ERROR", "summary": "No tables found."}

        # Filter to requested tables if specified
        if tables:
            tables_lower = {t.lower() for t in tables}
            table_inventory = [
                t for t in table_inventory if t["table_name"].lower() in tables_lower
            ]
            if not table_inventory:
                self._display.error(f"None of the requested tables found: {tables}")
                return {
                    "overall_status": "ERROR",
                    "summary": f"Requested tables not found: {tables}",
                }

        self._display.step(f"Found {len(table_inventory)} tables to validate.")

        # Run the analyst with the pre-built inventory
        analyst = DataAnalystAgent(
            cfg=cfg,
            dbutils=self._dbutils,
            table_inventory=table_inventory,
            user_hints=hints,
        )
        feedback = analyst.run()

        # Display results
        verdicts = feedback.get("verdicts", {})
        if verdicts:
            verdict_rows = []
            for table_name, verdict in verdicts.items():
                verdict_rows.append(
                    {
                        "table": table_name,
                        "status": verdict.get("status", "?"),
                        "issues": str(len(verdict.get("issues", []))),
                    }
                )
            self._display.summary_table("Quality Check Verdicts", verdict_rows)

        overall = feedback.get("overall_status", "NEEDS_REVIEW")
        if overall == "PASS":
            self._display.success("All tables ACCEPTED.")
        else:
            self._display.warning(f"Quality check: {overall}. Review verdicts above.")

    except KeyboardInterrupt:
        self._display.warning("Quality check interrupted by user.")
        feedback = {"overall_status": "INTERRUPTED"}
    except Exception as e:
        self._display.error(f"Quality check failed: {e}")
        logger.exception("Quality check failure")
        feedback = {"overall_status": "ERROR", "summary": str(e)}

    summary = {
        "phase": "quality_check",
        "overall_status": feedback.get("overall_status", "UNKNOWN"),
        "verdicts": feedback.get("verdicts", {}),
        "total_turns": self._state.total_turns,
        "llm_usage": self._llm.usage_summary,
    }
    self._display.phase("QUALITY CHECK COMPLETE")
    self._display.step(f"LLM Usage: {self._llm.usage_summary}")
    return summary

DataAnalystAgent

DataAnalystAgent

DataAnalystAgent(cfg: ProjectConfig | None = None, dbutils=None, max_turns: int = 80, table_inventory: list[dict] | None = None, user_hints: str = '')

Acceptance testing agent that validates tables in Unity Catalog.

The analyst has a limited tool set — primarily SQL execution and catalog listing. It doesn't need file readers or transformers because it works only with data already in the catalog.

Usage::

from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
analyst = DataAnalystAgent(cfg=cfg, dbutils=dbutils)
feedback = analyst.run()
# feedback is a dict with verdicts per table
Source code in src/versifai/data_agents/analyst/agent.py
def __init__(
    self,
    cfg: ProjectConfig | None = None,
    dbutils=None,
    max_turns: int = 80,
    table_inventory: list[dict] | None = None,
    user_hints: str = "",
) -> None:
    if cfg is None:
        raise ValueError("cfg is required. See examples/ for sample configurations.")
    self._cfg = cfg
    self._table_inventory = table_inventory or []
    self._user_hints = user_hints

    self._display = AgentDisplay(dbutils=dbutils)
    self._memory = AgentMemory()
    self._llm = LLMClient(
        model=cfg.llm.model,
        max_tokens=cfg.llm.max_tokens,
        api_key=cfg.llm.api_key or None,
        api_base=cfg.llm.api_base or None,
        extended_context=cfg.llm.extended_context,
    )
    self._max_turns = max_turns
    self._dbutils = dbutils

    # Analyst only needs SQL and catalog tools
    self._registry = ToolRegistry()
    self._register_tools()

    # Error tracking
    self._consecutive_errors = 0
    self._max_consecutive_errors = 5

    # Pre-build prompts from config
    self._system_prompt = build_analyst_system_prompt(cfg)
    self._initial_prompt = build_analyst_initial_prompt(
        cfg,
        table_inventory=self._table_inventory,
        user_hints=self._user_hints,
    )

run

run() -> dict

Run acceptance testing on all tables in the target schema.

Returns a structured feedback dict with per-table verdicts:

    {
        "verdicts": { "table_name": {"status": "...", "issues": [...], "fixes": [...]} },
        "cross_table_issues": [...],
        "overall_status": "PASS | NEEDS_WORK",
        "summary": "...",
        "raw_response": "..."
    }
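
A minimal sketch of consuming that feedback, using the same defensive access pattern the engineer agent uses when displaying verdicts:

feedback = analyst.run()
for table_name, verdict in feedback.get("verdicts", {}).items():
    print(table_name, verdict.get("status"), len(verdict.get("issues", [])))
if feedback.get("overall_status") != "PASS":
    print(feedback.get("summary", ""))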

Source code in src/versifai/data_agents/analyst/agent.py
def run(self) -> dict:
    """
    Run acceptance testing on all tables in the target schema.

    Returns a structured feedback dict with per-table verdicts:
    {
        "verdicts": { "table_name": {"status": "...", "issues": [...], "fixes": [...]} },
        "cross_table_issues": [...],
        "overall_status": "PASS | NEEDS_WORK",
        "summary": "...",
        "raw_response": "..."
    }
    """
    self._display.phase("DATA ANALYST: Acceptance Testing")
    self._display.step(f"Target schema: {self._cfg.full_schema}")
    self._display.step(f"Tools: {self._registry.tool_names + ['ask_human']}")

    self._memory.add_user_message(self._initial_prompt)

    last_text = ""
    turns = 0

    while turns < self._max_turns:
        turns += 1
        self._display.step(f"Analyst turn {turns}/{self._max_turns}")

        # Send to Claude
        try:
            response = self._llm.send(
                messages=self._memory.messages,
                system=self._system_prompt,
                tools=self._get_tool_definitions(),
            )
        except Exception as e:
            self._display.error(f"Analyst LLM call failed: {e}")
            break

        actions = LLMClient.extract_actions(response)
        self._memory.add_assistant_message(response.content)

        has_tool_use = False
        for action in actions:
            if action["type"] == "text":
                last_text = action["text"]
                self._display.thinking(action["text"])

            elif action["type"] == "tool_use":
                has_tool_use = True
                tool_name = action["name"]
                tool_input = action["input"]
                tool_use_id = action["id"]

                self._display.tool_call(tool_name, tool_input)

                if tool_name == "ask_human":
                    result_str = self._display.ask_human(
                        question=tool_input.get("question", ""),
                        context=tool_input.get("context", ""),
                        options=tool_input.get("options"),
                    )
                    self._memory.add_tool_result(tool_use_id, result_str or "(no response)")
                    self._display.tool_result(tool_name, result_str or "(no response)")
                    continue

                result = self._registry.execute(tool_name, **tool_input)
                result_str = result.to_content_str()

                if not result.success:
                    self._consecutive_errors += 1
                    if self._consecutive_errors >= self._max_consecutive_errors:
                        result_str += (
                            f"\n\n[WARNING] {self._consecutive_errors} consecutive errors."
                        )
                else:
                    self._consecutive_errors = 0

                if len(result_str) > 15000:
                    result_str = result_str[:15000] + "\n\n[... TRUNCATED ...]"

                self._display.tool_result(
                    tool_name,
                    result.summary or result_str[:400],
                    is_error=not result.success,
                )

                self._memory.add_tool_result(
                    tool_use_id, result_str, is_error=not result.success
                )

        if response.stop_reason == "end_turn" and not has_tool_use:
            self._display.step("Analyst review complete.")
            break

    if turns >= self._max_turns:
        self._display.warning(f"Analyst reached max turns ({self._max_turns}).")

    # Parse the structured feedback from the last text response
    feedback = self._parse_feedback(last_text)
    feedback["raw_response"] = last_text

    self._display.step(f"Analyst LLM usage: {self._llm.usage_summary}")

    return feedback

Configuration

ProjectConfig dataclass

ProjectConfig(name: str = '', description: str = '', domain_expertise: str = '', analyst_specialty: str = '', catalog: str = '', schema: str = '', llm: LLMConfig = LLMConfig(), volume_path: str = '', staging_path: str = '', join_key: JoinKeyConfig = JoinKeyConfig(), alternative_keys: list[AlternativeKeyConfig] = list(), geographic_grain: str = '', grain_description: str = '', staging_flush_threshold_rows: int = 30000000, metadata_columns: list[MetadataColumnConfig] = list(), naming_convention: str = 'snake_case', naming_rules: str = '', column_naming_examples: str = '', grain_detection_guidance: str = '', known_sources: list[DataSourceHint] = list(), documentation_urls: dict[str, list[str]] = dict(), source_processing_hints: list[SourceProcessingHint] = list())

Configuration for a data engineering project.

The DataEngineerAgent uses this to drive its data processing pipeline. Assemble one from building blocks (join keys, source hints, metadata) and pass it to the agent. The agent code is generic — all domain knowledge lives in the config instance.
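
A minimal configuration sketch for an illustrative county-level project. Every value below is a placeholder, and the sketch assumes JoinKeyConfig and MetadataColumnConfig are importable from the same config module as ProjectConfig:

from versifai.data_agents.engineer.config import (
    ProjectConfig,
    JoinKeyConfig,
    MetadataColumnConfig,
)

cfg = ProjectConfig(
    name="county_health",
    catalog="main",
    schema="county_health",
    volume_path="/Volumes/main/county_health/raw",
    join_key=JoinKeyConfig(
        column_name="county_fips",
        data_type="STRING",
        description="5-digit county FIPS code",
        width=5,
    ),
    metadata_columns=[
        MetadataColumnConfig(
            name="source_file",
            data_type="STRING",
            description="Name of the originating source file",
        ),
    ],
)
print(cfg.full_schema)  # presumably "main.county_health"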

full_schema property

full_schema: str

Fully qualified schema name.

known_sources_text property

known_sources_text: str

Formatted text block listing known data sources for prompts.

metadata_columns_text property

metadata_columns_text: str

Formatted text block listing metadata columns for prompts.

alternative_keys_text property

alternative_keys_text: str

Formatted text about recognized alternative keys for prompts.

alternative_key_names property

alternative_key_names: list[str]

List of alternative key column names.

join_key_related_text property

join_key_related_text: str

Formatted text about related geographic columns.

metadata_column_names property

metadata_column_names: list[str]

List of metadata column name strings.

expected_tables property

expected_tables: list[str]

All tables the pipeline MUST produce (from source processing hints).

get_source_hint

get_source_hint(source_name: str) -> SourceProcessingHint | None

Find a matching SourceProcessingHint for a given source directory name.

Source code in src/versifai/data_agents/engineer/config.py
def get_source_hint(self, source_name: str) -> SourceProcessingHint | None:
    """Find a matching SourceProcessingHint for a given source directory name."""
    name_lower = source_name.lower()
    for hint in self.source_processing_hints:
        if hint.source_pattern.lower() in name_lower:
            return hint
    return None
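
For example, a hint whose source_pattern is "star_ratings" matches any directory whose name contains that substring, case-insensitively (the directory name below is illustrative):

hint = cfg.get_source_hint("Star_Ratings_2024_release")
if hint and hint.multi_table:
    print([f.target_table for f in hint.files])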

format_source_hint

format_source_hint(source_name: str) -> str

Format the source processing hint as prompt text, or empty string if none.

Source code in src/versifai/data_agents/engineer/config.py
def format_source_hint(self, source_name: str) -> str:
    """Format the source processing hint as prompt text, or empty string if none."""
    hint = self.get_source_hint(source_name)
    if not hint:
        return ""
    lines = [
        "\n## IMPORTANT: Multi-Table Source Processing Instructions",
        f"\n**{hint.description}**\n",
    ]
    if hint.multi_table:
        lines.append(
            "This source contains MULTIPLE file types. Each file type must "
            "become its OWN SEPARATE TABLE. Do NOT combine them into one table.\n"
        )
    lines.append("### Files to Extract and Process:\n")
    for f in hint.files:
        lines.append(
            f"| **{f.file_pattern}** → `{f.target_table}` | "
            f"{f.description} | Used in: {f.used_in} |"
        )
    lines.append("")
    if hint.notes:
        lines.append(f"### Processing Notes:\n{hint.notes}\n")
    lines.append(
        "### Workflow:\n"
        "1. Extract ALL archives in this source directory.\n"
        "2. Inventory ALL extracted files — match each to a file type above "
        "using the file_pattern.\n"
        "3. **SKIP files that don't match any pattern** — do not create tables "
        "for unrecognized files.\n"
        "4. For EACH file type: group matching files across all years, "
        "profile the most recent, design one schema, and batch process "
        "all years into that table.\n"
        "5. You will make multiple write_to_catalog calls — one per target table."
    )
    return "\n".join(lines)

JoinKeyConfig dataclass

JoinKeyConfig(column_name: str = '', data_type: str = 'STRING', description: str = '', width: int = 0, validation_rule: str = '', expected_entity_count: int = 0, related_columns: list[dict] = list())

Defines the primary join key for cross-table joins.

AlternativeKeyConfig dataclass

AlternativeKeyConfig(column_name: str, description: str, data_type: str = 'STRING', grain: str = '')

An alternative join key for tables at a non-county grain.

MetadataColumnConfig dataclass

MetadataColumnConfig(name: str, data_type: str, description: str, nullable: bool = False)

Defines metadata columns automatically added to every table.

DataSourceHint dataclass

DataSourceHint(name: str, description: str, keywords: list[str] = list())

Optional hint about a known data source — helps the agent recognize it.

SourceProcessingHint dataclass

SourceProcessingHint(source_pattern: str, description: str, multi_table: bool = False, files: list[SourceFileHint] = list(), notes: str = '')

Per-source processing instructions for a specific data source directory.

When multi_table=True, the agent should create a SEPARATE table for each file matching the hints, rather than combining everything into one table.

SourceFileHint dataclass

SourceFileHint(file_pattern: str, target_table: str, description: str, used_in: str = '')

Describes a specific file expected within a source directory/archive.
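
A sketch of a multi-table hint; the patterns and table names are illustrative, and the import location (assumed to be the same engineer config module as ProjectConfig) is an assumption:

from versifai.data_agents.engineer.config import SourceProcessingHint, SourceFileHint

star_ratings_hint = SourceProcessingHint(
    source_pattern="star_ratings",
    description="Annual star-ratings release, one archive per year",
    multi_table=True,
    files=[
        SourceFileHint(
            file_pattern="summary",
            target_table="star_ratings_summary",
            description="One row per facility per rating year",
            used_in="Quality dashboards",
        ),
        SourceFileHint(
            file_pattern="measure",
            target_table="star_ratings_measures",
            description="Individual measure scores behind each summary rating",
        ),
    ],
    notes="Skip readme and layout files inside each archive.",
)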

Data Models

FileInfo dataclass

FileInfo(path: str, name: str, extension: str, size_bytes: int, modified_at: datetime | None = None, is_archive: bool = False, extracted_to: str | None = None, encoding: str | None = None, row_count: int | None = None, column_count: int | None = None, columns: list[str] = list())

Metadata about a single file discovered in the Volume.

from_path classmethod

from_path(path: str) -> FileInfo

Build a FileInfo from a filesystem path.

Source code in src/versifai/data_agents/models/source.py
@classmethod
def from_path(cls, path: str) -> FileInfo:
    """Build a FileInfo from a filesystem path."""
    stat = os.stat(path)
    name = os.path.basename(path)
    _, ext = os.path.splitext(name)
    return cls(
        path=path,
        name=name,
        extension=ext.lower().lstrip("."),
        size_bytes=stat.st_size,
        modified_at=datetime.fromtimestamp(stat.st_mtime),
        is_archive=ext.lower() in (".zip", ".gz", ".tar", ".tgz", ".7z"),
    )

FileGroup dataclass

FileGroup(name: str, folder_path: str, files: list[FileInfo] = list(), documentation_files: list[FileInfo] = list(), data_files: list[FileInfo] = list(), archive_files: list[FileInfo] = list())

A logical grouping of related files within a Volume subfolder.

For example, all NOAA weather station CSVs across multiple years might be grouped together under a single FileGroup.

classify_files

classify_files() -> None

Sort files into documentation, data, and archive buckets.

Source code in src/versifai/data_agents/models/source.py
def classify_files(self) -> None:
    """Sort files into documentation, data, and archive buckets."""
    doc_extensions = {"txt", "pdf", "doc", "docx", "md", "html", "htm", "rtf"}
    data_extensions = {"csv", "xlsx", "xls", "tsv", "dat", "sas7bdat", "sav", "dta", "parquet"}
    archive_extensions = {"zip", "gz", "tar", "tgz", "7z", "bz2"}

    self.documentation_files.clear()
    self.data_files.clear()
    self.archive_files.clear()

    for f in self.files:
        if f.extension in archive_extensions:
            self.archive_files.append(f)
        elif f.extension in doc_extensions:
            self.documentation_files.append(f)
        elif f.extension in data_extensions:
            self.data_files.append(f)
        else:
            # Unknown extension — still keep as data for the agent to inspect
            self.data_files.append(f)

most_recent_data_file

most_recent_data_file() -> FileInfo | None

Return the data file with the latest modification date.

Source code in src/versifai/data_agents/models/source.py
def most_recent_data_file(self) -> FileInfo | None:
    """Return the data file with the latest modification date."""
    candidates = self.data_files or self.files
    if not candidates:
        return None
    return max(candidates, key=lambda f: f.modified_at or datetime.min)
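
A small sketch of the grouping workflow; the module path comes from the source listings above, while the file names and sizes are made up:

from versifai.data_agents.models.source import FileGroup, FileInfo

group = FileGroup(
    name="noaa_weather",
    folder_path="/Volumes/main/raw/noaa",
    files=[
        FileInfo(path="/Volumes/main/raw/noaa/stations_2023.csv",
                 name="stations_2023.csv", extension="csv", size_bytes=1_200_000),
        FileInfo(path="/Volumes/main/raw/noaa/layout.txt",
                 name="layout.txt", extension="txt", size_bytes=4_096),
    ],
)
group.classify_files()
latest = group.most_recent_data_file()
print(len(group.data_files), len(group.documentation_files), latest.name if latest else None)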

DataSource dataclass

DataSource(name: str, file_group: FileGroup, description: str = '', documentation_summary: str = '', profile_summary: str = '', target_table_name: str = '', year_range: str = '', county_fips_column: str = '', notes: list[str] = list())

Top-level representation of a data source that the agent will process.

Combines the file group with profiling results, schema decisions, and processing metadata.

ColumnDefinition dataclass

ColumnDefinition(target_name: str, data_type: str, source_name: str, description: str = '', nullable: bool = True, is_fips: bool = False, is_metadata: bool = False, transform_expression: str = '')

Definition of a single column in a target schema.

TargetSchema dataclass

TargetSchema(source_name: str, table_name: str, columns: list[ColumnDefinition] = list(), description: str = '', partition_columns: list[str] = list(), historical_mappings: dict[str, dict[str, str]] = dict())

Complete schema definition for a target Unity Catalog table.

Designed by the agent after profiling a data source.

fips_column property

fips_column: str | None

Return the name of the FIPS column, if any.

to_create_table_sql

to_create_table_sql() -> str

Generate a CREATE TABLE statement for this schema.

Source code in src/versifai/data_agents/models/schema.py
def to_create_table_sql(self) -> str:
    """Generate a CREATE TABLE statement for this schema."""
    col_defs = []
    for col in self.columns:
        nullable = "" if col.nullable else " NOT NULL"
        comment = f' COMMENT "{col.description}"' if col.description else ""
        col_defs.append(f"  {col.target_name} {col.data_type}{nullable}{comment}")

    cols_sql = ",\n".join(col_defs)
    partition = ""
    if self.partition_columns:
        partition = f"\nPARTITIONED BY ({', '.join(self.partition_columns)})"

    comment = f'\nCOMMENT "{self.description}"' if self.description else ""

    return (
        f"CREATE TABLE IF NOT EXISTS {self.table_name} (\n"
        f"{cols_sql}\n"
        f"){comment}{partition}\n"
        f"USING DELTA"
    )
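
A quick sketch of generating DDL from a hand-built schema; the table and column names are illustrative, and the module path comes from the source listing above:

from versifai.data_agents.models.schema import TargetSchema, ColumnDefinition

schema = TargetSchema(
    source_name="scc_enrollment",
    table_name="main.county_health.scc_enrollment",
    columns=[
        ColumnDefinition(target_name="county_fips", data_type="STRING",
                         source_name="FIPS", description="5-digit county FIPS code",
                         nullable=False, is_fips=True),
        ColumnDefinition(target_name="data_year", data_type="INT",
                         source_name="YEAR", nullable=False),
        ColumnDefinition(target_name="enrollment", data_type="BIGINT",
                         source_name="ENRL"),
    ],
    description="County-level enrollment by year",
    partition_columns=["data_year"],
)
print(schema.to_create_table_sql())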

SourceStatus

Bases: str, Enum

Processing phases for a data source.

SourceState dataclass

SourceState(source_name: str, status: SourceStatus = DISCOVERED, turns_used: int = 0, errors: list[str] = list(), decisions: list[str] = list(), files_processed: int = 0, files_total: int = 0, rows_loaded: int = 0, started_at: datetime | None = None, completed_at: datetime | None = None, table_name: str = '')

Tracks the processing state of a single data source.

AgentState dataclass

AgentState(sources: dict[str, SourceState] = dict(), current_source: str | None = None, total_turns: int = 0, started_at: datetime | None = None, phase: str = 'discovery')

Global state of the agent across all data sources.

Persisted so the agent can resume if interrupted.