Conda + Spark

written by Benjamin Zaitlen on 2016-04-15

In my previous post, I described different scenarios for bootstrapping Python on a multi-node cluster. I offered a general solution using Anaconda for cluster management and a solution using a custom conda env deployed with Knit.

In a follow-up to that post, I was asked if the machinery in Knit would also work for Spark. Sure--of course! In fact, much of Knit's design comes from Spark's deploy codebase. Here I am going to demonstrate how we can ship a Python environment, complete with desired dependencies, as part of a Spark job without installing Python on every node.

Spark YARN Deploy

First, I want to briefly describe the key points of Spark's YARN deploy methodology. After negotiating with YARN's Resource Manager over which resources to provision, Spark asks for a directory to be constructed on HDFS, e.g. /user/ubuntu/.sparkStaging/application_1460665326796_0065/. The directory always lives in the user's home, with the application ID issued by YARN appended to the name. (Thinking about this now, perhaps this is obvious and straightforward to Java/JVM folks, where bundling Uber JARs has long been the practice in traditional MapReduce jobs.) In any case, Spark then uploads itself :) to the staging directory, and when YARN provisions a container, the contents of the directory are pulled down and the spark-assembly jar is executed. If you are using PySpark or SparkR, a corresponding pyspark.zip and sparkr.zip will be found in the staging directory as well.

Occasionally, users see FileNotFoundException errors. This can be caused by a few things: an incorrect SparkContext, an incorrect SPARK_HOME, and I have a faint recollection of a packaging problem where pyspark.zip or sparkr.zip was missing, or could not be created due to permissions.
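
If you suspect the packaging issue, a quick sanity check (assuming a Spark 1.6-style layout like the one in the logs below) is to confirm the zip actually exists under SPARK_HOME:

ls $SPARK_HOME/python/lib/    # pyspark.zip and the py4j zip should be listed here

Anyway, below is the output you will see when Spark works cleanly.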

16/04/15 13:01:03 INFO Client: Uploading resource file:/opt/anaconda/share/spark-1.6.0/lib/spark-assembly-1.6.0-hadoop2.6.0.jar -> hdfs://ip-172-31-50-60:9000/user/ubuntu/.sparkStaging/application_1460665326796_0065/spark-assembly-1.6.0-hadoop2.6.0.jar
16/04/15 13:01:07 INFO Client: Uploading resource file:/opt/anaconda/share/spark-1.6.0/python/lib/pyspark.zip -> hdfs://ip-172-31-50-60:9000/user/ubuntu/.sparkStaging/application_1460665326796_0065/pyspark.zip

Not terribly exciting, but positive confirmation that Spark is uploading local files to HDFS.
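
If you want to poke at the staging directory yourself, it is plain HDFS -- for example, using the application ID from the log above:

hdfs dfs -ls /user/ubuntu/.sparkStaging/application_1460665326796_0065/

Note that Spark normally cleans this directory up when the application finishes, so look while the job is still running.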

Bootstrap-Fu Redux

Mostly, what I described above is what the YARN framework allows developers to do -- Spark (and Knit!) implements a YARN application rather than doing anything magical. If I were using Scala/Java, I would package everything up in a jar, hand it to spark-submit, and be done!
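
For reference, the JVM workflow really is that short -- a minimal sketch with hypothetical jar and class names:

spark-submit --master yarn-cluster --class com.example.Main /home/ubuntu/uber-app.jar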

Unfortunately, there's a little more work to be done for an Uber Python jar equivalent.

Hard-links won't travel

One of the killer features of Conda is environment management. When Conda creates a new environment, it uses hard-links when possible. Generally, this greatly reduces disk usage. But, if we move the directory to another machine, we're probably just moving a handful of hard-links and not the files themselves. Fortunately, we can tell Conda: No! Copy the files!

For example:

conda create -p /home/ubuntu/dev --copy -y -q python=3 pandas scikit-learn

By using --copy, we "install all packages using copies instead of hard- or soft-linking." The shebang lines of various scripts in the bin/ directory will still read #!/home/ubuntu/dev/bin/python, but we don't need to be concerned about that -- we're not going to be using 2to3, idle, pip, etc. If we zipped up the environment, we could move it onto another machine of a similar OS type, execute Python, and load any library in the lib/python3.5/site-packages directory.
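
A quick way to spot-check that nothing in the new environment is still hard-linked (any regular file with a link count greater than 1 would be suspect):

find /home/ubuntu/dev -type f -links +1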

We're very close to our Uber Python jar -- with the copied environment in hand, let's zip it up:

zip -r dev.zip dev
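
As an optional sanity check before shipping the archive anywhere -- the scratch directory here is hypothetical -- unzip it somewhere else and make sure the interpreter and libraries still load:

unzip -q dev.zip -d /tmp/envtest
/tmp/envtest/dev/bin/python -c "import pandas, sklearn; print(pandas.__version__)"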

Death by ENV Vars

We are going to need a handful of specific command-line options and environment variables (see the Spark YARN configuration and Spark environment variables documentation). We'll be using:

  • PYSPARK_PYTHON: The Python binary Spark should use
  • spark.yarn.appMasterEnv.PYSPARK_PYTHON (though this one could be wrong/unnecessary/only used for --master yarn-cluster)
  • --archives: upload a local tgz/jar/zip to the .sparkStaging directory and have YARN pull it down and unpack it inside each temporary container (see the note just after this list)
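
The # suffix on --archives is doing real work: the name after # is the link YARN creates for the unpacked archive inside each container's working directory, which is why PYSPARK_PYTHON can be a relative path. With our dev.zip, the general shape is (ALIAS is any name you like):

--archives /home/ubuntu/dev.zip#ALIAS
PYSPARK_PYTHON=./ALIAS/dev/bin/python    # resolved inside the YARN container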

And we'll also need a test script. The following is a reasonable test to prove which Python Spark is using: a no-op function that returns the hostname of each executor along with the paths Python is searching for libraries (and the environment variables it sees).

# test_spark.py
import os
import sys
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf()
conf.setAppName("get-hosts")

sc = SparkContext(conf=conf)

def noop(x):
    # Runs on each executor: report the worker's hostname, the paths Python
    # searches for libraries, and the environment variable names it sees.
    import socket
    import sys
    return socket.gethostname() + ' ' + ' '.join(sys.path) + ' ' + ' '.join(os.environ)

rdd = sc.parallelize(range(1000), 100)
hosts = rdd.map(noop).distinct().collect()
print(hosts)

And executing everything together:

 PYSPARK_PYTHON=./ANACONDA/dev/bin/python spark-submit \
 --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/dev/bin/python \
 --master yarn-cluster \
 --archives /home/ubuntu/dev.zip#ANACONDA \
 /home/ubuntu/test_spark.py
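
Once the job completes, the executor logs can be pulled back with the YARN CLI (assuming log aggregation is enabled on the cluster):

yarn logs -applicationId application_1460665326796_0070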

We'll get the following output in the yarn logs:

'ip-172-31-50-61 . /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/207/spark-assembly-1.6.0-hadoop2.6.0.jar /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/{{PWD}}/pyspark.zip{{PWD}}/py4j-0.9-src.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/pyspark.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/py4j-0.9-src.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python35.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5 /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5/plat-linux /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5/lib-dynload /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/208/dev.zip/dev/lib/python3.5/site-packages/setuptools-20.6.7-py3.5.egg /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000003/ANACONDA/dev/lib/python3.5/site-packages ...',
'ip-172-31-50-62 . /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/209/spark-assembly-1.6.0-hadoop2.6.0.jar /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/{{PWD}}/pyspark.zip{{PWD}}/py4j-0.9-src.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/pyspark.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/py4j-0.9-src.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python35.zip /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5 /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/plat-linux /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/lib-dynload /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/211/dev.zip/dev/lib/python3.5/site-packages/setuptools-20.6.7-py3.5.egg /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/appcache/application_1460665326796_0070/container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/site-packages ...'

It's a little hard to parse -- what should be noted are file paths like:

.../container_1460665326796_0070_01_000002/ANACONDA/dev/lib/python3.5/site-packages

This is demonstrating that Spark is using the unzipped directory in the YARN container. Ta-da!

Thoughts

Okay, perhaps that's not super exciting, so let's zoom out again:

  1. We created a zipped Conda environment with our desired dependencies: python=3, pandas, scikit-learn, ...
  2. We successfully launched a Python Spark job without any Python binaries or libraries previously installed on the nodes.

There is an open JIRA ticket discussing the option of having Spark ingest a requirements.txt and build the Python environment as a preamble to a Spark job. That is also a fairly novel approach to the same end -- using Spark to bootstrap a runtime environment -- and it's even a bit more general, since the method described above relies on YARN. I first saw this strategy in use with streamparse: similar to the implementation proposed in the JIRA ticket, streamparse can ship a requirements.txt and construct a Python environment as part of a Storm job!

Rrrrrrrrrrrrrr

Oh, and R Conda environments work as well...but it's more involved:

Create/Munge R Env

First, it's pretty cool that Conda can install and manage R environments. Again, we create a Conda environment, this time with R binaries and libraries:

conda create -p /home/ubuntu/r_env --copy -y -q r-essentials -c r
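
As a quick check that the copied environment works locally before we start munging (assuming r-essentials pulled in r-base, which provides Rscript):

/home/ubuntu/r_env/bin/Rscript -e 'R.version.string'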

R is not exactly relocatable so we need to munge a bit:

sed -i "s/home\/ubuntu/.r_env.zip/g" /home/ubuntu/r_env/bin/R
zip -r r_env.zip r_env
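
A quick, optional check that the rewrite took -- this should print nothing if every occurrence of the old prefix in the launcher was replaced:

grep "home/ubuntu" /home/ubuntu/r_env/bin/R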

My R skills are at a below-novice level, so the following test script could probably be improved:

# /home/ubuntu/test_spark.R
library(SparkR)
sc <- sparkR.init(appName="get-hosts-R")

# Runs on each executor: report the worker's hostname and R's library search paths.
noop <- function(x) {
  path <- toString(.libPaths())
  host <- toString(Sys.info()['nodename'])
  host_path <- toString(cbind(host, path))
  host_path
}

rdd <- SparkR:::parallelize(sc, 1:1000, 100)
hosts <- SparkR:::map(rdd, noop)
d_hosts <- SparkR:::distinct(hosts)
out <- SparkR:::collect(d_hosts)
print(out)

Execute (and the real death by options):

SPARKR_DRIVER_R=./r_env.zip/r_env/lib/R spark-submit --master yarn-cluster \
--conf spark.yarn.appMasterEnv.R_HOME=./r_env.zip/r_env/lib64/R \
--conf spark.yarn.appMasterEnv.RHOME=./r_env.zip/r_env \
--conf spark.yarn.appMasterEnv.R_SHARE_DIR=./r_env.zip/r_env/lib/R/share \
--conf spark.yarn.appMasterEnv.R_INCLUDE_DIR=./r_env.zip/r_env/lib/R/include \
--conf spark.executorEnv.R_HOME=./r_env.zip/r_env/lib64/R \
--conf spark.executorEnv.RHOME=./r_env.zip/r_env \
--conf spark.executorEnv.R_SHARE_DIR=./r_env.zip/r_env/lib/R/share \
--conf spark.executorEnv.R_INCLUDE_DIR=./r_env.zip/r_env/lib/R/include \
--conf spark.r.command=./r_env.zip/r_env/bin/Rscript \
--archives r_env.zip \
/home/ubuntu/test_spark.R

Example output:

[[1]]
[1] "ip-172-31-50-59, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/230/sparkr.zip, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/229/r_env.zip/r_env/lib64/R/library"

[[2]]
[1] "ip-172-31-50-61, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/183/sparkr.zip, /var/lib/hadoop-yarn/data/1/yarn/local/usercache/ubuntu/filecache/182/r_env.zip/r_env/lib64/R/library"