Skip to content

嗅嗅笔记

python
import jwt
import datetime
from jwt import exceptions
from django.conf import settings
from django_redis import get_redis_connection


# def create_token(payload, timeout=settings.TOKEN_TIMEOUT):
#     """
#     payload: 用户要保存的字典数据
#     timeout:超时时间,可以在settings中进行修改
#     """
#     headers = {"typ": "jwt", "alg": "HS256"}
#     payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=timeout)
#     # 生成jwt token
#     result = jwt.encode(
#         payload=payload,
#         key=settings.SECRET_KEY.encode('utf8'),
#         algorithm='HS256',
#         headers=headers
#     )
#     return result

这个生成的token是唯一的,也就是如果用户在token未过期时,就重新访问登录,那么就会重新生成一个token,但原来的token还是可用的,这个是不应该的。

​ 为了解决这个问题,我选择将生成的jwt token根据手机号设置到Redis中,每次生成一个新的token就会把原来的覆盖掉。

python
import jwt
import datetime
from jwt import exceptions
from django.conf import settings
from django_redis import get_redis_connection

def create_token2(payload, timeout=settings.TOKEN_TIMEOUT):
    """
    payload: 用户要保存的字典数据
    timeout:超时时间,可以在settings中进行修改
    """
    headers = {"typ": "jwt", "alg": "HS256"}
    payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=timeout)
    result = jwt.encode(
        payload=payload,
        key=settings.SECRET_KEY.encode('utf8'),
        algorithm='HS256',
        headers=headers
    )
    # 生成的token值,根据手机号写入到Redis中
    conn = get_redis_connection("default")
    conn.set(f"jwt_token_{payload['mobile']}", result, ex=timeout)  # 过期时间也跟jwt token保持一致
    return result


def parse_payload(token):
    """
    根据传来的jwt token,进行校验,校验通过,则提取payload中的用户存储的数据
    """
    try:
        # 根据传来的token进行解密,拿到payload,这个payload中有手机号字段{'user_id': 1, 'name': '大和实业', 'mobile': '18211101742', 'auth_id': 1, 'exp': 1701319372}
        verified_payload = jwt.decode(token, settings.SECRET_KEY.encode('utf8'), algorithms=["HS256"])
        # 从Redis中拿到登录成功之后,写入的token值
        conn = get_redis_connection("default")
        jwt_token_mobile = conn.get(f"jwt_token_{verified_payload['mobile']}")
        # 如果用户这次请求携带的token和登录时生成的最新的token(已经存到Redis中了)不一致,表示用户携带的token肯定不是最新的
        # 然后我们手动raise,抛出token失效的错误信息,返回给客户端,客户端就可以根据这个错误信息让其跳转到登录页面从新登录,进而从新获取新的token
        if jwt_token_mobile != token:
            raise exceptions.ExpiredSignatureError
        # 如果token校验通过,就返回一个True,另外再把用户字典给返回
        return True, verified_payload
    except exceptions.ExpiredSignatureError:
        error = "token已失效"
    except jwt.DecodeError:
        error = "token认证失败"
    except jwt.InvalidTokenError:
        error = "非法的token"
    # 如果token校验失败,就返回False,并且将具体的校验失败原因返回
    return False, error

Settings.py

python
# jwt认证相关配置项
TOKEN_TIMEOUT = 3600*24

我改良的

python
import jwt
import datetime
from jwt import exceptions
from django.conf import settings
from django_redis import get_redis_connection


def create_token(payload, timeout=settings.TOKEN_TIMEOUT):
    """
    payload: 用户要保存的字典数据
    timeout:超时时间,可以在settings中进行修改
    """
    headers = {"typ": "jwt", "alg": "HS256"}
    payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(seconds=timeout)
    token = jwt.encode(
        payload=payload,
        key=settings.SECRET_KEY.encode('utf8'),
        algorithm='HS256',
        headers=headers
    )
    token = token.decode('utf-8')
    # 生成的token值,根据手机号写入到Redis中
    conn = get_redis_connection("default")
    conn.set(f"jwt_token_{payload['id']}", token, ex=timeout)  # 过期时间也跟jwt token保持一致
    return [token, payload]


def parse_payload(token):
    """
    根据传来的jwt token,进行校验,校验通过,则提取payload中的用户存储的数据
    """
    try:
        # 根据传来的token进行解密,拿到payload,这个payload中有手机号字段{'user_id': 1, 'name': '大和实业', 'mobile': '18211101742', 'auth_id': 1, 'exp': 1701319372}
        verified_payload = jwt.decode(token, settings.SECRET_KEY.encode('utf8'), algorithms=["HS256"])
        # 从Redis中拿到登录成功之后,写入的token值
        conn = get_redis_connection("default")
        jwt_token_id = conn.get(f"jwt_token_{verified_payload['id']}")
        jwt_token_id = jwt_token_id.decode('utf-8')
        # 如果用户这次请求携带的token和登录时生成的最新的token(已经存到Redis中了)不一致,表示用户携带的token肯定不是最新的
        # 然后我们手动raise,抛出token失效的错误信息,返回给客户端,客户端就可以根据这个错误信息让其跳转到登录页面从新登录,进而从新获取新的token
        if jwt_token_id != token:
            raise exceptions.ExpiredSignatureError
        # 如果token校验通过,就返回一个True,另外再把用户字典给返回
        return True, verified_payload
    except exceptions.ExpiredSignatureError:
        error = "token已失效"
    except jwt.DecodeError:
        error = "token认证失败"
    except jwt.InvalidTokenError:
        error = "非法的token"
    # 如果token校验失败,就返回False,并且将具体的校验失败原因返回
    return False, error