aboutsummaryrefslogtreecommitdiffstats
path: root/python/analytics/tranche_data.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/analytics/tranche_data.py')
-rw-r--r--python/analytics/tranche_data.py172
1 file changed, 0 insertions, 172 deletions
diff --git a/python/analytics/tranche_data.py b/python/analytics/tranche_data.py
deleted file mode 100644
index 316543d7..00000000
--- a/python/analytics/tranche_data.py
+++ /dev/null
@@ -1,172 +0,0 @@
-import datetime
-import pandas as pd
-import numpy as np
-
-from dates import bond_cal
-from . import serenitas_engine, serenitas_pool
-from .utils import tenor_t
-
-
def get_tranche_quotes(
    index=None,
    series=None,
    tenor=None,
    from_date=None,
    end_date=None,
    years=3,
    remove_holidays=True,
):
    """Load Markit tranche quotes joined with risk numbers from the database.

    Parameters
    ----------
    index : str or List[str], optional
        index type, e.g. 'IG', 'HY', 'EU', 'XO'
    series : int or List[int], optional
    tenor : str or List[str], optional
        tenor in years, e.g. '3yr', '5yr'
    from_date : datetime.date, optional
        starting date; overwritten when ``years`` is not None
    end_date : datetime.date, optional
        last date, defaults to today
    years : int, optional
        how many years to go back starting from ``end_date``
    remove_holidays : bool, optional
        drop US bond-market holidays (only applied for 'IG'/'HY')

    Returns
    -------
    pandas.DataFrame
        indexed by (date, index, series, version, tenor, attach), with
        loss-adjusted attach/detach points and tranche factors appended.
    """
    # Snapshot the keyword arguments so they can double as SQL bind params.
    args = locals().copy()
    del args["remove_holidays"]
    if args["end_date"] is None:
        args["end_date"] = datetime.date.today()
    if args["years"] is not None:
        # `years` takes precedence over an explicit `from_date`.
        args["from_date"] = (args["end_date"] - pd.DateOffset(years=years)).date()
    del args["years"]

    def make_str(key, val):
        # Build a single SQL predicate for one keyword argument.
        # Lists/tuples become IN clauses; the date bounds both map onto
        # the `date` column of the derived table `d`.
        if isinstance(val, (list, tuple)):
            return "{} IN %({})s".format(key, key)
        if key == "from_date":
            col_key, op = "date", ">="
        elif key == "end_date":
            col_key, op = "date", "<="
        else:
            col_key, op = key, "="
        return "{} {} %({})s".format("d." + col_key, op, key)

    where_clause = " AND ".join(
        make_str(k, v) for k, v in args.items() if v is not None
    )
    sql_str = (
        "SELECT * from "
        "(SELECT quotedate as date, b.index, b.series, a.tenor, b.version, "
        "a.attach, a.detach, (1-upfront_mid) as close_price, a.index_price, indexfactor/100 as indexfactor, "
        "cumulativeloss, c.delta, a.tranche_spread "
        "from markit_tranche_quotes a "
        # trailing space matters: without it the concatenation produced
        # the invalid fragment "using (basketid)inner join"
        "left join index_version b using (basketid) "
        "inner join risk_numbers c on a.quotedate=date(c.date) "
        "and b.index=c.index and b.series=c.series and "
        "a.tenor=c.tenor and a.attach=c.attach) d "
    )
    if where_clause:
        sql_str = " WHERE ".join([sql_str, where_clause])

    def make_params(args):
        # psycopg-style parameter dict; lists must be tuples for IN (...).
        return {
            k: tuple(v) if isinstance(v, list) else v
            for k, v in args.items()
            if v is not None
        }

    df = pd.read_sql_query(
        sql_str,
        serenitas_engine,
        parse_dates={"date"},
        index_col=["date", "index", "series", "version"],
        params=make_params(args),
    )
    df.tenor = df.tenor.astype(tenor_t)
    df = df.set_index("tenor", append=True)
    df.sort_index(inplace=True)
    # Adjust attach/detach for realized losses and factor changes; clip to
    # [0, 1] since losses may have eaten through a tranche entirely.
    df = df.assign(
        attach_adj=lambda x: np.maximum(
            (x.attach - x.cumulativeloss) / (x.indexfactor * 100), 0
        ),
        detach_adj=lambda x: np.minimum(
            (x.detach - x.cumulativeloss) / (x.indexfactor * 100), 1
        ),
        orig_thickness=lambda x: (x.detach - x.attach) / 100,
        adj_thickness=lambda x: x.detach_adj - x.attach_adj,
        tranche_factor=lambda x: x.adj_thickness * x.indexfactor / x.orig_thickness,
    )
    df.set_index("attach", append=True, inplace=True)
    # get rid of US holidays
    if remove_holidays:
        dates = df.index.levels[0]
        if index in ["IG", "HY"]:
            holidays = bond_cal().holidays(start=dates[0], end=dates[-1])
            df = df.loc(axis=0)[dates.difference(holidays), :, :]
    return df
