diff options
Diffstat (limited to 'python/analytics/tranche_data.py')
| -rw-r--r-- | python/analytics/tranche_data.py | 172 |
1 file changed, 0 insertions, 172 deletions
"""Load CDS index-tranche quotes from the Serenitas database and compute
spread/price return series for them.

NOTE(review): this module was reconstructed from a deletion diff; it depends on
project-local objects (`serenitas_engine`, `bond_cal`, `tenor_t`) and a live
database, so it cannot run in isolation.
"""
import datetime
import pandas as pd
import numpy as np

from dates import bond_cal
# NOTE(review): `serenitas_pool` is imported but not referenced in this module.
from . import serenitas_engine, serenitas_pool
from .utils import tenor_t


def get_tranche_quotes(
    index=None,
    series=None,
    tenor=None,
    from_date=None,
    end_date=None,
    years=3,
    remove_holidays=True,
):
    """Query tranche quotes joined with index versions and risk numbers.

    Parameters
    ----------
    index : str or list[str], optional
        Index family filter, e.g. 'IG', 'HY' (list values become SQL IN).
    series : int or list[int], optional
    tenor : str or list[str], optional
    from_date, end_date : datetime.date, optional
        Date window; `end_date` defaults to today.
    years : int, optional
        If not None, overrides `from_date` with `end_date - years`.
    remove_holidays : bool, optional
        Drop US bond-market holidays for 'IG'/'HY' indices.

    Returns
    -------
    pandas.DataFrame
        Indexed by (date, index, series, version, tenor, attach) with quote
        columns plus derived loss-adjusted attach/detach, thicknesses and
        tranche_factor.
    """
    # Snapshot the call arguments so they can be turned into SQL filters.
    args = locals().copy()
    del args["remove_holidays"]
    if args["end_date"] is None:
        args["end_date"] = datetime.date.today()
    # `years` wins over an explicit `from_date` when both are given.
    if args["years"] is not None:
        args["from_date"] = (args["end_date"] - pd.DateOffset(years=years)).date()
    del args["years"]

    def make_str(key, val):
        # Build one "column OP %(key)s" predicate per non-None argument.
        # List/tuple values become an IN clause (note: without the "d." alias
        # prefix used by the scalar branch — kept as-is).
        col_key = key
        if isinstance(val, list) or isinstance(val, tuple):
            op = "IN"
            return "{} IN %({})s".format(key, key)
        elif key == "from_date":
            # Both date bounds filter the same "date" column.
            col_key = "date"
            op = ">="
        elif key == "end_date":
            col_key = "date"
            op = "<="
        else:
            op = "="
        return "{} {} %({})s".format("d." + col_key, op, key)

    where_clause = " AND ".join(
        make_str(k, v) for k, v in args.items() if v is not None
    )
    # Inner subquery joins quotes to their index version and to the risk
    # numbers (delta) computed for the same date/index/series/tenor/attach.
    sql_str = (
        "SELECT * from "
        "(SELECT quotedate as date, b.index, b.series, a.tenor, b.version, "
        "a.attach, a.detach, (1-upfront_mid) as close_price, a.index_price, indexfactor/100 as indexfactor, "
        "cumulativeloss, c.delta, a.tranche_spread "
        "from markit_tranche_quotes a "
        "left join index_version b using (basketid)"
        "inner join risk_numbers c on a.quotedate=date(c.date) "
        "and b.index=c.index and b.series=c.series and "
        "a.tenor=c.tenor and a.attach=c.attach) d "
    )
    if where_clause:
        # Joining two elements with " WHERE " simply concatenates
        # "<select> WHERE <predicates>".
        sql_str = " WHERE ".join([sql_str, where_clause])

    def make_params(args):
        # psycopg-style params: lists must be tuples for IN expansion.
        return {
            k: tuple(v) if isinstance(v, list) else v
            for k, v in args.items()
            if v is not None
        }

    df = pd.read_sql_query(
        sql_str,
        serenitas_engine,
        parse_dates={"date"},
        index_col=["date", "index", "series", "version"],
        params=make_params(args),
    )
    # tenor_t is presumably an ordered categorical dtype — TODO confirm.
    df.tenor = df.tenor.astype(tenor_t)
    df = df.set_index("tenor", append=True)
    df.sort_index(inplace=True)
    # Derived columns: attach/detach adjusted for realized losses and the
    # current index factor (attach/detach appear to be in percent — inferred
    # from the /100 scaling).
    df = df.assign(
        attach_adj=lambda x: np.maximum(
            (x.attach - x.cumulativeloss) / (x.indexfactor * 100), 0
        ),
        detach_adj=lambda x: np.minimum(
            (x.detach - x.cumulativeloss) / (x.indexfactor * 100), 1
        ),
        orig_thickness=lambda x: (x.detach - x.attach) / 100,
        adj_thickness=lambda x: x.detach_adj - x.attach_adj,
        tranche_factor=lambda x: x.adj_thickness * x.indexfactor / x.orig_thickness,
    )
    df.set_index("attach", append=True, inplace=True)
    # get rid of US holidays
    if remove_holidays:
        dates = df.index.levels[0]
        if index in ["IG", "HY"]:
            holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
            df = df.loc(axis=0)[dates.difference(holidays), :, :]
    return df


