dask_jobqueue.OARCluster
- class dask_jobqueue.OARCluster(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)
Launch Dask on an OAR cluster
- Parameters
- queue : str
Destination queue for each worker job. Passed to #OAR -q option.
- project : str
Project associated with each worker job. Passed to #OAR --project option.
- cores : int
Total number of cores per job
- memory : str
Total amount of memory per job
- processes : int
Cut the job up into this many processes. Good for GIL workloads or for nodes with many cores. By default, process ~= sqrt(cores) so that the number of processes and the number of threads per process is roughly the same.
- interface : str
Network interface like 'eth0' or 'ib0'. This will be used both for the Dask scheduler and the Dask workers interface. If you need a different interface for the Dask scheduler you can pass it through the scheduler_options argument: interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface} (see the sketch after this parameter list).
- nanny : bool
Whether or not to start a nanny process
- local_directory : str
Dask worker local directory for file spilling.
- death_timeout : float
Seconds to wait for a scheduler before closing workers
- extra : list
Deprecated: use worker_extra_args instead. This parameter will be removed in a future version.
- worker_command : list
Command to run when launching a worker. Defaults to “distributed.cli.dask_worker”
- worker_extra_args : list
Additional arguments to pass to dask-worker
- env_extra : list
Deprecated: use job_script_prologue instead. This parameter will be removed in a future version.
- job_script_prologue : list
Other commands to add to script before launching worker.
- header_skip : list
Deprecated: use job_directives_skip instead. This parameter will be removed in a future version.
- job_directives_skip : list
Directives to skip in the generated job script header. Directive lines containing the specified strings will be removed. Directives added by job_extra_directives won't be affected.
- log_directory : str
Directory to use for job scheduler logs.
- shebang : str
Path to desired interpreter for your batch submission script.
- python : str
Python executable used to launch Dask workers. Defaults to the Python that is submitting these jobs.
- config_name : str
Section to use from jobqueue.yaml configuration file.
- name : str
Name of Dask worker. This is typically set by the Cluster.
- n_workers : int
Number of workers to start by default. Defaults to 0. See the scale method.
- silence_logs : str
Log level like “debug”, “info”, or “error” to emit here if the scheduler is started locally
- asynchronous : bool
Whether or not to run this cluster object with the async/await syntax
- security : Security or bool
A dask.distributed security object if you’re using TLS/SSL. If True, temporary self-signed credentials will be created automatically.
- scheduler_options : dict
Used to pass additional arguments to the Dask Scheduler. For example, use scheduler_options={'dashboard_address': ':12435'} to specify which port the web dashboard should use, or scheduler_options={'host': 'your-host'} to specify the host the Dask scheduler should run on. See distributed.Scheduler for more details.
- scheduler_cls : type
Changes the class of the used Dask Scheduler. Defaults to Dask's distributed.Scheduler.
- shared_temp_directory : str
Shared directory between scheduler and workers (used, for example, by temporary security certificates); defaults to the current working directory if not set.
- resource_spec : str
Request resources and specify job placement. Passed to #OAR -l option.
- walltime : str
Walltime for each worker job.
- job_extra : list
Deprecated: use job_extra_directives instead. This parameter will be removed in a future version.
- job_extra_directives : list
List of other OAR options, for example -t besteffort. Each option will be prepended with the #OAR prefix.
- memory_per_core_property_name : str
The memory per core property name of your OAR cluster (usually memcore or mem_core). Existing properties can be listed by executing the oarnodes command. Note that the memory per core property might not exist on your cluster. If that is the case, you can set memory_per_core_property_name="not_applicable" to avoid getting a warning. If you leave memory_per_core_property_name set to its default value, you will get a warning. If memory_per_core_property_name is "not_applicable" or left at its default value, allocated nodes may not have enough memory to match the memory parameter, and Dask worker memory management may not work correctly.
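Putting several of these parameters together, the sketch below shows one plausible construction, including routing the scheduler to a different network interface than the workers (see the interface parameter above). The queue, project, interface names, resource_spec, and walltime values are illustrative assumptions, not defaults:

>>> from dask_jobqueue import OARCluster
>>> cluster = OARCluster(
...     queue='regular',                  # passed to #OAR -q
...     project='my-project',             # assumed project name, passed to #OAR --project
...     cores=8,
...     memory='24GB',
...     walltime='02:00:00',
...     resource_spec='/nodes=1/core=8',  # assumed placement, passed to #OAR -l
...     job_extra_directives=['-t besteffort'],  # each entry is prepended with #OAR
...     interface='ib0',                  # network interface used by the workers
...     scheduler_options={'interface': 'eth0'},  # a different interface for the scheduler
... )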
Examples
>>> from dask_jobqueue import OARCluster
>>> cluster = OARCluster(queue='regular')
>>> cluster.scale(jobs=10)  # ask for 10 jobs

>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters. Adaptive mode automatically launches and kills workers based on load.

>>> cluster.adapt(maximum_jobs=20)
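It can also be useful to inspect what will actually be submitted to OAR before scaling. As a minimal sketch (the queue, cores, and memory values here are assumptions), the job_script method returns the generated submission script, including the #OAR directive header built from the parameters above:

>>> cluster = OARCluster(queue='regular', cores=4, memory='16GB')
>>> print(cluster.job_script())  # prints the #OAR header and the worker launch command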
- __init__(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)
Methods
- __init__([n_workers, job_cls, loop, ...])
- adapt(*args[, minimum_jobs, maximum_jobs])
Scale Dask cluster automatically based on scheduler activity.
- close([timeout])
- from_name(name)
Create an instance of this class to represent an existing cluster by name.
- get_client()
Return client for the cluster
- get_logs([cluster, scheduler, workers])
Return logs for the cluster, scheduler and workers
- job_script()
- logs(*args, **kwargs)
- new_worker_spec()
Return name and spec for the next worker
- scale([n, jobs, memory, cores])
Scale cluster to specified configurations.
- scale_down(workers)
- scale_up([n, memory, cores])
Scale cluster to n workers
- sync(func, *args[, asynchronous, ...])
Call func with args synchronously or asynchronously depending on the calling context
- wait_for_workers(n_workers[, timeout])
Blocking call to wait for n workers before continuing
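A small usage sketch tying the scaling methods above together (the queue, cores, and memory values are assumptions): scale by jobs, block until workers have connected, then obtain a client.

>>> from dask_jobqueue import OARCluster
>>> cluster = OARCluster(queue='regular', cores=4, memory='16GB')
>>> cluster.scale(jobs=2)        # submit two OAR jobs; each job may host one or more workers
>>> cluster.wait_for_workers(2)  # block until at least two workers have connected
>>> client = cluster.get_client()
>>> client.submit(lambda x: x + 1, 41).result()
42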
Attributes
- asynchronous
Are we running in the event loop?
- called_from_running_loop
- dashboard_link
- job_header
- job_name
- loop
- name
- observed
- plan
- requested
- scheduler_address