Create a REST API to Perform CRUD Operations Using FastAPI and MongoDB
In this tutorial, you'll learn how to create a REST API that performs CRUD operations using FastAPI with MongoDB.
Follow the steps below to complete this tutorial:
- Start by creating a new project folder:
- Navigate to your newly created project folder:
- Create a virtual environment inside the project root directory using the command for your platform (`py -m venv env` on Windows, `python3 -m venv env` on macOS/Linux):
- Activate the virtual environment using the command for your platform (`.\env\Scripts\activate` on Windows, `source env/bin/activate` on macOS/Linux):
- Install FastAPI:
- Install uvicorn:
- Install pydantic with its email extra, which is required to validate email addresses:
- Install motor. It is a coroutine-based API for non-blocking access to MongoDB:
- Create a directory structure for your FastAPI project. A common structure should look like this:
- Create a model file named product.py in the models directory:
- Create a model file named PyObjectId.py in the models directory:
- Create database.py file in the db directory:
- Create a file named product_service.py in the endpoints directory:
- Create a file named user_service.py in the endpoints directory for the user API endpoints. We are not saving user data to the database in this example:
- Create api.py file in the routes directory to handle API routing:
- Create a main file named main.py in the project's root directory:
- Run the application:
mkdir sample_fastapi_mongodb_app
cd sample_fastapi_mongodb_app
py -m venv env
python3 -m venv env
.\env\Scripts\activate
source env/bin/activate
pip install fastapi
pip install uvicorn
pip install pydantic[email]
pip install motor
sample_fastapi_mongodb_app
├── env
├── db
│   └── database.py
├── endpoints
│   ├── product_service.py
│   └── user_service.py
├── models
│   ├── product.py
│   └── PyObjectId.py
├── routes
│   └── api.py
├── __init__.py
└── main.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
import datetime
from models.PyObjectId import PyObjectId
class ProductModel(BaseModel):
    """Request body for creating a product.

    ``name`` and ``created_by`` default to ``None``; ``price`` is required
    and must be greater than zero.
    """

    # Annotated Optional because the default is None (matches ``updated_by``
    # on ProductUpdateModel).
    name: Optional[str] = Field(
        None, title="Product Name", max_length=500
    )
    price: float = Field(..., gt=0,
                         description="The price must be greater than zero")
    created_by: Optional[EmailStr] = Field(
        None, title="Creater Email"
    )

    def as_dict(self):
        """Return the fields to store, stamped with the creation time.

        Returns:
            A dict with ``name``, ``price``, ``created_by`` and ``created_at``.
        """
        return {"name": self.name,
                "price": self.price,
                "created_by": self.created_by,
                # Timezone-aware UTC: a naive local datetime would be stored
                # by MongoDB as if it were already UTC.
                "created_at": datetime.datetime.now(tz=datetime.timezone.utc)}
class ProductUpdateModel(BaseModel):
    """Request body for updating an existing product.

    The target product is identified by the ``_id`` key of the request body
    (exposed on the model as ``id``). ``price`` is required and must be
    greater than zero; ``name`` and ``updated_by`` default to ``None``.
    """

    # Required: an update must name the document it targets. A generated
    # default id would never match a stored product.
    id: PyObjectId = Field(..., alias="_id")
    name: Optional[str] = Field(
        None, title="Product Name", max_length=500
    )
    price: float = Field(..., gt=0,
                         description="The price must be greater than zero")
    updated_by: Optional[EmailStr] = Field(
        None, title="Updater Email"
    )

    def as_dict(self):
        """Return the update payload, stamped with the modification time.

        Returns:
            A dict with ``id``, ``name``, ``price``, ``updated_at`` and
            ``updated_by``.
        """
        return {"id": self.id,
                "name": self.name,
                "price": self.price,
                # Timezone-aware UTC: a naive local datetime would be stored
                # by MongoDB as if it were already UTC.
                "updated_at": datetime.datetime.now(tz=datetime.timezone.utc),
                "updated_by": self.updated_by}
def ResponseModel(data, code, message, error):
    """Wrap an endpoint result in the API's response envelope.

    Args:
        data: Payload to return; it is placed inside a one-element list.
        code: Status code reported in the body.
        message: Human-readable description of the outcome.
        error: Error indicator passed through unchanged.

    Returns:
        A dict with the keys ``data``, ``code``, ``message`` and ``error``.
    """
    envelope = dict(data=[data], code=code, message=message, error=error)
    return envelope
