
JWT(JSON Web Tokens),是一种开发的行业标准,用于安全的表示双方之间的声明。目前,jwt广泛应用在系统的用户认证方面,特别是现在前后端分离项目。
1. JWT认证流程
传统token方式和jwt在认证方面有什么差异?
1 2
| 用户登录成功后,服务端生成一个随机token给用户,并且在服务端(数据库或缓存)中保存一份token 以后用户再来访问时需携带token,服务端接收到token之后,去数据库或缓存中进行校验token的是否超时、是否合法。
|
1 2
| 用户登录成功后,服务端通过jwt生成一个随机token给用户(服务端无需保留token), 以后用户再来访问时需携带token,服务端接收到token之后,通过jwt对token进行校验是否超时、是否合法。
|
2. jwt创建token
2.1 原理
jwt的生成token格式如下,即:由 .
连接的三段字符串组成。
1
| aaaaaaaaaaa.bbbbbbbbbb.ababababababab
|
生成规则如下:
- 第一段HEADER部分,固定包含算法和token类型,对此json进行base64url加密,这就是token的第一段。
1 2 3 4
| { "alg": "HS256", "typ": "JWT" }
|
- 第二段PAYLOAD,载荷部分,包含一些数据,对此json进行base64url加密,这就是token的第二段。
1 2 3 4
| { "name":"zhan", "age":"18" }
|
- 第三段SIGNATURE,签证部分,把前两段的base密文通过
.
拼接起来,然后对其进行HS256
加密,再然后对hs256
密文进行base64url加密,最终得到token的第三段。
1 2 3 4 5 6
| base64url( HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), your-256-bit-secret (秘钥) ) )
|
最后将三段字符串通过 .
拼接起来就生成了jwt的token。
注意:base64url加密是先做base64加密,然后再将 -
替代 +
及 _
替代 /
。
加密过程的部分源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| jwt.encode() def encode(self, payload, key, algorithm='HS256', headers=None, json_encoder=None ): segments = [] header = {'typ': self.header_typ, 'alg': algorithm} segments.append(base64url_encode(json_header)) segments.append(base64url_encode(payload)) signing_input = b'.'.join(segments) try: alg_obj = self._algorithms[algorithm] key = alg_obj.prepare_key(key) signature = alg_obj.sign(signing_input, key)
segments.append(base64url_encode(signature)) return b'.'.join(segments)
|
2.2 代码实现
基于Python的pyjwt模块创建jwt的token。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import jwt import datetime from jwt import exceptions SALT = 'iv%x6xo7l7_u9bf_u!9#g#m*)*=ej@bek5)(@u3kh*72+unjv=' def create_token(): headers = { 'typ': 'jwt', 'alg': 'HS256' } payload = { 'user_id': 1, 'username': 'wupeiqi', 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=5) } result = jwt.encode(payload=payload, key=SALT, algorithm="HS256", headers=headers).decode('utf-8') return result if __name__ == '__main__': token = create_token() print(token)
|
3. jwt校验token
一般在认证成功后,把jwt生成的token返回给用户,以后用户再次访问时候需要携带token,此时jwt需要对token进行超时
及合法性
校验。
将token分割成 header_segment
、payload_segment
、crypto_segment
三部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
signing_input, crypto_segment = jwt.rsplit(b'.', 1) header_segment, payload_segment = signing_input.split(b'.', 1)
header_data = base64url_decode(header_segment) payload = base64url_decode(payload_segment) signature = base64url_decode(crypto_segment)
try: alg_obj = self._algorithms[alg] key = alg_obj.prepare_key(key) if not alg_obj.verify(signing_input, key, signature): raise InvalidSignatureError('Signature verification failed')
|
对第一部分header_segment
进行base64url解密,得到header
对第二部分payload_segment
进行base64url解密,得到payload
对第三部分crypto_segment
进行base64url解密,得到signature
对第三部分signature
部分数据进行合法性校验
- 拼接前两段密文,即:
signing_input
- 从第一段明文中获取加密算法,默认:
HS256
- 使用 算法+密钥 对
signing_input
进行加密,将得到的结果即签证和signature
密文进行比较。
DEMO
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import jwt import datetime from jwt import exceptions def get_payload(token): """ 根据token获取payload :param token: :return: """ try: verified_payload = jwt.decode(token, SALT, True) return verified_payload except exceptions.ExpiredSignatureError: print('token已失效') except jwt.DecodeError: print('token认证失败') except jwt.InvalidTokenError: print('非法的token') if __name__ == '__main__': token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU" payload = get_payload(token)
|
4. jwt实战
在用户登录成功之后,生成token并返回,用户再次来访问时需携带token。
此示例在django的中间件中对tokne进行校验,内部编写了两个中间件来支持用户通过两种方式传递token。
1
| http://www.xxxxx.com?token=eyJhbGciOiJIUzI1N...
|
1 2 3
| GET /something/ HTTP/1.1 Host: pythonav.com Authorization: JWT eyJhbGciOiAiSFMyNTYiLCAidHlwIj
|
drf示例
view.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| from rest_framework.views import APIView from rest_framework.response import Response
from utils.jwt_auth import create_token from extensions.auth import JwtQueryParamAuthentication, JwtAuthorizationAuthentication
class LoginView(APIView): def post(self, request, *args, **kwargs): """ 用户登录 """ user = request.POST.get('username') pwd = request.POST.get('password')
if user == 'zhan' and pwd == '123': token = create_token({'username': 'zhan'}) return Response({'status': True, 'token': token}) return Response({'status': False, 'error': '用户名或密码错误'})
class OrderView(APIView): authentication_classes = [JwtQueryParamAuthentication, ]
def get(self, request, *args, **kwargs): print(request.user, request.auth) return Response({'data': '订单列表'})
def post(self, request, *args, **kwargs): print(request.user, request.auth) return Response({'data': '添加订单'})
|
utils\jwt_auth.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
import jwt import datetime from jwt import exceptions
JWT_SALT = 'iv%x6xo7l7_u9bf_u!9#g#m*)*=ej@bek5)(@u3kh*72+unjv='
def create_token(payload, timeout=20): """ :param payload: 例如:{'user_id':1,'username':'wupeiqi'}用户信息 :param timeout: token的过期时间,默认20分钟 :return: """ headers = { 'typ': 'jwt', 'alg': 'HS256' } payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout) result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8') return result
def parse_payload(token): """ 对token进行和发行校验并获取payload :param token: :return: """ result = {'status': False, 'data': None, 'error': None} try: verified_payload = jwt.decode(token, JWT_SALT, True) result['status'] = True result['data'] = verified_payload except exceptions.ExpiredSignatureError: result['error'] = 'token已失效' except jwt.DecodeError: result['error'] = 'token认证失败' except jwt.InvalidTokenError: result['error'] = '非法的token' return result
|
extensions/auth.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
from rest_framework.authentication import BaseAuthentication from rest_framework import exceptions from utils.jwt_auth import parse_payload
class JwtQueryParamAuthentication(BaseAuthentication): """ 用户需要在url中通过参数进行传输token,例如: http://www.pythonav.com?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU """
def authenticate(self, request): token = request.query_params.get('token') payload = parse_payload(token) if not payload['status']: raise exceptions.AuthenticationFailed(payload)
return (payload, token)
class JwtAuthorizationAuthentication(BaseAuthentication): """ 用户需要通过请求头的方式来进行传输token,例如: Authorization:jwt eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU """
def authenticate(self, request):
authorization = request.META.get('HTTP_AUTHORIZATION', '') auth = authorization.split() if not auth: raise exceptions.AuthenticationFailed({'error': '未获取到Authorization请求头', 'status': False}) if auth[0].lower() != 'jwt': raise exceptions.AuthenticationFailed({'error': 'Authorization请求头中认证方式错误', 'status': False})
if len(auth) == 1: raise exceptions.AuthenticationFailed({'error': "非法Authorization请求头", 'status': False}) elif len(auth) > 2: raise exceptions.AuthenticationFailed({'error': "非法Authorization请求头", 'status': False})
token = auth[1] result = parse_payload(token) if not result['status']: raise exceptions.AuthenticationFailed(result)
return (result, token)
|