I wrote a script that uses imaplib and the IMAP IDLE command to receive push notifications from Gmail and pull in new messages.
First you have to enable IMAP connection on Gmail side. Please refer to the documentation in Gmail help. https://support.google.com/mail/answer/7126229?hl=ja
Once IMAP is enabled, let's connect.
import imaplib
def connect_with_server(imap_server='imap.gmail.com', imap_port=993,
                        imap_user='Username', imap_pass='Password'):
    """Open an SSL IMAP connection and log in.

    Args:
        imap_server: Host name of the IMAP server.
        imap_port: Port of the IMAP-over-SSL service (int or numeric string).
        imap_user: Gmail e-mail address used to log in.
        imap_pass: Gmail login password.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` object, or ``False`` when the
        server rejects the connection or the login (the error is printed).
    """
    try:
        imap = imaplib.IMAP4_SSL(imap_server, int(imap_port))
    except imaplib.IMAP4.error as e:
        print(e)
        return False
    try:
        imap.login(imap_user, imap_pass)
    except imaplib.IMAP4.error as e:
        print(e)
        # The socket is already open at this point; close it so a rejected
        # login does not leak the connection.
        imap.shutdown()
        return False
    return imap
You can now connect by calling `connect_with_server()`.
b'[ALERT] Please log in via your web browser: https://support.google.com/mail/accounts/answer/78754 (Failure)'
If you get the error above, please refer to the following page.
http://opendata.jp.net/?p=8498
The flow is briefly explained below.
Sending `DONE` causes the server to exit `IDLE`. After `IDLE` has been issued, the server sends a message to the client <b>whenever some update occurs in the mailbox</b>. In other words, when new mail arrives, when another client issues an `EXPUNGE` command, and so on.
Also, the server will not respond to other requests from the client until `DONE` is sent.
Also, the connection may time out while idling, so you need to re-issue IDLE (or reconnect) at least every 29 minutes.
The following is a quote from the first reference URL.
Example: (Omission)
C: A002 IDLE
S: + idling
...time passes; new mail arrives...
S: * 4 EXISTS
C: DONE
S: A002 OK IDLE terminated
...another client expunges message 2 now...
C: A003 FETCH 4 ALL
S: * 4 FETCH (...)
S: A003 OK FETCH completed
C: A004 IDLE
S: * 2 EXPUNGE
S: * 3 EXISTS
S: + idling
(The following is omitted)
For the time being, isn't it enough to just send `DONE` and close `IDLE`?
Now let's actually receive a push notification.
# Minimal IDLE example: wait for one "new mail" notification, then leave IDLE.
imap = connect_with_server()
if not imap:
    # connect_with_server() returns False (after printing the error) on failure.
    raise SystemExit('Could not connect to the IMAP server.')
imap.select('inbox')

# imaplib has no public IDLE helper here, so the command is sent by hand with
# a fresh tag; the tagged completion has to be read by hand as well.
imap_idletag = imap._new_tag()
imap.send(b'%s IDLE\r\n' % (imap_idletag,))
print('Waiting for a message...')
while True:
    imap_line = imap.readline().strip().decode('utf-8')
    # An empty line means the connection was closed; BYE means the server is
    # about to close it.
    if imap_line.startswith('* BYE ') or not imap_line:
        print('Jumping out of a loop.')
        flag = False
        break
    if imap_line.endswith('EXISTS'):
        print('You got a message.')
        imap.send(b'DONE\r\n')
        # The server may still send untagged updates ("* ...") before the
        # tagged completion of IDLE; skip them instead of reporting a failure.
        while True:
            imap_line = imap.readline().strip().decode('utf-8')
            if not imap_line.startswith('* '):
                break
        if imap_line.startswith('{} OK'.format(imap_idletag.decode('utf-8'))):
            print('Terminating IDLE mode')
            flag = True
        else:
            print('Failed to terminate')
            flag = False
        break

if flag:
    pass  # some codes... (IDLE ended cleanly; the new mail can be fetched)
else:
    pass  # some codes... (connection lost or IDLE did not terminate)
I prepared a function that decodes the body based on the MIME headers.
It decodes only parts with `Content-Type: text/plain`
and ignores the rest.
import email
import base64
import quopri
def get_info_from_header(headers):
    """Extract the transfer encoding and charset from MIME headers.

    Args:
        headers: Iterable of ``(name, value)`` header pairs.

    Returns:
        A ``(mail_transfer_encoding, mail_encoding)`` tuple. The transfer
        encoding is stripped and lower-cased; the charset is returned without
        surrounding quotes. Either element is ``''`` when the corresponding
        header or parameter is absent.
    """
    mail_transfer_encoding = ''
    mail_encoding = ''
    for headinfo in headers:
        # Header field names are case-insensitive.
        name = headinfo[0].lower()
        value = headinfo[1]
        if name == 'content-type':
            # Skip the media type itself and scan every parameter: charset is
            # optional and is not necessarily the first one.
            for param in value.split(';')[1:]:
                key, sep, param_value = param.partition('=')
                if sep and key.strip().lower() == 'charset':
                    mail_encoding = param_value.strip().strip('"\'')
                    break
        elif name == 'content-transfer-encoding':
            mail_transfer_encoding = value.strip().lower()
    return mail_transfer_encoding, mail_encoding
