aboutsummaryrefslogtreecommitdiffstats
path: root/sleekxmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
authorThibaut Horel <thibaut.horel@gmail.com>2010-12-31 19:19:25 +0100
committerThibaut Horel <thibaut.horel@gmail.com>2010-12-31 19:19:25 +0100
commitd90aec17e2201f256783a531c548dcc9857c889d (patch)
tree56b6d0580ee1993c73e67c63d4a452a81bbaaf1e /sleekxmpp/xmlstream/xmlstream.py
parentaf76bcdf7a947702eaa19d39f5b9ecfcd7ec6fd2 (diff)
downloadalias-d90aec17e2201f256783a531c548dcc9857c889d.tar.gz
Cleanup of repository. Bases of webclient.
* remove sleekxmpp (install guideline in server/README) * move server code to server directory * webclient directory with basic strophejs example
Diffstat (limited to 'sleekxmpp/xmlstream/xmlstream.py')
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py948
1 files changed, 0 insertions, 948 deletions
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
deleted file mode 100644
index 30b76ce..0000000
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ /dev/null
@@ -1,948 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-from __future__ import with_statement, unicode_literals
-
-import copy
-import logging
-import socket as Socket
-import ssl
-import sys
-import threading
-import time
-import types
-import signal
-try:
- import queue
-except ImportError:
- import Queue as queue
-
-from sleekxmpp.thirdparty.statemachine import StateMachine
-from sleekxmpp.xmlstream import Scheduler, tostring
-from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
-
-# In Python 2.x, file socket objects are broken. A patched socket
-# wrapper is provided for this case in filesocket.py.
-if sys.version_info < (3, 0):
- from sleekxmpp.xmlstream.filesocket import FileSocket, Socket26
-
-
-# The time in seconds to wait before timing out waiting for response stanzas.
-RESPONSE_TIMEOUT = 10
-
-# The number of threads to use to handle XML stream events. This is not the
-# same as the number of custom event handling threads. HANDLER_THREADS must
-# be at least 1.
-HANDLER_THREADS = 1
-
-# Flag indicating if the SSL library is available for use.
-SSL_SUPPORT = True
-
-
-log = logging.getLogger(__name__)
-
-
-class RestartStream(Exception):
- """
- Exception to restart stream processing, including
- resending the stream header.
- """
-
-
-class XMLStream(object):
- """
- An XML stream connection manager and event dispatcher.
-
- The XMLStream class abstracts away the issues of establishing a
- connection with a server and sending and receiving XML "stanzas".
- A stanza is a complete XML element that is a direct child of a root
- document element. Two streams are used, one for each communication
- direction, over the same socket. Once the connection is closed, both
- streams should be complete and valid XML documents.
-
- Three types of events are provided to manage the stream:
- Stream -- Triggered based on received stanzas, similar in concept
- to events in a SAX XML parser.
- Custom -- Triggered manually.
- Scheduled -- Triggered based on time delays.
-
- Typically, stanzas are first processed by a stream event handler which
- will then trigger custom events to continue further processing,
- especially since custom event handlers may run in individual threads.
-
-
- Attributes:
- address -- The hostname and port of the server.
- default_ns -- The default XML namespace that will be applied
- to all non-namespaced stanzas.
- event_queue -- A queue of stream, custom, and scheduled
- events to be processed.
- filesocket -- A filesocket created from the main connection socket.
- Required for ElementTree.iterparse.
- namespace_map -- Optional mapping of namespaces to namespace prefixes.
- scheduler -- A scheduler object for triggering events
- after a given period of time.
- send_queue -- A queue of stanzas to be sent on the stream.
- socket -- The connection to the server.
- ssl_support -- Indicates if a SSL library is available for use.
- ssl_version -- The version of the SSL protocol to use.
- Defaults to ssl.PROTOCOL_TLSv1.
- state -- A state machine for managing the stream's
- connection state.
- stream_footer -- The start tag and any attributes for the stream's
- root element.
- stream_header -- The closing tag of the stream's root element.
- use_ssl -- Flag indicating if SSL should be used.
- use_tls -- Flag indicating if TLS should be used.
- stop -- threading Event used to stop all threads.
- auto_reconnect-- Flag to determine whether we auto reconnect.
-
- Methods:
- add_event_handler -- Add a handler for a custom event.
- add_handler -- Shortcut method for registerHandler.
- connect -- Connect to the given server.
- del_event_handler -- Remove a handler for a custom event.
- disconnect -- Disconnect from the server and terminate
- processing.
- event -- Trigger a custom event.
- get_id -- Return the current stream ID.
- incoming_filter -- Optionally filter stanzas before processing.
- new_id -- Generate a new, unique ID value.
- process -- Read XML stanzas from the stream and apply
- matching stream handlers.
- reconnect -- Reestablish a connection to the server.
- register_handler -- Add a handler for a stream event.
- register_stanza -- Add a new stanza object type that may appear
- as a direct child of the stream's root.
- remove_handler -- Remove a stream handler.
- remove_stanza -- Remove a stanza object type.
- schedule -- Schedule an event handler to execute after a
- given delay.
- send -- Send a stanza object on the stream.
- send_raw -- Send a raw string on the stream.
- send_xml -- Send an XML string on the stream.
- set_socket -- Set the stream's socket and generate a new
- filesocket.
- start_stream_handler -- Perform any stream initialization such
- as handshakes.
- start_tls -- Establish a TLS connection and restart
- the stream.
- """
-
- def __init__(self, socket=None, host='', port=0):
- """
- Establish a new XML stream.
-
- Arguments:
- socket -- Use an existing socket for the stream.
- Defaults to None to generate a new socket.
- host -- The name of the target server.
- Defaults to the empty string.
- port -- The port to use for the connection.
- Defaults to 0.
- """
- # To comply with PEP8, method names now use underscores.
- # Deprecated method names are re-mapped for backwards compatibility.
- self.startTLS = self.start_tls
- self.registerStanza = self.register_stanza
- self.removeStanza = self.remove_stanza
- self.registerHandler = self.register_handler
- self.removeHandler = self.remove_handler
- self.setSocket = self.set_socket
- self.sendRaw = self.send_raw
- self.getId = self.get_id
- self.getNewId = self.new_id
- self.sendXML = self.send_xml
-
- self.ssl_support = SSL_SUPPORT
- self.ssl_version = ssl.PROTOCOL_TLSv1
-
- self.state = StateMachine(('disconnected', 'connected'))
- self.state._set_state('disconnected')
-
- self.address = (host, int(port))
- self.filesocket = None
- self.set_socket(socket)
-
- if sys.version_info < (3, 0):
- self.socket_class = Socket26
- else:
- self.socket_class = Socket.socket
-
- self.use_ssl = False
- self.use_tls = False
-
- self.default_ns = ''
- self.stream_header = "<stream>"
- self.stream_footer = "</stream>"
-
- self.stop = threading.Event()
- self.stream_end_event = threading.Event()
- self.stream_end_event.set()
- self.event_queue = queue.Queue()
- self.send_queue = queue.Queue()
- self.scheduler = Scheduler(self.event_queue, self.stop)
-
- self.namespace_map = {}
-
- self.__thread = {}
- self.__root_stanza = []
- self.__handlers = []
- self.__event_handlers = {}
- self.__event_handlers_lock = threading.Lock()
-
- self._id = 0
- self._id_lock = threading.Lock()
-
- self.auto_reconnect = True
- self.is_client = False
-
- try:
- if hasattr(signal, 'SIGHUP'):
- signal.signal(signal.SIGHUP, self._handle_kill)
- if hasattr(signal, 'SIGTERM'):
- # Used in Windows
- signal.signal(signal.SIGTERM, self._handle_kill)
- except:
- log.debug("Can not set interrupt signal handlers. " + \
- "SleekXMPP is not running from a main thread.")
-
- def _handle_kill(self, signum, frame):
- """
- Capture kill event and disconnect cleanly after first
- spawning the "killed" event.
- """
- self.event("killed", direct=True)
- self.disconnect()
-
- def new_id(self):
- """
- Generate and return a new stream ID in hexadecimal form.
-
- Many stanzas, handlers, or matchers may require unique
- ID values. Using this method ensures that all new ID values
- are unique in this stream.
- """
- with self._id_lock:
- self._id += 1
- return self.get_id()
-
- def get_id(self):
- """
- Return the current unique stream ID in hexadecimal form.
- """
- return "%X" % self._id
-
- def connect(self, host='', port=0, use_ssl=False,
- use_tls=True, reattempt=True):
- """
- Create a new socket and connect to the server.
-
- Setting reattempt to True will cause connection attempts to be made
- every second until a successful connection is established.
-
- Arguments:
- host -- The name of the desired server for the connection.
- port -- Port to connect to on the server.
- use_ssl -- Flag indicating if SSL should be used.
- use_tls -- Flag indicating if TLS should be used.
- reattempt -- Flag indicating if the socket should reconnect
- after disconnections.
- """
- if host and port:
- self.address = (host, int(port))
-
- self.is_client = True
- # Respect previous SSL and TLS usage directives.
- if use_ssl is not None:
- self.use_ssl = use_ssl
- if use_tls is not None:
- self.use_tls = use_tls
-
- # Repeatedly attempt to connect until a successful connection
- # is established.
- connected = self.state.transition('disconnected', 'connected',
- func=self._connect)
- while reattempt and not connected:
- connected = self.state.transition('disconnected', 'connected',
- func=self._connect)
- return connected
-
- def _connect(self):
- self.stop.clear()
- self.socket = self.socket_class(Socket.AF_INET, Socket.SOCK_STREAM)
- self.socket.settimeout(None)
- if self.use_ssl and self.ssl_support:
- log.debug("Socket Wrapped for SSL")
- ssl_socket = ssl.wrap_socket(self.socket)
- if hasattr(self.socket, 'socket'):
- # We are using a testing socket, so preserve the top
- # layer of wrapping.
- self.socket.socket = ssl_socket
- else:
- self.socket = ssl_socket
-
- try:
- log.debug("Connecting to %s:%s" % self.address)
- self.socket.connect(self.address)
- self.set_socket(self.socket, ignore=True)
- #this event is where you should set your application state
- self.event("connected", direct=True)
- return True
- except Socket.error as serr:
- error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
- log.error(error_msg % (self.address[0], self.address[1],
- serr.errno, serr.strerror))
- time.sleep(1)
- return False
-
- def disconnect(self, reconnect=False):
- """
- Terminate processing and close the XML streams.
-
- Optionally, the connection may be reconnected and
- resume processing afterwards.
-
- Arguments:
- reconnect -- Flag indicating if the connection
- and processing should be restarted.
- Defaults to False.
- """
- self.state.transition('connected', 'disconnected', wait=0.0,
- func=self._disconnect, args=(reconnect,))
-
- def _disconnect(self, reconnect=False):
- # Send the end of stream marker.
- self.send_raw(self.stream_footer)
- # Wait for confirmation that the stream was
- # closed in the other direction.
- if not reconnect:
- self.auto_reconnect = False
- self.stream_end_event.wait(4)
- if not self.auto_reconnect:
- self.stop.set()
- try:
- self.socket.close()
- self.filesocket.close()
- self.socket.shutdown(Socket.SHUT_RDWR)
- except Socket.error as serr:
- pass
- finally:
- #clear your application state
- self.event("disconnected", direct=True)
- return True
-
- def reconnect(self):
- """
- Reset the stream's state and reconnect to the server.
- """
- log.debug("reconnecting...")
- self.state.transition('connected', 'disconnected', wait=2.0,
- func=self._disconnect, args=(True,))
- log.debug("connecting...")
- return self.state.transition('disconnected', 'connected',
- wait=2.0, func=self._connect)
-
- def set_socket(self, socket, ignore=False):
- """
- Set the socket to use for the stream.
-
- The filesocket will be recreated as well.
-
- Arguments:
- socket -- The new socket to use.
- ignore -- don't set the state
- """
- self.socket = socket
- if socket is not None:
- # ElementTree.iterparse requires a file.
- # 0 buffer files have to be binary.
-
- # Use the correct fileobject type based on the Python
- # version to work around a broken implementation in
- # Python 2.x.
- if sys.version_info < (3, 0):
- self.filesocket = FileSocket(self.socket)
- else:
- self.filesocket = self.socket.makefile('rb', 0)
- if not ignore:
- self.state._set_state('connected')
-
- def start_tls(self):
- """
- Perform handshakes for TLS.
-
- If the handshake is successful, the XML stream will need
- to be restarted.
- """
- if self.ssl_support:
- log.info("Negotiating TLS")
- log.info("Using SSL version: %s" % str(self.ssl_version))
- ssl_socket = ssl.wrap_socket(self.socket,
- ssl_version=self.ssl_version,
- do_handshake_on_connect=False)
- if hasattr(self.socket, 'socket'):
- # We are using a testing socket, so preserve the top
- # layer of wrapping.
- self.socket.socket = ssl_socket
- else:
- self.socket = ssl_socket
- self.socket.do_handshake()
- self.set_socket(self.socket)
- return True
- else:
- log.warning("Tried to enable TLS, but ssl module not found.")
- return False
-
- def start_stream_handler(self, xml):
- """
- Perform any initialization actions, such as handshakes, once the
- stream header has been sent.
-
- Meant to be overridden.
- """
- pass
-
- def register_stanza(self, stanza_class):
- """
- Add a stanza object class as a known root stanza. A root stanza is
- one that appears as a direct child of the stream's root element.
-
- Stanzas that appear as substanzas of a root stanza do not need to
- be registered here. That is done using register_stanza_plugin() from
- sleekxmpp.xmlstream.stanzabase.
-
- Stanzas that are not registered will not be converted into
- stanza objects, but may still be processed using handlers and
- matchers.
-
- Arguments:
- stanza_class -- The top-level stanza object's class.
- """
- self.__root_stanza.append(stanza_class)
-
- def remove_stanza(self, stanza_class):
- """
- Remove a stanza from being a known root stanza. A root stanza is
- one that appears as a direct child of the stream's root element.
-
- Stanzas that are not registered will not be converted into
- stanza objects, but may still be processed using handlers and
- matchers.
- """
- del self.__root_stanza[stanza_class]
-
- def add_handler(self, mask, pointer, name=None, disposable=False,
- threaded=False, filter=False, instream=False):
- """
- A shortcut method for registering a handler using XML masks.
-
- Arguments:
- mask -- An XML snippet matching the structure of the
- stanzas that will be passed to this handler.
- pointer -- The handler function itself.
- name -- A unique name for the handler. A name will
- be generated if one is not provided.
- disposable -- Indicates if the handler should be discarded
- after one use.
- threaded -- Deprecated. Remains for backwards compatibility.
- filter -- Deprecated. Remains for backwards compatibility.
- instream -- Indicates if the handler should execute during
- stream processing and not during normal event
- processing.
- """
- # To prevent circular dependencies, we must load the matcher
- # and handler classes here.
- from sleekxmpp.xmlstream.matcher import MatchXMLMask
- from sleekxmpp.xmlstream.handler import XMLCallback
-
- if name is None:
- name = 'add_handler_%s' % self.getNewId()
- self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer,
- once=disposable, instream=instream))
-
- def register_handler(self, handler, before=None, after=None):
- """
- Add a stream event handler that will be executed when a matching
- stanza is received.
-
- Arguments:
- handler -- The handler object to execute.
- """
- if handler.stream is None:
- self.__handlers.append(handler)
- handler.stream = self
-
- def remove_handler(self, name):
- """
- Remove any stream event handlers with the given name.
-
- Arguments:
- name -- The name of the handler.
- """
- idx = 0
- for handler in self.__handlers:
- if handler.name == name:
- self.__handlers.pop(idx)
- return True
- idx += 1
- return False
-
- def add_event_handler(self, name, pointer,
- threaded=False, disposable=False):
- """
- Add a custom event handler that will be executed whenever
- its event is manually triggered.
-
- Arguments:
- name -- The name of the event that will trigger
- this handler.
- pointer -- The function to execute.
- threaded -- If set to True, the handler will execute
- in its own thread. Defaults to False.
- disposable -- If set to True, the handler will be
- discarded after one use. Defaults to False.
- """
- if not name in self.__event_handlers:
- self.__event_handlers[name] = []
- self.__event_handlers[name].append((pointer, threaded, disposable))
-
- def del_event_handler(self, name, pointer):
- """
- Remove a function as a handler for an event.
-
- Arguments:
- name -- The name of the event.
- pointer -- The function to remove as a handler.
- """
- if not name in self.__event_handlers:
- return
-
- # Need to keep handlers that do not use
- # the given function pointer
- def filter_pointers(handler):
- return handler[0] != pointer
-
- self.__event_handlers[name] = filter(filter_pointers,
- self.__event_handlers[name])
-
- def event_handled(self, name):
- """
- Indicates if an event has any associated handlers.
-
- Returns the number of registered handlers.
-
- Arguments:
- name -- The name of the event to check.
- """
- return len(self.__event_handlers.get(name, []))
-
- def event(self, name, data={}, direct=False):
- """
- Manually trigger a custom event.
-
- Arguments:
- name -- The name of the event to trigger.
- data -- Data that will be passed to each event handler.
- Defaults to an empty dictionary.
- direct -- Runs the event directly if True, skipping the
- event queue. All event handlers will run in the
- same thread.
- """
- for handler in self.__event_handlers.get(name, []):
- if direct:
- try:
- handler[0](copy.copy(data))
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg % str(handler[0]))
- if hasattr(data, 'exception'):
- data.exception(e)
- else:
- self.event_queue.put(('event', handler, copy.copy(data)))
-
- if handler[2]:
- # If the handler is disposable, we will go ahead and
- # remove it now instead of waiting for it to be
- # processed in the queue.
- with self.__event_handlers_lock:
- try:
- h_index = self.__event_handlers[name].index(handler)
- self.__event_handlers[name].pop(h_index)
- except:
- pass
-
- def schedule(self, name, seconds, callback, args=None,
- kwargs=None, repeat=False):
- """
- Schedule a callback function to execute after a given delay.
-
- Arguments:
- name -- A unique name for the scheduled callback.
- seconds -- The time in seconds to wait before executing.
- callback -- A pointer to the function to execute.
- args -- A tuple of arguments to pass to the function.
- kwargs -- A dictionary of keyword arguments to pass to
- the function.
- repeat -- Flag indicating if the scheduled event should
- be reset and repeat after executing.
- """
- self.scheduler.add(name, seconds, callback, args, kwargs,
- repeat, qpointer=self.event_queue)
-
- def incoming_filter(self, xml):
- """
- Filter incoming XML objects before they are processed.
-
- Possible uses include remapping namespaces, or correcting elements
- from sources with incorrect behavior.
-
- Meant to be overridden.
- """
- return xml
-
- def send(self, data, mask=None, timeout=RESPONSE_TIMEOUT):
- """
- A wrapper for send_raw for sending stanza objects.
-
- May optionally block until an expected response is received.
-
- Arguments:
- data -- The stanza object to send on the stream.
- mask -- Deprecated. An XML snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- timeout -- Time in seconds to wait for a response before
- continuing. Defaults to RESPONSE_TIMEOUT.
- """
- if hasattr(mask, 'xml'):
- mask = mask.xml
- data = str(data)
- if mask is not None:
- log.warning("Use of send mask waiters is deprecated.")
- wait_for = Waiter("SendWait_%s" % self.new_id(),
- MatchXMLMask(mask))
- self.register_handler(wait_for)
- self.send_raw(data)
- if mask is not None:
- return wait_for.wait(timeout)
-
- def send_raw(self, data):
- """
- Send raw data across the stream.
-
- Arguments:
- data -- Any string value.
- """
- self.send_queue.put(data)
- return True
-
- def send_xml(self, data, mask=None, timeout=RESPONSE_TIMEOUT):
- """
- Send an XML object on the stream, and optionally wait
- for a response.
-
- Arguments:
- data -- The XML object to send on the stream.
- mask -- Deprecated. An XML snippet matching the structure
- of the expected response. Execution will block
- in this thread until the response is received
- or a timeout occurs.
- timeout -- Time in seconds to wait for a response before
- continuing. Defaults to RESPONSE_TIMEOUT.
- """
- return self.send(tostring(data), mask, timeout)
-
- def process(self, threaded=True):
- """
- Initialize the XML streams and begin processing events.
-
- The number of threads used for processing stream events is determined
- by HANDLER_THREADS.
-
- Arguments:
- threaded -- If threaded=True then event dispatcher will run
- in a separate thread, allowing for the stream to be
- used in the background for another application.
- Defaults to True.
-
- Event handlers and the send queue will be threaded
- regardless of this parameter's value.
- """
- self.scheduler.process(threaded=True)
-
- def start_thread(name, target):
- self.__thread[name] = threading.Thread(name=name, target=target)
- self.__thread[name].start()
-
- for t in range(0, HANDLER_THREADS):
- log.debug("Starting HANDLER THREAD")
- start_thread('stream_event_handler_%s' % t, self._event_runner)
-
- start_thread('send_thread', self._send_thread)
-
- if threaded:
- # Run the XML stream in the background for another application.
- start_thread('process', self._process)
- else:
- self._process()
-
- def _process(self):
- """
- Start processing the XML streams.
-
- Processing will continue after any recoverable errors
- if reconnections are allowed.
- """
- firstrun = True
-
- # The body of this loop will only execute once per connection.
- # Additional passes will be made only if an error occurs and
- # reconnecting is permitted.
- while firstrun or (self.auto_reconnect and not self.stop.isSet()):
- firstrun = False
- try:
- if self.is_client:
- self.send_raw(self.stream_header)
- # The call to self.__read_xml will block and prevent
- # the body of the loop from running until a disconnect
- # occurs. After any reconnection, the stream header will
- # be resent and processing will resume.
- while not self.stop.isSet() and self.__read_xml():
- # Ensure the stream header is sent for any
- # new connections.
- if self.is_client:
- self.send_raw(self.stream_header)
- except KeyboardInterrupt:
- log.debug("Keyboard Escape Detected in _process")
- self.stop.set()
- except SystemExit:
- log.debug("SystemExit in _process")
- self.stop.set()
- except Socket.error:
- log.exception('Socket Error')
- except:
- if not self.stop.isSet():
- log.exception('Connection error.')
- if not self.stop.isSet() and self.auto_reconnect:
- self.reconnect()
- else:
- self.disconnect()
- self.event_queue.put(('quit', None, None))
- self.scheduler.run = False
-
- def __read_xml(self):
- """
- Parse the incoming XML stream, raising stream events for
- each received stanza.
- """
- depth = 0
- root = None
- for (event, xml) in ET.iterparse(self.filesocket, (b'end', b'start')):
- if event == b'start':
- if depth == 0:
- # We have received the start of the root element.
- root = xml
- # Perform any stream initialization actions, such
- # as handshakes.
- self.stream_end_event.clear()
- self.start_stream_handler(root)
- depth += 1
- if event == b'end':
- depth -= 1
- if depth == 0:
- # The stream's root element has closed,
- # terminating the stream.
- log.debug("End of stream recieved")
- self.stream_end_event.set()
- return False
- elif depth == 1:
- # We only raise events for stanzas that are direct
- # children of the root element.
- try:
- self.__spawn_event(xml)
- except RestartStream:
- return True
- if root:
- # Keep the root element empty of children to
- # save on memory use.
- root.clear()
- log.debug("Ending read XML loop")
-
- def _build_stanza(self, xml, default_ns=None):
- """
- Create a stanza object from a given XML object.
-
- If a specialized stanza type is not found for the XML, then
- a generic StanzaBase stanza will be returned.
-
- Arguments:
- xml -- The XML object to convert into a stanza object.
- default_ns -- Optional default namespace to use instead of the
- stream's current default namespace.
- """
- if default_ns is None:
- default_ns = self.default_ns
- stanza_type = StanzaBase
- for stanza_class in self.__root_stanza:
- if xml.tag == "{%s}%s" % (default_ns, stanza_class.name):
- stanza_type = stanza_class
- break
- stanza = stanza_type(self, xml)
- return stanza
-
- def __spawn_event(self, xml):
- """
- Analyze incoming XML stanzas and convert them into stanza
- objects if applicable and queue stream events to be processed
- by matching handlers.
-
- Arguments:
- xml -- The XML stanza to analyze.
- """
- log.debug("RECV: %s" % tostring(xml,
- xmlns=self.default_ns,
- stream=self))
- # Apply any preprocessing filters.
- xml = self.incoming_filter(xml)
-
- # Convert the raw XML object into a stanza object. If no registered
- # stanza type applies, a generic StanzaBase stanza will be used.
- stanza_type = StanzaBase
- for stanza_class in self.__root_stanza:
- if xml.tag == "{%s}%s" % (self.default_ns, stanza_class.name):
- stanza_type = stanza_class
- break
- stanza = stanza_type(self, xml)
-
- # Match the stanza against registered handlers. Handlers marked
- # to run "in stream" will be executed immediately; the rest will
- # be queued.
- unhandled = True
- for handler in self.__handlers:
- if handler.match(stanza):
- stanza_copy = stanza_type(self, copy.deepcopy(xml))
- handler.prerun(stanza_copy)
- self.event_queue.put(('stanza', handler, stanza_copy))
- try:
- if handler.check_delete():
- self.__handlers.pop(self.__handlers.index(handler))
- except:
- pass # not thread safe
- unhandled = False
-
- # Some stanzas require responses, such as Iq queries. A default
- # handler will be executed immediately for this case.
- if unhandled:
- stanza.unhandled()
-
- def _threaded_event_wrapper(self, func, args):
- """
- Capture exceptions for event handlers that run
- in individual threads.
-
- Arguments:
- func -- The event handler to execute.
- args -- Arguments to the event handler.
- """
- try:
- func(*args)
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg % str(func))
- if hasattr(args[0], 'exception'):
- args[0].exception(e)
-
- def _event_runner(self):
- """
- Process the event queue and execute handlers.
-
- The number of event runner threads is controlled by HANDLER_THREADS.
-
- Stream event handlers will all execute in this thread. Custom event
- handlers may be spawned in individual threads.
- """
- log.debug("Loading event runner")
- try:
- while not self.stop.isSet():
- try:
- event = self.event_queue.get(True, timeout=5)
- except queue.Empty:
- event = None
- if event is None:
- continue
-
- etype, handler = event[0:2]
- args = event[2:]
-
- if etype == 'stanza':
- try:
- handler.run(args[0])
- except Exception as e:
- error_msg = 'Error processing stream handler: %s'
- log.exception(error_msg % handler.name)
- args[0].exception(e)
- elif etype == 'schedule':
- try:
- log.debug(args)
- handler(*args[0])
- except:
- log.exception('Error processing scheduled task')
- elif etype == 'event':
- func, threaded, disposable = handler
- try:
- if threaded:
- x = threading.Thread(
- name="Event_%s" % str(func),
- target=self._threaded_event_wrapper,
- args=(func, args))
- x.start()
- else:
- func(*args)
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg % str(func))
- if hasattr(args[0], 'exception'):
- args[0].exception(e)
- elif etype == 'quit':
- log.debug("Quitting event runner thread")
- return False
- except KeyboardInterrupt:
- log.debug("Keyboard Escape Detected in _event_runner")
- self.disconnect()
- return
- except SystemExit:
- self.disconnect()
- self.event_queue.put(('quit', None, None))
- return
-
- def _send_thread(self):
- """
- Extract stanzas from the send queue and send them on the stream.
- """
- try:
- while not self.stop.isSet():
- try:
- data = self.send_queue.get(True, 1)
- except queue.Empty:
- continue
- log.debug("SEND: %s" % data)
- try:
- self.socket.send(data.encode('utf-8'))
- except:
- log.warning("Failed to send %s" % data)
- self.disconnect(self.auto_reconnect)
- except KeyboardInterrupt:
- log.debug("Keyboard Escape Detected in _send_thread")
- self.disconnect()
- return
- except SystemExit:
- self.disconnect()
- self.event_queue.put(('quit', None, None))
- return