diff options
Diffstat (limited to 'python/position.py')
| -rw-r--r-- | python/position.py | 126 |
1 files changed, 116 insertions, 10 deletions
diff --git a/python/position.py b/python/position.py index 555f7a17..ded45f5a 100644 --- a/python/position.py +++ b/python/position.py @@ -1,8 +1,11 @@ from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP +import datetime +import numpy as np import pandas as pd -from sqlalchemy import create_engine +import psycopg2 from pandas.tseries.offsets import BDay from pandas import bdate_range +from sqlalchemy import create_engine import re import os import logging @@ -59,12 +62,12 @@ def update_securities(engine, session, workdate): field = {'Corp': 'PREV_CPN_DT', 'Mtge': 'START_ACC_DT'} securities = get_list(engine) securities = securities[securities.paid_down.isnull()] - conn = engine.raw_connection() data = retrieve_data(session, securities.index.tolist(), ['PREV_CPN_DT', 'START_ACC_DT', 'CUR_CPN', 'CPN_ASOF_DT']) data = pd.DataFrame.from_dict(data, orient='index') data = data[data.CPN_ASOF_DT.isnull() |(data.CPN_ASOF_DT<=workdate)] m = securities.merge(data, left_index=True, right_index=True) + conn = engine.raw_connection() with conn.cursor() as c: for r in m.to_dict('records'): if r[field[r['bbg_type']]] < workdate: @@ -72,6 +75,7 @@ def update_securities(engine, session, workdate): ",coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s".format(field[r['bbg_type']]), r) conn.commit() + conn.close() def init_fx(session, engine, startdate): currencies = ['EURUSD', 'CADUSD'] @@ -82,7 +86,7 @@ def init_fx(session, engine, startdate): 'PX_LAST_y': 'cadusd'}, inplace=True) data.to_sql('fx', engine, index=False, if_exists='append') -def update_fx(engine, session, currencies): +def update_fx(conn, session, currencies): securities = [c + ' Curncy' for c in currencies] data = retrieve_data(session, securities, ['FIXED_CLOSING_PRICE_NY', 'PX_CLOSE_DT']) colnames = ['date'] @@ -95,18 +99,110 @@ def update_fx(engine, session, currencies): sqlstr = 'INSERT INTO fx({0}) VALUES({1}) ON CONFLICT DO NOTHING'.format( ",".join(colnames), ",".join(["%s"]*len(values))) - conn = engine.raw_connection() + with conn.cursor() as c: c.execute(sqlstr, values) conn.commit() +def init_swap_rates(conn, session, tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]): + securities = [f"USISDA{t:02} Index" for t in tenors] + data = retrieve_data(session, securities, ['PX_LAST'], + start_date=datetime.date(1998, 10, 7)) + for t in tenors: + ticker = f"USISDA{t:02} Index" + sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \ + 'VALUES(%s, %s) ON CONFLICT (fixing_date)' + \ + f' DO UPDATE SET "{t}y" = %s' + + with conn.cursor() as c: + c.executemany(sql_str, + [(d, r, r) for d, r in data[ticker]['PX_LAST'].items()]) + conn.commit() + +def init_swaption_vol(session, + tenors=['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30]): + tickers = [] + for t1 in tenors: + for t2 in tenors[4:]: + tickers.append(f"USSN{t1:0>2}{t2} Curncy") + data = retrieve_data(session, tickers, ['PX_LAST'], + start_date=datetime.date(1998, 10, 7)) + return data + +def split_tenor_expiry(ticker): + m = re.match("USSN(.{2})([^\s]*) Curncy", ticker) + expiry, tenor = m.groups() + if expiry[0] == '0': + expiry = expiry[1:] + if not expiry.isalpha(): + expiry = int(expiry) + tenor = int(tenor) + return expiry, tenor + +def insert_swaption_vol(data, conn): + tenors = ['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30] + df = pd.concat(data, axis=1) + df.columns = df.columns.get_level_values(0) + df.columns = pd.MultiIndex.from_tuples([split_tenor_expiry(c) for c in df.columns]) + + for t in tenors[4:]: + sql_str = f'INSERT INTO swaption_vol(date, "{t}y") ' + \ + 'VALUES(%s, %s) ON CONFLICT (date)' + \ + f' DO UPDATE SET "{t}y" = %s' + with conn.cursor() as c: + for k, v in df.xs(t, axis=1, level=1)[tenors].iterrows(): + if np.all(np.isnan(v.values)): + continue + c.execute(sql_str, (k, v.tolist(), v.tolist())) + conn.commit() + +def update_swaption_vol(conn, session, + tenors=['A', 'C', 'F', 'I'] + list(range(1, 11)) + [15, 20, 25, 30]): + tickers = [] + for expiry in tenors: + for tenor in tenors: + tickers.append(f"USSN{expiry:0>2}{tenor} Curncy") + data = retrieve_data(session, tickers, ['PX_YEST_CLOSE', 'PX_CLOSE_DT']) + for t in tenors[4:]: + sql_str = f'INSERT INTO swaption_vol(date, "{t}y") ' + \ + 'VALUES(%s, %s) ON CONFLICT (date)' + \ + f' DO UPDATE SET "{t}y" = %s' + r = [] + dates = [] + for expiry in tenors: + ticker = f"USSN{expiry:0>2}{t} Curncy" + if data[ticker]: + r.append(data[ticker]['PX_YEST_CLOSE']) + dates.append(data[ticker]['PX_CLOSE_DT']) + else: + r.append(None) + dates.append(dates[-1]) + if dates.count(dates[0]) < len(dates): + raise ValueError('Not all quotes are from the same date') + with conn.cursor() as c: + c.execute(sql_str, (dates[0], r, r)) + conn.commit() + +def update_swap_rates(conn, session, + tenors=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30]): + securities = [f"USISDA{t:02} Index" for t in tenors] + data = retrieve_data(session, securities, ['PX_LAST', 'LAST_UPDATE_DT']) + for t in tenors: + ticker = f"USISDA{t:02} Index" + sql_str = f'INSERT INTO USD_swap_fixings(fixing_date, "{t}y") ' + \ + 'VALUES(%(LAST_UPDATE_DT)s, %(PX_LAST)s) ON CONFLICT (fixing_date)' + \ + f' DO UPDATE SET "{t}y" = %(PX_LAST)s' + with conn.cursor() as c: + c.execute(sql_str, data[ticker]) + conn.commit() + def populate_cashflow_history(engine, session, workdate=None): securities = get_list(engine, workdate) - conn = engine.raw_connection() data = retrieve_data(session, securities.index.tolist(), ['HIST_CASH_FLOW', 'MTG_HIST_CPN', 'FLT_CPN_HIST', 'HIST_INTEREST_DISTRIBUTED']) fixed_coupons = {'XS0306416982 Mtge': 7.62, '91927RAD1 Mtge': 6.77} + conn = engine.raw_connection() for k, v in data.items(): if 'HIST_CASH_FLOW' in v: to_insert = v['HIST_CASH_FLOW'].merge(v['MTG_HIST_CPN'], how='left', @@ -142,16 +238,26 @@ def populate_cashflow_history(engine, session, workdate=None): with conn.cursor() as c: c.execute("REFRESH MATERIALIZED VIEW factors_history") conn.commit() + conn.close() if __name__=="__main__": - engine = create_engine('postgresql://dawn_user@debian/dawndb') - if len(sys.argv)>1: + serenitas_conn = psycopg2.connect(database="serenitasdb", + user="serenitas_user", + host="debian") + dawn_engine = create_engine('postgresql://dawn_user@debian/dawndb') + dawn_conn = dawn_engine.raw_connection() + if len(sys.argv) > 1: workdate = pd.Timestamp(sys.argv[1]) else: workdate = pd.datetime.today() with init_bbg_session(BBG_IP) as session: - update_securities(engine, session, workdate) - populate_cashflow_history(engine, session, workdate) - update_fx(engine, session, ['EURUSD', 'CADUSD']) + update_securities(dawn_engine, session, workdate) + populate_cashflow_history(dawn_engine, session, workdate) + update_fx(dawn_conn, session, ['EURUSD', 'CADUSD']) + update_swap_rates(serenitas_conn, session) + update_swaption_vol(serenitas_conn, session) # with init_bbg_session(BBG_IP) as session: # init_fx(session, engine, pd.datetime(2013, 1, 1)) + # with init_bbg_session(BBG_IP) as session: + # data = init_swaption_vol(session) + # insert_swaption_vol(data, serenitas_conn) |
