PySpark

pyspark


What is Spark?

Apache Spark is an open-source big data processing framework that enables distributed processing of large datasets across a cluster of computers.

Apache Spark is designed to be fast and efficient, with a focus on in-memory data processing. It supports various programming languages, including Java, Scala, Python, and R, and provides a set of high-level APIs for batch processing, real-time stream processing, and machine learning.

Spark uses a distributed computing model to process data, which means that data is processed in parallel across a cluster of machines. This makes it highly scalable and capable of handling large volumes of data. Spark’s processing engine consists of a core engine that provides basic data processing functionality and a set of libraries, including Spark SQL, Spark Streaming, MLlib (machine learning library), and GraphX (graph processing library).

What is PySpark?

PySpark is a Spark API that allows you to interact with Spark through the Python shell. PySpark is a particularly flexible tool for exploratory big data analysis because it integrates with the rest of the Python data analysis ecosystem, including pandas , NumPy , and Matplotlib.

Spark basically written in Scala and later on due to its industry adaptation it’s API PySpark released for Python using Py4J. Py4J is a Java library that is integrated within PySpark and allows python to dynamically interface with JVM objects, hence to run PySpark you also need Java to be installed along with Python, and Apache Spark.

Reasons to choose PySpark over pandas

  • PySpark is a general-purpose, in-memory, distributed processing engine that allows you to process data efficiently in a distributed fashion.

  • Applications running on PySpark are 100x faster than traditional systems.

  • Using PySpark we can process data from Hadoop HDFS, AWS S3, and many file systems.

  • PySpark is also used to process real-time data using Streaming and Kafka.

  • PySpark natively has machine learning and graph libraries.

  • PySpark provides a variety of methods for data processing along with methods to convert PySpark DataFrame to Pandas Dataframe and vice-versa.

Let us look closer and compare the running time for a basic column transformation in Pandas to that in Spark.

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import time

# Create the Session
spark = SparkSession.builder\
    .master("local")\
    .appName("PySpark Tutorial")\
    .getOrCreate()
# Generate a random dataframe with 10^7 rows
data = {'col1':np.random.randint(1,10,10000000),'col2':np.random.randint(1,10,10000000)}
df = pd.DataFrame(data)
# Creating our pandas dataframe
pandasDF = df.copy()
startPandas = time.time()
pandasDF['col2'] = pandasDF['col2'].map(lambda x: x - (x*10/100))
endPandas = time.time()
print(endPandas - startPandas)
2.008172035217285
# Creating our spark dataframe
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")
sparkDF = spark.createDataFrame(df)
startSpark = time.time()
sparkRDD = sparkDF.rdd.map(lambda x: [x[0],x[1] - (x[1]*10/100)])
endSpark = time.time()
print(endSpark - startSpark)
0.013193845748901367

We see that we achieve more than 100x faster computation by using a Spark object. But we didn’t actually use a Spark dataframe. Instead, we made use of Resilient Distributed Datasets(RDDs).

What are Resilient Distributed Datasets?

Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs are fault-tolerant because they automatically recover from node failures by recomputing lost partitions. They also support various types of data persistence, which allows them to be stored in memory or on disk for faster processing in subsequent computations.

RDDs are usually used when we want to perform some low-level transformations on the dataset.

Some common RDD methods available in PySpark are

  • map() - This method applies a function to each element in the RDD and returns a new RDD with the transformed data.

  • filter() - This method creates a new RDD that includes only the elements that satisfy a given condition.

  • reduce() - This method aggregates the elements in the RDD using a specified function.

  • flatMap() - This method is similar to map(), but it returns a flattened output instead of a nested output.

  • groupByKey() - This method groups the elements in the RDD by their keys and returns a new RDD of (key, value) pairs, where the values are grouped together.

  • sortByKey() - This method sorts the RDD by its keys.

  • join() - This method performs an inner join between two RDDs based on their keys.

  • union() - This method combines two RDDs into a single RDD.

  • distinct() - This method removes duplicate elements from the RDD.

  • take() - This method returns the first n elements of the RDD.

  • toDF() - This method converts the RDD and returns a DataFrame structure with the specified column names. Note - When converting an RDD into a DataFrame, you might run into "Out-of-memory" errors. So it is advised to

What are Dataframes?

There is another data structure provided by the spark library, called DataFrames. Spark Dataframes are the distributed collection of the data points, but here, the data is organized into the named columns. They allow developers to debug the code during the runtime which was not allowed with the RDDs. One can assume them to be analogous to Pandas Dataframes, but faster.

Here’s an article that compares runtime of Pandas DataFrame to Spark DataFrame. It concludes that, "I found that as the size of the data increased, notably beyond 1 millions rows and 1000 columns, the Spark DataFrame can outperform the Pandas DataFrame." towardsdatascience.com/parallelize-pandas-dataframe-computations-w-spark-dataframe-bba4c924487c