diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/markit/import_quotes.py | 51 | ||||
| -rw-r--r-- | python/process_queue.py | 6 |
2 files changed, 45 insertions, 12 deletions
diff --git a/python/markit/import_quotes.py b/python/markit/import_quotes.py index 2b4eda15..edca4a03 100644 --- a/python/markit/import_quotes.py +++ b/python/markit/import_quotes.py @@ -58,14 +58,15 @@ class CurveKey: short_code: str spread: int + @property + def full_ticker(self): + return f"{self.ticker}.{self.currency}.{self.tier}.{self.short_code}" + def get_markit_bbg_mapping( database, basketid_list, workdate -) -> Tuple[ - Set[Tuple[str, str]], Dict[CurveKey, Set[Tuple[Tuple[str], Tuple[int, Seniority]]]] -]: +) -> Dict[CurveKey, Set[Tuple[Tuple[str], Tuple[int, Seniority]]]]: markit_bbg_mapping = defaultdict(set) - all_tickers = set([]) with database.cursor() as c: c.execute( "SELECT markit_ticker, markit_tier, spread, currency, cds_curve, " @@ -75,7 +76,6 @@ def get_markit_bbg_mapping( (workdate, list(basketid_list)), ) for rec in c: - all_tickers.add((rec.markit_ticker, rec.markit_tier)) key = CurveKey( rec.markit_ticker, rec.markit_tier, @@ -91,7 +91,7 @@ def get_markit_bbg_mapping( ((rec.company_id, Seniority[rec.seniority]), tuple(rec.cds_curve)) ) database.commit() - return (all_tickers, markit_bbg_mapping) + return markit_bbg_mapping def get_bbg_tickers(database, basketid_list: List[int], workdate: datetime.date): @@ -127,7 +127,7 @@ def insert_cds(database, workdate): :param workdate: """ - all_tickers, markit_bbg_mapping = get_current_tickers(database, workdate) + markit_bbg_mapping = get_current_tickers(database, workdate) colnames = [ "Upfront" + tenor for tenor in ["6m", "1y", "2y", "3y", "4y", "5y", "7y", "10y"] ] @@ -199,7 +199,8 @@ def insert_cds(database, workdate): for (cid, sen), curves in markit_bbg_mapping[k]: c.execute( "INSERT INTO cds_curves VALUES(%s, %s, %s, %s) " - "ON CONFLICT DO NOTHING", + "ON CONFLICT (date, company_id, seniority) " + "DO UPDATE SET curve=excluded.curve", (workdate, cid, sen.name, buf), ) c.executemany( @@ -218,9 +219,39 @@ def insert_cds(database, workdate): for t, upf in zip(curves, upfront_rates) ], ) - tickers_found.add((line["Ticker"], line["Tier"])) + tickers_found.add(k) + database.commit() + tickers_missing = markit_bbg_mapping.keys() - tickers_found + tickers_missing.remove(CurveKey("JTIUK", "SNRFOR", "EUR", "CR14", 100)) + with database.cursor() as c: + for curve_key in tickers_missing: + logger.warning(f"{curve_key.full_ticker} missing for {workdate}") + for (cid, sen), e in markit_bbg_mapping[curve_key]: + c.execute( + "SELECT date, curve FROM cds_curves " + "WHERE company_id=%s AND seniority=%s AND date <= %s " + "ORDER BY date desc", + (cid, sen.name, workdate), + ) + try: + date, curve = c.fetchone() + except TypeError: + logger.error(f"{curve_key.full_ticker} never existed") + else: + if (workdate - date).days < 20: # we copy over the old curve + c.execute( + "INSERT INTO cds_curves VALUES(%s, %s, %s, %s) " + "ON CONFLICT (date, company_id, seniority) " + "DO UPDATE SET curve=excluded.curve", + (workdate, cid, sen.name, curve), + ) + logger.info(f"Using {date} curve for {curve_key.ticker}") + else: + logger.error( + "Could not find suitable curve for " + f"{curve_key.full_ticker} even looking back 20 days" + ) database.commit() - logger.warning("missing_quotes for {0}".format(all_tickers - tickers_found)) def get_date(f): diff --git a/python/process_queue.py b/python/process_queue.py index 87722e02..1e8caa9b 100644 --- a/python/process_queue.py +++ b/python/process_queue.py @@ -354,8 +354,10 @@ def terminate_list( f = build_termination(base_dir, **termination) except TypeError as e: logging.error(e) - if upload: - upload_file(f) + return + else: + if upload: + upload_file(f) p.delete(key) |
