#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime from typing import Optional from fastapi import APIRouter, Depends, Path, Query from sqlalchemy import text, between, desc, asc from sqlalchemy.ext.asyncio import AsyncSession from starlette.background import BackgroundTasks from bgtask.tasks import bgtask_delete_related_object, bgtask_create_grade from crud.school import crud_school, crud_system from crud.sysdata.region import crud_region from models.school import School from models.sysdata.region import Region from models.user import Admin from schemas.base import OrderByField, ReturnField from schemas.school.school import (SchoolItemList, SchoolDetail, UpdateSchool, NewSchoolInfo, SchoolInDB) from utils.depends import get_async_db, get_current_user router = APIRouter() # 学校列表 @router.get("/schools", response_model=SchoolItemList, response_model_exclude_none=True, summary="学校列表") async def get_schools(page: Optional[int] = None, size: Optional[int] = None, name: str = "", area: str = "", created_at: str = Query("", description="YYYY-MM-DD,YYYY-MM-DD,时间范围用逗号分隔"), order: str = Query("-id", description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"), res: ReturnField = Query("", description="返回字段,默认列表展示字段。返回字段:id,name"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): _q = [School.is_active == True] if name: _q.append(School.name.like(f"%{name}%")) if area: _q.append(School.area_code == area) if created_at: start_date, end_date = created_at.split(",") _q.append( between(School.created_at, datetime.datetime.strptime(start_date, "%Y-%m-%d"), datetime.datetime.strptime(end_date, "%Y-%m-%d"))) if ((page is not None) and page >= 1) and ((size is not None) and size >= 1): offset = (page - 1) * size else: offset = size = None # 排序字段 order_fields = [] if order: for x in order.split(","): field = x.strip() if field: if field.startswith("-"): order_fields.append(desc(getattr(School, field[1:]))) else: order_fields.append(asc(getattr(School, field))) total, items = await crud_school.find_all(db, filters=_q, offset=offset, limit=size, order_by=order_fields, return_fields=res) for item in items: item.area = item.area_name.split(",") return {"total": total, "data": items} # 学校详情 @router.get("/schools/{sid}", response_model=SchoolDetail, response_model_exclude_none=True, summary="学校详情") async def get_school(sid: int = Path(..., description="学校ID"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): db_obj = await crud_school.find_one(db, filters={"id": sid}) if db_obj: db_obj.area = db_obj.area_code.split(",") else: return {"errcode": 404, "mess": "学校不存在!"} return {"data": db_obj} # 新建学校 @router.post("/schools", response_model=SchoolDetail, response_model_exclude_none=True, summary="新建学校") async def create_school(info: NewSchoolInfo, bgtask: BackgroundTasks, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): info_dict = info.dict() # 判断学制是否正确 existed = await crud_system.count(db, filters={"id": info_dict["category"]}) if not existed: return {"errcode": 400, "mess": "学制不存在!"} # 判断区域是否存在 area_code = ",".join(info_dict["area_code"]) total, db_regions = await crud_region.find_all(db, filters=[text(f"code in ({area_code})")], return_fields=["name"]) if total != len(info_dict["area_code"]): return {"errcode": 400, "mess": "地区不存在!"} else: area_name = ",".join([x.name for x in db_regions]) info_dict["area_code"] = area_code info_dict["area_name"] = area_name existed = await crud_school.count(db, filters={ "name": info_dict["name"], "area_code": area_code }) if not existed: db_obj = SchoolInDB(**info_dict, creator_id=current_user.id, creator_name=current_user.username, editor_id=current_user.id, editor_name=current_user.username) school = await crud_school.insert_one(db, db_obj) bgtask.add_task(bgtask_create_grade, school.id, school.category) school.area = info_dict["area_code"].split(",") return {"data": school} else: return {"errcode": 100, "mess": "学校已存在!"} # 更新学校 @router.put("/schools/{sid}", response_model=SchoolDetail, response_model_exclude_none=True, summary="更新学校") async def update_school(info: UpdateSchool, bgtask: BackgroundTasks, sid: int = Path(..., description="学校ID"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断提交参数 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 判断学校是否存在 db_obj = await crud_school.find_one(db, filters={"id": sid}) if not db_obj: return {"errcode": 404, "mess": "学校不存在!"} # 判断地区是否存在 area_code = ",".join(info_dict.get("area_code", [])) if area_code and db_obj.area_code != area_code: total, db_regions = await crud_region.find_all(db, filters=[Region.code.in_(area_code)], return_fields=["name"]) if total != len(info_dict["area_code"]): return {"errcode": 400, "mess": "地区不存在!"} else: info.area_code = area_code info.area_name = ",".join([x.name for x in db_regions]) else: delattr(info, "area_code") # 判断学制是否正确 if "category" in info_dict: existed = await crud_system.count(db, filters={"id": info_dict["category"]}) if not existed: return {"errcode": 400, "mess": "学制不存在!"} if db_obj.category < info_dict["category"]: bgtask.add_task(bgtask_create_grade, sid, info_dict["category"]) elif db_obj.category == info_dict["category"]: delattr(info, "category") else: return {"errcode": 400, "mess": "学制不允许降级!"} # 更新学校 info.editor_id = current_user.id info.editor_name = current_user.username school = await crud_school.update(db, db_obj, info) school.area = school.area_code.split(",") return {"data": school} # 删除学校 @router.delete("/schools/{sid}", response_model=SchoolDetail, response_model_exclude_none=True, summary="删除学校") async def delete_school(bgtask: BackgroundTasks, sid: int = Path(..., description="学校ID"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): existed = await crud_school.count(db, filters={"id": sid}) if not existed: return {"errcode": 404, "mess": "学校不存在!"} else: await crud_school.delete(db, obj_id=sid) bgtask.add_task(bgtask_delete_related_object, sid=sid) return {"data": None}