summaryrefslogtreecommitdiffstats
path: root/planetlab/pssh/psshlib
diff options
context:
space:
mode:
authorthibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b>2011-08-01 16:05:49 +0000
committerthibauth <thibauth@30fcff6e-8de6-41c7-acce-77ff6d1dd07b>2011-08-01 16:05:49 +0000
commit3bb7785b59ef92278b24f2636b5250b07ce788ee (patch)
tree44679a6548631ff2f278bcf941cdb6c56349cdf5 /planetlab/pssh/psshlib
parenta33698d5d7d4bb49fadb4e29daef0d6d58c7c2fc (diff)
downloadpacemaker-3bb7785b59ef92278b24f2636b5250b07ce788ee.tar.gz
Planetlab utilities
git-svn-id: https://scm.gforge.inria.fr/svn/pacemaker@50 30fcff6e-8de6-41c7-acce-77ff6d1dd07b
Diffstat (limited to 'planetlab/pssh/psshlib')
-rw-r--r--planetlab/pssh/psshlib/__init__.py0
-rw-r--r--planetlab/pssh/psshlib/askpass_client.py95
-rw-r--r--planetlab/pssh/psshlib/askpass_server.py101
-rw-r--r--planetlab/pssh/psshlib/cli.py108
-rw-r--r--planetlab/pssh/psshlib/color.py39
-rw-r--r--planetlab/pssh/psshlib/manager.py345
-rw-r--r--planetlab/pssh/psshlib/psshutil.py108
-rw-r--r--planetlab/pssh/psshlib/task.py281
8 files changed, 1077 insertions, 0 deletions
diff --git a/planetlab/pssh/psshlib/__init__.py b/planetlab/pssh/psshlib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/planetlab/pssh/psshlib/__init__.py
diff --git a/planetlab/pssh/psshlib/askpass_client.py b/planetlab/pssh/psshlib/askpass_client.py
new file mode 100644
index 0000000..fa4d40a
--- /dev/null
+++ b/planetlab/pssh/psshlib/askpass_client.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- Mode: python -*-
+
+# Copyright (c) 2009, Andrew McNabb
+
+"""Implementation of SSH_ASKPASS to get a password to ssh from pssh.
+
+The password is read from the socket specified by the environment variable
+PSSH_ASKPASS_SOCKET. The other end of this socket is pssh.
+
+The ssh man page discusses SSH_ASKPASS as follows:
+ If ssh needs a passphrase, it will read the passphrase from the current
+ terminal if it was run from a terminal. If ssh does not have a terminal
+ associated with it but DISPLAY and SSH_ASKPASS are set, it will execute
+ the program specified by SSH_ASKPASS and open an X11 window to read the
+ passphrase. This is particularly useful when calling ssh from a .xsession
+ or related script. (Note that on some machines it may be necessary to
+ redirect the input from /dev/null to make this work.)
+"""
+
+import os
+import socket
+import sys
+import textwrap
+
+bin_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
+askpass_bin_path = os.path.join(bin_dir, 'pssh-askpass')
+ASKPASS_PATHS = (askpass_bin_path,
+ '/usr/libexec/pssh/pssh-askpass',
+ '/usr/local/libexec/pssh/pssh-askpass',
+ '/usr/lib/pssh/pssh-askpass',
+ '/usr/local/lib/pssh/pssh-askpass')
+
+_executable_path = None
+
+def executable_path():
+ """Determines the value to use for SSH_ASKPASS.
+
+ The value is cached since this may be called many times.
+ """
+ global _executable_path
+ if _executable_path is None:
+ for path in ASKPASS_PATHS:
+ if os.access(path, os.X_OK):
+ _executable_path = path
+ break
+ else:
+ _executable_path = ''
+ sys.stderr.write(textwrap.fill("Warning: could not find an"
+ " executable path for askpass because PSSH was not"
+ " installed correctly. Password prompts will not work."))
+ sys.stderr.write('\n')
+ return _executable_path
+
+def askpass_main():
+ """Connects to pssh over the socket specified at PSSH_ASKPASS_SOCKET."""
+
+ # It's not documented anywhere, as far as I can tell, but ssh may prompt
+ # for a password or ask a yes/no question. The command-line argument
+ # specifies what is needed.
+ if len(sys.argv) > 1:
+ prompt = sys.argv[1]
+ if not prompt.lower().endswith('password: '):
+ sys.stderr.write(prompt)
+ sys.stderr.write('\n')
+ sys.exit(1)
+
+ address = os.getenv('PSSH_ASKPASS_SOCKET')
+ if not address:
+ sys.stderr.write(textwrap.fill("pssh error: SSH requested a password."
+ " Please create SSH keys or use the -A option to provide a"
+ " password."))
+ sys.stderr.write('\n')
+ sys.exit(1)
+
+ sock = socket.socket(socket.AF_UNIX)
+ try:
+ sock.connect(address)
+ except socket.error:
+ _, e, _ = sys.exc_info()
+ message = e.args[1]
+ sys.stderr.write("Couldn't bind to %s: %s.\n" % (address, message))
+ sys.exit(2)
+
+ try:
+ password = sock.makefile().read()
+ except socket.error:
+ sys.stderr.write("Socket error.\n")
+ sys.exit(3)
+
+ print(password)
+
+
+if __name__ == '__main__':
+ askpass_main()
diff --git a/planetlab/pssh/psshlib/askpass_server.py b/planetlab/pssh/psshlib/askpass_server.py
new file mode 100644
index 0000000..a5db977
--- /dev/null
+++ b/planetlab/pssh/psshlib/askpass_server.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+# -*- Mode: python -*-
+
+# Copyright (c) 2009, Andrew McNabb
+
+"""Sends the password over a socket to askpass.
+"""
+
+import errno
+import getpass
+import os
+import socket
+import sys
+import tempfile
+import textwrap
+
+from psshlib import psshutil
+
+
+class PasswordServer(object):
+ """Listens on a UNIX domain socket for password requests."""
+ def __init__(self):
+ self.sock = None
+ self.tempdir = None
+ self.address = None
+ self.socketmap = {}
+ self.buffermap = {}
+
+ def start(self, iomap, backlog):
+ """Prompts for the password, creates a socket, and starts listening.
+
+ The specified backlog should be the max number of clients connecting
+ at once.
+ """
+ message = ('Warning: do not enter your password if anyone else has'
+ ' superuser privileges or access to your account.')
+ print(textwrap.fill(message))
+
+ self.password = getpass.getpass()
+
+ # Note that according to the docs for mkdtemp, "The directory is
+ # readable, writable, and searchable only by the creating user."
+ self.tempdir = tempfile.mkdtemp(prefix='pssh.')
+ self.address = os.path.join(self.tempdir, 'pssh_askpass_socket')
+ self.sock = socket.socket(socket.AF_UNIX)
+ psshutil.set_cloexec(self.sock)
+ self.sock.bind(self.address)
+ self.sock.listen(backlog)
+ iomap.register_read(self.sock.fileno(), self.handle_listen)
+
+ def handle_listen(self, fd, iomap):
+ try:
+ conn = self.sock.accept()[0]
+ except socket.error:
+ _, e, _ = sys.exc_info()
+ number = e.args[0]
+ if number == errno.EINTR:
+ return
+ else:
+ # TODO: print an error message here?
+ self.sock.close()
+ self.sock = None
+ fd = conn.fileno()
+ iomap.register_write(fd, self.handle_write)
+ self.socketmap[fd] = conn
+ self.buffermap[fd] = self.password
+
+ def handle_write(self, fd, iomap):
+ buffer = self.buffermap[fd]
+ conn = self.socketmap[fd]
+ try:
+ bytes_written = conn.send(buffer)
+ except socket.error:
+ _, e, _ = sys.exc_info()
+ number = e.args[0]
+ if number == errno.EINTR:
+ return
+ else:
+ self.close_socket(fd, iomap)
+
+ buffer = buffer[bytes_written:]
+ if buffer:
+ self.buffermap[fd] = buffer
+ else:
+ self.close_socket(fd, iomap)
+
+ def close_socket(self, fd, iomap):
+ iomap.unregister(fd)
+ self.socketmap[fd].close()
+ del self.socketmap[fd]
+ del self.buffermap[fd]
+
+ def __del__(self):
+ if self.sock:
+ self.sock.close()
+ self.sock = None
+ if self.address:
+ os.remove(self.address)
+ if self.tempdir:
+ os.rmdir(self.tempdir)
+
diff --git a/planetlab/pssh/psshlib/cli.py b/planetlab/pssh/psshlib/cli.py
new file mode 100644
index 0000000..1686ba9
--- /dev/null
+++ b/planetlab/pssh/psshlib/cli.py
@@ -0,0 +1,108 @@
+# Copyright (c) 2009, Andrew McNabb
+# Copyright (c) 2003-2008, Brent N. Chun
+
+import optparse
+import os
+import shlex
+import sys
+import textwrap
+
+_DEFAULT_PARALLELISM = 32
+_DEFAULT_TIMEOUT = 0 # "infinity" by default
+
+
+def common_parser():
+ """
+ Create a basic OptionParser with arguments common to all pssh programs.
+ """
+ # The "resolve" conflict handler avoids errors from the hosts option
+ # conflicting with the help option.
+ parser = optparse.OptionParser(conflict_handler='resolve')
+ # Ensure that options appearing after the command are sent to ssh.
+ parser.disable_interspersed_args()
+ parser.epilog = "Example: pssh -h nodes.txt -l irb2 -o /tmp/foo uptime"
+
+ parser.add_option('-h', '--hosts', dest='host_files', action='append',
+ metavar='HOST_FILE',
+ help='hosts file (each line "[user@]host[:port]")')
+ parser.add_option('-H', '--host', dest='host_strings', action='append',
+ metavar='HOST_STRING',
+ help='additional host entries ("[user@]host[:port]")')
+ parser.add_option('-l', '--user', dest='user',
+ help='username (OPTIONAL)')
+ parser.add_option('-p', '--par', dest='par', type='int',
+ help='max number of parallel threads (OPTIONAL)')
+ parser.add_option('-o', '--outdir', dest='outdir',
+ help='output directory for stdout files (OPTIONAL)')
+ parser.add_option('-e', '--errdir', dest='errdir',
+ help='output directory for stderr files (OPTIONAL)')
+ parser.add_option('-t', '--timeout', dest='timeout', type='int',
+ help='timeout (secs) (0 = no timeout) per host (OPTIONAL)')
+ parser.add_option('-O', '--option', dest='options', action='append',
+ metavar='OPTION', help='SSH option (OPTIONAL)')
+ parser.add_option('-v', '--verbose', dest='verbose', action='store_true',
+ help='turn on warning and diagnostic messages (OPTIONAL)')
+ parser.add_option('-A', '--askpass', dest='askpass', action='store_true',
+ help='Ask for a password (OPTIONAL)')
+ parser.add_option('-x', '--extra-args', action='callback', type='string',
+ metavar='ARGS', callback=shlex_append, dest='extra',
+ help='Extra command-line arguments, with processing for '
+ 'spaces, quotes, and backslashes')
+ parser.add_option('-X', '--extra-arg', dest='extra', action='append',
+ metavar='ARG', help='Extra command-line argument')
+
+ return parser
+
+
+def common_defaults(**kwargs):
+ defaults = dict(par=_DEFAULT_PARALLELISM, timeout=_DEFAULT_TIMEOUT)
+ defaults.update(**kwargs)
+ envvars = [('user', 'PSSH_USER'),
+ ('par', 'PSSH_PAR'),
+ ('outdir', 'PSSH_OUTDIR'),
+ ('errdir', 'PSSH_ERRDIR'),
+ ('timeout', 'PSSH_TIMEOUT'),
+ ('verbose', 'PSSH_VERBOSE'),
+ ('print_out', 'PSSH_PRINT'),
+ ('askpass', 'PSSH_ASKPASS'),
+ ('inline', 'PSSH_INLINE'),
+ ('recursive', 'PSSH_RECURSIVE'),
+ ('archive', 'PSSH_ARCHIVE'),
+ ('compress', 'PSSH_COMPRESS'),
+ ('localdir', 'PSSH_LOCALDIR'),
+ ]
+ for option, var, in envvars:
+ value = os.getenv(var)
+ if value:
+ defaults[option] = value
+
+ value = os.getenv('PSSH_OPTIONS')
+ if value:
+ defaults['options'] = [value]
+
+ value = os.getenv('PSSH_HOSTS')
+ if value:
+ message1 = ('Warning: the PSSH_HOSTS environment variable is '
+ 'deprecated. Please use the "-h" option instead, and consider '
+ 'creating aliases for convenience. For example:')
+ message2 = " alias pssh_abc='pssh -h /path/to/hosts_abc'"
+ sys.stderr.write(textwrap.fill(message1))
+ sys.stderr.write('\n')
+ sys.stderr.write(message2)
+ sys.stderr.write('\n')
+ defaults['host_files'] = [value]
+
+ return defaults
+
+
+def shlex_append(option, opt_str, value, parser):
+ """An optparse callback similar to the append action.
+
+ The given value is processed with shlex, and the resulting list is
+ concatenated to the option's dest list.
+ """
+ lst = getattr(parser.values, option.dest)
+ if lst is None:
+ lst = []
+ setattr(parser.values, option.dest, lst)
+ lst.extend(shlex.split(value))
diff --git a/planetlab/pssh/psshlib/color.py b/planetlab/pssh/psshlib/color.py
new file mode 100644
index 0000000..eb9f001
--- /dev/null
+++ b/planetlab/pssh/psshlib/color.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2009, Andrew McNabb
+# Copyright (c) 2003-2008, Brent N. Chun
+
+def with_color(string, fg, bg=49):
+ '''Given foreground/background ANSI color codes, return a string that,
+ when printed, will format the supplied string using the supplied colors.
+ '''
+ return "\x1b[%dm\x1b[%dm%s\x1b[39m\x1b[49m" % (fg, bg, string)
+
+def B(string):
+ '''Returns a string that, when printed, will display the supplied string
+ in ANSI bold.
+ '''
+ return "\x1b[1m%s\x1b[22m" % string
+
+def r(string): return with_color(string, 31) # Red
+def g(string): return with_color(string, 32) # Green
+def y(string): return with_color(string, 33) # Yellow
+def b(string): return with_color(string, 34) # Blue
+def m(string): return with_color(string, 35) # Magenta
+def c(string): return with_color(string, 36) # Cyan
+def w(string): return with_color(string, 37) # White
+
+#following from Python cookbook, #475186
+def has_colors(stream):
+ '''Returns boolean indicating whether or not the supplied stream supports
+ ANSI color.
+ '''
+ if not hasattr(stream, "isatty"):
+ return False
+ if not stream.isatty():
+ return False # auto color only on TTYs
+ try:
+ import curses
+ curses.setupterm()
+ return curses.tigetnum("colors") > 2
+ except:
+ # guess false in case of error
+ return False
diff --git a/planetlab/pssh/psshlib/manager.py b/planetlab/pssh/psshlib/manager.py
new file mode 100644
index 0000000..b10959d
--- /dev/null
+++ b/planetlab/pssh/psshlib/manager.py
@@ -0,0 +1,345 @@
+# Copyright (c) 2009, Andrew McNabb
+
+from errno import EINTR
+import os
+import select
+import signal
+import sys
+import threading
+
+try:
+ import queue
+except ImportError:
+ import Queue as queue
+
+from psshlib.askpass_server import PasswordServer
+from psshlib import psshutil
+
+READ_SIZE = 1 << 16
+
+
+class FatalError(RuntimeError):
+ """A fatal error in the PSSH Manager."""
+ pass
+
+
+class Manager(object):
+ """Executes tasks concurrently.
+
+ Tasks are added with add_task() and executed in parallel with run().
+ Returns a list of the exit statuses of the processes.
+
+ Arguments:
+ limit: Maximum number of commands running at once.
+ timeout: Maximum allowed execution time in seconds.
+ """
+ def __init__(self, opts):
+ self.limit = opts.par
+ self.timeout = opts.timeout
+ self.askpass = opts.askpass
+ self.outdir = opts.outdir
+ self.errdir = opts.errdir
+ self.iomap = IOMap()
+
+ self.taskcount = 0
+ self.tasks = []
+ self.running = []
+ self.done = []
+
+ self.askpass_socket = None
+
+ def run(self):
+ """Processes tasks previously added with add_task."""
+ try:
+ if self.outdir or self.errdir:
+ writer = Writer(self.outdir, self.errdir)
+ writer.start()
+ else:
+ writer = None
+
+ if self.askpass:
+ pass_server = PasswordServer()
+ pass_server.start(self.iomap, self.limit)
+ self.askpass_socket = pass_server.address
+
+ self.set_sigchld_handler()
+
+ try:
+ self.update_tasks(writer)
+ wait = None
+ while self.running or self.tasks:
+ # Opt for efficiency over subsecond timeout accuracy.
+ if wait is None or wait < 1:
+ wait = 1
+ self.iomap.poll(wait)
+ self.update_tasks(writer)
+ wait = self.check_timeout()
+ except KeyboardInterrupt:
+ # This exception handler tries to clean things up and prints
+ # out a nice status message for each interrupted host.
+ self.interrupted()
+
+ except KeyboardInterrupt:
+ # This exception handler doesn't print out any fancy status
+ # information--it just stops.
+ pass
+
+ if writer:
+ writer.signal_quit()
+ writer.join()
+
+ statuses = [task.exitstatus for task in self.done]
+ return statuses
+
+ def clear_sigchld_handler(self):
+ signal.signal(signal.SIGCHLD, signal.SIG_DFL)
+
+ def set_sigchld_handler(self):
+ # TODO: find out whether set_wakeup_fd still works if the default
+ # signal handler is used (I'm pretty sure it doesn't work if the
+ # signal is ignored).
+ signal.signal(signal.SIGCHLD, self.handle_sigchld)
+ # This should keep reads and writes from getting EINTR.
+ if hasattr(signal, 'siginterrupt'):
+ signal.siginterrupt(signal.SIGCHLD, False)
+
+ def handle_sigchld(self, number, frame):
+ """Apparently we need a sigchld handler to make set_wakeup_fd work."""
+ # Write to the signal pipe (only for Python <2.5, where the
+ # set_wakeup_fd method doesn't exist).
+ if self.iomap.wakeup_writefd:
+ os.write(self.iomap.wakeup_writefd, '\0')
+ for task in self.running:
+ if task.proc:
+ task.proc.poll()
+ # Apparently some UNIX systems automatically resent the SIGCHLD
+ # handler to SIG_DFL. Reset it just in case.
+ self.set_sigchld_handler()
+
+ def add_task(self, task):
+ """Adds a Task to be processed with run()."""
+ self.tasks.append(task)
+
+ def update_tasks(self, writer):
+ """Reaps tasks and starts as many new ones as allowed."""
+ # Mask signals to work around a Python bug:
+ # http://bugs.python.org/issue1068268
+ # Since sigprocmask isn't in the stdlib, clear the SIGCHLD handler.
+ # Since signals are masked, reap_tasks needs to be called once for
+ # each loop.
+ keep_running = True
+ while keep_running:
+ self.clear_sigchld_handler()
+ self._start_tasks_once(writer)
+ self.set_sigchld_handler()
+ keep_running = self.reap_tasks()
+
+ def _start_tasks_once(self, writer):
+ """Starts tasks once.
+
+ Due to http://bugs.python.org/issue1068268, signals must be masked
+ when this method is called.
+ """
+ while 0 < len(self.tasks) and len(self.running) < self.limit:
+ task = self.tasks.pop(0)
+ self.running.append(task)
+ task.start(self.taskcount, self.iomap, writer, self.askpass_socket)
+ self.taskcount += 1
+
+ def reap_tasks(self):
+ """Checks to see if any tasks have terminated.
+
+ After cleaning up, returns the number of tasks that finished.
+ """
+ still_running = []
+ finished_count = 0
+ for task in self.running:
+ if task.running():
+ still_running.append(task)
+ else:
+ self.finished(task)
+ finished_count += 1
+ self.running = still_running
+ return finished_count
+
+ def check_timeout(self):
+ """Kills timed-out processes and returns the lowest time left."""
+ if self.timeout <= 0:
+ return None
+
+ min_timeleft = None
+ for task in self.running:
+ timeleft = self.timeout - task.elapsed()
+ if timeleft <= 0:
+ task.timedout()
+ continue
+ if min_timeleft is None or timeleft < min_timeleft:
+ min_timeleft = timeleft
+
+ if min_timeleft is None:
+ return 0
+ else:
+ return max(0, min_timeleft)
+
+ def interrupted(self):
+ """Cleans up after a keyboard interrupt."""
+ for task in self.running:
+ task.interrupted()
+ self.finished(task)
+
+ for task in self.tasks:
+ task.cancel()
+ self.finished(task)
+
+ def finished(self, task):
+ """Marks a task as complete and reports its status to stdout."""
+ self.done.append(task)
+ n = len(self.done)
+ task.report(n)
+
+
+class IOMap(object):
+ """A manager for file descriptors and their associated handlers.
+
+ The poll method dispatches events to the appropriate handlers.
+ """
+ def __init__(self):
+ self.readmap = {}
+ self.writemap = {}
+
+ # Setup the wakeup file descriptor to avoid hanging on lost signals.
+ wakeup_readfd, wakeup_writefd = os.pipe()
+ self.register_read(wakeup_readfd, self.wakeup_handler)
+ # TODO: remove test when we stop supporting Python <2.5
+ if hasattr(signal, 'set_wakeup_fd'):
+ signal.set_wakeup_fd(wakeup_writefd)
+ self.wakeup_writefd = None
+ else:
+ self.wakeup_writefd = wakeup_writefd
+
+ def register_read(self, fd, handler):
+ """Registers an IO handler for a file descriptor for reading."""
+ self.readmap[fd] = handler
+
+ def register_write(self, fd, handler):
+ """Registers an IO handler for a file descriptor for writing."""
+ self.writemap[fd] = handler
+
+ def unregister(self, fd):
+ """Unregisters the given file descriptor."""
+ if fd in self.readmap:
+ del self.readmap[fd]
+ if fd in self.writemap:
+ del self.writemap[fd]
+
+ def poll(self, timeout=None):
+ """Performs a poll and dispatches the resulting events."""
+ if not self.readmap and not self.writemap:
+ return
+ rlist = list(self.readmap)
+ wlist = list(self.writemap)
+ try:
+ rlist, wlist, _ = select.select(rlist, wlist, [], timeout)
+ except select.error:
+ _, e, _ = sys.exc_info()
+ errno = e.args[0]
+ if errno == EINTR:
+ return
+ else:
+ raise
+ for fd in rlist:
+ handler = self.readmap[fd]
+ handler(fd, self)
+ for fd in wlist:
+ handler = self.writemap[fd]
+ handler(fd, self)
+
+ def wakeup_handler(self, fd, iomap):
+ """Handles read events on the signal wakeup pipe.
+
+ This ensures that SIGCHLD signals aren't lost.
+ """
+ try:
+ os.read(fd, READ_SIZE)
+ except (OSError, IOError):
+ _, e, _ = sys.exc_info()
+ errno, message = e.args
+ if errno != EINTR:
+ sys.stderr.write('Fatal error reading from wakeup pipe: %s\n'
+ % message)
+ raise FatalError
+
+
+class Writer(threading.Thread):
+ """Thread that writes to files by processing requests from a Queue.
+
+ Until AIO becomes widely available, it is impossible to make a nonblocking
+ write to an ordinary file. The Writer thread processes all writing to
+ ordinary files so that the main thread can work without blocking.
+ """
+ OPEN = object()
+ EOF = object()
+ ABORT = object()
+
+ def __init__(self, outdir, errdir):
+ threading.Thread.__init__(self)
+ # A daemon thread automatically dies if the program is terminated.
+ self.setDaemon(True)
+ self.queue = queue.Queue()
+ self.outdir = outdir
+ self.errdir = errdir
+
+ self.host_counts = {}
+ self.files = {}
+
+ def run(self):
+ while True:
+ filename, data = self.queue.get()
+ if filename == self.ABORT:
+ return
+
+ if data == self.OPEN:
+ self.files[filename] = open(filename, 'wb', buffering=1)
+ psshutil.set_cloexec(self.files[filename])
+ else:
+ dest = self.files[filename]
+ if data == self.EOF:
+ dest.close()
+ else:
+ dest.write(data)
+
+ def open_files(self, host):
+ """Called from another thread to create files for stdout and stderr.
+
+ Returns a pair of filenames (outfile, errfile). These filenames are
+ used as handles for future operations. Either or both may be None if
+ outdir or errdir or not set.
+ """
+ outfile = errfile = None
+ if self.outdir or self.errdir:
+ count = self.host_counts.get(host, 0)
+ self.host_counts[host] = count + 1
+ if count:
+ filename = "%s.%s" % (host, count)
+ else:
+ filename = host
+ if self.outdir:
+ outfile = os.path.join(self.outdir, filename)
+ self.queue.put((outfile, self.OPEN))
+ if self.errdir:
+ errfile = os.path.join(self.errdir, filename)
+ self.queue.put((errfile, self.OPEN))
+ return outfile, errfile
+
+ def write(self, filename, data):
+ """Called from another thread to enqueue a write."""
+ self.queue.put((filename, data))
+
+ def close(self, filename):
+ """Called from another thread to close the given file."""
+ self.queue.put((filename, self.EOF))
+
+ def signal_quit(self):
+ """Called from another thread to request the Writer to quit."""
+ self.queue.put((self.ABORT, None))
+
diff --git a/planetlab/pssh/psshlib/psshutil.py b/planetlab/pssh/psshlib/psshutil.py
new file mode 100644
index 0000000..ae1a24c
--- /dev/null
+++ b/planetlab/pssh/psshlib/psshutil.py
@@ -0,0 +1,108 @@
+# Copyright (c) 2009, Andrew McNabb
+# Copyright (c) 2003-2008, Brent N. Chun
+
+import fcntl
+import string
+import sys
+
+HOST_FORMAT = 'Host format is [user@]host[:port] [user]'
+
+
+def read_host_files(paths, default_user=None, default_port=None):
+ """Reads the given host files.
+
+ Returns a list of (host, port, user) triples.
+ """
+ hosts = []
+ if paths:
+ for path in paths:
+ hosts.extend(read_host_file(path, default_user=default_user))
+ return hosts
+
+
+def read_host_file(path, default_user=None, default_port=None):
+ """Reads the given host file.
+
+ Lines are of the form: host[:port] [login].
+ Returns a list of (host, port, user) triples.
+ """
+ lines = []
+ f = open(path)
+ for line in f:
+ lines.append(line.strip())
+ f.close()
+
+ hosts = []
+ for line in lines:
+ # Skip blank lines or lines starting with #
+ line = line.strip()
+ if not line or line.startswith('#'):
+ continue
+ host, port, user = parse_host_entry(line, default_user, default_port)
+ if host:
+ hosts.append((host, port, user))
+ return hosts
+
+
+# TODO: deprecate the second host field and standardize on the
+# [user@]host[:port] format.
+def parse_host_entry(line, default_user, default_port):
+ """Parses a single host entry.
+
+ This may take either the of the form [user@]host[:port] or
+ host[:port][ user].
+
+ Returns a (host, port, user) triple.
+ """
+ fields = line.split()
+ if len(fields) > 2:
+ sys.stderr.write('Bad line: "%s". Format should be'
+ ' [user@]host[:port] [user]\n' % line)
+ return None, None, None
+ host_field = fields[0]
+ host, port, user = parse_host(host_field, default_port=default_port)
+ if len(fields) == 2:
+ if user is None:
+ user = fields[1]
+ else:
+ sys.stderr.write('User specified twice in line: "%s"\n' % line)
+ return None, None, None
+ if user is None:
+ user = default_user
+ return host, port, user
+
+
+def parse_host_string(host_string, default_user=None, default_port=None):
+ """Parses a whitespace-delimited string of "[user@]host[:port]" entries.
+
+ Returns a list of (host, port, user) triples.
+ """
+ hosts = []
+ entries = host_string.split()
+ for entry in entries:
+ hosts.append(parse_host(entry, default_user, default_port))
+ return hosts
+
+
+def parse_host(host, default_user=None, default_port=None):
+ """Parses host entries of the form "[user@]host[:port]".
+
+ Returns a (host, port, user) triple.
+ """
+ # TODO: when we stop supporting Python 2.4, switch to using str.partition.
+ user = default_user
+ port = default_port
+ if '@' in host:
+ user, host = host.split('@', 1)
+ if ':' in host:
+ host, port = host.rsplit(':', 1)
+ return (host, port, user)
+
+
+def set_cloexec(filelike):
+ """Sets the underlying filedescriptor to automatically close on exec.
+
+ If set_cloexec is called for all open files, then subprocess.Popen does
+ not require the close_fds option.
+ """
+ fcntl.fcntl(filelike.fileno(), fcntl.FD_CLOEXEC, 1)
diff --git a/planetlab/pssh/psshlib/task.py b/planetlab/pssh/psshlib/task.py
new file mode 100644
index 0000000..d2ac132
--- /dev/null
+++ b/planetlab/pssh/psshlib/task.py
@@ -0,0 +1,281 @@
+# Copyright (c) 2009, Andrew McNabb
+
+from errno import EINTR
+from subprocess import Popen, PIPE
+import os
+import signal
+import sys
+import time
+import traceback
+
+from psshlib import askpass_client
+from psshlib import color
+
+BUFFER_SIZE = 1 << 16
+
+try:
+ bytes
+except NameError:
+ bytes = str
+
+
+class Task(object):
+ """Starts a process and manages its input and output.
+
+ Upon completion, the `exitstatus` attribute is set to the exit status
+ of the process.
+ """
+ def __init__(self, host, port, user, cmd, opts, stdin=None):
+ self.exitstatus = None
+
+ self.host = host
+ self.pretty_host = host
+ self.port = port
+ self.cmd = cmd
+
+ if user != opts.user:
+ self.pretty_host = '@'.join((user, self.pretty_host))
+ if port:
+ self.pretty_host = ':'.join((self.pretty_host, port))
+
+ self.proc = None
+ self.writer = None
+ self.timestamp = None
+ self.failures = []
+ self.killed = False
+ self.inputbuffer = stdin
+ self.byteswritten = 0
+ self.outputbuffer = bytes()
+ self.errorbuffer = bytes()
+
+ self.stdin = None
+ self.stdout = None
+ self.stderr = None
+ self.outfile = None
+ self.errfile = None
+
+ # Set options.
+ self.verbose = opts.verbose
+ try:
+ self.print_out = bool(opts.print_out)
+ except AttributeError:
+ self.print_out = False
+ try:
+ self.inline = bool(opts.inline)
+ except AttributeError:
+ self.inline = False
+
+ def start(self, nodenum, iomap, writer, askpass_socket=None):
+ """Starts the process and registers files with the IOMap."""
+ self.writer = writer
+
+ if writer:
+ self.outfile, self.errfile = writer.open_files(self.pretty_host)
+
+ # Set up the environment.
+ environ = dict(os.environ)
+ environ['PSSH_NODENUM'] = str(nodenum)
+ # Disable the GNOME pop-up password dialog and allow ssh to use
+ # askpass.py to get a provided password. If the module file is
+ # askpass.pyc, we replace the extension.
+ environ['SSH_ASKPASS'] = askpass_client.executable_path()
+ if askpass_socket:
+ environ['PSSH_ASKPASS_SOCKET'] = askpass_socket
+ # Work around a mis-feature in ssh where it won't call SSH_ASKPASS
+ # if DISPLAY is unset.
+ if 'DISPLAY' not in environ:
+ environ['DISPLAY'] = 'pssh-gibberish'
+
+ # Create the subprocess. Since we carefully call set_cloexec() on
+ # all open files, we specify close_fds=False.
+ self.proc = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+ close_fds=False, preexec_fn=os.setsid, env=environ)
+ self.timestamp = time.time()
+ if self.inputbuffer:
+ self.stdin = self.proc.stdin
+ iomap.register_write(self.stdin.fileno(), self.handle_stdin)
+ else:
+ self.proc.stdin.close()
+ self.stdout = self.proc.stdout
+ iomap.register_read(self.stdout.fileno(), self.handle_stdout)
+ self.stderr = self.proc.stderr
+ iomap.register_read(self.stderr.fileno(), self.handle_stderr)
+
+ def _kill(self):
+ """Signals the process to terminate."""
+ if self.proc:
+ try:
+ os.kill(-self.proc.pid, signal.SIGKILL)
+ except OSError:
+ # If the kill fails, then just assume the process is dead.
+ pass
+ self.killed = True
+
+ def timedout(self):
+ """Kills the process and registers a timeout error."""
+ if not self.killed:
+ self._kill()
+ self.failures.append('Timed out')
+
+ def interrupted(self):
+ """Kills the process and registers an keyboard interrupt error."""
+ if not self.killed:
+ self._kill()
+ self.failures.append('Interrupted')
+
+ def cancel(self):
+ """Stops a task that has not started."""
+ self.failures.append('Cancelled')
+
+ def elapsed(self):
+ """Finds the time in seconds since the process was started."""
+ return time.time() - self.timestamp
+
+ def running(self):
+ """Finds if the process has terminated and saves the return code."""
+ if self.stdin or self.stdout or self.stderr:
+ return True
+ if self.proc:
+ self.exitstatus = self.proc.poll()
+ if self.exitstatus is None:
+ if self.killed:
+ # Set the exitstatus to what it would be if we waited.
+ self.exitstatus = -signal.SIGKILL
+ return False
+ else:
+ return True
+ else:
+ if self.exitstatus < 0:
+ message = 'Killed by signal %s' % (-self.exitstatus)
+ self.failures.append(message)
+ elif self.exitstatus > 0:
+ message = 'Exited with error code %s' % self.exitstatus
+ self.failures.append(message)
+ self.proc = None
+ return False
+
+ def handle_stdin(self, fd, iomap):
+ """Called when the process's standard input is ready for writing."""
+ try:
+ start = self.byteswritten
+ if start < len(self.inputbuffer):
+ chunk = self.inputbuffer[start:start+BUFFER_SIZE]
+ self.byteswritten = start + os.write(fd, chunk)
+ else:
+ self.close_stdin(iomap)
+ except (OSError, IOError):
+ _, e, _ = sys.exc_info()
+ if e.errno != EINTR:
+ self.close_stdin(iomap)
+ self.log_exception(e)
+
+ def close_stdin(self, iomap):
+ if self.stdin:
+ iomap.unregister(self.stdin.fileno())
+ self.stdin.close()
+ self.stdin = None
+
+ def handle_stdout(self, fd, iomap):
+ """Called when the process's standard output is ready for reading."""
+ try:
+ buf = os.read(fd, BUFFER_SIZE)
+ if buf:
+ if self.inline:
+ self.outputbuffer += buf
+ if self.outfile:
+ self.writer.write(self.outfile, buf)
+ if self.print_out:
+ sys.stdout.write('%s: %s' % (self.host, buf))
+ if buf[-1] != '\n':
+ sys.stdout.write('\n')
+ else:
+ self.close_stdout(iomap)
+ except (OSError, IOError):
+ _, e, _ = sys.exc_info()
+ if e.errno != EINTR:
+ self.close_stdout(iomap)
+ self.log_exception(e)
+
+ def close_stdout(self, iomap):
+ if self.stdout:
+ iomap.unregister(self.stdout.fileno())
+ self.stdout.close()
+ self.stdout = None
+ if self.outfile:
+ self.writer.close(self.outfile)
+ self.outfile = None
+
+ def handle_stderr(self, fd, iomap):
+ """Called when the process's standard error is ready for reading."""
+ try:
+ buf = os.read(fd, BUFFER_SIZE)
+ if buf:
+ if self.inline:
+ self.errorbuffer += buf
+ if self.errfile:
+ self.writer.write(self.errfile, buf)
+ else:
+ self.close_stderr(iomap)
+ except (OSError, IOError):
+ _, e, _ = sys.exc_info()
+ if e.errno != EINTR:
+ self.close_stderr(iomap)
+ self.log_exception(e)
+
+ def close_stderr(self, iomap):
+ if self.stderr:
+ iomap.unregister(self.stderr.fileno())
+ self.stderr.close()
+ self.stderr = None
+ if self.errfile:
+ self.writer.close(self.errfile)
+ self.errfile = None
+
+ def log_exception(self, e):
+ """Saves a record of the most recent exception for error reporting."""
+ if self.verbose:
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ exc = ("Exception: %s, %s, %s" %
+ (exc_type, exc_value, traceback.format_tb(exc_traceback)))
+ else:
+ exc = str(e)
+ self.failures.append(exc)
+
+ def report(self, n):
+ """Pretty prints a status report after the Task completes."""
+ error = ', '.join(self.failures)
+ tstamp = time.asctime().split()[3] # Current time
+ if color.has_colors(sys.stdout):
+ progress = color.c("[%s]" % color.B(n))
+ success = color.g("[%s]" % color.B("SUCCESS"))
+ failure = color.r("[%s]" % color.B("FAILURE"))
+ stderr = color.r("Stderr: ")
+ error = color.r(color.B(error))
+ else:
+ progress = "[%s]" % n
+ success = "[SUCCESS]"
+ failure = "[FAILURE]"
+ stderr = "Stderr: "
+ host = self.pretty_host
+ if self.failures:
+ print(' '.join((progress, tstamp, failure, host, error)))
+ else:
+ print(' '.join((progress, tstamp, success, host)))
+ # NOTE: The extra flushes are to ensure that the data is output in
+ # the correct order with the C implementation of io.
+ if self.outputbuffer:
+ sys.stdout.flush()
+ try:
+ sys.stdout.buffer.write(self.outputbuffer)
+ sys.stdout.flush()
+ except AttributeError:
+ sys.stdout.write(self.outputbuffer)
+ if self.errorbuffer:
+ sys.stdout.write(stderr)
+ # Flush the TextIOWrapper before writing to the binary buffer.
+ sys.stdout.flush()
+ try:
+ sys.stdout.buffer.write(self.errorbuffer)
+ except AttributeError:
+ sys.stdout.write(self.errorbuffer)
+