about summary refs log tree commit diff stats
path: root/python/citco_submission.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/citco_submission.py')
-rw-r--r--  python/citco_submission.py  223
1 file changed, 14 insertions(+), 209 deletions(-)
diff --git a/python/citco_submission.py b/python/citco_submission.py
index f5fd4f1a..b38218ea 100644
--- a/python/citco_submission.py
+++ b/python/citco_submission.py
@@ -1,232 +1,37 @@
from serenitas.utils.remote import SftpClient
from stat import S_ISREG
-import re
import time
-import logging
from paramiko.ssh_exception import SSHException
from serenitas.utils.db import dbconn
from csv import DictReader, reader
from serenitas.utils.exchange import ExchangeMessage, FileAttachment
-from psycopg2.errors import UniqueViolation
+from psycopg.errors import UniqueViolation
from io import StringIO
import pandas as pd
import warnings
-
-logger = logging.getLogger(__name__)
-
-
-def get_citco_property(s):
- is_processed, fname_short = s.rsplit("_", 1)
- is_processed = (
- is_processed.rsplit("-")[1] == "PROCESSED"
- ) # We have separate process for processed files and failed files
- fname_short = fname_short.removesuffix(".csv")
- return is_processed, fname_short
-
-
-def insert_todb(conn, queue):
- sql = "INSERT INTO citco_submission (fname, identifier_type, identifier, serenitas_id) VALUES (%s, %s, %s, %s)"
- with conn.cursor() as c:
- try:
- c.executemany(sql, queue)
- except UniqueViolation as e:
- logger.warning(e)
- conn.rollback()
- return False # Committed
- else:
- conn.commit()
- return True
-
-
-def instrument_table(instrument_id):
- if instrument_id.startswith("IRS"):
- return "citco_irs"
- elif instrument_id.startswith("SWPO_") or instrument_id.startswith("BNDO_"):
- return "citco_swaption"
- elif instrument_id.startswith("CDS_"):
- return "citco_tranche"
- elif instrument_id.startswith("TRS"):
- return "citco_trs"
-
-
-# Not needed for now, but maybe in the future, please leave
-def trade_table(trade_id):
- if trade_id.startswith("IRS"):
- return "irs"
- elif trade_id.startswith("SWPTN"):
- return "swaption"
- elif trade_id.startswith("SCCDS"):
- return "cds"
- elif trade_id.startswith("TRS"):
- return "trs"
- elif trade_id.startswith("SC_"):
- return "bonds"
-
-
-def update_instrument(conn, instrument_id, identifier, acked):
- if table := instrument_table(instrument_id):
- instrument_table_sql = (
- f"UPDATE {table} SET committed=%s, status=%s where dealid=%s"
- )
- submission_sql = f"UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='instrument'"
- with conn.cursor() as c:
- c.execute(
- instrument_table_sql,
- (
- acked,
- "Acknowledged" if acked else "Failed",
- instrument_id,
- ),
- )
- c.execute(
- submission_sql,
- (
- identifier,
- instrument_id,
- ),
- )
- conn.commit()
- else:
- print(f"{instrument_id} not identified")
-
-
-def update_trade(conn, trade_id, identifier):
- submission_sql = f"UPDATE citco_submission_status SET committed=True, identifier=%s where serenitas_id=%s and identifier is NULL and identifier_type='trade'"
- with conn.cursor() as c:
- c.execute(
- submission_sql,
- (
- identifier,
- trade_id,
- ),
- )
- conn.commit()
-
-
-def get_data(line):
- if line["Internal_Order_Id"]: # This is a trade
- identifier_type = "trade"
- serenitas_id = line["External_Order_Id"]
- identifier = line["Internal_Order_Id"]
- else:
- identifier_type = "instrument"
- serenitas_id = line["External_Security_Id"]
- identifier = line["Internal_Security_Id"]
- return identifier_type, serenitas_id, identifier
-
-
-def get_failed_data(line):
- if len(line) == 1:
- return ("failed", line[-1])
- elif line[1]: # Trade upload
- return ("trade", line[2])
- elif (
- not line[1] and line[2]
- ): # Instrument upload, just mark as failed if it's a single error message
- return ("instrument", line[2])
- else:
- return ("failed", line[-1])
-
-
-def update_submission_table(
- conn, identifier_type, serenitas_id, identifier, acked=True
-):
- if identifier_type == "trade":
- update_trade(conn, serenitas_id, identifier)
- elif identifier_type == "failed":
- warnings.warn(f"failed: {serenitas_id}")
- else:
- update_instrument(conn, serenitas_id, identifier, acked)
-
-
-def process_failed_files(fh, _insert_queue, fname_short):
- csv_reader = reader(fh)
- errors = []
- for line in csv_reader:
- if "header line" in line[-1]:
- continue
- instrument, serenitas_id = get_failed_data(line)
- _insert_queue.append(
- [
- fname_short,
- instrument,
- "failed",
- serenitas_id,
- ]
- )
- errors.append(f"{serenitas_id}: {line[-1]}")
- return errors
-
-
-def process_processed_file(fh, _insert_queue, fname_short):
- dict_reader = DictReader(fh)
- for line in dict_reader:
- (
- identifier_type,
- serenitas_id,
- identifier,
- ) = get_data(line)
- _insert_queue.append(
- [
- fname_short,
- identifier_type,
- identifier,
- serenitas_id,
- ]
- )
- return identifier
-
-
-def _update_table(conn, _insert_queue, is_processed):
- resp = []
- for row in _insert_queue:
- if "header line" in row or (
- row[2] != "failed" and not is_processed
- ): # short file
- continue
- update_submission_table(conn, row[1], row[3], row[2], is_processed)
- resp.append(f"{row[1]}: {row[2]}, {row[3]}{'' if is_processed else row[-1]}")
- return resp
-
-
-def file_process(
- fh,
- fname_short,
- is_processed,
- conn,
-):
- em = ExchangeMessage()
- _insert_queue = []
- if is_processed:
- process_processed_file(fh, _insert_queue, fname_short)
- else:
- errors = process_failed_files(fh, _insert_queue, fname_short)
- if insert_todb(conn, _insert_queue):
- if resp := _update_table(conn, _insert_queue, is_processed):
- em.send_email(
- f"(CITCO) UPLOAD {'success' if is_processed else 'fail'}",
- "\n".join(resp if is_processed else errors),
- ("selene-ops@lmcg.com",),
- )
+from citco_ops.utils import CitcoSubmission
+import pickle
def run():
- from lru import LRU
-
- _cache = LRU(128)
sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")
while True:
try:
- conn = dbconn("dawndb")
- em = ExchangeMessage()
+ with open("citco.pickle", "rb") as fh:
+ already_uploaded = pickle.load(fh)
+ except FileNotFoundError:
+ already_uploaded = {}
+ try:
for f in sftp.client.listdir_iter():
if S_ISREG(f.st_mode):
- is_processed, fname_short = get_citco_property(f.filename)
- if fname_short not in _cache:
+ if f.filename not in already_uploaded:
_insert_queue = []
with sftp.client.open(f.filename) as fh:
- file_process(fh, fname_short, is_processed, conn)
- _cache[fname_short] = None
+ CitcoSubmission.process(fh, f.filename)
+ CitcoSubmission.commit()
+ already_uploaded[f.filename] = None
+ with open("citco.pickle", "wb") as fh:
+ pickle.dump(already_uploaded, fh)
except (SSHException, OSError):
sftp.client.close()
sftp = SftpClient.from_creds("citco", folder="/outgoing/notifications")