Staff Prep 14: Auth & Authorization — JWT, OAuth2, RBAC vs ABAC
Back to Part 13: Caching Strategies. Authentication (who are you?) and authorization (what can you do?) are adjacent problems solved by different tools. JWT handles the first efficiently without database lookups. OAuth2 handles delegated authorization across services. RBAC and ABAC are two models for access control that have very different scaling characteristics. Staff engineers need to know all four and when each breaks down.
JWT internals: what is in the token
A JWT (JSON Web Token) is three base64url-encoded parts separated by dots:
header.payload.signature. The server signs the encoded header and payload together with a secret key, so tampering with either part invalidates the signature.
Verification does not require a database lookup — just recompute the signature and compare.
import jwt
import time
from datetime import datetime, timedelta, timezone
# Shared secret used to sign and verify access tokens.
# NOTE(review): this is a placeholder literal — a real deployment should load
# the key from the environment or a secret store rather than source code.
SECRET_KEY = "your-secret-key-min-32-chars"
# HS256 = HMAC with SHA-256: symmetric, so the same key signs and verifies.
ALGORITHM = "HS256"
def create_access_token(user_id: int, role: str, expires_minutes: int = 15) -> str:
    """Build and sign a short-lived JWT access token.

    Args:
        user_id: Identifier written (as a string) to the ``sub`` claim.
        role: Role name written to the ``role`` claim.
        expires_minutes: Token lifetime in minutes, used for the ``exp`` claim.

    Returns:
        The result of ``jwt.encode`` over the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    issued_at = int(time.time())
    lifetime = timedelta(minutes=expires_minutes)
    expires_at = int((datetime.now(timezone.utc) + lifetime).timestamp())

    claims = {
        "sub": str(user_id),
        "role": role,
        "iat": issued_at,
        "exp": expires_at,
        # Unique ID per token so an individual token can be referenced
        # later (e.g. for revocation).
        "jti": generate_uuid(),
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(token: str) -> dict:
    """Decode *token* and return its claims, mapping JWT failures to HTTP 401.

    Args:
        token: The encoded JWT as received from the client.

    Returns:
        The decoded claims returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 "Token expired" when the token's expiry has passed,
            401 "Invalid token" for any other JWT validation failure.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError) as err:
        # Expiry gets its own message so clients know a refresh is appropriate.
        if isinstance(err, jwt.ExpiredSignatureError):
            detail = "Token expired"
        else:
            detail = "Invalid token"
        raise HTTPException(401, detail)
    return claims
# JWT structure decoded:
# Header: {"alg": "HS256", "typ": "JWT"}
# Payload: {"sub": "42", "role": "admin", "iat": 1743709500, "exp": 1743710400, "jti": "<uuid>"}
# Signature: HMACSHA256(base64url(header) + "." + base64url(payload), secret)
The revocation problem and refresh token rotation
JWTs are stateless. Once issued, you cannot revoke them before they expire — unless you maintain a denylist. The solution: short-lived access tokens (15 minutes) + long-lived refresh tokens stored server-side.
import secrets
# Refresh token: only its hash is stored in the DB, one row per user session
async def create_refresh_token(user_id: int, db) -> str:
    """Generate a refresh token and record its hash for *user_id*.

    Args:
        user_id: Owner of the new refresh token.
        db: Database handle exposing an awaitable ``execute``.

    Returns:
        The raw token. Only ``hash_token(token)`` is written to the
        ``refresh_tokens`` table, with an expiry 30 days from now.
    """
    raw_token = secrets.token_urlsafe(32)
    valid_until = datetime.now(timezone.utc) + timedelta(days=30)

    insert_sql = (
        "INSERT INTO refresh_tokens (user_id, token_hash, expires_at, created_at)\n"
        "VALUES ($1, $2, $3, NOW())"
    )
    await db.execute(insert_sql, user_id, hash_token(raw_token), valid_until)
    return raw_token
