sequenceDiagram
Client->>API: POST /token [username, password]
API-->>Client: 200 {access_token: "eyJhb..."}
Client->>API: GET /users/me (Authorization: Bearer eyJhb...)
API-->>Client: 200 {username: "johndoe", ...}
FastAPI 3 - Advanced
Inleiding
In les 1 zijn we gestart met de basisconcepten van FastAPI en een introductie tot Pydantic. Tijdens les 2 kwamen alle basisconcepten van FastAPI aan bod.
In deze laatste les bekijken we een aantal meer geavanceerde features die FastAPI biedt, zoals dependencies, security an async.
Als herhaling van de laatste twee lessen bekijken we de oplossing van de laatste Task Management API oefening. We kregen de volgende requirements:
Elke Task heeft:
- Een ID
- Een titel met minimumlengte van 3 karakters
- Een optionele beschrijving
- Een prioriteit van 1 tot 5
- Een interne timestamp (
created_at) die automatisch wordt ingesteld- Is enkel voor intern gebruik en wordt nooit teruggestuurd naar de client.
API Paths:
POST /tasks: Tasks aanmaken- De aangemaakte task wordt in de respons body teruggestuurd met status code 201.
GET /tasks/{task_id}: Één task opvragenGET /tasks: Tasks oplijsten en filteren- Stuurt by default alle tasks terug in een lijst
- Query parameter
min_priority(int) beperkt de lijst tot tasks met ten minste deze priority - Query parameter
q(str) beperkt de lijst tot tasks waarbij deze string in de titel voorkomt (substring match)
PATCH /tasks/{task_id}: Task aanpassen- Past een of meerdere van de volgende velden aan:
title,description,priority(partiële update) - Stuurt aangepaste task terug
- Past een of meerdere van de volgende velden aan:
POST /preferences: Gebruikersvoorkeuren opslaan- Één voorkeur: een default minimum prioriteit die gebruikt moet worden bij het
GET /taskspath
- Één voorkeur: een default minimum prioriteit die gebruikt moet worden bij het
Voor de ‘opslag’ van de tasks een globale dictionary gebruiken.
Een volledige voorbeeldoplossing kan je bekijken op https://github.com/dvx76/fastapi-tutorial/blob/main/les2/oefening1/main.py
Dependencies
https://fastapi.tiangolo.com/tutorial/dependencies/
FastAPI heeft een ingebouwd dependency injection mechanisme. Het gebruik hiervan is niet verplicht maar het vormt samen met de functionaliteit die we al zagen wel een belangrijk pluspunt van FastAPI.
We starten met een algemene uitleg rond het concept van dependency injection.
Dependency Injection
Dependency Injection (DI) is een algemeen (specifiek noch aan FastAPI of Python) programmeertechniek waarbij een functie of object zijn afhankelijkheden niet zelf aanmaakt, maar ze van buitenaf aangeleverd krijgt.
Hierdoor verkrijgen we een lossere koppeling tussen code, waardoor deze duidelijker, flexibeler en vooral makkelijker testbaar wordt.
Voorbeeld
Stel dat je een functie moet schrijven die op basis van een “Post ID” de details van een overeenkomstige blogpost ophaalt via een API en de titel van die blogpost teruggeeft.
Een eerste implementatie kan er als volgt uitzien:
import requests
def get_post_title(post_id: int) -> str:
session = requests.Session()
response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
response.raise_for_status()
return response.json()["title"]
print(get_post_title(1))De get_post_title functie maakt zelf een requests.Session object aan om de API aan te spreken. We zeggen dat de functie afhankelijk is van (depends on) requests.Session.
Hierdoor is de functie sterk gekoppeld aan requests.Session en is het testen van de eigenlijke logica in de functie problematisch. Tijdens testen willen we namelijk geen echte API call uitvoeren, maar wel met voorspelbare test-data werken.
De oplossing is om het requests.Session object (de dependency) mee te geven aan de functie (te injecteren):
import requests
def get_post_title(post_id: int, session: requests.Session) -> str:
response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
response.raise_for_status()
return response.json()["title"]
session = requests.Session()
print(get_post_title(1, session))Testen kan nu vrij eenvoudig door een dummy object mee te geven bij het aanroepen van de functie.
Hieronder zie je een eenvoudig functioneel voorbeeld (uitvoerbaar met uv run pytest test.py) van hoe je dit kan doen.
test.py
from dataclasses import dataclass
import requests
def get_post_title(post_id: int, session: requests.Session) -> str:
response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
response.raise_for_status()
return response.json()["title"]
@dataclass
class DummySession:
get_response: requests.Response
def get(self, url: str):
return self.get_response
def test_get_post_title_success():
dummy_response = requests.Response()
dummy_response.status_code = 200
dummy_response._content = b'{"title": "foo"}'
dummy_session = DummySession(get_response=dummy_response)
assert get_post_title(1, dummy_session) == "foo"DI Frameworks
I.p.v. “handmatig” verschillende dependencies samen te stellen (zoals we in bovenstaand voorbeeld eerst zelf een requests.Session object hebben gemaakt) kan ook gebruik worden gemaakt van een zogenaamd dependency-injection framework.
Dependency-injector is een populaire optie. We gaan hier niet veel verder op in maar bekijken wel het concept.
Het vorig voorbeeld kan met dependency_injector als volgt herschreven worden. Het belangrijkste concept is dat het session functieargument nu een default waarde krijgt. Het DI framework maakt automatisch een requests.Session object aan en gebruikt hetzelfde object voor elke get_post_title aanroep.
from dependency_injector import containers, providers
from dependency_injector.wiring import inject, Provide
class Container(containers.DeclarativeContainer):
session = providers.Singleton(requests.Session)
@inject
def get_post_title(
post_id: int,
session: requests.Session = Provide[Container.session]
) -> str:
response = session.get(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
response.raise_for_status()
return response.json()["title"]Eenvoudige DI met FastAPI
FastAPI heeft een eigen dependency injection mechanisme dat, zoals vrijwel alles in FastAPI, gebaseerd is op type hints en geannoteerde types.
In het volgende voorbeeld worden gebruikers opgehaald vanuit een externe API. De logica en foutafhandeling om deze externe gebruikers op te halen kan uitgewerkt worden in een aparte functie. Die functie kan dan als dependency in de path functies van onze eigen API gebruikt worden.
main.py
import requests
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
def get_external_users() -> list[dict]:
response = requests.get("https://jsonplaceholder.typicode.com/users")
try:
response.raise_for_status()
except requests.HTTPError:
2 raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to retrieve users",
)
return response.json()
app = FastAPI()
@app.get("/users")
1def list_users(external_users: Annotated[list[dict], Depends(get_external_users)]):
return external_users
@app.get("/users/{user_id}")
def get_user(
3 user_id: int, external_users: Annotated[list[dict], Depends(get_external_users)]
):
try:
return next(u for u in external_users if u["id"] == user_id)
except StopIteration:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)- 1
-
Dependskrijgt een functienaam (een “Callable”) als argument. Deze functie wordt dynamisch uitgevoerd om de waarde van hetexternal_usersargument vanlist_users(enget_user) te bepalen. - 2
-
Vanuit de dependency functie kan een
HTTPExceptiongeraised worden die de uitvoering zal onderbreken - alsof de exception rechtstreeks uit de path functie komt. - 3
- Andere path functies kunnen dezelfde dependency hergebruiken.
Request Informatie gebruiken in Dependencies
In het vorige voorbeeld heeft de dependency-functie (get_external_users) geen argumenten. Heeft die functie wel argumenten dan werken die alsof ze in de eigenlijke path functie zouden staan!
M.a.w. een path of query parameter kan rechtstreeks in een dependency functie als parameter worden gebruikt.
Bovendien kunnen dependencies zelf ook dependencies hebben. Beide concepten illustreren we door het vorig voorbeeld aan te passen zonder de functionaliteit te wijzigen.
main.py
import requests
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
def get_external_users() -> list[dict]:
response = requests.get("https://jsonplaceholder.typicode.com/users")
try:
response.raise_for_status()
except requests.HTTPError:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to retrieve users",
)
return response.json()
def get_external_user(
2 user_id: int, external_users: Annotated[list[dict], Depends(get_external_users)]
) -> dict:
try:
return next(u for u in external_users if u["id"] == user_id)
except StopIteration:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
app = FastAPI()
@app.get("/users")
def list_users(
external_users: Annotated[list[dict], Depends(get_external_users)],
):
return external_users
@app.get("/users/{user_id}")
1def get_user(user_details: Annotated[dict, Depends(get_external_user)]):
return user_details- 1
-
Deze path functie heeft nu zijn eigen
user_idpath parameter niet meer als argument, maar wel een dependency op een nieuwe functie die onmiddellijk de details van 1 gebruiker teruggeeft. - 2
-
Die nieuwe functie kan rechtstreeks de
user_idpath parameter gebruiken, en bovendien zelf nog een dependency hebben, om de volledige lijst met gebruikers te verkrijgen.
Globale en Decorator Dependencies
Stel dat we voor een aantal operaties een security-token verwachten in een bepaalde request header. Dit kan eenvoudig gecontroleerd worden met een dependency functie, die dan in de verschillende operatie functies kan gebruikt worden. Bijvoorbeeld:
main.py
from typing import Annotated
from fastapi import Depends, FastAPI, Header, HTTPException, status
app = FastAPI()
def verify_token(x_token: Annotated[str, Header()]):
if x_token != "fake-super-secret-token":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
)
@app.get("/items/")
def read_items(authorized: Annotated[None, Depends(verify_token)]):
return [{"item": "Foo"}, {"item": "Bar"}]Alleen is er geen return value voor de verify_token functie. Daardoor zijn we verplicht te annoteren met None als type, en zitten we bovendien met een ongebruikt authorized argument in de operatie functie. Weinig elegant.
In de plaats kunnen we dit soort dependencies rechtstreeks als argument meegeven in de path decorator:
main.py
We kunnen nog een stap verder gaan en beslissen dat een dependency van toepassing is op alle operaties door deze mee te geven aan de FastAPI applicatie zelf:
Dependencies met Yield
Het FastAPI dependency systeem is ook ideaal om een database client/connection beschikbaar te maken in path functies. Maar in de meeste gevallen willen we op het einde van de request die client of connection terug netjes beëindigen.
FastAPI ondersteunt dit door in dependency functies yield i.p.v. return te gebruiken. Om zeker te zijn dat de code na de yield altijd uitgevoerd zal worden (bvb. in geval van een exception) kan een try/finally constructie gebruikt worden.
main.py
from typing import Annotated
from fastapi import Depends, FastAPI
app = FastAPI()
class DummyDB:
def query(self, statement: str) -> list[str]:
return [statement]
def close(self) -> None:
print("Closing DB Connection")
def get_db():
db = DummyDB()
try:
yield db
finally:
db.close()
@app.get("/items/")
def read_items(db: Annotated[DummyDB, Depends(get_db)]):
return db.query("foo")Oefening
We gebruiken het dependency concept van FastAPI om de code uit de oefening van les 2 te verbeteren.
Welke bestaande code vormt een goede kandidaat om naar een dependency om te zetten? Tip: we zoeken code die in verschillende path functies herhaald wordt.
Oplossing - opdracht
De code die een task in TASKS_DB opzoekt en een exception raised als de task niet bestaat wordt gebruikt in zowel get_task() als in update_task.
try:
return TASKS_DB[task_id]
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)TaskInternal i.p.v. een task_id (int).
Oplossing - code
main.py
from datetime import datetime, timezone
from fastapi import (Cookie, Depends, FastAPI, HTTPException, Path, Query, Response, status)
from pydantic import BaseModel, Field
from typing_extensions import Annotated
app = FastAPI(title="Task Management API", version="1.0.0")
TASKS_DB: dict[int, "TaskInternal"] = {}
class TaskCreate(BaseModel):
title: Annotated[str, Field(min_length=3, examples=["Learn FastAPI"])]
description: Annotated[str | None, Field(examples=["Use the FastAPI tutorial"])] = (
None
)
priority: Annotated[int, Field(ge=1, le=5, examples=[4])]
class TaskExternal(TaskCreate):
task_id: Annotated[int, Field(serialization_alias="id", examples=[1])]
class TaskInternal(TaskExternal):
created_at: Annotated[
datetime, Field(default_factory=lambda: datetime.now(tz=timezone.utc))
]
class TaskUpdate(TaskCreate):
title: Annotated[str | None, Field(min_length=3)] = None
priority: Annotated[int | None, Field(ge=1, le=5)] = None
class Preference(BaseModel):
min_priority: Annotated[int, Field(ge=1, le=5)]
TaskId = Annotated[int, Path(ge=1)]
1def get_task_from_db(task_id: TaskId) -> TaskInternal:
try:
return TASKS_DB[task_id]
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
@app.post(
"/tasks",
response_model=TaskExternal,
status_code=status.HTTP_201_CREATED,
tags=["Tasks"],
summary="Create a new task",
)
def create_task(task: TaskCreate):
next_task_id = max(TASKS_DB.keys(), default=0) + 1
new_task = TaskInternal(task_id=next_task_id, **task.model_dump())
TASKS_DB[next_task_id] = new_task
return new_task
@app.get(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Get a single task",
)
2def get_task(task: Annotated[TaskInternal, Depends(get_task_from_db)]):
return task
@app.get(
"/tasks", response_model=list[TaskExternal], tags=["Tasks"], summary="List tasks"
)
def list_tasks(
min_priority: Annotated[int | None, Query(ge=1)] = None,
query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
cookie_min_priority: Annotated[
int | None, Cookie(alias="min_priority", include_in_schema=False)
] = None,
):
results = TASKS_DB.values()
if min_priority is None and cookie_min_priority is not None:
min_priority = cookie_min_priority
if min_priority is not None:
results = [t for t in results if t.priority >= min_priority]
if query:
results = [t for t in results if query.lower() in t.title.lower()]
return results
@app.patch(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Update a task",
)
def update_task(
task: Annotated[TaskInternal, Depends(get_task_from_db)], task_data: TaskUpdate
):
update_data = task_data.model_dump(exclude_unset=True)
updated_task = task.model_copy(update=update_data)
TASKS_DB[task.task_id] = updated_task
return updated_task
@app.post(
"/preferences",
status_code=status.HTTP_204_NO_CONTENT,
tags=["Preferences"],
summary="Set user preferences",
)
def set_preferences(pref: Preference, response: Response):
response.set_cookie(
key="min_priority",
value=str(pref.min_priority),
)- 1
-
De nieuwe functie krijgt een
inttask_id en geeft eenTaskInternalterug - 2
- De nieuwe functie gebruiken we als dependency in de path functies
Security
https://fastapi.tiangolo.com/tutorial/security/
Basic HTTP Authentication
Bij Flask and Connexion gebruikten we een eenvoudige Basic HTTP authenticatie. De request bevat dan een Authorization header met daarin een username en password (als een base64 username:password string).
Ter vergelijking implementeren we eerst diezelfde aanpak met FastAPI. Daarna bekijken we een betere en modernere optie.
Ook voor authenticatie gebruikt FastAPI het dependency injection mechanisme en voorziet handige helpers:
main.py
from typing import Annotated
from fastapi import Depends, FastAPI
from fastapi.security import HTTPBasic, HTTPBasicCredentials
app = FastAPI()
1http_basic_scheme = HTTPBasic()
@app.get("/users/me")
def read_current_user(
2 credentials: Annotated[HTTPBasicCredentials, Depends(http_basic_scheme)],
):
return {"username": credentials.username, "password": credentials.password}- 1
-
Het
HTTPBasicbasic object zal deAuthorizationheader verwerken en geeft eenHTTPBasicCredentialsmodel terug. - 2
-
HTTPBasicCredentialsis een eenvoudig model met eenusernameenpasswordveld. Incredentialskomt dus zo een model en de dependency opHTTPBasiczal dit invullen
We breiden dit voorbeeld verder uit met een dummy user database waarmee we requests zullen authentificeren. Voor een eenvoud laten we password hashing nog even achterwege.
main.py
import secrets
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"password": "secret",
}
}
1class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
class UserInDB(User):
password: str
app = FastAPI()
http_basic_scheme = HTTPBasic()
def get_user(username: str) -> UserInDB | None:
if username in fake_users_db:
user_dict = fake_users_db[username]
return UserInDB(**user_dict)
4def get_current_user(
credentials: Annotated[HTTPBasicCredentials, Depends(http_basic_scheme)],
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Basic"},
)
user = get_user(credentials.username)
if not user:
raise credentials_exception
if not secrets.compare_digest(credentials.password, user.password):
raise credentials_exception
return user
2@app.get("/users/me/", response_model=User)
def read_users_me(
3 current_user: Annotated[UserInDB, Depends(get_current_user)],
):
return current_user- 1
- Een eenvoudig model van de gegevens van een gebruiker. We definiëren een apart model dat ook het password bevat. Dit laatste zullen we gebruiken voor de authenticatie-code maar in responses willen we natuurlijk nooit het password meesturen!
- 2
-
Belangrijk: door
Userals respons model te gebruiken verplichten we FastAPI om enkel die velden terug te sturen. Ook al bevatcurrent_user(zie volgende stap) eenUserInDB! - 3
-
Een dependency zal voor authenticatie zorgen en indien succesvol een
UserInDBmeegeven incurrent_user. - 4
-
De
get_current_userdependency werkt op dezelfde manier als het vorige voorbeeld om HTTP Basic credentials uit de request te halen. De username wordt opgezocht in de (fake) database en het password vergeleken.
OAuth2
OAuth2 (rfc 6749) is een erg uitgebreid framework voor autorisatie en (in combinatie met OpenID Connect) authenticatie. Een typisch gebruiksscenario voor dat je zeker zelf al hebt gebruikt is het inloggen op een website via je bestaande Google, Facebook, GitHub, e.a. account.
OAuth2 komt in principe ook aan bod in de security module van de opleiding. We focussen hier bijgevolg op het gebruik in een FastAPI backend.
Wil je toch al meer leren over OAuth2 dan kan ik volgende links zeker aanraden:
Het scenario (OAuth2 gebruikt de term flow) waar we mee verdergaan is de Resource Owner Password Flow, ook wel Password Grant of gewoon Password Flow. Hierbij stuurt de client (bvb. een javascript frontend webapp, mobile app, …) via een Form de username/password naar de backend server en krijgt als antwoord een access token. Voor alle volgende requests stuurt de client dan dit token (in een Authorization header) mee.
Dat access token is een zogenaamd JWT, of JSON Web Token. Een gestandaardiseerd formaat om op een veilige manier JSON data door te sturen. De inhoud is niet geëncrypteerd, maar wel getekend (signed). Daarmee kan gecontroleerd worden dan een JWT niet is aangepast.
FastAPI en OAuth2
Net als voor Basic HTTP Auth bevat FastAPI handige hulpmiddelen om met OAuth2 te werken - en opnieuw ook automatisch integratie met OpenAPI.
OAuth2 Scheme
We vertrekken vanuit het vorige voorbeeld met HTTP Basic Auth maar i.p.v. fastapi.security.HTTPBasic gebruiken we fastapi.security.OAuth2PasswordBearer.
We passen verder zo weinig mogelijk aan en kijken wat er gebeurd. Alle andere code blijft staan.
main.py
from fastapi.security import OAuth2PasswordBearer
# Bestaande code blijft staan: fake_users_db, User models, ...
# http_basic_scheme = HTTPBasic()
1oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
2def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
3 return fake_users_db["johndoe"]- 1
-
I.p.v. een
HTTPBasicobject maken we eenOAuth2PasswordBearerobject.tokenUrlbepaalde het API route om eenaccess_tokente verkrijgen. Dit implementeren we later. - 2
-
De dependency geeft nu een gewone
strterug (token uit deAuthorizationheader!) i.p.v. eenHTTPBasicCredentialsobject. We doen nog even niets met dit token. - 3
-
We hardcoden tijdelijk een specifieke user als return value.
credentials_exceptionzullen we nog nodig hebben dus laten we staan.
Als we nu proberen autoriseren vanuit Swagger UI krijgen we een fout: auth errorError: Not Found. In de FastAPI logs zien we een request voor een path dat we nog niet geïmplementeerd hebben: "POST /token HTTP/1.1" 404.
Token Path
Het POST /token path zal een Form doorgestuurd krijgen met de username/password. FastAPI voorziet al een eigen dependency class voor deze form: fastapi.security.OAuth2PasswordBearer. Deze definieert naast username en password nog een aantal andere velden eigen aan OAuth2, zoals scope en client_id, maar deze worden hier niet gebruikt.
Dit path moet natuurlijk de username en password valideren. We laten het path eerst tijdelijk de username terugsturen als “access_token”. Later passen we dit aan naar een “echt” token.
Bijgevolg kunnen we in get_current_user het token gebruiken om de juiste user te vinden in fake_users_db.
main.py
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Bestaande code blijft staan: fake_users_db, User models, ...
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return fake_users_db[token]
@app.post("/token")
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = get_user(form_data.username)
if not (user and form_data.password == user.password):
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}Echte JWT tokens
Ter herinnering, een JWT, of JSON Web Token, is een gestandaardiseerd formaat om op een veilige manier JSON data door te sturen. De inhoud is niet geëncrypteerd, maar wel getekend (signed). Daarmee kan gecontroleerd worden dat een JWT niet is aangepast.
Bijvoorbeeld:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
Op de website https://www.jwt.io kan je JWTs decoden en de inhoud bekijken.
Om in Python met JWTs te werken hebben we het PyJWT package nodig. Dit moeten we dus eerst installeren.
Voordat we dit gaan gebruiken in onze FastAPI applicatie bekijken we een voorbeeldje om de werking van PyJWT te illustreren.
Om data te encoderen (serialiseren) in een JWT moet een algoritme worden gekozen en een bijhorende geheime sleutel. Hier zullen we met het HMAC SHA256 algoritme werken. Daarvoor hebben we een 256 bit (= 32 byte of 64 hexadecimale karakters) string nodig als sleutel.
jwt_test.py
import secrets
from datetime import datetime, timedelta, timezone
import jwt
1SECRET_KEY = secrets.token_hex(32)
2ALGORITHM = "HS256"
print(f"Using secret key = {SECRET_KEY}")
now = datetime.now(timezone.utc)
data = {
"sub": "johndoe",
"name": "John Doe",
"iat": now,
"exp": now + timedelta(minutes=15),
}
3encoded_jwt = jwt.encode(data, key=SECRET_KEY, algorithm=ALGORITHM)
print(f"Encoded JWT = {encoded_jwt}")
4decoded_jwt = jwt.decode(encoded_jwt, key=SECRET_KEY, algorithms=[ALGORITHM])
print(f"Decoded JWT = {decoded_jwt}")- 1
-
Met
secrets.token_hex(32)krijgen we een random 32-byte hexadecimale string die we als sleutel kunnen gebruiken. In een productie-applicatie moeten we hiervoor natuurlijk een vaste string gebruiken die we bvb. vanuit de configuratie inlezen. - 2
-
Het algoritme is
HS256, ofwel HMAC met SHA256. - 3
-
Met
jwt.encodekan eendictgeëncodeerd worden naar een JWT string. - 4
-
Omgekeerd kan met
jwt.decodeeen JWT string terug gedecodeerd worden.
sub (Subject), iat (Issued At) en exp (Expiry) zijn gestandaardiseerde namen van JWT velden. Deze zijn gedefinieerd in de JWT RFC 7519).
Daarnaast kunnen ook extra velden gebruikt worden (zoals hier: name) die specifiek zijn aan de applicatie.
Nu kunnen we voor de FastAPI applicatie in de login functie een echt JWT token encoderen en terugsturen, en in de get_current_user functie datzelfde token decoderen en op basis van de inhoud de juiste gebruikersdata opzoeken.
main.py
from datetime import datetime, timedelta, timezone
import jwt
from jwt.exceptions import InvalidTokenError
1SECRET_KEY = "2891cd0eaed8623ea769c2fc41960f40d8fd0a2bbbaddcb6af93e0ecd9ecb0e5"
ALGORITHM = "HS256"
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
3 try:
decoded_jwt = jwt.decode(token, key=SECRET_KEY, algorithms=[ALGORITHM])
except InvalidTokenError:
raise credentials_exception
4 username = decoded_jwt.get("sub")
if username is None:
raise credentials_exception
5 user = get_user(username)
if not user:
raise credentials_exception
return user
@app.post("/token")
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = get_user(form_data.username)
if not (user and form_data.password == user.password):
raise HTTPException(status_code=400, detail="Incorrect username or password")
now = datetime.now(timezone.utc)
data = {
"iat": now,
"exp": now + timedelta(minutes=15),
2 "sub": user.username,
"name": user.full_name,
}
encoded_jwt = jwt.encode(data, key=SECRET_KEY, algorithm=ALGORITHM)
return {"access_token": encoded_jwt, "token_type": "bearer"}- 1
-
Nogmaals, voor een productie-omgeving zouden we een vaste
SECRET_KEYuit configuratie inlezen. - 2
-
Als
subin de JWT gebruiken we hier de username, want dat is de key in defake_users_dbdictionary. In een echt applicatie, gekoppeld aan een database, zou dit een unieke ID geassocieerd aan de user kunnen zijn. - 3
-
Als er een probleem optreed tijdens het decoderen van de JWT wordt een HTTP 401 (Unauthorized) terug gestuurd. De
WWW-Authenticateheader geeft aan dat een Authorization header met een Bearer token in de request verwacht wordt. - 4
-
Als geen
subveld in de JWT staat wordt dezelfde fout teruggestuurd. - 5
-
Hetzelfde geldt als de waarde van die
subniet gevonden wordt in defake_users_db.
Merk op dat we zelf geen code hebben geschreven die deze velden controleert. Dit gebeurt automatisch door de jwt.decode functie. Deze zal een exception geven als de huidige tijd vroeger is dan de iat, of later dan de exp!
Password Hashing
Op zich is dit niet specifiek aan OAuth2 (ofo FastAPI), maar wachtwoorden gebruiken we natuurlijk best nooit rechtstreeks.
Een hash is een wiskundige functie die voor een bepaalde input een unieke string met vaste lengte (bvb. 64 karakters) geeft. De kleinste wijziging van de input zorgt voor een volledig verschillende hash-waarde.
Een hash-waarde berekening gebeurt relatief snel, terwijl het afleiden van de oorspronkelijke input op basis van die hash-waarde heel erg lang zou duren.
Bij wachtwoorden kunnen we dus enkel de hash-waarde opslaan. Op die manier kan het echte wachtwoord niet achterhaald worden, maar kunnen we wel controleren of een bepaalde input overeenkomt met dat wachtwoord.
Tijdens de Flask lessen hebben we functies (check_password_hash/generate_password_hash) gebruikt die Flask (eigenlijk het onderliggende Werkzeug) zelf voorziet.
FastAPI (of Starlette) voorziet zelf zo geen functies maar raadt het gebruik van het pwdlib package aan. Dit ondersteunt verschillende hashing algoritmes en FastAPI raadt het moderne Argon2 aan.
Om met pwdlib wachtwoorden te kunnen hashen en controleren voegen we volgende code toe:
main.py
Om te testen, en de juiste hash voor het “secret” wachtwoord te berekenen, kunnen we het volgende uitvoeren:
$ uv run python
>>> from main import get_password_hash, verify_password
>>> get_password_hash("secret")
'$argon2id$v=19$m=65536,t=3,p=4$o8Zcvomlo/VTC+jJ7D1SiQ$hXfZ835HmErM0C8SnUTPLX2+L4DGDFwmF6uWPfMkVik'
>>> get_password_hash("secrit")
'$argon2id$v=19$m=65536,t=3,p=4$a+jJ+am07S/ccgnXSxYbKQ$hzIWtNlqTAmXQny658qF6qCgXQJKtuAg4Qk02jW5ksg'Nu moeten we nog in fake_users_db het gehashte wachtwoord opslaan, en in de login functie verify_password gebruiken:
main.py
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"password": "$argon2id$v=19$m=65536,t=3,p=4$o8Zcvomlo/VTC+jJ7D1SiQ$hXfZ835HmErM0C8SnUTPLX2+L4DGDFwmF6uWPfMkVik",
}
}
# ...
@app.post("/token")
def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = get_user(form_data.username)
if not (user and verify_password(form_data.password, user.password)):
raise HTTPException(status_code=400, detail="Incorrect username or password")
# ...Concurrency en async/await
Async (asynchroon) programmeren in Python komt uitgebreid aan bod in de module REST API. We bekijken hier heel kort het concept, zodat we het kunnen toepassen in het kader van FastAPI.
Synchrone Threads
Een traditioneel Python programma is synchroon (sequentieel). Functies worden de een na de ander uitgevoerd tot het programma eindigt. Dit vormt een probleem wanneer het programma moet wachten op iets externs zoals bijvoorbeeld het inlezen van een bestand, het antwoord op een API request of het resultaat van een database query. Tijdens dat wachten kan er niets anders gebeuren.
Dit is een groot probleem voor applicaties waarvan we verwachten dat ze verschillende zaken tegelijk uitvoeren. Zoals een web applicatie die verschillende HTTP requests tegelijk kan afhandelen. Stel dat de volledige afhandeling van een enkele request ongeveer 100ms duurt, dan zou een puur synchrone applicatie nooit meer dan 10 requests per seconde aankunnen!
Een eerste oplossing, waar we tot nu toe onbewust gebruik van hebben gemaakt, zijn threads. Een thread is een aparte uitvoeringslijn binnen één Python-proces, met een eigen call stack maar gedeeld geheugen, waardoor meerdere stukken code (zoals het afhandelen van verschillende HTTP-requests) schijnbaar tegelijk kunnen worden uitgevoerd. Het addertje onder het gras is dat we als programmeurs zelf geen controle hebben over welke thread op een bepaald moment wordt uitgevoerd. Dat bepaalt het besturingssysteem.
De Flask, Connexion en FastAPI voorbeelden die we tot nu hebben gezien werken op basis van threads. Voor elke nieuwe HTTP request wordt een aparte thread gestart.
Async
Met FastAPI kunnen we ook op een andere manier werken: async. Bij async hebben we maar één thread maar kan een volgende request verwerkt worden terwijl een eerste request moet wachten, bijvoorbeeld op het resultaat van een database query.
Vergelijk het met de kassier bij McDonalds die je bestelling opneemt. Als de gekozen producten niet allemaal klaarliggen geeft hij je bestelling door aan de keuken. De bereiding kan even duren tussen ondertussen kan de kassier alvast de volgende bestelling verwerken. Als de keuken klaar kan de kassier jou bestelling verder afwerken.
In deze analogie is de kassier een FastAPI app en de keuken bijvoorbeeld een SQL database. Terwijl de database een ingewikkelde query uitvoert hoeft de FastAPI path functie niet te blijven wachten en kan een andere request verwerkt worden.
Om in Python aan te geven dat een functie asynchroon is gebruik je bij de functiedeclaratie async def func_name(), i.p.v. gewoon def func_name(). Om een async functie aan te roepen gebruik je await func_name() i.p.v. gewoon func_name().
Maar…
- Een async functie kan enkel worden aangeroepen (m.v.b.
await) door een andere async functie. Er wordt soms gesproken over gekleurde functies. - In een async functie mag je zeker geen gewone I/O-gebonden functies gebruiken.
- Dit zou immers de volledige applicatie ‘blokkeren’
Daarom voorziet de Python standard library in de asyncio module async versies van functies wie we al kunnen, bvb. sleep(5) -> asyncio.sleep(5). Externe packages bouwen hierop verder om async versies aan te bieden, bvb.
aiofilesaiofiles.open("filename")i.p.v.open("filename")
httpxi.p.v.requestsasyncpgi.p.v.psycopg2
Zoals eerder aangehaald komen zowel multi-threading als async uitgebreid aan bod in de module REST API. Wil je dit onderwerp ondertussen toch wat verder uitdiepen bekijk dan zeker eens het Concurrency and async / await hoofdstuk in de FastAPI documentatie, of nog deze video van Corey Shafer: Python Tutorial: AsyncIO - Complete Guide to Asynchronous Programming with Animations.
FastAPI async path functies
Zoals we ondertussen gewend zijn maakt FastAPI het ons erg gemakkelijk:
- Sync (threaded) en async path functies kunnen tegelijk en door elkaar gebruikt worden binnen dezelfde FastAPI app.
- Om async te gebruiken hoeven we enkel de path functie async te maken.
In onderstaand voorbeeld illustreren we het verschil. We demonstreren tegelijk wat er gebeurt wanneer niet-async blokkerende functies (zoals sleep) gebruikt worden in een async path functie!
main.py
import asyncio
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/sync")
def get_sync():
time.sleep(1) # simulate work
return {"message": "sync"}
@app.get("/async")
async def get_async():
await asyncio.sleep(1) # simulate work
return {"message": "async"}
@app.get("/async_bad")
async def get_async_bad():
time.sleep(1) # simulate work - don't do this!
return {"message": "async bad"}Op het eerste zicht zijn alle endpoints functioneel equivalent en zien we vanuit het perspectief van de client geen verschil:
$ curl -w "\n%{time_total}s" http://127.0.0.1:8000/sync
{"message":"sync"}
1.006808s
$ curl -w "\n%{time_total}s" http://127.0.0.1:8000/async
{"message":"async"}
1.004921s
$ curl -w "\n%{time_total}s" http://127.0.0.1:8000/async_bad
{"message":"async bad"}
1.004625sDit is wat we verwachten. Als we echter meerdere requests tegelijk uitsturen krijgen we een heel andere ervaring. Om te testen gebruiken we Apache ab. Dit is standaard beschikbaar op MacOS - en op Linux of Windows WSL via het apache2-utils pakket.
We sturen in het totaal 100 requests (-n 100) vanuit 10 parallelle clients -c 10. Dus alsof we 10 browsers open hebben en in elke browser refreshen nadat de pagina geladen is.
$ ab -n 100 -c 10 http://127.0.0.1:8000/sync
...
Time taken for tests: 11.117 seconds
Requests per second: 9.00 [#/sec] (mean)
Time per request: 1111.684 [ms] (mean)
$ ab -n 100 -c 10 http://127.0.0.1:8000/async
...
Time taken for tests: 11.098 seconds
Requests per second: 9.01 [#/sec] (mean)
Time per request: 1109.813 [ms] (mean)
$ ab -n 100 -c 10 http://127.0.0.1:8000/async_bad
...
Time taken for tests: 100.695 seconds
Requests per second: 0.99 [#/sec] (mean)
Time per request: 10069.532 [ms] (mean)Uit deze resultaten kunnen we perfect zien dat een gewone sleep in een async functie de volledige applicatie blokkeert.
In de vorige resultaten lijkt het verschil tussen threads and async minimaal. Drijven we echter het aantal parallelle requests verder omhoog, dan wordt het verschil erg merkbaar:
❯ ab -n 1000 -c 100 http://127.0.0.1:8000/sync
...
Time taken for tests: 26.211 seconds
Requests per second: 38.15 [#/sec] (mean)
Time per request: 2621.086 [ms] (mean)
$ ab -n 1000 -c 100 http://127.0.0.1:8000/async
...
Time taken for tests: 11.241 seconds
Requests per second: 88.96 [#/sec] (mean)
Time per request: 1124.130 [ms] (mean)Dit is natuurlijk een extreem voorbeeld omdat de API logica onbestaand is. De applicatie doet effectief niets behalve 1 seconde wachten. Hierdoor slorpt het afhandelen van threads relatief gezien een heel groot deel van de totale werkdruk op.
Async Database Access
sleep is natuurlijk een weinig realistisch voorbeeld van een I/O-gebonden stap die we in een API path functie kunnen tegenkomen.
Een database query (of update) daarentegen, kunnen we typisch wel verwachten. Net zoals we asyncio.sleep moesten gebruiken i.p.v. een gewone sleep, net zo kunnen we niet de gewone sqlalchemy and sqlite (of een andere database driver) functies gebruiken.
We hebben bij sqlalchemy de asyncio optie nodig, als ook het aiosqlite package, een async implementatie van de standaard sqlite module.
We illustreren het gebruik met de gebruikelijke “Items” API:
main.py
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# FastAPI Pydantic Model
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
# SQLAlchemy ORM Model
class Base(DeclarativeBase):
pass
class StoredItem(Base):
__tablename__ = "items"
name: Mapped[str] = mapped_column(primary_key=True)
description: Mapped[str | None] = mapped_column(nullable=True)
price: Mapped[float]
tax: Mapped[float | None] = mapped_column(nullable=True)
# Async versies van de SQLAlchemy create_engine en sessionmaker
# uit de SQLAlchemy en Flask lessen
DATABASE_URL = "sqlite+aiosqlite:///./items.db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
# Een 'lifespan' in FastAPI is een contextmanager functie die wordt uitgevoerd
# bij het opstarten van de applicatie. (Zie https://fastapi.tiangolo.com/advanced/events/)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Create database tables on startup"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("Database tables created")
yield
app = FastAPI(lifespan=lifespan)
# Async path functies kunnen async Dependencies hebben
# Met deze Dependency geven we elke request een eigen SQLAlchemy session object
async def get_db():
"""Provides a database session for each request"""
async with AsyncSessionLocal() as session:
yield session
@app.post("/items/", status_code=status.HTTP_201_CREATED, response_model=Item)
async def create_item(item: Item, db: AsyncSession = Depends(get_db)):
new_task = StoredItem(**item.model_dump()) # SQLAlchemy model!
db.add(new_task)
await db.commit() # Await! Want AsyncSession.commit() is een async methode!
return new_task
@app.get("/items/{name}", response_model=Item)
async def get_item(name: str, db: AsyncSession = Depends(get_db)):
db_query = select(StoredItem).where(StoredItem.name == name)
query_result = await db.scalars(db_query)
item = query_result.one_or_none()
if item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
return itemOefening Security & Concurrency
Async Database
Vertrek vanuit de “task management” oefening uit het Dependencies hoofdstuk en pas deze aan zodat tasks worden opgeslagen in een SQLite database i.p.v. de in-memory dict.
Je zult alle path functions async moeten maken.
Het TaskInternal Pydantic model heb je niet meer nodig en wordt vervangen door een SQLAlchemy model (bvb. StoredTask)
Denk er aan await te gebruiken als je async functies aanroept zoals db.get() of db.commit()!
Oplossing
main.py
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Annotated
from fastapi import (
Cookie,
Depends,
FastAPI,
HTTPException,
Path,
Query,
Response,
status,
)
from pydantic import BaseModel, Field
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql.functions import current_timestamp
# FastAPI Pydantic Models blijvend ongewijzigd, maar
# TaskInternal is niet meer nodig en vervangen door een SQLAlchemy model
class TaskCreate(BaseModel):
title: Annotated[str, Field(min_length=3, examples=["Learn FastAPI"])]
description: Annotated[str | None, Field(examples=["Use the FastAPI tutorial"])] = (
None
)
priority: Annotated[int, Field(ge=1, le=5, examples=[4])]
class TaskExternal(TaskCreate):
task_id: Annotated[int, Field(serialization_alias="id", examples=[1])]
class TaskUpdate(TaskCreate):
title: Annotated[str | None, Field(min_length=3)] = None
priority: Annotated[int | None, Field(ge=1, le=5)] = None
class Preference(BaseModel):
min_priority: Annotated[int, Field(ge=1, le=5)]
TaskId = Annotated[int, Path(ge=1)]
# SQLAlchemy ORM Model voor een opgeslagen Task - vervangt TaskInternal
# Field/Column namen en datatypes zijn hetzelfde
class Base(DeclarativeBase):
pass
class StoredTask(Base):
__tablename__ = "todos"
task_id: Mapped[int] = mapped_column(name="id", primary_key=True, autoincrement=True)
title: Mapped[str]
description: Mapped[str]
priority: Mapped[int]
created_at: Mapped[datetime] = mapped_column(server_default=current_timestamp())
# Async versies van de SQLAlchemy create_engine en sessionmaker
# uit de SQLAlchemy en Flask lessen
DATABASE_URL = "sqlite+aiosqlite:///./tasks.db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
# Een 'lifespan' in FastAPI is een contextmanager functie die wordt uitgevoerd
# bij het opstarten van de applicatie. (Zie https://fastapi.tiangolo.com/advanced/events/)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Create database tables on startup"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("Database tables created")
yield
app = FastAPI(title="Task Management API", version="1.0.0", lifespan=lifespan)
# Async path functies kunnen async Dependencies hebben
# Met deze Dependency geven we elke request een eigen SQLAlchemy session object
async def get_db():
"""Provides a database session for each request"""
async with AsyncSessionLocal() as session:
yield session
@app.post(
"/tasks",
response_model=TaskExternal,
status_code=status.HTTP_201_CREATED,
tags=["Tasks"],
summary="Create a new task",
)
async def create_task(task: TaskCreate, db: AsyncSession = Depends(get_db)):
new_task = StoredTask(**task.model_dump()) # SQLAlchemy model!
db.add(new_task)
await db.commit() # Await! Want AsyncSession.commit() is een async methode!
return new_task
@app.get(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Get a single task",
)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db)):
task = await db.get(StoredTask, task_id)
if task is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
return task
@app.get(
"/tasks",
response_model=list[TaskExternal],
tags=["Tasks"],
summary="List tasks",
)
async def list_tasks(
min_priority: Annotated[int | None, Query(ge=1)] = None,
query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
cookie_min_priority: Annotated[
int | None, Cookie(alias="min_priority", include_in_schema=False)
] = None,
db: AsyncSession = Depends(get_db),
):
db_query = select(StoredTask)
if min_priority is None and cookie_min_priority is not None:
min_priority = cookie_min_priority
if min_priority is not None:
db_query = db_query.where(StoredTask.priority >= min_priority)
if query:
db_query = db_query.where(func.lower(StoredTask.title.contains(query.lower())))
result = await db.scalars(db_query)
return result.all()
@app.patch(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Update a task",
)
async def update_task(
task_id: int, task_data: TaskUpdate, db: AsyncSession = Depends(get_db)
):
stored_task = await db.get(StoredTask, task_id)
if stored_task is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
update_data = task_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(stored_task, field, value)
await db.commit()
return stored_task
@app.post(
"/preferences",
status_code=status.HTTP_204_NO_CONTENT,
tags=["Preferences"],
summary="Set user preferences",
)
async def set_preferences(pref: Preference, response: Response):
response.set_cookie(
key="min_priority",
value=str(pref.min_priority),
)OAuth2
Voeg nu ook OAuth2 authenticatie toe zoals we gezien hebben in het security hoofdstuk. Gebruik JWT en password hashing. Users moeten ook in de database worden bewaard.
Voor het aanmaken een een test user moet je geen API path voorzien maar mag je dit zelf in de database toevoegen of via SQLAlchemy in de lifespan functie, nadat het database schema is aangemaakt (Base.metadata.create_all).
Voor de user zul je zowel een Pydantic model (User), zonder password, als een SQLalchemy mode (StoredUser), met password, nodig hebben.
De OAuth2 login path functie zal nu ook async moeten zijn wegens de dependency op get_db!
Ook de dependency waarmee een token wordt gecontroleerd moet nu async zijn. Voor deze toepassing willen we eigenlijk enkel het token controleren en hebben we het overeenkomstig User object niet nodig. Die functie hoeft dus niets te returnen.
We kunnen de dependency dan in de path decorators declareren i.p.v. in de path functies zelf
Oplossing - opdracht
main.py
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Annotated
import jwt
from fastapi import (
Cookie,
Depends,
FastAPI,
HTTPException,
Path,
Query,
Response,
status,
)
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, Field
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql.functions import current_timestamp
# FastAPI Pydantic Models blijvend ongewijzigd, maar
# TaskInternal is niet meer nodig en vervangen door een SQLAlchemy model
class TaskCreate(BaseModel):
title: Annotated[str, Field(min_length=3, examples=["Learn FastAPI"])]
description: Annotated[str | None, Field(examples=["Use the FastAPI tutorial"])] = (
None
)
priority: Annotated[int, Field(ge=1, le=5, examples=[4])]
class TaskExternal(TaskCreate):
task_id: Annotated[int, Field(serialization_alias="id", examples=[1])]
class TaskUpdate(TaskCreate):
title: Annotated[str | None, Field(min_length=3)] = None
priority: Annotated[int | None, Field(ge=1, le=5)] = None
class Preference(BaseModel):
min_priority: Annotated[int, Field(ge=1, le=5)]
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
TaskId = Annotated[int, Path(ge=1)]
# SQLAlchemy ORM Model voor een opgeslagen Task - vervangt TaskInternal
# Field/Column namen en datatypes zijn hetzelfde
class Base(DeclarativeBase):
pass
class StoredTask(Base):
__tablename__ = "todos"
task_id: Mapped[int] = mapped_column(
name="id", primary_key=True, autoincrement=True
)
title: Mapped[str]
description: Mapped[str]
priority: Mapped[int]
created_at: Mapped[datetime] = mapped_column(server_default=current_timestamp())
class StoredUser(Base):
__tablename__ = "users"
username: Mapped[str] = mapped_column(primary_key=True)
email: Mapped[str | None] = mapped_column(nullable=True)
full_name: Mapped[str | None] = mapped_column(nullable=True)
password: Mapped[str]
# Async versies van de SQLAlchemy create_engine en sessionmaker
# uit de SQLAlchemy en Flask lessen
DATABASE_URL = "sqlite+aiosqlite:///./tasks.db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
# JWT and Password hashing
SECRET_KEY = "2891cd0eaed8623ea769c2fc41960f40d8fd0a2bbbaddcb6af93e0ecd9ecb0e5"
ALGORITHM = "HS256"
password_hash = PasswordHash.recommended()
def verify_password(plain_password, hashed_password) -> bool:
return password_hash.verify(plain_password, hashed_password)
def get_password_hash(password) -> str:
return password_hash.hash(password)
# Een 'lifespan' in FastAPI is een contextmanager functie die wordt uitgevoerd
# bij het opstarten van de applicatie. (Zie https://fastapi.tiangolo.com/advanced/events/)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Create database tables on startup"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with AsyncSessionLocal() as session:
session.add(StoredUser(username="admin", password=get_password_hash("test")))
try:
await session.commit()
except IntegrityError:
pass
print("Database tables created")
yield
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI(title="Task Management API", version="1.0.0", lifespan=lifespan)
# Async path functies kunnen async Dependencies hebben
# Met deze Dependency geven we elke request een eigen SQLAlchemy session object
async def get_db():
"""Provides a database session for each request"""
async with AsyncSessionLocal() as session:
yield session
async def verify_token(
token: Annotated[str, Depends(oauth2_scheme)], db: AsyncSession = Depends(get_db)
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
decoded_jwt = jwt.decode(token, key=SECRET_KEY, algorithms=[ALGORITHM])
except InvalidTokenError:
raise credentials_exception
username = decoded_jwt.get("sub")
if username is None:
raise credentials_exception
user = await db.get(StoredUser, username)
if not user:
raise credentials_exception
@app.post("/token")
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: AsyncSession = Depends(get_db),
):
user = await db.get(StoredUser, form_data.username)
if not (user and verify_password(form_data.password, user.password)):
raise HTTPException(status_code=400, detail="Incorrect username or password")
now = datetime.now(timezone.utc)
data = {
"iat": now,
"exp": now + timedelta(minutes=15),
"sub": user.username,
"name": user.full_name,
}
encoded_jwt = jwt.encode(data, key=SECRET_KEY, algorithm=ALGORITHM)
return {"access_token": encoded_jwt, "token_type": "bearer"}
@app.post(
"/tasks",
response_model=TaskExternal,
status_code=status.HTTP_201_CREATED,
tags=["Tasks"],
summary="Create a new task",
dependencies=[Depends(verify_token)],
)
async def create_task(task: TaskCreate, db: AsyncSession = Depends(get_db)):
new_task = StoredTask(**task.model_dump()) # SQLAlchemy model!
db.add(new_task)
await db.commit() # Await! Want AsyncSession.commit() is een async methode!
return new_task
@app.get(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Get a single task",
dependencies=[Depends(verify_token)],
)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db)):
task = await db.get(StoredTask, task_id)
if task is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
return task
@app.get(
"/tasks",
response_model=list[TaskExternal],
tags=["Tasks"],
summary="List tasks",
dependencies=[Depends(verify_token)],
)
async def list_tasks(
min_priority: Annotated[int | None, Query(ge=1)] = None,
query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
cookie_min_priority: Annotated[
int | None, Cookie(alias="min_priority", include_in_schema=False)
] = None,
db: AsyncSession = Depends(get_db),
):
db_query = select(StoredTask)
if min_priority is None and cookie_min_priority is not None:
min_priority = cookie_min_priority
if min_priority is not None:
db_query = db_query.where(StoredTask.priority >= min_priority)
if query:
db_query = db_query.where(func.lower(StoredTask.title.contains(query.lower())))
result = await db.scalars(db_query)
return result.all()
@app.patch(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Update a task",
dependencies=[Depends(verify_token)],
)
async def update_task(
task_id: int, task_data: TaskUpdate, db: AsyncSession = Depends(get_db)
):
stored_task = await db.get(StoredTask, task_id)
if stored_task is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
update_data = task_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(stored_task, field, value)
await db.commit()
return stored_task
@app.post(
"/preferences",
status_code=status.HTTP_204_NO_CONTENT,
tags=["Preferences"],
summary="Set user preferences",
dependencies=[Depends(verify_token)],
)
async def set_preferences(pref: Preference, response: Response):
response.set_cookie(
key="min_priority",
value=str(pref.min_priority),
)SQLModel
In het laatste voorbeeld heb je waarschijnlijk gemerkt dat we nu twee bijna identieke modellen hebben voor een task. Een set Pydantic modellen voor de API zelf (request/response), en een SQLAlchemy ORM model voor de database table.
Wat als we deze modellen zouden kunnen combineren? Dat is precies één van de hoofddoelstellingen van SQLModel. SQLModel staat nog in zijn kinderschoenen maar zeker de moeite waard eens te bekijken als je met FastAPI en SQLAlchemy werkt.
We gaan hier in de cursus verder niet op in maar hieronder zie je het laatste voorbeeld aangepast naar SQLModel.
main.py
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Annotated, ClassVar
from fastapi import (
Cookie,
Depends,
FastAPI,
HTTPException,
Path,
Query,
Response,
status,
)
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.sql.functions import current_timestamp
from sqlmodel import Field, SQLModel, col, func, select
# SQLModel schema models (no table=True, so these are pure Pydantic models)
class TaskCreate(SQLModel):
title: Annotated[
str, Field(min_length=3, schema_extra={"examples": ["Learn FastAPI"]})
]
description: Annotated[
str | None,
Field(schema_extra={"examples": ["Use the FastAPI tutorial"]}),
] = None
priority: Annotated[int, Field(ge=1, le=5, schema_extra={"examples": [4]})]
class TaskExternal(TaskCreate):
task_id: Annotated[
int,
Field(
sa_column_kwargs={"name": "id"},
primary_key=True,
serialization_alias="id",
schema_extra={"examples": [1]},
),
]
class TaskUpdate(TaskCreate):
title: Annotated[str | None, Field(min_length=3)] = None
priority: Annotated[int | None, Field(ge=1, le=5)] = None
class Preference(SQLModel):
min_priority: Annotated[int, Field(ge=1, le=5)]
TaskId = Annotated[int, Path(ge=1)]
# SQLModel table model
class StoredTask(TaskExternal, table=True):
__tablename__: ClassVar[str] = "todos"
created_at: datetime | None = Field(
default=None, sa_column_kwargs={"server_default": current_timestamp()}
)
DATABASE_URL = "sqlite+aiosqlite:///./tasks.db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Create database tables on startup"""
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
print("Database tables created")
yield
app = FastAPI(title="Task Management API", version="1.0.0", lifespan=lifespan)
async def get_db():
"""Provides a database session for each request"""
async with AsyncSessionLocal() as session:
yield session
@app.post(
"/tasks",
response_model=TaskExternal,
status_code=status.HTTP_201_CREATED,
tags=["Tasks"],
summary="Create a new task",
)
async def create_task(task: TaskCreate, db: AsyncSession = Depends(get_db)):
new_task = StoredTask(**task.model_dump())
db.add(new_task)
await db.commit()
return new_task
@app.get(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Get a single task",
)
async def get_task(task_id: int, db: AsyncSession = Depends(get_db)):
task = await db.get(StoredTask, task_id)
if task is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
return task
@app.get(
"/tasks",
response_model=list[TaskExternal],
tags=["Tasks"],
summary="List tasks",
)
async def list_tasks(
min_priority: Annotated[int | None, Query(ge=1)] = None,
query: Annotated[str | None, Query(alias="q", min_length=3)] = None,
cookie_min_priority: Annotated[
int | None, Cookie(alias="min_priority", include_in_schema=False)
] = None,
db: AsyncSession = Depends(get_db),
):
db_query = select(StoredTask)
if min_priority is None and cookie_min_priority is not None:
min_priority = cookie_min_priority
if min_priority is not None:
db_query = db_query.where(StoredTask.priority >= min_priority)
if query:
db_query = db_query.where(
func.lower(col(StoredTask.title)).contains(query.lower())
)
result = await db.scalars(db_query)
return result.all()
@app.patch(
"/tasks/{task_id}",
response_model=TaskExternal,
tags=["Tasks"],
summary="Update a task",
)
async def update_task(
task_id: int, task_data: TaskUpdate, db: AsyncSession = Depends(get_db)
):
stored_task = await db.get(StoredTask, task_id)
if stored_task is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Task not found"
)
update_data = task_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(stored_task, field, value)
await db.commit()
return stored_task
@app.post(
"/preferences",
status_code=status.HTTP_204_NO_CONTENT,
tags=["Preferences"],
summary="Set user preferences",
)
def set_preferences(pref: Preference, response: Response):
response.set_cookie(
key="min_priority",
value=str(pref.min_priority),
)Extras
Applicaties opsplitsen met APIRouter
Bij Flask gebruikten we Blueprints om de code van de FlaskR applicatie te organiseren. Zo hadden we bijvoorbeeld de blog.py module met een lokaal Blueprint object waarop alle authenticatie routes gedefinieerd werden.
FastAPI heeft met APIRouter een zeer gelijkaardig concept.
Een APIRouter object wordt op dezelfde manier als een FastAPI object geïnitialiseerd en gebruikt.
app/routers/users.py
from fastapi import APIRouter
router = APIRouter()
@router.get("/users/", tags=["users"])
def read_users():
return [{"username": "Rick"}, {"username": "Morty"}]
@router.get("/users/me", tags=["users"])
def read_user_me():
return {"username": "fakecurrentuser"}
@router.get("/users/{username}", tags=["users"])
def read_user(username: str):
return {"username": username}In de main module, waar de FastAPI applicatie wordt geïnitialiseerd, worden de verschillende router objecten geïmporteerd en toegevoegd aan de applicatie met de include_router methode.
app/main.py
Volledige code kan je terugvinden op https://github.com/dvx76/fastapi-tutorial en uitvoeren met:
HTML met FastAPI
Ondanks zijn naam is het met FastAPI ook perfect mogelijk gewone HTML content te sturen. FastAPI kan dus prima gebruikt worden om een eenvoudige web-applicatie te maken zoals we in de Flask lessen hebben gezien. Bij APIs kan het ook interessant zijn een aantal HTML endpoints te voorzien, bijvoorbeeld om een login pagina te presenteren, of om frontend code te serveren.
StaticFiles
Met StaticFiles kan de inhoud van een directory beschikbaar worden gemaakt via een gekozen URL path. Dit wordt typisch gebruikt om bvb. javascript of CSS bestanden via een static/ path te delen.
Een FastAPI app object kan iets mounten op een gekozen path - net zoals een filesysteem of netwerkfolder gemount wordt in Windows, MacOS of Linux.
Een StaticFiles object is een van de opties die op deze manier gemount kunnen worden. StaticFiles zelf heeft als argument de directory (absoluut, of relatief t.o.v. de huidige directory) die gedeeld moet worden.
app.mount heeft als argumenten het gewenste URL path, het StaticFiles object, en een willekeurige naam waarmee de mount intern in FastAPI aanspreekbaar blijft.
main.py
static/hello.html
De pagina is dan bereikbaar via http://127.0.0.1:8000/static/hello.html.
HTMLResponse
De path functie decorators (bvb. @app.get()) hebben een response_class parameter. Default is dit fastapi.responses.JSONResponse. Dat zorgt er o.a. voor dat een gereturnde dict of Pydantic model automatische wordt geserialiseerd naar JSON, en de Content-Type header de correct waarde van application/json krijgt.
Als we voor de response_class expliciet fastapi.responses.HTMLResponse gebruiken kan de functie een HTML string terugsturen. Ook in dit geval zal bvb. de Content-Type automatisch juist staan (test/html).
Het is weinig zinvol zo een endpoint op te nemen in de OAS, dus we kunnen opnieuw include_in_schema=False gebruiken.
main.py
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
app = FastAPI()
@app.get("/items/", response_class=HTMLResponse, include_in_schema=False)
async def read_items():
return """
<html>
<head>
<title>Some HTML in here</title>
</head>
<body>
<h1>Look ma! HTML!</h1>
</body>
</html>
"""Om het resultaat te bekijken gaan we rechtstreeks naar http://127.0.0.1:8000/items/.
Naast JSONReponse en HTMLResponse zijn er nog verschillende andere mogelijkheden. Deze zijn allemaal subclasses van Response (dat ook rechtstreeks gebruikt kan worden). Bijvoorbeeld RedirectResponse en FileResponse.
Templates
FastAPI ondersteunt (eigenlijk via Starlette) rechtstreeks dezelfde Jinja2 templating die we al kennen van Flask. Omdat we FastAPI met de fastapi[standard] optie hebben geïnstalleerd is hebben we automatisch al de jinja2 dependency - anders zou dit package expliciet geïnstalleerd moeten worden.
Om templates te renderen gebruik je een fastapi.templating.Jinja2Templates object. Het argument is de directory (opnieuw absoluut of relatief) maar de template bestanden zich bevinden. In principe is dit één globaal object.
Om dan in een path functie een bepaalde template naar HTML te renderen roep je te TemplateResponse methode aan op dat object. Deze krijgt als argumenten altijd het Request object en de naam van de template mee. Optioneel kan je ook een context dictionary meegeven met extra data om in de template te gebruiken - net als bij Flask.
main.py
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
app = FastAPI()
templates = Jinja2Templates(directory="templates")
@app.get("/items/{id}", response_class=HTMLResponse)
async def read_item(request: Request, id: str):
return templates.TemplateResponse(
request=request, name="item.html.j2", context={"id": id}
)templates/item.html.j2
Om te testen gebruiken we bvb. http://127.0.0.1:8000/items/1
FlaskR met FastAPI
Als praktisch voorbeeld bekijken we hoe de FlaskR applicatie uit de Flask lessen aangepast kan worden naar FastAPI. Om de focus op HTML en Jinja2 te houden gebruiken we geen authenticatie, statische data (i.p.v. een SQLite database) en implementeren we enkel de home page (index).
flaskr/blog.py
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="templates")
router = APIRouter(include_in_schema=False)
fake_posts_db = [
{"id": 1, "username": "Bob", "title": "Bob is awesome", "body": "Everybody loves Bob!", "created": "2026-01-03"},
{"id": 2, "username": "Bob", "title": "Easy title", "body": "Easy post", "created": "2026-01-02"},
{"id": 3, "username": "Alice", "title": "Alice is awesome too", "body": "<3", "created": "2026-01-01"}
]
@router.get("/", response_class=HTMLResponse)
def home(request: Request):
return templates.TemplateResponse(
request=request, name="blog/index.html.j2", context={"posts": fake_posts_db}
)De volledige code kan je terugvinden op https://github.com/dvx76/fastapi-tutorial en uitvoeren met:
We bekijken de belangrijkste verschillen in de Jinja2 templates.
url_for
url_for heeft een keyword-argument path i.p.v. filename, bvb:
before_app_request en flask.g
Er is geen g object (tijdelijke request-specifieke opslag). In de plaats kunnen we Request.state gebruiken, en een Middleware functie als equivalent voor Flask’s before_app_request.
@app.middleware("http")
async def get_user(request: Request, call_next):
request.state.user = "Fabrice"
return await call_next(request)- {% if g.user %}
- <li><span>{{ g.user['username'] }}</span>
- <li><a href="{{ url_for('auth.logout') }}">Log Out</a>
- {% else %}
- <li><a href="{{ url_for('auth.register') }}">Register</a>
- <li><a href="{{ url_for('auth.login') }}">Log In</a>
- {% endif %}
+ {% if request.state.user %}
+ <li><span>{{ request.state.user }}</span>
+ <li><a href="{{ url_for('logout') }}">Log Out</a>
+ {% else %}
+ <li><a href="{{ url_for('register') }}">Register</a>
+ <li><a href="{{ url_for('login') }}">Log In</a>
+ {% endif %}Websockets
https://fastapi.tiangolo.com/advanced/websockets/
WebSockets is een protocol waarmee een permanente, bidirectionele verbinding wordt opgezet tussen client (bvb. browser) en server. In tegenstelling tot klassieke HTTP-requests (GET, POST, …) blijft de verbinding open, waardoor beide kanten op elk moment berichten kunnen sturen zonder telkens een nieuwe request te starten.
Dit maakt WebSockets ideaal voor real-time toepassingen zoals chatapps, live notificaties of dashboards.
sequenceDiagram
participant C as Client (Browser)
participant S as Server (FastAPI)
C->>S: HTTP GET (Connection: Upgrade)
S-->>C: 101 Switching Protocols
Note over C,S: WebSocket connection established
C->>S: WebSocket Message
S-->>C: WebSocket Message
C->>S: WebSocket Message
S-->>C: WebSocket Message
Websocket verbindingen worden typisch vanuit frontend javascript code gestart en gebruikt. Ter illustratie werken we met een eenvoudig voorbeeld waarvoor we geen specific frontend framework nodig hebben. Als eerste stap hebben we natuurlijk een path nodig om de HTML + Javascript pagina door te sturen naar de browser:
main.py
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
1 var ws = new WebSocket("ws://localhost:8000/ws");
3 ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
2 function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)- 1
-
Het
wsobject stelt de websocket verbinding voor. Het/wspad moeten we nog implementeren! - 2
-
De HTML form zal deze
sendMessagefunctie aanroepen. Die haalt het te versturen bericht uit de form en stuurt het over de websocket verbinden metws.send() - 3
-
Aan
ws.onmessagewordt een functie toegekend die wordt aangeroepen voor elk ontvangen bericht. Die functie voegt het bericht gewoon toe aan de HTML pagina in een (initieel lege) lijst.
De HTML pagina is nu wel al bereikbaar maar er gebeurd nog niets omdat we nog geen websocket path in the FastAPI applicatie hebben. Daar zorgt de volgende code voor:
main.py
- 1
-
De
app.websocketdecorator maakt van deze functie een websocket path functie. - 2
-
Met
WebSocket.accept()wordt de verbinden tot stand gebracht en is deze klaar om berichten te versturen of ontvangen. - 3
-
Een eenvoudig lus zal steeds ontvangen berichten uitlezen (
WebSocket.receive_text()) en hetzelfde bericht terugsturen (WebSocket.send_text()).
Merk op dat websockets per definitie asynchroon zijn. De WebSocket class implementeert enkel async methodes. Bijgevolg moet de path functie ook async zijn.
Naast send_text() heeft WebSocket ook send_binary() en send_json() methodes.
Berichten naar jezelf sturen is natuurlijk weinig zinvol. Als we de verschillende websocket verbindingen (als WebSocket objecten) bijhouden kunnen we elk ontvangen bericht naar alle andere websocket clients sturen. Met minder dan 100 lijnen code hebben we met FastAPI een (zeer) rudimentaire chat-applicatie gemaakt:
main.py
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Nickname: <input type="text" id="nickName" autocomplete="off" value="Anonymous Coward"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<span id="connected"></span>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
5 var nickName = document.getElementById("nickName")
ws = new WebSocket("ws://localhost:8000/ws/" + nickName.value);
document.querySelector("#connected").textContent = "Connected!";
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
1ws_connections: list[WebSocket] = []
@app.get("/")
async def get():
return HTMLResponse(html)
2@app.websocket("/ws/{nickname}")
async def websocket_endpoint(websocket: WebSocket, nickname: str):
await websocket.accept()
3 ws_connections.append(websocket)
while True:
data = await websocket.receive_text()
await websocket.send_text(f"You: {data}")
4 for ws_connection in ws_connections:
if ws_connection != websocket:
await ws_connection.send_text(f"{nickname}: {data}")- 1
- We gebruiken een eenvoudige globale in-memory lijst om alle verbindingen mij te houden.
- 2
- Om elke client een naam de geven laten we een path parameter toe. Websocket paths kunnen net als gewone HTTP paths path en/of query parameters gebruiken!
- 3
-
Eenmaal verbonden wordt het
WebSocketobject aan de lijst toegevoegd. - 4
- Ontvangen berichten kunnen nu naar elke verbinding uit de lijst worden gestuurd.
- 5
- Om de nickname in te stellen gebruiken we een eenvoudig extra input veld
