aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/client.py7
-rw-r--r--python/task_runner.py11
-rw-r--r--python/tasks.py8
3 files changed, 15 insertions, 11 deletions
diff --git a/python/client.py b/python/client.py
index ca24be71..f42f89cf 100644
--- a/python/client.py
+++ b/python/client.py
@@ -13,7 +13,12 @@ def run():
if rpc.fun == 'generate_scenarios':
rpc.args += [ET]
logger.info("running: {}, {}".format(rpc.fun, rpc.args))
- rpc()
+ try:
+ rpc()
+ except CalledProcessError:
+ logger.error("'{}' did not complete".format(rpc.fun))
+ else:
+ logger.info("'{}' completed".format(rpc.fun))
if rpc.fun == "build_portfolios":
q.rpush("tasks", str(Rpc("build_scenarios", rpc.args)))
if rpc.fun == "build_scenarios":
diff --git a/python/task_runner.py b/python/task_runner.py
index 5fcf8070..3c8ca247 100644
--- a/python/task_runner.py
+++ b/python/task_runner.py
@@ -1,7 +1,6 @@
import logging
from systemd.daemon import notify
-from systemd import journal
from db import dbconn
from common import get_redis_queue
from tasks import Rpc
@@ -13,11 +12,15 @@ def run():
while True:
rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8'))
- journal.send("Running '{}'".format(rpc.fun), ARGS=rpc.args)
+ print("Running '{}' with {}".format(rpc.fun, rpc.args))
if rpc.fun == 'generate_scenarios':
rpc.args += [ET]
- rpc()
- journal.send("'{}' completed".format(rpc.fun))
+ try:
+ rpc()
+ except CalledProcessError:
+ print("'{}' did not complete".format(rpc.fun))
+ else:
+ print("'{}' completed".format(rpc.fun))
if rpc.fun == "build_portfolios":
q.rpush("tasks", str(Rpc('build_scenarios', rpc.args)))
if rpc.fun == "build_scenarios":
diff --git a/python/tasks.py b/python/tasks.py
index 7cc28c41..05c5f51c 100644
--- a/python/tasks.py
+++ b/python/tasks.py
@@ -10,18 +10,14 @@ def build_portfolios(workdate, dealname, reinvflag):
logpath = os.path.join(os.environ['LOG_DIR'])
args = ["Rscript", "--vanilla", os.path.join(rpath, "build_portfolios.R"), \
workdate, dealname + "," + reinvflag]
- with open(os.path.join(logpath, "build_portfolios.Rout"), "w") as fh:
- subprocess.call(args, stderr=subprocess.STDOUT, stdout=fh, env=os.environ,
- cwd=rpath)
+ subprocess.check_call(args, env=os.environ, cwd=rpath)
def build_scenarios(workdate, dealname, reinvflag):
rpath = os.path.join(os.environ['CODE_DIR'], "R")
logpath = os.path.join(os.environ['LOG_DIR'])
args = ["Rscript", "--vanilla", os.path.join(rpath, "build_scenarios.R"), \
workdate, dealname + "," + reinvflag]
- with open(os.path.join(logpath, "build_scenarios.Rout"), "w") as fh:
- subprocess.call(args, stderr=subprocess.STDOUT, stdout=fh, env=os.environ,
- cwd=rpath)
+ subprocess.check_call(args, env=os.environ, cwd=rpath)
class Rpc(object):
def __init__(self, fun, args):