Source code for ornithology.job_queue

# Copyright 2019 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Iterator, List, Optional, Tuple, Union, Mapping, Iterable

import logging

import time
import collections

from . import condor, jobs

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Let DEBUG-and-above records from this module through the logger's own threshold.
logger.setLevel(logging.DEBUG)


class SetAttribute:
    """
    A job queue log event: an attribute being set to a value.

    Instances double as patterns for :meth:`matches`, where an ``attribute``
    or ``value`` of ``None`` acts as a wildcard.

    Parameters
    ----------
    attribute
        The name of the attribute that was set.
    value
        The value the attribute was set to.
    """

    def __init__(self, attribute: str, value: Optional[str]):
        self.attribute = attribute
        self.value = value

    def _is_comparable(self, other) -> bool:
        """
        Return ``True`` if ``other`` is an event whose class is the same as,
        a subclass of, or a superclass of this event's class.
        """
        # The explicit SetAttribute check keeps unrelated objects (notably
        # plain ``object()`` instances, of which everything is an instance)
        # from reaching the attribute accesses in __eq__ and matches.
        return isinstance(other, SetAttribute) and (
            isinstance(other, self.__class__) or isinstance(self, other.__class__)
        )

    def __eq__(self, other):
        return (
            self._is_comparable(other)
            and self.attribute == other.attribute
            and self.value == other.value
        )

    def __hash__(self):
        # Hash on the base class so that a subclass instance that compares
        # equal to a base-class instance also hashes equal to it.
        return hash((SetAttribute, self.attribute, self.value))

    def matches(self, other):
        """
        Return ``True`` if this event equals ``other``, or if the two agree
        once ``None`` fields are treated as wildcards.

        Two comparable events match when either

        * one of the attributes is ``None`` and the values are equal, or
        * the attributes are equal and one of the values is ``None``.
        """
        if self == other:
            return True

        # The class relationship is checked in both directions so that a
        # subclass pattern matches a base-class event and vice versa.
        if not self._is_comparable(other):
            return False

        wildcard_attribute = (
            self.attribute is None or other.attribute is None
        ) and self.value == other.value
        wildcard_value = self.attribute == other.attribute and (
            self.value is None or other.value is None
        )
        return wildcard_attribute or wildcard_value

    def _fmt_value(self):
        """Some values can have special formatting depending on the attribute."""
        # A wildcard value of None is not a job status, so it is formatted
        # verbatim instead of being handed to jobs.JobStatus.
        if self.value is not None and self.attribute in ("JobStatus", "LastJobStatus"):
            return str(jobs.JobStatus(self.value))

        return str(self.value)

    def __repr__(self):
        return "{}(attribute = {}, value = {})".format(
            self.__class__.__name__, self.attribute, self._fmt_value()
        )

    def __str__(self):
        return "{}({} = {})".format(
            type(self).__name__, self.attribute, self._fmt_value()
        )
class SetJobStatus(SetAttribute):
    """
    A :class:`SetAttribute` event whose attribute is always ``JobStatus``.

    Parameters
    ----------
    new_status
        The status stored as the event's value.
    """

    def __init__(self, new_status: jobs.JobStatus):
        super().__init__("JobStatus", new_status)
