分页
在查看数据列表的API中,如果 数据量 比较大,肯定不能把所有的数据都展示给用户,而需要通过分页展示。
在drf中为我们提供了一些分页先关类:
BasePagination,分页基类
PageNumberPagination(BasePagination) 支持 /accounts/?page=4&page_size=100 格式的分页
LimitOffsetPagination(BasePagination) 支持 ?offset=100&limit=10 格式的分页
CursorPagination(BasePagination) 支持 上一下 & 下一页 格式的分页(不常用)
1. 继承APIView视图时:
如果编写视图是直接继承APIView,那么在使用分页时,就必须自己手动 实例化 和 调用相关方法。
1.PageNumberPagination
继承APIView使用时,如上图:
python
from rest_framework.pagination import PageNumberPagination
class UserView(APIView):
def get(self, request, *args, **kwargs):
queryset = UserInfo.objects.all()
pager = PageNumberPagination()
pagination_queryset = pager.paginate_queryset(queryset, request, self)
serializer = UserInfoSerializer(instance=pagination_queryset, many=True)
return Response(serializer.data)
或定义size等参数:如上图
python
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
page_size_query_param = 'size' # 定义的搜索关键词
page_size = 2 # 每页默认数量
max_page_size = 20 # 每页允许的最大数量
class UserView(APIView):
def get(self, request, *args, **kwargs):
queryset = UserInfo.objects.all().order_by('id')
pager = MyPageNumberPagination()
pager_queryset = pager.paginate_queryset(queryset, request, self)
serializer = UserInfoSerializer(instance=pager_queryset, many=True)
return Response(serializer.data)
2.LimitOffsetPagination
python
from rest_framework.pagination import LimitOffsetPagination
class UserView(APIView):
def get(self, request, *args, **kwargs):
queryset = UserInfo.objects.all().order_by('id')
pager = LimitOffsetPagination()
pager_queryset = pager.paginate_queryset(queryset)
serializer = UserInfoSerializer(instance=pager_queryset, many=True)
return Response(serializer.data)
3.CursorPagination
python
from rest_framework.pagination import CursorPagination
class MyCursorPagination(CursorPagination):
ordering = 'id' # 先排序后分页,排序关键词
page_size_query_param = 'size' # 每页页数关键词
page_size = 2 # 默认的每页页数
max_page_size = 20 # 每页允许的最大页数
class UserInfoView(APIView):
def get(self, request, *args, **kwargs):
queryset = UserInfo.objects.all().order_by('id')
pager = MyCursorPagination()
pager_queryset = pager.paginate_queryset(queryset)
serializer = UserInfoSerializer(instance=pager_queryset, many=True)
return Response(serializer.data)
2. 继承GenericAPIView派生类时
如果是使用 ListModelMixin
或 ModelViewSet
,则只需要配置相关类即可,内部会自动执行相关分页的方法。
1.PageNumberPagination
python
from rest_framework.pagination import PageNumberPagination
class MyPageNumberPagination(PageNumberPagination):
page_size = 2
page_size_query_param = 'size'
page_query_param = 'page'
max_page_size = 20
class UserInfoView(ListCreateAPIView):
serializer_class = UserInfoSerializer
queryset = UserInfo.objects.all()
pagination_class = MyPageNumberPagination
2.LimitOffsetPagination
python
from rest_framework.pagination import LimitOffsetPagination
class MyLimitOffsetPagination(LimitOffsetPagination):
default_limit = 2
max_limit = 20
class UserInfoView(ListCreateAPIView):
serializer_class = UserInfoSerializer
queryset = UserInfo.objects.all()
pagination_class = MyLimitOffsetPagination
3.CursorPagination
python
from rest_framework.pagination import CursorPagination
class MyCursorPagination(CursorPagination):
ordering = 'id'
page_size_query_param = 'size'
page_size = 2
max_page_size = 20
class UserInfoView(ListCreateAPIView):
serializer_class = UserInfoSerializer
queryset = UserInfo.objects.all()
pagination_class = MyCursorPagination
案例:分页,加载更多的实现
需要前端传来最后一条数据的latest_id和每页几条数据:limit。
python
from django_filters import FilterSet, filters
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.generics import ListCreateAPIView,RetrieveUpdateDestroyAPIView
from api import models
from rest_framework.viewsets import ViewSet
from api.serializers.topic import TopicSerializer
from api.utils.filters import SelfFilterBackend
class MyLimitOffsetPagination(LimitOffsetPagination): #负责筛选每页几条数据
default_limit = 2
max_limit = 20
offset_query_param = None
class TopicFilterSet(FilterSet): #筛选所有id<latest_id的数据
# ?latest_id=99 -> id<99
# ?latest_id=99&limit=10 -> id<99 limit 10
latest_id = filters.NumberFilter(field_name='id', lookup_expr='lt')
class Meta:
model = models.Topic
fields = ["latest_id", ]
class SelfFilterBackend(BaseFilterBackend): #筛选用户
def filter_queryset(self, request, queryset, view):
return queryset.filter(user=request.user)
class TopicView(ListCreateAPIView,RetrieveUpdateDestroyAPIView,ViewSet):
""" 主题 """
# 当前登录用户的调教
filter_backends = [SelfFilterBackend, DjangoFilterBackend] #DjangoFilterBackend是django需要的类,不能省略
filterset_class = TopicFilterSet
pagination_class = MyLimitOffsetPagination
queryset = models.Topic.objects.filter(deleted=False).order_by('-id')
serializer_class = TopicSerializer
def perform_create(self, serializer):
serializer.save(user=self.request.user)
def perform_destroy(self, instance):
instance.deleted = True
instance.save()
路由
在之前进行drf开发时,对于路由我们一般进行两种配置:
视图继承APIView
pythonfrom django.urls import path from app01 import views urlpatterns = [ path('api/users/', views.UserView.as_view()), ]
视图继承
ViewSetMixin
(GenericViewSet、ModelViewSet)pythonfrom django.urls import path, re_path, include from app01 import views urlpatterns = [ path('api/users/', views.UserView.as_view({"get":"list","post":"create"})), path('api/users/<int:pk>/', views.UserView.as_view({"get":"retrieve","put":"update","patch":"partial_update","delete":"destory"})), ]
对于这种形式的路由,drf中提供了更简便的方式:
pythonfrom rest_framework import routers from app01 import views router = routers.SimpleRouter() router.register(r'api/users', views.UserView) urlpatterns = [ # 其他URL # path('xxxx/', xxxx.as_view()), ] urlpatterns += router.urls
也可以利用include,给URL加前缀:
pythonfrom django.urls import path, include from rest_framework import routers from app01 import views router = routers.SimpleRouter() router.register(r'users', views.UserView) urlpatterns = [ path('api/', include((router.urls, 'app_name'), namespace='instance_name')), # 其他URL # path('forgot-password/', ForgotPasswordFormView.as_view()), ]
解析器
之前使用 request.data
获取请求体中的数据。
这个 reqeust.data
的数据怎么来的呢?其实在drf内部是由解析器,根据请求者传入的数据格式 + 请求头来进行处理。
1.JSONParser
2.FormParser
3.MultiPartParser
html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<form action="http://127.0.0.1:8000/test/" method="post" enctype="multipart/form-data">
<input type="text" name="user" />
<input type="file" name="img">
<input type="submit" value="提交">
</form>
</body>
</html>
4.FileUploadParser
解析器可以设置多个,默认解析器:
python
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser, JSONParser, FormParser
class UserView(APIView):
def post(self, request):
print(request.content_type)
print(request.data)
return Response("...")
drf实战案例
1. 需求
请结合上述学习的drf知识开发 简易版《抽屉新热榜》。其中包含的功能如下:
注册
输入:手机号、用户名、密码、确认密码。
登录
输入:手机号 或 用户名 + 密码 注意:登录成功后给用户返回token,后续请求需要在url中携带token(有效期2周)
我的话题
- 我的话题列表 - 创建话题 - 修改话题 - 删除话题(逻辑删除)
我的资讯
- 创建资讯(5分钟创建一个,需要根据用户限流) 问题1:5/h 2/m; 问题2:成功后,下次再创建; - 文本(你问我答、42区、挨踢1024、段子) - 图片(图片、你问我答、42区、挨踢1024、段子) - 连接(图片、你问我答、42区、挨踢1024、段子) 注意:创建时默认自己做1个推荐。 - 我的资讯列表
首页
- 资讯首页 - 时间倒序,读取已审核通过的资讯 - 加载更多,分页处理 - 支持传入参数,查询各分区资讯:图片、你问我答、42区、挨踢1024、段子 ?zone=2
推荐
- 推荐 - 取消推荐 - 我的推荐列表
收藏
python- 收藏 or 取消收藏 - 我的收藏列表
评论
python- 查看评论列表 - 根据【后代的更新时间】从大到小排序,读取根评论,每次读20条。 - 读取根评论先关的子评论。 - 将子评论挂靠到跟评论上,最终形成父子关系通过JSON返回给前端。 注意:自己也可以通过depth实现逐步读取子评论(此处不这样操作) - 创建评论 - 判断是根评论 or 回复 - 回复时,深度+1 - 评论后,找到根评论去更新【后代的更新时间】
2. 参考表结构
表结构参考:
python
from django.db import models
class DeletedModel(models.Model):
deleted = models.BooleanField(verbose_name="已删除", default=False)
class Meta:
abstract = True
class UserInfo(DeletedModel):
""" 用户表 """
username = models.CharField(verbose_name="用户名", max_length=32)
phone = models.CharField(verbose_name="手机号", max_length=32, db_index=True)
password = models.CharField(verbose_name="密码", max_length=64)
token = models.CharField(verbose_name="token", max_length=64, null=True, blank=True, db_index=True)
token_expiry_date = models.DateTimeField(verbose_name="token有效期", null=True, blank=True)
status_choice = (
(1, "激活"),
(2, "禁用"),
)
status = models.IntegerField(verbose_name="状态", choices=status_choice, default=1)
create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
class Meta:
# The newer indexes option provides more functionality than index_together
# index_together may be deprecated in the future.
# https://docs.djangoproject.com/en/3.2/ref/models/options/#index-together
indexes = [
models.Index(fields=['username', "password"], name='idx_name_pwd')
]
class Topic(DeletedModel):
""" 话题 """
title = models.CharField(verbose_name="话题", max_length=16, db_index=True)
is_hot = models.BooleanField(verbose_name="热门话题", default=False)
user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
class News(DeletedModel):
""" 新闻资讯 """
zone_choices = ((1, "42区"), (2, "段子"), (3, "图片"), (4, "挨踢1024"), (5, "你问我答"))
zone = models.IntegerField(verbose_name="专区", choices=zone_choices)
title = models.CharField(verbose_name="文字", max_length=150)
url = models.CharField(verbose_name="链接", max_length=200, null=True, blank=True)
# xxxxx?xxxxxx.png,xxxxxxxx.jeg
image = models.TextField(verbose_name="图片地址", help_text="逗号分割", null=True, blank=True)
topic = models.ForeignKey(verbose_name="话题", to="Topic", on_delete=models.CASCADE, null=True, blank=True)
user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
status_choice = (
(1, "待审核"),
(2, "已通过"),
(3, "未通过"),
)
status = models.IntegerField(verbose_name="状态", choices=status_choice, default=1)
collect_count = models.IntegerField(verbose_name="收藏数", default=0)
recommend_count = models.IntegerField(verbose_name="推荐数", default=0)
comment_count = models.IntegerField(verbose_name="评论数", default=0)
class Collect(models.Model):
""" 收藏 """
news = models.ForeignKey(verbose_name="资讯", to="News", on_delete=models.CASCADE)
user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
class Meta:
# unique_together = [['news', 'user']]
constraints = [
models.UniqueConstraint(fields=['news', 'user'], name='uni_collect_news_user')
]
class Recommend(models.Model):
""" 推荐 """
news = models.ForeignKey(verbose_name="资讯", to="News", on_delete=models.CASCADE)
user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
class Meta:
constraints = [
models.UniqueConstraint(fields=['news', 'user'], name='uni_recommend_news_user')
]
class Comment(models.Model):
""" 评论表 """
news = models.ForeignKey(verbose_name="资讯", to="News", on_delete=models.CASCADE)
user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
content = models.CharField(verbose_name="内容", max_length=150)
depth = models.IntegerField(verbose_name="深度", default=0)
root = models.ForeignKey(verbose_name="根评论", to="Comment", related_name="descendant", on_delete=models.CASCADE,
null=True, blank=True)
reply = models.ForeignKey(verbose_name="回复", to="Comment", related_name="reply_list", on_delete=models.CASCADE,
null=True, blank=True)
create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
# 针对根评论
descendant_update_datetime = models.DateTimeField(verbose_name="后代更新时间", auto_now_add=True)
3. 案例讲解
3.1 环境准备
基于django创建项目,例如:dig
安装必备模块
django-filter==2.4.0 django-redis==5.0.0 djangorestframework==3.12.4
创建app,例如:api
注册app
pythonINSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'rest_framework', 'django_filters', 'api.apps.ApiConfig' ]
拷贝表结构到 models.py 并 生成数据库
3.2 接口开发
1.注册
2.登陆
3.我的话题增删改查
4.自定义限流类,限流频率为1/5m,并解决访问不成功时的限流问题
Serializer.py
python
from rest_framework import serializers
from api.models import UserInfo, Topic, News, Recommend
from rest_framework import exceptions
import re
class RegisterSerializer(serializers.ModelSerializer):
re_password = serializers.CharField(max_length=32, allow_null=False, write_only=True)
password = serializers.CharField(max_length=32, allow_null=False, write_only=True)
class Meta:
model = UserInfo
fields = ['username', 'phone', 'password', 're_password']
def validate_phone(self, value):
is_exist = UserInfo.objects.filter(phone=value).exists()
if is_exist:
raise exceptions.ValidationError('手机号已注册,请直接登陆')
if not re.match(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$', value):
raise exceptions.ValidationError('手机号格式错误')
return value
def validate_password(self, value):
print(self.initial_data, 'init')
re_password = self.initial_data.get('re_password')
if re_password != value:
raise exceptions.ValidationError('密码不一致')
return value
class LoginSerializer(serializers.ModelSerializer):
class Meta:
model = UserInfo
fields = ['username', 'phone', 'password']
extra_kwargs = {
'token': {'write_only': True},
'token_expiry_date': {'write_only': True},
'username': {'required': False},
'phone': {'required': False},
'password': {'required': True},
}
def validate(self, attrs):
username = attrs.get('username', None)
phone = attrs.get('phone', None)
if not username and not phone:
raise exceptions.ValidationError('用户名或手机号为空')
return attrs
class TopicSerializer(serializers.ModelSerializer):
class Meta:
model = Topic
fields = ['id', 'title', 'is_hot']
class NewsSerializer(serializers.ModelSerializer):
image_list = serializers.SerializerMethodField(read_only=True)
topic_title = serializers.CharField(source="topic.title", read_only=True)
zone_title = serializers.CharField(source="get_zone_display", read_only=True)
status = serializers.CharField(source="get_status_display", read_only=True)
# title、url、image、'topic', "zone"
# - 只有title,只创建文本 + 分区不能是图片
# - 有title,image,
# - 有title,url
class Meta:
model = News
fields = ['id', "title", "url",
'image', 'topic', "zone",
"zone_title", 'image_list', "topic_title", 'collect_count', 'recommend_count', 'comment_count',
"status"]
read_only_fields = ['collect_count', 'recommend_count', 'comment_count']
extra_kwargs = {
'topic': {'write_only': True}, # 新增时,topic=1
'image': {'write_only': True}, # 图片地址 xxxx,xxxx,xxxx
'zone': {'write_only': True},
}
def get_image_list(self, obj):
if not obj.image:
return []
return obj.image.split(',')
def validate_topic(self, value):
if not value:
return value
request = self.context['request']
exists = Topic.objects.filter(deleted=False, id=value.id, user=request.user).exists()
if not exists:
raise exceptions.ValidationError("话题不存在")
return value
def validate_title(self, value):
url = self.initial_data.get('url')
image = self.initial_data.get('image')
zone = self.initial_data.get('zone')
if url and image:
raise exceptions.ValidationError("请求数据错误")
if not url and not image:
if zone == 3:
raise exceptions.ValidationError("分区选择错误")
return value
def create(self, validated_data):
request = self.context["request"]
# 1.创建新闻资讯
new_object = News.objects.create(recommend_count=1, **validated_data)
# 2.推荐记录
Recommend.objects.create(
news=new_object,
user=request.user
)
return new_object
def create(self, validated_data):
news_obj = News.objects.create(**validated_data, recommend_count=1)
Recommend.objects.create(
news=news_obj,
user=self.context["request"].user,
)
return news_obj
Views.py
python
from rest_framework.generics import CreateAPIView
from rest_framework.views import APIView
from api.models import UserInfo
from api.serialisers import RegisterSerializer, LoginSerializer
from api.utils import encrypt
from django.db.models import Q
from rest_framework.response import Response
from api import ResponseCode
import random
import datetime
class RegisterView(CreateAPIView):
serializer_class = RegisterSerializer
queryset = UserInfo.objects.filter(deleted=False, status=1)
def perform_create(self, serializer):
serializer.validated_data.pop('re_password')
password = serializer.validated_data.pop('password')
password = encrypt.Mymd5(password)
serializer.validated_data['password'] = password
token = random.randint(1000, 9999)
token_expiry_date = datetime.datetime.now() + datetime.timedelta(weeks=2)
serializer.validated_data['token_expiry_date'] = token_expiry_date
serializer.validated_data['token'] = token
serializer.save()
class LoginView(CreateAPIView):
queryset = UserInfo.objects.filter(deleted=False, status=1)
serializer_class = LoginSerializer
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
# print(111, serializer.validated_data)
username = serializer.validated_data.get('username', None)
password = serializer.validated_data.get('password', None)
password = encrypt.Mymd5(password)
phone = serializer.validated_data.get('phone', None)
user_obj = UserInfo.objects.filter(Q(Q(username=username) | Q(phone=phone)), password=password).first()
if not user_obj:
return Response({'code': ResponseCode.LOGIN_FAILURE, 'detail': '用户名或密码错误'})
token = random.randint(1000, 9999)
token_expiry_date = datetime.datetime.now() + datetime.timedelta(weeks=2)
user_obj.token_expiry_date = token_expiry_date
user_obj.token = token
# print(222, serializer.validated_data)
return Response({'code': ResponseCode.LOGIN_SUCCESS, 'detail': '登陆成功'})
settings.py
python
TIME_ZONE = 'Asia/Shanghai'
USE_TZ = False
Urls.py
python
from django.urls import path
from api.views import RegisterView, LoginView
urlpatterns = [
path('register/', RegisterView.as_view(), name='register'),
path('login/', LoginView.as_view(), name='login')
]
throttle.py
python
from rest_framework.throttling import SimpleRateThrottle
from django.core.cache import cache as default_cache
from api.utils import return_code
from rest_framework import exceptions
from rest_framework import status
class ThrottledException(exceptions.APIException):
status_code = status.HTTP_429_TOO_MANY_REQUESTS
default_code = 'throttled'
class MySimpleRateThrottle(SimpleRateThrottle):
cache = default_cache # 访问记录存放在django的缓存中(需设置缓存)
scope = "user" # 构造缓存中的key
cache_format = 'throttle_%(scope)s_%(ident)s'
# 设置访问频率,例如:1/5m
THROTTLE_RATES = {"user": "1/5m"}
def parse_rate(self, rate):
"""
Given the request rate string, return a two tuple of:
<allowed number of requests>, <period of time in seconds>
"""
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[-1]]
count = int(period[0])
return (num_requests, duration * count)
def get_cache_key(self, request, view):
ident = request.user.pk
return self.cache_format % {'scope': self.scope, 'ident': ident}
def throttle_success(self):
return True
def throttle_failure(self):
wait = self.wait()
detail = {
"code": return_code.TOO_MANY_REQUESTS,
"data": "访问频率限制",
'detail': "需等待{}秒后才能访问".format(int(wait))
}
raise ThrottledException(detail) # 自定义的exception,在上面
def done(self):
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration)