Incorporate JWT authentication into Python's Flask Web API

Introduction

I tried to incorporate JWT (JSON Web Token) authentication into the Flask Web API created in Python. Reference: Creation of Web API using Python + Flask + MongoDB and placement on Azure VM + Nginx (collecting hololive video distribution schedule 3)

What is JWT (JSON Web Token)?

JWT is an abbreviation for JSON Web Token, which is a token specification for exchanging request information (Claim) as a JSON object. The specification is defined in RFC7519 and is used for authorization during communication between two parties.

This time, based on the user name and password specified in the request to the Web API, I generated a JWT token and returned it as a response, and used that JWT token to make it possible to use the functions of the Web API. ..

environment

Preparation of user information

In order to determine whether the Web API functions can be used based on the user name and password, add the users collection to the existing MongoDB and register the user information.

> mongo localhost:27017/admin -u admin -p
> use holoduledb
switched to db holoduledb
> show collections
holodules
> db.createCollection("users");
{ "ok" : 1 }
> db.users.save( {"id":"1", "username":"user01", "password":"password01"} );
WriteResult({ "nInserted" : 1 })
> db.users.find();
{ "_id" : ObjectId("5fe1aca6d53eaa62c5f8c75b"), "id" : "1", "username" : "user01", "password" : "password01" }

Creating a user class

Create a class to store the user information obtained from the MongoDB users collection. Since I don't use Object-Document-Mapper, I also created a method to convert between JSON and object.

class User:
    def __init__(self, id, username, password):
        self.__id = id
        self.__username = username
        self.__password = password

    # id
    @property
    def id(self):
        return self.__id

    @id.setter
    def id(self, id):
        self.__id = id

    # username
    @property
    def username(self):
        return self.__username

    @username.setter
    def username(self, username):
        self.__username = username

    # password
    @property
    def password(self):
        return self.__password

    @password.setter
    def password(self, password):
        self.__password = password

    #Convert from document
    @classmethod
    def from_doc(cls, doc):
        if doc is None:
            return None
        user = User(doc['id'], 
                    doc['username'], 
                    doc['password'])
        return user

    #Convert to document
    def to_doc(self):
        doc = { 'id': str(self.id),
                'username': str(self.username),
                'password' : str(self.password) }
        return doc

Adding a package that uses JWT authentication in Flask

The existing Web API uses Flask as a framework, so add the Flask-JWT package to use JWT with Flask.

> poetry add Flask-JWT

Built-in JWT authentication process

Incorporate JWT authentication processing into the existing Web API app.py.

Add import of created User class and Flask-JWT package

from flask_jwt import jwt_required, current_identity, JWT
from models.user import User

Added a function that authenticates based on username and password and returns a user object (identity)

def authoricate(username, password):
    user = User.from_doc(db.users.find_one({"username": username}))
    authenticated = True if user is not None and user.password == password else False
    return user if authenticated else None

Added a function that returns a user object (identity) based on the user ID

def identity(payload):
    # @jwt.jwt_payload_Customize the JWT payload with handler and use identity as username
    username = payload['identity']
    user = User.from_doc(db.users.find_one({"username": username}))
    return user

Added JWT authentication initialization process

The private key JWT_SECRET_KEY is set from the configuration file.

# Flask
app = Flask(__name__)
#Suppress JSON sorting
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key      #Private key when signing the JWT
app.config['JWT_ALGORITHM'] = 'HS256'                       #Cryptographic signature algorithm
app.config['JWT_LEEWAY'] = 0                                #Margin time for expiration date
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) #Token validity period
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0)   #Relative time to start using the token
app.config['JWT_AUTH_URL_RULE'] = '/auth'                   #Authentication endpoint URL
jwt = JWT(app, authoricate, identity)                       #Specify the above two functions here

Added function to customize JWT payload

@jwt.jwt_payload_handler
def make_payload(identity):
    iat = datetime.utcnow()
    exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA') 
    nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
    identity = getattr(identity, 'username')
    return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}

Specify a decorator for a Web API method that requires authentication

@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required()                                   #Specify decorator
def get_Holodules(date):
    logger.info(f"holodules/{date}")
    if len(date) != 8:
        abort(500)
    ...

JWT authentication processing operation check

Launch Web API

> poetry run flask run

 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
[INFO    ]_log -  * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

Get a JWT token using the curl command

Make a request to/auth of the authentication endpoint URL with the username and password to verify that you can get the JWT token.

#JSON escape required for Powershell
> curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '"{ \"username\": \"user01\", \"password\": \"password01\" }"'

{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}
#As usual with WSL
$ curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '{ "username": "user01", "password": "password01" }'

{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}

Using Postman, it looks like this. img01.jpg

Web API method invocation using curl command

Specify the obtained JWT token and call the method of Web API to confirm that the response can be obtained.

