diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/download_emails.py | 168 | ||||
| -rw-r--r-- | python/parse_emails.py | 4 |
2 files changed, 124 insertions, 48 deletions
diff --git a/python/download_emails.py b/python/download_emails.py index bc338fde..21d5794a 100644 --- a/python/download_emails.py +++ b/python/download_emails.py @@ -1,18 +1,22 @@ -from datetime import datetime +import argparse +import base64 +import email +import json +import logging +import oauth2client import os -from pathlib import Path +import sys from apiclient.discovery import build from apiclient import errors +from datetime import datetime from httplib2 import Http -import oauth2client from oauth2client import client -import json -import base64 -import binascii +from pathlib import Path +from pytz import timezone from send_email import get_gmail_service -import argparse -import logging +from email.message import EmailMessage +from email.utils import parsedate_to_datetime def ListMessagesWithLabels(service, user_id, label_ids=[]): """List all Messages of the user's mailbox with label_ids applied. @@ -45,6 +49,43 @@ def ListMessagesWithLabels(service, user_id, label_ids=[]): except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) +def ListHistory(service, user_id, label_id=None, start_history_id=10000): + """List History of all changes to the user's mailbox. + + Args: + service: Authorized Gmail API service instance. + user_id: User's email address. The special value "me" + can be used to indicate the authenticated user. + start_history_id: Only return Histories at or after start_history_id. + + Returns: + A list of mailbox changes that occurred after the start_history_id. + """ + try: + history = (service.users().history().list(userId=user_id, + startHistoryId=start_history_id, + historyTypes="messageAdded", + labelId=label_id) + .execute()) + changes = history['history'] if 'history' in history else [] + for change in changes: + if 'messagesAdded' in change: + for c in change['messagesAdded']: + yield c['message'] + + while 'nextPageToken' in history: + page_token = history['nextPageToken'] + history = (service.users().history().list(userId=user_id, + startHistoryId=start_history_id, + pageToken=page_token).execute()) + for change in history['history']: + if 'messagesAdded' in change: + for c in change['messagesAdded']: + yield c['message'] + + except errors.HttpError as error: + print('An error occurred:', error) + def labels_dict(service, user_id): """Returns a dictionary mapping labels to labelids. @@ -62,48 +103,83 @@ def labels_dict(service, user_id): except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) -def get_msg(service, user_id, msg_id): - try: - message = service.users().messages().get(userId=user_id, id=msg_id, format='full').execute() - return message - except errors.HttpError as error: - print(json.loads(error.content.decode('utf-8'))['error']['message']) +class GmailMessage(EmailMessage): + _service = get_gmail_service() -def msg_content(msg): - """Extract subject and body from a gmail message""" - subject = [x['value'] for x in msg['payload']['headers'] if x['name']=='Subject'][0] - payload = msg['payload'] - if payload['mimeType'] == 'text/plain': - body = payload['body'] - elif payload['mimeType'] == 'multipart/alternative': - parts = payload['parts'] - body = parts[0]['body'] - else: - raise KeyError - content = base64.urlsafe_b64decode(body['data']).decode('utf-8') - date = msg['internalDate'] ## date /1000 to get timestamp - return subject, content, date + def msgdict(self): + return {'raw': base64.urlsafe_b64encode(self.as_bytes()).decode()} + + def send(self): + try: + message = (self._service.users().messages(). + send(userId='me',body=self.msgdict()) + .execute()) + print('Message Id: %s' % message['id']) + except errors.HttpError as error: + print('An error occurred: %s' % error) -def update_emails(): + @classmethod + def from_id(cls, msg_id, user_id='me'): + try: + message = (cls._service.users().messages(). + get(userId=user_id, id=msg_id, format='raw').execute()) + instance = email.message_from_bytes( + base64.urlsafe_b64decode(message['raw']), + policy=email.policy.EmailPolicy()) + instance.history_id = message['historyId'] + return instance + except errors.HttpError as error: + print(json.loads(error.content.decode('utf-8'))['error']['message']) + + +def save_emails(update=True): """Download new emails that were labeled swaptions.""" - service = get_gmail_service() - labelsdict = labels_dict(service, 'me') + labelsdict = labels_dict(GmailMessage._service, 'me') p = Path(os.getenv("DATA_DIR")) / Path('swaptions') - current_msgs = set([f.name for f in p.iterdir() if f.is_file()]) - for msg in ListMessagesWithLabels(service, 'me', labelsdict['swaptions']): - if msg['id'] not in current_msgs: - try: - subject, content, date = msg_content(get_msg(service, 'me', msg['id'])) - except (binascii.Error, KeyError, UnicodeDecodeError) as e: - logging.error("error decoding " + msg['id']) - continue - else: - email = p / msg['id'] - with email.open("w") as fh: - fh.write(date + "\r\n") - fh.write(subject + "\r\n") - fh.write(content) + try: + with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId')) as fh: + last_history_id = int(fh.read()) + except FileNotFoundError: + sys.exit() + if update: + email_list = ListHistory(GmailMessage._service, + 'me', + label_id=labelsdict['swaptions'], + start_history_id=last_history_id) + else: + email_list = ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions']) + + for msg in email_list: + try: + message = GmailMessage.from_id(msg['id']) + print(message.history_id) + subject = message['subject'] + date = parsedate_to_datetime(message['date']) + if date.tzinfo is None: + date = date.replace(tzinfo=timezone('utc')) + body = message.get_body('plain') + content = body.get_content() + except (KeyError, UnicodeDecodeError) as e: + logging.error("error decoding " + msg['id']) + continue + else: + email = p / msg['id'] + with email.open("w") as fh: + fh.write("{:.0f}\r\n".format(date.timestamp()*1000)) + fh.write(subject + "\r\n") + fh.write(content) + try: + new_history_id = msg['id'] + with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId'), 'w') as fh: + fh.write(new_history_id) + except UnboundLocalError: + pass + + # for msg in ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions']): + # if msg['id'] not in current_msgs: + # try: + if __name__ == '__main__': - update_emails() + save_emails() diff --git a/python/parse_emails.py b/python/parse_emails.py index 8b5612d5..32c1f6d6 100644 --- a/python/parse_emails.py +++ b/python/parse_emails.py @@ -4,7 +4,7 @@ import os import pdb from db import dbconn import psycopg2.sql as sql -from download_emails import update_emails +from download_emails import save_emails import datetime import logging import pickle @@ -374,7 +374,7 @@ def pickle_drop_date(date): pickle.dump(newdict, fh) if __name__=="__main__": - update_emails() + save_emails() data_dir = os.path.join(os.getenv("DATA_DIR"), "swaptions") emails = [f for f in os.scandir(data_dir) if f.is_file()] swaption_stack = {} |
