summaryrefslogtreecommitdiffstats
path: root/manager.py
blob: 2221512a1ef85492d66b8d73f911454af540f5c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from collections import deque
from glob import glob
from os.path import join
from subprocess import Popen
from time import sleep

try:
    from shlex import quote
except ImportError:  # Python 2 has no shlex.quote; pipes.quote is the equivalent
    from pipes import quote

# Configuration file handed to every scp/ssh invocation through ``-F``.
SSH_CONFIG = "ssh_config"


def _read_lines(path):
    """Return the stripped, non-blank lines of *path* as a deque.

    The file is closed before returning.  Blank lines are skipped so that a
    trailing newline or an empty row never turns into an empty host name or
    an empty key.
    """
    with open(path) as handle:
        stripped = (line.strip() for line in handle)
        return deque(line for line in stripped if line)


# Candidate inputs: every *.txt in the working directory except the three
# control files.
to_do = set(glob("*.txt")) - {"alive.txt", "keys.txt", "nodes.txt"}
# A "<name>.txt.out" file marks "<name>.txt" as already handled; dropping the
# last four characters (".out") recovers the input file name.
done = set(f[:-4] for f in glob("*.txt.out"))
to_do = deque(to_do - done)
# Keys (one per line of keys.txt) that are rotated across the started tasks.
apis = _read_lines("keys.txt")
# Hosts (one per line of alive.txt) currently free to take a task.
servers = _read_lines("alive.txt")
# Tasks in flight; each is a dict with host/file/status/api/process entries.
processing = []

# With pending work but no host, the scheduling loop could never start a task
# and would sleep forever; with no key it would fail on the first pop from an
# empty deque.  Stop right away with a readable message instead.
if to_do and not servers:
    raise SystemExit("alive.txt contains no hosts; cannot process pending files")
if to_do and not apis:
    raise SystemExit("keys.txt contains no keys; cannot process pending files")

# Scheduling loop.  Every pass (roughly twice a second):
#   1. puts the files of failed tasks back on the queue and forgets tasks that
#      are finished or failed,
#   2. starts at most one new upload if a host and a file are available,
#   3. advances every task whose current subprocess has exited.
# A task's status moves uploading -> processing -> downloading -> done, or to
# "collect" as soon as one of its subprocesses exits with a non-zero code.
while to_do or processing:
    # Failed tasks are marked "collect"; their files are retried.
    to_do.extend(task["file"] for task in processing
                 if task["status"] == "collect")
    # Only the in-flight states end in "ing", so this drops both "done" and
    # "collect" tasks.
    processing = [task for task in processing if task["status"].endswith("ing")]

    if servers and to_do:
        f = to_do.pop()
        host = servers.popleft()
        # Rotate through the keys: take one from the right end and put it
        # back on the left so the next task gets a different one.
        api = apis.pop()
        apis.appendleft(api)
        # Upload the input file to the remote account's default directory.
        task = {"host": host, "file": f, "status": "uploading", "api": api,
                "process": Popen(["scp", "-F", SSH_CONFIG, f,
                                  host + ":"])}
        processing.append(task)

    for task in processing:
        rc = task["process"].poll()
        if rc is None:
            # Current step is still running.
            continue
        elif rc != 0:
            # Step failed: release the host to the back of the pool (hosts are
            # taken from the front) and flag the file for a retry.
            servers.append(task["host"])
            task["status"] = "collect"
            continue

        if task["status"] == "uploading":
            task["status"] = "processing"
            # ssh passes the command to the remote shell as a single string,
            # so each argument is quoted; otherwise a file name or key with
            # spaces or shell metacharacters would be split or interpreted
            # remotely.  Plain names are left unchanged by quote().
            remote_command = " ".join(["python2", "process.py",
                                       quote(task["api"]),
                                       quote(task["file"])])
            task["process"] = Popen(["ssh", "-F", SSH_CONFIG,
                                     task["host"], remote_command])
        elif task["status"] == "processing":
            task["status"] = "downloading"
            # Fetch "<file>.out" from the host into the working directory.
            task["process"] = Popen(["scp", "-F", SSH_CONFIG,
                                     task["host"] + ":" + task["file"] + ".out",
                                     "./"])
        elif task["status"] == "downloading":
            task["status"] = "done"
            # Successful hosts go to the front of the pool and are reused first.
            servers.appendleft(task["host"])

    sleep(0.5)