Create REST API to perform CRUD Operations using FastAPI and MongoDB

In this tutorial, you'll learn how to create a REST API that performs CRUD operations using FastAPI with MongoDB.

Follow the steps below to complete this tutorial:

Create a sample Application

  1. Create a project folder and navigate to it:

         mkdir sample_fastapi_mongodb_app
         cd sample_fastapi_mongodb_app
  2. Installing all Python dependencies in a virtualenv for the project is always a good idea. So, let's create a virtual environment inside the project root directory using the following command:

         virtualenv env
        
  3. Activate the virtual environment using the command for your platform:

     On Ubuntu/Linux/macOS:

         source env/bin/activate

     On Windows:

         env\Scripts\activate
        

Installing Dependencies

You'll need to install a few dependencies, such as FastAPI, uvicorn, and Motor. Make sure your virtualenv is activated before running pip.

  1. Install FastAPI:

         pip install fastapi

  2. Install uvicorn. Uvicorn is a fast ASGI server implementation. To install uvicorn, use the following command:

         pip install uvicorn

  3. Install Motor. It is a coroutine-based API for non-blocking access to MongoDB:

         pip install motor

  4. Install pydantic with its email extra, which is needed to validate email addresses:

         pip install "pydantic[email]"

Create Files and Folders

Create folders and files as shown below to make your project's file structure look like this:


sample_fastapi_mongodb_app
├── env
├── db
│   └── database.py
├── endpoints
│   ├── product.py
│   └── user.py
├── models
│   ├── product.py
│   └── PyObjectId.py
├── routes
│   └── api.py
├── __init__.py
└── main.py

Create Models

Create the product.py file inside the models directory and add the following code to it:


from pydantic import BaseModel, EmailStr, Field
from typing import Optional
import datetime
from models.PyObjectId import PyObjectId

class ProductModel(BaseModel):
    """Request body for creating a product.

    ``price`` is required and must be greater than zero. ``name`` and
    ``created_by`` may be omitted, in which case they are ``None``.
    """

    # Annotated as Optional because the default is None; this matches how
    # ProductUpdateModel.updated_by is declared.
    name: Optional[str] = Field(
        None, title="Product Name", max_length=500
    )
    price: float = Field(..., gt=0,
                         description="The price must be greater than zero")
    created_by: Optional[EmailStr] = Field(
        None, title="Creater Email"
    )

    def as_dict(self) -> dict:
        """Return the fields as a plain dict plus a ``created_at`` timestamp.

        ``created_at`` is ``datetime.datetime.now()`` evaluated when this
        method is called, not when the model was built.
        """
        return {"name": self.name,
                "price": self.price,
                "created_by": self.created_by,
                "created_at": datetime.datetime.now()}


class ProductUpdateModel(BaseModel):
    """Request body for updating an existing product.

    The target product is identified by ``_id`` (available as ``id`` on the
    model). ``price`` is required and must be greater than zero; ``name``
    and ``updated_by`` default to ``None``.
    """

    # Required: an update must name the document it targets. A freshly
    # generated default id would not match any stored product, so a request
    # without "_id" is rejected by validation instead of silently doing
    # nothing.
    id: PyObjectId = Field(..., alias="_id")
    # Annotated as Optional because the default is None.
    name: Optional[str] = Field(
        None, title="Product Name", max_length=500
    )
    price: float = Field(..., gt=0,
                         description="The price must be greater than zero")
    updated_by: Optional[EmailStr] = Field(
        None, title="Updater Email"
    )

    def as_dict(self) -> dict:
        """Return the fields as a plain dict plus an ``updated_at`` timestamp.

        ``updated_at`` is ``datetime.datetime.now()`` evaluated when this
        method is called.
        """
        return {"id": self.id,
                "name": self.name,
                "price": self.price,
                "updated_at": datetime.datetime.now(),
                "updated_by": self.updated_by}


def ResponseModel(data, code, message, error):
    """Build the response envelope returned by the API endpoints.

    ``data`` is wrapped in a single-element list; ``code``, ``message`` and
    ``error`` are passed through unchanged.
    """
    envelope = dict(data=[data], code=code, message=message)
    envelope["error"] = error
    return envelope

Create the PyObjectId.py file inside the models directory:


from pydantic import BaseModel, Field as PydanticField
from bson import ObjectId

class PyObjectId(ObjectId):
    """``ObjectId`` subclass that provides the pydantic custom-type hooks."""

    @classmethod
    def __get_validators__(cls):
        """Yield the validator callables for this type."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``, or raise ``ValueError`` if ``v`` is invalid."""
        if ObjectId.is_valid(v):
            return ObjectId(v)
        raise ValueError("Invalid objectid")

    @classmethod
    def __modify_schema__(cls, field_schema):
        """Mark the field as a string in the given schema dict."""
        field_schema.update({"type": "string"})

