Secure FastAPI Application with Oauth2 JWT
In this tutorial, we will show you on how to secure FastAPI REST APIs with Oauth2 JWT. To demonstrate this example, we will create the following REST APIs:
- Create a new user.
- Log in to obtain an access token.
- Access the secured APIs using the access token obtained after a successful login.
To store user data, we'll use a MongoDB database.
FastAPI is based on OpenAPI. OpenAPI was previously known as Swagger. Swagger was orginally designed to generate interactive documentation interfaces.
FastAPI provides several tools for implementing security easily without a big amount of effort and code.
FastAPI defines several security schemes. Some are as follows:
- apiKey: This is an application specific key which can come from a header, cookie or a query parameter.
- http: A standard http authentication system that includes the following:
- bearer : A header Authorization value with a token. It is inherited from OAuth2.
- HTTP basic authentication
- HTTP Digest
- Oauth2: Includes all the OAuth2 process of handling security flows:
- The implicit, clientCredentials, and authorizationCode flows are appropriate for creating authentication providers like Google, Facebook, etc.
- The password flow can be used for handling authentication in the same application directly.
- openIdConnect: This provides a way to discover Oauth2 authentication data automatically. The automatic discovery is defined in the OpenID connect specification.
Follow the steps below to complete this tutorial:
- Start by creating a new project folder:
- Navigate to your newly created project folder:
- Create a virtual environment inside the project root directory using the following command:
- Activate the virtual environment using the command:
- Install the following dependencies for FastAPI, Uvicorn, passlib, python-jose[cryptography], python-multipart, mongoengine:
- Create a directory structure for your FastAPI project. A common structure should look like this:
- Create a model file named user.py in the src/models directory:
- Create a service file named user_service.py in the src/endpoints directory:
- Create another service file named product_service.py in the src/endpoints directory:
- Create a route file named api.py in the routes/ directory:
- Create a main file named main.py in the project's root directory:
- Run the application:
mkdir project_name
cd project_name
py -m venv env
python3 -m venv env
.\env\Scripts\activate
source env/bin/activate
pip install fastapi
pip install "uvicorn[standard]"
pip install passlib
pip install python-jose[cryptography]
pip install python-multipart
pip install mongoengine
├── project_name │ │── env │ ├── routes │ │ └── api.py │ ├── src │ │ ├── __init__.py │ │ ├── endpoints │ │ │ ├── __init__.py │ │ │ ├── product_service.py │ │ │ └── user_service.py │ │ │ │ │ └── models │ │ ├── __init__.py │ │ └── user.py │ │ │ ├── __init__.py │ ├── main.py
There are folders named models and endpoints inside the src folder. The models folder contains user.py file. Similarly, the endpoints folder contains user_service.py and product_service.py files with API endpoints.
There is a __init__.py file in every directory or subdirectory. This presence of __init__.py files allows importing code from one file into another.
from mongoengine import Document, StringField, DateTimeField, IntField, BooleanField
import datetime
class UserModel(Document):
_id = IntField()
username = StringField(max_length=250, required=True)
password = StringField(max_length=250, required=True)
disabled = BooleanField(default=False)
created_date = DateTimeField(default=datetime.datetime.utcnow)
modified_date = DateTimeField(default=datetime.datetime.utcnow)
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from typing import Optional
from src.models.user import UserModel
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import json
from jose import jwt, JWTError
router = APIRouter(
prefix="/users",
tags=["users"],
responses={404: {"description": "Not found"}}
)
class NewUser(BaseModel):
username: str
password: str
crypt_context = CryptContext(schemes=["sha256_crypt", "md5_crypt"])
def get_password_hash(password):
return crypt_context.hash(password)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@router.post("/signup")
async def sign_up(newUser: NewUser):
user = UserModel(username=newUser.username,
password=get_password_hash(newUser.password))
user.save()
return {"message": "Created user successfully!"}
# run the following on terminal to generate a secret key
# openssl rand -hex 32
SECRET_KEY = "3e8a3f31aab886f8793176988f8298c9265f84b8388c9fef93635b08951f379b"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password, hashed_password):
return crypt_context.verify(plain_password, hashed_password)
def authenticate(username, password):
try:
user = get_user(username)
password_check = verify_password(password, user['password'])
return password_check
except UserModel.DoesNotExist:
return False
class Token(BaseModel):
access_token: str
token_type: str
@router.post("/token" , response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
username = form_data.username
password = form_data.password
if authenticate(username, password):
access_token = create_access_token(
data={"sub": username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
return {"access_token": access_token, "token_type": "bearer"}
else:
raise HTTPException(
status_code=400, detail="Incorrect username or password")
class TokenData(BaseModel):
username: Optional[str] = None
def get_user(username: str):
try:
user = json.loads(UserModel.objects.get(username=username).to_json())
return user
except UserModel.DoesNotExist:
return None
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
return user
@router.get("/detail")
async def user_detail(current_user: UserModel = Depends(get_current_user)):
return {"name": "Danny", "email": "danny@tutorialsbuddy.com"}
from fastapi import APIRouter, Depends
from src.models.user import UserModel
from src.endpoints.user_service import get_current_user
#APIRouter creates path operations for item module
router = APIRouter(
prefix="/products",
tags=["Product"],
responses={404: {"description": "Not found"}},
)
@router.get("/detail")
async def read_item_detail(current_user: UserModel = Depends(get_current_user)):
return {"products": [{"id": 1}, {"name": "Smart Phone"}]}
from fastapi import APIRouter
from src.endpoints import user_service, product_service
router = APIRouter()
router.include_router(user_service.router)
router.include_router(product_service.router)
import uvicorn
from fastapi import FastAPI
from src.endpoints import user_service as user
from mongoengine import connect, disconnect
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI
from routes.api import router as api_router
app = FastAPI()
# MongoDB connection parameters
db_host = "127.0.0.1"
db_port = "27017"
db_name = "your-database-name"
db_username = "your-username"
db_password = "your-password"
# Connection string
connection_string = f"mongodb://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}"
# Connect to the database
connect(host=connection_string)
origins = ["http://localhost:8005"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router)
if __name__ == '__main__':
uvicorn.run(app, host='127.0.0.1', port=8005)
print("running...")
python main.py
Run your application and open http://127.0.0.1:8005/docs in your web browser. You will see list of APIs that you can test:
Calling API to create a new user:
Calling login API to retrieve access token:
Calling secured API using access token: