Building a Data Pipeline Repair Agent
Data pipelines fail. A bad row, a malformed date or a missing field is enough to break a nightly job and leave the warehouse half-loaded. In this hands-on Code Lab you build a Python repair agent that turns a broken pipeline into a self-healing one. You will inspect a deliberately failing ETL job that loads a CSV into a local SQLite warehouse, parse its structured failure logs, design the system and user prompts that ask gpt-4o-mini to diagnose each bad row, call the LLM through the OpenAI SDK with a JSON-only response format, apply the model's proposed fixes back to the input data and re-run the pipeline until the warehouse is fully populated. By the end of the lab you have a working repair_agent module that demonstrates a practical, end-to-end pattern for using LLMs to recover failed data pipeline runs.
Challenge
Introduction
Data pipelines fail. A bad row, a malformed date or a missing field is enough to stop a nightly job and leave the warehouse half-loaded. In this lab you will build a Python repair agent that turns a broken pipeline into a self-healing one. The agent reads the pipeline's failure log, asks an LLM to diagnose each bad row, applies the proposed fixes to the input file and re-runs the pipeline until the warehouse is fully populated.
You join Globomantics as a data engineer on the team that owns the nightly sales pipeline. The pipeline reads `data/raw_sales.csv`, validates each row and loads the result into a SQLite warehouse at `data/warehouse.db`. A handful of rows fail every night and your job is to make the pipeline finish on its own.

The lab workspace is pre-configured so you can focus on the agent.

- `pipeline/` holds the ETL job that reads the CSV, validates and casts each row and writes the result into SQLite. You do not edit this code.
- `data/raw_sales.csv` is the input file. It contains five clean rows and four poisoned rows that will fail validation.
- `data/warehouse.db` is an empty SQLite database with the `raw_sales` schema already created.
- `logs/` is where the pipeline writes its JSON-lines failure log.
- `agent/` is the package where you write every file in this lab.
Each task can be validated individually by clicking on the Validate button next to it.
If you get stuck, every task has a Task Solution section you can expand to reveal the answer. This can be found under the FEEDBACK/CHECKS section of every Task.
info> The `solutions/` folder at the top of the workspace contains the final state of every task.

A failed task will list one or more failed checks under its Checks section, each with a specific message describing what went wrong.
The starting point of the lab is a directory named `data-pipeline`. The current directory of the built-in Terminal will be set to the `data-pipeline/` directory. Packages `openai` and `python-dotenv` are already installed with `pip3`. You can use the Terminal to run the Python scripts.

Click on the Next step arrow to get started.
Challenge
Set Up the Environment
The lab ships an LLM endpoint reachable through the OpenAI Python SDK. Look at the top menu of this lab; your lab API key is displayed there. Copy it and paste it into the `data-pipeline/.env` file, replacing the `paste-your-key-here` placeholder:

```
LAB_API_KEY=paste-your-key-here
```

Your job in the first task is to expose that key from a small `agent/config.py` module so every other agent file can import one constant instead of touching `os.environ` directly. Reading the key in one place keeps the rest of the agent code clean.

A real data warehouse is a managed system like Snowflake or BigQuery. For this lab the warehouse is a single SQLite file at `data/warehouse.db`. SQLite is a self-contained relational database that lives in one file on disk. It speaks standard SQL and ships with Python through the `sqlite3` module in the standard library. Treat it the same way you would treat a remote warehouse. The pipeline writes rows into the `raw_sales` table and your agent only ever reads what is there to check its work.

The `raw_sales` table is already created with this schema.

```sql
CREATE TABLE raw_sales (
    sale_id    INTEGER PRIMARY KEY,
    product_id TEXT    NOT NULL,
    sale_date  TEXT    NOT NULL,
    quantity   INTEGER NOT NULL,
    amount     REAL    NOT NULL
);
```

In the next task you trigger the broken pipeline and watch it produce a failure log against this warehouse.

The agent can reach the LLM through `LAB_API_KEY` and the broken pipeline reliably produces a failure log. The pipeline runs but does not heal itself yet.
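Because the agent only reads the warehouse to check its work, that check can be a single query. The following is a minimal sketch, not the lab's solution: the helper name `count_loaded_rows` and the sample rows are assumptions made for illustration, and the demo uses a throwaway in-memory database built from the schema above instead of touching `data/warehouse.db`.

```python
import sqlite3

# Schema copied from the lab text so the demo table matches the warehouse.
SCHEMA = """
CREATE TABLE raw_sales (
    sale_id    INTEGER PRIMARY KEY,
    product_id TEXT    NOT NULL,
    sale_date  TEXT    NOT NULL,
    quantity   INTEGER NOT NULL,
    amount     REAL    NOT NULL
);
"""


def count_loaded_rows(conn: sqlite3.Connection) -> int:
    """Return how many rows are currently in raw_sales (hypothetical helper)."""
    (count,) = conn.execute("SELECT COUNT(*) FROM raw_sales").fetchone()
    return count


# Demo against an in-memory database; the rows below are made-up samples.
conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute(
    "INSERT INTO raw_sales VALUES (?, ?, ?, ?, ?)",
    (1, "P-001", "2026-05-10", 2, 19.98),
)

missing_amount_rejected = False
try:
    # The NOT NULL constraint on amount rejects a row with no value.
    conn.execute(
        "INSERT INTO raw_sales VALUES (?, ?, ?, ?, ?)",
        (2, "P-002", "2026-05-10", 1, None),
    )
except sqlite3.IntegrityError:
    missing_amount_rejected = True

loaded = count_loaded_rows(conn)
```

Against the lab workspace you would pass `sqlite3.connect("data/warehouse.db")` instead. A fully repaired run should report nine rows, since the input file holds five clean rows and four poisoned ones.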
Challenge
Parse the Pipeline Logs
The pipeline writes one JSON object per failed row into `logs/pipeline.log`. Each failure entry has the same shape.

```json
{
  "run_started": "2026-05-19T12:00:00",
  "sale_id": 4,
  "error_type": "missing_field",
  "error_message": "Column 'amount' is empty.",
  "row": {"sale_id": "4", "product_id": "P-003", "sale_date": "2026-05-11", "quantity": "1", "amount": ""}
}
```

The file also contains one summary line at the end of every run with `"summary": true`. Your parser must ignore that line and return only the failure entries.

The agent will pass these entries to the LLM, so it needs to work with them as Python objects rather than raw JSON strings. The cleanest way is a small dataclass that captures the four fields the rest of the agent will read. Defining the shape once at the top of the module also stops the rest of the code from depending on dictionary key names.

Here is a tiny example of the dataclass pattern from an unrelated domain:

```python
from dataclasses import dataclass


@dataclass
class WeatherReading:
    city: str
    temperature_c: float
    recorded_at: str
```

The agent can now read its own input. Each failed row is a `FailureRecord` your code can pass around.
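Applied to the pipeline log, the same pattern might look like the sketch below. It is one possible shape rather than the lab's reference solution. The text does not say which four fields the dataclass keeps, so the choice here (every key except `run_started`) is an assumption, as are the `parse_log` signature and the extra `failed` key on the demo summary line.

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class FailureRecord:
    # Assumed field selection: the log entry's keys minus run_started.
    sale_id: int
    error_type: str
    error_message: str
    row: dict


def parse_log(log_path: str) -> list[FailureRecord]:
    """Read a JSON-lines log and return only the failure entries."""
    records = []
    for line in Path(log_path).read_text().splitlines():
        if not line.strip():
            continue  # tolerate blank lines
        entry = json.loads(line)
        if entry.get("summary") is True:
            continue  # the per-run summary line is not a failure
        records.append(
            FailureRecord(
                sale_id=int(entry["sale_id"]),
                error_type=entry["error_type"],
                error_message=entry["error_message"],
                row=entry["row"],
            )
        )
    return records


# Demo with a throwaway log: one failure entry followed by a summary line.
sample_lines = [
    json.dumps({
        "run_started": "2026-05-19T12:00:00",
        "sale_id": 4,
        "error_type": "missing_field",
        "error_message": "Column 'amount' is empty.",
        "row": {"sale_id": "4", "product_id": "P-003",
                "sale_date": "2026-05-11", "quantity": "1", "amount": ""},
    }),
    json.dumps({"summary": True, "failed": 1}),  # "failed" is a made-up key
]
with tempfile.TemporaryDirectory() as tmp:
    log_file = Path(tmp) / "pipeline.log"
    log_file.write_text("\n".join(sample_lines) + "\n")
    records = parse_log(str(log_file))
```

This version returns every failure in the file. If the log accumulates several runs, filtering on the most recent `run_started` value would be a natural extension.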
Challenge
Build the Diagnosis Prompt
A prompt is the text you send to the model. For this lab the prompt has two parts. The system prompt tells the model what role to play and what shape its answer must take. The user prompt carries the specific failure you want diagnosed. Keeping both in their own module means you can edit the wording without touching any runtime code.
The agent will ask the model to respond with JSON only, in a fixed shape:

```json
{
  "sale_id": 4,
  "diagnosis": "The amount column was empty.",
  "fix": {"amount": "24.99"}
}
```

The `fix` object maps column names to corrected string values. The fix applier in Step 6 will take that mapping and overwrite those columns in the bad row.

A tiny example of the two-prompt pattern on an unrelated domain.

```python
SYSTEM_PROMPT = "You translate English sentences to French. Reply with the translation only."


def build_user_prompt(sentence: str) -> str:
    return f"Translate this sentence: {sentence}"
```

The agent now has both halves of the message it will send to the LLM: a fixed system prompt and a per-record user prompt. In the next step you will wire up the actual API call.
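For the pipeline case, the two halves could be sketched as follows. The prompt wording is illustrative only, not the lab's reference text, and the `build_user_prompt` signature is an assumption: it takes the failure's fields as plain arguments so the example stands alone.

```python
import json

# Fixed instructions: the role to play and the exact JSON shape to return.
SYSTEM_PROMPT = (
    "You repair rows that failed validation in a sales data pipeline. "
    "Reply with a single JSON object and nothing else, using exactly these keys: "
    '"sale_id" (integer), "diagnosis" (string) and "fix" '
    "(object mapping column names to corrected string values)."
)


def build_user_prompt(sale_id: int, error_type: str, error_message: str, row: dict) -> str:
    """Describe one failed row so the model can propose a fix."""
    return (
        f"The row with sale_id {sale_id} failed with error_type '{error_type}'.\n"
        f"Error message: {error_message}\n"
        f"Row contents as JSON: {json.dumps(row)}\n"
        "Return the JSON object described in the system prompt."
    )


# Demo using the failure entry shown in the log-parsing step.
prompt = build_user_prompt(
    4,
    "missing_field",
    "Column 'amount' is empty.",
    {"sale_id": "4", "product_id": "P-003", "sale_date": "2026-05-11",
     "quantity": "1", "amount": ""},
)
```

Serialising the row with `json.dumps` keeps empty strings visible to the model as `""`, which matters when the failure is a missing value.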
Challenge
Call the LLM to Diagnose Failures
The lab ships an OpenAI-compatible endpoint that serves the `gpt-4o-mini` model. You reach it through the `openai` Python SDK by constructing a client with two keyword arguments: the API key from `agent.config.LAB_API_KEY` and the deployment URL.

A tiny example on an unrelated module. This helper summarises today's weather for a city through a separate, made-up deployment. You will not write code like this in your agent; it is shown only to make the SDK shape concrete:

```python
from openai import OpenAI

client = OpenAI(
    api_key="sk-example",
    base_url="https://example.com/openai/deployments/some-other-model",
)


def weather_summary(city: str) -> str:
    response = client.chat.completions.create(
        model="some-other-model",
        messages=[
            {"role": "system", "content": "You summarise the weather in one sentence."},
            {"role": "user", "content": f"Summarise today's weather in {city}."},
        ],
    )
    return response.choices[0].message.content
```

Two things to notice. First, the client is built once at module scope. Second, every call passes `model=` and a list of `messages`. You apply the same pattern to the pipeline failure case in the next two tasks, with two extras: the real `base_url` of this lab's `gpt-4o-mini` deployment and a `response_format={"type": "json_object"}` argument that tells the model to return JSON.

The agent can now ask the model for a diagnosis and receive a parsed Python dict. In the next step you take the model's proposed fix and apply it back to the input CSV, then re-run the pipeline.
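A hedged sketch of what a `diagnose` function could look like for the failure case. Several things here are assumptions rather than the lab's solution: the client is passed in as an argument so the example can run offline against a stand-in, `make_client` and `REQUIRED_KEYS` are hypothetical names, and `base_url` stays a parameter because the real deployment URL is supplied by the lab.

```python
import json
from types import SimpleNamespace

MODEL = "gpt-4o-mini"
REQUIRED_KEYS = {"sale_id", "diagnosis", "fix"}


def make_client(api_key: str, base_url: str):
    """Build the SDK client; base_url is whatever the lab task provides."""
    from openai import OpenAI  # imported lazily so the offline demo needs no SDK

    return OpenAI(api_key=api_key, base_url=base_url)


def diagnose(client, system_prompt: str, user_prompt: str) -> dict:
    """Ask the model for a fix and return the reply as a checked dict."""
    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        response_format={"type": "json_object"},  # request a JSON object reply
    )
    result = json.loads(response.choices[0].message.content)
    missing = REQUIRED_KEYS - set(result)
    if missing:
        raise ValueError(f"model reply is missing keys: {sorted(missing)}")
    return result


# Offline stand-in that mimics the shape of the SDK response object.
def fake_create(**kwargs):
    reply = {"sale_id": 4, "diagnosis": "The amount column was empty.",
             "fix": {"amount": "24.99"}}
    message = SimpleNamespace(content=json.dumps(reply))
    return SimpleNamespace(choices=[SimpleNamespace(message=message)])


fake_client = SimpleNamespace(
    chat=SimpleNamespace(completions=SimpleNamespace(create=fake_create))
)
diagnosis = diagnose(fake_client, "Reply with JSON only.", "Diagnose sale_id 4.")
```

In the lab's own module the client would be built once at module scope, as noted above. Checking the reply for the three expected keys before returning it means a malformed answer fails at the LLM boundary, not later inside the fix applier.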
Challenge
Apply Fixes and Re-run the Pipeline
Diagnosing a failure is only useful if you can act on it. In this step you write two functions that close the loop:
- `apply_fix` rewrites the broken row in `data/raw_sales.csv` with the corrected values returned by `diagnose`.
- `rerun_pipeline` shells out to the existing pipeline so the agent can verify the fix actually works end to end.
Both functions live in a new module `agent/fixer.py`. Keeping the fix logic separate from the LLM logic means each piece can be tested in isolation: `apply_fix` is pure Python with no network call, and `rerun_pipeline` just launches a subprocess.

A short pattern for in-place CSV rewrites:

```python
import csv
from pathlib import Path


def overwrite_column(csv_path: str, target_id: int, column: str, value: str) -> None:
    path = Path(csv_path)
    # Read every row into memory; newline="" is what the csv module expects.
    with path.open(newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames  # header order, even if there are no rows
        rows = list(reader)
    # Change the one column on the matching row.
    for row in rows:
        if int(row["id"]) == target_id:
            row[column] = value
    # Write the whole file back with the original header.
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
```

You will generalise this idea to apply a whole `fix` dict.

The agent now has all four primitives: `parse_log`, `prompts`, `diagnose` and the fix/rerun pair. In the final step you stitch them together into a single entry point.
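One way the generalisation to a whole `fix` dict might look, offered as a sketch under stated assumptions. The text does not give the command that launches the pipeline, so `rerun_pipeline` takes it as a parameter and the demo substitutes a harmless stand-in. The boolean return value and the unknown-column check are additions for illustration, and the sample CSV rows are made up.

```python
import csv
import subprocess
import sys
import tempfile
from pathlib import Path


def apply_fix(csv_path: str, sale_id: int, fix: dict) -> bool:
    """Overwrite the fixed columns of one row; return True if the row was found."""
    path = Path(csv_path)
    with path.open(newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames or []
        rows = list(reader)
    unknown = set(fix) - set(fieldnames)
    if unknown:
        raise ValueError(f"fix names unknown columns: {sorted(unknown)}")
    found = False
    for row in rows:
        # Compare as strings so a non-numeric sale_id elsewhere cannot crash the scan.
        if row["sale_id"] == str(sale_id):
            row.update(fix)
            found = True
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    return found


def rerun_pipeline(command: list[str]) -> int:
    """Run the pipeline command and return its exit code."""
    return subprocess.run(command, check=False).returncode


# Demo on a throwaway CSV with one clean row and one row missing its amount.
with tempfile.TemporaryDirectory() as tmp:
    csv_file = Path(tmp) / "raw_sales.csv"
    csv_file.write_text(
        "sale_id,product_id,sale_date,quantity,amount\n"
        "3,P-002,2026-05-11,2,9.50\n"
        "4,P-003,2026-05-11,1,\n"
    )
    was_applied = apply_fix(str(csv_file), 4, {"amount": "24.99"})
    with csv_file.open(newline="") as f:
        repaired = list(csv.DictReader(f))
    # Stand-in command; the real pipeline invocation comes from the lab workspace.
    exit_code = rerun_pipeline([sys.executable, "-c", "pass"])
```

Rejecting column names that are not in the header guards against a model reply that invents a field, which `csv.DictWriter` would otherwise refuse at write time after the file had already been opened for writing.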
Challenge
Orchestrate the End-to-end Repair Loop
You now have every primitive the agent needs:

| Module | Responsibility |
| --- | --- |
| `agent.log_parser` | Read the latest pipeline log into structured `FailureRecord` objects. |
| `agent.prompts` | Build the system and user prompts for one failure. |
| `agent.llm` | Send a `FailureRecord` to the LLM and parse the JSON response. |
| `agent.fixer` | Edit the broken row in `data/raw_sales.csv` and rerun the pipeline. |

Finally, you need to stitch these together into a single command-line entry point: `python -m agent.repair`. This command should parse the most recent log, ask the LLM for a diagnosis for each failure, apply each returned fix, then rerun the pipeline. After one successful end-to-end run the warehouse should contain all nine rows.

The control flow looks like this:

```
read latest log ->
for each FailureRecord:
    diagnosis = diagnose(record)
    apply_fix(csv_path, record.sale_id, diagnosis["fix"])
rerun_pipeline()
```

So now you need an orchestrator.

Congratulations! You built a complete data-pipeline repair agent in seven steps:
- Inspect the broken pipeline and confirm the four failure modes.
- Survey the failures and record what a fix looks like for each.
- Write the log parser that turns one run's failures into structured records.
- Design the system and user prompts the agent will send to the LLM.
- Build the OpenAI client and the `diagnose` function that returns a structured fix.
- Implement the CSV fix applier and the subprocess-based pipeline rerun.
- Wire everything into a single `python3 -m agent.repair` entry point and watch the warehouse fill up.
The same shape (log → structured records → LLM diagnosis → targeted edit → rerun → verify) works for any pipeline that emits machine-readable failure logs. Where this lab edits a CSV, you might just as well restart a worker, rewrite a config value or open a pull request.
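Because the shape does not depend on what gets edited, the loop can be written against plain callables. The sketch below is one illustrative way to express that, not the lab's `agent.repair` module: `repair_once` and the in-memory stand-ins are hypothetical names, and the second sample failure is made up for the demo.

```python
from typing import Callable, Iterable


def repair_once(
    failures: Iterable[dict],
    diagnose: Callable[[dict], dict],
    apply_fix: Callable[[int, dict], None],
    rerun: Callable[[], int],
) -> int:
    """Diagnose and fix every failure, then rerun once and return the exit code."""
    for failure in failures:
        diagnosis = diagnose(failure)
        apply_fix(failure["sale_id"], diagnosis["fix"])
    return rerun()


# In-memory stand-ins: a tiny "table" with two broken rows and canned fixes.
table = {4: {"amount": ""}, 7: {"sale_date": ""}}
canned_fixes = {4: {"amount": "24.99"}, 7: {"sale_date": "2026-05-11"}}


def fake_diagnose(failure: dict) -> dict:
    sale_id = failure["sale_id"]
    return {"sale_id": sale_id, "diagnosis": "demo", "fix": canned_fixes[sale_id]}


def fake_apply(sale_id: int, fix: dict) -> None:
    table[sale_id].update(fix)


def fake_rerun() -> int:
    # Pretend the pipeline exits 0 only when no empty values remain.
    return 0 if all(v for row in table.values() for v in row.values()) else 1


exit_code = repair_once(
    [{"sale_id": 4}, {"sale_id": 7}], fake_diagnose, fake_apply, fake_rerun
)
```

Swapping `fake_apply` for a function that restarts a worker or rewrites a config value leaves `repair_once` unchanged.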