FastAPI 3 - Advanced

Auteur

Fabrice Devaux

Publicatiedatum

24 februari 2026

Inleiding

In les 1 zijn we gestart met de basisconcepten van FastAPI en een introductie tot Pydantic. Tijdens les 2 kwamen alle basisconcepten van FastAPI aan bod.

In deze laatste les bekijken we een aantal meer geavanceerde features die FastAPI biedt, zoals dependencies, security an async.

Als herhaling van de laatste twee lessen bekijken we de oplossing van de laatste Task Management API oefening. We kregen de volgende requirements:

Elke Task heeft:

  • Een ID
  • Een titel met minimumlengte van 3 karakters
  • Een optionele beschrijving
  • Een prioriteit van 1 tot 5
  • Een interne timestamp (created_at) die automatisch wordt ingesteld
    • Is enkel voor intern gebruik en wordt nooit teruggestuurd naar de client.

API Paths:

  • POST /tasks: Tasks aanmaken
    • De aangemaakte task wordt in de respons body teruggestuurd met status code 201.
  • GET /tasks/{task_id}: Één task opvragen
  • GET /tasks: Tasks oplijsten en filteren
    • Stuurt by default alle tasks terug in een lijst
    • Query parameter min_priority (int) beperkt de lijst tot tasks met ten minste deze priority
    • Query parameter q (str) beperkt de lijst tot tasks waarbij deze string in de titel voorkomt (substring match)
  • PATCH /tasks/{task_id}: Task aanpassen
    • Past een of meerdere van de volgende velden aan: title, description, priority (partiële update)
    • Stuurt aangepaste task terug
  • POST /preferences: Gebruikersvoorkeuren opslaan
    • Één voorkeur: een default minimum prioriteit die gebruikt moet worden bij het GET /tasks path

Voor de ‘opslag’ van de tasks een globale dictionary gebruiken.

Een volledige voorbeeldoplossing kan je bekijken op https://github.com/dvx76/fastapi-tutorial/blob/main/les2/oefening1/main.py

Dependencies

https://fastapi.tiangolo.com/tutorial/dependencies/

FastAPI heeft een ingebouwd dependency injection mechanisme. Het gebruik hiervan is niet verplicht maar het vormt samen met de functionaliteit die we al zagen wel een belangrijk pluspunt van FastAPI.

We starten met een algemene uitleg rond het concept van dependency injection.

Dependency Injection

Dependency Injection (DI) is een algemeen (specifiek noch aan FastAPI of Python) programmeertechniek waarbij een functie of object zijn afhankelijkheden niet zelf aanmaakt, maar ze van buitenaf aangeleverd krijgt.

Hierdoor verkrijgen we een lossere koppeling tussen code, waardoor deze duidelijker, flexibeler en vooral makkelijker testbaar wordt.

Voorbeeld

Stel dat je een functie moet schrijven die op basis van een “Post ID” de details van een overeenkomstige blogpost ophaalt via een API en de titel van die blogpost teruggeeft.

Een eerste implementatie kan er als volgt uitzien:

import requests

def get_post_title(post_id: int) -> str:
    session = requests.Session()
    response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
    response.raise_for_status()
    return response.json()["title"]

print(get_post_title(1))

De get_post_title functie maakt zelf een requests.Session object aan om de API aan te spreken. We zeggen dat de functie afhankelijk is van (depends on) requests.Session.

Hierdoor is de functie sterk gekoppeld aan requests.Session en is het testen van de eigenlijke logica in de functie problematisch. Tijdens testen willen we namelijk geen echte API call uitvoeren, maar wel met voorspelbare test-data werken.

De oplossing is om het requests.Session object (de dependency) mee te geven aan de functie (te injecteren):

import requests

def get_post_title(post_id: int, session: requests.Session) -> str:
    response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
    response.raise_for_status()
    return response.json()["title"]

session = requests.Session()
print(get_post_title(1, session))

Testen kan nu vrij eenvoudig door een dummy object mee te geven bij het aanroepen van de functie.

Hieronder zie je een eenvoudig functioneel voorbeeld (uitvoerbaar met uv run pytest test.py) van hoe je dit kan doen.

test.py
from dataclasses import dataclass
import requests

def get_post_title(post_id: int, session: requests.Session) -> str:
    response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
    response.raise_for_status()
    return response.json()["title"]

@dataclass
class DummySession:
    get_response: requests.Response

    def get(self, url: str):
        return self.get_response

def test_get_post_title_success():
    dummy_response = requests.Response()
    dummy_response.status_code = 200
    dummy_response._content = b'{"title": "foo"}'
    dummy_session = DummySession(get_response=dummy_response)

    assert get_post_title(1, dummy_session) == "foo"

DI Frameworks

I.p.v. “handmatig” verschillende dependencies samen te stellen (zoals we in bovenstaand voorbeeld eerst zelf een requests.Session object hebben gemaakt) kan ook gebruik worden gemaakt van een zogenaamd dependency-injection framework.

Dependency-injector is een populaire optie. We gaan hier niet veel verder op in maar bekijken wel het concept.

Het vorig voorbeeld kan met dependency_injector als volgt herschreven worden. Het belangrijkste concept is dat het session functieargument nu een default waarde krijgt. Het DI framework maakt automatisch een requests.Session object aan en gebruikt hetzelfde object voor elke get_post_title aanroep.

from dependency_injector import containers, providers
from dependency_injector.wiring import inject, Provide

class Container(containers.DeclarativeContainer):
    session = providers.Singleton(requests.Session)

@inject
def get_post_title(
    post_id: int,
    session: requests.Session = Provide[Container.session]
) -> str:
    response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
    response.raise_for_status()
    return response.json()["title"]

Eenvoudige DI met FastAPI

FastAPI heeft een eigen dependency injection mechanisme dat, zoals vrijwel alles in FastAPI, gebaseerd is op type hints en geannoteerde types.

In het volgende voorbeeld worden gebruikers opgehaald vanuit een externe API. De logica en foutafhandeling om deze externe gebruikers op te halen kan uitgewerkt worden in een aparte functie. Die functie kan dan als dependency in de path functies van onze eigen API gebruikt worden.

main.py
import requests
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status

def get_external_users() -> list[dict]:
    response = requests.get("https://jsonplaceholder.typicode.com/users")
    try:
        response.raise_for_status()
    except requests.HTTPError:
2        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve users",
        )
    return response.json()

app = FastAPI()

@app.get("/users")
1def list_users(external_users: Annotated[list[dict], Depends(get_external_users)]):
    return external_users

@app.get("/users/{user_id}")
def get_user(
3    user_id: int, external_users: Annotated[list[dict], Depends(get_external_users)]
):
    try:
        return next(u for u in external_users if u["id"] == user_id)
    except StopIteration:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
1
Depends krijgt een functienaam (een “Callable”) als argument. Deze functie wordt dynamisch uitgevoerd om de waarde van het external_users argument van list_users (en get_user) te bepalen.
2
Vanuit de dependency functie kan een HTTPException geraised worden die de uitvoering zal onderbreken - alsof de exception rechtstreeks uit de path functie komt.
3
Andere path functies kunnen dezelfde dependency hergebruiken.

Request Informatie gebruiken in Dependencies

In het vorige voorbeeld heeft de dependency-functie (get_external_users) geen argumenten. Heeft die functie wel argumenten dan werken die alsof ze in de eigenlijke path functie zouden staan!

M.a.w. een path of query parameter kan rechtstreeks in een dependency functie als parameter worden gebruikt.

Bovendien kunnen dependencies zelf ook dependencies hebben. Beide concepten illustreren we door het vorig voorbeeld aan te passen zonder de functionaliteit te wijzigen.

main.py
import requests
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status

def get_external_users() -> list[dict]:
    response = requests.get("https://jsonplaceholder.typicode.com/users")
    try:
        response.raise_for_status()
    except requests.HTTPError:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve users",
        )
    return response.json()

def get_external_user(
2    user_id: int, external_users: Annotated[list[dict], Depends(get_external_users)]
) -> dict:
    try:
        return next(u for u in external_users if u["id"] == user_id)
    except StopIteration:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

app = FastAPI()

@app.get("/users")
def list_users(
    external_users: Annotated[list[dict], Depends(get_external_users)],
):
    return external_users

@app.get("/users/{user_id}")
1def get_user(user_details: Annotated[dict, Depends(get_external_user)]):
    return user_details
1
Deze path functie heeft nu zijn eigen user_id path parameter niet meer als argument, maar wel een dependency op een nieuwe functie die onmiddellijk de details van 1 gebruiker teruggeeft.
2
Die nieuwe functie kan rechtstreeks de user_id path parameter gebruiken, en bovendien zelf nog een dependency hebben, om de volledige lijst met gebruikers te verkrijgen.

Globale en Decorator Dependencies

Stel dat we voor een aantal operaties een security-token verwachten in een bepaalde request header. Dit kan eenvoudig gecontroleerd worden met een dependency functie, die dan in de verschillende operatie functies kan gebruikt worden. Bijvoorbeeld:

main.py
from typing import Annotated
from fastapi import Depends, FastAPI, Header, HTTPException, status

app = FastAPI()

def verify_token(x_token: Annotated[str, Header()]):
    if x_token != "fake-super-secret-token":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        )

@app.get("/items/")
def read_items(authorized: Annotated[None, Depends(verify_token)]):
    return [{"item": "Foo"}, {"item": "Bar"}]

Alleen is er geen return value voor de verify_token functie. Daardoor zijn we verplicht te annoteren met None als type, en zitten we bovendien met een ongebruikt authorized argument in de operatie functie. Weinig elegant.

In de plaats kunnen we dit soort dependencies rechtstreeks als argument meegeven in de path decorator:

main.py
@app.get("/items/", dependencies=[Depends(verify_token)])
def read_items():
    return [{"item": "Foo"}, {"item": "Bar"}]

We kunnen nog een stap verder gaan en beslissen dat een dependency van toepassing is op alle operaties door deze mee te geven aan de FastAPI applicatie zelf:

main.py
app = FastAPI(dependencies=[Depends(verify_token)])

Dependencies met Yield

Het FastAPI dependency systeem is ook ideaal om een database client/connection beschikbaar te maken in path functies. Maar in de meeste gevallen willen we op het einde van de request die client of connection terug netjes beëindigen.

FastAPI ondersteunt dit door in dependency functies yield i.p.v. return te gebruiken. Om zeker te zijn dat de code na de yield altijd uitgevoerd zal worden (bvb. in geval van een exception) kan een try/finally constructie gebruikt worden.

main.py
from typing import Annotated
from fastapi import Depends, FastAPI

app = FastAPI()

class DummyDB:
    def query(self, statement: str) -> list[str]:
        return [statement]

    def close(self) -> None:
        print("Closing DB Connection")

def get_db():
    db = DummyDB()
    try:
        yield db
    finally:
        db.close()

@app.get("/items/")
def read_items(db: Annotated[DummyDB, Depends(get_db)]):
    return db.query("foo")

Oefening

We gebruiken het dependency concept van FastAPI om de code uit de oefening van les 2 te verbeteren.

Welke bestaande code vormt een goede kandidaat om naar een dependency om te zetten? Tip: we zoeken code die in verschillende path functies herhaald wordt.

Oplossing - opdracht

De code die een task in TASKS_DB opzoekt en een exception raised als de task niet bestaat wordt gebruikt in zowel get_task() als in update_task.

    try:
        return TASKS_DB[task_id]
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )
Hiervan kunnen we één aparte functie maken en vervolgens deze functie als dependency gebruiken in de twee path functies. Die functies krijgen dan als parameter rechtstreeks een TaskInternal i.p.v. een task_id (int).
Oplossing - code
main.py
from datetime import datetime, timezone
from fastapi import (Cookie, Depends, FastAPI, HTTPException, Path, Query, Response, status)
from pydantic import BaseModel, Field
from typing_extensions import Annotated

app = FastAPI(title="Task Management API", version="1.0.0")

TASKS_DB: dict[int, "TaskInternal"] = {}

class TaskCreate(BaseModel):
    title: Annotated[str, Field(min_length=3, examples=["Learn FastAPI"])]
    description: Annotated[str | None, Field(examples=["Use the FastAPI tutorial"])] = (
        None
    )
    priority: Annotated[int, Field(ge=1, le=5, examples=[4])]

class TaskExternal(TaskCreate):
    task_id: Annotated[int, Field(serialization_alias="id", examples=[1])]

class TaskInternal(TaskExternal):
    created_at: Annotated[
        datetime, Field(default_factory=lambda: datetime.now(tz=timezone.utc))
    ]

class TaskUpdate(TaskCreate):
    title: Annotated[str | None, Field(min_length=3)] = None
    priority: Annotated[int | None, Field(ge=1, le=5)] = None

class Preference(BaseModel):
    min_priority: Annotated[int, Field(ge=1, le=5)]

TaskId = Annotated[int, Path(ge=1)]

1def get_task_from_db(task_id: TaskId) -> TaskInternal:
    try:
        return TASKS_DB[task_id]
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )

@app.post(
    "/tasks",
    response_model=TaskExternal,
    status_code=status.HTTP_201_CREATED,
    tags=["Tasks"],
    summary="Create a new task",
)
def create_task(task: TaskCreate):
    next_task_id = max(TASKS_DB.keys(), default=0) + 1
    new_task = TaskInternal(task_id=next_task_id, **task.model_dump())

    TASKS_DB[next_task_id] = new_task

    return new_task

@app.get(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Get a single task",
)
2def get_task(task: Annotated[TaskInternal, Depends(get_task_from_db)]):
    return task

@app.get(
    "/tasks", response_model=list[TaskExternal], tags=["Tasks"], summary="List tasks"
)
def list_tasks(
    min_priority: Annotated[int | None, Query(ge=1)] = None,
    query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
    cookie_min_priority: Annotated[
        int | None, Cookie(alias="min_priority", include_in_schema=False)
    ] = None,
):
    results = TASKS_DB.values()

    if min_priority is None and cookie_min_priority is not None:
        min_priority = cookie_min_priority

    if min_priority is not None:
        results = [t for t in results if t.priority >= min_priority]

    if query:
        results = [t for t in results if query.lower() in t.title.lower()]

    return results

@app.patch(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Update a task",
)
def update_task(
    task: Annotated[TaskInternal, Depends(get_task_from_db)], task_data: TaskUpdate
):
    update_data = task_data.model_dump(exclude_unset=True)
    updated_task = task.model_copy(update=update_data)

    TASKS_DB[task.task_id] = updated_task
    return updated_task

@app.post(
    "/preferences",
    status_code=status.HTTP_204_NO_CONTENT,
    tags=["Preferences"],
    summary="Set user preferences",
)
def set_preferences(pref: Preference, response: Response):
    response.set_cookie(
        key="min_priority",
        value=str(pref.min_priority),
    )
1
De nieuwe functie krijgt een int task_id en geeft een TaskInternal terug
2
De nieuwe functie gebruiken we als dependency in de path functies

Security

https://fastapi.tiangolo.com/tutorial/security/

Basic HTTP Authentication

Bij Flask and Connexion gebruikten we een eenvoudige Basic HTTP authenticatie. De request bevat dan een Authorization header met daarin een username en password (als een base64 username:password string).

Ter vergelijking implementeren we eerst diezelfde aanpak met FastAPI. Daarna bekijken we een betere en modernere optie.

Ook voor authenticatie gebruikt FastAPI het dependency injection mechanisme en voorziet handige helpers:

main.py
from typing import Annotated

from fastapi import Depends, FastAPI
from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()

1http_basic_scheme = HTTPBasic()

@app.get("/users/me")
def read_current_user(
2    credentials: Annotated[HTTPBasicCredentials, Depends(http_basic_scheme)],
):
    return {"username": credentials.username, "password": credentials.password}
1
Het HTTPBasic basic object zal de Authorization header verwerken en geeft een HTTPBasicCredentials model terug.
2
HTTPBasicCredentials is een eenvoudig model met een username en password veld. In credentials komt dus zo een model en de dependency op HTTPBasic zal dit invullen

We breiden dit voorbeeld verder uit met een dummy user database waarmee we requests zullen authentificeren. Voor een eenvoud laten we password hashing nog even achterwege.

main.py
import secrets
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "password": "secret",
    }
}

1class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None

class UserInDB(User):
    password: str

app = FastAPI()
http_basic_scheme = HTTPBasic()

def get_user(username: str) -> UserInDB | None:
    if username in fake_users_db:
        user_dict = fake_users_db[username]
        return UserInDB(**user_dict)

4def get_current_user(
    credentials: Annotated[HTTPBasicCredentials, Depends(http_basic_scheme)],
):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Basic"},
    )

    user = get_user(credentials.username)
    if not user:
        raise credentials_exception

    if not secrets.compare_digest(credentials.password, user.password):
        raise credentials_exception

    return user

2@app.get("/users/me/", response_model=User)
def read_users_me(
3    current_user: Annotated[UserInDB, Depends(get_current_user)],
):
    return current_user
1
Een eenvoudig model van de gegevens van een gebruiker. We definiëren een apart model dat ook het password bevat. Dit laatste zullen we gebruiken voor de authenticatie-code maar in responses willen we natuurlijk nooit het password meesturen!
2
Belangrijk: door User als respons model te gebruiken verplichten we FastAPI om enkel die velden terug te sturen. Ook al bevat current_user (zie volgende stap) een UserInDB!
3
Een dependency zal voor authenticatie zorgen en indien succesvol een UserInDB meegeven in current_user.
4
De get_current_user dependency werkt op dezelfde manier als het vorige voorbeeld om HTTP Basic credentials uit de request te halen. De username wordt opgezocht in de (fake) database en het password vergeleken.

OAuth2

OAuth2 (rfc 6749) is een erg uitgebreid framework voor autorisatie en (in combinatie met OpenID Connect) authenticatie. Een typisch gebruiksscenario voor dat je zeker zelf al hebt gebruikt is het inloggen op een website via je bestaande Google, Facebook, GitHub, e.a. account.

OAuth2 komt in principe ook aan bod in de security module van de opleiding. We focussen hier bijgevolg op het gebruik in een FastAPI backend.

Tip

Wil je toch al meer leren over OAuth2 dan kan ik volgende links zeker aanraden:

Het scenario (OAuth2 gebruikt de term flow) waar we mee verdergaan is de Resource Owner Password Flow, ook wel Password Grant of gewoon Password Flow. Hierbij stuurt de client (bvb. een javascript frontend webapp, mobile app, …) via een Form de username/password naar de backend server en krijgt als antwoord een access token. Voor alle volgende requests stuurt de client dan dit token (in een Authorization header) mee.

Dat access token is een zogenaamd JWT, of JSON Web Token. Een gestandaardiseerd formaat om op een veilige manier JSON data door te sturen. De inhoud is niet geëncrypteerd, maar wel getekend (signed). Daarmee kan gecontroleerd worden dan een JWT niet is aangepast.

sequenceDiagram
    Client->>API: POST /token [username, password]
    API-->>Client: 200 {access_token: "eyJhb..."}
    Client->>API: GET /users/me (Authorization: Bearer eyJhb...)
    API-->>Client: 200 {username: "johndoe", ...}

FastAPI en OAuth2

Net als voor Basic HTTP Auth bevat FastAPI handige hulpmiddelen om met OAuth2 te werken - en opnieuw ook automatisch integratie met OpenAPI.

OAuth2 Scheme

We vertrekken vanuit het vorige voorbeeld met HTTP Basic Auth maar i.p.v. fastapi.security.HTTPBasic gebruiken we fastapi.security.OAuth2PasswordBearer.

We passen verder zo weinig mogelijk aan en kijken wat er gebeurd. Alle andere code blijft staan.

main.py
from fastapi.security import OAuth2PasswordBearer

# Bestaande code blijft staan: fake_users_db, User models, ...

# http_basic_scheme = HTTPBasic()
1oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

2def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

3    return fake_users_db["johndoe"]
1
I.p.v. een HTTPBasic object maken we een OAuth2PasswordBearer object. tokenUrl bepaalde het API route om een access_token te verkrijgen. Dit implementeren we later.
2
De dependency geeft nu een gewone str terug (token uit de Authorization header!) i.p.v. een HTTPBasicCredentials object. We doen nog even niets met dit token.
3
We hardcoden tijdelijk een specifieke user als return value. credentials_exception zullen we nog nodig hebben dus laten we staan.

Als we nu proberen autoriseren vanuit Swagger UI krijgen we een fout: auth errorError: Not Found. In de FastAPI logs zien we een request voor een path dat we nog niet geïmplementeerd hebben: "POST /token HTTP/1.1" 404.

Token Path

Het POST /token path zal een Form doorgestuurd krijgen met de username/password. FastAPI voorziet al een eigen dependency class voor deze form: fastapi.security.OAuth2PasswordBearer. Deze definieert naast username en password nog een aantal andere velden eigen aan OAuth2, zoals scope en client_id, maar deze worden hier niet gebruikt.

Dit path moet natuurlijk de username en password valideren. We laten het path eerst tijdelijk de username terugsturen als “access_token”. Later passen we dit aan naar een “echt” token.

Bijgevolg kunnen we in get_current_user het token gebruiken om de juiste user te vinden in fake_users_db.

main.py
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# Bestaande code blijft staan: fake_users_db, User models, ...

def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    return fake_users_db[token]

@app.post("/token")
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    user = get_user(form_data.username)
    if not (user and form_data.password == user.password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

Echte JWT tokens

Ter herinnering, een JWT, of JSON Web Token, is een gestandaardiseerd formaat om op een veilige manier JSON data door te sturen. De inhoud is niet geëncrypteerd, maar wel getekend (signed). Daarmee kan gecontroleerd worden dat een JWT niet is aangepast.

Bijvoorbeeld:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

Op de website https://www.jwt.io kan je JWTs decoden en de inhoud bekijken.

Om in Python met JWTs te werken hebben we het PyJWT package nodig. Dit moeten we dus eerst installeren.

uv add pyjwt

Voordat we dit gaan gebruiken in onze FastAPI applicatie bekijken we een voorbeeldje om de werking van PyJWT te illustreren.

Om data te encoderen (serialiseren) in een JWT moet een algoritme worden gekozen en een bijhorende geheime sleutel. Hier zullen we met het HMAC SHA256 algoritme werken. Daarvoor hebben we een 256 bit (= 32 byte of 64 hexadecimale karakters) string nodig als sleutel.

jwt_test.py
import secrets
from datetime import datetime, timedelta, timezone
import jwt

1SECRET_KEY = secrets.token_hex(32)
2ALGORITHM = "HS256"

print(f"Using secret key = {SECRET_KEY}")

now = datetime.now(timezone.utc)
data = {
    "sub": "johndoe",
    "name": "John Doe",
    "iat": now,
    "exp": now + timedelta(minutes=15),
}

3encoded_jwt = jwt.encode(data, key=SECRET_KEY, algorithm=ALGORITHM)
print(f"Encoded JWT = {encoded_jwt}")

4decoded_jwt = jwt.decode(encoded_jwt, key=SECRET_KEY, algorithms=[ALGORITHM])
print(f"Decoded JWT = {decoded_jwt}")
1
Met secrets.token_hex(32) krijgen we een random 32-byte hexadecimale string die we als sleutel kunnen gebruiken. In een productie-applicatie moeten we hiervoor natuurlijk een vaste string gebruiken die we bvb. vanuit de configuratie inlezen.
2
Het algoritme is HS256, ofwel HMAC met SHA256.
3
Met jwt.encode kan een dict geëncodeerd worden naar een JWT string.
4
Omgekeerd kan met jwt.decode een JWT string terug gedecodeerd worden.

sub (Subject), iat (Issued At) en exp (Expiry) zijn gestandaardiseerde namen van JWT velden. Deze zijn gedefinieerd in de JWT RFC 7519).

Daarnaast kunnen ook extra velden gebruikt worden (zoals hier: name) die specifiek zijn aan de applicatie.

Nu kunnen we voor de FastAPI applicatie in de login functie een echt JWT token encoderen en terugsturen, en in de get_current_user functie datzelfde token decoderen en op basis van de inhoud de juiste gebruikersdata opzoeken.

main.py
from datetime import datetime, timedelta, timezone
import jwt
from jwt.exceptions import InvalidTokenError

1SECRET_KEY = "2891cd0eaed8623ea769c2fc41960f40d8fd0a2bbbaddcb6af93e0ecd9ecb0e5"
ALGORITHM = "HS256"

def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

3    try:
        decoded_jwt = jwt.decode(token, key=SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise credentials_exception
4    username = decoded_jwt.get("sub")
    if username is None:
        raise credentials_exception

5    user = get_user(username)
    if not user:
        raise credentials_exception

    return user

@app.post("/token")
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    user = get_user(form_data.username)
    if not (user and form_data.password == user.password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    now = datetime.now(timezone.utc)
    data = {
        "iat": now,
        "exp": now + timedelta(minutes=15),
2        "sub": user.username,
        "name": user.full_name,
    }

    encoded_jwt = jwt.encode(data, key=SECRET_KEY, algorithm=ALGORITHM)
    return {"access_token": encoded_jwt, "token_type": "bearer"}
1
Nogmaals, voor een productie-omgeving zouden we een vaste SECRET_KEY uit configuratie inlezen.
2
Als sub in de JWT gebruiken we hier de username, want dat is de key in de fake_users_db dictionary. In een echt applicatie, gekoppeld aan een database, zou dit een unieke ID geassocieerd aan de user kunnen zijn.
3
Als er een probleem optreed tijdens het decoderen van de JWT wordt een HTTP 401 (Unauthorized) terug gestuurd. De WWW-Authenticate header geeft aan dat een Authorization header met een Bearer token in de request verwacht wordt.
4
Als geen sub veld in de JWT staat wordt dezelfde fout teruggestuurd.
5
Hetzelfde geldt als de waarde van die sub niet gevonden wordt in de fake_users_db.
iad and exp beïnvloeden het decoderen

Merk op dat we zelf geen code hebben geschreven die deze velden controleert. Dit gebeurt automatisch door de jwt.decode functie. Deze zal een exception geven als de huidige tijd vroeger is dan de iat, of later dan de exp!

Password Hashing

Op zich is dit niet specifiek aan OAuth2 (ofo FastAPI), maar wachtwoorden gebruiken we natuurlijk best nooit rechtstreeks.

Een hash is een wiskundige functie die voor een bepaalde input een unieke string met vaste lengte (bvb. 64 karakters) geeft. De kleinste wijziging van de input zorgt voor een volledig verschillende hash-waarde.

Een hash-waarde berekening gebeurt relatief snel, terwijl het afleiden van de oorspronkelijke input op basis van die hash-waarde heel erg lang zou duren.

Bij wachtwoorden kunnen we dus enkel de hash-waarde opslaan. Op die manier kan het echte wachtwoord niet achterhaald worden, maar kunnen we wel controleren of een bepaalde input overeenkomt met dat wachtwoord.

Tijdens de Flask lessen hebben we functies (check_password_hash/generate_password_hash) gebruikt die Flask (eigenlijk het onderliggende Werkzeug) zelf voorziet.

FastAPI (of Starlette) voorziet zelf zo geen functies maar raadt het gebruik van het pwdlib package aan. Dit ondersteunt verschillende hashing algoritmes en FastAPI raadt het moderne Argon2 aan.

uv add "pwdlib[argon2]"

Om met pwdlib wachtwoorden te kunnen hashen en controleren voegen we volgende code toe:

main.py
from pwdlib import PasswordHash

password_hash = PasswordHash.recommended()

def verify_password(plain_password, hashed_password) -> bool:
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password) -> str:
    return password_hash.hash(password)

Om te testen, en de juiste hash voor het “secret” wachtwoord te berekenen, kunnen we het volgende uitvoeren:

$ uv run python

>>> from main import get_password_hash, verify_password

>>> get_password_hash("secret")
'$argon2id$v=19$m=65536,t=3,p=4$o8Zcvomlo/VTC+jJ7D1SiQ$hXfZ835HmErM0C8SnUTPLX2+L4DGDFwmF6uWPfMkVik'

>>> get_password_hash("secrit")
'$argon2id$v=19$m=65536,t=3,p=4$a+jJ+am07S/ccgnXSxYbKQ$hzIWtNlqTAmXQny658qF6qCgXQJKtuAg4Qk02jW5ksg'

Nu moeten we nog in fake_users_db het gehashte wachtwoord opslaan, en in de login functie verify_password gebruiken:

main.py
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "password": "$argon2id$v=19$m=65536,t=3,p=4$o8Zcvomlo/VTC+jJ7D1SiQ$hXfZ835HmErM0C8SnUTPLX2+L4DGDFwmF6uWPfMkVik",
    }
}

# ...

@app.post("/token")
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    user = get_user(form_data.username)
    if not (user and verify_password(form_data.password, user.password)):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    # ...

Concurrency en async/await

Async (asynchroon) programmeren in Python komt uitgebreid aan bod in de module REST API. We bekijken hier heel kort het concept, zodat we het kunnen toepassen in het kader van FastAPI.

Synchrone Threads

Een traditioneel Python programma is synchroon (sequentieel). Functies worden de een na de ander uitgevoerd tot het programma eindigt. Dit vormt een probleem wanneer het programma moet wachten op iets externs zoals bijvoorbeeld het inlezen van een bestand, het antwoord op een API request of het resultaat van een database query. Tijdens dat wachten kan er niets anders gebeuren.

Dit is een groot probleem voor applicaties waarvan we verwachten dat ze verschillende zaken tegelijk uitvoeren. Zoals een web applicatie die verschillende HTTP requests tegelijk kan afhandelen. Stel dat de volledige afhandeling van een enkele request ongeveer 100ms duurt, dan zou een puur synchrone applicatie nooit meer dan 10 requests per seconde aankunnen!

Een eerste oplossing, waar we tot nu toe onbewust gebruik van hebben gemaakt, zijn threads. Een thread is een aparte uitvoeringslijn binnen één Python-proces, met een eigen call stack maar gedeeld geheugen, waardoor meerdere stukken code (zoals het afhandelen van verschillende HTTP-requests) schijnbaar tegelijk kunnen worden uitgevoerd. Het addertje onder het gras is dat we als programmeurs zelf geen controle hebben over welke thread op een bepaald moment wordt uitgevoerd. Dat bepaalt het besturingssysteem.

De Flask, Connexion en FastAPI voorbeelden die we tot nu hebben gezien werken op basis van threads. Voor elke nieuwe HTTP request wordt een aparte thread gestart.

Async

Met FastAPI kunnen we ook op een andere manier werken: async. Bij async hebben we maar één thread maar kan een volgende request verwerkt worden terwijl een eerste request moet wachten, bijvoorbeeld op het resultaat van een database query.

Vergelijk het met de kassier bij McDonalds die je bestelling opneemt. Als de gekozen producten niet allemaal klaarliggen geeft hij je bestelling door aan de keuken. De bereiding kan even duren tussen ondertussen kan de kassier alvast de volgende bestelling verwerken. Als de keuken klaar kan de kassier jou bestelling verder afwerken.

In deze analogie is de kassier een FastAPI app en de keuken bijvoorbeeld een SQL database. Terwijl de database een ingewikkelde query uitvoert hoeft de FastAPI path functie niet te blijven wachten en kan een andere request verwerkt worden.

Om in Python aan te geven dat een functie asynchroon is gebruik je bij de functiedeclaratie async def func_name(), i.p.v. gewoon def func_name(). Om een async functie aan te roepen gebruik je await func_name() i.p.v. gewoon func_name().

Maar…

  • Een async functie kan enkel worden aangeroepen (m.v.b. await) door een andere async functie. Er wordt soms gesproken over gekleurde functies.
  • In een async functie mag je zeker geen gewone I/O-gebonden functies gebruiken.
    • Dit zou immers de volledige applicatie ‘blokkeren’

Daarom voorziet de Python standard library in de asyncio module async versies van functies wie we al kunnen, bvb. sleep(5) -> asyncio.sleep(5). Externe packages bouwen hierop verder om async versies aan te bieden, bvb.

Zoals eerder aangehaald komen zowel multi-threading als async uitgebreid aan bod in de module REST API. Wil je dit onderwerp ondertussen toch wat verder uitdiepen bekijk dan zeker eens het Concurrency and async / await hoofdstuk in de FastAPI documentatie, of nog deze video van Corey Shafer: Python Tutorial: AsyncIO - Complete Guide to Asynchronous Programming with Animations.

FastAPI async path functies

Zoals we ondertussen gewend zijn maakt FastAPI het ons erg gemakkelijk:

  • Sync (threaded) en async path functies kunnen tegelijk en door elkaar gebruikt worden binnen dezelfde FastAPI app.
  • Om async te gebruiken hoeven we enkel de path functie async te maken.

In onderstaand voorbeeld illustreren we het verschil. We demonstreren tegelijk wat er gebeurt wanneer niet-async blokkerende functies (zoals sleep) gebruikt worden in een async path functie!

main.py
import asyncio
import time
from fastapi import FastAPI

app = FastAPI()

@app.get("/sync")
def get_sync():
    time.sleep(1)  # simulate work
    return {"message": "sync"}

@app.get("/async")
async def get_async():
    await asyncio.sleep(1)  # simulate work
    return {"message": "async"}

@app.get("/async_bad")
async def get_async_bad():
    time.sleep(1)  # simulate work - don't do this!
    return {"message": "async bad"}

Op het eerste zicht zijn alle endpoints functioneel equivalent en zien we vanuit het perspectief van de client geen verschil:

$ curl -w "\n%{time_total}s" http://127.0.0.1:8000/sync
{"message":"sync"}
1.006808s

$ curl -w "\n%{time_total}s" http://127.0.0.1:8000/async
{"message":"async"}
1.004921s

$ curl -w "\n%{time_total}s" http://127.0.0.1:8000/async_bad
{"message":"async bad"}
1.004625s

Dit is wat we verwachten. Als we echter meerdere requests tegelijk uitsturen krijgen we een heel andere ervaring. Om te testen gebruiken we Apache ab. Dit is standaard beschikbaar op MacOS - en op Linux of Windows WSL via het apache2-utils pakket.

We sturen in het totaal 100 requests (-n 100) vanuit 10 parallelle clients -c 10. Dus alsof we 10 browsers open hebben en in elke browser refreshen nadat de pagina geladen is.

$ ab -n 100 -c 10 http://127.0.0.1:8000/sync
...
Time taken for tests:   11.117 seconds
Requests per second:    9.00 [#/sec] (mean)
Time per request:       1111.684 [ms] (mean)

$ ab -n 100 -c 10 http://127.0.0.1:8000/async
...
Time taken for tests:   11.098 seconds
Requests per second:    9.01 [#/sec] (mean)
Time per request:       1109.813 [ms] (mean)

$ ab -n 100 -c 10 http://127.0.0.1:8000/async_bad
...
Time taken for tests:   100.695 seconds
Requests per second:    0.99 [#/sec] (mean)
Time per request:       10069.532 [ms] (mean)

Uit deze resultaten kunnen we perfect zien dat een gewone sleep in een async functie de volledige applicatie blokkeert.

In de vorige resultaten lijkt het verschil tussen threads and async minimaal. Drijven we echter het aantal parallelle requests verder omhoog, dan wordt het verschil erg merkbaar:

 ab -n 1000 -c 100 http://127.0.0.1:8000/sync
...
Time taken for tests:   26.211 seconds
Requests per second:    38.15 [#/sec] (mean)
Time per request:       2621.086 [ms] (mean)

$ ab -n 1000 -c 100 http://127.0.0.1:8000/async
...
Time taken for tests:   11.241 seconds
Requests per second:    88.96 [#/sec] (mean)
Time per request:       1124.130 [ms] (mean)

Dit is natuurlijk een extreem voorbeeld omdat de API logica onbestaand is. De applicatie doet effectief niets behalve 1 seconde wachten. Hierdoor slorpt het afhandelen van threads relatief gezien een heel groot deel van de totale werkdruk op.

Async Database Access

sleep is natuurlijk een weinig realistisch voorbeeld van een I/O-gebonden stap die we in een API path functie kunnen tegenkomen.

Een database query (of update) daarentegen, kunnen we typisch wel verwachten. Net zoals we asyncio.sleep moesten gebruiken i.p.v. een gewone sleep, net zo kunnen we niet de gewone sqlalchemy and sqlite (of een andere database driver) functies gebruiken.

We hebben bij sqlalchemy de asyncio optie nodig, als ook het aiosqlite package, een async implementatie van de standaard sqlite module.

uv add "sqlalchemy[asyncio]" aiosqlite

We illustreren het gebruik met de gebruikelijke “Items” API:

main.py
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

# FastAPI Pydantic Model
class Item(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None

# SQLAlchemy ORM Model
class Base(DeclarativeBase):
    pass

class StoredItem(Base):
    __tablename__ = "items"

    name: Mapped[str] = mapped_column(primary_key=True)
    description: Mapped[str | None] = mapped_column(nullable=True)
    price: Mapped[float]
    tax: Mapped[float | None] = mapped_column(nullable=True)

# Async versies van de SQLAlchemy create_engine en sessionmaker
# uit de SQLAlchemy en Flask lessen
DATABASE_URL = "sqlite+aiosqlite:///./items.db"

engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# Een 'lifespan' in FastAPI is een contextmanager functie die wordt uitgevoerd
# bij het opstarten van de applicatie. (Zie https://fastapi.tiangolo.com/advanced/events/)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create database tables on startup"""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    print("Database tables created")
    yield

app = FastAPI(lifespan=lifespan)

# Async path functies kunnen async Dependencies hebben
# Met deze Dependency geven we elke request een eigen SQLAlchemy session object
async def get_db():
    """Provides a database session for each request"""
    async with AsyncSessionLocal() as session:
        yield session

@app.post("/items/", status_code=status.HTTP_201_CREATED, response_model=Item)
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):
    new_task = StoredItem(**item.model_dump())  # SQLAlchemy model!
    db.add(new_task)
    await db.commit()  # Await! Want AsyncSession.commit() is een async methode!

    return new_task

@app.get("/items/{name}", response_model=Item)
async def get_item(name: str, db: AsyncSession = Depends(get_db)):
    db_query = select(StoredItem).where(StoredItem.name == name)
    query_result = await db.scalars(db_query)
    item = query_result.one_or_none()
    if item is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )
    return item

Oefening Security & Concurrency

Async Database

Vertrek vanuit de “task management” oefening uit het Dependencies hoofdstuk en pas deze aan zodat tasks worden opgeslagen in een SQLite database i.p.v. de in-memory dict.

Je zult alle path functions async moeten maken.

Het TaskInternal Pydantic model heb je niet meer nodig en wordt vervangen door een SQLAlchemy model (bvb. StoredTask)

Denk er aan await te gebruiken als je async functies aanroept zoals db.get() of db.commit()!

Oplossing
main.py
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Annotated

from fastapi import (
    Cookie,
    Depends,
    FastAPI,
    HTTPException,
    Path,
    Query,
    Response,
    status,
)
from pydantic import BaseModel, Field
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql.functions import current_timestamp

# FastAPI Pydantic Models blijvend ongewijzigd, maar
# TaskInternal is niet meer nodig en vervangen door een SQLAlchemy model
class TaskCreate(BaseModel):
    title: Annotated[str, Field(min_length=3, examples=["Learn FastAPI"])]
    description: Annotated[str | None, Field(examples=["Use the FastAPI tutorial"])] = (
        None
    )
    priority: Annotated[int, Field(ge=1, le=5, examples=[4])]

class TaskExternal(TaskCreate):
    task_id: Annotated[int, Field(serialization_alias="id", examples=[1])]

class TaskUpdate(TaskCreate):
    title: Annotated[str | None, Field(min_length=3)] = None
    priority: Annotated[int | None, Field(ge=1, le=5)] = None

class Preference(BaseModel):
    min_priority: Annotated[int, Field(ge=1, le=5)]

TaskId = Annotated[int, Path(ge=1)]

# SQLAlchemy ORM Model voor een opgeslagen Task - vervangt TaskInternal
# Field/Column namen en datatypes zijn hetzelfde
class Base(DeclarativeBase):
    pass

class StoredTask(Base):
    __tablename__ = "todos"

    task_id: Mapped[int] = mapped_column(name="id", primary_key=True, autoincrement=True)
    title: Mapped[str]
    description: Mapped[str]
    priority: Mapped[int]
    created_at: Mapped[datetime] = mapped_column(server_default=current_timestamp())

# Async versies van de SQLAlchemy create_engine en sessionmaker
# uit de SQLAlchemy en Flask lessen
DATABASE_URL = "sqlite+aiosqlite:///./tasks.db"

engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# Een 'lifespan' in FastAPI is een contextmanager functie die wordt uitgevoerd
# bij het opstarten van de applicatie. (Zie https://fastapi.tiangolo.com/advanced/events/)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create database tables on startup"""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    print("Database tables created")
    yield

app = FastAPI(title="Task Management API", version="1.0.0", lifespan=lifespan)

# Async path functies kunnen async Dependencies hebben
# Met deze Dependency geven we elke request een eigen SQLAlchemy session object
async def get_db():
    """Provides a database session for each request"""
    async with AsyncSessionLocal() as session:
        yield session

@app.post(
    "/tasks",
    response_model=TaskExternal,
    status_code=status.HTTP_201_CREATED,
    tags=["Tasks"],
    summary="Create a new task",
)
async def create_task(task: TaskCreate, db: AsyncSession = Depends(get_db)):
    new_task = StoredTask(**task.model_dump()) # SQLAlchemy model!
    db.add(new_task)
    await db.commit() # Await! Want AsyncSession.commit() is een async methode!

    return new_task

@app.get(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Get a single task",
)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db)):
    task = await db.get(StoredTask, task_id)
    if task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )
    return task

@app.get(
    "/tasks",
    response_model=list[TaskExternal],
    tags=["Tasks"],
    summary="List tasks",
)
async def list_tasks(
    min_priority: Annotated[int | None, Query(ge=1)] = None,
    query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
    cookie_min_priority: Annotated[
        int | None, Cookie(alias="min_priority", include_in_schema=False)
    ] = None,
    db: AsyncSession = Depends(get_db),
):
    db_query = select(StoredTask)

    if min_priority is None and cookie_min_priority is not None:
        min_priority = cookie_min_priority

    if min_priority is not None:
        db_query = db_query.where(StoredTask.priority >= min_priority)

    if query:
        db_query = db_query.where(func.lower(StoredTask.title.contains(query.lower())))

    result = await db.scalars(db_query)
    return result.all()

@app.patch(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Update a task",
)
async def update_task(
    task_id: int, task_data: TaskUpdate, db: AsyncSession = Depends(get_db)
):
    stored_task = await db.get(StoredTask, task_id)
    if stored_task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )

    update_data = task_data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(stored_task, field, value)

    await db.commit()
    return stored_task

