|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- from typing import Optional
- from fastapi import APIRouter, Depends, Path, Query
- from sqlalchemy import text
- from sqlalchemy.ext.asyncio import AsyncSession
- from starlette.background import BackgroundTasks
- from bgtask.tasks import delete_related_object, async_create_grade
- from crud.school import crud_school, crud_system
- from crud.sysdata.region import crud_region
- from models.user import SysUser
- from schemas.base import OrderByField, ReturnField
- from schemas.school.school import (SchoolItemList, SchoolDetail, UpdateSchool,
- NewSchoolInfo, SchoolInDB)
- from utils.depends import get_async_db, get_current_user
- router = APIRouter()
- # 学校列表
- @router.get("/schools",
- response_model=SchoolItemList,
- response_model_exclude_none=True,
- summary="学校列表")
- async def get_schools(
- page: Optional[int] = None,
- size: Optional[int] = None,
- name: str = "",
- area: str = "",
- created_at: str = Query("", description="YYYY-MM-DD,YYYY-MM-DD,时间范围用逗号分隔"),
- order: OrderByField = Query("-created_at",
- description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"),
- res: ReturnField = Query("", description="返回字段,默认列表展示字段。返回字段:id,name"),
- db: AsyncSession = Depends(get_async_db),
- current_user: SysUser = Depends(get_current_user)):
- filters = [text("is_active = 1")]
- if name:
- filters.append(text(f"name LIKE '%{name}%'"))
- if area:
- filters.append(text(f"area = '{area}'"))
- if created_at:
- created_at_s = created_at.split(",")[0]
- created_at_e = created_at.split(",")[0]
- filters.append(
- text(
- f"created_at > '{created_at_s}' and created_at < '{created_at_e}'"
- ))
- if ((page is not None) and page >= 1) and ((size is not None)
- and size >= 1):
- offset = (page - 1) * size
- else:
- offset = size = None
- if isinstance(order, str):
- order = [text(order)]
- total, items = await crud_school.find_all(db,
- filters=filters,
- offset=offset,
- limit=size,
- order_by=order,
- return_fields=res)
- for item in items:
- item.area = item.area_name.split(",")
- return {"total": total, "data": items}
- # 学校详情
- @router.get("/schools/{sid}",
- response_model=SchoolDetail,
- response_model_exclude_none=True,
- summary="学校详情")
- async def get_school(sid: int = Path(..., description="学校ID"),
- db: AsyncSession = Depends(get_async_db),
- current_user: SysUser = Depends(get_current_user)):
- db_obj = await crud_school.find_one(db, {"id": sid})
- if db_obj:
- db_obj.area = db_obj.area_code.split(",")
- else:
- return {"errcode": 404, "mess": "学校不存在!"}
- return {"data": db_obj}
- # 新建学校
- @router.post("/schools",
- response_model=SchoolDetail,
- response_model_exclude_none=True,
- summary="新建学校")
- async def create_school(info: NewSchoolInfo,
- bg_task: BackgroundTasks,
- db: AsyncSession = Depends(get_async_db),
- current_user: SysUser = Depends(get_current_user)):
- info_dict = info.dict()
- # 判断学制是否正确
- existed = await crud_system.count(db, filters={"id": info_dict["category"]})
- if not existed:
- return {"errcode": 400, "mess": "学制不存在!"}
- # 判断区域是否存在
- area_code = ",".join(info_dict["area_code"])
- total, db_regions = await crud_region.find_all(
- db, filters=[text(f"code in ({area_code})")], return_fields=["name"])
- if total != len(info_dict["area_code"]):
- return {"errcode": 400, "mess": "地区不存在!"}
- else:
- area_name = ",".join([x.name for x in db_regions])
- info_dict["area_code"] = area_code
- info_dict["area_name"] = area_name
- existed = await crud_school.count(db, {
- "name": info_dict["name"],
- "area_code": area_code
- })
- if not existed:
- db_obj = SchoolInDB(**info_dict,
- creator_id=current_user.id,
- creator_name=current_user.username,
- editor_id=current_user.id,
- editor_name=current_user.username)
- school = await crud_school.insert_one(db, db_obj)
- bg_task.add_task(async_create_grade, db, school.id, school.category)
- school.area = info_dict["area_code"].split(",")
- return {"data": school}
- else:
- return {"errcode": 100, "mess": "学校已存在!"}
- # 更新学校
- @router.put("/schools/{sid}",
- response_model=SchoolDetail,
- response_model_exclude_none=True,
- summary="更新学校")
- async def update_school(info: UpdateSchool,
- bg_task: BackgroundTasks,
- sid: int = Path(..., description="学校ID"),
- db: AsyncSession = Depends(get_async_db),
- current_user: SysUser = Depends(get_current_user)):
- # 判断提交参数
- info_dict = info.dict(exclude_none=True)
- if not info_dict:
- return {"errcode": 400, "mess": "提交参数为空!"}
- # 判断学校是否存在
- db_obj = await crud_school.find_one(db, {"id": sid})
- if not db_obj:
- return {"errcode": 404, "mess": "学校不存在!"}
- # 判断地区是否存在
- area_code = ",".join(info_dict.get("area_code", []))
- if area_code and db_obj.area_code != area_code:
- total, db_regions = await crud_region.find_all(
- db,
- filters=[text(f"code in ({area_code})")],
- return_fields=["name"])
- if total != len(info_dict["area_code"]):
- return {"errcode": 400, "mess": "地区不存在!"}
- else:
- info.area_code = area_code
- info.area_name = ",".join([x.name for x in db_regions])
- else:
- delattr(info, "area_code")
- # 判断学制是否正确
- if "category" in info_dict:
- existed = await crud_system.count(db,
- filters={"id": info_dict["category"]})
- if not existed:
- return {"errcode": 400, "mess": "学制不存在!"}
- if db_obj.category < info_dict["category"]:
- bg_task.add_task(async_create_grade, db, sid, info_dict["category"])
- elif db_obj.category == info_dict["category"]:
- delattr(info, "category")
- else:
- return {"errcode": 400, "mess": "学制不允许降级!"}
- # 更新学校
- info.editor_id = current_user.id
- info.editor_name = current_user.username
- school = await crud_school.update(db, db_obj, info)
- school.area = school.area_code.split(",")
- return {"data": school}
- # 删除学校
- @router.delete("/schools/{sid}",
- response_model=SchoolDetail,
- response_model_exclude_none=True,
- summary="删除学校")
- async def delete_school(bg_task: BackgroundTasks,
- sid: int = Path(..., description="学校ID"),
- db: AsyncSession = Depends(get_async_db),
- current_user: SysUser = Depends(get_current_user)):
- existed = await crud_school.count(db, {"id": sid})
- if not existed:
- return {"errcode": 404, "mess": "学校不存在!"}
- else:
- await crud_school.delete(db, obj_id=sid)
- bg_task.add_task(delete_related_object, db, sid=sid)
- return {"data": None}
|