Skip to content

分页

在查看数据列表的API中,如果 数据量 比较大,肯定不能把所有的数据都展示给用户,而需要通过分页展示。

在drf中为我们提供了一些分页相关类:

BasePagination,分页基类
PageNumberPagination(BasePagination)	支持 /accounts/?page=4&page_size=100 格式的分页
LimitOffsetPagination(BasePagination)	支持 ?offset=100&limit=10 格式的分页
CursorPagination(BasePagination)		支持 上一页 & 下一页 格式的分页(不常用)

1. 继承APIView视图时:

如果编写视图是直接继承APIView,那么在使用分页时,就必须自己手动 实例化 和 调用相关方法。

1.PageNumberPagination

image-20210826165642846

继承APIView使用时,如上图:

python
from rest_framework.pagination import PageNumberPagination

class UserView(APIView):
    """List users one page at a time with the stock ``PageNumberPagination``."""

    def get(self, request, *args, **kwargs):
        """Return the serialized users belonging to the requested page."""
        # Pages of an unordered queryset are not stable between requests, so
        # give the queryset an explicit ordering before paginating it.
        queryset = UserInfo.objects.all().order_by('id')
        pager = PageNumberPagination()
        # The paginator reads the page number from the request's query string.
        pagination_queryset = pager.paginate_queryset(queryset, request, self)
        serializer = UserInfoSerializer(instance=pagination_queryset, many=True)
        return Response(serializer.data)

image-20210826165918075

或定义size等参数:如上图

python
from rest_framework.pagination import PageNumberPagination

class MyPageNumberPagination(PageNumberPagination):
    """Page-number pagination whose page size can be chosen by the client."""

    # Default number of items per page.
    page_size = 2
    # Query-string key the client uses to ask for a different page size.
    page_size_query_param = 'size'
    # Largest page size a client is allowed to request.
    max_page_size = 20

class UserView(APIView):
    """List users through ``MyPageNumberPagination``, invoked by hand."""

    def get(self, request, *args, **kwargs):
        """Return the serialized users of the requested page."""
        users = UserInfo.objects.all().order_by('id')
        # With a plain APIView the paginator has to be built and called manually.
        current_page = MyPageNumberPagination().paginate_queryset(users, request, self)
        payload = UserInfoSerializer(instance=current_page, many=True).data
        return Response(payload)
2.LimitOffsetPagination

image-20210826170617347

python
from rest_framework.pagination import LimitOffsetPagination

class UserView(APIView):
    """List users with limit/offset pagination, invoked by hand."""

    def get(self, request, *args, **kwargs):
        """Return the serialized users selected by ``?limit=`` / ``?offset=``."""
        queryset = UserInfo.objects.all().order_by('id')

        pager = LimitOffsetPagination()
        # ``request`` is a required argument: the paginator reads limit and
        # offset from its query string. Omitting it raises TypeError.
        pager_queryset = pager.paginate_queryset(queryset, request, self)

        serializer = UserInfoSerializer(instance=pager_queryset, many=True)
        return Response(serializer.data)
3.CursorPagination

image-20240303135236651

python
from rest_framework.pagination import CursorPagination


class MyCursorPagination(CursorPagination):
    """Cursor pagination ordered by ``id`` with a client-adjustable page size."""

    # Default number of items per page.
    page_size = 2
    # Query-string key the client uses to ask for a different page size.
    page_size_query_param = 'size'
    # Largest page size a client is allowed to request.
    max_page_size = 20
    # Field the records are sorted by before the cursor is applied.
    ordering = 'id'


class UserInfoView(APIView):
    """List users with cursor pagination, invoked by hand."""

    def get(self, request, *args, **kwargs):
        """Return the serialized users of the page the request's cursor points at."""
        queryset = UserInfo.objects.all().order_by('id')
        pager = MyCursorPagination()
        # ``request`` is a required argument: the paginator reads the cursor
        # and page size from its query string. Omitting it raises TypeError.
        pager_queryset = pager.paginate_queryset(queryset, request, self)
        serializer = UserInfoSerializer(instance=pager_queryset, many=True)
        return Response(serializer.data)

