aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/collateral/__init__.py1
-rw-r--r--python/collateral/baml_fcm.py20
-rw-r--r--python/collateral/wells.py51
-rw-r--r--python/process_queue.py44
-rw-r--r--python/remote.py82
5 files changed, 103 insertions, 95 deletions
diff --git a/python/collateral/__init__.py b/python/collateral/__init__.py
index 2c13d5c8..ab60aae6 100644
--- a/python/collateral/__init__.py
+++ b/python/collateral/__init__.py
@@ -9,6 +9,7 @@ except KeyError:
from utils import SerenitasFileHandler
from dates import bus_day
+from remote import SftpClient, SftpClient2
fh = SerenitasFileHandler("collateral_calc.log")
logger = logging.getLogger(__name__)
diff --git a/python/collateral/baml_fcm.py b/python/collateral/baml_fcm.py
index d6a676f8..b6bfddfb 100644
--- a/python/collateral/baml_fcm.py
+++ b/python/collateral/baml_fcm.py
@@ -1,25 +1,13 @@
-from . import DAILY_DIR
+from . import DAILY_DIR, SftpClient
from .common import compare_notionals, STRATEGY_CASH_MAPPING
-from paramiko import Transport, SFTPClient, RSAKey
-import os.path
+from functools import partial
import pandas as pd
from sqlalchemy.exc import IntegrityError
-def get_sftp_client():
- transport = Transport(("ftps.b2b.ml.com", 22))
- pkey = RSAKey.from_private_key_file(os.path.expanduser("~/.ssh/id_rsa_lmcg"))
- transport.connect(username="lmcginvs", pkey=pkey)
- return SFTPClient.from_transport(transport)
+sftp = SftpClient.from_creds("baml_fcm")
-
-def download_files(d=None):
- DATA_DIR = DAILY_DIR / "BAML_reports"
- sftp = get_sftp_client()
- for f in sftp.listdir("outgoing"):
- local_file = DATA_DIR / f
- if not local_file.exists():
- sftp.get(f"outgoing/{f}", localpath=DATA_DIR / f)
+download_files = partial(sftp.download_files, "outgoing", DAILY_DIR / "BAML_reports")
def collateral(d, positions, engine):
diff --git a/python/collateral/wells.py b/python/collateral/wells.py
index affd99a8..e47142ee 100644
--- a/python/collateral/wells.py
+++ b/python/collateral/wells.py
@@ -1,54 +1,15 @@
import pandas as pd
-import socket
-from . import DAILY_DIR
+from . import DAILY_DIR, SftpClient2
from .common import compare_notionals, STRATEGY_CASH_MAPPING
-from paramiko import Transport, SFTPClient
+from functools import partial
from sqlalchemy.exc import IntegrityError
-from ssh2.session import Session
-from ssh2.sftp import LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR, LIBSSH2_SFTP_S_IFREG
-def get_wells_sftp_client():
- transport = Transport(("axst.wellsfargo.com", 10022))
- transport.connect(username="LMCHsWC6EP", password="HI2s2h19+")
- return SFTPClient.from_transport(transport)
+sftp = SftpClient2.from_creds("wells")
-
-def get_wells_sftp_client2():
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(("axst.wellsfargo.com", 10022))
- session = Session()
- session.handshake(sock)
- session.userauth_password("LMCHsWC6EP", "HI2s2h19+")
- sftp = session.sftp_init()
- return sftp
-
-
-def download_files2(d=None):
- DATA_DIR = DAILY_DIR / "Wells_reports"
- sftp = get_wells_sftp_client()
- base_dir = "/RECEIVE/339425_DATO2"
- for f in sftp.listdir(base_dir):
- if not (DATA_DIR / f).exists():
- sftp.get(f"{base_dir}/{f}", localpath=DATA_DIR / f)
-
-
-def download_files(d=None):
- DATA_DIR = DAILY_DIR / "Wells_reports"
- sftp = get_wells_sftp_client2()
- files = []
- with sftp.opendir("/RECEIVE/339425_DATO2") as fh:
- for size, buf, attrs in fh.readdir():
- if attrs.permissions & LIBSSH2_SFTP_S_IFREG:
- files.append(buf.decode())
- for f in files:
- local_file = DATA_DIR / f
- if not local_file.exists():
- with sftp.open(
- f"/RECEIVE/339425_DATO2/{f}", LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR
- ) as remote_handle, local_file.open("wb") as local_handle:
- for size, data in remote_handle:
- local_handle.write(data)
+download_files = partial(
+ sftp.download_files, "/RECEIVE/339425_DATO2", DAILY_DIR / "Wells_reports"
+)
def collateral(d, positions, engine):
diff --git a/python/process_queue.py b/python/process_queue.py
index a588cea8..42e666a3 100644
--- a/python/process_queue.py
+++ b/python/process_queue.py
@@ -2,7 +2,6 @@ import argparse
import blpapi
import csv
import datetime
-import json
import logging
import psycopg2
import pathlib
@@ -15,14 +14,12 @@ from io import StringIO
from analytics import CreditIndex
from analytics.utils import bus_day
from itertools import groupby
-from pathlib import Path
from pickle import loads
-from ftplib import FTP
from bbg_helpers import init_bbg_session, retrieve_data, BBG_IP
from common import get_redis_queue
from functools import partial
-from paramiko import Transport, SFTPClient
from pyisda.date import previous_twentieth
+from remote import FtpClient, SftpClient
from utils.db import dbconn
from quantlib.time.api import pydate_from_qldate, UnitedStates, Days, Date
from gmail_helpers import GmailMessage
@@ -293,26 +290,6 @@ HEADERS = {
}
-def load_credentials(name):
- return json.load((Path(".credentials") / f"{name}.json").open())
-
-
-def get_sftp_client(host, port, username, password, folder=None):
- transport = Transport((host, port))
- transport.connect(username=username, password=password)
- sftp = SFTPClient.from_transport(transport)
- if folder is not None:
- sftp.chdir(folder)
- return sftp
-
-
-def get_ftp_client(host, username, password, folder=None):
- ftp = FTP(host, username, password)
- if folder is not None:
- ftp.cwd(folder)
- return ftp
-
-
def get_headers(trade_type, fund):
headers = HEADERS[trade_type]
if trade_type == "bond" and fund == "BOWDST":
@@ -860,23 +837,22 @@ def get_filepath(
return (
base_dir
/ str(timestamp.date())
- / f"Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv"
+ / f"LMC01CFE.Bowdst.ALL.{timestamp:%Y%m%d.%H%M%S}.{d[trade_type]}.csv"
)
def upload_file(file_path: pathlib.Path) -> None:
if "BBH" in file_path.name:
- return
+ sftp = SftpClient.from_creds("bbh")
+ sftp.put(file_path)
elif file_path.name.startswith("Serenitas"):
- ftp = get_ftp_client(**{**load_credentials("globeop"), "folder": "incoming"})
- cmd = f"STOR {file_path.name}"
- with file_path.open("rb") as fh:
- ftp.storbinary(cmd, fh)
+ ftp = FtpClient.from_creds("globeop")
+ ftp.client.cwd("incoming")
+ ftp.put(file_path)
elif file_path.name.startswith("Bowdst"):
- sftp = get_sftp_client(**load_credentials("bony"))
- sftp.chdir("/inbound/cfe/")
- with file_path.open("rb") as fh:
- sftp.putfo(fh, f"LMC01CFE.{file_path.name}")
+ sftp = SftpClient.from_creds("bony")
+ sftp.client.chdir("/inbound/cfe/")
+ sftp.put(file_path)
def write_buffer(
diff --git a/python/remote.py b/python/remote.py
new file mode 100644
index 00000000..525ec814
--- /dev/null
+++ b/python/remote.py
@@ -0,0 +1,82 @@
+import json
+import pathlib
+import socket
+
+from ftplib import FTP
+from paramiko import Transport, SFTPClient, RSAKey
+from pathlib import Path
+from ssh2.session import Session
+from ssh2.sftp import LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR, LIBSSH2_SFTP_S_IFREG
+
+
+def load_credentials(name):
+ return json.load((Path(".credentials") / f"{name}.json").open())
+
+
+class Client:
+ @classmethod
+ def from_creds(cls, name: str):
+ args = json.load((Path(".credentials") / f"{name}.json").open())
+ print(cls)
+ return cls(**args)
+
+
+class FtpClient(Client):
+ def __init__(self, host, username, password, folder=None):
+ self.client = FTP(host, username, password)
+ if folder is not None:
+ self.client.cwd(folder)
+
+ def put(self, src: pathlib.Path, dst: str = None):
+ if dst is None:
+ dst = src.name
+ with src.open("rb") as fh:
+ self.client.storbinary(f"STOR {dst}", fh)
+
+
+class SftpClient(Client):
+ def __init__(self, host, port, username, password=None, key=None, folder=None):
+ transport = Transport((host, port))
+ if key is not None:
+ pkey = RSAKey.from_private_key_file(Path.home() / ".ssh" / key)
+ transport.connect(username=username, password=password, pkey=pkey)
+ self.client = SFTPClient.from_transport(transport)
+ if folder is not None:
+ self.client.chdir(folder)
+
+ def download_files(self, src: str, dst: pathlib.Path, *args):
+ for f in self.client.listdir(src):
+ local_file = dst / f
+ if not local_file.exists():
+ self.client.get(f"{src}/{f}", localpath=local_file)
+
+ def put(self, src: pathlib.Path, dst: str = None):
+ if dst is None:
+ dst = src.name
+ with src.open("rb") as fh:
+ self.client.putfo(fh, dst)
+
+
+class SftpClient2(Client):
+ def __init__(self, host, port, username, password):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ session = Session()
+ session.handshake(sock)
+ session.userauth_password(username, password)
+ self.client = session.sftp_init()
+
+ def download_files(self, src: str, dst: pathlib.Path, *args):
+ files = []
+ with self.client.opendir(src) as fh:
+ for size, buf, attrs in fh.readdir():
+ if attrs.permissions & LIBSSH2_SFTP_S_IFREG:
+ files.append(buf.decode())
+ for f in files:
+ local_file = dst / f
+ if not local_file.exists():
+ with self.client.open(
+ f"{src}/{f}", LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR
+ ) as remote_handle, local_file.open("wb") as local_handle:
+ for size, data in remote_handle:
+ local_handle.write(data)