Connect Database

Create the database.py file inside the db directory:


# driver for mongodb
import motor.motor_asyncio
from models.PyObjectId import PyObjectId

# NOTE: the credentials/host part of this URL was mangled by e-mail
# obfuscation ("[email protected]") and was not a valid connection string.
# The password and host below are placeholders -- replace them with the
# values for your own MongoDB deployment.
MONGODB_URL = "mongodb://testuser:testpassword@localhost:27017"
# Name of the database that holds the "product" collection.
DATABASE_NAME = "online_shop_db"


class Database():
    """Creates the Motor client on first use and reuses it afterwards.

    Attributes:
        connected: True once the client object has been created.
        client: The ``AsyncIOMotorClient``, or ``None`` before first use.
        mongodb_client: Same object as ``client``; this name is kept because
            the initialiser has always exposed it.
    """

    def __init__(self) -> None:
        self.connected = False
        # db_connection() stores the client in ``client``; initialise it here
        # so the attribute exists (as None) before the first connection.
        self.client = None
        self.mongodb_client = None

    async def db_connection(self):
        """Return the ``DATABASE_NAME`` database, creating the client if needed."""
        if not self.connected:
            self.client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)
            self.mongodb_client = self.client
            self.connected = True
        return self.client[DATABASE_NAME]


# Module-level Database instance used by the CRUD helpers in this module.
database = Database()

# Add a new product into the database


async def add_product(product_data: dict):
    """Insert ``product_data`` into the ``product`` collection.

    The stored document is read back by its new ``_id`` and returned in the
    shape produced by ``to_product``.
    """
    db = await database.db_connection()
    collection = db.product
    insert_result = await collection.insert_one(product_data)
    stored = await collection.find_one({"_id": insert_result.inserted_id})
    return to_product(stored)


# Retrieve a product by id
async def read_product(id: str):
    """Return the product with the given id, or ``None``.

    Args:
        id: ObjectId (or its hex string) of the product to fetch.

    Returns:
        The document converted by ``to_product``, or ``None`` when ``id`` is
        not a valid ObjectId or no document matches it.
    """
    # A malformed id cannot match any document; report "not found" instead
    # of letting the ObjectId constructor raise.
    if not PyObjectId.is_valid(id):
        return None
    db = await database.db_connection()
    product = await db.product.find_one({"_id": PyObjectId(id)})
    return to_product(product) if product else None


# Update a product by id
async def update_product(id: str, product_data: dict):
    """Update the name, price and audit fields of one product.

    Args:
        id: ObjectId (or its hex string) of the product to update.
        product_data: New values. ``name``, ``price`` and ``updated_by`` are
            always written (a missing key is stored as ``None``);
            ``updated_at`` is written when the key is present.

    Returns:
        True when a product with this id was matched and the write was
        acknowledged; False for empty ``product_data``, a malformed id, or
        no matching product.
    """
    if not product_data:
        return False
    # A malformed id cannot match any document; report failure instead of
    # letting the ObjectId constructor raise.
    if not PyObjectId.is_valid(id):
        return False
    changes = {
        "name": product_data.get("name"),
        "price": product_data.get("price"),
        "updated_by": product_data.get("updated_by"),
    }
    # Persist the timestamp produced by ProductUpdateModel.as_dict(); it was
    # previously dropped.
    if "updated_at" in product_data:
        changes["updated_at"] = product_data["updated_at"]
    db = await database.db_connection()
    # $set only the changed fields (not the whole document with its _id) and
    # use matched_count to learn whether the product exists, which avoids a
    # separate find_one round trip.
    result = await db.product.update_one(
        {"_id": PyObjectId(id)}, {"$set": changes}
    )
    # matched_count is only read when the write was acknowledged.
    return bool(result.acknowledged and result.matched_count > 0)

# Delete a product from the database


async def delete_product(id: str):
    """Delete the product with the given id.

    Args:
        id: ObjectId (or its hex string) of the product to delete.

    Returns:
        True when a document was deleted; False when ``id`` is malformed or
        no document matches it.
    """
    # A malformed id cannot match any document; report failure instead of
    # letting the ObjectId constructor raise.
    if not PyObjectId.is_valid(id):
        return False
    db = await database.db_connection()
    # A single delete_one tells us whether anything was removed, so no
    # separate existence check (and no gap between check and delete) is
    # needed. Assumes the default acknowledged write concern.
    result = await db.product.delete_one({"_id": PyObjectId(id)})
    return result.deleted_count > 0


def to_product(item) -> dict:
    """Convert a stored product document into the dict returned by the API.

    ``_id`` is exposed as the string ``id``; every other field is copied with
    ``item.get`` and is therefore ``None`` when absent.
    """
    copied_fields = (
        "name",
        "price",
        "created_at",
        "created_by",
        "updated_at",
        "updated_by",
    )
    product = {"id": str(item.get("_id"))}
    for field in copied_fields:
        product[field] = item.get(field)
    return product

