Diffstat (limited to 'python/position.py')
-rw-r--r--  python/position.py  355
1 file changed, 217 insertions, 138 deletions
diff --git a/python/position.py b/python/position.py
index 589b22c6..91286770 100644
--- a/python/position.py
+++ b/python/position.py
@@ -10,160 +10,208 @@
 import os
 import logging
 import sys
 
-def get_list(engine, workdate: datetime.datetime=None, asset_class=None,
-             include_unsettled: bool=True,
-             fund="SERCGMAST"):
+
+def get_list(
+    engine,
+    workdate: datetime.datetime = None,
+    asset_class=None,
+    include_unsettled: bool = True,
+    fund="SERCGMAST",
+):
     if workdate:
-        positions = pd.read_sql_query("SELECT identifier, bbg_type FROM "
-                                      "list_positions(%s, %s, %s, %s)",
-                                      engine,
-                                      params=(workdate.date(),
-                                              asset_class,
-                                              include_unsettled,
-                                              fund))
-        positions.loc[positions.identifier.str.len() <= 11, 'cusip'] = \
-            positions.identifier.str.slice(stop=9)
-        positions.loc[positions.identifier.str.len() == 12, 'isin'] = \
-            positions.identifier
+        positions = pd.read_sql_query(
+            "SELECT identifier, bbg_type FROM " "list_positions(%s, %s, %s, %s)",
+            engine,
+            params=(workdate.date(), asset_class, include_unsettled, fund),
+        )
+        positions.loc[
+            positions.identifier.str.len() <= 11, "cusip"
+        ] = positions.identifier.str.slice(stop=9)
+        positions.loc[
+            positions.identifier.str.len() == 12, "isin"
+        ] = positions.identifier
     else:
         positions = pd.read_sql_table("securities", engine)
-    positions['bbg_id'] = positions.cusip.where(positions.cusip.notnull(), positions['isin']) + \
-        ' ' + positions.bbg_type
-    positions.set_index('bbg_id', inplace=True)
+    positions["bbg_id"] = (
+        positions.cusip.where(positions.cusip.notnull(), positions["isin"])
+        + " "
+        + positions.bbg_type
+    )
+    positions.set_index("bbg_id", inplace=True)
     return positions
 
+
 def get_list_range(engine, begin, end, asset_class=None):
     begin = pd.Timestamp(begin).date()
     end = pd.Timestamp(end).date()
-    positions = pd.read_sql_query("select identifier, bbg_type, strategy from list_positions_range(%s, %s, %s)",
-                                  engine,
-                                  params=(begin, end, asset_class))
-    positions.loc[positions.identifier.str.len() <= 11, 'cusip'] = positions.identifier.str.slice(stop=9)
-    positions.loc[positions.identifier.str.len() == 12, 'isin'] = positions.identifier
-    positions['bbg_id'] = positions.cusip.where(positions.cusip.notnull(), positions['isin']) + \
-        ' ' + positions.bbg_type
-    positions.set_index('bbg_id', inplace=True)
+    positions = pd.read_sql_query(
+        "select identifier, bbg_type, strategy from list_positions_range(%s, %s, %s)",
+        engine,
+        params=(begin, end, asset_class),
+    )
+    positions.loc[
+        positions.identifier.str.len() <= 11, "cusip"
+    ] = positions.identifier.str.slice(stop=9)
+    positions.loc[positions.identifier.str.len() == 12, "isin"] = positions.identifier
+    positions["bbg_id"] = (
+        positions.cusip.where(positions.cusip.notnull(), positions["isin"])
+        + " "
+        + positions.bbg_type
+    )
+    positions.set_index("bbg_id", inplace=True)
     return positions
 
-def backpopulate_marks(begin_str='2015-01-15', end_str='2015-07-15'):
+
+def backpopulate_marks(begin_str="2015-01-15", end_str="2015-07-15"):
     pattern = re.compile("\d{4}-\d{2}-\d{2}")
-    list_of_daily_folder = (fullpath for (fullpath, _, _) in os.walk('/home/serenitas/Daily')
-                            if pattern.match(os.path.basename(fullpath)))
+    list_of_daily_folder = (
+        fullpath
+        for (fullpath, _, _) in os.walk("/home/serenitas/Daily")
+        if pattern.match(os.path.basename(fullpath))
+    )
     list_of_bdays = bdate_range(start=begin_str, end=end_str)
     for path in list_of_daily_folder:
         date = pd.to_datetime(os.path.basename(path))
         if date in list_of_bdays:
             marks_file = [f for f in os.listdir(path) if
                           f.startswith("securitiesNpv")]
             if marks_file:
-                marks_file.sort(key=lambda x:x[13:], reverse=True) #sort by lexicographic order which is what we want since we use ISO dates
+                marks_file.sort(
+                    key=lambda x: x[13:], reverse=True
+                )  # sort by lexicographic order which is what we want since we use ISO dates
                 marks = pd.read_csv(os.path.join(path, marks_file[0]))
                 positions = get_list(pd.to_datetime(date))
-                positions = positions.merge(marks, left_on='identifier', right_on='IDENTIFIER')
-                positions.drop(['IDENTIFIER', 'last_settle_date'], axis=1, inplace=True)
-                positions['date'] = date
-                positions.rename(columns={'Price': 'price'}, inplace=True)
+                positions = positions.merge(
+                    marks, left_on="identifier", right_on="IDENTIFIER"
+                )
+                positions.drop(["IDENTIFIER", "last_settle_date"], axis=1, inplace=True)
+                positions["date"] = date
+                positions.rename(columns={"Price": "price"}, inplace=True)
                 positions = positions.drop_duplicates()
-                positions.to_sql('position', engine, if_exists='append', index=False)
+                positions.to_sql("position", engine, if_exists="append", index=False)
 
+
 def update_securities(engine, session, workdate):
-    field = {'Corp': 'PREV_CPN_DT', 'Mtge': 'START_ACC_DT'}
+    field = {"Corp": "PREV_CPN_DT", "Mtge": "START_ACC_DT"}
     securities = get_list(engine)
     securities = securities[securities.paid_down.isnull()]
-    data = retrieve_data(session, securities.index.tolist(),
-                         ['PREV_CPN_DT', 'START_ACC_DT', 'CUR_CPN', 'CPN_ASOF_DT'])
-    data = pd.DataFrame.from_dict(data, orient='index')
-    data = data[data.CPN_ASOF_DT.isnull() |(data.CPN_ASOF_DT <= workdate)]
+    data = retrieve_data(
+        session,
+        securities.index.tolist(),
+        ["PREV_CPN_DT", "START_ACC_DT", "CUR_CPN", "CPN_ASOF_DT"],
+    )
+    data = pd.DataFrame.from_dict(data, orient="index")
+    data = data[data.CPN_ASOF_DT.isnull() | (data.CPN_ASOF_DT <= workdate)]
     m = securities.merge(data, left_index=True, right_index=True)
     conn = engine.raw_connection()
     with conn.cursor() as c:
-        for r in m.to_dict('records'):
-            accrued_field = field[r['bbg_type']]
+        for r in m.to_dict("records"):
+            accrued_field = field[r["bbg_type"]]
             if r[accrued_field] < workdate:
-                c.execute(f"UPDATE securities SET start_accrued_date=%({accrued_field})s "
-                          ",coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s",
-                          r)
+                c.execute(
+                    f"UPDATE securities SET start_accrued_date=%({accrued_field})s "
+                    ",coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s",
+                    r,
+                )
     conn.commit()
     conn.close()
 
+
 def init_fx(session, engine, startdate):
-    currencies = ['EURUSD', 'CADUSD']
-    securities = [c + ' Curncy' for c in currencies]
-    data = retrieve_data(session, securities, ['PX_LAST'], start_date=startdate)
-    data = data['EURUSD Curncy'].merge(data['CADUSD Curncy'], left_on='date', right_on='date')
-    data.rename(columns={'PX_LAST_x': 'eurusd',
-                         'PX_LAST_y': 'cadusd'}, inplace=True)
-    data.to_sql('fx', engine, if_exists='append')
+    currencies = ["EURUSD", "CADUSD"]
+    securities = [c + " Curncy" for c in currencies]
+    data = retrieve_data(session, securities, ["PX_LAST"], start_date=startdate)
+    data = data["EURUSD Curncy"].merge(
+        data["CADUSD Curncy"], left_on="date", right_on="date"
+    )
+    data.rename(columns={"PX_LAST_x": "eurusd", "PX_LAST_y": "cadusd"}, inplace=True)
+    data.to_sql("fx", engine, if_exists="append")
 
+
 def update_fx(conn, session, currencies):
-    securities = [c + ' Curncy' for c in currencies]
-    data = retrieve_data(session, securities, ['FIXED_CLOSING_PRICE_NY', 'PX_CLOSE_DT'])
-    colnames = ['date']
+    securities = [c + " Curncy" for c in currencies]
+    data = retrieve_data(session, securities, ["FIXED_CLOSING_PRICE_NY", "PX_CLOSE_DT"])
+    colnames = ["date"]
     values = []
     for k, v in data.items():
-        currency_pair = k.split(' ')[0].lower()
+        currency_pair = k.split(" ")[0].lower()
         colnames.append(currency_pair)
-        values.append(v['FIXED_CLOSING_PRICE_NY'])
-    values = [v['PX_CLOSE_DT']] + values
-    sqlstr = 'INSERT INTO fx({0}) VALUES({1}) ON CONFLICT DO NOTHING'.format(
-        ",".join(colnames),
-        ",".join(["%s"]*len(values)))
+        values.append(v["FIXED_CLOSING_PRICE_NY"])
+    values = [v["PX_CLOSE_DT"]] + values
+    sqlstr = "INSERT INTO fx({0}) VALUES({1}) ON CONFLICT DO NOTHING".format(
+        ",".join(colnames), ",".join(["%s"] * len(values))
+    )
     with conn.cursor() as c:
         c.execute(sqlstr, values)
     conn.commit()
 
-def init_swap_rates(conn, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30],
-                    start_date=datetime.date(1998, 10, 7)):
+
+def init_swap_rates(
+    conn,
+    session,
+    tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30],
+    start_date=datetime.date(1998, 10, 7),
+):
     securities = [f"USISDA{t:02} Index" for t in tenors]
-    data = retrieve_data(session, securities, ['PX_LAST'],
-                         start_date=datetime.date(1998, 10, 7))
+    data = retrieve_data(
+        session, securities, ["PX_LAST"], start_date=datetime.date(1998, 10, 7)
+    )
     for t in tenors:
         ticker = f"USISDA{t:02} Index"
-        sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \
-            'VALUES(%s, %s) ON CONFLICT (fixing_date)' + \
-            f' DO UPDATE SET "{t}y" = excluded."{t}y"'
+        sql_str = (
+            f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") '
+            + "VALUES(%s, %s) ON CONFLICT (fixing_date)"
+            + f' DO UPDATE SET "{t}y" = excluded."{t}y"'
+        )
         with conn.cursor() as c:
-            c.executemany(sql_str,
-                          [(d, r) for d, r in data[ticker]['PX_LAST'].items()])
+            c.executemany(sql_str, [(d, r) for d, r in data[ticker]["PX_LAST"].items()])
         conn.commit()
 
-def init_swaption_vol(session,
-                      tenors=['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30],
-                      source='BBIR',
-                      vol_type='N',
-                      start_date=datetime.date(1990, 1, 1)):
+
+def init_swaption_vol(
+    session,
+    tenors=["A", "C", "F", "I"] + list(range(1, 11)) + [15, 20, 25, 30],
+    source="BBIR",
+    vol_type="N",
+    start_date=datetime.date(1990, 1, 1),
+):
     tickers = []
     for t1 in tenors:
         for t2 in tenors[4:]:
             tickers.append(f"USS{vol_type}{t1:0>2}{t2} {source} Curncy")
-    data = retrieve_data(session, tickers, ['PX_LAST'],
-                         start_date=start_date)
+    data = retrieve_data(session, tickers, ["PX_LAST"], start_date=start_date)
     return data
 
-def split_tenor_expiry(ticker, vol_type='N'):
+
+def split_tenor_expiry(ticker, vol_type="N"):
     m = re.match(f"USS{vol_type}(.{{2}})([^\s]*) ([^\s]*) Curncy", ticker)
     expiry, tenor, _ = m.groups()
-    if expiry[0] == '0':
+    if expiry[0] == "0":
         expiry = expiry[1:]
     if not expiry.isalpha():
         expiry = int(expiry)
     tenor = int(tenor)
     return expiry, tenor
 
+
 def insert_swaption_vol(data, conn, source, vol_type="N"):
-    tenors = ['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30]
+    tenors = ["A", "C", "F", "I"] + list(range(1, 11)) + [15, 20, 25, 30]
     df = pd.concat(data, axis=1)
     df.columns = df.columns.get_level_values(0)
-    df.columns = pd.MultiIndex.from_tuples([split_tenor_expiry(c, vol_type) for c in df.columns])
+    df.columns = pd.MultiIndex.from_tuples(
+        [split_tenor_expiry(c, vol_type) for c in df.columns]
+    )
     table_name = "swaption_normal_vol" if vol_type == "N" else "swaption_lognormal_vol"
     for t in tenors[-14:]:
-        sql_str = f'INSERT INTO {table_name}(date, "{t}y", source) ' + \
-            'VALUES(%s, %s, %s) ON CONFLICT (date, source)' + \
-            f' DO UPDATE SET "{t}y" = excluded."{t}y", source = excluded.source'
+        sql_str = (
+            f'INSERT INTO {table_name}(date, "{t}y", source) '
+            + "VALUES(%s, %s, %s) ON CONFLICT (date, source)"
+            + f' DO UPDATE SET "{t}y" = excluded."{t}y", source = excluded.source'
+        )
        with conn.cursor() as c:
             df_temp = df.xs(t, axis=1, level=1).reindex(tenors, axis=1)
             for k, v in df_temp.iterrows():
@@ -172,68 +220,83 @@ def insert_swaption_vol(data, conn, source, vol_type="N"):
                 c.execute(sql_str, (k, v.tolist(), source))
         conn.commit()
 
-def update_swaption_vol(conn, session,
-                        tenors=['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30],
-                        *,
-                        sources=['BBIR', 'CMPN', 'ICPL'],
-                        vol_type="N"):
+
+def update_swaption_vol(
+    conn,
+    session,
+    tenors=["A", "C", "F", "I"] + list(range(1, 11)) + [15, 20, 25, 30],
+    *,
+    sources=["BBIR", "CMPN", "ICPL"],
+    vol_type="N",
+):
     """
     Parameters
     ----------
     vol_type : one of 'N' or 'V' (normal or log-normal)
     """
     table_name = "swaption_normal_vol" if vol_type == "N" else "swaption_lognormal_vol"
-    for source in ['BBIR', 'CMPN', 'ICPL']:
+    for source in ["BBIR", "CMPN", "ICPL"]:
         tickers = []
         for expiry in tenors:
             for tenor in tenors:
                 tickers.append(f"USS{vol_type}{expiry:0>2}{tenor} {source} Curncy")
-        data = retrieve_data(session, tickers, ['PX_YEST_CLOSE', 'PX_CLOSE_DT'])
+        data = retrieve_data(session, tickers, ["PX_YEST_CLOSE", "PX_CLOSE_DT"])
         for t in tenors[4:]:
-            sql_str = f'INSERT INTO {table_name}(date, "{t}y", source) ' + \
-                'VALUES(%s, %s, %s) ON CONFLICT (date, source)' + \
-                f' DO UPDATE SET "{t}y" = excluded."{t}y", source = excluded.source'
+            sql_str = (
+                f'INSERT INTO {table_name}(date, "{t}y", source) '
+                + "VALUES(%s, %s, %s) ON CONFLICT (date, source)"
+                + f' DO UPDATE SET "{t}y" = excluded."{t}y", source = excluded.source'
+            )
             r = []
             dates = []
             for expiry in tenors:
                 ticker = f"USS{vol_type}{expiry:0>2}{t} {source} Curncy"
                 if data[ticker]:
-                    r.append(data[ticker]['PX_YEST_CLOSE'])
-                    dates.append(data[ticker]['PX_CLOSE_DT'])
+                    r.append(data[ticker]["PX_YEST_CLOSE"])
+                    dates.append(data[ticker]["PX_CLOSE_DT"])
                 else:
                     r.append(None)
                     dates.append(dates[-1])
             if dates.count(dates[0]) < len(dates):
-                raise ValueError('Not all quotes are from the same date')
+                raise ValueError("Not all quotes are from the same date")
             with conn.cursor() as c:
                 c.execute(sql_str, (dates[0], r, source))
             conn.commit()
 
-def update_swap_rates(conn, session,
-                      tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]):
+
+def update_swap_rates(
+    conn, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]
+):
     securities = [f"USISDA{t:02} Index" for t in tenors]
-    data = retrieve_data(session, securities, ['PX_LAST', 'LAST_UPDATE_DT'])
+    data = retrieve_data(session, securities, ["PX_LAST", "LAST_UPDATE_DT"])
     for t in tenors:
         ticker = f"USISDA{t:02} Index"
-        sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \
-            'VALUES(%(LAST_UPDATE_DT)s, %(PX_LAST)s) ON CONFLICT (fixing_date)' + \
-            f' DO UPDATE SET "{t}y" = %(PX_LAST)s'
+        sql_str = (
+            f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") '
+            + "VALUES(%(LAST_UPDATE_DT)s, %(PX_LAST)s) ON CONFLICT (fixing_date)"
+            + f' DO UPDATE SET "{t}y" = %(PX_LAST)s'
+        )
         with conn.cursor() as c:
             c.execute(sql_str, data[ticker])
         conn.commit()
 
+
 def update_cash_rates(conn, session, start_date=None):
-    securities = {"FEDL01 Index": "FED_FUND",
-                  "US0001M Index": "1M_LIBOR",
-                  "US0003M Index": "3M_LIBOR"}
+    securities = {
+        "FEDL01 Index": "FED_FUND",
+        "US0001M Index": "1M_LIBOR",
+        "US0003M Index": "3M_LIBOR",
+    }
     if start_date is None:
-        data = retrieve_data(session, list(securities.keys()),
-                             ["PX_LAST", "LAST_UPDATE_DT"])
+        data = retrieve_data(
+            session, list(securities.keys()), ["PX_LAST", "LAST_UPDATE_DT"]
+        )
     else:
-        data = retrieve_data(session, list(securities.keys()),
-                             ["PX_LAST"], start_date=start_date.date())
+        data = retrieve_data(
+            session, list(securities.keys()), ["PX_LAST"], start_date=start_date.date()
+        )
     sql_str = "INSERT INTO rates VALUES(%s, %s, %s) ON CONFLICT DO NOTHING"
     with conn.cursor() as c:
         if start_date is None:
@@ -245,53 +308,69 @@ def update_cash_rates(conn, session, start_date=None):
             c.execute(sql_str, (d, securities[k], r))
     conn.commit()
 
+
 def populate_cashflow_history(engine, session, workdate=None, fund="SERCGMAST"):
     securities = get_list(engine, workdate, fund=fund)
-    data = retrieve_data(session, securities.index.tolist(),
-                         ['HIST_CASH_FLOW', 'MTG_HIST_CPN',
-                          'FLT_CPN_HIST', 'HIST_INTEREST_DISTRIBUTED'])
-    fixed_coupons = {'XS0306416982 Mtge': 7.62,
-                     '91927RAD1 Mtge': 6.77}
+    data = retrieve_data(
+        session,
+        securities.index.tolist(),
+        ["HIST_CASH_FLOW", "MTG_HIST_CPN", "FLT_CPN_HIST", "HIST_INTEREST_DISTRIBUTED"],
+    )
+    fixed_coupons = {"XS0306416982 Mtge": 7.62, "91927RAD1 Mtge": 6.77}
     conn = engine.raw_connection()
     for k, v in data.items():
-        if 'HIST_CASH_FLOW' in v:
-            to_insert = v['HIST_CASH_FLOW'].merge(v['MTG_HIST_CPN'], how='left',
-                                                  left_on='Payment Date',
-                                                  right_on='Payment Date')
-            to_insert.rename(columns={'Coupon_y': 'coupon',
-                                      'Interest': 'interest',
-                                      'Payment Date': 'date',
-                                      'Principal Balance': 'principal_bal',
-                                      'Principal Paid': 'principal'}, inplace=True)
-            to_insert.drop(['Period Number', 'Coupon_x'], axis=1, inplace=True)
-        elif 'FLT_CPN_HIST' in v:
-            to_insert = v['FLT_CPN_HIST']
-            to_insert.rename(columns={'Coupon Rate': 'coupon',
-                                      'Accrual Start Date': 'date'}, inplace=True)
+        if "HIST_CASH_FLOW" in v:
+            to_insert = v["HIST_CASH_FLOW"].merge(
+                v["MTG_HIST_CPN"],
+                how="left",
+                left_on="Payment Date",
+                right_on="Payment Date",
+            )
+            to_insert.rename(
+                columns={
+                    "Coupon_y": "coupon",
+                    "Interest": "interest",
+                    "Payment Date": "date",
+                    "Principal Balance": "principal_bal",
+                    "Principal Paid": "principal",
+                },
+                inplace=True,
+            )
+            to_insert.drop(["Period Number", "Coupon_x"], axis=1, inplace=True)
-        elif 'FLT_CPN_HIST' in v:
+        elif "FLT_CPN_HIST" in v:
+            to_insert = v["FLT_CPN_HIST"]
+            to_insert.rename(
+                columns={"Coupon Rate": "coupon", "Accrual Start Date": "date"},
+                inplace=True,
+            )
             to_insert.coupon = to_insert.coupon.shift(1)
-        elif 'HIST_INTEREST_DISTRIBUTED' in v:
-            to_insert = v['HIST_INTEREST_DISTRIBUTED']
-            to_insert.rename(columns={'Interest': 'interest',
-                                      'Historical Date': 'date'}, inplace=True)
+        elif "HIST_INTEREST_DISTRIBUTED" in v:
+            to_insert = v["HIST_INTEREST_DISTRIBUTED"]
+            to_insert.rename(
+                columns={"Interest": "interest", "Historical Date": "date"},
+                inplace=True,
+            )
             if k in fixed_coupons:
-                to_insert['coupon'] = fixed_coupons[k]
-            else: #damn you XS0299146992 !
+                to_insert["coupon"] = fixed_coupons[k]
+            else:  # damn you XS0299146992 !
                 continue
         else:
            logging.error("No cashflows for the given security")
-        identifier = securities.loc[k, 'identifier']
-        to_insert['identifier'] = identifier
+        identifier = securities.loc[k, "identifier"]
+        to_insert["identifier"] = identifier
         with conn.cursor() as c:
             c.execute("DELETE FROM cashflow_history WHERE identifier=%s", (identifier,))
             conn.commit()
-        to_insert.to_sql('cashflow_history', engine, if_exists='append', index=False)
+        to_insert.to_sql("cashflow_history", engine, if_exists="append", index=False)
     with conn.cursor() as c:
         c.execute("REFRESH MATERIALIZED VIEW factors_history")
         conn.commit()
     conn.close()
 
+
 if __name__ == "__main__":
     from utils.db import serenitas_pool, dawn_engine
+
     dawn_conn = dawn_engine.raw_connection()
     serenitas_conn = serenitas_pool.getconn()
     if len(sys.argv) > 1:
@@ -302,7 +381,7 @@ if __name__ == "__main__":
     update_securities(dawn_engine, session, workdate)
     populate_cashflow_history(dawn_engine, session, workdate, "SERCGMAST")
     populate_cashflow_history(dawn_engine, session, workdate, "BRINKER")
-    update_fx(dawn_conn, session, ['EURUSD', 'CADUSD'])
+    update_fx(dawn_conn, session, ["EURUSD", "CADUSD"])
     update_swap_rates(serenitas_conn, session)
     update_cash_rates(serenitas_conn, session)
     for vol_type in ["N", "V"]:
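A note on the indexing logic this formatting-only commit touches (this sketch is not part of the diff): get_list keys each position by a Bloomberg-style bbg_id, taking the first 9 characters as a CUSIP when the identifier is 11 characters or shorter, treating 12-character identifiers as ISINs, and appending the bbg_type suffix. A minimal, self-contained reproduction of that transformation, using made-up sample identifiers:

    import pandas as pd

    # Hypothetical rows standing in for the list_positions() query result.
    positions = pd.DataFrame(
        {
            "identifier": ["912828XG80", "XS0306416982"],  # made-up examples
            "bbg_type": ["Corp", "Mtge"],
        }
    )
    # Identifiers of <= 11 chars are CUSIPs (first 9 chars); 12-char ones are ISINs.
    positions.loc[positions.identifier.str.len() <= 11, "cusip"] = \
        positions.identifier.str.slice(stop=9)
    positions.loc[positions.identifier.str.len() == 12, "isin"] = positions.identifier
    # bbg_id = CUSIP if present, else ISIN, plus the Bloomberg asset-class suffix.
    positions["bbg_id"] = (
        positions.cusip.where(positions.cusip.notnull(), positions["isin"])
        + " "
        + positions.bbg_type
    )
    positions.set_index("bbg_id", inplace=True)
    print(positions.index.tolist())  # ['912828XG8 Corp', 'XS0306416982 Mtge']

Run against the two sample rows, the resulting index matches the "identifier bbg_type" keys used elsewhere in the module (for example, the fixed_coupons lookup in populate_cashflow_history).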
