diff options
| author | Thibaut Horel <thibaut.horel@gmail.com> | 2010-11-08 00:59:14 +0100 |
|---|---|---|
| committer | Thibaut Horel <thibaut.horel@gmail.com> | 2010-11-08 00:59:14 +0100 |
| commit | b0a2a305028bf284fc5dcf7e1a696d85787f128f (patch) | |
| tree | e6463e36e381b4342b7c864200a3482cca182618 /sleekxmpp/xmlstream/scheduler.py | |
| parent | b8499306ce329ca3881b1d1dfc3362a3a5c115d0 (diff) | |
| download | alias-b0a2a305028bf284fc5dcf7e1a696d85787f128f.tar.gz | |
Add the sleekxmpp library (will be added as a submodule later)
Diffstat (limited to 'sleekxmpp/xmlstream/scheduler.py')
| -rw-r--r-- | sleekxmpp/xmlstream/scheduler.py | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py new file mode 100644 index 0000000..240d4a4 --- /dev/null +++ b/sleekxmpp/xmlstream/scheduler.py @@ -0,0 +1,202 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import time +import threading +import logging +try: + import queue +except ImportError: + import Queue as queue + + +class Task(object): + + """ + A scheduled task that will be executed by the scheduler + after a given time interval has passed. + + Attributes: + name -- The name of the task. + seconds -- The number of seconds to wait before executing. + callback -- The function to execute. + args -- The arguments to pass to the callback. + kwargs -- The keyword arguments to pass to the callback. + repeat -- Indicates if the task should repeat. + Defaults to False. + qpointer -- A pointer to an event queue for queuing callback + execution instead of executing immediately. + + Methods: + run -- Either queue or execute the callback. + reset -- Reset the task's timer. + """ + + def __init__(self, name, seconds, callback, args=None, + kwargs=None, repeat=False, qpointer=None): + """ + Create a new task. + + Arguments: + name -- The name of the task. + seconds -- The number of seconds to wait before executing. + callback -- The function to execute. + args -- The arguments to pass to the callback. + kwargs -- The keyword arguments to pass to the callback. + repeat -- Indicates if the task should repeat. + Defaults to False. + qpointer -- A pointer to an event queue for queuing callback + execution instead of executing immediately. + """ + self.name = name + self.seconds = seconds + self.callback = callback + self.args = args or tuple() + self.kwargs = kwargs or {} + self.repeat = repeat + self.next = time.time() + self.seconds + self.qpointer = qpointer + + def run(self): + """ + Execute the task's callback. + + If an event queue was supplied, place the callback in the queue; + otherwise, execute the callback immediately. + """ + if self.qpointer is not None: + self.qpointer.put(('schedule', self.callback, self.args)) + else: + self.callback(*self.args, **self.kwargs) + self.reset() + return self.repeat + + def reset(self): + """ + Reset the task's timer so that it will repeat. + """ + self.next = time.time() + self.seconds + + +class Scheduler(object): + + """ + A threaded scheduler that allows for updates mid-execution unlike the + scheduler in the standard library. + + http://docs.python.org/library/sched.html#module-sched + + Attributes: + addq -- A queue storing added tasks. + schedule -- A list of tasks in order of execution times. + thread -- If threaded, the thread processing the schedule. + run -- Indicates if the scheduler is running. + parentqueue -- A parent event queue in control of this scheduler. + + Methods: + add -- Add a new task to the schedule. + process -- Process and schedule tasks. + quit -- Stop the scheduler. + """ + + def __init__(self, parentqueue=None, parentstop=None): + """ + Create a new scheduler. + + Arguments: + parentqueue -- A separate event queue controlling this scheduler. + """ + self.addq = queue.Queue() + self.schedule = [] + self.thread = None + self.run = False + self.parentqueue = parentqueue + self.parentstop = parentstop + + def process(self, threaded=True): + """ + Begin accepting and processing scheduled tasks. + + Arguments: + threaded -- Indicates if the scheduler should execute in its own + thread. Defaults to True. + """ + if threaded: + self.thread = threading.Thread(name='sheduler_process', + target=self._process) + self.thread.start() + else: + self._process() + + def _process(self): + """Process scheduled tasks.""" + self.run = True + try: + while self.run and (self.parentstop is None or not self.parentstop.isSet()): + wait = 1 + updated = False + if self.schedule: + wait = self.schedule[0].next - time.time() + try: + if wait <= 0.0: + newtask = self.addq.get(False) + else: + newtask = self.addq.get(True, wait) + except queue.Empty: + cleanup = [] + for task in self.schedule: + if time.time() >= task.next: + updated = True + if not task.run(): + cleanup.append(task) + else: + break + for task in cleanup: + x = self.schedule.pop(self.schedule.index(task)) + else: + updated = True + self.schedule.append(newtask) + finally: + if updated: + self.schedule = sorted(self.schedule, + key=lambda task: task.next) + except KeyboardInterrupt: + self.run = False + if self.parentstop is not None: + logging.debug("stopping parent") + self.parentstop.set() + except SystemExit: + self.run = False + if self.parentstop is not None: + self.parentstop.set() + logging.debug("Quitting Scheduler thread") + if self.parentqueue is not None: + self.parentqueue.put(('quit', None, None)) + + def add(self, name, seconds, callback, args=None, + kwargs=None, repeat=False, qpointer=None): + """ + Schedule a new task. + + Arguments: + name -- The name of the task. + seconds -- The number of seconds to wait before executing. + callback -- The function to execute. + args -- The arguments to pass to the callback. + kwargs -- The keyword arguments to pass to the callback. + repeat -- Indicates if the task should repeat. + Defaults to False. + qpointer -- A pointer to an event queue for queuing callback + execution instead of executing immediately. + """ + self.addq.put(Task(name, seconds, callback, args, + kwargs, repeat, qpointer)) + + def quit(self): + """Shutdown the scheduler.""" + self.run = False |
