# Source code for ornithology.handles

# Copyright 2019 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, Callable, List, Iterator

import logging
import abc
import time
from pathlib import Path
import collections
import weakref

import htcondor2 as htcondor
import classad2 as classad

from . import jobs, exceptions, utils

# Module-level logger for this file. Its own level is pinned to DEBUG so the
# debug messages emitted here (e.g. by ClusterHandle.wait) are not filtered
# out at this logger.
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.DEBUG)


class Handle(abc.ABC):
    """
    A connection to a set of jobs defined by a constraint.
    The handle can be used to query, act on, or edit those jobs.

    Subclasses supply the constraint by overriding :attr:`constraint_string`.
    """

    def __init__(self, condor):
        self.condor = condor

    @property
    def constraint_string(self) -> str:
        """The constraint selecting this handle's jobs, as a string. Must be overridden."""
        raise NotImplementedError

    def __repr__(self):
        return "{}(constraint = {})".format(type(self).__name__, self.constraint_string)

    def __eq__(self, other):
        # Use short-circuiting ``and`` rather than ``all((...))``: a tuple
        # passed to ``all`` is built eagerly, so ``other.condor`` would be
        # evaluated even when ``other`` is not a Handle, raising
        # AttributeError (e.g. for ``handle == None``) instead of returning False.
        return (
            isinstance(other, self.__class__)
            and self.condor == other.condor
            and self.constraint_string == other.constraint_string
        )

    def __hash__(self):
        return hash((self.__class__, self.constraint_string, self.condor))

    @property
    def get_schedd(self):
        """The schedd returned by ``condor.get_local_schedd()``."""
        return self.condor.get_local_schedd()

    def query(self, projection=None, options=htcondor.QueryOpts.Default, limit=-1):
        """
        Query against this set of jobs.

        Parameters
        ----------
        projection
            The :class:`classad.ClassAd` attributes to retrieve, as a list of case-insensitive strings.
            If ``None`` (the default), all attributes will be returned.
        options
            Query options, forwarded to ``condor.query`` as ``opts``.
            Defaults to ``htcondor.QueryOpts.Default``.
        limit
            The total number of matches to return from the query.
            The default, ``-1``, means no limit (return all matches).

        Returns
        -------
        ads : Iterator[:class:`classad.ClassAd`]
            An iterator over the :class:`classad.ClassAd` that match the constraint.
        """
        return self.condor.query(
            self.constraint_string, projection=projection, opts=options, limit=limit
        )

    def _act(self, action):
        # Apply a job action to every job matching this handle's constraint.
        return self.condor.act(action, self.constraint_string)

    def remove(self):
        """
        Remove jobs from the queue.

        Returns
        -------
        ad : :class:`classad.ClassAd`
            An ad describing the results of the action.
        """
        return self._act(htcondor.JobAction.Remove)

    def hold(self):
        """
        Hold jobs.

        Returns
        -------
        ad : :class:`classad.ClassAd`
            An ad describing the results of the action.
        """
        return self._act(htcondor.JobAction.Hold)

    def release(self):
        """
        Release held jobs.
        They will return to the queue in the idle state.

        Returns
        -------
        ad : :class:`classad.ClassAd`
            An ad describing the results of the action.
        """
        return self._act(htcondor.JobAction.Release)

    def pause(self):
        """
        Pause jobs.
        Jobs will stop running, but will hold on to their claimed resources.

        Returns
        -------
        ad : :class:`classad.ClassAd`
            An ad describing the results of the action.
        """
        return self._act(htcondor.JobAction.Suspend)

    def resume(self):
        """
        Resume (un-pause) jobs.

        Returns
        -------
        ad : :class:`classad.ClassAd`
            An ad describing the results of the action.
        """
        return self._act(htcondor.JobAction.Continue)

    def vacate(self):
        """
        Vacate running jobs.
        This will force them off of their current execute resource,
        causing them to become idle again.

        Returns
        -------
        ad : :class:`classad.ClassAd`
            An ad describing the results of the action.
        """
        return self._act(htcondor.JobAction.Vacate)

    def edit(self, attr, value):
        """
        Edit attributes of jobs.

        .. warning::
            Many attribute edits will not affect jobs that have already matched.
            For example, changing ``RequestMemory`` will not affect the memory
            allocation of a job that is already executing.
            In that case, you would need to vacate (or release the job if it
            was held) before the edit had the desired effect.

        Parameters
        ----------
        attr
            The attribute to edit. Case-insensitive.
        value
            The new value for the attribute. It is converted with ``str()``
            before being sent.

        Returns
        -------
        ad : :class:`classad.ClassAd`
            An ad describing the results of the edit.
        """
        return self.condor.edit(self.constraint_string, attr, str(value))