-
-
def tranche_returns(
    df=None, index=None, series=None, tenor=None, from_date=None, end_date=None, years=3
):
    """computes spreads and price returns

    Parameters
    ----------
    df : pandas.DataFrame, optional
        tranche quotes as returned by :func:`get_tranche_quotes`; when None
        the quotes are fetched with the remaining arguments.
    index : str or List[str], optional
        index type, one of 'IG', 'HY', 'EU', 'XO'
    series : int or List[int], optional
    tenor : str or List[str], optional
        tenor in years e.g: '3yr', '5yr'
    from_date : datetime.date, optional
        starting date
    end_date : datetime.date, optional
        last date, defaults to today
    years : int, optional
        limits how many years we go back starting from today.

    Returns
    -------
    pandas.DataFrame
        the input quotes with `index_return`, `tranche_return` and
        `delhedged_return` columns merged in.
    """
    if df is None:
        df = get_tranche_quotes(index, series, tenor, from_date, end_date, years)
    # Deduplicate: keep the first version quoted per (date, tranche).
    df = df.groupby(level=["date", "index", "series", "tenor", "attach"]).nth(0)
    coupon_data = pd.read_sql_query(
        "SELECT index, series, tenor, coupon * 1e-4 AS coupon "
        " FROM index_maturity WHERE coupon is NOT NULL",
        serenitas_engine,
        index_col=["index", "series", "tenor"],
    )
    df = df.join(coupon_data)
    # Keep the date as a column so day fractions survive the groupby below.
    df["date_1"] = df.index.get_level_values(level="date")

    # skip missing dates
    returns = []
    for i, g in df.groupby(level=["index", "series", "tenor", "attach"]):
        g = g.dropna()
        # Act/360 accrual fraction between consecutive quote dates.
        day_frac = g["date_1"].transform(
            lambda s: s.diff().astype("timedelta64[D]") / 360
        )
        index_loss = g.cumulativeloss - g.cumulativeloss.shift(1)
        # The 100-detach (super-senior) tranche takes no default loss;
        # use .iloc to read the first row positionally (plain [0] relied on
        # deprecated positional fallback on a label index).
        tranche_loss = (
            (
                g.adj_thickness.shift(1) * g.indexfactor.shift(1)
                - g.adj_thickness * g.indexfactor
            )
            / g.orig_thickness
            if g.detach.iloc[0] != 100
            else 0
        )
        # Price return adjusted for factor changes and realized losses.
        tranche_return = g.close_price - (
            1
            - ((1 - g.close_price.shift(1)) * g.tranche_factor.shift(1) - tranche_loss)
            / g.tranche_factor
        )
        index_return = g.index_price - (
            1
            - ((1 - g.index_price.shift(1)) * g.indexfactor.shift(1) - index_loss / 100)
            / g.indexfactor
        )
        # Add carry: tranche spread is quoted in bp, coupon already in decimal.
        tranche_return += day_frac * g.tranche_spread / 10000
        index_return += day_frac * g.coupon
        delhedged_return = (
            tranche_return
            - g.delta.shift(1) * index_return * g.indexfactor / g.tranche_factor
        )
        returns.append(
            pd.concat(
                [index_return, tranche_return, delhedged_return],
                axis=1,
                keys=["index_return", "tranche_return", "delhedged_return"],
            )
        )

    df = df.merge(pd.concat(returns), left_index=True, right_index=True, how="left")

    df = df.drop(["date_1", "tranche_spread", "detach", "coupon"], axis=1)
    return df