专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

JWT + Refresh + SSO模版(jwt demo)

ins518 2025-06-24 14:11:59 技术文章 2 ℃ 0 评论

1 功能目标

JWT 用于业务请求认证
Refresh Token 用于续期 JWT(长会话)
防重放:JWT 的 jti 存 Redis 防止重复使用
单点登录(Single Sign-On, SSO):一用户一个有效 refresh token,踢掉旧设备

2 数据结构设计

Redis Key 设计

auth:used_jti:{user_id}:{jti}           # 防重放
auth:refresh_token:{user_id}            # 当前 refresh token,有效期长
auth:sso_session:{user_id}              # 当前 sso 会话标记(可选,防多设备)

3 代码示例

jwt_utils.py

import jwt
import uuid
import time

SECRET_KEY = 'my_super_secret_key'
JWT_EXPIRE_SECONDS = 300  # 5分钟
REFRESH_EXPIRE_SECONDS = 86400 * 7  # 7天

def generate_jwt(user_id):
    now = int(time.time())
    payload = {
        'sub': str(user_id),
        'iat': now,
        'exp': now + JWT_EXPIRE_SECONDS,
        'jti': str(uuid.uuid4())
    }
    token = jwt.encode(payload, SECRET_KEY, algorithm='HS256')
    return token

def decode_jwt(token):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        return payload
    except jwt.ExpiredSignatureError:
        raise ValueError("Token expired")
    except jwt.InvalidTokenError:
        raise ValueError("Invalid token")

refresh_token_utils.py

import uuid
import time
import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

REFRESH_TOKEN_NAMESPACE = "auth:refresh_token"

def build_refresh_token_key(user_id):
    return f"{REFRESH_TOKEN_NAMESPACE}:{user_id}"

def generate_refresh_token(user_id):
    refresh_token = str(uuid.uuid4())
    redis_key = build_refresh_token_key(user_id)
    redis_client.setex(redis_key, 86400 * 7, refresh_token)  # 7天
    return refresh_token

def validate_refresh_token(user_id, refresh_token):
    redis_key = build_refresh_token_key(user_id)
    stored_token = redis_client.get(redis_key)
    if not stored_token:
        raise ValueError("No refresh token stored or expired")
    if stored_token.decode() != refresh_token:
        raise ValueError("Invalid refresh token")
    return True

replay_protection.py

import redis
import time

redis_client = redis.Redis(host='localhost', port=6379, db=0)

JTI_NAMESPACE = "auth:used_jti"

def build_jti_key(user_id, jti):
    return f"{JTI_NAMESPACE}:{user_id}:{jti}"

def check_and_store_jti(user_id, jti, exp_timestamp):
    redis_key = build_jti_key(user_id, jti)
    ttl = exp_timestamp - int(time.time())
    if ttl <= 0:
        raise ValueError("Token already expired")

    result = redis_client.set(redis_key, 1, nx=True, ex=ttl)

    if result is None:
        raise ValueError("Replay attack detected: token already used")
    else:
        return True

main.py

from auth.jwt_utils import generate_jwt, decode_jwt
from auth.replay_protection import check_and_store_jti

# 生成 token
token = generate_jwt(user_id='user_1234', ttl_seconds=300)
print("Generated token:", token)

# 客户端发请求,服务端处理验证:
try:
    payload = decode_jwt(token)
    user_id = payload['sub']
    jti = payload['jti']
    exp_timestamp = payload['exp']

    # 防重放检查
    check_and_store_jti(user_id, jti, exp_timestamp)

    # 通过,正常业务处理
    print("Token valid and first use, process business logic.")

except ValueError as e:
    print("Token verification failed:", str(e))

4 单点登录(SSO)

用户只能在一个设备登录(踢掉旧设备)

SSO_SESSION_NAMESPACE = "auth:sso_session"

def build_sso_key(user_id):
    return f"{SSO_SESSION_NAMESPACE}:{user_id}"

def store_sso_session(user_id, session_id):
    redis_key = build_sso_key(user_id)
    redis_client.setex(redis_key, 86400 * 7, session_id)

