Data Engineer Agent

The DataEngineerAgent discovers raw files, profiles data, designs schemas, and transforms and loads data into Databricks Unity Catalog Delta tables. It handles archives, schema drift, encoding issues, and multi-year historical data autonomously.

The DataAnalystAgent works in tandem: it validates every table the engineer builds, checking schema quality, join key integrity, and cross-table joinability.

Pipeline Phases

Phase 1: Discovery

Explores the volume, maps directories, identifies file types and archives, and creates a processing plan.
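
Conceptually, discovery amounts to a bounded recursive listing of the volume. The sketch below illustrates that idea with the notebook-provided dbutils handle; the explore_volume tool's actual implementation and output format may differ.

# Sketch: list a volume up to 3 levels deep and flag likely archives for later extraction.
def list_volume(path, depth=0, max_depth=3):
    entries = []
    for info in dbutils.fs.ls(path):
        is_dir = info.name.endswith("/")
        if is_dir and depth < max_depth:
            entries.extend(list_volume(info.path, depth + 1, max_depth))
        elif not is_dir:
            entries.append({
                "path": info.path,
                "size": info.size,
                "is_archive": info.name.lower().endswith((".zip", ".gz", ".tar", ".tgz")),
            })
    return entries

inventory = list_volume("/Volumes/analytics/sales/raw_data")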

Phase 1.5: Incremental Detection

Queries Unity Catalog for existing tables. Compares loaded files (via source_file_name metadata) against current directory contents. Routes each source to:

  • Full processing: new source, no existing table
  • Incremental append: existing table, new files detected
  • Skip: existing table, no new files

Pass force_full=True to run() to bypass detection and reprocess everything.
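
Conceptually, detection is a set difference between the source_file_name values already recorded in a table and the files currently in its directory. A rough sketch of that comparison, using a hypothetical table name and path:

# Sketch: diff already-loaded files against the current directory contents.
loaded = {
    row.source_file_name
    for row in spark.sql(
        "SELECT DISTINCT source_file_name FROM analytics.sales.transactions"
    ).collect()
}
on_disk = {f.name for f in dbutils.fs.ls("/Volumes/analytics/sales/raw_data/transactions")}

new_files = on_disk - loaded
# No existing table -> full processing; new_files non-empty -> incremental append; empty -> skip.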

Phase 2: Source Processing (per source)

For each source: read documentation, extract archives, profile a reference file, design the target schema, batch-transform all files, write to catalog.

Phase 3: Acceptance Loop

The DataAnalystAgent reviews every loaded table via SQL queries. Issues are sent back to the engineer for fixes, and the loop repeats for up to 3 iterations until all tables are accepted.
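
In outline, the loop behaves like the sketch below; the analyst and engineer objects and their methods are illustrative, not the package's internal API.

# Sketch of the acceptance loop: review, send issues back, repeat (at most 3 rounds).
def acceptance_loop(analyst, engineer, max_rounds=3):
    for _ in range(max_rounds):
        verdicts = analyst.review_tables()    # hypothetical: runs the SQL acceptance checks
        failing = [v for v in verdicts if v["verdict"] != "ACCEPTED"]
        if not failing:
            return True                       # every table accepted
        engineer.apply_fixes(failing)         # hypothetical: engineer reworks the flagged tables
    return False                              # gave up after max_rounds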

Batch Transform & Auto-Flush

For sources with many files (e.g., 45 monthly CSVs), transform_and_load supports batch mode: pass a files array to process all files in a single tool call instead of one at a time.

When staged data exceeds 30M rows, the tool auto-flushes to parquet on the staging volume and clears memory. The final write_to_catalog call creates the Delta table from all accumulated parquet batches.
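
A single batch call might look roughly like this; the files argument comes from the description above, while the other parameter names are illustrative (see the Tool Reference for the exact signature).

# Illustrative batch-mode call: all monthly CSVs in one transform_and_load invocation.
transform_and_load(
    files=[
        f"/Volumes/analytics/sales/raw_data/monthly/sales_2022_{month:02d}.csv"
        for month in range(1, 13)
    ],
    target_table="sales_transactions",   # illustrative: schema registered earlier via design_schema
)
# Staged rows accumulate across batches; past ~30M rows they auto-flush to parquet,
# and the final write_to_catalog call assembles the Delta table from those batches.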

Write Pipeline

write_to_catalog uses a three-tier strategy based on data size:

| Data Size | Method | Notes |
|---|---|---|
| ≤ 2M rows | spark.createDataFrame() | Direct in-memory |
| > 2M rows (no flush) | Pandas → temp parquet → Spark SQL | Bypasses gRPC protobuf limits |
| Auto-flushed batches | Spark SQL reads the parquet directory | CREATE TABLE ... AS SELECT * FROM parquet.`path` |
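
The auto-flushed tier reduces to a Spark SQL statement of roughly this shape; the catalog, table, and staging path below are illustrative.

# Sketch: build the Delta table directly from the accumulated parquet batches.
spark.sql("""
    CREATE TABLE analytics.sales.transactions
    AS SELECT * FROM parquet.`/Volumes/analytics/sales/staging/transactions/`
""")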

Available Tools

| Tool | Description |
|---|---|
| explore_volume | Browse Unity Volume directories (recursive, max 3 levels) |
| extract_archive | Extract ZIP, GZ, TAR, TGZ archives |
| scan_for_documentation | Find docs (READMEs, data dictionaries, codebooks) |
| read_documentation | Extract text from TXT, MD, HTML, PDF, CSV/Excel data dictionaries |
| read_file_header | Column names, dtypes, sample rows (CSV, Excel, Parquet, SAS, Stata) |
| profile_data | Column-level stats: nulls, cardinality, ranges, FIPS detection |
| design_schema | Validate and register target schema |
| transform_and_load | Transform files against schema with batch mode and auto-flush |
| write_to_catalog | Write staged data to Delta table |
| execute_sql | Execute SQL against Databricks |
| list_catalog_tables | List all tables in the target schema |
| web_search | Fetch external documentation |
| create_custom_tool | Create tools at runtime with pandas |
| ask_human | Pause and ask the operator a question |

See the Tool Reference for detailed parameter and return value tables.

Usage

from versifai.data_agents import DataEngineerAgent, ProjectConfig

cfg = ProjectConfig(
    name="Sales Pipeline",
    catalog="analytics",
    schema="sales",
    volume_path="/Volumes/analytics/sales/raw_data",
    # Domain-specific guidance (optional, but recommended)
    grain_detection_guidance=(
        "Account-level: Look for account_id columns\n"
        "Transaction-level: Look for transaction_id or order_id"
    ),
    column_naming_examples=(
        "`AcctID` → `account_id`\n"
        "`TxnAmt` → `transaction_amount_usd`"
    ),
)

agent = DataEngineerAgent(cfg=cfg, dbutils=dbutils)

# Full run
result = agent.run()

# Force reprocess everything
result = agent.run(force_full=True)

Domain Guidance Fields

The agent prompts are domain-agnostic. Use these config fields to inject domain knowledge:

| Field | Purpose | Default |
|---|---|---|
| grain_detection_guidance | How to detect data grain (account? county? device?) | "" (generic fallback) |
| column_naming_examples | Domain-specific column rename patterns | "" (generic fallback) |

When these fields are empty, the agent uses generic grain detection and naming heuristics.

Data Analyst (Acceptance Testing)

The DataAnalystAgent runs after the engineer finishes loading tables. It performs acceptance testing via SQL queries.

Checks: schema quality, join key integrity (exists, correct type, zero-padded, no nulls), volume and completeness, data quality (null rates, cardinality, ranges), cross-table joinability, metadata columns.

Returns: Structured verdict per table (ACCEPTED, NEEDS_FIX, REJECTED) with specific issues and fixes.

Available Tools: execute_sql, list_catalog_tables, create_custom_tool, ask_human
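
As an illustration, a join-key integrity check can reduce to a single query issued through execute_sql; the table and column names below are hypothetical.

# Sketch: confirm a FIPS join key is never null and is consistently zero-padded to 5 characters.
spark.sql("""
    SELECT
        COUNT(*) AS total_rows,
        SUM(CASE WHEN county_fips IS NULL THEN 1 ELSE 0 END) AS null_keys,
        SUM(CASE WHEN LENGTH(county_fips) <> 5 THEN 1 ELSE 0 END) AS unpadded_keys
    FROM analytics.sales.county_metrics
""").show()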