Diffstat (limited to 'python')
-rw-r--r--   python/position.py   113
1 file changed, 67 insertions, 46 deletions
diff --git a/python/position.py b/python/position.py
index 3ecab213..b5b5c2c5 100644
--- a/python/position.py
+++ b/python/position.py
@@ -5,13 +5,20 @@ from pandas.tseries.offsets import BDay
 from pandas import bdate_range
 import re
 import os
+import logging
 
-def get_list(workdate, asset_class=None, include_unsettled=True):
-    positions = pd.read_sql_query("select * from list_positions(%s, %s, %s)",
-                                  engine,
-                                  params=(workdate.date(), asset_class, include_unsettled))
-    positions.loc[positions.identifier.str.len() <= 11, 'cusip'] = positions.identifier.str.slice(stop=9)
-    positions.loc[positions.identifier.str.len() == 12, 'isin'] = positions.identifier
+def get_list(engine, workdate=None, asset_class=None, include_unsettled=True):
+    if workdate:
+        positions = pd.read_sql_query("select identifier, bbg_type from list_positions(%s, %s, %s)",
+                                      engine,
+                                      params=(workdate.date(), asset_class, include_unsettled))
+        positions.loc[positions.identifier.str.len() <= 11, 'cusip'] = positions.identifier.str.slice(stop=9)
+        positions.loc[positions.identifier.str.len() == 12, 'isin'] = positions.identifier
+    else:
+        positions = pd.read_sql_table("securities", engine)
+    positions['bbg_id'] = positions.cusip.where(positions.cusip.notnull(), positions['isin']) + \
+                          ' ' + positions.bbg_type
+    positions.set_index('bbg_id', inplace=True)
     return positions
 
 def backpopulate_marks(begin_str='2015-01-15', end_str='2015-07-15'):
@@ -34,15 +41,20 @@ def backpopulate_marks(begin_str='2015-01-15', end_str='2015-07-15'):
     positions = positions.drop_duplicates()
     positions.to_sql('position', engine, if_exists='append', index=False)
 
-def update_securities(session, fields):
-    securities = pd.read_sql_table("securities", engine)
-    securities['bbg_id'] = securities.cusip.where(securities.cusip.notnull(), securities['isin']) + \
-                           ' ' + securities.bbg_type
-    data = retrieve_data(session, securities.bbg_id.tolist(), fields)
-    df = pd.DataFrame.from_dict(data, orient='index')
-    m = securities.merge(df, left_on='bbg_id', right_index=True)
-    m.START_ACC_DT = m.START_ACC_DT.where(m.bbg_type=='Mtge', m.PREV_CPN_DT)
-    return m
+def update_securities(engine, session):
+    field = {'Corp': 'PREV_CPN_DT', 'Mtge': 'START_ACC_DT'}
+    all_securities = get_list(engine)
+    conn = engine.raw_connection()
+    for bbg_type in ['Corp', 'Mtge']:
+        securities = all_securities[all_securities.index.str.endswith(bbg_type)]
+        data = retrieve_data(session, securities.index.tolist(), [field[bbg_type], 'CUR_CPN'])
+        data = pd.DataFrame.from_dict(data, orient='index')
+        m = securities.merge(data, left_index=True, right_index=True)
+        with conn.cursor() as c:
+            c.executemany("UPDATE securities SET start_accrued_date=%({0})s "
+                          ",coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s".format(field[bbg_type]),
+                          m.to_dict('records'))
+        conn.commit()
 
 def init_fx(session, engine, startdate):
     currencies = ['EURUSD', 'CADUSD']
@@ -53,7 +65,7 @@ def init_fx(session, engine, startdate):
                          'PX_LAST_y': 'cadusd'}, inplace=True)
     data.to_sql('fx', engine, index=False, if_exists='append')
 
-def update_fx(session, conn, currencies):
+def update_fx(engine, session, currencies):
     securities = [c + ' Curncy' for c in currencies]
     data = retrieve_data(session, securities, ['FIXED_CLOSING_PRICE_NY'])
     colnames = ['date']
@@ -64,35 +76,50 @@ def update_fx(session, conn, currencies):
         values.append(v['FIXED_CLOSING_PRICE_NY'])
     sqlstr = 'INSERT INTO fx({0}) VALUES({1})'.format(",".join(colnames),
                                                       ",".join(["%s"]*len(values)))
+    conn = engine.raw_connection()
     with conn.cursor() as c:
         c.execute(sqlstr, values)
     conn.commit()
 
-def populate_cashflow_history(session, conn, workdate):
-    securities = get_list(workdate)
-    securities['bbg_id'] = securities.cusip.where(securities.cusip.notnull(), securities['isin']) + \
-                           ' ' + securities.bbg_type
-    securities.set_index('bbg_id', inplace=True)
-    #we also download MTG_HIST_CPN because the data is more accurate
-    data = retrieve_data(session, securities.index.tolist(), ['HIST_CASH_FLOW', 'MTG_HIST_CPN'])
+def populate_cashflow_history(engine, session, workdate=None):
+    securities = get_list(engine, workdate)
+    conn = engine.raw_connection()
+    data = retrieve_data(session, securities.index.tolist(), ['HIST_CASH_FLOW', 'MTG_HIST_CPN',
+                                                              'FLT_CPN_HIST', 'HIST_INTEREST_DISTRIBUTED'])
+    fixed_coupons = {'XS0306416982 Mtge': 7.62,
+                     '91927RAD1 Mtge': 6.77}
     for k, v in data.items():
-        hist_cf = v.get('HIST_CASH_FLOW')
-        hist_cpn = v.get('MTG_HIST_CPN')
-        if hist_cf is not None:
-            identifier = securities.loc[k,'identifier']
-            hist_cf['identifier'] = identifier
-            to_insert = hist_cf.merge(hist_cpn, left_on='Payment Date', right_on='Payment Date')
-            with conn.cursor() as c:
-                c.execute("DELETE FROM cashflow_history WHERE identifier=%s", (identifier,))
-            conn.commit()
+        if 'HIST_CASH_FLOW' in v:
+            to_insert = v['HIST_CASH_FLOW'].merge(v['MTG_HIST_CPN'],
+                                                  left_on='Payment Date',
+                                                  right_on='Payment Date')
             to_insert.rename(columns={'Coupon_y': 'coupon',
                                       'Interest': 'interest',
                                       'Payment Date': 'date',
                                       'Principal Balance': 'principal_bal',
                                       'Principal Paid': 'principal'}, inplace=True)
-            to_insert[['identifier', 'date', 'principal_bal', 'principal',
-                       'interest','coupon']].to_sql('cashflow_history',
-                                                    engine, if_exists='append', index=False)
+            to_insert.drop(['Period Number', 'Coupon_x'], axis=1, inplace=True)
+        elif 'FLT_CPN_HIST' in v:
+            to_insert = v['FLT_CPN_HIST']
+            to_insert.rename(columns={'Coupon Rate': 'coupon',
+                                      'Accrual Start Date': 'date'}, inplace=True)
+            to_insert.coupon = to_insert.coupon.shift(1)
+        elif 'HIST_INTEREST_DISTRIBUTED' in v:
+            to_insert = v['HIST_INTEREST_DISTRIBUTED']
+            to_insert.rename(columns={'Interest': 'interest',
                                      'Historical Date': 'date'}, inplace=True)
+            if k in fixed_coupons:
+                to_insert['coupon'] = fixed_coupons[k]
+            else: #damn you XS0299146992 !
+                continue
+        else:
+            logging.error("No cashflows for the given security")
+        identifier = securities.loc[k,'identifier']
+        to_insert['identifier'] = identifier
+        with conn.cursor() as c:
+            c.execute("DELETE FROM cashflow_history WHERE identifier=%s", (identifier,))
+        conn.commit()
+        to_insert.to_sql('cashflow_history', engine, if_exists='append', index=False)
     with conn.cursor() as c:
         c.execute("REFRESH MATERIALIZED VIEW factors_history")
     conn.commit()
@@ -100,15 +127,9 @@ if __name__=="__main__":
     engine = create_engine('postgresql://dawn_user@debian/dawndb')
     workdate = pd.datetime.today()
-    conn = engine.raw_connection()
     with init_bbg_session(BBG_IP) as session:
-        df = update_securities(session, ['START_ACC_DT', 'CUR_CPN', 'PREV_CPN_DT'])
-        populate_cashflow_history(session, conn, workdate)
-        update_fx(session, conn, ['EURUSD', 'CADUSD'])
-        with conn.cursor() as c:
-            c.executemany("UPDATE securities SET start_accrued_date=%(START_ACC_DT)s "
-                          ",coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s",
-                          df.to_dict('records'))
-        conn.commit()
-    # with init_bbg_session(BBG_IP) as session:
-    #     init_fx(session, engine, pd.datetime(2013, 1, 1))
+        update_securities(engine, session)
+        populate_cashflow_history(engine, session, workdate)
+        update_fx(engine, session, ['EURUSD', 'CADUSD'])
+    # # with init_bbg_session(BBG_IP) as session:
+    # #     init_fx(session, engine, pd.datetime(2013, 1, 1))
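
Note on the new update_securities loop: it depends on psycopg2-style named placeholders, where executemany fills each %(key)s from a dict, and pandas' DataFrame.to_dict('records') yields exactly that list of per-row dicts. A minimal standalone sketch with illustrative sample rows (not part of the commit; real values come from retrieve_data, and the column names match the diff):

    import pandas as pd

    # Illustrative rows only; in the commit these come from the Bloomberg session.
    m = pd.DataFrame({'identifier':   ['91927RAD1', 'XS0306416982'],
                      'START_ACC_DT': ['2015-06-01', '2015-06-15'],
                      'CUR_CPN':      [6.77, 7.62]})

    records = m.to_dict('records')  # one dict per row, keyed by column name
    sql = ("UPDATE securities SET start_accrued_date=%(START_ACC_DT)s"
           ", coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s")

    # conn = engine.raw_connection()   # engine as created in __main__
    # with conn.cursor() as c:
    #     c.executemany(sql, records)  # one UPDATE per row dict
    # conn.commit()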