2. 继承GenericAPIView派生类时

如果是使用 ListModelMixin 或 ModelViewSet ,则只需要配置相关类即可,内部会自动执行相关分页的方法。

1.PageNumberPagination

image-20240303140206096

python
from rest_framework.pagination import PageNumberPagination


class MyPageNumberPagination(PageNumberPagination):
    """Page-number pagination with explicit query-string keys and size limits."""

    # Query-string key carrying the page number.
    page_query_param = 'page'
    # Query-string key the client uses to ask for a different page size.
    page_size_query_param = 'size'
    # Default number of items per page.
    page_size = 2
    # Largest page size a client is allowed to request.
    max_page_size = 20


class UserInfoView(ListCreateAPIView):
    """List/create users; the generic view runs ``MyPageNumberPagination`` itself."""

    serializer_class = UserInfoSerializer
    # Explicit ordering keeps page contents stable between requests.
    queryset = UserInfo.objects.all().order_by('id')
    pagination_class = MyPageNumberPagination
2.LimitOffsetPagination

image-20240303140219985

python
from rest_framework.pagination import LimitOffsetPagination


class MyLimitOffsetPagination(LimitOffsetPagination):
    """Limit/offset pagination with a small default and a capped limit."""

    # Largest ``limit`` a client is allowed to request.
    max_limit = 20
    # Number of items returned when the client sends no ``limit``.
    default_limit = 2


class UserInfoView(ListCreateAPIView):
    """List/create users; the generic view runs ``MyLimitOffsetPagination`` itself."""

    serializer_class = UserInfoSerializer
    # Offsets are only meaningful against a stable order, so sort explicitly.
    queryset = UserInfo.objects.all().order_by('id')
    pagination_class = MyLimitOffsetPagination
3.CursorPagination

image-20240303140332882

python
from rest_framework.pagination import CursorPagination


class MyCursorPagination(CursorPagination):
    """Cursor pagination ordered by ``id`` with a client-adjustable page size."""

    # Default number of items per page.
    page_size = 2
    # Largest page size a client is allowed to request.
    max_page_size = 20
    # Query-string key the client uses to ask for a different page size.
    page_size_query_param = 'size'
    # Field the records are sorted by before the cursor is applied.
    ordering = 'id'


class UserInfoView(ListCreateAPIView):
    """List/create users; the generic view runs ``MyCursorPagination`` itself."""

    pagination_class = MyCursorPagination
    queryset = UserInfo.objects.all()
    serializer_class = UserInfoSerializer

案例:分页,加载更多的实现

需要前端传来最后一条数据的latest_id和每页几条数据:limit。

python
from django_filters import FilterSet, filters
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.generics import ListCreateAPIView,RetrieveUpdateDestroyAPIView
from api import models
from rest_framework.viewsets import ViewSet
from api.serializers.topic import TopicSerializer
from api.utils.filters import SelfFilterBackend

class MyLimitOffsetPagination(LimitOffsetPagination):
    """Paginator for "load more": it only controls how many rows are returned."""

    # No offset key: the position in the list is carried by ``latest_id``
    # (see the filter set used alongside this paginator), not by an offset.
    offset_query_param = None
    # Largest ``limit`` a client is allowed to request.
    max_limit = 20
    # Number of items returned when the client sends no ``limit``.
    default_limit = 2


class TopicFilterSet(FilterSet):
    """Keep only topics whose id is lower than the ``latest_id`` query parameter.

    ?latest_id=99             ->  id < 99
    ?latest_id=99&limit=10    ->  id < 99, at most 10 rows
    """

    latest_id = filters.NumberFilter(lookup_expr='lt', field_name='id')

    class Meta:
        fields = ["latest_id"]
        model = models.Topic
        
        
