Dask + Yarn

written by Benjamin Zaitlen on 2016-04-08

In the past few months we've seen a number of posts about Dask. For those unfamiliar with it, Dask is an out-of-core parallel framework for data analysis. Some of the more recent examples (1, 2, 3) have demonstrated Dask's distributed capabilities -- leveraging not just multi-core architectures, but also multi-node clusters. To run Dask this way, we need a way to launch Dask workers on many machines in our cluster. On a small cluster we might do this by manually SSH-ing into many machines, by using a job scheduler like SGE, or by using the dec2 tool to provision and bootstrap on EC2. However, for larger clusters these approaches break down, especially when the cluster is simultaneously running several parallel frameworks such as Hadoop, Spark, and Impala. In that case we typically use a cluster resource manager like YARN to start and stop jobs on the cluster and to manage their execution environments. In this post, I demonstrate a toy example using the YARN resource manager.

Knit

Unfortunately for the Python community, YARN is a JVM-based framework. Fortunately for the Python community, we (with special thanks to Niels Zeilemaker and support from Continuum Analytics) wrote Knit. Knit is a Python/Scala library which enables Python developers to request resources from YARN. Because YARN is a container-based resource manager, in addition to the command we wish to execute, a request also specifies the container resources we need: the number of containers, the amount of memory, the number of cores, the queue, and so on.
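For instance, a request for a couple of containers with explicit resource limits might look roughly like this -- a minimal sketch, where the keyword names (num_containers, memory, virtual_cores) follow Knit's documented interface and should be treated as illustrative:

>>> from knit import Knit
>>> k = Knit()                            # talks to the YARN ResourceManager
>>> app_id = k.start('date',              # command to run in each container
...                  num_containers=2,    # how many containers to request
...                  memory=1024,         # memory per container, in MB
...                  virtual_cores=1)     # CPU cores per container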

Dask + Knit

First, we start the scheduler on one of the nodes -- typically, this will be an edge node or head node (a node from which we can communicate with the YARN Resource Manager):

ubuntu@ip-172-31-62-166:~/$ dscheduler
distributed.scheduler - INFO - Start Scheduler at:        172.31.62.166:8786
distributed.scheduler - INFO -            http at:        172.31.62.166:9786

Dask is resilient to workers appearing and disappearing from the scheduler. With the scheduler up, we can add dworkers and point them at the scheduler's IP and port by issuing the following command:

$ dworker 172.31.62.166:8786

With Knit, we launch the same dworker command as above, starting simply by asking for one container with the YARN defaults for CPU and memory:

>>> from knit import Knit
>>> k = Knit()
>>> cmd = "dworker 172.31.62.166:8786"
>>> appId = k.start(cmd, num_containers=1)
16/04/06 15:58:16 INFO knit.Client$: Staring Application Master
Attempting upload of /home/ubuntu/knit/knit/java_libs/knit-1.0-SNAPSHOT.jar
Uploading resource file:/home/ubuntu/knit/knit/java_libs/knit-1.0-SNAPSHOT.jar -> hdfs://ip-172-31-62-166.ec2.internal:8020/user/ubuntu/.knitDeps/knit-1.0-SNAPSHOT.jar
16/04/06 15:58:20 INFO impl.TimelineClientImpl: Timeline service address: http://ip-172-31-62-167.ec2.internal:8188/ws/v1/timeline/
16/04/06 15:58:20 INFO client.RMProxy: Connecting to ResourceManager at ip-172-31-62-167.ec2.internal/172.31.62.167:8050
Security is enabled: true
16/04/06 15:58:20 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 162 for ubuntu on 172.31.62.166:8020
[Lorg.apache.hadoop.security.token.Token;@5fdbded6
Got dt for DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-453042160_12, ugi=ubuntu@CONTINUUM (auth:KERBEROS)]].getUri() Kind: HDFS_DELEGATION_TOKEN, Service: 172.31.62.166:8020, Ident: (HDFS_DELEGATION_TOKEN token 162 for ubuntu)
16/04/06 15:58:20 INFO knit.Client$: Submitting application application_1458491078518_0071
16/04/06 15:58:21 INFO impl.YarnClientImpl: Submitted application application_1458491078518_0071

The scheduler logs confirm that a new worker has connected:

distributed.core - INFO - Connection from 172.31.62.167:58512 to Scheduler
distributed.scheduler - INFO - Register 172.31.62.167:42748

Let's kill the YARN application and now ask for 5 containers:

>>> k.kill()
16/04/06 16:51:00 INFO impl.YarnClientImpl: Killed application application_1458491078518_0071

>>> appId = k.start(cmd, num_containers=5)

Again, the scheduler confirms that we have new dworkers:

distributed.core - INFO - Connection from 172.31.62.167:39885 to Scheduler
distributed.scheduler - INFO - Register 172.31.62.167:43795
distributed.core - INFO - Connection from 172.31.62.169:60726 to Scheduler
distributed.scheduler - INFO - Register 172.31.62.169:52115
distributed.core - INFO - Connection from 172.31.62.166:33672 to Scheduler
distributed.scheduler - INFO - Register 172.31.62.166:37686
distributed.core - INFO - Connection from 172.31.62.169:60727 to Scheduler
distributed.scheduler - INFO - Register 172.31.62.169:51797
distributed.core - INFO - Connection from 172.31.62.166:33673 to Scheduler
distributed.scheduler - INFO - Register 172.31.62.166:33068

Five Dask workers are now running in YARN containers throughout our cluster -- we can connect an Executor to the scheduler and begin our analytics processing with Dask.

>>> from distributed import Executor
>>> e = Executor('172.31.62.166:8786')

....
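As a quick illustration of what that processing might look like (this is not part of the original session, just a sketch using the distributed Executor API), work submitted through the Executor runs on the YARN-hosted workers just as it would on any other Dask cluster:

>>> def square(x):
...     return x ** 2
>>> futures = e.map(square, range(100))   # runs across the YARN-hosted workers
>>> total = e.submit(sum, futures)        # reduce the partial results on a worker
>>> total.result()
328350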

Bootstrap-Fu

What wasn't mentioned in all of this is how we bootstrapped the cluster with Python, Dask, Knit, and all the other goodies. In this particular case, I used Anaconda for cluster management. This is an especially useful tool for both bootstrapping and managing Python (and R) remotely.

I would recommend folks check out Anaconda for cluster management -- but still, there are times when we don't need a hammer because a chisel will do. Within Knit, we have such a chisel. Knit can create a small but complete Python environment -- with the dependencies you need -- and ship that environment along with your command. This is immensely valuable to anyone curious and excited about bringing the PyData stack to Hadoop. For example, let's build and ship an environment with Dask, Pandas, and Scikit-Learn, assuming we are starting from a blank cluster.

$ conda install knit    # or: pip install knit
$ python
>>> from knit import Knit
>>> k = Knit()
>>> env_zip = k.create_env(env_name='dev', packages=['python=3', 'distributed',
...                                                  'dask', 'pandas', 'scikit-learn'])


>>> cmd = '$PYTHON_BIN $CONDA_PREFIX/bin/dworker 172.31.62.166:8786'
>>> appId = k.start(cmd=cmd, env=env_zip)
...
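As a quick sanity check (an illustration, not from the original post), we can connect an Executor and ask a worker which Python interpreter it is running; it should report the Python 3 build from the environment we just shipped:

>>> from distributed import Executor
>>> e = Executor('172.31.62.166:8786')
>>> def python_version():
...     import sys
...     return sys.version
>>> e.submit(python_version).result()     # e.g. a Python 3.x build from the shipped env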

And we're done! We've given the PyData community the space to leverage the powerful tools we know and love in an ecosystem that was previously unfriendly to Python.

Future Work

While we've demonstrated Dask on YARN, the process is more cumbersome than I would like. It would be better if the scheduler talked directly to YARN, and we have an open issue discussing what that interface might look like. We are also pursuing other tasks, namely: