阅读 197

FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证

JWT

  • JSON Web Tokens

  • 它是一个将 JSON 对象编码为密集且没有空格的长字符串的标准

  • 使用 JWT token 和安全密码 hash 使应用程序真正安全

 

JWT 小栗子

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
  • 它还没有加密,因此任何人都可以从该字符串中恢复信息

  • 但是已经加签了,因此,当收到发出的 token 时,可以验证是否实际发出了它

  • 创建一个有效期为 1 周的 token,然后当用户第二天带着 token 回来时,知道该用户仍然登录到系统中

  • 一周后,令牌将过期,用户将无法获得授权,必须重新登录以获取新的 token

  • 如果用户(或第三方)试图修改 token 以更改过期时间,将能够发现它,因为签名不匹配

 

前提

需要安装 python-jose 来在 Python 中生成和验证 JWT token

pip install python-jose
pip install cryptography

 

JWT 流程

  • 前端登录提交用户名、密码

  • 后端拿到用户名、密码进行验证,如果没问题,则返回 token

  • 前端访问需要认证的 url 时携带 token

  • 后端拿到 token 进行验证

  • 验证通过返回用户信息及访问的 url 信息

 

hash 密码

前提

  • 数据库存储的密码不能是明文的,需要加密

  • PassLib 是一个用于处理哈希密码的包

  • 推荐的算法是 「Bcrypt

pip install passlib
pip install bcrypt

 

包含的功能

  • hash 密码

  • 验证 hash 密码是否一致

  • 通过用户名、密码验证用户

 

hash 密码

复制代码

# 导入 CryptContextfrom passlib.context import CryptContext

pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto")# 密码加密def hash_password(password: str) -> str:    return pwd_context.hash(password)

复制代码

 

验证 hash 密码是否一致

# 验证密码def verify_password(plain_password, hashed_password):    return pwd_context.verify(plain_password, hashed_password)

  

通过用户名、密码验证用户

复制代码

# 模拟从数据库中根据用户名查找用户def get_user(db, username: str):    if username in db:
        user_dict = db[username]        return UserInDB(**user_dict)# 根据用户名、密码来验证用户def authenticate_user(db, username: str, password: str):    # 1、通过用户名模拟去数据库查找用户
    user = get_user(db, username)    if not user:        # 2、用户不存在
        return False    if not verify_password(password, user.hashed_password):        # 3、密码验证失败
        return False    # 4、验证通过,返回用户信息
    return user

复制代码

 

处理 JWT token

生成用于签名 JWT token 的随机密钥

在命令行敲

> openssl rand -hex 32dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c

 

常量池

方便后续复用

复制代码

# 常量池# 通过 openssl rand -hex 32 生成的随机密钥SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"# 加密算法ALGORITHM = "HS256"# 过期时间,分钟ACCESS_TOKEN_EXPIRE_MINUTES = 30

复制代码

 

创建生成 JWT token 需要用的 Pydantic Model

其实不创建也没事,但这里为了规范和数据校验功能,还是建吧

复制代码

# 返回给客户端的 Token Modelclass Token(BaseModel):
    access_token: str
    token_type: strclass TokenData(BaseModel):
    username: Optional[str] = None

复制代码

 

生成 JWT token

复制代码

# 导入 JWT 相关库from jose import JWTError, jwt# 用户名、密码验证成功后,生成 tokendef create_access_token(
        data: dict,
        expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()    if expires_delta:
        expire = datetime.utcnow() + expires_delta    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})    # 加密
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    return encoded_jwt

复制代码

 

修改 get_current_user

获取 token 后解码并获取用户

复制代码

# 导入 JWT 相关库from jose import JWTError, jwt# 根据当前用户的 token 获取用户,token 已失效则返回错误码async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )    try:        # 1、解码收到的 token
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)        # 2、拿到 username
        username: str = payload.get("sub")        if not username:            # 3、若 token 失效,则返回错误码
            raise credentials_exception
        token_data = TokenData(username=username)    except JWTError:        raise credentials_exception    # 4、获取用户
    user = get_user(fake_users_db, username=token_data.username)    if not user:        raise credentials_exception    # 5、返回用户
    return user

复制代码

  

修改获取 token 的路径操作函数

复制代码

