FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证
JWT
JSON Web Tokens
它是一个将 JSON 对象编码为密集且没有空格的长字符串的标准
使用 JWT token 和安全密码 hash 使应用程序真正安全
JWT 小栗子
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
它还没有加密,因此任何人都可以从该字符串中恢复信息
但是已经加签了,因此,当收到发出的 token 时,可以验证是否实际发出了它
创建一个有效期为 1 周的 token,然后当用户第二天带着 token 回来时,知道该用户仍然登录到系统中
一周后,令牌将过期,用户将无法获得授权,必须重新登录以获取新的 token
如果用户(或第三方)试图修改 token 以更改过期时间,将能够发现它,因为签名不匹配
前提
需要安装 python-jose 来在 Python 中生成和验证 JWT token
pip install python-jose pip install cryptography
JWT 流程
前端登录提交用户名、密码
后端拿到用户名、密码进行验证,如果没问题,则返回 token
前端访问需要认证的 url 时携带 token
后端拿到 token 进行验证
验证通过返回用户信息及访问的 url 信息
hash 密码
前提
数据库存储的密码不能是明文的,需要加密
PassLib 是一个用于处理哈希密码的包
推荐的算法是 「Bcrypt」
pip install passlib pip install bcrypt
包含的功能
hash 密码
验证 hash 密码是否一致
通过用户名、密码验证用户
hash 密码
# 导入 CryptContextfrom passlib.context import CryptContext pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto")# 密码加密def hash_password(password: str) -> str: return pwd_context.hash(password)
验证 hash 密码是否一致
# 验证密码def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)
通过用户名、密码验证用户
# 模拟从数据库中根据用户名查找用户def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)# 根据用户名、密码来验证用户def authenticate_user(db, username: str, password: str): # 1、通过用户名模拟去数据库查找用户 user = get_user(db, username) if not user: # 2、用户不存在 return False if not verify_password(password, user.hashed_password): # 3、密码验证失败 return False # 4、验证通过,返回用户信息 return user
处理 JWT token
生成用于签名 JWT token 的随机密钥
在命令行敲
> openssl rand -hex 32dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c
常量池
方便后续复用
# 常量池# 通过 openssl rand -hex 32 生成的随机密钥SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"# 加密算法ALGORITHM = "HS256"# 过期时间,分钟ACCESS_TOKEN_EXPIRE_MINUTES = 30
创建生成 JWT token 需要用的 Pydantic Model
其实不创建也没事,但这里为了规范和数据校验功能,还是建吧
# 返回给客户端的 Token Modelclass Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: Optional[str] = None
生成 JWT token
# 导入 JWT 相关库from jose import JWTError, jwt# 用户名、密码验证成功后,生成 tokendef create_access_token( data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # 加密 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
修改 get_current_user
获取 token 后解码并获取用户
# 导入 JWT 相关库from jose import JWTError, jwt# 根据当前用户的 token 获取用户,token 已失效则返回错误码async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # 1、解码收到的 token payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM) # 2、拿到 username username: str = payload.get("sub") if not username: # 3、若 token 失效,则返回错误码 raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception # 4、获取用户 user = get_user(fake_users_db, username=token_data.username) if not user: raise credentials_exception # 5、返回用户 return user
修改获取 token 的路径操作函数
# OAuth2 获取 token 的请求路径@app.post("/token", response_model=Token) async def login(form_data: OAuth2PasswordRequestForm = Depends()): # 1、获取客户端传过来的用户名、密码 username = form_data.username password = form_data.password # 2、验证用户 user = authenticate_user(fake_users_db, username, password) if not user: # 3、验证失败,返回错误码 raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 4、生成 token access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) # 5、返回 JSON 响应 return {"access_token": access_token, "token_type": "bearer"}
sub 的是什么?
JWT 规范中有一个 sub key,子健
它是可选的,这里的作用是通过用户名设置用户标识
子健应该在整个应用程序中具有唯一的标识符,并且它应该是一个字符串
完整的代码
#!usr/bin/env python# -*- coding:utf-8 _*-"""# author: 小菠萝测试笔记 # blog: https://www.cnblogs.com/poloyy/ # time: 2021/10/6 12:05 下午 # file: 49_bearer.py"""from typing import Optionalimport uvicornfrom fastapi import FastAPI, Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom pydantic import BaseModelfrom datetime import datetime, timedelta# 导入 CryptContextfrom passlib.context import CryptContext# 导入 JWT 相关库from jose import JWTError, jwt# 常量池# 通过 openssl rand -hex 32 生成的随机密钥SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"# 加密算法ALGORITHM = "HS256"# 过期时间,分钟ACCESS_TOKEN_EXPIRE_MINUTES = 30# 模拟数据库fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } }# 返回给客户端的 User Model,不需要包含密码class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None# 继承 User,用于密码验证,所以要包含密码class UserInDB(User): hashed_password: str# 获取 token 路径操作函数的响应模型class Token(BaseModel): access_token: str token_type: strclass TokenData(BaseModel): username: Optional[str] = None# 实例对象池app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto")# 密码加密def hash_password(password: str) -> str: return pwd_context.hash(password)# 验证密码def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)# 模拟从数据库中根据用户名查找用户def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict)# 根据用户名、密码来验证用户def authenticate_user(db, username: str, password: str): # 1、通过用户名模拟去数据库查找用户 user = get_user(db, username) if not user: # 2、用户不存在 return False if not verify_password(password, user.hashed_password): # 3、密码验证失败 return False # 4、验证通过,返回用户信息 return user# 用户名、密码验证成功后,生成 tokendef create_access_token( data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # 加密 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt# OAuth2 获取 token 的请求路径@app.post("/token", response_model=Token) async def login(form_data: OAuth2PasswordRequestForm = Depends()): # 1、获取客户端传过来的用户名、密码 username = form_data.username password = form_data.password # 2、验证用户 user = authenticate_user(fake_users_db, username, password) if not user: # 3、验证失败,返回错误码 raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 4、生成 token access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) # 5、返回 JSON 响应 return {"access_token": access_token, "token_type": "bearer"}# 根据当前用户的 token 获取用户,token 已失效则返回错误码async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # 1、解码收到的 token payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM) # 2、拿到 username username: str = payload.get("sub") if not username: # 3、若 token 失效,则返回错误码 raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception # 4、获取用户 user = get_user(fake_users_db, username=token_data.username) if not user: raise credentials_exception # 5、返回用户 return user# 判断用户是否活跃,活跃则返回,不活跃则返回错误码async def get_current_active_user(user: User = Depends(get_current_user)): if user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User") return user# 获取当前用户信息@app.get("/user/me") async def read_user(user: User = Depends(get_current_active_user)): return user# 正常的请求@app.get("/items/") async def read_items(token: str = Depends(oauth2_scheme)): return {"token": token}if __name__ == '__main__': uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)
请求结果
来源https://www.cnblogs.com/poloyy/p/15376636.html