aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/client.py21
-rw-r--r--python/task_runner.py21
-rw-r--r--python/tasks.py17
3 files changed, 39 insertions, 20 deletions
diff --git a/python/client.py b/python/client.py
index d4282770..87d85bf4 100644
--- a/python/client.py
+++ b/python/client.py
@@ -1,23 +1,24 @@
import logging
-import tasks
-from json import loads, dumps
+from json import dumps
from db import dbconn
from common import get_redis_queue
+from tasks import Rpc
def run():
ET = dbconn('etdb')
q = get_redis_queue()
while True:
- f, args = loads(q.blpop("tasks")[1].decode('utf-8'))
- if f == 'generate_scenarios':
- args += [ET]
+ rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8'))
+
+ if rpc.fun == 'generate_scenarios':
+ rpc.args += [ET]
logger.info("running: {}, {}".format(f, args))
- getattr(tasks, f)(*args)
- if f == "build_portfolio":
- q.rpush("tasks", dumps(("build_scenarios", args)))
- if f == "build_scenarios":
- q.rpush("tasks", dumps(("generate_scenarios", args[:-1])))
+ rpc()
+ if rpc.fun == "build_portfolio":
+ q.rpush("tasks", str(Rpc("build_scenarios", rpc.args)))
+ if rpc.fun == "build_scenarios":
+ q.rpush("tasks", str(Rpc("generate_scenarios", rpc.args[:-1])))
ET.close()
if __name__=="__main__":
diff --git a/python/task_runner.py b/python/task_runner.py
index 4809991a..5b62533c 100644
--- a/python/task_runner.py
+++ b/python/task_runner.py
@@ -12,16 +12,17 @@ def run():
q = get_redis_queue()
notify("READY=1")
while True:
- f, args = loads(q.blpop("tasks")[1].decode('utf-8'))
- journal.send("Running '{}'".format(f), ARGS=dumps(args))
- if f == 'generate_scenarios':
- args += [ET]
- getattr(tasks, f)(*args)
- journal.send("'{}' completed".format(f))
- if f == "build_portfolio":
- q.rpush("tasks", dumps(("build_scenarios", args)))
- if f == "build_scenarios":
- q.rpush("tasks", dumps(("generate_scenarios", args[:-1])))
+ rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8'))
+
+ journal.send("Running '{}'".format(rpc.fun), ARGS=dumps(rpc.args))
+ if rpc.fun == 'generate_scenarios':
+ rpc.args += [ET]
+ rpc()
+ journal.send("'{}' completed".format(rpc.fun))
+ if rpc.fun == "build_portfolios":
+ q.rpush("tasks", str(Rpc('build_scenarios', rpc.args)))
+ if rpc.fun == "build_scenarios":
+ q.rpush("tasks", str(Rpc('generate_scenarios', rpc.args[:-1])))
ET.close()
if __name__=="__main__":
diff --git a/python/tasks.py b/python/tasks.py
index 8f646fdb..f50d8e13 100644
--- a/python/tasks.py
+++ b/python/tasks.py
@@ -19,3 +19,20 @@ def build_scenarios(workdate, dealname, reinvflag):
with open(os.path.join(logpath, "build_scenarios.Rout"), "w") as fh:
subprocess.call(args, stderr=subprocess.STDOUT, stdout=fh, env=os.environ,
cwd=rpath)
+
+class Rpc(object):
+ def __init__(self, fun, args):
+ self.fun = fun
+ self.args = args
+
+ def __str__(self):
+ return dumps({'fun': self.fun,
+ 'args': self.args})
+ def __call__(self):
+ globals()[self.fun](*self.args)
+
+ @classmethod
+ def from_json(cls, s):
+ rpc = loads(s)
+ instance = cls(rpc['fun'], rpc['args'])
+ return instance