import pandas as pd from db import dbengine import db import datetime df = pd.read_table("/home/share/guillaume/murano leads.csv.txt", encoding="utf-16-le", low_memory=False) ## first drop all NA values df = df.dropna(1, 'all') ## then drop columns with only one values todrop = [] for col in df: if df[col].value_counts(dropna=False).size == 1: todrop.append(col) df = df.drop(todrop, axis=1) ## drop columns with ID in it df = df.drop([col for col in df if 'ID' in col], axis=1) ## cleanup url website = df.Website website = website.replace("http://","") df.loc[website.str.startswith('www').fillna(False),'Website'] = 'http://' + \ website[website.str.startswith('www').fillna(False)] df = df.drop(['Teams', 'Do Not Call', 'Assigned User Name', 'Hit Status'], axis=1) df['Minimum Track Record'] = pd.to_numeric(df['Minimum Track Record'], errors='coerce') for col in ['Date Created', 'Date Modified', 'Last Spoke Date']: df[col] = pd.to_datetime(df[col]) for col in df: if df[col].dtypes == 'object': df.loc[df[col].isnull(),col] = None for col in ['Allocating', 'Allocating to FoHFs']: df.loc[df[col] == 'Active', col] = True df.loc[df[col] == 'Inactive', col] = False df.loc[df[col] == 'Unknown', col] = None ## strip out commas df.loc[df['Non-primary emails'].notnull(),'Non-primary emails'] = \ df.loc[df['Non-primary emails'].notnull(),'Non-primary emails'].str.extract("([^,]*)", expand=False) df.Continent = df.Continent.str.title() for col in ['General Strategy Preferences', 'Specific Strategy Preferences']: df.loc[df[col].notnull(),col] = (df.loc[df[col].notnull(),col]. str.replace("__SugarMassUpdateClearField__,","")) for col in ['Specific Strategy Preferences']: df.loc[df[col].notnull(),col] = (df.loc[df[col].notnull(),col].str. replace("HF_Corporate_Bonds","HF: Corporate Bonds").str. replace("HF_Multi_Asset","HF: Multi Asset").str. replace("OA_Direct_Lending", "OA: Direct Lending").str. replace("PE_Energy", "PE: Energy").str. replace("lo_multi_asset", "LO: Multi Asset").str. replace("PE_Oil_and_Gas", "PE: Oil and Gas")) for col in ['Investor Type', 'Investment Geography', 'General Strategy Preferences', 'Specific Strategy Preferences', 'Volatility Preference', 'Location Preferences']: df.loc[df[col].notnull(),col] = df.loc[df[col].notnull(),col].str.split(",") df['FuM Currency'] = df['FuM Currency'].replace("Unknown", None) ## cleanup country names col = df['Primary Address Country'] col = col.str.strip().str.title() col = col.replace(["Republic Of Ireland", "United State Of America", "The Netherlands"], ["Ireland", "United States Of America", "Netherlands"]) def generate_table(df): print("CREATE TABLE investors(ID serial PRIMARY KEY,") r = [] for col, t in df.dtypes.iteritems(): if col in ['Allocating', 'Allocating to FoHFs']: sql_type = 'boolean' elif col == 'Continent': sql_type = 'Region' elif col == 'Investor Type': sql_type = 'InvestorType[]' elif col == 'FuM Currency': sql_type = 'Currency' elif col in ['Investment Geography', 'Location Preferences']: sql_type = 'Geography[]' elif col == 'Mifid': sql_type = 'Mifid' elif col == 'Specific Strategy Preferences': sql_type = 'Strategy_detailed[]' elif col == 'General Strategy Preferences': sql_type = 'Strategy[]' elif col == 'Volatility Preference': sql_type = 'Volatility_tolerance[]' elif col == 'Last Spoke Date': sql_type = 'date' elif t.name == "object": sql_type = 'text' elif t.name == "datetime64[ns]": sql_type = 'timestamp' elif t.name == 'float64': sql_type = 'smallint' else: raise ValueError("Something fell through the crack") r.append('"{}" {}'.format(col, sql_type)) print(",\n".join(r),")") def list_placeholders(df): r = ['%s'] # ID for col in df: if col == 'Investor Type': r.append("%s::InvestorType[]") elif col in ['Investment Geography', 'Location Preferences']: r.append("%s::Geography[]") elif col == 'Specific Strategy Preferences': r.append('%s::Strategy_detailed[]') elif col == 'General Strategy Preferences': r.append('%s::Strategy[]') elif col == 'Volatility Preference': r.append('%s::Volatility_tolerance[]') else: r.append('%s') return r from psycopg2.extensions import register_adapter, AsIs import psycopg2 def nat_to_null(d, _NULL=AsIs('NULL'), _Timestamp=psycopg2.extensions.TimestampFromPy): if not type(d) == pd.tslib.NaTType: return _Timestamp(d) return _NULL register_adapter(datetime.datetime, nat_to_null) investorsdb = dbengine('investorsdb') sql_str = "INSERT INTO investors VALUES({})".format(",".join(list_placeholders(df))) conn = investorsdb.raw_connection() with conn.cursor() as c: for r in df.itertuples(): try: c.execute(sql_str, r) except db.DataError as e: print(e) conn.commit()