dask_jobqueue.LSFCluster

class dask_jobqueue.LSFCluster(queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, lsf_units=None, config_name='lsf', **kwargs)

Launch Dask on an LSF cluster

Parameters
queue : str

Destination queue for each worker job. Passed to #BSUB -q option.

project : str

Accounting string associated with each worker job. Passed to #BSUB -P option.

ncpus : int

Number of CPUs. Passed to #BSUB -n option.

mem : int

Request memory in bytes. Passed to #BSUB -M option.

walltime : str

Walltime for each worker job in HH:MM. Passed to #BSUB -W option.

job_extra : list

List of other LSF options, for example -u. Each option will be prepended with the #BSUB prefix.

lsf_units : str

Unit system for large units in resource usage, as set by LSF_UNIT_FOR_LIMITS in the cluster's lsf.conf file.

name : str

Name of Dask workers.

cores : int

Total number of cores per job

memory : str

Total amount of memory per job

processes : int

Number of processes per job

interface : str

Network interface like ‘eth0’ or ‘ib0’.

death_timeout : float

Seconds to wait for a scheduler before closing workers

local_directory : str

Dask worker local directory for file spilling.

extra : list

Additional arguments to pass to dask-worker

env_extra : list

Other commands to add to the script before launching the worker.

python : str

Python executable used to launch Dask workers.

shebang : str

Path to desired interpreter for your batch submission script.

kwargs : dict

Additional keyword arguments to pass to LocalCluster

Examples

>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFCluster(queue='general', project='DaskonLSF',
...                      cores=15, memory='25GB')
>>> cluster.scale(10)  # this may take a few seconds to launch
>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters, which automatically launch and kill workers based on load.

>>> cluster.adapt()
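
The LSF-specific resource options documented above can be combined with the generic worker options in the same call; the option values here are purely illustrative:

>>> cluster = LSFCluster(queue='general', project='DaskonLSF',
...                      cores=15, memory='25GB',
...                      walltime='02:00', job_extra=['-u user@example.com'],
...                      interface='ib0', local_directory='$TMPDIR')
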
__init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None, job_extra=None, lsf_units=None, config_name='lsf', **kwargs)

Methods

__init__(self[, queue, project, ncpus, mem, …])

adapt(self[, minimum_cores, maximum_cores, …])

Turn on adaptivity. For keyword arguments see dask.distributed.Adaptive. Instead of the minimum and maximum parameters, which apply to the number of workers, a Cluster object that implements the worker_spec attribute can use the following parameters: minimum_cores (int, minimum number of cores for the cluster), maximum_cores (int, maximum number of cores for the cluster), minimum_memory (str, minimum amount of memory for the cluster), and maximum_memory (str, maximum amount of memory for the cluster). Examples:

>>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
>>> cluster.adapt(minimum_cores=24, maximum_cores=96)
>>> cluster.adapt(minimum_memory='60 GB', maximum_memory='1 TB')

close(self, **kwargs)

Stops all running and pending jobs and stops the scheduler

job_file(self)

Write job submission script to temporary file

job_script(self)

Construct a job submission script

scale(self[, n, cores, memory])

Scale the cluster to n workers, or to the given number of cores or amount of memory; cores and memory are converted into a number of workers using the worker_spec attribute (see the sketch after this list).

scale_down(self, workers[, n])

Close the workers with the given addresses

scale_up(self, n, **kwargs)

Brings total worker count up to n

start_workers(self[, n])

Start workers and point them to our local scheduler

stop_all_jobs(self)

Stops all running and pending jobs

stop_jobs(self, jobs)

Stop a list of jobs

stop_workers(self, workers)

Stop a list of workers
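
A rough sketch of how the scaling and inspection methods above might be used together; the values are illustrative:

>>> print(cluster.job_script())  # inspect the generated #BSUB directives
>>> cluster.scale(4)             # request four worker jobs
>>> cluster.scale(cores=48)      # or scale by total cores, converted via worker_spec
>>> cluster.stop_all_jobs()      # cancel all running and pending jobs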

Attributes

cancel_command

dashboard_link

finished_jobs

Jobs that have finished

job_id_regexp

pending_jobs

Jobs pending in the queue

running_jobs

Jobs with currently active workers (see the example after this list)

scheduler

The scheduler of this cluster

scheduler_address

scheduler_name

submit_command

worker_threads
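
A small sketch of how the job-tracking attributes above might be inspected on a live cluster, assuming the job collections are dict-like:

>>> cluster.dashboard_link       # URL of the Dask dashboard for this cluster
>>> len(cluster.pending_jobs)    # jobs still waiting in the LSF queue
>>> len(cluster.running_jobs)    # jobs with currently active workers
>>> len(cluster.finished_jobs)   # jobs that have finished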