# NOTE(review): ``BaseFilterBackend`` is not imported in this snippet, and the
# name ``SelfFilterBackend`` is also imported from ``api.utils.filters`` above.
# Presumably this class is the content of that module; confirm and keep one copy.
class SelfFilterBackend(BaseFilterBackend):
    """Restrict a queryset to the rows owned by the requesting user."""

    def filter_queryset(self, request, queryset, view):
        """Return ``queryset`` narrowed to ``user == request.user``."""
        current_user = request.user
        return queryset.filter(user=current_user)


class TopicView(ListCreateAPIView, RetrieveUpdateDestroyAPIView, ViewSet):
    """Topic endpoints: list, create, retrieve, update and soft-delete."""

    serializer_class = TopicSerializer

    # Soft-deleted topics are hidden; newest topics come first.
    queryset = models.Topic.objects.filter(deleted=False).order_by('-id')

    # SelfFilterBackend narrows the list to the current user's topics;
    # DjangoFilterBackend is the backend that applies ``filterset_class``.
    filter_backends = [SelfFilterBackend, DjangoFilterBackend]
    filterset_class = TopicFilterSet

    pagination_class = MyLimitOffsetPagination

    def perform_create(self, serializer):
        """Save the new topic with the requesting user as its owner."""
        owner = self.request.user
        serializer.save(user=owner)

    def perform_destroy(self, instance):
        """Soft-delete: flag the topic as deleted instead of removing the row."""
        instance.deleted = True
        instance.save()

路由

在之前进行drf开发时,对于路由我们一般进行两种配置:

  • 视图继承APIView

    python
    from django.urls import path
    from app01 import views
    
    # Views that inherit APIView are routed with a plain ``as_view()``.
    urlpatterns = [
        path('api/users/', views.UserView.as_view()),
    ]
  • 视图继承 ViewSetMixin(GenericViewSet、ModelViewSet)

    python
    from django.urls import path, re_path, include
    from app01 import views
    
    # Views built on ViewSetMixin need an explicit HTTP-method -> action mapping.
    urlpatterns = [
        # Collection route: list and create.
        path('api/users/', views.UserView.as_view({"get":"list","post":"create"})),
        # Detail route. The delete action is named "destroy"; a misspelt
        # action name is rejected because the view has no such method.
        path('api/users/<int:pk>/', views.UserView.as_view({"get":"retrieve","put":"update","patch":"partial_update","delete":"destroy"})),
    ]

    对于这种形式的路由,drf中提供了更简便的方式:

    python
    from rest_framework import routers
    from app01 import views
    
    # Let the router build the URL patterns for the registered view set.
    router = routers.SimpleRouter()
    router.register(r'api/users', views.UserView)
    
    urlpatterns = [
        # Other URLs
        # path('xxxx/', xxxx.as_view()),
    ]
    
    # Append the router-generated patterns to the hand-written ones.
    urlpatterns += router.urls

    也可以利用include,给URL加前缀:

    python
    from django.urls import path, include
    from rest_framework import routers
    from app01 import views
    
    # Register without a prefix; the 'api/' prefix is added by include() below.
    router = routers.SimpleRouter()
    router.register(r'users', views.UserView)
    
    urlpatterns = [
        # Mount every router URL under 'api/', with an app name and instance namespace.
        path('api/', include((router.urls, 'app_name'), namespace='instance_name')),
        # Other URLs
        # path('forgot-password/', ForgotPasswordFormView.as_view()),
    ]

解析器

之前使用 request.data 获取请求体中的数据。

这个 request.data 的数据怎么来的呢?其实在drf内部是由解析器,根据请求者传入的数据格式 + 请求头来进行处理。

1.JSONParser

image-20210827081058194

2.FormParser

image-20210827081244795

3.MultiPartParser

image-20210827083047327

