In my previous post, I described different scenarios for bootstrapping Python on a multi-node cluster. I offered a general solution using Anaconda for cluster management and a solution using a custom conda environment deployed with Knit.
In a follow-up to that post, I was asked if the machinery in Knit would also work for Spark. Sure--of course! In fact, much of Knit's design comes from Spark's deploy codebase. Here I am going to demonstrate how we can ship a Python environment, complete with desired dependencies, as part of a Spark job without installing Python on every node.
First, I want to briefly describe the key points of Spark's YARN deploy methodology.

After negotiating which resources to provision with YARN's Resource Manager, Spark asks for a directory to be constructed on HDFS: /user/ubuntu/.sparkStaging/application_1460665326796_0065/. The directory is always in the user's home, and the application ID issued by YARN is appended to the directory name. (Thinking about this now, perhaps this is obvious and straightforward to Java/JVM folks, where bundling uber JARs has long been the practice in traditional MapReduce jobs.) In any case, Spark then uploads itself :) to the staging directory, and when YARN provisions a container, the contents of the directory are pulled down and the spark-assembly jar is executed. If you are using PySpark or SparkR, a corresponding pyspark.zip and sparkr.zip will be found in the staging directory as well.
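If you want to see this for yourself, you can list the staging directory with the HDFS CLI while an application is running. The application ID below is the one from the path above; yours will differ, and the exact contents depend on how the job was submitted:

# List what Spark uploaded to the staging directory for this application
hdfs dfs -ls /user/ubuntu/.sparkStaging/application_1460665326796_0065/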
Occasionally, users see FileNotFoundException errors. This can be caused by a few things: an incorrect SparkContext, an incorrect SPARK_HOME, and I have a faint recollection of a packaging problem, once, where pyspark.zip or sparkr.zip was missing or could not be created due to permissions. Anyway...below is the output you will see when Spark works cleanly.
16/04/15 13:01:03 INFO Client: Uploading resource file:/opt/anaconda/share/spark-1.6.0/lib/spark-assembly-1.6.0-hadoop2.6.0.jar -> hdfs://ip-172-31-50-60:9000/user/ubuntu/.sparkStaging/application_1460665326796_0065/spark-assembly-1.6.0-hadoop2.6.0.jar
16/04/15 13:01:07 INFO Client: Uploading resource file:/opt/anaconda/share/spark-1.6.0/python/lib/pyspark.zip -> hdfs://ip-172-31-50-60:9000/user/ubuntu/.sparkStaging/application_1460665326796_0065/pyspark.zip
Not terribly exciting, but positive confirmation that Spark is uploading local files to HDFS.
Mostly, what I described above is what the YARN framework allows any developer to do -- Spark (and Knit as well!) simply implements a YARN application; there's nothing magical going on. If I were using Scala/Java, I would package everything up in a jar and use spark-submit -- done!
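For the JVM case that really is the whole workflow -- something like the following, where the class name and jar path are hypothetical placeholders:

# Submit an uber JAR to YARN; --class and the jar path are placeholders
spark-submit --master yarn-cluster --class com.example.MyJob /home/ubuntu/my-uber-assembly.jar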
Unfortunately, there's a little more work to be done for an Uber Python jar equivalent.
One of the killer features of Conda is environment management. When Conda creates a new environment, it uses hard-links when possible. Generally, this greatly reduces disk usage. But, if we move the directory to another machine, we're probably just moving a handful of hard-links and not the files themselves. Fortunately, we can tell Conda: No! Copy the files!
For example:
conda create -p /home/ubuntu/dev --copy -y -q python=3 pandas scikit-learn
By using --copy, we "Install all packages using copies instead of hard- or soft-linking." The shebang headers of various files in the bin/ directory may still have lines like #!/home/ubuntu/dev/bin/python, but we don't need to be concerned about that -- we're not going to be using 2to3, idle, pip, etc. If we zipped up the environment, we could move it onto another machine of a similar OS type, execute Python, and be able to load any library in the lib/python3.5/site-packages directory.
We're very close to our Uber Python jar -- now with a zipped Conda directory in mind, let's proceed.
zip -r dev.zip dev
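A quick sanity check, if you want one -- the /tmp destination and the pandas import are just illustrative choices, not anything Spark requires:

# A link count of 1 suggests python is a real copy, not a hard-link into pkgs/
stat -c %h /home/ubuntu/dev/bin/python

# Simulate "another machine" by unzipping elsewhere and importing a library
cd /tmp && unzip -q /home/ubuntu/dev.zip
/tmp/dev/bin/python -c "import pandas; print(pandas.__version__)"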
We are going to need a handful of specific command line options and environment variables (see Spark YARN Configuration and Spark Environment Variables). We'll be using:

- PYSPARK_PYTHON: the Python binary Spark should use
- spark.yarn.appMasterEnv.PYSPARK_PYTHON: the same setting for the YARN Application Master (though this one could be wrong/unnecessary/only used for --master yarn-cluster)
- --archives: include a local tgz/jar/zip in the .sparkStaging directory and pull it down into the temporary YARN container

And we'll also need a test script. The following is a reasonable test to prove which Python Spark is using -- a no-op function which returns the worker's hostname along with the various paths Python is using to find libraries.
# test_spark.py
import os
import sys

from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf()
conf.setAppName("get-hosts")
sc = SparkContext(conf=conf)

def noop(x):
    # Imports inside the function so they are available on the executors
    import os
    import socket
    import sys
    # Return the worker's hostname, its sys.path, and its environment variables
    return socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)

rdd = sc.parallelize(range(1000), 100)
hosts = rdd.map(noop).distinct().collect()
print(hosts)
And executing everything together:
PYSPARK_PYTHON=./ANACONDA/dev/bin/python spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/dev/bin/python \
--master yarn-cluster \
--archives /home/ubuntu/dev.zip#ANACONDA \
/home/ubuntu/test_spark.py
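Assuming YARN log aggregation is enabled on the cluster, the container logs can then be pulled back with the YARN CLI; the application ID comes from the spark-submit output:

# Fetch the aggregated container logs for the finished application
yarn logs -applicationId application_1460665326796_0070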
We'll get the following output in the yarn logs:
'ip-172-31-50-61 .
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/207/spark-assembly-1.6.0-hadoop2.6.0.jar
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/{{PWD}}/pyspark.zip
{{PWD}}/py4j-0.9-src.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/pyspark.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/py4j-0.9-src.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python35.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5/plat-linux
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5/lib-dynload
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/208/dev.zip/dev/lib/python3.5/site-packages/setuptools-20.6.7-py3.5.egg
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5/site-packages ...',

'ip-172-31-50-62 .
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/209/spark-assembly-1.6.0-hadoop2.6.0.jar
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/{{PWD}}/pyspark.zip
{{PWD}}/py4j-0.9-src.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/pyspark.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/py4j-0.9-src.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python35.zip
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/plat-linux
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/lib-dynload
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/211/dev.zip/dev/lib/python3.5/site-packages/setuptools-20.6.7-py3.5.egg
/var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/site-packages ...'
It's a little hard to parse -- what should be noted are file paths like:
.../container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/site-packages
This shows that the executors are resolving Python libraries from the unzipped Conda environment inside the YARN container. Ta-da!
Okay, perhaps that's not super exciting, so let's zoom out again:
There is an open JIRA ticket discussing the option of having Spark ingest a requirements.txt and build the Python environment as a preamble to a Spark job. That is also a fairly novel approach to the same end -- using Spark to bootstrap a runtime environment -- and it's even a bit more general, since the method described above relies on YARN. I first saw this strategy in use with streamparse. Similar to the implementation proposed in the JIRA ticket, streamparse can ship a Python requirements.txt and construct a Python environment as part of a Storm job!
Oh, and R Conda environments work as well...but it's more involved:
First, it's pretty cool that Conda can install and manage R environments. Again, we create a Conda environment, this time with R binaries and libraries:
conda create -p /home/ubuntu/r_env --copy -y -q r-essentials -c r
R is not exactly relocatable, so we need to munge its launcher script a bit:
sed -i "s/home\/ubuntu/.r_env.zip/g" /home/ubuntu/r_env/bin/R
zip -r r_env.zip r_env
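To confirm the munge took, a quick grep works (the contents of bin/R vary between R builds, so treat the exact matches as illustrative):

# The rewritten paths in the launcher should now point at .r_env.zip
grep -n "r_env.zip" /home/ubuntu/r_env/bin/R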
My R skills are at a below-novice level, so the following test script could probably be improved
# /home/ubuntu/test_spark.R
library(SparkR)

sc <- sparkR.init(appName="get-hosts-R")

# Return the worker's hostname along with the library paths R is searching
noop <- function(x) {
  path <- toString(.libPaths())
  host <- toString(Sys.info()['nodename'])
  host_path <- toString(cbind(host, path))
  host_path
}

rdd <- SparkR:::parallelize(sc, 1:1000, 100)
hosts <- SparkR:::map(rdd, noop)
d_hosts <- SparkR:::distinct(hosts)
out <- SparkR:::collect(d_hosts)
print(out)
And execute (here comes the real death by options):
SPARKR_DRIVER_R=./r_env.zip/r_env/lib/R spark-submit --master yarn-cluster \
--conf spark.yarn.appMasterEnv.R_HOME=./r_env.zip/r_env/lib64/R \
--conf spark.yarn.appMasterEnv.RHOME=./r_env.zip/r_env \
--conf spark.yarn.appMasterEnv.R_SHARE_DIR=./r_env.zip/r_env/lib/R/share \
--conf spark.yarn.appMasterEnv.R_INCLUDE_DIR=./r_env.zip/r_env/lib/R/include \
--conf spark.executorEnv.R_HOME=./r_env.zip/r_env/lib64/R \
--conf spark.executorEnv.RHOME=./r_env.zip/r_env \
--conf spark.executorEnv.R_SHARE_DIR=./r_env.zip/r_env/lib/R/share \
--conf spark.executorEnv.R_INCLUDE_DIR=./r_env.zip/r_env/lib/R/include \
--conf spark.r.command=./r_env.zip/r_env/bin/Rscript \
--archives r_env.zip \
/home/ubuntu/test_spark.R
Example output:
[[1]]
[1] "ip-172-31-50-59, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/230/sparkr.zip, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/229/r_env.zip/r_env/lib64/R/library"
[[2]]
[1] "ip-172-31-50-61, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/183/sparkr.zip, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/182/r_env.zip/r_env/lib64/R/library"