PBSCluster(queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, config_name='pbs', **kwargs)
Launch Dask on a PBS cluster
- queue : str
Destination queue for each worker job. Passed to #PBS -q option.
- project : str
Accounting string associated with each worker job. Passed to #PBS -A option.
- resource_spec : str
Request resources and specify job placement. Passed to #PBS -l option.
- walltime : str
Walltime for each worker job.
- job_extra : list
List of other PBS options, for example -j oe. Each option will be prepended with the #PBS prefix.
- name : str
Name of Dask workers.
- cores : int
Total number of cores per job
- memory : str
Total amount of memory per job
- processes : int
Number of processes per job
- interface : str
Network interface like ‘eth0’ or ‘ib0’.
- death_timeout : float
Seconds to wait for a scheduler before closing workers
- local_directory : str
Dask worker local directory for file spilling.
- extra : list
Additional arguments to pass to dask-worker
- env_extra : list
Other commands to add to the script before launching the worker.
- python : str
Python executable used to launch Dask workers.
- shebang : str
Path to desired interpreter for your batch submission script.
- kwargs : dict
Additional keyword arguments to pass to LocalCluster
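To make the parameter descriptions above concrete, here is a minimal sketch of how such options could be turned into a PBS batch script. `render_job_script` is a hypothetical helper, not dask_jobqueue's implementation. The -q, -A and -l mappings and the "#PBS" prefix for job_extra come from the descriptions above; using -N for the worker name, `-l walltime=` for walltime, and `python -m distributed.cli.dask_worker` as the launch command are assumptions.

```python
from typing import List, Optional


def render_job_script(
    queue: Optional[str] = None,
    project: Optional[str] = None,
    resource_spec: Optional[str] = None,
    walltime: Optional[str] = None,
    job_extra: Optional[List[str]] = None,
    name: str = "dask-worker",
    shebang: str = "#!/usr/bin/env bash",
    env_extra: Optional[List[str]] = None,
    python: str = "python",
    worker_args: Optional[List[str]] = None,
) -> str:
    """Hypothetical sketch: build a PBS batch script from PBSCluster-style options."""
    header = [f"#PBS -N {name}"]  # assumption: the worker name becomes the job name
    if queue is not None:
        header.append(f"#PBS -q {queue}")  # destination queue
    if project is not None:
        header.append(f"#PBS -A {project}")  # accounting string
    if resource_spec is not None:
        header.append(f"#PBS -l {resource_spec}")  # resources and placement
    if walltime is not None:
        header.append(f"#PBS -l walltime={walltime}")  # assumption: walltime goes through -l
    # Every job_extra entry is prefixed with "#PBS".
    header.extend(f"#PBS {option}" for option in (job_extra or []))

    # Assumed launch command: run the worker CLI module with the chosen interpreter.
    command = " ".join([python, "-m", "distributed.cli.dask_worker", *(worker_args or [])])
    # Shebang, directives, environment setup, then the worker command.
    return "\n".join([shebang, "", *header, "", *(env_extra or []), command]) + "\n"


script = render_job_script(
    queue="regular",
    project="DaskOnPBS",
    walltime="00:30:00",
    job_extra=["-j oe"],
    env_extra=["module load python"],  # hypothetical environment setup line
    worker_args=["tcp://10.0.0.1:8786", "--nthreads", "12"],  # hypothetical scheduler address
)
print(script)
```

The directives are collected before the env_extra commands so that PBS sees every "#PBS" line ahead of the first executable command, which is where the batch system expects them.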
>>> from dask_jobqueue import PBSCluster
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS', cores=12)
>>> cluster.scale(10)  # this may take a few seconds to launch

>>> from dask.distributed import Client
>>> client = Client(cluster)
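In the example above, cluster.scale(10) asks for 10 workers, while PBS schedules whole jobs. A minimal sketch of how many jobs that implies, assuming each job starts `processes` worker processes and the count is rounded up to whole jobs. `jobs_needed` is a hypothetical helper, not part of dask_jobqueue.

```python
import math


def jobs_needed(n_workers: int, processes_per_job: int) -> int:
    """Hypothetical: jobs to submit so that at least n_workers worker
    processes exist when every job starts processes_per_job of them."""
    if processes_per_job < 1:
        raise ValueError("processes_per_job must be >= 1")
    if n_workers <= 0:
        return 0
    # Round up: a partially used job still has to be submitted whole.
    return math.ceil(n_workers / processes_per_job)


print(jobs_needed(10, 1))  # one worker process per job
print(jobs_needed(10, 4))  # four worker processes per job
```

Rounding up means the cluster can end up with slightly more worker processes than requested (12 instead of 10 in the second call) rather than fewer.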
PBSCluster also works with adaptive clusters: cluster.adapt() automatically launches and kills workers based on load.
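The idea of launching and killing workers based on load can be pictured with a toy policy. This is a deliberately simplified stand-in, not the algorithm implemented by dask.distributed.Adaptive; `target_workers`, `pending_tasks` and `tasks_per_worker` are hypothetical names chosen for illustration.

```python
import math


def target_workers(pending_tasks: int, tasks_per_worker: int,
                   minimum: int, maximum: int) -> int:
    """Toy load-based policy: enough workers to cover the pending work,
    clamped to the [minimum, maximum] bounds passed to adapt()."""
    wanted = math.ceil(pending_tasks / tasks_per_worker) if pending_tasks > 0 else 0
    return max(minimum, min(maximum, wanted))


current = 4
for load in (0, 10, 500):
    target = target_workers(load, tasks_per_worker=20, minimum=0, maximum=10)
    # Compare the target with the current size to decide what to do.
    action = "launch" if target > current else "kill" if target < current else "keep"
    print(f"load={load}: target={target}, {action} {abs(target - current)} worker(s)")
```

The clamp is what the minimum and maximum arguments of adapt() express: however high or low the load, the cluster size stays within those bounds.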
It is good practice to set local_directory to your PBS system's scratch directory:
>>> cluster = PBSCluster(queue='regular', project='DaskOnPBS',
...                      local_directory='$TMPDIR',
...                      cores=24, processes=6, memory='100GB')
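With cores=24, processes=6 and memory='100GB', each job is shared between six worker processes. A minimal sketch of that split, assuming cores and memory are divided evenly and that 'GB' means 10**9 bytes. `parse_memory` and `split_job` are hypothetical helpers, and rejecting a core count that is not a multiple of processes is a design choice of this sketch.

```python
import re

# Decimal units, matching the '100GB' style used in the example (an assumption).
_UNITS = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}


def parse_memory(text: str) -> int:
    """Convert a string such as '100GB' or '60 GB' to a number of bytes."""
    match = re.fullmatch(r"\s*([\d.]+)\s*([KMGT]?B)\s*", text, flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"cannot parse memory string {text!r}")
    return int(float(match.group(1)) * _UNITS[match.group(2).upper()])


def split_job(cores: int, processes: int, memory: str):
    """Hypothetical: share one job's cores and memory evenly between its
    worker processes, returning (threads per process, bytes per process)."""
    if processes < 1 or cores % processes:
        raise ValueError("cores should be a positive multiple of processes")
    return cores // processes, parse_memory(memory) // processes


threads, mem_bytes = split_job(cores=24, processes=6, memory="100GB")
print(f"6 worker processes, each with {threads} threads and {mem_bytes} bytes")
```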
__init__(queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, config_name='pbs', **kwargs)
__init__([queue, project, resource_spec, …])
adapt([minimum_cores, maximum_cores, …])
Turn on adaptivity. For keyword arguments, see dask.distributed.Adaptive. Instead of the minimum and maximum parameters, which apply to the number of workers, you can use the following parameters if the Cluster object implements the worker_spec attribute:
- minimum_cores : int
Minimum number of cores for the cluster
- maximum_cores : int
Maximum number of cores for the cluster
- minimum_memory : str
Minimum amount of memory for the cluster
- maximum_memory : str
Maximum amount of memory for the cluster
Examples:
>>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
>>> cluster.adapt(minimum_cores=24, maximum_cores=96)
>>> cluster.adapt(minimum_memory='60 GB', maximum_memory='1 TB')
Stops all running and pending jobs and stops the scheduler
Write job submission script to temporary file
Construct a job submission script
scale([n, cores, memory])
Scale the cluster to n workers, or to the given number of cores or amount of memory. Cores and memory are converted into a number of workers using the worker_spec attribute.
Close the workers with the given addresses
Brings total worker count up to
Start workers and point them to our local scheduler
Stops all running and pending jobs
Stop a list of jobs
Stop a list of workers
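Both adapt() and scale() accept core and memory targets that are converted into a number of workers using worker_spec. A minimal sketch of that conversion, assuming each worker provides a fixed number of cores and bytes of memory, that fractional results are rounded up, and that the largest requirement wins. `workers_for` is hypothetical; memory is given in bytes here, so strings such as '60 GB' would need converting first.

```python
import math
from typing import Optional


def workers_for(worker_cores: int, worker_memory: int, n: Optional[int] = None,
                cores: Optional[int] = None, memory: Optional[int] = None) -> int:
    """Hypothetical: smallest worker count meeting every requested target.

    worker_cores and worker_memory describe one worker (a worker_spec-like
    description); memory values are plain byte counts.
    """
    needed = [n or 0]
    if cores is not None:
        needed.append(math.ceil(cores / worker_cores))  # round up to whole workers
    if memory is not None:
        needed.append(math.ceil(memory / worker_memory))
    return max(needed)


# One worker = 24 cores and 100 GB (assumed values).
spec = {"cores": 24, "memory": 100 * 10**9}
print(workers_for(spec["cores"], spec["memory"], cores=96))       # like maximum_cores=96
print(workers_for(spec["cores"], spec["memory"], memory=10**12))  # like maximum_memory='1 TB'
```

Taking the maximum ensures that every target passed in the same call is met, at the cost of possibly over-provisioning along the other dimension.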
Jobs that have finished
Jobs pending in the queue
Jobs with currently active workers
The scheduler of this cluster
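The pending, running and finished job attributes describe a simple life cycle. A minimal bookkeeping sketch, assuming a job moves from pending to running when its first worker connects and to finished when it is stopped. `JobTracker` and its methods are hypothetical and only update in-memory state; a real cluster would also cancel the job with the batch system (qdel on PBS).

```python
from dataclasses import dataclass, field
from typing import Iterable, Set


@dataclass
class JobTracker:
    """Hypothetical bookkeeping for job ids: pending -> running -> finished."""
    pending_jobs: Set[str] = field(default_factory=set)
    running_jobs: Set[str] = field(default_factory=set)
    finished_jobs: Set[str] = field(default_factory=set)

    def submitted(self, job_id: str) -> None:
        # A freshly submitted job waits in the queue.
        self.pending_jobs.add(job_id)

    def worker_started(self, job_id: str) -> None:
        # The first worker connecting from a job marks it as running.
        self.pending_jobs.discard(job_id)
        self.running_jobs.add(job_id)

    def stop_jobs(self, job_ids: Iterable[str]) -> None:
        # Only state is updated here; no qdel call is made.
        for job_id in job_ids:
            self.pending_jobs.discard(job_id)
            self.running_jobs.discard(job_id)
            self.finished_jobs.add(job_id)

    def stop_all_jobs(self) -> None:
        # Copy into a list first so the sets are not modified while iterating.
        self.stop_jobs(list(self.pending_jobs | self.running_jobs))


tracker = JobTracker()
tracker.submitted("1001")
tracker.submitted("1002")
tracker.worker_started("1001")
print(sorted(tracker.pending_jobs), sorted(tracker.running_jobs))
tracker.stop_all_jobs()
print(sorted(tracker.finished_jobs))
```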