def mail_to_txt(mail):
    """Return the decoded ``text/plain`` body of an RFC 822 message.

    Args:
        mail: The complete message (headers and body) as a ``str``.

    Returns:
        The body text of the ``text/plain`` part, or ``''`` when the message
        has no such part. When several ``text/plain`` parts exist, the last
        one is used.

    Raises:
        LookupError: The declared charset is unknown to Python.
        UnicodeError: The body cannot be encoded/decoded as declared.
    """
    # Used when the part does not declare a charset.
    default_encoding = 'ISO-2022-JP'
    mail_data = email.message_from_string(mail)

    # walk() yields the message itself for non-multipart mail and descends
    # into nested multiparts (e.g. multipart/mixed > multipart/alternative).
    text_part = None
    for part in mail_data.walk():
        if part.get_content_type() == 'text/plain':
            text_part = part
    if text_part is None:
        return ''

    mail_body = text_part.get_payload()
    # The transfer-encoding token is case-insensitive ("Base64", "BASE64").
    mail_transfer_encoding = str(
        text_part.get('Content-Transfer-Encoding', '')).strip().lower()
    mail_encoding = text_part.get_content_charset() or default_encoding

    if mail_transfer_encoding == 'base64':
        # MIME uses the standard base64 alphabet, not the URL-safe one.
        raw_body = base64.b64decode(mail_body.encode('ASCII'))
    elif mail_transfer_encoding == 'quoted-printable':
        raw_body = quopri.decodestring(mail_body.encode('ASCII'))
    else:
        # 7bit/8bit/unspecified: the payload is already the literal text.
        raw_body = mail_body.encode('utf-8')
    return raw_body.decode(mail_encoding)
After that, fetch the message with FETCH, decode it as UTF-8, and pass it to `mail_to_txt()`;
it should return a readable body.
def _wait_for_new_mail(imap):
    """Enter IDLE and block until the server reports new mail.

    Returns True when an EXISTS notification arrived and IDLE was terminated
    with a tagged OK; False when the connection ended or IDLE did not
    terminate cleanly.
    """
    # The IDLE command is sent by hand with a fresh tag, so its tagged
    # completion has to be read by hand as well.
    imap_idletag = imap._new_tag()
    imap.send(b'%s IDLE\r\n' % (imap_idletag,))
    print('Waiting for a message...')
    while True:
        imap_line = imap.readline().strip().decode('utf-8')
        print(imap_line)
        # An empty line means the connection was closed.
        if imap_line.startswith('* BYE ') or not imap_line:
            print('Jumping out of a loop.')
            return False
        if imap_line.endswith('EXISTS'):
            print('You got a message.')
            imap.send(b'DONE\r\n')
            # Untagged updates ("* ...") may precede the tagged completion;
            # skip them instead of treating them as a failure.
            while True:
                imap_line = imap.readline().strip().decode('utf-8')
                if not imap_line.startswith('* '):
                    break
            if imap_line.startswith(
                    '{} OK'.format(imap_idletag.decode('utf-8'))):
                print('Terminating IDLE mode')
                return True
            print('Failed to terminate')
            return False


def _fetch_unseen_texts(imap):
    """Fetch every UNSEEN message, mark it \\Seen and return the body texts."""
    texts = []
    typ, data = imap.search(None, 'UNSEEN')
    # data[0] is a bytes object of space-separated ids (empty when none).
    mail_ids = data[0].split() if data and data[0] else []
    if mail_ids:
        for mail_id in mail_ids:
            typ, data = imap.fetch(mail_id, '(RFC822)')
            # Skip ids that return no message literal (e.g. already expunged).
            if typ != 'OK' or not data or not isinstance(data[0], tuple):
                continue
            mail = data[0][1]
            # Raw mail is not guaranteed to be valid UTF-8; replace undecodable
            # bytes rather than aborting the whole run.
            texts.append(mail_to_txt(mail.decode('UTF-8', errors='replace')))
            imap.store(mail_id, '+FLAGS', '\\Seen')
    else:
        print('No unread e-mails')
    print('Terminating FETCH')
    return texts


def main_routine():
    """Wait for one push notification and return the unread mail bodies.

    Connects, selects the inbox, idles until new mail arrives, then fetches
    all UNSEEN messages and converts them with ``mail_to_txt``.

    Returns:
        A list of decoded body strings; empty when IDLE ended without a
        clean notification.

    Raises:
        ConnectionError: ``connect_with_server()`` could not connect/log in.
    """
    text_list = []
    imap = connect_with_server()
    if not imap:
        raise ConnectionError('Could not connect to the IMAP server.')
    try:
        imap.select('inbox')
        if _wait_for_new_mail(imap):
            text_list = _fetch_unseen_texts(imap)
        else:
            print('Jumped out')
    finally:
        # Always release the connection. CLOSE/LOGOUT can fail themselves when
        # the server already dropped the connection; report it and carry on.
        try:
            imap.close()
        except (imaplib.IMAP4.error, OSError) as e:
            print(e)
        try:
            imap.logout()
        except (imaplib.IMAP4.error, OSError) as e:
            print(e)
        print('Logged out')
    return text_list
if __name__ == '__main__':
    # Each main_routine() call handles one IDLE cycle and logs out, so loop
    # to keep listening for further notifications.
    while True:
        mail_list = main_routine()
        for mail in mail_list:
            print(mail)
The execution result is as follows.
Waiting for a message...
+ idling
* 12 EXISTS
You got a message.
Terminating IDLE mode
Terminating FETCH
Logged out
Test test
Hello world!!
Tesuto Tesuto
Recommended Posts