aboutsummaryrefslogtreecommitdiffstats
path: root/python/bbg_helpers.py
blob: 26facc6f416f22113007bca271077116e2dd8ed8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import blpapi
import sys
import pandas as pd
from contextlib import contextmanager
import logging
import datetime

logger = logging.getLogger(__name__)

BBG_IP = ['192.168.9.61', '192.168.9.65', '192.168.0.10', '192.168.0.12']

@contextmanager
def init_bbg_session(ip_list, port=8194):
    sessionOptions = blpapi.SessionOptions()
    sessionOptions.setServerPort(port)

    for ip in ip_list:
        # Start a Session
        sessionOptions.setServerHost(ip)
        session = blpapi.Session(sessionOptions)
        if not session.start():
            logger.info("Failed to open session with {0}".format(ip))
            continue
        try:
            if not session.openService("//blp/refdata"):
                raise NameError("Failed to open //blp/refdata")
            logger.info("Logged in on {0}".format(ip))
            yield session
            break
        except NameError as e:
            logger.error(str(e))
            raise
        finally:
            session.stop()

def append_overrides(request, d):
    overrides = request.getElement('overrides')
    for k, v in d.items():
        o = overrides.appendElement()
        o.setElement("fieldId", k)
        if isinstance(v, datetime.date):
            o.setElement("value", "{0:%Y%m%d}".format(v))
        else:
            o.setElement("value", v)

def event_loop(session, request):
    session.sendRequest(request)
    # Process received events
    while(True):
        # We provide timeout to give the chance to Ctrl+C handling:
        ev = session.nextEvent(500)
        if ev.eventType() in [blpapi.Event.PARTIAL_RESPONSE, blpapi.Event.RESPONSE]:
            for msg in ev:
                yield msg
        # Response completely received, so we can exit
        if ev.eventType() == blpapi.Event.RESPONSE:
            return

def get_pythonvalue(e):
    if e.datatype() in [blpapi.DataType.DATE, blpapi.DataType.DATETIME]:
        return pd.to_datetime(e.getValue())
    else:
        return e.getValue()

def field_array_todf(field):
    df = pd.DataFrame.from_dict([{str(e.name()): get_pythonvalue(e) for e in f.elements()} \
                                             for f in field.values()])
    if "date" in df:
        df = df.set_index('date')
    return df

def process_historical_msg(msg):
    securityData = msg.getElement("securityData")
    securityName = securityData.getElementValue("security")
    fieldData = securityData.getElement("fieldData")
    return {securityName: field_array_todf(fieldData)}

def process_reference_msg(msg):
    data = {}
    securityDataArray = msg.getElement("securityData")
    for securityData in securityDataArray.values():
        securityName = securityData.getElementValue("security")
        row = {}
        fieldData = securityData.getElement("fieldData")
        for field in fieldData.elements():
            if not field.isValid():
                logger.info("Invalid field: {0}".format(str(field)))
            elif field.isArray():
                row[str(field.name())] = field_array_todf(field) #to convert dates to timestamps
            else:
                row[str(field.name())] = get_pythonvalue(field)
        data[securityName] = row
    return data

def retrieve_data(session, securities, fields, overrides={},
                  start_date=None, end_date=None, frequency="DAILY"):
    """
    Convenience function to retrieve data from the Bloomberg API.

    Parameters
    ----------
    session : blpapi session
    securities : iterable
       list of tickers (with type, e.g.: ['CADUSD Curncy', '38145BAA9 Mtge']
    start_date : datetime.date
    end_date : datetime.date
    frequency : One of "DAILY", "MONTHLY", "QUARTERLY", "YEARLY"
    """
    refDataService = session.getService("//blp/refdata")
    if start_date:
        request = refDataService.createRequest("HistoricalDataRequest")
        request.set("periodicitySelection", frequency)
    else:
        request = refDataService.createRequest("ReferenceDataRequest")
    for security in securities:
        request.append("securities", security)
    for field in fields:
        request.append("fields", field)
    if overrides:
        append_overrides(request, overrides)
    if start_date:
        request.set("startDate", "{0:%Y%m%d}".format(start_date))
        if end_date:
            request.set("endDate", "{0:%Y%m%d}".format(end_date))

    data = {}
    for msg in event_loop(session, request):
        if msg.messageType() == blpapi.Name("ReferenceDataResponse"):
            data.update(process_reference_msg(msg))
        elif msg.messageType() == blpapi.Name("HistoricalDataResponse"):
            data.update(process_historical_msg(msg))
    return data

if __name__=="__main__":
    testdate = pd.datetime(2013, 1, 1)
    hist_securities = ['CADUSD Curncy', "EURUSD Curncy"]
    hist_fields = ['PX_LAST']
    securities = ['004421BW2 Mtge', '75157EAE2 Mtge', 'XS0295516776 Mtge']
    fields = ['CUR_CPN', 'START_ACC_DT']
    with init_bbg_session(BBG_IP) as session:
        hist_data = retrieve_data(session, securities, hist_fields, start_date=testdate,
                                  frequency="MONTHLY")
        overrides={'SETTLE_DT': testdate}
        ref_data = retrieve_data(session, securities, fields, overrides=overrides)
        struct_data = retrieve_data(session, securities, ["HIST_CASH_FLOW"])