Comprehensive Introduction to Apache Spark, RDDs & Dataframes (using PySpark).
Complete Guide on DataFrame Operations in PySpark
Step by Step Guide for Beginners to Learn SparkR

Introduction

Industry estimates suggest that we are creating more than 2.5 quintillion bytes of data every year.

Think of it for a moment – 1 Quintillion = 1 Billion Billion! Can you imagine how many drives / CDs / Blu-ray discs would be required to store all of it? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting, it has created an entirely new set of challenges and has forced us to find new ways to handle Big Data effectively.

Big Data is not a new phenomenon. It has been around for a while, but it has become really important at this pace of data generation. In the past, several systems were developed for processing big data, most of them based on the MapReduce framework. These frameworks typically rely on the hard disk for saving and retrieving intermediate results, which turns out to be very costly in terms of time and speed.

On the other hand, organizations have never been hungrier to add a competitive differentiation by understanding this data and offering their customers a much better experience. Imagine how valuable Facebook would be if it did not understand your interests well. Traditional hard-disk-based MapReduce frameworks do not help much in addressing this challenge.

In this article, I will introduce you to one such framework, which has made querying and analysing data at a large scale much more efficient than previous systems and frameworks – read on!

P.S. This article is meant for complete beginners on the topic and presumes minimal prior knowledge of Big Data.

 

Table of Contents

  1. Challenges while working with Big Data
  2. Introduction to Distributed Computing Framework
  3. What is Apache Spark?
    • History of Spark
    • Common terms used
    • Benefits of Spark over traditional big data frameworks
  4. Installation of Apache Spark (with Python)
  5. Python vs Scala
  6. Getting up to speed with RDD / Dataframe / Dataset
  7. Solving a machine learning problem

 

Challenges while working with big data

Challenges associated with big data can be classified into the following categories:

Now that I have mentioned distributed computing, let us get a bit deeper into it!

 

What is a Distributed Computing Framework?

In simple terms, distributed computing is just a distributed system, where multiple machines are doing certain work at the same time. While doing the work, the machines communicate with each other by passing messages. Distributed computing is useful when fast processing (computation) of huge data is required.

Let us take a simple analogy to explain the concept. Say you had to count the number of books in various sections of a really large library, and you have to finish it in less than an hour. The number has to be exact and cannot be approximated. What would you do? If I were in this position, I would call up as many friends as I could and divide the areas / rooms among them. I'd divide the work in a non-overlapping manner and ask them to report back to me in 55 minutes. Once they came back, I'd simply add up the numbers to arrive at the solution. This is exactly how distributed computing works.

Apache Hadoop and Apache Spark are well-known examples of Big Data processing systems. Both are designed for distributed processing of large data sets across clusters of computers. Although Hadoop is widely used for distributed computing, it has several disadvantages. For example, it does not use "in-memory computation", which simply means keeping the data in RAM instead of on the hard disk for faster processing. When Apache Spark was developed, it overcame this problem by using in-memory computation for fast computing.

MapReduce is also widely used when the task is to process huge amounts of data in parallel (more than one machine doing a certain task at the same time) on large clusters. You can learn more about MapReduce from this link.

 

What is Apache Spark?

Apache Spark is a fast cluster computing framework which is used for processing, querying and analyzing Big Data. It is based on in-memory computation, which is a big advantage of Apache Spark over several other big data frameworks. Apache Spark is open source and one of the most famous Big Data frameworks. It can run tasks up to 100 times faster than traditional MapReduce when it utilizes in-memory computation, and up to 10 times faster when it uses the disk.

Please note that Apache Spark is not a replacement for Hadoop. It is actually designed to run on top of Hadoop and complement its ecosystem.

 

History of Apache Spark

Apache Spark was originally created at the University of California, Berkeley's AMPLab in 2009. It was open sourced in 2010, and the code base was later donated to the Apache Software Foundation in 2013. Spark is mostly written in Scala, with some code written in Java, Python and R. Apache Spark provides several APIs for programmers, which include Java, Scala, R and Python.

 

Key terms used in Apache Spark:

[Figure: Spark cluster overview. Image source: https://spark.apache.org/docs/1.1.1/img/cluster-overview.png]

SparkContext: It holds a connection with the Spark cluster manager. All Spark applications run as independent sets of processes, coordinated by a SparkContext in the driver program.

Driver and Worker: The driver is in charge of running the main() function of the application and creating the SparkContext. A worker, on the other hand, is any node that can run application code in the cluster. When an application is launched, it acquires executors on the worker nodes.

Cluster Manager: The cluster manager allocates resources to each application's driver program. There are three types of cluster managers supported by Apache Spark – Standalone, Mesos and YARN. Apache Spark is agnostic to the underlying cluster manager, so we can use any of them; each has its own unique advantages depending upon the goal, and they differ in terms of scheduling, security and monitoring. Once the SparkContext connects to the cluster manager, it acquires executors on the worker nodes; these executors work independently on their tasks and communicate with each other.
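As a quick illustration, here is a minimal sketch of how a standalone application tells Spark which master / cluster manager to connect to when it creates its SparkContext (the master URL and app name are placeholders; inside the PySpark shell, sc already exists and this step is not needed):

from pyspark import SparkConf, SparkContext

# "local[2]" runs Spark locally with 2 threads; a real deployment would use a
# Standalone ("spark://host:7077"), Mesos or YARN master URL instead.
conf = SparkConf().setAppName("MyFirstApp").setMaster("local[2]")
sc = SparkContext(conf=conf)

print(sc.master)   # which master / cluster manager we are connected to
print(sc.appName)  # the application name registered with the cluster
sc.stop()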

 

How is Apache Spark better than traditional big data frameworks?

In-memory computation: The biggest advantage of Apache Spark comes from the fact that it saves and loads data to and from RAM rather than the disk (hard drive). In the memory hierarchy, RAM is far faster to access than the hard drive (illustrated in the figure below). Since the price of memory has come down significantly in the last few years, in-memory computation has gained a lot of momentum.

Spark's in-memory computation can make it up to 100 times faster than the disk-based Hadoop MapReduce framework.

[Figure: the memory hierarchy. Image source: https://en.wikipedia.org/wiki/Memory_hierarchy]

In Hadoop, tasks are distributed among the nodes of a cluster, which in turn save intermediate data on disk. When that data is required for processing, each node has to load it from the disk and save the result back to disk after performing the operation. This ends up adding cost in terms of speed and time, because disk operations are far slower than RAM operations. It also takes time to convert data into a particular format when writing it from RAM to disk. This conversion is known as serialization, and the reverse is deserialization.

Let's look at the MapReduce process to understand the advantage of in-memory computation better. Suppose there are several map-reduce tasks happening one after another. At the start of the computation, both technologies (Hadoop and Spark) read the data from disk for mapping. Hadoop performs the map operation and saves the results back to the hard drive, whereas Apache Spark stores the results in RAM.

In the next step (the reduce operation), Hadoop reads the saved data from the hard drive, whereas Apache Spark reads it from RAM. This creates a difference even for a single MapReduce job. Now imagine how much time difference you would see at the end of a chain of multiple map-reduce operations.


Language Support: Apache Spark has API support for popular data science languages like Python, R, Scala and Java.

Supports real-time and batch processing: Apache Spark supports "batch" processing, where a group of transactions is collected over a period of time. It also supports real-time processing, where data is continuously flowing from the source. For example, weather information coming in from sensors can be processed by Apache Spark directly (a short streaming sketch appears at the end of this section).

Lazy operation: Lazy operations are used to optimize solutions in Apache Spark. I will discuss lazy evaluation later in this article. For now, think of it as follows: some operations do not execute until we actually require the results.

Support for multiple transformations and actions: Another advantage of Apache Spark over Hadoop is that Hadoop supports only MapReduce, while Apache Spark supports many transformations and actions, including MapReduce-style operations.

There are further advantages of Apache Spark in comparison to Hadoop. For example, Apache Spark is much faster at map-side and reduce-side shuffling. However, shuffling is a complex topic and deserves an article of its own, so I am not covering it in more detail here.
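To make the real-time processing point above more concrete, here is a minimal Spark Streaming sketch for a standalone script; the host and port are hypothetical, and you would point it at an actual data source such as a sensor feed:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="SensorStream")
ssc = StreamingContext(sc, 10)             # process incoming data in 10-second micro-batches

# Hypothetical source: a socket emitting one sensor reading per line
readings = ssc.socketTextStream("localhost", 9999)
readings.count().pprint()                  # print the number of readings in each batch

ssc.start()
ssc.awaitTermination()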

 

Installation of Apache Spark with PySpark

We can install Apache Spark in many different ways. The easiest way is to start with an installation on a single machine. Again, we have a choice of different operating systems. To install on a single machine, we need to have certain requirements fulfilled. I am sharing the steps to install Spark 1.6.0 on Ubuntu (14.04). I am installing Apache Spark with Python, which is known as PySpark (the Spark Python API). If you are interested in the R API SparkR, have a look at this learning path.

OS: Ubuntu 14.04, 64 bit. (If you are running Windows or Mac and are new to this domain, I would strongly suggest creating a virtual Ubuntu machine with 4 GB RAM and following the rest of the process.)

Software required: Java 7+, Python 2.6+, R 3.1+

Installation Steps:

Step 0: Open the terminal.

Step 1: Install Java

 $ sudo apt-add-repository ppa:webupd8team/java
 $ sudo apt-get update
 $ sudo apt-get install oracle-java7-installer

If you are asked to accept the Java license terms, click "Yes" and proceed. Once finished, let us check whether Java has been installed successfully. To check the Java version and installation, you can type:

 $ java -version

Step 2: Once Java is installed, we need to install Scala

 $ cd ~/Downloads
 $ wget http://www.scala-lang.org/files/archive/scala-2.11.7.deb
 $ sudo dpkg -i scala-2.11.7.deb
 $ scala -version

This will show you the version of Scala installed

 

Step 3: Install py4j

Py4J is used on the driver for local communication between the Python and Java SparkContext objects; large data transfers are performed through a different mechanism.

 $ sudo pip install py4j

Step 4: Install Spark.

By now, we have installed the dependencies which are required to install Apache Spark. Next, we need to download and extract the Spark source tarball. We can get it using wget:

 $ cd ~/Downloads
 $ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0.tgz
 $ tar xvf spark-1.6.0.tgz

Step 5: Compile the extracted source

sbt is an open source build tool for Scala and Java projects which is similar to Java’s Maven.

 $ cd ~/Downloads/spark-1.6.0
 $ sbt/sbt assembly

This will take some time, as sbt builds Spark. After the build finishes, we can check whether Spark is running correctly by typing:

 $ ./bin/run-example SparkPi 10

This will produce output similar to:

Pi is roughly 3.14042

To see the above result clearly, we need to lower the verbosity level of the log4j logger in log4j.properties.

$ cp conf/log4j.properties.template conf/log4j.properties
$ nano conf/log4j.properties


After opening the file ‘log4j.properties’, we need to replace following line:

log4j.rootCategory=INFO, console


by

log4j.rootCategory=ERROR, console

Step 6: Move the files to the right folders (to make them convenient to access)

$ sudo mv ~/Downloads/spark-1.6.0 /opt/
$ sudo ln -s /opt/spark-1.6.0 /opt/spark

Add this to your path by editing your bashrc file:

Step 7: Create environment variables. To set the environment variables, open bashrc file in any editor.

 $ nano ~/.bashrc

Set the SPARK_HOME and PYTHONPATH by adding following lines at the bottom of this file

export SPARK_HOME=/opt/spark
export PYTHONPATH=$SPARK_HOME/python

Next, reload bashrc by typing:

 $ . ~/.bashrc

Let's also add this setting for ipython by creating a new Python script that exports the settings automatically, just in case the above change did not work.

$ nano ~/.ipython/profile_default/startup/load_spark_environment_variables.py

Paste the following lines in this file.

import os
import sys

if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/opt/spark'

if '/opt/spark/python' not in sys.path:
    sys.path.insert(0, '/opt/spark/python')

Step 8: We are all set now. Let us start PySpark by typing the following command in the Spark root directory:

 $ ./bin/pyspark

We can also start ipython notebook in shell by typing:

 $ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

When we launch the PySpark shell, it automatically loads the SparkContext as sc and the SQLContext as sqlContext.
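Once the shell is up, a quick sanity check (assuming the shell created sc and sqlContext for us) looks like this:

sc.version        # the Spark version string, e.g. u'1.6.0'
sc.master         # where the driver is connected, e.g. u'local[*]'
type(sqlContext)  # SQLContext (or HiveContext, depending on the build)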

 

Python vs Scala:

One of the common questions people ask is whether it is necessary to learn Scala to learn Spark. If you already know Python to some extent, or are just exploring Spark for now, you can stick with Python to start with. However, if you want to process some serious data across several machines and clusters, it is strongly recommended that you learn Scala. Computation with the Python API is generally slower than with Scala in Apache Spark.

 

Apache Spark data representations: RDD / Dataframe / Dataset

Spark has three data representations: RDD, Dataframe and Dataset. For each data representation, Spark has a different API. For example, later in this article I am going to use ml (a library) which currently supports only the Dataframe API. A Dataframe is much faster than an RDD because it has metadata (some information about the data) associated with it, which allows Spark to optimize the query plan. Refer to this link to know more about this optimization. The Dataframe feature was added in Spark 1.3. If you want to know more about when to use RDD, Dataframe and Dataset, you can refer to this link.
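To see this optimization at work, here is a tiny sketch (assuming the sqlContext that the PySpark shell creates for us); explain() prints the plan Spark will actually execute:

df = sqlContext.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'letter'])
# Spark builds an optimized query plan from the Dataframe's schema before running anything
df.filter(df.id > 1).select('letter').explain()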

In this article, I will first spend some time on RDD, to get you started with Apache Spark. Later, I will spend some time on Dataframes. Dataframes share some common characteristics with RDD (transformations and actions). In this article, I am not going to talk about Dataset as this functionality is not included in PySpark.

RDD:

After installing and configuring PySpark, we can start programming with Spark in Python. But to use Spark functionality, we must use RDDs. An RDD (Resilient Distributed Dataset) is a collection of elements that can be divided across multiple nodes in a cluster to run parallel processing. It is also a fault-tolerant collection of elements, which means it can automatically recover from failures. An RDD is immutable: we can create an RDD once but can't change it. We can apply any number of operations on it and create another RDD by applying transformations. Here are a few things to keep in mind about RDDs:

 

We can apply 2 types of operations on RDDs:

Transformation: A transformation is an operation applied on an RDD to create a new RDD.
Action: An action is an operation applied on an RDD that performs a computation and sends the result back to the driver.

Example: Map (a transformation) performs an operation on each element of an RDD and returns a new RDD. Reduce (an action), on the other hand, aggregates the output of a map by applying some function (for example, reduceByKey). There are many transformations and actions defined in the Apache Spark documentation; I will discuss these in a later article.
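Here is a minimal sketch of both operation types (assuming a SparkContext sc, as created later in this article):

nums = sc.parallelize([1, 2, 3, 4, 5])
squares = nums.map(lambda x: x * x)          # transformation: builds a new RDD, nothing runs yet
total = squares.reduce(lambda a, b: a + b)   # action: triggers the computation and returns the sum to the driver
print(total)                                 # 55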

 

RDDs use Shared Variables:
The parallel operations in Apache Spark use shared variables. This means that whenever the driver sends a task to the executors in the cluster, a copy of the shared variable is sent to each node, so that they can use it while performing the task. Accumulator and Broadcast are the two types of shared variables supported by Apache Spark.
Broadcast: We can use a Broadcast variable to cache a read-only copy of data on every node.
Accumulator: Accumulator variables are used for aggregating information (for example, counters and sums) across the executors.
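Here is a small sketch of both shared variable types (assuming a SparkContext sc; the country-code lookup is a made-up example):

country_names = sc.broadcast({"IN": "India", "US": "United States"})  # read-only copy cached on every node
unknown_codes = sc.accumulator(0)                                     # executors add to it, the driver reads it

def to_name(code):
    if code in country_names.value:
        return country_names.value[code]
    unknown_codes.add(1)
    return "Unknown"

codes = sc.parallelize(["IN", "US", "UK", "IN"])
print(codes.map(to_name).collect())  # ['India', 'United States', 'Unknown', 'India']
print(unknown_codes.value)           # 1, visible on the driver after the action has run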

 

How to Create RDD in Apache Spark

Existing storage: When we want to create an RDD through existing storage in the driver program (which we would like to parallelize). For example, converting a list that is already created in the driver program into an RDD.

External sources: When we want to create an RDD from external sources such as a shared file system, HDFS, HBase, or any data source offering a Hadoop Input Format.
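A quick sketch of both approaches (the file path is hypothetical):

rdd_from_list = sc.parallelize([1, 2, 3, 4, 5])   # from an existing collection in the driver program
rdd_from_file = sc.textFile("PATH/sample.txt")    # from an external source; one RDD element per line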

 

Writing first program in Apache Spark

I have already discussed that RDD supports two type of operations, which are transformation and action. Let us get down to writing our first program:

Step1: Create SparkContext

The first step in any Apache Spark program is to create a SparkContext. A SparkContext is needed when we want to execute operations in a cluster. It tells Spark how and where to access a cluster, and is the first step in connecting with the cluster. If you are using the Spark shell, you will find that it is already created. Otherwise, we can create the SparkContext by importing it, initializing it and providing the configuration settings. For example:

 from pyspark import SparkContext
 sc = SparkContext()

Step2: Create a RDD

I have already discussed that we can create an RDD in two ways: either from existing storage or from an external source. Let's create our first RDD. SparkContext has a parallelize method, which is used for creating a Spark RDD from an iterable (like a list or tuple) already present in the driver program.

We can also provide the number of partitions as a parameter to the parallelize method. If we do not pass this parameter, Spark will automatically set the number of partitions for the cluster. The number of partitions can be set manually by passing a second parameter to parallelize; for example, sc.parallelize(data, 10), where data is existing data in the driver program and 10 is the number of partitions.
Let's create our first Spark RDD, called rdd.

data = range(1,1000)
rdd = sc.parallelize(data)

We have a collect method to see the content of the RDD.

rdd.collect()

To see the first n elements of an RDD, we have the take method:

rdd.take(2) # It will print first 2 elements of rdd

We have two types of parallel operations on RDDs: transformations and actions, which were already discussed briefly earlier. Remember that RDDs are immutable – we can't change an RDD, but we can apply a transformation on it to produce a new one. Let's see an example of the map transformation to demonstrate how transformations work.

Step 3: Map transformation.

The map transformation returns a new, mapped RDD by applying a function to each element of the base RDD. Let's repeat the first step of creating an RDD from an existing source. For example:

data = ['Hello' , 'I' , 'AM', 'Ankit ', 'Gupta']
Rdd = sc.parallelize(data)

Now an RDD (named 'Rdd') is created from the existing source, which is a list of strings in the driver program. We will now apply a lambda function to each element of Rdd and return the mapped (transformed) RDD of (word, 1) pairs as Rdd1.

Rdd1 = Rdd.map(lambda x: (x,1))

Let's see the output of this map operation.

Rdd1.collect()
output: [('Hello', 1), ('I', 1), ('AM', 1), ('Ankit ', 1), ('Gupta', 1)]

If you noticed, nothing happened when we applied the lambda function to Rdd (no computation ran on the cluster) – the work only happened when we called collect() on Rdd1. This is called lazy evaluation. All transformation operations in Spark are lazy, which means that no computation is performed on an RDD until an action needs the result.

Spark remembers which transformation is applied to which RDD with the help of a DAG (Directed Acyclic Graph). Lazy evaluation helps Spark optimize the solution, because Spark gets to see the whole DAG before actually executing the operations on the RDD. This enables Spark to run operations more efficiently.

In the code above, collect() and take() are examples of actions.

There are many transformations defined in Apache Spark. We will talk more about them in a future post.

 

Solving a machine learning problem:

We have covered a lot of ground already. We started with understanding what Spark brings to the table and its data representations, installed Spark, and have already played with our first RDD. Now, I'll demonstrate a solution to the "Practice Problem: Black Friday" using Apache Spark. Even if you don't understand these commands completely as of now, it is fine. Just follow along; we will take them up again in a future tutorial.

Let’s look at the steps:

 

Reading a data file (csv)

For reading a csv file in Apache Spark, we need to specify an extra library in the Python shell. Let's read the data from csv files to create a Dataframe and then apply some data science skills on this Dataframe, like we do in Pandas.

For reading the csv file, first we need to download the spark-csv package (latest version) and extract it into the home directory of Spark. Then, we need to open a PySpark shell and include the package (I am using "spark-csv_2.10:1.3.0").

$ ./bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0

In Apache Spark, we can read the csv file and create a Dataframe with the help of SQLContext. Dataframe is a distributed collection of observations (rows) with column name, just like a table. Let’s see how can we do that.

Please note that since I am using the PySpark shell, there are already a SparkContext (sc) and an SQLContext (sqlContext) available for me to use. In case you are not using the PySpark shell, you will need to type in the following commands as well:

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)

First download the train and test files and load them with the help of sqlContext:

train = sqlContext.load(source="com.databricks.spark.csv", path = 'PATH/train.csv', header = True,inferSchema = True)
test = sqlContext.load(source="com.databricks.spark.csv", path = 'PATH/test-comb.csv', header = True,inferSchema = True)

PATH is the location of the folder where your train and test csv files are located. header = True means that the csv files contain a header row. inferSchema = True tells sqlContext to automatically detect the data type of each column in the data frame; if we do not set inferSchema to True, all columns will be read as strings.
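As an aside, the same spark-csv package can also be used through the DataFrame reader API introduced in Spark 1.4; a roughly equivalent sketch (same hypothetical PATH) would be:

train = (sqlContext.read
         .format('com.databricks.spark.csv')
         .options(header='true', inferSchema='true')
         .load('PATH/train.csv'))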

 

Analyze the data type

To see the types of the columns in a Dataframe, we can use the printSchema() method. Let's apply printSchema() on train, which will print the schema in a tree format.

train.printSchema()

 

Previewing the data set

To see the first n rows of a Dataframe, we have the head() method in PySpark, just like in pandas. We need to provide the number of rows as an argument to head. Let's see the first 10 rows of train:

train.head(10)

To see the number of rows in a data frame we need to call the count() method. Let's check the number of rows in train. Note that count behaves differently in pandas and Spark.

train.count()

 

Impute Missing values

We can check the number of non-null observations in train and test by calling the drop() method. By default, drop() will drop a row if it contains any null value. We can also pass 'all' to drop a row only if all of its values are null.

train.na.drop().count(),test.na.drop('any').count()

Here, I am imputing the null values in the train and test files with -1. Imputing values with -1 is not an elegant solution – we have several algorithms / techniques to impute null values – but for simplicity I am imputing nulls with a constant value (-1). We can transform our base train and test Dataframes after applying this imputation. For imputing a constant value, we have the fillna method. Let's fill -1 in place of the nulls in all columns.

train = train.fillna(-1)
test = test.fillna(-1)

 

Analyzing numerical features

We can also see various summary statistics of the Dataframe's columns using the describe() method, which shows statistics for numerical variables. To show the results, we need to call the show() method.

train.describe().show()

 

Sub-setting Columns

Let's select a column called 'User_ID' from train. We need to call the select method and pass the name of the column we want to select; select will show the result for the selected column. We can also select more than one column from a data frame by providing the column names separated by commas.

train.select('User_ID').show()

 

Analyzing categorical features

To start building a model, we need to see the distribution of categorical features in train and test. Here I am showing this only for Product_ID, but we can do the same for any categorical feature. Let's see the number of distinct categories of "Product_ID" in train and test, which we can do by applying the distinct() and count() methods.

train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count()
Output:(3631, 3491)

After counting the number of distinct values for train and test, we can see that train has more categories than test. Let us check which categories of Product_ID are in test but not in train by applying the subtract method. We can do the same for all categorical features.

diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()# For distict count
Output: 46

Above you can see that 46 categories appear in test but not in train. In this case, we either collect more data about them or skip the rows in test for those (invalid) categories which are not in train.

 

Transforming categorical variables to labels

We also need to transform the categorical columns into labels by applying a StringIndexer transformation on Product_ID, which will encode the Product_ID column into a column of label indices. You can see more about this from the link.

from pyspark.ml.feature import StringIndexer
plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_ID')
labeller = plan_indexer.fit(train)

Above, we built a 'labeller' by applying the fit() method on the train Dataframe. Later we will use this labeller to transform our train and test sets. We need to call the transform method for doing that. We will store the transformation results in Train1 and Test1.

Train1 = labeller.transform(train)
Test1 = labeller.transform(test)

Let's check the resulting Train1 Dataframe.

Train1.show()

The show method on the Train1 Dataframe will show that we successfully added a transformed column, product_ID, to our previous train Dataframe.

 

Selecting Features to Build a Machine Learning Model

Let's try to create a formula for the machine learning model, like we do in R. First, we need to import RFormula from pyspark.ml.feature. Then we need to specify the dependent and independent columns inside this formula. We also have to specify names for the features column and the label column.

from pyspark.ml.feature import RFormula
formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",featuresCol="features",labelCol="label")

After creating the formula, we need to fit it on Train1 and then transform Train1 and Test1 through the fitted formula. Let's see how to do this, storing the transformed versions in train1 and test1.

t1 = formula.fit(Train1)
train1 = t1.transform(Train1)
test1 = t1.transform(Test1)

We can see the transformed train1, test1.

train1.show()

After applying the formula, we can see that train1 and test1 have two extra columns called features and label, which we specified in the formula (featuresCol="features" and labelCol="label"). The intuition is that all categorical variables in the features column of train1 and test1 are transformed to numerical values, while the numerical variables stay the same as before, ready for applying ML. The Purchase variable is transformed into the label column. We can also look at the features and label columns in train1 and test1.

train1.select('features').show()
train1.select('label').show()

 

Building a Machine Learning Model: Random Forest

After applying RFormula and transforming the Dataframe, we now need to develop a machine learning model on this data. I want to apply a random forest regressor for this task. Let us import a random forest regressor, which is defined in pyspark.ml.regression, and then create a model called rf. I am going to use the default parameters of the random forest algorithm.

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor()

After creating the model rf, we need to divide our train1 data into train_cv and test_cv for cross-validation.

Here we are dividing the train1 Dataframe into 70% for train_cv and 30% for test_cv.

(train_cv, test_cv) = train1.randomSplit([0.7, 0.3])

Now build the model on train_cv and predict on test_cv. The results will be saved in predictions.

model1 = rf.fit(train_cv)
predictions = model1.transform(test_cv)

If you check the columns in the predictions Dataframe, there is one column called prediction which holds the prediction results for test_cv.


Let's evaluate our predictions on test_cv and see what the mean squared error is.

To evaluate the model, we need to import RegressionEvaluator from pyspark.ml.evaluation and create an evaluator object. The evaluator has a method called evaluate which will evaluate the model; we need to specify the metric for it.

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()
mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })
import numpy as np
np.sqrt(mse), mse

After the evaluation, we can see that our root mean squared error is 3773.1460883883865, which is the square root of the MSE.

Now, we will repeat the same process on the full train1 dataset.

model = rf.fit(train1)
predictions1 = model.transform(test1)

After prediction, we need to select the columns which are required for the Black Friday competition submission.

df = predictions1.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase')

Now we need to write df out in csv format for submission.

df.toPandas().to_csv('submission.csv')
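Note that toPandas() pulls the whole result into the driver's memory, which is fine here but will not scale to very large outputs. A hedged alternative, assuming the same spark-csv package is still loaded, is to let Spark write the csv files directly:

(df.write
   .format('com.databricks.spark.csv')
   .option('header', 'true')
   .save('PATH/submission_folder'))   # writes one part-file per partition into the folder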

After writing the csv file (submission.csv), we can upload our first solution to see the score. I got a score of 3822.121053, which is not very bad for a first model out of Spark!

 

Complete Guide on DataFrame Operations in PySpark

Introduction

In my first article, I introduced you to the basic concepts of Apache Spark: how it works, the different cluster modes in Spark and the different data representations in Apache Spark. To provide you with a hands-on experience, I also took a real-world machine learning problem and solved it using PySpark.

In my second article, I showed you how to create an RDD from different sources (external, existing) and briefed you on the basic operations (transformations and actions) on RDDs.

In this article, I will be talking about DataFrames and their features in detail. Then, we will see how to create DataFrames from different sources and how to perform various operations on a DataFrame.

P.S. – If you have not read the previous 2 articles, I strongly recommend that you go through them before going further.

 

Table of Contents

  1. Dataframes in PySpark: Overview
  2. Why DataFrames are Useful
  3. Setup of Apache Spark
  4. How to create a DataFrame
    1. Creating DataFrame from RDD
    2. Creating DataFrame from CSV File
  5. Dataframe Manipulations
  6. Apply SQL queries on DataFrame
  7. Pandas vs PySpark DataFrame

 

1. Dataframe in PySpark: Overview

In Apache Spark, a DataFrame is a distributed collection of rows under named columns. In simple terms, it is the same as a table in a relational database or an Excel sheet with column headers. It also shares some common characteristics with RDDs:

My first exposure to DataFrames was when I learnt about Pandas. Today, it is difficult for me to run my data science workflow without Pandas DataFrames. So, when I saw similar functionality in Apache Spark, I was excited about the possibilities it opens up!

 

2. Why are DataFrames Useful?

I am sure this question must be lingering in your mind. To make things simpler for you, I'm listing down a few advantages of DataFrames:

 

3. Setup Apache Spark

In order to understand the operations on DataFrames, you need to first set up Apache Spark on your machine. Follow the step by step approach mentioned in my previous article, which will guide you through setting up Apache Spark on Ubuntu.

DataFrames support a wide range of operations which are very useful while working with data. In this section, I will take you through some of the common operations on DataFrames.

The first step in any Apache Spark program is to create a SparkContext. A SparkContext is required when we want to execute operations in a cluster. It tells Spark how and where to access a cluster, and is the first step in connecting with the cluster. If you are using the Spark shell, you will notice that it is already created. Otherwise, we can create the SparkContext by importing it, initializing it and providing the configuration settings. For example,

from pyspark import SparkContext
sc = SparkContext()

Again, we need to do the same with the SQLContext, if it is not already loaded.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

 

4. How to create a DataFrame?

A DataFrame in Apache Spark can be created in multiple ways:

[Figure: ways to create a DataFrame in Apache Spark]

Creating DataFrame from RDD

I am following these steps to create a DataFrame from a list of tuples:

from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)

Let's check the type of schemaPeople.

type(schemaPeople)
Output:
pyspark.sql.dataframe.DataFrame
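As a quick check, we can also preview the rows (note that when a Row is built from keyword arguments, this Spark version sorts the field names alphabetically, so age appears before name):

schemaPeople.show()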

 

Creating DataFrame from CSV File

For reading a csv file in Apache Spark, we need to specify a new library in our Python shell. To perform this action, first we need to download the spark-csv package (latest version) and extract it into the home directory of Spark. Then, we need to open a PySpark shell and include the package (I am using "spark-csv_2.10:1.3.0").

$ ./bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0

Let's read the data from the csv file and create the DataFrame. To demonstrate this I'm using the train and test datasets from the Black Friday Practice Problem, which you can download here.

train = sqlContext.load(source="com.databricks.spark.csv", path = 'PATH/train.csv', header = True,inferSchema = True)
test = sqlContext.load(source="com.databricks.spark.csv", path = 'PATH/test-comb.csv', header = True,inferSchema = True)

PATH is the location of the folder where your train and test csv files are located. header = True means that the csv files contain a header row. The inferSchema = True option tells sqlContext to automatically detect the data type of each column in the data frame; if we do not set inferSchema to be true, all columns will be read as strings.

 

5. DataFrame Manipulations

Now comes the fun part. You have loaded the dataset by now. Let us start playing with it now.

To see the types of the columns in a DataFrame, we can use printSchema or dtypes. Let's apply printSchema() on train, which will print the schema in a tree format.

train.printSchema()
Output:
root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)

From the above output, we can see that we have correctly captured the schema / data types of each column while reading from the csv.

 

We can use the head operation to see the first n observations (say, 5 observations). The head operation in PySpark is similar to the head operation in Pandas.

train.head(5)
Output:
[Row(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID=u'P00087842', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID=u'P00085442', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID=u'P00285442', Gender=u'M', Age=u'55+', Occupation=16, City_Category=u'C', Stay_In_Current_City_Years=u'4+', Marital_Status=0, Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]

The above results are a list of Row objects. To see the result in a more interactive manner (rows under the columns), we can use the show operation. Let's apply show on train and take the first 2 rows of it. We can pass the argument truncate = True to truncate the result.

train.show(2,truncate= True)
Output:
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows

 

We can use the count operation to count the number of rows in a DataFrame. Let's apply count on the train & test files to count the number of rows.

train.count(),test.count()
Output:
(550068, 233599)

We have 550068, 233599 rows in train and test respectively.

 

For getting the column names we can use columns on the DataFrame, similar to what we do for getting the columns of a pandas DataFrame. Let's first print the number of columns and the column names for the train file, and then for the test file.

len(train.columns), train.columns
OutPut: 
12 ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Purchase']

Let's do the same for test.

len(test.columns), test.columns
Output:
13 ['', 'User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Comb']

From the above output we can see that we have 13 columns in the test file and 12 in the train file. "Purchase" is not present in the test file, whereas "Comb" is only in the test file. We can also see that we have one column ('') in the test file which doesn't have a name.

 

The describe operation is used to calculate summary statistics of numerical column(s) in a DataFrame. If we don't specify the names of columns, it will calculate summary statistics for all numerical columns present in the DataFrame.

train.describe().show()
Output:
+-------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+
|summary|           User_ID|       Occupation|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|          Purchase|
+-------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+
|  count|            550068|           550068|             550068|            550068|            376430|            166821|            550068|
|   mean|1003028.8424013031|8.076706879876669|0.40965298835780306| 5.404270017525106| 9.842329251122386|12.668243206790512| 9263.968712959126|
| stddev|1727.5915855308265|6.522660487341778| 0.4917701263173273|3.9362113692014082| 5.086589648693526| 4.125337631575267|5023.0653938206015|
|    min|           1000001|                0|                  0|                 1|                 2|                 3|                12|
|    max|           1006040|               20|                  1|                20|                18|                18|             23961|
+-------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+

Let's check what happens when we specify the name of a categorical / String column in the describe operation.

train.describe('Product_ID').show()
Output:
+-------+----------+
|summary|Product_ID|
+-------+----------+
|  count|    550068|
|   mean|      null|
| stddev|      null|
|    min| P00000142|
|    max|  P0099942|
+-------+----------+

As we can see, the describe operation works for a String type column, but the output for mean and stddev is null, and min & max are calculated based on the lexicographic (ASCII) ordering of the categories.

 

To subset the columns, we need to use the select operation on the DataFrame, passing the column names separated by commas inside select. Let's select the first 5 rows of 'User_ID' and 'Age' from train.

train.select('User_ID','Age').show(5)
 Output:
+-------+----+
|User_ID| Age|
+-------+----+
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000002| 55+|
+-------+----+

The distinct operation can be used here to calculate the number of distinct rows in a DataFrame. Let's apply distinct to calculate the number of distinct products in the train and test files.

train.select('Product_ID').distinct().count(),test.select('Product_ID').distinct().count()
Output:
(3631, 3491)

We have 3631 & 3491 distinct products in the train & test files respectively. After counting the number of distinct values for train and test, we can see that the train file has more categories than the test file. Let us check which categories of Product_ID are in the test file but not in the train file by applying the subtract operation. We can do the same for all categorical features.

diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()# For distict count
Output:
46

Above, you can see that 46 categories appear in the test file but not in train. In this case, we either collect more data about them or skip the rows in the test file for those (invalid) categories which are not in the train file.

 

We can use the crosstab operation on a DataFrame to calculate the pairwise frequency of columns. Let's apply crosstab on the 'Age' and 'Gender' columns of the train DataFrame.

train.crosstab('Age', 'Gender').show()
Output:
+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------+

In the above output, the first column of each row holds the distinct values of Age, and the column names are the distinct values of Gender. The name of the first column is Age_Gender. Pairs with no occurrences have a zero count in the contingency table.

 

We can use the dropDuplicates operation to drop the duplicate rows of a DataFrame and obtain a DataFrame without duplicates. To demonstrate this, I am applying it on the two columns Age and Gender of train to get all unique rows for these columns.

train.select('Age','Gender').dropDuplicates().show()
Output:
+-----+------+
|  Age|Gender|
+-----+------+
|51-55|     F|
|51-55|     M|
|26-35|     F|
|26-35|     M|
|36-45|     F|
|36-45|     M|
|46-50|     F|
|46-50|     M|
|  55+|     F|
|  55+|     M|
|18-25|     F|
| 0-17|     F|
|18-25|     M|
| 0-17|     M|
+-----+------+

The dropna operation can be used here. To decide which rows to drop from the DataFrame, it considers three options: how, thresh and subset.

Let's drop the null rows in train with the default parameters and count the rows in the output DataFrame. The default options are any, None, None for how, thresh, subset respectively.

train.dropna().count()
Output:
166821
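For completeness, a few variations of the same call showing the three options (column names taken from the train schema above):

train.dropna(how='all').count()                      # drop a row only if every column is null
train.dropna(thresh=10).count()                      # keep rows having at least 10 non-null values
train.dropna(subset=['Product_Category_2',
                     'Product_Category_3']).count()  # consider only these columns when checking for nulls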

 

We can use the fillna operation here. fillna takes two parameters (value and subset) to fill the null values.

Let's fill '-1' in place of the null values in the train DataFrame.

train.fillna(-1).show(2)
Output:
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows

 

We can apply the filter operation on the Purchase column of the train DataFrame to select the rows with values greater than 15000. We need to pass a condition. Let's apply the filter on the Purchase column of train and print the number of rows which have a purchase of more than 15000.

train.filter(train.Purchase > 15000).count()
Output:
110523

The groupby operation can be used here to find the mean of Purchase for each age group in train. Let's see how we can get the mean purchase for each value of the 'Age' column in train.

train.groupby('Age').agg({'Purchase': 'mean'}).show()
Output:
+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|51-55|9534.808030960236|
|46-50|9208.625697468327|
| 0-17|8933.464640444974|
|36-45|9331.350694917874|
|26-35|9252.690632869888|
|  55+|9336.280459449405|
|18-25|9169.663606261289|
+-----+-----------------+

We can also apply sum, min, max and count with groupby when we want different summary insights for each group. Let's take one more example of groupby to count the number of rows in each Age group.

train.groupby('Age').count().show()
Output:
+-----+------+
|  Age| count|
+-----+------+
|51-55| 38501|
|46-50| 45701|
| 0-17| 15102|
|36-45|110013|
|26-35|219587|
|  55+| 21504|
|18-25| 99660|
+-----+------+

 

We can use the sample operation to take a sample of a DataFrame. The sample method returns a DataFrame containing a sample of the base DataFrame, and takes 3 parameters: withReplacement, fraction and seed.

Let's create two DataFrames, t1 and t2, from train; both will contain a 20% sample of train, and we will count the number of rows in each.

t1 = train.sample(False, 0.2, 42)
t2 = train.sample(False, 0.2, 43)
t1.count(),t2.count()
Output:
(109812, 109745)

 

We can apply a function on each row of a DataFrame using the map operation. After applying this function, we get the result in the form of an RDD. Let's apply a map operation on the User_ID column of train and print the first 5 elements of the mapped RDD of (x, 1) pairs after applying the function (I am applying a lambda function).

train.select('User_ID').map(lambda x:(x,1)).take(5)
Output:
[(Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000002), 1)]

In the above code we have passed a lambda function to the map operation, which takes each row / element of 'User_ID' one by one and returns the pair ('User_ID', 1) for it.

 

We can use the orderBy operation on a DataFrame to get sorted output based on some column. The orderBy operation takes two arguments: the column(s) to sort by and the sort order.

Let’s sort the train DataFrame based on ‘Purchase’.

train.orderBy(train.Purchase.desc()).show(5)
Output:
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1003160| P00052842|     M|26-35|        17|            C|                         3|             0|                10|                15|              null|   23961|
|1002272| P00052842|     M|26-35|         0|            C|                         1|             0|                10|                15|              null|   23961|
|1001474| P00052842|     M|26-35|         4|            A|                         2|             1|                10|                15|              null|   23961|
|1005848| P00119342|     M|51-55|        20|            A|                         0|             1|                10|                13|              null|   23960|
|1005596| P00117642|     M|36-45|        12|            B|                         1|             0|                10|                16|              null|   23960|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 5 rows

 

We can use the withColumn operation to add a new column (we can also replace an existing one) to the base DataFrame and return a new DataFrame. The withColumn operation takes 2 parameters: the name of the new column and an expression for computing it.

Let's see how withColumn works. I am computing a new column named 'Purchase_new' in train, calculated by dividing the Purchase column by 2.

train.withColumn('Purchase_new', train.Purchase /2.0).select('Purchase','Purchase_new').show(5)
Output:
+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
|    8370|      4185.0|
|   15200|      7600.0|
|    1422|       711.0|
|    1057|       528.5|
|    7969|      3984.5|
+--------+------------+
only showing top 5 rows

 

To drop a column from a DataFrame we can use the drop operation. Let's drop the column called 'Comb' from test and get the remaining columns of test.

test.drop('Comb').columns
Output:
['',
 'User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3']

Here, we can use a user defined function (udf) to remove the categories of a column which are in test but not in train. Let's again calculate the categories in the Product_ID column which are in test but not in train.

diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()# For distict count
Output:
46

We have got 46 categories that appear only in test. To remove these categories from the test 'Product_ID' column, I am applying the following steps.

Let's see how it works. First, create 'not_found_cat':

not_found_cat = diff_cat_in_train_test.distinct().rdd.map(lambda x: x[0]).collect()
len(not_found_cat)
Output: 
46

Now register the udf. We need to import StringType from pyspark.sql.types and udf from pyspark.sql.functions. The udf function takes 2 parameters as arguments: the function to apply and its return type.

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
F1 = udf(lambda x: '-1' if x in not_found_cat else x, StringType())

In the above code the function name is 'F1' and we are putting '-1' for the not-found categories in the test 'Product_ID'. Finally, we apply the 'F1' function on the test 'Product_ID' column and store the result in k, with a new column called "NEW_Product_ID".

k = test.withColumn("NEW_Product_ID",F1(test["Product_ID"])).select('NEW_Product_ID')

Now, let's see the result by again calculating the different categories between k and train using the subtract operation.

diff_cat_in_train_test=k.select('NEW_Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()# For distinct count
Output:
1

The output 1 means that there is now only 1 differing category between k and train.

diff_cat_in_train_test.distinct().collect()
Output:
[Row(NEW_Product_ID=u'-1')]

 

6. How to Apply SQL Queries on DataFrame?

We have already discussed in the section above that a DataFrame has additional information about the datatypes and names of the columns associated with it. Unlike an RDD, this additional information allows Spark to run SQL queries on a DataFrame. To apply SQL queries on a DataFrame, we first need to register the DataFrame as a table. Let's register the train DataFrame as a table.

train.registerAsTable('train_table')

In the above code, we have registered 'train' as a table ('train_table') with the help of the registerAsTable operation (registerTempTable in more recent Spark versions). Let's apply a SQL query on 'train_table' to select Product_ID; the result of the SQL query will be a DataFrame, so we need to apply an action to see the result.

sqlContext.sql('select Product_ID from train_table').show(5)
Output:
+----------+
|Product_ID|
+----------+
| P00069042|
| P00248942|
| P00087842|
| P00085442|
| P00285442|
+----------+

In the above code, I am using sqlContext.sql for specifying SQL query.

Let’s get maximum purchase of each Age group in train_table.

sqlContext.sql('select Age, max(Purchase) from train_table group by Age').show()
Output:
+-----+-----+
|  Age|  _c1|
+-----+-----+
|51-55|23960|
|46-50|23960|
| 0-17|23955|
|36-45|23960|
|26-35|23961|
|  55+|23960|
|18-25|23958|
+-----+-----+

 

7. Pandas vs PySpark DataFrame

Pandas and Spark DataFrames are designed for structured and semi-structured data processing. Both share some similar properties (which I have discussed above). The few differences between Pandas and PySpark DataFrames are:

In addition to the above points, Pandas and PySpark DataFrames have some basic differences like column selection, filtering, adding columns, etc., which I am not covering here.
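A small sketch of moving between the two (only safe when the selected result is small enough to fit in the driver's memory):

train.select('User_ID', 'Purchase').show(3)                     # PySpark: distributed, lazy until an action
pdf = train.select('User_ID', 'Purchase').limit(3).toPandas()   # pull a small slice to the driver as pandas
print(type(pdf))                                                # <class 'pandas.core.frame.DataFrame'>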

 

Learning Path : Step by Step Guide for Beginners to Learn SparkR

Introduction

Lately, I've been reading the book Data Scientist at Work to draw some inspiration from successful data scientists. Among other things, I found that most of the data scientists emphasized the evolution of Spark and its incredible computational power.

This piqued my interest to know more about Spark. Since then, I've done extensive research on this topic to find every possible bit of information I could.

Fortunately, Spark has extensive packages for different programming languages. Being an R user, I think my inherent inclination towards SparkR is justified.

After I finished with the research, I realized there is no structured learning path available for SparkR. I even connected with folks who are keen to learn SparkR, but none had come across such a structured learning path. Have you faced the same difficulty? If yes, here's your answer.

This inspired me to create this step by step learning path. I've listed the best resources available on SparkR. If you manage to complete the 7 steps thoroughly, you should acquire an intermediate level of adeptness with SparkR. However, your journey from intermediate to expert level will require hours of practice. You knew that, right? Let's begin!

 

Step 1: What is Spark? Why do we need it?

Spark is an Apache project promoted as "lightning fast cluster computing". Its astonishing computing speed makes it up to 100x faster than Hadoop MapReduce when working in memory, and 10x faster when working with data on disk. For large data processing, Spark has become the first choice of many data scientists and engineers today.

You can see that Amazon, eBay, Yahoo and Facebook are all using Spark for data processing on insanely large data sets. Apache Spark has one of the fastest growing big data communities, with more than 750 contributors from 200+ companies worldwide. According to the 2015 Data Science Salary Survey by O'Reilly, the presence of Apache Spark skills added $11,000 to the median salary.

To explore the amazing world of Spark in detail, you can refer this article.

You can also watch this video to learn more about the value that Spark has added to the business world:

However, if you are more of a reader, you can skip the video and check this recommended blog.

Interesting Read: Apache Spark officially sets a new record in large scale sorting

 

Step 2: What is Spark R?

Being an R user, let’s channelize our focus on SparkR.

R is one of the most widely used programming languages in data science. With its simple syntax and ability to run complex algorithms, it is probably the first choice of language for beginners.

But R suffers from a problem: its data processing capacity is limited to the memory of a single node. This limits the amount of data you can process with R. Now you know why R runs out of memory when you attempt to work on large data sets. To overcome this memory problem, we can use SparkR.

Along with R, Apache Spark provides APIs for various languages such as Python, Scala, Java and SQL. These APIs act as a bridge connecting these tools with Spark.

For a detailed view of SparkR, this is a must watch video:

Note: SparkR has a limitation. Currently, it only supports linear predictive models. Therefore, if you were excited to run a boosting algorithm on SparkR, you might have to wait until the next version is rolled out.

 

Step 3 : Setting up your Machine

If you are still reading, I presume that this new technology has sparked a curiosity in you and that you are determined to complete this journey. So, let's move on with setting up the machine:

To install SparkR, we first need to install Spark on our systems, since it runs in the backend.

The following resources will help you with installation on your respective OS:

  1. Windows
  2. Ubuntu
  3. Mac OS

After you've successfully installed Spark, it just takes a few extra steps to initiate SparkR. The following resources will help you to initiate SparkR locally:

  1. Windows
  2. Ubuntu
  3. Mac OS

 

Step 4 : Getting the Basics Right

Start with R: I assume that you already know R if you are interested in working with Big Data. However, if R is not your domain, this course by DataCamp will help you get started with R.

Exercise: Install the swirl package in R and do its complete set of exercises.

Database handling with SQL: SQL is widely used with SparkR in order to implement functions easily using simple commands. This helps reduce the number of lines of code you have to write and also increases the speed of operations. If you are not familiar with SQL, you should do this course by Codecademy.

Exercise: Practice 1 and Practice 2

 

Step 5 : Data Exploration with SparkR and SQL

Once your basics are in place, it's time to learn to work with SparkR & SQL.

SparkR enables us to use a number of data exploration operations using a combination of R and SQL simultaneously. The most common ones are select, collect, groupBy, summarize, subset and arrange. You can learn these operations with this article.

Exercise: Do this exercise by Berkeley's AMPLab

Dataset used in above exercise: Download

 

Step 6 : Building Predictive Models (Linear) on SparkR

As mentioned above, SparkR only supports linear modeling algorithms such as regression. However, it is probably just a matter of time until this constraint is lifted; I expect an updated version that supports non-linear models as well to be rolled out soon.

SparkR implements linear modeling using the glm function. On the other hand, Spark also has a machine learning library known as MLlib (for more info on MLlib, click here), which supports non-linear modeling.

Learn and Practice: To build your first linear regression model on SparkR, follow this link. To build a logistic regression model, follow this link.

 

Step 7 : Integrating SparkR with Hive for Faster Computation

SparkR works even faster with Apache Hive for database management.

Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data summarization, query, and analysis. Integrating Hive with SparkR would help running queries even faster and more efficiently.

If you want to step into big data, the use of Hive would really be a great advantage for efficient data processing. You can install Hive by following the links given for the respective OS:

  1. For Windows
  2. For Ubuntu
  3. For Mac OS

After you've installed Hive successfully, you can start integrating Hive with SparkR using the steps demonstrated in this video. Alternatively, if you are more comfortable reading, the same content is also available in text format on this blog.

For a quick overview on SparkR, you can also follow its official documentation.