In recent years, Spark has become one of the most common tools for distributed big data processing and machine learning, largely replacing the older Hadoop MapReduce framework, due to its flexibility, speed, and ease of use. It's largely written in Scala but has a wide variety of bindings to other languages, including Java, R, and, most importantly, Python. These bindings allow you to combine the expressiveness and familiarity of Python, as a querying/scripting language, with the raw power and efficiency of Scala, as a compiled language.
As mentioned above, Spark is a distributed processing framework, which means that it delegates work to a number of worker nodes, or "executors". Each executor is an independent processor; for example, an individual machine in a data processing center. The executors perform the calculations they are assigned and then transmit the results back to a single master node, the "driver". Spark handles the lower-level details, such as which part to assign to which executor, memory allocation and watching for timeouts, so you can focus instead on the high-level objective.
Often, in a real-world scenario, you will be physically separated from both the driver and executors (which together form the "cluster") and will connect to them over the Internet or a VPN. For the purposes of this tutorial, though, it's convenient that Spark can also simulate a cluster with your local machine; each core will then be a single executor. In general, all communication with the cluster is performed through a `SparkSession` object. Therefore, our first step is to create one.
Note: This guide assumes you have already successfully installed PySpark.
If you're working from the command line, the command `pyspark` should instantiate a Python shell with a `SparkSession` already created and assigned to the variable `spark`. On the other hand, if you prefer working from within a Jupyter notebook, you can run the code below to create a `SparkSession` that lives in your notebook.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```
Great! Now we have an active `SparkSession`. The next step is to actually get some data to work with.
In Spark, data is represented by `DataFrame` objects, which can be thought of as a 2D structure following the tidy data format. This means that each row represents an observation and each column a variable; accordingly, columns must have names and types. Let's create a `DataFrame` from a Python object as a concrete example to flesh this idea out, and display it with the `show` method:
```python
df = spark.createDataFrame([['Japan', 'Asia', 126.8],
                            ['Portugal', 'Europe', 10.31],
                            ['Germany', 'Europe', 82.79],
                            ['China', 'Asia', 1386.0],
                            ['Pakistan', 'Asia', 197.0],
                            ['Brazil', 'South America', 209.3],
                            ['Spain', 'Europe', 46.72]],
                           ['name', 'continent', 'population'])

df.show()
```
We can see that this is a `DataFrame` containing information about countries. Each row represents a country, storing its name, which continent it's on, and its population. On the other hand, each column represents information of the same type: for example, the `name` column contains the names of all the entries in the data. Understanding this structure is crucial because you can perform different operations on rows versus columns; accordingly, storing your data in the right format is key to wrangling data stored as Spark `DataFrame`s.
Creating a Spark `DataFrame` from a local Python object is fine if your data can fit solely on a single machine, whether it's a laptop or the cluster's driver. However, when working with big data, often you'll have a data warehouse, or some other form of storage, that you'll want to load from. Spark provides a rich set of APIs to do so; consider these two examples:
```python
# Reading from a Parquet archive stored at path/to/my_parquet_data
parquet_df = spark.read.parquet('path/to/my_parquet_data')

# Reading from a Hive table mytable stored in the database mydatabase
spark.sql('use mydatabase')
hive_df = spark.read.table('mytable')
```
In addition, Spark supports formats more suited to local data, such as CSV and JSON, and can be easily extended to read from other types of data storage, including Apache Cassandra. For now, though, our toy dataset should be enough!
Let's try singling out the country names in our `DataFrame` with the `select` method, like this: `df.select('name')`. You'll probably get something like this:
`DataFrame[name: string]`
What's this? Why is it that, instead of the list of names we expected, we got only a description of a `DataFrame`?
To answer that question, we need to look at Spark's evaluation strategy. One of the things that make Spark efficient is lazy evaluation: it defers calculations until their results are actually needed. When you're working with huge amounts of data, operations can be extremely expensive. If the outcome of a computation isn't actually used, then working it out would be a waste of computing power and memory. The call to `select` is just such an example of an operation that can be deferred; in the Spark lingo, it's termed a transformation. Most operations on columns and rows are transformations.
This is why, above, we had to call `df.show()` instead of just `print(df)` or something similar. When `show` is called, the output must be displayed to the user and that, necessarily, requires performing all the deferred calculations, or there would be nothing to show! `show`, and commands like it, force the evaluation of all necessary transformations; accordingly, they are called actions. Other examples of actions include:
- the `collect` method, which transfers data from the executors to the driver
- the `count` method, which counts the number of rows in the `DataFrame`
Apart from not computing extraneous results, lazy evaluation allows Spark to optimize your queries as a whole, which will likely be more efficient, since it has access to information about the whole process, instead of only one step at a time. However, the drawback is that errors will also only pop up when actions are evaluated, apart from some easy-to-catch ones that can be identified at transformation time. Therefore, when experimenting, it's good to periodically call `show` to check that your data is what you expect.
Let's try this out really quickly by getting the average population of those countries with the following command:
That should give you the following result (or something close):
```
+------------------+
|   avg(population)|
+------------------+
|294.13142857142856|
+------------------+
```
Congratulations! You've just performed your first operation with Spark!
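To conclude, note that `show` is something like a print statement: it displays the result but does not return it, so you cannot use the average afterwards as a Python double. To do that, just replace `show` above with `collect`, which will return a list of `Row` objects.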
Nothing stops you from running `collect` on your original data; you can do it here with `df.collect()`. Here, that will work because `df` is very small. However, in a situation with hundreds of millions of rows, attempting to pull all that data to your driver will likely just crash it, so be warned!