dask_jobqueue.OARCluster

class dask_jobqueue.OARCluster(n_workers=0, job_cls: dask_jobqueue.core.Job = None, loop=None, security=None, silence_logs='error', name=None, asynchronous=False, interface=None, host=None, protocol='tcp://', dashboard_address=':8787', config_name=None, **kwargs)

Launch Dask on an OAR cluster

Parameters
queue : str

Destination queue for each worker job. Passed to #OAR -q option.

project : str

Accounting string associated with each worker job. Passed to #OAR -p option.

cores : int

Total number of cores per job

memory : str

Total amount of memory per job

processes : int

Cut the job up into this many processes. Good for GIL workloads or for nodes with many cores.

interface : str

Network interface like ‘eth0’ or ‘ib0’.

nanny : bool

Whether or not to start a nanny process

local_directory : str

Dask worker local directory for file spilling.

death_timeout : float

Seconds to wait for a scheduler before closing workers

extra : list

Additional arguments to pass to dask-worker

env_extra : list

Other commands to add to script before launching worker.

header_skip : list

Lines to skip in the header. Header lines matching this text will be removed.

log_directory : str

Directory to use for job scheduler logs.

shebang : str

Path to desired interpreter for your batch submission script.

python : str

Python executable used to launch Dask workers. Defaults to the Python that is submitting these jobs.

config_name : str

Section to use from jobqueue.yaml configuration file.

name : str

Name of Dask worker. This is typically set by the Cluster.

n_workers : int

Number of workers to start by default. Defaults to 0. See the scale method.

silence_logs : str

Log level like “debug”, “info”, or “error” to emit here if the scheduler is started locally.

asynchronous : bool

Whether or not to run this cluster object with the async/await syntax

security : Security

A dask.distributed security object if you’re using TLS/SSL

dashboard_address : str or int

An address like “:8787” on which to host the Scheduler’s dashboard

resource_spec : str

Request resources and specify job placement. Passed to #OAR -l option.

walltime : str

Walltime for each worker job.

job_extra : list

List of other OAR options, for example -t besteffort. Each option will be prepended with the #OAR prefix.
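Taken together, a typical invocation combines several of these options. The following is a minimal sketch rather than an official recipe: the queue name, project string, walltime, and besteffort option are placeholder values that depend on your OAR site.

>>> from dask_jobqueue import OARCluster
>>> cluster = OARCluster(
...     queue='default',              # placeholder queue name, passed as #OAR -q
...     project='myproject',          # placeholder accounting string, passed as #OAR -p
...     cores=24,                     # total cores per job
...     processes=6,                  # split into 6 worker processes, 4 threads each
...     memory='48GB',                # total memory per job, shared by the processes
...     walltime='02:00:00',
...     job_extra=['-t besteffort'],  # extra #OAR directives (placeholder option)
... )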

Examples

>>> from dask_jobqueue import OARCluster
>>> cluster = OARCluster(queue='regular')
>>> cluster.scale(jobs=10)  # ask for 10 jobs
>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters, which automatically launch and kill workers based on load.

>>> cluster.adapt(maximum_jobs=20)
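
A lower bound can be requested as well; a small sketch using the minimum_jobs keyword from the adapt signature listed below:

>>> cluster.adapt(minimum_jobs=1, maximum_jobs=20)  # keep at least 1 job, allow up to 20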

__init__(self, n_workers=0, job_cls: dask_jobqueue.core.Job = None, loop=None, security=None, silence_logs='error', name=None, asynchronous=False, interface=None, host=None, protocol='tcp://', dashboard_address=':8787', config_name=None, **kwargs)

Initialize self. See help(type(self)) for accurate signature.

Methods

__init__(self[, n_workers, loop, security, …])

Initialize self.

adapt(self, *args[, minimum_jobs, …])

Scale Dask cluster automatically based on scheduler activity.

close(self[, timeout])

job_script(self)

logs(self[, scheduler, workers])

Return logs for the scheduler and workers

new_worker_spec(self)

Return name and spec for the next worker

scale(self[, n, jobs, memory, cores])

Scale cluster to specified configurations.

scale_down(self, workers)

scale_up(self[, n, memory, cores])

Scale cluster to n workers

sync(self, func, *args[, asynchronous, …])
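
Before scaling, the generated submission script can be inspected to check that the header matches expectations. A short sketch, assuming the cluster object constructed in the examples above:

>>> print(cluster.job_script())  # show the generated #OAR header and worker command
>>> cluster.scale(jobs=2)        # submit two worker jobs
>>> cluster.scale(cores=48)      # or scale by total core count instead of job count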

Attributes

asynchronous

config_name

dashboard_link

job_header

job_name

observed

plan

requested

scheduler_address
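
Several of these attributes are useful for monitoring a live cluster. A sketch, assuming the cluster from the examples above; the descriptions in the comments are assumptions based on the conventions of distributed's SpecCluster and may vary by version:

>>> cluster.dashboard_link     # URL of the scheduler's dashboard
>>> print(cluster.job_header)  # the #OAR directives generated for each job
>>> cluster.plan               # assumed: names of workers the cluster wants
>>> cluster.observed           # assumed: names of workers currently connected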