1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
from threading import Thread
from Xlib import X, XK, display
from Xlib.ext import record
from Xlib.error import BadWindow
from Xlib.protocol import rq
from models import KeyEvent, ClickEvent, WindowEvent, MODIFIERS
XK.load_keysym_group("latin2")
XK.load_keysym_group("xkb")
XK.load_keysym_group("xf86")
def state_to_idx(state):
s = 0
if state & X.Mod5Mask:
s += 4
if state & X.ShiftMask:
s += 1
return s
class XLogger(Thread):
def __init__(self, logger):
Thread.__init__(self)
self.keysymdict = {getattr(XK, name): name[3:] for name in dir(XK)
if name.startswith("XK_")}
self.display = display.Display()
self.record_dpy = display.Display()
self.keymap = self.display._keymap_codes
self.logger = logger
self.last_id = -1
self.root = self.display.screen().root
def run(self):
if not self.record_dpy.has_extension("RECORD"):
print "RECORD extension not found"
return
else:
print "RECORD extension present"
self.ctx = self.record_dpy.record_create_context(
0,
[record.AllClients],
[{
'core_requests': (0, 0),
'core_replies': (0, 0),
'ext_requests': (0, 0, 0, 0),
'ext_replies': (0, 0, 0, 0),
'delivered_events': (X.FocusIn, X.FocusIn),
'device_events': (X.KeyPress, X.ButtonPress),
'errors': (0, 0),
'client_started': False,
'client_died': False,
}])
self.record_dpy.record_enable_context(self.ctx, self.process)
self.record_dpy.record_free_context(self.ctx)
def stop(self):
self.display.record_disable_context(self.ctx)
self.display.flush()
def process(self, reply):
if reply.category != record.FromServer:
return
if reply.client_swapped:
print "* received swapped protocol data, cowardly ignored"
return
if not len(reply.data) or ord(reply.data[0]) < 2:
return
data = reply.data
while len(data):
ef = rq.EventField(None)
event, data = ef.parse_binary_value(data, self.record_dpy.display,
None, None)
self.log_event(event)
def log_event(self, event):
if event.type == X.FocusIn:
p_event = self.window_event(event)
if p_event:
self.logger.info(p_event)
if event.type == X.KeyPress:
self.logger.info(self.key_event(event))
elif event.type == X.ButtonPress:
self.logger.info(self.button_event(event))
def get_key_name(self, keycode, state):
state_idx = state_to_idx(state)
cn = self.keymap[keycode][state_idx]
if cn < 256:
return chr(cn).decode('latin1')
else:
return self.lookup_keysym(cn)
def key_event(self, event):
modifiers = [v for key, v in MODIFIERS.iteritems()
if key & event.state]
return KeyEvent(key=event.detail, modifiers=modifiers,
key_name=self.get_key_name(event.detail, event.state),
repeat=event.sequence_number == 1)
def window_event(self, event):
window = self.display.create_resource_object("window",
event.window.id)
try:
class_name = window.get_wm_class()
window_name = window.get_wm_name()
except BadWindow:
return None
if not class_name and not window_name:
if window.id != self.root.id:
window = window.query_tree().parent
class_name = window.get_wm_class() or ('', '')
window_name = window.get_wm_name() or ''
else:
class_name = ('root', 'Root')
window_name = "root"
if window.id != self.last_id:
self.last_id = window.id
window_name = window_name.decode('latin1')
class1, class2 = map(lambda x: x.decode('latin1'), class_name)
return WindowEvent(window_id=window.id, name=window_name,
class1=class1, class2=class2)
def button_event(self, event):
return ClickEvent(button=event.detail, x=event.root_x, y=event.root_y)
def lookup_keysym(self, keysym):
if keysym in self.keysymdict:
return self.keysymdict[keysym]
else:
return "[%d]" % keysym
|