class ConstraintHandle(Handle):
    """
    A :class:`Handle` whose jobs are selected by an explicit
    :attr:`ConstraintHandle.constraint` expression.
    Use it to query, act on, or edit the matching jobs.
    """

    def __init__(self, condor, constraint):
        super().__init__(condor=condor)
        # String constraints are parsed into a classad.ExprTree up front;
        # any other value is stored exactly as given.
        self._constraint = (
            classad.ExprTree(constraint) if isinstance(constraint, str) else constraint
        )

    @property
    def constraint(self) -> classad.ExprTree:
        """This handle's defining constraint, as a :class:`classad.ExprTree`."""
        return self._constraint

    @property
    def constraint_string(self) -> str:
        """This handle's defining constraint, rendered as a string."""
        return str(self.constraint)

    def __repr__(self):
        return f"{type(self).__name__}(constraint = {self.constraint})"
class ClusterHandle(ConstraintHandle):
    """
    A subclass of :class:`ConstraintHandle` that targets a single cluster of jobs,
    as produced by :func:`Condor.submit`.

    Because this handle targets a single cluster of jobs, it has superpowers.
    If the cluster has an event log
    (``log = <path>`` in the submit description,
    see the `docs`_),
    this handle's ``state`` attribute will be a :class:`ClusterState` that provides
    information about the current state of the jobs in the cluster.

    .. warning ::

        You shouldn't have to construct a :class:`ClusterHandle` yourself.
        Instead, use the ones returned by :func:`Condor.submit`.

    .. _docs: https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html
    """

    def __init__(self, condor, submit_result):
        self._clusterid = submit_result.cluster()
        self._clusterad = submit_result.clusterad()
        self._first_proc = submit_result.first_proc()
        self._num_procs = submit_result.num_procs()

        super().__init__(
            condor=condor,
            constraint=classad.ExprTree("ClusterID == {}".format(self.clusterid)),
        )

        # must delay this until after init, because at this point the submit
        # transaction may not be done yet
        self._state = None
        self._event_log = None

    def __int__(self):
        return self.clusterid

    def __repr__(self):
        batch_name = self.clusterad.get("JobBatchName", None)
        batch = (
            ", JobBatchName = {}".format(batch_name) if batch_name is not None else ""
        )
        return "{}(ClusterID = {}{})".format(type(self).__name__, self.clusterid, batch)

    @property
    def clusterid(self):
        """The cluster's cluster ID."""
        return self._clusterid

    @property
    def clusterad(self):
        """The cluster's cluster ad."""
        return self._clusterad

    @property
    def first_proc(self):
        """The process ID of the first job in the cluster."""
        return self._first_proc

    @property
    def num_procs(self):
        """The number of jobs in the cluster."""
        return self._num_procs

    def __len__(self):
        return self.num_procs

    @property
    def job_ids(self) -> List[jobs.JobID]:
        """
        Return the list of :class:`JobID` in this :class:`ClusterHandle`.

        Proc IDs start at :attr:`first_proc` (the same offset
        :class:`ClusterState` uses), so the IDs are correct even when the
        cluster's first proc is not ``0``.
        """
        first = self.first_proc
        return [
            jobs.JobID(self.clusterid, proc) for proc in range(first, first + len(self))
        ]

    @property
    def state(self):
        """A :class:`ClusterState` that provides information about job state for this cluster."""
        # Created lazily; see the note in __init__.
        if self._state is None:
            self._state = ClusterState(self)

        return self._state

    @property
    def event_log(self):
        """The :class:`EventLog` for this :class:`ClusterHandle`."""
        # Created lazily; see the note in __init__.
        if self._event_log is None:
            self._event_log = EventLog(self)

        return self._event_log

    def wait(
        self,
        condition: Optional[Callable[["ClusterState"], bool]] = None,
        fail_condition: Optional[Callable[["ClusterState"], bool]] = None,
        timeout: Optional[int] = 120,
        verbose: bool = False,
    ) -> bool:
        """
        Waits for the ``condition`` to become ``True``.

        Parameters
        ----------
        condition
            The function to wait to become ``True``.
            It will be passed the :class:`ClusterState` as its only argument.
            Because of how Python calls unbound class methods, you may directly
            pass :class:`ClusterState` methods as conditions
            (e.g., ``handle.wait(condition = ClusterState.any_held)``).
            The default condition is :meth:`ClusterState.all_complete`,
            which means "wait until all the jobs in this cluster are completed".
        fail_condition
            If this function becomes ``True``, ``wait`` will immediately return ``False``.
            Use this to avoid waiting for a long time when a test is failing.
        timeout
            After this many seconds, ``wait`` will return ``False`` and
            emit a warning in the log. If ``None``, wait without a time limit.
        verbose
            If ``True``, the handle's state counts will be logged during the wait.

        Returns
        -------
        success : bool
            ``True`` if the wait finished because the condition became ``True``;
            ``False`` otherwise.
        """
        if condition is None:
            condition = ClusterState.all_complete

        if fail_condition is None:
            fail_condition = lambda _: False

        start_time = time.time()
        num_events_read = self.state.read_events()
        while True:
            if verbose:
                logger.debug("Handle {} state: {}".format(self, self.state.counts()))

            if condition(self.state):
                break

            if fail_condition(self.state):
                logger.warning(
                    "Wait for handle {} triggered its failure condition".format(self)
                )
                return False

            if timeout is not None and time.time() > start_time + timeout:
                logger.warning("Wait for handle {} timed out".format(self))
                return False

            # Sleep a second here if no job log events were waiting for us to prevent
            # busy waiting. However, if we did see an event, try to read another
            # event as often they come in bunches - and we want to consume them
            # as rapidly as possible.
            if num_events_read == 0:
                time.sleep(1)

            num_events_read = self.state.read_events()

        logger.debug("Wait for handle {} finished successfully".format(self))
        return True
