Example Deployments

Deploying dask-jobqueue on different clusters requires a bit of customization. Below are a few examples from real deployments.

Additional examples from other clusters are welcome here.

PBS Deployments

from dask_jobqueue import PBSCluster

cluster = PBSCluster(queue='regular',
                     account='DaskOnPBS',
                     local_directory='$TMPDIR',
                     cores=24,
                     processes=6,
                     memory='16GB',
                     resource_spec='select=1:ncpus=24:mem=100GB')

cluster = PBSCluster(cores=24,
                     processes=6,
                     shebang='#!/usr/bin/env zsh',
                     memory="6GB",
                     account='P48500028',
                     queue='premium',
                     resource_spec='select=1:ncpus=36:mem=109G',
                     walltime='02:00:00',
                     interface='ib0')
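
In the examples above, cores, processes, and memory describe a whole job, and the job's resources are then divided among the worker processes it starts. The sketch below illustrates that split with plain Python arithmetic. It is not dask-jobqueue code: the helper name per_process_share is hypothetical, and it assumes an even split in which each process gets cores // processes threads and memory / processes of RAM.

```python
def per_process_share(cores, processes, memory_gb):
    """Return (threads, memory in GB) available to each worker process.

    Hypothetical helper for illustration only; assumes the job's cores
    and memory are divided evenly among its worker processes.
    """
    if processes < 1:
        raise ValueError("processes must be at least 1")
    threads = cores // processes          # whole threads per process
    memory_per_process_gb = memory_gb / processes
    return threads, memory_per_process_gb


# Values taken from the first PBS example: cores=24, processes=6, memory='16GB'
threads, mem = per_process_share(cores=24, processes=6, memory_gb=16)
print(f"{threads} threads and {mem:.2f} GB per worker process")
```

Under these assumptions, the first example would give each of its 6 worker processes 4 threads and a little under 3 GB. This is a useful sanity check before submitting jobs with a small memory value and many processes.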

Moab Deployments

On systems that use the Moab Workload Manager, use MoabCluster, a subclass of PBSCluster:

import os
from dask_jobqueue import MoabCluster

cluster = MoabCluster(
    cores=6,
    processes=6,
    account="gfdl_m",
    memory="16G",
    resource_spec="pmem=96G",
    job_extra_directives=["-d /home/First.Last", "-M none"],
    local_directory=os.getenv("TMPDIR", "/tmp"),
)

SGE Deployments

On systems that use SGE as the scheduler, SGECluster can be used. Grid Engine has a slightly involved history, so there are a variety of Grid Engine derivatives. SGECluster can be used with any of them, for example SGE (Son of Grid Engine), Oracle Grid Engine, or Univa Grid Engine.

Because of the variety of Grid Engine derivatives and site configurations, the memory keyword argument cannot be used to automatically specify the amount of RAM requested. Instead, specify the desired resources according to how your system is configured, using the resource_spec keyword argument, in addition to the memory keyword argument (which Dask uses internally for memory management; see the Dask worker memory management documentation for details).

In the example below, our system administrator has used the m_mem_free keyword to let us request RAM. Other known keywords include mem_req and mem_free. We had to check our cluster documentation and/or ask our system administrator to find out which one applies. At the same time, we must also set the memory keyword argument correctly so that Dask’s memory management can do its work.

from dask_jobqueue import SGECluster

cluster = SGECluster(queue='default.q',
                     walltime="1500000",
                     processes=10,   # start 10 worker processes per job
                     memory='20GB',  # used by Dask for memory management; must be specified
                     resource_spec='m_mem_free=20G',  # the RAM request passed to SGE; must also be specified
                     )
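
Because the RAM amount has to be stated twice, once for Dask and once for the scheduler, the two values can drift apart. One way to keep them in sync is to derive both from a single number, as in the sketch below. The helper sge_memory_kwargs is hypothetical and not part of dask-jobqueue. It assumes your site expects the same amount in both places, as in the example above; whether the resource is counted per job or per slot depends on how your cluster is configured.

```python
def sge_memory_kwargs(memory_gb, resource_name="m_mem_free"):
    """Build matching memory and resource_spec keyword arguments.

    Hypothetical helper for illustration only. resource_name is the
    site-specific keyword (for example m_mem_free, mem_req or mem_free).
    """
    if memory_gb <= 0:
        raise ValueError("memory_gb must be positive")
    return {
        "memory": f"{memory_gb}GB",                        # read by Dask
        "resource_spec": f"{resource_name}={memory_gb}G",  # read by SGE
    }


kwargs = sge_memory_kwargs(20)
print(kwargs)
```

The resulting dictionary could then be unpacked into the constructor, for example SGECluster(queue='default.q', processes=10, **kwargs).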

LSF Deployments

from dask_jobqueue import LSFCluster

cluster = LSFCluster(queue='general',
                     project='cpp',
                     walltime='00:30',
                     cores=15,
                     memory='25GB')

SLURM Deployments

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=8,
                       processes=4,
                       memory="16GB",
                       account="woodshole",
                       walltime="01:00:00",
                       queue="normal")

SLURM Deployment: Low-priority node usage

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=24,
    processes=6,
    memory="16GB",
    account="co_laika",
    queue="savio2_bigmem",
    job_script_prologue=[
        'export LANG="en_US.utf8"',
        'export LANGUAGE="en_US.utf8"',
        'export LC_ALL="en_US.utf8"',
    ],
    job_extra_directives=['--qos="savio_lowprio"'],
)
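
To see roughly where these two keyword arguments end up, the sketch below renders a simplified job script in plain Python. It is an illustration only, not the real dask-jobqueue template: the function render_job_script is hypothetical, the shebang default and the worker command placeholder are assumptions, and the real script contains more header lines. The point it shows is that each job_extra_directives entry becomes an #SBATCH line, while job_script_prologue lines run before the worker command.

```python
def render_job_script(directives, prologue, worker_command,
                      shebang="#!/usr/bin/env bash"):
    """Render a simplified SLURM job script (hypothetical, for illustration)."""
    lines = [shebang]
    # Each extra directive becomes one scheduler header line.
    lines += [f"#SBATCH {directive}" for directive in directives]
    lines.append("")
    # Prologue commands are placed before the worker is started.
    lines += list(prologue)
    lines.append(worker_command)
    return "\n".join(lines)


script = render_job_script(
    directives=['--qos="savio_lowprio"'],
    prologue=['export LANG="en_US.utf8"'],
    worker_command="<dask worker command goes here>",
)
print(script)
```

On a real cluster object, cluster.job_script() returns the script that will actually be submitted, which is the authoritative way to check the result.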

SLURM Deployment: Providing additional arguments to the dask-workers

Keyword arguments can be passed through to the dask-workers. One example is the specification of abstract resources, described in the Dask distributed documentation on worker resources. These can describe hardware that the scheduler is not otherwise aware of, such as GPUs. Below, the arbitrary resources “ssdGB” and “GPU” are specified, using the worker_extra_args keyword to pass the arguments through to the dask-workers.

Note: the parameter worker_extra_args was named extra until version 0.7.4. extra can still be used, but is considered deprecated and will be removed in a future version.

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(
    memory="8g", processes=1, cores=2, worker_extra_args=["--resources ssdGB=200,GPU=2"]
)

cluster.scale(2)
client = Client(cluster)

The client can then be used as normal. Additionally, required resources can be specified for certain steps in the processing. For example:

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})
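
The resource amounts declared on the workers limit how many resource-constrained tasks can run on one worker at the same time. The sketch below works through that arithmetic for the values used above. It is plain Python rather than Dask code: parse_resources and max_concurrent are hypothetical helpers, and the calculation ignores other limits such as the number of worker threads.

```python
def parse_resources(spec):
    """Parse a string such as 'ssdGB=200,GPU=2' into a dict of floats."""
    resources = {}
    for item in spec.split(","):
        name, _, amount = item.partition("=")
        resources[name.strip()] = float(amount)
    return resources


def max_concurrent(available, required):
    """How many tasks with these requirements fit on one worker at once."""
    if not required:
        raise ValueError("required must name at least one resource")
    return min(
        int(available.get(name, 0) // amount)
        for name, amount in required.items()
    )


# Values from the worker_extra_args and client.compute calls above
worker = parse_resources("ssdGB=200,GPU=2")
print(max_concurrent(worker, {"GPU": 1}))      # step 1 tasks per worker
print(max_concurrent(worker, {"ssdGB": 100}))  # step 2 tasks per worker
```

A task that asks for more of a resource than any worker declares, for example {'GPU': 3} here, fits on no worker under this calculation, so it is worth checking requirements against the declared amounts before submitting.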