import base64 import email import json import logging import os import sys from apiclient import errors from pathlib import Path from pytz import timezone from send_email import get_gmail_service from email.message import EmailMessage from email.utils import parsedate_to_datetime def ListMessagesWithLabels(service, user_id, label_ids=[]): """List all Messages of the user's mailbox with label_ids applied. Args: service: Authorized Gmail API service instance. user_id: User's email address. The special value "me" can be used to indicate the authenticated user. label_ids: Only return Messages with these labelIds applied. Returns: List of Messages that have all required Labels applied. Note that the returned list contains Message IDs, you must use get with the appropriate id to get the details of a Message. """ try: response = service.users().messages().list(userId=user_id, labelIds=label_ids).execute() messages = [] if 'messages' in response: messages.extend(response['messages']) while 'nextPageToken' in response: page_token = response['nextPageToken'] response = service.users().messages().list(userId=user_id, labelIds=label_ids, pageToken=page_token).execute() messages.extend(response['messages']) return messages except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) def ListHistory(service, user_id, label_id=None, start_history_id=10000): """List History of all changes to the user's mailbox. Args: service: Authorized Gmail API service instance. user_id: User's email address. The special value "me" can be used to indicate the authenticated user. start_history_id: Only return Histories at or after start_history_id. Returns: A list of mailbox changes that occurred after the start_history_id. """ try: history = (service.users().history().list(userId=user_id, startHistoryId=start_history_id, historyTypes="messageAdded", labelId=label_id) .execute()) changes = history['history'] if 'history' in history else [] for change in changes: if 'messagesAdded' in change: for c in change['messagesAdded']: yield c['message'] while 'nextPageToken' in history: page_token = history['nextPageToken'] history = (service.users().history().list(userId=user_id, startHistoryId=start_history_id, pageToken=page_token).execute()) for change in history['history']: if 'messagesAdded' in change: for c in change['messagesAdded']: yield c['message'] except errors.HttpError as error: print('An error occurred:', error) def labels_dict(service, user_id): """Returns a dictionary mapping labels to labelids. Args: service: Authorized Gmail API service instance. user_id: User's email address. The special value "me" Returns: dictionary mapping labels to labelids. """ try: response = service.users().labels().list(userId=user_id).execute() labels = response['labels'] return {label['name']: label['id'] for label in labels} except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) class GmailMessage(EmailMessage): _service = get_gmail_service() def msgdict(self): return {'raw': base64.urlsafe_b64encode(self.as_bytes()).decode()} def send(self): try: message = (self._service.users().messages(). send(userId='me',body=self.msgdict()) .execute()) print('Message Id: %s' % message['id']) except errors.HttpError as error: print('An error occurred: %s' % error) @classmethod def from_id(cls, msg_id, user_id='me'): try: message = (cls._service.users().messages(). get(userId=user_id, id=msg_id, format='raw').execute()) instance = email.message_from_bytes( base64.urlsafe_b64decode(message['raw']), policy=email.policy.EmailPolicy()) instance.history_id = message['historyId'] return instance except errors.HttpError as error: print(json.loads(error.content.decode('utf-8'))['error']['message']) def save_emails(update=True): """Download new emails that were labeled swaptions.""" labelsdict = labels_dict(GmailMessage._service, 'me') p = Path(os.getenv("DATA_DIR")) / Path('swaptions') if update: try: with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId')) as fh: last_history_id = int(fh.read()) except FileNotFoundError: sys.exit() email_list = ListHistory(GmailMessage._service, 'me', label_id=labelsdict['swaptions'], start_history_id=last_history_id) existing_msgs = [] else: email_list = ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions']) existing_msgs = set(str(x).split("_")[1] for x in p.iterdir() if x.is_file()) for msg in email_list: if msg['id'] in existing_msgs: continue try: message = GmailMessage.from_id(msg['id']) print(message.history_id) subject = message['subject'] date = parsedate_to_datetime(message['date']) if date.tzinfo is None: date = date.replace(tzinfo=timezone('utc')) date = date.astimezone(timezone('America/New_York')) body = message.get_body('plain') content = body.get_content() except (KeyError, UnicodeDecodeError) as e: logging.error("error decoding " + msg['id']) continue else: email = p / "{:%Y-%m-%d %H-%M-%S}_{}".format(date, msg['id']) with email.open("w") as fh: fh.write(subject + "\r\n") fh.write(content) try: new_history_id = message.history_id with open(os.path.join(os.environ['DATA_DIR'], '.lastHistoryId'), 'w') as fh: fh.write(new_history_id) except UnboundLocalError: pass # for msg in ListMessagesWithLabels(GmailMessage._service, 'me', labelsdict['swaptions']): # if msg['id'] not in current_msgs: # try: if __name__ == '__main__': save_emails()