class _MockSubmitResult:
    """
    Re-packs already-unpacked submit results into an object that exposes the
    accessor methods (``cluster``, ``clusterad``, ``first_proc``, ``num_procs``)
    which the :class:`ClusterHandle` constructor calls.

    **Should not be used in user code.**
    """

    def __init__(self, clusterid, clusterad, first_proc, num_procs):
        (
            self._clusterid,
            self._clusterad,
            self._first_proc,
            self._num_procs,
        ) = (clusterid, clusterad, first_proc, num_procs)

    def cluster(self):
        """The cluster ID this result was built from."""
        return self._clusterid

    def clusterad(self):
        """The cluster ad this result was built from."""
        return self._clusterad

    def first_proc(self):
        """The first proc ID this result was built from."""
        return self._first_proc

    def num_procs(self):
        """The number of procs this result was built from."""
        return self._num_procs


# Job event type -> job status the job is considered to be in after that event.
# Event types missing from this table leave the tracked status unchanged
# (ClusterState.read_events looks them up with .get and skips None).
# NOTE(review): JOB_UNSUSPENDED maps to IDLE; a resumed job may actually go
# back to running -- confirm this is intended.
JOB_EVENT_STATUS_TRANSITIONS = {
    htcondor.JobEventType.SUBMIT: jobs.JobStatus.IDLE,
    htcondor.JobEventType.JOB_EVICTED: jobs.JobStatus.IDLE,
    htcondor.JobEventType.JOB_UNSUSPENDED: jobs.JobStatus.IDLE,
    htcondor.JobEventType.JOB_RELEASED: jobs.JobStatus.IDLE,
    htcondor.JobEventType.SHADOW_EXCEPTION: jobs.JobStatus.IDLE,
    htcondor.JobEventType.JOB_RECONNECT_FAILED: jobs.JobStatus.IDLE,
    htcondor.JobEventType.JOB_TERMINATED: jobs.JobStatus.COMPLETED,
    htcondor.JobEventType.EXECUTE: jobs.JobStatus.RUNNING,
    htcondor.JobEventType.JOB_HELD: jobs.JobStatus.HELD,
    htcondor.JobEventType.JOB_SUSPENDED: jobs.JobStatus.SUSPENDED,
    htcondor.JobEventType.JOB_ABORTED: jobs.JobStatus.REMOVED,
}

