 python/load_globeop_report.py | 138
 python/task_server/globeop.py |  47
 sql/dawn.sql                  |  10
 3 files changed, 93 insertions(+), 102 deletions(-)
diff --git a/python/load_globeop_report.py b/python/load_globeop_report.py
index 68c1aa9d..377f2746 100644
--- a/python/load_globeop_report.py
+++ b/python/load_globeop_report.py
@@ -17,35 +17,42 @@ def get_globs(fname, years=['2013', '2014', '2015', '2016', '2017']):
                                          fname))))
     return globs
 
-def valuation_reports():
-    df = pd.DataFrame()
-    for f in chain.from_iterable(get_globs('Valuation_Report')):
-        try:
-            date = pd.Timestamp(f.split('/')[6])
-        except ValueError:
-            date = pd.Timestamp(f.split('/')[4])
+def read_valuation_report(f):
+    try:
+        date = pd.Timestamp(f.split('/')[6])
+    except ValueError:
+        date = pd.Timestamp(f.split('/')[4])
+    if date >= pd.Timestamp('2013-02-06'):
+        df = pd.read_csv(f, parse_dates=['KnowledgeDate','PeriodEndDate'])
+    else:
+        df = pd.read_csv(f)
+        df['KnowledgeDate'] = date
+        df['PeriodEndDate'] = date - bus_day
+    df['row'] = df.index
+    if 'AccountingPeriod' in df:
+        del df['AccountingPeriod']
+    if "Strat" in df:
+        df.Strat = df.Strat.str.replace("^(SERCGMAST__){1,2}(M_|SER_)?", "", 1)
+    if "Port" in df:
+        df.Port = df.Port.str.replace("^(SERCGMAST__){1,2}(SERG__|SERG_)?", "", 1)
+    df.columns = df.columns.str.lower()
+    return df
 
-        if date >= pd.Timestamp('2013-02-06'):
-            newdf = pd.read_csv(f, parse_dates=['KnowledgeDate','PeriodEndDate'])
-        else:
-            newdf = pd.read_csv(f)
-            newdf['KnowledgeDate'] = date
-            newdf['PeriodEndDate'] = date - bus_day
-        newdf['row'] = newdf.index
-        if newdf.empty or ('PeriodEndDate' in df and \
-           not df[df.PeriodEndDate == newdf.PeriodEndDate.iat[0]].empty):
-            continue
-        df = df.append(newdf)
-    del df['AccountingPeriod']
+def valuation_reports():
+    df = pd.concat(read_valuation_report(f) for f in
+                   chain.from_iterable(get_globs('Valuation_Report')))
+    # There can be duplicates in case of holidays
+    df = df.sort_values(['periodenddate', 'row', 'knowledgedate'])
+    df = df.drop_duplicates(['periodenddate', 'row'], 'last')
+    df.to_sql('valuation_reports', dbengine('dawndb'), if_exists='append', index=False)
 
-    ## cleanups
+def read_pnl_report(f):
+    df = pd.read_csv(f)
     df.Strat = df.Strat.str.replace("^(SERCGMAST__){1,2}(M_|SER_)?", "", 1)
     df.Port = df.Port.str.replace("^(SERCGMAST__){1,2}(SERG__|SERG_)?", "", 1)
-    for col in ['Strat', 'InvCcy', 'Fund', 'Port']:
-        df[col] = df[col].astype('category')
-    df.columns = df.columns.str.lower()
-
-    df.to_sql('val_reports', dbengine('dawndb'), if_exists='append', index=False)
+    df['LongShortIndicator'] = df['LongShortIndicator'].str.strip()
+    df.columns = df.columns.str.lower().str.replace(" ", "")
+    return df
 
 def pnl_reports():
     df = {}
@@ -55,73 +62,54 @@ def pnl_reports():
         except ValueError:
             date = pd.Timestamp(f.split('/')[4])
         date = date - bus_day
-        df[date] = pd.read_csv(f)
-        df[date]['row'] = df[date].index
-    df = pd.concat(df, names=['date', 'to_drop'])
-    df.reset_index(level='to_drop', drop=True, inplace=True)
-    df.Strat = df.Strat.str.replace("^(SERCGMAST__){1,2}(M_|SER_)?", "", 1)
-    df.Port = df.Port.str.replace("^(SERCGMAST__){1,2}(SERG__|SERG_)?", "", 1)
-    for col in ['Fund', 'Strat', 'Port', 'LongShortIndicator', 'InvCcy']:
-        df[col] = df[col].astype('category')
-
-    ## cleanups
-    df = df.reset_index()
-    df.columns = df.columns.str.lower()
-    df['longshortindicator'] = df['longshortindicator'].str.strip()
-    df.columns = [c.replace(" ", "") for c in df.columns]
-
+        df[date] = read_pnl_report(f)
+    df = pd.concat(df, names=['date', 'row']).reset_index()
     df.to_sql('pnl_reports', dbengine('dawndb'), if_exists='append', index=False)
 
-def cds_reports():
-    df = {}
-    for f in chain.from_iterable(get_globs('CDS_Report')):
-        try:
-            date = pd.Timestamp(f.split('/')[6])
-        except ValueError:
-            date = pd.Timestamp(f.split('/')[4])
-        date = date - bus_day
-        df[date] = pd.read_csv(f)
-        df[date]['row'] = df[date].index
-    df = pd.concat(df, names=['date', 'to_drop'])
-    df.reset_index(level='to_drop', drop=True, inplace=True)
-    for col in ['Buy/Sell', 'Counterparty', 'CCP', 'Ccy', 'Direction', 'Price Ccy',
-                'Period End Date', 'Basis', 'Roll Convention', 'Settle Mode',
-                'Strategy', 'Trade Type', 'Trade Status', 'Prime Broker']:
-        df[col] = df[col].astype('category')
-    for col in df.columns:
-        if 'Date' in col and col != 'Period End Date':
-            df[col] = pd.to_datetime(df[col])
+def read_cds_report(f, old_report=False):
+    df = pd.read_csv(f)
     for col in df.columns:
         vc = len(df[col].value_counts())
         if vc == 0:
             del df[col]
             continue
-        if df[col].dtype == 'object' and vc < 20:
-            df[col] = df[col].astype('category')
-    contract = df['Contractual Definition']
-    contract = contract.where(contract.isin(['ISDA2014', 'ISDA2003Cred']), 'ISDA2014').astype('category')
-    df['Contractual Definition'] = contract
+    if 'Contractual Definition' in df:
+        contract = df['Contractual Definition']
+        contract = contract.where(contract.isin(['ISDA2014', 'ISDA2003Cred']), 'ISDA2014')
+        df['Contractual Definition'] = contract
     df = df.drop(['Bloomberg Yellow key', 'Created User', 'Last Modified User',
                   'Last Modified Date', 'Fund Long Name', 'Instrument Sub Type',
                   'Netting Id', 'Client', 'Trade Status', 'Position Status',
                   'Clearing Broker', 'Settle Mode', 'Off Price', 'On Price',
-                  'Price Ccy'],
-                 axis=1)
-    df.columns = df.columns.str.lower()
-    df.columns = df.columns.str.replace(" ", "_")
-    df.roll_convention = df.roll_convention.str.title()
+                  'Price Ccy'], axis=1, errors='ignore')
+    df.columns = df.columns.str.lower().str.replace(" ", "_")
+    if old_report:
+        df.calendar = df.calendar.str.replace(" ", "")
+        df = df.rename(columns={'direction': 'buy/sell'})
+    df.roll_convention = df.roll_convention.str.title()
     df = df[df.strategy != 'SER_TEST']
     df.loc[df.strategy == 'SERCGMAST__MBSCDS', 'strategy'] = 'MBSCDS'
    df.strategy = df.strategy.str.replace("SER_","")
-    df.loc[df['buy/sell'].isnull(), 'buy/sell'] = df.loc[df['buy/sell'].isnull(), 'direction']
+    df['buy/sell'] = df['buy/sell'].astype('category')
     df['buy/sell'].cat.categories = ['Buyer', 'Seller']
-    del df['direction']
-    df.prime_broker = df.prime_broker.cat.remove_categories('NONE')
-    df.calendar = df.calendar.str.replace(" ", "")
-    df['executing_broker'] = df['executing_broker'].astype('object')
+    df.prime_broker = df.prime_broker.where(df.prime_broker != 'NONE')
     df.loc[df.executing_broker.isnull(),'executing_broker'] = df[df.executing_broker.isnull()].counterparty
     del df['counterparty']
-    df = df.rename(columns={'executing_broker': 'counterparty'})
+    df = df.rename(columns={'executing_broker': 'counterparty',
+                            'independent_%':'independent_perc'})
+    return df
+
+def cds_reports():
+    df = {}
+    for f in chain.from_iterable(get_globs('CDS_Report')):
+        try:
+            date = pd.Timestamp(f.split('/')[6])
+        except ValueError:
+            date = pd.Timestamp(f.split('/')[4])
+        old_report = date <= pd.Timestamp('2017-02-28') or date == pd.Timestamp('2017-03-02')
+        date = date - bus_day
+        df[date] = read_cds_report(f, old_report)
+    df = pd.concat(df, names=['date', 'row']).reset_index()
     return df
 
 def monthly_pnl_bycusip(df, strats):
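The deduplication step in the new valuation_reports is the subtle part of this refactor: around holidays two files can report the same PeriodEndDate, both frames survive the concat, and the sort/drop_duplicates pair keeps only the copy with the latest knowledgedate. A minimal sketch of that pattern, with made-up frames and values for illustration:

    import pandas as pd

    # Two toy report frames sharing a periodenddate, as happens when a
    # holiday makes consecutive files cover the same period; 'row' is the
    # per-file row index that read_valuation_report assigns.
    a = pd.DataFrame({'periodenddate': ['2017-01-02', '2017-01-02'],
                      'row': [0, 1],
                      'knowledgedate': ['2017-01-03', '2017-01-03'],
                      'endbookmv': [10.0, 20.0]})
    b = a.assign(knowledgedate='2017-01-04', endbookmv=[11.0, 21.0])

    df = pd.concat([a, b])
    # Sorting puts the newest knowledgedate last within each
    # (periodenddate, row) pair, so keep='last' retains the fresher copy.
    df = df.sort_values(['periodenddate', 'row', 'knowledgedate'])
    df = df.drop_duplicates(['periodenddate', 'row'], keep='last')
    print(df.endbookmv.tolist())  # [11.0, 21.0]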
diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py
index 397b1f47..91e5f5a0 100644
--- a/python/task_server/globeop.py
+++ b/python/task_server/globeop.py
@@ -6,8 +6,11 @@ from task_server import config
 import re
 import logging
 import shutil
+import sys
 import pandas as pd
 from sqlalchemy import create_engine
+sys.path.append('..')
+import load_globeop_report
 
 logger = logging.getLogger(__name__)
@@ -59,6 +62,12 @@ def get_gpg():
     gpg.encoding = 'utf8'
     return gpg
 
+def convert_to_csv(f):
+    if os.path.exists(f + ".xls"):
+        df = pd.read_excel(f + ".xls", sheetname=0, skiprows=[0,1,2,3])
+        df.to_csv(f + ".csv", index=False)
+        os.remove(f + ".xls")
+
 def download_data(workdate):
     ftp = get_ftp('outgoing')
     files = ftp.nlst()
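One caveat worth flagging on the new helper: sheetname= is the pandas keyword that predates 0.21 (later renamed sheet_name=), so this module pins an older pandas. The helper appends the ".xls"/".csv" suffixes itself, so callers pass the bare stem; a hedged sketch of the call site that appears later in download_data (the path is illustrative):

    # If reports_dir contains CDS_Report.xls, this skips the first four
    # rows (presumably a report banner above the real header), writes
    # CDS_Report.csv next to it, and deletes the .xls original.
    convert_to_csv(os.path.join(reports_dir, "CDS_Report"))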
@@ -67,7 +76,7 @@ def download_data(workdate):
     valuationfiles = [filename for filename in files if "csv" in filename and \
                       "Valuation_TradeID" in filename if get_ped(filename) < workdate]
     cdsfiles = [filename for filename in files if "TradeSearch" in filename \
-                if run_date(filename).date()<=workdate]
+                if run_date(filename).date() <= workdate]
     available_files = []
     if pnlfiles:
         available_files.append(sorted(pnlfiles, key=key_fun, reverse=True)[0])
@@ -92,7 +101,7 @@ def download_data(workdate):
     gpg = get_gpg()
     for filename in available_files:
         if "Profit" in filename:
-            newfilename = "Pnl.csv"
+            newfilename = "Pnl_Report.csv"
         elif "Valuation" in filename:
             newfilename = "Valuation_Report.csv"
         else:
@@ -103,30 +112,22 @@ def download_data(workdate):
                           always_trust=True)
         logger.info('{0}: {1}'.format(filename, dec.status))
         os.remove(os.path.join(reports_dir, filename))
-    if os.path.exists(os.path.join(reports_dir, "CDS_Report.xls")):
-        df = pd.read_excel(os.path.join(reports_dir, "CDS_Report.xls"), sheetname=0, skiprows=[0,1,2,3])
-        df.to_csv(os.path.join(reports_dir, "CDS_Report.csv"), index=False)
-        os.remove(os.path.join(reports_dir, "CDS_Report.xls"))
+    ## convert xls to csv
+    convert_to_csv(os.path.join(reports_dir, "CDS_Report"))
     engine = create_engine('postgresql://dawn_user@debian/dawndb')
-    for f, table in zip(["Valuation_Report.csv", "Pnl.csv"],
-                        ["val_reports", "pnl_reports"]):
-        df = pd.read_csv(os.path.join(reports_dir, f))
-        if 'PeriodEndDate' in df:
-            period_end_date = pd.Timestamp(df.PeriodEndDate[0])
-        df['row'] = df.index
-        if 'AccountingPeriod' in df:
-            del df['AccountingPeriod']
-        df.Strat = df.Strat.str.replace("^(SERCGMAST__){1,2}(M_|SER_)?", "", 1)
-        df.Port = df.Port.str.replace("^(SERCGMAST__){1,2}(SERG__|SERG_)?", "", 1)
-        df.columns = df.columns.str.lower()
-        if f == "Pnl.csv":
-            df['longshortindicator'] = df['longshortindicator'].str.strip()
-            df.columns = df.columns.str.replace(" ", "")
-            df['date'] = period_end_date
-            sql_str = "DELETE FROM pnl_reports WHERE date=%s"
+    for report in ["Valuation", "Pnl", "CDS"]:
+        fun = getattr(load_globeop_report, "read_{}_report".format(report.lower()))
+        table = "{}_reports".format(report.lower())
+
+        df = fun(os.path.join(reports_dir, "{}_Report.csv".format(report)))
+        if report == "Valuation":
+            period_end_date = pd.Timestamp(df.periodenddate[0])
+            sql_str = "DELETE FROM valuation_reports WHERE periodenddate=%s"
         else:
-            sql_str = "DELETE FROM val_reports WHERE periodenddate=%s"
+            df['date'] = period_end_date
+            sql_str = "DELETE FROM {} WHERE date=%s".format(table)
+        df['row'] = df.index
         engine.execute(sql_str, (period_end_date,))
         df.to_sql(table, engine, if_exists='append', index=False)
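The rewritten loop replaces the duplicated per-file parsing with the shared read_*_report functions, resolved by name via getattr, and makes each load idempotent by deleting the period's rows before appending. A standalone sketch of that delete-then-append pattern (table and column names follow the diff; the connection URL is the same illustrative one, and engine.execute is the pre-2.0 SQLAlchemy API this code already uses):

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://dawn_user@debian/dawndb')

    def reload_report(df, table, date_col, value):
        # Delete first so re-running the task for the same workdate
        # replaces the day's rows instead of duplicating them ...
        engine.execute("DELETE FROM {} WHERE {}=%s".format(table, date_col),
                       (value,))
        # ... then append the freshly parsed frame.
        df.to_sql(table, engine, if_exists='append', index=False)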
diff --git a/sql/dawn.sql b/sql/dawn.sql
index 8d2c5b93..ba1b878b 100644
--- a/sql/dawn.sql
+++ b/sql/dawn.sql
@@ -910,7 +910,7 @@ CREATE TYPE strategy AS ENUM('CLOCDSCSH', 'CLO_AAA', 'CLO_BB20', 'CLO_BBB', 'CSH
                             'SERCGLLC__SERCGLLC', 'SERCGLTD__SERCGLTD', 'SERCGLTD__SERLTD_EXP',
                             'SER_TEST__SER_TEST', 'STR_MAV', 'STR_MEZZ')
 
-CREATE TABLE val_reports(
+CREATE TABLE valuation_reports(
   custacctname text,
   endbookcost float,
   endbookmv float,
@@ -928,7 +928,7 @@ CREATE TABLE val_reports(
   invdesc text,
   invid text,
   invtype text,
-  knowledgedate date NOT NULL,
+  knowledgedate timestamp NOT NULL,
   periodenddate date NOT NULL,
   port portfolio,
   strat strategy,
@@ -936,7 +936,7 @@
   PRIMARY KEY(periodenddate, row)
 );
 
-CREATE INDEX on val_reports (periodenddate);
+CREATE INDEX on valuation_reports (periodenddate);
 
 CREATE TYPE longshort AS ENUM('L', 'S');
 
@@ -1010,8 +1010,8 @@ CREATE TABLE cds_reports(
   fund fund,
   gtid text,
   geneva_id text,
-  "independent_%" float,
   independent_amount float,
+  independent_perc float,
   maturity_date date,
   notional float,
   original_gtid text,
@@ -1036,6 +1036,8 @@ CREATE TABLE cds_reports(
   upfront_fee_date date,
   PRIMARY KEY(date, row));
 
+CREATE INDEX on cds_reports (date);
+
 CREATE TYPE mark_list AS (date date,
                           identifier text,
                           "BROKER" float,
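The new index on cds_reports (date) matches the table's main access pattern, since the loaders and the task server work one business day at a time. An illustrative lookup from the Python side (the date value is made up; the connection URL mirrors the one in globeop.py):

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://dawn_user@debian/dawndb')
    # With the (date) index this is an index lookup rather than a scan.
    day = pd.read_sql("SELECT * FROM cds_reports WHERE date = %(d)s",
                      engine, params={'d': '2017-03-01'})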
