summaryrefslogtreecommitdiffstats
path: root/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'manager.py')
-rw-r--r--manager.py54
1 files changed, 54 insertions, 0 deletions
diff --git a/manager.py b/manager.py
new file mode 100644
index 0000000..2221512
--- /dev/null
+++ b/manager.py
@@ -0,0 +1,54 @@
+from subprocess import Popen
+from os.path import join
+from time import sleep
+from glob import glob
+from collections import deque
+
+SSH_CONFIG = "ssh_config"
+to_do = set(glob("*.txt")) - set(["alive.txt", "keys.txt", "nodes.txt"])
+done = set(f[:-4] for f in glob("*.txt.out"))
+to_do = deque(to_do - done)
+apis = deque(line.strip() for line in open("keys.txt"))
+servers = deque(line.strip() for line in open("alive.txt"))
+processing = []
+
+while to_do or processing:
+ to_do.extend(task["file"] for task in processing
+ if task["status"] == "collect")
+ processing = [task for task in processing if task["status"].endswith("ing")]
+
+ if servers and to_do:
+ f = to_do.pop()
+ host = servers.popleft()
+ api = apis.pop()
+ apis.appendleft(api)
+ task = {"host": host, "file": f, "status": "uploading", "api": api,
+ "process": Popen(["scp", "-F", SSH_CONFIG, f,
+ host + ":"])}
+ processing.append(task)
+
+ for task in processing:
+ rc = task["process"].poll()
+ if rc is None:
+ continue
+ elif rc != 0:
+ servers.append(task["host"])
+ task["status"] = "collect"
+ continue
+
+ if task["status"] == "uploading":
+ task["status"] = "processing"
+ task["process"] = Popen(["ssh", "-F", SSH_CONFIG,
+ task["host"],
+ "python2 process.py " + task["api"] + " "
+ + task["file"]])
+ elif task["status"] == "processing":
+ task["status"] ="downloading"
+ task["process"] = Popen(["scp", "-F", SSH_CONFIG,
+ task["host"] + ":" + task["file"] + ".out",
+ "./"])
+ elif task["status"] == "downloading":
+ task["status"] = "done"
+ servers.appendleft(task["host"])
+
+ sleep(0.5)