123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import copy
- from typing import Optional, Union
- from fastapi import APIRouter, Depends, Query, Path
- from sqlalchemy import text, desc
- from sqlalchemy.ext.asyncio import AsyncSession
- from starlette.background import BackgroundTasks
- from bgtask.tasks import bgtask_delete_related_object
- from crud.school import crud_class, crud_school, crud_grade
- from models.school import SchoolClass
- from models.user import Admin
- from schemas.school.school import (SchoolClassList, NewClassInfo, SchoolClassDetail, ClassInDB,
- UpdateClass)
- from utils.depends import get_async_db, get_current_user
- router = APIRouter()
# Class list
@router.get(
    "/classes",
    response_model=SchoolClassList,
    response_model_exclude_none=True,
    summary="班级列表",
)
async def get_classes(
    page: Optional[int] = None,
    size: Optional[int] = None,
    sid: int = Query(None, description="学校ID"),
    gid: int = Query(None, description="年级ID"),
    db: AsyncSession = Depends(get_async_db),
    current_user: Admin = Depends(get_current_user),
):
    """List classes, optionally filtered by school/grade and paginated.

    Args:
        page: 1-based page number; pagination applies only when both
            ``page`` and ``size`` are given and are >= 1.
        size: Page size; see ``page``.
        sid: School ID to filter on (omitted from the filter when None).
        gid: Grade ID to filter on (omitted from the filter when None).
        db: Async database session.
        current_user: Authenticated admin resolved by ``get_current_user``;
            not otherwise used in the body.

    Returns:
        A dict with ``total`` and ``data`` taken from ``crud_class.find_all``.
    """
    # Only the filters that were actually supplied are forwarded.
    candidates = (("school_id", sid), ("grade_id", gid))
    filters = {field: value for field, value in candidates if value is not None}

    # Without a valid page/size pair both offset and limit are passed as None.
    paginate = page is not None and size is not None and page >= 1 and size >= 1
    if paginate:
        skip, limit = (page - 1) * size, size
    else:
        skip = limit = None

    total, items = await crud_class.find_all(db, filters=filters, offset=skip, limit=limit)
    return {"total": total, "data": items}
@router.post(
    "/classes",
    response_model=SchoolClassDetail,
    response_model_exclude_none=True,
    summary="新建班级",
)
async def create_class(
    info: NewClassInfo,
    db: AsyncSession = Depends(get_async_db),
    current_user: Admin = Depends(get_current_user),
):
    """Create one class, or bulk-create numbered classes for a grade.

    When ``amount`` is supplied (and non-zero) the request is a bulk create:
    classes named ``<grade_name><i>班`` for ``i`` in ``1..amount`` are
    created, skipping names that already exist for the school + grade.
    Otherwise a single class named ``<grade_name><name>`` is created.

    Args:
        info: Request payload; ``None`` fields are dropped before processing.
        db: Async database session.
        current_user: Authenticated admin, recorded as creator and editor.

    Returns:
        ``{"data": ...}`` with the result of ``crud_class.insert_many`` on
        success, or ``{"errcode": ..., "mess": ...}`` when the payload is
        empty, lacks both a name and an amount, references a missing
        school/grade, or ``insert_many`` returns ``None``.
    """
    info_dict = info.dict(exclude_none=True)
    if not info_dict:
        return {"errcode": 400, "mess": "提交参数为空!"}

    # A request must carry either a class name (single create) or a non-zero
    # amount (bulk create). The previous check was inverted: it rejected
    # requests that had BOTH keys and let requests with neither fall through
    # to a KeyError on info_dict["name"].
    amount = info_dict.get("amount", 0)
    if not amount and "name" not in info_dict:
        return {"errcode": 400, "mess": "缺少班级名称或数量"}

    # Make sure the school exists.
    db_school = await crud_school.find_one(
        db,
        filters={"id": info_dict["school_id"]},
        return_fields=["name"],
    )
    if not db_school:
        return {"errcode": 404, "mess": "学校不存在!"}
    info_dict["school_name"] = db_school.name

    # Make sure the grade exists within that school.
    db_grade = await crud_grade.find_one(
        db,
        filters={"school_id": info.school_id, "id": info.grade_id},
        return_fields=["name"],
    )
    if not db_grade:
        return {"errcode": 404, "mess": "年级不存在!"}
    info_dict["grade_name"] = db_grade.name

    new_classes = []
    info_dict["creator_id"] = info_dict["editor_id"] = current_user.id
    info_dict["creator_name"] = info_dict["editor_name"] = current_user.username

    if amount:
        # Bulk create: look up the class names already present for this
        # school + grade so duplicates are skipped.
        _, db_class = await crud_class.find_all(
            db,
            filters={"school_id": info.school_id, "grade_id": info.grade_id},
            return_fields=["name"],
        )
        existing_names = {x.name for x in db_class}
        # "amount" is removed so it is not copied into each new record.
        for i in range(1, info_dict.pop("amount") + 1):
            name = f"{info_dict['grade_name']}{i}班"
            if name not in existing_names:
                temp = copy.deepcopy(info_dict)
                temp["name"] = name
                # NOTE(review): this branch hands plain dicts to insert_many
                # while the single-create branch hands it ClassInDB objects;
                # confirm insert_many accepts both.
                new_classes.append(temp)
    else:
        # Single create: the stored name is prefixed with the grade name.
        info_dict["name"] = info_dict["grade_name"] + info_dict["name"]
        new_classes.append(ClassInDB(**info_dict))

    item = await crud_class.insert_many(db, new_classes)
    if item is None:
        return {"errcode": 400, "mess": "存在同名班级"}
    return {"data": item}