# Type alias used to annotate the event arguments of JobQueue.wait_for_events:
# a mapping from each job ID to an iterable of events for that job.
EXPECTED_EVENTS = Mapping[jobs.JobID, Iterable[SetAttribute]]
class JobQueue:
    """
    An incremental reader for a condor's job queue log.

    Transactions read from the log are accumulated in ``transactions`` (in the
    order they were read), and every event is additionally indexed by job ID
    in ``by_jobid``. Iterating over a :class:`JobQueue` yields the
    transactions read so far.

    Parameters
    ----------
    condor
        The object whose ``job_queue_log`` is opened and read.
    """

    def __init__(self, condor: "condor.Condor"):
        self.condor = condor

        self.transactions = []
        self.by_jobid = collections.defaultdict(list)

        # Opened lazily by the first call to read_transactions.
        self._job_queue_log_file = None

        # Events of a transaction whose end marker has not been read yet.
        # This lives on the instance (not in a local of read_transactions) so
        # that a transaction split across two reads of the log is not lost.
        self._pending_transaction = []

    def __iter__(self):
        yield from self.transactions

    def events(self):
        """
        Yield a flat stream of events from the job queue log
        (ignoring transaction structure).

        Only transactions that have already been read are included.
        """
        for transaction in self.transactions:
            yield from transaction

    def filter(self, condition):
        """
        Yield a flat stream of events on the job queue log
        that satisfy some condition (ignoring transaction structure).

        Parameters
        ----------
        condition
            A callable invoked as ``condition(jobid, event)``; the pair is
            yielded when the result is truthy.
        """
        yield from ((j, e) for j, e in self.events() if condition(j, e))

    def read_transactions(self) -> Iterator[List[Tuple[jobs.JobID, SetAttribute]]]:
        """
        Yield transactions (i.e., lists) of (jobid, event) pairs from the job queue log.

        Reading resumes where the previous call stopped. Each completed
        transaction is also appended to ``transactions``, and each event is
        appended to ``by_jobid`` as soon as it is read.
        """
        if self._job_queue_log_file is None:
            self._job_queue_log_file = self.condor.job_queue_log.open(mode="r")

        for line in buffered_readlines(self._job_queue_log_file):
            jobid, event = parse_job_queue_log_line(line)

            if event is START_TRANSACTION:
                self._pending_transaction = []
            elif event is END_TRANSACTION:
                t = JobQueueTransaction(self._pending_transaction)
                # Start a fresh list so later events can never be appended to
                # the transaction that is being handed out.
                self._pending_transaction = []
                self.transactions.append(t)
                yield t
            elif isinstance(jobid, jobs.JobID) and isinstance(event, SetAttribute):
                self.by_jobid[jobid].append(event)
                self._pending_transaction.append((jobid, event))

    def wait_for_events(
        self,
        expected_events: EXPECTED_EVENTS,
        unexpected_events: Optional[EXPECTED_EVENTS] = None,
        timeout: int = 120,
    ) -> bool:
        """
        Wait for job queue events to occur.

        This method is primarily intended for test setup; see :func:`in_order`
        for a way to assert that job queue events occurred in a certain
        order post-facto.

        All this method cares about is what the *next* event is for a particular
        jobid. If events come out of order, it will not record the out-of-order
        ones!

        This method never raises an exception intentionally, even when it times out.
        It simply returns control to the test, so that the test itself can declare
        failure.
        Log messages are recorded with detailed information in either case.

        Parameters
        ----------
        expected_events
            For each job ID, the events to wait for, in order. An entry may be
            either an event or an ``(event, callback)`` tuple; the callback is
            invoked as ``callback(jobid, event)`` when the event is seen.
        unexpected_events
            For each job ID, events that mark the wait as failed when seen.
            Seeing one does not end the wait.
        timeout
            The number of seconds after which the wait gives up. It is checked
            between passes over the log.

        Returns
        -------
        all_good : bool
            ``True`` if all events occurred and no unexpected events occurred.
            ``False`` if it timed out, or if any unexpected events occurred.
        """
        all_good = True

        if unexpected_events is None:
            unexpected_events = {}

        unexpected_events = {
            jobid: set(events) for jobid, events in unexpected_events.items()
        }

        # Normalize every entry to an (event, callback) pair; entries given
        # without a callback get one that does nothing.
        expected_events = {
            jobid: collections.deque(
                event if isinstance(event, tuple) else (event, lambda *_: None)
                for event in events
            )
            for jobid, events in expected_events.items()
        }

        jobids = set(expected_events.keys())

        # A monotonic clock keeps the timeout immune to wall-clock adjustments.
        start = time.monotonic()

        while True:
            elapsed = time.monotonic() - start
            if elapsed > timeout:
                logger.error("Job queue event wait ending due to timeout!")
                return False

            for transaction in self.read_transactions():
                for jobid, event in transaction:
                    if jobid not in jobids:
                        continue

                    expected_events_for_jobid = expected_events[jobid]

                    if event in unexpected_events.get(jobid, ()):
                        # The job may already have produced everything that was
                        # expected of it, so there may be no "next" event to report.
                        if expected_events_for_jobid:
                            was_expecting = expected_events_for_jobid[0][0]
                        else:
                            was_expecting = "no further events"
                        logger.error(
                            "Saw unexpected job queue event for job {}: {} (was expecting {})".format(
                                jobid, event, was_expecting
                            )
                        )
                        all_good = False
                        continue

                    if len(expected_events_for_jobid) == 0:
                        continue

                    next_event, callback = expected_events_for_jobid[0]

                    if not event.matches(next_event):
                        continue

                    logger.debug(
                        "Saw expected job queue event for job {}: {}".format(
                            jobid, event
                        )
                    )

                    expected_events_for_jobid.popleft()
                    callback(jobid, event)

                    if len(expected_events_for_jobid) == 0:
                        logger.debug(
                            "Have seen all expected job queue events for job {}".format(
                                jobid
                            )
                        )
                    else:
                        logger.debug(
                            "Still expecting job queue events for job {}: [{}]".format(
                                jobid,
                                ", ".join(str(e) for e, _ in expected_events_for_jobid),
                            )
                        )

                # if no more expected event, we're done!
                if all(len(events) == 0 for events in expected_events.values()):
                    logger.debug(
                        "Job queue event wait exiting with goodness: {}".format(
                            all_good
                        )
                    )
                    return all_good

            time.sleep(0.1)

    def wait_for_job_completion(self, job_ids, timeout=120) -> bool:
        """
        Wait for a set of job ids to reach the completed state.

        If any of the jobs is seen entering the held, suspended, or removed
        state, the result will be ``False``. Note that this does not end the
        wait early: it still lasts until every job has completed or the
        timeout expires.

        Parameters
        ----------
        job_ids
            The job ids to wait for. You may find :attr:`ClusterHandle.job_ids` useful.
        timeout
            The number of seconds after which the wait gives up.

        Returns
        -------
        all_good : bool
            ``True`` if all jobs completed and no unexpected status was seen.
            ``False`` if it timed out, or if any unexpected status was seen.
        """
        # Materialize once: job_ids is iterated twice below and may be a
        # one-shot iterable.
        job_ids = list(job_ids)
        return self.wait_for_events(
            expected_events={
                job_id: [SetJobStatus(jobs.JobStatus.COMPLETED)] for job_id in job_ids
            },
            unexpected_events={
                job_id: {
                    SetJobStatus(jobs.JobStatus.HELD),
                    SetJobStatus(jobs.JobStatus.SUSPENDED),
                    SetJobStatus(jobs.JobStatus.REMOVED),
                }
                for job_id in job_ids
            },
            timeout=timeout,
        )
