from threading import Thread

from Xlib import X, XK, display
from Xlib.ext import record
from Xlib.error import BadWindow
from Xlib.protocol import rq

from models import KeyEvent, ClickEvent, WindowEvent, MODIFIERS

# Load additional keysym groups into XK so their XK_* names are available
# when XLogger builds its keysym -> name table.
XK.load_keysym_group("latin2")
XK.load_keysym_group("xkb")
XK.load_keysym_group("xf86")


def state_to_idx(state):
    """Translate an X modifier state mask into a keysym column index.

    Adds 4 when Mod5 is set and 1 when Shift is set, so the result is one
    of 0, 1, 4 or 5.  Other modifier bits are ignored.
    """
    idx = 0
    if state & X.Mod5Mask:
        idx += 4
    if state & X.ShiftMask:
        idx += 1
    return idx


def _to_text(value):
    """Return *value* as text: None -> u'', byte strings decoded as Latin-1.

    Values that are already text are returned unchanged, which avoids the
    implicit ASCII round-trip that calling ``.decode`` on a unicode object
    would trigger under Python 2.
    """
    if value is None:
        return u""
    if isinstance(value, bytes):
        return value.decode("latin1")
    return value


class XLogger(Thread):
    """Thread that records X11 key, button and focus events via RECORD.

    Each recognised event is converted to a ``KeyEvent``, ``ClickEvent`` or
    ``WindowEvent`` and handed to ``logger.info``.
    """

    def __init__(self, logger):
        """Open the X connections and build the lookup tables.

        :param logger: object whose ``info(event)`` method receives every
            converted event.
        """
        Thread.__init__(self)
        # keysym value -> name with the "XK_" prefix stripped.
        self.keysymdict = {
            getattr(XK, name): name[3:]
            for name in dir(XK)
            if name.startswith("XK_")
        }
        # Two connections: record_dpy is used by run() to enable the record
        # context; display serves window queries and is the connection
        # stop() uses to disable the context.
        self.display = display.Display()
        self.record_dpy = display.Display()
        self.keymap = self.display._keymap_codes
        self.logger = logger
        # Id of the window reported by the last WindowEvent (-1: none yet).
        self.last_id = -1
        # Record context; stays None until run() creates it.
        self.ctx = None
        self.root = self.display.screen().root

    def run(self):
        """Create the record context and process events until it is disabled."""
        # Single-argument print(...) behaves the same on Python 2 and 3.
        if not self.record_dpy.has_extension("RECORD"):
            print("RECORD extension not found")
            return
        print("RECORD extension present")

        self.ctx = self.record_dpy.record_create_context(
            0,
            [record.AllClients],
            [{
                'core_requests': (0, 0),
                'core_replies': (0, 0),
                'ext_requests': (0, 0, 0, 0),
                'ext_replies': (0, 0, 0, 0),
                'delivered_events': (X.FocusIn, X.FocusIn),
                # Range from KeyPress to ButtonPress; anything in between
                # that log_event() does not recognise is simply ignored.
                'device_events': (X.KeyPress, X.ButtonPress),
                'errors': (0, 0),
                'client_started': False,
                'client_died': False,
            }])
        try:
            self.record_dpy.record_enable_context(self.ctx, self.process)
        finally:
            # Free the context even if the callback raised.
            self.record_dpy.record_free_context(self.ctx)

    def stop(self):
        """Disable the record context; a no-op if run() has not created it."""
        ctx = self.ctx
        if ctx is None:
            return
        self.display.record_disable_context(ctx)
        self.display.flush()

    def process(self, reply):
        """RECORD callback: parse every event in *reply* and log it."""
        if reply.category != record.FromServer:
            return
        if reply.client_swapped:
            print("* received swapped protocol data, cowardly ignored")
            return

        data = reply.data
        if not data:
            return
        # Indexing yields a 1-char str on Python 2 and an int on Python 3.
        first = data[0]
        if not isinstance(first, int):
            first = ord(first)
        # Leading codes 0 and 1 are skipped (presumably error/reply packets
        # rather than events -- confirm against the X protocol).
        if first < 2:
            return

        while data:
            field = rq.EventField(None)
            event, data = field.parse_binary_value(
                data, self.record_dpy.display, None, None)
            self.log_event(event)

    def log_event(self, event):
        """Convert *event* to the matching model object and log it.

        Focus events that yield no WindowEvent, and event types other than
        FocusIn, KeyPress and ButtonPress, are dropped.
        """
        if event.type == X.FocusIn:
            p_event = self.window_event(event)
            if p_event:
                self.logger.info(p_event)
        elif event.type == X.KeyPress:
            self.logger.info(self.key_event(event))
        elif event.type == X.ButtonPress:
            self.logger.info(self.button_event(event))

    def get_key_name(self, keycode, state):
        """Return a printable name for *keycode* under modifier *state*.

        Keysyms below 256 are returned as the corresponding Latin-1
        character; others are resolved through lookup_keysym().  When the
        keymap row has no column for the requested modifier combination,
        the first column is used; an empty row yields "[0]".
        """
        keysyms = self.keymap[keycode]
        if not keysyms:
            return self.lookup_keysym(0)
        idx = state_to_idx(state)
        if idx >= len(keysyms):
            # Row has fewer columns than the modifier index needs.
            idx = 0
        keysym = keysyms[idx]
        if keysym < 256:
            # chr() gives a byte string on Python 2 (decoded as Latin-1 by
            # _to_text) and the equivalent text character on Python 3.
            return _to_text(chr(keysym))
        return self.lookup_keysym(keysym)

    def key_event(self, event):
        """Build a KeyEvent from an X KeyPress event."""
        modifiers = [name for mask, name in MODIFIERS.items()
                     if mask & event.state]
        return KeyEvent(
            key=event.detail,
            modifiers=modifiers,
            key_name=self.get_key_name(event.detail, event.state),
            # NOTE(review): sequence_number == 1 is treated as "repeat";
            # confirm this is the intended signal.
            repeat=event.sequence_number == 1)

    def window_event(self, event):
        """Build a WindowEvent for a FocusIn event.

        Returns None when the window no longer exists (BadWindow) or when
        the resolved window is the same one reported last time.  If the
        focused window has neither a class nor a name, its parent is used
        instead; the root window is reported as ("root", "Root") / "root".
        """
        window = self.display.create_resource_object("window", event.window.id)
        try:
            class_name = window.get_wm_class()
            window_name = window.get_wm_name()
            if not class_name and not window_name:
                if window.id != self.root.id:
                    # The parent lookup can hit a vanished window too, so it
                    # stays inside the BadWindow guard.
                    window = window.query_tree().parent
                    class_name = window.get_wm_class()
                    window_name = window.get_wm_name()
                else:
                    class_name = ('root', 'Root')
                    window_name = "root"
        except BadWindow:
            return None

        if window.id == self.last_id:
            return None
        self.last_id = window.id

        # Either property may be missing independently of the other.
        class1, class2 = [_to_text(part) for part in (class_name or ('', ''))]
        return WindowEvent(window_id=window.id,
                           name=_to_text(window_name),
                           class1=class1,
                           class2=class2)

    def button_event(self, event):
        """Build a ClickEvent from an X ButtonPress event."""
        return ClickEvent(button=event.detail, x=event.root_x, y=event.root_y)

    def lookup_keysym(self, keysym):
        """Return the XK name for *keysym*, or "[<number>]" if unknown."""
        return self.keysymdict.get(keysym, "[%d]" % keysym)