As you know, Gmail has Gmail API, which allows you to take advantage of Gmail's unique features such as search function without using SMTP or POP3. In this article, we'll walk you step-by-step through the steps from enabling the Gmail API to creating and running scripts using Python.
Before using the Gmail API, creating a project, enabling the API, setting the scope, creating authentication information, etc. are relatively difficult to prepare, so I wrote it as a memorandum.
Open the Google Cloud Platform console (https://console.cloud.google.com/) in the browser that opened Gmail for your Gmail account.
The following screen will appear. Agree to the terms and conditions, and click "Agree and execute".
Select "New Project" from "Select Project"
Set the project name and click "Create"
If you can create it, the following screen will be displayed
Click "Library"
Search for "Gmail API" from "Search APIs and Services"
Click the displayed "Gmail API"
Click "Enable"
Click "OAuth consent screen" from "API and services"
Select "External" and click "Create"
Set an appropriate name for "Application Name" and click "Add Scope"
Select the scope as shown below and click "Add"
Click "Save"
Select "Credentials", click "Create Credentials", and click "Select with Wizard" from the menu that appears.
On the "Add Credentials to Project" screen, set as shown below and click "Required Credentials".
Click "Create OAuth Client ID"
Download the credentials (client_id.json) and click "Finish"
When you press the Finish button, the screen will change to the one shown below.
The downloaded credentials file client_id.json will be used later in the Python script.
You can create an API key, OAuth2.0 client ID, or service account key with your credentials. When dealing with user data, it seems common to use an OAuth2.0 client ID. The Google Cloud Document Authentication Overview (https://cloud.google.com/docs/authentication?hl=ja) page describes the uses of each authentication method.
The expiration date of the credential Refresh Token is described below. Once authenticated and the Refresh Token is issued, it does not seem to expire unless it is left for 6 months or 50 or more tokens are issued. It seems that automation mechanisms such as RPA are also acceptable. https://developers.google.com/identity/protocols/OAuth2#expiration
You must write your code to anticipate the possibility that a granted refresh token might no longer work. A refresh token might stop working for one of these reasons:
* The user has revoked your app's access.
* The refresh token has not been used for six months.
* The user changed passwords and the refresh token contains Gmail scopes.
* The user account has exceeded a maximum number of granted (live) refresh tokens.
* There is currently a limit of 50 refresh tokens per user account per client. If the limit is reached, creating a new refresh token automatically invalidates the oldest refresh token without warning. This limit does not apply to service accounts.
Now that we're ready, we'll have a Python script to run.
(I think many people have already lived in this part)
Install according to here. Please install 3.7.x.
Follow the steps here (http://pipenv-ja.readthedocs.io/ja/translate-ja/install.html#installing-pipenv) to install.
Prepare a suitable directory and prepare a python script. Save the client_id.json saved in the previous step in the same directory. Joined the Gmail API samples: sweat_drops:
You can find sample code in the Gmail API reference. It's easy to use when you come here!
gmail_credential.py
import os
import pickle
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
#Set scope for Gmail API
SCOPES = [
"https://www.googleapis.com/auth/gmail.compose",
"https://www.googleapis.com/auth/gmail.readonly",
"https://www.googleapis.com/auth/gmail.labels",
"https://www.googleapis.com/auth/gmail.modify",
]
def get_credential():
"""
Obtaining an access token
Save the token in pickle format in the current directory so that it can be reused. (Sorry for the miscellaneous ...)
"""
creds = None
if os.path.exists("token.pickle"):
with open("token.pickle", "rb") as token:
creds = pickle.load(token)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file("client_id.json", SCOPES)
# creds = flow.run_local_server()
creds = flow.run_console()
with open("token.pickle", "wb") as token:
pickle.dump(creds, token)
return creds
listmail.py
"""
list GMail Inbox.
Usage:
listmail.py <query> <tag> <count>
listmail.py -h | --help
listmail.py --version
Options:
-h --help Show this screen.
--version Show version.
"""
import pickle
import base64
import json
import io
import csv
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import base64
from email.mime.text import MIMEText
from apiclient import errors
import logging
from docopt import docopt
from gmail_credential import get_credential
logger = logging.getLogger(__name__)
def list_labels(service, user_id):
"""
Get a list of labels
"""
labels = []
response = service.users().labels().list(userId=user_id).execute()
return response["labels"]
def decode_base64url_data(data):
"""
Base64url decoding
"""
decoded_bytes = base64.urlsafe_b64decode(data)
decoded_message = decoded_bytes.decode("UTF-8")
return decoded_message
def list_message(service, user_id, query, label_ids=[], count=3):
"""
Get a list of emails
Parameters
----------
service : googleapiclient.discovery.Resource
Resources for communicating with Gmail
user_id : str
User ID
query : str
Email query string. is:unread etc.
label_ids : list
List of IDs indicating the label to be searched
count : str
Maximum number of email information to be returned
Returns
----------
messages : list
id, body, subject,List of dictionary data with keys such as from
"""
messages = []
try:
message_ids = (
service.users()
.messages()
.list(userId=user_id, maxResults=count, q=query, labelIds=label_ids)
.execute()
)
if message_ids["resultSizeEstimate"] == 0:
logger.warning("no result data!")
return []
#Check the content of the message based on the message id
for message_id in message_ids["messages"]:
message_detail = (
service.users()
.messages()
.get(userId="me", id=message_id["id"])
.execute()
)
message = {}
message["id"] = message_id["id"]
message["body"] = decode_base64url_data(
message_detail["payload"]["body"]["data"]
)
# payload.headers[name: "Subject"]
message["subject"] = [
header["value"]
for header in message_detail["payload"]["headers"]
if header["name"] == "Subject"
][0]
# payload.headers[name: "From"]
message["from"] = [
header["value"]
for header in message_detail["payload"]["headers"]
if header["name"] == "From"
][0]
logger.info(message_detail["snippet"])
messages.append(message)
return messages
except errors.HttpError as error:
print("An error occurred: %s" % error)
def remove_labels(service, user_id, messages, remove_labels):
"""
Delete the label. Used to mark as read(is:If you remove the unread label, it becomes read)
"""
message_ids = [message["id"] for message in messages]
labels_mod = {
"ids": message_ids,
"removeLabelIds": remove_labels,
"addLabelIds": [],
}
# import pdb;pdb.set_trace()
try:
message_ids = (
service.users()
.messages()
.batchModify(userId=user_id, body=labels_mod)
.execute()
)
except errors.HttpError as error:
print("An error occurred: %s" % error)
#Main processing
def main(query="is:unread", tag="daily_report", count=3):
creds = get_credential()
service = build("gmail", "v1", credentials=creds, cache_discovery=False)
#Label list
labels = list_labels(service, "me")
target_label_ids = [label["id"] for label in labels if label["name"] == tag]
#Email list[{'body': 'xxx', 'subject': 'xxx', 'from': 'xxx'},]
messages = list_message(service, "me", query, target_label_ids, count=count)
# unread label
unread_label_ids = [label["id"] for label in labels if label["name"] == "UNREAD"]
# remove labels form messages
remove_labels(service, "me", messages, remove_labels=unread_label_ids)
logger.info(json.dumps(messages, ensure_ascii=False))
if messages:
return json.dumps(messages, ensure_ascii=False)
else:
return None
#Program execution part
if __name__ == "__main__":
arguments = docopt(__doc__, version="0.1")
query = arguments["<query>"]
tag = arguments["<tag>"]
count = arguments["<count>"]
logging.basicConfig(level=logging.DEBUG)
messages_ = main(query=query, tag=tag, count=count)
print(messages_)
The send script looks like this.
`return {'raw': base64. It was urlsafe_b64encode (message.as_string ())}
`, but it didn't work, so I've fixed it.sendmail.py
"""
Send E-Mail with GMail.
Usage:
sendmail.py <sender> <to> <subject> <message_text_file_path> [--attach_file_path=<file_path>] [--cc=<cc>]
sendmail.py -h | --help
sendmail.py --version
Options:
-h --help Show this screen.
--version Show version.
--attach_file_path=<file_path> Path of file attached to message.
--cc=<cc> cc email address list(separated by ','). Default None.
"""
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import base64
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.audio import MIMEAudio
from pathlib import Path
from email.mime.multipart import MIMEMultipart
import mimetypes
from apiclient import errors
from gmail_credential import get_credential
from docopt import docopt
import logging
logger = logging.getLogger(__name__)
def create_message(sender, to, subject, message_text, cc=None):
"""
Base64 encode MIMEText
"""
enc = "utf-8"
message = MIMEText(message_text.encode(enc), _charset=enc)
message["to"] = to
message["from"] = sender
message["subject"] = subject
if cc:
message["Cc"] = cc
encode_message = base64.urlsafe_b64encode(message.as_bytes())
return {"raw": encode_message.decode()}
def create_message_with_attachment(
sender, to, subject, message_text, file_path, cc=None
):
"""
Base64 encode MIMEText with attachments
"""
message = MIMEMultipart()
message["to"] = to
message["from"] = sender
message["subject"] = subject
if cc:
message["Cc"] = cc
# attach message text
enc = "utf-8"
msg = MIMEText(message_text.encode(enc), _charset=enc)
message.attach(msg)
content_type, encoding = mimetypes.guess_type(file_path)
if content_type is None or encoding is not None:
content_type = "application/octet-stream"
main_type, sub_type = content_type.split("/", 1)
if main_type == "text":
with open(file_path, "rb") as fp:
msg = MIMEText(fp.read(), _subtype=sub_type)
elif main_type == "image":
with open(file_path, "rb") as fp:
msg = MIMEImage(fp.read(), _subtype=sub_type)
elif main_type == "audio":
with open(file_path, "rb") as fp:
msg = MIMEAudio(fp.read(), _subtype=sub_type)
else:
with open(file_path, "rb") as fp:
msg = MIMEBase(main_type, sub_type)
msg.set_payload(fp.read())
p = Path(file_path)
msg.add_header("Content-Disposition", "attachment", filename=p.name)
message.attach(msg)
encode_message = base64.urlsafe_b64encode(message.as_bytes())
return {"raw": encode_message.decode()}
def send_message(service, user_id, message):
"""
send an email
Parameters
----------
service : googleapiclient.discovery.Resource
Resources for communicating with Gmail
user_id : str
User ID
message : dict
"raw"Key,A dict with a base64-encoded MIME Object as value
Returns
----------
None
"""
try:
sent_message = (
service.users().messages().send(userId=user_id, body=message).execute()
)
logger.info("Message Id: %s" % sent_message["id"])
return None
except errors.HttpError as error:
logger.info("An error occurred: %s" % error)
raise error
#Main processing
def main(sender, to, subject, message_text, attach_file_path, cc=None):
#Obtaining access tokens and building services
creds = get_credential()
service = build("gmail", "v1", credentials=creds, cache_discovery=False)
if attach_file_path:
#Creating email body
message = create_message_with_attachment(
sender, to, subject, message_text, attach_file_path, cc=cc
)
else:
message = create_message(
sender, to, subject, message_text, cc=cc
)
#send e-mail
send_message(service, "me", message)
#Program execution part
if __name__ == "__main__":
arguments = docopt(__doc__, version="0.1")
sender = arguments["<sender>"]
to = arguments["<to>"]
cc = arguments["--cc"]
subject = arguments["<subject>"]
message_text_file_path = arguments["<message_text_file_path>"]
attach_file_path = arguments["--attach_file_path"]
logging.basicConfig(level=logging.DEBUG)
with open(message_text_file_path, "r", encoding="utf-8") as fp:
message_text = fp.read()
main(
sender=sender,
to=to,
subject=subject,
message_text=message_text,
attach_file_path=attach_file_path,
cc=cc,
)
Go to the directory where you created the python script and install the required modules.
% pipenv install google-api-python-client oauth2client google-auth-httplib2 google-auth-oauthlib docopt
% pipenv run python listmail.py is:unread Request a quote 10
If you execute such as, the OAuth authentication screen URL will be displayed on the screen of cmd.exe at the first execution, so open it from the browser and approve it. From the second time onward, the OAuth authentication screen URL will not be displayed.
% pipenv run python listmail.py is:unread Request a quote 10
Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=AAAAAAAAAAAAAA
Enter the authorization code:
When you open the URL displayed on the above console, the following screen will be displayed.
Click "Details" and click the link "Go to XX (unsafe page)" that appears.
The permission grant dialog will be displayed several times, so allow each.
At the end, a confirmation screen will be displayed, so allow it.
Copy the code from the screen below and paste it after ```Enter the authorization code:` `` to run the script.
The Gmail API is interesting because you can use Gmail's powerful filters, labeling features and queries as-is.
Recommended Posts