import argparse import base64 import email import json import logging import oauth2client import os import sys from apiclient.discovery import build from apiclient import errors from datetime import datetime from httplib2 import Http from oauth2client import client from pathlib import Path from pytz import timezone from send_email import get_gmail_service from email.message import EmailMessage from email.utils import parsedate_to_datetime def ListMessagesWithLabels(service, user_id, label_ids=[]): """List all Messages of the user's mailbox with label_ids applied. Args: service: Authorized Gmail API service instance. user_id: User's email address. The special value "me" can be used to indicate the authenticated user. label_ids: Only return Messages with these labelIds applied. Returns: List of Messages that have all required Labels applied. Note that the returned list contains Message IDs, you must use get with the appropriate id to get the details of a Message. """ try: response = service.users().messages().list(userId=user_id, labelIds=label_ids).execute() messages = [] if 'messages' in response: messages.extend(response['messages']) while 'nextPageToken' in response: page_token = response['nextPageToken'] response = service.users().messages().list(userId=user_id, labelIds=label_ids, pageToken=page_token).execute() messages.extend(response['messages']) return messages except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) def ListHistory(service, user_id, label_id=None, start_history_id=10000): """List History of all changes to the user's mailbox. Args: service: Authorized Gmail API service instance. user_id: User's email address. The special value "me" can be used to indicate the authenticated user. start_history_id: Only return Histories at or after start_history_id. Returns: A list of mailbox changes that occurred after the start_history_id. """ try: history = (service.users().history().list(userId=user_id, startHistoryId=start_history_id, historyTypes="messageAdded", labelId=label_id) .execute()) changes = history['history'] if 'history' in history else [] for change in changes: if 'messagesAdded' in change: for c in change['messagesAdded']: yield c['message'] while 'nextPageToken' in history: page_token = history['nextPageToken'] history = (service.users().history().list(userId=user_id, startHistoryId=start_history_id, pageToken=page_token).execute()) for change in history['history']: if 'messagesAdded' in change: for c in change['messagesAdded']: yield c['message'] except errors.HttpError as error: print('An error occurred:', error) def labels_dict(service, user_id): """Returns a dictionary mapping labels to labelids. Args: service: Authorized Gmail API service instance. user_id: User's email address. The special value "me" Returns: dictionary mapping labels to labelids. """ try: response = service.users().labels().list(userId=user_id).execute() labels = response['labels'] return {label['name']: label['id'] for label in labels} except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) class GmailMessage(EmailMessage): _service = get_gmail_service() def msgdict(self): return {'raw': base64.urlsafe_b64encode(self.as_bytes()).decode()} def send(self): try: message = (self._service.users().messages(). send(userId='me',body=self.msgdict()) .execute()) print('Message Id: %s' % message['id']) except errors.HttpError as error: print('An error occurred: %s' % error) @classmethod def from_id(cls, msg_id, user_id='me'): try: message = (cls._service.users().messages(). get(userId=user_id, id=msg_id, format='raw').execute()) instance = email.message_from_bytes( base64.urlsafe_b64decode(message['raw']), policy=email.policy.EmailPolicy()) instance.history_id = message['historyId'] return instance except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) def save_emails(update=True): """Download new emails that were labeled swaptions.""" labelsdict = labels_dict(GmailMessage._service, 'me') p = Path(os.getenv("DATA_DIR")) / Path('swaptions') try: with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId')) as fh: last_history_id = int(fh.read()) except FileNotFoundError: sys.exit() if update: email_list = ListHistory(GmailMessage._service, 'me', label_id=labelsdict['swaptions'], start_history_id=last_history_id) else: email_list = ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions']) for msg in email_list: try: message = GmailMessage.from_id(msg['id']) print(message.history_id) subject = message['subject'] date = parsedate_to_datetime(message['date']) if date.tzinfo is None: date = date.replace(tzinfo=timezone('utc')) date = date.astimezone(timezone('America/New_York')) body = message.get_body('plain') content = body.get_content() except (KeyError, UnicodeDecodeError) as e: logging.error("error decoding " + msg['id']) continue else: email = p / "{:%Y-%m-%d %H-%M-%S}_{}".format(date, msg['id']) with email.open("w") as fh: fh.write(subject + "\r\n") fh.write(content) try: new_history_id = message.history_id with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId'), 'w') as fh: fh.write(new_history_id) except UnboundLocalError: pass # for msg in ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions']): # if msg['id'] not in current_msgs: # try: if __name__ == '__main__': save_emails()