aboutsummaryrefslogtreecommitdiffstats
path: root/python/bbg_helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/bbg_helpers.py')
-rw-r--r--python/bbg_helpers.py129
1 files changed, 71 insertions, 58 deletions
diff --git a/python/bbg_helpers.py b/python/bbg_helpers.py
index 9c34899c..5c2cd4b5 100644
--- a/python/bbg_helpers.py
+++ b/python/bbg_helpers.py
@@ -28,7 +28,58 @@ def init_bbg_session(ipaddr, port=8194):
finally:
session.stop()
-def retreive_data(session, securities, fields, settle_date=None,
+def append_overrides(request, override):
+ overrides = request.getElement('overrides')
+ for k, v in override.items():
+ o = overrides.appendElement()
+ o.setElement("fieldId", k)
+ if isinstance(v, pd.datetime):
+ o.setElement("value", "{0:%Y%m%d}".format(v))
+ else:
+ o.setElement("value", v)
+
+def event_loop(session, request):
+ session.sendRequest(request)
+ # Process received events
+ while(True):
+ # We provide timeout to give the chance to Ctrl+C handling:
+ ev = session.nextEvent(500)
+ if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]:
+ for msg in ev:
+ yield msg
+ # Response completely received, so we can exit
+ if ev.eventType() == blpapi.Event.RESPONSE:
+ raise StopIteration
+
+def field_array_todf(field):
+ df = pd.DataFrame.from_dict([{str(e.name()): e.getValue() for e in f.elements()} \
+ for f in field.values()])
+ return df.convert_objects(convert_dates='coerce')
+
+def process_historical_msg(msg):
+ securityData = msg.getElement("securityData")
+ securityName = securityData.getElementValue("security")
+ fieldData = securityData.getElement("fieldData")
+ return {securityName: field_array_todf(fieldData)}
+
+def process_reference_msg(msg):
+ data = {}
+ securityDataArray = msg.getElement("securityData")
+ for securityData in securityDataArray.values():
+ securityName = securityData.getElementValue("security")
+ row = {}
+ fieldData = securityData.getElement("fieldData")
+ for field in fieldData.elements():
+ if not field.isValid():
+ logger.info("Invalid field: {0}".format(str(field)))
+ elif field.isArray():
+ row[str(field.name())] = field_array_todf(field) #to convert dates to timestamps
+ else:
+ row[str(field.name())] = field.getValue()
+ data[securityName] = row
+ return data
+
+def retreive_data(session, securities, fields, overrides={},
start_date=None, end_date=None):
refDataService = session.getService("//blp/refdata")
if start_date:
@@ -39,67 +90,29 @@ def retreive_data(session, securities, fields, settle_date=None,
request.append("securities", security)
for field in fields:
request.append("fields", field)
- if settle_date:
- overrides = request.getElement('overrides')
- o = overrides.appendElement()
- o.setElement("fieldId", "SETTLE_DT")
- o.setElement("value", "{0:%Y%m%d}".format(settle_date))
+ if overrides:
+ append_overrides(request, overrides)
if start_date:
request.set("startDate", "{0:%Y%m%d}".format(start_date))
- request.set("endDate", "{0:%Y%m%d}".format(end_date))
+ if end_date:
+ request.set("endDate", "{0:%Y%m%d}".format(end_date))
- session.sendRequest(request)
- data = []
- # Process received events
- while(True):
- # We provide timeout to give the chance to Ctrl+C handling:
- ev = session.nextEvent(500)
- if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]:
- for msg in ev:
- data.append(msg)
- # Response completely received, so we could exit
- if ev.eventType() == blpapi.Event.RESPONSE:
- break
- return data
-
-def process_msgs(data):
- # return a dict whose keys are the bloomberg securities
- # and whose values are a dict of bloomberg fields -> values
- # the values can either be a scalar or a dataframe
- newdata = {}
- for msg in data:
+ data = {}
+ for msg in event_loop(session, request):
if msg.messageType() == blpapi.Name("ReferenceDataResponse"):
- securityDataArray = msg.getElement("securityData")
- for securityData in securityDataArray.values():
- securityName = securityData.getElementValue("security")
- row = {}
- fieldData = securityData.getElement("fieldData")
- for field in fieldData.elements():
- if not field.isValid():
- logger.info("Invalid field: {0}".format(str(field)))
- elif field.isArray():
- df = pd.DataFrame.from_dict(
- [{str(e.name()): e.getValue() for e in f.elements()} \
- for f in field.values()])
- row[str(field.name())] = df.convert_objects(convert_dates='coerce') #to convert dates to timestamps
- else:
- row[str(field.name())] = field.getValue()
- newdata[securityName] = row
- if msg.messageType() == blpapi.Name("HistoricalDataResponse"):
- securityData = msg.getElement("securityData")
- securityName = securityData.getElementValue("security")
- fieldData = securityData.getElement("fieldData")
- df = pd.DataFrame.from_dict([{str(e.name()): e.getValue() for e in f.elements()} \
- for f in fieldData.values()])
- df.convert_objects(convert_dates='coerce')
- newdata[securityName] = df
- return newdata
+ data.update(process_reference_msg(msg))
+ elif msg.messageType() == blpapi.Name("HistoricalDataResponse"):
+ data.update(process_historical_msg(msg))
+ return data
if __name__=="__main__":
- startdate = pd.datetime(2013, 1, 1)
- enddate = pd.datetime(2015, 8, 7)
- securities = ['CADUSD Curncy', "EURUSD Curncy"]
- fields = ['PX_LAST']
+ testdate = pd.datetime(2015, 1, 1)
+ hist_securities = ['CADUSD Curncy', "EURUSD Curncy"]
+ hist_fields = ['PX_LAST']
+ securities = ['38145BAA9 Mtge', '75157EAE2 Mtge']
+ fields = ['CUR_CPN', 'START_ACC_DT']
with init_bbg_session('192.168.0.4', 8194) as session:
- data = retreive_data(session, securities, fields, start_date=startdate, end_date=enddate)
- test = process_msgs(data)
+ hist_data = retreive_data(session, hist_securities, hist_fields, start_date=testdate)
+ overrides={'SETTLE_DT': testdate}
+ ref_data = retreive_data(session, securities, fields, overrides=overrides)
+ struct_data = retreive_data(session, securities, ["HIST_CASH_FLOW"])