Source code for plus_slurm.jobcluster

# -*- coding: UTF-8 -*-
# Copyright (c) 2018, Thomas Hartmann
#
# This file is part of the plus_slurm Project,
# see: https://gitlab.com/thht/plus-slurm
#
#    plus_slurm is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    plus_slurm is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with obob_subjectdb. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

import collections
import copy
import getpass
import inspect
import itertools
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any

import humanfriendly
import jinja2
import numpy
import six

from plus_slurm.job import Job, JobItem

if TYPE_CHECKING:
    from collections.abc import Iterable


[docs] class JobCluster: """ This is the main class, the *controller* of plus_slurm. It collects all the jobs and takes care of submitting them to the cluster. It also contains information about how much RAM the jobs need, how many CPUs are requested etc. Parameters ---------- required_ram : :class:`str`, :class:`float`, :class:`int`, optional The amount of RAM required to run one Job in megabytes. A string like "2G" or "200M" will be converted accordingly. request_cpus : :class:`int`, optional The number of CPUs requested qos : :class:`str`, optional The quality of service to use. request_time : :class:`int`, optional Maximum duration of a job in minutes. jobs_dir ::class:`str`, optional Folder to put all the jobs in. This one needs to be on the shared filesystem (so somewhere under /mnt/obob) inc_jobsdir ::class:`str`, optional If this is set to True (default), jobs_dir is the parent folder for all the jobs folders. Each time a job is submitted, a new folder is created in the jobs_dir folder that contains all the necessary files and a folder called "log" containing the log files. If jobs_dir is set to False, the respective files are put directly under jobs_dir. In this case, jobs_dir must either be empty or not exist at all to avoid any side effects. python_bin ::class:`str`, optional The path to the python interpreter that should run the jobs. If you do not set it, it gets chosen automatically. If the python interpreter you are using when submitting the jobs is on /mnt/obob/ that one will be used. If the interpreter you are using is **not** on /mnt/obob/ the default one at /mnt/obob/obob_mne will be used. working_directory ::class:`str`, optional The working directory when the jobs run. exclude_nodes : :class:`str`, optional Comma separated list of nodes to exclude. max_jobs_per_jobcluster : :class:`int`, optional Slurm only allows a certain number of jobs per array job (1000 by default). If the number of jobs to be submitted is higher that this number, jobs will be split in more than one array job. append_to_path ::class:`str`, optional Path to append to the python module search path. extra_slurm_args ::class:`list`, optional Extra arguments for slurm sbatch. These get appended as is with the `#SBATCH` prefix. export ::class:`str`, optional What environment variables to export to the job. Look here for options: https://slurm.schedmd.com/sbatch.html#OPT_export """ _slurm_submit = '/usr/bin/sbatch' _runner_template = 'runner.py' _submit_template = 'submit.sh' def __init__( # noqa PLR0913 self, required_ram: str = '2G', request_cpus: int = 1, jobs_dir: str | Path = 'jobs', request_time: int = 10, inc_jobsdir: bool = True, qos: str | None = None, python_bin: str | None = None, working_directory: str | None = None, exclude_nodes: list[str] | None = None, max_jobs_per_jobcluster: int = 1000, append_to_path: list[str | Path] | None = None, extra_slurm_args: list[str] | None = None, export: str = 'NONE', ) -> None: self.required_ram = required_ram # type: ignore self.request_time = request_time self.request_cpus = request_cpus self.jobs_dir = Path(jobs_dir) self.inc_jobsdir = inc_jobsdir self.python_bin = python_bin # type: ignore self.working_directory = working_directory # type: ignore self._output_folder: Path | None = None self._exclude_nodes = exclude_nodes self._max_jobs_per_jobcluster = max_jobs_per_jobcluster self._append_to_path = append_to_path self._qos = qos self._export = export self._extra_slurm_args = extra_slurm_args if self._extra_slurm_args is None: self._extra_slurm_args = list() self._jobs: list[JobItem] = list() self._jinja_env = jinja2.Environment( loader=jinja2.PackageLoader('plus_slurm', 'jinja2_templates'), trim_blocks=True, lstrip_blocks=True ) self._jobs_submitted: list[JobItem] = list() self._current_jobs_submitting: list[JobItem] = list()
[docs] def add_job(self, job: type[Job], *args: Any, **kwargs: Any) -> None: """ Add one job to the JobCluster. All further arguments will be passed on to the Job. Parameters ---------- job : child of :class:`plus_slurm.Job` The job class to be added. *args Variable length argument list. **kwargs Arbitrary keyword arguments. """ if Job not in inspect.getmro(job): raise TypeError('Job must be a subclass of plus_slurm.Job') kwargs = collections.OrderedDict(kwargs) args_permuted = [idx for (idx, cur_arg) in enumerate(args) if isinstance(cur_arg, PermuteArgument)] # noqa kwargs_permuted = [ idx for (idx, cur_key) in enumerate(kwargs.keys()) if isinstance(kwargs[cur_key], PermuteArgument) ] # noqa if not args_permuted and not kwargs_permuted: self._jobs.append(JobItem(job, *args, **kwargs)) else: all_kwargs_as_array = numpy.array( [kwargs[cur_key] for (idx, cur_key) in enumerate(kwargs.keys()) if idx in kwargs_permuted] ) # noqa all_args_as_array = numpy.hstack((numpy.array(args)[args_permuted], all_kwargs_as_array)) # noqa all_args_permutations = itertools.product(*[x.args for x in all_args_as_array]) # noqa for cur_perm_args in all_args_permutations: new_args = numpy.array(args) new_kwargs = copy.deepcopy(kwargs) cur_perm_args_list = list(cur_perm_args) if args_permuted: new_args[args_permuted] = cur_perm_args_list[0 : len(args_permuted)] # noqa del cur_perm_args_list[0 : len(args_permuted)] if kwargs_permuted: for idx, cur_kwarg_idx in enumerate(kwargs_permuted): new_kwargs[list(new_kwargs.keys())[cur_kwarg_idx]] = cur_perm_args_list[idx] # noqa self.add_job(job, *new_args, **new_kwargs)
[docs] def run_local(self) -> None: """ Runs the added jobs locally. """ self._remove_notrunning_jobs() while self.n_jobs > 0: self._current_jobs_submitting = self._prepare_jobs_directory() self._generate_runner_file() self._generate_submit_file() for idx, _ in enumerate(sorted(Path(self.output_folder, 'slurm').glob('*.json.gzip'))): with ( Path(self.output_folder, 'log', f'out_{idx + 1}.log').open('w') as out, ): env = os.environ env['SLURM_ARRAY_TASK_ID'] = str(idx + 1) env['SLURM_ARRAY_JOB_ID'] = '0' subprocess.run( [self.executable] + str(self.arguments).split(' '), cwd=self.working_directory, stderr=subprocess.STDOUT, stdout=out, env=env, check=False, ) self._jobs_submitted.extend(self._current_jobs_submitting) self._current_jobs_submitting = []
[docs] def submit(self, do_submit: bool = True) -> None: """ Runs the added jobs on the cluster. Parameters ---------- do_submit : :class:`bool`, optional Set this to false to not actually submit but prepare all files. """ old_njobs = self.n_jobs self._remove_notrunning_jobs() while self.n_jobs > 0: self._current_jobs_submitting = self._prepare_jobs_directory() self._generate_runner_file() submit_fname = self._generate_submit_file() if do_submit: output = subprocess.run( [self._slurm_submit, submit_fname], stderr=subprocess.STDOUT, cwd=self.working_directory, stdout=subprocess.PIPE, text=True, check=False, ) print(f'Submit output:\n{output.stdout}\n') print(f'Submitted {len(self._current_jobs_submitting)}/{old_njobs} jobs.') # noqa else: logging.info('Not actually submitting') self._jobs_submitted.extend(self._current_jobs_submitting) self._current_jobs_submitting = []
def _remove_notrunning_jobs(self) -> None: new_job_list = [] for cur_job in self._jobs: cur_job_object = cur_job.make_object() if cur_job_object.shall_run_private(): new_job_list.append(cur_job) self._jobs = new_job_list @property def _jinja_context(self) -> dict[str, Any]: return { 'executable': self.executable, 'arguments': self.arguments, 'jobs_dir': self.output_folder, 'required_mem': int(self.required_ram / 1024 / 1024), 'request_cpus': self.request_cpus, 'request_time': self.request_time, 'uuid': str(uuid.uuid4()), 'python_home': self.python_home, 'n_jobs': len(self._current_jobs_submitting), 'working_directory': self.working_directory, 'exclude_nodes': self._exclude_nodes, 'append_to_path': self._append_to_path, 'qos': self._qos, 'export': self._export, 'extra_slurm_args': self._extra_slurm_args, } @property def jobs_submitted(self) -> tuple[JobItem, ...]: return tuple(self._jobs_submitted) @property def n_jobs_submitted(self) -> int: return len(self._jobs_submitted) def _generate_submit_file(self) -> Path: template = self._jinja_env.get_template(self._submit_template) submit_rendered = template.render(self._jinja_context) submit_fname = Path(self.output_folder, 'slurm', 'submit.sh') with Path(submit_fname).open('w') as submit_file: submit_file.write(submit_rendered) return submit_fname def _generate_runner_file(self) -> None: template = self._jinja_env.get_template(self._runner_template) runner_rendered = template.render(self._jinja_context) with Path(self.runner_filename).open('w') as runner_file: runner_file.write(runner_rendered) def _prepare_jobs_directory(self) -> list[JobItem]: if self.inc_jobsdir: Path(self.jobs_dir).mkdir(exist_ok=True) jobs_idx = 1 while Path(self.jobs_dir, '%03d' % (jobs_idx,)).exists(): jobs_idx = jobs_idx + 1 final_jobs_dir = Path(self.jobs_dir, '%03d' % (jobs_idx,)) else: final_jobs_dir = self.jobs_dir if final_jobs_dir.exists(): raise ValueError('The jobs folder already exists. ' 'Please choose another one') Path(final_jobs_dir).mkdir(exist_ok=True) json_folder = Path(final_jobs_dir, 'slurm') Path(json_folder).mkdir(exist_ok=True) log_folder = Path(final_jobs_dir, 'log') Path(log_folder).mkdir(exist_ok=True) current_jobs = self._jobs[: self._max_jobs_per_jobcluster] del self._jobs[: self._max_jobs_per_jobcluster] for idx, cur_job in enumerate(current_jobs): cur_job.to_json(Path(json_folder, 'job%03d.json.gzip' % (idx + 1,))) self._output_folder = final_jobs_dir.resolve() src_module = inspect.getmodule(JobItem) if src_module is None: raise ValueError('Cannot find source module for JobItem.') src_file = inspect.getsourcefile(src_module) if src_file is None: raise ValueError('Cannot find source file for JobItem.') shutil.copytree(Path(src_file).parent, Path(json_folder, 'plus_slurm')) return current_jobs @property def owner(self) -> str: return getpass.getuser() @property def required_ram(self) -> float: return self._required_ram @required_ram.setter def required_ram(self, required_ram: str | float | int) -> None: if isinstance(required_ram, six.string_types): required_ram = humanfriendly.parse_size(required_ram) if not isinstance(required_ram, (int, float)): raise TypeError('required_ram must be either a string or a number.') self._required_ram = required_ram @property def python_bin(self) -> Path: return self._python_bin @python_bin.setter def python_bin(self, python_bin: Path | str) -> None: if not python_bin: python_bin = sys.executable if not Path(python_bin).is_file(): raise ValueError('The python interpreter does not exist.') if 'Python' not in str(subprocess.check_output([python_bin, '-V'], stderr=subprocess.STDOUT)): raise ValueError('Cannot execute the python interpreter or ' 'it is not a python interpreter') self._python_bin = Path(python_bin) @property def executable(self) -> str: return str(self.python_bin) @property def arguments(self) -> str: return str(self.runner_filename) @property def python_home(self) -> Path: return Path(self.python_bin).parent.parent @property def working_directory(self) -> Path: return self._working_directory @working_directory.setter def working_directory(self, working_directory: Path | str | None) -> None: if not working_directory: working_directory = Path.cwd() if not Path(working_directory).is_dir(): raise ValueError('working_directory is not a valid directory.') self._working_directory = Path(working_directory) @property def output_folder(self) -> Path: if self._output_folder is None: raise ValueError('No output folder set.') return self._output_folder @property def n_jobs(self) -> int: return len(self._jobs) @property def runner_filename(self) -> Path: return Path(self.output_folder, 'slurm', 'runner.py')
[docs] class ApptainerJobCluster(JobCluster): """ If you use this class instead of :class:`JobCluster`, the jobs will run in a Apptainer Container. You need to supply the same parameters as when instantiating :class:`JobCluster`. Additionally, you need to at least set the Apptainer Image you want to use. Parameters ---------- apptainer_image ::class:`str` Set this to a apptainer image to have the jobs execute in it. Can be a link to a local file or to some online repository. mounts : :class:`tuple`, :class:`list`, optional What to mount in the container. mount_slurm_folders : :class:`bool` If `true`, mount `/etc/slurm` and `/var/run/munge` so you can issue slurm commands from within the job. apptainer_args : :class:`str`, optional Additional arguments to supply to apptainer prepull_apptainer_image : :class:`bool`, optional Pull the apptainer image before running the jobs no avoid 100s of jobs pulling the same image at the same time. """ _slurm_folders = ('/etc/slurm', '/var/run/munge') def __init__( # noqa PLR0913 self, *args: Any, apptainer_image: str | None = None, mounts: Iterable[str] = ('/mnt',), mount_slurm_folders: bool = True, apptainer_args: str = '', prepull_apptainer_image: bool = True, **kwargs: Any, ) -> None: super().__init__(*args, **kwargs) if apptainer_image is None: raise RuntimeError('You must specify a Apptainer Image') self._apptainer_image = apptainer_image self._mounts = list(mounts) self._apptainer_args = apptainer_args self._prepull_apptainer_image = prepull_apptainer_image if mount_slurm_folders: for mnt in self._slurm_folders: if mnt not in self._mounts: self._mounts.append(mnt) @property def apptainer_command(self) -> str: if Path('/usr/bin/apptainer').is_file(): return '/usr/bin/apptainer' elif Path('/usr/bin/singularity').is_file(): return '/usr/bin/singularity' else: raise RuntimeError('Neither apptainer nor singularity found.') @property def apptainer_arguments(self) -> str: cmd_list = ['exec'] for mnt in self._mounts: cmd_list.append(f'-B {mnt}') if self._apptainer_args: cmd_list.append(self._apptainer_args.strip()) return ' '.join(cmd_list) @property def executable(self) -> str: return self.apptainer_command @property def arguments(self) -> str: return f'{self.apptainer_arguments} {self._apptainer_image} {self.python_bin} {self.runner_filename}' # noqa
[docs] def submit(self, do_submit: bool = True) -> None: if self._prepull_apptainer_image: with tempfile.TemporaryDirectory() as tmpdir: subprocess.run( [self.apptainer_command, 'pull', Path(tmpdir, 'tmp.sif'), self._apptainer_image], check=False ) super().submit(do_submit=do_submit)
[docs] class PermuteArgument: """ This is a container for to-be-permuted arguments. See the example in the introductions for details. """ def __init__(self, args: Iterable) -> None: self.args = args