TDM 20100: Project 13 - PySpark

Project Objectives

In this project, you will learn how to use PySpark, the Python API for Apache Spark, a powerful framework for processing large datasets in a distributed environment.

Learning Objectives
  • Understand the basics of PySpark and its architecture.

  • Learn how to use PySpark to process large datasets.

  • Learn how to use PySpark to perform distributed computing.

Dataset

  • /anvil/projects/tdm/data/whin/weather.parquet

In this project, we use a Parquet file for our dataset. Unlike CSV files, where data is stored row by row, Parquet files store data column by column. This is significantly more efficient for operations like filtering and aggregating, because we only need to load the specific columns we use, rather than reading every row of every column in the dataset. Additionally, Parquet files store data type information internally, so the schema does not need to be inferred and the data can be loaded very quickly compared to other file formats. Because of these advantages, you will often find Parquet files used in large-scale data processing and analysis, alongside Spark and other distributed computing frameworks.
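
For example, a column-oriented reader can pull out just the columns it needs. Here is a minimal sketch using pandas (this is only for illustration and is not required for the project; temperature is one of the columns in the weather dataset used below):

import pandas as pd

# Read only the temperature column from the Parquet file.
# Because Parquet stores data column by column, the reader can skip all of the
# other columns on disk instead of parsing full rows the way a CSV reader would.
temps = pd.read_parquet("/anvil/projects/tdm/data/whin/weather.parquet", columns=["temperature"])
print(temps.head())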

If AI is used in any case, such as for debugging, research, etc., we now require that you submit a link to the entire chat history. For example, if you used ChatGPT, there is a “Share” option in the conversation sidebar. Click on “Create Link” and add the shareable link as part of your citation.

The project template in the Examples Book now has a “Link to AI Chat History” section; please include this section in all of your projects. If you did not use any AI tools, you may write “None”.

We allow using AI for learning purposes; however, all submitted materials (code, comments, and explanations) must be your own work and in your own words. No content or ideas should be copied or pasted directly into your projects. Please refer to the-examples-book.com/projects/fall2025/syllabus#guidance-on-generative-ai. Failing to follow these guidelines is considered academic dishonesty.

Readings and Resources

We have added Dr. Ward’s videos related to PySpark. Please feel free to watch them for additional practice.

Questions

Question 1 (2 points)

To start, let’s understand how to create a PySpark session and load a dataset into a DataFrame.

The PySpark API uses a builder pattern to create a SparkSession object. If you are not familiar with the builder pattern, it is a design pattern that lets us construct an object step by step through chained method calls, rather than passing every argument to the constructor at once. For example, if we had a fruit class that stored name, color, and size, a traditional constructor might look like this:

class Fruit:
    def __init__(self, name, color, size):
        self.name = name
        self.color = color
        self.size = size

And we can construct a Fruit object like this:

fruit = Fruit(name="apple", color="red", size="small")

However, with the builder pattern, the class definition may look like this:

class Fruit:
    # Default values let us start with an empty Fruit and fill it in step by step
    def __init__(self, name=None, color=None, size=None):
        self.name = name
        self.color = color
        self.size = size
    def withName(self, name):
        self.name = name
        return self
    def withColor(self, color):
        self.color = color
        return self
    def withSize(self, size):
        self.size = size
        return self
    def build(self):
        return Fruit(self.name, self.color, self.size)

This allows us to chain methods together to construct a Fruit object step by step, like below:

fruit = Fruit().withName("apple").withColor("red").withSize("small").build()

Although the builder pattern is not particularly useful in this small example, it can be quite useful for complex objects with many parameters or optional configurations. It is also often used to create immutable objects, that is, objects that cannot be modified after they are created.

Now that you have a basic understanding of the builder pattern, let’s use it to create a SparkSession object. Each SparkSession object needs the following properties:

  • appName: A name for the session

  • config: Configuration options for the session

Additionally, instead of ending the builder pattern with a build() method like in the above example, PySpark uses getOrCreate() to create the session if it does not already exist, or return the existing session if it does. Putting that all together, we can create a SparkSession object like this:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("TDM_S").config("spark.driver.memory", "2g").getOrCreate()
# Initialize a SparkSession named 'TDM_S' with 2 GB of driver memory

Now that we have a SparkSession object, we can use it to load the weather dataset into a DataFrame.

df = spark.read.parquet("/anvil/projects/tdm/data/whin/weather.parquet")

Now that we have a DataFrame, we can use it to perform some basic operations on the data.

df.show(5) # Show the first 5 rows of the DataFrame

Deliverables

1.1 Code to create a SparkSession object and load the weather dataset into a DataFrame.
1.2 Code to show the first 5 rows of the DataFrame.

Question 2 (2 points)

Now, let’s learn about what helps make PySpark so powerful. One of its key features is the ability to "plan" out the operations you want to perform on the data before actually executing them, through a process called lazy evaluation. Most functions in PySpark (like join(), map(), filter(), groupBy(), etc.) are not evaluated until the final result is needed, which is typically when you retrieve or display results through a method like show(), count(), or collect().

By chaining together functions without actually executing them, Spark can construct a Directed Acyclic Graph (DAG) of all the operations you want to perform, and then let the Spark engine optimize that graph for maximum efficiency before it runs. This is one of the key reasons why PySpark can be so much faster than traditional pandas operations on large datasets.
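
As a quick illustration (the threshold of 80 used here is just an example value), a transformation like filter() returns almost instantly because Spark only records it in the plan, while an action like count() forces Spark to optimize and execute that plan:

# Transformations such as filter() and select() return immediately;
# Spark only adds them to its plan (the DAG) without touching the data.
lazy_df = df.filter(df["temperature"] > 80).select("station_id", "temperature")

# Actions such as count(), show(), or collect() trigger the actual computation.
print(lazy_df.count())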

To demonstrate this, let’s use the time library to time how long it takes to perform some basic operations on the DataFrame.

Firstly, let’s print out some basic information about the DataFrame.

You can use the printSchema() method to print out the schema of the DataFrame.

df.printSchema()

This will output a list of all of the columns in the DataFrame, their data types, and whether they can contain null values.
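
For reference, the output looks something like the snippet below (only a few of the dataset's columns are shown here; run printSchema() yourself to see the full schema):

root
 |-- observation_time: string (nullable = true)
 |-- temperature: double (nullable = true)
 |-- solar_radiation: double (nullable = true)
 ...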

Now, let’s start performing some operations on the DataFrame. Below is a small list of operations you can perform on a DataFrame in PySpark, and a description of what they do.

  • show(n): Displays the first n rows of the DataFrame. Example: df.show(5)

  • printSchema(): Prints the schema of the DataFrame. Example: df.printSchema()

  • count(): Returns the number of rows in the DataFrame. Example: df.count()

  • collect(): Returns all of the rows in the DataFrame as a list. Example: df.collect()

  • select("column1", "column2"): Selects specific columns from the DataFrame. Example: df.select("column1", "column2")

  • filter(condition): Filters the DataFrame based on a condition. Example: df.filter(df["column"] > 10)

  • alias("new_name"): Renames a DataFrame; can also be used to rename a column inside an aggregation. Example: df.alias("new_name")

  • agg(func1, func2, …): Aggregates the DataFrame using one or more aggregation functions. Example: df.agg(avg("column1"), max("column2"))

  • avg("column"): Calculates the average of a column; used inside an aggregation such as agg(). Example: df.agg(avg("column"))

  • max("column"): Calculates the maximum value of a column; used inside an aggregation. Example: df.agg(max("column"))

  • min("column"): Calculates the minimum value of a column; used inside an aggregation. Example: df.agg(min("column"))

  • sum("column"): Calculates the sum of a column; used inside an aggregation. Example: df.agg(sum("column"))

  • groupBy("column").agg(func1, func2, …): Groups the DataFrame by a column and aggregates each group with one or more aggregation functions. Example: df.groupBy("column").agg(avg("column1"), max("column2"))

  • join(other_df, condition): Joins two DataFrames based on a condition. Example: df.join(other_df, df["column"] == other_df["column"])

  • sort("column"): Sorts the DataFrame by a column. Example: df.sort("column")

You can read the full API reference for the DataFrame object in the Official PySpark Documentation.

Similarly to SQL, we can chain these operations together to perform more complex operations on the DataFrame. For example, suppose we want to group the data by station_id, aggregate each group by the average wind_speed_mph and the maximum temperature, keep only the stations whose maximum temperature is greater than 80, and then sort by the average wind_speed_mph in descending order. We could do the following:

result_df = (
    df.groupBy("station_id")
      .agg(avg("wind_speed_mph").alias("average_wind_speed_mph"),
           max("temperature").alias("max_temperature"))
      .filter(col("max_temperature") > 80)
      .sort(desc("average_wind_speed_mph"))
)

Essentially, we can use nearly plain English to describe the operations we want to perform on the DataFrame with the PySpark API.

Now, try the following set of operations. Group the data by the name column, aggregating the data by the average humidity, average solar radiation, and average pressure. Then, filter the results to only include groups whose average humidity is greater than 50 and whose average solar radiation is greater than 100, and sort by the average pressure in descending order. Use the time library, as in the template below, to time how long it takes to build this set of operations.

import time
start_time = time.time()
result_df = # YOUR CODE HERE
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")

You should notice that the time taken to perform this operation is nearly instant. Remember, this is because of PySpark’s lazy evaluation. All that happened here was PySpark interpreting what you want it to do and constructing a plan, but not actually executing it yet.

Now, try printing the first 5 rows of the result DataFrame. Be sure to time it with the time library.

start_time = time.time()
# YOUR CODE HERE TO PRINT THE FIRST 5 ROWS OF THE RESULT DATAFRAME
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")

Even though PySpark had to execute all of those instructions and print the results, this time should still be very fast, maybe around a second or faster. This shows how powerful PySpark’s optimizations are.

Deliverables

2.1 Code to print the schema of the DataFrame.
2.2 Code to perform the desired set of operations on the DataFrame and time the results.
2.3 Code to print the first 5 rows of the result DataFrame and time the results.

Question 3 (2 points)

Another great feature of PySpark is its ability to execute SQL queries directly on a DataFrame. This is a powerful feature that lets you use your existing SQL knowledge to perform complex operations on the data. To demonstrate this, let’s write a SQL query to find the average humidity, solar radiation, and pressure across the data.

Firstly, we need to create a temporary view of the DataFrame. We can do this by using the createOrReplaceTempView method.

df.createOrReplaceTempView("weather_view")

Now, we can write a SQL query to find the average humidity, solar radiation, and pressure of the data.

result_df = spark.sql("SELECT AVG(humidity), AVG(solar_radiation), AVG(pressure) FROM weather_view")

Now, we can print the first 5 rows of the result DataFrame.

result_df.show(5)

You may wonder why we would want to run SQL through Spark rather than a traditional database like SQLite or Postgres. Again, it is all about PySpark’s lazy evaluation and optimizer. The Spark engine takes your SQL query, optimizes it for maximum efficiency, and returns the result as a PySpark DataFrame, so you can continue to use the full PySpark API on the data.
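
Because spark.sql() returns an ordinary PySpark DataFrame, you can freely mix SQL with the DataFrame API. The snippet below is just an illustration of that idea (it is not part of the graded questions):

# spark.sql() returns a DataFrame, so we can keep chaining DataFrame operations on it.
sql_df = spark.sql("SELECT name, humidity, pressure FROM weather_view")
mixed_df = sql_df.filter(col("humidity") > 50).sort(desc("pressure"))
mixed_df.show(5)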

Now that you know how to use SQL through PySpark, try writing a SQL query that does the same thing as the previous set of operations you performed on the DataFrame: group the data by the name column, aggregating by the average humidity, average solar radiation, and average pressure; filter to only include groups whose average humidity is greater than 50 and whose average solar radiation is greater than 100; and sort by the average pressure in descending order. Use the time library to time how long it takes to build the query.

start_time = time.time()
result_df = spark.sql("""
YOUR SQL QUERY HERE
""")
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")

Now, try printing the first 5 rows of the result DataFrame. Be sure to time it with the time library.

start_time = time.time()
# YOUR CODE HERE TO PRINT THE FIRST 5 ROWS OF THE RESULT DATAFRAME
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")

How long did it take to print the first 5 rows of the result DataFrame? How does this compare to the time it took to print the first 5 rows of the result DataFrame using the PySpark API?

Deliverables

3.1 Code to create a temporary view of the DataFrame.
3.2 Code to write a SQL query to perform the desired set of operations on the DataFrame and time the results.
3.3 Code to print the first 5 rows of the result DataFrame and time the results.
3.4 How long did it take to print the first 5 rows of the result DataFrame?
3.5 How does the time it took to print the first 5 rows of the result using PySpark SQL compare to the time it took to print the first 5 rows of the result using the traditional PySpark DataFrame API?

Question 4 (2 points)

Now, let’s talk about modifying the DataFrame in PySpark. Maybe we need to add a new column, remove a column, or change the data type of a column. To add a new column, we can use the withColumn method, which returns a new DataFrame with the new column added. The new column is typically derived from an existing column in the DataFrame. For example, to create a column where every value is 1 more than the value in an existing column, we can do the following:

newdf = df.withColumn("new_column", col("old_column") + 1)

To remove a column, we can simply use the drop method, which will return a new DataFrame with the column removed.

newdf = df.drop("column_name")

To change the data type of a column, we can use the cast method in conjunction with the withColumn method. For example, to create a new column where every value is the same as the value in the old column, but as a string, we can do the following:

newdf = df.withColumn("new_column", col("old_column").cast("string"))

Additionally, if we do not want to create a new column, we can pass the name of an existing column to withColumn, and that column will be overwritten with the new values.

newdf = df.withColumn("old_column", col("old_column").cast("string"))

Now, let’s take a look at the observation_time column. This column is a string containing the timestamp of when the measurement was taken. We can use the to_timestamp function to convert this column to a timestamp type.

newdf = df.withColumn("observation_time", to_timestamp("observation_time"))

Since the column is now a timestamp type, we can extract specific parts of the timestamp. For example, we can create a new column that stores the year of the timestamp.

newdf = newdf.withColumn("year", year("observation_time"))

Now, please create a new column that stores the month of the timestamp.

newdf = # Your code here

Now, let’s display the columns of the new DataFrame.

newdf.printSchema()

Finally, please group the data by the month, and aggregate the temperature and solar_radiation by the average. Sort the results by the average temperature in descending order, and print the top 5 results. You can do this with the traditional PySpark DataFrame API or PySpark SQL, whichever you prefer.

Deliverables

4.1 Code to convert the observation_time column to a timestamp type.
4.2 Code to create a new column that stores the year of the timestamp.
4.3 Code to create a new column that stores the month of the timestamp.
4.4 Code to group the data by the month, and aggregate the temperature and solar_radiation by the average.
4.5 Code to sort the results by the average temperature in descending order.
4.6 Code to print the top 5 results.

Question 5 (2 points)

Since the PySpark engine is designed to distribute and optimize computation, it can be confusing to understand how your operations are actually running under the hood. Luckily, PySpark provides a few tools to help you see what is happening.

One of the most useful tools is the explain method. This method will print out the entire 'plan' that the optimizer will use to execute the query.

For example, take your code from Question 4, and use the explain method (such as result_df.explain()) before showing the results. You should get a result that looks somewhat like this:

AdaptiveSparkPlan isFinalPlan=false
+- Sort [avg(temperature)#918 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(avg(temperature)#918 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=581]
      +- HashAggregate(keys=[month#829], functions=[avg(temperature#5), avg(solar_radiation#9)])
         +- Exchange hashpartitioning(month#829, 200), ENSURE_REQUIREMENTS, [plan_id=578]
            +- HashAggregate(keys=[month#829], functions=[partial_avg(temperature#5), partial_avg(solar_radiation#9)])
               +- Project [temperature#5, solar_radiation#9, month(cast(cast(observation_time#4 as timestamp) as date)) AS month#829]
                  +- FileScan parquet [observation_time#4,temperature#5,solar_radiation#9] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/anvil/projects/tdm/data/whin/weather.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<observation_time:string,temperature:double,solar_radiation:double>

The above plan tells us, step by step, what we asked Spark to do. As you can see, it is fairly long and a bit confusing, but it is a useful tool for understanding what may be happening behind the scenes. Now, let’s use the mode argument of the explain method to make the output more readable and to also print the optimized logical plan.

If you use mode="cost", you should have an additional section in your result that says "Optimized Logical Plan". This is a optimized description of the plan that Spark will use, that is more human readable than the Physical Plan that Spark actually executes.

Now, write as complex a SQL or PySpark DataFrame API query as you can, but don’t actually execute it (that is, don’t call an action such as show() or count()). Use the explain method with mode="cost" to print out the Physical Plan and the Optimized Logical Plan. How many steps did your Physical Plan have, and how many did the Optimized Logical Plan have? Was the Optimized Logical Plan easier to understand than the Physical Plan?

Deliverables

5.1 Code to use the explain method with mode="cost" to print out the Physical Plan and the Optimized Logical Plan for your code from Question 4.
5.2 How many steps did your Physical Plan have, and how many did the Optimized Logical Plan have?
5.3 Create a new SQL or PySpark DataFrame API query that is as complex as you can, but don’t actually execute it.
5.4 Use the explain method with mode="cost" to print out the Physical Plan and the Optimized Logical Plan for your new complex query.
5.5 How many steps did your Physical Plan have, and how many did the Optimized Logical Plan have?
5.6 Was the Optimized Plan easier to understand than the Physical Plan?

Submitting your Work

Once you have completed the questions, save your Jupyter notebook. You can then download the notebook and submit it to Gradescope.

Items to submit
  • firstname_lastname_project13.ipynb

You must double-check your .ipynb after submitting it in Gradescope. A very common mistake is to assume that your .ipynb file has been rendered properly and contains your code, markdown, and code output, even though it may not. Please take the time to double-check your work. See here for instructions on how to do this.

You will not receive full credit if your .ipynb file does not contain all of the information you expect it to, or if it does not render properly in Gradescope. Please ask a TA if you need help with this.