"""Maintenance script: back-populate marks, refresh securities, FX and cashflows.

All database helpers below read the module-level name ``engine``, which is
only created in the ``__main__`` section at the bottom of this file. When
importing these functions from elsewhere, ``engine`` must be set on this
module first.
"""
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
import pandas as pd
from sqlalchemy import create_engine
from pandas.tseries.offsets import BDay
from pandas import bdate_range
import datetime
import re
import os


def _bbg_id(securities):
    """Return the Bloomberg id series: cusip (falling back to isin) + ' ' + bbg_type."""
    code = securities.cusip.where(securities.cusip.notnull(), securities['isin'])
    return code + ' ' + securities.bbg_type


def get_list(workdate, asset_class=None, include_unsettled=True):
    """Return the positions given by the ``list_positions`` SQL function.

    Args:
        workdate: object exposing ``.date()`` (e.g. a datetime or Timestamp).
        asset_class: forwarded as the second argument of ``list_positions``.
        include_unsettled: forwarded as the third argument of ``list_positions``.

    Returns:
        DataFrame of positions. Identifiers of length <= 11 have their first
        nine characters copied into ``cusip``; identifiers of length 12 are
        copied into ``isin``.
    """
    positions = pd.read_sql_query(
        "select * from list_positions(%s, %s, %s)",
        engine,
        params=(workdate.date(), asset_class, include_unsettled))
    id_len = positions.identifier.str.len()
    positions.loc[id_len <= 11, 'cusip'] = positions.identifier.str.slice(stop=9)
    positions.loc[id_len == 12, 'isin'] = positions.identifier
    return positions


def backpopulate_marks(begin_str='2015-01-15', end_str='2015-07-15'):
    """Load historical marks from the daily folders into the ``position`` table.

    Walks ``/home/share/Daily`` for folders whose name starts with an ISO date,
    keeps those falling on a business day between ``begin_str`` and ``end_str``,
    and for each one appends the positions merged with the most recent
    ``securitiesNpv*`` file found in that folder.

    Args:
        begin_str: first date of the range (passed to ``bdate_range``).
        end_str: last date of the range (passed to ``bdate_range``).
    """
    # Raw string: "\d" in a normal string literal is an invalid escape sequence.
    pattern = re.compile(r"\d{4}-\d{2}-\d{2}")
    daily_folders = (fullpath for (fullpath, _, _) in os.walk('/home/share/Daily')
                     if pattern.match(os.path.basename(fullpath)))
    business_days = bdate_range(start=begin_str, end=end_str)
    for path in daily_folders:
        date = pd.to_datetime(os.path.basename(path))
        if date not in business_days:
            continue
        marks_files = [f for f in os.listdir(path) if f.startswith("securitiesNpv")]
        if not marks_files:
            continue
        # The suffix after "securitiesNpv" (13 chars) sorts lexicographically,
        # which is what we want since we use ISO dates: keep the latest one.
        latest = max(marks_files, key=lambda name: name[13:])
        marks = pd.read_csv(os.path.join(path, latest))
        positions = get_list(date)
        positions = positions.merge(marks, left_on='identifier', right_on='IDENTIFIER')
        positions.drop(['IDENTIFIER', 'last_settle_date'], axis=1, inplace=True)
        positions['date'] = date
        positions.rename(columns={'Price': 'price'}, inplace=True)
        positions = positions.drop_duplicates()
        positions.to_sql('position', engine, if_exists='append', index=False)


def update_securities(session, fields):
    """Fetch ``fields`` for every row of the ``securities`` table.

    Args:
        session: session object handed to ``retrieve_data``.
        fields: list of field names to request.

    Returns:
        The ``securities`` table merged (on ``bbg_id``) with the retrieved
        data, one column per returned field.
    """
    securities = pd.read_sql_table("securities", engine)
    securities['bbg_id'] = _bbg_id(securities)
    data = retrieve_data(session, securities.bbg_id.tolist(), fields)
    # assumes retrieve_data returns {bbg_id: {field: value}} -- TODO confirm
    fields_df = pd.DataFrame.from_dict(data, orient='index')
    return securities.merge(fields_df, left_on='bbg_id', right_index=True)


