aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/task_runner.py39
-rw-r--r--python/tasks.py11
-rw-r--r--scripts/task-runner.service15
3 files changed, 60 insertions, 5 deletions
diff --git a/python/task_runner.py b/python/task_runner.py
new file mode 100644
index 00000000..bbb6a2e9
--- /dev/null
+++ b/python/task_runner.py
@@ -0,0 +1,39 @@
+import logging
+import os
+import redis
+import socket
+import tasks
+from systemd.daemon import notify
+from systemd import journal
+from json import loads, dumps
+from db import dbconn
+from termcolor import colored
+
+def run():
+ hostname = socket.gethostname()
+ ET = dbconn('etdb')
+ if hostname == 'debian':
+ q = redis.Redis(unix_socket_path='/var/run/redis/redis.sock')
+ os.environ['OMP_NUM_THREADS'] = '8'
+ else:
+ q = redis.Redis(host='debian')
+ os.environ['OMP_NUM_THREADS'] = '4'
+ notify("READY=1")
+ while True:
+ f, args = loads(q.blpop("tasks")[1].decode('utf-8'))
+ journal.send("Running '{}'".format(f), ARGS=dumps(args))
+ if f == 'generate_scenarios':
+ args += [ET]
+ getattr(tasks, f)(*args)
+ journal.send("'{}' completed".format(f))
+ if f == "build_portfolio":
+ q.rpush("tasks", dumps(("build_scenarios", args)))
+ if f == "build_scenarios":
+ q.rpush("tasks", dumps(("generate_scenarios", args[:-1])))
+ ET.close()
+
+if __name__=="__main__":
+ logger = logging.getLogger('intex')
+ logger.setLevel(logging.INFO)
+ logger.addHandler(logging.StreamHandler())
+ run()
diff --git a/python/tasks.py b/python/tasks.py
index 62e3a490..6f6e5536 100644
--- a/python/tasks.py
+++ b/python/tasks.py
@@ -2,17 +2,18 @@ import subprocess
from intex.intex_scenarios import generate_scenarios
import os
-Rpath = os.path.join(os.environ['CODE_DIR'], "code", "R")
-logpath = os.path.join(os.environ['CODE_DIR'], "logs")
-
def build_portfolio(workdate, dealname, reinvflag):
- args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_portfolios.R"), \
+ rpath = os.path.join(os.environ['CODE_DIR'], "code", "R")
+ logpath = os.path.join(os.environ['LOG_DIR'])
+ args = ["Rscript", "--vanilla", os.path.join(rpath, "build_portfolios.R"), \
workdate, dealname + "," + reinvflag]
with open(os.path.join(logpath, "build_portfolios.Rout"), "w") as fh:
subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ)
def build_scenarios(workdate, dealname, reinvflag):
- args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_scenarios.R"), \
+ rpath = os.path.join(os.environ['CODE_DIR'], "code", "R")
+ logpath = os.path.join(os.environ['LOG_DIR'])
+ args = ["Rscript", "--vanilla", os.path.join(rpath, "build_scenarios.R"), \
workdate, dealname + "," + reinvflag]
with open(os.path.join(logpath, "build_scenarios.Rout"), "w") as fh:
subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ)
diff --git a/scripts/task-runner.service b/scripts/task-runner.service
new file mode 100644
index 00000000..3a44447d
--- /dev/null
+++ b/scripts/task-runner.service
@@ -0,0 +1,15 @@
+[Unit]
+Description=Run Redis tasks
+
+[Service]
+Type=notify
+ExecStart=/usr/bin/python /home/guillaume/projects/code/python/task_runner.py
+WorkingDirectory=/home/guillaume/projects/code/python
+Environment=DATA_DIR=/home/share/CorpCDOs/data
+Environment=LOG_DIR=/home/share/CorpCDOs/logs
+Environment=CODE_DIR=/home/share/CorpCDOs
+Environment='R_LIBS_USER=/home/guillaume/R/\x25p-library/\x25v'
+Environment=PGPASSFILE=/home/guillaume/.pgpass
+
+[Install]
+WantedBy=default.target