Add an API Endpoint
This guide walks through adding a new API endpoint to MenoTime from start to finish.
Overview
Adding an endpoint involves: 1. Creating a Pydantic request/response schema 2. Writing a service function with business logic 3. Adding the FastAPI route 4. Writing tests 5. Testing locally 6. Committing and deploying
Example: Create a Patient Profile Endpoint
Let's walk through adding a POST endpoint to create a new patient profile.
Step 1: Create the Pydantic Schema
Schemas define the request and response formats. Create a new file or add to existing schema:
# app/schemas/patient.py
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime
from typing import Optional
# Request schema (data from client)
class PatientCreate(BaseModel):
"""Request body for creating a new patient."""
email: EmailStr = Field(..., description="Patient email address")
first_name: str = Field(..., min_length=1, max_length=100)
last_name: str = Field(..., min_length=1, max_length=100)
date_of_birth: Optional[datetime] = Field(None, description="YYYY-MM-DD format")
class Config:
json_schema_extra = {
"example": {
"email": "jane@example.com",
"first_name": "Jane",
"last_name": "Doe",
"date_of_birth": "1985-06-15"
}
}
# Response schema (data sent to client)
class PatientResponse(BaseModel):
"""Response body for patient endpoints."""
id: int
email: str
first_name: str
last_name: str
date_of_birth: Optional[datetime] = None
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True # Enable ORM mode
# List response (for pagination)
class PatientListResponse(BaseModel):
"""Response body for listing patients."""
total: int = Field(..., description="Total patient count")
limit: int = Field(default=20, description="Items per page")
offset: int = Field(default=0, description="Offset for pagination")
items: list[PatientResponse]
Step 2: Write the Service Function
Services contain business logic, database queries, and validations:
# app/services/patient_service.py
from sqlalchemy.orm import Session
from sqlalchemy import select, func
from app.models.patient import Patient
from app.schemas.patient import PatientCreate, PatientResponse
from fastapi import HTTPException, status
import logging
logger = logging.getLogger(__name__)
class PatientService:
"""Service for patient-related operations."""
@staticmethod
def create_patient(db: Session, patient_data: PatientCreate) -> Patient:
"""
Create a new patient in the database.
Args:
db: Database session
patient_data: Patient creation request
Returns:
Created Patient model instance
Raises:
HTTPException: If email already exists
"""
# Check if email already exists
existing = db.execute(
select(Patient).where(Patient.email == patient_data.email)
).scalar_one_or_none()
if existing:
logger.warning(f"Attempt to create patient with duplicate email: {patient_data.email}")
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Patient with email {patient_data.email} already exists"
)
# Create new patient
patient = Patient(
email=patient_data.email,
first_name=patient_data.first_name,
last_name=patient_data.last_name,
date_of_birth=patient_data.date_of_birth,
is_active=True
)
db.add(patient)
db.commit()
db.refresh(patient)
logger.info(f"Created new patient: {patient.id} ({patient.email})")
return patient
@staticmethod
def get_patient(db: Session, patient_id: int) -> Patient:
"""
Retrieve a patient by ID.
Args:
db: Database session
patient_id: Patient ID
Returns:
Patient model instance
Raises:
HTTPException: If patient not found
"""
patient = db.execute(
select(Patient).where(Patient.id == patient_id)
).scalar_one_or_none()
if not patient:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Patient {patient_id} not found"
)
return patient
@staticmethod
def list_patients(db: Session, limit: int = 20, offset: int = 0) -> tuple[list[Patient], int]:
"""
List all patients with pagination.
Args:
db: Database session
limit: Number of items to return (max 100)
offset: Number of items to skip
Returns:
Tuple of (patients list, total count)
"""
# Validate limits
limit = min(limit, 100)
if limit < 1:
limit = 20
if offset < 0:
offset = 0
# Get total count
total = db.execute(select(func.count(Patient.id))).scalar()
# Get paginated results
patients = db.execute(
select(Patient)
.limit(limit)
.offset(offset)
.order_by(Patient.created_at.desc())
).scalars().all()
return patients, total
Step 3: Add the FastAPI Route
Routes handle HTTP requests and responses:
# app/api/v1/patients.py
from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session
from app.database import get_db
from app.schemas.patient import PatientCreate, PatientResponse, PatientListResponse
from app.services.patient_service import PatientService
from app.middleware.auth import get_current_user
router = APIRouter(
prefix="/patients",
tags=["patients"],
responses={401: {"description": "Not authenticated"}}
)
@router.post(
"",
status_code=status.HTTP_201_CREATED,
response_model=PatientResponse,
summary="Create a new patient",
description="Create a new patient record in the system"
)
def create_patient(
patient_data: PatientCreate,
db: Session = Depends(get_db),
current_user = Depends(get_current_user)
) -> PatientResponse:
"""
Create a new patient.
**Required permissions:** Can create patients in their organization
**Request body:**
- `email`: Patient email (must be unique)
- `first_name`: Patient first name
- `last_name`: Patient last name
- `date_of_birth`: Optional date of birth
**Response:** Created patient with ID and timestamps
"""
patient = PatientService.create_patient(db, patient_data)
return PatientResponse.from_orm(patient)
@router.get(
"/{patient_id}",
response_model=PatientResponse,
summary="Get patient by ID",
responses={404: {"description": "Patient not found"}}
)
def get_patient(
patient_id: int,
db: Session = Depends(get_db),
current_user = Depends(get_current_user)
) -> PatientResponse:
"""
Retrieve a specific patient by their ID.
**Path parameters:**
- `patient_id`: The ID of the patient to retrieve
**Response:** Patient data with timestamps
"""
patient = PatientService.get_patient(db, patient_id)
return PatientResponse.from_orm(patient)
@router.get(
"",
response_model=PatientListResponse,
summary="List all patients",
description="Retrieve a paginated list of all patients"
)
def list_patients(
limit: int = 20,
offset: int = 0,
db: Session = Depends(get_db),
current_user = Depends(get_current_user)
) -> PatientListResponse:
"""
List all patients with pagination.
**Query parameters:**
- `limit`: Number of results per page (default: 20, max: 100)
- `offset`: Number of results to skip (default: 0)
**Response:** Paginated list of patients with total count
"""
patients, total = PatientService.list_patients(db, limit, offset)
return PatientListResponse(
total=total,
limit=limit,
offset=offset,
items=[PatientResponse.from_orm(p) for p in patients]
)
Step 4: Register the Router
Add the router to your main FastAPI app:
# app/main.py
from fastapi import FastAPI
from app.api.v1 import patients # Import the router
from app.middleware.cors import setup_cors
from app.middleware.auth import setup_auth
app = FastAPI(
title="MenoTime API",
description="HIPAA-compliant health platform",
version="1.0.0"
)
# Setup middleware
setup_cors(app)
setup_auth(app)
# Include routers
app.include_router(patients.router, prefix="/api/v1")
@app.get("/health")
def health():
return {"status": "healthy"}
Step 5: Write Tests
Write comprehensive tests for your endpoint:
# tests/api/v1/test_patients.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
from app.models.patient import Patient
from tests.conftest import db_session
client = TestClient(app)
@pytest.fixture
def auth_headers():
"""Return valid authorization headers for testing."""
return {"Authorization": "Bearer test-token-123"}
class TestCreatePatient:
"""Tests for POST /api/v1/patients"""
def test_create_patient_success(self, db_session, auth_headers):
"""Test successful patient creation."""
response = client.post(
"/api/v1/patients",
json={
"email": "jane@example.com",
"first_name": "Jane",
"last_name": "Doe",
"date_of_birth": "1985-06-15"
},
headers=auth_headers
)
assert response.status_code == 201
data = response.json()
assert data["email"] == "jane@example.com"
assert data["first_name"] == "Jane"
assert "id" in data
assert "created_at" in data
def test_create_patient_duplicate_email(self, db_session, auth_headers):
"""Test creation fails with duplicate email."""
# Create first patient
patient = Patient(
email="john@example.com",
first_name="John",
last_name="Doe"
)
db_session.add(patient)
db_session.commit()
# Try to create another with same email
response = client.post(
"/api/v1/patients",
json={
"email": "john@example.com",
"first_name": "Jane",
"last_name": "Smith"
},
headers=auth_headers
)
assert response.status_code == 409
assert "already exists" in response.json()["detail"]
def test_create_patient_invalid_email(self, auth_headers):
"""Test creation fails with invalid email."""
response = client.post(
"/api/v1/patients",
json={
"email": "invalid-email",
"first_name": "Jane",
"last_name": "Doe"
},
headers=auth_headers
)
assert response.status_code == 422 # Validation error
assert "email" in response.json()["detail"][0]["loc"]
def test_create_patient_missing_fields(self, auth_headers):
"""Test creation fails with missing required fields."""
response = client.post(
"/api/v1/patients",
json={"email": "jane@example.com"},
headers=auth_headers
)
assert response.status_code == 422
assert "first_name" in str(response.json())
def test_create_patient_unauthorized(self):
"""Test creation fails without authorization."""
response = client.post(
"/api/v1/patients",
json={
"email": "jane@example.com",
"first_name": "Jane",
"last_name": "Doe"
}
)
assert response.status_code == 401
class TestGetPatient:
"""Tests for GET /api/v1/patients/{patient_id}"""
def test_get_patient_success(self, db_session, auth_headers):
"""Test retrieving a patient by ID."""
# Create a patient
patient = Patient(
email="jane@example.com",
first_name="Jane",
last_name="Doe"
)
db_session.add(patient)
db_session.commit()
# Retrieve it
response = client.get(
f"/api/v1/patients/{patient.id}",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["id"] == patient.id
assert data["email"] == "jane@example.com"
def test_get_patient_not_found(self, auth_headers):
"""Test retrieving non-existent patient."""
response = client.get(
"/api/v1/patients/99999",
headers=auth_headers
)
assert response.status_code == 404
assert "not found" in response.json()["detail"]
class TestListPatients:
"""Tests for GET /api/v1/patients"""
def test_list_patients_empty(self, db_session, auth_headers):
"""Test listing when no patients exist."""
response = client.get(
"/api/v1/patients",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 0
assert data["items"] == []
def test_list_patients_with_pagination(self, db_session, auth_headers):
"""Test listing with pagination."""
# Create 5 patients
for i in range(5):
patient = Patient(
email=f"patient{i}@example.com",
first_name=f"Patient{i}",
last_name="Test"
)
db_session.add(patient)
db_session.commit()
# Get first 2
response = client.get(
"/api/v1/patients?limit=2&offset=0",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 5
assert len(data["items"]) == 2
assert data["limit"] == 2
assert data["offset"] == 0
Step 6: Test Locally
Run tests to ensure your endpoint works:
# Run all tests
pytest
# Run only patient tests
pytest tests/api/v1/test_patients.py -v
# Run with coverage
pytest tests/api/v1/test_patients.py --cov=app --cov-report=term-missing
Test the endpoint manually while the dev server is running:
# Start the server
uvicorn app.main:app --reload
# In another terminal, test the endpoint
curl -X POST http://localhost:8000/api/v1/patients \
-H "Content-Type: application/json" \
-H "Authorization: Bearer test-token" \
-d '{
"email": "jane@example.com",
"first_name": "Jane",
"last_name": "Doe"
}'
# Get the patient
curl http://localhost:8000/api/v1/patients/1 \
-H "Authorization: Bearer test-token"
# List patients
curl http://localhost:8000/api/v1/patients \
-H "Authorization: Bearer test-token"
View the interactive API documentation at http://localhost:8000/docs
Step 7: Commit Your Changes
Create a feature branch and commit:
git checkout -b feature/add-patient-endpoints
git add app/schemas/patient.py app/services/patient_service.py app/api/v1/patients.py tests/api/v1/test_patients.py
git commit -m "feat: add patient CRUD endpoints"
Include these files: - Schema definitions - Service functions - Route handlers - Tests - Any database migrations
Step 8: Create a Pull Request
Push your branch and create a PR:
git push origin feature/add-patient-endpoints
On GitHub, create a PR with: - Clear description of the endpoints - Any breaking changes - Examples of how to use the new endpoints - Test results
API Endpoint Best Practices
1. Use Proper HTTP Status Codes
@router.post("", status_code=status.HTTP_201_CREATED) # Creation
@router.get("") # Success (200 by default)
@router.patch("/{id}") # Update
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT) # Delete
2. Document with OpenAPI
@router.post(
"",
status_code=status.HTTP_201_CREATED,
response_model=PatientResponse,
summary="Create a new patient",
description="Create a new patient record in the system"
)
def create_patient(...):
"""Detailed docstring shown in OpenAPI docs."""
3. Validate Input
from pydantic import BaseModel, Field, validator
class PatientCreate(BaseModel):
email: EmailStr # Pydantic validates email format
first_name: str = Field(..., min_length=1, max_length=100)
age: int = Field(..., ge=0, le=150) # Between 0 and 150
@validator('age')
def age_must_be_reasonable(cls, v):
if v < 18:
raise ValueError('Must be 18 or older')
return v
4. Handle Errors Gracefully
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Patient not found"
)
5. Use Path and Query Parameters Correctly
@router.get("/{patient_id}") # Path parameter (required)
def get_patient(patient_id: int):
pass
@router.get("")
def list_patients(limit: int = 20, offset: int = 0): # Query parameters
pass
6. Implement Pagination
class ListResponse(BaseModel):
total: int
limit: int
offset: int
items: list[ItemResponse]
@router.get("", response_model=ListResponse)
def list_items(limit: int = 20, offset: int = 0):
limit = min(limit, 100) # Cap at 100
items, total = service.list_items(limit, offset)
return ListResponse(total=total, limit=limit, offset=offset, items=items)