Science Agents

The versifai.science_agents package provides agents for autonomous research analysis.

DataScientistAgent

DataScientistAgent(cfg: ResearchConfig | None = None, dbutils=None, resume: bool = False)

Bases: BaseAgent

Autonomous research analysis agent powered by Claude.
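The constructor picks an isolated run directory in a fixed order of precedence: an explicit `cfg.run_id`, then the latest existing run when `resume=True`, otherwise a fresh ID. The following is a minimal sketch of that precedence only. `generate_run_id_sketch`, `find_latest_run`, and the `run_%Y%m%d_%H%M%S` ID format are hypothetical stand-ins; the real `generate_run_id()` and `resolve_run_path()` may behave differently.

```python
from __future__ import annotations

import os
from datetime import datetime


def generate_run_id_sketch() -> str:
    # Hypothetical ID format; the library's generate_run_id() may differ.
    return datetime.now().strftime("run_%Y%m%d_%H%M%S")


def find_latest_run(existing_runs: list[str]) -> str:
    # Hypothetical stand-in for resolve_run_path(): raises when no run exists.
    if not existing_runs:
        raise FileNotFoundError("no previous runs")
    return sorted(existing_runs)[-1]


def choose_run_id(
    explicit_run_id: str | None,
    resume: bool,
    existing_runs: list[str],
) -> str:
    """Explicit run_id wins, then resume-latest, then a fresh run."""
    if explicit_run_id:
        return explicit_run_id
    if resume:
        try:
            return os.path.basename(find_latest_run(existing_runs))
        except FileNotFoundError:
            # Nothing to resume, so fall back to a new run.
            return generate_run_id_sketch()
    return generate_run_id_sketch()
```

Note that an explicit run ID takes priority even when `resume=True`, and that resuming with no prior run starts a new one rather than raising.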

Workflow
  1. Orientation — inventory bronze tables, understand available data
  2. Silver Construction — build pre-joined analytical datasets
  3. Research Analysis — investigate each research question
  4. Synthesis — compile findings, create executive summary
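As a rough illustration of how the four phases are sequenced and tracked, the sketch below runs one handler per phase and records progress. `SketchRunState` is a hypothetical stand-in for the library's `RunState`; only the method names and the phase keys (`orientation`, `silver`, `themes`, `synthesis`) are taken from the source shown on this page, and the fields are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class SketchRunState:
    """Hypothetical stand-in for RunState; field names are assumptions."""

    status: str = "running"
    current_phase: str = ""
    completed_phases: list = field(default_factory=list)

    def mark_phase_start(self, phase: str) -> None:
        self.current_phase = phase

    def mark_phase_complete(self, phase: str) -> None:
        if phase not in self.completed_phases:
            self.completed_phases.append(phase)

    def mark_completed(self) -> None:
        self.status = "completed"


# Phase keys as they appear in the run() source.
PHASES = ["orientation", "silver", "themes", "synthesis"]


def run_phases(state, handlers):
    """Run each phase handler in order, recording start and completion."""
    for phase in PHASES:
        state.mark_phase_start(phase)
        handlers[phase]()
        state.mark_phase_complete(phase)
    state.mark_completed()
    return state
```

In the real agent the state is also persisted after each transition, which is what allows an interrupted run to be resumed later.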

Usage in a Databricks notebook::

    from versifai.science_agents.scientist.agent import DataScientistAgent
    from versifai.science_agents.scientist.config import ResearchConfig

    cfg = ResearchConfig()
    agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
    agent.run()

Source code in src/versifai/science_agents/scientist/agent.py
def __init__(
    self,
    cfg: ResearchConfig | None = None,
    dbutils=None,
    resume: bool = False,
) -> None:
    if cfg is None:
        raise ValueError("cfg is required. See examples/ for sample configurations.")
    self._cfg = cfg

    display = AgentDisplay(dbutils=dbutils)
    memory = AgentMemory()
    llm = LLMClient(
        model=cfg.llm.model,
        max_tokens=cfg.llm.max_tokens,
        api_key=cfg.llm.api_key or None,
        api_base=cfg.llm.api_base or None,
        extended_context=cfg.llm.extended_context,
    )
    registry = ToolRegistry()

    super().__init__(display=display, memory=memory, llm=llm, registry=registry)

    self._dbutils = dbutils

    # Resolve run path — always isolated
    if cfg.run_id:
        # Explicit run_id: reuse that exact run directory
        self._run_id = cfg.run_id
    elif resume:
        # Resume: find the latest existing run
        try:
            latest_path = resolve_run_path(cfg.results_volume_path)
            self._run_id = os.path.basename(latest_path)
            logger.info("Resuming previous run: %s", self._run_id)
        except FileNotFoundError:
            self._run_id = generate_run_id()
            logger.info("No previous run found — starting new run: %s", self._run_id)
    else:
        # Fresh run
        self._run_id = generate_run_id()

    self._run_path = init_run_directory(cfg.results_volume_path, self._run_id)
    write_run_metadata(
        self._run_path, config_name=cfg.name, run_id=self._run_id, agent_type="scientist"
    )
    logger.info("Run ID: %s", self._run_id)
    logger.info("Run path: %s", self._run_path)

    # Run state — initialised per entry point, persisted for resume
    self._run_state: RunState | None = None

    # Tools — use self._run_path for all output paths
    self._finding_tool = SaveFindingTool(results_path=self._run_path)
    self._note_tool = SaveNoteTool(
        notes_path=os.path.join(self._run_path, "notes"),
    )
    self._viz_tool = CreateVisualizationTool(
        cfg=cfg,
        display=self._display,
        notes_path=os.path.join(self._run_path, "notes"),
        results_path=self._run_path,
    )
    self._view_chart_tool = ViewChartTool(
        charts_path=os.path.join(self._run_path, "charts"),
        tables_path=os.path.join(self._run_path, "tables"),
    )
    self._register_tools()

    # Pre-build system prompt
    self._system_prompt = build_scientist_system_prompt(cfg)

run

run(instructions: str = '', rerun_analysis: bool = False) -> dict

Run the full research analysis pipeline.

By default, scans for existing state (silver tables, findings, charts) and resumes from where the pipeline left off. Set rerun_analysis=True to force a complete fresh start.
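The resume decision boils down to subtracting completed work from configured work. The sketch below mirrors that bookkeeping under simplifying assumptions: `Theme` is a hypothetical two-field stand-in for the configured theme objects, and the completed sets are passed in directly rather than discovered by scanning the catalog and results volume.

```python
from dataclasses import dataclass


@dataclass
class Theme:
    """Hypothetical stand-in for a configured analysis theme."""

    id: str
    sequence: int


def pending_work(silver_names, themes, completed_silver, completed_themes):
    """Return (pending silver dataset names, pending themes in sequence order)."""
    pending_silver = [n for n in silver_names if n not in completed_silver]
    ordered = sorted(themes, key=lambda t: t.sequence)
    pending_themes = [t for t in ordered if t.id not in completed_themes]
    return pending_silver, pending_themes
```

With `rerun_analysis=True` no state is scanned at all, which is equivalent to calling this helper with two empty sets.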

Parameters:

  instructions (str, default ''): Optional high-level guidance prepended to every phase prompt, e.g., "Focus on 2023 data only."
  rerun_analysis (bool, default False): If True, ignore all existing state and re-run every phase from scratch. The default, False, resumes from existing state (smart resume).

Returns a summary dict with findings, charts, and statistics.
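The body of `_inject_instructions` is not shown on this page, so the following is only a guess at what "prepended to every phase prompt" could look like. The function name, the `## Operator Instructions` heading, and the `---` separator are assumptions, loosely modeled on the carryover-context format that does appear in the `run` source.

```python
def inject_instructions(prompt: str, instructions: str = "") -> str:
    """Prepend user guidance to a phase prompt (hypothetical format)."""
    guidance = instructions.strip()
    if not guidance:
        # No guidance supplied: the prompt is passed through unchanged.
        return prompt
    return f"## Operator Instructions\n{guidance}\n\n---\n\n{prompt}"
```

Keeping the guidance in its own headed section, ahead of the phase prompt, makes it easy to tell the two apart when reading logged prompts.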

Usage in a Databricks notebook::

    agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
    agent.run()                             # resumes from last state
    agent.run(rerun_analysis=True)          # full fresh start
    agent.run(instructions="Focus on 2024") # resume with guidance

Source code in src/versifai/science_agents/scientist/agent.py
def run(
    self,
    instructions: str = "",
    rerun_analysis: bool = False,
) -> dict:
    """
    Run the full research analysis pipeline.

    By default, scans for existing state (silver tables, findings,
    charts) and resumes from where the pipeline left off. Set
    ``rerun_analysis=True`` to force a complete fresh start.

    Args:
        instructions: Optional high-level guidance prepended to every
                      phase prompt. e.g., "Focus on 2023 data only."
        rerun_analysis: If True, ignore all existing state and re-run
                        every phase from scratch. Default is False
                        (smart resume).

    Returns a summary dict with findings, charts, and statistics.

    Usage in a Databricks notebook::

        agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
        agent.run()                             # resumes from last state
        agent.run(rerun_analysis=True)          # full fresh start
        agent.run(instructions="Focus on 2024") # resume with guidance
    """
    cfg = self._cfg
    self._instructions = instructions
    self._display.phase("DATA SCIENTIST AGENT STARTING")
    self._display.step(f"Project: {cfg.name}")
    self._display.step(f"Thesis: {cfg.thesis[:100]}...")
    self._display.step(f"Analysis themes: {len(cfg.analysis_themes)}")
    self._display.step(f"Results: {self._run_path}")
    self._display.step(f"Run ID: {self._run_id}")
    self._display.step(f"Tools: {self._registry.tool_names + ['ask_human']}")
    if rerun_analysis:
        self._display.step("Mode: FULL RE-RUN (ignoring existing state)")
    else:
        self._display.step("Mode: SMART RESUME (skipping completed phases)")

    # Ensure results directories exist
    os.makedirs(os.path.join(self._run_path, "charts"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "tables"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "notes"), exist_ok=True)

    # Initialise or resume run state
    if not rerun_analysis:
        existing = load_run_state(self._run_path)
        if existing and existing.status in ("running", "interrupted", "failed"):
            self._run_state = existing
            self._run_state.status = "running"
            self._display.step(f"Resuming previous run (was {existing.status})")
            self._display.step(f"  Completed phases: {existing.completed_phases}")
    if self._run_state is None:
        self._run_state = RunState(entry_point="run")
    save_run_state(self._run_path, self._run_state)

    # Resolve dependencies
    self._resolved_deps: dict[str, str] = {}
    if cfg.dependencies:
        for dep in cfg.dependencies:
            resolved = resolve_dependency(dep)
            self._resolved_deps[dep.config_name] = resolved
            self._display.step(f"Dependency '{dep.config_name}' -> {resolved}")

    try:
        # ── Pre-flight: discover what tables actually exist ─────
        available_tables, table_schemas = self._discover_catalog_tables()
        self._display.step(
            f"Found {len(available_tables)} tables in catalog: "
            f"{', '.join(sorted(available_tables)) if available_tables else '(none)'}"
        )

        if not available_tables:
            self._display.error(
                "No tables found in catalog. Run the Data Engineer agent first."
            )
            return self._build_summary()

        # Log table coverage — purely informational, nothing is skipped
        self._log_table_coverage(available_tables)

        # Rebuild system prompt with schemas injected (cached)
        # The agent queries data_catalog on demand via execute_sql
        self._system_prompt = build_scientist_system_prompt(
            cfg,
            table_schemas=table_schemas,
        )

        # ── Scan existing pipeline state ───────────────────────
        state = None
        if not rerun_analysis:
            state = self._scan_pipeline_state(available_tables)
            completed_silver = state["completed_silver"]
            completed_themes = state["completed_themes"]
            pending_silver = [
                ds.name for ds in cfg.silver_datasets if ds.name not in completed_silver
            ]
            all_themes = sorted(cfg.analysis_themes, key=lambda t: t.sequence)
            pending_themes = [t for t in all_themes if t.id not in completed_themes]

            self._display.step("--- Pipeline State ---")
            if completed_silver:
                self._display.step(
                    f"  Silver DONE ({len(completed_silver)}): "
                    f"{', '.join(sorted(completed_silver))}"
                )
            if pending_silver:
                self._display.step(
                    f"  Silver TODO ({len(pending_silver)}): {', '.join(pending_silver)}"
                )
            if completed_themes:
                self._display.step(
                    f"  Themes DONE ({len(completed_themes)}): "
                    f"{', '.join(sorted(completed_themes))}"
                )
            if pending_themes:
                self._display.step(
                    f"  Themes TODO ({len(pending_themes)}): "
                    f"{', '.join(t.id for t in pending_themes)}"
                )
            if state["has_findings"]:
                self._display.step(f"  Existing findings: {state['findings_count']}")
                # Load existing findings so synthesis has full picture
                self._load_existing_findings(state["existing_findings"])
            if state.get("existing_notes"):
                note_themes = sorted(state["existing_notes"].keys())
                self._display.step(
                    f"  Existing notes ({len(note_themes)}): {', '.join(note_themes)}"
                )
            self._display.step("---")

        # ── Phase 1: Orientation ────────────────────────────────
        # Always run — lightweight context-setting phase
        if self._run_state:
            self._run_state.mark_phase_start("orientation")
            self._save_state()
        self._display.phase("Phase 1: Orientation")
        self._run_phase(
            prompt=self._inject_instructions(
                build_orientation_prompt(cfg, available_tables, table_schemas)
            ),
            max_turns=cfg.max_turns_per_phase,
        )
        if self._run_state:
            self._run_state.mark_phase_complete("orientation")
            self._save_state()

        # ── Phase 2: Silver Construction ────────────────────────
        if self._run_state:
            self._run_state.mark_phase_start("silver")
            self._save_state()
        self._display.phase("Phase 2: Silver Dataset Construction")
        for i, dataset_spec in enumerate(cfg.silver_datasets, 1):
            # Smart resume: skip silver tables that already exist
            if state and dataset_spec.name in state["completed_silver"]:
                self._display.step(
                    f"  Silver {i}/{len(cfg.silver_datasets)} "
                    f"{dataset_spec.name} — SKIPPED (already exists)"
                )
                continue

            self._display.phase(
                f"Silver Dataset {i}/{len(cfg.silver_datasets)}: {dataset_spec.name}"
            )
            carryover = self._memory.get_carryover_context()
            self._memory.reset_for_new_source()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            prompt = build_silver_construction_prompt(
                cfg,
                dataset_spec,
                available_tables,
                table_schemas,
            )
            if carryover:
                prompt = f"## Context From Orientation\n{carryover}\n\n---\n\n{prompt}"

            self._run_phase(
                prompt=self._inject_instructions(prompt),
                max_turns=cfg.max_turns_per_phase,
            )
            self._memory.log_source_summary(
                f"silver_{dataset_spec.name}",
                f"Built silver dataset: {dataset_spec.name}",
            )
            if self._run_state:
                self._run_state.mark_item_complete("silver", dataset_spec.name)
                self._save_state()

        if self._run_state:
            self._run_state.mark_phase_complete("silver")
            self._save_state()

        # ── Phase 3: Theme-Driven Research Analysis ────────────
        all_themes = sorted(cfg.analysis_themes, key=lambda t: t.sequence)

        # If silver tables were just built, refresh catalog knowledge
        # so theme prompts include the new silver schemas
        if state and state["completed_silver"] != {ds.name for ds in cfg.silver_datasets}:
            refreshed, refreshed_schemas = self._discover_catalog_tables()
            if refreshed:
                available_tables = refreshed
                table_schemas = refreshed_schemas

        if self._run_state:
            self._run_state.mark_phase_start("themes")
            self._save_state()
        self._display.phase(
            f"Phase 3: Theme-Driven Research Analysis ({len(all_themes)} themes)"
        )
        themes_skipped = 0
        for _i, theme in enumerate(all_themes, 1):
            # Smart resume: skip themes that already have findings
            if state and theme.id in state["completed_themes"]:
                self._display.step(
                    f"  Theme {theme.sequence}/{len(all_themes)} "
                    f"{theme.title} — SKIPPED (findings exist)"
                )
                themes_skipped += 1
                continue

            self._display.phase(
                f"Theme {theme.sequence}/{len(all_themes)}: "
                f"{theme.title} - {theme.question[:80]}..."
            )
            carryover = self._memory.get_carryover_context()
            self._memory.reset_for_new_source()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            # Load any prior-session notes for this theme
            theme_notes = self._note_tool.get_theme_notes_from_disk(theme.id)
            prompt = build_theme_analysis_prompt(
                cfg,
                theme,
                available_tables,
                table_schemas,
                existing_notes=theme_notes,
            )
            if carryover:
                prompt = f"## Context From Prior Work\n{carryover}\n\n---\n\n{prompt}"

            # Track findings count before this theme
            findings_before = len(self._finding_tool.findings)

            self._run_phase(
                prompt=self._inject_instructions(prompt),
                max_turns=cfg.max_turns_per_theme,
            )

            # Nudge agent if it forgot to call save_finding
            theme_findings = len(self._finding_tool.findings) - findings_before
            if theme_findings == 0:
                self._display.warning(
                    f"Theme {theme.sequence} produced 0 findings — "
                    f"prompting agent to record findings"
                )
                self._nudge_save_finding(theme)

            self._memory.log_source_summary(
                f"analysis_{theme.id}",
                f"Analyzed Theme {theme.sequence}: {theme.title}",
            )
            if self._run_state:
                self._run_state.mark_item_complete("themes", theme.id)
                self._save_state()
            self._display.success(f"Completed Theme {theme.sequence}: {theme.title}")

        if self._run_state:
            self._run_state.mark_phase_complete("themes")
            self._save_state()

        # ── Phase 3b: Retrospective Findings ────────────────────
        # If themes produced sparse findings, review charts/notes
        # and create structured findings from existing output.
        self._retrospective_findings(all_themes, available_tables, table_schemas)

        # ── Phase 4: Synthesis ──────────────────────────────────
        # Skip synthesis only if ALL themes were skipped (nothing new)
        new_findings = (
            len(self._finding_tool.findings) - state["findings_count"]
            if state
            else len(self._finding_tool.findings)
        )
        if state and themes_skipped == len(all_themes) and new_findings == 0:
            self._display.step("Phase 4: Synthesis — SKIPPED (no new findings)")
            self._display.step(
                "All phases complete. Nothing to do. Use "
                "rerun_analysis=True to force a fresh start."
            )
            if self._run_state:
                self._run_state.mark_phase_complete("synthesis")
                self._save_state()
        else:
            if self._run_state:
                self._run_state.mark_phase_start("synthesis")
                self._save_state()
            self._display.phase("Phase 4: Synthesis")
            self._memory.reset_for_new_source()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            findings_text = self._finding_tool.findings_summary_text()
            prompt = build_synthesis_prompt(cfg, findings_text)
            self._run_phase(prompt=prompt, max_turns=cfg.max_turns_per_phase)

            # Export findings JSON
            findings_path = self._finding_tool.export_findings_json()
            self._display.success(f"Findings exported to {findings_path}")
            if self._run_state:
                self._run_state.mark_phase_complete("synthesis")
                self._save_state()

        # Mark run as completed
        if self._run_state:
            self._run_state.mark_completed()
            self._save_state()

    except KeyboardInterrupt:
        self._display.warning("Agent interrupted by user.")
        if self._run_state:
            self._run_state.mark_interrupted()
            self._save_state()
        self._dump_progress_on_crash()
    except Exception as e:
        self._display.error(f"Agent failed: {e}")
        logger.exception("Scientist agent top-level failure")
        if self._run_state:
            self._run_state.mark_failed(str(e))
            self._save_state()
        self._dump_progress_on_crash()

    # Always export findings (even after crash/interrupt) so nothing is lost
    if self._finding_tool.findings:
        try:
            findings_path = self._finding_tool.export_findings_json()
            self._display.success(f"Findings exported to {findings_path}")
        except Exception as export_err:
            self._display.warning(f"Could not export findings: {export_err}")

    # Update run metadata on completion
    write_run_metadata(
        self._run_path,
        cfg.name,
        self._run_id,
        extra={
            "completed_at": datetime.now().isoformat(),
            "total_findings": len(self._finding_tool.findings),
            "total_charts": len(self._viz_tool.charts_created),
        },
    )

    # Final summary
    summary = self._build_summary()
    self._display.phase("RESEARCH ANALYSIS COMPLETE")
    self._display.step(f"Total findings: {len(self._finding_tool.findings)}")
    self._display.step(f"Charts created: {len(self._viz_tool.charts_created)}")
    self._display.step(f"LLM usage: {self._llm.usage_summary}")

    return summary

run_visualizations

run_visualizations(chart_types: list[str] | None = None, instructions: str = '') -> dict

Re-generate visualizations and output tables from existing data.

Pre-scans Unity Catalog for silver/bronze tables and the results volume for existing charts and tables. Then runs a focused prompt per theme that uses existing data to create new outputs.
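The pre-scan separates catalog tables by naming convention. A minimal sketch of that split, following the `silver_` prefix rule and the `data_catalog` exclusion visible in the source listing (the table names in the usage note are made up for illustration):

```python
def partition_tables(available_tables):
    """Split table names into (silver, bronze) by the silver_ prefix.

    The data_catalog table is metadata and is excluded from the bronze list.
    """
    silver = sorted(t for t in available_tables if t.startswith("silver_"))
    bronze = sorted(
        t
        for t in available_tables
        if not t.startswith("silver_") and t != "data_catalog"
    )
    return silver, bronze
```

For example, `partition_tables({"silver_county", "census_raw", "data_catalog"})` yields one silver table and one bronze table. When the silver list is empty, `run_visualizations` reports an error and returns early.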

Parameters:

  chart_types (list[str] | None, default None): Only regenerate these output types, e.g., ["choropleth", "dual_choropleth"] for maps only, ["table"] for summary tables only, or None for everything.
  instructions (str, default ''): High-level guidance for the agent, e.g., "Recreate all choropleths with RdYlGn scale. Also create a summary table per theme."
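Two small display conventions follow from these parameters: a missing or empty `chart_types` is treated as "all", and long listings of existing outputs are truncated after ten names. The helper names below are hypothetical; the logic is a sketch of what the source listing does inline.

```python
def focus_label(chart_types=None):
    """Return the label shown for the requested output types."""
    focus = chart_types or []  # None and [] both mean "everything"
    return ", ".join(focus) if focus else "all"


def preview(names, limit=10):
    """Join the first `limit` names and note how many were left out."""
    shown = ", ".join(names[:limit])
    extra = len(names) - limit
    return shown + (f"... +{extra} more" if extra > 0 else "")
```

Because an empty list is falsy, `chart_types=[]` behaves the same as `chart_types=None` here.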

Usage in a Databricks notebook::

    agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
    agent.run_visualizations()                                  # all outputs
    agent.run_visualizations(chart_types=["choropleth"])        # maps only
    agent.run_visualizations(chart_types=["table"])             # tables only
    agent.run_visualizations(
        chart_types=["choropleth", "dual_choropleth"],
        instructions="Every map must cover all ~3,200 US counties."
    )

Source code in src/versifai/science_agents/scientist/agent.py
def run_visualizations(
    self,
    chart_types: list[str] | None = None,
    instructions: str = "",
) -> dict:
    """
    Re-generate visualizations and output tables from existing data.

    Pre-scans Unity Catalog for silver/bronze tables and the results
    volume for existing charts and tables. Then runs a focused prompt
    per theme that uses existing data to create new outputs.

    Args:
        chart_types: Only regenerate these output types.
                     e.g., ``["choropleth", "dual_choropleth"]`` for
                     maps only, ``["table"]`` for summary tables only,
                     or ``None`` for everything.
        instructions: High-level guidance for the agent. e.g.,
                      "Recreate all choropleths with RdYlGn scale.
                      Also create a summary table per theme."

    Usage in a Databricks notebook::

        agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
        agent.run_visualizations()                                  # all outputs
        agent.run_visualizations(chart_types=["choropleth"])        # maps only
        agent.run_visualizations(chart_types=["table"])             # tables only
        agent.run_visualizations(
            chart_types=["choropleth", "dual_choropleth"],
            instructions="Every map must cover all ~3,200 US counties."
        )
    """
    cfg = self._cfg
    self._instructions = instructions
    focus = chart_types or []
    focus_label = ", ".join(focus) if focus else "all"
    self._display.phase(f"VISUALIZATION REFRESH — {focus_label}")

    os.makedirs(os.path.join(self._run_path, "charts"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "tables"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "notes"), exist_ok=True)

    # Fresh run state for this entry point
    self._run_state = RunState(entry_point="run_visualizations")
    self._save_state()

    try:
        # ── Pre-scan ──────────────────────────────────────────────
        available_tables, table_schemas = self._discover_catalog_tables()
        silver_tables = sorted(t for t in available_tables if t.startswith("silver_"))
        bronze_tables = sorted(
            t for t in available_tables if not t.startswith("silver_") and t != "data_catalog"
        )
        self._display.step(
            f"Silver tables: {', '.join(silver_tables) if silver_tables else '(none)'}"
        )

        if not silver_tables:
            self._display.error("No silver tables found. Run the full pipeline first.")
            return self._build_summary()

        # Scan existing outputs (charts + tables)
        existing_outputs = self._scan_existing_outputs()
        existing_charts = existing_outputs.get("charts", [])
        existing_tables = existing_outputs.get("tables", [])
        if existing_charts:
            self._display.step(
                f"Existing charts ({len(existing_charts)}): "
                f"{', '.join(existing_charts[:10])}"
                + (
                    f"... +{len(existing_charts) - 10} more"
                    if len(existing_charts) > 10
                    else ""
                )
            )
        if existing_tables:
            self._display.step(
                f"Existing tables ({len(existing_tables)}): "
                f"{', '.join(existing_tables[:10])}"
                + (
                    f"... +{len(existing_tables) - 10} more"
                    if len(existing_tables) > 10
                    else ""
                )
            )

        self._system_prompt = build_scientist_system_prompt(
            cfg,
            table_schemas=table_schemas,
        )

        # ── Per-theme visualization ───────────────────────────────
        all_themes = sorted(cfg.analysis_themes, key=lambda t: t.sequence)
        if self._run_state:
            self._run_state.mark_phase_start("visualizations")
            self._save_state()
        for theme in all_themes:
            self._display.phase(f"Viz: Theme {theme.sequence} - {theme.title}")
            self._memory.reset_for_new_source()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            # Load prior-session notes for context
            theme_notes = self._note_tool.get_theme_notes_from_disk(theme.id)
            prompt = self._build_viz_refresh_prompt(
                theme,
                silver_tables,
                bronze_tables,
                existing_charts,
                existing_tables,
                focus,
                existing_notes=theme_notes,
            )
            self._run_phase(
                prompt=self._inject_instructions(prompt),
                max_turns=30,
            )
            if self._run_state:
                self._run_state.mark_item_complete("visualizations", theme.id)
                self._save_state()
            self._display.success(f"Viz complete: Theme {theme.sequence}")

        if self._run_state:
            self._run_state.mark_phase_complete("visualizations")
            self._run_state.mark_completed()
            self._save_state()

    except KeyboardInterrupt:
        self._display.warning("Interrupted.")
        if self._run_state:
            self._run_state.mark_interrupted()
            self._save_state()
        self._dump_progress_on_crash()
    except Exception as e:
        self._display.error(f"Failed: {e}")
        logger.exception("run_visualizations failure")
        if self._run_state:
            self._run_state.mark_failed(str(e))
            self._save_state()
        self._dump_progress_on_crash()

    summary = self._build_summary()
    self._display.phase("VISUALIZATION REFRESH COMPLETE")
    self._display.step(f"Charts created: {len(self._viz_tool.charts_created)}")
    self._display.step(f"LLM usage: {self._llm.usage_summary}")
    return summary

run_themes

run_themes(start_theme: int = 0, themes: list[int] | None = None, synthesize: bool = True, instructions: str = '') -> dict

Re-run ONLY the theme analysis phase (and optionally synthesis).

Skips orientation and silver construction — assumes silver tables already exist in Unity Catalog. Useful for re-running analysis with updated prompts, regenerating visualizations, or continuing from a specific theme.

Parameters:

  start_theme (int, default 0): Theme sequence number to start from (0 = all themes), e.g., start_theme=3 skips themes 0-2. Ignored if themes is provided.
  themes (list[int] | None, default None): Explicit list of theme sequence numbers to run, e.g., themes=[1, 4, 7] runs only those three themes. Overrides start_theme when provided.
  synthesize (bool, default True): Whether to run the synthesis phase after themes.
  instructions (str, default ''): High-level guidance for the agent, e.g., "Focus on generating choropleths for every finding."
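The interaction between `start_theme` and `themes` can be sketched as a small selection function. `Theme` here is a hypothetical stand-in carrying only the fields the selection needs; the filtering itself follows the source listing.

```python
from dataclasses import dataclass


@dataclass
class Theme:
    """Hypothetical stand-in for a configured analysis theme."""

    sequence: int
    title: str


def select_themes(all_themes, start_theme=0, themes=None):
    """Pick themes to run; an explicit `themes` list overrides `start_theme`."""
    ordered = sorted(all_themes, key=lambda t: t.sequence)
    if themes is not None:
        wanted = set(themes)
        return [t for t in ordered if t.sequence in wanted]
    return [t for t in ordered if t.sequence >= start_theme]
```

Selected themes always run in sequence order regardless of the order given in `themes`. Because the check is `themes is not None`, passing an empty list selects no themes rather than falling back to `start_theme`.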

Usage in a Databricks notebook::

    agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
    agent.run_themes()                    # all themes + synthesis
    agent.run_themes(start_theme=4)       # themes 4-8 + synthesis
    agent.run_themes(themes=[1, 4, 7])    # only these three themes
    agent.run_themes(themes=[3])           # single theme re-run
    agent.run_themes(synthesize=False)     # themes only, no synthesis
    agent.run_themes(instructions="Prioritize geographic maps.")

Source code in src/versifai/science_agents/scientist/agent.py
def run_themes(
    self,
    start_theme: int = 0,
    themes: list[int] | None = None,
    synthesize: bool = True,
    instructions: str = "",
) -> dict:
    """
    Re-run ONLY the theme analysis phase (and optionally synthesis).

    Skips orientation and silver construction — assumes silver tables
    already exist in Unity Catalog. Useful for re-running analysis with
    updated prompts, regenerating visualizations, or continuing from a
    specific theme.

    Args:
        start_theme: Theme sequence number to start from (0 = all themes).
                     e.g., ``start_theme=3`` skips Themes 0-2.
                     Ignored if ``themes`` is provided.
        themes: Explicit list of theme sequence numbers to run.
                e.g., ``themes=[1, 4, 7]`` runs only those three themes.
                Overrides ``start_theme`` when provided.
        synthesize: Whether to run the synthesis phase after themes.
        instructions: High-level guidance for the agent. e.g.,
                      "Focus on generating choropleths for every finding."

    Usage in a Databricks notebook::

        agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
        agent.run_themes()                    # all themes + synthesis
        agent.run_themes(start_theme=4)       # themes 4-8 + synthesis
        agent.run_themes(themes=[1, 4, 7])    # only these three themes
        agent.run_themes(themes=[3])           # single theme re-run
        agent.run_themes(synthesize=False)     # themes only, no synthesis
        agent.run_themes(instructions="Prioritize geographic maps.")
    """
    cfg = self._cfg
    self._instructions = instructions
    self._display.phase("DATA SCIENTIST AGENT — THEMES ONLY")
    self._display.step(f"Project: {cfg.name}")
    self._display.step(f"Thesis: {cfg.thesis[:100]}...")
    if themes is not None:
        self._display.step(f"Themes: {themes}")
    else:
        self._display.step(f"Start theme: {start_theme}")

    os.makedirs(os.path.join(self._run_path, "charts"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "tables"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "notes"), exist_ok=True)

    # Fresh run state for this entry point
    self._run_state = RunState(entry_point="run_themes")
    self._save_state()

    try:
        # ── Pre-flight (still needed for schemas + data catalog) ──
        available_tables, table_schemas = self._discover_catalog_tables()
        self._display.step(f"Found {len(available_tables)} tables in catalog")
        if not available_tables:
            self._display.error("No tables found. Run the Data Engineer first.")
            return self._build_summary()

        self._system_prompt = build_scientist_system_prompt(
            cfg,
            table_schemas=table_schemas,
        )

        # ── Theme Analysis ────────────────────────────────────────
        all_themes = sorted(cfg.analysis_themes, key=lambda t: t.sequence)
        if themes is not None:
            theme_set = set(themes)
            themes_to_run = [t for t in all_themes if t.sequence in theme_set]
        else:
            themes_to_run = [t for t in all_themes if t.sequence >= start_theme]
        if self._run_state:
            self._run_state.mark_phase_start("themes")
            self._save_state()
        self._display.phase(
            f"Theme Analysis ({len(themes_to_run)} of {len(all_themes)} themes)"
        )

        for _i, theme in enumerate(themes_to_run, 1):
            self._display.phase(
                f"Theme {theme.sequence}/{len(all_themes)}: "
                f"{theme.title} - {theme.question[:80]}..."
            )
            carryover = self._memory.get_carryover_context()
            self._memory.reset_for_new_source()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            # Load any prior-session notes for this theme
            theme_notes = self._note_tool.get_theme_notes_from_disk(theme.id)
            prompt = build_theme_analysis_prompt(
                cfg,
                theme,
                available_tables,
                table_schemas,
                existing_notes=theme_notes,
            )
            if carryover:
                prompt = f"## Context From Prior Work\n{carryover}\n\n---\n\n{prompt}"

            # Track findings count before this theme
            findings_before = len(self._finding_tool.findings)

            self._run_phase(
                prompt=self._inject_instructions(prompt),
                max_turns=cfg.max_turns_per_theme,
            )

            # Nudge agent if it forgot to call save_finding
            theme_findings = len(self._finding_tool.findings) - findings_before
            if theme_findings == 0:
                self._display.warning(
                    f"Theme {theme.sequence} produced 0 findings — "
                    f"prompting agent to record findings"
                )
                self._nudge_save_finding(theme)

            self._memory.log_source_summary(
                f"analysis_{theme.id}",
                f"Analyzed Theme {theme.sequence}: {theme.title}",
            )
            if self._run_state:
                self._run_state.mark_item_complete("themes", theme.id)
                self._save_state()
            self._display.success(f"Completed Theme {theme.sequence}: {theme.title}")

        if self._run_state:
            self._run_state.mark_phase_complete("themes")
            self._save_state()

        # ── Retrospective Findings ─────────────────────────────────
        self._retrospective_findings(themes_to_run, available_tables, table_schemas)

        # ── Synthesis (optional) ──────────────────────────────────
        if synthesize:
            if self._run_state:
                self._run_state.mark_phase_start("synthesis")
                self._save_state()
            self._display.phase("Synthesis")
            self._memory.reset_for_new_source()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            findings_text = self._finding_tool.findings_summary_text()
            prompt = build_synthesis_prompt(cfg, findings_text)
            self._run_phase(
                prompt=self._inject_instructions(prompt),
                max_turns=cfg.max_turns_per_phase,
            )

            findings_path = self._finding_tool.export_findings_json()
            self._display.success(f"Findings exported to {findings_path}")
            if self._run_state:
                self._run_state.mark_phase_complete("synthesis")
                self._save_state()

        if self._run_state:
            self._run_state.mark_completed()
            self._save_state()

    except KeyboardInterrupt:
        self._display.warning("Agent interrupted by user.")
        if self._run_state:
            self._run_state.mark_interrupted()
            self._save_state()
        self._dump_progress_on_crash()
    except Exception as e:
        self._display.error(f"Agent failed: {e}")
        logger.exception("Scientist agent run_themes failure")
        if self._run_state:
            self._run_state.mark_failed(str(e))
            self._save_state()
        self._dump_progress_on_crash()

    # Always export findings (even after crash/interrupt) so nothing is lost
    if self._finding_tool.findings:
        try:
            findings_path = self._finding_tool.export_findings_json()
            self._display.success(f"Findings exported to {findings_path}")
        except Exception as export_err:
            self._display.warning(f"Could not export findings: {export_err}")

    summary = self._build_summary()
    self._display.phase("THEME ANALYSIS COMPLETE")
    self._display.step(f"Total findings: {len(self._finding_tool.findings)}")
    self._display.step(f"Charts created: {len(self._viz_tool.charts_created)}")
    self._display.step(f"LLM usage: {self._llm.usage_summary}")
    return summary
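
The theme-selection rule used in the source above can be looked at in isolation. The following is a minimal standalone sketch, not versifai code: `Theme` and `select_themes` are hypothetical stand-ins, only the `id` and `sequence` attributes are modeled, and the `start_theme=0` default is an assumption made for the example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Theme:
    """Hypothetical stand-in for AnalysisTheme; only the fields used here."""

    id: str
    sequence: int


def select_themes(all_themes, themes=None, start_theme=0):
    """Mirror the selection rule: an explicit list wins over start_theme."""
    # Themes are always processed in ascending sequence order.
    ordered = sorted(all_themes, key=lambda t: t.sequence)
    if themes is not None:
        wanted = set(themes)
        return [t for t in ordered if t.sequence in wanted]
    return [t for t in ordered if t.sequence >= start_theme]


catalog = [Theme("c", 3), Theme("a", 1), Theme("b", 2)]
picked_explicit = [t.id for t in select_themes(catalog, themes=[3, 1])]
picked_from = [t.id for t in select_themes(catalog, start_theme=2)]
print(picked_explicit)  # ['a', 'c']
print(picked_from)      # ['b', 'c']
```

Under this rule, sequence numbers that match no configured theme are skipped without an error, and the order in which numbers are passed does not affect the order of execution.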

run_validation

run_validation(themes: list[int] | None = None, instructions: str = '') -> dict

Validate and improve all analysis outputs against original specs.

Audits every theme's findings, charts, and tables for correctness, data quality, visualization quality, and completeness. Rebuilds anything that is wrong or unconvincing. Stores all validation findings into notes.

The agent has FULL tool access — it can re-run queries, rebuild charts with advanced types, update findings, and create new outputs.

Parameters:

themes (list[int] | None, default None): Specific theme sequence numbers to validate, e.g. themes=[1, 3, 7]. None validates all themes.

instructions (str, default ''): High-level guidance for the agent.

Usage in a Databricks notebook::

agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
agent.run_validation()                     # validate all themes
agent.run_validation(themes=[1, 3, 7])     # validate specific themes
agent.run_validation(instructions="Focus on upgrading chart quality.")
Source code in src/versifai/science_agents/scientist/agent.py
def run_validation(
    self,
    themes: list[int] | None = None,
    instructions: str = "",
) -> dict:
    """
    Validate and improve all analysis outputs against original specs.

    Audits every theme's findings, charts, and tables for correctness,
    data quality, visualization quality, and completeness. Rebuilds
    anything that is wrong or unconvincing. Stores all validation
    findings into notes.

    The agent has FULL tool access — it can re-run queries, rebuild
    charts with advanced types, update findings, and create new outputs.

    Args:
        themes: Specific theme sequence numbers to validate.
                e.g., ``themes=[1, 3, 7]``. None = all themes.
        instructions: High-level guidance for the agent.

    Usage in a Databricks notebook::

        agent = DataScientistAgent(cfg=cfg, dbutils=dbutils)
        agent.run_validation()                     # validate all themes
        agent.run_validation(themes=[1, 3, 7])     # validate specific themes
        agent.run_validation(instructions="Focus on upgrading chart quality.")
    """
    cfg = self._cfg
    self._instructions = instructions
    self._display.phase("VALIDATION & QUALITY REVIEW")
    self._display.step(f"Project: {cfg.name}")
    if themes is not None:
        self._display.step(f"Themes to validate: {themes}")
    else:
        self._display.step("Validating ALL themes")

    os.makedirs(os.path.join(self._run_path, "charts"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "tables"), exist_ok=True)
    os.makedirs(os.path.join(self._run_path, "notes"), exist_ok=True)

    # Fresh run state for this entry point
    self._run_state = RunState(entry_point="run_validation")
    self._save_state()

    try:
        # ── Pre-flight ────────────────────────────────────────────
        available_tables, table_schemas = self._discover_catalog_tables()
        self._display.step(f"Found {len(available_tables)} tables in catalog")

        if not available_tables:
            self._display.error("No tables found. Run the pipeline first.")
            return self._build_summary()

        self._system_prompt = build_scientist_system_prompt(
            cfg,
            table_schemas=table_schemas,
        )

        # ── Scan existing outputs ─────────────────────────────────
        existing_outputs = self._scan_existing_outputs()
        all_charts = existing_outputs.get("charts", [])
        all_tables = existing_outputs.get("tables", [])

        self._display.step(f"Existing charts: {len(all_charts)}, tables: {len(all_tables)}")

        # ── Load findings.json ────────────────────────────────────
        findings_path = os.path.join(self._run_path, "findings.json")
        all_findings: list[dict] = []
        if os.path.isfile(findings_path):
            try:
                with open(findings_path) as f:
                    data = json.load(f)
                if isinstance(data, list):
                    all_findings = data
                    # Load into the finding tool for continuity
                    self._load_existing_findings(all_findings)
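            except (json.JSONDecodeError, OSError):
                # An unreadable or malformed findings.json is non-fatal:
                # validation proceeds with no prior findings loaded.
                pass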
        self._display.step(f"Loaded {len(all_findings)} existing findings")

        # ── Per-theme validation ──────────────────────────────────
        all_themes = sorted(cfg.analysis_themes, key=lambda t: t.sequence)
        if themes is not None:
            theme_set = set(themes)
            themes_to_validate = [t for t in all_themes if t.sequence in theme_set]
        else:
            themes_to_validate = all_themes

        if self._run_state:
            self._run_state.mark_phase_start("validation")
            self._save_state()
        self._display.phase(f"Validating {len(themes_to_validate)} of {len(all_themes)} themes")

        for theme in themes_to_validate:
            self._display.phase(f"Validate Theme {theme.sequence}: {theme.title}")
            self._memory.reset_for_new_source()
            self._consecutive_errors = 0
            self._missing_param_tracker.clear()

            # Gather theme-specific outputs
            theme_prefix = f"theme{theme.sequence}_"
            alt_prefix = f"t{theme.sequence}_"
            theme_id_prefix = f"{theme.id}_"
            theme_charts = [
                c
                for c in all_charts
                if c.lower().startswith(theme_prefix)
                or c.lower().startswith(alt_prefix)
                or c.lower().startswith(theme_id_prefix)
            ]
            theme_tables = [
                t
                for t in all_tables
                if t.lower().startswith(theme_prefix)
                or t.lower().startswith(alt_prefix)
                or t.lower().startswith(theme_id_prefix)
            ]
            theme_findings = [
                f for f in all_findings if f.get("research_question_id", "") == theme.id
            ]
            theme_notes = self._note_tool.get_theme_notes_from_disk(theme.id)

            self._display.step(
                f"  Findings: {len(theme_findings)}, "
                f"Charts: {len(theme_charts)}, "
                f"Tables: {len(theme_tables)}"
            )

            prompt = build_validation_prompt(
                cfg=cfg,
                theme=theme,
                findings_for_theme=theme_findings,
                chart_files=theme_charts,
                table_files=theme_tables,
                theme_notes=theme_notes,
                instructions=instructions,
            )

            self._run_phase(
                prompt=prompt,
                max_turns=120,
            )
            if self._run_state:
                self._run_state.mark_item_complete("validation", theme.id)
                self._save_state()
            self._display.success(f"Validated Theme {theme.sequence}: {theme.title}")

        # ── Re-export updated findings ────────────────────────────
        updated_path = self._finding_tool.export_findings_json()
        self._display.success(f"Updated findings exported to {updated_path}")

        if self._run_state:
            self._run_state.mark_phase_complete("validation")
            self._run_state.mark_completed()
            self._save_state()

    except KeyboardInterrupt:
        self._display.warning("Interrupted.")
        if self._run_state:
            self._run_state.mark_interrupted()
            self._save_state()
        self._dump_progress_on_crash()
    except Exception as e:
        self._display.error(f"Validation failed: {e}")
        logger.exception("run_validation failure")
        if self._run_state:
            self._run_state.mark_failed(str(e))
            self._save_state()
        self._dump_progress_on_crash()

    # Always export findings (even after crash/interrupt) so nothing is lost
    if self._finding_tool.findings:
        try:
            findings_path = self._finding_tool.export_findings_json()
            self._display.success(f"Findings exported to {findings_path}")
        except Exception as export_err:
            self._display.warning(f"Could not export findings: {export_err}")

    summary = self._build_summary()
    self._display.phase("VALIDATION COMPLETE")
    self._display.step(f"Total findings: {len(self._finding_tool.findings)}")
    self._display.step(f"Charts created: {len(self._viz_tool.charts_created)}")
    self._display.step(f"LLM usage: {self._llm.usage_summary}")
    return summary
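
The per-theme gathering step in the source above matches output files by filename prefix and findings by `research_question_id`. The sketch below reproduces that matching on plain lists so the behavior is easy to check. `outputs_for_theme` and `findings_for_theme` are hypothetical helper names, and the filenames and findings are invented for illustration.

```python
def outputs_for_theme(filenames, sequence, theme_id):
    """Return files whose lowercased name starts with one of the theme prefixes."""
    # Same three prefixes as the source: theme<N>_, t<N>_, and <theme id>_.
    prefixes = (f"theme{sequence}_", f"t{sequence}_", f"{theme_id}_")
    return [name for name in filenames if name.lower().startswith(prefixes)]


def findings_for_theme(findings, theme_id):
    """Keep findings whose research_question_id equals the theme id."""
    return [f for f in findings if f.get("research_question_id", "") == theme_id]


files = [
    "Theme1_scatter.png",
    "t1_summary.csv",
    "theme10_map.png",
    "pricing_trend.png",
    "t2_table.csv",
]
findings = [
    {"research_question_id": "pricing", "title": "example finding"},
    {"research_question_id": "access", "title": "another example"},
    {"title": "finding with no theme id"},
]

matched_files = outputs_for_theme(files, sequence=1, theme_id="pricing")
matched_findings = findings_for_theme(findings, "pricing")
print(matched_files)
print(len(matched_findings))
```

Two details follow from this logic. The trailing underscore keeps theme 1 from picking up `theme10_` files. Because only the filename is lowercased, a theme id containing uppercase letters would never match through the id prefix.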

Configuration

ResearchConfig dataclass

ResearchConfig(name: str = '', thesis: str = '', llm: LLMConfig = LLMConfig(), agent_role: str = 'Data Scientist', domain_context: str = '', analysis_method_guidance: dict[str, str] = dict(), visualization_guidance: str = '', project: ProjectConfig = (lambda: _default_project())(), results_volume_path: str = '', analysis_themes: list[AnalysisTheme] = list(), silver_datasets: list[SilverDatasetSpec] = list(), research_references: list[ResearchReference] = list(), max_turns: int = 150, max_turns_per_phase: int = 120, max_turns_per_theme: int = 120, run_id: str = '', dependencies: list[AgentDependency] = list(), mlflow_experiment: str = '', mlflow_registry_name: str = '', chart_style: str = 'seaborn-v0_8-whitegrid', chart_dpi: int = 150, color_palette: str = 'viridis')

Configuration for a research analysis project.

The DataScientistAgent uses this to drive its analysis workflow. Assemble one from building blocks (themes, silver datasets, references) and pass it to the agent. The agent code is generic — all domain knowledge lives in the config instance.

Usage::

cfg = ResearchConfig(
    name="My Analysis",
    thesis="...",
    analysis_themes=[theme_a, theme_b],
    silver_datasets=[silver_county_master, silver_contract_year],
    research_references=[ref_kff, ref_cms],
)
agent = DataScientistAgent(cfg=cfg)
agent.run()

analysis_themes_text property

analysis_themes_text: str

Formatted text listing all analysis themes for prompts.

analysis_themes_detail_text property

analysis_themes_detail_text: str

Detailed theme descriptions including steps, tables, visuals, and punchlines.

delivery_sequence_text property

delivery_sequence_text: str

Delivery sequencing guidance for the synthesis phase.

research_questions_text property

research_questions_text: str

Backward-compatible research-question text, derived from analysis_themes.

all_required_tables property

all_required_tables: list[str]

Union of required_tables across all themes, plus source processing hint targets.
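
As an illustration of the union described above, here is a minimal sketch using a hypothetical stand-in class. It covers only the `required_tables` part. The source processing hint targets are not modeled, and the first-seen ordering is an assumption, since the ordering of the real property is not documented here.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ThemeSketch:
    """Hypothetical stand-in for AnalysisTheme; only required_tables matters."""

    id: str
    required_tables: list[str] = field(default_factory=list)


def union_required_tables(themes):
    """De-duplicate table names across themes, keeping first-seen order."""
    seen: dict[str, None] = {}
    for theme in themes:
        for table in theme.required_tables:
            seen.setdefault(table, None)
    return list(seen)


# Table names are invented for illustration.
themes = [
    ThemeSketch("a", ["catalog.schema.plans", "catalog.schema.counties"]),
    ThemeSketch("b", ["catalog.schema.counties", "catalog.schema.ratings"]),
]
required = union_required_tables(themes)
print(required)
```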

AnalysisTheme dataclass

AnalysisTheme(id: str, title: str, question: str, analysis_steps: list[str] = list(), tables_to_produce: list[str] = list(), signature_visualization: str = '', visualization_notes: str = '', punchline: str = '', required_tables: list[str] = list(), data_notes: str = '', analysis_type: str = 'comparative', sequence: int = 1)

A major analysis theme with detailed methodology, required tables, and the argument it builds toward the thesis.

Each theme becomes one phase of the research analysis pipeline.

OUTPUT PHILOSOPHY:

- tables_to_produce: The PRIMARY analytical output. Every theme must produce well-formatted summary tables with statistics, p-values, and effect sizes. These carry the argument.
- signature_visualization: The ONE dense, multi-dimensional, irreplaceable chart for this theme (if any). Not every theme gets one; only 6 themes have prescribed signature visualizations.
- visualization_notes: Guidance on what NOT to visualize, and encouragement to create custom discovery-driven visuals when the agent finds something unexpected in the data.
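
To show how the fields fit together, the sketch below builds one theme using a stand-in dataclass that copies the field names and defaults from the signature above. `AnalysisThemeSketch` is a hypothetical class so the example runs without versifai installed, and every value passed to it is invented.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class AnalysisThemeSketch:
    """Stand-in mirroring the documented AnalysisTheme signature."""

    id: str
    title: str
    question: str
    analysis_steps: list[str] = field(default_factory=list)
    tables_to_produce: list[str] = field(default_factory=list)
    signature_visualization: str = ""
    visualization_notes: str = ""
    punchline: str = ""
    required_tables: list[str] = field(default_factory=list)
    data_notes: str = ""
    analysis_type: str = "comparative"
    sequence: int = 1


# All values below are invented for illustration only.
theme = AnalysisThemeSketch(
    id="example_theme",
    title="Example theme",
    question="Does metric A differ between group X and group Y?",
    analysis_steps=["Compute group means", "Run a two-sample test"],
    tables_to_produce=["Group comparison with p-values and effect sizes"],
    visualization_notes="No chart needed unless something unexpected shows up.",
    required_tables=["example_catalog.example_schema.metrics"],
    sequence=2,
)
print(theme.analysis_type, theme.sequence)
```

This example leaves `signature_visualization` empty, consistent with the note that not every theme gets one, and puts the analytical weight on `tables_to_produce`.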

SilverDatasetSpec dataclass

SilverDatasetSpec(name: str, description: str, source_tables: list[str] = list(), join_key: str = '', time_column: str = 'source_year', data_notes: str = '')

Defines a silver-layer pre-joined dataset the agent should build.

ResearchReference dataclass

ResearchReference(title: str, url: str = '', description: str = '', keywords: list[str] = list())

A published research paper or report relevant to the thesis.

ResearchQuestion dataclass

ResearchQuestion(id: str, question: str, required_tables: list[str] = list(), analysis_type: str = 'descriptive')

A specific research question the agent should investigate.