Source code for ornithology.condor

# Copyright 2019 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Mapping, Optional
import logging

import subprocess
from pathlib import Path
import shutil
import time
import functools
import shlex
import re
import textwrap
import os
import sys
import getpass
import socket

from .try_os_set import (
    try_os_setegid,
    try_os_seteuid,
)

import htcondor2 as htcondor

from . import job_queue, env, cmd, daemons, handles, scripts

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

unique_identifier = 0
true_exe = "/usr/bin/true" if sys.platform == "darwin" else "/bin/true"
if sys.platform == "win32" : true_exe = "$(LOCAL_DIR)/condor_tests/success.exe"

DEFAULT_PARAMS = {
    "LOCAL_CONFIG_FILE": "",
    "COLLECTOR_HOST": "$(CONDOR_HOST):0",
    "SHARED_PORT_PORT": "0",
    "MAIL": true_exe,
    "SENDMAIL": true_exe,
    "UPDATE_INTERVAL": "2",
    "POLLING_INTERVAL": "2",
    "NEGOTIATOR_INTERVAL": "2",
    "NEGOTIATOR_MIN_INTERVAL": "2",
    "SCHEDD_INTERVAL": "30",
    "STARTER_UPDATE_INTERVAL": "2",
    "SHADOW_QUEUE_UPDATE_INTERVAL": "2",
    "STARTER_INITIAL_UPDATE_INTERVAL": "2",
    "NEGOTIATOR_CYCLE_DELAY": "2",
    "MachineMaxVacateTime": "2",
    "RUNBENCHMARKS": "0",
    "MAX_JOB_QUEUE_LOG_ROTATIONS": "10",
    "PROCD_ADDRESS": "$(PROCD_ADDRESS)_$(PERSONAL_INSTANCE_ID)", # if we start multiple condors each must have a different PROCD_ADDRESS
    "FILETRANSFER_PLUGINS" : f"$(FILETRANSFER_PLUGINS) {scripts.custom_fto_plugins()}",
    "SINGULARITY": "/usr/bin/false"
}


def master_is_not_alive(self):
    return not self.master_is_alive


def condor_is_ready(self):
    return self.condor_is_ready


def condor_master_was_started(self):
    return self.condor_master is not None


def skip_if(condition):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if condition(self):
                logger.debug(
                    "Skipping call to {} for {} because {} was True".format(
                        func.__name__, self, condition.__name__
                    )
                )
                return

            return func(self, *args, **kwargs)

        return wrapper

    return decorator


