From d1a77fac18e25df1093172f15fb8925c4545a7b5 Mon Sep 17 00:00:00 2001 From: Thibaut Horel Date: Sun, 6 Jul 2014 17:56:02 -0400 Subject: Initial commit, X events tracking --- xlogger.py | 137 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 xlogger.py (limited to 'xlogger.py') diff --git a/xlogger.py b/xlogger.py new file mode 100644 index 0000000..64f6237 --- /dev/null +++ b/xlogger.py @@ -0,0 +1,137 @@ +from threading import Thread + +from Xlib import X, XK, display +from Xlib.ext import record +from Xlib.error import BadWindow +from Xlib.protocol import rq + +from models import KeyEvent, ClickEvent, WindowEvent, MODIFIERS + +XK.load_keysym_group("latin2") +XK.load_keysym_group("xkb") +XK.load_keysym_group("xf86") + + +def state_to_idx(state): + s = 0 + if state & X.Mod5Mask: + s += 4 + if state & X.ShiftMask: + s += 1 + return s + + +class XLogger(Thread): + + def __init__(self, logger): + Thread.__init__(self) + self.keysymdict = {getattr(XK, name): name[3:] for name in dir(XK) + if name.startswith("XK_")} + self.display = display.Display() + self.record_dpy = display.Display() + self.keymap = self.display._keymap_codes + self.logger = logger + self.last_id = -1 + self.root = self.display.screen().root + + def run(self): + if not self.record_dpy.has_extension("RECORD"): + print "RECORD extension not found" + return + else: + print "RECORD extension present" + + self.ctx = self.record_dpy.record_create_context( + 0, + [record.AllClients], + [{ + 'core_requests': (0, 0), + 'core_replies': (0, 0), + 'ext_requests': (0, 0, 0, 0), + 'ext_replies': (0, 0, 0, 0), + 'delivered_events': (X.FocusIn, X.FocusIn), + 'device_events': (X.KeyPress, X.ButtonPress), + 'errors': (0, 0), + 'client_started': False, + 'client_died': False, + }]) + + self.record_dpy.record_enable_context(self.ctx, self.process) + self.record_dpy.record_free_context(self.ctx) + + def stop(self): + self.display.record_disable_context(self.ctx) + self.display.flush() + + def process(self, reply): + if reply.category != record.FromServer: + return + if reply.client_swapped: + print "* received swapped protocol data, cowardly ignored" + return + if not len(reply.data) or ord(reply.data[0]) < 2: + return + + data = reply.data + while len(data): + ef = rq.EventField(None) + event, data = ef.parse_binary_value(data, self.record_dpy.display, + None, None) + self.log_event(event) + + def log_event(self, event): + if event.type == X.FocusIn: + p_event = self.window_event(event) + if p_event: + self.logger.info(p_event) + if event.type == X.KeyPress: + self.logger.info(self.key_event(event)) + elif event.type == X.ButtonPress: + self.logger.info(self.button_event(event)) + + def get_key_name(self, keycode, state): + state_idx = state_to_idx(state) + cn = self.keymap[keycode][state_idx] + if cn < 256: + return chr(cn).decode('latin1') + else: + return self.lookup_keysym(cn) + + def key_event(self, event): + modifiers = [v for key, v in MODIFIERS.iteritems() + if key & event.state] + return KeyEvent(key=event.detail, modifiers=modifiers, + key_name=self.get_key_name(event.detail, event.state), + repeat=event.sequence_number == 1) + + def window_event(self, event): + window = self.display.create_resource_object("window", + event.window.id) + try: + class_name = window.get_wm_class() + window_name = window.get_wm_name() + except BadWindow: + return None + if not class_name and not window_name: + if window.id != self.root.id: + window = window.query_tree().parent + class_name = window.get_wm_class() or ('', '') + window_name = window.get_wm_name() or '' + else: + class_name = ('root', 'Root') + window_name = "root" + if window.id != self.last_id: + self.last_id = window.id + window_name = window_name.decode('latin1') + class1, class2 = map(lambda x: x.decode('latin1'), class_name) + return WindowEvent(window_id=window.id, name=window_name, + class1=class1, class2=class2) + + def button_event(self, event): + return ClickEvent(button=event.detail, x=event.root_x, y=event.root_y) + + def lookup_keysym(self, keysym): + if keysym in self.keysymdict: + return self.keysymdict[keysym] + else: + return "[%d]" % keysym -- cgit v1.2.3-70-g09d2