I added JWT (JSON Web Token) authentication to the Flask Web API I built in Python. Reference: Creation of Web API using Python + Flask + MongoDB and placement on Azure VM + Nginx (collecting hololive video distribution schedule 3)
JWT stands for JSON Web Token, a token format for exchanging claims (statements about the request or its subject) as a JSON object. The format is defined in RFC 7519 and is commonly used for authentication and authorization between two parties.
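To make the token format concrete, here is a minimal sketch of how an HS256-signed JWT is assembled and checked using only the standard library. The helper names (`b64url`, `sign_hs256`, `verify_hs256`) and the secret are made up for illustration; the Web API in this article leaves this work to Flask-JWT.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as used for JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    """Build 'header.payload.signature' signed with HMAC-SHA256."""
    header = {"typ": "JWT", "alg": "HS256"}
    segments = [
        b64url(json.dumps(header, separators=(",", ":")).encode("utf-8")),
        b64url(json.dumps(payload, separators=(",", ":")).encode("utf-8")),
    ]
    signing_input = ".".join(segments).encode("utf-8")
    signature = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [b64url(signature)])


def verify_hs256(token: str, secret: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        return False  # not three dot-separated segments
    signing_input = f"{header_b64}.{payload_b64}".encode("utf-8")
    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected).encode("utf-8"), signature_b64.encode("utf-8"))


# Hypothetical payload and secret, for illustration only
token = sign_hs256({"identity": "user01"}, "example-secret")
print(token)
print(verify_hs256(token, "example-secret"))
```

Only the signature is protected by the secret. The header and payload are merely Base64url-encoded, so anyone holding the token can read them.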
This time, the Web API generates a JWT from the user name and password sent in a request and returns it in the response. The client then presents that JWT to use the functions of the Web API.
Local environment
Cloud environment
Ubuntu VM Ubuntu 20.04 (LTS)
To decide whether a caller may use the Web API based on user name and password, add a users collection to the existing MongoDB database and register the user information.
> mongo localhost:27017/admin -u admin -p
> use holoduledb
switched to db holoduledb
> show collections
holodules
> db.createCollection("users");
{ "ok" : 1 }
> db.users.save( {"id":"1", "username":"user01", "password":"password01"} );
WriteResult({ "nInserted" : 1 })
> db.users.find();
{ "_id" : ObjectId("5fe1aca6d53eaa62c5f8c75b"), "id" : "1", "username" : "user01", "password" : "password01" }
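The password above is stored in plain text to keep the example short. As a rough sketch of an alternative, the code below stores a salted PBKDF2 hash instead, using only the standard library. The function names, the storage format, and the iteration count are my own assumptions, and the rest of this article does not use them.

```python
import hashlib
import hmac
import os


def hash_password(password: str, iterations: int = 100_000) -> str:
    """Return 'scheme$iterations$salt$digest' for storage in the users collection."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    scheme, iterations, salt_hex, digest_hex = stored.split("$")
    if scheme != "pbkdf2_sha256":
        return False
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


# Document shaped like the one registered above, but with a hashed password
user_doc = {"id": "1", "username": "user01", "password": hash_password("password01")}
print(verify_password("password01", user_doc["password"]))
```

With this approach the authentication callback would call `verify_password` instead of comparing strings directly.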
Create a class to hold the user information read from the MongoDB users collection. Since I am not using an Object-Document Mapper (ODM), I also added methods to convert between a MongoDB document and the object.
class User:
    def __init__(self, id, username, password):
        self.__id = id
        self.__username = username
        self.__password = password
    # id
    @property
    def id(self):
        return self.__id
    @id.setter
    def id(self, id):
        self.__id = id
    # username
    @property
    def username(self):
        return self.__username
    @username.setter
    def username(self, username):
        self.__username = username
    # password
    @property
    def password(self):
        return self.__password
    @password.setter
    def password(self, password):
        self.__password = password
    # Convert from document
    @classmethod
    def from_doc(cls, doc):
        if doc is None:
            return None
        user = User(doc['id'],
                    doc['username'],
                    doc['password'])
        return user
    # Convert to document
    def to_doc(self):
        doc = { 'id': str(self.id),
                'username': str(self.username),
                'password' : str(self.password) }
        return doc
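For comparison, the same document-to-object round trip can be sketched more compactly with a dataclass. `UserRecord` is a hypothetical name and this is not the class used in the rest of the article. It only illustrates what `from_doc` and `to_doc` do with a document like the one registered earlier.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class UserRecord:
    id: str
    username: str
    password: str

    @classmethod
    def from_doc(cls, doc: Optional[dict]) -> Optional["UserRecord"]:
        """Build an object from a MongoDB document; None means 'not found'."""
        if doc is None:
            return None
        # The MongoDB-generated _id field is intentionally ignored
        return cls(str(doc["id"]), str(doc["username"]), str(doc["password"]))

    def to_doc(self) -> dict:
        """Convert back to a plain dictionary suitable for insertion."""
        return asdict(self)


doc = {"_id": "5fe1aca6d53eaa62c5f8c75b", "id": "1",
       "username": "user01", "password": "password01"}
user = UserRecord.from_doc(doc)
print(user.to_doc())
```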
The existing Web API uses Flask as a framework, so add the Flask-JWT package to use JWT with Flask.
> poetry add Flask-JWT
Incorporate JWT authentication processing into the existing Web API app.py.
from flask_jwt import jwt_required, current_identity, JWT
from models.user import User
# Callback that validates the credentials (user name and password)
def authoricate(username, password):
    user = User.from_doc(db.users.find_one({"username": username}))
    authenticated = user is not None and user.password == password
    return user if authenticated else None
# Callback that looks up the user from the JWT payload
def identity(payload):
    # The JWT payload is customized via @jwt.jwt_payload_handler so that identity holds the username
    username = payload['identity']
    user = User.from_doc(db.users.find_one({"username": username}))
    return user
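The behavior of the two callbacks can be tried without MongoDB or Flask. The sketch below replaces `db.users` with a hypothetical in-memory list (`FAKE_USERS`, `find_one`) and returns plain dictionaries instead of `User` objects. It also uses `hmac.compare_digest` for the password comparison, which is a change of mine and not what the code above does.

```python
import hmac
from typing import Optional

# Hypothetical in-memory stand-in for the users collection
FAKE_USERS = [{"id": "1", "username": "user01", "password": "password01"}]


def find_one(query: dict) -> Optional[dict]:
    """Return the first document whose fields all match the query, else None."""
    for doc in FAKE_USERS:
        if all(doc.get(key) == value for key, value in query.items()):
            return doc
    return None


def authenticate(username: str, password: str) -> Optional[dict]:
    """Return the user when the credentials match, otherwise None."""
    doc = find_one({"username": username})
    if doc is None:
        return None
    same = hmac.compare_digest(doc["password"].encode("utf-8"), password.encode("utf-8"))
    return doc if same else None


def identity(payload: dict) -> Optional[dict]:
    """Look up the user named in the 'identity' claim of a decoded payload."""
    return find_one({"username": payload["identity"]})


print(authenticate("user01", "password01"))
print(authenticate("user01", "wrong"))
print(identity({"identity": "user01"}))
```

Returning None from the first callback is what makes the authentication endpoint reject the request.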
The secret key JWT_SECRET_KEY is loaded from the configuration file. HS256 is a symmetric algorithm, so the same key is used for both signing and verification.
# Flask
app = Flask(__name__)
# Suppress sorting of JSON keys
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key # Secret key used to sign the JWT
app.config['JWT_ALGORITHM'] = 'HS256' # Signature algorithm
app.config['JWT_LEEWAY'] = 0 # Leeway allowed when checking expiration
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) # Token validity period
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0) # Delay before the token becomes usable
app.config['JWT_AUTH_URL_RULE'] = '/auth' # Authentication endpoint URL
jwt = JWT(app, authoricate, identity) # Register the two callback functions above
@jwt.jwt_payload_handler
def make_payload(identity):
    iat = datetime.utcnow()
    exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA')
    nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
    identity = getattr(identity, 'username')
    return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}
@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required() # Require a valid JWT for this endpoint
def get_Holodules(date):
    logger.info(f"holodules/{date}")
    if len(date) != 8:
        abort(500)
    ...
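The claims built by the payload handler above, and the way the expiration and not-before settings interact, can be checked in isolation. The following sketch takes the issue time as an argument so the result is reproducible. `make_claims` and `is_usable` are hypothetical helpers, and `is_usable` only approximates the time checks a JWT validator performs.

```python
from datetime import datetime, timedelta


def make_claims(username, issued_at,
                expiration_delta=timedelta(seconds=300),
                not_before_delta=timedelta(seconds=0)):
    """Build the same four claims as the payload handler, from explicit inputs."""
    return {
        "exp": issued_at + expiration_delta,   # expiration time
        "iat": issued_at,                      # issued at
        "nbf": issued_at + not_before_delta,   # not valid before
        "identity": username,
    }


def is_usable(claims, now, leeway=timedelta(seconds=0)):
    """True while 'now' lies between nbf and exp, widened by the leeway."""
    return claims["nbf"] - leeway <= now <= claims["exp"] + leeway


issued = datetime(2020, 12, 9, 23, 0, 0)
claims = make_claims("user01", issued)
print(claims["exp"] - claims["iat"])
print(is_usable(claims, issued + timedelta(seconds=299)))
print(is_usable(claims, issued + timedelta(seconds=301)))
```

With the settings used in this article (300 seconds, no leeway), a token stops being accepted five minutes after it is issued.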
> poetry run flask run
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
[INFO ]_log - * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Send a request with the user name and password to the authentication endpoint /auth and verify that a JWT is returned.
# PowerShell: the double quotes inside the JSON must be escaped
> curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '"{ \"username\": \"user01\", \"password\": \"password01\" }"'
{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}
# WSL: no extra escaping is needed
$ curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '{ "username": "user01", "password": "password01" }'
{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}
Using Postman, it looks like this.
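The same request can be sketched in Python with only the standard library. `build_auth_request` and `fetch_token` are names I made up. Building the request is separated from sending it so that the request can be inspected without a running server, and `fetch_token` assumes the `access_token` response field shown above.

```python
import json
import urllib.request


def build_auth_request(base_url, username, password):
    """Build the POST request for the /auth endpoint without sending it."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/auth",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_token(base_url, username, password):
    """Send the request and return the access_token field (needs a running server)."""
    request = build_auth_request(base_url, username, password)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))["access_token"]


request = build_auth_request("http://127.0.0.1:5000", "user01", "password01")
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```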
Pass the obtained JWT in the Authorization header and call the Web API to confirm that a response is returned.
# Powershell
> curl "http://localhost:5000/holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."
{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...
# WSL
$ curl "http://localhost:5000/holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."
{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...
Using Postman, it looks like this.
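As a sketch of the same call from Python, the request below carries the token in the Authorization header with the `JWT` prefix used in the curl examples. `build_holodules_request` is a hypothetical helper, and `<access_token>` is a placeholder for the value returned by /auth.

```python
import urllib.request


def build_holodules_request(base_url, date, token):
    """Build the GET request for /holodules/<date> with the JWT attached."""
    return urllib.request.Request(
        f"{base_url}/holodules/{date}",
        headers={"Authorization": f"JWT {token}"},
    )


request = build_holodules_request("http://localhost:5000", "20201209", "<access_token>")
print(request.get_method(), request.full_url)
print(request.get_header("Authorization"))
```

Passing the request to `urllib.request.urlopen` would send it. An expired or missing token makes the server answer with 401, which `urlopen` raises as `urllib.error.HTTPError`.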
If you do not specify the JWT token, an authentication error will occur.
$ curl "http://localhost:5000/holodules/20201209"
{"status_code":401,"error":"Authorization Required","description":"Request does not contain an access token"}
If the JWT has expired, the following error is returned.
$ curl "http://localhost:5000/holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."
{"status_code": 401,"error": "Invalid token","description": "Signature has expired"}
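A client can avoid this error by checking the `exp` claim before calling the API and requesting a new token when needed. The sketch below decodes the payload segment without verifying the signature, which is acceptable only for this kind of client-side hint. The helper names are hypothetical, and it assumes `exp` is a numeric timestamp in seconds, as RFC 7519 specifies.

```python
import base64
import json
import time


def decode_payload_unverified(token):
    """Decode the middle segment of a JWT. The signature is NOT checked."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


def is_expired(token, now=None):
    """True when the exp claim lies before 'now' (seconds since the epoch)."""
    now = time.time() if now is None else now
    return decode_payload_unverified(token)["exp"] < now
```

The server remains the authority on whether a token is valid. This check only saves a round trip that would end in 401.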
The full source of app.py follows. Compared with the previous article, the logging has been revised in addition to adding JWT authentication.
import json
from flask import Flask, jsonify, request, abort, make_response, current_app
from flask_jwt import jwt_required, current_identity, JWT
from pymongo import MongoClient
from os.path import join, dirname
from urllib.parse import quote_plus
from datetime import timedelta, datetime
from models.holodule import Holodule
from models.user import User
from settings import Settings
from logger import log, get_logger
#Logging settings
json_path = join(dirname(__file__), "config/logger.json")
log_dir = join(dirname(__file__), "log")
logger = get_logger(log_dir, json_path, False)
#Settings instance
settings = Settings(join(dirname(__file__), '.env'))
# MongoDB connection information
mongodb_user = quote_plus(settings.mongodb_user)
mongodb_password = quote_plus(settings.mongodb_password)
mongodb_host = "mongodb://%s/" % (settings.mongodb_host)
# MongoDB connection and authentication
# Note: Database.authenticate() was removed in PyMongo 4.0, so this code requires PyMongo 3.x
client = MongoClient(mongodb_host)
db = client.holoduledb
db.authenticate(name=mongodb_user,password=mongodb_password)
# Callback function that validates the credentials (user name and password)
def authoricate(username, password):
    user = User.from_doc(db.users.find_one({"username": username}))
    authenticated = user is not None and user.password == password
    return user if authenticated else None
# Callback function that looks up the user from the JWT payload
def identity(payload):
    # The JWT payload is customized via @jwt.jwt_payload_handler so that identity holds the username
    username = payload['identity']
    user = User.from_doc(db.users.find_one({"username": username}))
    return user
# Flask
app = Flask(__name__)
# Suppress sorting of JSON keys
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key # Secret key used to sign the JWT
app.config['JWT_ALGORITHM'] = 'HS256' # Signature algorithm
app.config['JWT_LEEWAY'] = 0 # Leeway allowed when checking expiration
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) # Token validity period
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0) # Delay before the token becomes usable
app.config['JWT_AUTH_URL_RULE'] = '/auth' # Authentication endpoint URL
jwt = JWT(app, authoricate, identity) # Register the two callback functions above
# Customize the JWT payload
@jwt.jwt_payload_handler
def make_payload(identity):
    iat = datetime.utcnow()
    exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA')
    nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
    identity = getattr(identity, 'username')
    return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}
# Get the Holodule distribution schedule
@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required()
def get_Holodules(date):
    logger.info(f"holodules/{date}")
    if len(date) != 8:
        abort(500)
    # Get the Holodule distribution schedule for the date from MongoDB and store it in a list
    holodule_list = []
    for doc in db.holodules.find({"datetime": {'$regex':'^'+date}}).sort("datetime", -1):
        holodule = Holodule.from_doc(doc)
        holodule_list.append(holodule)
    if len(holodule_list) == 0:
        abort(404)
    # Build a list of dictionaries from the objects and return it as JSON
    holodules = []
    for holodule in holodule_list:
        doc = holodule.to_doc()
        holodules.append(doc)
    result = {
        "result":len(holodule_list),
        "holodules":holodules
    }
    # Non-ASCII characters are escaped as \uXXXX; Content-Type is application/json
    return make_response(jsonify(result))
    # Raw UTF-8 characters; Content-Type is text/html; charset=utf-8
    # return make_response(json.dumps(result, ensure_ascii=False))
# Error handler: 404
@log(logger)
@app.errorhandler(404)
def not_found(error):
    logger.error(f"{error}")
    return make_response(jsonify({'error': 'Not found'}), 404)
# Error handler: 500
@log(logger)
@app.errorhandler(500)
def internal_server_error(error):
    logger.error(f"{error}")
    return make_response(jsonify({'error': 'Internal Server Error'}), 500)
if __name__ == "__main__":
    app.run()
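The date handling in get_Holodules can be explored without MongoDB. The sketch below shows a stricter date check than the length test and an in-memory equivalent of the `'^'+date` prefix query with descending sort. `is_valid_date`, `filter_by_date`, and the sample documents are made up for illustration. The real code leaves filtering and sorting to MongoDB.

```python
import re
from datetime import datetime


def is_valid_date(date):
    """Accept only eight ASCII digits that form a real calendar date (YYYYMMDD)."""
    if re.fullmatch(r"[0-9]{8}", date) is None:
        return False
    try:
        datetime.strptime(date, "%Y%m%d")
    except ValueError:
        return False
    return True


def filter_by_date(docs, date):
    """Keep documents whose datetime starts with the date, newest first."""
    pattern = re.compile("^" + re.escape(date))
    matched = [doc for doc in docs if pattern.search(doc["datetime"])]
    return sorted(matched, key=lambda doc: doc["datetime"], reverse=True)


# Made-up sample documents; only the datetime format follows the article
sample_docs = [
    {"key": "a", "datetime": "20201209 200000"},
    {"key": "b", "datetime": "20201210 000000"},
    {"key": "c", "datetime": "20201209 230000"},
]
print(is_valid_date("20201209"), is_valid_date("20201301"))
print([doc["key"] for doc in filter_by_date(sample_docs, "20201209")])
```

Escaping the date with `re.escape` keeps characters such as `.` from being interpreted as regular expression syntax when the value comes straight from the URL.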
I have now incorporated a minimal JWT (JSON Web Token) authentication mechanism. The SSL communication and JWT authentication I wanted are both working, but it feels as though I forced several mechanisms together, so I would like to review the whole design.
This completes the server-side functions for now. I will keep revising them as I start developing the client side (an Android application) that uses the Web API.