Secure FastAPI Application with OAuth2 and JWT
In this tutorial, we will learn how to sign up users, log in to obtain a token, and secure a FastAPI application with OAuth2 and JWT. We will use a MongoDB database to persist user data.
FastAPI provides several tools for implementing security easily without a big amount of effort and code.
FastAPI is based on OpenAPI, which was previously known as Swagger. Swagger was originally designed to generate interactive documentation interfaces.
OpenAPI defines several security schemes, which FastAPI supports. Some are as follows:
- apiKey: This is an application specific key which can come from a header, cookie or a query parameter.
- http: A standard http authentication system that includes the following:
- bearer : A header Authorization value with a token. It is inherited from OAuth2.
- HTTP basic authentication
- HTTP Digest
- oauth2: Covers all the OAuth2 ways of handling security, known as flows:
- The implicit, clientCredentials, and authorizationCode flows are appropriate for creating authentication providers like Google, Facebook, etc.
- The password flow can be used for handling authentication in the same application directly.
- openIdConnect: This provides a way to discover OAuth2 authentication data automatically. The automatic discovery is defined in the OpenID Connect specification.
Follow the steps below to complete this tutorial:
- Create a Python project with the following file structure:
- Install the following dependencies for FastAPI, Uvicorn, passlib, python-jose[cryptography], python-multipart, mongoengine:
- Create src/models.py file and add User class for saving user data in MongoDB:
- Create user related APIs in the src/user/main.py file:
- In the app/main.py file, add the code shown in the example below:
- Run your application and open http://127.0.0.1:8005/docs on your web browser.
- Sign up a user with the /users/signup POST API, then use the /users/token POST API to retrieve an access token. Use this access token to access the protected /users/detail GET API.
├── app
│ │── env
│ ├── __init__.py
│ ├── main.py
│ └── src
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── user
│ │ │ ├── __init__.py
│ │ │ └── main.py
The user folder has a main.py file for user APIs.
There is an __init__.py file in every directory and subdirectory. The presence of these __init__.py files allows code to be imported from one file into another.
pip install fastapi
pip install uvicorn
pip install passlib
pip install python-jose[cryptography]
pip install python-multipart
pip install mongoengine
from mongoengine import Document, StringField, DateTimeField, IntField, BooleanField
import datetime
class User(Document):
    """MongoEngine document describing one stored user account."""

    _id = IntField()

    # Login credentials; both are mandatory and capped at 250 characters.
    username = StringField(required=True, max_length=250)
    password = StringField(required=True, max_length=250)

    # New accounts start enabled.
    disabled = BooleanField(default=False)

    # Both timestamps default to the UTC time at which the document is built.
    date_created = DateTimeField(default=datetime.datetime.utcnow)
    date_modified = DateTimeField(default=datetime.datetime.utcnow)
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
from typing import Optional
from src.models import User
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import json
from jose import jwt, JWTError
# Router collecting the user endpoints; every route declared on it is served
# under the "/users" prefix and tagged "users".
router = APIRouter(
    prefix="/users",
    tags=["users"],
    responses={
        404: {"description": "Not found"},
    },
)
class NewUser(BaseModel):
    """Request body accepted by the sign-up endpoint."""

    # Name the account will be registered under.
    username: str
    # Plain-text password as submitted by the client.
    password: str
# Shared passlib context used for hashing and verifying passwords.
crypt_context = CryptContext(
    schemes=[
        "sha256_crypt",
        "md5_crypt",
    ],
)
def get_password_hash(password):
    """Hash ``password`` with the module-level ``crypt_context`` and return the result."""
    hashed = crypt_context.hash(password)
    return hashed
# Bearer-token dependency used by the protected endpoints.
# ``tokenUrl`` must point at the route that issues tokens. The login route is
# declared as "/token" on a router whose prefix is "/users", so its real path
# is "/users/token"; a bare "token" would send the interactive docs to a path
# that does not exist.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="users/token")
@router.post("/signup")
async def sign_up(newUser: NewUser):
    """Register a new user and store a hash of their password.

    Args:
        newUser: Username and plain-text password supplied by the client.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 400 when the username is already registered.
    """
    # Reject duplicates up front: accounts are later looked up with
    # ``User.objects.get(username=...)``, which cannot succeed once two
    # documents share the same username.
    if User.objects(username=newUser.username).first() is not None:
        raise HTTPException(
            status_code=400, detail="Username already registered")

    user = User(username=newUser.username,
                password=get_password_hash(newUser.password))
    user.save()
    return {"message": "Created user successfully!"}