from pydantic import BaseModel, Field as PydanticField
from bson import ObjectId
class PyObjectId(ObjectId):
    """``ObjectId`` subclass exposing validation and schema hooks.

    The ``__get_validators__`` / ``__modify_schema__`` classmethods are
    presumably the hooks pydantic looks for on custom field types.
    """

    @classmethod
    def __get_validators__(cls):
        """Yield the validators for this type (only ``validate``)."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Convert ``v`` to an ``ObjectId``.

        Raises:
            ValueError: If ``v`` is not a valid ObjectId.
        """
        if ObjectId.is_valid(v):
            return ObjectId(v)
        raise ValueError("Invalid objectid")

    @classmethod
    def __modify_schema__(cls, field_schema):
        """Mark the field as a string in the generated schema."""
        field_schema.update({"type": "string"})
# driver for mongodb
import motor.motor_asyncio
from models.PyObjectId import PyObjectId
from urllib.parse import quote_plus

# Connection settings for the MongoDB server. Replace the placeholder
# credentials with real ones before running the application.
db_host = "localhost"
db_port = "27017"
db_username = "your-username"
db_password = "your-password"
# Must be an f-string, otherwise the literal text "{db_username}" etc. is sent
# to the driver. Credentials are percent-escaped because characters such as
# "@" or ":" would otherwise break the URI.
mongodb_string = (
    f"mongodb://{quote_plus(db_username)}:{quote_plus(db_password)}"
    f"@{db_host}:{db_port}"
)
db_name = "test_database"
class Database():
    """Holds the MongoDB client and creates it on first use."""

    def __init__(self) -> None:
        self.connected = False
        # Declared here so the attribute exists before the first connection.
        self.client = None
        # Alias of ``client``, kept for code that reads this attribute name.
        self.mongodb_client = None

    async def db_connection(self):
        """Return the ``db_name`` database, creating the client if needed.

        The client is built from ``mongodb_string`` on the first call and
        reused by every later call.
        """
        if not self.connected:
            self.client = motor.motor_asyncio.AsyncIOMotorClient(mongodb_string)
            self.mongodb_client = self.client
            self.connected = True
        return self.client[db_name]


# Module-level instance used by the data-access functions below.
database = Database()
async def add_product(product_data: dict):
    """Insert a product and return it as stored.

    Args:
        product_data: Document to insert into the ``product`` collection.

    Returns:
        The inserted document, read back by its generated ``_id`` and
        converted with ``to_product``.
    """
    db = await database.db_connection()
    collection = db.product
    insert_result = await collection.insert_one(product_data)
    stored = await collection.find_one({"_id": insert_result.inserted_id})
    return to_product(stored)
async def read_product(id: str):
    """Retrieve a product by id.

    Args:
        id: String form of the product's ObjectId.

    Returns:
        The product converted with ``to_product``, or ``None`` when the id is
        malformed or no product has that id.
    """
    # A malformed id can never match; return None rather than letting the
    # ObjectId constructor raise.
    if not PyObjectId.is_valid(id):
        return None
    db = await database.db_connection()
    product = await db.product.find_one({"_id": PyObjectId(id)})
    return to_product(product) if product else None


async def read_products():
    """Retrieve every product in the collection.

    Returns:
        A list of products converted with ``to_product_list``; empty when the
        collection has no documents.
    """
    db = await database.db_connection()
    # length=None asks the cursor for all matching documents.
    products = await db.product.find().to_list(length=None)
    return to_product_list(products)
async def update_product(id: str, product_data: dict):
    """Update a product by id.

    Only ``name``, ``price``, ``updated_by`` and ``updated_at`` are written,
    and only when ``product_data`` supplies a non-``None`` value for them, so
    an omitted field keeps its stored value.

    Args:
        id: ObjectId (or its string form) of the product to update.
        product_data: New field values.

    Returns:
        ``True`` when a product with that id was matched by the update,
        ``False`` when there is nothing to update, the id is malformed, or no
        such product exists.
    """
    if not product_data:
        return False
    # A malformed id can never match; avoid raising from the constructor.
    if not PyObjectId.is_valid(id):
        return False
    changes = {
        field: product_data[field]
        for field in ("name", "price", "updated_by", "updated_at")
        if product_data.get(field) is not None
    }
    if not changes:
        return False
    db = await database.db_connection()
    # $set only the changed fields; the stored _id and creation metadata are
    # left untouched.
    result = await db.product.update_one(
        {"_id": PyObjectId(id)}, {"$set": changes}
    )
    return result.acknowledged and result.matched_count > 0