html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<!-- multipart/form-data lets one POST carry both the text field and the file. -->
<form action="http://127.0.0.1:8000/test/" method="post" enctype="multipart/form-data">
    <!-- Plain text field. -->
    <input type="text" name="user" />
    <!-- File field. -->
    <input type="file" name="img">

    <input type="submit" value="提交">

</form>
</body>
</html>

4.FileUploadParser

image-20210827084403453

解析器可以设置多个,默认解析器:

python
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser, JSONParser, FormParser


class UserView(APIView):
    """Echo how the incoming request body was parsed."""

    # Several parsers may be listed; the one matching the request's
    # Content-Type header is used to build ``request.data``.
    parser_classes = [JSONParser, FormParser, MultiPartParser]

    def post(self, request):
        """Print the request's content type and parsed body, then acknowledge."""
        print(request.content_type)
        print(request.data)

        return Response("...")

drf实战案例

1. 需求

请结合上述学习的drf知识开发 简易版《抽屉新热榜》。其中包含的功能如下:

  • 注册

    输入:手机号、用户名、密码、确认密码。
  • 登录

    输入:手机号 或 用户名  + 密码
    
    注意:登录成功后给用户返回token,后续请求需要在url中携带token(有效期2周)
  • 我的话题

    - 我的话题列表
    - 创建话题
    - 修改话题
    - 删除话题(逻辑删除)
  • 我的资讯

    - 创建资讯(5分钟创建一个,需要根据用户限流)    问题1:5/h   2/m; 问题2:成功后,下次再创建;
    	- 文本(你问我答、42区、挨踢1024、段子)
    	- 图片(图片、你问我答、42区、挨踢1024、段子)
    	- 链接(图片、你问我答、42区、挨踢1024、段子)
    	注意:创建时默认自己做1个推荐。
    - 我的资讯列表
  • 首页

    - 资讯首页
    	- 时间倒序,读取已审核通过的资讯
    	- 加载更多,分页处理
    	- 支持传入参数,查询各分区资讯:图片、你问我答、42区、挨踢1024、段子   ?zone=2
  • 推荐

    - 推荐
    - 取消推荐
    - 我的推荐列表
  • 收藏

    python
    - 收藏 or 取消收藏
    - 我的收藏列表
  • 评论

    python
    - 查看评论列表
    	- 根据【后代的更新时间】从大到小排序,读取根评论,每次读20条。
        - 读取根评论相关的子评论。
        - 将子评论挂靠到根评论上,最终形成父子关系通过JSON返回给前端。
        注意:自己也可以通过depth实现逐步读取子评论(此处不这样操作)
        
    - 创建评论
    	- 判断是根评论 or 回复
        - 回复时,深度+1
        - 评论后,找到根评论去更新【后代的更新时间】

2. 参考表结构

表结构参考:

python
from django.db import models


class DeletedModel(models.Model):
    """Abstract base model that adds a soft-delete flag."""
    # True once the row has been logically deleted.
    deleted = models.BooleanField(verbose_name="已删除", default=False)

    class Meta:
        # Abstract: no table of its own, the field is inherited by subclasses.
        abstract = True


class UserInfo(DeletedModel):
    """ User table """
    username = models.CharField(verbose_name="用户名", max_length=32)
    phone = models.CharField(verbose_name="手机号", max_length=32, db_index=True)
    password = models.CharField(verbose_name="密码", max_length=64)

    # Login token and its expiry time; both may be empty.
    token = models.CharField(verbose_name="token", max_length=64, null=True, blank=True, db_index=True)
    token_expiry_date = models.DateTimeField(verbose_name="token有效期", null=True, blank=True)

    # Account status: 1 = active, 2 = disabled.
    status_choice = (
        (1, "激活"),
        (2, "禁用"),
    )
    status = models.IntegerField(verbose_name="状态", choices=status_choice, default=1)
    create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)

    class Meta:
        # The newer indexes option provides more functionality than index_together
        # index_together may be deprecated in the future.
        # https://docs.djangoproject.com/en/3.2/ref/models/options/#index-together
        # Composite index over (username, password).
        indexes = [
            models.Index(fields=['username', "password"], name='idx_name_pwd')
        ]