@app.post(
    "/preferences",
    status_code=status.HTTP_204_NO_CONTENT,
    tags=["Preferences"],
    summary="Set user preferences",
)
async def set_preferences(pref: Preference, response: Response):
    response.set_cookie(
        key="min_priority",
        value=str(pref.min_priority),
    )

OAuth2

Voeg nu ook OAuth2 authenticatie toe zoals we gezien hebben in het security hoofdstuk. Gebruik JWT en password hashing. Users moeten ook in de database worden bewaard.

Voor het aanmaken een een test user moet je geen API path voorzien maar mag je dit zelf in de database toevoegen of via SQLAlchemy in de lifespan functie, nadat het database schema is aangemaakt (Base.metadata.create_all).

Voor de user zul je zowel een Pydantic model (User), zonder password, als een SQLalchemy mode (StoredUser), met password, nodig hebben.

De OAuth2 login path functie zal nu ook async moeten zijn wegens de dependency op get_db!

Ook de dependency waarmee een token wordt gecontroleerd moet nu async zijn. Voor deze toepassing willen we eigenlijk enkel het token controleren en hebben we het overeenkomstig User object niet nodig. Die functie hoeft dus niets te returnen.

We kunnen de dependency dan in de path decorators declareren i.p.v. in de path functies zelf

Oplossing - opdracht
main.py
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import (
    Cookie,
    Depends,
    FastAPI,
    HTTPException,
    Path,
    Query,
    Response,
    status,
)
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, Field
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql.functions import current_timestamp

# FastAPI Pydantic Models blijvend ongewijzigd, maar
# TaskInternal is niet meer nodig en vervangen door een SQLAlchemy model
class TaskCreate(BaseModel):
    title: Annotated[str, Field(min_length=3, examples=["Learn FastAPI"])]
    description: Annotated[str | None, Field(examples=["Use the FastAPI tutorial"])] = (
        None
    )
    priority: Annotated[int, Field(ge=1, le=5, examples=[4])]

class TaskExternal(TaskCreate):
    task_id: Annotated[int, Field(serialization_alias="id", examples=[1])]

class TaskUpdate(TaskCreate):
    title: Annotated[str | None, Field(min_length=3)] = None
    priority: Annotated[int | None, Field(ge=1, le=5)] = None

class Preference(BaseModel):
    min_priority: Annotated[int, Field(ge=1, le=5)]

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None

TaskId = Annotated[int, Path(ge=1)]

# SQLAlchemy ORM Model voor een opgeslagen Task - vervangt TaskInternal
# Field/Column namen en datatypes zijn hetzelfde
class Base(DeclarativeBase):
    pass

class StoredTask(Base):
    __tablename__ = "todos"

    task_id: Mapped[int] = mapped_column(
        name="id", primary_key=True, autoincrement=True
    )
    title: Mapped[str]
    description: Mapped[str]
    priority: Mapped[int]
    created_at: Mapped[datetime] = mapped_column(server_default=current_timestamp())

class StoredUser(Base):
    __tablename__ = "users"

    username: Mapped[str] = mapped_column(primary_key=True)
    email: Mapped[str | None] = mapped_column(nullable=True)
    full_name: Mapped[str | None] = mapped_column(nullable=True)
    password: Mapped[str]

# Async versies van de SQLAlchemy create_engine en sessionmaker
# uit de SQLAlchemy en Flask lessen
DATABASE_URL = "sqlite+aiosqlite:///./tasks.db"

engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# JWT and Password hashing
SECRET_KEY = "2891cd0eaed8623ea769c2fc41960f40d8fd0a2bbbaddcb6af93e0ecd9ecb0e5"
ALGORITHM = "HS256"
password_hash = PasswordHash.recommended()

def verify_password(plain_password, hashed_password) -> bool:
    return password_hash.verify(plain_password, hashed_password)

def get_password_hash(password) -> str:
    return password_hash.hash(password)

# Een 'lifespan' in FastAPI is een contextmanager functie die wordt uitgevoerd
# bij het opstarten van de applicatie. (Zie https://fastapi.tiangolo.com/advanced/events/)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create database tables on startup"""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSessionLocal() as session:
        session.add(StoredUser(username="admin", password=get_password_hash("test")))
        try:
            await session.commit()
        except IntegrityError:
            pass
    print("Database tables created")
    yield

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI(title="Task Management API", version="1.0.0", lifespan=lifespan)

# Async path functies kunnen async Dependencies hebben
# Met deze Dependency geven we elke request een eigen SQLAlchemy session object
async def get_db():
    """Provides a database session for each request"""
    async with AsyncSessionLocal() as session:
        yield session

async def verify_token(
    token: Annotated[str, Depends(oauth2_scheme)], db: AsyncSession = Depends(get_db)
):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        decoded_jwt = jwt.decode(token, key=SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise credentials_exception
    username = decoded_jwt.get("sub")
    if username is None:
        raise credentials_exception

    user = await db.get(StoredUser, username)
    if not user:
        raise credentials_exception

@app.post("/token")
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: AsyncSession = Depends(get_db),
):
    user = await db.get(StoredUser, form_data.username)
    if not (user and verify_password(form_data.password, user.password)):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    now = datetime.now(timezone.utc)
    data = {
        "iat": now,
        "exp": now + timedelta(minutes=15),
        "sub": user.username,
        "name": user.full_name,
    }

    encoded_jwt = jwt.encode(data, key=SECRET_KEY, algorithm=ALGORITHM)
    return {"access_token": encoded_jwt, "token_type": "bearer"}

@app.post(
    "/tasks",
    response_model=TaskExternal,
    status_code=status.HTTP_201_CREATED,
    tags=["Tasks"],
    summary="Create a new task",
    dependencies=[Depends(verify_token)],
)
async def create_task(task: TaskCreate, db: AsyncSession = Depends(get_db)):
    new_task = StoredTask(**task.model_dump())  # SQLAlchemy model!
    db.add(new_task)
    await db.commit()  # Await! Want AsyncSession.commit() is een async methode!

    return new_task

@app.get(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Get a single task",
    dependencies=[Depends(verify_token)],
)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db)):
    task = await db.get(StoredTask, task_id)
    if task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )
    return task

@app.get(
    "/tasks",
    response_model=list[TaskExternal],
    tags=["Tasks"],
    summary="List tasks",
    dependencies=[Depends(verify_token)],
)
async def list_tasks(
    min_priority: Annotated[int | None, Query(ge=1)] = None,
    query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
    cookie_min_priority: Annotated[
        int | None, Cookie(alias="min_priority", include_in_schema=False)
    ] = None,
    db: AsyncSession = Depends(get_db),
):
    db_query = select(StoredTask)

    if min_priority is None and cookie_min_priority is not None:
        min_priority = cookie_min_priority

    if min_priority is not None:
        db_query = db_query.where(StoredTask.priority >= min_priority)

    if query:
        db_query = db_query.where(func.lower(StoredTask.title.contains(query.lower())))

    result = await db.scalars(db_query)
    return result.all()

@app.patch(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Update a task",
    dependencies=[Depends(verify_token)],
)
async def update_task(
    task_id: int, task_data: TaskUpdate, db: AsyncSession = Depends(get_db)
):
    stored_task = await db.get(StoredTask, task_id)
    if stored_task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )

    update_data = task_data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(stored_task, field, value)

    await db.commit()
    return stored_task

@app.post(
    "/preferences",
    status_code=status.HTTP_204_NO_CONTENT,
    tags=["Preferences"],
    summary="Set user preferences",
    dependencies=[Depends(verify_token)],
)
async def set_preferences(pref: Preference, response: Response):
    response.set_cookie(
        key="min_priority",
        value=str(pref.min_priority),
    )

SQLModel

In het laatste voorbeeld heb je waarschijnlijk gemerkt dat we nu twee bijna identieke modellen hebben voor een task. Een set Pydantic modellen voor de API zelf (request/response), en een SQLAlchemy ORM model voor de database table.

