#!/usr/bin/python3
# Copyright 2019 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Tuple, Optional
import logging
import os
import subprocess
import re
from .try_os_set import (
try_os_setegid,
try_os_seteuid,
)
# Module-level logger; DEBUG is enabled here so the command traces emitted by
# run_command are not filtered out at the logger itself.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def run_command(
    args: List[str],
    stdin: Optional[str] = None,
    timeout: int = 60,
    echo: bool = False,
    suppress: bool = False,
    as_user: Optional[str] = None,
) -> subprocess.CompletedProcess:
    """
    Execute a command.

    Parameters
    ----------
    args
        The command to run, as a list of strings. Every element is converted
        with :class:`str` before the command is executed.
    stdin
        Any stdin to pass to the command. A string is sent to the command as
        the content of its standard input; any other value (``None``, a file
        object, a file descriptor, ``subprocess.DEVNULL``...) is handed to
        :func:`subprocess.run` unchanged as its ``stdin`` argument.
    timeout
        If the command does not return within this many seconds, a
        :class:`subprocess.TimeoutExpired` will be raised. Must not be
        ``None``.
    echo
        If ``True``, the summary of the command execution (the same message
        that is logged at DEBUG level) will be printed.
    suppress
        If ``True``, the details of the command execution will be truncated:
        only the command line is reported, without ``CONDOR_CONFIG``, the exit
        code, stdout, or stderr.
    as_user
        If given, ``try_os_setegid`` and ``try_os_seteuid`` are called with
        this name before the command runs, and called again afterwards with
        the values they returned (presumably the previous effective IDs;
        see ``try_os_set`` for the exact semantics).

    Returns
    -------
    subprocess.CompletedProcess
        The completed process, with trailing whitespace stripped from its
        ``stdout`` and ``stderr`` attributes.

    Raises
    ------
    TypeError
        If ``timeout`` is ``None``.
    subprocess.TimeoutExpired
        If the command does not finish within ``timeout`` seconds.
    """
    if timeout is None:
        raise TypeError("run_command timeout cannot be None")

    args = list(map(str, args))
    logger.debug("About to run command: {}".format(" ".join(args)))

    # subprocess.run(stdin=...) expects a file descriptor or file object, so a
    # plain string (which is what this function documents) must be sent via
    # ``input`` instead. Other values keep going through ``stdin``.
    if isinstance(stdin, str):
        stdin_kwargs = {"input": stdin}
    else:
        stdin_kwargs = {"stdin": stdin}

    if as_user is not None:
        # Group first, then user: same order as the original implementation.
        egid = try_os_setegid(name=as_user)
        euid = try_os_seteuid(name=as_user)
    try:
        p = subprocess.run(
            args,
            timeout=timeout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            **stdin_kwargs,
        )
    finally:
        # Restore the IDs even when the command times out or cannot be
        # started, so a failure never leaves this process running as as_user.
        if as_user is not None:
            try_os_setegid(number=egid)
            try_os_seteuid(number=euid)

    p.stdout = p.stdout.rstrip()
    p.stderr = p.stderr.rstrip()

    msg_lines = ["Ran command: {}".format(" ".join(p.args))]
    if not suppress:
        msg_lines += [
            "CONDOR_CONFIG = {}".format(os.environ.get("CONDOR_CONFIG", "<not set>")),
            "exit code: {}".format(p.returncode),
            # Multi-line output starts on its own line; one-liners stay inline.
            "stdout:{}{}".format("\n" if "\n" in p.stdout else " ", p.stdout),
            "stderr:{}{}".format("\n" if "\n" in p.stderr else " ", p.stderr),
        ]
    msg = "\n".join(msg_lines)

    logger.debug(msg)
    if echo:
        print(msg)

    return p


# Matches the summary line printed by condor_submit, e.g.
# "3 job(s) submitted to cluster 17."; group 1 is the job count and
# group 2 is the cluster ID.
RE_SUBMIT_RESULT = re.compile(r"(\d+) job\(s\) submitted to cluster (\d+)\.")


def parse_submit_result(submit_cmd: subprocess.CompletedProcess) -> Tuple[int, int]:
    """
    Get a "submit result" from a `condor_submit` command run via
    :func:`run_command`.

    Parameters
    ----------
    submit_cmd
        The :class:`subprocess.CompletedProcess` returned by a call to
        `condor_submit` via :func:`run_command`.

    Returns
    -------
    cluster_id, num_procs : int, int
        The cluster ID and number of submitted jobs for the cluster.

    Raises
    ------
    ValueError
        If the command's stdout does not contain the submit summary line
        (including when no stdout was captured at all).
    """
    # An uncaptured stdout (None) is treated as empty so that it ends up in
    # the ValueError below instead of a TypeError from the regex engine.
    match = RE_SUBMIT_RESULT.search(submit_cmd.stdout or "")
    if match is not None:
        num_procs = int(match.group(1))
        clusterid = int(match.group(2))
        return clusterid, num_procs

    # CompletedProcess.args may be a single string or a sequence; joining a
    # plain string would space out its characters, and non-str items would
    # raise TypeError and hide the real problem.
    cmd_args = submit_cmd.args
    if isinstance(cmd_args, str):
        command = cmd_args
    else:
        command = " ".join(map(str, cmd_args))

    raise ValueError(
        'Was not able to extract submit results from command "{}", stdout:\n{}\nstderr:\n{}'.format(
            command, submit_cmd.stdout, submit_cmd.stderr
        )
    )