dask_jobqueue.SLURMCluster

class dask_jobqueue.SLURMCluster(queue=None, project=None, walltime=None, job_cpu=None, job_mem=None, job_extra=None, config_name='slurm', **kwargs)

Launch Dask on a SLURM cluster

Parameters
queue : str

Destination queue for each worker job. Passed to the #SBATCH -p option.

project : str

Accounting string associated with each worker job. Passed to the #SBATCH -A option.

walltime : str

Walltime for each worker job.

job_cpu : int

Number of CPUs to book in SLURM. If None, defaults to worker threads * processes.

job_mem : str

Amount of memory to request in SLURM. If None, defaults to worker processes * memory.

job_extra : list

List of other SLURM options, for example --exclusive. Each option will be prepended with the #SBATCH prefix.

name : str

Name of Dask workers.

cores : int

Total number of cores per job.

memory : str

Total amount of memory per job.

processes : int

Number of processes per job.

interface : str

Network interface like 'eth0' or 'ib0'.

death_timeout : float

Seconds to wait for a scheduler before closing workers.

local_directory : str

Dask worker local directory for file spilling.

extra : list

Additional arguments to pass to dask-worker.

env_extra : list

Other commands to add to script before launching worker.

python : str

Python executable used to launch Dask workers.

shebang : str

Path to desired interpreter for your batch submission script.

kwargs : dict

Additional keyword arguments to pass to LocalCluster.
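The SLURM-specific parameters above translate into #SBATCH directives in the generated submission script. A minimal sketch, assuming a hypothetical partition "general" and account "myproject"; it only constructs the cluster and prints the script that would be submitted once the cluster is scaled:

>>> from dask_jobqueue import SLURMCluster
>>> cluster = SLURMCluster(cores=8, processes=2, memory="16GB",
...                        queue="general",            # hypothetical partition, becomes #SBATCH -p
...                        project="myproject",        # hypothetical account, becomes #SBATCH -A
...                        walltime="01:00:00",
...                        job_extra=["--exclusive"])  # extra #SBATCH line
>>> print(cluster.job_script())  # review the generated #SBATCH directives before scaling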

Examples

>>> from dask_jobqueue import SLURMCluster
>>> cluster = SLURMCluster(processes=6, cores=24, memory="120GB",
...                        env_extra=['export LANG="en_US.utf8"',
...                                   'export LANGUAGE="en_US.utf8"',
...                                   'export LC_ALL="en_US.utf8"'])
>>> cluster.scale(10)  # this may take a few seconds to launch
>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters, which automatically launch and kill workers based on load.

>>> cluster.adapt()
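
Bounds can also be passed to adapt; a short sketch using the minimum/maximum worker counts accepted by dask.distributed.Adaptive (the numbers here are illustrative):

>>> cluster.adapt(minimum=0, maximum=10)  # scale between 0 and 10 workers based on load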
__init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_mem=None, job_extra=None, config_name='slurm', **kwargs)

Methods

__init__(self[, queue, project, walltime, …])

adapt(self[, minimum_cores, maximum_cores, …])

Turn on adaptivity. For keyword arguments see dask.distributed.Adaptive. Instead of the minimum and maximum parameters, which apply to the number of workers, a Cluster object that implements the worker_spec attribute can use the following parameters:

Parameters:
minimum_cores : int
Minimum number of cores for the cluster
maximum_cores : int
Maximum number of cores for the cluster
minimum_memory : str
Minimum amount of memory for the cluster
maximum_memory : str
Maximum amount of memory for the cluster

Examples:
>>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
>>> cluster.adapt(minimum_cores=24, maximum_cores=96)
>>> cluster.adapt(minimum_memory='60 GB', maximum_memory='1 TB')

close(self, **kwargs)

Stops all running and pending jobs and stops the scheduler

job_file(self)

Write job submission script to temporary file

job_script(self)

Construct a job submission script

scale(self[, n, cores, memory])

Scale the cluster to n workers, or to the given number of cores or amount of memory; cores and memory are converted into a number of workers using the worker_spec attribute (a usage sketch follows this list).

scale_down(self, workers[, n])

Close the workers with the given addresses

scale_up(self, n, **kwargs)

Brings total worker count up to n

start_workers(self[, n])

Start workers and point them to our local scheduler

stop_all_jobs(self)

Stops all running and pending jobs

stop_jobs(self, jobs)

Stop a list of jobs

stop_workers(self, workers)

Stop a list of workers
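
As noted in the scale entry above, the cluster can be scaled by worker count, total cores, or total memory. A hedged sketch of these calls together with the job-control methods listed above (the values are illustrative):

>>> cluster.scale(10)               # ask for 10 workers
>>> cluster.scale(cores=48)         # or request by total cores ...
>>> cluster.scale(memory="240 GB")  # ... or by total memory; both use worker_spec
>>> cluster.stop_all_jobs()         # cancel all running and pending jobs
>>> cluster.close()                 # stop jobs and the scheduler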

Attributes

cancel_command

dashboard_link

finished_jobs

Jobs that have finished

job_id_regexp

pending_jobs

Jobs pending in the queue

running_jobs

Jobs with currently active workers

scheduler

The scheduler of this cluster

scheduler_address

scheduler_name

submit_command

worker_threads
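
The job-tracking attributes above can be polled to see what the cluster has submitted; a small sketch (the exact contents of these mappings depend on the dask_jobqueue version):

>>> cluster.dashboard_link       # link to the scheduler's diagnostic dashboard
>>> list(cluster.pending_jobs)   # jobs queued in SLURM but not yet running
>>> list(cluster.running_jobs)   # jobs with currently active workers
>>> list(cluster.finished_jobs)  # jobs that have finished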