class dask_jobqueue.MoabCluster(queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, config_name='pbs', **kwargs)

Launch Dask on a Moab cluster

queue : str

Destination queue for each worker job. Passed to #PBS -q option.

project : str

Accounting string associated with each worker job. Passed to #PBS -A option.

resource_spec : str

Request resources and specify job placement. Passed to #PBS -l option.

walltime : str

Walltime for each worker job.

job_extra : list

List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix.

name : str

Name of Dask workers.

cores : int

Total number of cores per job

memory : str

Total amount of memory per job

processes : int

Number of processes per job

interface : str

Network interface like 'eth0' or 'ib0'.

death_timeout : float

Seconds to wait for a scheduler before closing workers

local_directory : str

Dask worker local directory for file spilling.

extra : list

Additional arguments to pass to dask-worker

env_extra : list

Other commands to add to script before launching worker.

python : str

Python executable used to launch Dask workers.

shebang : str

Path to desired interpreter for your batch submission script.

kwargs : dict

Additional keyword arguments to pass to LocalCluster


>>> import os
>>> from dask_jobqueue import MoabCluster
>>> cluster = MoabCluster(processes=6, cores=6, project='gfdl_m',
...                       memory='96G', resource_spec='96G',
...                       job_extra=['-d /home/First.Last', '-M none'],
...                       local_directory=os.getenv('TMPDIR', '/tmp'))
>>> cluster.scale(60)  # request 60 workers (10 jobs of 6 processes each)
>>> from dask.distributed import Client
>>> client = Client(cluster)
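Since each job launches `processes` worker processes, scaling to n workers submits roughly ceil(n / processes) jobs. The helper below is illustrative only, not part of the dask_jobqueue API:

```python
import math

def jobs_needed(n_workers: int, processes_per_job: int) -> int:
    """Illustrative: number of batch jobs required to reach n_workers,
    given that each job starts processes_per_job worker processes."""
    return math.ceil(n_workers / processes_per_job)

print(jobs_needed(60, 6))  # with processes=6, scale(60) corresponds to 10 jobs
```
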

This also works with adaptive clusters, which automatically launch and kill workers based on load.

>>> cluster.adapt()
__init__(queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, config_name='pbs', **kwargs)


__init__([queue, project, resource_spec, …])
adapt([minimum_cores, maximum_cores, …]) Turn on adaptivity. Accepts dask.distributed.Adaptive keyword arguments. If the Cluster object implements a worker_spec attribute, minimum_cores/maximum_cores and minimum_memory/maximum_memory can be given instead of minimum and maximum worker counts, e.g. cluster.adapt(minimum=0, maximum=10, interval='500ms'), cluster.adapt(minimum_cores=24, maximum_cores=96), or cluster.adapt(minimum_memory='60 GB', maximum_memory='1 TB').
close(**kwargs) Stops all running and pending jobs and stops scheduler
job_file() Write job submission script to temporary file
job_script() Construct a job submission script
scale([n, cores, memory]) Scale cluster to n workers, or to the given number of cores or memory; cores and memory are converted into a worker count using the worker_spec attribute.
scale_down(workers[, n]) Close the workers with the given addresses
scale_up(n, **kwargs) Brings total worker count up to n
start_workers([n]) Start workers and point them to our local scheduler
stop_all_jobs() Stops all running and pending jobs
stop_jobs(jobs) Stop a list of jobs
stop_workers(workers) Stop a list of workers


finished_jobs Jobs that have finished
pending_jobs Jobs pending in the queue
running_jobs Jobs with currently active workers
scheduler The scheduler of this cluster