diff options
Diffstat (limited to 'python/pnl_explain.py')
| -rw-r--r-- | python/pnl_explain.py | 399 |
1 file changed, 172 insertions, 227 deletions
diff --git a/python/pnl_explain.py b/python/pnl_explain.py index 25af238e..d67bd536 100644 --- a/python/pnl_explain.py +++ b/python/pnl_explain.py @@ -1,243 +1,188 @@ -import numpy as np +import datetime import pandas as pd +from analytics.utils import get_fx +from dates import bus_day +from psycopg2.errors import SyntaxError +from psycopg2.extensions import connection +from risk.swaptions import get_swaption_portfolio +from risk.indices import get_index_portfolio +from risk.tranches import get_tranche_portfolio +from pyisda.date import previous_twentieth +from typing import Tuple, Union -from db import dbengine -from dates import bus_day, imm_dates, yearfrac -def get_daycount(identifier, engine=dbengine("dawndb")): - """ retrieve daycount and paydelay for a given identifier""" - conn = engine.raw_connection() - with conn.cursor() as c: - c.execute("SELECT day_count, pay_delay FROM securities WHERE identifier=%s", - (identifier,)) - try: - a, b = c.fetchone() - except TypeError: - conn.commit() - return None, None - conn.commit() - return a, b +def get_index_pv( + start_date: datetime.date, + end_date: datetime.date, + conn: connection, + strategies: Union[Tuple[str], None] = None, +): + dr = pd.bdate_range(start_date, end_date, freq=bus_day) + pvs = [] + daily = [] + dates = [] -def pnl_explain(identifier, start_date = None, end_date = None, - engine=dbengine("dawndb")): - """ if start_date is None, pnl since inception""" - trades = pd.read_sql_query("SELECT * FROM bonds where identifier=%s", engine, - params=(identifier,), parse_dates=['trade_date', 'settle_date']) - for key in ['faceamount', 'principal_payment', 'accrued_payment']: - trades.loc[~trades.buysell, key] = -trades[key][~trades.buysell] - if start_date is None: - start_date = trades.trade_date.min() - - ## take care of multiple trades settling on the same date - trades = (trades. - groupby('settle_date')[['faceamount', 'principal_payment', 'accrued_payment']]. 
- sum()) - marks = pd.read_sql_query("SELECT * FROM marks where identifier=%s", engine, - params=(identifier,), parse_dates = ['date'], index_col='date') - factors = pd.read_sql_query("SELECT * FROM factors_history where identifier=%s", engine, - params=(identifier,), parse_dates = ['last_pay_date', 'prev_cpn_date']) - factors = factors.set_index('prev_cpn_date', drop=False) - daycount, delay = get_daycount(identifier, engine) - - df = (marks[['price']]. - join([factors[['prev_cpn_date', 'coupon', 'factor']], - trades[['principal_payment', 'accrued_payment', 'faceamount']]], - how='outer')) - factors = factors.set_index('last_pay_date') - df = df.join(factors[['principal', 'losses', 'interest']], how='outer') - df.sort_index(inplace=True) - - if end_date is None: - end_date = pd.datetime.today() - dates = pd.date_range(start_date, end_date, freq = bus_day) - keys1 = ['price','factor', 'coupon', 'prev_cpn_date'] - df[keys1] = df[keys1].fillna(method='ffill') + for d in dr: + prev_day = (d - bus_day).date() + if previous_twentieth(d, roll=True) == d.date(): + accrued = 0.0 + for t in portf.trades: + _, amount = t._fee_leg.cashflows[0] + amount *= get_fx(d, t.currency) + accrued -= amount * t.notional * t.factor * t.fixed_rate * 1e-4 + else: + accrued = 0.0 + portf = get_index_portfolio(prev_day, conn, strategies) + nav = 0.0 + with conn.cursor() as c: + try: + c.execute( + "SELECT upfront, currency FROM cds WHERE trade_date=%s " + "AND folder in %s", + (prev_day, strategies), + ) + except SyntaxError as e: + conn.reset() + raise e + for (fee, curr) in c: + nav += fee * get_fx(d, curr) + daily.append(nav + accrued) + pvs.append(portf.pv) + dates.append(prev_day) + df = pd.DataFrame({"pv": pvs, "daily": daily}, index=pd.to_datetime(dates)) + return df - ## overwrite the factor to 1 in case of zero factor bond - df['orig_factor'] = df['factor'] - if identifier.endswith('_A'): - df.loc[df.price.notnull() & (df.factor==0), 'factor'] = 1 - keys2 = ['losses', 
'principal','interest', 'faceamount','accrued_payment', 'principal_payment'] - df[keys2] = df[keys2].fillna(value=0) - df.faceamount = df.faceamount.cumsum() - keys = keys1 + ['faceamount', 'orig_factor'] - df1 = df.reindex(df.index.union(dates), keys, method='ffill') - keys = ['losses', 'principal','interest', 'accrued_payment', 'principal_payment'] - df2 = df.reindex(df.index.union(dates), keys, fill_value=0) - daily = pd.concat([df1, df2], axis = 1) - daily = daily[(start_date-1):end_date] - daily['unrealized_pnl'] = daily.price.diff() * daily.factor.shift()/100 * daily.faceamount - daily['clean_nav'] = daily.price/100 * daily.factor * daily.faceamount - ## realized pnl due to factor change - daily['realized_pnl'] = daily.price/100 * daily.factor.diff() * daily.faceamount.shift() - ## realized pnl due to principal payment - if delay: - daily['realized_pnl'] = (daily['realized_pnl']. - add(daily.principal/100 * daily.faceamount.shift(delay, 'D'), - fill_value=0)) - else: - daily['realized_pnl'] = (daily['realized_pnl']. 
- add(daily.principal/100 * daily.faceamount.shift(), - fill_value=0)) - if delay: - daily['realized_accrued'] = daily.interest/100 * daily.faceamount.shift(delay, 'D') - else: - daily['realized_accrued'] = daily.interest/100 * daily.faceamount.shift() - daily['realized_accrued'] = daily['realized_accrued'].fillna(value=0) - daily['accrued'] = yearfrac(daily.prev_cpn_date, daily.index.to_series(), daycount) * \ - daily.coupon/100 * daily.orig_factor * daily.faceamount - daily['unrealized_accrued'] = daily.accrued.diff() + daily.realized_accrued - cols = ['unrealized_pnl', 'realized_pnl', 'realized_accrued', 'clean_nav'] - daily[cols] = daily[cols].fillna(value=0) - extra_pnl = daily.clean_nav.diff() - daily.principal_payment - daily.loc[daily.principal_payment>0 , 'unrealized_pnl'] += extra_pnl[daily.principal_payment>0] - daily.loc[daily.principal_payment<0, 'realized_pnl'] += extra_pnl[daily.principal_payment<0] - daily['realized_accrued'] -= daily.accrued_payment - - return daily[['clean_nav', 'accrued', 'unrealized_pnl', 'realized_pnl', 'unrealized_accrued', - 'realized_accrued']] +def get_swaption_pv( + start_date: datetime.date, end_date: datetime.date, conn: connection, **kwargs +): + dr = pd.bdate_range(start_date, end_date, freq=bus_day) + pv = [] + daily = [] + dates = [] + for d in dr: + prev_day = (d - bus_day).date() + portf = get_swaption_portfolio(prev_day, conn, **kwargs) + nav = 0.0 + # add terminations + with conn.cursor() as c: + c.execute( + "SELECT termination_fee " + "FROM terminations JOIN swaptions USING (dealid) " + "WHERE termination_date=%s AND dealid LIKE 'SWPTN%%' " + "AND folder !='STEEP'", + (prev_day,), + ) + for (fee,) in c: + nav += fee + # add new trades + with conn.cursor() as c: + c.execute( + "SELECT notional * price/100 * (CASE WHEN buysell THEN -1. ELSE 1. 
END) " + "FROM swaptions WHERE trade_date=%s AND folder != 'STEEP'", + (prev_day,), + ) + for (fee,) in c: + nav += fee + dates.append(prev_day) + pv.append(portf.pv) + daily.append(nav) + df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates)) + return df -def pnl_explain_list(id_list, start_date = None, end_date = None, engine = dbengine("dawndb")): - return {(identifier, strategy): pnl_explain(identifier, start_date, end_date, engine) - for identifier, strategy in id_list} -def compute_tranche_factors(df, attach, detach): - attach, detach = attach/100, detach/100 - df['indexrecovery'] = 1-df.indexfactor-df.cumulativeloss - df = df.assign(tranche_loss = lambda x: (x.cumulativeloss-attach)/(detach-attach), - tranche_recov = lambda x: (x.indexrecovery-(1-detach))/(detach-attach)) - df[['tranche_loss', 'tranche_recov']] = df[['tranche_loss', 'tranche_recov']].clip(lower=0, upper=1) - df['tranche_factor'] = 1-df.tranche_loss - df.tranche_recov +def get_tranche_pv( + start_date: datetime.date, end_date: datetime.date, conn: connection, **kwargs +): + dr = pd.bdate_range(start_date, end_date, freq=bus_day) + pv = [] + daily = [] + dates = [] + for d in dr: + prev_day = (d - bus_day).date() + portf = get_tranche_portfolio(prev_day, conn, **kwargs) + nav = 0.0 + # add terminations + with conn.cursor() as c: + c.execute( + "SELECT termination_fee " + "FROM terminations JOIN cds USING (dealid) " + "WHERE termination_date=%s AND dealid LIKE 'SCCDS%%' ", + (prev_day,), + ) + for (fee,) in c: + nav += fee + # add new trades + with conn.cursor() as c: + c.execute( + "SELECT upfront " + "FROM cds WHERE trade_date=%s AND swap_type='CD_INDEX_TRANCHE' " + "AND fund='SERCGMAST'", + (prev_day,), + ) + for (fee,) in c: + nav += fee + dates.append(prev_day) + pv.append(portf.pv) + daily.append(nav) + df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates)) return df -def cds_explain(index, series, tenor, attach = np.nan, detach = np.nan, - 
start_date = None, end_date = None, engine = dbengine('serenitasdb')): - cds_trade = np.isnan(attach) or np.isnan(detach) - if cds_trade: - quotes = pd.read_sql_query("SELECT date, (100-closeprice)/100 AS upfront " \ - "FROM index_quotes WHERE index=%s AND series=%s " \ - "AND tenor=%s ORDER BY date", - engine, parse_dates=['date'], - index_col='date', params = (index, series, tenor)) - else: - #we take the latest version available - sqlstring = "SELECT DISTINCT ON (quotedate) quotedate, upfront_mid AS upfront, " \ - "tranche_spread FROM markit_tranche_quotes " \ - "JOIN index_version USING (basketid) WHERE index=%s AND series=%s" \ - "AND tenor=%s AND attach=%s AND detach=%s ORDER by quotedate, version desc" - quotes = pd.read_sql_query(sqlstring, engine, parse_dates=['quotedate'], - index_col='quotedate', - params = (index, series, tenor, int(attach), int(detach))) - sqlstring = "SELECT indexfactor/100 AS indexfactor, coupon, " \ - "cumulativeloss/100 AS cumulativeloss, lastdate " \ - "FROM index_desc WHERE index=%s AND series=%s AND tenor=%s " \ - "ORDER BY lastdate" - factors = pd.read_sql_query(sqlstring, engine, parse_dates=['lastdate'], - params = (index, series, tenor)) - if start_date is None: - start_date = quotes.index.min() - if end_date is None: - end_date = pd.datetime.today() +if __name__ == "__main__": + import argparse + from utils.db import dbconn - if not cds_trade: - coupon = quotes.tranche_spread.iat[0]/10000 - factors = compute_tranche_factors(factors, attach, detach) - factors['factor'] = factors.tranche_factor - factors['recovery'] = factors.tranche_recov + dawndb = dbconn("dawndb") + parser = argparse.ArgumentParser() + parser.add_argument("start_date", type=datetime.datetime.fromisoformat) + parser.add_argument("end_date", type=datetime.datetime.fromisoformat) + parser.add_argument( + "-e", + "--external", + action="store_true", + default=False, + dest="use_external", + help="use brokers' marks", + ) + parser.add_argument( + "-s", + 
"--source", + action="append", + default=[], + dest="source_list", + help="quote source", + ) + parser.add_argument( + "-t", + "--pnl-type", + action="store", + default="tranche", + dest="pnl_type", + help="instrument for which we want the pnl (one of 'tranche' or 'swaption')", + ) + args = parser.parse_args() + swaption_strats = ("IGTOPTDEL", "HYOPTDEL") + tranche_strats = ("IGINX", "HYINX", "XOINX") + pnl_type = "tranche" + if args.pnl_type == "tranche": + index_strats = tranche_strats else: - coupon = factors.coupon.iat[0]/10000 - factors['factor'] = factors.indexfactor - factors['recovery'] = 1-factors.indexfactor-factors.cumulativeloss + index_strats = swaption_strats + df_index = get_index_pv(args.start_date, args.end_date, dawndb, tranche_strats) - dates = pd.date_range(start_date, end_date, freq = bus_day) - yearfrac = imm_dates(start_date, end_date) - yearfrac = yearfrac.to_series().reindex(dates, method='ffill') - yearfrac = yearfrac.index-yearfrac - yearfrac = (yearfrac.dt.days+1)/360 - yearfrac.name = 'yearfrac' - quotes = quotes.reindex(dates, method='ffill') - - if factors.shape[0]==1 or dates[-1] > max(factors.lastdate): - factors.lastdate.iat[-1] = dates[-1] + if args.pnl_type == "tranche": + df_instrument = get_tranche_pv(conn=dawndb, **vars(args)) else: - factors = factors.iloc[:-1] - try: - factors = (factors.set_index('lastdate', verify_integrity=True). 
- reindex(dates, ['factor', 'recovery'], method='bfill')) - except ValueError: - pdb.set_trace() - factors.recovery = factors.recovery.diff() - df = quotes.join([factors[['factor', 'recovery']], yearfrac]) - #df.loc[df.factor.isnull(), 'factor'] = factors.factor.iat[-1] - df['clean_nav'] = df.upfront * df.factor - df['accrued'] = - df.yearfrac * coupon*df.factor - df['unrealized_accrued'] = df.accrued.diff() - df['realized_accrued'] = -df.unrealized_accrued.where(df.unrealized_accrued.isnull() | - (df.unrealized_accrued>0), 0) - df['unrealized_accrued'] = df.unrealized_accrued.where(df.unrealized_accrued.isnull()| - (df.unrealized_accrued<0), df.accrued) - df.loc[df.realized_accrued>0, 'realized_accrued'] += df.loc[df.realized_accrued>0, 'unrealized_accrued'] - df['unrealized_pnl'] = df.upfront.diff() * df.factor.shift() - df['realized_pnl'] = df.upfront/100*df.factor.diff()+df.recovery - return df[['clean_nav', 'accrued', 'unrealized_accrued', 'realized_accrued', - 'unrealized_pnl', 'realized_pnl']] - -def cds_explain_strat(strat, start_date, end_date, engine = dbengine("dawndb")): - if not pd.core.common.is_list_like(strat): - strat = [strat] - cds_positions = pd.read_sql_table("orig_cds", engine, - parse_dates = ['trade_date', 'upfront_settle_date'], - index_col='dealid') - cds_positions = cds_positions.ix[cds_positions.folder.isin(strat) & - (end_date is None or \ - cds_positions.upfront_settle_date<=pd.Timestamp(end_date))] - cds_positions.loc[cds_positions.protection=='Seller', "notional"] *= -1 - df = {} - for r in cds_positions.itertuples(): - key = (r.index, r.series, r.tenor) - if start_date is not None: - start_date = max(r.trade_date, pd.Timestamp(start_date)) - else: - start_date = r.trade_date - trade_df = cds_explain(r.index, r.series, r.tenor, r.attach, r.detach, - start_date, end_date, engine) - trade_df = r.notional * trade_df - if start_date is None or (start_date <= r.trade_date): - trade_df.realized_accrued.iat[3] -= trade_df.accrued.iat[0] - 
extra_pnl = trade_df.clean_nav.iat[0] + trade_df.accrued.iat[0] + r.upfront - trade_df.unrealized_pnl.iat[0] = extra_pnl - trade_df.loc[:3, 'unrealized_accrued'] = 0 - df[key] = trade_df.add(df.get(key, 0), fill_value=0) - return pd.concat(df) + df_instrument = get_swaption_pv(conn=dawndb, **vars(args)) -if __name__=="__main__": - engine = dbengine("dawndb") - from position import get_list_range - ## CLO - # clo_list = get_list_range(engine, '2015-01-01', '2015-12-31', 'CLO') - # df = pnl_explain_list(clo_list.identifier.tolist(), '2015-01-01', '2015-12-31', engine) - # df = pd.concat(df) - # df_agg = df.groupby(level=1).sum() - ## subprime - subprime_list = get_list_range(engine, '2015-09-30', '2015-10-31', 'Subprime') - df_subprime = pnl_explain_list(subprime_list[['identifier', 'strategy']].to_records(index=False), - '2015-09-30', '2015-10-31', engine) - df_subprime = pd.concat(df_subprime, names=['identifier', 'strategy', 'date']) - # monthly_pnl = (df_subprime.reset_index('strategy', drop=True). - # reset_index('identifier'). - # groupby('identifier'). 
- # resample('M', how='sum')) - # ## daily pnl by strategy - #df_agg = df_subprime.groupby(level=['date', 'strategy']).sum() - # ## monthly pnl by strategy - # df_monthly = df_agg.reset_index('strategy').groupby('strategy').resample('M', 'sum') - # df_monthly = df_monthly.swaplevel('strategy', 'date').sort_index() - # monthly_pnl = df_monthly.groupby(level='date')[['unrealized_accrued', 'unrealized_pnl', 'realized_pnl']].sum().sum(axis=1) - # # df_agg[['realized_accrued','unrealized_accrued', - # # 'realized_pnl', 'unrealized_pnl']].sum(axis=1).cumsum().plot(x_compat=True) - # # cds_df = cds_explain_strat(['SER_IGINX', 'SER_IGMEZ'], None, None, engine) - # #cds_df = cds_explain_strat(['SER_HYMEZ'], None, '2015-03-10', engine) - # #cds_df2 = cds_explain_strat('SER_IGCURVE', None, None, engine) - # #cds_df = cds_explain('HY', 21, '5yr', 25, 35, '2014-07-18') + pnl_index = df_index.pv.diff() + df_index.daily + pnl_instrument = df_instrument.pv.diff() + df_instrument.daily + pnl = pd.concat([pnl_index, pnl_instrument], keys=["index", pnl_type], axis=1) + print( + pd.concat( + [pnl.sum(axis=1), pnl.sum(axis=1).cumsum()], + axis=1, + keys=["daily", "cumulative"], + ) + ) |
