diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/task_server/globeop.py | 117 |
1 files changed, 76 insertions, 41 deletions
diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py index 83b8f307..251e378a 100644 --- a/python/task_server/globeop.py +++ b/python/task_server/globeop.py @@ -9,7 +9,8 @@ import re import logging import sys from sqlalchemy import create_engine -sys.path.append('..') + +sys.path.append("..") import load_globeop_report logger = logging.getLogger(__name__) @@ -20,6 +21,7 @@ try: except ImportError: pass + def get_ped(s): regex = re.search("PED=([^.]+)", s) if regex: @@ -30,6 +32,7 @@ def get_ped(s): PED = PED.date() return PED + def key_fun(s): PED = get_ped(s) regex = re.search("KD=([^.]+)", s) @@ -42,7 +45,7 @@ def key_fun(s): def run_date(s): - if 'SWO' in s: + if "SWO" in s: date_string = s.split("_", 5)[4] else: date_string = s.split("_", 3)[2] @@ -50,38 +53,54 @@ def run_date(s): def get_ftp(folder): - ftp = FTP('ftp.globeop.com') - ftp.login('srntsftp', config.ftp_password) + ftp = FTP("ftp.globeop.com") + ftp.login("srntsftp", config.ftp_password) ftp.cwd(folder) return ftp def get_gpg(): - if os.name == 'nt': - gpg = gnupg.GPG(gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"', - gnupghome=os.path.join(os.getenv('APPDATA'), "gnupg")) - elif os.name == 'posix': - gpg = gnupg.GPG(gnupghome=os.path.join(os.environ['HOME'], '.gnupg')) - gpg.encoding = 'utf8' + if os.name == "nt": + gpg = gnupg.GPG( + gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"', + gnupghome=os.path.join(os.getenv("APPDATA"), "gnupg"), + ) + elif os.name == "posix": + gpg = gnupg.GPG(gnupghome=os.path.join(os.environ["HOME"], ".gnupg")) + gpg.encoding = "utf8" return gpg + def convert_to_csv(f): - mapping = (('Credit Default Swap', 'CDS'), ('Swaption', 'Swaption'), ('ALL', 'All')) + mapping = (("Credit Default Swap", "CDS"), ("Swaption", "Swaption"), ("ALL", "All")) if f.exists(): for sheet, name in mapping: df = pd.read_excel(f, sheet_name=sheet, skiprows=[0, 1, 2, 3]) df.to_csv(f.parent / f"{name}_Report.csv", index=False) f.unlink() + def download_data(workdate: datetime.date): - ftp = get_ftp('outgoing') + ftp = get_ftp("outgoing") files = ftp.nlst() - pnlfiles = [filename for filename in files if "csv" in filename and - "Profit" in filename if get_ped(filename) < workdate] - valuationfiles = [filename for filename in files if "csv" in filename and - "Valuation_TradeID" in filename if get_ped(filename) < workdate] - cdsfiles = [filename for filename in files if "TradeSearch" in filename - if run_date(filename).date() <= workdate] + pnlfiles = [ + filename + for filename in files + if "csv" in filename and "Profit" in filename + if get_ped(filename) < workdate + ] + valuationfiles = [ + filename + for filename in files + if "csv" in filename and "Valuation_TradeID" in filename + if get_ped(filename) < workdate + ] + cdsfiles = [ + filename + for filename in files + if "TradeSearch" in filename + if run_date(filename).date() <= workdate + ] available_files = [] if pnlfiles: @@ -100,7 +119,7 @@ def download_data(workdate: datetime.date): for filename in available_files: with (reports_dir / filename).open("wb") as fh: - ftp.retrbinary('RETR ' + filename, fh.write) + ftp.retrbinary("RETR " + filename, fh.write) logger.info(f"downloaded {filename}") gpg = get_gpg() @@ -112,18 +131,22 @@ def download_data(workdate: datetime.date): else: newfilename = "CDS_Report.xls" with (reports_dir / filename).open("rb") as fh: - dec = gpg.decrypt_file(fh, output=(reports_dir / newfilename).as_posix(), - passphrase=config.key_password, - always_trust=True) - logger.info(f'{filename}: {dec.status}') + dec = gpg.decrypt_file( + fh, + output=(reports_dir / newfilename).as_posix(), + passphrase=config.key_password, + always_trust=True, + ) + logger.info(f"{filename}: {dec.status}") (reports_dir / filename).unlink() # convert xls to csv convert_to_csv(reports_dir / "CDS_Report.xls") insert_todb(workdate) + def insert_todb(workdate: datetime.date): reports_dir = DAILY_DIR / str(workdate) / "Reports" - engine = create_engine('postgresql://dawn_user@debian/dawndb') + engine = create_engine("postgresql://dawn_user@debian/dawndb") for report in ["Valuation", "Pnl", "CDS"]: fun = getattr(load_globeop_report, f"read_{report.lower()}_report") table = f"{report.lower()}_reports" @@ -135,42 +158,54 @@ def insert_todb(workdate: datetime.date): period_end_date = pd.Timestamp(df.periodenddate[0]) sql_str = "DELETE FROM valuation_reports WHERE periodenddate=%s" else: - df['date'] = period_end_date + df["date"] = period_end_date sql_str = "DELETE FROM {} WHERE date=%s".format(table) - df['row'] = df.index + df["row"] = df.index engine.execute(sql_str, (period_end_date,)) - df.to_sql(table, engine, if_exists='append', index=False) + df.to_sql(table, engine, if_exists="append", index=False) + def upload_bond_marks(engine, workdate: datetime.datetime): - df = pd.read_sql_query("SELECT identifier, price from list_marks(%s) " - "RIGHT JOIN list_positions(%s, NULL, False) " - "USING (identifier)", engine, - params=(workdate.date(), workdate.date())) - df.rename(columns={'identifier': 'IDENTIFIER', - 'price': 'Price'}, inplace=True) - fullpath = DAILY_DIR / str(workdate.date()) / f"securitiesNpv{workdate:%Y%m%d_%H%M%S}.csv" + df = pd.read_sql_query( + "SELECT identifier, price from list_marks(%s) " + "RIGHT JOIN list_positions(%s, NULL, False) " + "USING (identifier)", + engine, + params=(workdate.date(), workdate.date()), + ) + df.rename(columns={"identifier": "IDENTIFIER", "price": "Price"}, inplace=True) + fullpath = ( + DAILY_DIR / str(workdate.date()) / f"securitiesNpv{workdate:%Y%m%d_%H%M%S}.csv" + ) df.to_csv(fullpath, index=False) - ftp = get_ftp('incoming') + ftp = get_ftp("incoming") with fullpath.open("rb") as fh: - ftp.storbinary('STOR ' + fullpath.name, fh) + ftp.storbinary("STOR " + fullpath.name, fh) logger.info("upload bond marks done") + def upload_cds_marks(engine, workdate: datetime.datetime): - df = pd.read_sql_query("""SELECT cds.dealid AS "DealID", 'CREDIT_SWAP' AS "Instrument Type", + df = pd.read_sql_query( + """SELECT cds.dealid AS "DealID", 'CREDIT_SWAP' AS "Instrument Type", (a.clean_nav+a.accrued) AS "NPV" from list_abscds_marks(%s) a -JOIN cds USING (security_id)""", engine, params=(workdate.date(),)) +JOIN cds USING (security_id)""", + engine, + params=(workdate.date(),), + ) fullpath = DAILY_DIR / str(workdate.date()) / f"otcNpv{workdate:%Y%m%d}.csv" df.to_csv(fullpath, index=False) - ftp = get_ftp('incoming') + ftp = get_ftp("incoming") with fullpath.open("rb") as fh: - ftp.storbinary('STOR ' + fullpath.name, fh) + ftp.storbinary("STOR " + fullpath.name, fh) logger.info("upload cds marks done") + def upload_data(engine, workdate: datetime.datetime): upload_bond_marks(engine, workdate) upload_cds_marks(engine, workdate) + def back_fill(start_date=pd.datetime(2017, 7, 20)): - date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq='B') + date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq="B") for date in date_rng: insert_todb(date.date()) |
