Skip to content

Add an API Endpoint

This guide walks through adding a new API endpoint to MenoTime from start to finish.

Overview

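Adding an endpoint involves:

1. Creating a Pydantic request/response schema
2. Writing a service function with business logic
3. Adding the FastAPI route
4. Registering the router with the application
5. Writing tests
6. Testing locally
7. Committing your changes
8. Opening a pull request so the change can be reviewed and deployed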

Example: Create a Patient Profile Endpoint

Let's walk through adding a POST endpoint to create a new patient profile.

Step 1: Create the Pydantic Schema

Schemas define the request and response formats. Create a new schema file or add to an existing one:

# app/schemas/patient.py
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime
from typing import Optional

# Request schema (data from client)
class PatientCreate(BaseModel):
    """Request body for creating a new patient.

    ``email``, ``first_name`` and ``last_name`` are required;
    ``date_of_birth`` may be omitted and then defaults to ``None``.
    """
    email: EmailStr = Field(..., description="Patient email address")
    first_name: str = Field(..., min_length=1, max_length=100)
    last_name: str = Field(..., min_length=1, max_length=100)
    # NOTE(review): typed as ``datetime`` although the description and the
    # example advertise a date-only value — confirm whether ``date`` is intended.
    date_of_birth: Optional[datetime] = Field(None, description="YYYY-MM-DD format")

    # Pydantic v2 configuration. ``model_config`` replaces the nested
    # ``class Config`` form, which v2 keeps only as a deprecated shim.
    # The example is attached to the model's generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "email": "jane@example.com",
                "first_name": "Jane",
                "last_name": "Doe",
                "date_of_birth": "1985-06-15"
            }
        }
    }

# Response schema (data sent to client)
class PatientResponse(BaseModel):
    """Response body for patient endpoints."""
    id: int
    email: str
    first_name: str
    last_name: str
    date_of_birth: Optional[datetime] = None
    is_active: bool
    created_at: datetime
    updated_at: datetime

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``).
    # ``from_attributes`` lets the model be populated from object attributes,
    # e.g. an ORM instance, rather than only from a dict.
    model_config = {"from_attributes": True}

# List response (for pagination)
class PatientListResponse(BaseModel):
    """One page of patients together with its paging metadata."""
    # Required: there is no default for the total.
    total: int = Field(..., description="Total patient count")
    # Paging values default to the first page of 20 items.
    limit: int = Field(20, description="Items per page")
    offset: int = Field(0, description="Offset for pagination")
    # The patients on this page.
    items: list[PatientResponse]

Step 2: Write the Service Function

Services contain business logic, database queries, and validations:

# app/services/patient_service.py
from sqlalchemy.orm import Session
from sqlalchemy import select, func
from app.models.patient import Patient
from app.schemas.patient import PatientCreate, PatientResponse
from fastapi import HTTPException, status
import logging

logger = logging.getLogger(__name__)

class PatientService:
    """Service for patient-related operations.

    Every method is a ``@staticmethod`` that receives the SQLAlchemy session
    as its first argument; the class itself holds no state.
    """

    @staticmethod
    def create_patient(db: Session, patient_data: PatientCreate) -> Patient:
        """
        Create a new patient in the database.

        Args:
            db: Database session
            patient_data: Patient creation request

        Returns:
            Created Patient model instance, refreshed after the commit

        Raises:
            HTTPException: 409 Conflict if a patient with the same email
                already exists
        """
        # Check if email already exists
        existing = db.execute(
            select(Patient).where(Patient.email == patient_data.email)
        ).scalar_one_or_none()

        if existing is not None:
            # Identify the clash by record ID only: an email address is a
            # patient identifier and should not be written to application logs.
            logger.warning(
                "Attempt to create patient with duplicate email (existing patient id: %s)",
                existing.id,
            )
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Patient with email {patient_data.email} already exists"
            )

        # Create new patient; new records always start out active.
        patient = Patient(
            email=patient_data.email,
            first_name=patient_data.first_name,
            last_name=patient_data.last_name,
            date_of_birth=patient_data.date_of_birth,
            is_active=True
        )

        db.add(patient)
        db.commit()
        # Reload the row so attributes populated by the database are available
        # on the returned instance.
        db.refresh(patient)

        # Log the ID only, never the email (see the note above).
        logger.info("Created new patient: %s", patient.id)
        return patient

    @staticmethod
    def get_patient(db: Session, patient_id: int) -> Patient:
        """
        Retrieve a patient by ID.

        Args:
            db: Database session
            patient_id: Patient ID

        Returns:
            Patient model instance

        Raises:
            HTTPException: 404 Not Found if no patient has this ID
        """
        patient = db.execute(
            select(Patient).where(Patient.id == patient_id)
        ).scalar_one_or_none()

        if patient is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Patient {patient_id} not found"
            )

        return patient

    @staticmethod
    def list_patients(db: Session, limit: int = 20, offset: int = 0) -> tuple[list[Patient], int]:
        """
        List all patients with pagination, newest first.

        Out-of-range paging values are normalised rather than rejected:
        ``limit`` is capped at 100, a ``limit`` below 1 falls back to 20,
        and a negative ``offset`` is treated as 0.

        Args:
            db: Database session
            limit: Number of items to return (max 100)
            offset: Number of items to skip

        Returns:
            Tuple of (patients list, total count)
        """
        # Validate limits
        limit = min(limit, 100)
        if limit < 1:
            limit = 20
        offset = max(offset, 0)

        # Get total count (a COUNT query always yields exactly one row)
        total = db.execute(select(func.count(Patient.id))).scalar_one()

        # Get paginated results. ``id`` is a tie-breaker so that rows sharing
        # a ``created_at`` value keep a stable position across pages.
        patients = db.execute(
            select(Patient)
            .order_by(Patient.created_at.desc(), Patient.id.desc())
            .limit(limit)
            .offset(offset)
        ).scalars().all()

        return list(patients), total

Step 3: Add the FastAPI Route

Routes handle HTTP requests and responses:

# app/api/v1/patients.py
from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.patient import PatientCreate, PatientResponse, PatientListResponse
from app.services.patient_service import PatientService
from app.middleware.auth import get_current_user

router = APIRouter(
    prefix="/patients",
    tags=["patients"],
    responses={401: {"description": "Not authenticated"}}
)

@router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    response_model=PatientResponse,
    summary="Create a new patient",
    description="Create a new patient record in the system"
)
def create_patient(
    patient_data: PatientCreate,
    db: Session = Depends(get_db),
    # Not read in the body; declared so the get_current_user dependency is
    # resolved for every request to this route.
    current_user = Depends(get_current_user)
) -> PatientResponse:
    """
    Create a new patient.

    **Required permissions:** Can create patients in their organization

    **Request body:**
    - `email`: Patient email (must be unique)
    - `first_name`: Patient first name
    - `last_name`: Patient last name
    - `date_of_birth`: Optional date of birth

    **Response:** Created patient with ID and timestamps
    """
    patient = PatientService.create_patient(db, patient_data)
    # model_validate is the Pydantic v2 replacement for the deprecated from_orm;
    # it reads the ORM instance's attributes (from_attributes=True on the schema).
    return PatientResponse.model_validate(patient)


@router.get(
    "/{patient_id}",
    response_model=PatientResponse,
    summary="Get patient by ID",
    responses={404: {"description": "Patient not found"}}
)
def get_patient(
    patient_id: int,
    db: Session = Depends(get_db),
    # Not read in the body; declared so the get_current_user dependency is
    # resolved for every request to this route.
    current_user = Depends(get_current_user)
) -> PatientResponse:
    """
    Retrieve a specific patient by their ID.

    **Path parameters:**
    - `patient_id`: The ID of the patient to retrieve

    **Response:** Patient data with timestamps
    """
    # The service raises the 404 HTTPException itself when the ID is unknown.
    patient = PatientService.get_patient(db, patient_id)
    # model_validate is the Pydantic v2 replacement for the deprecated from_orm.
    return PatientResponse.model_validate(patient)


@router.get(
    "",
    response_model=PatientListResponse,
    summary="List all patients",
    description="Retrieve a paginated list of all patients"
)
def list_patients(
    limit: int = 20,
    offset: int = 0,
    db: Session = Depends(get_db),
    # Not read in the body; declared so the get_current_user dependency is
    # resolved for every request to this route.
    current_user = Depends(get_current_user)
) -> PatientListResponse:
    """
    List all patients with pagination.

    **Query parameters:**
    - `limit`: Number of results per page (default: 20, max: 100)
    - `offset`: Number of results to skip (default: 0)

    **Response:** Paginated list of patients with total count
    """
    # Normalise the paging inputs with the same rules PatientService.list_patients
    # applies, so the limit/offset echoed in the response describe the page that
    # was actually returned (e.g. limit=500 is reported as 100, not 500).
    limit = min(limit, 100)
    if limit < 1:
        limit = 20
    offset = max(offset, 0)

    patients, total = PatientService.list_patients(db, limit, offset)
    return PatientListResponse(
        total=total,
        limit=limit,
        offset=offset,
        # model_validate is the Pydantic v2 replacement for the deprecated from_orm.
        items=[PatientResponse.model_validate(p) for p in patients]
    )

Step 4: Register the Router

Add the router to your main FastAPI app:

# app/main.py
from fastapi import FastAPI
from app.api.v1 import patients  # Import the router
from app.middleware.cors import setup_cors
from app.middleware.auth import setup_auth

# Application object; title, description and version are passed to FastAPI here.
app = FastAPI(
    title="MenoTime API",
    description="HIPAA-compliant health platform",
    version="1.0.0"
)

# Setup middleware
# NOTE(review): setup_cors and setup_auth are project helpers not shown here;
# presumably they register CORS and authentication handling on `app` — confirm,
# including whether the call order matters.
setup_cors(app)
setup_auth(app)

# Include routers
# The patients router declares the prefix "/patients", so combined with the
# prefix given here its routes are served under /api/v1/patients.
app.include_router(patients.router, prefix="/api/v1")

@app.get("/health")
def health():
    # Fixed payload; nothing is checked before answering.
    status_payload = {"status": "healthy"}
    return status_payload

Step 5: Write Tests

Write comprehensive tests for your endpoint:

# tests/api/v1/test_patients.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
from app.models.patient import Patient
from tests.conftest import db_session

# Single test client shared by every test in this module.
client = TestClient(app)

@pytest.fixture
def auth_headers():
    """Provide the Authorization header used by authenticated test requests."""
    headers = {"Authorization": "Bearer test-token-123"}
    return headers


class TestCreatePatient:
    """Tests for POST /api/v1/patients"""

    def test_create_patient_success(self, db_session, auth_headers):
        """A complete payload is accepted with 201 and echoed back."""
        payload = {
            "email": "jane@example.com",
            "first_name": "Jane",
            "last_name": "Doe",
            "date_of_birth": "1985-06-15"
        }

        response = client.post("/api/v1/patients", json=payload, headers=auth_headers)

        assert response.status_code == 201
        body = response.json()
        assert body["email"] == "jane@example.com"
        assert body["first_name"] == "Jane"
        # Server-generated fields must be present in the response.
        assert "id" in body
        assert "created_at" in body

    def test_create_patient_duplicate_email(self, db_session, auth_headers):
        """Reusing an email that is already stored is rejected with 409."""
        # Seed the database with the patient that owns the email.
        seeded = Patient(
            email="john@example.com",
            first_name="John",
            last_name="Doe"
        )
        db_session.add(seeded)
        db_session.commit()

        # Same email, different name.
        payload = {
            "email": "john@example.com",
            "first_name": "Jane",
            "last_name": "Smith"
        }
        response = client.post("/api/v1/patients", json=payload, headers=auth_headers)

        assert response.status_code == 409
        assert "already exists" in response.json()["detail"]

    def test_create_patient_invalid_email(self, auth_headers):
        """A malformed email address fails request validation."""
        payload = {
            "email": "invalid-email",
            "first_name": "Jane",
            "last_name": "Doe"
        }
        response = client.post("/api/v1/patients", json=payload, headers=auth_headers)

        assert response.status_code == 422  # Validation error
        first_error = response.json()["detail"][0]
        assert "email" in first_error["loc"]

    def test_create_patient_missing_fields(self, auth_headers):
        """Omitting required fields fails request validation."""
        payload = {"email": "jane@example.com"}
        response = client.post("/api/v1/patients", json=payload, headers=auth_headers)

        assert response.status_code == 422
        assert "first_name" in str(response.json())

    def test_create_patient_unauthorized(self):
        """A request without an Authorization header is rejected with 401."""
        payload = {
            "email": "jane@example.com",
            "first_name": "Jane",
            "last_name": "Doe"
        }
        # No headers are sent on purpose.
        response = client.post("/api/v1/patients", json=payload)

        assert response.status_code == 401


class TestGetPatient:
    """Tests for GET /api/v1/patients/{patient_id}"""

    def test_get_patient_success(self, db_session, auth_headers):
        """Test retrieving a patient by ID."""
        # Create a patient
        patient = Patient(
            email="jane@example.com",
            first_name="Jane",
            last_name="Doe"
        )
        db_session.add(patient)
        db_session.commit()

        # Retrieve it
        response = client.get(
            f"/api/v1/patients/{patient.id}",
            headers=auth_headers
        )

        assert response.status_code == 200
        data = response.json()
        assert data["id"] == patient.id
        assert data["email"] == "jane@example.com"

    def test_get_patient_not_found(self, db_session, auth_headers):
        """Test retrieving non-existent patient."""
        # db_session is requested even though it is not used directly: the
        # lookup runs a query, and every other test in this module that reaches
        # the database depends on this fixture.
        # NOTE(review): assumes db_session prepares/isolates the test database —
        # confirm against tests/conftest.py.
        response = client.get(
            "/api/v1/patients/99999",
            headers=auth_headers
        )

        assert response.status_code == 404
        assert "not found" in response.json()["detail"]


class TestListPatients:
    """Tests for GET /api/v1/patients"""

    def test_list_patients_empty(self, db_session, auth_headers):
        """With no patients stored, the list is empty and the total is 0."""
        response = client.get("/api/v1/patients", headers=auth_headers)

        assert response.status_code == 200
        body = response.json()
        assert body["total"] == 0
        assert body["items"] == []

    def test_list_patients_with_pagination(self, db_session, auth_headers):
        """A limit of 2 returns two items while the total reports all five."""
        # Seed five patients, then commit them in one go.
        for index in range(5):
            db_session.add(
                Patient(
                    email=f"patient{index}@example.com",
                    first_name=f"Patient{index}",
                    last_name="Test"
                )
            )
        db_session.commit()

        # Request the first page of two.
        response = client.get("/api/v1/patients?limit=2&offset=0", headers=auth_headers)

        assert response.status_code == 200
        body = response.json()
        assert body["total"] == 5
        assert len(body["items"]) == 2
        # The paging values are echoed back.
        assert body["limit"] == 2
        assert body["offset"] == 0

Step 6: Test Locally

Run tests to ensure your endpoint works:

# Run all tests
pytest

# Run only patient tests
pytest tests/api/v1/test_patients.py -v

# Run with coverage
pytest tests/api/v1/test_patients.py --cov=app --cov-report=term-missing

Test the endpoint manually while the dev server is running:

# Start the server
uvicorn app.main:app --reload

# In another terminal, test the endpoint
curl -X POST http://localhost:8000/api/v1/patients \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer test-token" \
  -d '{
    "email": "jane@example.com",
    "first_name": "Jane",
    "last_name": "Doe"
  }'

# Get the patient
curl http://localhost:8000/api/v1/patients/1 \
  -H "Authorization: Bearer test-token"

# List patients
curl http://localhost:8000/api/v1/patients \
  -H "Authorization: Bearer test-token"

View the interactive API documentation at http://localhost:8000/docs

Step 7: Commit Your Changes

Create a feature branch and commit:

git checkout -b feature/add-patient-endpoints
git add app/schemas/patient.py app/services/patient_service.py app/api/v1/patients.py app/main.py tests/api/v1/test_patients.py
git commit -m "feat: add patient CRUD endpoints"

Include these files:

- Schema definitions
- Service functions
- Route handlers
- Router registration (`app/main.py`)
- Tests
- Any database migrations

Step 8: Create a Pull Request

Push your branch and create a PR:

git push origin feature/add-patient-endpoints

On GitHub, create a PR with:

- A clear description of the endpoints
- Any breaking changes
- Examples of how to use the new endpoints
- Test results

API Endpoint Best Practices

1. Use Proper HTTP Status Codes

@router.post("", status_code=status.HTTP_201_CREATED)  # Creation
@router.get("")  # Success (200 by default)
@router.patch("/{id}")  # Update
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)  # Delete

2. Document with OpenAPI

@router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    response_model=PatientResponse,
    summary="Create a new patient",
    description="Create a new patient record in the system"
)
def create_patient(...):
    """Detailed docstring shown in OpenAPI docs."""

3. Validate Input

from pydantic import BaseModel, EmailStr, Field, field_validator

class PatientCreate(BaseModel):
    email: EmailStr  # Pydantic validates email format
    first_name: str = Field(..., min_length=1, max_length=100)
    age: int = Field(..., ge=0, le=150)  # Between 0 and 150

    # field_validator is the Pydantic v2 replacement for the deprecated
    # @validator; the decorated method is a classmethod.
    @field_validator('age')
    @classmethod
    def age_must_be_reasonable(cls, v):
        # Custom rule applied in addition to the Field bounds above.
        if v < 18:
            raise ValueError('Must be 18 or older')
        return v

4. Handle Errors Gracefully

from fastapi import HTTPException, status

raise HTTPException(
    status_code=status.HTTP_404_NOT_FOUND,
    detail="Patient not found"
)

5. Use Path and Query Parameters Correctly

@router.get("/{patient_id}")  # Path parameter (required)
def get_patient(patient_id: int):
    # Placeholder body: only the signature matters for this example.
    return None

@router.get("")
def list_patients(limit: int = 20, offset: int = 0):  # Query parameters
    # Placeholder body: only the signature matters for this example.
    return None

6. Implement Pagination

# Generic paginated envelope: the paging values plus the items of one page.
class ListResponse(BaseModel):
    total: int  # filled from the total returned by the service
    limit: int  # page size that was applied
    offset: int  # number of items skipped
    items: list[ItemResponse]

@router.get("", response_model=ListResponse)
def list_items(limit: int = 20, offset: int = 0):
    # Normalise paging inputs before using them, so the values echoed in the
    # response are the ones that were actually applied.
    limit = min(limit, 100)  # Cap at 100
    if limit < 1:
        limit = 20  # Non-positive page sizes fall back to the default
    offset = max(offset, 0)  # A negative offset is treated as 0
    items, total = service.list_items(limit, offset)
    return ListResponse(total=total, limit=limit, offset=offset, items=items)