summaryrefslogtreecommitdiffstats
path: root/xlogger.py
diff options
context:
space:
mode:
Diffstat (limited to 'xlogger.py')
-rw-r--r--xlogger.py137
1 files changed, 137 insertions, 0 deletions
diff --git a/xlogger.py b/xlogger.py
new file mode 100644
index 0000000..64f6237
--- /dev/null
+++ b/xlogger.py
@@ -0,0 +1,137 @@
+from threading import Thread
+
+from Xlib import X, XK, display
+from Xlib.ext import record
+from Xlib.error import BadWindow
+from Xlib.protocol import rq
+
+from models import KeyEvent, ClickEvent, WindowEvent, MODIFIERS
+
+XK.load_keysym_group("latin2")
+XK.load_keysym_group("xkb")
+XK.load_keysym_group("xf86")
+
+
+def state_to_idx(state):
+ s = 0
+ if state & X.Mod5Mask:
+ s += 4
+ if state & X.ShiftMask:
+ s += 1
+ return s
+
+
+class XLogger(Thread):
+
+ def __init__(self, logger):
+ Thread.__init__(self)
+ self.keysymdict = {getattr(XK, name): name[3:] for name in dir(XK)
+ if name.startswith("XK_")}
+ self.display = display.Display()
+ self.record_dpy = display.Display()
+ self.keymap = self.display._keymap_codes
+ self.logger = logger
+ self.last_id = -1
+ self.root = self.display.screen().root
+
+ def run(self):
+ if not self.record_dpy.has_extension("RECORD"):
+ print "RECORD extension not found"
+ return
+ else:
+ print "RECORD extension present"
+
+ self.ctx = self.record_dpy.record_create_context(
+ 0,
+ [record.AllClients],
+ [{
+ 'core_requests': (0, 0),
+ 'core_replies': (0, 0),
+ 'ext_requests': (0, 0, 0, 0),
+ 'ext_replies': (0, 0, 0, 0),
+ 'delivered_events': (X.FocusIn, X.FocusIn),
+ 'device_events': (X.KeyPress, X.ButtonPress),
+ 'errors': (0, 0),
+ 'client_started': False,
+ 'client_died': False,
+ }])
+
+ self.record_dpy.record_enable_context(self.ctx, self.process)
+ self.record_dpy.record_free_context(self.ctx)
+
+ def stop(self):
+ self.display.record_disable_context(self.ctx)
+ self.display.flush()
+
+ def process(self, reply):
+ if reply.category != record.FromServer:
+ return
+ if reply.client_swapped:
+ print "* received swapped protocol data, cowardly ignored"
+ return
+ if not len(reply.data) or ord(reply.data[0]) < 2:
+ return
+
+ data = reply.data
+ while len(data):
+ ef = rq.EventField(None)
+ event, data = ef.parse_binary_value(data, self.record_dpy.display,
+ None, None)
+ self.log_event(event)
+
+ def log_event(self, event):
+ if event.type == X.FocusIn:
+ p_event = self.window_event(event)
+ if p_event:
+ self.logger.info(p_event)
+ if event.type == X.KeyPress:
+ self.logger.info(self.key_event(event))
+ elif event.type == X.ButtonPress:
+ self.logger.info(self.button_event(event))
+
+ def get_key_name(self, keycode, state):
+ state_idx = state_to_idx(state)
+ cn = self.keymap[keycode][state_idx]
+ if cn < 256:
+ return chr(cn).decode('latin1')
+ else:
+ return self.lookup_keysym(cn)
+
+ def key_event(self, event):
+ modifiers = [v for key, v in MODIFIERS.iteritems()
+ if key & event.state]
+ return KeyEvent(key=event.detail, modifiers=modifiers,
+ key_name=self.get_key_name(event.detail, event.state),
+ repeat=event.sequence_number == 1)
+
+ def window_event(self, event):
+ window = self.display.create_resource_object("window",
+ event.window.id)
+ try:
+ class_name = window.get_wm_class()
+ window_name = window.get_wm_name()
+ except BadWindow:
+ return None
+ if not class_name and not window_name:
+ if window.id != self.root.id:
+ window = window.query_tree().parent
+ class_name = window.get_wm_class() or ('', '')
+ window_name = window.get_wm_name() or ''
+ else:
+ class_name = ('root', 'Root')
+ window_name = "root"
+ if window.id != self.last_id:
+ self.last_id = window.id
+ window_name = window_name.decode('latin1')
+ class1, class2 = map(lambda x: x.decode('latin1'), class_name)
+ return WindowEvent(window_id=window.id, name=window_name,
+ class1=class1, class2=class2)
+
+ def button_event(self, event):
+ return ClickEvent(button=event.detail, x=event.root_x, y=event.root_y)
+
+ def lookup_keysym(self, keysym):
+ if keysym in self.keysymdict:
+ return self.keysymdict[keysym]
+ else:
+ return "[%d]" % keysym