跳到主要内容

安全认证

前言

nil

HTTPBasic

HTTPBasic基本认证是Web服务器和客户端之间最基础的HTTP认证方式。使用简单,但因为账户信息用base64明文传输,容易被逆向破解。

from fastapi import APIRouter, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.responses import PlainTextResponse

router = APIRouter(prefix="/chapter9", tags=["安全认证"])

security = HTTPBasic()

@router.get("/login1")
async def login(credentials: HTTPBasicCredentials = Depends(security)):
if credentials.username == 'zhangsan' and credentials.password == "123456":
return PlainTextResponse(status_code=200, content="login success")
else:
return PlainTextResponse(status_code=401, content="login failed")

APIKey

APIKey认证鉴权方案是一种基于读取前端提交固定关键key值来进行比对加盐的一种方式。FastAPI提供的基于APIKey的特定密钥方案主要有APIKeyHeader、APIKeyQuery和APIKeyCookie。

from fastapi import APIRouter, Depends, Request
from fastapi.security import APIKeyHeader, APIKeyQuery, APIKeyCookie
from fastapi.responses import PlainTextResponse
from fastapi.params import Security
from fastapi.exceptions import HTTPException

router = APIRouter(prefix="/chapter9", tags=["安全认证"])

class APIKey():
API_KEY_HEADER = "XTOKEN"
API_KEY_HEADER_NAME = "X-TOKEN"
api_key_header_token = APIKeyHeader(name=API_KEY_HEADER_NAME, scheme_name="API Key Header", auto_error=True)

API_KEY_QUERY = "XQUERY"
API_KEY_QUERY_NAME = "X-QUERY"
api_key_query_token = APIKeyQuery(name=API_KEY_QUERY_NAME, scheme_name="API Key Query", auto_error=True)

# API_KEY_COOKIE = "XCOOKIE"
# API_KEY_COOKIE_NAME = "X-COOKIE"
# api_key_cookie_token = APIKeyCookie(name=API_KEY_COOKIE_NAME, scheme_name="API Key Cookie", auto_error=True)

async def __call__(self, request: Request,
api_key_header: str = Security(api_key_header_token),
api_key_query: str = Security(api_key_query_token),
# api_key_cookie: str = Security(api_key_cookie_token),
):
if api_key_header != self.API_KEY_HEADER:
raise HTTPException(status_code=401, detail="API Key Header Error")
if api_key_query != self.API_KEY_QUERY:
raise HTTPException(status_code=401, detail="API Key Query Error")
# if not api_key_cookie:
# raise HTTPException(status_code=401, detail="API Key Cookie中的Cookie没有值")
# if api_key_cookie != self.API_KEY_COOKIE:
# raise HTTPException(status_code=401, detail="API Key Cookie Error")

return True

apikeyauth = APIKey()

@router.get("/login2")
async def login2(request: Request, auth: bool = Depends(apikeyauth)):
if auth:
return PlainTextResponse(status_code=200, content="login success")
else:
return PlainTextResponse(status_code=401, content="login failed")

简单动态API Key

某些API需要做鉴权,但又不想集成数据库、OAuth2等机制搞得太复杂,可以用API Key。如果担心key泄露,还可以按照一定算法来生成key。只要算法没泄露,即便key泄露了后续也会失效。这里做个简单的demo,按照年月日时加字符串来生成key,字符串是固定的,时间是动态的,一个key过整点就失效。

from fastapi import Depends, Request, FastAPI
from fastapi.security import APIKeyHeader
from fastapi.responses import PlainTextResponse
from fastapi.params import Security
from fastapi.exceptions import HTTPException
import uvicorn
import hashlib
from datetime import datetime, timezone, timedelta

app = FastAPI()


def calculate_hash(src: str, alg: str = "md5"):
src_enc = src.encode("utf-8")
if alg == "sha256":
return hashlib.sha256(src_enc).hexdigest()
else:
return hashlib.md5(src_enc).hexdigest()


class APIKey:
API_KEY_HEADER_NAME = "X-TOKEN"
api_key_header_token = APIKeyHeader(
name=API_KEY_HEADER_NAME, scheme_name="API Key Header", auto_error=True
)

async def __call__(
self,
request: Request,
api_key_header: str = Security(api_key_header_token),
):
dt_ymdh = datetime.now(tz=timezone(timedelta(hours=8))).strftime("%Y%m%d%H")
# 先按md5算法, 再按sha256算法
secret = calculate_hash(calculate_hash(f"{dt_ymdh}_1234qwerASDF"), alg="sha256")
if api_key_header != secret:
raise HTTPException(status_code=401, detail="API Key Header Error")

return True


apikeyauth = APIKey()


