JWT -- JSON WEB TOKEN

JWT(JSON Web Tokens),是一种开发的行业标准,用于安全的表示双方之间的声明。目前,jwt广泛应用在系统的用户认证方面,特别是现在前后端分离项目。

1. JWT认证流程

jwt流程

传统token方式和jwt在认证方面有什么差异?

  • 传统token方式
1
2
用户登录成功后,服务端生成一个随机token给用户,并且在服务端(数据库或缓存)中保存一份token
以后用户再来访问时需携带token,服务端接收到token之后,去数据库或缓存中进行校验token的是否超时、是否合法。
  • jwt方式
1
2
用户登录成功后,服务端通过jwt生成一个随机token给用户(服务端无需保留token),
以后用户再来访问时需携带token,服务端接收到token之后,通过jwt对token进行校验是否超时、是否合法。

2. jwt创建token

2.1 原理

jwt的生成token格式如下,即:由 . 连接的三段字符串组成。

1
aaaaaaaaaaa.bbbbbbbbbb.ababababababab

生成规则如下:

  • 第一段HEADER部分,固定包含算法和token类型,对此json进行base64url加密,这就是token的第一段。
1
2
3
4
{
"alg": "HS256",
"typ": "JWT"
}
  • 第二段PAYLOAD,载荷部分,包含一些数据,对此json进行base64url加密,这就是token的第二段。
1
2
3
4
{
"name":"zhan",
"age":"18"
}
  • 第三段SIGNATURE,签证部分,把前两段的base密文通过.拼接起来,然后对其进行HS256加密,再然后对hs256密文进行base64url加密,最终得到token的第三段。
1
2
3
4
5
6
base64url(
HMACSHA256(
base64UrlEncode(header) + "." + base64UrlEncode(payload),
your-256-bit-secret (秘钥)
)
)

最后将三段字符串通过 .拼接起来就生成了jwt的token。

注意:base64url加密是先做base64加密,然后再将 - 替代 +_ 替代 /

加密过程的部分源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
jwt.encode()
def encode(self,
payload, # type: Union[Dict, bytes]
key, # type: str
algorithm='HS256', # type: str
headers=None, # type: Optional[Dict]
json_encoder=None # type: Optional[Callable]
):
segments = []
# Header
header = {'typ': self.header_typ, 'alg': algorithm}

segments.append(base64url_encode(json_header))
segments.append(base64url_encode(payload))
# Segments
signing_input = b'.'.join(segments)
try:
alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key)
signature = alg_obj.sign(signing_input, key)

segments.append(base64url_encode(signature))
return b'.'.join(segments)

2.2 代码实现

基于Python的pyjwt模块创建jwt的token。

  • 安装
1
pip3 install pyjwt
  • 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import jwt
import datetime
from jwt import exceptions
SALT = 'iv%x6xo7l7_u9bf_u!9#g#m*)*=ej@bek5)(@u3kh*72+unjv='
def create_token():
# 构造header
headers = {
'typ': 'jwt',
'alg': 'HS256'
}
# 构造payload
payload = {
'user_id': 1, # 自定义用户ID
'username': 'wupeiqi', # 自定义用户名
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=5) # 超时时间
}
result = jwt.encode(payload=payload, key=SALT, algorithm="HS256", headers=headers).decode('utf-8')
return result
if __name__ == '__main__':
token = create_token()
print(token)

3. jwt校验token

一般在认证成功后,把jwt生成的token返回给用户,以后用户再次访问时候需要携带token,此时jwt需要对token进行超时合法性校验。

将token分割成 header_segmentpayload_segmentcrypto_segment 三部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"

signing_input, crypto_segment = jwt.rsplit(b'.', 1)
header_segment, payload_segment = signing_input.split(b'.', 1)

header_data = base64url_decode(header_segment)
payload = base64url_decode(payload_segment)
signature = base64url_decode(crypto_segment)

try:
alg_obj = self._algorithms[alg]
key = alg_obj.prepare_key(key)
if not alg_obj.verify(signing_input, key, signature):
raise InvalidSignatureError('Signature verification failed')

对第一部分header_segment进行base64url解密,得到header

对第二部分payload_segment进行base64url解密,得到payload

对第三部分crypto_segment进行base64url解密,得到signature

对第三部分signature部分数据进行合法性校验

  • 拼接前两段密文,即:signing_input
  • 从第一段明文中获取加密算法,默认:HS256
  • 使用 算法+密钥 对signing_input 进行加密,将得到的结果即签证和signature密文进行比较。

