diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/task_runner.py | 39 | ||||
| -rw-r--r-- | python/tasks.py | 11 |
2 files changed, 45 insertions, 5 deletions
diff --git a/python/task_runner.py b/python/task_runner.py new file mode 100644 index 00000000..bbb6a2e9 --- /dev/null +++ b/python/task_runner.py @@ -0,0 +1,39 @@ +import logging +import os +import redis +import socket +import tasks +from systemd.daemon import notify +from systemd import journal +from json import loads, dumps +from db import dbconn +from termcolor import colored + +def run(): + hostname = socket.gethostname() + ET = dbconn('etdb') + if hostname == 'debian': + q = redis.Redis(unix_socket_path='/var/run/redis/redis.sock') + os.environ['OMP_NUM_THREADS'] = '8' + else: + q = redis.Redis(host='debian') + os.environ['OMP_NUM_THREADS'] = '4' + notify("READY=1") + while True: + f, args = loads(q.blpop("tasks")[1].decode('utf-8')) + journal.send("Running '{}'".format(f), ARGS=dumps(args)) + if f == 'generate_scenarios': + args += [ET] + getattr(tasks, f)(*args) + journal.send("'{}' completed".format(f)) + if f == "build_portfolio": + q.rpush("tasks", dumps(("build_scenarios", args))) + if f == "build_scenarios": + q.rpush("tasks", dumps(("generate_scenarios", args[:-1]))) + ET.close() + +if __name__=="__main__": + logger = logging.getLogger('intex') + logger.setLevel(logging.INFO) + logger.addHandler(logging.StreamHandler()) + run() diff --git a/python/tasks.py b/python/tasks.py index 62e3a490..6f6e5536 100644 --- a/python/tasks.py +++ b/python/tasks.py @@ -2,17 +2,18 @@ import subprocess from intex.intex_scenarios import generate_scenarios import os -Rpath = os.path.join(os.environ['CODE_DIR'], "code", "R") -logpath = os.path.join(os.environ['CODE_DIR'], "logs") - def build_portfolio(workdate, dealname, reinvflag): - args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_portfolios.R"), \ + rpath = os.path.join(os.environ['CODE_DIR'], "code", "R") + logpath = os.path.join(os.environ['LOG_DIR']) + args = ["Rscript", "--vanilla", os.path.join(rpath, "build_portfolios.R"), \ workdate, dealname + "," + reinvflag] with open(os.path.join(logpath, "build_portfolios.Rout"), "w") as fh: subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ) def build_scenarios(workdate, dealname, reinvflag): - args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_scenarios.R"), \ + rpath = os.path.join(os.environ['CODE_DIR'], "code", "R") + logpath = os.path.join(os.environ['LOG_DIR']) + args = ["Rscript", "--vanilla", os.path.join(rpath, "build_scenarios.R"), \ workdate, dealname + "," + reinvflag] with open(os.path.join(logpath, "build_scenarios.Rout"), "w") as fh: subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ) |