# Unique sentinel used as the lookup default when searching the cluster ad
# for an event log path, so "not found" is distinguishable from any real value.
NO_EVENT_LOG = object()
class ClusterState:
    """
    A class that manages the state of the cluster tracked by a :class:`ClusterHandle`.
    It reads from the cluster's event log internally and provides a variety of views
    of the individual job states.

    .. warning::

        :class:`ClusterState` objects should not be instantiated manually.
        :class:`ClusterHandle` will create them automatically when needed.
    """

    def __init__(self, handle):
        # Only a weak proxy to the handle is kept; ClusterHandle.state caches
        # this object on the handle, so this presumably avoids a
        # handle <-> state reference cycle.
        self._handle = weakref.proxy(handle)
        self._clusterid = handle.clusterid
        # Proc ID of the first job: position i in ``_data`` holds proc ``_offset + i``.
        self._offset = handle.first_proc

        self._data = self._make_initial_data(handle)
        self._counts = collections.Counter(jobs.JobStatus(js) for js in self._data)

        # Index into ``handle.event_log.events`` of the last event already applied.
        self._last_event_read = -1

    def _make_initial_data(self, handle):
        # Every job starts out UNMATERIALIZED until an event says otherwise.
        return [jobs.JobStatus.UNMATERIALIZED for _ in range(len(handle))]

    def read_events(self):
        """
        Apply not-yet-applied events from the handle's event log to the job states.

        At most one status-changing event is applied per call, so that short-lived
        states remain observable between calls.

        Returns
        -------
        num_events_read : int
            How many events this call consumed, including events that did not
            change any job's status.
        """
        # TODO: this reacharound through the handle is bad
        # trigger a read...
        list(self._handle.event_log.read_events())

        # ... but actually look through everything we haven't read yet
        # in case someone else has read elsewhere
        num_events_read = 0
        for event in self._handle.event_log.events[self._last_event_read + 1 :]:
            self._last_event_read += 1
            num_events_read += 1

            new_status = JOB_EVENT_STATUS_TRANSITIONS.get(event.type, None)
            if new_status is not None:
                key = event.proc - self._offset

                # update counts
                old_status = self._data[key]
                self._counts[old_status] -= 1
                self._counts[new_status] += 1

                # set new status on individual job
                self._data[key] = new_status

                # break here to avoid race conditions where a test may be waiting
                # for a status that is very temporary and thus the wait condition never fires.
                break

        return num_events_read

    def __getitem__(self, proc):
        """
        Return the status of the job with proc ID ``proc``, or a list of statuses
        for a slice of proc IDs.

        Slices are interpreted over proc IDs (not list positions), so
        ``state[handle.first_proc:]`` returns every job, and negative steps
        behave as they do for lists.

        Raises
        ------
        TypeError
            If ``proc`` is neither an ``int`` nor a ``slice``.
        """
        if isinstance(proc, int):
            return self._data[proc - self._offset]

        if isinstance(proc, slice):
            # Resolve the slice against the proc-ID space 0..last_proc, then
            # drop proc IDs below the first proc (not part of this cluster).
            # With a zero offset this is exactly ``self._data[proc]``.
            offset = self._offset
            proc_ids = range(*proc.indices(offset + len(self._data)))
            return [self._data[p - offset] for p in proc_ids if p >= offset]

        raise TypeError(
            "ClusterState indices must be integers or slices, not {}".format(
                type(proc).__name__
            )
        )

    def counts(self):
        """
        Return the number of jobs in each :class:`JobStatus`, as a :class:`collections.Counter`.
        """
        # Return a copy so callers cannot mutate the internal tally.
        return self._counts.copy()

    @property
    def by_name(self):
        """Map each status to the list of :class:`JobID` currently in that status."""
        states = collections.defaultdict(list)
        for p, s in enumerate(self._data):
            states[s].append(jobs.JobID(self._clusterid, p + self._offset))

        return states

    def __iter__(self):
        yield from self._data

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    def __len__(self):
        return len(self._data)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self._handle == other._handle

    def all_complete(self) -> bool:
        """
        Return ``True`` if **all** of the jobs in the cluster are complete.
        Note that this definition does include jobs that have left the queue,
        not just ones that are in the "Completed" state in the queue.
        """
        return self.all_status(jobs.JobStatus.COMPLETED)

    def any_complete(self) -> bool:
        """
        Return ``True`` if **any** of the jobs in the cluster are complete.
        Note that this definition does include jobs that have left the queue,
        not just ones that are in the "Completed" state in the queue.
        """
        return self.any_status(jobs.JobStatus.COMPLETED)

    def any_idle(self) -> bool:
        """Return ``True`` if **any** of the jobs in the cluster are idle."""
        return self.any_status(jobs.JobStatus.IDLE)

    def none_idle(self) -> bool:
        """Return ``True`` if **none** of the jobs in the cluster are idle."""
        return self.none_status(jobs.JobStatus.IDLE)

    @staticmethod
    def running_exactly(count) -> Callable[["ClusterState"], bool]:
        """
        Return a condition (e.g. for :meth:`ClusterHandle.wait`) that is ``True``
        when exactly ``count`` of the jobs in the cluster are running.
        """
        return lambda self: self.status_exactly(count, jobs.JobStatus.RUNNING)

    def any_running(self) -> bool:
        """Return ``True`` if **any** of the jobs in the cluster are running."""
        return self.any_status(jobs.JobStatus.RUNNING)

    def all_running(self) -> bool:
        """Return ``True`` if **all** of the jobs in the cluster are running."""
        return self.all_status(jobs.JobStatus.RUNNING)

    def all_held(self) -> bool:
        """Return ``True`` if **all** of the jobs in the cluster are held."""
        return self.all_status(jobs.JobStatus.HELD)

    def all_idle(self) -> bool:
        """Return ``True`` if **all** of the jobs in the cluster are idle."""
        return self.all_status(jobs.JobStatus.IDLE)

    def any_held(self) -> bool:
        """Return ``True`` if **any** of the jobs in the cluster are held."""
        return self.any_status(jobs.JobStatus.HELD)

    def none_held(self) -> bool:
        """Return ``True`` if **none** of the jobs in the cluster are held."""
        return self.none_status(jobs.JobStatus.HELD)

    def all_terminal(self) -> bool:
        """Return ``True`` if **all** of the jobs in the cluster are completed, held, or removed."""
        return self.all_status(
            jobs.JobStatus.COMPLETED, jobs.JobStatus.HELD, jobs.JobStatus.REMOVED
        )

    def any_terminal(self) -> bool:
        """Return ``True`` if **any** of the jobs in the cluster are completed, held, or removed."""
        return self.any_status(
            jobs.JobStatus.COMPLETED, jobs.JobStatus.HELD, jobs.JobStatus.REMOVED
        )

    def status_exactly(self, count, *statuses: jobs.JobStatus) -> bool:
        """
        Return ``True`` if **exactly** ``count`` of the jobs in the cluster are
        in one of the ``statuses``.
        Prefer one of the explicitly-named helper methods when possible,
        and don't be afraid to make a new helper method!
        """
        return self.count_status(*statuses) == count

    def all_status(self, *statuses: jobs.JobStatus) -> bool:
        """
        Return ``True`` if **all** of the jobs in the cluster are in one of the ``statuses``.
        Prefer one of the explicitly-named helper methods when possible,
        and don't be afraid to make a new helper method!
        """
        return self.count_status(*statuses) == len(self)

    def any_status(self, *statuses: jobs.JobStatus) -> bool:
        """
        Return ``True`` if **any** of the jobs in the cluster are in one of the ``statuses``.
        Prefer one of the explicitly-named helper methods when possible,
        and don't be afraid to make a new helper method!
        """
        return self.count_status(*statuses) > 0

    def none_status(self, *statuses: jobs.JobStatus) -> bool:
        """
        Return ``True`` if **none** of the jobs in the cluster are in one of the ``statuses``.
        Prefer one of the explicitly-named helper methods when possible,
        and don't be afraid to make a new helper method!
        """
        return self.count_status(*statuses) == 0

    def count_status(self, *statuses: jobs.JobStatus) -> int:
        """Return the total number of jobs in the cluster in any of the given statuses."""
        counts = self.counts()
        return sum(counts[status] for status in statuses)
