aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/analytics/tranche_data.py144
1 files changed, 144 insertions, 0 deletions
diff --git a/python/analytics/tranche_data.py b/python/analytics/tranche_data.py
new file mode 100644
index 00000000..f4a13a9e
--- /dev/null
+++ b/python/analytics/tranche_data.py
@@ -0,0 +1,144 @@
+import datetime
+import pandas as pd
+import numpy as np
+
+from dates import bond_cal
+from . import serenitas_engine, serenitas_pool
+from .utils import tenor_t
+
+
+def get_tranche_quotes(
+    index=None,
+    series=None,
+    tenor=None,
+    from_date=None,
+    end_date=None,
+    years=3,
+    remove_holidays=True,
+):
+    """Load Markit tranche quotes joined with risk numbers from the database.
+
+    Parameters
+    ----------
+    index : str or List[str], optional
+        index type, one of 'IG', 'HY', 'EU', 'XO'
+    series : int or List[int], optional
+    tenor : str or List[str], optional
+        tenor in years e.g: '3yr', '5yr'
+    from_date : datetime.date, optional
+        starting quote date; overwritten when ``years`` is not None
+    end_date : datetime.date, optional
+        last quote date, defaults to today
+    years : int, optional
+        look-back window ending at ``end_date``; pass None to use
+        ``from_date`` as given
+    remove_holidays : bool, optional
+        drop US-holiday quote dates (only applied when index is 'IG' or 'HY')
+
+    Returns
+    -------
+    pandas.DataFrame
+        indexed by (quotedate, index, series, version, tenor, attach)
+    """
+    # Snapshot the call's keyword arguments: every entry left non-None below
+    # becomes one WHERE predicate.  remove_holidays and years are consumed
+    # locally and must not leak into the SQL parameters.
+    args = locals().copy()
+    del args["remove_holidays"]
+    if args["end_date"] is None:
+        args["end_date"] = datetime.date.today()
+    if args["years"] is not None:
+        # from_date is derived as `years` before end_date (this overrides any
+        # caller-supplied from_date).
+        args["from_date"] = (args["end_date"] - pd.DateOffset(years=years)).date()
+    del args["years"]
+
+    def make_str(key, val):
+        # Render one SQL predicate against the derived table alias "d".
+        col_key = key
+        if isinstance(val, list) or isinstance(val, tuple):
+            op = "IN"
+            # NOTE(review): the IN branch omits the "d." prefix and the
+            # from_date/end_date -> quotedate renaming applied below — verify
+            # list-valued date filters are never expected here.
+            return "{} IN %({})s".format(key, key)
+        elif key == "from_date":
+            # Both date bounds filter the same quotedate column.
+            col_key = "quotedate"
+            op = ">="
+        elif key == "end_date":
+            col_key = "quotedate"
+            op = "<="
+        else:
+            op = "="
+        return "{} {} %({})s".format("d." + col_key, op, key)
+
+    where_clause = " AND ".join(
+        make_str(k, v) for k, v in args.items() if v is not None
+    )
+    # Inner query joins Markit tranche quotes to index versions and to the
+    # matching risk numbers (delta) on date/index/series/tenor/attach; the
+    # outer SELECT exists so the WHERE clause can address columns via "d.".
+    sql_str = (
+        "SELECT * from "
+        "(SELECT quotedate, b.index, b.series, a.tenor, b.version, "
+        "a.attach, a.detach, upfront_mid, a.index_price, indexfactor, "
+        "cumulativeloss, c.delta, a.tranche_spread "
+        "from markit_tranche_quotes a "
+        "left join index_version b using (basketid)"
+        "inner join risk_numbers c on a.quotedate=date(c.date) "
+        "and b.index=c.index and b.series=c.series and "
+        "a.tenor=c.tenor and a.attach=c.attach) d "
+    )
+    if where_clause:
+        sql_str = " WHERE ".join([sql_str, where_clause])
+
+    def make_params(args):
+        # The DB driver needs tuples (not lists) to expand IN (...) params.
+        return {
+            k: tuple(v) if isinstance(v, list) else v
+            for k, v in args.items()
+            if v is not None
+        }
+
+    df = pd.read_sql_query(
+        sql_str,
+        serenitas_engine,
+        parse_dates={"date": {"utc": True}},
+        index_col=["quotedate", "index", "series", "version"],
+        params=make_params(args),
+    )
+    # tenor_t is a categorical dtype so sort_index orders tenors correctly.
+    df.tenor = df.tenor.astype(tenor_t)
+    df = df.set_index("tenor", append=True)
+    df.sort_index(inplace=True)
+    # Quote convention: IG/EU close on upfront directly, otherwise on a
+    # price basis (1 - upfront).
+    # NOTE(review): `index in ["IG", "EU"]` tests the raw argument — a
+    # list-valued `index` (e.g. ["IG"]) falls into the else branch; confirm
+    # callers only pass a scalar when mixing conventions matters.
+    df = df.assign(
+        close_price=lambda x: x.upfront_mid
+        if index in ["IG", "EU"]
+        else 1 - x.upfront_mid
+    )
+    # Rescale attach/detach for realized losses and the current index factor,
+    # clamping the adjusted points into [0, 1].
+    df = df.assign(
+        attach_adj=lambda x: np.maximum(
+            (x.attach - x.cumulativeloss) / df.indexfactor, 0
+        ),
+        detach_adj=lambda x: np.minimum(
+            (x.detach - x.cumulativeloss) / df.indexfactor, 1
+        ),
+        adj_thickness=lambda x: x.detach_adj - x.attach_adj,
+    )
+    df.set_index("attach", append=True, inplace=True)
+    # get rid of US holidays
+    if remove_holidays:
+        # Level 0 of the index is quotedate (sorted above).
+        dates = df.index.levels[0]
+        if index in ["IG", "HY"]:
+            holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
+            df = df.loc(axis=0)[dates.difference(holidays), :, :]
+    return df
+
+
+def tranche_returns(
+    df=None, index=None, series=None, tenor=None, from_date=None, end_date=None, years=3
+):
+    """computes spreads and price returns
+
+    Parameters
+    ----------
+    df : pandas.DataFrame, optional
+        quotes as returned by :func:`get_tranche_quotes`; fetched with the
+        remaining arguments when omitted
+    index : str or List[str], optional
+        index type, one of 'IG', 'HY', 'EU', 'XO'
+    series : int or List[int], optional
+    tenor : str or List[str], optional
+        tenor in years e.g: '3yr', '5yr'
+    from_date : datetime.date, optional
+        starting date
+    end_date : datetime.date, optional
+        last quote date
+    years : int, optional
+        limits how many years we go back starting from today.
+
+    """
+    if df is None:
+        df = get_tranche_quotes(index, series, tenor, from_date, end_date, years)
+
+    # Keep one row per (quotedate, index, series, tenor, attach) — drops
+    # duplicate quotes across index versions.
+    df = df.groupby(level=["quotedate", "index", "series", "tenor", "attach"]).nth(0)
+    g = df.groupby(level=["index", "series", "tenor", "attach"])
+    # Thickness-adjusted price change per tranche: today's close scaled by
+    # the thickness ratio, minus yesterday's close.  shift(0) is a no-op,
+    # kept for symmetry with shift(1).
+    tranche_return = g.close_price.shift(0) * g.adj_thickness.shift(
+        0
+    ) / g.adj_thickness.shift(1) - g.close_price.shift(1)
+    index_return = g.index_price.diff()
+    returns = pd.concat(
+        [index_return, tranche_return], axis=1, keys=["index_return", "tranche_return"]
+    )
+    df = df.merge(returns, left_index=True, right_index=True)
+
+    # Day-count fraction (Act/360) between consecutive quote dates per tranche.
+    # NOTE(review): `.astype("timedelta64[D]")` on a timedelta Series is
+    # pandas-version-sensitive — newer pandas wants `.dt.days`; confirm the
+    # pinned pandas version supports this cast.
+    df["date"] = df.index.get_level_values(level="quotedate")
+    df["day_frac"] = df.groupby(level=["index", "series", "tenor", "attach"])[
+        "date"
+    ].transform(lambda s: s.diff().astype("timedelta64[D]") / 360)
+    # Add carry: tranche_spread is quoted in basis points, hence / 10000.
+    df["tranche_return"] += df.day_frac * df.tranche_spread / 10000
+
+    # Delta-hedged (index-hedged) tranche return.
+    df = df.assign(deladj_return=lambda x: x.tranche_return - x.delta * x.index_return)
+
+    # Drop intermediate helper columns before returning.
+    df = df.drop(
+        ["date", "day_frac", "tranche_spread", "cumulativeloss", "detach"], axis=1
+    )
+    return df