Get an access token by OAuth authentication

I wrote a Python script that completes the OAuth authorization-code flow and obtains an access token. I plan to add explanations later.

I referred to https://qiita.com/kai_kou/items/d03abd6012f32071c1aa.

oauth_authenticator.py


from http.server import HTTPServer
import ssl

from webbrowser import open_new
import random
import string
import urllib.parse

from access_token_request_handler import AccessTokenRequestHandler

class OAuthAuthenticator:
    """Run an OAuth authorization-code flow through a local HTTPS callback server.

    The authenticator opens the authorization page in a web browser, then
    serves requests on ``client_info['host']:client_info['port']`` until the
    request handler publishes a result on ``server.result``.

    Args:
        client_credential: Mapping with the client ``'id'``; it is also handed
            unchanged to ``AccessTokenRequestHandler``.
        client_info: Mapping with the ``'host'`` and ``'port'`` the local
            callback server binds to. The redirect URI is built from them.
        authorize_url: Base URL onto which ``'authorize'`` is joined.
    """

    # TLS certificate and private key presented by the local callback server.
    _CERT_FILE = './ssl_test.crt'
    _KEY_FILE = './ssl_test.key'

    # Length and alphabet of the ``state`` value sent with the request.
    _STATE_LENGTH = 40
    _STATE_ALPHABET = string.ascii_letters + string.digits

    def __init__(self, client_credential, client_info, authorize_url):
        self._client_credential = client_credential
        self._client_info = client_info
        self._authorize_url = authorize_url
        # Filled in by get_access_token(); stays None until a result arrives.
        self._authorization_result = None
        self._app_uri = 'https://%s:%s' % (client_info['host'], client_info['port'])

    def get_access_token(self):
        """Run the flow once and return whatever the handler published.

        The result is also stored so that ``result()`` can return it later.

        Returns:
            The object the request handler assigned to ``server.result``, or
            ``None`` when the wait was interrupted with Ctrl-C.
        """
        host = self._client_info['host']
        port = self._client_info['port']

        def make_handler(request, address, server):
            return AccessTokenRequestHandler(
                request, address, server,
                self._client_credential, self._app_uri, self._authorize_url
            )

        with HTTPServer((host, port), make_handler) as server:
            print('Server Starts - %s:%s' % (host, port))
            server.socket = self.__wrap_socket_ssl(server.socket)

            # Open the browser only after the callback socket is bound, so the
            # redirect cannot arrive before anything is listening and a bind
            # failure does not leave a useless browser tab behind.
            self.__request_authorization_code(self.__build_authorization_url())
            token = self.__wait_for_token(server)

        print('Server Stops - %s:%s' % (host, port))

        self._authorization_result = token
        return token

    def result(self):
        """Return the result stored by the last ``get_access_token()`` call."""
        return self._authorization_result

    def __build_authorization_url(self):
        """Return the authorization URL including its query string."""
        # NOTE(review): the callback handler is not given this state value, so
        # the state echoed back on the redirect is not verified.
        params = {
            'client_id': self._client_credential['id'],
            'grant_type': 'authorization_code',
            'redirect_uri': self._app_uri,
            'response_type': 'code',
            'state': self.__randomname(self._STATE_LENGTH)
        }
        access_url = urllib.parse.urljoin(self._authorize_url, 'authorize')
        return '?'.join([access_url, urllib.parse.urlencode(params)])

    def __wait_for_token(self, server):
        """Handle requests one at a time until the handler sets a result."""
        token = None
        try:
            while token is None:
                # Reset before every request so a request that sets nothing
                # (e.g. one without a code) keeps the loop running.
                server.result = None
                server.handle_request()
                token = server.result
        except KeyboardInterrupt:
            # Ctrl-C is the way to abandon the wait; report "no token".
            pass
        return token

    def __request_authorization_code(self, access_url):
        """Open the authorization page in a new browser window."""
        open_new(access_url)

    def __wrap_socket_ssl(self, socket):
        """Return ``socket`` wrapped for server-side TLS (TLS 1.0/1.1 disabled)."""
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(self._CERT_FILE, keyfile=self._KEY_FILE)
        context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
        return context.wrap_socket(socket, server_side=True)

    def __randomname(self, n):
        """Return ``n`` random ASCII letters/digits from the OS entropy source."""
        # SystemRandom draws from os.urandom; the default generator is
        # predictable and unsuitable for an anti-forgery value.
        rng = random.SystemRandom()
        return ''.join(rng.choice(self._STATE_ALPHABET) for _ in range(n))

access_token_request_handler.py


from http.server import BaseHTTPRequestHandler
import urllib.parse
import requests