def tranche_returns(
    df=None, index=None, series=None, tenor=None, from_date=None, end_date=None, years=3
):
    """computes spreads and price returns

    Parameters
    ----------
    df : pandas.DataFrame
        pre-fetched output of `get_tranche_quotes`; fetched if None.
    index : str or List[str], optional
        index type, one of 'IG', 'HY', 'EU', 'XO'
    series : int or List[int], optional
    tenor : str or List[str], optional
        tenor in years e.g: '3yr', '5yr'
    date : datetime.date, optional
        starting date
    years : int, optional
        limits many years do we go back starting from today.

    Returns
    -------
    pandas.DataFrame
        input quotes plus `index_return`, `tranche_return` and
        `delhedged_return` columns.
    """
    if df is None:
        df = get_tranche_quotes(index, series, tenor, from_date, end_date, years)
    # Keep the first quote per (date, index, series, tenor, attach) —
    # collapses multiple index versions.
    df = df.groupby(level=["date", "index", "series", "tenor", "attach"]).nth(0)
    # Coupons stored in bp; scale to a decimal rate.
    coupon_data = pd.read_sql_query(
        "SELECT index, series, tenor, coupon * 1e-4 AS coupon "
        " FROM index_maturity WHERE coupon is NOT NULL",
        serenitas_engine,
        index_col=["index", "series", "tenor"],
    )
    df = df.join(coupon_data)
    # Materialize the date level as a column so diffs can be taken per group.
    df["date_1"] = df.index.get_level_values(level="date")

    # skip missing dates
    returns = []
    for i, g in df.groupby(level=["index", "series", "tenor", "attach"]):
        g = g.dropna()
        # Act/360 day fraction between consecutive quote dates.
        # NOTE(review): `.astype("timedelta64[D]")` relies on legacy pandas
        # semantics (days as float); newer pandas would need `.dt.days`.
        day_frac = g["date_1"].transform(
            lambda s: s.diff().astype("timedelta64[D]") / 360
        )
        index_loss = g.cumulativeloss - g.cumulativeloss.shift(1)
        # Super-senior tranches (detach == 100) take no loss here; writedowns
        # hit them via the index factor instead — TODO confirm.
        tranche_loss = (
            (
                g.adj_thickness.shift(1) * g.indexfactor.shift(1)
                - g.adj_thickness * g.indexfactor
            )
            / g.orig_thickness
            if g.detach[0] != 100
            else 0
        )
        # Price return adjusted for factor changes and realized losses.
        tranche_return = g.close_price - (
            1
            - ((1 - g.close_price.shift(1)) * g.tranche_factor.shift(1) - tranche_loss)
            / g.tranche_factor
        )
        index_return = g.index_price - (
            1
            - ((1 - g.index_price.shift(1)) * g.indexfactor.shift(1) - index_loss / 100)
            / g.indexfactor
        )
        # Add carry: running tranche spread (bp) and index coupon.
        tranche_return += day_frac * g.tranche_spread / 10000
        index_return += day_frac * g.coupon
        # Delta-hedged tranche return, hedged with the index position.
        delhedged_return = (
            tranche_return
            - g.delta.shift(1) * index_return * g.indexfactor / g.tranche_factor
        )
        returns.append(
            pd.concat(
                [index_return, tranche_return, delhedged_return],
                axis=1,
                keys=["index_return", "tranche_return", "delhedged_return"],
            )
        )

    df = df.merge(pd.concat(returns), left_index=True, right_index=True, how="left")

    df = df.drop(["date_1", "tranche_spread", "detach", "coupon"], axis=1)
    return df
