blob: d68334c1b323a3fdd0133a070353a07f9112ac77 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import logging
import sys
from utils.db import dbconn
from common import get_redis_queue
from subprocess import CalledProcessError
from tasks import Rpc
def run():
ET = dbconn('etdb')
q = get_redis_queue()
while True:
rpc = Rpc.from_json(q.blpop("tasks")[1].decode('utf-8'))
if rpc.fun == 'generate_scenarios':
rpc.args += [ET]
logger.info("running: {}, {}".format(rpc.fun, rpc.args))
try:
rpc()
except CalledProcessError:
logger.error("'{}' did not complete".format(rpc.fun))
else:
logger.info("'{}' completed".format(rpc.fun))
if rpc.fun == "build_portfolios":
q.rpush("tasks", str(Rpc("build_scenarios", rpc.args)))
if rpc.fun == "build_scenarios":
q.rpush("tasks", str(Rpc("generate_scenarios", rpc.args[:-1])))
ET.close()
if __name__=="__main__":
logger = logging.getLogger('intex')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger.setLevel(logging.INFO)
# log to stderr, which is intercepted by circus
sh = logging.StreamHandler(sys.stdout)
sh.setFormatter(formatter)
logger.addHandler(sh)
run()
|