def to_product_list(items) -> list:
    """Convert an iterable of product documents with ``to_product``."""
    return [to_product(item) for item in items]


# Retrieve all products
async def read_products():
    """Return every document in the ``product`` collection as API dicts.

    endpoints/product.py imports this name from db.database, so it has to
    be defined for that module to import successfully.
    """
    db = await database.db_connection()
    documents = [document async for document in db.product.find()]
    return to_product_list(documents)

Create API Endpoints

Create product.py inside the endpoints directory:


from fastapi import APIRouter
from models.product import ProductModel, ProductUpdateModel, ResponseModel
from db.database import add_product, read_product, read_products, update_product, delete_product

# Router for the product module: every path registered on it is served
# under the "/products" prefix and tagged "Product".
router = APIRouter(
    prefix="/products",
    tags=["Product"],
    # Declares a 404 "Not found" response for all routes on this router.
    responses={404: {"description": "Not found"}},
)


@router.post("/add", response_description="Product data added into the database")
async def add_product_data(product: ProductModel):
    """Store the submitted product and return it in the response envelope."""
    created = await add_product(product.as_dict())
    return ResponseModel(created, 200, "Product added successfully.", False)


@router.put("/update")
async def update_product_data(product: ProductUpdateModel):
    """Apply the submitted changes to the product identified in the body.

    Returns the response envelope with code 200 when ``update_product``
    reports success, and with code 404 and ``error`` set otherwise.
    """
    changes = product.as_dict()
    updated = await update_product(changes.get("id"), changes)
    if not updated:
        # Do not claim success when nothing was updated.
        return ResponseModel(updated, 404, "Product not found.", True)
    return ResponseModel(updated, 200, "Product updated successfully.", False)


@router.delete("/{product_id}/delete")
async def delete_product_data(product_id: str):
    """Delete the product with the given id.

    Returns the response envelope with code 200 when ``delete_product``
    reports success, and with code 404 and ``error`` set otherwise.
    """
    deleted_result = await delete_product(product_id)
    if not deleted_result:
        # Do not claim success when nothing was deleted.
        return ResponseModel(deleted_result, 404, "Product not found.", True)
    return ResponseModel(deleted_result, 200, "Product deleted successfully.", False)


@router.get("/{product_id}")
async def read_product_data(product_id: str):
    """Fetch one product by id.

    Returns the response envelope with code 200 and the product when
    ``read_product`` finds it, and with code 404 and ``error`` set when it
    returns ``None``.
    """
    product = await read_product(product_id)
    if product is None:
        # Do not claim success when there is no such product.
        return ResponseModel(product, 404, "Product not found.", True)
    return ResponseModel(product, 200, "Product retrieved successfully.", False)

Create the user.py file inside the endpoints directory for the user API endpoints. We are not saving user data to the database in this example:


from fastapi import APIRouter

# Router for the user module: every path registered on it is served under
# the "/users" prefix and tagged "User".
router = APIRouter(
    prefix="/users",
    tags=["User"],
    # Declares a 404 "Not found" response for all routes on this router.
    responses={404: {"description": "Not found"}},
)

@router.get("/")
async def read_user_data():
    """Return a hard-coded sample user; nothing is read from the database."""
    # The address here was mangled by e-mail obfuscation
    # ("[email protected]"); a placeholder example.com address is used.
    return {"name": "John", "email": "john@example.com"}

Create Routes

Create the api.py file inside the routes directory to handle API routing:


from fastapi import APIRouter
from endpoints import product, user

# Top-level router that aggregates the per-module routers.
router = APIRouter()
router.include_router(product.router)  # routes from endpoints/product.py
router.include_router(user.router)  # routes from endpoints/user.py

Create Main File

Create main.py file with the following code:


import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI
from routes.api import router as api_router

# The application object; uvicorn is pointed at it as "main:app".
app = FastAPI()

# Origins allowed to make cross-origin requests to this API.
origins = ["http://localhost:8000"]

# CORS middleware: only the origins listed above are allowed, with
# credentials, and with any HTTP method or header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the routes collected in routes/api.py.
app.include_router(api_router)

if __name__ == '__main__':
    # uvicorn.run() blocks until the server stops, so announce start-up
    # before calling it; printed afterwards, the message would only appear
    # once the server had already shut down.
    print("running")
    uvicorn.run("main:app", host='127.0.0.1', port=8000, log_level="info", reload=True)

Test Endpoints

Run main.py file using the following command:


python main.py    

To test the endpoints, open http://localhost:8000/docs in your browser. You will see a list of the API endpoints, which you can try out interactively.