aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/task_runner.py39
-rw-r--r--python/tasks.py11
2 files changed, 45 insertions, 5 deletions
diff --git a/python/task_runner.py b/python/task_runner.py
new file mode 100644
index 00000000..bbb6a2e9
--- /dev/null
+++ b/python/task_runner.py
@@ -0,0 +1,39 @@
+import logging
+import os
+import redis
+import socket
+import tasks
+from systemd.daemon import notify
+from systemd import journal
+from json import loads, dumps
+from db import dbconn
+from termcolor import colored
+
+def run():
+ hostname = socket.gethostname()
+ ET = dbconn('etdb')
+ if hostname == 'debian':
+ q = redis.Redis(unix_socket_path='/var/run/redis/redis.sock')
+ os.environ['OMP_NUM_THREADS'] = '8'
+ else:
+ q = redis.Redis(host='debian')
+ os.environ['OMP_NUM_THREADS'] = '4'
+ notify("READY=1")
+ while True:
+ f, args = loads(q.blpop("tasks")[1].decode('utf-8'))
+ journal.send("Running '{}'".format(f), ARGS=dumps(args))
+ if f == 'generate_scenarios':
+ args += [ET]
+ getattr(tasks, f)(*args)
+ journal.send("'{}' completed".format(f))
+ if f == "build_portfolio":
+ q.rpush("tasks", dumps(("build_scenarios", args)))
+ if f == "build_scenarios":
+ q.rpush("tasks", dumps(("generate_scenarios", args[:-1])))
+ ET.close()
+
+if __name__=="__main__":
+ logger = logging.getLogger('intex')
+ logger.setLevel(logging.INFO)
+ logger.addHandler(logging.StreamHandler())
+ run()
diff --git a/python/tasks.py b/python/tasks.py
index 62e3a490..6f6e5536 100644
--- a/python/tasks.py
+++ b/python/tasks.py
@@ -2,17 +2,18 @@ import subprocess
from intex.intex_scenarios import generate_scenarios
import os
-Rpath = os.path.join(os.environ['CODE_DIR'], "code", "R")
-logpath = os.path.join(os.environ['CODE_DIR'], "logs")
-
def build_portfolio(workdate, dealname, reinvflag):
- args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_portfolios.R"), \
+ rpath = os.path.join(os.environ['CODE_DIR'], "code", "R")
+ logpath = os.path.join(os.environ['LOG_DIR'])
+ args = ["Rscript", "--vanilla", os.path.join(rpath, "build_portfolios.R"), \
workdate, dealname + "," + reinvflag]
with open(os.path.join(logpath, "build_portfolios.Rout"), "w") as fh:
subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ)
def build_scenarios(workdate, dealname, reinvflag):
- args = ["Rscript", "--vanilla", os.path.join(Rpath, "build_scenarios.R"), \
+ rpath = os.path.join(os.environ['CODE_DIR'], "code", "R")
+ logpath = os.path.join(os.environ['LOG_DIR'])
+ args = ["Rscript", "--vanilla", os.path.join(rpath, "build_scenarios.R"), \
workdate, dealname + "," + reinvflag]
with open(os.path.join(logpath, "build_scenarios.Rout"), "w") as fh:
subprocess.call(args, stderr = subprocess.STDOUT, stdout = fh, env=os.environ)