In recent years, Spark has become one of the most common tools for distributed big data processing and machine learning, largely replacing the older Hadoop MapReduce framework thanks to its flexibility, speed, and ease of use. It's mostly written in Scala but has a wide variety of bindings to other languages, including Java, R, and, most importantly, Python. These bindings allow you to combine the expressiveness and familiarity of Python, as a querying/scripting language, with the raw power and efficiency of Scala, as a compiled language.
As mentioned above, Spark is a distributed processing framework, which means that it delegates work to a number of worker nodes, or "executors". Each executor is an independent processing unit: for example, an individual machine in a data center. The executors perform the calculations they are assigned and then transmit the results back to a single master node, the "driver". Spark handles the lower-level details, such as deciding which part of the work to assign to which executor, allocating memory, and watching for timeouts, so you can focus instead on the high-level objective.
Often, in a real-world scenario, you will be physically separated from both the driver and the executors (which together form the "cluster") and will connect to them over the Internet or a VPN. For the purposes of this tutorial, though, it's convenient that Spark can also simulate a cluster on your local machine; each core will then be a single executor. In general, all communication with the cluster is performed through a SparkSession object. Therefore, our first step is to create one.
Note: This guide assumes you have already successfully installed PySpark.
If you're working from the command line, the command pyspark should instantiate a Python shell with a SparkSession already created and assigned to the variable spark. On the other hand, if you prefer working from within a Jupyter notebook, you can run the code below to create a SparkSession that lives in your notebook.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
Great! Now we have an active SparkSession. The next step is to actually get some data to work with.

In Spark, data is represented by DataFrame objects, which can be thought of as 2D structures following the tidy data format. This means that each row represents an observation and each column a variable; accordingly, columns must have names and types. Let's create a DataFrame from a Python object as a concrete example to flesh this idea out, and display it with the show method:
df = spark.createDataFrame([['Japan', 'Asia', 126.8],
                            ['Portugal', 'Europe', 10.31],
                            ['Germany', 'Europe', 82.79],
                            ['China', 'Asia', 1386.0],
                            ['Pakistan', 'Asia', 197.0],
                            ['Brazil', 'South America', 209.3],
                            ['Spain', 'Europe', 46.72]],
                           ['name', 'continent', 'population'])

df.show()
We can see that this is a DataFrame containing information about countries. Each row represents a country, storing its name, which continent it's on, and its population. On the other hand, each column contains information of the same type: for example, the name column holds the names of all the entries in the data. Understanding this structure is crucial because you can perform different operations on rows versus columns; accordingly, storing your data in the right format is key to wrangling data stored as Spark DataFrames.
Creating a Spark DataFrame from a local Python object is fine if your data fits entirely on a single machine, whether it's a laptop or the cluster's driver. However, when working with big data, you'll often have a data warehouse, or some other form of storage, that you'll want to load from. Spark provides a rich set of APIs to do so; consider these two examples:
# Reading from a Parquet archive stored at path/to/my_parquet_data
parquet_df = spark.read.parquet('path/to/my_parquet_data')

# Reading from a Hive table mytable stored in the database mydatabase
spark.sql('use mydatabase')
hive_df = spark.read.table('mytable')
In addition, Spark supports formats more suited to local data, such as CSV and JSON, and can be easily extended to read from other types of data storage, including Apache Cassandra. For now, though, our toy dataset should be enough!
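For example, reading local CSV or JSON files is just as concise; here's a minimal sketch, where the file paths are hypothetical placeholders:

# Reading a CSV file; header and inferSchema are optional but convenient
csv_df = spark.read.csv('path/to/my_data.csv', header=True, inferSchema=True)

# Reading a newline-delimited JSON file
json_df = spark.read.json('path/to/my_data.json')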
Let's try singling out the country names in our DataFrame with the select method, like this: df.select('name'). You'll probably get something like the following:
DataFrame[name: string]
What's this? Why is it that, instead of the list of names we expected, we got only a description of a DataFrame?
To answer that question, we need to look at Spark's evaluation strategy. One of the things that make Spark efficient is lazy evaluation: it defers calculations until their results are actually needed. When you're working with huge amounts of data, operations can be extremely expensive. If the outcome of a computation isn't actually used, working it out would be a waste of computing power and memory. The call to select is just such an example of an operation that can be deferred; in Spark lingo, it's termed a transformation. Most operations on columns and rows are transformations.
This is why, above, we had to call df.show() instead of just print(df) or something similar. When show is called, the output must be displayed to the user, and that necessarily requires performing all the deferred calculations, or there would be nothing to show! show, and commands like it, force the evaluation of all necessary transformations; accordingly, they are called actions. Other examples of actions include the following (a short sketch of both appears after the list):
- the collect method, which transfers data from the executors to the driver
- the count method, which counts the number of rows in the DataFrame
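Here is a quick sketch of both on our toy DataFrame; unlike a bare select, each call runs immediately and hands a plain Python value or list back to the driver:

df.count()    # triggers evaluation and returns 7, the number of countries
df.collect()  # returns a list of 7 Row objects, one per country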
Apart from skipping extraneous results, lazy evaluation allows Spark to optimize your queries as a whole, which is likely to be more efficient, since it has access to information about the entire process instead of only one step at a time. The drawback, however, is that errors will also only pop up when actions are evaluated, apart from some easy-to-catch ones that can be identified at transformation time. Therefore, when experimenting, it's good to periodically call show to check that your data is what you expect.
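For instance, appending show to the earlier select forces that deferred transformation to actually run and prints the resulting column:

df.select('name').show()  # the country names are now computed and displayed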
Let's try this out really quickly by getting the average population of those countries with the following command:
df.groupby().mean('population').show()
That should give you the following result (or something close):
+------------------+
|   avg(population)|
+------------------+
|294.13142857142856|
+------------------+
Congratulations! You've just performed your first operation with Spark!
To conclude, note that show is something like a print statement. It's fine for inspecting results, but sometimes you need actual values; for example, the mean population we just calculated, as a double. To do that, just replace show above with collect, which will return a list of Row objects.
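As a minimal sketch, collecting the mean from before returns a single Row whose field name matches the avg(population) header we saw above, and you can index into it to get the value as a Python float:

rows = df.groupby().mean('population').collect()
mean_population = rows[0]['avg(population)']  # Row objects support lookup by column name
print(mean_population)                        # roughly 294.13 for our toy dataset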
Nothing stops you from running collect on your original data; you can do it here with df.collect(), and it will work because df is very small. However, in a situation with hundreds of millions of rows, attempting to pull all that data to your driver will likely just crash it, so be warned!
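If you just need a peek at a large DataFrame, one safer pattern, using the standard take and limit methods (not covered above), is to cap how many rows ever reach the driver:

first_rows = df.take(5)             # pulls at most 5 rows to the driver
first_rows = df.limit(5).collect()  # same idea, expressed as a transformation plus collect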