class Topic(DeletedModel):
    """ Topic """
    title = models.CharField(verbose_name="话题", max_length=16, db_index=True)

    # Marks the topic as a hot topic.
    is_hot = models.BooleanField(verbose_name="热门话题", default=False)
    # Owner of the topic; topics are removed together with their user.
    user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
    create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)


class News(DeletedModel):
    """ News item """
    # Section the item is posted in.
    zone_choices = ((1, "42区"), (2, "段子"), (3, "图片"), (4, "挨踢1024"), (5, "你问我答"))
    zone = models.IntegerField(verbose_name="专区", choices=zone_choices)

    title = models.CharField(verbose_name="文字", max_length=150)
    url = models.CharField(verbose_name="链接", max_length=200, null=True, blank=True)

    # Comma-separated image addresses, e.g. xxxxxx.png,xxxxxxxx.jpeg
    image = models.TextField(verbose_name="图片地址", help_text="逗号分割", null=True, blank=True)

    # Optional topic the item belongs to, and the user who posted it.
    topic = models.ForeignKey(verbose_name="话题", to="Topic", on_delete=models.CASCADE, null=True, blank=True)
    user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)

    create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
    # Review status: 1 = pending review, 2 = approved, 3 = rejected.
    status_choice = (
        (1, "待审核"),
        (2, "已通过"),
        (3, "未通过"),
    )
    status = models.IntegerField(verbose_name="状态", choices=status_choice, default=1)

    # Denormalised counters stored on the news row.
    collect_count = models.IntegerField(verbose_name="收藏数", default=0)
    recommend_count = models.IntegerField(verbose_name="推荐数", default=0)
    comment_count = models.IntegerField(verbose_name="评论数", default=0)


class Collect(models.Model):
    """ Favourite: one row per (news, user) pair """
    news = models.ForeignKey(verbose_name="资讯", to="News", on_delete=models.CASCADE)
    user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
    create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)

    class Meta:
        # unique_together = [['news', 'user']]
        # A user can favourite a given news item only once.
        constraints = [
            models.UniqueConstraint(fields=['news', 'user'], name='uni_collect_news_user')
        ]


class Recommend(models.Model):
    """ Recommendation: one row per (news, user) pair """
    news = models.ForeignKey(verbose_name="资讯", to="News", on_delete=models.CASCADE)
    user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
    create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)

    class Meta:
        # A user can recommend a given news item only once.
        constraints = [
            models.UniqueConstraint(fields=['news', 'user'], name='uni_recommend_news_user')
        ]


class Comment(models.Model):
    """ Comment table """
    news = models.ForeignKey(verbose_name="资讯", to="News", on_delete=models.CASCADE)
    user = models.ForeignKey(verbose_name="用户", to="UserInfo", on_delete=models.CASCADE)
    content = models.CharField(verbose_name="内容", max_length=150)

    # Nesting depth of the comment; defaults to 0.
    depth = models.IntegerField(verbose_name="深度", default=0)

    # Root comment of the thread this comment belongs to (empty when not set).
    root = models.ForeignKey(verbose_name="根评论", to="Comment", related_name="descendant", on_delete=models.CASCADE,
                             null=True, blank=True)

    # Comment this one replies to (empty when not set).
    reply = models.ForeignKey(verbose_name="回复", to="Comment", related_name="reply_list", on_delete=models.CASCADE,
                              null=True, blank=True)

    create_datetime = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)

    # Only meaningful on root comments: time of the latest descendant update.
    # auto_now_add only sets it at creation; later updates must be written explicitly.
    descendant_update_datetime = models.DateTimeField(verbose_name="后代更新时间", auto_now_add=True)

3. 案例讲解

