"""Download Gmail messages carrying the "swaptions" label into text files.

Messages are listed through the Gmail API (either the complete label listing
or the mailbox history since a stored history id), fetched in raw form, and
written to ``$DATA_DIR/swaptions`` as ``<date>_<message id>`` files.
"""
import base64
import email
import email.policy
import json
import logging
import os
import sys
from email.message import EmailMessage
from email.utils import parsedate_to_datetime
from pathlib import Path

from apiclient import errors
from pytz import timezone

from send_email import get_gmail_service


def ListMessagesWithLabels(service, user_id, label_ids=None):
    """Yield all Messages of the user's mailbox with label_ids applied.

    Args:
      service: Authorized Gmail API service instance.
      user_id: User's email address. The special value "me"
        can be used to indicate the authenticated user.
      label_ids: Only return Messages with these labelIds applied.
        ``None`` (the default) is treated as an empty list.

    Yields:
      Message resources that have all required Labels applied. Note that
      these only identify the Message; use get with the appropriate id to
      get the details of a Message.

    An ``HttpError`` raised by the API is caught and its message printed,
    which ends the iteration early.
    """
    # Avoid a shared mutable default argument.
    label_ids = [] if label_ids is None else label_ids
    request_args = {"userId": user_id, "labelIds": label_ids}
    try:
        while True:
            response = service.users().messages().list(**request_args).execute()
            # A page may legitimately come back without a 'messages' key.
            yield from response.get('messages', [])
            if 'nextPageToken' not in response:
                break
            request_args["pageToken"] = response['nextPageToken']
    except errors.HttpError as error:
        print(json.loads(error.content.decode('utf-8'))['error']['message'])


def ListHistory(service, user_id, label_id=None, start_history_id=10000):
    """Yield the messages added to the user's mailbox since a history id.

    Args:
      service: Authorized Gmail API service instance.
      user_id: User's email address. The special value "me"
        can be used to indicate the authenticated user.
      label_id: Only report history records for messages with this label.
      start_history_id: Only return Histories after start_history_id.

    Yields:
      The ``message`` entry of every ``messagesAdded`` change found in the
      history records.

    Unlike ``ListMessagesWithLabels`` this function does not catch
    ``HttpError``; it propagates to the caller.
    """
    # The same filters must be sent with every page request; otherwise the
    # follow-up pages are no longer restricted to added messages / the label.
    request_args = {
        "userId": user_id,
        "startHistoryId": start_history_id,
        "historyTypes": "messageAdded",
        "labelId": label_id,
    }
    while True:
        history = service.users().history().list(**request_args).execute()
        # Both 'history' and 'messagesAdded' are optional in a response page.
        for change in history.get('history', []):
            for added in change.get('messagesAdded', []):
                yield added['message']
        if 'nextPageToken' not in history:
            break
        request_args["pageToken"] = history['nextPageToken']


def labels_dict(service, user_id):
    """Return a dictionary mapping label names to label ids.

    Args:
      service: Authorized Gmail API service instance.
      user_id: User's email address. The special value "me"
        can be used to indicate the authenticated user.

    Returns:
      Dictionary mapping label names to label ids, or ``None`` when the API
      call fails with an ``HttpError`` (the error message is printed).
    """
    try:
        response = service.users().labels().list(userId=user_id).execute()
    except errors.HttpError as error:
        print(json.loads(error.content.decode('utf-8'))['error']['message'])
        return None
    return {label['name']: label['id'] for label in response['labels']}


