diff options
| author | Thibaut Horel <thibaut.horel@gmail.com> | 2013-09-29 05:12:56 -0400 |
|---|---|---|
| committer | Thibaut Horel <thibaut.horel@gmail.com> | 2013-09-29 05:12:56 -0400 |
| commit | 19346fa9068878af516cdb670bea4f791337507b (patch) | |
| tree | 54d4fa5a82b2e0305f3b050dc1ebb53ec9d82a5d /manager.py | |
| download | lastfm-19346fa9068878af516cdb670bea4f791337507b.tar.gz | |
Initial commit
Diffstat (limited to 'manager.py')
| -rw-r--r-- | manager.py | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/manager.py b/manager.py new file mode 100644 index 0000000..2221512 --- /dev/null +++ b/manager.py @@ -0,0 +1,54 @@ +from subprocess import Popen +from os.path import join +from time import sleep +from glob import glob +from collections import deque + +SSH_CONFIG = "ssh_config" +to_do = set(glob("*.txt")) - set(["alive.txt", "keys.txt", "nodes.txt"]) +done = set(f[:-4] for f in glob("*.txt.out")) +to_do = deque(to_do - done) +apis = deque(line.strip() for line in open("keys.txt")) +servers = deque(line.strip() for line in open("alive.txt")) +processing = [] + +while to_do or processing: + to_do.extend(task["file"] for task in processing + if task["status"] == "collect") + processing = [task for task in processing if task["status"].endswith("ing")] + + if servers and to_do: + f = to_do.pop() + host = servers.popleft() + api = apis.pop() + apis.appendleft(api) + task = {"host": host, "file": f, "status": "uploading", "api": api, + "process": Popen(["scp", "-F", SSH_CONFIG, f, + host + ":"])} + processing.append(task) + + for task in processing: + rc = task["process"].poll() + if rc is None: + continue + elif rc != 0: + servers.append(task["host"]) + task["status"] = "collect" + continue + + if task["status"] == "uploading": + task["status"] = "processing" + task["process"] = Popen(["ssh", "-F", SSH_CONFIG, + task["host"], + "python2 process.py " + task["api"] + " " + + task["file"]]) + elif task["status"] == "processing": + task["status"] ="downloading" + task["process"] = Popen(["scp", "-F", SSH_CONFIG, + task["host"] + ":" + task["file"] + ".out", + "./"]) + elif task["status"] == "downloading": + task["status"] = "done" + servers.appendleft(task["host"]) + + sleep(0.5) |
