I wrote a script to break through OAuth authentication in Python and get an access token. I would like to add explanations later.
I referred to https://qiita.com/kai_kou/items/d03abd6012f32071c1aa.
oauth_authenticator
from http.server import HTTPServer
import ssl
from webbrowser import open_new
import random
import string
import urllib.parse
from access_token_request_handler import AccessTokenRequestHandler
class OAuthAuthenticator:
def __init__(self, client_credential, client_info, authorize_url):
#Credential reading
self._client_credential = client_credential
self._client_info = client_info
self._authorize_url = authorize_url
self._authorization_result = None
self._app_uri = 'https://%s:%s' % (client_info['host'], client_info['port'])
#Certificate reading
def get_access_token(self):
token = None
params = {
'client_id': self._client_credential['id'],
'grant_type': 'authorization_code',
'redirect_uri': self._app_uri,
'response_type': 'code',
'state': self.__randomname(40)
}
access_url = urllib.parse.urljoin(self._authorize_url, 'authorize')
access_url = '?'.join([access_url, urllib.parse.urlencode(params)])
#Authorization code request
self.__request_authorization_code(access_url)
#Token request
handler = lambda request, address, server: AccessTokenRequestHandler(
request, address, server, self._client_credential, self._app_uri, self._authorize_url
)
with HTTPServer((self._client_info['host'], self._client_info['port']), handler) as server:
print('Server Starts - %s:%s' % (self._client_info['host'], self._client_info['port']))
server.socket = self.__wrap_socket_ssl(server.socket)
try:
while token is None:
server.result = None
server.handle_request()
token = server.result
except KeyboardInterrupt:
pass
print('Server Stops - %s:%s' % (self._client_info['host'], self._client_info['port']))
def result(self):
return self._authorization_result
def __request_authorization_code(self, access_url):
open_new(access_url)
def __wrap_socket_ssl(self, socket):
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain('./ssl_test.crt', keyfile='./ssl_test.key')
context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
return context.wrap_socket(socket)
def __randomname(self, n):
randlst = [random.choice(string.ascii_letters + string.digits) for i in range(n)]
return ''.join(randlst)
access_token_request_handler.py
from http.server import BaseHTTPRequestHandler
import urllib.parse
import requests
class AccessTokenRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, address, server, client_credential, client_url, authorize_url):
self._client_credential = client_credential
self._authorize_url = authorize_url
self._client_url = client_url
super().__init__(request, address, server)
def do_GET(self):
self.__responde_200()
#Get access token
if 'code' in self.path:
response = self.__request_access_token()
self.server.result = response
self.__write_response_message(response)
return
self.wfile.write(bytes('failed to get authorization code.', 'utf-8'))
return
def __write_response_message(self, response):
print('status code:', response.status_code)
self.__wfwrite('Status Code: %s' % response.status_code)
print('response:', response.reason)
self.__wfwrite('Response: %s' % response.reason)
print(response.json())
if response.ok:
self.__wfwrite('<br>'.join(['%s: %s' % val for val in response.json().items()]))
else:
self.__wfwrite(response.json()['errors'][0]['message'])
def __wfwrite(self, string):
self.wfile.write(bytes('<p>%s</p>' % string, 'utf-8'))
def __responde_200(self):
self.send_response(200)
self.end_headers()
def __request_access_token(self):
params = self.__params_from_path()
access_url = urllib.parse.urljoin(self._authorize_url, 'token')
post_params = {
'client_id': self._client_credential['id'],
'client_secret': self._client_credential['secret'],
'grant_type': 'authorization_code',
'redirect_uri': self._client_url,
'code': params['code']
}
# The redirect URI is missing or do not match
# Code doesn't exist or is invalid for the client
response = requests.post(access_url, data=post_params, verify=False) # WARN: verify=False
return response
def __params_from_path(self):
query = urllib.parse.urlparse(self.path).query
params = urllib.parse.parse_qs(query)
return params
main.py
from oauth_authenticator import OAuthAuthenticator
AUTHORIZATION_URL = 'https://0.0.0.0/oauth/v2/'
CLIENT_ID = '1_5j8ecbsu9cowo4wk8kwwcc8k8wc08c8o4sgo4s084cg880ggo0'
CLIENT_SECRET = '172h8p6mevy8w8cggc44gw4w4ookk4ockg440osggkw808c00g'
APP_URI = 'https://0.0.0.0:8888'
APP_HOST = '0.0.0.0'
APP_PORT = 8888
if __name__ == '__main__':
client_info = (APP_HOST, APP_PORT)
authenticator = OAuthAuthenticator(
{'id': CLIENT_ID, 'secret': CLIENT_SECRET},
{'host': '0.0.0.0', 'port': 8888},
AUTHORIZATION_URL
)
authenticator.get_access_token()
print('result', authenticator.result())
I used it as a reference
Recommended Posts