3.1 环境准备

  • 基于django创建项目,例如:dig

  • 安装必备模块

    django-filter==2.4.0
    django-redis==5.0.0
    djangorestframework==3.12.4
  • 创建app,例如:api

  • 注册app

    python
    # Django's built-in apps, the two third-party apps used by this project
    # (rest_framework, django_filters) and the project's own ``api`` app.
    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'rest_framework',
        'django_filters',
        'api.apps.ApiConfig'
    ]
  • 拷贝表结构到 models.py 并 生成数据库

3.2 接口开发

1.注册

2.登陆

3.我的话题增删改查

4.自定义限流类,限流频率为1/5m,并解决访问不成功时的限流问题

Serializer.py

python
from rest_framework import serializers
from api.models import UserInfo, Topic, News, Recommend
from rest_framework import exceptions
import re


class RegisterSerializer(serializers.ModelSerializer):
    """Validate a registration payload: username, phone, password, re_password."""

    # Both password fields are write-only so they are never echoed back.
    re_password = serializers.CharField(max_length=32, allow_null=False, write_only=True)
    password = serializers.CharField(max_length=32, allow_null=False, write_only=True)

    class Meta:
        model = UserInfo
        fields = ['username', 'phone', 'password', 're_password']

    def validate_phone(self, value):
        """Reject malformed phone numbers and numbers that are already registered.

        Raises:
            ValidationError: if the format is invalid or the phone already exists.
        """
        # Check the format first so malformed input never reaches the database.
        if not re.match(r'^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\d{8}$', value):
            raise exceptions.ValidationError('手机号格式错误')
        if UserInfo.objects.filter(phone=value).exists():
            raise exceptions.ValidationError('手机号已注册,请直接登陆')
        return value

    def validate_password(self, value):
        """Ensure ``password`` equals the submitted ``re_password``.

        Raises:
            ValidationError: if the two values differ.
        """
        # ``re_password`` is read from the raw payload because field-level
        # validators only receive their own value. The payload is deliberately
        # not logged: it contains the plaintext password.
        re_password = self.initial_data.get('re_password')
        if re_password != value:
            raise exceptions.ValidationError('密码不一致')
        return value


class LoginSerializer(serializers.ModelSerializer):
    """Validate a login payload: password plus a username and/or a phone number."""

    class Meta:
        model = UserInfo
        fields = ['username', 'phone', 'password']
        # Only fields listed in ``fields`` can be configured here.
        extra_kwargs = {
            'username': {'required': False},
            'phone': {'required': False},
            # Write-only so the password is never part of serialized output.
            'password': {'required': True, 'write_only': True},
        }

    def validate(self, attrs):
        """Require at least one of ``username`` / ``phone``.

        Raises:
            ValidationError: if neither identifier was supplied.
        """
        username = attrs.get('username', None)
        phone = attrs.get('phone', None)
        if not username and not phone:
            raise exceptions.ValidationError('用户名或手机号为空')
        return attrs


class TopicSerializer(serializers.ModelSerializer):
    """Serialize a topic's id, title and hot flag."""

    class Meta:
        fields = [
            'id',
            'title',
            'is_hot',
        ]
        model = Topic


