aboutsummaryrefslogtreecommitdiffstats
path: root/python/pnl_explain.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pnl_explain.py')
-rw-r--r--python/pnl_explain.py399
1 files changed, 172 insertions, 227 deletions
diff --git a/python/pnl_explain.py b/python/pnl_explain.py
index 25af238e..d67bd536 100644
--- a/python/pnl_explain.py
+++ b/python/pnl_explain.py
@@ -1,243 +1,188 @@
-import numpy as np
+import datetime
import pandas as pd
+from analytics.utils import get_fx
+from dates import bus_day
+from psycopg2.errors import SyntaxError
+from psycopg2.extensions import connection
+from risk.swaptions import get_swaption_portfolio
+from risk.indices import get_index_portfolio
+from risk.tranches import get_tranche_portfolio
+from pyisda.date import previous_twentieth
+from typing import Tuple, Union
-from db import dbengine
-from dates import bus_day, imm_dates, yearfrac
-def get_daycount(identifier, engine=dbengine("dawndb")):
- """ retrieve daycount and paydelay for a given identifier"""
- conn = engine.raw_connection()
- with conn.cursor() as c:
- c.execute("SELECT day_count, pay_delay FROM securities WHERE identifier=%s",
- (identifier,))
- try:
- a, b = c.fetchone()
- except TypeError:
- conn.commit()
- return None, None
- conn.commit()
- return a, b
+def get_index_pv(
+ start_date: datetime.date,
+ end_date: datetime.date,
+ conn: connection,
+ strategies: Union[Tuple[str], None] = None,
+):
+ dr = pd.bdate_range(start_date, end_date, freq=bus_day)
+ pvs = []
+ daily = []
+ dates = []
-def pnl_explain(identifier, start_date = None, end_date = None,
- engine=dbengine("dawndb")):
- """ if start_date is None, pnl since inception"""
- trades = pd.read_sql_query("SELECT * FROM bonds where identifier=%s", engine,
- params=(identifier,), parse_dates=['trade_date', 'settle_date'])
- for key in ['faceamount', 'principal_payment', 'accrued_payment']:
- trades.loc[~trades.buysell, key] = -trades[key][~trades.buysell]
- if start_date is None:
- start_date = trades.trade_date.min()
-
- ## take care of multiple trades settling on the same date
- trades = (trades.
- groupby('settle_date')[['faceamount', 'principal_payment', 'accrued_payment']].
- sum())
- marks = pd.read_sql_query("SELECT * FROM marks where identifier=%s", engine,
- params=(identifier,), parse_dates = ['date'], index_col='date')
- factors = pd.read_sql_query("SELECT * FROM factors_history where identifier=%s", engine,
- params=(identifier,), parse_dates = ['last_pay_date', 'prev_cpn_date'])
- factors = factors.set_index('prev_cpn_date', drop=False)
- daycount, delay = get_daycount(identifier, engine)
-
- df = (marks[['price']].
- join([factors[['prev_cpn_date', 'coupon', 'factor']],
- trades[['principal_payment', 'accrued_payment', 'faceamount']]],
- how='outer'))
- factors = factors.set_index('last_pay_date')
- df = df.join(factors[['principal', 'losses', 'interest']], how='outer')
- df.sort_index(inplace=True)
-
- if end_date is None:
- end_date = pd.datetime.today()
- dates = pd.date_range(start_date, end_date, freq = bus_day)
- keys1 = ['price','factor', 'coupon', 'prev_cpn_date']
- df[keys1] = df[keys1].fillna(method='ffill')
+ for d in dr:
+ prev_day = (d - bus_day).date()
+ if previous_twentieth(d, roll=True) == d.date():
+ accrued = 0.0
+ for t in portf.trades:
+ _, amount = t._fee_leg.cashflows[0]
+ amount *= get_fx(d, t.currency)
+ accrued -= amount * t.notional * t.factor * t.fixed_rate * 1e-4
+ else:
+ accrued = 0.0
+ portf = get_index_portfolio(prev_day, conn, strategies)
+ nav = 0.0
+ with conn.cursor() as c:
+ try:
+ c.execute(
+ "SELECT upfront, currency FROM cds WHERE trade_date=%s "
+ "AND folder in %s",
+ (prev_day, strategies),
+ )
+ except SyntaxError as e:
+ conn.reset()
+ raise e
+ for (fee, curr) in c:
+ nav += fee * get_fx(d, curr)
+ daily.append(nav + accrued)
+ pvs.append(portf.pv)
+ dates.append(prev_day)
+ df = pd.DataFrame({"pv": pvs, "daily": daily}, index=pd.to_datetime(dates))
+ return df
- ## overwrite the factor to 1 in case of zero factor bond
- df['orig_factor'] = df['factor']
- if identifier.endswith('_A'):
- df.loc[df.price.notnull() & (df.factor==0), 'factor'] = 1
- keys2 = ['losses', 'principal','interest', 'faceamount','accrued_payment', 'principal_payment']
- df[keys2] = df[keys2].fillna(value=0)
- df.faceamount = df.faceamount.cumsum()
- keys = keys1 + ['faceamount', 'orig_factor']
- df1 = df.reindex(df.index.union(dates), keys, method='ffill')
- keys = ['losses', 'principal','interest', 'accrued_payment', 'principal_payment']
- df2 = df.reindex(df.index.union(dates), keys, fill_value=0)
- daily = pd.concat([df1, df2], axis = 1)
- daily = daily[(start_date-1):end_date]
- daily['unrealized_pnl'] = daily.price.diff() * daily.factor.shift()/100 * daily.faceamount
- daily['clean_nav'] = daily.price/100 * daily.factor * daily.faceamount
- ## realized pnl due to factor change
- daily['realized_pnl'] = daily.price/100 * daily.factor.diff() * daily.faceamount.shift()
- ## realized pnl due to principal payment
- if delay:
- daily['realized_pnl'] = (daily['realized_pnl'].
- add(daily.principal/100 * daily.faceamount.shift(delay, 'D'),
- fill_value=0))
- else:
- daily['realized_pnl'] = (daily['realized_pnl'].
- add(daily.principal/100 * daily.faceamount.shift(),
- fill_value=0))
- if delay:
- daily['realized_accrued'] = daily.interest/100 * daily.faceamount.shift(delay, 'D')
- else:
- daily['realized_accrued'] = daily.interest/100 * daily.faceamount.shift()
- daily['realized_accrued'] = daily['realized_accrued'].fillna(value=0)
- daily['accrued'] = yearfrac(daily.prev_cpn_date, daily.index.to_series(), daycount) * \
- daily.coupon/100 * daily.orig_factor * daily.faceamount
- daily['unrealized_accrued'] = daily.accrued.diff() + daily.realized_accrued
- cols = ['unrealized_pnl', 'realized_pnl', 'realized_accrued', 'clean_nav']
- daily[cols] = daily[cols].fillna(value=0)
- extra_pnl = daily.clean_nav.diff() - daily.principal_payment
- daily.loc[daily.principal_payment>0 , 'unrealized_pnl'] += extra_pnl[daily.principal_payment>0]
- daily.loc[daily.principal_payment<0, 'realized_pnl'] += extra_pnl[daily.principal_payment<0]
- daily['realized_accrued'] -= daily.accrued_payment
-
- return daily[['clean_nav', 'accrued', 'unrealized_pnl', 'realized_pnl', 'unrealized_accrued',
- 'realized_accrued']]
+def get_swaption_pv(
+ start_date: datetime.date, end_date: datetime.date, conn: connection, **kwargs
+):
+ dr = pd.bdate_range(start_date, end_date, freq=bus_day)
+ pv = []
+ daily = []
+ dates = []
+ for d in dr:
+ prev_day = (d - bus_day).date()
+ portf = get_swaption_portfolio(prev_day, conn, **kwargs)
+ nav = 0.0
+ # add terminations
+ with conn.cursor() as c:
+ c.execute(
+ "SELECT termination_fee "
+ "FROM terminations JOIN swaptions USING (dealid) "
+ "WHERE termination_date=%s AND dealid LIKE 'SWPTN%%' "
+ "AND folder !='STEEP'",
+ (prev_day,),
+ )
+ for (fee,) in c:
+ nav += fee
+ # add new trades
+ with conn.cursor() as c:
+ c.execute(
+ "SELECT notional * price/100 * (CASE WHEN buysell THEN -1. ELSE 1. END) "
+ "FROM swaptions WHERE trade_date=%s AND folder != 'STEEP'",
+ (prev_day,),
+ )
+ for (fee,) in c:
+ nav += fee
+ dates.append(prev_day)
+ pv.append(portf.pv)
+ daily.append(nav)
+ df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates))
+ return df
-def pnl_explain_list(id_list, start_date = None, end_date = None, engine = dbengine("dawndb")):
- return {(identifier, strategy): pnl_explain(identifier, start_date, end_date, engine)
- for identifier, strategy in id_list}
-def compute_tranche_factors(df, attach, detach):
- attach, detach = attach/100, detach/100
- df['indexrecovery'] = 1-df.indexfactor-df.cumulativeloss
- df = df.assign(tranche_loss = lambda x: (x.cumulativeloss-attach)/(detach-attach),
- tranche_recov = lambda x: (x.indexrecovery-(1-detach))/(detach-attach))
- df[['tranche_loss', 'tranche_recov']] = df[['tranche_loss', 'tranche_recov']].clip(lower=0, upper=1)
- df['tranche_factor'] = 1-df.tranche_loss - df.tranche_recov
+def get_tranche_pv(
+ start_date: datetime.date, end_date: datetime.date, conn: connection, **kwargs
+):
+ dr = pd.bdate_range(start_date, end_date, freq=bus_day)
+ pv = []
+ daily = []
+ dates = []
+ for d in dr:
+ prev_day = (d - bus_day).date()
+ portf = get_tranche_portfolio(prev_day, conn, **kwargs)
+ nav = 0.0
+ # add terminations
+ with conn.cursor() as c:
+ c.execute(
+ "SELECT termination_fee "
+ "FROM terminations JOIN cds USING (dealid) "
+ "WHERE termination_date=%s AND dealid LIKE 'SCCDS%%' ",
+ (prev_day,),
+ )
+ for (fee,) in c:
+ nav += fee
+ # add new trades
+ with conn.cursor() as c:
+ c.execute(
+ "SELECT upfront "
+ "FROM cds WHERE trade_date=%s AND swap_type='CD_INDEX_TRANCHE' "
+ "AND fund='SERCGMAST'",
+ (prev_day,),
+ )
+ for (fee,) in c:
+ nav += fee
+ dates.append(prev_day)
+ pv.append(portf.pv)
+ daily.append(nav)
+ df = pd.DataFrame({"pv": pv, "daily": daily}, index=pd.to_datetime(dates))
return df
-def cds_explain(index, series, tenor, attach = np.nan, detach = np.nan,
- start_date = None, end_date = None, engine = dbengine('serenitasdb')):
- cds_trade = np.isnan(attach) or np.isnan(detach)
- if cds_trade:
- quotes = pd.read_sql_query("SELECT date, (100-closeprice)/100 AS upfront " \
- "FROM index_quotes WHERE index=%s AND series=%s " \
- "AND tenor=%s ORDER BY date",
- engine, parse_dates=['date'],
- index_col='date', params = (index, series, tenor))
- else:
- #we take the latest version available
- sqlstring = "SELECT DISTINCT ON (quotedate) quotedate, upfront_mid AS upfront, " \
- "tranche_spread FROM markit_tranche_quotes " \
- "JOIN index_version USING (basketid) WHERE index=%s AND series=%s" \
- "AND tenor=%s AND attach=%s AND detach=%s ORDER by quotedate, version desc"
- quotes = pd.read_sql_query(sqlstring, engine, parse_dates=['quotedate'],
- index_col='quotedate',
- params = (index, series, tenor, int(attach), int(detach)))
- sqlstring = "SELECT indexfactor/100 AS indexfactor, coupon, " \
- "cumulativeloss/100 AS cumulativeloss, lastdate " \
- "FROM index_desc WHERE index=%s AND series=%s AND tenor=%s " \
- "ORDER BY lastdate"
- factors = pd.read_sql_query(sqlstring, engine, parse_dates=['lastdate'],
- params = (index, series, tenor))
- if start_date is None:
- start_date = quotes.index.min()
- if end_date is None:
- end_date = pd.datetime.today()
+if __name__ == "__main__":
+ import argparse
+ from utils.db import dbconn
- if not cds_trade:
- coupon = quotes.tranche_spread.iat[0]/10000
- factors = compute_tranche_factors(factors, attach, detach)
- factors['factor'] = factors.tranche_factor
- factors['recovery'] = factors.tranche_recov
+ dawndb = dbconn("dawndb")
+ parser = argparse.ArgumentParser()
+ parser.add_argument("start_date", type=datetime.datetime.fromisoformat)
+ parser.add_argument("end_date", type=datetime.datetime.fromisoformat)
+ parser.add_argument(
+ "-e",
+ "--external",
+ action="store_true",
+ default=False,
+ dest="use_external",
+ help="use brokers' marks",
+ )
+ parser.add_argument(
+ "-s",
+ "--source",
+ action="append",
+ default=[],
+ dest="source_list",
+ help="quote source",
+ )
+ parser.add_argument(
+ "-t",
+ "--pnl-type",
+ action="store",
+ default="tranche",
+ dest="pnl_type",
+ help="instrument for which we want the pnl (one of 'tranche' or 'swaption')",
+ )
+ args = parser.parse_args()
+ swaption_strats = ("IGTOPTDEL", "HYOPTDEL")
+ tranche_strats = ("IGINX", "HYINX", "XOINX")
+ pnl_type = "tranche"
+ if args.pnl_type == "tranche":
+ index_strats = tranche_strats
else:
- coupon = factors.coupon.iat[0]/10000
- factors['factor'] = factors.indexfactor
- factors['recovery'] = 1-factors.indexfactor-factors.cumulativeloss
+ index_strats = swaption_strats
+ df_index = get_index_pv(args.start_date, args.end_date, dawndb, tranche_strats)
- dates = pd.date_range(start_date, end_date, freq = bus_day)
- yearfrac = imm_dates(start_date, end_date)
- yearfrac = yearfrac.to_series().reindex(dates, method='ffill')
- yearfrac = yearfrac.index-yearfrac
- yearfrac = (yearfrac.dt.days+1)/360
- yearfrac.name = 'yearfrac'
- quotes = quotes.reindex(dates, method='ffill')
-
- if factors.shape[0]==1 or dates[-1] > max(factors.lastdate):
- factors.lastdate.iat[-1] = dates[-1]
+ if args.pnl_type == "tranche":
+ df_instrument = get_tranche_pv(conn=dawndb, **vars(args))
else:
- factors = factors.iloc[:-1]
- try:
- factors = (factors.set_index('lastdate', verify_integrity=True).
- reindex(dates, ['factor', 'recovery'], method='bfill'))
- except ValueError:
- pdb.set_trace()
- factors.recovery = factors.recovery.diff()
- df = quotes.join([factors[['factor', 'recovery']], yearfrac])
- #df.loc[df.factor.isnull(), 'factor'] = factors.factor.iat[-1]
- df['clean_nav'] = df.upfront * df.factor
- df['accrued'] = - df.yearfrac * coupon*df.factor
- df['unrealized_accrued'] = df.accrued.diff()
- df['realized_accrued'] = -df.unrealized_accrued.where(df.unrealized_accrued.isnull() |
- (df.unrealized_accrued>0), 0)
- df['unrealized_accrued'] = df.unrealized_accrued.where(df.unrealized_accrued.isnull()|
- (df.unrealized_accrued<0), df.accrued)
- df.loc[df.realized_accrued>0, 'realized_accrued'] += df.loc[df.realized_accrued>0, 'unrealized_accrued']
- df['unrealized_pnl'] = df.upfront.diff() * df.factor.shift()
- df['realized_pnl'] = df.upfront/100*df.factor.diff()+df.recovery
- return df[['clean_nav', 'accrued', 'unrealized_accrued', 'realized_accrued',
- 'unrealized_pnl', 'realized_pnl']]
-
-def cds_explain_strat(strat, start_date, end_date, engine = dbengine("dawndb")):
- if not pd.core.common.is_list_like(strat):
- strat = [strat]
- cds_positions = pd.read_sql_table("orig_cds", engine,
- parse_dates = ['trade_date', 'upfront_settle_date'],
- index_col='dealid')
- cds_positions = cds_positions.ix[cds_positions.folder.isin(strat) &
- (end_date is None or \
- cds_positions.upfront_settle_date<=pd.Timestamp(end_date))]
- cds_positions.loc[cds_positions.protection=='Seller', "notional"] *= -1
- df = {}
- for r in cds_positions.itertuples():
- key = (r.index, r.series, r.tenor)
- if start_date is not None:
- start_date = max(r.trade_date, pd.Timestamp(start_date))
- else:
- start_date = r.trade_date
- trade_df = cds_explain(r.index, r.series, r.tenor, r.attach, r.detach,
- start_date, end_date, engine)
- trade_df = r.notional * trade_df
- if start_date is None or (start_date <= r.trade_date):
- trade_df.realized_accrued.iat[3] -= trade_df.accrued.iat[0]
- extra_pnl = trade_df.clean_nav.iat[0] + trade_df.accrued.iat[0] + r.upfront
- trade_df.unrealized_pnl.iat[0] = extra_pnl
- trade_df.loc[:3, 'unrealized_accrued'] = 0
- df[key] = trade_df.add(df.get(key, 0), fill_value=0)
- return pd.concat(df)
+ df_instrument = get_swaption_pv(conn=dawndb, **vars(args))
-if __name__=="__main__":
- engine = dbengine("dawndb")
- from position import get_list_range
- ## CLO
- # clo_list = get_list_range(engine, '2015-01-01', '2015-12-31', 'CLO')
- # df = pnl_explain_list(clo_list.identifier.tolist(), '2015-01-01', '2015-12-31', engine)
- # df = pd.concat(df)
- # df_agg = df.groupby(level=1).sum()
- ## subprime
- subprime_list = get_list_range(engine, '2015-09-30', '2015-10-31', 'Subprime')
- df_subprime = pnl_explain_list(subprime_list[['identifier', 'strategy']].to_records(index=False),
- '2015-09-30', '2015-10-31', engine)
- df_subprime = pd.concat(df_subprime, names=['identifier', 'strategy', 'date'])
- # monthly_pnl = (df_subprime.reset_index('strategy', drop=True).
- # reset_index('identifier').
- # groupby('identifier').
- # resample('M', how='sum'))
- # ## daily pnl by strategy
- #df_agg = df_subprime.groupby(level=['date', 'strategy']).sum()
- # ## monthly pnl by strategy
- # df_monthly = df_agg.reset_index('strategy').groupby('strategy').resample('M', 'sum')
- # df_monthly = df_monthly.swaplevel('strategy', 'date').sort_index()
- # monthly_pnl = df_monthly.groupby(level='date')[['unrealized_accrued', 'unrealized_pnl', 'realized_pnl']].sum().sum(axis=1)
- # # df_agg[['realized_accrued','unrealized_accrued',
- # # 'realized_pnl', 'unrealized_pnl']].sum(axis=1).cumsum().plot(x_compat=True)
- # # cds_df = cds_explain_strat(['SER_IGINX', 'SER_IGMEZ'], None, None, engine)
- # #cds_df = cds_explain_strat(['SER_HYMEZ'], None, '2015-03-10', engine)
- # #cds_df2 = cds_explain_strat('SER_IGCURVE', None, None, engine)
- # #cds_df = cds_explain('HY', 21, '5yr', 25, 35, '2014-07-18')
+ pnl_index = df_index.pv.diff() + df_index.daily
+ pnl_instrument = df_instrument.pv.diff() + df_instrument.daily
+ pnl = pd.concat([pnl_index, pnl_instrument], keys=["index", pnl_type], axis=1)
+ print(
+ pd.concat(
+ [pnl.sum(axis=1), pnl.sum(axis=1).cumsum()],
+ axis=1,
+ keys=["daily", "cumulative"],
+ )
+ )