Diffstat (limited to 'python')
-rw-r--r--  python/http-server.py                                           79
-rw-r--r--  python/task_server/__init__.py                                    3
-rw-r--r--  python/task_server/globeop.py (renamed from python/globeop.py)    0
-rw-r--r--  python/task_server/rest.py                                       58
4 files changed, 67 insertions, 73 deletions
diff --git a/python/http-server.py b/python/http-server.py
index 38df18f2..c2058cd0 100644
--- a/python/http-server.py
+++ b/python/http-server.py
@@ -1,75 +1,8 @@
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from urlparse import urlparse, parse_qsl
-from daemon import Daemon
-import sys, datetime, logging, json, redis
-from insert_tranche_quotes import insert_quotes
-from load_intex_collateral import intex_data
-import psycopg2
-from psycopg2.extras import DictCursor
-from globeop import download_data
+import logging
+from task_server import app
-class MyHandler(BaseHTTPRequestHandler):
-    def do_GET(self):
-        if self.path.startswith("/insert_quotes"):
-            q = urlparse(self.path).query
-            if q:
-                args = dict(parse_qsl(q))
-                insert_quotes(**args)
-            else:
-                insert_quotes()
-        if self.path == "/insert_intex_data":
-            workdate = str(datetime.date.today())
-            intex_data(self.server.conn, workdate)
+file_handler = logging.FileHandler(filename='/home/share/CorpCDOs/logs/tasks.log')
+file_handler.setLevel(logging.INFO)
+app.logger.addHandler(file_handler)
-        if self.path.startswith("/globeop"):
-            q = urlparse(self.path).query
-            if q:
-                args = dict(parse_qsl(q))
-                args['workdate'] = datetime.datetime.strptime(args['workdate'], "%Y-%m-%d").date()
-                download_data(**args)
-            else:
-                download_data()
-        self.send_response(200)
-        self.end_headers()
-
-    def do_POST(self):
-        length = int(self.headers['content-length'])
-        d = parse_qsl(self.rfile.read(length).decode('utf-8'))
-        self.log_message("%s" , json.dumps(d))
-        workdate = str(datetime.date.today())
-        pipe = self.server.queue.pipeline()
-        for dealname, reinvflag in d:
-            pipe.rpush("tasks", json.dumps(("build_portfolio",
-                                            [workdate, dealname, reinvflag])))
-        pipe.execute()
-        self.send_response(200)
-        self.end_headers()
-
-    def log_message(self, format, *args):
-        logging.info("%s - - [%s] %s" %
-                     (self.address_string(),
-                      self.log_date_time_string(),
-                      format%args))
-
-class MyServer(HTTPServer):
-    def __init__(self, addr, handler, queue, conn):
-        HTTPServer.__init__(self, addr, handler)
-        self.queue = queue
-        self.conn = conn
-
-class MyDaemon(Daemon):
-    def run(self):
-        server_address = ('',8000)
-        logging.basicConfig(filename='/home/share/CorpCDOs/logs/tasks.log', level=logging.INFO)
-        q = redis.Redis(unix_socket_path='/var/run/redis/redis.sock')
-        self.conn = psycopg2.connect(database="ET",
-                                     user="et_user",
-                                     host="debian",
-                                     cursor_factory=DictCursor)
-        http = MyServer(server_address, MyHandler, q, self.conn)
-        http.serve_forever()
-
-if __name__=="__main__":
-    d = MyDaemon('/tmp/tasks.pid')
-    command = getattr(d, sys.argv[1])
-    command()
+app.run(host='0.0.0.0', port=8000, debug=True)
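The rewritten http-server.py now just configures logging and calls app.run() at import time, with debug=True on all interfaces. Below is a minimal sketch, not part of this commit, of the same entry point behind a __main__ guard so the module also stays importable by a WSGI server (for example one pointed at task_server:app); every path and name is taken from the diff above.

    import logging

    from task_server import app

    # Same shared-drive log file as before, attached to Flask's own logger.
    file_handler = logging.FileHandler(filename='/home/share/CorpCDOs/logs/tasks.log')
    file_handler.setLevel(logging.INFO)
    app.logger.addHandler(file_handler)

    if __name__ == "__main__":
        # Development server only: debug=True enables the interactive Werkzeug
        # debugger, so this should not be reachable from untrusted networks.
        app.run(host='0.0.0.0', port=8000, debug=True)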
diff --git a/python/task_server/__init__.py b/python/task_server/__init__.py
new file mode 100644
index 00000000..54f56688
--- /dev/null
+++ b/python/task_server/__init__.py
@@ -0,0 +1,3 @@
+from flask import Flask
+app = Flask(__name__)
+import task_server.rest
diff --git a/python/globeop.py b/python/task_server/globeop.py
index f0eaf644..f0eaf644 100644
--- a/python/globeop.py
+++ b/python/task_server/globeop.py
diff --git a/python/task_server/rest.py b/python/task_server/rest.py
new file mode 100644
index 00000000..3acb6920
--- /dev/null
+++ b/python/task_server/rest.py
@@ -0,0 +1,58 @@
+import psycopg2
+from psycopg2.extras import DictCursor
+import datetime, redis
+from intex.load_intex_collateral import intex_data
+from insert_tranche_quotes import insert_quotes
+from task_server.globeop import download_data
+from task_server import app
+from flask import request, g
+import json
+
+def get_db():
+    db = getattr(g, 'db', None)
+    if db is None:
+        db = g.db = psycopg2.connect(database="ET",
+                                     user="et_user",
+                                     host="debian",
+                                     cursor_factory=DictCursor)
+    return db
+
+def get_queue():
+    q = getattr(g, 'queue', None)
+    if q is None:
+        q = g.queue = redis.Redis(unix_socket_path='/var/run/redis/redis.sock')
+    return q
+
+@app.teardown_appcontext
+def close_db(error):
+ """Closes the database again at the end of the request."""
+ db = getattr(g, 'db', None)
+ if db:
+ db.close()
+
+@app.route('/insert_intex_data')
+def intex():
+    db = get_db()
+    intex_data(db, request.args.get('workdate', str(datetime.date.today())))
+    return 'OK'
+
+@app.route('/globeop', methods=['PUT'])
+def globeop():
+    download_data(request.args.get('workdate', datetime.date.today(),
+                                   lambda s: datetime.datetime.strptime(s, "%Y-%m-%d").date()))
+    return 'OK'
+
+@app.route('/insert_quotes', methods=['PUT'])
+def insert_tranches():
+    insert_quotes(**request.args)
+    return 'OK'
+
+@app.route("/", methods=['POST'])
+def run_tasks():
+    workdate = str(datetime.date.today())
+    with get_queue().pipeline() as pipe:
+        for dealname, reinvflag in request.form.items():
+            pipe.rpush("tasks", json.dumps(("build_portfolio",
+                                            [workdate, dealname, reinvflag])))
+        pipe.execute()
+    return 'OK'
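The POST / handler above only enqueues work: each submitted form pair becomes a ("build_portfolio", [workdate, dealname, reinvflag]) message pushed as JSON onto the Redis list "tasks". A minimal sketch of a worker that drains that queue is below; the dispatch table and the build_portfolio body are assumptions for illustration, while the socket path, queue name, and payload shape come from the code above.

    import json
    import redis

    def build_portfolio(workdate, dealname, reinvflag):
        # Hypothetical placeholder; the real task implementation lives elsewhere in the repo.
        raise NotImplementedError

    HANDLERS = {"build_portfolio": build_portfolio}

    def run_worker():
        q = redis.Redis(unix_socket_path='/var/run/redis/redis.sock')
        while True:
            # blpop blocks until a task is available and returns (list_name, payload).
            _, raw = q.blpop("tasks")
            name, args = json.loads(raw)
            HANDLERS[name](*args)

    if __name__ == "__main__":
        run_worker()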