Dependencies
FasterAPI comes with several pre-built dependencies to assist with user authentication.
authenticated
A dependency function to authenticate the user.
Source code in FasterAPI\dependencies.py
    async def authenticated(
        request: Request,
        security_scopes: SecurityScopes,
        token: Annotated[str, Depends(oauth2_scheme)],
        db: Annotated[Session, Depends(get_db)],
    ):
        """A dependency function to authenticate the user."""
        if security_scopes.scopes:
            authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
        else:
            authenticate_value = "Bearer"
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": authenticate_value},
        )
        multi_session_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Multiple sessions are not allowed",
            headers={"WWW-Authenticate": authenticate_value},
        )
        scope_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Insufficient privileges",
            headers={"WWW-Authenticate": authenticate_value},
        )
        jwt_exception = HTTPException(
            status_code=status.HTTP_406_NOT_ACCEPTABLE,
            detail="Invalid JWT token",
            headers={"WWW-Authenticate": authenticate_value},
        )
        jwt_expired_exception = HTTPException(
            status_code=status.HTTP_406_NOT_ACCEPTABLE,
            detail="JWT token has expired",
            headers={"WWW-Authenticate": authenticate_value},
        )
        blacklisted_token = (
            db.query(BlacklistedToken).filter(BlacklistedToken.token == token).first()
        )
        if blacklisted_token:
            raise jwt_exception
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload["sub"]
            expiration = datetime.fromtimestamp(payload["exp"])
        except JWTError:
            raise jwt_exception
        if expiration < datetime.now():
            raise jwt_expired_exception
        user = db.query(User).filter(User.username == username).first()
        if user is None:
            raise credentials_exception
        user_privileges = [privilege.privilege for privilege in user.privileges]
        if not set(security_scopes.scopes).issubset(set(user_privileges)):
            raise scope_exception
        active_session = (
            db.query(ActiveSession).filter(ActiveSession.username == username).first()
        )
        if ALLOW_MULTI_SESSIONS is False:
            if request.client.host != str(active_session.client):  # type: ignore
                raise multi_session_exception
        return user
This dependency can be used on any FastAPI endpoint so that only authenticated users can access it. To be authenticated, the request must include the header "Authorization: Bearer XXXX", where "XXXX" is the JWT obtained after login.
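As an illustration of the header format described above, the following is a minimal client-side sketch using only the Python standard library. The server address `http://localhost:8000`, the constant names, and the helper `build_authenticated_request` are assumptions made for this example and are not part of FasterAPI; the token value is a placeholder for the JWT returned at login.

```python
import urllib.request

# Hypothetical values: replace with your server's URL and the JWT from login.
API_URL = "http://localhost:8000/users/me/items/"
ACCESS_TOKEN = "XXXX"


def build_authenticated_request(url: str, token: str) -> urllib.request.Request:
    """Return a GET request that carries the JWT as a Bearer token."""
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {token}"},
        method="GET",
    )


request = build_authenticated_request(API_URL, ACCESS_TOKEN)
# The request is only built here, not sent. To send it to a running server,
# pass it to urllib.request.urlopen(request).
```

Note the scheme name is "Bearer", followed by a single space and then the token.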
is_superuser
A dependency function to check if the user is a superuser.
Source code in FasterAPI\dependencies.py
    async def is_superuser(user: Annotated[User, Depends(authenticated)]):
        """A dependency function to check if the user is a superuser."""
        if not user.is_superuser:  # type: ignore
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="The user doesn't have enough privileges",
            )
        return user
This dependency restricts the endpoint to authenticated users who are also superusers.
scope
FasterAPI implements OAuth2 scopes, but verifies them against the user's privileges. This means the scopes associated with a JWT are always the user's privileges. You can therefore declare scopes on an endpoint to require specific user privileges. For example:
    @app.get("/users/me/items/")
    async def read_own_items(
        current_user: Annotated[User, Security(authenticated, scopes=["items"])]
    ):
        return [{"item_id": "Foo", "owner": current_user.username}]
If the user does not have the "items" privilege, access to the "/users/me/items/" endpoint is refused. The "is_superuser" dependency is used in a similar manner.
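The privilege check itself is a subset test: every scope required by the endpoint must appear among the user's privileges. The sketch below isolates that logic in plain Python. The function name `has_required_scopes` and the sample privilege lists are hypothetical and only mirror the comparison performed inside `authenticated`.

```python
from typing import Iterable


def has_required_scopes(required: Iterable[str], privileges: Iterable[str]) -> bool:
    """Return True when every required scope is among the user's privileges."""
    return set(required).issubset(set(privileges))


# Hypothetical privilege lists, for illustration only.
print(has_required_scopes(["items"], ["items", "reports"]))  # True
print(has_required_scopes(["items"], ["reports"]))  # False
print(has_required_scopes([], ["reports"]))  # True: no scopes required
```

Because an empty set is a subset of every set, an endpoint that declares no scopes passes this check for any authenticated user.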