diff options
Diffstat (limited to 'python/analytics/portfolio.py')
| -rw-r--r-- | python/analytics/portfolio.py | 373 |
1 files changed, 0 insertions, 373 deletions
diff --git a/python/analytics/portfolio.py b/python/analytics/portfolio.py deleted file mode 100644 index d37c0e11..00000000 --- a/python/analytics/portfolio.py +++ /dev/null @@ -1,373 +0,0 @@ -from __future__ import annotations -from .index import CreditIndex -from .option import BlackSwaption -from .tranche_basket import DualCorrTranche -from functools import reduce - -import pandas as pd -import numpy as np -import logging - -logger = logging.getLogger(__name__) - - -def portf_repr(method): - def f(*args): - portf = args[0] - thousands = "{:,.2f}".format - - def percent(x): - if np.isnan(x): - return "N/A" - else: - return f"{100*x:.2f}%" - - header = f"Portfolio {portf.value_date}\n\n" - kwargs = { - "formatters": { - "Notional": thousands, - "PV": thousands, - "Delta": percent, - "Gamma": percent, - "Theta": thousands, - "Vega": thousands, - "Vol": percent, - "Ref": thousands, - "Attach Rho": percent, - "Detach Rho": percent, - "HY Equiv": thousands, - "Strike": lambda x: "N/A" if np.isnan(x) else str(x), - "Type": lambda x: "N/A" if x is None else x, - "Corr01": thousands, - }, - "index": True, - } - if method == "string": - kwargs["line_width"] = 100 - s = getattr(portf._todf().dropna(axis=1, how="all"), "to_" + method)(**kwargs) - return header + s - - return f - - -class Portfolio: - def __init__(self, trades, trade_ids=None): - self.trades = trades - if trade_ids is not None: - self.trade_ids = trade_ids - else: - self.trade_ids = (None,) * len(trades) - if trades: - value_dates = set(t.value_date for t in self.trades) - self._value_date = value_dates.pop() - if len(value_dates) >= 1: - logger.warn( - f"not all instruments have the same trade date, picking {self._value_date}" - ) - - def __bool__(self): - return bool(self.trades) - - def add_trade(self, trades, trade_ids): - self.trades.append(trades) - self.trade_ids.append(trade_ids) - - def __iter__(self): - for t in self.trades: - yield t - - def __iadd__(self, other: Portfolio): - if other: - self.trades.extend(other.trades) - self.trade_ids.extend(other.trade_ids) - return self - - def __add__(self, other: Portfolio): - return Portfolio( - self.trades + other.trades, trade_ids=self.trade_ids + other.trade_ids - ) - - def __getitem__(self, trade_id): - for tid, trade in zip(self.trade_ids, self.trades): - if tid == trade_id: - break - else: - raise ValueError(f"{trade_id} not found") - return trade - - @property - def indices(self): - return [t for t in self.trades if isinstance(t, CreditIndex)] - - @property - def swaptions(self): - return [t for t in self.trades if isinstance(t, BlackSwaption)] - - @property - def tranches(self): - return [t for t in self.trades if isinstance(t, DualCorrTranche)] - - def items(self): - for trade_id, trade in zip(self.trade_ids, self.trades): - yield (trade_id, trade) - - @property - def pnl(self): - return sum(t.pnl for t in self.trades) - - @property - def pnl_list(self): - return [t.pnl for t in self.trades] - - @property - def pv(self): - return sum(t.pv for t in self.trades) - - @property - def pv_list(self): - return [t.pv for t in self.trades] - - def reset_pv(self): - for t in self.trades: - t.reset_pv() - - @property - def value_date(self): - return self._value_date - - @property - def jump_to_default(self): - return sum(t.jump_to_default for t in self.trades if isinstance(t, CreditIndex)) - - def jtd_single_names(self): - jtd = reduce( - lambda x, y: x.add(y, fill_value=0.0), - ( - t.jtd_single_names() - for t in self.trades - if isinstance(t, (DualCorrTranche, CreditIndex)) - ), - ) - return ( - jtd.unstack() - .sort_index(1, ascending=False) - .fillna(0.0) - .cumsum(axis=1) - .sort_index(1) - ) - - @value_date.setter - def value_date(self, d): - for t in self.trades: - t.value_date = d - self._value_date = d - - def mark(self, **kwargs): - for tid, t in self.items(): - try: - t.mark(**kwargs) - logger.debug(f"marking {tid} to {t.pv}") - except Exception as e: - raise - - def shock(self, params=["pnl"], **kwargs): - return { - trade_id: trade.shock(params, **kwargs) for trade_id, trade in self.items() - } - - @property - def ref(self): - if len(self.indices) == 1: - return self.indices[0].ref - else: - return [index.ref for index in self.indices] - - @ref.setter - def ref(self, val): - if len(self.indices) == 1: - self.indices[0].ref = val - elif len(self.indices) == 0: - # no index, so set the individual refs - for t in self.swaptions: - t.index.ref = val - elif len(self.indices) == len(val): - for index, val in zip(self.indices, val): - index.ref = val - else: - raise ValueError("The number of refs doesn't match the number of indices") - - @property - def spread(self): - if len(self.indices) == 1: - return self.indices[0].spread - else: - return [index.spread for index in self.indices] - - @spread.setter - def spread(self, val): - if len(self.indices) == 1: - self.indices[0].spread = val - elif len(self.indices) == 0: - # no index, so set the individual refs - for t in self.swaptions: - t.index.spread = val - elif len(self.indices) == len(val): - for index, val in zip(self.indices, val): - index.spread = val - else: - raise ValueError( - "The number of spreads doesn't match the number of indices" - ) - - @property - def delta(self): - """returns the equivalent protection notional - - makes sense only where there is a single index.""" - return sum( - [getattr(t, "delta", t._direction) * t.notional for t in self.trades] - ) - - @property - def gamma(self): - return sum([getattr(t, "gamma", 0) * t.notional for t in self.trades]) - - @property - def dv01(self): - return sum(t.dv01 for t in self.trades) - - @property - def theta(self): - return sum(t.theta for t in self.trades) - - @property - def hy_equiv(self): - return sum(t.hy_equiv for t in self.trades) - - @property - def corr01(self): - return sum(t.corr01 for t in self.trades) - - @property - def vega(self): - return sum(t.vega for t in self.trades) - - def _todf(self): - headers = [ - "Product", - "Index", - "Notional", - "Ref", - "Strike", - "Direction", - "Type", - "Expiry", - "Vol", - "PV", - "Delta", - "Gamma", - "Theta", - "Corr01", - "IRDV01", - "Vega", - "attach", - "detach", - "Attach Rho", - "Detach Rho", - "HY Equiv", - ] - rec = [] - for t in self.trades: - if isinstance(t, CreditIndex): - name = f"{t.index_type}{t.series} {t.tenor}" - r = ( - "Index", - name, - t.notional, - t.ref, - None, - t.direction, - getattr(t, "option_type", None), - getattr(t, "forward_date", None), - None, - t.pv, - 1.0, - None, - t.theta, - getattr(t, "corr01", None), - getattr(t, "IRDV01", None), - getattr(t, "vega", None), - None, - None, - None, - None, - t.hy_equiv, - ) - elif isinstance(t, BlackSwaption): - name = f"{t.index.index_type}{t.index.series} {t.index.tenor}" - r = ( - "Swaption", - name, - t.notional, - t.ref, - t.strike, - t.direction, - t.option_type, - t.forward_date, - t.sigma, - t.pv, - t.delta, - t.gamma, - t.theta, - getattr(t, "corr01", None), - getattr(t, "IRDV01", None), - t.vega, - None, - None, - None, - None, - t.hy_equiv, - ) - elif isinstance(t, DualCorrTranche): - name = f"{t.index_type}{t.series} {t.tenor}" - try: - theta = t.theta() - except ValueError: - theta = t.pv / t.notional / t.duration + t.tranche_running * 1e-4 - r = ( - "Tranche", - name, - t.notional, - None, - None, - t.direction, - None, - None, - None, - t.pv, - t.delta, - t.gamma, - theta, - getattr(t, "corr01", None), - getattr(t, "IRDV01", None), - None, - t.attach, - t.detach, - t.rho[0], - t.rho[1], - t.hy_equiv, - ) - else: - raise TypeError - rec.append(r) - if isinstance(self.trade_ids[0], tuple): - index = [tid[1] for tid in self.trade_ids] - else: - index = self.trade_ids - df = pd.DataFrame.from_records(rec, columns=headers, index=index) - df.index.name = "ids" - return df - - __repr__ = portf_repr("string") - - _repr_html_ = portf_repr("html") |
