diff options
| -rw-r--r-- | python/client.py | 21 | ||||
| -rw-r--r-- | python/task_runner.py | 21 | ||||
| -rw-r--r-- | python/tasks.py | 17 |
3 files changed, 39 insertions, 20 deletions
diff --git a/python/client.py b/python/client.py index d4282770..87d85bf4 100644 --- a/python/client.py +++ b/python/client.py @@ -1,23 +1,24 @@ import logging -import tasks -from json import loads, dumps +from json import dumps from db import dbconn from common import get_redis_queue +from tasks import Rpc def run(): ET = dbconn('etdb') q = get_redis_queue() while True: - f, args = loads(q.blpop("tasks")[1].decode('utf-8')) - if f == 'generate_scenarios': - args += [ET] + rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8')) + + if rpc.fun == 'generate_scenarios': + rpc.args += [ET] logger.info("running: {}, {}".format(f, args)) - getattr(tasks, f)(*args) - if f == "build_portfolio": - q.rpush("tasks", dumps(("build_scenarios", args))) - if f == "build_scenarios": - q.rpush("tasks", dumps(("generate_scenarios", args[:-1]))) + rpc() + if rpc.fun == "build_portfolio": + q.rpush("tasks", str(Rpc("build_scenarios", rpc.args))) + if rpc.fun == "build_scenarios": + q.rpush("tasks", str(Rpc("generate_scenarios", rpc.args[:-1]))) ET.close() if __name__=="__main__": diff --git a/python/task_runner.py b/python/task_runner.py index 4809991a..5b62533c 100644 --- a/python/task_runner.py +++ b/python/task_runner.py @@ -12,16 +12,17 @@ def run(): q = get_redis_queue() notify("READY=1") while True: - f, args = loads(q.blpop("tasks")[1].decode('utf-8')) - journal.send("Running '{}'".format(f), ARGS=dumps(args)) - if f == 'generate_scenarios': - args += [ET] - getattr(tasks, f)(*args) - journal.send("'{}' completed".format(f)) - if f == "build_portfolio": - q.rpush("tasks", dumps(("build_scenarios", args))) - if f == "build_scenarios": - q.rpush("tasks", dumps(("generate_scenarios", args[:-1]))) + rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8')) + + journal.send("Running '{}'".format(rpc.fun), ARGS=dumps(rpc.args)) + if rpc.fun == 'generate_scenarios': + rpc.args += [ET] + rpc() + journal.send("'{}' completed".format(rpc.fun)) + if rpc.fun == "build_portfolios": + q.rpush("tasks", str(Rpc('build_scenarios', rpc.args))) + if rpc.fun == "build_scenarios": + q.rpush("tasks", str(Rpc('generate_scenarios', rpc.args[:-1]))) ET.close() if __name__=="__main__": diff --git a/python/tasks.py b/python/tasks.py index 8f646fdb..f50d8e13 100644 --- a/python/tasks.py +++ b/python/tasks.py @@ -19,3 +19,20 @@ def build_scenarios(workdate, dealname, reinvflag): with open(os.path.join(logpath, "build_scenarios.Rout"), "w") as fh: subprocess.call(args, stderr=subprocess.STDOUT, stdout=fh, env=os.environ, cwd=rpath) + +class Rpc(object): + def __init__(self, fun, args): + self.fun = fun + self.args = args + + def __str__(self): + return dumps({'fun': self.fun, + 'args': self.args}) + def __call__(self): + globals()[self.fun](*self.args) + + @classmethod + def from_json(cls, s): + rpc = loads(s) + instance = cls(rpc['fun'], rpc['args']) + return instance |
