#!/usr/bin/env python # -*- coding: utf-8 -*- from typing import Optional from fastapi import APIRouter, Depends, Path, Query from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from starlette.background import BackgroundTasks from bgtask.tasks import delete_related_object, async_create_grade from crud.school import crud_school, crud_system from crud.sysdata.region import crud_region from models.user import SysUser from schemas.base import OrderByField, ReturnField from schemas.school.school import (SchoolItemList, SchoolDetail, UpdateSchool, NewSchoolInfo, SchoolInDB) from utils.depends import get_async_db, get_current_user router = APIRouter() # 学校列表 @router.get("/schools", response_model=SchoolItemList, response_model_exclude_none=True, summary="学校列表") async def get_schools( page: Optional[int] = None, size: Optional[int] = None, name: str = "", area: str = "", created_at: str = Query("", description="YYYY-MM-DD,YYYY-MM-DD,时间范围用逗号分隔"), order: OrderByField = Query("-created_at", description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"), res: ReturnField = Query("", description="返回字段,默认列表展示字段。返回字段:id,name"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): filters = [text("is_active = 1")] if name: filters.append(text(f"name LIKE '%{name}%'")) if area: filters.append(text(f"area = '{area}'")) if created_at: created_at_s = created_at.split(",")[0] created_at_e = created_at.split(",")[0] filters.append( text( f"created_at > '{created_at_s}' and created_at < '{created_at_e}'" )) if ((page is not None) and page >= 1) and ((size is not None) and size >= 1): offset = (page - 1) * size else: offset = size = None if isinstance(order, str): order = [text(order)] total, items = await crud_school.find_all(db, filters=filters, offset=offset, limit=size, order_by=order, return_fields=res) for item in items: item.area = item.area_name.split(",") return {"total": total, "data": items} # 学校详情 @router.get("/schools/{sid}", response_model=SchoolDetail, response_model_exclude_none=True, summary="学校详情") async def get_school(sid: int = Path(..., description="学校ID"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): db_obj = await crud_school.find_one(db, {"id": sid}) if db_obj: db_obj.area = db_obj.area_code.split(",") else: return {"errcode": 404, "mess": "学校不存在!"} return {"data": db_obj} # 新建学校 @router.post("/schools", response_model=SchoolDetail, response_model_exclude_none=True, summary="新建学校") async def create_school(info: NewSchoolInfo, bg_task: BackgroundTasks, db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): info_dict = info.dict() # 判断学制是否正确 existed = await crud_system.count(db, filters={"id": info_dict["category"]}) if not existed: return {"errcode": 400, "mess": "学制不存在!"} # 判断区域是否存在 area_code = ",".join(info_dict["area_code"]) total, db_regions = await crud_region.find_all( db, filters=[text(f"code in ({area_code})")], return_fields=["name"]) if total != len(info_dict["area_code"]): return {"errcode": 400, "mess": "地区不存在!"} else: area_name = ",".join([x.name for x in db_regions]) info_dict["area_code"] = area_code info_dict["area_name"] = area_name existed = await crud_school.count(db, { "name": info_dict["name"], "area_code": area_code }) if not existed: db_obj = SchoolInDB(**info_dict, creator_id=current_user.id, creator_name=current_user.username, editor_id=current_user.id, editor_name=current_user.username) school = await crud_school.insert_one(db, db_obj) bg_task.add_task(async_create_grade, db, school.id, school.category) school.area = info_dict["area_code"].split(",") return {"data": school} else: return {"errcode": 100, "mess": "学校已存在!"} # 更新学校 @router.put("/schools/{sid}", response_model=SchoolDetail, response_model_exclude_none=True, summary="更新学校") async def update_school(info: UpdateSchool, bg_task: BackgroundTasks, sid: int = Path(..., description="学校ID"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): # 判断提交参数 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 判断学校是否存在 db_obj = await crud_school.find_one(db, {"id": sid}) if not db_obj: return {"errcode": 404, "mess": "学校不存在!"} # 判断地区是否存在 area_code = ",".join(info_dict.get("area_code", [])) if area_code and db_obj.area_code != area_code: total, db_regions = await crud_region.find_all( db, filters=[text(f"code in ({area_code})")], return_fields=["name"]) if total != len(info_dict["area_code"]): return {"errcode": 400, "mess": "地区不存在!"} else: info.area_code = area_code info.area_name = ",".join([x.name for x in db_regions]) else: delattr(info, "area_code") # 判断学制是否正确 if "category" in info_dict: existed = await crud_system.count(db, filters={"id": info_dict["category"]}) if not existed: return {"errcode": 400, "mess": "学制不存在!"} if db_obj.category < info_dict["category"]: bg_task.add_task(async_create_grade, db, sid, info_dict["category"]) elif db_obj.category == info_dict["category"]: delattr(info, "category") else: return {"errcode": 400, "mess": "学制不允许降级!"} # 更新学校 info.editor_id = current_user.id info.editor_name = current_user.username school = await crud_school.update(db, db_obj, info) school.area = school.area_code.split(",") return {"data": school} # 删除学校 @router.delete("/schools/{sid}", response_model=SchoolDetail, response_model_exclude_none=True, summary="删除学校") async def delete_school(bg_task: BackgroundTasks, sid: int = Path(..., description="学校ID"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): existed = await crud_school.count(db, {"id": sid}) if not existed: return {"errcode": 404, "mess": "学校不存在!"} else: await crud_school.delete(db, obj_id=sid) bg_task.add_task(delete_related_object, db, sid=sid) return {"data": None}