1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
import csv
import datetime
import os
import psycopg2
from risk_insight import app
from flask import render_template, jsonify, request, g
from psycopg2 import sql
def get_attach_from_name(index_type, series):
    """Return the list of tranche attachment points for an index.

    Args:
        index_type: index name, matched case-insensitively against
            "ig", "hy", "xo" and "eu".
        series: series number; series 9 of "ig" and "eu" uses a longer
            schedule than every other series of the same index.

    Returns:
        A new list of attachment points in increasing order, starting at 0
        and ending at 100 (presumably percentages -- confirm with callers).

    Raises:
        ValueError: if ``index_type`` is not one of the known index names.
            Previously this case fell through every branch and failed with
            an UnboundLocalError on the return statement.
    """
    standard = {
        "ig": (0, 3, 7, 15, 100),
        "hy": (0, 15, 25, 35, 100),
        "xo": (0, 10, 20, 35, 100),
        "eu": (0, 3, 6, 12, 100),
    }
    # Only these two indices have a dedicated schedule for series 9.
    series_nine = {
        "ig": (0, 3, 7, 10, 15, 30, 100),
        "eu": (0, 3, 6, 9, 12, 22, 100),
    }

    key = index_type.lower() if isinstance(index_type, str) else None
    if key not in standard:
        raise ValueError(f"unknown index type: {index_type!r}")

    if series == 9 and key in series_nine:
        return list(series_nine[key])
    # Copy into a list so callers can mutate the result freely.
    return list(standard[key])
def get_db():
    """Return the psycopg2 connection stored on ``flask.g``.

    The connection is looked up under ``g._database``; when nothing is
    stored there yet a new one is opened and stored before being returned.
    """
    connection = getattr(g, "_database", None)
    if connection is not None:
        return connection

    connection = psycopg2.connect(
        database="serenitasdb", user="serenitas_user", host="debian"
    )
    g._database = connection
    return connection
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection stored on ``g._database``, if there is one.

    Registered as an app-context teardown handler; the ``exception``
    argument is accepted but not used.
    """
    connection = getattr(g, "_database", None)
    if connection is None:
        return
    connection.close()
@app.route("/_data_tranches")
def get_risk_numbers():
    """Return one risk-number column per tranche, grouped by date, as JSON.

    Query-string parameters:
        i: index name; upper-cased before it is compared with ``rn.index``.
        s: series, parsed as an int (defaults to 0).
        t: tenor.
        g: name of the column to aggregate; it is quoted as an SQL
           identifier rather than pasted into the statement.

    Returns:
        JSON with ``labels`` ("Date" followed by one "<lower>-<upper> <g>"
        entry per consecutive pair of attachment points) and ``data`` (one
        ``[date, value, ...]`` row per date).  A request without ``i`` or
        ``g`` gets a JSON error with status 400.
    """
    index = request.args.get("i")
    series = request.args.get("s", 0, int)
    tenor = request.args.get("t")
    greek = request.args.get("g")

    # Without this guard a missing "i" fails on a None method call and a
    # missing "g" makes sql.Identifier raise TypeError, i.e. a 500 response.
    if index is None or greek is None:
        return jsonify(error="query parameters 'i' and 'g' are required"), 400

    # Resolve the attachment points first so a bad index name fails before
    # a database connection is opened.
    attach = get_attach_from_name(index, series)
    db = get_db()

    column = sql.Identifier(greek)
    sqlstr = sql.SQL(
        "SELECT to_char(date, 'YYYY/MM/DD'), array_agg({}) FROM "
        "(SELECT date, {} FROM risk_numbers rn JOIN tranche_quotes "
        "ON tranche_quotes.id=rn.tranche_id AND rn.index=%s "
        "AND rn.series=%s AND rn.tenor=%s ORDER BY rn.date, rn.attach) "
        "AS a GROUP BY date ORDER BY date"
    ).format(column, column)

    with db.cursor() as c:
        c.execute(sqlstr, (index.upper(), series, tenor))
        # Each row is (formatted date, list of values); flatten it.
        data = [[date] + arr for date, arr in c]

    labels = ["Date"] + [
        f"{lower}-{upper} {greek}" for lower, upper in zip(attach, attach[1:])
    ]
    return jsonify(labels=labels, data=data)
@app.route("/_data_indices")
def get_indices_quotes():
    """Return a dated series of index quote columns as JSON.

    Query-string parameters:
        i: index name; upper-cased before it is compared with ``index``.
           "HY" selects the ``*price`` columns, anything else ``*spread``.
        s: series, parsed as an int (defaults to 0).
        t: tenor.
        w: which series to return: "quotes", "adjquotes", "basis",
           "theta" or "duration".

    Returns:
        JSON with ``labels`` (column headings for the chosen ``w``) and
        ``data`` (the rows fetched from ``index_quotes``, ordered by date).
        A request without ``i``, or with a missing or unknown ``w``, gets a
        JSON error with status 400.
    """
    index = request.args.get("i")
    series = request.args.get("s", 0, int)
    tenor = request.args.get("t")
    what = request.args.get("w")

    # A missing "i" used to fail on None.upper() and surface as a 500.
    if index is None:
        return jsonify(error="query parameter 'i' is required"), 400
    index = index.upper()

    tail = "price" if index == "HY" else "spread"
    query_columns = {
        "quotes": f"close{tail}, model{tail}",
        "adjquotes": f"adjclose{tail}, adjmodel{tail}",
        "basis": f"close{tail} - model{tail}",
        "theta": "theta",
        "duration": "duration",
    }
    labels = {
        "quotes": ["Date", "Close", "FV"],
        "adjquotes": ["Date", "AdjClose", "AdjFV"],
        "basis": ["Date", "Basis"],
        "theta": ["Date", "Theta"],
        "duration": ["Date", "Duration"],
    }

    # A missing or unknown "w" used to raise KeyError and surface as a 500.
    if what not in query_columns:
        choices = ", ".join(query_columns)
        return (
            jsonify(error=f"query parameter 'w' must be one of: {choices}"),
            400,
        )

    # The column expression comes only from the table above, never from raw
    # request text, so plain string formatting is safe here.
    sqlstr = (
        "SELECT to_char(date, 'YYYY/MM/DD'), {0} "
        "from index_quotes WHERE index=%s and series=%s "
        "and tenor=%s ORDER BY date"
    ).format(query_columns[what])

    db = get_db()
    with db.cursor() as c:
        c.execute(sqlstr, (index, series, tenor))
        data = c.fetchall()
    return jsonify(labels=labels[what], data=data)
@app.route("/tranches.html")
def tranches():
    """Render ``tranches.html`` with the distinct series in ``risk_numbers``.

    The series values are read from the database in ascending order and
    passed to the template as ``series``.
    """
    query = "SELECT distinct series FROM risk_numbers ORDER BY series"
    with get_db().cursor() as cursor:
        cursor.execute(query)
        # Every row holds a single column; keep just that value.
        available = [row[0] for row in cursor]
    return render_template("tranches.html", series=available)
@app.route("/indices.html")
def indices():
    """Render ``indices.html`` with a fixed list of series numbers.

    The list is 9, 10, 11, 13 and then every integer from 16 to 32
    inclusive, passed to the template as ``series``.
    """
    offered = [9, 10, 11, 13, *range(16, 33)]
    return render_template("indices.html", series=offered)
@app.route("/")
def main():
    """Serve the site root by rendering the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
|