aboutsummaryrefslogtreecommitdiffstats
path: root/python/download_emails.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/download_emails.py')
-rw-r--r--python/download_emails.py168
1 files changed, 122 insertions, 46 deletions
diff --git a/python/download_emails.py b/python/download_emails.py
index bc338fde..21d5794a 100644
--- a/python/download_emails.py
+++ b/python/download_emails.py
@@ -1,18 +1,22 @@
-from datetime import datetime
+import argparse
+import base64
+import email
+import json
+import logging
+import oauth2client
import os
-from pathlib import Path
+import sys
from apiclient.discovery import build
from apiclient import errors
+from datetime import datetime
from httplib2 import Http
-import oauth2client
from oauth2client import client
-import json
-import base64
-import binascii
+from pathlib import Path
+from pytz import timezone
from send_email import get_gmail_service
-import argparse
-import logging
+from email.message import EmailMessage
+from email.utils import parsedate_to_datetime
def ListMessagesWithLabels(service, user_id, label_ids=[]):
"""List all Messages of the user's mailbox with label_ids applied.
@@ -45,6 +49,43 @@ def ListMessagesWithLabels(service, user_id, label_ids=[]):
except errors.HttpError as error:
print(json.loads(error.content.decode('utf-8'))['error']['message'])
+def ListHistory(service, user_id, label_id=None, start_history_id=10000):
+ """List History of all changes to the user's mailbox.
+
+ Args:
+ service: Authorized Gmail API service instance.
+ user_id: User's email address. The special value "me"
+ can be used to indicate the authenticated user.
+ start_history_id: Only return Histories at or after start_history_id.
+
+ Returns:
+ A list of mailbox changes that occurred after the start_history_id.
+ """
+ try:
+ history = (service.users().history().list(userId=user_id,
+ startHistoryId=start_history_id,
+ historyTypes="messageAdded",
+ labelId=label_id)
+ .execute())
+ changes = history['history'] if 'history' in history else []
+ for change in changes:
+ if 'messagesAdded' in change:
+ for c in change['messagesAdded']:
+ yield c['message']
+
+ while 'nextPageToken' in history:
+ page_token = history['nextPageToken']
+ history = (service.users().history().list(userId=user_id,
+ startHistoryId=start_history_id,
+ pageToken=page_token).execute())
+ for change in history['history']:
+ if 'messagesAdded' in change:
+ for c in change['messagesAdded']:
+ yield c['message']
+
+ except errors.HttpError as error:
+ print('An error occurred:', error)
+
def labels_dict(service, user_id):
"""Returns a dictionary mapping labels to labelids.
@@ -62,48 +103,83 @@ def labels_dict(service, user_id):
except errors.HttpError as error:
print(json.loads(error.content.decode('utf-8'))['error']['message'])
-def get_msg(service, user_id, msg_id):
- try:
- message = service.users().messages().get(userId=user_id, id=msg_id, format='full').execute()
- return message
- except errors.HttpError as error:
- print(json.loads(error.content.decode('utf-8'))['error']['message'])
+class GmailMessage(EmailMessage):
+ _service = get_gmail_service()
-def msg_content(msg):
- """Extract subject and body from a gmail message"""
- subject = [x['value'] for x in msg['payload']['headers'] if x['name']=='Subject'][0]
- payload = msg['payload']
- if payload['mimeType'] == 'text/plain':
- body = payload['body']
- elif payload['mimeType'] == 'multipart/alternative':
- parts = payload['parts']
- body = parts[0]['body']
- else:
- raise KeyError
- content = base64.urlsafe_b64decode(body['data']).decode('utf-8')
- date = msg['internalDate'] ## date /1000 to get timestamp
- return subject, content, date
+ def msgdict(self):
+ return {'raw': base64.urlsafe_b64encode(self.as_bytes()).decode()}
+
+ def send(self):
+ try:
+ message = (self._service.users().messages().
+ send(userId='me',body=self.msgdict())
+ .execute())
+ print('Message Id: %s' % message['id'])
+ except errors.HttpError as error:
+ print('An error occurred: %s' % error)
-def update_emails():
+ @classmethod
+ def from_id(cls, msg_id, user_id='me'):
+ try:
+ message = (cls._service.users().messages().
+ get(userId=user_id, id=msg_id, format='raw').execute())
+ instance = email.message_from_bytes(
+ base64.urlsafe_b64decode(message['raw']),
+ policy=email.policy.EmailPolicy())
+ instance.history_id = message['historyId']
+ return instance
+ except errors.HttpError as error:
+ print(json.loads(error.content.decode('utf-8'))['error']['message'])
+
+
+def save_emails(update=True):
"""Download new emails that were labeled swaptions."""
- service = get_gmail_service()
- labelsdict = labels_dict(service, 'me')
+ labelsdict = labels_dict(GmailMessage._service, 'me')
p = Path(os.getenv("DATA_DIR")) / Path('swaptions')
- current_msgs = set([f.name for f in p.iterdir() if f.is_file()])
- for msg in ListMessagesWithLabels(service, 'me', labelsdict['swaptions']):
- if msg['id'] not in current_msgs:
- try:
- subject, content, date = msg_content(get_msg(service, 'me', msg['id']))
- except (binascii.Error, KeyError, UnicodeDecodeError) as e:
- logging.error("error decoding " + msg['id'])
- continue
- else:
- email = p / msg['id']
- with email.open("w") as fh:
- fh.write(date + "\r\n")
- fh.write(subject + "\r\n")
- fh.write(content)
+ try:
+ with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId')) as fh:
+ last_history_id = int(fh.read())
+ except FileNotFoundError:
+ sys.exit()
+ if update:
+ email_list = ListHistory(GmailMessage._service,
+ 'me',
+ label_id=labelsdict['swaptions'],
+ start_history_id=last_history_id)
+ else:
+ email_list = ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions'])
+
+ for msg in email_list:
+ try:
+ message = GmailMessage.from_id(msg['id'])
+ print(message.history_id)
+ subject = message['subject']
+ date = parsedate_to_datetime(message['date'])
+ if date.tzinfo is None:
+ date = date.replace(tzinfo=timezone('utc'))
+ body = message.get_body('plain')
+ content = body.get_content()
+ except (KeyError, UnicodeDecodeError) as e:
+ logging.error("error decoding " + msg['id'])
+ continue
+ else:
+ email = p / msg['id']
+ with email.open("w") as fh:
+ fh.write("{:.0f}\r\n".format(date.timestamp()*1000))
+ fh.write(subject + "\r\n")
+ fh.write(content)
+ try:
+ new_history_id = msg['id']
+ with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId'), 'w') as fh:
+ fh.write(new_history_id)
+ except UnboundLocalError:
+ pass
+
+ # for msg in ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions']):
+ # if msg['id'] not in current_msgs:
+ # try:
+
if __name__ == '__main__':
- update_emails()
+ save_emails()