# Build and Deploy an ETL Pipeline with Python
This hands-on lab provides a step-by-step approach to building an ETL (Extract, Transform, Load) pipeline using Python. Participants will learn how to efficiently extract data from various sources, apply transformations for data cleaning and processing, and load the structured data into target systems for storage and analytics. By completing this lab, learners will gain practical experience in building an end-to-end ETL pipeline, preparing them to handle real-world data engineering tasks efficiently.
# Challenge: Introduction to Build and Deploy an ETL Pipeline with Python
In this lab, you'll explore various techniques to build Python ETL pipelines. By leveraging Python for ETL pipelines, you can create scripted logic that is both straightforward to comprehend and compose, while tapping into the extensive range of Python modules to meet your data processing requirements.
### Learning Objectives
- Understand the 3 phases of ETL pipelines.
- Implement techniques to extract data using key Python ETL libraries such as `Pandas`, `Requests`, and `SQLAlchemy`.
- Transform your data while it's in memory, and then load the data into a destination database.
To get started with this lab, click the `>` button to proceed to the next step!
# Challenge: Extracting Data
### Extract Phase
The extract phase is the first step in an ETL pipeline, where data is collected from various sources for further processing. During this phase, raw data is retrieved from flat files, databases, APIs, or other systems, often in different formats and structures. The goal is to gather all the necessary information efficiently and accurately, ensuring that downstream processes have the required input.
In this task you will learn how to:
- Read various raw data files into Pandas
- Read data into Pandas from a SQL database
- Read data into Pandas from REST APIs
To get started, open the `extract.py` file. A SQLAlchemy engine has already been created with a connection to the MySQL database at the top of the script.
## Task 1.0.1
### Extracting Data from Flat Files
Pandas supports reading flat files such as the following:
- `.csv`
- `.xlsx`
- `.json`

When Pandas reads the file, the contents are written to a dataframe in memory. Pandas dataframes are similar to SQL tables and are a great way to hold data while you clean and process it.
For this task:
- Read the `data/example.csv` file into `df1` with the `pd.read_csv` function.
- Read the `data/example.xlsx` file into `df2` with the `pd.read_excel` function.
- Read the `data/example.json` file into `df3` with the `pd.read_json` function.
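A minimal sketch of what this step might look like, assuming the file paths above and that `pandas` is imported as `pd` (reading `.xlsx` files also requires an Excel engine such as `openpyxl` to be installed):

```python
import pandas as pd

# Read each flat file into its own dataframe
df1 = pd.read_csv("data/example.csv")
df2 = pd.read_excel("data/example.xlsx")
df3 = pd.read_json("data/example.json")

# Quick sanity check of what was loaded
print(df1.head())
print(df2.shape, df3.shape)
```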
## Task 1.0.2
### Data Loading with Dask
Dask is a powerful parallel processing framework. Dask not only allows you to process your workloads faster, it also lets you work with more data than your machine has memory. It's great if you have a really big Pandas dataframe and are doing lots of processing on the data in memory.
Let's do a simple exercise to see how you can load several files into a Dask dataframe. Note that `dask.dataframe` has been imported as the alias `dd`.
1. Observe the `dask_data_` files in the `data` folder.
2. Assign the `df4` variable the value from the following code snippet: `dd.read_csv("dask_data_*.csv")`. Note the wildcard syntax in the file name (`*`). This specifies that any file with the name prefix `dask_data_` and an extension of `.csv` will be read into the dataframe.
3. Optionally, use the `get_memory_size` and `get_shape` functions to get metadata about the Dask dataframe.
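As a rough sketch of this step (the exact file locations and the `get_memory_size`/`get_shape` helpers come from the lab environment, so treat the path below as an assumption):

```python
import dask.dataframe as dd

# The wildcard pulls every matching CSV into one logical Dask dataframe
df4 = dd.read_csv("dask_data_*.csv")

# Dask is lazy: work happens only when you ask for a result
print(df4.head())   # reads just enough data to show a preview
print(len(df4))     # row count across all matched files
```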
## Task 1.0.3
### Extract Data from Database Tables
Pandas can also read data directly from database tables using `pd.read_sql()` together with SQLAlchemy connectors. These SQLAlchemy connections are referred to as engines. In this challenge you will learn how to connect to a database and use a raw SQL query to return results to a Pandas dataframe. This challenge uses a locally hosted MySQL database; the connection credentials have already been provided at the top of the `extract.py` script.

Read a database query into a Pandas dataframe:
- Create a SQLAlchemy engine and point it to the `data/sales.db` SQLite database.
- Assign the following SQL query to the `query` variable: `query = "SELECT TABLE_NAME FROM TABLES"`.
- Assign the two required parameters to the `pd.read_sql` function, which are `query` and `con`. `con` will be the `engine` defined at the top of the script that connects to MySQL.
- Use the `pd.read_sql` function to read the data into `df5`.
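A hedged sketch of these steps, assuming the SQLite path above (your lab script may instead reuse the pre-built MySQL `engine`, and the query relies on the lab database's schema):

```python
import pandas as pd
from sqlalchemy import create_engine

# Engine pointing at the SQLite database used in this task
engine = create_engine("sqlite:///data/sales.db")

# Query taken from the lab instructions
query = "SELECT TABLE_NAME FROM TABLES"

# Read the query results straight into a dataframe
df5 = pd.read_sql(query, con=engine)
print(df5.head())
```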
## Task 1.0.4
### Extracting REST API Data with Requests
The Requests library allows you to read data from a web endpoint and store that data as JSON. Requests is helpful for building ETL pipelines because it can talk to different services via REST API endpoints. Let's use Requests to get data from a locally running REST API:
- Click the `+` icon next to the `Terminal` and select `New tab`.
- On the `New tab` modal, select `Terminal`.
- In the terminal, execute the following command: `python3 scripts/app.py`. This command starts the Flask app, which hosts multiple REST API endpoints.
- Specify the following `url` variable for the REST API endpoint: `http://localhost:5000/random-user`.
- To make a `GET` request and print the response from the REST API, use the following code:

```python
r = requests.get(url)
print(r.json())
```
## Task 1.0.5
### Authenticating with Requests
Most REST APIs require authentication. There are many forms of REST API authentication; in this challenge you will use a Bearer token. To do this, you will need to pass an additional `headers` variable to the request that contains the token.

Add a bearer token to the requests `GET` request and authenticate to the `/protected` endpoint.
- In the `headers` dictionary variable, add the key `Authorization`.
- In the `headers` dictionary variable, add the value `Bearer mysecrettoken123`.
- Assign the variable `response` to the results of the following GET request: `response = requests.get("http://localhost:5000/protected", headers=headers)`.
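Putting those pieces together, a minimal sketch of the authenticated request might look like this (the token value is the one given in the lab instructions):

```python
import requests

# Bearer token authentication via the Authorization header
headers = {"Authorization": "Bearer mysecrettoken123"}

response = requests.get("http://localhost:5000/protected", headers=headers)
print(response.status_code)
print(response.json())
```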
## (Optional)
### Loading Data with AIOHTTP
In this task you will compare the execution time of running similar workloads with the `non-async.py` (Requests) and `async.py` (aiohttp) scripts.
- In the `scripts/` directory, open both the `async.py` and `non-async.py` Python files.
- Observe the difference in structure. Notice that the `async.py` file has the keyword `async` in front of the `main()` function and that `aiohttp.ClientSession()` is used with the `async with` keyword.
- Run both scripts with the following commands and observe the difference in execution time. Feel free to increase `num_iterations` to larger values to experiment.
To run the `async.py` file: `python3 scripts/async.py`

To run the `non-async.py` file: `python3 scripts/non-async.py`
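For reference, here is a minimal sketch of the async pattern described above, assuming the same local endpoint used earlier in this lab (the `async.py` script in the lab environment is the authoritative version):

```python
import asyncio
import aiohttp

async def main():
    # async with keeps the session and request from blocking the event loop
    async with aiohttp.ClientSession() as session:
        async with session.get("http://localhost:5000/random-user") as resp:
            print(await resp.json())

asyncio.run(main())
```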
# Challenge: Transforming Data
### Transform Phase
The Transform phase is where raw data is refined, structured, and enriched to meet business needs before being loaded into a target system.
This stage involves various data activities including:
- Data cleansing
- Deduplication
- Standardization
- Aggregation
- Applying business logic
- Joining
Transformations can range from simple conversions, such as changing date formats, to complex operations like joining multiple datasets, or calculating key performance indicators.
In this task you will learn how to:
- Apply different data conversions and translations to Pandas data.
- Query, filter, and aggregate Pandas data.
- Join multiple Pandas dataframes together with `pd.merge`.
To get started, open the `transform.py` file.

## Task 2.0.1
### Convert Pandas Column Data Types
Remember that Pandas will infer data types by default. This may be fine in most cases, but sometimes you will want to define data types explicitly or convert them.
- Observe the `data` dictionary. Notice that the `ZipCode` column is an integer.
- Convert the `ZipCode` column to a type of `string`.
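A minimal sketch of one way to do the conversion, assuming the dataframe in the lab script is named `df` and `pandas` is imported as `pd`:

```python
# Cast the inferred integer ZipCode column to string
df['ZipCode'] = df['ZipCode'].astype(str)
print(df.dtypes)
```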
## Task 2.0.2
### Convert Datetime Columns in Pandas
Next, we will convert the `Birthdate` column.
- Observe the current format of the `Birthdate` column. You can add a `print(df)` statement to the script to see the values of the columns.
- The column is in `yyyy-mm-dd` format. We would like to convert it to `mm-dd-yyyy` format.
- Assign the converted value back to the `Birthdate` column.
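A sketch of one way to perform this conversion, assuming the dataframe is named `df` and `pandas` is imported as `pd`:

```python
# Parse the yyyy-mm-dd strings as datetimes, then re-format as mm-dd-yyyy
df['Birthdate'] = pd.to_datetime(df['Birthdate']).dt.strftime('%m-%d-%Y')
print(df['Birthdate'].head())
```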
## Task 2.0.3
### Using `map` to Replace Dataframe Values
Next, we will learn about the `df.replace()` function. This function allows you to specify a dictionary of key-value pairs and replace existing dataframe values.
- Observe the `replace_map` dictionary. In this dictionary, the value being replaced is specified first, followed by its replacement.
- Write a `df.replace()` statement that uses the `df` column `City` and replaces its values with the ones specified in `replace_map`.
- Remember to assign the value of the `df.replace()` command back to the `df` variable.
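As a sketch, with a placeholder `replace_map` (the real mapping is defined in the lab script):

```python
# Placeholder mapping: value to replace -> replacement value
replace_map = {"NYC": "New York City"}

# Limit the replacement to the City column, then assign the result back to df
df = df.replace({"City": replace_map})
```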
## Task 2.0.4
### Using Regex to Replace Dirty Data
You can also replace specific dataframe row values by specifying regex patterns. Regex is a string search pattern language that can identify specific strings in text. You can use regex to find strings within your dataframes and then replace or extract those values.
For example, the following code snippet shows how to replace all `@@@@` values within a dataframe column with `####`:

```python
df['Column'] = df['Column'].str.replace(r'\@\@\@\@', '####', regex=True)
```

- Observe the `CharacterSeq` column in the dataframe.
- Write a regex pattern that identifies occurrences of four `*` characters in a row.
- Use the `df['Column'].str.replace()` command with the regex pattern to replace the occurrences of four asterisk characters (`****`) with underscores (`____`).
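One possible pattern for this (a sketch; other equivalent regexes would also work, and the column name comes from the lab data):

```python
# Replace any run of exactly four asterisks with four underscores
df['CharacterSeq'] = df['CharacterSeq'].str.replace(r'\*{4}', '____', regex=True)
```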
## (Optional)
### Dataframe Filtering
There are multiple ways that you can filter dataframe rows. One is the `df.query()` function, for example:

```python
df.query('Name == "Adam"')
```

Another is self-referencing syntax:

```python
df[df.Name == "Adam"]
```

Filtered dataframes can also be assigned to other dataframes. This is useful for generating reports or sub-datasets that only include specific values. For example:
```python
test = df[df['Name'] == "Adam"]
test
```

## Task 2.0.5
### Creating Custom Calculated Columns
You can write your own functions to translate data in dataframes based on existing values. You can use a lambda function to pass an existing data value to the function, similar to how you call a Python function.
1. Observe the `cost_of_living_adjust` function. This function takes an `original` value and an `inc_percent`, or incremental percentage.
2. Create a new column called `Salary_COL` and assign it the values of the `cost_of_living_adjust` function based on the existing `Salary` column and a `5` percent increase.

## (Optional)
### Dataframe Aggregating
You can aggregate the values of your dataframe easily with the `.agg` function. `.agg` accepts keywords such as:
- `count`
- `min`
- `max`
- `mean`
- `sum`

To apply an aggregate to your dataframe, you can specify the column name and the aggregate(s) you want to apply, for example:

```python
df.agg({'Salary': ['count', 'min', 'max', 'mean', 'sum']})
```

You can also use `.groupby` syntax in combination with `.agg`:

```python
df.groupby("TShirtSize").agg({"Salary": ["mean"]})
```
## Task 2.0.6
### Joining Data in Pandas
Pandas offers the ability to join multiple dataframes together, similar to how you would in a relational database with SQL.
Pandas uses the `.merge()` function to facilitate this process. When using `.merge()`, you specify the original dataframe and the joining dataframe, the `on` parameter, which is the column being matched, and the `how` parameter, which is the type of join.

For example, the following joins `df1` and `df2` together based on the matching `ID` columns that are present in both dataframes. These examples also show the different types of joins available when using `.merge()`:

```python
# Merge using different join types
inner_merge = pd.merge(df1, df2, on='ID', how='inner')
left_merge = pd.merge(df1, df2, on='ID', how='left')
right_merge = pd.merge(df1, df2, on='ID', how='right')
outer_merge = pd.merge(df1, df2, on='ID', how='outer')
```

For this task:
- Observe both dataframes (`df` and `df2`).
- Use the `UserId` field as the `on` parameter.
- Use `left` as the `how` parameter.
- Assign the results of the merged data to a dataframe named `df_merged`.
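Based on those parameters, the merge might look like this sketch:

```python
# Left-join df and df2 on the shared UserId column
df_merged = pd.merge(df, df2, on='UserId', how='left')
print(df_merged.head())
```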
# Challenge: Loading Data
Data writing in Pandas refers to the process of exporting or saving data from a Pandas DataFrame to various file formats or databases. Pandas provides several functions to write data, including `to_csv()`, `to_excel()`, `to_sql()`, and `to_json()`, allowing users to easily export data to formats like CSV, Excel, SQL databases, or JSON.

These functions give you flexibility in saving data in a format that suits your needs, whether for sharing, archiving, or further processing. With options for customization such as delimiter specification, index handling, and database connection parameters, Pandas ensures a seamless experience in saving data to various destinations efficiently.
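Since this challenge centers on SQLAlchemy connections, here is a small illustrative sketch of `to_sql` with an engine (the connection string, dataframe, and table name below are hypothetical, not part of the lab steps):

```python
import pandas as pd
from sqlalchemy import create_engine

# Hypothetical SQLite target used only for illustration
engine = create_engine("sqlite:///data/example_load.db")

df = pd.DataFrame({"id": [1, 2], "name": ["alpha", "beta"]})

# Write the dataframe to a table, replacing it if it already exists
df.to_sql("example_table", con=engine, if_exists="replace", index=False)
```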
To get started, open the `load.py` file.

## Task 3.0.1
### Loading Data with a SQLAlchemy Connection
- Create a dictionary with the following Python code:

```python
data = {
    'Transaction_ID': ['T1001', 'T1002', 'T1003', 'T1004'],
    'Customer_Name': ['Alice Johnson', 'Bob Lee', 'Charlie Kim', 'David Clark'],
    'Product': ['Laptop', 'Smartphone', 'Tablet', 'Smartwatch'],
    'Quantity': [1, 2, 3, 1],
    'Price_Per_Unit': [1200, 800, 400, 200],
    'Total_Sale': [1200, 1600, 1200, 200],
    'Sale_Date': ['2025-03-20', '2025-03-21', '2025-03-21', '2025-03-22']
}
```

- Convert the dictionary into a Pandas dataframe variable `df`.
- Use `df.to_csv` to write the dataframe contents to a `Transactions.csv` file in the `data` directory. Include the following parameter when using the `to_csv` function: `index=False`.
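A sketch of these steps, assuming `pandas` is imported as `pd` and the `data` directory exists:

```python
import pandas as pd

# Convert the dictionary into a dataframe
df = pd.DataFrame(data)

# Write it to the data directory without the index column
df.to_csv('data/Transactions.csv', index=False)
```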
## (Optional)
### Validating Data Operations
You can use assert statements to validate that a specific data pipeline has finished successfully. Use assert statements to check that row counts, column types, and totals all match the expected results.
For example, to assert that the number of rows read from the file matches the expected value, you could use the following code:

```python
import pandas as pd
from sqlalchemy import create_engine

csv_df = pd.read_csv(csv_file)

assert csv_df.shape[0] == 3, "Row counts do not match expected values!"
print(f"Row counts match: {csv_df.shape[0]} rows")
```