From 3bb7785b59ef92278b24f2636b5250b07ce788ee Mon Sep 17 00:00:00 2001 From: thibauth Date: Mon, 1 Aug 2011 16:05:49 +0000 Subject: Planetlab utilities git-svn-id: https://scm.gforge.inria.fr/svn/pacemaker@50 30fcff6e-8de6-41c7-acce-77ff6d1dd07b --- planetlab/pssh/psshlib/__init__.py | 0 planetlab/pssh/psshlib/askpass_client.py | 95 +++++++++ planetlab/pssh/psshlib/askpass_server.py | 101 +++++++++ planetlab/pssh/psshlib/cli.py | 108 ++++++++++ planetlab/pssh/psshlib/color.py | 39 ++++ planetlab/pssh/psshlib/manager.py | 345 +++++++++++++++++++++++++++++++ planetlab/pssh/psshlib/psshutil.py | 108 ++++++++++ planetlab/pssh/psshlib/task.py | 281 +++++++++++++++++++++++++ 8 files changed, 1077 insertions(+) create mode 100644 planetlab/pssh/psshlib/__init__.py create mode 100644 planetlab/pssh/psshlib/askpass_client.py create mode 100644 planetlab/pssh/psshlib/askpass_server.py create mode 100644 planetlab/pssh/psshlib/cli.py create mode 100644 planetlab/pssh/psshlib/color.py create mode 100644 planetlab/pssh/psshlib/manager.py create mode 100644 planetlab/pssh/psshlib/psshutil.py create mode 100644 planetlab/pssh/psshlib/task.py (limited to 'planetlab/pssh/psshlib') diff --git a/planetlab/pssh/psshlib/__init__.py b/planetlab/pssh/psshlib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/planetlab/pssh/psshlib/askpass_client.py b/planetlab/pssh/psshlib/askpass_client.py new file mode 100644 index 0000000..fa4d40a --- /dev/null +++ b/planetlab/pssh/psshlib/askpass_client.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# -*- Mode: python -*- + +# Copyright (c) 2009, Andrew McNabb + +"""Implementation of SSH_ASKPASS to get a password to ssh from pssh. + +The password is read from the socket specified by the environment variable +PSSH_ASKPASS_SOCKET. The other end of this socket is pssh. + +The ssh man page discusses SSH_ASKPASS as follows: + If ssh needs a passphrase, it will read the passphrase from the current + terminal if it was run from a terminal. If ssh does not have a terminal + associated with it but DISPLAY and SSH_ASKPASS are set, it will execute + the program specified by SSH_ASKPASS and open an X11 window to read the + passphrase. This is particularly useful when calling ssh from a .xsession + or related script. (Note that on some machines it may be necessary to + redirect the input from /dev/null to make this work.) +""" + +import os +import socket +import sys +import textwrap + +bin_dir = os.path.dirname(os.path.abspath(sys.argv[0])) +askpass_bin_path = os.path.join(bin_dir, 'pssh-askpass') +ASKPASS_PATHS = (askpass_bin_path, + '/usr/libexec/pssh/pssh-askpass', + '/usr/local/libexec/pssh/pssh-askpass', + '/usr/lib/pssh/pssh-askpass', + '/usr/local/lib/pssh/pssh-askpass') + +_executable_path = None + +def executable_path(): + """Determines the value to use for SSH_ASKPASS. + + The value is cached since this may be called many times. + """ + global _executable_path + if _executable_path is None: + for path in ASKPASS_PATHS: + if os.access(path, os.X_OK): + _executable_path = path + break + else: + _executable_path = '' + sys.stderr.write(textwrap.fill("Warning: could not find an" + " executable path for askpass because PSSH was not" + " installed correctly. Password prompts will not work.")) + sys.stderr.write('\n') + return _executable_path + +def askpass_main(): + """Connects to pssh over the socket specified at PSSH_ASKPASS_SOCKET.""" + + # It's not documented anywhere, as far as I can tell, but ssh may prompt + # for a password or ask a yes/no question. The command-line argument + # specifies what is needed. + if len(sys.argv) > 1: + prompt = sys.argv[1] + if not prompt.lower().endswith('password: '): + sys.stderr.write(prompt) + sys.stderr.write('\n') + sys.exit(1) + + address = os.getenv('PSSH_ASKPASS_SOCKET') + if not address: + sys.stderr.write(textwrap.fill("pssh error: SSH requested a password." + " Please create SSH keys or use the -A option to provide a" + " password.")) + sys.stderr.write('\n') + sys.exit(1) + + sock = socket.socket(socket.AF_UNIX) + try: + sock.connect(address) + except socket.error: + _, e, _ = sys.exc_info() + message = e.args[1] + sys.stderr.write("Couldn't bind to %s: %s.\n" % (address, message)) + sys.exit(2) + + try: + password = sock.makefile().read() + except socket.error: + sys.stderr.write("Socket error.\n") + sys.exit(3) + + print(password) + + +if __name__ == '__main__': + askpass_main() diff --git a/planetlab/pssh/psshlib/askpass_server.py b/planetlab/pssh/psshlib/askpass_server.py new file mode 100644 index 0000000..a5db977 --- /dev/null +++ b/planetlab/pssh/psshlib/askpass_server.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +# -*- Mode: python -*- + +# Copyright (c) 2009, Andrew McNabb + +"""Sends the password over a socket to askpass. +""" + +import errno +import getpass +import os +import socket +import sys +import tempfile +import textwrap + +from psshlib import psshutil + + +class PasswordServer(object): + """Listens on a UNIX domain socket for password requests.""" + def __init__(self): + self.sock = None + self.tempdir = None + self.address = None + self.socketmap = {} + self.buffermap = {} + + def start(self, iomap, backlog): + """Prompts for the password, creates a socket, and starts listening. + + The specified backlog should be the max number of clients connecting + at once. + """ + message = ('Warning: do not enter your password if anyone else has' + ' superuser privileges or access to your account.') + print(textwrap.fill(message)) + + self.password = getpass.getpass() + + # Note that according to the docs for mkdtemp, "The directory is + # readable, writable, and searchable only by the creating user." + self.tempdir = tempfile.mkdtemp(prefix='pssh.') + self.address = os.path.join(self.tempdir, 'pssh_askpass_socket') + self.sock = socket.socket(socket.AF_UNIX) + psshutil.set_cloexec(self.sock) + self.sock.bind(self.address) + self.sock.listen(backlog) + iomap.register_read(self.sock.fileno(), self.handle_listen) + + def handle_listen(self, fd, iomap): + try: + conn = self.sock.accept()[0] + except socket.error: + _, e, _ = sys.exc_info() + number = e.args[0] + if number == errno.EINTR: + return + else: + # TODO: print an error message here? + self.sock.close() + self.sock = None + fd = conn.fileno() + iomap.register_write(fd, self.handle_write) + self.socketmap[fd] = conn + self.buffermap[fd] = self.password + + def handle_write(self, fd, iomap): + buffer = self.buffermap[fd] + conn = self.socketmap[fd] + try: + bytes_written = conn.send(buffer) + except socket.error: + _, e, _ = sys.exc_info() + number = e.args[0] + if number == errno.EINTR: + return + else: + self.close_socket(fd, iomap) + + buffer = buffer[bytes_written:] + if buffer: + self.buffermap[fd] = buffer + else: + self.close_socket(fd, iomap) + + def close_socket(self, fd, iomap): + iomap.unregister(fd) + self.socketmap[fd].close() + del self.socketmap[fd] + del self.buffermap[fd] + + def __del__(self): + if self.sock: + self.sock.close() + self.sock = None + if self.address: + os.remove(self.address) + if self.tempdir: + os.rmdir(self.tempdir) + diff --git a/planetlab/pssh/psshlib/cli.py b/planetlab/pssh/psshlib/cli.py new file mode 100644 index 0000000..1686ba9 --- /dev/null +++ b/planetlab/pssh/psshlib/cli.py @@ -0,0 +1,108 @@ +# Copyright (c) 2009, Andrew McNabb +# Copyright (c) 2003-2008, Brent N. Chun + +import optparse +import os +import shlex +import sys +import textwrap + +_DEFAULT_PARALLELISM = 32 +_DEFAULT_TIMEOUT = 0 # "infinity" by default + + +def common_parser(): + """ + Create a basic OptionParser with arguments common to all pssh programs. + """ + # The "resolve" conflict handler avoids errors from the hosts option + # conflicting with the help option. + parser = optparse.OptionParser(conflict_handler='resolve') + # Ensure that options appearing after the command are sent to ssh. + parser.disable_interspersed_args() + parser.epilog = "Example: pssh -h nodes.txt -l irb2 -o /tmp/foo uptime" + + parser.add_option('-h', '--hosts', dest='host_files', action='append', + metavar='HOST_FILE', + help='hosts file (each line "[user@]host[:port]")') + parser.add_option('-H', '--host', dest='host_strings', action='append', + metavar='HOST_STRING', + help='additional host entries ("[user@]host[:port]")') + parser.add_option('-l', '--user', dest='user', + help='username (OPTIONAL)') + parser.add_option('-p', '--par', dest='par', type='int', + help='max number of parallel threads (OPTIONAL)') + parser.add_option('-o', '--outdir', dest='outdir', + help='output directory for stdout files (OPTIONAL)') + parser.add_option('-e', '--errdir', dest='errdir', + help='output directory for stderr files (OPTIONAL)') + parser.add_option('-t', '--timeout', dest='timeout', type='int', + help='timeout (secs) (0 = no timeout) per host (OPTIONAL)') + parser.add_option('-O', '--option', dest='options', action='append', + metavar='OPTION', help='SSH option (OPTIONAL)') + parser.add_option('-v', '--verbose', dest='verbose', action='store_true', + help='turn on warning and diagnostic messages (OPTIONAL)') + parser.add_option('-A', '--askpass', dest='askpass', action='store_true', + help='Ask for a password (OPTIONAL)') + parser.add_option('-x', '--extra-args', action='callback', type='string', + metavar='ARGS', callback=shlex_append, dest='extra', + help='Extra command-line arguments, with processing for ' + 'spaces, quotes, and backslashes') + parser.add_option('-X', '--extra-arg', dest='extra', action='append', + metavar='ARG', help='Extra command-line argument') + + return parser + + +def common_defaults(**kwargs): + defaults = dict(par=_DEFAULT_PARALLELISM, timeout=_DEFAULT_TIMEOUT) + defaults.update(**kwargs) + envvars = [('user', 'PSSH_USER'), + ('par', 'PSSH_PAR'), + ('outdir', 'PSSH_OUTDIR'), + ('errdir', 'PSSH_ERRDIR'), + ('timeout', 'PSSH_TIMEOUT'), + ('verbose', 'PSSH_VERBOSE'), + ('print_out', 'PSSH_PRINT'), + ('askpass', 'PSSH_ASKPASS'), + ('inline', 'PSSH_INLINE'), + ('recursive', 'PSSH_RECURSIVE'), + ('archive', 'PSSH_ARCHIVE'), + ('compress', 'PSSH_COMPRESS'), + ('localdir', 'PSSH_LOCALDIR'), + ] + for option, var, in envvars: + value = os.getenv(var) + if value: + defaults[option] = value + + value = os.getenv('PSSH_OPTIONS') + if value: + defaults['options'] = [value] + + value = os.getenv('PSSH_HOSTS') + if value: + message1 = ('Warning: the PSSH_HOSTS environment variable is ' + 'deprecated. Please use the "-h" option instead, and consider ' + 'creating aliases for convenience. For example:') + message2 = " alias pssh_abc='pssh -h /path/to/hosts_abc'" + sys.stderr.write(textwrap.fill(message1)) + sys.stderr.write('\n') + sys.stderr.write(message2) + sys.stderr.write('\n') + defaults['host_files'] = [value] + + return defaults + + +def shlex_append(option, opt_str, value, parser): + """An optparse callback similar to the append action. + + The given value is processed with shlex, and the resulting list is + concatenated to the option's dest list. + """ + lst = getattr(parser.values, option.dest) + if lst is None: + lst = [] + setattr(parser.values, option.dest, lst) + lst.extend(shlex.split(value)) diff --git a/planetlab/pssh/psshlib/color.py b/planetlab/pssh/psshlib/color.py new file mode 100644 index 0000000..eb9f001 --- /dev/null +++ b/planetlab/pssh/psshlib/color.py @@ -0,0 +1,39 @@ +# Copyright (c) 2009, Andrew McNabb +# Copyright (c) 2003-2008, Brent N. Chun + +def with_color(string, fg, bg=49): + '''Given foreground/background ANSI color codes, return a string that, + when printed, will format the supplied string using the supplied colors. + ''' + return "\x1b[%dm\x1b[%dm%s\x1b[39m\x1b[49m" % (fg, bg, string) + +def B(string): + '''Returns a string that, when printed, will display the supplied string + in ANSI bold. + ''' + return "\x1b[1m%s\x1b[22m" % string + +def r(string): return with_color(string, 31) # Red +def g(string): return with_color(string, 32) # Green +def y(string): return with_color(string, 33) # Yellow +def b(string): return with_color(string, 34) # Blue +def m(string): return with_color(string, 35) # Magenta +def c(string): return with_color(string, 36) # Cyan +def w(string): return with_color(string, 37) # White + +#following from Python cookbook, #475186 +def has_colors(stream): + '''Returns boolean indicating whether or not the supplied stream supports + ANSI color. + ''' + if not hasattr(stream, "isatty"): + return False + if not stream.isatty(): + return False # auto color only on TTYs + try: + import curses + curses.setupterm() + return curses.tigetnum("colors") > 2 + except: + # guess false in case of error + return False diff --git a/planetlab/pssh/psshlib/manager.py b/planetlab/pssh/psshlib/manager.py new file mode 100644 index 0000000..b10959d --- /dev/null +++ b/planetlab/pssh/psshlib/manager.py @@ -0,0 +1,345 @@ +# Copyright (c) 2009, Andrew McNabb + +from errno import EINTR +import os +import select +import signal +import sys +import threading + +try: + import queue +except ImportError: + import Queue as queue + +from psshlib.askpass_server import PasswordServer +from psshlib import psshutil + +READ_SIZE = 1 << 16 + + +class FatalError(RuntimeError): + """A fatal error in the PSSH Manager.""" + pass + + +class Manager(object): + """Executes tasks concurrently. + + Tasks are added with add_task() and executed in parallel with run(). + Returns a list of the exit statuses of the processes. + + Arguments: + limit: Maximum number of commands running at once. + timeout: Maximum allowed execution time in seconds. + """ + def __init__(self, opts): + self.limit = opts.par + self.timeout = opts.timeout + self.askpass = opts.askpass + self.outdir = opts.outdir + self.errdir = opts.errdir + self.iomap = IOMap() + + self.taskcount = 0 + self.tasks = [] + self.running = [] + self.done = [] + + self.askpass_socket = None + + def run(self): + """Processes tasks previously added with add_task.""" + try: + if self.outdir or self.errdir: + writer = Writer(self.outdir, self.errdir) + writer.start() + else: + writer = None + + if self.askpass: + pass_server = PasswordServer() + pass_server.start(self.iomap, self.limit) + self.askpass_socket = pass_server.address + + self.set_sigchld_handler() + + try: + self.update_tasks(writer) + wait = None + while self.running or self.tasks: + # Opt for efficiency over subsecond timeout accuracy. + if wait is None or wait < 1: + wait = 1 + self.iomap.poll(wait) + self.update_tasks(writer) + wait = self.check_timeout() + except KeyboardInterrupt: + # This exception handler tries to clean things up and prints + # out a nice status message for each interrupted host. + self.interrupted() + + except KeyboardInterrupt: + # This exception handler doesn't print out any fancy status + # information--it just stops. + pass + + if writer: + writer.signal_quit() + writer.join() + + statuses = [task.exitstatus for task in self.done] + return statuses + + def clear_sigchld_handler(self): + signal.signal(signal.SIGCHLD, signal.SIG_DFL) + + def set_sigchld_handler(self): + # TODO: find out whether set_wakeup_fd still works if the default + # signal handler is used (I'm pretty sure it doesn't work if the + # signal is ignored). + signal.signal(signal.SIGCHLD, self.handle_sigchld) + # This should keep reads and writes from getting EINTR. + if hasattr(signal, 'siginterrupt'): + signal.siginterrupt(signal.SIGCHLD, False) + + def handle_sigchld(self, number, frame): + """Apparently we need a sigchld handler to make set_wakeup_fd work.""" + # Write to the signal pipe (only for Python <2.5, where the + # set_wakeup_fd method doesn't exist). + if self.iomap.wakeup_writefd: + os.write(self.iomap.wakeup_writefd, '\0') + for task in self.running: + if task.proc: + task.proc.poll() + # Apparently some UNIX systems automatically resent the SIGCHLD + # handler to SIG_DFL. Reset it just in case. + self.set_sigchld_handler() + + def add_task(self, task): + """Adds a Task to be processed with run().""" + self.tasks.append(task) + + def update_tasks(self, writer): + """Reaps tasks and starts as many new ones as allowed.""" + # Mask signals to work around a Python bug: + # http://bugs.python.org/issue1068268 + # Since sigprocmask isn't in the stdlib, clear the SIGCHLD handler. + # Since signals are masked, reap_tasks needs to be called once for + # each loop. + keep_running = True + while keep_running: + self.clear_sigchld_handler() + self._start_tasks_once(writer) + self.set_sigchld_handler() + keep_running = self.reap_tasks() + + def _start_tasks_once(self, writer): + """Starts tasks once. + + Due to http://bugs.python.org/issue1068268, signals must be masked + when this method is called. + """ + while 0 < len(self.tasks) and len(self.running) < self.limit: + task = self.tasks.pop(0) + self.running.append(task) + task.start(self.taskcount, self.iomap, writer, self.askpass_socket) + self.taskcount += 1 + + def reap_tasks(self): + """Checks to see if any tasks have terminated. + + After cleaning up, returns the number of tasks that finished. + """ + still_running = [] + finished_count = 0 + for task in self.running: + if task.running(): + still_running.append(task) + else: + self.finished(task) + finished_count += 1 + self.running = still_running + return finished_count + + def check_timeout(self): + """Kills timed-out processes and returns the lowest time left.""" + if self.timeout <= 0: + return None + + min_timeleft = None + for task in self.running: + timeleft = self.timeout - task.elapsed() + if timeleft <= 0: + task.timedout() + continue + if min_timeleft is None or timeleft < min_timeleft: + min_timeleft = timeleft + + if min_timeleft is None: + return 0 + else: + return max(0, min_timeleft) + + def interrupted(self): + """Cleans up after a keyboard interrupt.""" + for task in self.running: + task.interrupted() + self.finished(task) + + for task in self.tasks: + task.cancel() + self.finished(task) + + def finished(self, task): + """Marks a task as complete and reports its status to stdout.""" + self.done.append(task) + n = len(self.done) + task.report(n) + + +class IOMap(object): + """A manager for file descriptors and their associated handlers. + + The poll method dispatches events to the appropriate handlers. + """ + def __init__(self): + self.readmap = {} + self.writemap = {} + + # Setup the wakeup file descriptor to avoid hanging on lost signals. + wakeup_readfd, wakeup_writefd = os.pipe() + self.register_read(wakeup_readfd, self.wakeup_handler) + # TODO: remove test when we stop supporting Python <2.5 + if hasattr(signal, 'set_wakeup_fd'): + signal.set_wakeup_fd(wakeup_writefd) + self.wakeup_writefd = None + else: + self.wakeup_writefd = wakeup_writefd + + def register_read(self, fd, handler): + """Registers an IO handler for a file descriptor for reading.""" + self.readmap[fd] = handler + + def register_write(self, fd, handler): + """Registers an IO handler for a file descriptor for writing.""" + self.writemap[fd] = handler + + def unregister(self, fd): + """Unregisters the given file descriptor.""" + if fd in self.readmap: + del self.readmap[fd] + if fd in self.writemap: + del self.writemap[fd] + + def poll(self, timeout=None): + """Performs a poll and dispatches the resulting events.""" + if not self.readmap and not self.writemap: + return + rlist = list(self.readmap) + wlist = list(self.writemap) + try: + rlist, wlist, _ = select.select(rlist, wlist, [], timeout) + except select.error: + _, e, _ = sys.exc_info() + errno = e.args[0] + if errno == EINTR: + return + else: + raise + for fd in rlist: + handler = self.readmap[fd] + handler(fd, self) + for fd in wlist: + handler = self.writemap[fd] + handler(fd, self) + + def wakeup_handler(self, fd, iomap): + """Handles read events on the signal wakeup pipe. + + This ensures that SIGCHLD signals aren't lost. + """ + try: + os.read(fd, READ_SIZE) + except (OSError, IOError): + _, e, _ = sys.exc_info() + errno, message = e.args + if errno != EINTR: + sys.stderr.write('Fatal error reading from wakeup pipe: %s\n' + % message) + raise FatalError + + +class Writer(threading.Thread): + """Thread that writes to files by processing requests from a Queue. + + Until AIO becomes widely available, it is impossible to make a nonblocking + write to an ordinary file. The Writer thread processes all writing to + ordinary files so that the main thread can work without blocking. + """ + OPEN = object() + EOF = object() + ABORT = object() + + def __init__(self, outdir, errdir): + threading.Thread.__init__(self) + # A daemon thread automatically dies if the program is terminated. + self.setDaemon(True) + self.queue = queue.Queue() + self.outdir = outdir + self.errdir = errdir + + self.host_counts = {} + self.files = {} + + def run(self): + while True: + filename, data = self.queue.get() + if filename == self.ABORT: + return + + if data == self.OPEN: + self.files[filename] = open(filename, 'wb', buffering=1) + psshutil.set_cloexec(self.files[filename]) + else: + dest = self.files[filename] + if data == self.EOF: + dest.close() + else: + dest.write(data) + + def open_files(self, host): + """Called from another thread to create files for stdout and stderr. + + Returns a pair of filenames (outfile, errfile). These filenames are + used as handles for future operations. Either or both may be None if + outdir or errdir or not set. + """ + outfile = errfile = None + if self.outdir or self.errdir: + count = self.host_counts.get(host, 0) + self.host_counts[host] = count + 1 + if count: + filename = "%s.%s" % (host, count) + else: + filename = host + if self.outdir: + outfile = os.path.join(self.outdir, filename) + self.queue.put((outfile, self.OPEN)) + if self.errdir: + errfile = os.path.join(self.errdir, filename) + self.queue.put((errfile, self.OPEN)) + return outfile, errfile + + def write(self, filename, data): + """Called from another thread to enqueue a write.""" + self.queue.put((filename, data)) + + def close(self, filename): + """Called from another thread to close the given file.""" + self.queue.put((filename, self.EOF)) + + def signal_quit(self): + """Called from another thread to request the Writer to quit.""" + self.queue.put((self.ABORT, None)) + diff --git a/planetlab/pssh/psshlib/psshutil.py b/planetlab/pssh/psshlib/psshutil.py new file mode 100644 index 0000000..ae1a24c --- /dev/null +++ b/planetlab/pssh/psshlib/psshutil.py @@ -0,0 +1,108 @@ +# Copyright (c) 2009, Andrew McNabb +# Copyright (c) 2003-2008, Brent N. Chun + +import fcntl +import string +import sys + +HOST_FORMAT = 'Host format is [user@]host[:port] [user]' + + +def read_host_files(paths, default_user=None, default_port=None): + """Reads the given host files. + + Returns a list of (host, port, user) triples. + """ + hosts = [] + if paths: + for path in paths: + hosts.extend(read_host_file(path, default_user=default_user)) + return hosts + + +def read_host_file(path, default_user=None, default_port=None): + """Reads the given host file. + + Lines are of the form: host[:port] [login]. + Returns a list of (host, port, user) triples. + """ + lines = [] + f = open(path) + for line in f: + lines.append(line.strip()) + f.close() + + hosts = [] + for line in lines: + # Skip blank lines or lines starting with # + line = line.strip() + if not line or line.startswith('#'): + continue + host, port, user = parse_host_entry(line, default_user, default_port) + if host: + hosts.append((host, port, user)) + return hosts + + +# TODO: deprecate the second host field and standardize on the +# [user@]host[:port] format. +def parse_host_entry(line, default_user, default_port): + """Parses a single host entry. + + This may take either the of the form [user@]host[:port] or + host[:port][ user]. + + Returns a (host, port, user) triple. + """ + fields = line.split() + if len(fields) > 2: + sys.stderr.write('Bad line: "%s". Format should be' + ' [user@]host[:port] [user]\n' % line) + return None, None, None + host_field = fields[0] + host, port, user = parse_host(host_field, default_port=default_port) + if len(fields) == 2: + if user is None: + user = fields[1] + else: + sys.stderr.write('User specified twice in line: "%s"\n' % line) + return None, None, None + if user is None: + user = default_user + return host, port, user + + +def parse_host_string(host_string, default_user=None, default_port=None): + """Parses a whitespace-delimited string of "[user@]host[:port]" entries. + + Returns a list of (host, port, user) triples. + """ + hosts = [] + entries = host_string.split() + for entry in entries: + hosts.append(parse_host(entry, default_user, default_port)) + return hosts + + +def parse_host(host, default_user=None, default_port=None): + """Parses host entries of the form "[user@]host[:port]". + + Returns a (host, port, user) triple. + """ + # TODO: when we stop supporting Python 2.4, switch to using str.partition. + user = default_user + port = default_port + if '@' in host: + user, host = host.split('@', 1) + if ':' in host: + host, port = host.rsplit(':', 1) + return (host, port, user) + + +def set_cloexec(filelike): + """Sets the underlying filedescriptor to automatically close on exec. + + If set_cloexec is called for all open files, then subprocess.Popen does + not require the close_fds option. + """ + fcntl.fcntl(filelike.fileno(), fcntl.FD_CLOEXEC, 1) diff --git a/planetlab/pssh/psshlib/task.py b/planetlab/pssh/psshlib/task.py new file mode 100644 index 0000000..d2ac132 --- /dev/null +++ b/planetlab/pssh/psshlib/task.py @@ -0,0 +1,281 @@ +# Copyright (c) 2009, Andrew McNabb + +from errno import EINTR +from subprocess import Popen, PIPE +import os +import signal +import sys +import time +import traceback + +from psshlib import askpass_client +from psshlib import color + +BUFFER_SIZE = 1 << 16 + +try: + bytes +except NameError: + bytes = str + + +class Task(object): + """Starts a process and manages its input and output. + + Upon completion, the `exitstatus` attribute is set to the exit status + of the process. + """ + def __init__(self, host, port, user, cmd, opts, stdin=None): + self.exitstatus = None + + self.host = host + self.pretty_host = host + self.port = port + self.cmd = cmd + + if user != opts.user: + self.pretty_host = '@'.join((user, self.pretty_host)) + if port: + self.pretty_host = ':'.join((self.pretty_host, port)) + + self.proc = None + self.writer = None + self.timestamp = None + self.failures = [] + self.killed = False + self.inputbuffer = stdin + self.byteswritten = 0 + self.outputbuffer = bytes() + self.errorbuffer = bytes() + + self.stdin = None + self.stdout = None + self.stderr = None + self.outfile = None + self.errfile = None + + # Set options. + self.verbose = opts.verbose + try: + self.print_out = bool(opts.print_out) + except AttributeError: + self.print_out = False + try: + self.inline = bool(opts.inline) + except AttributeError: + self.inline = False + + def start(self, nodenum, iomap, writer, askpass_socket=None): + """Starts the process and registers files with the IOMap.""" + self.writer = writer + + if writer: + self.outfile, self.errfile = writer.open_files(self.pretty_host) + + # Set up the environment. + environ = dict(os.environ) + environ['PSSH_NODENUM'] = str(nodenum) + # Disable the GNOME pop-up password dialog and allow ssh to use + # askpass.py to get a provided password. If the module file is + # askpass.pyc, we replace the extension. + environ['SSH_ASKPASS'] = askpass_client.executable_path() + if askpass_socket: + environ['PSSH_ASKPASS_SOCKET'] = askpass_socket + # Work around a mis-feature in ssh where it won't call SSH_ASKPASS + # if DISPLAY is unset. + if 'DISPLAY' not in environ: + environ['DISPLAY'] = 'pssh-gibberish' + + # Create the subprocess. Since we carefully call set_cloexec() on + # all open files, we specify close_fds=False. + self.proc = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, + close_fds=False, preexec_fn=os.setsid, env=environ) + self.timestamp = time.time() + if self.inputbuffer: + self.stdin = self.proc.stdin + iomap.register_write(self.stdin.fileno(), self.handle_stdin) + else: + self.proc.stdin.close() + self.stdout = self.proc.stdout + iomap.register_read(self.stdout.fileno(), self.handle_stdout) + self.stderr = self.proc.stderr + iomap.register_read(self.stderr.fileno(), self.handle_stderr) + + def _kill(self): + """Signals the process to terminate.""" + if self.proc: + try: + os.kill(-self.proc.pid, signal.SIGKILL) + except OSError: + # If the kill fails, then just assume the process is dead. + pass + self.killed = True + + def timedout(self): + """Kills the process and registers a timeout error.""" + if not self.killed: + self._kill() + self.failures.append('Timed out') + + def interrupted(self): + """Kills the process and registers an keyboard interrupt error.""" + if not self.killed: + self._kill() + self.failures.append('Interrupted') + + def cancel(self): + """Stops a task that has not started.""" + self.failures.append('Cancelled') + + def elapsed(self): + """Finds the time in seconds since the process was started.""" + return time.time() - self.timestamp + + def running(self): + """Finds if the process has terminated and saves the return code.""" + if self.stdin or self.stdout or self.stderr: + return True + if self.proc: + self.exitstatus = self.proc.poll() + if self.exitstatus is None: + if self.killed: + # Set the exitstatus to what it would be if we waited. + self.exitstatus = -signal.SIGKILL + return False + else: + return True + else: + if self.exitstatus < 0: + message = 'Killed by signal %s' % (-self.exitstatus) + self.failures.append(message) + elif self.exitstatus > 0: + message = 'Exited with error code %s' % self.exitstatus + self.failures.append(message) + self.proc = None + return False + + def handle_stdin(self, fd, iomap): + """Called when the process's standard input is ready for writing.""" + try: + start = self.byteswritten + if start < len(self.inputbuffer): + chunk = self.inputbuffer[start:start+BUFFER_SIZE] + self.byteswritten = start + os.write(fd, chunk) + else: + self.close_stdin(iomap) + except (OSError, IOError): + _, e, _ = sys.exc_info() + if e.errno != EINTR: + self.close_stdin(iomap) + self.log_exception(e) + + def close_stdin(self, iomap): + if self.stdin: + iomap.unregister(self.stdin.fileno()) + self.stdin.close() + self.stdin = None + + def handle_stdout(self, fd, iomap): + """Called when the process's standard output is ready for reading.""" + try: + buf = os.read(fd, BUFFER_SIZE) + if buf: + if self.inline: + self.outputbuffer += buf + if self.outfile: + self.writer.write(self.outfile, buf) + if self.print_out: + sys.stdout.write('%s: %s' % (self.host, buf)) + if buf[-1] != '\n': + sys.stdout.write('\n') + else: + self.close_stdout(iomap) + except (OSError, IOError): + _, e, _ = sys.exc_info() + if e.errno != EINTR: + self.close_stdout(iomap) + self.log_exception(e) + + def close_stdout(self, iomap): + if self.stdout: + iomap.unregister(self.stdout.fileno()) + self.stdout.close() + self.stdout = None + if self.outfile: + self.writer.close(self.outfile) + self.outfile = None + + def handle_stderr(self, fd, iomap): + """Called when the process's standard error is ready for reading.""" + try: + buf = os.read(fd, BUFFER_SIZE) + if buf: + if self.inline: + self.errorbuffer += buf + if self.errfile: + self.writer.write(self.errfile, buf) + else: + self.close_stderr(iomap) + except (OSError, IOError): + _, e, _ = sys.exc_info() + if e.errno != EINTR: + self.close_stderr(iomap) + self.log_exception(e) + + def close_stderr(self, iomap): + if self.stderr: + iomap.unregister(self.stderr.fileno()) + self.stderr.close() + self.stderr = None + if self.errfile: + self.writer.close(self.errfile) + self.errfile = None + + def log_exception(self, e): + """Saves a record of the most recent exception for error reporting.""" + if self.verbose: + exc_type, exc_value, exc_traceback = sys.exc_info() + exc = ("Exception: %s, %s, %s" % + (exc_type, exc_value, traceback.format_tb(exc_traceback))) + else: + exc = str(e) + self.failures.append(exc) + + def report(self, n): + """Pretty prints a status report after the Task completes.""" + error = ', '.join(self.failures) + tstamp = time.asctime().split()[3] # Current time + if color.has_colors(sys.stdout): + progress = color.c("[%s]" % color.B(n)) + success = color.g("[%s]" % color.B("SUCCESS")) + failure = color.r("[%s]" % color.B("FAILURE")) + stderr = color.r("Stderr: ") + error = color.r(color.B(error)) + else: + progress = "[%s]" % n + success = "[SUCCESS]" + failure = "[FAILURE]" + stderr = "Stderr: " + host = self.pretty_host + if self.failures: + print(' '.join((progress, tstamp, failure, host, error))) + else: + print(' '.join((progress, tstamp, success, host))) + # NOTE: The extra flushes are to ensure that the data is output in + # the correct order with the C implementation of io. + if self.outputbuffer: + sys.stdout.flush() + try: + sys.stdout.buffer.write(self.outputbuffer) + sys.stdout.flush() + except AttributeError: + sys.stdout.write(self.outputbuffer) + if self.errorbuffer: + sys.stdout.write(stderr) + # Flush the TextIOWrapper before writing to the binary buffer. + sys.stdout.flush() + try: + sys.stdout.buffer.write(self.errorbuffer) + except AttributeError: + sys.stdout.write(self.errorbuffer) + -- cgit v1.2.3-70-g09d2