LLMs for Building Data Pipelines
In this Code Lab, you'll use LLMs to generate Python code for building a data pipeline. You'll configure LLM API access, generate ingestion and transformation scripts with prompts, validate outputs against schemas, implement error handling, and optimize performance. When finished, you'll have a working data pipeline built with LLM-generated code.
Introduction
Welcome to the LLMs for Building Data Pipelines Code Lab. In this hands-on lab, you use a Large Language Model (LLM) to power a customer analytics pipeline — configuring the LLM client, prompting the model to describe ingestion and transformation steps, validating the model's output against a target schema, and adding the retry, logging, and caching layers a real pipeline needs to stay reliable.
### Prerequisites
About the tools and concepts
An LLM API client is a thin wrapper around the model provider's HTTP endpoint. You send a list of messages, the model returns a completion, and your pipeline code extracts the text. Centralizing the client in one module means every prompt in the pipeline reuses the same authentication, endpoint, and model configuration.
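A minimal sketch of such a wrapper follows, assuming the v1-style OpenAI Python SDK (`OpenAI(...)` and `chat.completions.create`). The names `call_llm`, `MODEL_NAME`, `BASE_URL`, and `LAB_API_KEY` appear elsewhere in this lab, but the URL value, the `get_client` helper, and the optional `client` parameter are illustrative assumptions, not the lab's solution code.

```python
import os

MODEL_NAME = "gpt-4o-mini"               # model named in this lab
BASE_URL = "https://example.invalid/v1"  # placeholder; use the endpoint the lab provides

_client = None  # created lazily so importing this module never needs a key


def get_client():
    """Build the shared client once and reuse it (hypothetical helper)."""
    global _client
    if _client is None:
        from openai import OpenAI  # imported here so the module loads without the SDK
        _client = OpenAI(api_key=os.environ["LAB_API_KEY"], base_url=BASE_URL)
    return _client


def call_llm(prompt, client=None):
    """Send one user message and return the completion text."""
    client = client or get_client()
    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
```

Accepting an optional `client` argument is a design choice for this sketch: it lets a test pass in a stand-in object instead of calling the real endpoint.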
A prompt template is a reusable string that mixes fixed instructions with a slot for runtime data. The fixed part tells the model what to produce; the slot carries the row or path the model needs to do the job. Templates keep prompts auditable and make it easy to swap models later.
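As a rough illustration, a template with one runtime slot might look like the sketch below. The function name `build_ingestion_prompt` is the one this lab uses for the ingestion step; the template wording and the `INGESTION_TEMPLATE` constant are assumptions made for the example.

```python
# Fixed instructions plus one slot, {csv_path}, filled at runtime.
INGESTION_TEMPLATE = (
    "You are helping build a data pipeline.\n"
    "List the columns and pandas dtypes you would read from the CSV file at: {csv_path}\n"
    "Answer with one 'column: dtype' pair per line."
)


def build_ingestion_prompt(csv_path):
    """Fill the single runtime slot; the instructions stay fixed."""
    return INGESTION_TEMPLATE.format(csv_path=csv_path)


print(build_ingestion_prompt("data/customers_raw.csv"))
```

Because the instructions live in one constant, reviewing or rewording the prompt does not require touching the code that calls the model.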
A target schema is the exact set of column names and Python types the warehouse expects. Validating every transformed record against the schema before loading catches the cases where the model's output drifts from the contract — a missing column, a string where a number belongs — instead of failing deeper in the warehouse.
Retry logic is the standard way to handle the non-deterministic side of LLM calls. When a single call returns malformed JSON or hits a transient error, the loop tries again up to a fixed number of times before raising a hard failure.
Response caching keeps the pipeline from paying the token cost twice for the same input. Hashing the prompt and storing the parsed response in a dictionary means a repeated row returns instantly without a network round-trip.
Before starting this lab, you should be comfortable with:
- Python basics: functions, dictionaries, imports, and file I/O
- Data pipelines: what ingestion, transformation, validation, and load mean
- LLM prompting: sending a prompt to a model and reading the response
- SQL basics: tables, columns, and inserts
- CSV and JSON: reading tabular data and parsing JSON strings
The lab environment is ready to use. Run `python3 -m pip show openai` from inside the `workspace` folder at any time to confirm the OpenAI SDK is installed.
The Scenario
You are a data engineer at CarvedRock building a customer analytics pipeline. Raw customer data arrives daily in CSV format and needs transformation before it lands in the warehouse. Writing transformation code by hand is slow and brittle, so your job is to use an LLM to generate the pipeline logic instead: configure the LLM client, prompt the model for ingestion and transformation steps, validate the output, and add the retry, logging, and caching layers the pipeline needs to run unattended.
The Application Structure
Key files in the lab environment
- `workspace/pipeline/llm_client.py`: the LLM client module that needs to carry the model name, base URL, and API key so every pipeline step reuses one configured client
- `workspace/pipeline/database.py`: the SQLite setup that needs to expose the target schema as a single tuple
- `workspace/pipeline/ingestion.py`: the ingestion step that asks the LLM to describe a pandas read plan for a given CSV
- `workspace/pipeline/transformer.py`: the transformation step that prompts the LLM per row, retries on failure, and caches repeated prompts
- `workspace/pipeline/validator.py`: the schema guard that checks every transformed record before it reaches the database
- `workspace/pipeline/logger_setup.py`: the shared logger every pipeline stage writes to
- `workspace/run_pipeline.py`: the end-to-end runner that wires every step together
- `workspace/data/customers_raw.csv`: the sample raw dataset
- `workspace/data/customers_extra.csv`: a second dataset for retesting the pipeline
Complete the tasks in order. Each task builds on the previous one.
Run the full pipeline at any point with `python3 run_pipeline.py`. However, you may get an error in the Terminal until all tasks have been completed.
info> If you get stuck, you can refer to the provided solution code for each task, available in the `solutions` folder.
Configuring LLM Access and Data Infrastructure
Before any prompt goes out, the pipeline needs one configured LLM client every module shares. The lab environment exposes `gpt-4o-mini` through an OpenAI-compatible endpoint and shows an API Key at the top of the lab pane that you export into the shell once. Centralizing the model name, base URL, and key in `llm_client.py` keeps the rest of the pipeline free of credential plumbing: every step just imports `call_llm` and sends a prompt.
### Defining the Target Schema in One Place
The warehouse table has a fixed set of columns: `customer_id`, `first_name`, `last_name`, `email`, `signup_year`, and `total_spent`. The transformer needs that list to write the prompt, the validator needs it to check incoming records, and the loader needs it to map fields onto the SQL insert. Storing the column names once as `TARGET_COLUMNS` in `database.py` means every module reads from the same source of truth, and adding a column later changes one line instead of three.
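To make the single-source-of-truth idea concrete, here is a small sketch in which the insert statement is derived from `TARGET_COLUMNS` rather than typed out by hand. The `build_insert_sql` and `load_record` helpers, the untyped in-memory table, and the sample record are assumptions for illustration; the lab's `database.py` may be organized differently.

```python
import sqlite3

TARGET_COLUMNS = (
    "customer_id", "first_name", "last_name",
    "email", "signup_year", "total_spent",
)


def build_insert_sql(table="customers"):
    """Derive the INSERT statement from the tuple (hypothetical helper)."""
    columns = ", ".join(TARGET_COLUMNS)
    placeholders = ", ".join("?" for _ in TARGET_COLUMNS)
    return f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"


def load_record(conn, record):
    """Map a record dict onto the insert in TARGET_COLUMNS order."""
    values = tuple(record[col] for col in TARGET_COLUMNS)
    conn.execute(build_insert_sql(), values)


# Demo against an in-memory database; columns are left untyped for brevity.
conn = sqlite3.connect(":memory:")
conn.execute(f"CREATE TABLE customers ({', '.join(TARGET_COLUMNS)})")
load_record(conn, {  # made-up sample record
    "customer_id": 1, "first_name": "Ada", "last_name": "Lovelace",
    "email": "ada@example.com", "signup_year": 2023, "total_spent": 120.5,
})
rows = conn.execute("SELECT * FROM customers").fetchall()
print(rows)
```

Adding a column to the tuple changes both the generated SQL and the value mapping without further edits in this sketch.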
Generating Pipeline Code with LLM Prompts
Asking the LLM for an Ingestion Plan
The first thing the pipeline does with the LLM is hand it the CSV path and ask for a read plan. The prompt is short on purpose: the model only needs the path and a clear instruction to list the columns and types it would read. Keeping the prompt template in `build_ingestion_prompt` means the operator can review or tweak the wording without touching the function that actually calls the model.
Asking the LLM to Transform Each Row
Once the raw CSV is in memory, every row gets handed to the LLM with a transformation prompt. The prompt tells the model exactly what to change (split `full_name` into `first_name` and `last_name`, rename `email_address` to `email`, extract the year from `signup_date` as `signup_year`) and asks for the result as a JSON object. Returning JSON makes the response easy to parse, and listing every required change in the prompt keeps the model on contract.
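The transformation step described above could be sketched as follows. The prompt wording, the helper names `build_transform_prompt` and `parse_transform_response`, and the sample row are assumptions for illustration; only the three field changes come from the lab text. The model reply is a canned string here so the example runs without a network call.

```python
import json


def build_transform_prompt(raw_row):
    """List every required change, then append the row as JSON."""
    return (
        "Transform this customer record.\n"
        "- Split full_name into first_name and last_name.\n"
        "- Rename email_address to email.\n"
        "- Extract the year from signup_date as the integer signup_year.\n"
        "Return only a JSON object, with no extra text.\n"
        f"Record: {json.dumps(raw_row)}"
    )


def parse_transform_response(text):
    """Parse the model reply and insist on a JSON object."""
    record = json.loads(text)
    if not isinstance(record, dict):
        raise ValueError("expected a JSON object")
    return record


raw_row = {  # made-up sample row
    "customer_id": 1, "full_name": "Ada Lovelace",
    "email_address": "ada@example.com", "signup_date": "2023-04-01",
    "total_spent": 120.5,
}
prompt = build_transform_prompt(raw_row)

# Canned reply standing in for the model's answer.
reply = ('{"customer_id": 1, "first_name": "Ada", "last_name": "Lovelace", '
         '"email": "ada@example.com", "signup_year": 2023, "total_spent": 120.5}')
record = parse_transform_response(reply)
print(record["signup_year"])
```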
Validating Outputs and Handling Failures
Treating Every Transformed Record as Untrusted
LLM output is non-deterministic by design. Even a well-written prompt occasionally produces a record missing a field or with a number where a string belongs, and a missing column three steps later in the warehouse is a much worse failure mode than a clear rejection at the validator. The guard runs two cheap checks — every expected column is present, and every value matches the expected Python type — and rejects the record outright on either failure.
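A guard along these lines is one way to express the two checks. `is_valid_record` is the function name this lab uses, but the `EXPECTED_TYPES` mapping, the specific types chosen for each column, and the extra dictionary check are assumptions of this sketch.

```python
# Assumed Python types per target column; the lab's real mapping may differ.
EXPECTED_TYPES = {
    "customer_id": int,
    "first_name": str,
    "last_name": str,
    "email": str,
    "signup_year": int,
    "total_spent": (int, float),  # JSON numbers such as 120 parse as int
}


def is_valid_record(record):
    """Return True only if every column is present with the expected type."""
    if not isinstance(record, dict):  # defensive check added for this sketch
        return False
    for column, expected in EXPECTED_TYPES.items():
        if column not in record:
            return False
        value = record[column]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            return False
    return True


good = {
    "customer_id": 1, "first_name": "Ada", "last_name": "Lovelace",
    "email": "ada@example.com", "signup_year": 2023, "total_spent": 120.5,
}
print(is_valid_record(good))
print(is_valid_record({**good, "signup_year": "2023"}))
```

Returning a plain boolean keeps the caller simple: the pipeline can log the rejection and skip the row without catching exceptions.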
Retrying Failed LLM Calls
A single failed LLM call should not stop the pipeline. The model occasionally returns malformed JSON or hits a transient network error, and the cheapest defense is a small retry loop; three attempts is a common starting point. The loop catches both the `JSONDecodeError` from a bad parse and any other exception from the SDK; only after the third attempt fails does the function raise a `RuntimeError` so the caller sees a clear final failure.
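The retry loop described above can be sketched as shown here. The function name `transform_with_retry`, the injected `call_llm` argument, and the `make_flaky_llm` stub are assumptions for the example; the stub fails twice before returning valid JSON so the loop's behavior is visible without a real model.

```python
import json

MAX_ATTEMPTS = 3


def transform_with_retry(prompt, call_llm, max_attempts=MAX_ATTEMPTS):
    """Call the model and parse JSON, retrying on any failure."""
    last_error = None
    for _attempt in range(max_attempts):
        try:
            return json.loads(call_llm(prompt))
        except json.JSONDecodeError as exc:  # reply was not valid JSON
            last_error = exc
        except Exception as exc:             # SDK or network error
            last_error = exc
    raise RuntimeError(
        f"LLM call failed after {max_attempts} attempts"
    ) from last_error


def make_flaky_llm(replies):
    """Stub model: returns or raises each scripted reply in turn."""
    scripted = iter(replies)

    def fake_call(prompt):
        reply = next(scripted)
        if isinstance(reply, Exception):
            raise reply
        return reply

    return fake_call


flaky = make_flaky_llm([
    "not json",                       # attempt 1: bad parse
    ConnectionError("timeout"),       # attempt 2: transient error
    '{"email": "ada@example.com"}',   # attempt 3: success
])
result = transform_with_retry("transform this row", flaky)
print(result)
```

This sketch retries immediately; a pause between attempts is a common refinement that the lab text does not mention.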
Logging and Optimizing the Pipeline
Putting Every Stage on One Timeline
A pipeline that runs unattended needs a single log stream the operator can scan after the fact. The shared logger in `logger_setup.py` is already configured with a timestamped formatter. The only piece left is the `log_stage` helper that turns a stage name and message into one consistent line. Prefixing every entry with `[INGEST]`, `[TRANSFORM]`, `[VALIDATE]`, or `[LOAD]` keeps the timeline readable at a glance.
Caching Repeated LLM Responses
The transformer often sees the same raw row twice: a daily reload of the same CSV, a retry after a downstream failure, a test run that replays yesterday's data. Calling the LLM again for an identical input pays the token cost twice for the same answer. Hashing the prompt with `hashlib.md5` and storing the parsed record in `_response_cache` means the second call returns instantly without touching the network.
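A minimal sketch of how the two pieces in this section, a stage-tagged log helper and a prompt-hash cache, could fit together is shown below. `log_stage`, `hashlib.md5`, and `_response_cache` are named in the lab; the logger configuration, the `cache_key` and `transform_cached` helpers, and the stub model are assumptions for the example.

```python
import hashlib
import json
import logging

# Stand-in for the lab's shared logger with a timestamped format.
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
logger = logging.getLogger("pipeline")


def log_stage(stage, message):
    """Write one '[STAGE] message' line and return it."""
    line = f"[{stage.upper()}] {message}"
    logger.info(line)
    return line


_response_cache = {}


def cache_key(prompt):
    """MD5 is used only as a compact dictionary key, not for security."""
    return hashlib.md5(prompt.encode("utf-8")).hexdigest()


def transform_cached(prompt, call_llm):
    """Return the cached record for a prompt, calling the model only once."""
    key = cache_key(prompt)
    if key in _response_cache:
        log_stage("transform", "served from cache")
        return _response_cache[key]
    record = json.loads(call_llm(prompt))
    _response_cache[key] = record
    log_stage("transform", "called the LLM")
    return record


calls = []


def fake_llm(prompt):  # stub model that records how often it is called
    calls.append(prompt)
    return '{"first_name": "Ada"}'


first = transform_cached("row-1 prompt", fake_llm)
second = transform_cached("row-1 prompt", fake_llm)
print(len(calls))
```

A plain dictionary lives only as long as the Python process, so a cache like this one helps within a single run unless it is also written to disk.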
Run the Full Pipeline
Now that every task is complete, run the end-to-end pipeline against both sample datasets to see the full workflow in action.
- Confirm the API key is still exported in your shell: `echo $LAB_API_KEY`
  If the output is empty, copy the key from the top of the lab pane and re-export it: `export LAB_API_KEY=<paste-key-here>`
- Start the pipeline against the main dataset from the `workspace` directory: `python3 run_pipeline.py`
- Confirm the log stream prints `[INIT]`, `[INGEST]`, `[TRANSFORM]`, `[VALIDATE]`, `[LOAD]`, and `[DONE]` entries in order, with one `[LOAD]` line per customer.
- Open the SQLite database and confirm the rows landed: `sqlite3 analytics.db "SELECT * FROM customers;"`
- Run the pipeline a second time against `data/customers_extra.csv` by editing the path at the bottom of `run_pipeline.py`, then rerunning the command. Watch the `[LOAD]` entries for the three new customers.
- Open the SQLite database again and confirm the warehouse now holds eight rows instead of the original five: `sqlite3 analytics.db "SELECT * FROM customers;"`
- Run the original CSV one more time and watch how fast the transformation stage finishes: the cache returns every record without a single LLM call.
Expected Result: Every part of the workflow you implemented is visible in the running pipeline — the LLM client serves every prompt, the ingestion and transformation prompts produce structured output, the validator rejects malformed records, the retry loop absorbs transient failures, the logger puts every stage on one timeline, and the cache makes the second run essentially free.
Conclusion
Congratulations on completing the LLMs for Building Data Pipelines lab! You have used a Large Language Model to generate the working parts of a customer analytics pipeline: configured the client, written the ingestion and transformation prompts, validated the output against a target schema, and added the retry, logging, and caching layers that make the pipeline production-ready.
What You Have Accomplished
- Configured the LLM API Client: Pointed a single shared client at the `gpt-4o-mini` endpoint and threaded the lab API key through it.
- Set the Target Schema Tuple: Defined `TARGET_COLUMNS` once so the transformer, validator, and loader read from one source of truth.
- Built the Ingestion Prompt and Called the LLM: Asked the model to describe a pandas read plan for the source CSV.
- Built the Transformation Prompt and Called the LLM: Prompted the model to split, rename, and reshape each raw row into a JSON record.
- Validated Transformed Records Against the Schema: Added a guard that rejects records missing a column or carrying a mistyped value before they reach the warehouse.
- Added Retry Logic for Failed Transformations: Wrapped the LLM call in a three-attempt loop that raises a clear final failure instead of silently dropping a row.
- Added Pipeline Stage Logging: Put every ingestion, transformation, validation, and load event on one timestamped timeline.
- Cached LLM Responses by Prompt Hash: Stored every parsed record under the MD5 of its prompt so repeated rows return without a network call.
Key Takeaways
- A single LLM client module is the simplest way to keep credentials and model selection out of the rest of the pipeline.
- Prompt templates make LLM-generated code auditable. The operator can read every instruction the model received without opening a notebook.
- LLM output is non-deterministic, so every parsed record belongs behind a schema guard before it reaches the warehouse.
- Retry loops and response caches turn a fragile single-call workflow into a pipeline that survives transient failures and pays the token cost only once per unique input.
Experiment Before You Go
You still have time in the lab environment. Try these explorations:
- Swap `gpt-4o-mini` for a different deployment by changing `MODEL_NAME` and `BASE_URL` and confirm the rest of the pipeline still runs.
- Add a fourth check to `is_valid_record` that rejects records where `total_spent` is negative.
- Extend `_response_cache` with a maximum size and an eviction rule so long-running pipelines don't grow the cache forever.
- Add a `[CACHE_HIT]` log entry in `transform_row` so the timeline shows when a row came back from the cache instead of the LLM.
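For the cache-size exploration, one possible starting point is a small least-recently-used wrapper built on `collections.OrderedDict`. The `BoundedCache` class and its default size are hypothetical; least-recently-used eviction is one choice among several, not something the lab prescribes.

```python
from collections import OrderedDict


class BoundedCache:
    """Dictionary-like cache that evicts the least recently used entry."""

    def __init__(self, max_size=1000):
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.max_size = max_size
        self._items = OrderedDict()

    def get(self, key, default=None):
        if key not in self._items:
            return default
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)  # drop the oldest entry

    def __len__(self):
        return len(self._items)


cache = BoundedCache(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")      # "a" becomes the most recently used key
cache.put("c", 3)   # exceeds max_size, so "b" is evicted
print(cache.get("b"), cache.get("a"), len(cache))
```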