class NewsSerializer(serializers.ModelSerializer):
    """Create and display news items.

    Accepted payload shapes (fields: title, url, image, topic, zone):
      - title only        -> text post; the zone must not be the image zone
      - title + image     -> image post
      - title + url       -> link post
    """

    # Read-only presentation fields derived from the stored values.
    image_list = serializers.SerializerMethodField(read_only=True)
    topic_title = serializers.CharField(source="topic.title", read_only=True)
    zone_title = serializers.CharField(source="get_zone_display", read_only=True)
    status = serializers.CharField(source="get_status_display", read_only=True)

    class Meta:
        model = News
        fields = ['id', "title", "url",
                  'image', 'topic', "zone",
                  "zone_title", 'image_list', "topic_title", 'collect_count', 'recommend_count', 'comment_count',
                  "status"]
        read_only_fields = ['collect_count', 'recommend_count', 'comment_count']
        extra_kwargs = {
            'topic': {'write_only': True},  # on create: topic=<topic id>
            'image': {'write_only': True},  # comma-separated addresses: xxxx,xxxx,xxxx
            'zone': {'write_only': True},
        }

    def get_image_list(self, obj):
        """Return the stored comma-separated image string as a list."""
        if not obj.image:
            return []
        return obj.image.split(',')

    def validate_topic(self, value):
        """Allow only a non-deleted topic owned by the requesting user.

        Raises:
            ValidationError: if the topic does not exist for this user.
        """
        if not value:
            return value
        request = self.context['request']
        exists = Topic.objects.filter(deleted=False, id=value.id, user=request.user).exists()
        if not exists:
            raise exceptions.ValidationError("话题不存在")
        return value

    def validate_title(self, value):
        """Check that the url / image / zone combination sent with the title is valid.

        Raises:
            ValidationError: if both url and image are given, or a text-only
                post targets the image zone.
        """
        url = self.initial_data.get('url')
        image = self.initial_data.get('image')
        zone = self.initial_data.get('zone')

        if url and image:
            raise exceptions.ValidationError("请求数据错误")
        # ``initial_data`` is the raw payload: zone is the int 3 for JSON but
        # the string "3" for form-encoded bodies, so compare as text.
        if not url and not image and str(zone).strip() == '3':
            raise exceptions.ValidationError("分区选择错误")
        return value

    def create(self, validated_data):
        """Create the news item together with the author's own recommendation."""
        request = self.context["request"]

        # 1. The item starts with one recommendation: the author's.
        new_object = News.objects.create(recommend_count=1, **validated_data)

        # 2. Record that recommendation.
        Recommend.objects.create(
            news=new_object,
            user=request.user
        )
        return new_object

Views.py

python
from rest_framework.generics import CreateAPIView
from rest_framework.views import APIView
from api.models import UserInfo
from api.serialisers import RegisterSerializer, LoginSerializer
from api.utils import encrypt
from django.db.models import Q
from rest_framework.response import Response
from api import ResponseCode
import random
import datetime


class RegisterView(CreateAPIView):
    """Register a new user."""

    serializer_class = RegisterSerializer
    queryset = UserInfo.objects.filter(deleted=False, status=1)

    def perform_create(self, serializer):
        """Store the user with an encrypted password and a fresh two-week token."""
        # Local import keeps this snippet self-contained.
        import secrets

        # ``re_password`` is not a model field, so it must not reach the model.
        serializer.validated_data.pop('re_password')
        raw_password = serializer.validated_data.pop('password')

        serializer.save(
            # NOTE(review): ``Mymd5`` presumably produces an MD5 digest; consider a
            # dedicated password hasher. Left unchanged here.
            password=encrypt.Mymd5(raw_password),
            # A token is a credential: use an unguessable random value rather
            # than a 4-digit number.
            token=secrets.token_hex(16),
            token_expiry_date=datetime.datetime.now() + datetime.timedelta(weeks=2),
        )


class LoginView(CreateAPIView):
    """Log a user in by username or phone plus password, and issue a token."""

    queryset = UserInfo.objects.filter(deleted=False, status=1)
    serializer_class = LoginSerializer

    def create(self, request, *args, **kwargs):
        """Authenticate the user and return a new token valid for two weeks.

        Returns:
            Response with ``code`` and ``detail``; on success also ``token``.
        """
        # Local import keeps this snippet self-contained.
        import secrets

        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        username = serializer.validated_data.get('username')
        phone = serializer.validated_data.get('phone')
        password = encrypt.Mymd5(serializer.validated_data['password'])

        # Match only on the identifiers that were actually supplied; the
        # serializer guarantees at least one of them is present.
        identity = Q()
        if username:
            identity |= Q(username=username)
        if phone:
            identity |= Q(phone=phone)

        # Search the view's queryset so deleted or disabled accounts cannot log in.
        user_obj = self.get_queryset().filter(identity, password=password).first()
        if not user_obj:
            return Response({'code': ResponseCode.LOGIN_FAILURE, 'detail': '用户名或密码错误'})

        # A token is a credential: use an unguessable random value.
        user_obj.token = secrets.token_hex(16)
        user_obj.token_expiry_date = datetime.datetime.now() + datetime.timedelta(weeks=2)
        # Persist the token, otherwise later requests cannot be matched to it.
        user_obj.save(update_fields=['token', 'token_expiry_date'])

        # The client needs the token to authenticate follow-up requests.
        return Response({'code': ResponseCode.LOGIN_SUCCESS, 'detail': '登陆成功', 'token': user_obj.token})

