dask_jobqueue.OARCluster

class dask_jobqueue.OARCluster(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)

Launch Dask on an OAR cluster

Parameters
queue : str

Destination queue for each worker job. Passed to #OAR -q option.

project : str

Project associated with each worker job. Passed to #OAR --project option.

cores : int

Total number of cores per job

memory : str

Total amount of memory per job

processes : int

Cut the job up into this many processes. Good for GIL-bound workloads or for nodes with many cores. By default, processes ~= sqrt(cores), so that the number of processes and the number of threads per process are roughly the same.

interface : str

Network interface like ‘eth0’ or ‘ib0’. This will be used both for the Dask scheduler and the Dask workers interface. If you need a different interface for the Dask scheduler you can pass it through the scheduler_options argument: interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface}.

nanny : bool

Whether or not to start a nanny process

local_directory : str

Dask worker local directory for file spilling.

death_timeout : float

Seconds to wait for a scheduler before closing workers.

extra : list

Deprecated: use worker_extra_args instead. This parameter will be removed in a future version.

worker_extra_args : list

Additional arguments to pass to dask-worker.

env_extra : list

Deprecated: use job_script_prologue instead. This parameter will be removed in a future version.

job_script_prologue : list

Other commands to add to script before launching worker.

header_skip : list

Deprecated: use job_directives_skip instead. This parameter will be removed in a future version.

job_directives_skip : list

Directives to skip in the generated job script header. Directive lines containing the specified strings will be removed. Directives added by job_extra_directives won’t be affected.

log_directory : str

Directory to use for job scheduler logs.

shebang : str

Path to desired interpreter for your batch submission script.

python : str

Python executable used to launch Dask workers. Defaults to the Python that is submitting these jobs.

config_name : str

Section to use from jobqueue.yaml configuration file.

name : str

Name of Dask worker. This is typically set by the Cluster.

n_workers : int

Number of workers to start by default. Defaults to 0. See the scale method.

silence_logs : str

Log level like “debug”, “info”, or “error” to emit here if the scheduler is started locally.

asynchronous : bool

Whether or not to run this cluster object with the async/await syntax

security : Security or bool

A dask.distributed security object if you’re using TLS/SSL. If True, temporary self-signed credentials will be created automatically.

scheduler_options : dict

Used to pass additional arguments to the Dask Scheduler. For example, use scheduler_options={'dashboard_address': ':12435'} to specify which port the web dashboard should use, or scheduler_options={'host': 'your-host'} to specify the host the Dask scheduler should run on. See distributed.Scheduler for more details.

scheduler_cls : type

The class to use for the Dask Scheduler. Defaults to Dask’s distributed.Scheduler.

shared_temp_directory : str

Shared directory between the scheduler and workers (used, for example, for temporary security certificates). Defaults to the current working directory if not set.

resource_spec : str

Request resources and specify job placement. Passed to #OAR -l option.

walltime : str

Walltime for each worker job.

job_extra : list

Deprecated: use job_extra_directives instead. This parameter will be removed in a future version.

job_extra_directives : list

List of other OAR options, for example -t besteffort. Each option will be prepended with the #OAR prefix; see the sketch after this parameter list.

memory_per_core_property_name : str

The memory per core property name of your OAR cluster (usually memcore or mem_core). Existing properties can be listed by executing the oarnodes command. Note that the memory per core property might not exist on your cluster; if that is the case, you can set memory_per_core_property_name="not_applicable" to avoid getting a warning, whereas leaving it at its default value will emit a warning. When memory_per_core_property_name is "not_applicable" or left at its default value, allocated nodes may not have enough memory to match the memory parameter and Dask worker memory management may not work correctly.
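
The following is a minimal sketch of how the OAR-specific parameters above fit together; the queue, project, walltime, directive and property names are placeholders that must be adapted to your own cluster.

>>> from dask_jobqueue import OARCluster
>>> cluster = OARCluster(
...     queue='default',                           # placeholder queue name
...     project='my-project',                      # placeholder project name
...     cores=8,
...     memory='16GB',
...     walltime='01:00:00',
...     job_extra_directives=['-t besteffort'],    # rendered as "#OAR -t besteffort"
...     memory_per_core_property_name='memcore',   # check `oarnodes` for the real property name
... )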

Examples

>>> from dask_jobqueue import OARCluster
>>> cluster = OARCluster(queue='regular')
>>> cluster.scale(jobs=10)  # ask for 10 jobs
>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters, which automatically launch and kill workers based on load.

>>> cluster.adapt(maximum_jobs=20)
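
You can also inspect what would be submitted before any job is sent to OAR. The lines below are a sketch; the exact output depends on the parameters used when creating the cluster.

>>> print(cluster.job_header)    # only the generated "#OAR ..." directive lines
>>> print(cluster.job_script())  # the full batch script, including the worker launch command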
__init__(n_workers=0, job_cls: typing.Optional[dask_jobqueue.core.Job] = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options=None, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_name=None, **job_kwargs)

Methods

__init__([n_workers, job_cls, loop, ...])

adapt(*args[, minimum_jobs, maximum_jobs])

Scale Dask cluster automatically based on scheduler activity.

close([timeout])

from_name(name)

Create an instance of this class to represent an existing cluster by name.

get_client()

Return client for the cluster

get_logs([cluster, scheduler, workers])

Return logs for the cluster, scheduler and workers

job_script()

logs(*args, **kwargs)

new_worker_spec()

Return name and spec for the next worker

scale([n, jobs, memory, cores])

Scale cluster to specified configurations.

scale_down(workers)

scale_up([n, memory, cores])

Scale cluster to n workers

sync(func, *args[, asynchronous, ...])

Call func with args synchronously or asynchronously depending on the calling context

wait_for_workers([n_workers, timeout])

Blocking call to wait for n workers before continuing
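
A short usage sketch of the scaling-related methods, assuming a cluster object like the one created in the examples above:

>>> cluster.scale(jobs=4)                           # request four OAR jobs
>>> cluster.wait_for_workers(1)                     # block until at least one worker has connected
>>> client = cluster.get_client()                   # equivalent to Client(cluster)
>>> cluster.adapt(minimum_jobs=1, maximum_jobs=20)  # or let the scheduler add/remove jobs as needed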

Attributes

asynchronous

Are we running in the event loop?

dashboard_link

job_header

job_name

loop

name

observed

plan

requested

scheduler_address
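
These attributes are read-only views onto the running cluster. For illustration, assuming the cluster created above:

>>> cluster.dashboard_link           # URL of the Dask dashboard, if available
>>> cluster.scheduler_address        # address that the workers connect to
>>> cluster.plan - cluster.observed  # workers requested but not yet connected (assumes both are sets)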