class GmailMessage(EmailMessage):
    """An ``EmailMessage`` that can be sent to and fetched from Gmail."""

    # NOTE(review): ``EmailMessage._service`` is not a standard-library
    # attribute; presumably it is attached elsewhere (e.g. by ``send_email``)
    # before this class body runs -- confirm.
    # The label lookup is performed once, when the class is defined.
    _labels = labels_dict(EmailMessage._service, 'me')

    def msgdict(self):
        """Return the message as a ``{'raw': <base64url string>}`` dict."""
        return {'raw': base64.urlsafe_b64encode(self.as_bytes()).decode()}

    def send(self):
        """Send the message as user "me"; print the new id or the error."""
        try:
            message = (self._service.users().messages().
                       send(userId='me', body=self.msgdict())
                       .execute())
            print('Message Id: %s' % message['id'])
        except errors.HttpError as error:
            print('An error occurred: %s' % error)

    @staticmethod
    def list_msg_ids(label, start_history_id=None):
        """Return an iterator over message resources carrying ``label``.

        Args:
          label: Label name; looked up in the class-level ``_labels`` map.
          start_history_id: When given, only messages added after this
            history id are listed (via ``ListHistory``); otherwise every
            message with the label is listed.
        """
        label_id = GmailMessage._labels[label]
        if start_history_id is not None:
            return ListHistory(EmailMessage._service,
                               'me',
                               label_id=label_id,
                               start_history_id=start_history_id)
        return ListMessagesWithLabels(EmailMessage._service, 'me',
                                      label_ids=[label_id])

    @classmethod
    def from_id(cls, msg_id, user_id='me'):
        """Fetch message ``msg_id`` in raw format and parse it.

        Args:
          msg_id: Gmail message id.
          user_id: User's email address, or "me" for the authenticated user.

        Returns:
          The parsed message (an instance of ``cls``) with an extra
          ``history_id`` attribute taken from the API response, or ``None``
          when the API call fails with an ``HttpError`` (the error message
          is printed).
        """
        try:
            message = (cls._service.users().messages().
                       get(userId=user_id, id=msg_id, format='raw').execute())
        except errors.HttpError as error:
            print(json.loads(error.content.decode('utf-8'))['error']['message'])
            return None
        # ``_class=cls`` makes the parser build instances of this class, so
        # the result also offers ``send``/``msgdict``.
        instance = email.message_from_bytes(
            base64.urlsafe_b64decode(message['raw']),
            _class=cls,
            policy=email.policy.EmailPolicy())
        instance.history_id = message['historyId']
        return instance


def save_emails(update=True):
    """Download new emails that were labeled swaptions.

    Args:
      update: When true, only messages added since the history id stored in
        ``$DATA_DIR/.lastHistoryId`` are fetched; the process exits if that
        file is missing. When false, every message with the label is listed
        and those whose id already appears in a file name under
        ``$DATA_DIR/swaptions`` are skipped.

    Each message is written to ``$DATA_DIR/swaptions`` as a file named
    ``<New York local date>_<message id>`` holding the subject line followed
    by the plain-text body. Messages that cannot be fetched or decoded are
    logged and skipped. Afterwards the highest history id seen is stored in
    ``.lastHistoryId``.
    """
    data_dir = Path(os.getenv("DATA_DIR"))
    swaptions_dir = data_dir / "swaptions"
    history_file = data_dir / ".lastHistoryId"

    if update:
        try:
            last_history_id = int(history_file.read_text())
        except FileNotFoundError:
            logging.error("can't find .lastHistoryId file")
            # NOTE(review): exits with status 0 despite the error -- confirm
            # whether a non-zero status is wanted.
            sys.exit()
        existing_msgs = set()
    else:
        # Parse the id from the file name only, so that underscores in the
        # directory part of the path cannot corrupt the result.
        existing_msgs = {
            path.name.split("_", 1)[1]
            for path in swaptions_dir.iterdir()
            if path.is_file() and "_" in path.name
        }
        last_history_id = None

    new_york = timezone('America/New_York')
    # Highest history id among the fetched messages. The maximum is tracked
    # instead of "whatever came last" because the listing order is not
    # guaranteed to be chronological.
    newest_history_id = None

    for msg in GmailMessage.list_msg_ids('swaptions', last_history_id):
        msg_id = msg['id']
        if msg_id in existing_msgs:
            continue
        try:
            message = GmailMessage.from_id(msg_id)
            if message is None:
                # from_id already printed the API error.
                logging.error("error decoding " + msg_id)
                continue
            logging.info(message.history_id)
            history_id = int(message.history_id)
            if newest_history_id is None or history_id > newest_history_id:
                newest_history_id = history_id
            # A missing Subject header yields None; store an empty line.
            subject = message['subject'] or ""
            date = parsedate_to_datetime(message['date'])
            if date.tzinfo is None:
                date = date.replace(tzinfo=timezone('utc'))
            date = date.astimezone(new_york)
            body = message.get_body(preferencelist=('plain',))
            content = body.get_content()
        except (KeyError, UnicodeDecodeError, AttributeError,
                TypeError, ValueError):
            # TypeError/ValueError cover a missing or unparsable Date header
            # and a non-numeric history id; one bad message must not abort
            # the whole run.
            logging.error("error decoding " + msg_id)
            continue
        else:
            email_path = swaptions_dir / f"{date:%Y-%m-%d %H-%M-%S}_{msg['id']}"
            with email_path.open("w") as fh:
                fh.write(subject + "\r\n")
                fh.write(content)

    # Nothing fetched means nothing to record.
    if newest_history_id is not None:
        history_file.write_text(str(newest_history_id))


if __name__ == '__main__':
    try:
        save_emails()
    except errors.HttpError as e:
        # Incremental update failed at the API level: fall back to a full
        # listing of the label.
        logging.error(e)
        save_emails(update=False)