DEMO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import jwt
import datetime
from jwt import exceptions
def get_payload(token):
"""
根据token获取payload
:param token:
:return:
"""
try:
# 从token中获取payload【不校验合法性】
# unverified_payload = jwt.decode(token, None, False)
# print(unverified_payload)
# 从token中获取payload【校验合法性】
verified_payload = jwt.decode(token, SALT, True)
return verified_payload
except exceptions.ExpiredSignatureError:
print('token已失效')
except jwt.DecodeError:
print('token认证失败')
except jwt.InvalidTokenError:
print('非法的token')
if __name__ == '__main__':
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU"
payload = get_payload(token)

4. jwt实战

在用户登录成功之后,生成token并返回,用户再次来访问时需携带token。

此示例在django的中间件中对tokne进行校验,内部编写了两个中间件来支持用户通过两种方式传递token。

  • url传参
1
http://www.xxxxx.com?token=eyJhbGciOiJIUzI1N...
  • Authorization请求头
1
2
3
GET /something/ HTTP/1.1
Host: pythonav.com
Authorization: JWT eyJhbGciOiAiSFMyNTYiLCAidHlwIj

drf示例

view.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from rest_framework.views import APIView
from rest_framework.response import Response

from utils.jwt_auth import create_token
from extensions.auth import JwtQueryParamAuthentication, JwtAuthorizationAuthentication


class LoginView(APIView):
def post(self, request, *args, **kwargs):
""" 用户登录 """
user = request.POST.get('username')
pwd = request.POST.get('password')

# 检测用户和密码是否正确,此处可以在数据进行校验。
if user == 'zhan' and pwd == '123':
# 用户名和密码正确,给用户生成token并返回
token = create_token({'username': 'zhan'})
return Response({'status': True, 'token': token})
return Response({'status': False, 'error': '用户名或密码错误'})


class OrderView(APIView):
# 通过url传递token
authentication_classes = [JwtQueryParamAuthentication, ]

# 通过Authorization请求头传递token
# authentication_classes = [JwtAuthorizationAuthentication, ]

def get(self, request, *args, **kwargs):
print(request.user, request.auth)
return Response({'data': '订单列表'})

def post(self, request, *args, **kwargs):
print(request.user, request.auth)
return Response({'data': '添加订单'})

utils\jwt_auth.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import jwt
import datetime
from jwt import exceptions

JWT_SALT = 'iv%x6xo7l7_u9bf_u!9#g#m*)*=ej@bek5)(@u3kh*72+unjv='

def create_token(payload, timeout=20):
"""
:param payload: 例如:{'user_id':1,'username':'wupeiqi'}用户信息
:param timeout: token的过期时间,默认20分钟
:return:
"""
headers = {
'typ': 'jwt',
'alg': 'HS256'
}
payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
result = jwt.encode(payload=payload, key=JWT_SALT, algorithm="HS256", headers=headers).decode('utf-8')
return result


def parse_payload(token):
"""
对token进行和发行校验并获取payload
:param token:
:return:
"""
result = {'status': False, 'data': None, 'error': None}
try:
verified_payload = jwt.decode(token, JWT_SALT, True)
result['status'] = True
result['data'] = verified_payload
except exceptions.ExpiredSignatureError:
result['error'] = 'token已失效'
except jwt.DecodeError:
result['error'] = 'token认证失败'
except jwt.InvalidTokenError:
result['error'] = '非法的token'
return result

extensions/auth.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions
from utils.jwt_auth import parse_payload


class JwtQueryParamAuthentication(BaseAuthentication):
"""
用户需要在url中通过参数进行传输token,例如:
http://www.pythonav.com?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU
"""

def authenticate(self, request):
token = request.query_params.get('token')
payload = parse_payload(token)
if not payload['status']:
raise exceptions.AuthenticationFailed(payload)

# 如果想要request.user等于用户对象,此处可以根据payload去数据库中获取用户对象。
return (payload, token)


class JwtAuthorizationAuthentication(BaseAuthentication):
"""
用户需要通过请求头的方式来进行传输token,例如:
Authorization:jwt eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU
"""

def authenticate(self, request):

# 非登录页面需要校验token
authorization = request.META.get('HTTP_AUTHORIZATION', '')
auth = authorization.split()
if not auth:
raise exceptions.AuthenticationFailed({'error': '未获取到Authorization请求头', 'status': False})
if auth[0].lower() != 'jwt':
raise exceptions.AuthenticationFailed({'error': 'Authorization请求头中认证方式错误', 'status': False})

if len(auth) == 1:
raise exceptions.AuthenticationFailed({'error': "非法Authorization请求头", 'status': False})
elif len(auth) > 2:
raise exceptions.AuthenticationFailed({'error': "非法Authorization请求头", 'status': False})

token = auth[1]
result = parse_payload(token)
if not result['status']:
raise exceptions.AuthenticationFailed(result)

# 如果想要request.user等于用户对象,此处可以根据payload去数据库中获取用户对象。
return (result, token)
都看到这里了,不赏点银子吗^v^