Writing your own runners¶
This document describes the design of the runner classes and how to implement your own Dask runners.
The core assumption in the design of the runner model is that the same script will be executed many times by a job scheduler.
```
(mpirun|srun|qsub|etc) -n4 myscript.py
├── [0] myscript.py
├── [1] myscript.py
├── [2] myscript.py
└── [3] myscript.py
```
Within the script the runner class is created early on in the execution.
```python
from dask_jobqueue import SomeRunner
from dask.distributed import Client

with SomeRunner(**kwargs) as runner:
    with Client(runner) as client:
        client.wait_for_workers(2)
        # Do some Dask work
```
This will result in multiple processes running on an HPC system, all instantiating the runner class.
The processes need to coordinate to decide which process should run the Dask Scheduler, which should be Dask Workers, and which should continue running the rest of the client code within the script. This coordination happens during the `__init__()` of the runner class.

The Scheduler and Worker processes exit after they complete to avoid running the client code multiple times. This means that only one of the processes will continue past the `__init__()` of the runner class; the rest will exit at that point once their work is done.
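A rough sketch of this role split (illustrative only, not the actual `BaseRunner` internals) looks like the following: every process runs the same code, but only the client process returns from it, while scheduler and worker processes would block serving Dask and then exit.

```python
def choose_role(proc_id: int) -> str:
    """Map a monotonic process index to a role (a common convention)."""
    if proc_id == 0:
        return "scheduler"
    elif proc_id == 1:
        return "client"
    return "worker"


def run(proc_id: int) -> bool:
    """Return True only for the process that should keep executing the script."""
    role = choose_role(proc_id)
    if role == "scheduler":
        # Here the real runner would run the Dask scheduler until shutdown,
        # then exit the process
        return False
    if role == "worker":
        # Here the real runner would run a Dask worker until shutdown,
        # then exit the process
        return False
    # Only the client continues into the user's code
    return True


# With four processes, only process 1 continues past the runner's __init__()
roles = [run(i) for i in range(4)]
```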
Base class¶
In `dask_jobqueue.runner` there is a `BaseRunner` class that can be used for implementing other runners.
The minimum required to implement a new runner is the following methods.
```python
from dask_jobqueue.runner import BaseRunner

class MyRunner(BaseRunner):

    async def get_role(self) -> str:
        """Figure out whether I am a scheduler, worker or client.

        A common way to do this is by using a process ID. Many job queues give
        each process a monotonic index that starts from zero. So we can assume
        proc 0 is the scheduler, proc 1 is the client and any other procs are
        workers.
        """
        ...

    async def get_scheduler_address(self) -> str:
        """If I am not the scheduler, discover the scheduler address.

        A common way to do this is to read a scheduler file from a shared
        filesystem. Alternatively, if the scheduler process can broadcast its
        address via something like MPI we can define
        ``BaseRunner.set_scheduler_address()``, which will be called on the
        scheduler, and then receive the broadcast in this method.
        """
        ...
```
The `BaseRunner` class handles starting up Dask once these methods have been implemented.
It also provides many stubbed-out hooks to allow you to write code that runs before/after each component is created,
e.g. `BaseRunner.before_scheduler_start()`, `BaseRunner.before_worker_start()` and `BaseRunner.before_client_start()`.
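For example, a subclass could override `before_worker_start()` to run setup code on each worker process. The sketch below uses a local stand-in class instead of the real `BaseRunner` (so it runs without dask-jobqueue installed); the hook name matches the one described above, while `start_worker()` is a simplified stand-in for the real start-up logic.

```python
import asyncio


class StubRunner:
    """Stand-in for BaseRunner so this sketch is self-contained.

    The real class lives in dask_jobqueue.runner and calls these hooks
    at the corresponding points in its start-up sequence.
    """

    async def before_worker_start(self) -> None:
        # Stubbed-out hook: does nothing unless a subclass overrides it
        pass

    async def start_worker(self) -> str:
        # Simplified: run the hook, then (in reality) start a Dask worker
        await self.before_worker_start()
        return "worker started"


class MyRunner(StubRunner):
    events = []

    async def before_worker_start(self) -> None:
        # e.g. set environment variables or pin CPUs before the worker starts
        self.events.append("before_worker_start ran")


result = asyncio.run(MyRunner().start_worker())
```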
The runner must know the address of the scheduler so that it can coordinate the clean shutdown of all processes when we
reach the end of the code (either via __exit__()
or a finalizer). This communication happens independently of
any clients that may be created.
Slurm implementation example¶
As a concrete example you can look at the Slurm implementation.
In the `get_role()` method we use the `SLURM_PROCID` environment variable to infer the role.
We also add a default scheduler option to set `scheduler_file="scheduler-{job_id}.json"` and look up the
Job ID from the `SLURM_JOB_ID` environment variable to ensure uniqueness. This effectively allows us to broadcast
the scheduler address via the shared filesystem.
Then in the get_scheduler_address()
method we wait for the scheduler file to exist and then open and read the
address from the scheduler file in the same way the dask.distributed.Client
does.
Here’s a cut-down example for demonstration purposes.
```python
import asyncio
import json
import os
from pathlib import Path

from dask_jobqueue.runner import BaseRunner, Role

class SLURMRunner(BaseRunner):
    def __init__(self, *args, scheduler_file="scheduler.json", **kwargs):
        # Get the current process ID from the environment
        self.proc_id = int(os.environ["SLURM_PROCID"])

        # Tell the scheduler and workers to use a scheduler file on the shared filesystem
        self.scheduler_file = Path(scheduler_file)
        options = {"scheduler_file": self.scheduler_file}
        super().__init__(*args, worker_options=options, scheduler_options=options, **kwargs)

    async def get_role(self) -> str:
        # Choose the role for this process based on the process ID
        if self.proc_id == 0 and self.scheduler:
            return Role.scheduler
        elif self.proc_id == 1 and self.client:
            return Role.client
        else:
            return Role.worker

    async def get_scheduler_address(self) -> str:
        # Wait for the scheduler file to be created and read the address from it
        while not self.scheduler_file or not self.scheduler_file.exists():
            await asyncio.sleep(0.2)
        cfg = json.loads(self.scheduler_file.read_text())
        return cfg["address"]
```
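The scheduler-file mechanism can be exercised on its own: write a JSON file containing an `address` key and read it back using the same polling pattern as `get_scheduler_address()`. The file name and address below are made up for illustration; in a real job the Dask scheduler writes this file.

```python
import asyncio
import json
from pathlib import Path
from tempfile import TemporaryDirectory


async def wait_for_address(scheduler_file: Path) -> str:
    # Same polling pattern as get_scheduler_address() above
    while not scheduler_file.exists():
        await asyncio.sleep(0.2)
    return json.loads(scheduler_file.read_text())["address"]


with TemporaryDirectory() as tmp:
    # Stand-in for the scheduler writing its file; address is made up
    path = Path(tmp) / "scheduler-12345.json"
    path.write_text(json.dumps({"address": "tcp://10.0.0.1:8786"}))
    address = asyncio.run(wait_for_address(path))
```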