Advanced tips and tricks
========================

The universe of HPC clusters is extremely diverse: each cluster has its own job
scheduler, its own configuration, and its own policies (security, usage, etc.).
An unfortunate consequence is that it is impossible for Dask-Jobqueue to cover
every edge case of every HPC cluster.

This page documents tips and tricks that are likely to be useful on some
clusters (ideally more than one, although it is hard to be sure).

Skipping unrecognised line in submission script with ``job_directives_skip``
----------------------------------------------------------------------------

*Note: the parameter* ``job_directives_skip`` *was named* ``header_skip`` *until version 0.8.0.* ``header_skip`` *can still
be used, but is considered deprecated and will be removed in a future version.*

On some clusters, the submission script generated by Dask-Jobqueue (you can look
at it with ``print(cluster.job_script())``) may not work
because of a configuration quirk of that HPC cluster. The quirk probably exists
for a good reason, but it still needs to be worked around.

You'll get an error when calling ``cluster.scale`` (i.e. when jobs are actually
submitted) telling you that the job scheduler is not happy with your
job submission script (see examples below). The main parameter you can use to
work around this is ``job_directives_skip``:

.. code-block:: python

   # this will remove any line containing either '--mem' or
   # 'another-string' from the job submission script
   cluster = YourCluster(
       job_directives_skip=['--mem', 'another-string'],
       **other_options_go_here)
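
To make the effect concrete, here is a minimal sketch of the filtering described
above, written in plain Python so that it runs without a job scheduler. The helper
``skip_directives`` and the sample script are hypothetical and only illustrate the
"line contains the string" rule; this is not Dask-Jobqueue's actual implementation:

.. code-block:: python

   def skip_directives(script, directives_skip):
       """Drop every line that contains one of the given substrings."""
       kept = [
           line
           for line in script.splitlines()
           if not any(skip in line for skip in directives_skip)
       ]
       return "\n".join(kept)


   # Hypothetical SLURM submission script, for illustration only.
   script = "\n".join([
       "#!/usr/bin/env bash",
       "#SBATCH -n 1",
       "#SBATCH --cpus-per-task=4",
       "#SBATCH --mem=15G",
       "#SBATCH -t 01:00:00",
   ])

   filtered = skip_directives(script, ["--mem", "another-string"])
   print(filtered)

In this sketch, a string that matches no line (here ``'another-string'``) is simply
ignored. It is still worth comparing ``print(cluster.job_script())`` before and
after setting the parameter to check that the intended line was removed.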


An example of this problem is described in detail in this `blog post
<https://blog.dask.org/2019/08/28/dask-on-summit#invalid-operations-in-the-job-script>`_
by Matthew Rocklin. In his case, the error was:

.. code-block:: text

   Command:
   bsub /tmp/tmp4874eufw.sh
   stdout:
   
   Typical usage:
   	bsub [LSF arguments] jobscript
   	bsub [LSF arguments] -Is $SHELL
   	bsub -h[elp] [options]
   	bsub -V
   
   NOTES:
    * All jobs must specify a walltime (-W) and project id (-P)
    * Standard jobs must specify a node count (-nnodes) or -ln_slots. These jobs cannot specify a resource string (-R).
    * Expert mode jobs (-csm y) must specify a resource string and cannot specify -nnodes or -ln_slots.
   
   stderr:
   ERROR: Resource strings (-R) are not supported in easy mode. Please resubmit without a resource string.
   ERROR: -n is no longer supported. Please request nodes with -nnodes.
   ERROR: No nodes requested. Please request nodes with -nnodes.
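
Because lines are matched by substring, the strings passed to
``job_directives_skip`` need some care in a case like this one. The sketch below
uses a hypothetical LSF header (not the script from the blog post) and a
hypothetical helper ``keep`` to show that ``'-n'`` would also remove a
``-nnodes`` line, whereas ``'-n '`` with a trailing space only removes the ``-n``
directive:

.. code-block:: python

   # Hypothetical LSF header lines, for illustration only.
   header = [
       "#BSUB -J dask-worker",
       "#BSUB -n 4",
       '#BSUB -R "span[hosts=1]"',
       "#BSUB -nnodes 1",
       "#BSUB -W 00:30",
   ]


   def keep(lines, skips):
       """Keep the lines that contain none of the skip strings."""
       return [line for line in lines if not any(s in line for s in skips)]


   too_broad = keep(header, ["-R", "-n"])   # also drops the -nnodes line
   precise = keep(header, ["-R", "-n "])    # keeps the -nnodes line

   print(too_broad)
   print(precise)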

Another example is this GitHub `issue
<https://github.com/dask/dask-jobqueue/issues/238>`_, where ``--mem`` is not an
accepted option on some SLURM clusters. The error looked something like this:

.. code-block:: text

  $ sbatch submit_slurm.sh
  sbatch: error: Memory specification can not be satisfied
  sbatch: error: Batch job submission failed: Requested node configuration is not available

Run setup commands before starting the worker with ``job_script_prologue``
--------------------------------------------------------------------------

*Note: the parameter* ``job_script_prologue`` *was named* ``env_extra`` *until version 0.7.4.* ``env_extra`` *can still
be used, but is considered deprecated and will be removed in a future version.*

Sometimes you need to run some setup commands before the actual worker can be started. This includes
setting environment variables, loading environment modules, sourcing/activating a virtual environment,
or activating conda/mamba environments.

This can be achieved using the ``job_script_prologue`` parameter. For example, to activate a virtual environment:

.. code-block:: python

   from dask_jobqueue.htcondor import HTCondorCluster

   job_script_prologue = ['cd /some/path', 'source venv/bin/activate']
   cluster = HTCondorCluster(
       cores=1, memory="2GB", disk="4GB", log_directory='logs', python='python3',
       job_script_prologue=job_script_prologue,
   )
   print(cluster.job_script())

For ``HTCondorCluster``, the commands will be prepended to the actual Python call in the ``Arguments``
parameter of the submit description file. The relevant lines will look like this:

.. code-block:: text

   ...
   Arguments = "-c 'cd /some/path; source venv/bin/activate; python3 -m distributed.cli.dask_worker tcp://<IP>:<PORT> --nthreads 1 --memory-limit 2.00GB --name dummy-name --nanny --death-timeout 60'"
   Executable = /bin/sh
   ...

For other batch systems (``*Cluster`` classes) the additional commands will be inserted as separate lines
in the submission script.

Similarly, if you need to run some commands after the worker has exited, use the ``job_script_epilogue`` parameter.
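
The two layouts described above can be sketched in plain Python. The worker
command is a shortened placeholder, the ``echo`` epilogue command is hypothetical,
and placing the epilogue right after the worker command is an assumption based on
its description. The scripts actually generated by Dask-Jobqueue contain more lines:

.. code-block:: python

   prologue = ["cd /some/path", "source venv/bin/activate"]
   epilogue = ["echo 'worker exited'"]  # hypothetical clean-up command
   worker_command = "python3 -m distributed.cli.dask_worker tcp://<IP>:<PORT>"

   # HTCondor-style: the prologue is chained with the worker command
   # into a single shell command line.
   htcondor_arguments = "; ".join(prologue + [worker_command])

   # Other batch systems: one command per line in the submission script.
   script_body = "\n".join(prologue + [worker_command] + epilogue)

   print(htcondor_arguments)
   print(script_body)

In both layouts the prologue commands run in order, in the same shell as the
worker, which is why an activated environment is still active when the worker
starts.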

How to handle job queueing system walltime killing workers
----------------------------------------------------------

In dask-jobqueue, every worker process runs inside a job, and every job in a job queueing system has a
time limit (its walltime). Reaching the walltime can be troublesome in two cases in particular:

- when you don't have a lot of room on your HPC platform and only get a few workers at a time
  (fewer than what you were hoping for when using ``scale`` or ``adapt``). These workers will be
  killed (and others started) before your workload ends.
- when you really don't know how long your workload will take: all your workers could be
  killed before reaching the end. In this case, you'll want to use adaptive clusters so
  that Dask ensures some workers are always up.

If you don't set the proper parameters, you'll run into a ``KilledWorker`` exception in those two cases.

The solution to this problem is to tell Dask up front that the workers have a finite lifetime:

- Use the ``--lifetime`` worker option. This enables arbitrarily long workloads in adaptive mode:
  workers are shut down cleanly before the scheduling system kills them, and their state is moved to other workers.
- Use ``--lifetime-stagger`` when dealing with many workers (say > 20): this prevents workers from
  terminating at the same time, which eases task rebalancing and reduces the burden on the scheduler.

Here is an example of how to use these parameters:

.. code-block:: python

    cluster = Cluster(
        walltime="01:00:00",
        cores=4,
        memory="16gb",
        worker_extra_args=["--lifetime", "55m", "--lifetime-stagger", "4m"],
    )
    cluster.adapt(minimum=0, maximum=200)
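
The values in the example above leave a safety margin before the one-hour
walltime. One way to derive them is sketched below. The helper ``lifetime_args``
is hypothetical, it assumes a walltime written as ``HH:MM:SS``, and it assumes
that the stagger can shift a worker's lifetime in either direction, so the stagger
is kept smaller than the margin:

.. code-block:: python

    from datetime import timedelta


    def lifetime_args(walltime, margin_minutes=5, stagger_minutes=4):
        """Build worker arguments so that workers retire before the walltime."""
        hours, minutes, seconds = (int(part) for part in walltime.split(":"))
        limit = timedelta(hours=hours, minutes=minutes, seconds=seconds)
        lifetime = limit - timedelta(minutes=margin_minutes)
        # Assumption: lifetime + stagger must stay below the walltime.
        if stagger_minutes >= margin_minutes:
            raise ValueError("stagger must be smaller than the margin")
        lifetime_minutes = int(lifetime.total_seconds() // 60)
        if lifetime_minutes <= stagger_minutes:
            raise ValueError("walltime is too short for this margin and stagger")
        return [
            "--lifetime", f"{lifetime_minutes}m",
            "--lifetime-stagger", f"{stagger_minutes}m",
        ]


    print(lifetime_args("01:00:00"))
    # ['--lifetime', '55m', '--lifetime-stagger', '4m']

The resulting list can be passed as ``worker_extra_args``. The margin should be
long enough for workers to hand over their state before the job is killed.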

*Note: the parameter* ``worker_extra_args`` *was named* ``extra`` *until version 0.7.4.* ``extra`` *can still
be used, but is considered deprecated and will be removed in a future version.*

Here is an example of a workflow taking advantage of this, if you want to give it a try or adapt it to your use case:

.. code-block:: python

    import time
    import numpy as np
    from dask_jobqueue import PBSCluster as Cluster
    from dask import delayed
    from dask.distributed import Client, as_completed

    # config in $HOME/.config/dask/jobqueue.yaml
    cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
    cluster.adapt(minimum=0, maximum=4)

    client = Client(cluster)

    # each task takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, so let's ask for more tasks than that.
    filenames = [f'img{num}.jpg' for num in range(480)]

    def features(num_fn):
        num, image_fn = num_fn
        time.sleep(1)  # takes about 1s to compute features on an image
        feature_vector = np.random.random(246)  # placeholder for real features
        return num, feature_vector

    num_files = len(filenames)
    # run once locally to find out the length of a feature vector
    num_features = len(features((0, filenames[0]))[1])

    X = np.zeros((num_files, num_features), dtype=np.float32)

    # collect results as they finish, in whatever order the workers return them
    futures = client.map(features, list(enumerate(filenames)))
    for future in as_completed(futures):
        i, v = future.result()
        X[i, :] = v