async def rotate_refresh_token(old_token: str, db) -> tuple[str, str]:
    """Exchange a refresh token for a new (access token, refresh token) pair.

    Refresh tokens are single-use. Presenting a token that has already been
    consumed is treated as possible theft: every refresh token belonging to
    that user is deleted and the request is rejected.

    Args:
        old_token: The raw refresh token supplied by the client.
        db: Database handle exposing awaitable ``fetchrow`` and ``execute``.

    Returns:
        ``(new_access_token, new_refresh_token)``.

    Raises:
        HTTPException: 401 when the token is unknown, expired, or reused.
    """
    token_hash = hash_token(old_token)

    row = await db.fetchrow(
        "SELECT user_id, used, expires_at FROM refresh_tokens WHERE token_hash = $1",
        token_hash,
    )
    if not row:
        raise HTTPException(401, "Invalid refresh token")

    user_id = row["user_id"]
    reused = row["used"]

    if not reused:
        if row["expires_at"] < datetime.now(timezone.utc):
            raise HTTPException(401, "Refresh token expired")

        # Claim the token atomically. A separate "check used, then UPDATE"
        # lets two concurrent requests both pass the check and both receive
        # new token pairs; the conditional UPDATE lets exactly one win.
        # NOTE(review): RETURNING assumes a PostgreSQL-style backend, as the
        # $1 placeholders and NOW() elsewhere in this file suggest — confirm.
        claimed = await db.fetchrow(
            "UPDATE refresh_tokens SET used = true "
            "WHERE token_hash = $1 AND used = false RETURNING user_id",
            token_hash,
        )
        # Losing the claim means another request consumed this token between
        # our SELECT and UPDATE, i.e. the token was presented twice.
        reused = claimed is None

    if reused:
        # Token reuse detected — possible theft — invalidate ALL tokens for user
        await db.execute(
            "DELETE FROM refresh_tokens WHERE user_id = $1", user_id
        )
        raise HTTPException(401, "Refresh token reuse detected — all sessions invalidated")

    # Issue new pair
    new_refresh_token = await create_refresh_token(user_id, db)
    # NOTE(review): the SELECT above does not fetch a "role" column, so this
    # lookup falls back to "user" for every caller — confirm where the role
    # should be loaded from before relying on it for privileged users.
    new_access_token = create_access_token(user_id, role=row.get("role", "user"))
    return new_access_token, new_refresh_token