# Key used to sign and verify the JWTs handled in this module.
# Run the following in a terminal to generate your own secret key:
#   openssl rand -hex 32
# NOTE(review): the key is hard-coded here; presumably a real deployment
# should load it from configuration instead - confirm.
SECRET_KEY = "3e8a3f31aab886f8793176988f8298c9265f84b8388c9fef93635b08951f379b"
# Signing algorithm handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access tokens issued by the login endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: timedelta):
    """Build a signed JWT from ``data`` plus an expiry claim.

    Args:
        data: Claims to embed in the token; the dict itself is not modified.
        expires_delta: How long from now (UTC) the token stays valid.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    claims = data.copy()
    claims["exp"] = datetime.utcnow() + expires_delta
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password`` using ``crypt_context``."""
    matches = crypt_context.verify(plain_password, hashed_password)
    return matches
def authenticate(username, password):
    """Check a username/password pair against the stored account.

    Args:
        username: Name of the account to look up.
        password: Plain-text password to verify.

    Returns:
        True only when the account exists and the password matches its
        stored hash; False otherwise.
    """
    user = get_user(username)
    # get_user() reports an unknown username by returning None (it handles
    # User.DoesNotExist itself), so test for None instead of catching the
    # exception; subscripting None would raise TypeError.
    if user is None:
        return False
    return verify_password(password, user['password'])
class Token(BaseModel):
    """Response body returned by the token endpoint."""

    # The encoded JWT.
    access_token: str
    # Token scheme label; the login endpoint sends "bearer".
    token_type: str
@router.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username and password for a bearer access token.

    Args:
        form_data: Submitted form carrying ``username`` and ``password``.

    Returns:
        A dict with the encoded ``access_token`` and ``token_type``.

    Raises:
        HTTPException: 400 when the credentials do not authenticate.
    """
    supplied_name = form_data.username
    supplied_secret = form_data.password

    # Guard clause: bail out early on bad credentials.
    if not authenticate(supplied_name, supplied_secret):
        raise HTTPException(
            status_code=400, detail="Incorrect username or password")

    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(
        data={"sub": supplied_name}, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
class TokenData(BaseModel):
    """Data extracted from a decoded access token."""

    # Value of the token's "sub" claim; None when not supplied.
    username: Optional[str] = None
def get_user(username: str):
    """Fetch the stored user called ``username`` as a plain dict.

    Returns:
        The document's JSON representation parsed into a dict, or None when
        no user with that username exists.
    """
    try:
        document = User.objects.get(username=username)
    except User.DoesNotExist:
        return None
    return json.loads(document.to_json())
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token of a request to its stored user.

    Args:
        token: Encoded JWT supplied through the ``oauth2_scheme`` dependency.

    Returns:
        The user dict returned by ``get_user`` for the token's "sub" claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no "sub"
            claim, or names a user that does not exist.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized

    token_data = TokenData(username=subject)
    account = get_user(username=token_data.username)
    if account is None:
        raise unauthorized
    return account
@router.get("/detail")
async def user_detail(current_user: User = Depends(get_current_user)):
    """Return a fixed sample profile.

    ``current_user`` is not read in the body; it is declared so that the
    ``get_current_user`` dependency runs for every request to this route.
    """
    profile = {"name": "Danny", "email": "[email protected]"}
    return profile
import uvicorn
from fastapi import FastAPI
from src.user import main as user_main
from mongoengine import connect, disconnect
# Application object served by uvicorn.
app = FastAPI()

# Open the MongoEngine connection used by the document models.
# MongoDB's default port is 27017; change it here if your server listens
# on a different one.
connect('my_db_fast_api', host='127.0.0.1', port=27017)

# Mount the user endpoints defined in src/user/main.py.
app.include_router(user_main.router)

if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=8005)
    # uvicorn.run() blocks while the server is up, so this line is reached
    # only after the server has stopped.
    print("running")