There are countless articles and forum posts about running Python on Spark, but most assume the work to be submitted is contained in a single .py file: spark-submit wordcount.py, and you're done!
What if your Python program is more than just a script? Perhaps it generates dynamic SQL for Spark to execute, or refreshes models using Spark’s output. As your Python code becomes more of an app (with a directory structure, configuration files, and library dependencies), submitting it to Spark requires a bit more consideration.
Below are the alternatives I recently considered when taking one such Python application to production using Spark 2.3. This first article focuses on Spark standalone clusters. A separate article covers EMR Spark (YARN).
I am far from an authority on Spark, let alone Python. My decisions attempted to balance correctness with ease of deployment, and the limitations imposed by the app with those of the cluster. Let me know what you think.
Sample Python application
To simulate a complete application, the scenarios below assume a Python 3 application with the following structure:
project.py
data/
    data_source.py
    data_source.ini
data_source.ini contains various configuration parameters:
[spark]
app_name = My PySpark App
master_url = spark://sparkmaster:7077
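For reference, these settings are read with Python's standard configparser module. The sketch below is illustrative rather than part of the repo; in particular, the fallback values are my own assumption, since the sample app expects both keys to be present:
import configparser

# Load the app's settings; the path is relative to the project root.
config = configparser.ConfigParser()
config.read('./data/data_source.ini')

# Fallbacks are illustrative defaults only, not defined by the sample app.
app_name = config.get('spark', 'app_name', fallback='My PySpark App')
master_url = config.get('spark', 'master_url', fallback='local[*]')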
data_source.py is a module responsible for sourcing and processing data in Spark, applying math transformations with NumPy, and returning a Pandas dataframe to the client. Dependencies:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType
import pandas as pd
import numpy as np
import configparser
It defines a DataSource class that creates a SparkContext and SparkSession on initialization…
class DataSource:
    def __init__(self):
        config = configparser.ConfigParser()
        config.read('./data/data_source.ini')
        master_url = config['spark']['master_url']
        app_name = config['spark']['app_name']
        conf = SparkConf().setAppName(app_name) \
                          .setMaster(master_url)
        self.sc = SparkContext(conf=conf)
        self.spark = SparkSession.builder \
            .config(conf=conf) \
            .getOrCreate()
…and a get_data() method that:
- Creates an RDD from a NumPy normal distribution.
- Applies a function to double the value of every element.
- Converts the RDD into a Spark dataframe and defines a temp view on top.
- Applies a Python UDF that squares the contents of every dataframe element using SQL.
- Returns the results to the client as a Pandas dataframe.
def get_data(self, num_elements=1000) -> pd.DataFrame:
    mu, sigma = 2, 0.5
    v = np.random.normal(mu, sigma, num_elements)
    rdd1 = self.sc.parallelize(v)

    def mult(x): return x * np.array([2])
    rdd2 = rdd1.map(mult).map(lambda x: (float(x),))

    schema = StructType([StructField("value", FloatType(), True)])
    df1 = self.spark.createDataFrame(rdd2, schema)
    df1.registerTempTable("test")

    def square(x): return x ** 2
    self.spark.udf.register("squared", square)
    df2 = self.spark.sql("SELECT squared(value) squared FROM test")
    return df2.toPandas()
project.py is our main program, acting as a client of the above module:
from data.data_source import DataSource

def main():
    src = DataSource()
    df = src.get_data(num_elements=100000)
    print(f"Got Pandas dataframe with {df.size} elements")
    print(df.head(10))

main()
Clone the repo: https://bitbucket.org/calidoteam/pyspark.git
Before we begin, let’s review the options available when submitting work to Spark.
spark-submit, client and cluster modes
- Spark supports several cluster managers: Standalone (i.e. built into Spark), Hadoop's YARN, Mesos, and Kubernetes, all of which control how your workload runs on a set of resources.
- spark-submit is the only interface that works consistently with all cluster managers. For Python applications, spark-submit can upload and stage all dependencies you provide as .py, .zip or .egg files when needed.
- In client mode, your Python program (i.e. the driver) runs on the same host where spark-submit runs. It is in your best interest to make sure that host is close to your worker nodes to reduce network latency.
- In cluster mode, your Python program (i.e. the driver) and its dependencies are uploaded to and run from one of the worker nodes. This is useful when submitting jobs from a remote host. As of Spark 2.4.0, cluster mode is not an option for Python applications running on Spark standalone.
- Alternatively, it is possible to bypass spark-submit by configuring the SparkSession in your Python app to connect to the cluster directly. This requires the right configuration and matching PySpark binaries. Your Python app will effectively run in client mode: it runs from whatever host you launch it on. A minimal sketch of this approach follows below.
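As a rough sketch of that last option (not taken from the repo), the snippet below builds a SparkSession that connects straight to the standalone master from data_source.ini. It assumes a PySpark installation on the launching host whose version matches the cluster's; the spark.range() call is just an illustrative sanity check.
from pyspark.sql import SparkSession

# Connect directly to the standalone master; no spark-submit involved.
# The PySpark version on this host must match the cluster's Spark version.
spark = (SparkSession.builder
         .master("spark://sparkmaster:7077")
         .appName("My PySpark App")
         .getOrCreate())

# Quick check that work is actually being scheduled on the cluster.
print(spark.range(1000).count())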
The following sections describe several deployment alternatives and what configuration was required in each.