settings.py

python
# Project time zone.
TIME_ZONE = 'Asia/Shanghai'
# Time-zone support is off; the views use naive ``datetime.datetime.now()`` values.
USE_TZ = False

Urls.py

python
from django.urls import path
from api.views import RegisterView, LoginView

# Registration and login endpoints.
urlpatterns = [

    path('register/', RegisterView.as_view(), name='register'),
    path('login/', LoginView.as_view(), name='login')
]

throttle.py

python
from rest_framework.throttling import SimpleRateThrottle
from django.core.cache import cache as default_cache
from api.utils import return_code
from rest_framework import exceptions
from rest_framework import status


class ThrottledException(exceptions.APIException):
    """API error raised when a request is rejected by throttling (HTTP 429)."""

    default_code = 'throttled'
    status_code = status.HTTP_429_TOO_MANY_REQUESTS


class MySimpleRateThrottle(SimpleRateThrottle):
    """Per-user throttle that only counts requests explicitly recorded via ``done()``.

    ``throttle_success`` does not record the request, so a request that passes
    the throttle but then fails does not use up the user's quota.
    """

    cache = default_cache  # access history lives in Django's cache (a cache must be configured)
    scope = "user"  # used to build the cache key
    cache_format = 'throttle_%(scope)s_%(ident)s'

    # Allowed rate, "<requests>/<count><unit>", e.g. one request per 5 minutes.
    THROTTLE_RATES = {"user": "1/5m"}

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>

        The period is an optional integer count followed by a unit starting
        with s, m, h or d: "1/5m" -> (1, 300), "5/h" -> (5, 3600),
        "1/10min" -> (1, 600). A missing count means 1.

        Raises:
            ValueError: if the period has no unit.
            KeyError: if the unit is not one of s, m, h, d.
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)

        # Split the period into its leading digits (the count) and the unit.
        period = period.strip()
        unit_start = 0
        while unit_start < len(period) and period[unit_start].isdecimal():
            unit_start += 1
        count = int(period[:unit_start]) if unit_start else 1
        unit = period[unit_start:].strip()
        if not unit:
            raise ValueError("throttle rate %r has no time unit" % (rate,))

        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[unit[0]]
        return (num_requests, duration * count)

    def get_cache_key(self, request, view):
        """Build the cache key from the scope and the requesting user's primary key."""
        # NOTE(review): users without a primary key would all share one key;
        # confirm that this throttle only runs for authenticated users.
        ident = request.user.pk
        return self.cache_format % {'scope': self.scope, 'ident': ident}

    def throttle_success(self):
        """Allow the request without recording it; recording happens in ``done()``."""
        return True

    def throttle_failure(self):
        """Reject the request with a 429 error that states the remaining wait.

        Raises:
            ThrottledException: always.
        """
        wait = self.wait()
        detail = {
            "code": return_code.TOO_MANY_REQUESTS,
            "data": "访问频率限制",
            'detail': "需等待{}秒后才能访问".format(int(wait))
        }
        raise ThrottledException(detail)  # the custom exception defined above

    def done(self):
        """Record the current request in the history and write it to the cache.

        Presumably called by the view once the request has been handled
        successfully; it relies on ``history``, ``now``, ``key`` and
        ``duration`` having been set on the instance beforehand.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)