diff options
Diffstat (limited to 'python/gmail_helpers.py')
| -rw-r--r-- | python/gmail_helpers.py | 228 |
1 files changed, 0 insertions, 228 deletions
diff --git a/python/gmail_helpers.py b/python/gmail_helpers.py deleted file mode 100644 index 7b47ce10..00000000 --- a/python/gmail_helpers.py +++ /dev/null @@ -1,228 +0,0 @@ -from apiclient.discovery import build -from apiclient import errors -from email.message import EmailMessage -from google.oauth2.credentials import Credentials -from google_auth_oauthlib.flow import InstalledAppFlow - -import argparse -import base64 -import email -import json -import oauth2client -import os - -SCOPES = ["https://www.googleapis.com/auth/gmail.modify"] -CLIENT_SECRET_FILE = "secret.json" -APPLICATION_NAME = "Swaptions" - - -def get_gmail_service(): - """Gets valid user credentials from storage. - - If nothing has been stored, or if the stored credentials are invalid, - the OAuth2 flow is completed to obtain the new credentials. - - Returns: - Credentials, the obtained credential. - """ - credential_dir = ".credentials" - if not os.path.exists(credential_dir): - os.makedirs(credential_dir) - credential_path = os.path.join( - credential_dir, "guillaume.horel@serenitascapital.com.json" - ) - - try: - credentials = Credentials.from_authorized_user_file(credential_path) - except: - flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES) - credentials = flow.run_console() - to_save = {} - for attr in [ - "token", - "refresh_token", - "id_token", - "token_uri", - "client_id", - "client_secret", - "scopes", - ]: - to_save[attr] = getattr(credentials, attr) - with open(credential_path, "w") as fh: - json.dump(to_save, fh) - service = build("gmail", "v1", credentials=credentials) - return service - - -def ListMessagesWithLabels(service, user_id, label_ids=[]): - """List all Messages of the user's mailbox with label_ids applied. - - Args: - service: Authorized Gmail API service instance. - user_id: User's email address. The special value "me" - can be used to indicate the authenticated user. - label_ids: Only return Messages with these labelIds applied. - - Returns: - List of Messages that have all required Labels applied. Note that the - returned list contains Message IDs, you must use get with the - appropriate id to get the details of a Message. - """ - try: - response = ( - service.users() - .messages() - .list(userId=user_id, labelIds=label_ids) - .execute() - ) - if "messages" in response: - yield from response["messages"] - while "nextPageToken" in response: - page_token = response["nextPageToken"] - response = ( - service.users() - .messages() - .list(userId=user_id, labelIds=label_ids, pageToken=page_token) - .execute() - ) - yield from response["messages"] - - except errors.HttpError as error: - print(json.loads(error.content.decode("utf-8"))["error"]["message"]) - - -def ListHistory(service, user_id, label_id=None, start_history_id=10000): - """List History of all changes to the user's mailbox. - - Args: - service: Authorized Gmail API service instance. - user_id: User's email address. The special value "me" - can be used to indicate the authenticated user. - start_history_id: Only return Histories at or after start_history_id. - - Returns: - A list of mailbox changes that occurred after the start_history_id. - """ - history = ( - service.users() - .history() - .list( - userId=user_id, - startHistoryId=start_history_id, - historyTypes="messageAdded", - labelId=label_id, - ) - .execute() - ) - changes = history["history"] if "history" in history else [] - for change in changes: - if "messagesAdded" in change: - for c in change["messagesAdded"]: - yield c["message"] - - while "nextPageToken" in history: - page_token = history["nextPageToken"] - history = ( - service.users() - .history() - .list(userId=user_id, startHistoryId=start_history_id, pageToken=page_token) - .execute() - ) - for change in history["history"]: - if "messagesAdded" in change: - for c in change["messagesAdded"]: - yield c["message"] - - -def labels_dict(service, user_id): - """Returns a dictionary mapping labels to labelids. - - Args: - service: Authorized Gmail API service instance. - user_id: User's email address. The special value "me" - - Returns: - dictionary mapping labels to labelids. - """ - try: - response = service.users().labels().list(userId=user_id).execute() - labels = response["labels"] - return {label["name"]: label["id"] for label in labels} - except errors.HttpError as error: - print(json.loads(error.content.decode("utf-8"))["error"]["message"]) - - -class GmailMessage(EmailMessage): - _service = None - _labels = None - - def __init__(self): - super().__init__() - if GmailMessage._service is None: - GmailMessage._service = get_gmail_service() - if GmailMessage._labels is None: - GmailMessage._labels = labels_dict(self._service, "me") - - def msgdict(self): - return {"raw": base64.urlsafe_b64encode(self.as_bytes()).decode()} - - def send(self): - try: - message = ( - self._service.users() - .messages() - .send(userId="me", body=self.msgdict()) - .execute() - ) - print("Message Id: %s" % message["id"]) - except errors.HttpError as error: - print("An error occurred: %s" % error) - - @classmethod - def list_msg_ids(cls, label, start_history_id=None): - if start_history_id is not None: - return ListHistory( - cls._service, - "me", - label_id=cls._labels[label], - start_history_id=start_history_id, - ) - else: - return ListMessagesWithLabels( - cls._service, "me", label_ids=[cls._labels[label]] - ) - - @classmethod - def from_id(cls, msg_id, user_id="me"): - try: - message = ( - cls._service.users() - .messages() - .get(userId=user_id, id=msg_id, format="raw") - .execute() - ) - instance = email.message_from_bytes( - base64.urlsafe_b64decode(message["raw"]), - policy=email.policy.EmailPolicy(), - ) - instance.history_id = message["historyId"] - return instance - except errors.HttpError as error: - print(json.loads(error.content.decode("utf-8"))["error"]["message"]) - - -def main(): - """Shows basic usage of the Gmail API. - - Creates a Gmail API service object and outputs a list of label names - of the user's Gmail account. - """ - message = GmailMessage() - message.set_content("Hello everyone!") - message["To"] = "guillaume.horel@gmail.com" - message["Subject"] = "pomme" - message.send() - - -if __name__ == "__main__": - message = main() |
