summaryrefslogtreecommitdiffstats
path: root/xlogger.py
blob: 64f6237e556e6e071b953314b406eb02163c9609 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from threading import Thread

from Xlib import X, XK, display
from Xlib.ext import record
from Xlib.error import BadWindow
from Xlib.protocol import rq

from models import KeyEvent, ClickEvent, WindowEvent, MODIFIERS

# Load extra keysym groups into the XK module at import time.
# XLogger.__init__ builds its keysym -> name table from the XK_* attributes
# found on XK, so these calls must run before an XLogger is created.
XK.load_keysym_group("latin2")
XK.load_keysym_group("xkb")
XK.load_keysym_group("xf86")


def state_to_idx(state):
    """Translate an X modifier state bitmask into a keymap column index.

    Contributes 1 when ``X.ShiftMask`` is set and 4 when ``X.Mod5Mask`` is
    set, so the result is one of 0, 1, 4 or 5.  All other modifier bits in
    *state* are ignored.
    """
    shift_offset = 1 if state & X.ShiftMask else 0
    mod5_offset = 4 if state & X.Mod5Mask else 0
    return mod5_offset + shift_offset


class XLogger(Thread):
    """Thread that records X input/focus events and forwards them to a logger.

    ``run()`` sets up an X RECORD context covering all clients, ``process()``
    decodes the recorded data into events, and each KeyPress, ButtonPress or
    FocusIn event is converted into a ``KeyEvent``, ``ClickEvent`` or
    ``WindowEvent`` and handed to ``logger.info``.
    """

    def __init__(self, logger):
        """Open the X connections and build the keysym lookup table.

        logger -- object whose ``info`` method receives every event object.
        """
        Thread.__init__(self)
        # keysym value -> symbolic name with the "XK_" prefix stripped.
        self.keysymdict = {getattr(XK, name): name[3:] for name in dir(XK)
                           if name.startswith("XK_")}
        # record_dpy carries the recording itself (see run()); stop() and
        # the window property lookups use the separate self.display.
        self.display = display.Display()
        self.record_dpy = display.Display()
        # Indexed as keymap[keycode][column] in get_key_name().
        self.keymap = self.display._keymap_codes
        self.logger = logger
        # Id of the window reported by the last WindowEvent (-1: none yet).
        self.last_id = -1
        self.root = self.display.screen().root
        # Record context created by run(); None while no context exists, so
        # stop() is safe to call at any time.
        self.ctx = None

    def run(self):
        """Create the record context and process recorded data.

        Returns immediately (after printing a message) when the server has
        no RECORD extension.
        """
        if not self.record_dpy.has_extension("RECORD"):
            print("RECORD extension not found")
            return
        print("RECORD extension present")

        self.ctx = self.record_dpy.record_create_context(
            0,
            [record.AllClients],
            [{
                'core_requests': (0, 0),
                'core_replies': (0, 0),
                'ext_requests': (0, 0, 0, 0),
                'ext_replies': (0, 0, 0, 0),
                'delivered_events': (X.FocusIn, X.FocusIn),
                # First/last pair; log_event() only acts on KeyPress and
                # ButtonPress and ignores anything else it is handed.
                'device_events': (X.KeyPress, X.ButtonPress),
                'errors': (0, 0),
                'client_started': False,
                'client_died': False,
            }])

        self.record_dpy.record_enable_context(self.ctx, self.process)
        self.record_dpy.record_free_context(self.ctx)
        # The context is gone; make a later stop() a no-op instead of
        # disabling a freed context.
        self.ctx = None

    def stop(self):
        """Disable the record context via the second display connection.

        Does nothing when no context exists, e.g. when called before
        ``run()`` has created one or after ``run()`` has freed it.
        """
        ctx = self.ctx
        if ctx is None:
            return
        self.display.record_disable_context(ctx)
        self.display.flush()

    def process(self, reply):
        """Record callback: decode every event in *reply* and log it.

        Data that does not come from the server, byte-swapped data, and
        payloads that are empty or whose first byte is below 2 are skipped.
        """
        if reply.category != record.FromServer:
            return
        if reply.client_swapped:
            print("* received swapped protocol data, cowardly ignored")
            return
        if not len(reply.data) or ord(reply.data[0]) < 2:
            return

        # One reply may carry several events back to back; parse_binary_value
        # returns the decoded event plus the still-unparsed remainder.
        data = reply.data
        while len(data):
            ef = rq.EventField(None)
            event, data = ef.parse_binary_value(data, self.record_dpy.display,
                                                None, None)
            self.log_event(event)

    def log_event(self, event):
        """Convert *event* to its model object and pass it to the logger.

        FocusIn events are logged only when ``window_event`` yields an
        object; event types other than FocusIn, KeyPress and ButtonPress
        are ignored.
        """
        if event.type == X.FocusIn:
            p_event = self.window_event(event)
            if p_event:
                self.logger.info(p_event)
        elif event.type == X.KeyPress:
            self.logger.info(self.key_event(event))
        elif event.type == X.ButtonPress:
            self.logger.info(self.button_event(event))

    def get_key_name(self, keycode, state):
        """Return a printable name for *keycode* under modifier *state*.

        Keysyms below 256 are returned as the corresponding latin-1
        character; all others are resolved through ``lookup_keysym``.
        """
        state_idx = state_to_idx(state)
        cn = self.keymap[keycode][state_idx]
        if cn < 256:
            return chr(cn).decode('latin1')
        else:
            return self.lookup_keysym(cn)

    def key_event(self, event):
        """Build a ``KeyEvent`` from a KeyPress event."""
        # MODIFIERS maps a modifier bitmask to the value to report for it.
        modifiers = [v for key, v in MODIFIERS.iteritems()
                     if key & event.state]
        # NOTE(review): repeat detection relies on sequence_number == 1;
        # confirm this matches how the server reports auto-repeat.
        return KeyEvent(key=event.detail, modifiers=modifiers,
                        key_name=self.get_key_name(event.detail, event.state),
                        repeat=event.sequence_number == 1)

    def window_event(self, event):
        """Build a ``WindowEvent`` for the window of a FocusIn event.

        When the window has neither WM_CLASS nor WM_NAME, its parent is
        reported instead (or fixed "root" values for the root window).
        Returns None when the window is the same one reported last time, or
        when a window involved has disappeared (``BadWindow``).
        """
        window = self.display.create_resource_object("window",
                                                     event.window.id)
        try:
            class_name = window.get_wm_class()
            window_name = window.get_wm_name()
            if not class_name and not window_name:
                if window.id != self.root.id:
                    # The parent can vanish just like the window itself, so
                    # these lookups stay inside the BadWindow guard.
                    window = window.query_tree().parent
                    class_name = window.get_wm_class()
                    window_name = window.get_wm_name()
                else:
                    class_name = ('root', 'Root')
                    window_name = "root"
        except BadWindow:
            return None

        if window.id == self.last_id:
            return None
        self.last_id = window.id

        # Either property may be missing on its own; fall back to empty
        # strings instead of failing on None.
        window_name = (window_name or b'').decode('latin1')
        class1, class2 = [part.decode('latin1')
                          for part in (class_name or (b'', b''))]
        return WindowEvent(window_id=window.id, name=window_name,
                           class1=class1, class2=class2)

    def button_event(self, event):
        """Build a ``ClickEvent`` from a ButtonPress event."""
        return ClickEvent(button=event.detail, x=event.root_x, y=event.root_y)

    def lookup_keysym(self, keysym):
        """Return the symbolic name of *keysym*, or ``"[<number>]"``."""
        try:
            return self.keysymdict[keysym]
        except KeyError:
            return "[%d]" % keysym