from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
import datetime
import numpy as np
import pandas as pd
import psycopg2
from pandas.tseries.offsets import BDay
from pandas import bdate_range
from sqlalchemy import create_engine
import re
import os
import logging
import sys


def get_list(engine, workdate=None, asset_class=None, include_unsettled=True):
    """Return positions as of workdate (or the full securities table if no
    workdate is given), indexed by Bloomberg id: "<cusip-or-isin> <bbg_type>"."""
    if workdate:
        positions = pd.read_sql_query(
            "select identifier, bbg_type from list_positions(%s, %s, %s)",
            engine, params=(workdate.date(), asset_class, include_unsettled))
        # Identifiers of <= 11 characters are CUSIP-based (first 9 characters);
        # 12-character identifiers are ISINs.
        positions.loc[positions.identifier.str.len() <= 11, 'cusip'] = \
            positions.identifier.str.slice(stop=9)
        positions.loc[positions.identifier.str.len() == 12, 'isin'] = positions.identifier
    else:
        positions = pd.read_sql_table("securities", engine)
    positions['bbg_id'] = positions.cusip.where(positions.cusip.notnull(),
                                                positions['isin']) + ' ' + positions.bbg_type
    positions.set_index('bbg_id', inplace=True)
    return positions


def get_list_range(engine, begin, end, asset_class=None):
    begin = pd.Timestamp(begin).date()
    end = pd.Timestamp(end).date()
    positions = pd.read_sql_query(
        "select identifier, bbg_type, strategy from list_positions_range(%s, %s, %s)",
        engine, params=(begin, end, asset_class))
    positions.loc[positions.identifier.str.len() <= 11, 'cusip'] = \
        positions.identifier.str.slice(stop=9)
    positions.loc[positions.identifier.str.len() == 12, 'isin'] = positions.identifier
    positions['bbg_id'] = positions.cusip.where(positions.cusip.notnull(),
                                                positions['isin']) + ' ' + positions.bbg_type
    positions.set_index('bbg_id', inplace=True)
    return positions


def backpopulate_marks(engine, begin_str='2015-01-15', end_str='2015-07-15'):
    pattern = re.compile(r"\d{4}-\d{2}-\d{2}")
    list_of_daily_folder = (fullpath for (fullpath, _, _) in os.walk('/home/serenitas/Daily')
                            if pattern.match(os.path.basename(fullpath)))
    list_of_bdays = bdate_range(start=begin_str, end=end_str)
    for path in list_of_daily_folder:
        date = pd.to_datetime(os.path.basename(path))
        if date in list_of_bdays:
            marks_file = [f for f in os.listdir(path) if f.startswith("securitiesNpv")]
            if marks_file:
                # Sort by lexicographic order, which is chronological order
                # since the filenames embed ISO dates; take the latest file.
                marks_file.sort(key=lambda x: x[13:], reverse=True)
                marks = pd.read_csv(os.path.join(path, marks_file[0]))
                positions = get_list(engine, date)
                positions = positions.merge(marks, left_on='identifier',
                                            right_on='IDENTIFIER')
                positions.drop(['IDENTIFIER', 'last_settle_date'], axis=1, inplace=True)
                positions['date'] = date
                positions.rename(columns={'Price': 'price'}, inplace=True)
                positions = positions.drop_duplicates()
                positions.to_sql('position', engine, if_exists='append', index=False)


def update_securities(engine, session, workdate):
    field = {'Corp': 'PREV_CPN_DT', 'Mtge': 'START_ACC_DT'}
    securities = get_list(engine)
    securities = securities[securities.paid_down.isnull()]
    data = retrieve_data(session, securities.index.tolist(),
                         ['PREV_CPN_DT', 'START_ACC_DT', 'CUR_CPN', 'CPN_ASOF_DT'])
    data = pd.DataFrame.from_dict(data, orient='index')
    # Skip coupons quoted as-of a future date.
    data = data[data.CPN_ASOF_DT.isnull() | (data.CPN_ASOF_DT <= workdate)]
    m = securities.merge(data, left_index=True, right_index=True)
    conn = engine.raw_connection()
    with conn.cursor() as c:
        for r in m.to_dict('records'):
            if r[field[r['bbg_type']]] < workdate:
                c.execute("UPDATE securities SET start_accrued_date=%({0})s"
                          ", coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s"
                          .format(field[r['bbg_type']]), r)
    conn.commit()
    conn.close()
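
# Usage sketch for the position-listing helpers above. Illustrative only: it
# assumes the "dawndb" engine configured in __main__ below, and the parameter
# values (date, asset class) are placeholders:
#
#     engine = create_engine('postgresql://dawn_user@debian/dawndb')
#     all_secs = get_list(engine)                          # whole securities table
#     held = get_list(engine, pd.Timestamp('2015-06-30'))  # positions on a date
#     window = get_list_range(engine, '2015-01-02', '2015-06-30',
#                             asset_class='Mtge')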
def init_fx(session, engine, startdate):
    currencies = ['EURUSD', 'CADUSD']
    securities = [c + ' Curncy' for c in currencies]
    data = retrieve_data(session, securities, ['PX_LAST'], start_date=startdate)
    data = data['EURUSD Curncy'].merge(data['CADUSD Curncy'],
                                       left_on='date', right_on='date')
    data.rename(columns={'PX_LAST_x': 'eurusd', 'PX_LAST_y': 'cadusd'}, inplace=True)
    data.to_sql('fx', engine, index=False, if_exists='append')


def update_fx(conn, session, currencies):
    securities = [c + ' Curncy' for c in currencies]
    data = retrieve_data(session, securities,
                         ['FIXED_CLOSING_PRICE_NY', 'PX_CLOSE_DT'])
    colnames = ['date']
    values = []
    for k, v in data.items():
        currency_pair = k.split(' ')[0].lower()
        colnames.append(currency_pair)
        values.append(v['FIXED_CLOSING_PRICE_NY'])
    # All pairs share the same close date; take it from the last record.
    values = [v['PX_CLOSE_DT']] + values
    sqlstr = 'INSERT INTO fx({0}) VALUES({1}) ON CONFLICT DO NOTHING'.format(
        ",".join(colnames), ",".join(["%s"] * len(values)))
    with conn.cursor() as c:
        c.execute(sqlstr, values)
    conn.commit()


def init_swap_rates(conn, session,
                    tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30],
                    start_date=datetime.date(1998, 10, 7)):
    securities = [f"USISDA{t:02} Index" for t in tenors]
    data = retrieve_data(session, securities, ['PX_LAST'], start_date=start_date)
    for t in tenors:
        ticker = f"USISDA{t:02} Index"
        sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \
                  'VALUES(%s, %s) ON CONFLICT (fixing_date)' + \
                  f' DO UPDATE SET "{t}y" = %s'
        with conn.cursor() as c:
            c.executemany(sql_str,
                          [(d, r, r) for d, r in data[ticker]['PX_LAST'].items()])
    conn.commit()


def init_swaption_vol(session,
                      tenors=['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30],
                      source='BBIR', vol_type='N',
                      start_date=datetime.date(1990, 1, 1)):
    tickers = []
    for t1 in tenors:
        for t2 in tenors[4:]:  # swap tenors are the numeric entries only
            tickers.append(f"USS{vol_type}{t1:0>2}{t2} {source} Curncy")
    data = retrieve_data(session, tickers, ['PX_LAST'], start_date=start_date)
    return data


def split_tenor_expiry(ticker, vol_type='N'):
    m = re.match(rf"USS{vol_type}(.{{2}})([^\s]*) ([^\s]*) Curncy", ticker)
    expiry, tenor, _ = m.groups()
    if expiry[0] == '0':
        expiry = expiry[1:]
    if not expiry.isalpha():
        expiry = int(expiry)
    tenor = int(tenor)
    return expiry, tenor


def insert_swaption_vol(data, conn, source, vol_type="N"):
    tenors = ['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30]
    df = pd.concat(data, axis=1)
    df.columns = df.columns.get_level_values(0)
    df.columns = pd.MultiIndex.from_tuples([split_tenor_expiry(c, vol_type)
                                            for c in df.columns])
    table_name = ("swaption_normal_vol" if vol_type == "N"
                  else "swaption_lognormal_vol")
    for t in tenors[-14:]:  # the 14 numeric swap tenors
        sql_str = f'INSERT INTO {table_name}(date, "{t}y", source) ' + \
                  'VALUES(%s, %s, %s) ON CONFLICT (date, source)' + \
                  f' DO UPDATE SET "{t}y" = %s, source = %s'
        with conn.cursor() as c:
            df_temp = df.xs(t, axis=1, level=1).reindex(tenors, axis=1)
            for k, v in df_temp.iterrows():
                if np.all(np.isnan(v.values)):
                    continue
                c.execute(sql_str, (k, v.tolist(), source, v.tolist(), source))
        conn.commit()
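
# Ticker convention sketch for the swaption-vol helpers: tickers take the form
# "USS<N|V><expiry><tenor> <source> Curncy", where the two-character expiry is
# zero-padded and letter expiries map to months per the usual Bloomberg code
# (A=1m, C=3m, F=6m, I=9m). split_tenor_expiry inverts the construction:
#
#     split_tenor_expiry("USSN0A10 BBIR Curncy")                # -> ('A', 10)
#     split_tenor_expiry("USSN0510 BBIR Curncy")                # -> (5, 10)
#     split_tenor_expiry("USSV1530 CMPN Curncy", vol_type='V')  # -> (15, 30)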
def update_swaption_vol(conn, session,
                        tenors=['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30],
                        *, sources=['BBIR', 'CMPN', 'ICPL'], vol_type="N"):
    """
    Parameters
    ----------
    vol_type : one of 'N' or 'V' (normal or log-normal)
    """
    table_name = ("swaption_normal_vol" if vol_type == "N"
                  else "swaption_lognormal_vol")
    for source in sources:
        tickers = []
        for expiry in tenors:
            for tenor in tenors[4:]:  # swap tenors are the numeric entries only
                tickers.append(f"USS{vol_type}{expiry:0>2}{tenor} {source} Curncy")
        data = retrieve_data(session, tickers, ['PX_YEST_CLOSE', 'PX_CLOSE_DT'])
        for t in tenors[4:]:
            sql_str = f'INSERT INTO {table_name}(date, "{t}y", source) ' + \
                      'VALUES(%s, %s, %s) ON CONFLICT (date, source)' + \
                      f' DO UPDATE SET "{t}y" = %s, source = %s'
            r = []
            dates = []
            for expiry in tenors:
                ticker = f"USS{vol_type}{expiry:0>2}{t} {source} Curncy"
                if data[ticker]:
                    r.append(data[ticker]['PX_YEST_CLOSE'])
                    dates.append(data[ticker]['PX_CLOSE_DT'])
                else:
                    r.append(None)
                    # Carry the previous close date forward for missing quotes.
                    dates.append(dates[-1] if dates else None)
            if dates.count(dates[0]) < len(dates):
                raise ValueError('Not all quotes are from the same date')
            with conn.cursor() as c:
                c.execute(sql_str, (dates[0], r, source, r, source))
    conn.commit()


def update_swap_rates(conn, session,
                      tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]):
    securities = [f"USISDA{t:02} Index" for t in tenors]
    data = retrieve_data(session, securities, ['PX_LAST', 'LAST_UPDATE_DT'])
    for t in tenors:
        ticker = f"USISDA{t:02} Index"
        sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \
                  'VALUES(%(LAST_UPDATE_DT)s, %(PX_LAST)s) ON CONFLICT (fixing_date)' + \
                  f' DO UPDATE SET "{t}y" = %(PX_LAST)s'
        with conn.cursor() as c:
            c.execute(sql_str, data[ticker])
    conn.commit()
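
# For t = 10, the update_swap_rates statement above expands to:
#
#     INSERT INTO USD_swap_fixings(fixing_date, "10y")
#     VALUES(%(LAST_UPDATE_DT)s, %(PX_LAST)s)
#     ON CONFLICT (fixing_date) DO UPDATE SET "10y" = %(PX_LAST)s
#
# so re-running the script on the same day overwrites the fixing in place
# instead of inserting a duplicate row.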
def populate_cashflow_history(engine, session, workdate=None):
    securities = get_list(engine, workdate)
    data = retrieve_data(session, securities.index.tolist(),
                         ['HIST_CASH_FLOW', 'MTG_HIST_CPN', 'FLT_CPN_HIST',
                          'HIST_INTEREST_DISTRIBUTED'])
    fixed_coupons = {'XS0306416982 Mtge': 7.62, '91927RAD1 Mtge': 6.77}
    conn = engine.raw_connection()
    for k, v in data.items():
        if 'HIST_CASH_FLOW' in v:
            to_insert = v['HIST_CASH_FLOW'].merge(v['MTG_HIST_CPN'], how='left',
                                                  left_on='Payment Date',
                                                  right_on='Payment Date')
            to_insert.rename(columns={'Coupon_y': 'coupon',
                                      'Interest': 'interest',
                                      'Payment Date': 'date',
                                      'Principal Balance': 'principal_bal',
                                      'Principal Paid': 'principal'}, inplace=True)
            to_insert.drop(['Period Number', 'Coupon_x'], axis=1, inplace=True)
        elif 'FLT_CPN_HIST' in v:
            to_insert = v['FLT_CPN_HIST']
            to_insert.rename(columns={'Coupon Rate': 'coupon',
                                      'Accrual Start Date': 'date'}, inplace=True)
            to_insert.coupon = to_insert.coupon.shift(1)
        elif 'HIST_INTEREST_DISTRIBUTED' in v:
            to_insert = v['HIST_INTEREST_DISTRIBUTED']
            to_insert.rename(columns={'Interest': 'interest',
                                      'Historical Date': 'date'}, inplace=True)
            if k in fixed_coupons:
                to_insert['coupon'] = fixed_coupons[k]
            else:
                # damn you XS0299146992 !
                continue
        else:
            logging.error("No cashflows for security %s", k)
            continue
        identifier = securities.loc[k, 'identifier']
        to_insert['identifier'] = identifier
        with conn.cursor() as c:
            c.execute("DELETE FROM cashflow_history WHERE identifier=%s",
                      (identifier,))
        conn.commit()
        to_insert.to_sql('cashflow_history', engine, if_exists='append', index=False)
    with conn.cursor() as c:
        c.execute("REFRESH MATERIALIZED VIEW factors_history")
    conn.commit()
    conn.close()


if __name__ == "__main__":
    serenitas_conn = psycopg2.connect(database="serenitasdb",
                                      user="serenitas_user", host="debian")
    dawn_engine = create_engine('postgresql://dawn_user@debian/dawndb')
    dawn_conn = dawn_engine.raw_connection()
    if len(sys.argv) > 1:
        workdate = pd.Timestamp(sys.argv[1])
    else:
        workdate = pd.Timestamp.today()
    with init_bbg_session(BBG_IP) as session:
        update_securities(dawn_engine, session, workdate)
        populate_cashflow_history(dawn_engine, session, workdate)
        update_fx(dawn_conn, session, ['EURUSD', 'CADUSD'])
        update_swap_rates(serenitas_conn, session)
        for vol_type in ["N", "V"]:
            update_swaption_vol(serenitas_conn, session, vol_type=vol_type)

    # One-off initialization runs, kept for reference:
    # with init_bbg_session(BBG_IP) as session:
    #     init_fx(session, dawn_engine, datetime.date(2013, 1, 1))
    # with init_bbg_session(BBG_IP) as session:
    #     init_swap_rates(serenitas_conn, session,
    #                     start_date=datetime.date(2012, 2, 2))
    # for source in ['BBIR', 'ICPL', 'CMPN']:
    #     for vol_type in ["N", "V"]:
    #         with init_bbg_session(BBG_IP) as session:
    #             data = init_swaption_vol(session, source=source,
    #                                      vol_type=vol_type,
    #                                      start_date=datetime.date(2001, 1, 1))
    #             insert_swaption_vol(data, serenitas_conn, source,
    #                                 vol_type=vol_type)
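
# Invocation sketch (the script name is illustrative; workdate defaults to
# today when no argument is given):
#
#     python daily_update.py              # run for today
#     python daily_update.py 2015-07-15   # run for a specific workdate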