FastAPI JWT dependency
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Bearer-token scheme used to pull the credentials off the request.
security = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the bearer token on the request to an active ``User``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Session used to load the user record.

    Returns:
        The ``User`` whose id is carried in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token fails verification, when its
            ``sub`` claim is missing or not an integer, or when the user
            does not exist or is inactive.
    """
    payload = verify_access_token(credentials.credentials)

    # A correctly signed token can still carry an unusable subject; answer
    # 401 rather than letting KeyError/ValueError surface as a server error.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise HTTPException(401, "Invalid token") from None

    user = await db.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(401, "User not found or inactive")
    return user
def require_role(role: str):
    """Build a dependency that only admits users whose role equals *role*.

    This must be a plain (non-async) function: it is called while the route
    is being declared, e.g. ``Depends(require_role("admin"))``, so it has to
    return the dependency callable itself. Declared ``async``, the call would
    hand ``Depends`` an un-awaited coroutine object instead.

    Args:
        role: The exact role name a user must have.

    Returns:
        An async dependency that returns the current user, or raises
        ``HTTPException`` 403 when ``user.role`` differs from *role*.
    """

    async def check_role(user: User = Depends(get_current_user)) -> User:
        if user.role != role:
            raise HTTPException(403, f"Requires {role} role")
        return user

    return check_role
@app.get("/admin/users")
async def admin_list_users(admin: User = Depends(require_role("admin"))):
    """Return every user; access is gated by the ``require_role("admin")`` dependency."""
    users = await list_all_users()
    return users
OAuth2: delegated authorization flow
from fastapi.security import OAuth2AuthorizationCodeBearer
# OAuth2 Authorization Code Flow (for user-facing apps)
# 1. Redirect user to authorization server:
# GET /oauth/authorize?response_type=code&client_id=XYZ&redirect_uri=...&scope=read:profile
#
# 2. User logs in and approves; server redirects back:
# GET /callback?code=AUTHORIZATION_CODE
#
# 3. Exchange code for tokens (server-to-server):
async def exchange_code_for_token(code: str) -> dict:
    """Trade an authorization code for tokens at the token endpoint.

    Args:
        code: The authorization code received on the redirect callback.

    Returns:
        The JSON-decoded body of the token endpoint's response.
    """
    form = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "redirect_uri": REDIRECT_URI,
    }
    # NOTE(review): the response status is not checked before decoding —
    # confirm whether http_client raises on error responses.
    token_response = await http_client.post(
        "https://auth.example.com/oauth/token", data=form
    )
    return token_response.json()
# Returns: {"access_token": "...", "refresh_token": "...", "expires_in": 3600}
# OAuth2 Client Credentials (for service-to-service)
async def get_service_token(client_id: str, client_secret: str) -> str:
    """Obtain an access token via the client-credentials grant.

    Args:
        client_id: Identifier of the calling service.
        client_secret: Secret of the calling service.

    Returns:
        The ``access_token`` field of the token endpoint's JSON response.
    """
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "read:orders write:fulfillment",
    }
    token_response = await http_client.post(
        "https://auth.example.com/oauth/token", data=form
    )
    body = token_response.json()
    return body["access_token"]
RBAC vs ABAC
RBAC (Role-Based Access Control): Users have roles. Roles have permissions. Simple to implement and reason about. Scales to hundreds of roles. Breaks when you need conditional access (user can edit their own posts but not others').
# Static RBAC table: role name -> set of permission strings granted to it.
# Permission strings follow an "action:scope" shape ("any" vs "own"); nothing
# in this table says which resources count as "own".
PERMISSIONS = {
"admin": {"read:any", "write:any", "delete:any"},
"editor": {"read:any", "write:own", "delete:own"},
"viewer": {"read:any"},
}
def has_permission(user: User, permission: str) -> bool:
    """Report whether the user's role grants *permission*.

    Args:
        user: The user whose ``role`` is looked up in ``PERMISSIONS``.
        permission: Permission string to test, e.g. ``"read:any"``.

    Returns:
        True when *permission* is in the role's permission set; False when
        it is not, or when the role has no entry in ``PERMISSIONS``.
    """
    role = user.role
    if role not in PERMISSIONS:
        return False
    return permission in PERMISSIONS[role]
# RBAC breaks on ownership: "write:own" is not expressive enough
# You need to know WHICH resource is "own"
# ABAC (Attribute-Based Access Control): evaluate policies with context
def can_edit_post(user: User, post: Post) -> bool:
    """Decide whether *user* may edit *post*.

    Policy: the post's author may edit it, and so may any user whose role
    is ``"admin"``.
    """
    if user.id == post.author_id:
        return True
    return user.role == "admin"
def can_access_tenant_data(user: User, resource: dict) -> bool:
    """Decide whether *user* may access a tenant-scoped *resource*.

    Policy: the user must belong to the resource's tenant AND hold one of
    the roles listed under the resource's ``"allowed_roles"`` key.

    Args:
        user: The requesting user (``tenant_id`` and ``role`` are read).
        resource: Mapping with ``"tenant_id"`` and ``"allowed_roles"`` keys.
    """
    same_tenant = user.tenant_id == resource["tenant_id"]
    if not same_tenant:
        return False
    return user.role in resource["allowed_roles"]
# ABAC is more expressive but harder to audit
# "Who can access resource X?" requires evaluating policies for all users
Quiz: test your understanding
Before moving on, answer these in your head (or out loud):
- A user's JWT access token is stolen. It expires in 15 minutes. What can you do right now to invalidate it before expiration? What does this cost architecturally?
- What is refresh token rotation? What happens when a stolen refresh token is detected being reused?
- What is the difference between OAuth2 Authorization Code flow and Client Credentials flow? When would you use each?
- Your RBAC has roles: admin, manager, viewer. A manager should be able to approve requests only within their department. How does pure RBAC fail here? How would ABAC solve it?
- JWT claims are base64url-encoded, not encrypted. What information should you never put in a JWT payload? Why?
Next up — Part 15: Python GIL Explained. What the GIL actually is, when it hurts, and what Python 3.13 changes.