Data Agents¶
The versifai.data_agents package provides agents for data engineering and quality validation.
DataEngineerAgent¶
DataEngineerAgent¶
DataEngineerAgent(cfg: ProjectConfig | None = None, dbutils=None, volume_path: str | None = None)
Autonomous data engineering agent powered by Claude.
The agent processes data sources one at a time, resetting its conversation memory between sources to stay within context limits. Cross-source knowledge (schemas, decisions) is carried forward via the memory's carryover context.
Usage in a Databricks notebook:

```python
from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()  # or customize for your project
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
agent.run()
```
Source code in src/versifai/data_agents/engineer/agent.py
run¶
Run the full agent pipeline:

1. Discovery — crawl the volume and identify all data sources
2. Incremental detection — check existing tables for already-loaded files
3. Per-source processing — process each source independently
4. Acceptance loop — the analyst validates, the engineer fixes, repeat
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `force_full` | `bool` | If True, skip incremental detection and reprocess all sources from scratch (overwriting existing tables). | `False` |
| `source_path` | `str \| None` | If provided, skip discovery and process only this directory path. Useful for reprocessing a single source or adding a new source without re-crawling everything. | `None` |
Returns a summary dict with results and statistics.
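For example, these parameters can be used to target a single source or to force a full rebuild (the volume path below is hypothetical):

```python
# Reprocess one source directly, skipping discovery (hypothetical path):
summary = agent.run(source_path="/Volumes/main/county_health/raw/noaa_weather")

# Rebuild everything from scratch, ignoring incremental detection:
summary = agent.run(force_full=True)
```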
Source code in src/versifai/data_agents/engineer/agent.py
run_rename¶
Run the retroactive column rename phase.
Reads documentation for each existing table in Unity Catalog, builds a mapping from cryptic column names to descriptive names, and applies ALTER TABLE RENAME COLUMN statements.
This is safe to run at any time — it's metadata-only on Delta tables and doesn't rewrite data.
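A minimal sketch of the kind of statement this phase applies (the catalog, schema, and column names are hypothetical; on Delta tables with column mapping enabled, a column rename is metadata-only):

```python
# Hypothetical example of one rename the agent might apply:
spark.sql("""
    ALTER TABLE main.county_health.scc_enrollment
    RENAME COLUMN enrl_cnt TO enrollment_count
""")
```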
Usage in a Databricks notebook:

```python
from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
agent.run_rename()
```
Source code in src/versifai/data_agents/engineer/agent.py
run_catalog¶
Build or update the data_catalog table documenting every column in every table.
Uses delta-fill: detects which tables are new or changed since the last run and only documents those. Existing, unchanged entries are preserved.
Hard constraint: every table in Unity Catalog must be documented. The agent will verify 100% coverage before finishing.
Safe to run at any time — incrementally updates the data_catalog table.
Usage in a Databricks notebook:

```python
from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)
agent.run_catalog()
```
Source code in src/versifai/data_agents/engineer/agent.py
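Once built, the catalog can be inspected with plain SQL. The query below is a sketch; the exact data_catalog column names aren't documented here, so treat them as assumptions:

```python
# Hypothetical inspection query; adjust names to the actual data_catalog schema.
spark.sql("""
    SELECT table_name, column_name, description
    FROM main.county_health.data_catalog
""").show(truncate=False)
```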
run_quality_check¶
Run acceptance testing on tables in Unity Catalog.
Uses the data_catalog table (if it exists) to ground the analyst in real table names, real column names, and real descriptions. This prevents hallucination and ensures all validation queries reference actual schema objects.
Should be called AFTER run_catalog() so the data_catalog is available.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `tables` | `list[str] \| None` | Optional list of specific table names to validate. If None, validates ALL tables. Use this to focus on one dataset at a time (e.g., `["scc_enrollment"]`). | `None` |
| `hints` | `str` | Optional user instructions injected into the analyst prompt. Use this to tell the analyst what to focus on or which specific issues to check. Example: "Check that enrollment has no suppressed values. Star ratings should have 2022-2026 coverage." | `''` |
Usage in a Databricks notebook:

```python
from versifai.data_agents.engineer.agent import DataEngineerAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)

# Check everything:
agent.run_quality_check()

# Check specific tables with guidance:
agent.run_quality_check(
    tables=["scc_enrollment", "star_ratings_summary"],
    hints="Verify enrollment suppression handling. "
          "Star ratings should cover 2022-2026.",
)
```
Source code in src/versifai/data_agents/engineer/agent.py
DataAnalystAgent¶
DataAnalystAgent¶
DataAnalystAgent(cfg: ProjectConfig | None = None, dbutils=None, max_turns: int = 80, table_inventory: list[dict] | None = None, user_hints: str = '')
Acceptance testing agent that validates tables in Unity Catalog.
The analyst has a limited tool set — primarily SQL execution and catalog listing. It doesn't need file readers or transformers because it works only with data already in the catalog.
Usage:

```python
from versifai.data_agents.analyst.agent import DataAnalystAgent
from versifai.data_agents.engineer.config import ProjectConfig

cfg = ProjectConfig()
analyst = DataAnalystAgent(cfg=cfg, dbutils=dbutils)
feedback = analyst.run()
# feedback is a dict with verdicts per table
```
Source code in src/versifai/data_agents/analyst/agent.py
run¶
Run acceptance testing on all tables in the target schema.
Returns a structured feedback dict with per-table verdicts:

```python
{
    "verdicts": {
        "table_name": {"status": "...", "issues": [...], "fixes": [...]},
    },
    "cross_table_issues": [...],
    "overall_status": "PASS | NEEDS_WORK",
    "summary": "...",
    "raw_response": "...",
}
```
Source code in src/versifai/data_agents/analyst/agent.py
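A minimal sketch of consuming that feedback dict. The per-table status values aren't documented here, so the "PASS" comparison below is an assumption:

```python
feedback = analyst.run()

for table, verdict in feedback["verdicts"].items():
    if verdict["status"] != "PASS":  # assumed per-table status value
        print(table, verdict["issues"], verdict["fixes"])

if feedback["overall_status"] == "NEEDS_WORK":
    print(feedback["summary"])
```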
Configuration¶
ProjectConfig (dataclass)¶
ProjectConfig(name: str = '', description: str = '', domain_expertise: str = '', analyst_specialty: str = '', catalog: str = '', schema: str = '', llm: LLMConfig = LLMConfig(), volume_path: str = '', staging_path: str = '', join_key: JoinKeyConfig = JoinKeyConfig(), alternative_keys: list[AlternativeKeyConfig] = list(), geographic_grain: str = '', grain_description: str = '', staging_flush_threshold_rows: int = 30000000, metadata_columns: list[MetadataColumnConfig] = list(), naming_convention: str = 'snake_case', naming_rules: str = '', column_naming_examples: str = '', grain_detection_guidance: str = '', known_sources: list[DataSourceHint] = list(), documentation_urls: dict[str, list[str]] = dict(), source_processing_hints: list[SourceProcessingHint] = list())
Configuration for a data engineering project.
The DataEngineerAgent uses this to drive its data processing pipeline. Assemble one from building blocks (join keys, source hints, metadata) and pass it to the agent. The agent code is generic — all domain knowledge lives in the config instance.
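A sketch of assembling a config from those building blocks. Only the field names come from the signature above; every value is illustrative:

```python
from versifai.data_agents.engineer.config import (
    JoinKeyConfig,
    ProjectConfig,
    SourceProcessingHint,
)

cfg = ProjectConfig(
    name="county_health",  # all values in this example are illustrative
    description="County-level health and demographics warehouse",
    catalog="main",
    schema="county_health",
    volume_path="/Volumes/main/county_health/raw",
    join_key=JoinKeyConfig(
        column_name="county_fips",
        data_type="STRING",
        width=5,
        description="5-digit county FIPS code",
    ),
    source_processing_hints=[
        SourceProcessingHint(
            source_pattern="noaa_*",
            description="NOAA daily weather aggregated to county level",
        ),
    ],
)
```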
known_sources_text (property)¶
Formatted text block listing known data sources for prompts.
metadata_columns_text (property)¶
Formatted text block listing metadata columns for prompts.
alternative_keys_text (property)¶
Formatted text about recognized alternative keys for prompts.
alternative_key_names (property)¶
List of alternative key column names.
join_key_related_text (property)¶
Formatted text about related geographic columns.
metadata_column_names (property)¶
List of metadata column name strings.
expected_tables (property)¶
All tables the pipeline MUST produce (from source processing hints).
get_source_hint¶
get_source_hint(source_name: str) -> SourceProcessingHint | None
Find a matching SourceProcessingHint for a given source directory name.
Source code in src/versifai/data_agents/engineer/config.py
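For example (the directory name is hypothetical):

```python
hint = cfg.get_source_hint("noaa_weather")
if hint is not None:
    print(hint.source_pattern, hint.multi_table)
```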
format_source_hint¶
Format the source processing hint as prompt text, or return an empty string if there is none.
Source code in src/versifai/data_agents/engineer/config.py
JoinKeyConfig (dataclass)¶
JoinKeyConfig(column_name: str = '', data_type: str = 'STRING', description: str = '', width: int = 0, validation_rule: str = '', expected_entity_count: int = 0, related_columns: list[dict] = list())
Defines the primary join key for cross-table joins.
AlternativeKeyConfig (dataclass)¶
AlternativeKeyConfig(column_name: str, description: str, data_type: str = 'STRING', grain: str = '')
An alternative join key for tables at a non-county grain.
MetadataColumnConfig (dataclass)¶
Defines metadata columns automatically added to every table.
DataSourceHint (dataclass)¶
Optional hint about a known data source — helps the agent recognize it.
SourceProcessingHint (dataclass)¶
SourceProcessingHint(source_pattern: str, description: str, multi_table: bool = False, files: list[SourceFileHint] = list(), notes: str = '')
Per-source processing instructions for a specific data source directory.
When multi_table=True, the agent should create a SEPARATE table for each file matching the hints, rather than combining everything into one table.
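A sketch of a multi-table hint; the pattern and notes are invented for illustration:

```python
from versifai.data_agents.engineer.config import SourceProcessingHint

hint = SourceProcessingHint(
    source_pattern="star_ratings*",
    description="Star ratings, one file per measure group",
    multi_table=True,  # create a separate table per matching file
    notes="Each measure-group file becomes its own table.",
)
```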
SourceFileHint (dataclass)¶
Describes a specific file expected within a source directory/archive.
Data Models¶
FileInfo (dataclass)¶
FileInfo(path: str, name: str, extension: str, size_bytes: int, modified_at: datetime | None = None, is_archive: bool = False, extracted_to: str | None = None, encoding: str | None = None, row_count: int | None = None, column_count: int | None = None, columns: list[str] = list())
Metadata about a single file discovered in the Volume.
from_path (classmethod)¶
from_path(path: str) -> FileInfo
Build a FileInfo from a filesystem path.
Source code in src/versifai/data_agents/models/source.py
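For example (the path is hypothetical; point it at a real file in your Volume):

```python
from versifai.data_agents.models.source import FileInfo

info = FileInfo.from_path("/Volumes/main/county_health/raw/noaa/stations_2023.csv")
print(info.name, info.extension, info.size_bytes)
```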
FileGroup (dataclass)¶
FileGroup(name: str, folder_path: str, files: list[FileInfo] = list(), documentation_files: list[FileInfo] = list(), data_files: list[FileInfo] = list(), archive_files: list[FileInfo] = list())
A logical grouping of related files within a Volume subfolder.
For example, all NOAA weather station CSVs across multiple years might be grouped together under a single FileGroup.
classify_files¶
Sort files into documentation, data, and archive buckets.
Source code in src/versifai/data_agents/models/source.py
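A sketch, assuming classify_files() fills the three bucket lists in place (that behavior is inferred from the field names, not documented here):

```python
from versifai.data_agents.models.source import FileGroup, FileInfo

# Hypothetical files, constructed directly rather than via from_path():
readme = FileInfo(path="/raw/noaa/readme.txt", name="readme.txt",
                  extension=".txt", size_bytes=1_024)
data = FileInfo(path="/raw/noaa/stations_2023.csv", name="stations_2023.csv",
                extension=".csv", size_bytes=5_000_000)

group = FileGroup(name="noaa", folder_path="/raw/noaa", files=[readme, data])
group.classify_files()  # assumed to populate the buckets below
print(len(group.documentation_files), len(group.data_files), len(group.archive_files))
```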
DataSource (dataclass)¶
DataSource(name: str, file_group: FileGroup, description: str = '', documentation_summary: str = '', profile_summary: str = '', target_table_name: str = '', year_range: str = '', county_fips_column: str = '', notes: list[str] = list())
Top-level representation of a data source that the agent will process.
Combines the file group with profiling results, schema decisions, and processing metadata.
ColumnDefinition (dataclass)¶
ColumnDefinition(target_name: str, data_type: str, source_name: str, description: str = '', nullable: bool = True, is_fips: bool = False, is_metadata: bool = False, transform_expression: str = '')
Definition of a single column in a target schema.
TargetSchema (dataclass)¶
TargetSchema(source_name: str, table_name: str, columns: list[ColumnDefinition] = list(), description: str = '', partition_columns: list[str] = list(), historical_mappings: dict[str, dict[str, str]] = dict())
Complete schema definition for a target Unity Catalog table.
Designed by the agent after profiling a data source.
to_create_table_sql¶
Generate a CREATE TABLE statement for this schema.
Source code in src/versifai/data_agents/models/schema.py
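A sketch of generating DDL from a hand-built schema; the column set is illustrative:

```python
from versifai.data_agents.models.schema import ColumnDefinition, TargetSchema

schema = TargetSchema(
    source_name="noaa_weather",
    table_name="noaa_weather_daily",
    description="Daily weather aggregated to county level",
    columns=[
        ColumnDefinition(target_name="county_fips", data_type="STRING",
                         source_name="FIPS", is_fips=True),
        ColumnDefinition(target_name="avg_temp_f", data_type="DOUBLE",
                         source_name="TAVG"),
    ],
)
print(schema.to_create_table_sql())  # assumed to return the DDL as a string
```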
SourceStatus¶
Bases: str, Enum
Processing phases for a data source.
SourceState (dataclass)¶
SourceState(source_name: str, status: SourceStatus = DISCOVERED, turns_used: int = 0, errors: list[str] = list(), decisions: list[str] = list(), files_processed: int = 0, files_total: int = 0, rows_loaded: int = 0, started_at: datetime | None = None, completed_at: datetime | None = None, table_name: str = '')
Tracks the processing state of a single data source.
AgentState (dataclass)¶
AgentState(sources: dict[str, SourceState] = dict(), current_source: str | None = None, total_turns: int = 0, started_at: datetime | None = None, phase: str = 'discovery')
Global state of the agent across all data sources.
Persisted so the agent can resume if interrupted.