Diffstat (limited to 'python/task_server/globeop.py')
 -rw-r--r--  python/task_server/globeop.py  117
 1 file changed, 76 insertions(+), 41 deletions(-)
diff --git a/python/task_server/globeop.py b/python/task_server/globeop.py
index 83b8f307..251e378a 100644
--- a/python/task_server/globeop.py
+++ b/python/task_server/globeop.py
@@ -9,7 +9,8 @@ import re
 import logging
 import sys
 from sqlalchemy import create_engine
-sys.path.append('..')
+
+sys.path.append("..")
 import load_globeop_report
 
 logger = logging.getLogger(__name__)
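The `sys.path.append("..")` hack survives the reformat: it is what makes the top-level `load_globeop_report` module importable when this file runs from `python/task_server/`. A minimal sketch of a working-directory-independent variant (an assumption, not part of this commit), anchored on `__file__`:

    import sys
    from pathlib import Path

    # Resolve the parent directory relative to this file rather than
    # relative to wherever the process happens to be started from.
    sys.path.append(str(Path(__file__).resolve().parent.parent))
    import load_globeop_report  # noqa: E402  (import after sys.path edit)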
@@ -20,6 +21,7 @@ try:
 except ImportError:
     pass
 
+
 def get_ped(s):
     regex = re.search("PED=([^.]+)", s)
     if regex:
@@ -30,6 +32,7 @@ def get_ped(s):
         PED = PED.date()
     return PED
 
+
 def key_fun(s):
     PED = get_ped(s)
     regex = re.search("KD=([^.]+)", s)
@@ -42,7 +45,7 @@ def key_fun(s):
 
 def run_date(s):
-    if 'SWO' in s:
+    if "SWO" in s:
         date_string = s.split("_", 5)[4]
     else:
         date_string = s.split("_", 3)[2]
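These helpers pull the period-end date and run date out of GlobeOp's file names. A quick illustration with a made-up name (the real naming scheme is not shown in this diff, so treat the filename as a placeholder):

    import re

    name = "XYZ_Valuation_20170720_TradeID_PED=2017-07-19.csv.gpg"  # hypothetical
    print(re.search("PED=([^.]+)", name).group(1))  # -> "2017-07-19" (get_ped's capture)
    print(name.split("_", 3)[2])                    # -> "20170720" (run_date, non-SWO branch)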
@@ -50,38 +53,54 @@ def run_date(s):
 
 def get_ftp(folder):
-    ftp = FTP('ftp.globeop.com')
-    ftp.login('srntsftp', config.ftp_password)
+    ftp = FTP("ftp.globeop.com")
+    ftp.login("srntsftp", config.ftp_password)
     ftp.cwd(folder)
     return ftp
 
 
 def get_gpg():
-    if os.name == 'nt':
-        gpg = gnupg.GPG(gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
-                        gnupghome=os.path.join(os.getenv('APPDATA'), "gnupg"))
-    elif os.name == 'posix':
-        gpg = gnupg.GPG(gnupghome=os.path.join(os.environ['HOME'], '.gnupg'))
-    gpg.encoding = 'utf8'
+    if os.name == "nt":
+        gpg = gnupg.GPG(
+            gpgbinary=r'"c:\\Program Files (x86)\\GNU\\GnuPG\\gpg2.exe"',
+            gnupghome=os.path.join(os.getenv("APPDATA"), "gnupg"),
+        )
+    elif os.name == "posix":
+        gpg = gnupg.GPG(gnupghome=os.path.join(os.environ["HOME"], ".gnupg"))
+    gpg.encoding = "utf8"
     return gpg
 
+
 def convert_to_csv(f):
-    mapping = (('Credit Default Swap', 'CDS'), ('Swaption', 'Swaption'), ('ALL', 'All'))
+    mapping = (("Credit Default Swap", "CDS"), ("Swaption", "Swaption"), ("ALL", "All"))
     if f.exists():
         for sheet, name in mapping:
             df = pd.read_excel(f, sheet_name=sheet, skiprows=[0, 1, 2, 3])
             df.to_csv(f.parent / f"{name}_Report.csv", index=False)
         f.unlink()
 
+
 def download_data(workdate: datetime.date):
-    ftp = get_ftp('outgoing')
+    ftp = get_ftp("outgoing")
     files = ftp.nlst()
-    pnlfiles = [filename for filename in files if "csv" in filename and
-                "Profit" in filename if get_ped(filename) < workdate]
-    valuationfiles = [filename for filename in files if "csv" in filename and
-                      "Valuation_TradeID" in filename if get_ped(filename) < workdate]
-    cdsfiles = [filename for filename in files if "TradeSearch" in filename
-                if run_date(filename).date() <= workdate]
+    pnlfiles = [
+        filename
+        for filename in files
+        if "csv" in filename and "Profit" in filename
+        if get_ped(filename) < workdate
+    ]
+    valuationfiles = [
+        filename
+        for filename in files
+        if "csv" in filename and "Valuation_TradeID" in filename
+        if get_ped(filename) < workdate
+    ]
+    cdsfiles = [
+        filename
+        for filename in files
+        if "TradeSearch" in filename
+        if run_date(filename).date() <= workdate
+    ]
 
     available_files = []
     if pnlfiles:
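A detail worth knowing when reading these reformatted comprehensions: stacked `if` clauses in a Python comprehension filter conjunctively, so the two-clause form is exactly equivalent to a single `and`. A self-contained check:

    files = ["a_Profit_x.csv", "b_Profit_y.txt", "c_Other.csv"]
    stacked = [f for f in files if "csv" in f if "Profit" in f]
    anded = [f for f in files if "csv" in f and "Profit" in f]
    assert stacked == anded == ["a_Profit_x.csv"]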
@@ -100,7 +119,7 @@ def download_data(workdate: datetime.date):
 
     for filename in available_files:
         with (reports_dir / filename).open("wb") as fh:
-            ftp.retrbinary('RETR ' + filename, fh.write)
+            ftp.retrbinary("RETR " + filename, fh.write)
             logger.info(f"downloaded {filename}")
 
     gpg = get_gpg()
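`retrbinary` streams the remote file in chunks to the `fh.write` callback. Since `get_ftp` hands back a raw connection, a sketch of a variant that closes it deterministically (an addition for illustration, not part of the commit):

    import contextlib

    # contextlib.closing() calls ftp.close() even if a download raises.
    with contextlib.closing(get_ftp("outgoing")) as ftp:
        with open("some_report.csv.gpg", "wb") as fh:  # hypothetical filename
            ftp.retrbinary("RETR some_report.csv.gpg", fh.write)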
@@ -112,18 +131,22 @@ def download_data(workdate: datetime.date):
         else:
             newfilename = "CDS_Report.xls"
         with (reports_dir / filename).open("rb") as fh:
-            dec = gpg.decrypt_file(fh, output=(reports_dir / newfilename).as_posix(),
-                                   passphrase=config.key_password,
-                                   always_trust=True)
-        logger.info(f'{filename}: {dec.status}')
+            dec = gpg.decrypt_file(
+                fh,
+                output=(reports_dir / newfilename).as_posix(),
+                passphrase=config.key_password,
+                always_trust=True,
+            )
+        logger.info(f"{filename}: {dec.status}")
         (reports_dir / filename).unlink()
 
     # convert xls to csv
     convert_to_csv(reports_dir / "CDS_Report.xls")
     insert_todb(workdate)
 
+
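`decrypt_file` from python-gnupg returns a result object whose `ok` flag and `status` string report the outcome; the code above only logs the status. A sketch that also fails loudly (an assumption about desired behaviour, not part of the commit):

    with (reports_dir / filename).open("rb") as fh:
        dec = gpg.decrypt_file(fh, output=(reports_dir / newfilename).as_posix(),
                               passphrase=config.key_password, always_trust=True)
    # dec.ok is False on e.g. a wrong passphrase or missing key.
    if not dec.ok:
        raise RuntimeError(f"decryption of {filename} failed: {dec.status}")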
 def insert_todb(workdate: datetime.date):
     reports_dir = DAILY_DIR / str(workdate) / "Reports"
-    engine = create_engine('postgresql://dawn_user@debian/dawndb')
+    engine = create_engine("postgresql://dawn_user@debian/dawndb")
     for report in ["Valuation", "Pnl", "CDS"]:
         fun = getattr(load_globeop_report, f"read_{report.lower()}_report")
         table = f"{report.lower()}_reports"
@@ -135,42 +158,54 @@ def insert_todb(workdate: datetime.date):
             period_end_date = pd.Timestamp(df.periodenddate[0])
             sql_str = "DELETE FROM valuation_reports WHERE periodenddate=%s"
         else:
-            df['date'] = period_end_date
+            df["date"] = period_end_date
             sql_str = "DELETE FROM {} WHERE date=%s".format(table)
-            df['row'] = df.index
+            df["row"] = df.index
         engine.execute(sql_str, (period_end_date,))
-        df.to_sql(table, engine, if_exists='append', index=False)
+        df.to_sql(table, engine, if_exists="append", index=False)
 
+
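The delete-then-append pair makes reloading the same date idempotent. Note that `Engine.execute` (used above) was removed in SQLAlchemy 2.0; under 2.x the same pattern needs an explicit connection. A sketch, with the table name purely illustrative:

    from sqlalchemy import text

    with engine.begin() as conn:  # commits on success, rolls back on error
        conn.execute(text("DELETE FROM pnl_reports WHERE date = :d"),
                     {"d": period_end_date})
    df.to_sql("pnl_reports", engine, if_exists="append", index=False)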
 def upload_bond_marks(engine, workdate: datetime.datetime):
-    df = pd.read_sql_query("SELECT identifier, price from list_marks(%s) "
-                           "RIGHT JOIN list_positions(%s, NULL, False) "
-                           "USING (identifier)", engine,
-                           params=(workdate.date(), workdate.date()))
-    df.rename(columns={'identifier': 'IDENTIFIER',
-                       'price': 'Price'}, inplace=True)
-    fullpath = DAILY_DIR / str(workdate.date()) / f"securitiesNpv{workdate:%Y%m%d_%H%M%S}.csv"
+    df = pd.read_sql_query(
+        "SELECT identifier, price from list_marks(%s) "
+        "RIGHT JOIN list_positions(%s, NULL, False) "
+        "USING (identifier)",
+        engine,
+        params=(workdate.date(), workdate.date()),
+    )
+    df.rename(columns={"identifier": "IDENTIFIER", "price": "Price"}, inplace=True)
+    fullpath = (
+        DAILY_DIR / str(workdate.date()) / f"securitiesNpv{workdate:%Y%m%d_%H%M%S}.csv"
+    )
     df.to_csv(fullpath, index=False)
-    ftp = get_ftp('incoming')
+    ftp = get_ftp("incoming")
     with fullpath.open("rb") as fh:
-        ftp.storbinary('STOR ' + fullpath.name, fh)
+        ftp.storbinary("STOR " + fullpath.name, fh)
     logger.info("upload bond marks done")
 
+
 def upload_cds_marks(engine, workdate: datetime.datetime):
-    df = pd.read_sql_query("""SELECT cds.dealid AS "DealID", 'CREDIT_SWAP' AS "Instrument Type",
+    df = pd.read_sql_query(
+        """SELECT cds.dealid AS "DealID", 'CREDIT_SWAP' AS "Instrument Type",
 (a.clean_nav+a.accrued) AS "NPV" from list_abscds_marks(%s) a
-JOIN cds USING (security_id)""", engine, params=(workdate.date(),))
+JOIN cds USING (security_id)""",
+        engine,
+        params=(workdate.date(),),
+    )
     fullpath = DAILY_DIR / str(workdate.date()) / f"otcNpv{workdate:%Y%m%d}.csv"
     df.to_csv(fullpath, index=False)
-    ftp = get_ftp('incoming')
+    ftp = get_ftp("incoming")
     with fullpath.open("rb") as fh:
-        ftp.storbinary('STOR ' + fullpath.name, fh)
+        ftp.storbinary("STOR " + fullpath.name, fh)
     logger.info("upload cds marks done")
 
+
 def upload_data(engine, workdate: datetime.datetime):
     upload_bond_marks(engine, workdate)
     upload_cds_marks(engine, workdate)
 
+
 def back_fill(start_date=pd.datetime(2017, 7, 20)):
-    date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq='B')
+    date_rng = pd.date_range(start=start_date, end=pd.Timestamp.today(), freq="B")
     for date in date_rng:
         insert_todb(date.date())
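One caveat the reformat does not touch: `pd.datetime` in back_fill's default argument was only ever an alias for the stdlib `datetime` and has been removed from modern pandas, so this signature breaks on current versions. A sketch of an equivalent without the alias (an update suggestion, not part of this commit):

    import datetime
    import pandas as pd

    def back_fill(start_date=datetime.date(2017, 7, 20)):
        # freq="B" still enumerates business days only.
        for date in pd.date_range(start=start_date, end=pd.Timestamp.today(), freq="B"):
            insert_todb(date.date())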