1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
from datetime import datetime
import os
import re

import pandas as pd
from pandas import bdate_range
from pandas.tseries.offsets import BDay
from sqlalchemy import create_engine

from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
def get_list(workdate, asset_class=None, include_unsettled=True):
    """Return the positions held on *workdate* from the database.

    Calls the ``list_positions`` SQL function through the module-level
    ``engine`` and derives two identifier columns from ``identifier``:
    a 9-character ``cusip`` for short identifiers (length <= 11) and an
    ``isin`` for 12-character identifiers.

    Parameters
    ----------
    workdate : datetime-like (must expose ``.date()``)
    asset_class : optional filter forwarded to ``list_positions``
    include_unsettled : forwarded to ``list_positions``
    """
    query = "select * from list_positions(%s, %s, %s)"
    params = (workdate.date(), asset_class, include_unsettled)
    positions = pd.read_sql_query(query, engine, params=params)
    ident_len = positions.identifier.str.len()
    positions.loc[ident_len <= 11, 'cusip'] = positions.identifier.str.slice(stop=9)
    positions.loc[ident_len == 12, 'isin'] = positions.identifier
    return positions
def backpopulate_marks(begin_str='2015-01-15', end_str='2015-07-15'):
    """Backfill the ``position`` table from daily securitiesNpv CSV dumps.

    Walks ``/home/share/Daily`` for folders named ``YYYY-MM-DD`` and, for
    every business day in [*begin_str*, *end_str*], loads the most recent
    ``securitiesNpv*`` marks file, joins it with that day's positions and
    appends the result to the ``position`` table via the module-level
    ``engine``.
    """
    # Raw string: in a plain literal "\d" is an invalid escape sequence
    # (SyntaxWarning / future error on modern Pythons).
    date_pattern = re.compile(r"\d{4}-\d{2}-\d{2}")
    daily_folders = (fullpath for (fullpath, _, _) in os.walk('/home/share/Daily')
                     if date_pattern.match(os.path.basename(fullpath)))
    business_days = bdate_range(start=begin_str, end=end_str)
    for path in daily_folders:
        date = pd.to_datetime(os.path.basename(path))
        if date not in business_days:
            continue
        marks_files = [f for f in os.listdir(path) if f.startswith("securitiesNpv")]
        if not marks_files:
            continue
        # The suffix after "securitiesNpv" (13 chars) is an ISO date, so a
        # reverse lexicographic sort puts the most recent file first.
        marks_files.sort(key=lambda name: name[13:], reverse=True)
        marks = pd.read_csv(os.path.join(path, marks_files[0]))
        # `date` is already a Timestamp; no need to re-parse it.
        positions = get_list(date)
        positions = positions.merge(marks, left_on='identifier', right_on='IDENTIFIER')
        positions.drop(['IDENTIFIER', 'last_settle_date'], axis=1, inplace=True)
        positions['date'] = date
        positions.rename(columns={'Price': 'price'}, inplace=True)
        positions = positions.drop_duplicates()
        positions.to_sql('position', engine, if_exists='append', index=False)
def update_securities(session, fields):
    """Download *fields* from Bloomberg for every row of the securities table.

    A Bloomberg identifier is built from the cusip (falling back to the isin
    when the cusip is null) plus the Bloomberg type, *fields* are retrieved
    for each identifier, and the result is merged back onto the securities
    table.  For non-mortgage securities START_ACC_DT is replaced by
    PREV_CPN_DT (assumes both fields were requested — confirm at call site).
    """
    securities = pd.read_sql_table("securities", engine)
    base_id = securities.cusip.where(securities.cusip.notnull(), securities['isin'])
    securities['bbg_id'] = base_id + ' ' + securities.bbg_type
    raw = retrieve_data(session, securities.bbg_id.tolist(), fields)
    bbg_frame = pd.DataFrame.from_dict(raw, orient='index')
    merged = securities.merge(bbg_frame, left_on='bbg_id', right_index=True)
    is_mortgage = merged.bbg_type == 'Mtge'
    merged['START_ACC_DT'] = merged.START_ACC_DT.where(is_mortgage, merged.PREV_CPN_DT)
    return merged
def init_fx(session, engine, startdate):
    """Seed the ``fx`` table with EURUSD/CADUSD closes from *startdate* on.

    Downloads the PX_LAST history for both pairs, joins them on date and
    appends the result to the ``fx`` table through *engine*.
    """
    currencies = ['EURUSD', 'CADUSD']
    tickers = ['{} Curncy'.format(pair) for pair in currencies]
    history = retrieve_data(session, tickers, ['PX_LAST'], start_date=startdate)
    combined = history['EURUSD Curncy'].merge(history['CADUSD Curncy'],
                                              left_on='date', right_on='date')
    combined = combined.rename(columns={'PX_LAST_x': 'eurusd',
                                        'PX_LAST_y': 'cadusd'})
    combined.to_sql('fx', engine, index=False, if_exists='append')
