diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/position.py | 48 |
1 files changed, 26 insertions, 22 deletions
diff --git a/python/position.py b/python/position.py index d5bf4558..10a063ba 100644 --- a/python/position.py +++ b/python/position.py @@ -1,7 +1,8 @@ from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP import datetime +import numpy as np import pandas as pd -from sqlalchemy import create_engine +import psycopg2 from pandas.tseries.offsets import BDay from pandas import bdate_range import re @@ -56,11 +57,10 @@ def backpopulate_marks(begin_str='2015-01-15', end_str='2015-07-15'): positions = positions.drop_duplicates() positions.to_sql('position', engine, if_exists='append', index=False) -def update_securities(engine, session, workdate): +def update_securities(conn, session, workdate): field = {'Corp': 'PREV_CPN_DT', 'Mtge': 'START_ACC_DT'} securities = get_list(engine) securities = securities[securities.paid_down.isnull()] - conn = engine.raw_connection() data = retrieve_data(session, securities.index.tolist(), ['PREV_CPN_DT', 'START_ACC_DT', 'CUR_CPN', 'CPN_ASOF_DT']) data = pd.DataFrame.from_dict(data, orient='index') @@ -83,7 +83,7 @@ def init_fx(session, engine, startdate): 'PX_LAST_y': 'cadusd'}, inplace=True) data.to_sql('fx', engine, index=False, if_exists='append') -def update_fx(engine, session, currencies): +def update_fx(conn, session, currencies): securities = [c + ' Curncy' for c in currencies] data = retrieve_data(session, securities, ['FIXED_CLOSING_PRICE_NY', 'PX_CLOSE_DT']) colnames = ['date'] @@ -96,12 +96,12 @@ def update_fx(engine, session, currencies): sqlstr = 'INSERT INTO fx({0}) VALUES({1}) ON CONFLICT DO NOTHING'.format( ",".join(colnames), ",".join(["%s"]*len(values))) - conn = engine.raw_connection() + with conn.cursor() as c: c.execute(sqlstr, values) conn.commit() -def init_swap_rates(engine, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]): +def init_swap_rates(conn, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]): securities = [f"USISDA{t:02} Index" for t in tenors] data = retrieve_data(session, securities, ['PX_LAST'], start_date=datetime.date(1998, 10, 7)) @@ -110,7 +110,7 @@ def init_swap_rates(engine, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \ 'VALUES(%s, %s) ON CONFLICT (fixing_date)' + \ f' DO UPDATE SET "{t}y" = %s' - conn = engine.raw_connection() + with conn.cursor() as c: c.executemany(sql_str, [(d, r, r) for d, r in data[ticker]['PX_LAST'].items()]) @@ -136,11 +136,11 @@ def split_tenor_expiry(ticker): tenor = int(tenor) return expiry, tenor -def insert_swaption_vol(data, engine): +def insert_swaption_vol(data, conn): + tenors = ['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30] df = pd.concat(data, axis=1) df.columns = df.columns.get_level_values(0) df.columns = pd.MultiIndex.from_tuples([split_tenor_expiry(c) for c in df.columns]) - conn = engine.raw_connection() for t in tenors[4:]: sql_str = f'INSERT INTO swaption_vol(date, "{t}y") ' + \ @@ -151,7 +151,7 @@ def insert_swaption_vol(data, engine): if np.all(np.isnan(v.values)): continue c.execute(sql_str, (k, v.tolist(), v.tolist())) - conn.commit() + conn.commit() def update_swaption_vol(session, engine, tenors=['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30]): @@ -181,11 +181,10 @@ def update_swaption_vol(session, engine, c.execute(sql_str, (dates[0], r, r)) conn.commit() -def update_swap_rates(engine, session, +def update_swap_rates(conn, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]): securities = [f"USISDA{t:02} Index" for t in tenors] data = retrieve_data(session, securities, ['PX_LAST', 'LAST_UPDATE_DT']) - conn = engine.raw_connection() for t in tenors: ticker = f"USISDA{t:02} Index" sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \ @@ -195,9 +194,8 @@ def update_swap_rates(engine, session, c.execute(sql_str, data[ticker]) conn.commit() -def populate_cashflow_history(engine, session, workdate=None): +def populate_cashflow_history(conn, session, workdate=None): securities = get_list(engine, workdate) - conn = engine.raw_connection() data = retrieve_data(session, securities.index.tolist(), ['HIST_CASH_FLOW', 'MTG_HIST_CPN', 'FLT_CPN_HIST', 'HIST_INTEREST_DISTRIBUTED']) fixed_coupons = {'XS0306416982 Mtge': 7.62, @@ -239,18 +237,24 @@ def populate_cashflow_history(engine, session, workdate=None): conn.commit() if __name__=="__main__": - from db import dbengine - dawn_engine = dbengine('dawndb') - serenitas_engine = dbengine('serenitasdb') + serenitas_conn = psycopg2.connect(database="serenitasdb", + user="serenitas_user", + host="debian") + dawn_conn = psycopg2.connect(database="dawndb", + user="dawn_user", + host="debian") if len(sys.argv) > 1: workdate = pd.Timestamp(sys.argv[1]) else: workdate = pd.datetime.today() with init_bbg_session(BBG_IP) as session: - update_securities(dawn_engine, session, workdate) - populate_cashflow_history(dawn_engine, session, workdate) - update_fx(dawn_engine, session, ['EURUSD', 'CADUSD']) - update_swap_rates(serenitas_engine, session) - update_swaption_vil(serenitas_engine, session) + # update_securities(dawn_conn, session, workdate) + # populate_cashflow_history(dawn_conn, session, workdate) + # update_fx(dawn_conn, session, ['EURUSD', 'CADUSD']) + update_swap_rates(serenitas_conn, session) + #update_swaption_vol(serenitas_conn, session) # with init_bbg_session(BBG_IP) as session: # init_fx(session, engine, pd.datetime(2013, 1, 1)) + with init_bbg_session(BBG_IP) as session: + data = init_swaption_vol(session) + insert_swaption_vol(data, serenitas_conn) |
