Create REST API to perform CRUD Operations using FastAPI and MongoDB

  • Last updated Apr 25, 2024

In this tutorial, you'll learn how to create REST API to perform CRUD operations using FastAPI with MongoDB.

Follow the steps below to complete this tutorial:

  1. Start by creating a new project folder:
  2. mkdir sample_fastapi_mongodb_app
  3. Navigate to your newly created project folder:
  4. cd sample_fastapi_mongodb_app
  5. Create a virtual environment inside the project root directory using the following command:
  6. py -m venv env
    python3 -m venv env
  7. Activate the virtual environment using the command:
  8. .\env\Scripts\activate
    source env/bin/activate
  9. Install FastAPI:
  10. pip install fastapi
  11. Install uvicorn:
  12. pip install uvicorn
  13. Install pydantic to validate email:
  14. pip install pydantic[email]
  15. Install motor. It is a coroutine-based API for non-blocking access to MongoDB:
  16. pip install motor
  17. Create a directory structure for your FastAPI project. A common structure should look like this:
  18. ├── sample_fastapi_mongodb_app
    │   │── env
    │   │── db
    │   │    └── database.py
    │   │
    │   ├── endpoints
    │   │     └── product.py
    │   │     └── user.py
    │   │
    │   ├── models
    │   │     └── product.py
    │   │     └── PyObjectId.py
    │   │
    │   ├── routes
    │   │     └── api.py
    │   │
    │   ├── __init__.py
    │   ├── main.py
    
  19. Create a model file named product.py in the src/models directory:
  20. from pydantic import BaseModel, EmailStr, Field
    from typing import Optional
    import datetime
    from models.PyObjectId import PyObjectId
    
    class ProductModel(BaseModel):
        name: str = Field(
            None, title="Product Name", max_length=500
        )
        price: float = Field(..., gt=0,
                             description="The price must be greater than zero")
        created_by: EmailStr = Field(
            None, title="Creater Email"
        )
    
        def as_dict(self):
            return {"name": self.name,
                    "price": self.price,
                    "created_by": self.created_by,
                    "created_at": datetime.datetime.now()}
    
    
    class ProductUpdateModel(BaseModel):
        id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
        name: str = Field(
            None, title="Product Name", max_length=500
        )
        price: float = Field(..., gt=0,
                             description="The price must be greater than zero")
        updated_by: Optional[EmailStr] = Field(
            None, title="Updater Email"
        )
    
        def as_dict(self):
            return {"id": self.id,
                    "name": self.name,
                    "price": self.price,
                    "updated_at": datetime.datetime.now(),
                    "updated_by": self.updated_by}
    
    
    def ResponseModel(data, code, message, error):
        return {
            "data": [data],
            "code": code,
            "message": message,
            "error": error
        }
  21. Create a file named PyObjectId.py model file in the src/models directory:
  22. from pydantic import BaseModel, Field as PydanticField
    from bson import ObjectId
    
    class PyObjectId(ObjectId):
    
        @classmethod
        def __get_validators__(cls):
    
            yield cls.validate
    
        @classmethod
        def validate(cls, v):
    
            if not ObjectId.is_valid(v):
    
                raise ValueError("Invalid objectid")
    
            return ObjectId(v)
    
        @classmethod
        def __modify_schema__(cls, field_schema):
    
            field_schema.update(type="string")
  23. Create database.py file in the db directory:
  24. # driver for mongodb
    import motor.motor_asyncio
    from models.PyObjectId import PyObjectId
    
    db_host = "localhost"
    db_port = "27017"
    db_username = "your-username"
    db_password = "your-password"
    mongodb_string = "mongodb://{db_username}:{db_password}@{db_host}:{db_port}"
    db_name = "test_database"
    
    
    class Database():
        def __init__(self) -> None:
            self.connected = False
            self.mongodb_client = None
    
        async def db_connection(self):
            if self.connected == False:
                self.client = motor.motor_asyncio.AsyncIOMotorClient(mongodb_string)
                self.connected = True
            db = self.client[db_name]
            return db
    
    
    database = Database()
    
    # Add a new product into the database
    async def add_product(product_data: dict):
        db = await database.db_connection()
        product = await db.product.insert_one(product_data)
        new_product = await db.product.find_one({"_id": product.inserted_id})
        return to_product(new_product)
    
    
    # Retrieve a product by id
    async def read_product(id: str):
        db = await database.db_connection()
        product = await db.product.find_one({"_id": PyObjectId(id)})
        if product:
            return to_product(product)
        return None
    
    
    # Update a product by id
    async def update_product(id: str, product_data: dict):
        if len(product_data) < 1:
            return False
        db = await database.db_connection()
        product = await db.product.find_one({"_id": PyObjectId(id)})
        if product:
            product["name"] = product_data.get("name")
            product["price"] = product_data.get("price")
            product["updated_by"] = product_data.get("updated_by")
            updated_product = await db.product.update_one({"_id": PyObjectId(id)}, {"$set": product})
            return updated_product.acknowledged
        return False
    
    # Delete a product from the database
    
    
    async def delete_product(id: str):
        db = await database.db_connection()
        product = await db.product.find_one({"_id": PyObjectId(id)})
        if product:
            await db.product.delete_one({"_id": PyObjectId(id)})
            return True
        else:
            return False
    
    
    def to_product(item) -> dict:
        return {
            "id": str(item.get("_id")),
            "name": item.get("name"),
            "price": item.get("price"),
            "created_at": item.get("created_at"),
            "created_by": item.get("created_by"),
            "updated_at": item.get("updated_at"),
            "updated_by": item.get("updated_by")
        }
    
    def to_product_list(items) -> list:
        return [to_product(item) for item in items]
  25. Create a file named product_service.py in the src/endpoints directory:
  26. from fastapi import APIRouter
    from models.product import ProductModel, ProductUpdateModel, ResponseModel
    from db.database import add_product, read_product, read_products, update_product, delete_product
    
    # APIRouter creates path operations for product module
    router = APIRouter(
        prefix="/products",
        tags=["Product"],
        responses={404: {"description": "Not found"}},
    )
    
    
    @router.post("/add", response_description="Product data added into the database")
    async def add_product_data(product: ProductModel):
        product = product.as_dict()
        new_product = await add_product(product)
        return ResponseModel(new_product, 200, "Product added successfully.", False)
    
    
    @router.put("/update")
    async def update_product_data(product: ProductUpdateModel):
        product = product.as_dict()
        updated_product = await update_product(product.get("id"), product)
        return ResponseModel(updated_product, 200, "Product updated successfully.", False)
    
    
    @router.delete("/{product_id}/delete")
    async def delete_product_data(product_id: str):
        deleted_result = await delete_product(product_id)
        return ResponseModel(deleted_result, 200, "Product deleted successfully.", False)
    
    
    @router.get("/{product_id}")
    async def read_product_data(product_id: str):
        product = await read_product(product_id)
        return ResponseModel(product, 200, "Product retrieved successfully.", False)
  27. Create user_service.py file for user API endpoints in the src/endpoints directory. We are not saving user data to database in this example:
  28. from fastapi import APIRouter
    
    # APIRouter creates path operations for product module
    router = APIRouter(
        prefix="/users",
        tags=["User"],
        responses={404: {"description": "Not found"}},
    )
    
    @router.get("/")
    async def read_user_data():
        return {"name": "John", "email": "john@example.com"}
  29. Create api.py file in the routes directory to handle API routing:
  30. from fastapi import APIRouter
    from endpoints import product_service, user_service
    
    router = APIRouter()
    router.include_router(product_service.router)
    router.include_router(user_service.router)
  31. Create a main file named main.py in the project's root directory:
  32. import uvicorn
    from fastapi.middleware.cors import CORSMiddleware
    from fastapi import FastAPI
    from routes.api import router as api_router
    
    app = FastAPI()
    
    origins = ["http://localhost:8000"]
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    
    app.include_router(api_router)
    
    if __name__ == '__main__':
        uvicorn.run("main:app", host='127.0.0.1', port=8000, log_level="info", reload=True)
        print("running")
  33. Run the application:
  34. python main.py

    To test your APIs, open http://127.0.0.1:8005/docs in your web browser. You will see a list of APIs ready for testing