def init_fx(session, startdate):
    """Seed the ``fx`` table with EURUSD and CADUSD ``PX_LAST`` history.

    Args:
        session: session object handed to ``retrieve_data``.
        startdate: forwarded to ``retrieve_data`` as ``start_date``.
    """
    currencies = ['EURUSD', 'CADUSD']
    securities = [c + ' Curncy' for c in currencies]
    data = retrieve_data(session, securities, ['PX_LAST'], start_date=startdate)
    # assumes each entry is a DataFrame with 'date' and 'PX_LAST' columns
    # -- TODO confirm against retrieve_data.
    # The left frame must be EURUSD: the original merged CADUSD with itself,
    # which stored CADUSD prices in the 'eurusd' column.
    data = data['EURUSD Curncy'].merge(data['CADUSD Curncy'], on='date')
    data.rename(columns={'PX_LAST_x': 'eurusd', 'PX_LAST_y': 'cadusd'}, inplace=True)
    data.to_sql('fx', engine, index=False, if_exists='append')


def update_fx(session, conn, currencies):
    """Insert one ``fx`` row stamped with the current time for ``currencies``.

    Args:
        session: session object handed to ``retrieve_data``.
        conn: DB-API connection used for the insert; committed on success.
        currencies: currency pair names, e.g. ``['EURUSD', 'CADUSD']``. Their
            lower-cased names are used as column names of ``fx``.
    """
    securities = [c + ' Curncy' for c in currencies]
    data = retrieve_data(session, securities, ['FIXED_CLOSING_PRICE_NY'])
    colnames = ['date']
    # datetime.datetime is what the removed ``pd.datetime`` alias pointed to.
    values = [datetime.datetime.today()]
    for ticker, fields in data.items():
        colnames.append(ticker.split(' ')[0].lower())
        values.append(fields['FIXED_CLOSING_PRICE_NY'])
    # NOTE: column names are interpolated into the statement; they derive from
    # ``currencies``, so callers must only pass trusted names.
    sqlstr = 'INSERT INTO fx({0}) VALUES({1})'.format(
        ",".join(colnames), ",".join(["%s"] * len(values)))
    with conn.cursor() as c:
        c.execute(sqlstr, values)
    conn.commit()


def populate_cashflow_history(session, conn, workdate):
    """Replace the stored cashflow history of every position held on ``workdate``.

    For each security with a ``HIST_CASH_FLOW`` result, the existing rows of
    ``cashflow_history`` for that identifier are deleted and the fresh ones
    appended. The ``factors_history`` materialized view is refreshed at the end.

    Args:
        session: session object handed to ``retrieve_data``.
        conn: DB-API connection used for the deletes and the view refresh.
        workdate: date passed to ``get_list``.
    """
    securities = get_list(workdate)
    securities['bbg_id'] = _bbg_id(securities)
    securities.set_index('bbg_id', inplace=True)
    data = retrieve_data(session, securities.index.tolist(), ['HIST_CASH_FLOW'])
    column_map = {'Coupon': 'coupon',
                  'Interest': 'interest',
                  'Payment Date': 'date',
                  'Principal Balance': 'principal_bal',
                  'Principal Paid': 'principal'}
    for bbg_id, fields in data.items():
        to_insert = fields.get('HIST_CASH_FLOW')
        if to_insert is None:
            continue
        identifier = securities.loc[bbg_id, 'identifier']
        to_insert['identifier'] = identifier
        with conn.cursor() as c:
            c.execute("DELETE FROM cashflow_history WHERE identifier=%s", (identifier,))
        # Commit the delete before appending through the separate engine connection.
        conn.commit()
        to_insert.rename(columns=column_map, inplace=True)
        to_insert[['identifier', 'date', 'principal_bal', 'principal',
                   'interest', 'coupon']].to_sql('cashflow_history', engine,
                                                 if_exists='append', index=False)
    with conn.cursor() as c:
        c.execute("REFRESH MATERIALIZED VIEW factors_history")
    conn.commit()


if __name__ == "__main__":
    engine = create_engine('postgresql://dawn_user@debian/dawndb')
    workdate = datetime.datetime.today()
    conn = engine.raw_connection()
    try:
        with init_bbg_session(BBG_IP) as session:
            df = update_securities(session, ['START_ACC_DT', 'CUR_CPN'])
            populate_cashflow_history(session, conn, workdate)
            update_fx(session, conn, ['EURUSD', 'CADUSD'])
        with conn.cursor() as c:
            c.executemany("UPDATE securities SET start_accrued_date=%(START_ACC_DT)s "
                          ",coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s",
                          df.to_dict('records'))
        conn.commit()
    finally:
        # Always release the raw connection, even if a step above fails.
        conn.close()
    # One-off seeding of the fx table:
    # with init_bbg_session(BBG_IP) as session:
    #     init_fx(session, datetime.datetime(2013, 1, 1))