1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
import blpapi
import sys
import pandas as pd
from contextlib import contextmanager
import logging
import datetime
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Candidate Bloomberg server addresses; the __main__ block passes this list to
# init_bbg_session, which tries the hosts in list order.
BBG_IP = ['192.168.9.61', '192.168.0.8', '192.168.0.10', '192.168.0.12']
@contextmanager
def init_bbg_session(ip_list, port=8194):
    """Context manager yielding a started session with //blp/refdata opened.

    The hosts in ``ip_list`` are tried in order. The first host whose session
    starts is used; the session is always stopped when the ``with`` block
    exits (normally or through an exception).

    Args:
        ip_list: iterable of host addresses to try.
        port: server port set on the session options (default 8194).

    Yields:
        The started ``blpapi.Session``.

    Raises:
        NameError: if the session started but //blp/refdata could not be
            opened (no further hosts are tried in that case).
        RuntimeError: if no host in ``ip_list`` accepted a session.
    """
    session_options = blpapi.SessionOptions()
    session_options.setServerPort(port)
    for ip in ip_list:
        # Start a Session
        session_options.setServerHost(ip)
        session = blpapi.Session(session_options)
        if not session.start():
            logger.info("Failed to open session with {0}".format(ip))
            continue
        try:
            if not session.openService("//blp/refdata"):
                raise NameError("Failed to open //blp/refdata")
            logger.info("Logged in on {0}".format(ip))
            yield session
            # The with-block finished normally: do not try any other host.
            return
        except NameError as e:
            # Also sees a NameError raised inside the caller's with-block,
            # because contextmanager re-raises it at the yield above.
            logger.error(str(e))
            raise
        finally:
            session.stop()
    # Without this, contextmanager would fail with the opaque
    # "generator didn't yield" RuntimeError when every host is unreachable.
    raise RuntimeError(
        "Failed to open a Bloomberg session with any of: {0}".format(
            ", ".join(str(ip) for ip in ip_list)))
def append_overrides(request, d):
    """Append one override entry to ``request`` for each item of ``d``.

    Args:
        request: request object exposing an 'overrides' element.
        d: mapping of override field id -> value. ``datetime.date`` values
            (which includes ``datetime.datetime``) are sent as a YYYYMMDD
            string; every other value is passed through unchanged.
    """
    override_list = request.getElement('overrides')
    for field_id, raw_value in d.items():
        entry = override_list.appendElement()
        entry.setElement("fieldId", field_id)
        is_date = isinstance(raw_value, datetime.date)
        value = "{0:%Y%m%d}".format(raw_value) if is_date else raw_value
        entry.setElement("value", value)
def event_loop(session, request):
    """Send ``request`` on ``session`` and yield every response message.

    Messages from PARTIAL_RESPONSE and RESPONSE events are yielded in the
    order received; events of any other type are skipped. The generator
    finishes after the messages of the final RESPONSE event.

    Args:
        session: started session used to send the request and poll events.
        request: request to send.

    Yields:
        Each message contained in the response events.
    """
    session.sendRequest(request)
    response_types = (blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE)
    # Process received events
    while True:
        # We provide timeout to give the chance to Ctrl+C handling:
        ev = session.nextEvent(500)
        event_type = ev.eventType()
        if event_type not in response_types:
            continue
        for msg in ev:
            yield msg
        # Response completely received, so we can exit.
        # A plain return is required here: raising StopIteration inside a
        # generator is turned into RuntimeError on Python 3.7+ (PEP 479).
        if event_type == blpapi.Event.RESPONSE:
            return
def field_array_todf(field):
    """Convert an array-valued element into a DataFrame.

    Each value of ``field`` becomes one row; each of its sub-elements becomes
    a column named after the sub-element, holding that sub-element's value.
    Object columns that hold dates are converted to datetimes.

    Args:
        field: element whose ``values()`` each expose ``elements()``.

    Returns:
        pandas.DataFrame with one row per value of ``field``.
    """
    records = [{str(e.name()): e.getValue() for e in f.elements()}
               for f in field.values()]
    df = pd.DataFrame.from_dict(records)
    if hasattr(df, "convert_objects"):
        # Older pandas: keep the original conversion unchanged.
        return df.convert_objects(convert_dates='coerce')
    # DataFrame.convert_objects no longer exists in current pandas, so do a
    # best-effort equivalent: turn an object column into datetimes only when
    # at least one of its values actually parses as a date.
    for column in df.columns:
        if df[column].dtype != object:
            continue
        try:
            as_dates = pd.to_datetime(df[column], errors='coerce')
        except (TypeError, ValueError):
            # Values that cannot be interpreted as dates at all: leave as is.
            continue
        if as_dates.notnull().any():
            df[column] = as_dates
    return df.infer_objects()
