diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/http-server.py | 79 | ||||
| -rw-r--r-- | python/task_server/__init__.py | 3 | ||||
| -rw-r--r-- | python/task_server/globeop.py (renamed from python/globeop.py) | 0 | ||||
| -rw-r--r-- | python/task_server/rest.py | 57 |
4 files changed, 66 insertions, 73 deletions
diff --git a/python/http-server.py b/python/http-server.py index 38df18f2..c2058cd0 100644 --- a/python/http-server.py +++ b/python/http-server.py @@ -1,75 +1,8 @@ -from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler -from urlparse import urlparse, parse_qsl -from daemon import Daemon -import sys, datetime, logging, json, redis -from insert_tranche_quotes import insert_quotes -from load_intex_collateral import intex_data -import psycopg2 -from psycopg2.extras import DictCursor -from globeop import download_data +import logging +from task_server import app -class MyHandler(BaseHTTPRequestHandler): - def do_GET(self): - if self.path.startswith("/insert_quotes"): - q = urlparse(self.path).query - if q: - args = dict(parse_qsl(q)) - insert_quotes(**args) - else: - insert_quotes() - if self.path == "/insert_intex_data": - workdate = str(datetime.date.today()) - intex_data(self.server.conn, workdate) +file_handler = logging.FileHandler(filename='/home/share/CorpCDOs/logs/tasks.log') +file_handler.setLevel(logging.INFO) +app.logger.addHandler(file_handler) - if self.path.startswith("/globeop"): - q = urlparse(self.path).query - if q: - args = dict(parse_qsl(q)) - args['workdate'] = datetime.datetime.strptime(args['workdate'], "%Y-%m-%d").date() - download_data(**args) - else: - download_data() - self.send_response(200) - self.end_headers() - - def do_POST(self): - length = int(self.headers['content-length']) - d = parse_qsl(self.rfile.read(length).decode('utf-8')) - self.log_message("%s" , json.dumps(d)) - workdate = str(datetime.date.today()) - pipe = self.server.queue.pipeline() - for dealname, reinvflag in d: - pipe.rpush("tasks", json.dumps(("build_portfolio", - [workdate, dealname, reinvflag]))) - pipe.execute() - self.send_response(200) - self.end_headers() - - def log_message(self, format, *args): - logging.info("%s - - [%s] %s" % - (self.address_string(), - self.log_date_time_string(), - format%args)) - -class MyServer(HTTPServer): - def __init__(self, addr, handler, queue, conn): - HTTPServer.__init__(self, addr, handler) - self.queue = queue - self.conn = conn - -class MyDaemon(Daemon): - def run(self): - server_address = ('',8000) - logging.basicConfig(filename='/home/share/CorpCDOs/logs/tasks.log', level=logging.INFO) - q = redis.Redis(unix_socket_path='/var/run/redis/redis.sock') - self.conn = psycopg2.connect(database="ET", - user="et_user", - host="debian", - cursor_factory=DictCursor) - http = MyServer(server_address, MyHandler, q, self.conn) - http.serve_forever() - -if __name__=="__main__": - d = MyDaemon('/tmp/tasks.pid') - command = getattr(d, sys.argv[1]) - command() +app.run(host='0.0.0.0', port = 8000, debug=True) diff --git a/python/task_server/__init__.py b/python/task_server/__init__.py new file mode 100644 index 00000000..54f56688 --- /dev/null +++ b/python/task_server/__init__.py @@ -0,0 +1,3 @@ +from flask import Flask +app = Flask(__name__) +import task_server.rest diff --git a/python/globeop.py b/python/task_server/globeop.py index f0eaf644..f0eaf644 100644 --- a/python/globeop.py +++ b/python/task_server/globeop.py diff --git a/python/task_server/rest.py b/python/task_server/rest.py new file mode 100644 index 00000000..3acb6920 --- /dev/null +++ b/python/task_server/rest.py @@ -0,0 +1,57 @@ +import psycopg2 +from psycopg2.extras import DictCursor +import datetime, redis +from intex.load_intex_collateral import intex_data +#from ..globeop import download_data +from task_server import app +from flask import request, g +import json + +def get_db(): + db = getattr(g, 'db', None) + if db is None: + db = g.db = psycopg2.connect(database="ET", + user="et_user", + host="debian", + cursor_factory=DictCursor) + return db + +def get_queue(): + q = getattr(g, 'queue', None) + if q is None: + q = g.queue = redis.Redis(unix_socket_path='/var/run/redis/redis.sock') + return q + +@app.teardown_appcontext +def close_db(error): + """Closes the database again at the end of the request.""" + db = getattr(g, 'db', None) + if db: + db.close() + +@app.route('/insert_intex_data') +def intex(): + db = get_db() + intex_data(db, request.args.get('workdate', str(datetime.date.today()))) + return 'OK' + +@app.route('/globeop', methods=['PUT']) +def globeop(): + download_data(request.args.get('workdate', datetime.date.today(), lambda s: datetime.datetime. + strptime(s, "%Y-%m-%d").date())) + return 'OK' + +@app.route('/insert_quotes', methods=['PUT']) +def insert_tranches(): + insert_quotes(**request.args) + return 'OK' + +@app.route("/", methods=['POST']) +def run_tasks(): + workdate = str(datetime.date.today()) + with get_queue().pipeline() as pipe: + for dealname, reinvflag in request.form.items(): + pipe.rpush("tasks", json.dumps(("build_portfolio", + [workdate, dealname, reinvflag]))) + pipe.execute() + return 'OK' |
