dask_jobqueue.LSFCluster
class dask_jobqueue.LSFCluster(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)
Launch Dask on an LSF cluster
Parameters
- queue : str
Destination queue for each worker job. Passed to #BSUB -q option.
- project : str
Project associated with each worker job. Passed to #BSUB -P option.
- cores : int
Total number of cores per job
- memory : str
Total amount of memory per job
- processes : int
Cut the job up into this many processes. Good for GIL workloads or for nodes with many cores. By default, process ~= sqrt(cores), so that the number of processes and the number of threads per process are roughly the same.
- interface : str
Network interface like ‘eth0’ or ‘ib0’. This will be used both for the Dask scheduler and the Dask workers’ interface. If you need a different interface for the Dask scheduler you can pass it through the scheduler_options argument: interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface} (see the sketch after this parameter list).
- nanny : bool
Whether or not to start a nanny process
- local_directory : str
Dask worker local directory for file spilling.
- death_timeout : float
Seconds to wait for a scheduler before closing workers
- extra : list
Deprecated: use worker_extra_args instead. This parameter will be removed in a future version.
- worker_command : list
Command to run when launching a worker. Defaults to “distributed.cli.dask_worker”
- worker_extra_args : list
Additional arguments to pass to dask-worker
- env_extra : list
Deprecated: use job_script_prologue instead. This parameter will be removed in a future version.
- job_script_prologue : list
Other commands to add to script before launching worker.
- header_skip : list
Deprecated: use job_directives_skip instead. This parameter will be removed in a future version.
- job_directives_skip : list
Directives to skip in the generated job script header. Directive lines containing the specified strings will be removed. Directives added by job_extra_directives won’t be affected.
- log_directory : str
Directory to use for job scheduler logs.
- shebang : str
Path to desired interpreter for your batch submission script.
- python : str
Python executable used to launch Dask workers. Defaults to the Python that is submitting these jobs
- config_name : str
Section to use from jobqueue.yaml configuration file.
- name : str
Name of Dask worker. This is typically set by the Cluster
- ncpus : int
Number of cpus. Passed to #BSUB -n option.
- mem : int
Request memory in bytes. Passed to #BSUB -M option.
- walltime : str
Walltime for each worker job in HH:MM. Passed to #BSUB -W option.
- n_workers : int
Number of workers to start by default. Defaults to 0. See the scale method
- silence_logs : str
Log level like “debug”, “info”, or “error” to emit here if the scheduler is started locally
- asynchronous : bool
Whether or not to run this cluster object with the async/await syntax
- security : Security or bool
A dask.distributed security object if you’re using TLS/SSL. If True, temporary self-signed credentials will be created automatically.
- scheduler_options : dict
Used to pass additional arguments to the Dask Scheduler. For example, use scheduler_options={'dashboard_address': ':12435'} to specify which port the web dashboard should use, or scheduler_options={'host': 'your-host'} to specify the host the Dask scheduler should run on. See distributed.Scheduler for more details.
- scheduler_cls : type
Changes the class of the used Dask Scheduler. Defaults to Dask’s distributed.Scheduler.
- shared_temp_directory : str
Shared directory between scheduler and worker (used for example by temporary security certificates). Defaults to the current working directory if not set.
- job_extra : list
Deprecated: use job_extra_directives instead. This parameter will be removed in a future version.
- job_extra_directives : list
List of other LSF options, for example -u. Each option will be prepended with the #BSUB prefix (see the Examples section below).
- lsf_units : str
Unit system for large units in resource usage set by the LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster.
- use_stdin : bool
LSF’s bsub command allows us to launch a job by passing it as an argument (bsub /tmp/jobscript.sh) or feeding it to stdin (bsub < /tmp/jobscript.sh). Depending on your cluster’s configuration and/or shared filesystem setup, one of those methods may not work, forcing you to use the other one. This option controls which method dask-jobqueue will use to submit jobs via bsub.
In particular, if your cluster fails to launch and the LSF log contains an error message similar to the following:
/home/someuser/.lsbatch/1571869562.66512066: line 8: /tmp/tmpva_yau8m.sh: No such file or directory
…then try passing use_stdin=True here or setting use-stdin: true in your jobqueue.lsf config section.
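The following is a minimal sketch of how the interface and scheduler_options parameters interact; the interface names 'ib0' and 'eth0' and the dashboard port are placeholders for whatever your cluster actually provides:

>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFCluster(queue='general', cores=15, memory='25GB',
...                      interface='ib0',   # placeholder: network interface used by the workers
...                      scheduler_options={'interface': 'eth0',             # placeholder: scheduler interface
...                                         'dashboard_address': ':12435'})  # port for the web dashboard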
Examples
>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFCluster(queue='general', project='DaskonLSF',
...                      cores=15, memory='25GB', use_stdin=True)
>>> cluster.scale(jobs=10)  # ask for 10 jobs

>>> from dask.distributed import Client
>>> client = Client(cluster)
This also works with adaptive clusters. This automatically launches and kills workers based on load.
>>> cluster.adapt(maximum_jobs=20)
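Extra LSF directives and setup commands can be added per job. This is only a sketch; the mail address, module name, and walltime below are placeholders:

>>> cluster = LSFCluster(queue='general', cores=15, memory='25GB',
...                      walltime='02:00',
...                      job_extra_directives=['-u user@example.com'],  # extra #BSUB directive (placeholder address)
...                      job_script_prologue=['module load python'])    # placeholder command run before the worker starts
>>> print(cluster.job_script())  # inspect the generated submission script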
__init__(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)
Methods
__init__([n_workers, job_cls, loop, ...])
adapt(*args[, minimum_jobs, maximum_jobs])
Scale Dask cluster automatically based on scheduler activity.
close([timeout])
from_name(name)
Create an instance of this class to represent an existing cluster by name.
get_client()
Return client for the cluster
get_logs([cluster, scheduler, workers])
Return logs for the cluster, scheduler and workers
job_script()
logs(*args, **kwargs)
new_worker_spec()
Return name and spec for the next worker
scale([n, jobs, memory, cores])
Scale cluster to specified configurations.
scale_down(workers)
scale_up([n, memory, cores])
Scale cluster to n workers
sync(func, *args[, asynchronous, ...])
Call func with args synchronously or asynchronously depending on the calling context
wait_for_workers(n_workers[, timeout])
Blocking call to wait for n workers before continuing
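As a rough sketch of how these methods are typically combined (the worker and job counts are arbitrary):

>>> cluster.scale(jobs=2)                            # submit 2 worker jobs
>>> cluster.wait_for_workers(2)                      # block until 2 workers have connected
>>> client = cluster.get_client()                    # Client attached to this cluster
>>> cluster.adapt(minimum_jobs=0, maximum_jobs=10)   # or let the cluster scale itself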
Attributes
asynchronous
Are we running in the event loop?
called_from_running_loop
dashboard_link
job_header
job_name
loop
name
observed
plan
requested
scheduler_address