Featured resource
2026 Tech Forecast
2026 Tech Forecast

1,500+ tech insiders, business leaders, and Pluralsight Authors share their predictions on what’s shifting fastest and how to stay ahead.

Download the forecast
  • Lab
    • Libraries: If you want this lab, consider one of these libraries.
    • AI
Labs

Building a Data Pipeline Repair Agent

Data pipelines fail. A bad row, a malformed date or a missing field is enough to break a nightly job and leave the warehouse half-loaded. In this hands-on Code Lab you build a Python repair agent that turns a broken pipeline into a self-healing one. You will inspect a deliberately failing ETL job that loads a CSV into a local SQLite warehouse, parse its structured failure logs, design the system and user prompts that ask gpt-4o-mini to diagnose each bad row, call the LLM through the OpenAI SDK with a JSON-only response format, apply the model's proposed fixes back to the input data and re-run the pipeline until the warehouse is fully populated. By the end of the lab you have a working repair_agent module that demonstrates a practical, end-to-end pattern for using LLMs to recover failed data pipeline runs.

Lab platform
Lab Info
Level
Intermediate
Last updated
May 27, 2026
Duration
45m

Contact sales

By clicking submit, you agree to our Privacy Policy and Terms of Use, and consent to receive marketing emails from Pluralsight.
Table of Contents
  1. Challenge

    Introduction

    Data pipelines fail. A bad row, a malformed date or a missing field is enough to stop a nightly job and leave the warehouse half-loaded. In this lab you will build a Python repair agent that turns a broken pipeline into a self-healing one. The agent reads the pipeline's failure log, asks an LLM to diagnose each bad row, applies the proposed fixes to the input file and re-runs the pipeline until the warehouse is fully populated.

    You join Globomantics as a data engineer on the team that owns the nightly sales pipeline. The pipeline reads data/raw_sales.csv, validates each row and loads the result into a SQLite warehouse at data/warehouse.db. A handful of rows fail every night and your job is to make the pipeline finish on its own.

    The lab workspace is pre-configured so you can focus on the agent.

    • pipeline/ holds the ETL job that reads the CSV, validates and casts each row and writes the result into SQLite. You do not edit this code.
    • data/raw_sales.csv is the input file. It contains five clean rows and four poisoned rows that will fail validation.
    • data/warehouse.db is an empty SQLite database with the raw_sales schema already created.
    • logs/ is where the pipeline writes its JSON-lines failure log.
    • agent/ is the package where you write every file in this lab.

    Each task can be validated individually by clicking on the Validate button next to it.

    If you get stuck, every task has a Task Solution section you can expand to reveal the answer. This can be found under the FEEDBACK/CHECKS section of every Task.

    info> The solutions/ folder at the top of the workspace contains the final state of every task.

    A failed task will list one or more failed checks under its Checks section, each with a specific message describing what went wrong.

    The starting point of the lab is a directory named "data-pipeline". The current directory of the built-in Terminal will be set to the data-pipeline/ directory. Packages openai and python-dotenv are already installed with pip3. You can use the Terminal to run the Python scripts.

    Click on the Next step arrow to get started.

  2. Challenge

    Set Up the Environment

    The lab ships an LLM endpoint reachable through the OpenAI Python SDK. Look at the top menu of this lab; your lab API key is displayed there. Copy it and paste it inside of the data-pipeline/.env file by replacing the paste-your-key-here:

    LAB_API_KEY=paste-your-key-here
    

    Your job in the first task is to expose that key from a small agent/config.py module so every other agent file can import one constant instead of touching os.environ directly. Reading the key in one place keeps the rest of the agent code clean. A real data warehouse is a managed system like Snowflake or BigQuery. For this lab the warehouse is a single SQLite file at data/warehouse.db. SQLite is a self-contained relational database that lives in one file on disk. It speaks standard SQL and ships with Python through the sqlite3 module in the standard library. Treat it the same way you would treat a remote warehouse. The pipeline writes rows into the raw_sales table and your agent only ever reads what is there to check its work.

    The raw_sales table is already created with this schema.

    CREATE TABLE raw_sales (
        sale_id    INTEGER PRIMARY KEY,
        product_id TEXT NOT NULL,
        sale_date  TEXT NOT NULL,
        quantity   INTEGER NOT NULL,
        amount     REAL NOT NULL
    );
    

    In the next task you trigger the broken pipeline and watch it produce a failure log against this warehouse. The agent can reach the LLM through LAB_API_KEY and the broken pipeline reliably produces a failure log. The pipeline runs but does not heal itself yet.

  3. Challenge

    Parse the Pipeline Logs

    The pipeline writes one JSON object per failed row into logs/pipeline.log. Each failure entry has the same shape.

    {
      "run_started": "2026-05-19T12:00:00",
      "sale_id": 4,
      "error_type": "missing_field",
      "error_message": "Column 'amount' is empty.",
      "row": {"sale_id": "4", "product_id": "P-003", "sale_date": "2026-05-11", "quantity": "1", "amount": ""}
    }
    

    The file also contains one summary line at the end of every run with "summary": true. Your parser must ignore that line and return only the failure entries.

    The agent will pass these entries to the LLM, so it needs to work with them as Python objects rather than raw JSON strings. The cleanest way is a small dataclass that captures the four fields the rest of the agent will read. Defining the shape once at the top of the module also stops the rest of the code from depending on dictionary key names.

    Here is a tiny example of the dataclass pattern from an unrelated domain:

    from dataclasses import dataclass
    
    @dataclass
    class WeatherReading:
        city: str
        temperature_c: float
        recorded_at: str
    ``` The agent can now read its own input. Each failed row is a `FailureRecord` your code can pass around.
  4. Challenge

    Build the Diagnosis Prompt

    A prompt is the text you send to the model. For this lab the prompt has two parts. The system prompt tells the model what role to play and what shape its answer must take. The user prompt carries the specific failure you want diagnosed. Keeping both in their own module means you can edit the wording without touching any runtime code.

    The agent will ask the model to respond with JSON only, in a fixed shape:

    {
      "sale_id": 4,
      "diagnosis": "The amount column was empty.",
      "fix": {"amount": "24.99"}
    }
    

    The fix object maps column names to corrected string values. The fix applier in Step 6 will take that mapping and overwrite those columns in the bad row.

    A tiny example of the two-prompt pattern on an unrelated domain.

    SYSTEM_PROMPT = "You translate English sentences to French. Reply with the translation only."
    
    
    def build_user_prompt(sentence: str) -> str:
        return f"Translate this sentence: {sentence}"
    ``` The agent now has both halves of the message it will send to the LLM: a fixed system prompt and a per-record user prompt. In the next step you will wire up the actual API call.
  5. Challenge

    Call the LLM to Diagnose Failures

    The lab ships an OpenAI-compatible endpoint that serves the gpt-4o-mini model. You reach it through the openai Python SDK by constructing a client with two keyword arguments: the API key from agent.config.LAB_API_KEY and the deployment URL.

    A tiny example on an unrelated module. This helper summarises today's weather for a city through a separate, made-up deployment. You will not write code like this in your agent; it is shown only to make the SDK shape concrete:

    from openai import OpenAI
    
    client = OpenAI(
        api_key="sk-example",
        base_url="https://example.com/openai/deployments/some-other-model",
    )
    
    def weather_summary(city: str) -> str:
        response = client.chat.completions.create(
            model="some-other-model",
            messages=[
                {"role": "system", "content": "You summarise the weather in one sentence."},
                {"role": "user", "content": f"Summarise today's weather in {city}."},
            ],
        )
        return response.choices[0].message.content
    

    Two things to notice. First, the client is built once at module scope. Second, every call passes model= and a list of messages. You apply the same pattern to the pipeline failure case in the next two tasks, with two extras: the real base_url of this lab's gpt-4o-mini deployment and a response_format={"type": "json_object"} argument that tells the model to return JSON. The agent can now ask the model for a diagnosis and receive a parsed Python dict. In the next step you take the model's proposed fix and apply it back to the input CSV, then re-run the pipeline.

  6. Challenge

    Apply Fixes and Re-run the Pipeline

    Diagnosing a failure is only useful if you can act on it. In this step you write two functions that close the loop:

    1. apply_fix rewrites the broken row in data/raw_sales.csv with the corrected values returned by diagnose.
    2. rerun_pipeline shells out to the existing pipeline so the agent can verify the fix actually works end to end.

    Both functions live in a new module agent/fixer.py. Keeping the fix logic separate from the LLM logic means each piece can be tested in isolation: apply_fix is pure Python with no network call, and rerun_pipeline just launches a subprocess.

    A short pattern for in-place CSV rewrites:

    import csv
    from pathlib import Path
    
    def overwrite_column(csv_path: str, target_id: int, column: str, value: str) -> None:
        path = Path(csv_path)
        with path.open() as f:
            rows = list(csv.DictReader(f))
        fieldnames = rows[0].keys()
        for row in rows:
            if int(row["id"]) == target_id:
                row[column] = value
        with path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
    

    You will generalise this idea to apply a whole fix dict. The agent now has all four primitives: parse_log, prompts, diagnose and the fix/rerun pair. In the final step you stitch them together into a single entry point.

  7. Challenge

    Orchestrate the End-to-end Repair Loop

    You now have every primitive the agent needs:

    | Module | Responsibility | | --- | --- | | agent.log_parser | Read the latest pipeline log into structured FailureRecord objects. | | agent.prompts | Build the system and user prompts for one failure. | | agent.llm | Send a FailureRecord to the LLM and parse the JSON response. | | agent.fixer | Edit the broken row in data/raw_sales.csv and rerun the pipeline. |

    Finally, you need to stitch these together into a single command-line entry point: python -m agent.repair. This command should parse the most recent log, ask the LLM for a diagnosis for each failure, apply each returned fix, then rerun the pipeline. After one successful end-to-end run the warehouse should contain all nine rows.

    The control flow looks like this:

    read latest log -> for each FailureRecord:
        diagnosis = diagnose(record)
        apply_fix(csv_path, record.sale_id, diagnosis["fix"])
    rerun_pipeline()
    

    So now you need an orchestrator. Congratulations! You built a complete data-pipeline repair agent in seven steps:

    1. Inspect the broken pipeline and confirm the four failure modes.
    2. Survey the failures and record what a fix looks like for each.
    3. Write the log parser that turns one run's failures into structured records.
    4. Design the system and user prompts the agent will send to the LLM.
    5. Build the OpenAI client and the diagnose function that returns a structured fix.
    6. Implement the CSV fix applier and the subprocess-based pipeline rerun.
    7. Wire everything into a single python3 -m agent.repair entry point and watched the warehouse fill up.

    The same shape — log → structured records → LLM diagnosis → targeted edit → rerun → verify - works for any pipeline that emits machine-readable failure logs. Where this lab edits a CSV, you might just as well restart a worker, rewrite a config value or open a pull request.

About the author

Mateo is currently a full stack web developer working for a company that has clients from Europe and North America. His niche in programming was mostly web oriented, while freelancing, working on small startups and companies that require his services. Go(lang), Elixir, Ruby and C are his favorite languages and also the ones he’s mostly working with other then PHP in day to day work. He has a big passion for learning and teaching what he knows the best. His big interests recently have been the fields of DevOps, Linux, functional programming and machine learning.

Real skill practice before real-world application

Hands-on Labs are real environments created by industry experts to help you learn. These environments help you gain knowledge and experience, practice without compromising your system, test without risk, destroy without fear, and let you learn from your mistakes. Hands-on Labs: practice your skills before delivering in the real world.

Learn by doing

Engage hands-on with the tools and technologies you’re learning. You pick the skill, we provide the credentials and environment.

Follow your guide

All labs have detailed instructions and objectives, guiding you through the learning process and ensuring you understand every step.

Turn time into mastery

On average, you retain 75% more of your learning if you take time to practice. Hands-on labs set you up for success to make those skills stick.

Get started with Pluralsight