aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/position.py69
1 files changed, 30 insertions, 39 deletions
diff --git a/python/position.py b/python/position.py
index 81f5af5c..e7633f7d 100644
--- a/python/position.py
+++ b/python/position.py
@@ -1,4 +1,5 @@
from serenitas.analytics.bbg_helpers import bbg_retry, retrieve_data
+from serenitas.utils.db2 import InfDateLoaderPandas
from itertools import product
import datetime
import numpy as np
@@ -10,7 +11,7 @@ import logging
def get_list(
- engine,
+ conn,
workdate: datetime.date = None,
asset_class=None,
include_unsettled: bool = True,
@@ -19,11 +20,14 @@ def get_list(
if workdate:
positions = pd.read_sql_query(
"SELECT identifier, figi, bbg_type FROM list_positions(%s, %s, %s, %s)",
- engine,
+ conn,
params=(workdate, asset_class, include_unsettled, fund),
)
else:
- positions = pd.read_sql_table("securities", engine)
+ conn.adapters.register_loader("date", InfDateLoaderPandas)
+ positions = pd.read_sql_query(
+ "SELECT * FROM securities", conn, parse_dates=["paid_down"]
+ )
positions["bbg_id"] = positions.figi + " " + positions.bbg_type
positions.set_index("bbg_id", inplace=True)
return positions
@@ -78,9 +82,9 @@ def backpopulate_marks(begin_str="2015-01-15", end_str="2015-07-15"):
positions.to_sql("position", engine, if_exists="append", index=False)
-def update_securities(engine, session, workdate):
+def update_securities(conn, session, workdate):
field = {"Corp": "PREV_CPN_DT", "Mtge": "START_ACC_DT"}
- securities = get_list(engine)
+ securities = get_list(conn)
securities = securities[securities.paid_down.isnull()]
data = retrieve_data(
session,
@@ -92,7 +96,6 @@ def update_securities(engine, session, workdate):
data.CPN_ASOF_DT.isnull() | (data.CPN_ASOF_DT <= pd.Timestamp(workdate))
]
m = securities.merge(data, left_index=True, right_index=True)
- conn = engine.raw_connection()
with conn.cursor() as c:
for r in m.to_dict("records"):
accrued_field = field[r["bbg_type"]]
@@ -103,8 +106,6 @@ def update_securities(engine, session, workdate):
"WHERE identifier=%(identifier)s",
r,
)
- conn.commit()
- conn.close()
def init_fx(session, engine, startdate):
@@ -305,10 +306,10 @@ def update_cash_rates(conn, session, start_date: datetime.date = None):
conn.commit()
-def populate_cashflow_history(engine, session, workdate=None, funds=("SERCGMAST",)):
+def populate_cashflow_history(conn, session, workdate=None, funds=("SERCGMAST",)):
securities = {}
for fund in funds:
- secs = get_list(engine, workdate, fund=fund)
+ secs = get_list(conn, workdate, fund=fund)
for sec in secs.itertuples():
if sec.Index not in securities:
securities[sec.Index] = sec.figi
@@ -318,7 +319,6 @@ def populate_cashflow_history(engine, session, workdate=None, funds=("SERCGMAST"
["HIST_CASH_FLOW", "MTG_HIST_CPN", "FLT_CPN_HIST", "HIST_INTEREST_DISTRIBUTED"],
)
fixed_coupons = {"XS0306416982 Mtge": 7.62, "91927RAD1 Mtge": 6.77}
- conn = engine.raw_connection()
for k, v in data.items():
if "HIST_CASH_FLOW" in v:
to_insert = v["HIST_CASH_FLOW"].merge(
@@ -358,22 +358,25 @@ def populate_cashflow_history(engine, session, workdate=None, funds=("SERCGMAST"
else:
logging.error(f"No cashflows for security {securities[k]}")
continue
-
- to_insert["identifier"] = securities[k]
+ cols = to_insert.columns.tolist() + ["identifier"]
+ update_cols = ["principal_bal", "principal", "interest", "coupon"]
+ sql_str = (
+ f"INSERT INTO cashflow_history({','.join(cols)}) "
+ f"VALUES ({','.join(['%s'] * len(cols))}) "
+ "ON CONFLICT (identifier, date) DO UPDATE SET "
+ f"({','.join(update_cols)}) = ({','.join(['EXCLUDED.'+c for c in update_cols])})"
+ )
with conn.cursor() as c:
- c.execute(
- "DELETE FROM cashflow_history WHERE identifier=%s", (securities[k],)
- )
+ for row in to_insert.itertuples(index=False):
+ c.execute(sql_str, row + (securities[k],))
conn.commit()
- to_insert.to_sql("cashflow_history", engine, if_exists="append", index=False)
with conn.cursor() as c:
c.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY factors_history")
conn.commit()
- conn.close()
if __name__ == "__main__":
- from serenitas.utils.db import serenitas_pool, dawn_engine
+ from serenitas.utils.db2 import serenitas_pool, dawn_pool
import argparse
parser = argparse.ArgumentParser()
@@ -385,14 +388,11 @@ if __name__ == "__main__":
)
args = parser.parse_args()
- dawn_conn = dawn_engine.raw_connection()
- serenitas_conn = serenitas_pool.getconn()
-
@bbg_retry(2)
- def bbg_call(session, dawn_engine, dawn_conn, serenitas_conn, workdate):
- update_securities(dawn_engine, session, args.workdate)
+ def bbg_call(session, dawn_conn, serenitas_conn, workdate):
+ update_securities(dawn_conn, session, args.workdate)
populate_cashflow_history(
- dawn_engine,
+ dawn_conn,
session,
args.workdate,
("SERCGMAST", "BRINKER", "BOWDST"),
@@ -403,17 +403,8 @@ if __name__ == "__main__":
for vol_type in ["N", "V", "N_OIS"]:
update_swaption_vol(serenitas_conn, session, vol_type=vol_type)
- bbg_call(dawn_engine, dawn_conn, serenitas_conn, args.workdate)
- serenitas_pool.putconn(serenitas_conn)
- # with init_bbg_session(BBG_IP) as session:
- # init_fx(session, engine, pd.datetime(2013, 1, 1))
- # with init_bbg_session(BBG_IP) as session:
- # init_swap_rates(serenitas_conn, session, start_date=pd.datetime(2012, 2, 2))
- # for source in ['BBIR', 'ICPL', 'CMPN']:
- # for vol_type in ["N", "V"]:
- # with init_bbg_session() as session:
- # data = init_swaption_vol(session, source=source,
- # vol_type=vol_type,
- # start_date=datetime.date(2001, 1, 1))
- # insert_swaption_vol(data, serenitas_conn, source,
- # vol_type=vol_type)
+ with (
+ serenitas_pool.connection() as serenitas_conn,
+ dawn_pool.connection() as dawn_conn,
+ ):
+ bbg_call(dawn_conn, serenitas_conn, args.workdate)