# Sentinels returned by parse_job_queue_log_line in place of an event to mark
# the boundaries of a transaction; they are compared by identity.
START_TRANSACTION = object()
END_TRANSACTION = object()


def buffered_readlines(file) -> Iterator[str]:
    """
    Yield complete, newline-terminated lines from ``file``.

    The job queue log is not flushed in the middle of a transaction, so a
    read that happens mid-transaction may return only the beginning of a
    line. Fragments are therefore collected until the terminating newline
    shows up, and only then handed to the caller.

    Iteration stops when the file is exhausted and no fragment is pending.
    """
    pending = ""

    while True:
        chunk = file.readline()

        # Nothing pending and nothing new: we are at the end of the file.
        if (pending, chunk) == ("", ""):
            return

        pending += chunk

        if pending.endswith("\n"):
            yield pending
            pending = ""
        else:
            # Only part of a line has been written so far; pause briefly and
            # read again to pick up the rest.
            time.sleep(0.0001)


def parse_job_queue_log_line(
    line: str,
) -> Tuple[Optional[jobs.JobID], Optional[Union[SetAttribute, Any]]]:
    """
    Turn one line of the job queue log into a ``(jobid, event)`` pair.

    Lines starting with ``105`` and ``106`` produce ``(None, START_TRANSACTION)``
    and ``(None, END_TRANSACTION)`` respectively. Lines starting with ``103``
    produce a job ID (built from the dot-separated second field) and a
    :class:`SetAttribute` (built from the third field and the remainder of
    the line). Every other line produces ``(None, None)``.
    """
    # At most three splits, so that a value containing spaces stays in one piece.
    fields = line.strip().split(" ", 3)
    code = fields[0]

    if code == "105":
        return None, START_TRANSACTION
    if code == "106":
        return None, END_TRANSACTION
    if code != "103":
        return None, None

    job_id = jobs.JobID(*fields[1].split("."))
    event = SetAttribute(fields[2], fields[3])
    return job_id, event


class JobQueueTransaction:
    """
    A sequence-like wrapper around the (jobid, event) pairs of one transaction.

    Supports iteration, ``len()``, indexing, and ``(jobid, event) in transaction``
    tests, where the given event is compared via its ``matches`` method.
    """

    def __init__(self, events):
        self.events = events

    def __iter__(self):
        for pair in self.events:
            yield pair

    def __len__(self):
        return len(self.events)

    def __getitem__(self, item):
        return self.events[item]

    def __contains__(self, item):
        wanted_jobid, wanted_event = item
        for jobid, event in self:
            if wanted_jobid == jobid and wanted_event.matches(event):
                return True
        return False

    def as_dict(self):
        """
        Return ``{jobid: {attribute: value}}`` for this transaction; when an
        attribute is set more than once for a job, the last value wins.
        """
        by_job = {}
        for jobid, event in self:
            by_job.setdefault(jobid, {})[event.attribute] = event.value
        return by_job

    def __repr__(self):
        return "{}{}".format(type(self).__name__, repr(self.events))