Apache Spark Tutorial - Mac Terminal.
Here we will get familiar with the Apache Spark data stack and tackle a Business Intelligence use case.
Apache Spark ecosystem.
Prerequisites
What is Apache Spark?
Apache Spark is a data processing framework that can quickly perform processing tasks on very large data sets, and can also distribute data processing tasks across multiple computers, either on its own or in tandem with other distributed computing tools.
Apache Spark — which is also open source — is a data processing engine for big data sets. Like Hadoop, Spark splits up large tasks across different nodes.
However, it tends to perform faster than Hadoop and it uses random access memory (RAM) to cache and process data instead of a file system. This enables Spark to handle use cases that Hadoop cannot.
Benefits of the Spark framework include the following:
A unified engine that supports SQL queries, streaming data, machine learning (ML), and graph processing
Can be up to 100x faster than Hadoop for smaller workloads through in-memory processing, with disk-based processing for data that does not fit in memory
APIs designed for ease of use when manipulating semi-structured data and transforming data (a minimal sketch follows this list)
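For example, here is a minimal sketch of the DataFrame API applied to semi-structured (JSON) data. The file name and its fields are hypothetical, and the snippet assumes an active SparkSession named spark, such as the one provided by the spark-shell we use later in this tutorial:

// Minimal sketch: read semi-structured JSON and transform it with the DataFrame API.
// "events.json" and its fields are hypothetical; `spark` is an active SparkSession.
import org.apache.spark.sql.functions.col

val events = spark.read.json("events.json")
events
  .filter(col("status") === "active")   // declarative filter on a semi-structured field
  .groupBy("country")
  .count()
  .show()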
Introducing: Apache Hadoop
Apache Hadoop is an open-source framework that is used to efficiently store and process large datasets ranging in size from gigabytes to petabytes of data.
Instead of using one large computer to store and process the data, Hadoop allows clustering multiple computers to analyze massive datasets in parallel more quickly.
Here is a list of some terms associated with Hadoop:
Hadoop — an ecosystem of tools for big data storage and data analysis. Hadoop is an older system than Spark but is still used by many companies. The major difference between Spark and Hadoop is how they use memory: Hadoop writes intermediate results to disk, whereas Spark tries to keep data in memory whenever possible. This makes Spark faster for many use cases.
Hadoop MapReduce — a system for processing and analyzing large data sets in parallel.
Hadoop YARN — a resource manager that schedules jobs across a cluster. The manager keeps track of what computer resources are available and then assigns those resources to specific tasks.
Hadoop Distributed File System (HDFS) — a big data storage system that splits data into chunks and stores the chunks across a cluster of computers.
As Hadoop matured, other tools were developed to make Hadoop easier to work with. These tools included:
Apache Pig — a high-level scripting language (Pig Latin) for data flows that runs on top of Hadoop MapReduce
Apache Hive — another SQL-like interface that runs on top of Hadoop MapReduce
Oftentimes when someone is talking about Hadoop in general terms, they are actually talking about Hadoop MapReduce. However, Hadoop is more than just MapReduce. In the next part of the lesson, you’ll learn more about how MapReduce works.
💡 Spark can be seen as an enhancement of Hadoop MapReduce.
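To make that lineage concrete, here is the canonical MapReduce example, a word count, expressed as a minimal sketch with Spark's RDD API. It assumes a hypothetical local text file named input.txt and an active SparkSession named spark (as provided by the spark-shell we set up later):

// Word count, the classic MapReduce example, on Spark's RDD API.
// "input.txt" is a hypothetical local file; `spark` is an active SparkSession.
val counts = spark.sparkContext
  .textFile("input.txt")
  .flatMap(_.split("\\s+"))    // "map" phase: split lines into words
  .map(word => (word, 1))
  .reduceByKey(_ + _)          // "reduce" phase: sum the counts per word
counts.take(10).foreach(println)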
How is Spark related to Hadoop?
Spark, which is the main focus of this course, is another big data framework. Spark contains libraries for data analysis, machine learning, graph analysis, and streaming live data. Spark is generally faster than Hadoop because Hadoop writes intermediate results to disk, whereas Spark tries to keep intermediate results in memory whenever possible.
The Hadoop ecosystem includes a distributed file storage system called HDFS (Hadoop Distributed File System). Spark, on the other hand, does not include a file storage system. You can use Spark on top of HDFS, but you do not have to; Spark can read data from other sources as well, such as Amazon S3.
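As a sketch (the paths and namenode host are hypothetical), reading the same dataset from HDFS or from Amazon S3 mostly comes down to the URI scheme:

// Hypothetical paths; the S3 read additionally requires the hadoop-aws package and credentials.
val fromHdfs = spark.read.parquet("hdfs://namenode:8020/data/events")
val fromS3   = spark.read.parquet("s3a://my-bucket/data/events")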
💡 Spark creates a Directed Acyclic Graph (DAG) to schedule tasks and orchestrate worker nodes across the cluster.
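You can peek at the plan Spark derives from this DAG before anything executes. Here is a small, self-contained sketch you can paste into a spark-shell (which we set up later in this tutorial):

// Prints the physical plan Spark builds for a simple aggregation, without running it.
spark.range(1000)
  .selectExpr("id % 10 AS bucket")
  .groupBy("bucket")
  .count()
  .explain()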
Streaming Data
Data streaming is a specialized topic in big data. The use case is when you want to store and analyze data in real time, such as Facebook posts or tweets.
Spark has a streaming library called Spark Streaming, although it is not as popular or as fast as some other streaming libraries. Other popular streaming engines include Apache Storm and Apache Flink.
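To give you a taste, here is a minimal Structured Streaming sketch. It assumes a text socket source on localhost:9999, which you could feed with nc -lk 9999; all names are illustrative:

// Count words arriving on a socket, printing results to the console as micro-batches complete.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val wordCounts = lines
  .selectExpr("explode(split(value, ' ')) AS word")
  .groupBy("word")
  .count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
// query.awaitTermination()   // block until the stream is stopped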
Stream Processing
Stream processing evolved from the simple mechanism of passing messages between operating system (OS) processes across buses. These systems became popularly known as pub/sub systems, since processes would publish or subscribe to messages as a form of cross-application communication within a single machine.
The Apache Spark Architecture
Apache Spark evolved from the MapReduce programming model. To understand how this evolution occurred, let’s begin by looking at what MapReduce is.
MapReduce was made famous by Jeffrey Dean and Sanjay Ghemawat (Google) in their seminal 2004 paper, “MapReduce: Simplified Data Processing on Large Clusters.”
This programming model led to the success of Google’s search engine, and the paper helped pave the way for open-source MapReduce frameworks like Hadoop to come into existence.
This enabled companies other than Google to begin working efficiently on large cluster-compute data problems, aka Big Data, in standardized ways.
Main Components of Spark Programs
Spark programs, more commonly just called applications, are made possible due to the following three main components:
The driver program: Each application is supervised by the driver program. The driver divides the work into stages of execution, typically some kind of transformation, and distributes those stages across the program executors using simple RPC communication, all along the journey to the application’s desired outcome, which is referred to as an action.
🕯️ The driver simply acts as a delegate for the application runtime.
The cluster manager: Keeps tabs on the state of the cluster, checks in on the running applications, and watches the available compute capacity remaining in the cluster. The cluster manager is the cluster coordinator, the delegate in charge of managing and maintaining the state of the cluster as well as the executors assigned to each active Spark application.
The program executors: Act as the compute delegates for tasks assigned by the Spark driver program. Simply put, most of your work will run across the executors, not on the driver program itself.
The figure below shows these three main components, and how the Spark application (driver program) interacts with the other distributed components (cluster manager and executors) at runtime.
Fig 1: Main Components of Spark.
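To tie the three components together, here is a sketch of how an application identifies itself and requests executor resources when building its SparkSession; the values are illustrative, not recommendations:

import org.apache.spark.sql.SparkSession

// The driver program starts here; the master URL selects the cluster manager
// (local[*] for a single machine, or e.g. "yarn" on a Hadoop cluster).
val spark = SparkSession.builder()
  .appName("my-spark-app")
  .master("local[*]")
  .config("spark.executor.memory", "2g")   // resources requested per executor
  .config("spark.executor.cores", "2")
  .getOrCreate()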
The Spark Ecosystem
Apache Spark, one of the largest open-source projects in data processing, is a processing framework that combines data processing and artificial intelligence (AI). This enables users to perform large-scale data transformations and analyses, and then run state-of-the-art machine learning (ML) and AI algorithms on the same platform.
The Spark ecosystem consists of five primary modules:
Spark Core: Underlying execution engine that schedules and dispatches tasks and coordinates input and output (I/O) operations.
Spark SQL: Provides an interface for structured data and gathers schema information so that structured data processing can be optimized.
Spark Streaming and Structured Streaming: Both add stream processing capabilities. Spark Streaming takes data from different streaming sources and divides it into micro-batches for a continuous stream. Structured Streaming, built on Spark SQL, reduces latency and simplifies programming.
Machine Learning Library (MLlib): A set of scalable machine learning algorithms plus tools for feature selection and building ML pipelines. The primary API for MLlib is the DataFrame-based API, which provides uniformity across programming languages such as Java, Scala, and Python (see the short sketch after this list).
GraphX: User-friendly computation engine that enables interactive building, modification, and analysis of scalable, graph-structured data.
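Here is the short MLlib sketch promised above. The column names are hypothetical; it only shows feature assembly, the step that puts data into the vector format MLlib's DataFrame-based algorithms expect:

import org.apache.spark.ml.feature.VectorAssembler

// Combine numeric columns "x1" and "x2" (hypothetical) into a single "features" vector column.
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")
// val prepared = assembler.transform(trainingDf)   // trainingDf is a hypothetical DataFrame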
Spark Use Cases
Spark is most effective for scenarios that involve the following:
Dealing with chains of parallel operations by using iterative algorithms.
Achieving quick results with in-memory computations (see the caching sketch after this list).
Analyzing streaming data in real time.
Graph-parallel processing to model data.
Machine learning (ML) applications.
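For the in-memory point above, here is a small caching sketch (the file name is hypothetical). The first action materializes the data in memory, and later passes reuse it instead of re-reading from disk:

// Cache a dataset that an iterative routine will scan repeatedly.
val features = spark.read.parquet("features.parquet").cache()   // hypothetical input

(1 to 5).foreach { i =>
  // Each pass reuses the in-memory copy rather than re-reading the file.
  println(s"iteration $i: ${features.count()} rows")
}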
🕯️ Apache Spark is the data engineer’s Swiss Army knife.
Getting Up and Running with Spark
The goal of this next section is simple. You’ll see how to install Spark locally and then go through an introductory exercise to wrap up the chapter.
Installing Spark
Installing Spark is a breeze. We’ll go through the steps for getting up and running on Spark 3 now.
The only requirements necessary for running it are as follows:
The Java Development Kit (JDK)
Scala 2.12 (the default for Spark 3)
If you have Java and Scala installed, you can skip ahead to downloading Spark.
Downloading Java JDK
You can download the official JDK from Oracle’s website and select the build for your environment. Or, if you are running on a Mac laptop with Homebrew, you can simply run the following command from a terminal window:
Mac OS
brew install openjdk@11
You should now have Java installed on your laptop. Now let’s make sure you have Scala installed as well.
Downloading Scala
If you are going through these exercises on a Mac laptop, you can use Homebrew and run the following command to get this requirement out of the way:
brew install scala@2.12
Okay. Now that you have the prerequisites out of the way, it is time to download and install Spark.
Downloading Spark
You are ready to download the Spark 3 release. Head to the download page from the official Spark website:
Fig 2: Spark & Hadoop.
This will download the compressed Spark release to your laptop. Okay, so far so good.
Now you just need to decompress the package and move Spark into a convenient home location. Open your favorite terminal application (I’m on a Mac laptop using the default Terminal app) and execute the following commands:
mkdir ~/sources
cd ~/sources
mv ~/Downloads/spark-3.3.0-bin-hadoop3.tgz ~/sources
tar -xvzf spark-3.3.0-bin-hadoop3.tgz
mv spark-3.3.0-bin-hadoop3 spark-3.3.0
rm spark-3.3.0-bin-hadoop3.tgz
If you followed these steps correctly, you should now have Spark installed in the ~/sources/spark-3.3.0/ directory.
Taking Spark For A Ride
Okay, so this step requires surgical precision:
For Mac users, head to your 🏠 Home folder → right-click on .bash_profile and open it with TextEdit, as seen below:
Fig 3: Bash Profile Hint.
Now, simply add the following lines to your .bash_profile (the JAVA_HOME path depends on the location of your Java installation, and SPARK_HOME assumes you followed the Spark installation steps above 🙂):
## JAVA HOME
export JAVA_HOME=/usr/local/Cellar/openjdk@11/11.0.15/libexec/openjdk.jdk/Contents/Home/
## SPARK HOME
export SPARK_HOME=~/sources/spark-3.3.0/
Now that Spark is installed, you should run a simple test to make sure that things are working correctly. Open a new terminal window and execute the command:
$SPARK_HOME/bin/spark-shell
Given that everything worked out as planned, you should now see some debug information in the terminal window, including an ASCII version of the Spark logo, the runtime Java and Scala version information, and the release version of Spark running. Here is an example from my machine:
Fig 4: Successful Installation.
Great, now that we have Apache Spark up and running, we can tackle a business-related problem.
Business Intelligence Use Case
Say you are tasked with writing a routine that needs to count all unique users who visited your company’s web store on a particular day, i.e., find the daily active users (or daily unique users). You also need to calculate the average number of products added to each customer’s cart, as well as find the top ten most common products added across all carts for the day. How would you go about answering these questions?
Solving the Problem
Solving most data problems begins with the data and a specification of what we want to accomplish.
Step 1: Start up the spark-shell
$SPARK_HOME/bin/spark-shell
Step 2: Create rows of data to work with.
With the spark-shell spun up, we’ll create a few rows of data to work with. You can copy and paste the code (if reading on your laptop) directly into the spark-shell, or you can simply type each line into the spark-shell. To enable copy-paste in the spark-shell, just type:
:paste
Step 3: Working within the shell.
You should see // Entering paste mode (ctrl-D to finish). To exit paste mode and interpret the code block below, just press Ctrl+D. You should then see // Exiting paste mode, now interpreting. Here we are importing the functions necessary for using Spark SQL. spark.implicits._ enables us to convert and encode our Scala objects into the format needed by Spark for the DataFrame APIs.
import spark.implicits._
import org.apache.spark.sql.functions._
We will then continue by defining our data model using the Activity case class.
case class Activity(
  userId: String,
  cartId: String,
  itemId: String)
The val keyword is used to define constants. val means constant or immutable, that is, we cannot change its value once it is created.
val activities = Seq(
  Activity("u1", "c1", "i1"),
  Activity("u1", "c1", "i2"),
  Activity("u2", "c2", "i1"),
  Activity("u3", "c3", "i3"),
  Activity("u4", "c4", "i3"))
With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
As an example, the following creates a DataFrame based on the content defined by our data model via the Activity case class:
val df = spark.createDataFrame(activities).toDF("user_id", "cart_id", "item_id")
That’s neat; now let's see the output. At this point, Spark will present you with a formatted table view of your activities DataFrame.
df.show()
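If everything worked, you should see something like the following (the rows should appear in the same order as the Seq above):

+-------+-------+-------+
|user_id|cart_id|item_id|
+-------+-------+-------+
|     u1|     c1|     i1|
|     u1|     c1|     i2|
|     u2|     c2|     i1|
|     u3|     c3|     i3|
|     u4|     c4|     i3|
+-------+-------+-------+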
🕯️ This sets the stage for our first micro-application.
Armed with some data, let’s go ahead and walk through a solution to the three questions proposed at the beginning of this exercise.
Problem 1: Find the Daily Active Users for a Given Day.
Getting the Distinct Users
df.select("user_id").distinct().count()
Problem 2: Calculate the Daily Average Number of Items Across All User Carts
This next problem is a bit more complicated, as it requires the data to be grouped and aggregated.
But as you will see, it can be expressed intuitively as well. Paste the code below into the spark-shell (remember to use :paste, then paste the code, then interpret with Ctrl+D).
val avg_cart_items = df.select("cart_id", "item_id")
  .groupBy("cart_id")
  .agg(count(col("item_id")) as "total")
  .agg(avg(col("total")) as "avg_cart_items")
avg_cart_items.show()
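With our sample data, cart c1 holds two items and carts c2, c3, and c4 hold one item each, so the average should come out to 1.25:

+--------------+
|avg_cart_items|
+--------------+
|          1.25|
+--------------+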
Problem 3: Generate the Top Ten Most Added Items Across All User Carts
Grouping, Aggregating, Sorting, and Limiting the Result Set in Order to See the Top N Items in Users’ Carts:
df.select("item_id") .groupBy("item_id") .agg(count(col("item_id")) as "total") .sort(desc("total")) .limit(10) .show()
Great! This exercise gave you some hands-on experience with Spark using the spark-shell. While the code in these exercises is simple, my hope is that it got your gears turning and excited for what is to come.
Now that we are finished with this exercise, we can safely stop the spark-shell. To do so, all you need to do is press Ctrl+C to close the session, or type :q to quit without any error codes. This will tell the driver application to stop running the spark-shell.