def process_historical_msg(msg):
    """Turn one historical-data message into ``{security: DataFrame}``.

    The message's "securityData" element supplies both the security name
    (the dict key) and the "fieldData" element, which is converted with
    ``field_array_todf``.
    """
    security_data = msg.getElement("securityData")
    name = security_data.getElementValue("security")
    return {name: field_array_todf(security_data.getElement("fieldData"))}
def process_reference_msg(msg):
    """Turn one reference-data message into ``{security: {field: value}}``.

    For every entry of the message's "securityData" array, each field of its
    "fieldData" element is stored under the field's name: array fields are
    converted with ``field_array_todf``, other fields are stored as their
    plain value. Fields reporting themselves as invalid are logged and left
    out of the result.
    """
    result = {}
    for security in msg.getElement("securityData").values():
        name = security.getElementValue("security")
        values = {}
        for fld in security.getElement("fieldData").elements():
            if not fld.isValid():
                logger.info("Invalid field: {0}".format(str(fld)))
                continue
            if fld.isArray():
                # Goes through field_array_todf to convert dates to timestamps.
                converted = field_array_todf(fld)
            else:
                converted = fld.getValue()
            values[str(fld.name())] = converted
        result[name] = values
    return result
def retrieve_data(session, securities, fields, overrides=None,
                  start_date=None, end_date=None):
    """Request ``fields`` for ``securities`` and collect the responses.

    A HistoricalDataRequest is sent when ``start_date`` is truthy, otherwise
    a ReferenceDataRequest.

    Args:
        session: session on which //blp/refdata has been opened.
        securities: iterable of security identifiers.
        fields: iterable of field names.
        overrides: optional mapping of override field id -> value, applied
            with ``append_overrides``. ``None`` or empty means no overrides.
        start_date: optional date; sent as "startDate" in YYYYMMDD form.
        end_date: optional date; sent as "endDate" in YYYYMMDD form.
            NOTE(review): it is set whenever truthy, even when no start_date
            was given (i.e. on a ReferenceDataRequest) - confirm whether
            that combination is intended.

    Returns:
        dict keyed by security name. Values are the per-security results of
        ``process_reference_msg`` (dict of field values) or
        ``process_historical_msg`` (DataFrame), depending on response type.
    """
    ref_data_service = session.getService("//blp/refdata")
    if start_date:
        request = ref_data_service.createRequest("HistoricalDataRequest")
    else:
        request = ref_data_service.createRequest("ReferenceDataRequest")
    for security in securities:
        request.append("securities", security)
    for field in fields:
        request.append("fields", field)
    if overrides:
        append_overrides(request, overrides)
    if start_date:
        request.set("startDate", "{0:%Y%m%d}".format(start_date))
    if end_date:
        request.set("endDate", "{0:%Y%m%d}".format(end_date))

    # Build the message-type names once instead of once per message.
    reference_response = blpapi.Name("ReferenceDataResponse")
    historical_response = blpapi.Name("HistoricalDataResponse")
    data = {}
    for msg in event_loop(session, request):
        message_type = msg.messageType()
        if message_type == reference_response:
            data.update(process_reference_msg(msg))
        elif message_type == historical_response:
            data.update(process_historical_msg(msg))
    return data
if __name__ == "__main__":
    # Manual smoke test: one historical request, one reference request with
    # an override, and one reference request for an array-valued field.
    # datetime.datetime replaces the pd.datetime alias, which has been
    # removed from pandas.
    testdate = datetime.datetime(2013, 1, 1)
    hist_securities = ['CADUSD Curncy', "EURUSD Curncy"]
    hist_fields = ['PX_LAST']
    securities = ['38145BAA9 Mtge', '75157EAE2 Mtge', 'XS0295516776 Mtge']
    fields = ['CUR_CPN', 'START_ACC_DT']
    with init_bbg_session(BBG_IP) as session:
        hist_data = retrieve_data(session, hist_securities, hist_fields, start_date=testdate)
        overrides = {'SETTLE_DT': testdate}
        ref_data = retrieve_data(session, securities, fields, overrides=overrides)
        struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"])
|