Secure a FastAPI Application with OAuth2 and JWT

  • Last updated Apr 25, 2024

In this tutorial, we will show you how to secure FastAPI REST APIs with OAuth2 and JWT. To demonstrate this, we will create the following REST APIs:

  1. Create a new user.
  2. Log in to obtain an access token.
  3. Access the secured APIs using the access token obtained after a successful login.

To store user data, we'll use a MongoDB database.

FastAPI is based on OpenAPI, the specification previously known as Swagger. Swagger was originally designed to generate interactive documentation interfaces.

FastAPI provides several tools that make it easy to implement security without much effort or code.

OpenAPI defines several security schemes, and FastAPI supports them. Some are as follows:

  • apiKey: An application-specific key that can come from a header, a cookie, or a query parameter.
  • http: A standard HTTP authentication system that includes the following:
    • bearer: An Authorization header with a value of Bearer plus a token. It is inherited from OAuth2.
    • HTTP Basic authentication
    • HTTP Digest
  • oauth2: Covers all the OAuth2 ways of handling security, which are called flows:
    • The implicit, clientCredentials, and authorizationCode flows are appropriate for building an authentication provider like Google, Facebook, etc.
    • The password flow can be used to handle authentication directly in the same application.
  • openIdConnect: This provides a way to discover OAuth2 authentication data automatically. The automatic discovery is defined in the OpenID Connect specification.
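
To make the http schemes concrete, the following sketch builds the Authorization header value that a client would send for HTTP Basic and for bearer authentication. It uses only the Python standard library. The helper names basic_auth_header and bearer_auth_header are hypothetical and are not part of FastAPI.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Return the Authorization value for HTTP Basic authentication."""
    # Basic auth sends base64("username:password"); this is encoding, not encryption.
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def bearer_auth_header(token: str) -> str:
    """Return the Authorization value for bearer-token authentication."""
    # The token is opaque to HTTP; later in this tutorial it is a JWT.
    return f"Bearer {token}"


print(basic_auth_header("alice", "secret"))
print(bearer_auth_header("example-token"))
```

Because Basic credentials are only base64-encoded, both schemes should be used over HTTPS in practice.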

Follow the steps below to complete this tutorial:

  1. Start by creating a new project folder:
  2. mkdir project_name
  3. Navigate to your newly created project folder:
  4. cd project_name
  5. Create a virtual environment inside the project root directory. Use the first command on Windows and the second on macOS or Linux:
  6. py -m venv env
    python3 -m venv env
  7. Activate the virtual environment. Use the first command on Windows and the second on macOS or Linux:
  8. .\env\Scripts\activate
    source env/bin/activate
  9. Install the following dependencies for FastAPI, Uvicorn, passlib, python-jose[cryptography], python-multipart, mongoengine:
  10. pip install fastapi
    pip install "uvicorn[standard]"
    pip install passlib
    pip install "python-jose[cryptography]"
    pip install python-multipart
    pip install mongoengine
  11. Create a directory structure for your FastAPI project. A common structure should look like this:
  12. ├── project_name
    │   ├── env
    │   ├── routes
    │   │     └── api.py
    │   ├── src
    │   │    ├── __init__.py
    │   │    ├── endpoints
    │   │    │      ├── __init__.py
    │   │    │      ├── product_service.py
    │   │    │      └── user_service.py
    │   │    │
    │   │    └── models
    │   │           ├── __init__.py
    │   │           └── user.py
    │   │            
    │   ├── __init__.py
    │   ├── main.py
    

    The src folder contains two folders, models and endpoints. The models folder contains the user.py file, and the endpoints folder contains the user_service.py and product_service.py files, which define the API endpoints.

    The project root, the src directory, and each subdirectory of src contain an __init__.py file. These files mark the directories as Python packages so that code can be imported from one file into another.
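
If you prefer not to create each file by hand, the layout can be scaffolded with a short script. This is a minimal sketch that assumes the file list shown in the tree above. The scaffold function and the PROJECT_FILES name are hypothetical helpers, and the demonstration writes into a temporary directory rather than a real project.

```python
import tempfile
from pathlib import Path

# Files taken from the tree above; the env folder is created by the venv command.
PROJECT_FILES = [
    "routes/api.py",
    "src/__init__.py",
    "src/endpoints/__init__.py",
    "src/endpoints/product_service.py",
    "src/endpoints/user_service.py",
    "src/models/__init__.py",
    "src/models/user.py",
    "__init__.py",
    "main.py",
]


def scaffold(root: Path) -> list[Path]:
    """Create empty placeholder files for the project layout under root."""
    created = []
    for relative in PROJECT_FILES:
        path = root / relative
        path.parent.mkdir(parents=True, exist_ok=True)  # create missing folders
        path.touch(exist_ok=True)  # existing files are left untouched
        created.append(path)
    return created


# Demonstrate in a throwaway directory so nothing is written to a real project.
with tempfile.TemporaryDirectory() as tmp:
    for created_path in scaffold(Path(tmp) / "project_name"):
        print(created_path.relative_to(tmp))
```

To use it for real, call scaffold(Path("project_name")) from the directory that should contain the project.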

  13. Create a model file named user.py in the src/models directory:
  14. from mongoengine import Document, StringField, DateTimeField, IntField, BooleanField
    import datetime
    
    class UserModel(Document):
        _id = IntField()
        username = StringField(max_length=250, required=True)
        password = StringField(max_length=250, required=True)
        disabled = BooleanField(default=False)
        created_date = DateTimeField(default=datetime.datetime.utcnow)
        modified_date = DateTimeField(default=datetime.datetime.utcnow)
  15. Create a service file named user_service.py in the src/endpoints directory:
  16. from datetime import datetime, timedelta
    from fastapi import APIRouter, Depends, HTTPException, status
    from pydantic import BaseModel
    from typing import Optional
    from src.models.user import UserModel
    from passlib.context import CryptContext
    from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
    import json
    from jose import jwt, JWTError
    
    router = APIRouter(
        prefix="/users",
        tags=["users"],
        responses={404: {"description": "Not found"}}
    )
    
    class NewUser(BaseModel):
        username: str
        password: str
    
    crypt_context = CryptContext(schemes=["sha256_crypt", "md5_crypt"])
    
    def get_password_hash(password):
        return crypt_context.hash(password)
    
    # tokenUrl must point at the login route; with the /users router prefix that is /users/token
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="users/token")
    
    @router.post("/signup")
    async def sign_up(newUser: NewUser):
        user = UserModel(username=newUser.username,
                    password=get_password_hash(newUser.password))
        user.save()
        return {"message": "Created user successfully!"}
    
    # Run the following in a terminal to generate your own secret key:
    # openssl rand -hex 32
    # The value below is an example only; do not reuse it or commit a real key.
    SECRET_KEY = "3e8a3f31aab886f8793176988f8298c9265f84b8388c9fef93635b08951f379b"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    
    def create_access_token(data: dict, expires_delta: timedelta):
        to_encode = data.copy()
        expire = datetime.utcnow() + expires_delta
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    
    def verify_password(plain_password, hashed_password):
        return crypt_context.verify(plain_password, hashed_password)
    
    
    def authenticate(username, password):
        # get_user returns None (it does not raise) when the user is missing
        user = get_user(username)
        if user is None:
            return False
        return verify_password(password, user['password'])
    
    class Token(BaseModel):
        access_token: str
        token_type: str
    
    @router.post("/token", response_model=Token)
    async def login(form_data: OAuth2PasswordRequestForm = Depends()):
        username = form_data.username
        password = form_data.password
        if authenticate(username, password):
            access_token = create_access_token(
                data={"sub": username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
            return {"access_token": access_token, "token_type": "bearer"}
        else:
            raise HTTPException(
                status_code=400, detail="Incorrect username or password")
    
    
    class TokenData(BaseModel):
        username: Optional[str] = None
    
    
    def get_user(username: str):
        try:
            user = json.loads(UserModel.objects.get(username=username).to_json())
            return user
        except UserModel.DoesNotExist:
            return None
    
    
    async def get_current_user(token: str = Depends(oauth2_scheme)):
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
            token_data = TokenData(username=username)
        except JWTError:
            raise credentials_exception
        user = get_user(username=token_data.username)
        if user is None:
            raise credentials_exception
        return user
    
    @router.get("/detail")
    async def user_detail(current_user: UserModel = Depends(get_current_user)):
        return {"name": "Danny", "email": "danny@tutorialsbuddy.com"}
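
To show roughly what jwt.encode and jwt.decode do in the code above, here is a minimal sketch of an HS256 token built with only the standard library. It is an illustration, not a replacement for python-jose: the function names sign_hs256 and verify_hs256 are hypothetical, and the sketch skips checks that a real library performs, such as validating the header's alg value.

```python
import base64
import hashlib
import hmac
import json
import secrets
import time


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _hmac_sha256(secret: str, message: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), message.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Return header.payload.signature signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{b64url(_hmac_sha256(secret, signing_input))}"


def verify_hs256(token: str, secret: str) -> dict:
    """Check the signature and expiry; return the payload or raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _hmac_sha256(secret, f"{header_b64}.{payload_b64}")
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(b64url_decode(payload_b64))
    if payload.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return payload


secret_key = secrets.token_hex(32)  # comparable to: openssl rand -hex 32
token = sign_hs256({"sub": "danny", "exp": int(time.time()) + 30 * 60}, secret_key)
print(len(token.split(".")))  # number of dot-separated parts in the token
print(verify_hs256(token, secret_key)["sub"])
```

Note that the payload is signed, not encrypted: anyone holding the token can decode and read it, so it should not carry secrets such as passwords.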
  17. Create another service file named product_service.py in the src/endpoints directory:
  18. from fastapi import APIRouter, Depends
    from src.models.user import UserModel
    from src.endpoints.user_service import get_current_user
    
    # APIRouter groups the path operations for the product module
    router = APIRouter(
        prefix="/products",
        tags=["Product"],
        responses={404: {"description": "Not found"}},
    )
    
    @router.get("/detail")
    async def read_item_detail(current_user: UserModel = Depends(get_current_user)):
        return {"products": [{"id": 1, "name": "Smart Phone"}]}
  19. Create a route file named api.py in the routes/ directory:
  20. from fastapi import APIRouter
    from src.endpoints import user_service, product_service
    
    router = APIRouter()
    router.include_router(user_service.router)
    router.include_router(product_service.router)
  21. Create a main file named main.py in the project's root directory:
  22. import uvicorn
    from fastapi import FastAPI
    from fastapi.middleware.cors import CORSMiddleware
    from mongoengine import connect
    
    from routes.api import router as api_router
    
    app = FastAPI()
    
    # MongoDB connection parameters
    db_host = "127.0.0.1"
    db_port = "27017"
    db_name = "your-database-name"
    db_username = "your-username"
    db_password = "your-password"
    
    # Connection string
    connection_string = f"mongodb://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}"
    
    # Connect to the database
    connect(host=connection_string)
    
    origins = ["http://localhost:8005"]
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    
    app.include_router(api_router)
    
    if __name__ == '__main__':
        # uvicorn.run blocks until the server stops, so print before calling it
        print("running...")
        uvicorn.run(app, host='127.0.0.1', port=8005)
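
One caveat with the connection string in main.py: MongoDB URIs require the username and password to be percent-encoded, so a password containing characters such as @ or / would break the f-string as written. The following is a hedged sketch of one way to handle that while reading the values from environment variables. The variable names MONGO_USER, MONGO_PASSWORD, MONGO_HOST, MONGO_PORT, and MONGO_DB, and the helper build_mongo_uri, are assumptions for illustration; neither the tutorial nor MongoEngine defines them.

```python
import os
from urllib.parse import quote_plus


def build_mongo_uri(username: str, password: str, host: str, port: int, db_name: str) -> str:
    """Build a mongodb:// URI with percent-encoded credentials."""
    return (
        f"mongodb://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}:{port}/{db_name}"
    )


# Hypothetical environment variable names; the defaults mirror main.py.
connection_string = build_mongo_uri(
    username=os.environ.get("MONGO_USER", "your-username"),
    password=os.environ.get("MONGO_PASSWORD", "your-password"),
    host=os.environ.get("MONGO_HOST", "127.0.0.1"),
    port=int(os.environ.get("MONGO_PORT", "27017")),
    db_name=os.environ.get("MONGO_DB", "your-database-name"),
)

# Show the escaping with a sample password that contains reserved characters.
print(build_mongo_uri("app", "p@ss/word", "127.0.0.1", 27017, "shop"))
```

The resulting connection_string can be passed to connect(host=connection_string) exactly as in main.py.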
  23. Run the application:
  24. python main.py

    With the application running, open http://127.0.0.1:8005/docs in your web browser. You will see a list of the APIs that you can test.

    [Screenshot: calling the API to create a new user]

    [Screenshot: calling the login API to retrieve an access token]

    [Screenshot: calling a secured API using the access token]
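
The same three calls can also be made from a script instead of the docs page. The sketch below uses only the standard library and assumes the routes defined earlier (/users/signup, /users/token, /users/detail, /products/detail) and the host and port from main.py. The helper names and the sample credentials are hypothetical. Note that signup sends JSON, while login sends form fields because the route uses OAuth2PasswordRequestForm.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8005"  # host and port from main.py


def signup_request(username: str, password: str) -> urllib.request.Request:
    """POST /users/signup with a JSON body matching the NewUser model."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/users/signup",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def login_request(username: str, password: str) -> urllib.request.Request:
    """POST /users/token with form fields, as OAuth2PasswordRequestForm expects."""
    body = urllib.parse.urlencode({"username": username, "password": password}).encode("ascii")
    return urllib.request.Request(
        f"{BASE_URL}/users/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def secured_request(path: str, access_token: str) -> urllib.request.Request:
    """GET a protected path with the token in the Authorization header."""
    return urllib.request.Request(
        f"{BASE_URL}{path}",
        headers={"Authorization": f"Bearer {access_token}"},
    )


def send(request: urllib.request.Request) -> dict:
    """Send a prepared request and decode the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def run_demo(username: str = "danny", password: str = "change-me") -> None:
    """Sign up, log in, then call both secured endpoints. Needs the app running."""
    print(send(signup_request(username, password)))
    token = send(login_request(username, password))["access_token"]
    print(send(secured_request("/users/detail", token)))
    print(send(secured_request("/products/detail", token)))
```

With the server running, call run_demo() from a Python shell. Run it only once per username, because the signup route shown earlier does not check for duplicate users.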