# Powershell
> curl "http://localhost:5000/holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."

{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...
# WSL
$ curl "http://localhost:5000/Holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."

{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...

Using Postman, it looks like this. img02.jpg

If you do not specify the JWT token, an authentication error will occur.

$ curl "http://localhost:5000/Holodules/20201209"

{"status_code":401,"error":"Authorization Required","description":"Request does not contain an access token"}

This is the case if the JWT token has expired.

$ curl "http://localhost:5000/Holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."

{"status_code": 401,"error": "Invalid token","description": "Signature has expired"}

Source code for app.py with JWT authentication

Since the last time, we have modified not only JWT authentication but also logging.

import json
from flask import Flask, jsonify, request, abort, make_response, current_app
from flask_jwt import jwt_required, current_identity, JWT
from pymongo import MongoClient
from os.path import join, dirname
from urllib.parse import quote_plus
from datetime import timedelta, datetime
from models.holodule import Holodule
from models.user import User
from settings import Settings
from logger import log, get_logger

#Logging settings
json_path = join(dirname(__file__), "config/logger.json")
log_dir = join(dirname(__file__), "log")
logger = get_logger(log_dir, json_path, False)

#Settings instance
settings = Settings(join(dirname(__file__), '.env'))

#MongoDB connection information
mongodb_user = quote_plus(settings.mongodb_user)
mongodb_password = quote_plus(settings.mongodb_password)
mongodb_host = "mongodb://%s/" % (settings.mongodb_host)

#MongoDB connection authentication
client = MongoClient(mongodb_host)
db = client.holoduledb
db.authenticate(name=mongodb_user,password=mongodb_password)

#Callback function that validates credentials using username and password
def authoricate(username, password):
    user = User.from_doc(db.users.find_one({"username": username}))
    authenticated = True if user is not None and user.password == password else False
    return user if authenticated else None

#Callback function to get user information based on JWT payload
def identity(payload):
    # @jwt.jwt_payload_Customize the JWT payload with handler and use identity as username
    username = payload['identity']
    user = User.from_doc(db.users.find_one({"username": username}))
    return user

# Flask
app = Flask(__name__)
#Suppress JSON sorting
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key      #Private key when signing the JWT
app.config['JWT_ALGORITHM'] = 'HS256'                       #Cryptographic signature algorithm
app.config['JWT_LEEWAY'] = 0                                #Margin time for expiration date
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) #Token validity period
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0)   #Relative time to start using the token
app.config['JWT_AUTH_URL_RULE'] = '/auth'                   #Authentication endpoint URL
jwt = JWT(app, authoricate, identity)                       #Specify the above two functions here

#Customize JWT payload
@jwt.jwt_payload_handler
def make_payload(identity):
    iat = datetime.utcnow()
    exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA') 
    nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
    identity = getattr(identity, 'username')
    return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}

#Acquisition of Holojour delivery schedule
@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required()
def get_Holodules(date):
    logger.info(f"holodules/{date}")
    if len(date) != 8:
        abort(500)

    #Get the holojour delivery schedule from MongoDB based on the date and store it in the list
    holodule_list = []
    for doc in db.holodules.find({"datetime": {'$regex':'^'+date}}).sort("datetime", -1):
        holodule = Holodule.from_doc(doc)
        holodule_list.append(holodule)

    if len(holodule_list) == 0:
        abort(404)

    #Build a dictionary based on the object and return it as JSON
    holodules = []
    for holodule in holodule_list:
        doc = holodule.to_doc()
        holodules.append(doc)
    result = {
        "result":len(holodule_list),
        "holodules":holodules
    }
    # UTF-8 code, Content-Type is application/json
    return make_response(jsonify(result))
    # UTF-8 characters, Content-Type is text/html; charset=utf-8
    # return make_response(json.dumps(result, ensure_ascii=False))

#Error handler: 404
@log(logger)
@app.errorhandler(404)
def not_found(error):
    logger.error(f"{error}")
    return make_response(jsonify({'error': 'Not found'}), 404)

#Error handler: 500
@log(logger)
@app.errorhandler(500)
def internal_server_error(error):
    logger.error(f"{error}")
    return make_response(jsonify({'error': 'Internal Server Error'}), 500)

if __name__ == "__main__":
    app.run()

in conclusion

We have incorporated a minimum mechanism that uses JWT (JSON Web Token) authentication. I was able to realize the SSL communication and JWT authentication that I wanted to do, but I feel that I have forcibly combined various mechanisms, so I would like to review the whole.

This is the end of the server-side functions, and we will modify them while starting the development of the client-side (Android application) that uses the Web API.

Recommended Posts

Incorporate JWT authentication into Python's Flask Web API
Try slack OAuth authentication with flask (Slack API V2)
Incorporate Sheltie Judgment AI into your web app
Flask Basic authentication