diff options
| -rw-r--r-- | python/task_runner.py | 39 | ||||
| -rw-r--r-- | python/tasks.py | 11 | ||||
| -rw-r--r-- | scripts/task-runner.service | 15 |
3 files changed, 60 insertions, 5 deletions
diff --git a/python/task_runner.py b/python/task_runner.py new file mode 100644 index 00000000..bbb6a2e9 --- /dev/null +++ b/python/task_runner.py @@ -0,0 +1,39 @@ +import logging +import os +import redis +import socket +import tasks +from systemd.daemon import notify +from systemd import journal +from json import loads, dumps +from db import dbconn +from termcolor import colored + +def run(): + hostname = socket.gethostname() + ET = dbconn('etdb') + if hostname == 'debian': + q = redis.Redis(unix_socket_path='/var/run/redis/redis.sock') + os.environ['OMP_NUM_THREADS'] = '8' + else: + q = redis.Redis(host='debian') + os.environ['OMP_NUM_THREADS'] = '4' + notify("READY=1") + while True: + f, args = loads(q.blpop("tasks")[1].decode('utf-8')) + journal.send("Running '{}'".format(f), ARGS=dumps(args)) + if f == 'generate_scenarios': + args += [ET] + getattr(tasks, f)(*args) + journal.send("'{}' completed".format(f)) + if f == "build_portfolio": + q.rpush("tasks", dumps(("build_scenarios", args))) + if f == "build_scenarios": + q.rpush("tasks", dumps(("generate_scenarios", args[:-1]))) + ET.close() + +if __name__=="__main__": + logger = logging.getLogger('intex') + logger.setLevel(logging.INFO) + logger.addHandler(logging.StreamHandler()) + run() diff --git a/python/tasks.py b/python/tasks.py index 62e3a490..6f6e5536 100644 --- a/python/tasks.py +++ b/python/tasks.py @@ -2,17 +2,18 @@ import subprocess from intex.intex_scenarios import generate_scenarios import os -Rpath = os.path.join(os.environ['CODE_DIR'], "code", "R") -logpath = os.path.join(os.environ['CODE_DIR'], "logs") - def build_portfolio(workdate, dealname, reinvflag): - args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_portfolios.R"), \ + rpath = os.path.join(os.environ['CODE_DIR'], "code", "R") + logpath = os.path.join(os.environ['LOG_DIR']) + args = ["Rscript", "--vanilla", os.path.join(rpath, "build_portfolios.R"), \ workdate, dealname + "," + reinvflag] with open(os.path.join(logpath, "build_portfolios.Rout"), "w") as fh: subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ) def build_scenarios(workdate, dealname, reinvflag): - args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_scenarios.R"), \ + rpath = os.path.join(os.environ['CODE_DIR'], "code", "R") + logpath = os.path.join(os.environ['LOG_DIR']) + args = ["Rscript", "--vanilla", os.path.join(rpath, "build_scenarios.R"), \ workdate, dealname + "," + reinvflag] with open(os.path.join(logpath, "build_scenarios.Rout"), "w") as fh: subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ) diff --git a/scripts/task-runner.service b/scripts/task-runner.service new file mode 100644 index 00000000..3a44447d --- /dev/null +++ b/scripts/task-runner.service @@ -0,0 +1,15 @@ +[Unit] +Description=Run Redis tasks + +[Service] +Type=notify +ExecStart=/usr/bin/python /home/guillaume/projects/code/python/task_runner.py +WorkingDirectory=/home/guillaume/projects/code/python +Environment=DATA_DIR=/home/share/CorpCDOs/data +Environment=LOG_DIR=/home/share/CorpCDOs/logs +Environment=CODE_DIR=/home/share/CorpCDOs +Environment='R_LIBS_USER=/home/guillaume/R/\x25p-library/\x25v' +Environment=PGPASSFILE=/home/guillaume/.pgpass + +[Install] +WantedBy=default.target |