class EventLog:
    """
    The job event log belonging to a :class:`ClusterHandle`.

    .. warning ::

        You shouldn't have to construct this yourself.
        Instead, use :attr:`ClusterHandle.event_log`.
    """

    def __init__(self, handle: ClusterHandle):
        self._handle = handle
        self._clusterid = handle.clusterid

        # Look the log path up under UserLog / DAGManNodesLog (presumably the
        # first one present in the cluster ad wins -- see utils.chain_get);
        # the sentinel comes back when neither is present.
        log_location = utils.chain_get(
            handle.clusterad, ("UserLog", "DAGManNodesLog"), default=NO_EVENT_LOG
        )
        if log_location is NO_EVENT_LOG:
            raise exceptions.NoJobEventLog(
                "Cluster for handle {} does not have a job event log, so it cannot track job state".format(
                    handle
                )
            )

        self._event_log_path = Path(log_location).absolute()
        # The underlying htcondor reader is opened lazily by read_events().
        self._event_reader = None
        # Every event for this cluster consumed so far, in the order read.
        self.events = []

    def read_events(self) -> Iterator[htcondor.JobEvent]:
        """
        Yield the events for this cluster that have not been read yet,
        recording each one in :attr:`events` as it is yielded.
        """
        reader = self._event_reader
        if reader is None:
            reader = self._event_reader = htcondor.JobEventLog(
                self._event_log_path.as_posix()
            ).events(0)

        # Events belonging to other clusters in the same log file are skipped.
        ours = (event for event in reader if event.cluster == self._clusterid)
        for event in ours:
            self.events.append(event)
            yield event

    def filter(
        self, condition: Callable[[htcondor.JobEvent], bool]
    ) -> List[htcondor.JobEvent]:
        """
        Return, in read order, the already-read job events for which
        ``condition`` is ``True``.
        """
        return [candidate for candidate in self.events if condition(candidate)]