diff options
| -rw-r--r-- | python/position.py | 69 |
1 file changed, 30 insertions(+), 39 deletions(-)
diff --git a/python/position.py b/python/position.py index 81f5af5c..e7633f7d 100644 --- a/python/position.py +++ b/python/position.py @@ -1,4 +1,5 @@ from serenitas.analytics.bbg_helpers import bbg_retry, retrieve_data +from serenitas.utils.db2 import InfDateLoaderPandas from itertools import product import datetime import numpy as np @@ -10,7 +11,7 @@ import logging def get_list( - engine, + conn, workdate: datetime.date = None, asset_class=None, include_unsettled: bool = True, @@ -19,11 +20,14 @@ def get_list( if workdate: positions = pd.read_sql_query( "SELECT identifier, figi, bbg_type FROM list_positions(%s, %s, %s, %s)", - engine, + conn, params=(workdate, asset_class, include_unsettled, fund), ) else: - positions = pd.read_sql_table("securities", engine) + conn.adapters.register_loader("date", InfDateLoaderPandas) + positions = pd.read_sql_query( + "SELECT * FROM securities", conn, parse_dates=["paid_down"] + ) positions["bbg_id"] = positions.figi + " " + positions.bbg_type positions.set_index("bbg_id", inplace=True) return positions @@ -78,9 +82,9 @@ def backpopulate_marks(begin_str="2015-01-15", end_str="2015-07-15"): positions.to_sql("position", engine, if_exists="append", index=False) -def update_securities(engine, session, workdate): +def update_securities(conn, session, workdate): field = {"Corp": "PREV_CPN_DT", "Mtge": "START_ACC_DT"} - securities = get_list(engine) + securities = get_list(conn) securities = securities[securities.paid_down.isnull()] data = retrieve_data( session, @@ -92,7 +96,6 @@ def update_securities(engine, session, workdate): data.CPN_ASOF_DT.isnull() | (data.CPN_ASOF_DT <= pd.Timestamp(workdate)) ] m = securities.merge(data, left_index=True, right_index=True) - conn = engine.raw_connection() with conn.cursor() as c: for r in m.to_dict("records"): accrued_field = field[r["bbg_type"]] @@ -103,8 +106,6 @@ def update_securities(engine, session, workdate): "WHERE identifier=%(identifier)s", r, ) - conn.commit() - conn.close() def 
init_fx(session, engine, startdate): @@ -305,10 +306,10 @@ def update_cash_rates(conn, session, start_date: datetime.date = None): conn.commit() -def populate_cashflow_history(engine, session, workdate=None, funds=("SERCGMAST",)): +def populate_cashflow_history(conn, session, workdate=None, funds=("SERCGMAST",)): securities = {} for fund in funds: - secs = get_list(engine, workdate, fund=fund) + secs = get_list(conn, workdate, fund=fund) for sec in secs.itertuples(): if sec.Index not in securities: securities[sec.Index] = sec.figi @@ -318,7 +319,6 @@ def populate_cashflow_history(engine, session, workdate=None, funds=("SERCGMAST" ["HIST_CASH_FLOW", "MTG_HIST_CPN", "FLT_CPN_HIST", "HIST_INTEREST_DISTRIBUTED"], ) fixed_coupons = {"XS0306416982 Mtge": 7.62, "91927RAD1 Mtge": 6.77} - conn = engine.raw_connection() for k, v in data.items(): if "HIST_CASH_FLOW" in v: to_insert = v["HIST_CASH_FLOW"].merge( @@ -358,22 +358,25 @@ def populate_cashflow_history(engine, session, workdate=None, funds=("SERCGMAST" else: logging.error(f"No cashflows for security {securities[k]}") continue - - to_insert["identifier"] = securities[k] + cols = to_insert.columns.tolist() + ["identifier"] + update_cols = ["principal_bal", "principal", "interest", "coupon"] + sql_str = ( + f"INSERT INTO cashflow_history({','.join(cols)}) " + f"VALUES ({','.join(['%s'] * len(cols))}) " + "ON CONFLICT (identifier, date) DO UPDATE SET " + f"({','.join(update_cols)}) = ({','.join(['EXCLUDED.'+c for c in update_cols])})" + ) with conn.cursor() as c: - c.execute( - "DELETE FROM cashflow_history WHERE identifier=%s", (securities[k],) - ) + for row in to_insert.itertuples(index=False): + c.execute(sql_str, row + (securities[k],)) conn.commit() - to_insert.to_sql("cashflow_history", engine, if_exists="append", index=False) with conn.cursor() as c: c.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY factors_history") conn.commit() - conn.close() if __name__ == "__main__": - from serenitas.utils.db import 
serenitas_pool, dawn_engine + from serenitas.utils.db2 import serenitas_pool, dawn_pool import argparse parser = argparse.ArgumentParser() @@ -385,14 +388,11 @@ if __name__ == "__main__": ) args = parser.parse_args() - dawn_conn = dawn_engine.raw_connection() - serenitas_conn = serenitas_pool.getconn() - @bbg_retry(2) - def bbg_call(session, dawn_engine, dawn_conn, serenitas_conn, workdate): - update_securities(dawn_engine, session, args.workdate) + def bbg_call(session, dawn_conn, serenitas_conn, workdate): + update_securities(dawn_conn, session, args.workdate) populate_cashflow_history( - dawn_engine, + dawn_conn, session, args.workdate, ("SERCGMAST", "BRINKER", "BOWDST"), @@ -403,17 +403,8 @@ if __name__ == "__main__": for vol_type in ["N", "V", "N_OIS"]: update_swaption_vol(serenitas_conn, session, vol_type=vol_type) - bbg_call(dawn_engine, dawn_conn, serenitas_conn, args.workdate) - serenitas_pool.putconn(serenitas_conn) - # with init_bbg_session(BBG_IP) as session: - # init_fx(session, engine, pd.datetime(2013, 1, 1)) - # with init_bbg_session(BBG_IP) as session: - # init_swap_rates(serenitas_conn, session, start_date=pd.datetime(2012, 2, 2)) - # for source in ['BBIR', 'ICPL', 'CMPN']: - # for vol_type in ["N", "V"]: - # with init_bbg_session() as session: - # data = init_swaption_vol(session, source=source, - # vol_type=vol_type, - # start_date=datetime.date(2001, 1, 1)) - # insert_swaption_vol(data, serenitas_conn, source, - # vol_type=vol_type) + with ( + serenitas_pool.connection() as serenitas_conn, + dawn_pool.connection() as dawn_conn, + ): + bbg_call(dawn_conn, serenitas_conn, args.workdate) |
