Secure a FastAPI Application with OAuth2 and JWT

In this tutorial, we will learn how to sign up users, log them in to obtain a token, and secure a FastAPI application with OAuth2 and JWT. We will use a MongoDB database to persist user data.

FastAPI provides several tools that make it possible to implement security with little effort and code.

FastAPI is based on OpenAPI, which was previously known as Swagger. Swagger was originally designed to generate interactive documentation interfaces.

OpenAPI defines several security schemes, and FastAPI provides tools for each of them:

  • apiKey: An application-specific key that can come from a header, a cookie, or a query parameter.
  • http: Standard HTTP authentication systems, including:
    • bearer: An Authorization header whose value is "Bearer " followed by a token. It is inherited from OAuth2.
    • HTTP Basic authentication
    • HTTP Digest authentication
  • oauth2: All of the OAuth2 ways of handling security, called flows:
    • The implicit, clientCredentials, and authorizationCode flows are appropriate for building an authentication provider such as Google or Facebook.
    • The password flow can be used to handle authentication directly in the same application.
  • openIdConnect: A way to discover OAuth2 authentication data automatically. This automatic discovery is defined in the OpenID Connect specification.
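
To make the schemes above more concrete, the following sketch shows where each kind of credential typically travels in an HTTP request. It uses only the Python standard library. The header name X-API-Key, the function names, and the sample values are assumptions made for illustration; they are not part of FastAPI or of this tutorial's code.

```python
import base64


def api_key_headers(key: str, header_name: str = "X-API-Key") -> dict:
    """apiKey scheme: the key travels in a header (a cookie or query parameter also works)."""
    return {header_name: key}


def basic_auth_headers(username: str, password: str) -> dict:
    """http/basic scheme: 'username:password' is Base64-encoded, which is not encryption."""
    raw = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


def bearer_headers(token: str) -> dict:
    """http/bearer scheme: the token is sent unchanged after the word 'Bearer'."""
    return {"Authorization": f"Bearer {token}"}


if __name__ == "__main__":
    print(api_key_headers("my-key"))
    print(basic_auth_headers("danny", "secret"))
    print(bearer_headers("abc.def.ghi"))
```

The rest of this tutorial uses the OAuth2 password flow, in which the client exchanges a username and password for a token and then sends that token as a bearer credential.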

Follow the steps below to complete this tutorial:

  1. Create a Python project with the following file structure:
  2. 
    ├── app
    │   ├── env
    │   ├── __init__.py
    │   ├── main.py
    │   └── src
    │       ├── __init__.py
    │       └── user
    │           ├── __init__.py
    │           └── main.py
    
    

    The user folder has a main.py file for user APIs.

    Every directory and subdirectory contains an __init__.py file. These files mark the directories as Python packages, which allows code in one file to be imported into another.

  3. Install the following dependencies: FastAPI, Uvicorn, passlib, python-jose[cryptography], python-multipart, and mongoengine:
  4. 
    pip install fastapi
    
    
    pip install uvicorn
    
    
    pip install passlib
    
    
    pip install "python-jose[cryptography]"
    
    
    pip install python-multipart
    
    
    pip install mongoengine
    
  5. Create the src/models.py file and add a User class for saving user data in MongoDB:
  6. 
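    from mongoengine import Document, StringField, DateTimeField, BooleanField
    import datetime
    
    class User(Document):
        # MongoEngine adds an ObjectId primary key (id) automatically,
        # so no explicit _id field is declared here.
        # unique=True keeps User.objects.get(username=...) from matching several users.
        username = StringField(max_length=250, required=True, unique=True)
        # Holds the password hash, never the plain-text password.
        password = StringField(max_length=250, required=True)
        disabled = BooleanField(default=False)
        date_created = DateTimeField(default=datetime.datetime.utcnow)
        date_modified = DateTimeField(default=datetime.datetime.utcnow)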
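
The password field of the User class is meant to hold a salted hash rather than the plain-text password; the tutorial uses passlib for this in the user APIs. As a rough illustration of the idea only, the sketch below uses hashlib.pbkdf2_hmac from the standard library. The function names, the "salt$digest" storage format, and the iteration count are assumptions made for this example, and the output is not compatible with passlib's sha256_crypt.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch


def hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' using PBKDF2-HMAC-SHA256 with a random salt."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


if __name__ == "__main__":
    stored = hash_password("s3cret")
    print(verify_password("s3cret", stored))
    print(verify_password("wrong", stored))
```

Because the salt is random, hashing the same password twice gives different stored values, which is why verification has to recompute the digest from the stored salt instead of comparing two fresh hashes.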
    
  7. Create the user-related APIs in the src/user/main.py file:
  8. 
    from datetime import datetime, timedelta
    from fastapi import APIRouter, Depends, HTTPException, Query, status
    from pydantic import BaseModel, Field
    from typing import Optional
    from src.models import User
    from passlib.context import CryptContext
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    import json
    from jose import jwt, JWTError
    
    router = APIRouter(
        prefix="/users",
        tags=["users"],
        responses={404: {"description": "Not found"}}
    )
    
    class NewUser(BaseModel):
        username: str
        password: str
    
    crypt_context = CryptContext(schemes=["sha256_crypt", "md5_crypt"])
    
    def get_password_hash(password):
        return crypt_context.hash(password)
    
    # The router is mounted under /users, so the token endpoint is /users/token.
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="users/token")
    
    @router.post("/signup")
    async def sign_up(newUser: NewUser):
        user = User(username=newUser.username,
                    password=get_password_hash(newUser.password))
        user.save()
        return {"message": "Created user successfully!"}
    
    # Run the following in a terminal to generate a secret key:
    # openssl rand -hex 32
    # The key below is only an example; generate your own and keep it out of source control.
    SECRET_KEY = "3e8a3f31aab886f8793176988f8298c9265f84b8388c9fef93635b08951f379b"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    
    def create_access_token(data: dict, expires_delta: timedelta):
        to_encode = data.copy()
        expire = datetime.utcnow() + expires_delta
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    
    def verify_password(plain_password, hashed_password):
        return crypt_context.verify(plain_password, hashed_password)
    
    
    def authenticate(username, password):
        # get_user() returns None for an unknown username, so check before indexing.
        user = get_user(username)
        if user is None:
            return False
        return verify_password(password, user['password'])
    
    class Token(BaseModel):
        access_token: str
        token_type: str
    
    @router.post("/token", response_model=Token)
    async def login(form_data: OAuth2PasswordRequestForm = Depends()):
        username = form_data.username
        password = form_data.password
        if authenticate(username, password):
            access_token = create_access_token(
                data={"sub": username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
            return {"access_token": access_token, "token_type": "bearer"}
        else:
            raise HTTPException(
                status_code=400, detail="Incorrect username or password")
    
    
    class TokenData(BaseModel):
        username: Optional[str] = None
    
    
    def get_user(username: str):
        try:
            user = json.loads(User.objects.get(username=username).to_json())
            return user
        except User.DoesNotExist:
            return None
    
    
    async def get_current_user(token: str = Depends(oauth2_scheme)):
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            token_data = TokenData(username=username)
        except JWTError:
            raise credentials_exception
        user = get_user(username=token_data.username)
        if user is None:
            raise credentials_exception
        return user
    
    @router.get("/detail")
    async def user_detail(current_user: dict = Depends(get_current_user)):
        # get_current_user() returns the user as a dict, so respond with data from it.
        return {"username": current_user["username"]}
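
For intuition about what jwt.encode and jwt.decode do with the HS256 algorithm in the code above, here is a minimal standard-library sketch of signing and checking a token. It is not python-jose's implementation: the function names encode_hs256 and decode_hs256 are hypothetical, and the sketch skips checks a real library performs, such as validating the alg value in the header.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload' with the shared secret."""
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def encode_hs256(payload: dict, secret: str) -> str:
    """Build 'header.payload.signature' from a JSON-serializable payload."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def decode_hs256(token: str, secret: str, now=None) -> dict:
    """Check the signature and the exp claim; raise ValueError if either check fails."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current_time = time.time() if now is None else now
    if "exp" in payload and payload["exp"] < current_time:
        raise ValueError("token expired")
    return payload


if __name__ == "__main__":
    token = encode_hs256({"sub": "danny", "exp": int(time.time()) + 1800}, "example-secret")
    print(token)
    print(decode_hs256(token, "example-secret"))
```

The payload is only encoded, not encrypted, so anyone holding the token can read the sub and exp claims. The signature only guarantees that the token was issued by someone who knows the secret key and has not been altered since.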
    
  9. In the app/main.py file, add the following code:
  10. 
    import uvicorn
    from fastapi import FastAPI
    from src.user import main as user_main
    from mongoengine import connect, disconnect
    
    app = FastAPI()
    connect('my_db_fast_api', host='127.0.0.1', port=27017)  # 27017 is MongoDB's default port
    
    app.include_router(user_main.router)
    
    if __name__ == '__main__':
        # uvicorn.run() blocks until the server stops, so print before calling it.
        print("running")
        uvicorn.run(app, host='127.0.0.1', port=8005)
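
The database address and the secret key are hard-coded in this tutorial. One possible way to keep them out of the source code is to read them from environment variables, as sketched below. The Settings class, the load_settings function, and the variable names SECRET_KEY, MONGO_DB, MONGO_HOST, and MONGO_PORT are all assumptions made for this example; the fallback port 27017 is MongoDB's default.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    mongo_db: str
    mongo_host: str
    mongo_port: int
    secret_key: str


def load_settings(env=None) -> Settings:
    """Read settings from a mapping (os.environ by default) with local-development defaults."""
    env = os.environ if env is None else env
    secret_key = env.get("SECRET_KEY")
    if not secret_key:
        # Refuse to start without a key instead of falling back to a known value.
        raise RuntimeError("SECRET_KEY must be set")
    return Settings(
        mongo_db=env.get("MONGO_DB", "my_db_fast_api"),
        mongo_host=env.get("MONGO_HOST", "127.0.0.1"),
        mongo_port=int(env.get("MONGO_PORT", "27017")),
        secret_key=secret_key,
    )
```

With this in place, the connection call could become connect(settings.mongo_db, host=settings.mongo_host, port=settings.mongo_port), where settings is the value returned by load_settings().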
    
  11. Run your application (for example, with python main.py from the app directory) and open http://127.0.0.1:8005/docs in your web browser.
  12. Sign up a user with the POST /users/signup API, then call the POST /users/token API to retrieve an access token. Use this access token to call the protected GET /users/detail API.
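
The same sign-up, login, and protected-call sequence can also be scripted instead of clicked through in the docs page. The sketch below builds the three requests with the standard library's urllib. The helper names and the sample credentials are assumptions made for this example; the paths and port are the ones used in this tutorial. Note that sign-up sends JSON, while the token endpoint expects form fields because it uses OAuth2PasswordRequestForm.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8005"


def signup_request(username, password, base_url=BASE_URL):
    """POST /users/signup with a JSON body matching the NewUser model."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/users/signup",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def token_request(username, password, base_url=BASE_URL):
    """POST /users/token with form-encoded username and password fields."""
    body = urllib.parse.urlencode({"username": username, "password": password}).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/users/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def detail_request(access_token, base_url=BASE_URL):
    """GET /users/detail with the token sent as a bearer credential."""
    return urllib.request.Request(
        f"{base_url}/users/detail",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )


def send(request):
    """Send a request and parse the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def run_demo(username="danny", password="s3cret"):
    """Run the full flow; requires the application to be running at BASE_URL."""
    send(signup_request(username, password))
    token = send(token_request(username, password))["access_token"]
    return send(detail_request(token))
```

Calling run_demo() while the server is running should return the response of the protected endpoint. Calling detail_request without a valid token should instead lead to a 401 response, which urllib raises as an HTTPError.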