Running PySpark MLlib Jobs Using Google Cloud Dataproc And Amazon Elastic MapReduce

Ogbeide Nelson

Oct 13th 2021

Every minute, the internet generates an enormous amount of data. Consider social media platforms, for instance: Twitter generates around 350,000 tweets per minute, YouTube users upload 500 hours of video per minute, and Facebook users like approximately 4 million posts per minute. The data generated is simply far too large to be stored and processed by a local computer. Furthermore, some industries require real-time analytics on the stored data.

In the health sector, real-time analytics improves the quality of care, reduces costs, and helps comply with regulations by automating and optimizing the collection and measurement of large amounts of health data. It also helps facilitate regular monitoring of patients’ health status to prioritize treatment based on urgency and severity of illness.

In the stock market industry, real-time data helps traders and investors stay updated with the market. Real-time analytics on the data helps them make better predictions concerning the market’s direction. It is no longer realistic to analyze stock trading performance at the end of the day because the market condition can change at any point during the day.

These are just a few examples of the benefits of real-time analysis on thousands of gigabytes of data that cannot be stored on a local computer. Instead, we use distributed systems to store and process the data. A distributed system is a collection of hardware components called nodes, each with its own computational resources, linked together by a network and all working coherently to achieve a single goal.

Distributed systems allocate data to multiple machines, leveraging the power of numerous smaller machines to extend functionality while acting as a single computer to the end-user. These machines (computers) work together, but when faults arise they tend to fail independently, without affecting the system’s uptime, a property of distributed systems known as fault tolerance.

Hadoop and MapReduce

With the creation of the open-source software framework Hadoop, it became possible to distribute massive amounts of data across multiple computers using the Hadoop Distributed File System (HDFS). The MapReduce programming model aided in performing computations on this distributed data.

Over time, a few challenges emerged with these technologies. MapReduce, which helped carry out computations, is batch-oriented and cannot perform true real-time processing. Additionally, depending on the quantity of data and the number of nodes in the cluster, it could take a long time to process and return results. These challenges sparked the creation of Apache Spark, which can perform batch and real-time processing, graph workloads, and machine learning.

Apache Spark

Apache Spark, developed at UC Berkeley by Matei Zaharia in 2009, is an open-source data processing framework built to perform tasks quickly on very large-scale data, with built-in components for SQL, graph processing, streaming, and machine learning. Spark performs operations up to 100 times faster than MapReduce because it processes data in memory rather than on disk, as MapReduce does.

With Apache Spark, it is possible to perform large-scale data transformation and analysis tasks and immediately build machine learning models. These features set Apache Spark apart from other big data technologies. The core constituents which confer Apache Spark’s functionalities include:

  • Engine (Core).
  • APIs (Java, Python, R, Scala).
  • Storage (HDFS, Cassandra, Parquet, JSON, Local, Hive, AWS S3, Blob).
  • Resource Manager (YARN, Mesos, Docker, Kubernetes).
  • Spark Components (Streaming, Spark SQL, MLlib, Spark Core, Graph X).

The resource manager efficiently distributes workloads across resources in any Spark cluster. The most widely used resource manager in Apache Spark is YARN. The two main components of the resource manager are the master node and the slave/worker nodes: the master node sends out tasks, and the workers send back results.

Throughout the rest of this post, we will use Spark and PySpark interchangeably. PySpark is the Python API for Apache Spark. We will use PySpark when building the machine learning model later in this post.

Spark MLlib

Spark MLlib comprises tools required for building machine learning models using Apache Spark. Just as regression, classification, and clustering algorithms are available in the popular machine learning library scikit-learn (sklearn), these same algorithms are also present in Spark.

Spark data: Preprocessing & Syntax

Real-world data mostly tend to be very dirty (erroneous, inaccurate, incomplete, inconsistent, outdated, and duplicate). A good machine learning model heavily depends on the type of data for training. It is essential to explore and clean data to attain good model accuracy. Data preprocessing is a crucial step in building machine learning models, and listed below are some data preprocessing steps and their syntax using PySpark.

Step 1: Loading Data

Before starting with any preprocessing step, we need to have data to work with.

Syntax:

# Start a Spark session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('data_preprocessing').getOrCreate()
print('Spark session created')

# Load a CSV file (file_path is the path to your dataset)

data = spark.read.csv(file_path, inferSchema=True, header=True)

# Display first 5 rows

data.show(5)

Step 2: Explore Data

Data exploration helps us focus efforts on the most relevant data in the dataset. We tend to discover patterns from the dataset and data exploration.

Syntax:

# Display statistical summary of data

df.describe().show()

# Display correlation between two features

from pyspark.sql.functions import corr
spark_df2.select(corr('Open', 'Close')).show()

# Display features in ascending or descending order

df.orderBy('Sales').show()
df.orderBy(df['Sales'].desc()).show()

# perform groupby

df.groupBy('Company').sum().show()

# perform groupby and aggregation

from pyspark.sql.functions import max, min, sum
spark_df2.groupBy('Date').agg(max('Open_price')).show()
spark_df2.groupBy('Date').agg(min('Open_price')).show()
spark_df2.groupBy('Date').agg(sum('Open_price')).show()

Step 3: Data Cleaning

Real-world data tends to have missing values or confusing feature names. With PySpark, we can solve these issues.

Syntax:

# Drop rows containing null values

df = df.na.drop()

# Drop specific features

spark_df = spark_df.drop('AdjOpen', 'AdjHigh')

# Fill missing values

spark_df_missing = spark_df_missing.na.fill(0)

# Rename column name

spark_df = spark_df.withColumnRenamed('Symbol', 'SYMBOL')

# Add two columns together to form a single column

from pyspark.sql.functions import col
spark_df1 = spark_df1.withColumn('Sum', col('Open') + col('Close'))

# filter dataframe

spark_df.filter('Open > 168').select('Open', 'Close').show(5)

Step 4: Scaling and Variable Encoder

Preprocessing categorical and numerical variables by either scaling or encoding is essential as most machine learning models only accept numerical values and perform well when these numerical values are normalized.

Syntax:

# scale features

from pyspark.ml.feature import MinMaxScaler
# Note: MinMaxScaler expects a vector input column (e.g. one built with VectorAssembler)
min_max = MinMaxScaler(inputCol='Salary', outputCol='Salary_out')

# convert string to index

from pyspark.ml.feature import StringIndexer
qualification_indexer = StringIndexer(inputCol='qualification',
                                      outputCol='qualificationIndex')

# Combine the list of columns into a vector

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Open'], outputCol='Open_Vect')

Spark Tools

The `spark.ml` library contains all the tools necessary for building machine learning models. They include:

  • Featurization
  • ML Algorithms
  • Pipelines
  • Persistence
  • Utilities

Featurization

Featurization comprises steps for feature engineering during machine learning projects. Feature engineering refers to using domain knowledge in selecting, extracting, and transforming raw data.

The featurization tools in the `spark.ml` library perform the following tasks:

  • Feature Extraction: Extracting features from raw data that can help improve the predictive power of a machine learning model
  • Feature Selection: Selecting a subset of features that will help improve the predictive power of the machine learning model from a larger dataset
  • Dimensionality Reduction: Dimensionality reduction reduces the number of features in a DataFrame under consideration. `spark.mllib` provides dimensionality reduction support in the `RowMatrix` class (see the sketch after this list).
  • Transformation: Scaling, converting, or modifying features.
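As a concrete illustration of the last two points, the RDD-based `RowMatrix` class exposes PCA directly. This is a minimal sketch on toy data; the vectors and the choice of two components are assumptions for illustration, and the DataFrame-based `pyspark.ml.feature.PCA` transformer is an alternative.

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

# A toy RDD of feature vectors (hypothetical data)
rows = spark.sparkContext.parallelize([
    Vectors.dense([1.0, 2.0, 3.0]),
    Vectors.dense([4.0, 5.0, 6.0]),
    Vectors.dense([7.0, 8.0, 9.0]),
])

mat = RowMatrix(rows)
pcs = mat.computePrincipalComponents(2)  # top-2 principal components
projected = mat.multiply(pcs)            # project each row into the reduced space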

ML Algorithms

Spark ML algorithms are the core of Spark MLlib. They contain machine learning algorithms for supervised and unsupervised ML tasks.

Estimator

An Estimator is an object that fits a model to some training data and can then infer properties of new data. An estimator could, for instance, be a classifier or a regressor. Estimators implement the `.fit()` method, which is applied to a DataFrame to produce a model; the ML algorithm is the estimator that is fit to the DataFrame.

model = estimator.fit(data)

Transformer

A transformer implements the `.transform()` method, which converts a DataFrame into a new one, typically by appending one or more columns. A fitted model is itself a transformer that is applied to a DataFrame.

new_data = transformer.transform(data)

Unlike scikit-learn, Spark ML has no `fit_transform()` shortcut; fitting and transforming are chained explicitly:

new_data = estimator.fit(data).transform(data)
The popular ML algorithms in the MLlib library are:

Regression

# Syntax for importing and creating regression models

from pyspark.ml.regression import LinearRegression, \
DecisionTreeRegressor

# Decision Tree

dt = DecisionTreeRegressor(featuresCol='indexedFeatures')

# Linear Regression

lin_reg = LinearRegression(featuresCol='indexedFeatures',
                           labelCol='indexedLabel', maxIter=10,
                           regParam=0.3, elasticNetParam=0.8)

Classification

# Syntax for importing and creating classification models

from pyspark.ml.classification import LogisticRegression, \
    RandomForestClassifier

# Random Forest

random_clf = RandomForestClassifier(labelCol='indexedLabel',
                                    featuresCol='indexedFeatures',
                                    numTrees=10)

# Logistic Regression

log_reg = LogisticRegression(featuresCol='indexedFeatures',
                             labelCol='indexedLabel', maxIter=10,
                             regParam=0.3, elasticNetParam=0.8)

Clustering

# Syntax for importing and creating clustering models

from pyspark.ml.clustering import KMeans

# Train a K-means model

kmeans = KMeans().setK(5).setSeed(1)
model = kmeans.fit(dataset)

Recommender System

# Syntax for importing and creating recommendation models

from pyspark.ml.recommendation import ALS

als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId',
          coldStartStrategy='drop')

model = als.fit(dataset)

Basic Statistics

# Compute correlations and run hypothesis tests

from pyspark.ml.stat import Correlation
from pyspark.ml.stat import ChiSquareTest
corr_matrix = Correlation.corr(df, 'features').head()
chi_result = ChiSquareTest.test(df, 'features', 'label').head()

Pipelines

Pipelines chain a sequence of stages, each a transformer or an estimator, into a single workflow. They offer the necessary tools for constructing, evaluating, and tuning ML pipelines.
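A minimal sketch of a pipeline is shown below; the column names (`category`, `amount`, `label`) and the DataFrames `train_df` and `test_df` are placeholders used only for illustration.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

indexer = StringIndexer(inputCol='category', outputCol='category_index')
assembler = VectorAssembler(inputCols=['category_index', 'amount'],
                            outputCol='features')
lr = LinearRegression(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[indexer, assembler, lr])  # stages run in order
model = pipeline.fit(train_df)                        # fits every stage
predictions = model.transform(test_df)                # applies the whole chain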

Persistence

Persistence helps in saving and loading models, algorithms, and pipelines for later use. Saving pipelines, algorithms, and models lets us compare various versions to see how previous models perform relative to recent ones. The saved models, algorithms, or pipelines can be loaded and reused at any time.

# import gradient boost regressor

from pyspark.ml.regression import GBTRegressor, GBTRegressionModel

gbt = GBTRegressor(featuresCol='features',
                   labelCol='Time from Pickup to Arrival')

# fit model to dataframe and transform

gbt_model = gbt.fit(train_data)
prediction = gbt_model.transform(test_data)

# save and load the fitted model

path = 'gradient_boost_v1'
gbt_model.save(path)
print('model saved')
gbt_load = GBTRegressionModel.load(path)

Utilities

Utilities are required for linear algebra, statistics, and data handling. `mllib.linalg` provides the MLlib utilities for linear algebra.
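A short sketch of the linear algebra utilities (the values are arbitrary examples):

from pyspark.ml.linalg import Vectors, Matrices

dense = Vectors.dense([1.0, 0.0, 3.0])                # dense vector
sparse = Vectors.sparse(3, {0: 1.0, 2: 3.0})          # sparse vector: size, {index: value}
matrix = Matrices.dense(2, 2, [1.0, 2.0, 3.0, 4.0])   # 2x2 matrix, column-major values

print(dense.dot(sparse))  # basic vector operations such as dot products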

Running Spark On-Premise Vs. Cloud

Running Spark ML jobs on the cloud instead of on-premise has several benefits.

Cost-effectiveness

In selecting an effective analytics platform and provider, what it comes down to is finding the best way to store, manipulate, and examine large quantities of records quickly, safely, and effectively, at an affordable cost.

The conventional method of running Spark ML jobs on-premise tends to be expensive, mainly because it requires vast numbers of servers, an enormous physical facility to store these servers, and substantial electrical power to run them.

Additionally, running on-premise Spark ML tasks requires an on-site IT team to ensure that the servers run smoothly without interruption. Cloud platforms that support Spark services require no expensive on-site hardware or infrastructure setup, eliminating the need for companies to invest in heavy, expensive machine learning systems that they might not always use.

These cloud providers only require payment for what you use without worrying about maintenance, and when not in use, the resources can be shut down. Spark ML is highly computation intensive; therefore, creating and maintaining physical infrastructure to run these tasks is costly. However, running Spark ML jobs on the cloud reduces cost by a high margin.

Ease of Scaling Up

As companies’ activities and data storage increase, the demand for a more generalized machine learning model also increases. Consequently, we need to add more hardware each time the demand arises, which can be time-consuming and costly.

Running Spark ML jobs using on-premise systems requires increasing hardware capacity every time a company trains on more data, hence challenging to scale up on demand. With cloud providers offering Spark services, there is total scalability as need and demand increase. The pay-per-use model makes it even easier for users as they only pay for what they use.

Increased Productivity

On-premise platforms set limitations on how easily and swiftly machine learning models can be trained on massive amounts of data. With cloud platforms that support Spark services, machine learning models trained on large data can deliver results sooner, saving time and benefiting the company. Increased productivity follows from faster and more accurate ML model training.

Google Cloud Platform (GCP): Dataproc

What is Dataproc?

Google Cloud Dataproc is a fully managed and scalable service for running Apache Spark and Hadoop jobs. We can use Dataproc for big data processing and machine learning. It is simple, fast, and cost-effective. It also supports open-source tools with virtual machines that scale up and down as required. It is tightly integrated with other Google cloud services.

Setting Up a GCP Account

When creating a Google Cloud account, you will need the following:

  • A Google account.
  • A valid credit card.

** GCP offers a 90-day free trial and $300 in credit. **

To create a GCP account, follow these steps:

  1. Open the Google Cloud Console.
  2. When prompted to sign in, sign in with an existing Google account or create a new account.

Creating a Dataproc Cluster

Before creating a cluster, it is essential first to create a bucket to store our data and script. The following steps describe how to create a bucket and the subsequent steps required to create a cluster.

  1. Click on CREATE PROJECT.
  2. Name your project (named spark-ml here), then click on CREATE.
  3. Click on the navigation menu.
  4. Scroll down to Storage and select Cloud Storage.
  • Give your bucket/storage a unique name and leave other options as default.
  • Select done and return to the home page.

  5. Click on the navigation menu to display the service options.
  6. Scroll down to the big data section, where you can find Dataproc, and click on Clusters.

Note: make sure the project selected at the top of the console is the project you just created.

  7. Click on CREATE CLUSTER.

  8. Set up cluster
  • Add cluster name.
  • Location: us-central1 (default).
  • Zone: us-central1-a (default).
  • Cluster type: Standard, i.e., 1 master and N workers (depending on workload).
  • Versioning: specify the version of Spark and Hadoop. It is okay to leave this as default.
  • Optional components: additional components to install while creating the cluster.

9. Configure nodes

Using the correct configuration helps run more optimized Spark jobs. For this tutorial, however, everything is left as default except the machine type for the master and worker nodes:

  • Machine type (Master node): n1-standard-2 (2 vCPUs, 7.5 GB memory).
  • Machine type (Worker node): n1-standard-2 (2 vCPUs, 7.5 GB memory).

10. Customize cluster

Under the Cloud Storage staging bucket, select the created cloud storage bucket.


11. Manage Security

Check the Allow API access to all Google Cloud services option.

12. Click on CREATE. Completion could take up to 4 minutes.
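The same bucket and cluster can also be created from the command line with the Cloud SDK. This is a minimal sketch; the bucket name, cluster name, and machine types are placeholders that mirror the console choices above.

# create the staging bucket
gsutil mb -l us-central1 gs://your-bucket-name

# create a Dataproc cluster with 1 master and 2 workers
gcloud dataproc clusters create spark-ml-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --master-machine-type=n1-standard-2 \
    --worker-machine-type=n1-standard-2 \
    --num-workers=2 \
    --bucket=your-bucket-name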

Amazon Elastic MapReduce (EMR)

What is Amazon Elastic MapReduce?

Amazon Elastic MapReduce (EMR) from Amazon Web Services (AWS) is a cost-effective service that employs Apache Spark, Apache Hadoop, and other open-source big data frameworks, making it easy to process vast amounts of data. It is also possible to interact with data in other AWS data stores, such as S3 and Amazon DynamoDB. When creating a cluster, the script and data are stored in an S3 bucket. The following describes how to create a cluster and an S3 bucket.

Setting Up an AWS Account

  1. Go to the AWS homepage.
  2. Click create an AWS account.
  3. Enter all account information correctly.
  4. Choose either personal or professional.
  5. Enter company or personal information.
  6. Click create an account and continue.
  7. Be sure to add payment methods and also verify phone numbers.

** AWS offers 12 months of free usage after your initial sign-up date. **

Creating an EMR cluster

To create an EMR cluster, it is necessary to create an S3 bucket and an Amazon EC2 key pair. S3 is a storage service, and an EC2 key pair is a set of security credentials that you use to prove your identity when connecting to an EC2 instance; both are provided by AWS.

Steps:

  1. Navigate to the S3 service by either typing S3 in the search box or by selecting S3 from the Storage section of the All services option.

  2. Click on Create bucket.
  3. Enter a unique name (ours is sendylogistic) for your bucket and leave other settings as default.

  4. Navigate to EC2 by searching for EC2 in the search box.
  5. Click on Key Pairs and then click Create key pair.
  6. Enter the name of the key pair. For the file format, `.pem` is the format for Mac and Linux users, and `.ppk` is the format for Windows users. In this guide, `.pem` is the file format.
  7. Once the key pair is created, a `.pem` file will be downloaded. This will be required later when running a Spark job.
  8. Navigate to EMR by searching for EMR in the search box. Click Create cluster once the page loads.


  9. To create the cluster:
    1. Type in a cluster name.
    2. Choose your created S3 folder.
    3. Software configuration: change the application to Spark.
    4. Hardware configuration: m4.large.
    5. Security and access: choose your created EC2 key pair.
    6. Click Create cluster.

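For reference, the same resources (bucket, key pair, and cluster) can also be created with the AWS CLI. This is a minimal sketch; the names, release label, and instance count are assumptions that mirror the console walkthrough above.

# create the S3 bucket
aws s3 mb s3://sendylogistic

# create the EC2 key pair and save the private key locally
aws ec2 create-key-pair --key-name sendy \
    --query 'KeyMaterial' --output text > sendy.pem
chmod 400 sendy.pem

# create an EMR cluster with Spark installed
aws emr create-cluster --name "sendy-spark" \
    --release-label emr-5.33.0 \
    --applications Name=Spark \
    --instance-type m4.large --instance-count 3 \
    --ec2-attributes KeyName=sendy \
    --use-default-roles \
    --log-uri s3://sendylogistic/logs/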

Similarities: Amazon EMR & Google Cloud Dataproc

Resizing

Scalability is a great challenge because it is expensive and time-consuming to scale up clusters in an on-premise environment. Conditions might require a cluster to be scaled up to meet task requirements, and when there is only a minor task to run, the scaled-up cluster sits underutilized. Sadly, the entire cluster must still be paid for even when it is underutilized.

Both EMR and Dataproc offer resizing of clusters at any time. It is possible to specify a particular size for the cluster during creation and resize to meet specific needs later.

Auto Scaling

It is difficult to estimate correctly the amount of resources a cluster needs when creating a Spark cluster. The cluster might take longer to process jobs because its resources are overused, or it might sit underutilized, both of which can be challenging.

Google Cloud Dataproc and Amazon EMR provide autoscaling to resize clusters should the need arise during a run. On both services, it is possible to define how the cluster should scale in or out by specifying a scaling policy.

Ephemeral Clusters

Ephemeral clusters are clusters created on cloud platforms to run specific tasks and terminated once those tasks are complete. They do not have to be up and running around the clock, and users pay only for the period these clusters are active and only for the resources used. Both Google Cloud Dataproc workflow templates and Amazon Elastic MapReduce step execution offer termination of clusters after a completed run, as sketched below.
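On the Dataproc side, a workflow template creates a managed cluster, runs the job, and deletes the cluster when it finishes. This is a minimal sketch; the template name, cluster name, and step id are placeholders.

gcloud dataproc workflow-templates create sendy-template --region=us-central1
gcloud dataproc workflow-templates set-managed-cluster sendy-template \
    --region=us-central1 --cluster-name=ephemeral-cluster \
    --master-machine-type=n1-standard-2 \
    --worker-machine-type=n1-standard-2 --num-workers=2
gcloud dataproc workflow-templates add-job pyspark gs://sendylogistic/sendy.py \
    --step-id=sendy-job --workflow-template=sendy-template --region=us-central1
gcloud dataproc workflow-templates instantiate sendy-template --region=us-central1

On the EMR side, passing --steps together with --auto-terminate to aws emr create-cluster achieves the same effect: the cluster shuts down after the submitted step completes.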

Hands-On: Building and Submitting Spark Jobs on EMR & Dataproc

Dataset Description

Sendy offers a web and mobile app platform that matches customers who need packages delivered with available couriers. The Sendy system optimizes routes, searches for the closest available couriers, and matches customers to these couriers to deliver packages. The dataset provided by Sendy includes order details and rider metrics based on orders made on the Sendy platform. The data description was obtained from Zindi.africa, which also hosts the dataset used.

Task: Build a model that estimates the time of arrival for orders, from pick-up to drop-off.

Code Along and Description

Note: We are writing a PySpark script.

The data has already been explored and cleaned, and the model built, in a Jupyter Notebook. The code blocks below include only the code and its description, not the output. To follow along properly, clone this GitHub repo to get the script and data used.

1. We import necessary libraries for preprocessing and model building:

from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import hour, dayofweek
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Math module to help with feature engineering
from math import sin, cos, asin, atan2, radians, sqrt, degrees

print('Libraries imported')

2. We define a Spark session and read the train and rider data.

A Spark session is the unified entry point for manipulating data with Spark. Introduced in Spark 2.0, it provides access to the rest of Spark’s functionality.

spark = SparkSession.builder.appName('Sendy_logistics').getOrCreate()
print('Spark session created')

print('====loading data=====')
data = spark.read.csv('s3://sendylogistic/Train(1).csv',
                      inferSchema=True, header=True)
rider = spark.read.csv('s3://sendylogistic/Riders.csv',
                       inferSchema=True, header=True)
print('=====Data loaded=====')

The rider dataset helps build a better model, so we merge it with the original dataset. We then list the unwanted columns and drop them from the `data_merge` DataFrame.

# Merge rider data to both train and test

data_merge = data.join(rider, on=['Rider Id'], how='inner')
print('Merged data and rider data together')

# Drop unwanted data

cols_to_drop = [
'Vehicle Type',
'Order No',
'Arrival at Destination - Day of Month',
'Arrival at Destination - Weekday (Mo = 1)',
'Arrival at Destination - Time',
'Precipitation in millimeters',
'Temperature',
'Rider Id',
'User Id',
]
time_cols = ['Placement - Time', 'Confirmation - Time',
'Arrival at Pickup - Time', 'Pickup - Time']

data_merge = data_merge.drop(*cols_to_drop)

3. Feature Engineering

Since we estimate the time from pickup to arrival, extracting additional time features and calculating several distances will help build an accurate model. We:

  • Extract hours from the time column.
  • Define a function for Haversine distance: The distance between two points on the surface of a sphere. The first coordinate is the latitude, and the second is the longitude.
  • Define a function for Manhattan distance: This is the sum of the horizontal and vertical distance between points on a grid.
  • Define a function for Bearing: This is the measurement of an angle between two points.
# Extract the hour from each time column ('c' avoids shadowing the imported col function)
for c in time_cols:
    data_merge = data_merge.withColumn(c + '_Hour', hour(c))

print('Time columns extracted')

# Calculate the haversine distance

def haversine_dist(lat1, lng1, lat2, lng2):
    (lat1, lng1, lat2, lng2) = map(radians, (lat1, lng1, lat2, lng2))
    AVG_EARTH_RADIUS = 6371  # in kilometers
    lat = lat2 - lat1
    lng = lng2 - lng1
    d = sin(lat * 0.5) ** 2 + cos(lat1) * cos(lat2) * sin(lng * 0.5) ** 2
    h = 2 * AVG_EARTH_RADIUS * asin(sqrt(d))  # standard haversine formula
    return h

# Manhattan distance

def manhattan_dist(lat1, lng1, lat2, lng2):
    a = haversine_dist(lat1, lng1, lat1, lng2)
    b = haversine_dist(lat1, lng1, lat2, lng1)
    return a + b

# Direction from the given coordinates

def bearing(lat1, lng1, lat2, lng2):
    lng_delta_rad = radians(lng2 - lng1)
    (lat1, lng1, lat2, lng2) = map(radians, (lat1, lng1, lat2, lng2))
    y = sin(lng_delta_rad) * cos(lat2)
    x = cos(lat1) * sin(lat2) - sin(lat1) * cos(lat2) * cos(lng_delta_rad)
    return degrees(atan2(y, x))

4. Register Functions

Next, we register these functions as PySpark UDFs and call them on the DataFrame using the `withColumn()` function. We also drop the original time columns, since we have already extracted features from them.

# Register the created functions

HaversineUDF = udf(lambda a, b, c, d: haversine_dist(a, b, c, d),
                   StringType())
ManhattanUDF = udf(lambda a, b, c, d: manhattan_dist(a, b, c, d),
                   StringType())
BearingUDF = udf(lambda a, b, c, d: bearing(a, b, c, d),
                 StringType())

print('Applying feature engineering to dataframe')

# Call the haversine function

data_merge = data_merge.withColumn('Haversine Distance',
                                   HaversineUDF('Pickup Lat', 'Pickup Long',
                                                'Destination Lat', 'Destination Long'
                                                ).cast(DoubleType()))

# Call the manhattan dist function

data_merge = data_merge.withColumn('Manhattan Distance',
                                   ManhattanUDF('Pickup Lat', 'Pickup Long',
                                                'Destination Lat', 'Destination Long'
                                                ).cast(DoubleType()))

# Call the bearing function

data_merge = data_merge.withColumn('Direction Distance',
                                   BearingUDF('Pickup Lat', 'Pickup Long',
                                              'Destination Lat', 'Destination Long'
                                              ).cast(DoubleType()))

cols = data_merge.columns
data_merge = data_merge.drop(*time_cols)

5. Transformer and Estimator

  • Indexer: The indexer converts a string column to a numeric index (double). We `fit()` and `transform()`.
  • Vector assembler: A vector assembler is a transformer that combines a list of columns into a single vector column, as this is how Spark ML algorithms expect their input.
  • GBTRegressor: We will build our model using the Gradient Boosted Tree Regressor.
# String indexer to convert cat column to double

cols_index = ['Personal or Business']

indexers = [StringIndexer(inputCol=column, outputCol=column + '_index'
            ).fit(data_merge) for column in cols_index]
pipeline_idx = Pipeline(stages=indexers)
final_df = pipeline_idx.fit(data_merge).transform(data_merge)
final_df = final_df.drop(*cols_index)

print ('========String indexer applied========')

cols_to_vector = [c for c in final_df.columns
                  if c != 'Time from Pickup to Arrival']

assembler = VectorAssembler(inputCols=cols_to_vector,
                            outputCol='features')
final_df = assembler.transform(final_df)
print ('==================Vector assembler Done=======================')

(train_data, test_data) = final_df.randomSplit([0.7, 0.3])
print ('Data frame split into train and test')

gbt = GBTRegressor(featuresCol='features',
                   labelCol='Time from Pickup to Arrival')

6. Cross-Validation

We define the parameter grid for cross-validation and the evaluation metric:

# Define parameters

paramGrid = ParamGridBuilder()\
    .addGrid(gbt.maxDepth, [2, 5])\
    .addGrid(gbt.maxIter, [10, 50])\
    .build()

# Next, define the evaluation metric.

evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol=gbt.getLabelCol(),
                                predictionCol=gbt.getPredictionCol())

cv = CrossValidator(estimator=gbt, evaluator=evaluator,
                    estimatorParamMaps=paramGrid)

7. Pipeline

To chain multiple steps together, we use a pipeline. Here we have just one stage, but this makes it easy to add further steps later as part of the stages list.

pipeline = Pipeline(stages=[cv])

print("===========Fitting data pipeline===========")
pipeline_model = pipeline.fit(train_data)
print("========Model trained=========")

predictions = pipeline_model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print("RMSE : {}".format(rmse))

8. Persistence

After building the model, we save it to our cloud bucket so that we can compare it with other model versions we might build later.

pipeline_model.save("s3://sendylogistic/Saved_pipeline/v1")

print("pipeline saved")

Submitting Spark jobs on Amazon EMR

Since we already created our S3 bucket, EC2 key pair, and EMR cluster from the description above. Let’s upload files to our S3 bucket. The files include a Python script and data.

From the AWS homepage:

Navigate to the S3 service by either typing S3 in the search box or selecting S3 from the storage section in the All service option:

  • select the created bucket,
  • click Create folder to create a folder that will store our pipeline,
  • give the folder a name (here we use “Saved_pipeline”) and create it, then
  • go back to the S3 bucket, click Upload, and
  • upload the Python script and data from your local computer.

Your S3 bucket should now contain the uploaded Python script and data files, alongside the Saved_pipeline folder.
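The same uploads can also be done from the command line; a short sketch, assuming the files sit in your current working directory:

aws s3 cp sendy.py s3://sendylogistic/
aws s3 cp "Train(1).csv" s3://sendylogistic/
aws s3 cp Riders.csv s3://sendylogistic/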

Next, navigate back to the created EMR cluster. Here we’ll update the security settings to access the cluster from our local machine via SSH.

  • Under the security and access section, click the Security groups for Master.


  • Select the Master option.


  • Click Edit inbound rules, then click the Add rule button at the bottom of the page.


  • In the type option, select SSH. In the source option, select Anywhere (this option allows us to SSH to the master node, provided we have the private key).


  • Click save rules.

Navigate back to the created EMR cluster. Connect to the master node by clicking the SSH connection link next to the master public DNS.


We will select the Mac/Linux option; the steps are self-explanatory. If you use Windows, select the Windows option and follow the steps there.


The steps state that to create a connection, we have to:

  • Open a terminal on our Linux machine.
  • Restrict the permissions of the key file with `chmod 400 key.pem`, where `key.pem` is the location of the key pair downloaded when we created it. Since ours is stored in the home directory, we type `chmod 400 ~/sendy.pem`.
  • Paste in the SSH command shown in the connection instructions. Output showing the EMR banner means a successful connection has been established.

  • Run the `aws s3 cp s3://folder/file .` command to copy the script from our S3 bucket to the master node. Ours will be:

`aws s3 cp s3://sendylogistic/sendy.py .`

  • List the files in your directory with the `ls` command to make sure the script is present.

We have created a successful connection and also copied our sendy script.

Now to submit the Spark job, we run the `spark-submit filename.py` command.

Ours will be: `spark-submit sendy.py`
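On an EMR master node, spark-submit runs against YARN by default. A few optional flags can be added to control how the job is deployed; the values below are illustrative assumptions, not required settings:

spark-submit \
    --master yarn \
    --deploy-mode client \
    --num-executors 2 \
    --executor-memory 4g \
    sendy.py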

  • The process runs for approximately 15 minutes, depending on your data and hardware configuration.


  • If it runs successfully, the job will finish without errors in the terminal output.

After that, navigate to your S3 bucket to ensure your pipeline v1 is present after the successful job.


To make predictions with this model, we simply load it from our S3 bucket and transform new data:

# Since we fit the pipeline to a dataframe, we import PipelineModel

from pyspark.ml import PipelineModel
path = 's3://sendylogistic/Saved_pipeline/v1'
loaded_pipeline = PipelineModel.load(path)

# predict on new data

new_prediction = loaded_pipeline.transform(test_data)

Submitting Spark jobs on Google Cloud Dataproc

Since we already created our cloud storage, let’s populate it with our data and Python script.

To begin with, we navigate to cloud storage and click on the created bucket. From there, we upload the necessary files.

Your bucket should now contain the uploaded Python script and data files.

Navigate back to Dataproc and click on the already created cluster name.

  • Click on SUBMIT JOB.
  • Leave the Job ID as default or use any name.
  • Change the job type to PySpark.
  • The main Python file should be the path to your Python script in Cloud Storage; we use `gs://sendylogistic/sendy.py`.
  • Click on Submit.

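Equivalently, the job can be submitted from the command line with the Cloud SDK; a minimal sketch, where the cluster name is a placeholder for the cluster created earlier:

gcloud dataproc jobs submit pyspark gs://sendylogistic/sendy.py \
    --cluster=spark-ml-cluster \
    --region=us-central1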

After a successful run, make sure the job status reads Succeeded.

Finally, navigate back to your cloud storage to make sure your pipeline version is present.


To make predictions with this model, we simply load it from our Cloud Storage bucket and transform new data:

# Since we fit the pipeline to a dataframe, we import PipelineModel

from pyspark.ml import PipelineModel

path = 'gs://sendylogistic/Saved_pipeline/v1'
loaded_pipeline = PipelineModel.load(path)

# predict on new data

new_prediction = loaded_pipeline.transform(test_data)

Conclusion

Running Spark ML jobs on cloud platforms rather than on-premise has many benefits, from cost-effectiveness to speed to auto-scaling. We have discussed two leading cloud services, Google Cloud Platform (Dataproc) and Amazon Web Services (Elastic MapReduce), and how we can use their Spark offerings to run our big data ML jobs without worrying about infrastructure.

References

https://spark.apache.org/docs/latest/ml-guide.html

https://cloud.google.com/docs

https://docs.aws.amazon.com/

https://scikit-learn.org/stable/developers/develop.html

https://github.com/Nelsonchris1/Running-pyspark-mllib-on-AWS-and-GCP