# OAuth2 获取 token 的请求路径@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):    # 1、获取客户端传过来的用户名、密码
    username = form_data.username
    password = form_data.password    # 2、验证用户
    user = authenticate_user(fake_users_db, username, password)    if not user:        # 3、验证失败,返回错误码
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )    # 5、返回 JSON 响应
    return {"access_token": access_token, "token_type": "bearer"}

复制代码

 

sub 的是什么?

  • JWT 规范中有一个 sub key,子健

  • 它是可选的,这里的作用是通过用户名设置用户标识

  • 子健应该在整个应用程序中具有唯一的标识符,并且它应该是一个字符串

 

完整的代码

复制代码

#!usr/bin/env python# -*- coding:utf-8 _*-"""# author: 小菠萝测试笔记
# blog:  https://www.cnblogs.com/poloyy/
# time: 2021/10/6 12:05 下午
# file: 49_bearer.py"""from typing import Optionalimport uvicornfrom fastapi import FastAPI, Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestFormfrom pydantic import BaseModelfrom datetime import datetime, timedelta# 导入 CryptContextfrom passlib.context import CryptContext# 导入 JWT 相关库from jose import JWTError, jwt# 常量池# 通过 openssl rand -hex 32 生成的随机密钥SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"# 加密算法ALGORITHM = "HS256"# 过期时间,分钟ACCESS_TOKEN_EXPIRE_MINUTES = 30# 模拟数据库fake_users_db = {    "johndoe": {        "username": "johndoe",        "full_name": "John Doe",        "email": "johndoe@example.com",        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",        "disabled": False,
    }
}# 返回给客户端的 User Model,不需要包含密码class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None# 继承 User,用于密码验证,所以要包含密码class UserInDB(User):
    hashed_password: str# 获取 token 路径操作函数的响应模型class Token(BaseModel):
    access_token: str
    token_type: strclass TokenData(BaseModel):
    username: Optional[str] = None# 实例对象池app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

pwd_context = CryptContext(schemes=['bcrypt'], deprecated="auto")# 密码加密def hash_password(password: str) -> str:    return pwd_context.hash(password)# 验证密码def verify_password(plain_password, hashed_password):    return pwd_context.verify(plain_password, hashed_password)# 模拟从数据库中根据用户名查找用户def get_user(db, username: str):    if username in db:
        user_dict = db[username]        return UserInDB(**user_dict)# 根据用户名、密码来验证用户def authenticate_user(db, username: str, password: str):    # 1、通过用户名模拟去数据库查找用户
    user = get_user(db, username)    if not user:        # 2、用户不存在
        return False    if not verify_password(password, user.hashed_password):        # 3、密码验证失败
        return False    # 4、验证通过,返回用户信息
    return user# 用户名、密码验证成功后,生成 tokendef create_access_token(
        data: dict,
        expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()    if expires_delta:
        expire = datetime.utcnow() + expires_delta    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})    # 加密
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)    return encoded_jwt# OAuth2 获取 token 的请求路径@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):    # 1、获取客户端传过来的用户名、密码
    username = form_data.username
    password = form_data.password    # 2、验证用户
    user = authenticate_user(fake_users_db, username, password)    if not user:        # 3、验证失败,返回错误码
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)    # 4、生成 token
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )    # 5、返回 JSON 响应
    return {"access_token": access_token, "token_type": "bearer"}# 根据当前用户的 token 获取用户,token 已失效则返回错误码async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )    try:        # 1、解码收到的 token
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)        # 2、拿到 username
        username: str = payload.get("sub")        if not username:            # 3、若 token 失效,则返回错误码
            raise credentials_exception
        token_data = TokenData(username=username)    except JWTError:        raise credentials_exception    # 4、获取用户
    user = get_user(fake_users_db, username=token_data.username)    if not user:        raise credentials_exception    # 5、返回用户
    return user# 判断用户是否活跃,活跃则返回,不活跃则返回错误码async def get_current_active_user(user: User = Depends(get_current_user)):    if user.disabled:        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid User")    return user# 获取当前用户信息@app.get("/user/me")
async def read_user(user: User = Depends(get_current_active_user)):    return user# 正常的请求@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):    return {"token": token}if __name__ == '__main__':
    uvicorn.run(app="49_bearer:app", reload=True, host="127.0.0.1", port=8080)

复制代码

 

请求结果

 

 

来源https://www.cnblogs.com/poloyy/p/15376636.html

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