from serenitas.analytics.bbg_helpers import bbg_retry, retrieve_data
from serenitas.utils.db2 import InfDateLoaderPandas
from itertools import product
import datetime
import numpy as np
import pandas as pd
from pandas import bdate_range
import re
import os
import logging


def get_list(
    conn,
    workdate: datetime.date = None,
    asset_class=None,
    include_unsettled: bool = True,
    fund="SERCGMAST",
):
    """Return the position list, indexed by Bloomberg id (FIGI + yellow key).

    With a workdate, positions come from the list_positions() database
    function; without one, the full securities table is returned.
    """
    if workdate:
        positions = pd.read_sql_query(
            "SELECT identifier, figi, bbg_type FROM list_positions(%s, %s, %s, %s)",
            conn,
            params=(workdate, asset_class, include_unsettled, fund),
        )
    else:
        conn.adapters.register_loader("date", InfDateLoaderPandas)
        positions = pd.read_sql_query(
            "SELECT * FROM securities", conn, parse_dates=["paid_down"]
        )
    positions["bbg_id"] = positions.figi + " " + positions.bbg_type
    positions.set_index("bbg_id", inplace=True)
    return positions


def get_list_range(engine, begin, end, asset_class=None):
    begin = pd.Timestamp(begin).date()
    end = pd.Timestamp(end).date()
    positions = pd.read_sql_query(
        "select identifier, bbg_type, strategy from list_positions_range(%s, %s, %s)",
        engine,
        params=(begin, end, asset_class),
    )
    # Identifiers up to 11 characters are CUSIP-based (first 9 characters);
    # 12-character identifiers are ISINs.
    positions.loc[
        positions.identifier.str.len() <= 11, "cusip"
    ] = positions.identifier.str.slice(stop=9)
    positions.loc[positions.identifier.str.len() == 12, "isin"] = positions.identifier
    positions["bbg_id"] = (
        positions.cusip.where(positions.cusip.notnull(), positions["isin"])
        + " "
        + positions.bbg_type
    )
    positions.set_index("bbg_id", inplace=True)
    return positions
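# Hypothetical usage sketch, not part of the original module: both loaders
# index the result by "bbg_id", which is the security list retrieve_data
# takes (see update_securities() below). Function name and date are
# illustrative only.
def _example_bbg_ids(conn, workdate=datetime.date(2024, 6, 28)):
    positions = get_list(conn, workdate)
    return positions.index.tolist()  # e.g. ["BBG00XXXXXXXX Mtge", ...]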
def backpopulate_marks(conn, engine, begin_str="2015-01-15", end_str="2015-07-15"):
    pattern = re.compile(r"\d{4}-\d{2}-\d{2}")
    list_of_daily_folder = (
        fullpath
        for (fullpath, _, _) in os.walk("/home/serenitas/Daily")
        if pattern.match(os.path.basename(fullpath))
    )
    list_of_bdays = bdate_range(start=begin_str, end=end_str)
    for path in list_of_daily_folder:
        date = pd.to_datetime(os.path.basename(path))
        if date in list_of_bdays:
            marks_file = [f for f in os.listdir(path) if f.startswith("securitiesNpv")]
            if marks_file:
                # Lexicographic order is chronological order for ISO dates,
                # so the first entry after a reverse sort is the latest file.
                marks_file.sort(key=lambda x: x[13:], reverse=True)
                marks = pd.read_csv(os.path.join(path, marks_file[0]))
                positions = get_list(conn, date.date())
                positions = positions.merge(
                    marks, left_on="identifier", right_on="IDENTIFIER"
                )
                positions.drop(
                    ["IDENTIFIER", "last_settle_date"], axis=1, inplace=True
                )
                positions["date"] = date
                positions.rename(columns={"Price": "price"}, inplace=True)
                positions = positions.drop_duplicates()
                positions.to_sql("position", engine, if_exists="append", index=False)


def update_securities(conn, session, workdate):
    # Which Bloomberg field carries the accrual start date depends on the
    # security's yellow key.
    field = {"Corp": "PREV_CPN_DT", "Mtge": "START_ACC_DT"}
    securities = get_list(conn)
    securities = securities[securities.paid_down.isnull()]
    data = retrieve_data(
        session,
        securities.index.tolist(),
        ["PREV_CPN_DT", "START_ACC_DT", "CUR_CPN", "CPN_ASOF_DT", "MTG_FACE_AMT"],
    )
    data = pd.DataFrame.from_dict(data, orient="index")
    # Ignore coupons that only become effective after the workdate.
    data = data[
        data.CPN_ASOF_DT.isnull() | (data.CPN_ASOF_DT <= pd.Timestamp(workdate))
    ]
    m = securities.merge(data, left_index=True, right_index=True)
    with conn.cursor() as c:
        for r in m.to_dict("records"):
            accrued_field = field[r["bbg_type"]]
            if r[accrued_field].date() < workdate:
                c.execute(
                    f"UPDATE securities SET start_accrued_date=%({accrued_field})s"
                    ", coupon=%(CUR_CPN)s, face_amount=%(MTG_FACE_AMT)s "
                    "WHERE identifier=%(identifier)s",
                    r,
                )


def init_fx(session, engine, startdate):
    currencies = ["EURUSD", "CADUSD"]
    securities = [c + " Curncy" for c in currencies]
    data = retrieve_data(session, securities, ["PX_LAST"], start_date=startdate)
    data = data["EURUSD Curncy"].merge(data["CADUSD Curncy"], on="date")
    data.rename(columns={"PX_LAST_x": "eurusd", "PX_LAST_y": "cadusd"}, inplace=True)
    data.to_sql("fx", engine, if_exists="append")


def update_fx(conn, session, currencies):
    securities = [c + " Curncy" for c in currencies]
    data = retrieve_data(session, securities, ["FIXED_CLOSING_PRICE_NY", "PX_CLOSE_DT"])
    colnames = ["date"]
    values = []
    for k, v in data.items():
        currency_pair = k.split(" ")[0].lower()
        colnames.append(currency_pair)
        values.append(v["FIXED_CLOSING_PRICE_NY"])
    # All pairs are assumed to share the same close date, so any PX_CLOSE_DT
    # (here: the last one seen) can stamp the row.
    values = [v["PX_CLOSE_DT"]] + values
    sqlstr = "INSERT INTO fx({0}) VALUES({1}) ON CONFLICT DO NOTHING".format(
        ",".join(colnames), ",".join(["%s"] * len(values))
    )
    with conn.cursor() as c:
        c.execute(sqlstr, values)
    conn.commit()


def init_swap_rates(
    conn,
    session,
    tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30],
    start_date=datetime.date(1998, 10, 7),
):
    securities = [f"USISDA{t:02} Index" for t in tenors]
    data = retrieve_data(session, securities, ["PX_LAST"], start_date=start_date)
    for t in tenors:
        ticker = f"USISDA{t:02} Index"
        sql_str = (
            f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") '
            "VALUES(%s, %s) ON CONFLICT (fixing_date)"
            f' DO UPDATE SET "{t}y" = excluded."{t}y"'
        )
        with conn.cursor() as c:
            c.executemany(sql_str, list(data[ticker]["PX_LAST"].items()))
    conn.commit()


def init_swaption_vol(
    session,
    tenors=["A", "C", "F", "I"] + list(range(1, 11)) + [15, 20, 25, 30],
    source="BBIR",
    vol_type="N",
    start_date=datetime.date(1990, 1, 1),
):
    # Expiries run over the full list (letter codes are sub-yearly:
    # A=1M, C=3M, F=6M, I=9M); swap tenors skip the letter codes (tenors[4:]).
    tickers = []
    for t1 in tenors:
        for t2 in tenors[4:]:
            tickers.append(f"USS{vol_type}{t1:0>2}{t2} {source} Curncy")
    data = retrieve_data(session, tickers, ["PX_LAST"], start_date=start_date)
    return data


def split_tenor_expiry(ticker, vol_type="N"):
    m = re.match("USS" + vol_type + r"(.{2})([^\s]*) ([^\s]*) Curncy", ticker)
    expiry, tenor, _ = m.groups()
    if expiry[0] == "0":
        expiry = expiry[1:]
    if not expiry.isalpha():
        expiry = int(expiry)
    tenor = int(tenor)
    return expiry, tenor


def insert_swaption_vol(data, conn, source, vol_type="N"):
    tenors = ["A", "C", "F", "I"] + list(range(1, 11)) + [15, 20, 25, 30]
    df = pd.concat(data, axis=1)
    df.columns = df.columns.get_level_values(0)
    df.columns = pd.MultiIndex.from_tuples(
        [split_tenor_expiry(c, vol_type) for c in df.columns]
    )
    table_name = "swaption_normal_vol" if vol_type == "N" else "swaption_lognormal_vol"
    # Numeric swap tenors only; skip the four expiry letter codes.
    for t in tenors[-14:]:
        sql_str = (
            f'INSERT INTO {table_name}(date, "{t}y", source) '
            "VALUES(%s, %s, %s) ON CONFLICT (date, source)"
            f' DO UPDATE SET "{t}y" = excluded."{t}y", source = excluded.source'
        )
        with conn.cursor() as c:
            df_temp = df.xs(t, axis=1, level=1).reindex(tenors, axis=1)
            for k, v in df_temp.iterrows():
                if np.all(np.isnan(v.values)):
                    continue
                c.execute(sql_str, (k, v.tolist(), source))
    conn.commit()
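# Hypothetical glue, not in the original module: init_swaption_vol() returns
# the per-ticker history dict that insert_swaption_vol() consumes, so a one-off
# historical backfill is a matter of chaining the two. The function name and
# start date are illustrative.
def _backfill_swaption_vol(conn, session, source="BBIR", vol_type="N"):
    data = init_swaption_vol(
        session,
        source=source,
        vol_type=vol_type,
        start_date=datetime.date(2005, 1, 3),
    )
    insert_swaption_vol(data, conn, source, vol_type=vol_type)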
case "N_OIS": db_vol_type = "Normal (OIS)" ticker_pattern = "USSNA{}{} {} Curncy" sources = ("ICPL", "BGN") alt_exp = lambda e, t: _alt_exp.get(e, e) if t >= 10 else e mappings = {"A": "1M", "C": "3M", "F": "6M", "I": "9M"} for source in sources: tickers = { ticker_pattern.format(alt_exp(e, t), t, source): (e, t) for e, t in product(expiries, tenors) } data = retrieve_data(session, tickers, ["PX_LAST"], start_date=start_from) data = pd.concat(data, names=["ticker", "date"]) for date, df in data.groupby(level="date"): with conn.cursor() as c: for ticker, val in df.reset_index("date", drop=True).itertuples(): e, t = tickers[ticker] tenor = f"{t}Y" expiry = mappings.get(e, f"{e}Y") c.execute( "INSERT INTO swaption_vol VALUES (%s, %s, %s, %s, %s, %s)" " ON CONFLICT (date, expiry, tenor, vol_type, source) " "DO UPDATE SET vol=EXCLUDED.vol", (date, expiry, tenor, db_vol_type, source, val), ) conn.commit() def update_swap_rates( conn, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30] ): securities = [f"USISDA{t:02} Index" for t in tenors] data = retrieve_data(session, securities, ["PX_LAST", "LAST_UPDATE_DT"]) for t in tenors: ticker = f"USISDA{t:02} Index" sql_str = ( f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + "VALUES(%(LAST_UPDATE_DT)s, %(PX_LAST)s) ON CONFLICT (fixing_date)" + f' DO UPDATE SET "{t}y" = %(PX_LAST)s' ) with conn.cursor() as c: c.execute(sql_str, data[ticker]) conn.commit() def update_cash_rates(conn, session, start_date: datetime.date = None): securities = { "FEDL01 Index": "FED_FUND", "US0001M Index": "1M_LIBOR", "US0003M Index": "3M_LIBOR", "SOFRRATE Index": "SOFRRATE", "SOFRINDX Index": "SOFRINDX", } if start_date is None: data = retrieve_data( session, list(securities.keys()), ["PX_LAST", "LAST_UPDATE_DT"] ) else: data = retrieve_data( session, list(securities.keys()), ["PX_LAST"], start_date=start_date ) sql_str = "INSERT INTO rates VALUES(%s, %s, %s) ON CONFLICT DO NOTHING" with conn.cursor() as c: if start_date is None: for k, v in data.items(): c.execute(sql_str, (v["LAST_UPDATE_DT"], securities[k], v["PX_LAST"])) else: for k, v in data.items(): for d, r in v["PX_LAST"].items(): c.execute(sql_str, (d, securities[k], r)) conn.commit() def populate_cashflow_history(conn, session, workdate=None, funds=("SERCGMAST",)): securities = {} for fund in funds: secs = get_list(conn, workdate, fund=fund) for sec in secs.itertuples(): if sec.Index not in securities: securities[sec.Index] = sec.figi data = retrieve_data( session, securities, [ "HIST_CASH_FLOW", "MTG_HIST_CPN", "FLT_CPN_HIST", "HIST_INTEREST_DISTRIBUTED", "MTG_HIST_FACT", ], ) fixed_coupons = {"XS0306416982 Mtge": 7.62, "91927RAD1 Mtge": 6.77} for k, v in data.items(): if "HIST_CASH_FLOW" in v: to_insert = ( v["HIST_CASH_FLOW"] .merge( v["MTG_HIST_CPN"], how="left", on="Payment Date", ) .merge( v["MTG_HIST_FACT"], how="left", on="Payment Date", ) ) to_insert.rename( columns={ "Coupon_y": "coupon", "Interest": "interest", "Payment Date": "date", "Principal Balance": "principal_bal", "Principal Paid": "principal", }, inplace=True, ) to_insert.drop(["Period Number", "Coupon_x"], axis=1, inplace=True) elif "FLT_CPN_HIST" in v: to_insert = v["FLT_CPN_HIST"] to_insert.rename( columns={"Coupon Rate": "coupon", "Accrual Start Date": "date"}, inplace=True, ) to_insert.coupon = to_insert.coupon.shift(1) elif "HIST_INTEREST_DISTRIBUTED" in v: to_insert = v["HIST_INTEREST_DISTRIBUTED"] to_insert.rename( columns={"Interest": "interest", "Historical Date": "date"}, inplace=True, ) if k in fixed_coupons: 
to_insert["coupon"] = fixed_coupons[k] else: # damn you XS0299146992 ! continue else: logging.error(f"No cashflows for security {securities[k]}") continue cols = to_insert.columns.tolist() + ["identifier"] update_cols = ["principal_bal", "principal", "interest", "coupon", "factor"] sql_str = ( f"INSERT INTO cashflow_history({','.join(cols)}) " f"VALUES ({','.join(['%s'] * len(cols))}) " "ON CONFLICT (identifier, date) DO UPDATE SET " f"({','.join(update_cols)}) = ({','.join(['EXCLUDED.'+c for c in update_cols])})" ) with conn.cursor() as c: for row in to_insert.itertuples(index=False): c.execute(sql_str, row + (securities[k],)) conn.commit() with conn.cursor() as c: c.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY factors_history") conn.commit() if __name__ == "__main__": from serenitas.utils.pool import serenitas_pool, dawn_pool import argparse parser = argparse.ArgumentParser() parser.add_argument( "workdate", nargs="?", type=datetime.date.fromisoformat, default=datetime.date.today(), ) args = parser.parse_args() @bbg_retry(2) def bbg_call(session, dawn_conn, serenitas_conn, workdate): update_securities(dawn_conn, session, workdate) populate_cashflow_history( dawn_conn, session, workdate, ( "SERCGMAST", "BRINKER", "BOWDST", "ISOSEL", ), ) update_fx(dawn_conn, session, ["EURUSD", "CADUSD"]) update_swap_rates(serenitas_conn, session) update_cash_rates(serenitas_conn, session) for vol_type in ["N", "V", "N_OIS"]: update_swaption_vol( serenitas_conn, session, start_from=workdate, vol_type=vol_type ) with ( serenitas_pool.connection() as serenitas_conn, dawn_pool.connection() as dawn_conn, ): bbg_call(dawn_conn, serenitas_conn, args.workdate)