@app.get("/login")
async def login(request: Request, auth: bool = Depends(apikeyauth)):
if auth:
return PlainTextResponse(status_code=200, content="login success")
else:
return PlainTextResponse(status_code=401, content="login failed")


if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)

测试

  1. 用命令行生成key
echo -en "$(date +%Y%m%d%H)_1234qwerASDF" | md5sum | awk '{printf $1}' | sha256sum | awk '{print $1}'
  1. 请求
curl 'http://127.0.0.1:8000/login' -H 'accept: application/json' -H 'X-TOKEN: f569b0c11f9ab85946a674eddbb4a111aaa52aab81d1815d26292612314256e7'

OAuth2

OAuth是一种开放协议,允许用户让第三方应用以安全且标准的方式获取该用户在某一网站、移动应用或者桌面应用上存储的受保护的资源(比如用户个人信息、照片、视频、联系人列表)。OAuth 2中的2表示版本,OAuth 2和OAuth 1不兼容。

注意,OAuth 2是授权协议,而非认证协议。认证是指对用户名和密码等用户身份信息进行验证;授权可在系统认证身份后判断用户可访问的资源范围。

常用的认证机制首先通过服务端办法token(令牌),然后客户端或第三方应用使用该令牌对指定资源进行显式或限范围访问。常见方案为OAuth 2 + JWT。

OAuth 2优点

  • 避免在授权给第三方应用的过程中泄露用户信息。
  • 结合token认证机制,可设置允许授权访问的第三方应用资源范围和token有效期。

JWT

JWT本质是一个字符串,但是这个字符串主要由Header、Payload、Signatue组成。3个部分在组合钱都需要经过加密算法处理,组合之后还需要进行序列化。

FastAPI官方推荐使用JOSE规范及进行JWT生成,对应的第三方库为python-jose

pip install python-jose

使用示例

from datetime import datetime, timedelta
from jose import jwt

SECRET_KEY = 'qwerasdf'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_ACCESS_TOKEN_EXPIRE_MINUTES = 70

class TokenUtils:
@staticmethod
def token_encode(data: dict):
return jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)

@staticmethod
def token_decode(token: str):
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

if __name__ == '__main__':
data = {
'iss': 'zhangsan', # issuer, token颁发者
'sub': '1', # subject, token的主题
'aud': 'lisi', # audience, token的接收者
'iat': datetime.utcnow(), # issued at, token创建时间
'nbf': datetime.utcnow(), # not before, token生效时间
'exp': datetime.utcnow() + timedelta(minutes=15), # 过期时间
'jti': '1234567890' # JWT ID
}
token = TokenUtils.token_encode(data)
print(token)
print(TokenUtils.token_decode(token))

OAuth 2参数说明

在授权处理处理过程中,OAuth 2规范需要带上指定配置参数,这些配置参数按类型可分为请求参数和响应参数。

请求参数

参数名描述
client_id分配给注册在授权服务器上的第三方应用的ID,通常在申请一些开放平台时由开放平台进行分配。该参数在URL地址中是必填参数。
client_secret分配给注册在授权服务器上的第三方应用的密钥。申请授权时是必填参数。
username表示参与授权的用户主体用户名,必选项
password表示参与授权的用户主体密码,必选项
response_type授权类型。该参数一般仅用于授权码模式,且此处的值固定为token,表示直接返回令牌。
grant_type授权方式。一般用于授权码模式和密码模式。
redirect_uri表示授权服务器在处理用户操作(比如确定或取消授权)之后重定向的客户端URL地址
scope表示此授权认证申请的权限范围,也就是允许访问的资源范围
state表示客户端当前的状态。客户端可以指定任意值,授权服务器会原封不动地返回这个值以帮助确认是哪个客户端发出的请求。

响应参数

参数名描述
code表示由授权服务器产生地临时随机字符码,它是第三方应用提交到授权服务器索取acces_token的凭据。这个码一般是临时码,且一般只能使用一次
access_token授权服务器返回客户端的访问令牌,这是第三方应用访问资源服务器的凭据
refresh_token主要用在access_token过期时,通过它可以刷新access_token令牌,实现access_token续期。一般在授权码模式中进行返回
token_type表示授权令牌access_token的类型。
expires_in表示授权令牌access_token的过期时间,单位为秒
scope权限范围,如果与客户端申请的范围一致,那么这个参数默认不返回,这个参数的值主要由需求决定
state如果客户端请求中包含这个参数,那么授权服务器会原样返回这个参数,用于客户端辨识是谁发出去的请求

OAuth 2主体角色

  • 资源所有者(Resource Owner):又称用户
  • 用户代理(User Agent):资源所有者在进行授权处理时用到的一些载体,这里所说的载体通常是指浏览器、APP等
  • 客户端(Client):指申请资源时所使用的应用程序。
  • 授权服务器(Authorization Server):指为资源所有者发放Access Token令牌的服务端。它主要负责办法token访问令牌给客户端
  • 资源服务器(Resource Server):指托管受保护资源的服务器