[docs] class Condor: """ A :class:`Condor` is responsible for managing the lifecycle of an HTCondor pool. """ def __init__( self, local_dir: Path, config: Optional[Mapping[str, str]] = None, raw_config: Optional[str] = None, clean_local_dir_before: bool = True, submit_user : str = None, condor_user : str = None, use_sudo: Optional[bool] = None, ): """ Parameters ---------- local_dir The local directory for the HTCondor pool. All HTCondor state will be stored in this directory. config HTCondor configuration parameters to inject, as a mapping of key-value pairs. raw_config Raw HTCondor configuration language to inject, as a string. clean_local_dir_before If ``True``, any existing directory at ``local_dir`` will be removed before trying to stand up the new instance. submit_user If set, the user to switch to when submitting a job. condor_user If set, the user that HTCondor will run as. use_sudo If ``True``, start condor_master with sudo. If ``None`` (default), checks HTCONDOR_TEST_USE_SUDO environment variable. If ``False``, never use sudo even if environment variable is set. """ self.submit_user = submit_user # Auto-detect sudo mode from environment if not explicitly specified if use_sudo is None: use_sudo = os.environ.get("HTCONDOR_TEST_USE_SUDO") == "1" self.use_sudo = use_sudo # If using sudo but no condor_user specified, default to "condor" if self.use_sudo and condor_user is None: condor_user = "condor" self.condor_user = condor_user self.local_dir = local_dir # TODO: don't assume paths, get these from config self.execute_dir = self.local_dir / "execute" self.lock_dir = self.local_dir / "lock" self.log_dir = self.local_dir / "log" self.run_dir = self.local_dir / "run" self.spool_dir = self.local_dir / "spool" self.passwords_dir = self.local_dir / "passwords.d" self.tokens_dir = self.local_dir / "tokens.d" self.config_file = self.local_dir / "condor_config" if config is None: config = {} self.config = {k: v if v is not None else "" for k, v in config.items()} self.raw_config = raw_config or "" self.clean_local_dir_before = clean_local_dir_before self.condor_master = None self.condor_is_ready = False self.job_queue = job_queue.JobQueue(self)
[docs] def use_config(self): """ Returns a context manager that sets ``CONDOR_CONFIG`` to point to the config file for this HTCondor pool. """ return env.SetCondorConfig(self.config_file)
def __repr__(self): return "{}(local_dir = {})".format(self.__class__.__name__, self.local_dir) @property def master_is_alive(self): return self.condor_master is not None and self.condor_master.poll() is None def __enter__(self): self._start() return self def __exit__(self, exc_type, exc_val, exc_tb): self._cleanup() def _start(self): logger.info("Starting {}".format(self)) try: self._setup_local_dirs() self._write_config() self._start_condor() self._wait_for_ready() except BaseException: logger.exception( "Encountered error during setup of {}, cleaning up!".format(self) ) self._cleanup() raise logger.info("Started {}".format(self)) def _setup_local_dirs(self): if self.clean_local_dir_before and self.local_dir.exists(): shutil.rmtree(self.local_dir) logger.debug("Removed existing local dir for {}".format(self)) condor_dirs_to_make = [ self.local_dir, self.execute_dir, self.lock_dir, self.log_dir, self.run_dir, self.spool_dir, self.passwords_dir, self.tokens_dir, ] # First make the dirs as non-privileged user for dir in condor_dirs_to_make: dir.mkdir(parents=True, exist_ok=not self.clean_local_dir_before) # Unprivileged users will write the condor_config file here, cheat by making world writable if self.use_sudo: # chmod 0777 condor dir so we can write config file there as non-root cmd.run_command( ["sudo", "chmod", "0777", f"{self.condor_user}:{self.condor_user}", self.local_dir.as_posix()], echo=False, suppress=True, ) # Now chown them if needed for dir in condor_dirs_to_make: if self.use_sudo: cmd.run_command( ["sudo", "chown", "-R", f"{self.condor_user}:{self.condor_user}", dir.as_posix()], echo=False, suppress=True, ) else: if self.condor_user: try: shutil.chown( dir, user=self.condor_user, group=self.condor_user ) except PermissionError: pass def _write_config(self): # TODO: how to ensure that this always hits the right config? # TODO: switch to -summary instead of -write:up write = cmd.run_command( ["condor_config_val", "-write:up", self.config_file.as_posix()], echo=False, suppress=True, ) if write.returncode != 0: raise Exception("Failed to copy base OS config: {}".format(write.stderr)) param_lines = [] param_lines += ["#", "# ROLES", "#"] param_lines += [ "DAEMON_LIST = MASTER COLLECTOR NEGOTIATOR STARTD SCHEDD", ] base_config = { "LOCAL_DIR": self.local_dir.as_posix(), #"EXECUTE": "$(LOCAL_DIR)/execute", "LOCK": "$(LOCAL_DIR)/lock", #"LOG": "$(LOCAL_DIR)/log", "RUN": "$(LOCAL_DIR)/run", #"SPOOL": "$(LOCAL_DIR)/spool", "SEC_PASSWORD_DIRECTORY": "$(LOCAL_DIR)/passwords.d", "SEC_TOKEN_SYSTEM_DIRECTORY": "$(LOCAL_DIR)/tokens.d", # "STARTD_DEBUG": "D_FULLDEBUG D_COMMAND:1", } if self.condor_user or self.use_sudo: try: from pwd import getpwnam from grp import getgrnam uid = getpwnam(self.condor_user).pw_uid gid = getgrnam(self.condor_user).gr_gid base_config["CONDOR_IDS"] = f"{uid}.{gid}" except ModuleNotFoundError: # This should arguably be a setup error if this process # has user-switching privileges. pass if self.use_sudo: username = getpass.getuser() fqdn = socket.getfqdn() base_config["ALLOW_WRITE"] = f"{username}@{fqdn}" # global unique_identifier unique_identifier += 1 base_config["PERSONAL_INSTANCE_ID"] = "{}".format(unique_identifier) # win32 default for PROCD_ADDRESS can end up being too long when running tests # so we want to use a different value than the default # $Fddub() extracts the last 2 components of LOCAL_DIR as posix with no trailing / if sys.platform == "win32": base_config["PROCD_ADDRESS"] = r"\\.\pipe\$Fddub(LOCAL_DIR)/{}_$(PERSONAL_INSTANCE_ID)".format(os.getpid()) param_lines += ["#", "# BASE PARAMS", "#"] param_lines += ["{} = {}".format(k, v) for k, v in base_config.items()] param_lines += ["#", "# DEFAULT PARAMS", "#"] param_lines += ["{} = {}".format(k, v) for k, v in DEFAULT_PARAMS.items()] param_lines += ["#", "# CUSTOM PARAMS", "#"] param_lines += ["{} = {}".format(k, v) for k, v in self.config.items()] param_lines += ["#", "# RAW PARAMS", "#"] param_lines += textwrap.dedent(self.raw_config).splitlines() with self.config_file.open(mode="a") as f: f.write("\n".join(param_lines)) logger.debug("Wrote config file for {} to {}".format(self, self.config_file)) @skip_if(condor_master_was_started) def _start_condor(self): with env.SetCondorConfig(self.config_file): # If we invoke the condor_master via sudo, sudo won't use the path for # security reasons, so we have to find the binary ourselves. master_bin = shutil.which("condor_master") cmd = [master_bin, "-f"] if self.use_sudo: cmd = ["sudo", "-E"] + cmd logger.info( f"Starting condor_master with sudo (will run as {self.condor_user or 'root'})" ) self.condor_master = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) logger.debug( "Started condor_master (pid {})".format(self.condor_master.pid) ) @skip_if(condor_is_ready) def _wait_for_ready(self, timeout: int = 120, dump_logs_if_fail: bool = False): daemons = set( self.run_command(["condor_config_val", "DAEMON_LIST"], echo=False) .stdout.replace(",", " ") .split(" ") ) master_log_path = self.master_log.path logger.debug( "Starting up daemons for {}, waiting for: {}".format( self, " ".join(sorted(daemons)) ) ) start = time.time() while time.time() - start < timeout: time_to_give_up = int(timeout - (time.time() - start)) # if the master log does not exist yet, we can't use condor_who if not master_log_path.exists(): logger.debug( "MASTER_LOG at {} does not yet exist for {}, retrying in 1 seconds (giving up in {} seconds)".format( self.master_log, self, time_to_give_up ) ) time.sleep(1) continue if "STARTD" in daemons: who_command = "condor_who -wait:10 'IsReady && STARTD_State =?= \"Ready\"'" else: who_command = "condor_who -wait:10 'IsReady'" who = self.run_command( shlex.split(who_command), echo=False, suppress=True, ) if who.stdout.strip() == "": logger.debug( "condor_who stdout was unexpectedly blank for {}, retrying in 1 second (giving up in {} seconds)".format( self, time_to_give_up ) ) time.sleep(1) continue who_ad = dict(kv.split(" = ") for kv in who.stdout.splitlines()) if ( who_ad.get("IsReady") == "true" and ("STARTD" not in daemons or who_ad.get("STARTD_State") == '"Ready"') and all(who_ad.get(d) == '"Alive"' for d in daemons) ): self.condor_is_ready = True return logger.debug( "{} is waiting for daemons to be ready (giving up in {} seconds)".format( self, time_to_give_up ) ) self.run_command(["condor_who", "-quick"]) if dump_logs_if_fail: for logfile in self.log_dir.iterdir(): logger.error("Contents of {}:\n{}".format(logfile, logfile.read_text())) raise TimeoutError("Standup for {} failed".format(self)) def _cleanup(self): logger.info("Cleaning up {}".format(self)) self._condor_off() self._wait_for_master_to_terminate() # TODO: look for core dumps # self._remove_local_dir() logger.info("Cleaned up {}".format(self)) @skip_if(master_is_not_alive) def _condor_off(self): off = self.run_command( ["condor_off", "-fast", "-daemon", "master"], timeout=30, echo=False ) if not off.returncode == 0: logger.error( "condor_off failed, exit code: {}, stderr: {}".format( off.returncode, off.stderr ) ) self._terminate_condor_master() return logger.debug("condor_off succeeded: {}".format(off.stdout)) @skip_if(master_is_not_alive) def _wait_for_master_to_terminate(self, kill_after: int = 60, timeout: int = 120): logger.debug( "Waiting for condor_master (pid {}) to terminate".format( self.condor_master.pid ) ) start = time.time() killed = False while True: try: self.condor_master.communicate(timeout=5) break except subprocess.TimeoutExpired: pass elapsed = time.time() - start if not killed: logger.debug( "condor_master has not terminated yet, will kill in {} seconds".format( int(kill_after - elapsed) ) ) if elapsed > kill_after and not killed: # TODO: in this path, we should also kill the other daemons # TODO: we can find their pids by reading the master log self._kill_condor_master() killed = True if elapsed > timeout: raise TimeoutError( "Timed out while waiting for condor_master to terminate" ) logger.debug( "condor_master (pid {}) has terminated with exit code {}".format( self.condor_master.pid, self.condor_master.returncode ) ) @skip_if(master_is_not_alive) def _terminate_condor_master(self): if not self.master_is_alive: return self.condor_master.terminate() logger.debug( "Sent terminate signal to condor_master (pid {})".format( self.condor_master.pid ) ) @skip_if(master_is_not_alive) def _kill_condor_master(self): self.condor_master.kill() logger.debug( "Sent kill signal to condor_master (pid {})".format(self.condor_master.pid) ) def read_config(self) -> str: return self.config_file.read_text()
[docs] def run_command(self, *args, **kwargs) -> subprocess.CompletedProcess: """ Execute a command with ``CONDOR_CONFIG`` set to point to this HTCondor pool. Arguments and keyword arguments are passed through to :func:`~run_command`. """ with self.use_config(): return cmd.run_command(*args, **kwargs)
@property def master_log(self) -> daemons.DaemonLog: """A :class:`DaemonLog` for the pool's master.""" return self._get_daemon_log("MASTER") @property def collector_log(self) -> daemons.DaemonLog: """A :class:`DaemonLog` for the pool's collector.""" return self._get_daemon_log("COLLECTOR") @property def negotiator_log(self) -> daemons.DaemonLog: """A :class:`DaemonLog` for the pool's negotiator.""" return self._get_daemon_log("NEGOTIATOR") @property def schedd_log(self) -> daemons.DaemonLog: """A :class:`DaemonLog` for the pool's schedd.""" return self._get_daemon_log("SCHEDD") @property def startd_log(self) -> daemons.DaemonLog: """A :class:`DaemonLog` for the pool's startd.""" return self._get_daemon_log("STARTD") @property def shadow_log(self) -> daemons.DaemonLog: """A :class:`DaemonLog` for the pool's shadows.""" return self._get_daemon_log("SHADOW") @property def credmon_oauth_log(self) -> daemons.DaemonLog: """A :class:`DaemonLog` for the OAuth CredMon.""" return self._get_daemon_log("CREDMON_OAUTH") @property def job_queue_log(self) -> Path: """The :class:`pathlib.Path` to the pool's job queue log.""" return self._get_log_path("JOB_QUEUE") @property def startd_address(self): """The address of the pool's startd.""" return self._get_address_file("STARTD").read_text().splitlines()[0] def _get_log_path(self, subsystem): return self._get_path_from_condor_config_val("{}_LOG".format(subsystem)) def _get_address_file(self, subsystem): return self._get_path_from_condor_config_val( "{}_ADDRESS_FILE".format(subsystem) ) def _get_path_from_condor_config_val(self, attr): return Path( self.run_command( ["condor_config_val", attr], echo=False, suppress=True ).stdout ) def _get_daemon_log(self, daemon_name): return daemons.DaemonLog(self._get_log_path(daemon_name))
[docs] def get_local_schedd(self): """Return the :class:`htcondor.Schedd` for this pool's schedd.""" with self.use_config(): #htcondor.enable_debug("D_HOSTNAME D_CAT") sched = htcondor.Schedd() #htcondor.disable_debug() return sched
[docs] def get_local_collector(self): """Return the :class:`htcondor.Collector` for this pool's collector.""" with self.use_config(): return htcondor.Collector()
[docs] def status(self, ad_type=htcondor.AdTypes.Any, constraint="true", projection=None): """ Perform a status query against the pool's collector. Parameters ---------- ad_type constraint projection Returns ------- """ projection = projection or [] with self.use_config(): result = self.get_local_collector().query( ad_type=ad_type, constraint=constraint, projection=projection ) logger.debug( 'Ads returned by status query for {} ads with constraint "{}":\n'.format( ad_type, constraint ) + "\n".join(str(ad) for ad in result) ) return result
[docs] def direct_status(self, daemon_type, ad_type, constraint="true", projection=None): """ Perform a direct status query against a daemon. Parameters ---------- daemon_type ad_type constraint projection Returns ------- """ projection = projection or [] # TODO: we would like this to use Collector.directQuery, but it can't because of https://htcondor-wiki.cs.wisc.edu/index.cgi/tktview?tn=7420 with self.use_config(): daemon_location = self.get_local_collector().locate(daemon_type) # pretend the target daemon is a collector so we can query it daemon = htcondor.Collector(daemon_location["MyAddress"]) result = daemon.query( ad_type=ad_type, constraint=constraint, projection=projection ) logger.debug( 'Ads returned by direct status query against {} for {} ads with constraint "{}":\n'.format( daemon_type, ad_type, constraint ) + "\n".join(str(ad) for ad in result) ) return result
[docs] def query( self, constraint="true", projection=None, limit=-1, opts=htcondor.QueryOpts.Default, ): """ Perform a job information query against the pool's schedd. Parameters ---------- constraint projection limit opts Returns ------- """ if projection is None: projection = [] with self.use_config(): result = self.get_local_schedd().query( constraint=constraint, projection=projection, limit=limit, opts=opts ) logger.debug( 'Ads returned by queue query with constraint "{}":\n'.format(constraint) + "\n".join(str(ad) for ad in result) ) return result
[docs] def act(self, action, constraint="true"): """ Perform a job action against the pool's schedd. Parameters ---------- action constraint Returns ------- """ with self.use_config(): logger.debug( 'Executing action: {} with constraint "{}"'.format(action, constraint) ) return self.get_local_schedd().act(action, constraint)
[docs] def edit(self, attr, value, constraint="true"): """ Perform a job attribute edit action against the pool's schedd. Parameters ---------- attr value constraint Returns ------- """ with self.use_config(): logger.debug( 'Executing edit: setting {} to {} with constraint "{}"'.format( attr, value, constraint ) ) return self.get_local_schedd().edit(constraint, attr, value)
[docs] def submit( self, description: Mapping[str, str], count=1, itemdata=None ) -> handles.ClusterHandle: """ Submit jobs to the pool. Internally, this method uses the Python bindings to submit jobs and returns a rich handle object (a :class:`ClusterHandle`) that can be used to inspect and manage the jobs. .. warning:: If your intent is to test job submission itself, **DO NOT** use this method. Instead, submit jobs by calling `condor_q` with :meth:`Condor.run_command`, using the Python bindings directly, or whatever other method you want to test. Parameters ---------- description count itemdata Returns ------- """ sub = htcondor.Submit(dict(description)) logger.debug( "Submitting jobs with description:\n{}\nCount: {}\nItemdata: {}".format( sub, count, itemdata ) ) if self.submit_user: euid = os.geteuid() egid = os.getegid() try_os_setegid( name=self.submit_user ) try_os_seteuid( name=self.submit_user ) with self.use_config(): schedd = self.get_local_schedd() result = schedd.submit(sub, count=count, itemdata=itemdata) logger.debug("Got submit result:\n{}".format(result)) if self.submit_user: try_os_setegid( number=egid ) try_os_seteuid( number=euid ) return handles.ClusterHandle(self, result)
[docs] def submit_dag(self, dagfile: Path) -> handles.ClusterHandle: """ Submit a DAG. Parameters ---------- dagfile Returns ------- """ return self.submit(htcondor.Submit.from_dag(str(dagfile)))
RE_PORT_HOST = re.compile(r"\d+\.\d+\.\d+\.\d+:\d+") def get_port_host_from_sinful(sinful): return RE_PORT_HOST.search(sinful)[0]