async def delete_product(id: str):
    """Delete a product from the database.

    Args:
        id: String form of the product's ObjectId.

    Returns:
        ``True`` when a product was deleted, ``False`` when the id is
        malformed or no product has that id.
    """
    # A malformed id can never match; avoid raising from the constructor.
    if not PyObjectId.is_valid(id):
        return False
    db = await database.db_connection()
    # One delete call reports whether anything was removed, so no separate
    # lookup is needed first.
    result = await db.product.delete_one({"_id": PyObjectId(id)})
    return result.deleted_count > 0
def to_product(item) -> dict:
    """Convert a stored product document into a response dict.

    ``_id`` is stringified and exposed as ``id``; the remaining fields are
    copied as-is, with ``None`` for any that are absent.
    """
    copied_fields = (
        "name",
        "price",
        "created_at",
        "created_by",
        "updated_at",
        "updated_by",
    )
    product = {"id": str(item.get("_id"))}
    for field in copied_fields:
        product[field] = item.get(field)
    return product
def to_product_list(items) -> list:
    """Convert each document in ``items`` with ``to_product``."""
    return list(map(to_product, items))
from fastapi import APIRouter, HTTPException
from models.product import ProductModel, ProductUpdateModel, ResponseModel
from db.database import add_product, read_product, read_products, update_product, delete_product
# Router for the product module; its routes are served under /products.
router = APIRouter(
    prefix="/products", tags=["Product"],
    responses={404: {"description": "Not found"}},
)
@router.post("/add", response_description="Product data added into the database")
async def add_product_data(product: ProductModel):
    """Store the submitted product and return it in the response envelope."""
    created = await add_product(product.as_dict())
    return ResponseModel(created, 200, "Product added successfully.", False)
@router.put("/update")
async def update_product_data(product: ProductUpdateModel):
    """Update the product identified by the request body.

    Raises:
        HTTPException: 404 when ``update_product`` reports that nothing was
            updated, instead of answering with a success message.
    """
    payload = product.as_dict()
    updated = await update_product(payload.get("id"), payload)
    if not updated:
        raise HTTPException(status_code=404, detail="Product not found.")
    return ResponseModel(updated, 200, "Product updated successfully.", False)
@router.delete("/{product_id}/delete")
async def delete_product_data(product_id: str):
    """Delete the product with the given id.

    Raises:
        HTTPException: 404 when ``delete_product`` reports that nothing was
            deleted, instead of answering with a success message.
    """
    deleted = await delete_product(product_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Product not found.")
    return ResponseModel(deleted, 200, "Product deleted successfully.", False)
@router.get("/{product_id}")
async def read_product_data(product_id: str):
    """Return the product with the given id.

    Raises:
        HTTPException: 404 when ``read_product`` finds no product, instead of
            answering with a success message and an empty payload.
    """
    product = await read_product(product_id)
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found.")
    return ResponseModel(product, 200, "Product retrieved successfully.", False)
from fastapi import APIRouter
# Router for the user module; its routes are served under /users.
router = APIRouter(
    prefix="/users", tags=["User"],
    responses={404: {"description": "Not found"}},
)
@router.get("/")
async def read_user_data():
    """Return a hard-coded sample user; nothing is read from the database."""
    sample_user = {"name": "John", "email": "john@example.com"}
    return sample_user
from fastapi import APIRouter
from endpoints import product_service, user_service
# Top-level router that aggregates the routers of every endpoint module.
router = APIRouter()
for endpoint_module in (product_service, user_service):
    router.include_router(endpoint_module.router)
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI
from routes.api import router as api_router
# Application object served by uvicorn.
app = FastAPI()

# Origins allowed to make cross-origin requests to this API.
origins = ["http://localhost:8000"]

# CORS policy: the origins above, with credentials and any method or header.
_cors_settings = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)

# Mount every API route collected in routes/api.py.
app.include_router(api_router)
if __name__ == '__main__':
    # uvicorn.run() blocks until the server stops, so the status message has
    # to be printed before it, not after.
    print("running")
    uvicorn.run("main:app", host='127.0.0.1', port=8000, log_level="info", reload=True)
python main.py
To test your APIs, open http://127.0.0.1:8000/docs in your web browser. You will see a list of APIs ready for testing.