def validate_sso_session(user_id, session_id):
    redis_key = build_sso_key(user_id)
    stored_session_id = redis_client.get(redis_key)
    if not stored_session_id:
        raise ValueError("SSO session expired")
    if stored_session_id.decode() != session_id:
        raise ValueError("SSO session conflict (another device logged in)")
    return True

登录成功后,session_id 存 Redis,token 里带 session_id,每次校验对齐。

5 综合使用流程

登录接口

  • 验证账号密码
  • 生成 JWT、Refresh Token、SSO Session
  • 返回给客户端:
{
  "access_token": "JWT",
  "refresh_token": "refresh_token",
  "session_id": "session_id"
}

API 请求流程

  • 客户端带 JWT + session_id
  • 服务端:
payload = decode_jwt(token)
check_and_store_jti(payload['sub'], payload['jti'], payload['exp'])
validate_sso_session(payload['sub'], payload.get('session_id'))
# 通过后执行业务逻辑

Refresh Token 刷新流程

  • 客户端请求刷新 token,带 refresh_token
  • 服务端:
validate_refresh_token(user_id, refresh_token)
new_jwt = generate_jwt(user_id)

防重放 + SSO 设计总结

安全设计点

对应做法

JWT 防重放

jti + Redis

Refresh Token

Redis 存储 + 校验

SSO 防多设备登录

session_id + Redis 对比

Token 过期策略

JWT 短期 5-15 min,Refresh 长期 7-14 天

自动踢下线

新登录刷新 session_id

客户端什么时候刷新 token

分场景讲:

1 JWT 本质是短期 token

  • 例如 JWT_EXPIRE_SECONDS = 5分钟
  • 为什么?为了减少被盗用窗口,提升安全性
  • 所以需要 定期刷新

2 刷新 token 典型策略

策略 1:到期前自动刷新(最推荐)

  • 客户端维护 JWT 过期时间
  • 每次请求前判断:
    • 如果 token 将在 N 秒后过期(例如 30 秒),提前刷新
  • 优点:
    • 用户体验无感知,不会突然 token 失效
    • 减少请求过程中遇到 token 失效的错误

策略 2:接口 401 刷新

  • 如果请求 API 返回 401(Token Expired):
    • 自动用 Refresh Token 请求新 JWT
    • 重新请求原 API
  • 缺点:
    • 体验略差,用户可能看到轻微延迟
    • 但代码实现简单

策略 3:定时后台刷新

  • 客户端启动后 定时器轮询刷新 token
    • 例如每 3 分钟刷新一次 JWT
  • 优点:
    • 不依赖业务请求
  • 缺点:
    • 增加后台轮询请求,适合 App 场景,不适合 Web 前端

典型流程图

+------------------+              +-----------------------+
|     Client       |              |        Server         |
+------------------+              +-----------------------+
         |                                      |
         | --- Request with JWT --------------> |
         |                                      |
         | <--- 200 OK / 401 Expired -----------|
         |                                      |
 if JWT expiring soon or 401:
         | --- Refresh Token Request ---------> |
         |                                      |
         | <--- New JWT + Optional Refresh ---- |
         |                                      |
         | --- Re-send Original API ----------> |
         |                                      |

综合建议

Web 前端 (Vue/React/SPA)

  • 推荐 策略 1 + 2
    • 请求拦截器(axios interceptors)
    • 自动判断 JWT 是否快过期
    • 401 自动刷新重试

App 端 (iOS/Android)

  • 推荐 策略 1 + 3
    • App 启动后定时后台刷新 token
    • 网络请求同样提前判断 JWT 是否快过期

Websocket 场景

  • 初始连接时带上 JWT
  • 后端定时推送 "Token Expire Soon" 通知
  • 客户端收到后用 Refresh Token 主动换新 JWT + 重连 Websocket

场景

推荐策略组合

Web API 调用

策略 1 + 策略 2

App 长会话

策略 1 + 策略 3

Websocket 长连接

后端通知 + 主动刷新

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表