Serving ML models with Apache Spark

Oct 13th 2021

Pınar Ersoy

Lead Data Scientist

Processing large datasets comes with difficulties imposed by the limits of technologies and programming languages. An important step is getting to know distributed processing technologies and their supporting libraries. This article is aimed at machine learning engineers and data scientists who want to use the data processing, MLlib, and model serving capabilities of Apache Spark.

What is Apache Spark?

Apache Spark is a system that provides a cluster-based distributed computing environment with the help of its broad packages, including:

  • SQL querying,
  • streaming data processing, and
  • machine learning.

Apache Spark supports Python, Scala, Java, and R programming languages.

Apache Spark computes in memory. According to the book “Cloud Computing Technologies for Green Enterprises” by Kashif Munir, a job can run up to 100 times faster in memory and ten times faster on disk than with Hadoop MapReduce.

What is PySpark?

Apache Spark was originally implemented in Scala. Since most machine learning libraries and higher-level data processing packages are written in Python, demand for integration with Spark was obvious. To meet this demand, a Python API named PySpark was developed for Spark. It is built on Py4J, a library that lets a Python interpreter communicate with objects in the Java Virtual Machine (JVM).

How does Spark work?

Apache Spark scales jobs horizontally across a cluster. It speeds them up by keeping data in memory and adds rich SQL support. Among other things, Spark can:

  • operate numerous distributed scripts,
  • enable data processing,
  • generate data workflows, and
  • conduct analytical methods with MLlib functions

Fig.1. Spark Workflow

Spark components

The Spark project consists of several tightly integrated components. At its core, Spark contains a computation engine that can schedule, parallelize, and monitor numerous applications. You do not have to use all Spark components at once. Depending on the use case and requirements, some of them can be used together with Spark Core. Spark Core itself is mandatory, since it is the kernel of the Spark architecture.

Spark core

Spark Core is the general execution engine that supports all the other capabilities of the platform.

Spark streaming

Spark Streaming lets Spark process live streaming data consumed from various systems, including HDFS, S3, Kafka, and Flume, and write the results to different database systems.

Fig.2. Spark Streaming Architecture

Spark SQL

Spark SQL is the main module for working with structured data through queries. It can read a diverse range of data formats, including Parquet, JSON, and Avro. What’s more, it supports user-defined functions (UDFs) and HiveQL.

Fig.3. Spark SQL Enablement

GraphX

GraphX is the Spark component for graphs and parallel, distributed graph computation. Abstractly, a graph is composed of vertices and edges. GraphX integrates graph computations into the Spark system.

Fig.0. GraphX Architecture

MLlib (Machine learning)

MLlib is the core machine learning library for Spark. It applies Spark's distributed approach to storing and processing data. It includes various algorithms, such as regression, decision trees, and k-means clustering.

Spark architecture

Spark architecture is composed of driver and worker nodes. These nodes are connected with the help of a cluster manager.

Fig.1. Spark Cluster Mode Architecture (Created by Author)

Driver node

The driver node is the master node responsible for executing the `main()` method. Its primary purpose is to create the required Spark session successfully.

Cluster manager

The cluster manager allocates resources among the submitted jobs. You can select Hadoop YARN, Mesos, or Kubernetes as a cluster manager.

Worker node

The worker node runs the code blocks assigned to it. Inside it, the executor runs the scheduled job and keeps its data in memory afterwards. The smallest unit of work in the executor is called a task.

Why use Apache Spark

There are many reasons to choose Spark. The most important ones are its ease of use, speed, and support.

Ease of use

Spark’s capabilities are accessible through numerous APIs, all designed for interacting quickly and effectively with data at scale. Thanks to this easy-to-understand structure, users can produce results with Spark in a short time.

Speed

Spark is designed for fast performance. It works both in memory and on local storage. Spark can run up to a hundred times faster than Hadoop’s MapReduce.

Support

Spark supports multiple programming languages, including Python, R, Java, and Scala. It integrates with various storage systems in the Hadoop environment. Besides, the Apache Spark developer community is huge, active, and worldwide. Commercial Hadoop vendors also offer extensive services for Spark applications.

Spark installation

Spark can be installed in different ways depending on the platform. This section introduces two installation options:

  • setting it up on Google Colab, and
  • installing it on your local machine.

Setting up Spark on Google Colab

Google Colab is an environment where users run Python scripts in their browsers. To run Spark with Python on Google Colab, you have to install compatible versions of Spark, Hadoop, and Java, along with the `findspark` package. Installing Spark on Google Colab can be done as shown below:

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

After successfully installing the corresponding versions on Google Colab, you can set up the environment variables for Spark and Java.

import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

The `findspark` package locates the Spark installation. Calling `findspark.init()` then makes PySpark importable as a library.

import findspark 
findspark.init()

Installing Apache Spark on local machine

Apache Spark can run in any environment where Python, Scala, or Java are installed. This article will focus on the Python language. The easiest way to install required Python packages and Jupyter Notebook in a compact and fast way is to use Anaconda.

Install Spark using the command below on the Anaconda Prompt:

conda install pyspark

Apache Spark basics

Spark is built around the Resilient Distributed Dataset (RDD) structure. An external data source can be read into this structure. Functions can be passed to Spark through the RDD and applied to an existing dataset or to a new one.

You’ll learn more about the RDD structure, Spark Transformations, and Actions in the upcoming sections.

Resilient Distributed Datasets (RDDs)

The Resilient Distributed Dataset is Spark's fundamental user-facing programming interface for in-memory computation. It is an immutable collection of data elements partitioned across the nodes of the cluster.

Fig.4. RDD Architecture

RDD creation

You need to create a Spark session before you can create an RDD. This is done with the help of the `SparkContext`. The `SparkConf` is used to set other Spark configurations.

from pyspark import SparkContext, SparkConf

The next step is to define our desired Spark configuration. In this case, we’ll use a local cluster since we are working on a local machine. You also have the option to set up Spark in cluster mode.

Specifying `local[*]` means that Spark will use all the cores in the local machine. This is usually the default setting in stand-alone mode.

spark_configurations = (SparkConf()
    .setMaster("local[*]")
    .setAppName("firstSparkSession")
    .set("spark.executor.memory", "2g"))

With this configuration at hand, we can create the Spark session.

spark_context = SparkContext(conf = spark_configurations)

There are some built-in functions for viewing spark configurations.

The Spark version can be retrieved using the `version` attribute.

spark_context.version

The Python version can be displayed using the `pythonVer` attribute.

spark_context.pythonVer

To view the master URL, which in local mode also shows how many cores are assigned, read the `master` attribute of the Spark context. In the example below, the Spark context variable is called `spark_context`.

spark_context.master

Every Spark application has a name. The `setAppName` method of `SparkConf` sets it. Only one `SparkContext` can be active at a time, so `getOrCreate` is used here to reuse the context that already exists.

spark_configurations = SparkConf().setAppName("firstSparkSession")
spark_context = SparkContext.getOrCreate(conf=spark_configurations)

After assigning the application name, it is possible to view it with the `appName` attribute.

spark_context.appName

Spark creates a unique application id for every session. The id can be retrieved using the `applicationId` attribute.

spark_context.applicationId

Spark distributes jobs in every Spark session through parallelism. You can set this manually or use the default options.

Default settings can be viewed using the `defaultParallelism` attribute.

spark_context.defaultParallelism

You can set the default parallelism in the configuration phase, before the Spark context is created. This is done with the `spark.default.parallelism` parameter of `SparkConf`.

spark_configurations = SparkConf().set("spark.default.parallelism", "50")

Additionally, Spark lets you assign a different number of partitions to an individual RDD. The desired number is passed as the second argument of `parallelize`. In the example below, `50` is the number of partitions.

spark_context.parallelize(range(100), 50).getNumPartitions()

To print the default setting for the minimal count of partitions for the RDD, use the `defaultMinPartitions` attribute.

spark_context.defaultMinPartitions

RDD operations

In Spark, RDD operations are composed of `Transformations` and `Actions`. `Transformations` create a new RDD from an existing one. `Actions` run a computation on an RDD and return the result to the driver.
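Transformations are lazy: Spark only records them and computes nothing until an action asks for a result. The sketch below imitates that behavior in plain Python, where the built-in `map` is lazy too. It is an analogy rather than Spark code, and the names `add_two` and `calls` are made up for illustration.

```python
calls = []  # records every time the function actually runs


def add_two(x):
    calls.append(x)
    return x + 2


items = [4, 13, 13, 28]

# "Transformation": builds a lazy pipeline, nothing is computed yet.
mapped = map(add_two, items)
calls_before_action = len(calls)

# "Action": consuming the pipeline forces the computation.
result = list(mapped)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

The function runs only once the result is requested, which is the same reason a chain of Spark transformations returns immediately while `collect` takes time.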

Spark Transformations

Several Spark transformations are available in an active Spark Session. In this section, let’s introduce the most common.

Map

The `map` method returns a new distributed dataset formed by passing each element through a function. In the following example, the `collect` operation retrieves all the items of the resulting RDD.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
mapped_list = items.map(lambda x: x + 2).collect()

print("Printing mapped items for map operation of RDD: ", mapped_list)

FlatMap

The flatMap method operates by performing a computation on each item of the RDD followed by a flattening operation.

items = spark_context.parallelize ([2,4,13]) 
items.flatMap(lambda x: range(1, x)).collect()

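The difference between `map` and `flatMap` is easiest to see side by side. The following plain-Python sketch mimics both on a small list without Spark; the input values are chosen for illustration only.

```python
from itertools import chain

items = [2, 4]

# map keeps one output element per input element (here: one list each).
mapped = [list(range(1, x)) for x in items]

# flatMap applies the same function, then flattens the results one level.
flat_mapped = list(chain.from_iterable(range(1, x) for x in items))

print(mapped)       # [[1], [1, 2, 3]]
print(flat_mapped)  # [1, 1, 2, 3]
```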

MapPartitions

With the help of `mapPartitions`, a method can be applied to every partition of the specified RDD.

partitioned = spark_context.parallelize ([4,13,13,28,36,47,56], 2)


def mapPartitionFunc(ind): yield sum(ind) 
partitioned.mapPartitions(mapPartitionFunc).collect()

MapPartitionsWithIndex

The `mapPartitionsWithIndex` method runs a function on every partition of the RDD while keeping track of the index of the original partition.

partitioned = spark_context.parallelize ([4,13,13,28,36,47,56], 4)


def mapPartitionByIndexFunc(indSlicer, ind): yield indSlicer 
partitioned.mapPartitionsWithIndex(mapPartitionByIndexFunc).sum()

Filter

The filter method returns a new dataset after selecting the items that return `true` on a certain condition.

items = spark_context.parallelize ([4,13,13,28,36,47,56]) 


filtered_list = items.filter(lambda x: x % 2 == 0).collect()


print ("Printing filtered list items for filter operation of RDD: ", (filtered_list))

Sample

Sampling can be used at any stage of data processing. For RDDs, sampling is applied by passing a fraction to the `sample` function. To get the same subset on repeated runs, add a seed to the call.

sampling_items = spark_context.parallelize(range(20), 4) 
sampling_items.sample(True, 0.3, 1234).collect()

Join

RDD datasets can be joined on a pair of matching keys using the `join` method.

list1 = spark_context.parallelize([("k", 98), ("m", 65)]) 
list2 = spark_context.parallelize([("k", 120), ("k", 43)]) 
sorted(list1.join(list2).collect())

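An inner join pairs every value of a key on one side with every value of the same key on the other side, and drops keys that appear on one side only. The plain-Python sketch below reproduces that logic on ordinary lists holding the same pairs as the example; it is a model of the semantics, not Spark's implementation.

```python
list1 = [("k", 98), ("m", 65)]
list2 = [("k", 120), ("k", 43)]

# Pair every value from list1 with every value from list2 that shares its key.
joined = sorted(
    (key1, (value1, value2))
    for key1, value1 in list1
    for key2, value2 in list2
    if key1 == key2
)

print(joined)  # [('k', (98, 43)), ('k', (98, 120))]
```

The key `"m"` has no partner in the second list, so it does not appear in the result.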

Union

The `union` operation helps to unite the specified RDDs. It adds one followed by another. This operation does not search for a matching key between them.

union_items = spark_context.parallelize(range(5), 2) 
union_items.union(union_items).collect()

Intersection

The `intersection` method is responsible for finding the intersecting group of elements in RDD datasets.

group1 = spark_context.parallelize([2, 10, 17, 3, 14, 5]) 
group2 = spark_context.parallelize([2, 8, 5, 34, 42, 14])


group1.intersection(group2).collect()

Distinct

The `distinct` function is used to obtain a unique group of elements from an RDD.

items = spark_context.parallelize ([4, 13, 13, 28, 36, 47, 56]) 
unique_element_list = items.distinct().collect() 


print ("Printing distinct items for distinct operation of RDD: ", (unique_element_list))

GroupByKey

The `groupByKey` function groups the values for each key into a single sequence. The resulting RDD is hash-partitioned.

groupedKeys = spark_context.parallelize([("first_num", 300),
                                         ("second_num", 500),
                                         ("third_num", 900)])

print(sorted(groupedKeys.groupByKey().mapValues(len).collect()))
print(sorted(groupedKeys.groupByKey().mapValues(list).collect()))

ReduceByKey

The `reduceByKey` method merges the values of each key using the supplied function, which should be associative and commutative. Addition meets this requirement, so `add` is used below.

from operator import add

reducedKeys = spark_context.parallelize([("first_num", 300),
                                         ("second_num", 500),
                                         ("third_num", 900),
                                         ("second_num", 500)])

print(sorted(reducedKeys.reduceByKey(add).collect()))

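`reduceByKey` may merge values inside each partition first and then merge the partial results, so the merge function is expected to be associative and commutative. The plain-Python sketch below shows why: with subtraction the answer depends on how the values happen to be grouped, while addition gives the same answer every time. The groupings are made up for illustration and the helper `merge` is hypothetical.

```python
from functools import reduce
from operator import add, sub

values = [300, 500, 900]


def merge(func, partitions):
    # Reduce inside each partition first, then reduce the partial results.
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


# Three assumed ways the same values could be spread over partitions.
layouts = ([values], [[300, 500], [900]], [[300], [500, 900]])

add_results = {merge(add, layout) for layout in layouts}
sub_results = {merge(sub, layout) for layout in layouts}

print(add_results)  # one value: the result does not depend on the layout
print(sub_results)  # two values: the result depends on the layout
```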

AggregateByKey

The `aggregateByKey` method aggregates the values of each key, starting from a neutral zero value. First, a sequence function folds every value of a key into an accumulator within each partition. After this step, a combine function merges the accumulators across partitions. The `aggregate` method does the same for a whole RDD without keys.

item_group1 = spark_context.parallelize([('first', 5), ('first', 3), ('second', 3)])
item_group2 = spark_context.parallelize(range(20))

# Accumulators are (sum, count) pairs.
firstGroup = (lambda x, y: (x[0] + y, x[1] + 1))
aggregatedGroup = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

print(item_group2.aggregate((0, 0), firstGroup, aggregatedGroup))
print(sorted(item_group1.aggregateByKey((0, 0), firstGroup, aggregatedGroup).collect()))

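The two-step logic of `aggregateByKey` can be traced by hand in plain Python. The sketch below folds values into `(sum, count)` accumulators per partition and then merges the accumulators across partitions. The partition layout and the helper `aggregate_by_key` are assumptions made for illustration; Spark decides the real layout itself.

```python
def seq_op(acc, value):
    # Fold one value into a (sum, count) accumulator within a partition.
    return (acc[0] + value, acc[1] + 1)


def comb_op(acc1, acc2):
    # Merge two (sum, count) accumulators coming from different partitions.
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


def aggregate_by_key(partitions, zero, seq_op, comb_op):
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = seq_op(local.get(key, zero), value)
        for key, acc in local.items():
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged


# Assumed split of the three pairs into two partitions.
partitions = [[("first", 5)], [("first", 3), ("second", 3)]]
result = aggregate_by_key(partitions, (0, 0), seq_op, comb_op)

print(sorted(result.items()))  # [('first', (8, 2)), ('second', (3, 1))]
```

Dividing the sum by the count of each accumulator would give the mean per key, which is a common reason to carry both numbers.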

SortByKey

The `sortByKey` method sorts key-value pairs by key in ascending order.

item_list = [('first', 7), ('second', 9), ('third', 11), ('fourth', 34), ('fifth', 58)]

spark_context.parallelize(item_list).sortByKey().first()

Spark Actions

Let’s now take a look at some Spark Actions.

Collect

The `collect` function returns all the elements of the dataset to the driver as a list.

items = spark_context.parallelize ([4,13,13,28,36,47,56]) 
number_list = items.collect() 


print ("Printing elements for collect: %s" % (number_list))

First

The `first` method is used to get the first item from an RDD.

items = spark_context.parallelize ([4,13,13,28,36,47,56]) 
first_element = items.first() 


print ("Printing first element with first operation of RDD: %s" % (first_element))

Take

The `take(n)` method returns the first n elements of the dataset.

items = spark_context.parallelize ([4,13,13,28,36,47,56]) 
take_element = items.take(3)

print ("Printing specified number of elements with take operation of RDD: %s" % (take_element))

Take Sample

The `takeSample` method returns a random sample of a fixed size from the RDD. Its first parameter is `withReplacement`. It indicates whether the same element may be drawn more than once. If yes, set it to `True`, otherwise set it to `False`.

The second parameter is the number of elements to sample. The third parameter is the `seed`. It identifies this specific sample: whenever you run the sampling function with the same seed inside the current Spark session, it returns the same subset.

items = spark_context.parallelize ([5,13,13,28,36,47,56]) 
items.takeSample(True, 5, 1)

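The roles of the three parameters can be imitated with Python's `random` module. The sketch below is only a model of the semantics: the helper `take_sample` is hypothetical, and it does not reproduce Spark's random number generator, so the concrete values differ from what Spark returns.

```python
import random

items = [5, 13, 13, 28, 36, 47, 56]


def take_sample(data, with_replacement, num, seed):
    rng = random.Random(seed)  # same seed -> same sample
    if with_replacement:
        # An element may be drawn several times.
        return [rng.choice(data) for _ in range(num)]
    # Without replacement, at most len(data) elements can be returned.
    return rng.sample(data, min(num, len(data)))


first = take_sample(items, True, 5, 1)
second = take_sample(items, True, 5, 1)
without = take_sample(items, False, 10, 1)

print(first == second)  # the seed makes the sample repeatable
print(len(without))     # capped at the size of the data
```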

TakeOrdered

The `takeOrdered` function returns the requested number of items from the RDD in ascending order. If the RDD holds fewer items than requested, all of them are returned.

ordered_items = spark_context.parallelize([44, 131, 836, 147, 56]).takeOrdered(6)

print(ordered_items)

Count

The `count` function returns the number of elements in the RDD, duplicates included.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])
element_count = items.count()

print("Printing number of instances for count operation of RDD: %i" % (element_count))

CountByKey

The `countByKey` function differs from the `count` function by counting the items by their corresponding keys.

countKey = spark_context.parallelize([("first_num", 300), ("second_num", 500), ("third_num", 900), ("second_num", 500), ]) 
sorted(countKey.countByKey().items())

SaveAsTextFile

RDDs can be saved in text format with the `saveAsTextFile` function. The path names an output directory that receives one part file per partition.

items = spark_context.parallelize([4, 13, 13, 28, 36, 47, 56])

items.saveAsTextFile("items.txt")

RDD persistence

A main advantage of Spark is its ability to keep a dataset in memory across operations. Persistence is achieved through caching. Each node keeps the partitions it computes in memory so that the RDD is stored in the cache. After that, the partitions can be reused in other operations on that dataset. This makes future actions a lot quicker.

An RDD is computed the first time it is used in an action. After that, it is kept in memory in partitions. These cached partitions can still be lost. In that case, Spark recomputes them with the transformations that originally created them.

By caching RDDs, users keep the dataset in memory or on disk and reuse it across other jobs. Some RDDs are used several times. Calling the `persist` operation on those RDDs can be advantageous.

Fig.5. Memory Storage Level

When `MEMORY_AND_DISK` is passed to the `persist` method, RDD partitions are cached in memory, spill to disk when they do not fit, and are stored only once.

import pyspark

item_list = spark_context.parallelize([('first', 5), ('first', 3), ('second', 3)])

item_list.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

print(item_list.getStorageLevel())

On the other hand, when `MEMORY_AND_DISK_2` is selected, each RDD partition is replicated on two cluster nodes.

item_list = spark_context.parallelize([('first', 5), ('first', 3), ('second', 3)])

item_list.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)

print(item_list.getStorageLevel())

Spark session creation with PySpark

A Spark session is required to run PySpark functions. First, the required libraries have to be imported.

from pyspark.sql import SparkSession
from pyspark.context import SparkContext

After loading the related libraries, a spark session can be initiated by simply adding an `appName` and a `getOrCreate` function. If any additional config requirement exists, such as the memory size of the executor, then a `config` parameter can be included in the Spark session building block.

spark = SparkSession.builder.\
appName("FirstSparkApplication").\
config ("spark.executor.memory", "8g").\
getOrCreate()

Creating Spark Dataframes by reading different Data formats

After creating the session, data can be read and loaded into DataFrames. A DataFrame is a distributed collection of data organized into named columns, much like a table.

Below, files in different formats are read using the reader function named after each format.

json_df = spark.read.json("dataset.json")
text_df = spark.read.text("dataset.txt")
csv_df = spark.read.csv("dataset.csv")
parquet_df = spark.read.parquet("dataset.parquet")

Machine learning in Spark

Spark contains a separate library called MLlib that supports several machine learning algorithms. The core areas that MLlib covers are:

  • machine learning computations,
  • featurization,
  • generating pipeline structures,
  • persistence

Let’s discuss the steps for implementing a machine learning model using Spark.

Data preparation

Throughout this article, a very well-known dataset, the “Titanic,” will be used.

As the first step, we’ll read in the dataset with the help of the Spark session.

The dataset is stored in CSV format, so `csv` is passed as the parameter of `spark.read.format()`.

training_dataset = (spark.read.format("csv")
    .option("inferSchema", True)
    .option("header", "true")
    .load('dataset/titanic_train.csv'))

test_dataset = (spark.read.format("csv")
    .option("inferSchema", True)
    .option("header", "true")
    .load('dataset/titanic_test.csv'))

As the initial analysis step, let’s display the column names. This can be done in three different ways by using PySpark.

The first way is to use the `show` method while passing the number of rows you’d like to display as the argument.

training_dataset.show(5)

Fig.4. Output of `show(5)` function

The second way is to use the `show()` method without passing any arguments. This action will output 20 rows. Its default format contains truncated column content. Truncated columns can be observed for the `Name` column since it exceeds the default length.

training_dataset.show()

Fig.5. Output of `show()` function

The full column content can be viewed by setting `truncate = False` in the `show` method. Also, the default horizontal display can be changed by adding `vertical = True` in the `show()` function.

training_dataset.show(2, truncate=False, vertical=True)

Fig.6. Output of `show(truncate=False, vertical=True)` function

Data preprocessing with Spark

After viewing the column names and their types, it is crucial to check whether the dataset includes any `null` or `NaN` values. We have to fill them before the modeling step.

Let’s count the NaN, null, and non-null values of each column below.

from pyspark.sql.functions import * 
print ("NaN values\n") 
training_dataset.select([count(when(isnan(item), item)).alias(item) for item in training_dataset.columns]).show(5) 

print ("Null values\n") 
training_dataset.select([count(when(col(item).isNull(), item)).alias(item) for item in training_dataset.columns]).show(5) 

print ("Not Null values\n") 
training_dataset.select([count(when(col(item).isNotNull(), item)).alias(item) for item in training_dataset.columns]).show(5)

Fig 7. Output of filled and unfilled values

Columns can be renamed using the `withColumnRenamed` function. Multiple columns can be renamed by chaining the calls one after another. The function’s first parameter is the original column name, and the second parameter is the new column name.

print("Renaming Column Name")

training_dataset = (training_dataset
    .withColumnRenamed("Pclass", "PassengerClasses")
    .withColumnRenamed("Sex", "Gender"))
training_dataset

Fig 8. Output of `withColumnRenamed` function

The `groupBy` operation can be applied to a single column together with a `count` operation. Several columns can also be passed to the function to group by multiple values.

The `sort()` function can be chained after `count()`. Looking at the counts by class in the output, we can see the `Survived` counts are highest in the third passenger class for both genders.

print("Counting the number of Passenger per Classes")

training_dataset.groupBy("PassengerClasses").count().sort("PassengerClasses").show()

print("Counting the number of Survivals by Classes")

(training_dataset
    .groupBy("PassengerClasses", "Gender", "Survived")
    .count()
    .sort("PassengerClasses", "Gender", "Survived")
    .show())

Fig 9. Output of `groupBy` operation

Feature engineering with PySpark

With the help of feature engineering, more insightful information can be extracted from the existing variables in the dataset.

There’s a `Name` column in the titanic dataset that also includes the person’s title. This information might be beneficial in the model. So let’s generate it as a new variable. A new title column can be created using the `withColumn` operation.

training_dataset = training_dataset.withColumn("Title", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))

training_dataset.select("Name","Title").show(10)

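The regular expression used for the title can be tried out in plain Python before running it on the cluster. The sketch below applies the same pattern with the `re` module to a few sample strings written in the format of the `Name` column; the helper `extract_title` is hypothetical.

```python
import re

# Same pattern as in the Spark call: letters directly followed by a dot.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")


def extract_title(name):
    match = TITLE_PATTERN.search(name)
    # Like regexp_extract, fall back to an empty string when nothing matches.
    return match.group(1) if match else ""


names = ["Braund, Mr. Owen Harris", "Heikkinen, Miss. Laina", "No title here"]
titles = [extract_title(name) for name in names]

print(titles)  # ['Mr', 'Miss', '']
```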

Fig 10. Output of `withColumn` title extraction operation

A new column is produced with the `Title` name. Listing each title by its count can show us that some titles are seen only once.

training_dataset.groupBy("Title").count().show()

Fig 11. Output of `groupBy` operation

Some titles are duplicates written in different forms. They can be merged with the `replace` function.

feature_df = training_dataset.replace(
    ["Mme",
     "Mlle", "Ms",
     "Major", "Dr", "Capt", "Col", "Rev",
     "Lady", "Dona", "the Countess", "Countess", "Don", "Sir", "Jonkheer", "Master"],
    ["Mrs",
     "Miss", "Miss",
     "Ranked", "Ranked", "Ranked", "Ranked", "Ranked",
     "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty"])

feature_df.groupBy("Title").count().sort(desc("count")).show()

Fig 12. Output of `groupBy` operation for `Title` column

After the replacement operation, the distribution of the titles seems more accurate than before.

Building the machine learning model with PySpark MLlib

Before the model implementation phase, the types of the variables should be inspected. Since the prediction algorithms require numerical variables, string-formatted columns may cause errors.

PySpark’s `dtypes` function can be used to print the types of the variables.

feature_df.dtypes

Fig 13. Output of `dtypes` operation for the dataframe

After printing the types of the variables, it can be observed that `Gender`,`Embarked`,`Title` columns have a string format. These columns need to be converted to a numerical form.

A specialized PySpark function called `StringIndexer` fits and transforms variables into numeric types. Let’s implement it below.

from pyspark.ml.feature import StringIndexer


parchIndexer = StringIndexer(inputCol="Parch", outputCol="Parch_Ind").fit(feature_df)
sibspIndexer = StringIndexer(inputCol="SibSp", outputCol="SibSp_Ind").fit(feature_df)
passangerIndexer = StringIndexer(inputCol="PassengerClasses", outputCol="PassengerClasses_Ind").fit(feature_df)
survivedIndexer = StringIndexer(inputCol="Survived", outputCol="Survived_Ind").fit(feature_df)
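By default, `StringIndexer` assigns indices by descending label frequency, so the most frequent label becomes `0.0`. The plain-Python sketch below imitates that mapping on a made-up list of values. The helper `fit_string_indexer` is hypothetical, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def fit_string_indexer(values):
    counts = Counter(values)
    # Most frequent label first; ties are broken alphabetically (assumed).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


genders = ["male", "female", "male", "male", "female", "male"]
mapping = fit_string_indexer(genders)
indexed = [mapping[value] for value in genders]

print(mapping)  # {'male': 0.0, 'female': 1.0}
print(indexed)  # [0.0, 1.0, 0.0, 0.0, 1.0, 0.0]
```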

After indexing and dropping the old string-formatted columns, the DataFrame holds only numerical variables. Since none of the columns is a string anymore, we can build a feature vector from the columns of the DataFrame. The `VectorAssembler` combines them into the `features` vector column.

from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler( 
inputCols = ["PassengerClasses","SibSp","Parch"], 
outputCol = "features")

The next step after creating the feature vector is to split the data into train and test sets. You can use the `randomSplit` function to achieve this.

(train, test) = feature_df.randomSplit([0.8, 0.2], seed = 345)

Before applying the prediction algorithm, a classifier and a pipeline generation phase needs to be implemented.

Let’s define these steps together. First, let’s select a classifier from the MLlib library built-in PySpark functions. After adding the import statements, the classifier can be created by assigning the `labelCol` and `featuresCol` columns.

from pyspark.ml.classification import DecisionTreeClassifier 


model_identifier = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")

In this step, a pipeline is created by listing the assembler and the classifier in `stages`.

from pyspark.ml import Pipeline


pipeline = Pipeline(stages=[assembler, model_identifier])

When the pipeline is established, the parameters of the classifier can be tuned with the help of `ParamGridBuilder`.

The grid search will then pick the best-performing parameters.

from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()
    .addGrid(model_identifier.maxDepth, [10, 20])
    .addGrid(model_identifier.maxBins, [50, 100])
    .build())

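A parameter grid is the Cartesian product of the listed values, so two values for each of two parameters yield four candidate settings. The plain-Python sketch below enumerates them with `itertools.product`. It only illustrates the idea and uses plain strings as keys where Spark uses parameter objects.

```python
from itertools import product

grid = {"maxDepth": [10, 20], "maxBins": [50, 100]}

# Every combination of the listed values becomes one candidate parameter map.
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

for params in param_maps:
    print(params)
```

Each added parameter multiplies the number of models to train, which is worth keeping in mind when the grid grows.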

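The grid is evaluated with `TrainValidationSplit`. For this purpose, the evaluator is given the corresponding label column, prediction column, and metric.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision"),
    trainRatio=0.8)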

After the `TrainValidationSplit` stage is completed, we are ready to fit the model.

model = tvs.fit(train)

Model evaluation

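As a model evaluation method, the `accuracy` metric can be applied. Accuracy is the share of correct predictions among all predictions:

Accuracy = (TP + TN) / (TP + TN + FP + FN)

Here TP, TN, FP, and FN are the numbers of true positives, true negatives, false positives, and false negatives.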
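Accuracy is the number of correct predictions divided by the total number of predictions. A minimal plain-Python sketch of that calculation follows; the labels and predictions are made up for illustration.

```python
def accuracy(labels, predictions):
    # Correct predictions are the true positives plus the true negatives.
    correct = sum(1 for label, pred in zip(labels, predictions) if label == pred)
    return correct / len(labels)


labels = [1, 0, 1, 1, 0]
predictions = [1, 0, 0, 1, 1]
score = accuracy(labels, predictions)

print(score)  # 3 of 5 predictions match, so 0.6
```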

With the line of code below, we can obtain the validation metric for each parameter combination. The metric is the one set in the evaluator, which is `weightedPrecision` here.

list(zip(model.validationMetrics, model.getEstimatorParamMaps()))

Serving Apache Spark machine learning models

Machine learning models generated using PySpark can be served using MLflow. In the following sections, the installation of the MLflow package will be explained. In addition, the model serving methodology will be presented with some sample scripts added at the end of the concept descriptions.

Installing MLflow for Spark Model Serving

MLflow can be used as the model serving library for PySpark models. Installation of the library is required to use MLflow in the Spark session. For PySpark, the package can be installed by using the following command.

pip install mlflow

Import the `mlflow.spark` module after installing MLflow. Importing it as a module, rather than with `from mlflow import spark`, keeps the name `spark` bound to the Spark session created earlier.

import mlflow.spark

Serving Spark Model with MLflow

After importing MLflow, open a run with `start_run()` and log the model inside it.

import mlflow
import mlflow.spark

with mlflow.start_run():
    mlflow.spark.log_model(model, "sparkML-model")

After executing the `log_model` operation, MLflow creates the `artifacts`, `metrics`, `params`, and `tags` folders for the run.

In the `artifacts` folder, you’ll find the `sparkML-model` folder. Inside `sparkML-model`, there are `metadata` and `stages` folders. `stages` holds the saved stages of the model pipeline. There can be a single stage or multiple stages. `metadata`, on the other hand, holds the data that describes the model.

The stages folder contains the `bestModel` information.

The `sparkML-model` folder also holds the `MLmodel` file, which describes the saved model in MLflow's format.

Inferences can be generated using the `mlflow.pyfunc` module. Firstly, model and dataset paths are defined separately. Secondly, a Spark UDF is defined by using the model path. Thirdly, the dataset is read and registered into a dataframe. For the last step, a new column is produced with the help of previously defined Spark UDF by selecting the requested columns.

import mlflow.pyfunc

# Write with ';' as separator so that it matches the delimiter used when reading.
train.toPandas().to_csv('dataset.csv', sep=';', index=False)

model_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/mlruns/1/51ef199ab3b945e8a31b47cdfbf60912/artifacts/sparkML-model'

titanic_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/dataset.csv'

titanic_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("inferSchema", True).option("header", "true").option('delimiter', ';').load(titanic_path)

columns = ['PassengerClasses', 'SibSp', 'Parch']

df.withColumn('Inferences', titanic_udf(*columns)).show(truncate=False)

Serving Apache Spark ML Models with Layer

An easier and faster way to serve Spark machine learning models is to use Layer in the model deployment stage.

As the initial step, install the Layer package.

pip install layer-sdk

After successfully installing the package, import the two required functions of the library.

from layer import Featureset, Train

For a more detailed machine learning model implementation, the GitHub sample code is available on its repository.

Final thoughts

This article covered a wide range of topics, first describing each concept and then implementing it with sample scripts. It started by introducing Spark and went on to build and serve machine learning models with it. The topics included, but were not limited to:

  • The concept of Spark
  • Spark components and its unique architecture
  • Installations of Spark and Python
  • The essentials of RDD and its operations
  • Spark transformations and actions by their varying functions
  • Spark session generation and DataFrames
  • Exploratory data analysis with Spark
  • Machine learning in PySpark
  • Data preparation, preprocessing, and feature engineering using PySpark
  • Model building stages in PySpark
  • Model serving with Apache Spark
  • Model serving with Layer SDK

Resources

Spark wins Daytona Gray Sort 100TB Benchmark

Running Pyspark in Colab

Jupyter Notebook for RDD Basics and ML Model Serving with PySpark

Jupyter Notebook for RDD Basics and ML Model Serving with Colab