Wat als we deze modellen zouden kunnen combineren? Dat is precies één van de hoofddoelstellingen van SQLModel. SQLModel staat nog in zijn kinderschoenen maar zeker de moeite waard eens te bekijken als je met FastAPI en SQLAlchemy werkt.

We gaan hier in de cursus verder niet op in maar hieronder zie je het laatste voorbeeld aangepast naar SQLModel.

uv add sqlmodel
main.py
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Annotated, ClassVar

from fastapi import (
    Cookie,
    Depends,
    FastAPI,
    HTTPException,
    Path,
    Query,
    Response,
    status,
)
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.sql.functions import current_timestamp
from sqlmodel import Field, SQLModel, col, func, select

# SQLModel schema models (no table=True, so these are pure Pydantic models)
class TaskCreate(SQLModel):
    title: Annotated[
        str, Field(min_length=3, schema_extra={"examples": ["Learn FastAPI"]})
    ]
    description: Annotated[
        str | None,
        Field(schema_extra={"examples": ["Use the FastAPI tutorial"]}),
    ] = None
    priority: Annotated[int, Field(ge=1, le=5, schema_extra={"examples": [4]})]

class TaskExternal(TaskCreate):
    task_id: Annotated[
        int,
        Field(
            sa_column_kwargs={"name": "id"},
            primary_key=True,
            serialization_alias="id",
            schema_extra={"examples": [1]},
        ),
    ]

class TaskUpdate(TaskCreate):
    title: Annotated[str | None, Field(min_length=3)] = None
    priority: Annotated[int | None, Field(ge=1, le=5)] = None

class Preference(SQLModel):
    min_priority: Annotated[int, Field(ge=1, le=5)]

TaskId = Annotated[int, Path(ge=1)]

# SQLModel table model
class StoredTask(TaskExternal, table=True):
    __tablename__: ClassVar[str] = "todos"

    created_at: datetime | None = Field(
        default=None, sa_column_kwargs={"server_default": current_timestamp()}
    )

DATABASE_URL = "sqlite+aiosqlite:///./tasks.db"

engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create database tables on startup"""
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)
    print("Database tables created")
    yield

app = FastAPI(title="Task Management API", version="1.0.0", lifespan=lifespan)

async def get_db():
    """Provides a database session for each request"""
    async with AsyncSessionLocal() as session:
        yield session

@app.post(
    "/tasks",
    response_model=TaskExternal,
    status_code=status.HTTP_201_CREATED,
    tags=["Tasks"],
    summary="Create a new task",
)
async def create_task(task: TaskCreate, db: AsyncSession = Depends(get_db)):
    new_task = StoredTask(**task.model_dump())
    db.add(new_task)
    await db.commit()

    return new_task

@app.get(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Get a single task",
)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db)):
    task = await db.get(StoredTask, task_id)
    if task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )
    return task

@app.get(
    "/tasks",
    response_model=list[TaskExternal],
    tags=["Tasks"],
    summary="List tasks",
)
async def list_tasks(
    min_priority: Annotated[int | None, Query(ge=1)] = None,
    query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
    cookie_min_priority: Annotated[
        int | None, Cookie(alias="min_priority", include_in_schema=False)
    ] = None,
    db: AsyncSession = Depends(get_db),
):
    db_query = select(StoredTask)

    if min_priority is None and cookie_min_priority is not None:
        min_priority = cookie_min_priority

    if min_priority is not None:
        db_query = db_query.where(StoredTask.priority >= min_priority)

    if query:
        db_query = db_query.where(
            func.lower(col(StoredTask.title)).contains(query.lower())
        )

    result = await db.scalars(db_query)
    return result.all()

@app.patch(
    "/tasks/{task_id}",
    response_model=TaskExternal,
    tags=["Tasks"],
    summary="Update a task",
)
async def update_task(
    task_id: int, task_data: TaskUpdate, db: AsyncSession = Depends(get_db)
):
    stored_task = await db.get(StoredTask, task_id)
    if stored_task is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
        )

    update_data = task_data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(stored_task, field, value)

    await db.commit()
    return stored_task

@app.post(
    "/preferences",
    status_code=status.HTTP_204_NO_CONTENT,
    tags=["Preferences"],
    summary="Set user preferences",
)
def set_preferences(pref: Preference, response: Response):
    response.set_cookie(
        key="min_priority",
        value=str(pref.min_priority),
    )

Extras

Applicaties opsplitsen met APIRouter

Bij Flask gebruikten we Blueprints om de code van de FlaskR applicatie te organiseren. Zo hadden we bijvoorbeeld de blog.py module met een lokaal Blueprint object waarop alle authenticatie routes gedefinieerd werden.

FastAPI heeft met APIRouter een zeer gelijkaardig concept.

Een APIRouter object wordt op dezelfde manier als een FastAPI object geïnitialiseerd en gebruikt.

app/routers/users.py
from fastapi import APIRouter

router = APIRouter()

@router.get("/users/", tags=["users"])
def read_users():
    return [{"username": "Rick"}, {"username": "Morty"}]

@router.get("/users/me", tags=["users"])
def read_user_me():
    return {"username": "fakecurrentuser"}

@router.get("/users/{username}", tags=["users"])
def read_user(username: str):
    return {"username": username}

In de main module, waar de FastAPI applicatie wordt geïnitialiseerd, worden de verschillende router objecten geïmporteerd en toegevoegd aan de applicatie met de include_router methode.

app/main.py
from fastapi import FastAPI
from .routers import items, users

app = FastAPI()

app.include_router(users.router)
app.include_router(items.router)

Volledige code kan je terugvinden op https://github.com/dvx76/fastapi-tutorial en uitvoeren met:

uv sync
uv run fastapi dev app/main.py

HTML met FastAPI

Ondanks zijn naam is het met FastAPI ook perfect mogelijk gewone HTML content te sturen. FastAPI kan dus prima gebruikt worden om een eenvoudige web-applicatie te maken zoals we in de Flask lessen hebben gezien. Bij APIs kan het ook interessant zijn een aantal HTML endpoints te voorzien, bijvoorbeeld om een login pagina te presenteren, of om frontend code te serveren.

StaticFiles

Met StaticFiles kan de inhoud van een directory beschikbaar worden gemaakt via een gekozen URL path. Dit wordt typisch gebruikt om bvb. javascript of CSS bestanden via een static/ path te delen.

Een FastAPI app object kan iets mounten op een gekozen path - net zoals een filesysteem of netwerkfolder gemount wordt in Windows, MacOS of Linux.

Een StaticFiles object is een van de opties die op deze manier gemount kunnen worden. StaticFiles zelf heeft als argument de directory (absoluut, of relatief t.o.v. de huidige directory) die gedeeld moet worden.

app.mount heeft als argumenten het gewenste URL path, het StaticFiles object, en een willekeurige naam waarmee de mount intern in FastAPI aanspreekbaar blijft.

main.py
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")
static/hello.html
<html>
    <head>
        <title>A Static HTML page</title>
    </head>
    <body>
        <h1>Served from /static</h1>
    </body>
</html>

De pagina is dan bereikbaar via http://127.0.0.1:8000/static/hello.html.

HTMLResponse

De path functie decorators (bvb. @app.get()) hebben een response_class parameter. Default is dit fastapi.responses.JSONResponse. Dat zorgt er o.a. voor dat een gereturnde dict of Pydantic model automatische wordt geserialiseerd naar JSON, en de Content-Type header de correct waarde van application/json krijgt.

Als we voor de response_class expliciet fastapi.responses.HTMLResponse gebruiken kan de functie een HTML string terugsturen. Ook in dit geval zal bvb. de Content-Type automatisch juist staan (test/html).

Het is weinig zinvol zo een endpoint op te nemen in de OAS, dus we kunnen opnieuw include_in_schema=False gebruiken.

main.py
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

app = FastAPI()

@app.get("/items/", response_class=HTMLResponse, include_in_schema=False)
async def read_items():
    return """
    <html>
        <head>
            <title>Some HTML in here</title>
        </head>
        <body>
            <h1>Look ma! HTML!</h1>
        </body>
    </html>
    """

Om het resultaat te bekijken gaan we rechtstreeks naar http://127.0.0.1:8000/items/.

Naast JSONReponse en HTMLResponse zijn er nog verschillende andere mogelijkheden. Deze zijn allemaal subclasses van Response (dat ook rechtstreeks gebruikt kan worden). Bijvoorbeeld RedirectResponse en FileResponse.

Templates

FastAPI ondersteunt (eigenlijk via Starlette) rechtstreeks dezelfde Jinja2 templating die we al kennen van Flask. Omdat we FastAPI met de fastapi[standard] optie hebben geïnstalleerd is hebben we automatisch al de jinja2 dependency - anders zou dit package expliciet geïnstalleerd moeten worden.

Om templates te renderen gebruik je een fastapi.templating.Jinja2Templates object. Het argument is de directory (opnieuw absoluut of relatief) maar de template bestanden zich bevinden. In principe is dit één globaal object.

Om dan in een path functie een bepaalde template naar HTML te renderen roep je te TemplateResponse methode aan op dat object. Deze krijgt als argumenten altijd het Request object en de naam van de template mee. Optioneel kan je ook een context dictionary meegeven met extra data om in de template te gebruiken - net als bij Flask.

main.py
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

app = FastAPI()

templates = Jinja2Templates(directory="templates")

@app.get("/items/{id}", response_class=HTMLResponse)
async def read_item(request: Request, id: str):
    return templates.TemplateResponse(
        request=request, name="item.html.j2", context={"id": id}
    )
templates/item.html.j2
<html>
<head>
  <title>Item Details</title>
</head>
<body>
  <h1>Header details</h1>
  {{ request.headers }}
  <h1>Item</h1>
  <a href="{{ url_for('read_item', id=id) }}">Item ID: {{ id }}</a>
</body>
</html>

Om te testen gebruiken we bvb. http://127.0.0.1:8000/items/1

FlaskR met FastAPI

Als praktisch voorbeeld bekijken we hoe de FlaskR applicatie uit de Flask lessen aangepast kan worden naar FastAPI. Om de focus op HTML en Jinja2 te houden gebruiken we geen authenticatie, statische data (i.p.v. een SQLite database) en implementeren we enkel de home page (index).

flaskr/blog.py
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

templates = Jinja2Templates(directory="templates")

router = APIRouter(include_in_schema=False)

fake_posts_db = [
    {"id": 1, "username": "Bob", "title": "Bob is awesome", "body": "Everybody loves Bob!", "created": "2026-01-03"},
    {"id": 2, "username": "Bob", "title": "Easy title", "body": "Easy post", "created": "2026-01-02"},
    {"id": 3, "username": "Alice", "title": "Alice is awesome too", "body": "<3", "created": "2026-01-01"}
]

@router.get("/", response_class=HTMLResponse)
def home(request: Request):
    return templates.TemplateResponse(
        request=request, name="blog/index.html.j2", context={"posts": fake_posts_db}
    )

De volledige code kan je terugvinden op https://github.com/dvx76/fastapi-tutorial en uitvoeren met:

uv sync
uv run fastapi dev flaskr/main.py

We bekijken de belangrijkste verschillen in de Jinja2 templates.

url_for

url_for heeft een keyword-argument path i.p.v. filename, bvb:

- <link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
+ <link rel="stylesheet" href="{{ url_for('static', path='/styles.css') }}">

before_app_request en flask.g

Er is geen g object (tijdelijke request-specifieke opslag). In de plaats kunnen we Request.state gebruiken, en een Middleware functie als equivalent voor Flask’s before_app_request.

@app.middleware("http")
async def get_user(request: Request, call_next):
    request.state.user = "Fabrice"
    return await call_next(request)
- {% if g.user %}
- <li><span>{{ g.user['username'] }}</span>
- <li><a href="{{ url_for('auth.logout') }}">Log Out</a>
-   {% else %}
- <li><a href="{{ url_for('auth.register') }}">Register</a>
- <li><a href="{{ url_for('auth.login') }}">Log In</a>
- {% endif %}
+ {% if request.state.user %}
+ <li><span>{{ request.state.user }}</span>
+ <li><a href="{{ url_for('logout') }}">Log Out</a>
+ {% else %}
+ <li><a href="{{ url_for('register') }}">Register</a>
+ <li><a href="{{ url_for('login') }}">Log In</a>
+ {% endif %}

Websockets

https://fastapi.tiangolo.com/advanced/websockets/

WebSockets is een protocol waarmee een permanente, bidirectionele verbinding wordt opgezet tussen client (bvb. browser) en server. In tegenstelling tot klassieke HTTP-requests (GET, POST, …) blijft de verbinding open, waardoor beide kanten op elk moment berichten kunnen sturen zonder telkens een nieuwe request te starten.

Dit maakt WebSockets ideaal voor real-time toepassingen zoals chatapps, live notificaties of dashboards.

sequenceDiagram
    participant C as Client (Browser)
    participant S as Server (FastAPI)

    C->>S: HTTP GET (Connection: Upgrade)
    S-->>C: 101 Switching Protocols

    Note over C,S: WebSocket connection established

    C->>S: WebSocket Message
    S-->>C: WebSocket Message
    C->>S: WebSocket Message
    S-->>C: WebSocket Message

Websocket verbindingen worden typisch vanuit frontend javascript code gestart en gebruikt. Ter illustratie werken we met een eenvoudig voorbeeld waarvoor we geen specific frontend framework nodig hebben. Als eerste stap hebben we natuurlijk een path nodig om de HTML + Javascript pagina door te sturen naar de browser:

main.py
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
1            var ws = new WebSocket("ws://localhost:8000/ws");
3            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
2            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""

@app.get("/")
async def get():
    return HTMLResponse(html)
1
Het ws object stelt de websocket verbinding voor. Het /ws pad moeten we nog implementeren!
2
De HTML form zal deze sendMessage functie aanroepen. Die haalt het te versturen bericht uit de form en stuurt het over de websocket verbinden met ws.send()
3
Aan ws.onmessage wordt een functie toegekend die wordt aangeroepen voor elk ontvangen bericht. Die functie voegt het bericht gewoon toe aan de HTML pagina in een (initieel lege) lijst.

De HTML pagina is nu wel al bereikbaar maar er gebeurd nog niets omdat we nog geen websocket path in the FastAPI applicatie hebben. Daar zorgt de volgende code voor:

main.py
1@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
2    await websocket.accept()
3    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message text was: {data}")
1
De app.websocket decorator maakt van deze functie een websocket path functie.
2
Met WebSocket.accept() wordt de verbinden tot stand gebracht en is deze klaar om berichten te versturen of ontvangen.
3
Een eenvoudig lus zal steeds ontvangen berichten uitlezen (WebSocket.receive_text()) en hetzelfde bericht terugsturen (WebSocket.send_text()).

Merk op dat websockets per definitie asynchroon zijn. De WebSocket class implementeert enkel async methodes. Bijgevolg moet de path functie ook async zijn.

Naast send_text() heeft WebSocket ook send_binary() en send_json() methodes.

Berichten naar jezelf sturen is natuurlijk weinig zinvol. Als we de verschillende websocket verbindingen (als WebSocket objecten) bijhouden kunnen we elk ontvangen bericht naar alle andere websocket clients sturen. Met minder dan 100 lijnen code hebben we met FastAPI een (zeer) rudimentaire chat-applicatie gemaakt:

main.py
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>Nickname: <input type="text" id="nickName" autocomplete="off" value="Anonymous Coward"/></label>
            <button onclick="connect(event)">Connect</button>
            <hr>
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <span id="connected"></span>
        <ul id='messages'>
        </ul>
        <script>
        var ws = null;
            function connect(event) {
5                var nickName = document.getElementById("nickName")
                ws = new WebSocket("ws://localhost:8000/ws/" + nickName.value);
                document.querySelector("#connected").textContent = "Connected!";
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(event.data)
                    message.appendChild(content)
                    messages.appendChild(message)
                };
                event.preventDefault()
            }
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""

1ws_connections: list[WebSocket] = []

@app.get("/")
async def get():
    return HTMLResponse(html)

2@app.websocket("/ws/{nickname}")
async def websocket_endpoint(websocket: WebSocket, nickname: str):
    await websocket.accept()
3    ws_connections.append(websocket)

    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"You: {data}")
4        for ws_connection in ws_connections:
            if ws_connection != websocket:
                await ws_connection.send_text(f"{nickname}: {data}")
1
We gebruiken een eenvoudige globale in-memory lijst om alle verbindingen mij te houden.
2
Om elke client een naam de geven laten we een path parameter toe. Websocket paths kunnen net als gewone HTTP paths path en/of query parameters gebruiken!
3
Eenmaal verbonden wordt het WebSocket object aan de lijst toegevoegd.
4
Ontvangen berichten kunnen nu naar elke verbinding uit de lijst worden gestuurd.
5
Om de nickname in te stellen gebruiken we een eenvoudig extra input veld