def update_fx(session, conn, currencies):
    """Insert today's NY closing fixings for *currencies* into the fx table.

    Parameters
    ----------
    session : Bloomberg session passed through to ``retrieve_data``
    conn : raw DB-API connection; the insert is committed here
    currencies : list of pair names, e.g. ``['EURUSD', 'CADUSD']``; each
        becomes a lower-cased column name of the ``fx`` table.
    """
    securities = [c + ' Curncy' for c in currencies]
    data = retrieve_data(session, securities, ['FIXED_CLOSING_PRICE_NY'])
    colnames = ['date']
    # pd.datetime was deprecated and removed in pandas 1.0 — use the
    # stdlib datetime directly.
    values = [datetime.today()]
    for ticker, fields in data.items():
        currency_pair = ticker.split(' ')[0].lower()
        colnames.append(currency_pair)
        values.append(fields['FIXED_CLOSING_PRICE_NY'])
    # Column names come from our own `currencies` argument, not external
    # input; the values themselves go in as bind parameters.
    sqlstr = 'INSERT INTO fx({0}) VALUES({1})'.format(",".join(colnames),
                                                      ",".join(["%s"] * len(values)))
    with conn.cursor() as c:
        c.execute(sqlstr, values)
    conn.commit()
def populate_cashflow_history(session, conn, workdate):
    """Refresh ``cashflow_history`` for every position held on *workdate*.

    Downloads HIST_CASH_FLOW and MTG_HIST_CPN from Bloomberg, replaces each
    security's rows in ``cashflow_history`` and finally refreshes the
    ``factors_history`` materialized view.

    NOTE(review): deletes go through *conn* while inserts use the
    module-level ``engine`` — both must point at the same database.
    """
    securities = get_list(workdate)
    securities['bbg_id'] = securities.cusip.where(securities.cusip.notnull(), securities['isin']) + \
                           ' ' + securities.bbg_type
    securities.set_index('bbg_id', inplace=True)
    # MTG_HIST_CPN is downloaded as well because its coupon data is more
    # accurate than what is embedded in HIST_CASH_FLOW.
    data = retrieve_data(session, securities.index.tolist(), ['HIST_CASH_FLOW', 'MTG_HIST_CPN'])
    for bbg_id, fields in data.items():
        hist_cf = fields.get('HIST_CASH_FLOW')
        hist_cpn = fields.get('MTG_HIST_CPN')
        # Both frames are required for the merge below; previously only
        # hist_cf was checked, so a missing MTG_HIST_CPN crashed the run.
        if hist_cf is None or hist_cpn is None:
            continue
        identifier = securities.loc[bbg_id, 'identifier']
        hist_cf['identifier'] = identifier
        to_insert = hist_cf.merge(hist_cpn, left_on='Payment Date', right_on='Payment Date')
        # Delete-then-append makes the refresh idempotent per security.
        with conn.cursor() as c:
            c.execute("DELETE FROM cashflow_history WHERE identifier=%s", (identifier,))
        conn.commit()
        to_insert.rename(columns={'Coupon_y': 'coupon',
                                  'Interest': 'interest',
                                  'Payment Date': 'date',
                                  'Principal Balance': 'principal_bal',
                                  'Principal Paid': 'principal'}, inplace=True)
        to_insert[['identifier', 'date', 'principal_bal', 'principal',
                   'interest', 'coupon']].to_sql('cashflow_history',
                                                 engine, if_exists='append', index=False)
    with conn.cursor() as c:
        c.execute("REFRESH MATERIALIZED VIEW factors_history")
    conn.commit()
if __name__ == "__main__":
    # Daily update: refresh securities reference data, cashflow history and
    # FX fixings, then push accrual dates / coupons back into the DB.
    engine = create_engine('postgresql://dawn_user@debian/dawndb')
    # pd.datetime was removed in pandas 1.0 — use the stdlib datetime.
    workdate = datetime.today()
    conn = engine.raw_connection()
    with init_bbg_session(BBG_IP) as session:
        df = update_securities(session, ['START_ACC_DT', 'CUR_CPN', 'PREV_CPN_DT'])
        populate_cashflow_history(session, conn, workdate)
        update_fx(session, conn, ['EURUSD', 'CADUSD'])
    with conn.cursor() as c:
        c.executemany("UPDATE securities SET start_accrued_date=%(START_ACC_DT)s "
                      ",coupon=%(CUR_CPN)s WHERE identifier=%(identifier)s",
                      df.to_dict('records'))
    conn.commit()
    # One-off backfill of the fx table, kept for reference:
    # with init_bbg_session(BBG_IP) as session:
    #     init_fx(session, engine, datetime(2013, 1, 1))
|