Marcus Lim

An Introduction to Tidy Data with Spark DataFrames

  • Jul 11, 2019
  • 8 Min read
  • 68 Views
Data
PySpark

Introduction

In recent years, Spark has become one of the most common tools for distributed big data processing and machine learning, largely replacing the older Hadoop MapReduce framework thanks to its flexibility, speed, and ease of use. It's written mostly in Scala but has bindings for a variety of other languages, including Java, R, and, most importantly, Python. These bindings allow you to combine the expressiveness and familiarity of Python, as a querying/scripting language, with the raw power and efficiency of Scala, as a compiled language.

As mentioned above, Spark is a distributed processing framework, which means that it delegates work to a number of worker processes, or "executors". Each executor runs independently of the others; for example, on its own machine in a data processing center. The executors perform the calculations they are assigned and then transmit the results back to a single coordinating process, the "driver". Spark handles the lower-level details, such as which part of the work to assign to which executor, memory allocation, and watching for timeouts, so you can focus on the high-level objective instead.

Often, in a real-world scenario, you will be physically separated from both the driver and executors (which together form the "cluster") and will connect to them over the Internet or a VPN. For the purposes of this tutorial, though, it's convenient that Spark can also simulate a cluster on your local machine, using its cores as workers in place of separate executors. In general, all communication with the cluster goes through a SparkSession object. Therefore, our first step is to create one.

Note: This guide assumes you have already successfully installed PySpark.

If you're working from the command line, running the pyspark command should start a Python shell with a SparkSession already created and assigned to the variable spark. On the other hand, if you prefer working from within a Jupyter notebook, you can run the code below to create a SparkSession that lives in your notebook.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Great! Now we have an active SparkSession. The next step is to actually get some data to work with.

Creating DataFrames

In Spark, data is represented by DataFrame objects, which can be thought of as a 2D structure following the tidy data format. This means that each row represents an observation and each column a variable; accordingly, columns must have names and types. Let's create a DataFrame from a Python object as a concrete example to flesh this idea out, and display it with the show method:

df = spark.createDataFrame([['Japan', 'Asia', 126.8],
                            ['Portugal', 'Europe', 10.31],
                            ['Germany', 'Europe', 82.79],
                            ['China', 'Asia', 1386.0],
                            ['Pakistan', 'Asia', 197.0],
                            ['Brazil', 'South America', 209.3],
                            ['Spain', 'Europe', 46.72]],
                           ['name', 'continent', 'population'])

df.show()

We can see that this is a DataFrame containing information about countries. Each row represents a country, storing its name, which continent it's on, and its population. Each column, in turn, holds one kind of information for every row: for example, the name column contains the names of all the countries in the data. Understanding this structure is crucial because you can perform different operations on rows versus columns; accordingly, storing your data in the right format is key to wrangling data stored as Spark DataFrames.

Creating a Spark DataFrame from a local Python object is fine if your data fits on a single machine, whether that's a laptop or the cluster's driver. However, when working with big data, you'll often have a data warehouse, or some other form of storage, that you'll want to load from. Spark provides a rich set of APIs to do so; consider these two examples:

# Reading from a Parquet archive stored at path/to/my_parquet_data
parquet_df = spark.read.parquet('path/to/my_parquet_data')

# Reading from a Hive table mytable stored in the database mydatabase
spark.sql('use mydatabase')
hive_df = spark.read.table('mytable')

In addition, Spark supports formats more suited to local data, such as CSV and JSON, and can be easily extended to read from other types of data storage, including Apache Cassandra. For now, though, our toy dataset should be enough!

Transformations and Actions

Let's try singling out the country names in our DataFrame with the select method, like this: df.select('name'). You'll probably get something like this:

DataFrame[name: string]

What's this? Why is it that, instead of the list of names we expected, we got only a description of a DataFrame?

To answer that question, we need to look at Spark's evaluation strategy. One of the things that makes Spark efficient is lazy evaluation: it defers calculations until their results are actually needed. When you're working with huge amounts of data, operations can be extremely expensive, so working out a result that is never used would be a waste of computing power and memory. The call to select is just such an operation that can be deferred; in Spark lingo, it's termed a transformation. Most operations on columns and rows are transformations.

This is why we had to call df.show() above instead of just print(df) or something similar. When show is called, the output must be displayed to the user, and that necessarily requires performing all the deferred calculations, or there would be nothing to show! Methods like show force the evaluation of all the transformations they depend on; accordingly, they are called actions. Other examples of actions include:

  • The collect method, which transfers data from the executors to the driver
  • The count method, which counts the number of rows in the DataFrame
  • Writing data to disk

Apart from not computing extraneous results, lazy evaluation allows Spark to optimize your query as a whole, because it can see the entire chain of operations instead of only one step at a time; this is often more efficient than optimizing each step separately. The drawback is that most errors will only pop up when an action is evaluated, apart from some easy-to-catch ones that can be identified at transformation time. Therefore, when experimenting, it's good to periodically call show to check that your data is what you expect.

Let's try this out really quickly by getting the average population of those countries with the following command:

df.groupby().mean('population').show()

That should give you the following result (or something close):

+------------------+
|   avg(population)|
+------------------+
|294.13142857142856|
+------------------+

Conclusion

Congratulations! You've just performed your first operation with Spark!

To conclude, note that show is something like a print statement. It's fine for inspecting results, but sometimes you need actual values; for example, the mean population we just calculated, as a Python float. To do that, replace show in the command above with collect, which returns a list of Row objects from which you can extract the value.

Nothing stops you from running collect on your original data; you can do so with df.collect(). That works here because df is very small. However, in a situation with hundreds of millions of rows, attempting to pull all that data to your driver will likely just crash it, so be warned!