在中小型应用中,授权服务器和资源服务器通常在同一个服务内,当然少数请情况下也可以分布在不同的服务上。另外,一个授权服务器可以颁发多个资源服务器可接收的访问令牌。

在OAuth 2协议规范中,根据不同的参与角色,授权模式主要分为客户端模式(Client Credentials Grant)、密码模式(Password Grant)、授权码模式(Authorization Code Grant)、简化模式(Implicit Grant,又称隐私授权模式)等。

客户端模式

示例

from fastapi import APIRouter, Depends, Request, status, Query
from fastapi.exceptions import HTTPException
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security import OAuth2
from fastapi.security.utils import get_authorization_scheme_param
from jose import jwt, JWTError
from pydantic import ValidationError
from typing import Optional, Dict
from datetime import datetime, timedelta

router = APIRouter(prefix="/chapter9", tags=["安全认证"])

fake_client_db = {
"zhangsan": {
"client_id": "zhangsan",
"client_secret": "123456",
}
}

SECRET_KEY = "yJk48ijYXHVjSvgGosj6"
ALGORITHM = "HS256"

class TokenUtils:
@staticmethod
def token_encode(data: dict):
return jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)

@staticmethod
def token_decode(token: str):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Counld not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
except (JWTError, ValidationError):
raise credentials_exception

return payload

class OAuth2ClientCredentialsBearer(OAuth2):
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
description: Optional[str] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(
clientCredentials={
"tokenUrl": tokenUrl,
"scopes": scopes,
}
)
super().__init__(
flows=flows,
scheme_name=scheme_name,
description=description,
auto_error=auto_error,
)

async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param

oauth2_scheme = OAuth2ClientCredentialsBearer(tokenUrl="/chapter9/oauth2/authorize")

class OAuth2ClientCredentialsRequestForm:
def __init__(
self,
grant_type: str = Query(..., regex="client_credentials"),
scope: str = Query(""),
client_id: str = Query(...),
client_secret: str = Query(...),
username: Optional[str] = Query(None),
password: Optional[str] = Query(None),
):
self.grant_type = grant_type
self.scopes = scope.split()
self.client_id = client_id
self.client_secret = client_secret
self.username = username
self.password = password

@router.post("/oauth2/authorize", summary="请求授权URL地址")
async def authorize(client_data: OAuth2ClientCredentialsRequestForm = Depends()):
if not client_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="请输入用户名及密码等信息",
)

if not client_data.client_id and not client_data.client_secret:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="请输入客户端ID及客户端密钥",
)
if client_data.client_id not in fake_client_db:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="客户端ID不存在",
)
clientinfo = fake_client_db.get(client_data.client_id)
if client_data.client_secret != clientinfo.get("client_secret"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="客户端密钥错误",
)

data = {
"iss": "client_id",
"sub": "dilibili",
"client_id": client_data.client_id,
"exp": datetime.utcnow() + timedelta(minutes=10),
}

token = TokenUtils.token_encode(data)
return {
"access_token": token,
"token_type": "bearer",
"expires_in": 600,
"scope": "all",
}


@router.get("/get/clientinfo", summary="获取客户端信息(受保护资源)")
async def get_clientinfo(token: str = Depends(oauth2_scheme)):
payload = TokenUtils.token_decode(token)
client_id = payload.get("client_id")
if client_id not in client_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Counld not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
clientinfo = fake_client_db.get(client_id)
return {
'client_id': clientinfo.get("client_id"),
'client_secret': clientinfo.get("client_secret")
}

测试

curl -s -X POST 'http://127.0.0.1:8000/chapter9/oauth2/authorize?grant_type=client_credentials&client_id=zhangsan&client_secret=123456&scope=all' | python -m json.tool

{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJjbGllbnRfaWQiLCJzdWIiOiJkaWxpYmlsaSIsImNsaWVudF9pZCI6InpoYW5nc2FuIiwiZXhwIjoxNzEwMTc1MzU4fQ.4FOww3rnfzbFHi8nhODsLrRutT0_0QHyPhMgeVBpxxM",
"token_type": "bearer",
"expires_in": 600,
"scope": "all"
}

# 2. 使用token访问
curl -X GET 'http://192.168.1.112:8000/chapter9/get/clientinfo' -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJjbGllbnRfaWQiLCJzdWIiOiJkaWxpYmlsaSIsImNsaWVudF9pZCI6InpoYW5nc2FuIiwiZXhwIjoxNzEwMTc1MzU4fQ.4FOww3rnfzbFHi8nhODsLrRutT0_0QHyPhMgeVBpxxM'

{"client_id":"zhangsan","client_secret":"123456"}

参考