diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/db.py | 42 | ||||
| -rw-r--r-- | python/import_quotes.py | 3 | ||||
| -rw-r--r-- | python/intex/intex_scenarios.py | 2 | ||||
| -rw-r--r-- | python/intex/load_indicative.py | 17 | ||||
| -rw-r--r-- | python/intex/load_intex_collateral.py | 7 | ||||
| -rw-r--r-- | python/markit_cds.py | 3 | ||||
| -rw-r--r-- | python/markit_loans.py | 8 | ||||
| -rw-r--r-- | python/markit_red.py | 9 | ||||
| -rw-r--r-- | python/process_queue.py | 6 |
9 files changed, 53 insertions, 44 deletions
diff --git a/python/db.py b/python/db.py index ea3fed4e..760344f1 100644 --- a/python/db.py +++ b/python/db.py @@ -2,29 +2,33 @@ import psycopg2 from psycopg2.extras import DictCursor from psycopg2 import IntegrityError -conn = psycopg2.connect(database="ET", - user="et_user", +def dbconn(dbname): + if dbname == 'etdb': + dbname = 'ET' + user_name = 'et_user' + else: + user_name = dbname[:-2] + '_user' + return psycopg2.connect(database=dbname, + user=user_name, host="debian", cursor_factory=DictCursor) -serenitasdb = psycopg2.connect(database="serenitasdb", - user="serenitas_user", - host="debian", - cursor_factory=DictCursor) -def with_connection(f): - def with_connection_(*args, **kwargs): - # or use a pool, or a factory function... - try: - rv = f(conn, *args, **kwargs) - except Exception as e: - print(e) - conn.rollback() - else: - return rv - - return with_connection_ +def with_connection(dbname): + def decorator(f): + conn = dbconn(dbname) + def with_connection_(*args, **kwargs): + # or use a pool, or a factory function... + try: + rv = f(conn, *args, **kwargs) + except Exception as e: + print(e) + conn.rollback() + else: + return rv + return with_connection_ + return decorator -@with_connection +@with_connection('etdb') def query_db(conn, sqlstr, params=None, one=True): with conn.cursor() as c: if params: diff --git a/python/import_quotes.py b/python/import_quotes.py index 43c34045..bac610b0 100644 --- a/python/import_quotes.py +++ b/python/import_quotes.py @@ -2,7 +2,7 @@ import os from common import root import csv import datetime -from db import serenitasdb +from db import dbconn import re, sys from pandas.tseries.offsets import BDay import pandas as pd @@ -134,6 +134,7 @@ if __name__=="__main__": workdate = datetime.datetime.today()-BDay(1) workdate = workdate.date() engine = create_engine('postgresql://serenitas_user@debian/serenitasdb') + serenitasdb = dbconn('serenitasdb') insert_cds(serenitasdb, workdate) insert_index(engine, workdate) serenitasdb.close() diff --git a/python/intex/intex_scenarios.py b/python/intex/intex_scenarios.py index eece4876..0b15d64a 100644 --- a/python/intex/intex_scenarios.py +++ b/python/intex/intex_scenarios.py @@ -25,7 +25,7 @@ pattern9 = re.compile("(?P<a>SEVERITY\[\w+,\d+\]=)mkt\(70\)") global_reinvfloatpercentage = 84
global_reinvfixedpercentage = 16
-@with_connection
+@with_connection('etdb')
def dealname_from_cusip(conn, cusips):
with conn.cursor() as c:
c.callproc("dealname_from_cusip", params = cusip)
diff --git a/python/intex/load_indicative.py b/python/intex/load_indicative.py index 0686ca92..145325df 100644 --- a/python/intex/load_indicative.py +++ b/python/intex/load_indicative.py @@ -5,7 +5,8 @@ from datetime import date import csv, sys, re import pdb from common import root, sanitize_float -from db import conn +from db import dbconn +from contextlib import closing import logging logger = logging.getLogger(__name__) @@ -184,10 +185,10 @@ if __name__=="__main__": cusip_files = [f for f in files if "TrInfo" in f] deal_files = [f for f in files if "TrInfo" not in f] - #first load deal data - for deal in deal_files: - upload_deal_data(conn, deal) - #then load tranche data - for cusip in cusip_files: - upload_cusip_data(conn, cusip) - conn.close() + with closing(dbconn('etdb')) as etdb: + #first load deal data + for deal in deal_files: + upload_deal_data(etdb, deal) + #then load tranche data + for cusip in cusip_files: + upload_cusip_data(etdb, cusip) diff --git a/python/intex/load_intex_collateral.py b/python/intex/load_intex_collateral.py index cde12587..add1e127 100644 --- a/python/intex/load_intex_collateral.py +++ b/python/intex/load_intex_collateral.py @@ -2,11 +2,12 @@ import psycopg2 import os, csv, datetime import pdb from common import root, sanitize_float -from db import conn +from db import dbconn import sys import uuid from intex.load_indicative import upload_cusip_data, upload_deal_data import logging +from contextlib import closing logger = logging.getLogger(__name__) @@ -146,6 +147,6 @@ if __name__ == "__main__": workdate = sys.argv[1] else: workdate = str(datetime.date.today()) - intex_data(conn, workdate) - conn.close() + with dbconn('etdb') as etdb: + intex_data(etdb, workdate) print("done") diff --git a/python/markit_cds.py b/python/markit_cds.py index bd004d3d..965f2f8c 100644 --- a/python/markit_cds.py +++ b/python/markit_cds.py @@ -8,7 +8,7 @@ import zipfile, io import shutil
import pandas as pd
from pandas.tseries.offsets import BDay
-from db import serenitasdb
+from db import dbconn
from import_quotes import insert_cds, insert_index
from sqlalchemy import create_engine
@@ -66,6 +66,7 @@ if __name__=="__main__": payload.update({'type':'CredIndex','version':4})
download_composite_data(payload, historical)
engine = create_engine('postgresql://serenitas_user@debian/serenitasdb')
+ serenitasdb = dbconn('serenitasdb')
insert_cds(serenitasdb, workdate.date())
insert_index(engine, workdate.date())
serenitasdb.close()
diff --git a/python/markit_loans.py b/python/markit_loans.py index 1cc99739..f0e9d9a9 100644 --- a/python/markit_loans.py +++ b/python/markit_loans.py @@ -1,13 +1,13 @@ import requests
from common import root
-from db import conn, with_connection
+from db import with_connection
import os
import datetime
import csv
import sys
import logging
-@with_connection
+@with_connection('etdb')
def download_facility(conn, workdate, payload):
r = requests.get('https://loans.markit.com/loanx/LoanXFacilityUpdates.csv', params=payload)
facility_filename = os.path.join(root, "data", "Facility files", "facility_{0}.csv".format(workdate))
@@ -26,7 +26,7 @@ def download_facility(conn, workdate, payload): c.execute(sqlstring, newline)
conn.commit()
-@with_connection
+@with_connection('etdb')
def download_marks(conn, workdate, payload):
r = requests.get('https://loans.markit.com/loanx/LoanXMarks.csv', params=payload)
marks_filename = os.path.join(root, "data", "markit", "markit_data_{0}.csv".format(workdate))
@@ -46,7 +46,7 @@ def download_marks(conn, workdate, payload): line['Depth'], line['Mark Date']))
conn.commit()
-@with_connection
+@with_connection('etdb')
def update_facility(conn, workdate, payload):
#we update the missing facility loanxids
sqlstring = "SELECT loanxid FROM markit_prices EXCEPT SELECT loanxid FROM markit_facility";
diff --git a/python/markit_red.py b/python/markit_red.py index 513f08e1..09b836e0 100644 --- a/python/markit_red.py +++ b/python/markit_red.py @@ -3,7 +3,7 @@ from lxml import etree import requests, io, zipfile, shutil import common import os -from db import serenitasdb +from db import with_connection def request_payload(payload): r = requests.get('https://www.markit.com/export.jsp', params=payload) @@ -30,7 +30,8 @@ def download_report(report): r += request_payload(payload) return r -def update_redcodes(fname): +@with_connection('serenitasdb') +def update_redcodes(conn, fname): with open(os.path.join(common.root, "Tranche_data", "RED_reports", fname)) as fh: et = etree.parse(fh) r = [] @@ -45,9 +46,9 @@ def update_redcodes(fname): sqlstr = """UPDATE index_version SET redindexcode=%s where index=%s and series=%s and version=%s and indexfactor=%s""" - with serenitasdb.cursor() as c: + with conn.cursor() as c: c.executemany(sqlstr, [tuple([e[3], e[0], e[1], e[2], float(e[4])*100]) for e in r]) - serenitasdb.commit() + conn.commit() def update_redindices(fname): basedir = os.path.join(common.root, "Tranche_Data", "RED_reports") diff --git a/python/process_queue.py b/python/process_queue.py index 9f0e6f47..125a7a27 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -15,7 +15,7 @@ import os from sqlalchemy import create_engine from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP import re -import psycopg2 +from db import dbconn import logging import argparse @@ -204,9 +204,9 @@ if __name__=="__main__": parser = argparse.ArgumentParser() parser.add_argument("-n", "--no-upload", action="store_true", help="do not upload to Globeop") args = parser.parse_args() - engine = create_engine('postgresql://dawn_user@debian/dawndb') - conn = engine.raw_connection() q = get_redis_queue() + serenitasdb = dbconn('serenitasdb') + dawndb = dbconn('dawndb') for queue_name in ['bond_trades', 'cds_trades']: l = get_trades(q, queue_name) if l: |
