aboutsummaryrefslogtreecommitdiffstats
path: root/python/task_runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/task_runner.py')
-rw-r--r--python/task_runner.py21
1 files changed, 11 insertions, 10 deletions
diff --git a/python/task_runner.py b/python/task_runner.py
index 4809991a..5b62533c 100644
--- a/python/task_runner.py
+++ b/python/task_runner.py
@@ -12,16 +12,17 @@ def run():
q = get_redis_queue()
notify("READY=1")
while True:
- f, args = loads(q.blpop("tasks")[1].decode('utf-8'))
- journal.send("Running '{}'".format(f), ARGS=dumps(args))
- if f == 'generate_scenarios':
- args += [ET]
- getattr(tasks, f)(*args)
- journal.send("'{}' completed".format(f))
- if f == "build_portfolio":
- q.rpush("tasks", dumps(("build_scenarios", args)))
- if f == "build_scenarios":
- q.rpush("tasks", dumps(("generate_scenarios", args[:-1])))
+ rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8'))
+
+ journal.send("Running '{}'".format(rpc.fun), ARGS=dumps(rpc.args))
+ if rpc.fun == 'generate_scenarios':
+ rpc.args += [ET]
+ rpc()
+ journal.send("'{}' completed".format(rpc.fun))
+ if rpc.fun == "build_portfolios":
+ q.rpush("tasks", str(Rpc('build_scenarios', rpc.args)))
+ if rpc.fun == "build_scenarios":
+ q.rpush("tasks", str(Rpc('generate_scenarios', rpc.args[:-1])))
ET.close()
if __name__=="__main__":