@router.get(
    "/classes/{cid}",
    response_model=SchoolClassDetail,
    response_model_exclude_none=True,
    summary="班级详情",
)
async def get_class(
    cid: str = Path(..., description="班级ID"),
    db: AsyncSession = Depends(get_async_db),
    current_user: Admin = Depends(get_current_user),
):
    """Return the class whose ID is ``cid``.

    Args:
        cid: Class ID taken from the URL path.
        db: Async database session.
        current_user: Authenticated admin; not otherwise used in the body.

    Returns:
        ``{"data": <class>}`` when ``crud_class.find_one`` finds a row,
        otherwise ``{"errcode": 404, "mess": ...}``.
    """
    # NOTE(review): cid is typed str here but int in update_class; confirm
    # which one matches the column type of SchoolClass.id.
    found = await crud_class.find_one(db, filters={"id": cid})
    if not found:
        return {"errcode": 404, "mess": "班级不存在!"}
    return {"data": found}
# Update class
@router.put(
    "/classes/{cid}",
    response_model=SchoolClassDetail,
    response_model_exclude_none=True,
    summary="更新班级",
)
async def update_class(
    info: UpdateClass,
    cid: int = Path(..., description="班级ID"),
    db: AsyncSession = Depends(get_async_db),
    current_user: Admin = Depends(get_current_user),
):
    """Update a class, refreshing school/grade names when those IDs change.

    Args:
        info: Fields to update; ``None`` fields are treated as "not supplied".
        cid: ID of the class to update.
        db: Async database session.
        current_user: Authenticated admin, recorded as the editor.

    Returns:
        ``{"data": <updated class>}`` on success, or
        ``{"errcode": ..., "mess": ...}`` when the payload is empty or the
        class, the new school, or the new grade cannot be found.
    """
    # Reject an empty payload.
    info_dict = info.dict(exclude_none=True)
    if not info_dict:
        return {"errcode": 400, "mess": "提交参数为空!"}

    # The class must exist.
    db_obj = await crud_class.find_one(db, filters={"id": cid})
    if not db_obj:
        return {"errcode": 404, "mess": "班级不存在!"}

    # School changed: verify the new school and pick up its name.
    if "school_id" in info_dict and db_obj.school_id != info_dict["school_id"]:
        # find_one (as in create_class) returns a row exposing .name; the
        # previous count() call was dereferenced with .name as well.
        db_school = await crud_school.find_one(
            db,
            filters={"id": info.school_id},
            return_fields=["name"],
        )
        if not db_school:
            return {"errcode": 404, "mess": "学校不存在!"}
        info.school_name = db_school.name

    # Grade changed: verify the new grade and pick up its name.
    if "grade_id" in info_dict and db_obj.grade_id != info_dict["grade_id"]:
        # When the request does not change the school, look the grade up
        # under the class's current school instead of filtering on None.
        school_id = info_dict.get("school_id", db_obj.school_id)
        db_grade = await crud_grade.find_one(
            db,
            filters={"school_id": school_id, "id": info.grade_id},
            return_fields=["name"],
        )
        if not db_grade:
            return {"errcode": 404, "mess": "年级不存在!"}
        info.grade_name = db_grade.name

    # Stamp the editor and persist.
    info.editor_id = current_user.id
    info.editor_name = current_user.username
    db_obj = await crud_class.update(db, db_obj, info)
    return {"data": db_obj}
# Delete class(es)
@router.delete(
    "/classes/{cid}",
    response_model=SchoolClassDetail,
    response_model_exclude_none=True,
    summary="删除班级",
)
async def delete_class(
    bgtask: BackgroundTasks,
    cid: Union[int, str] = Path(
        ...,
        description="班级ID,批量删除传用逗号分隔ID(str),单个删除传ID(int)",
    ),
    db: AsyncSession = Depends(get_async_db),
    current_user: Admin = Depends(get_current_user),
):
    """Delete one class, or several given as a comma-separated ID list.

    For every class found, ``crud_class.delete`` is awaited and
    ``bgtask_delete_related_object`` is queued as a background task with the
    class ID.

    Args:
        bgtask: Background task queue for the current request.
        cid: A single integer ID, or a comma-separated string of IDs.
        db: Async database session.
        current_user: Authenticated admin; not otherwise used in the body.

    Returns:
        ``{"data": <found count>, "mess": ...}`` where ``mess`` is
        ``"success"`` when every requested ID was found, otherwise a
        success/failure tally. Returns ``{"errcode": 400, "mess": ...}``
        when the ID list is empty or contains a non-integer segment.
    """
    # Build the lookup filter for the requested class(es).
    if isinstance(cid, int):
        deleted_count = 1
        _q = [SchoolClass.id == cid]
    else:
        try:
            parsed = [int(x.strip()) for x in cid.split(",") if x.strip()]
        except ValueError:
            # A malformed segment is a client error, not a server crash.
            return {"errcode": 400, "mess": "班级ID格式错误!"}
        # Drop duplicates (order-preserving) so a repeated ID is not counted
        # as a failed deletion in the tally below.
        cids = list(dict.fromkeys(parsed))
        if not cids:
            return {"errcode": 400, "mess": "提交参数为空!"}
        deleted_count = len(cids)
        _q = [SchoolClass.id.in_(cids)]

    total, db_objs = await crud_class.find_all(db, filters=_q)

    # Delete each class found and queue the related-object cleanup.
    for item in db_objs:
        await crud_class.delete(db, obj_id=item.id)
        bgtask.add_task(bgtask_delete_related_object, cid=item.id)

    return {
        "data": total,
        "mess": "success" if total == deleted_count else f"成功:{total},失败: {deleted_count - total}",
    }
|