class AccessTokenRequestHandler(BaseHTTPRequestHandler):
    """Handle the OAuth redirect and exchange the authorization code for a token.

    On a GET whose query string carries a ``code`` parameter, the handler
    POSTs to the ``token`` endpoint, stores the HTTP response on
    ``self.server.result`` and echoes a summary back to the browser.

    Args:
        request, address, server: Passed through to ``BaseHTTPRequestHandler``.
        client_credential: Mapping with the client ``'id'`` and ``'secret'``.
        client_url: Redirect URI sent with the token request.
        authorize_url: Base URL onto which ``'token'`` is joined.
    """

    # Seconds to wait for the token endpoint before giving up.
    _TOKEN_REQUEST_TIMEOUT = 30

    def __init__(self, request, address, server, client_credential, client_url, authorize_url):
        # The base class handles the request from inside its __init__, so every
        # attribute do_GET reads has to exist before super().__init__ runs.
        self._client_credential = client_credential
        self._authorize_url = authorize_url
        self._client_url = client_url
        super().__init__(request, address, server)

    def do_GET(self):
        """Answer the redirect; request a token when a code is present."""
        self.__responde_200()

        code = self.__authorization_code()
        if code is None:
            self.wfile.write(bytes('failed to get authorization code.', 'utf-8'))
            return

        try:
            response = self.__request_access_token(code)
        except requests.RequestException as exc:
            # Leave server.result unset so the caller keeps waiting, but tell
            # the user why nothing happened.
            print('token request failed:', exc)
            self.__wfwrite('Token request failed: %s' % exc)
            return

        self.server.result = response
        self.__write_response_message(response)

    def __write_response_message(self, response):
        """Print the token response and write it to the browser."""
        print('status code:', response.status_code)
        self.__wfwrite('Status Code: %s' % response.status_code)
        print('response:', response.reason)
        self.__wfwrite('Response: %s' % response.reason)

        try:
            payload = response.json()
        except ValueError:
            # The body is not JSON (e.g. an HTML error page); show it raw.
            print(response.text)
            self.__wfwrite(response.text)
            return

        print(payload)
        if response.ok and isinstance(payload, dict):
            self.__wfwrite('<br>'.join(['%s: %s' % val for val in payload.items()]))
        elif response.ok:
            self.__wfwrite(payload)
        else:
            self.__wfwrite(self.__error_message(payload))

    def __error_message(self, payload):
        """Return the most specific error text found in a decoded error body."""
        try:
            return payload['errors'][0]['message']
        except (KeyError, IndexError, TypeError):
            pass
        if isinstance(payload, dict):
            # Field names used by standard OAuth 2.0 error responses.
            for key in ('error_description', 'error'):
                if payload.get(key):
                    return payload[key]
        return str(payload)

    def __wfwrite(self, string):
        """Write ``string`` to the response body wrapped in a ``<p>`` element."""
        self.wfile.write(bytes('<p>%s</p>' % string, 'utf-8'))

    def __responde_200(self):
        """Send a 200 status line and the headers for a UTF-8 HTML body."""
        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.end_headers()

    def __request_access_token(self, code):
        """POST ``code`` to the token endpoint and return the HTTP response."""
        access_url = urllib.parse.urljoin(self._authorize_url, 'token')
        post_params = {
            'client_id': self._client_credential['id'],
            'client_secret': self._client_credential['secret'],
            'grant_type': 'authorization_code',
            'redirect_uri': self._client_url,
            'code': code
        }
        # WARN: verify=False disables TLS certificate verification while the
        # client secret is being sent; acceptable only against a test server.
        return requests.post(
            access_url,
            data=post_params,
            verify=False,
            timeout=self._TOKEN_REQUEST_TIMEOUT,
        )

    def __authorization_code(self):
        """Return the ``code`` query parameter, or ``None`` when it is absent."""
        values = self.__params_from_path().get('code')
        return values[0] if values else None

    def __params_from_path(self):
        """Return the request's query string parsed into a dict of lists."""
        query = urllib.parse.urlparse(self.path).query
        return urllib.parse.parse_qs(query)

main.py


from oauth_authenticator import OAuthAuthenticator

# Base URL of the authorization server; 'authorize' and 'token' are joined
# onto it by the authenticator and the request handler.
AUTHORIZATION_URL = 'https://0.0.0.0/oauth/v2/'
# NOTE(review): credentials are hard-coded here; load real ones from the
# environment or a secrets store rather than committing them.
CLIENT_ID = '1_5j8ecbsu9cowo4wk8kwwcc8k8wc08c8o4sgo4s084cg880ggo0'
CLIENT_SECRET = '172h8p6mevy8w8cggc44gw4w4ookk4ockg440osggkw808c00g'
# Not read by this script: the authenticator builds the same URI itself from
# the host and port below.
APP_URI = 'https://0.0.0.0:8888'
# Address and port the local callback server binds to.
APP_HOST = '0.0.0.0'
APP_PORT = 8888

if __name__ == '__main__':
    authenticator = OAuthAuthenticator(
        {'id': CLIENT_ID, 'secret': CLIENT_SECRET},
        {'host': APP_HOST, 'port': APP_PORT},
        AUTHORIZATION_URL
    )
    authenticator.get_access_token()
    print('result', authenticator.result())

I used it as a reference

Recommended Posts

Get an access token by OAuth authentication
Get an access token for the Pocket API
Bitbucket OAuth: Until access token acquisition
I tried to get an image by scraping
Create a filter to get an Access Token in the Graph API (Flask)
Get an Access Token for your service account with the Firebase Admin Python SDK
Creating an authentication function
Let's make a websocket client with Python. (Access token authentication)
Quit Python's Simple HTTP Server triggered by an access log