#!/usr/bin/env python # -*- coding: utf-8 -*- from typing import Any, List from fastapi import Depends, Query, Path from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from common.const import PERIODS, SUBJECTS, RESOURCE_TYPES, WORK_RESOURCE_TYPES from crud.base import CrudManager from crud.resource import crud_work_category, crud_exam_category from models.user import SysUser from schemas.resource import NewCategory, UpdateCategory from utils.depends import get_async_db, get_current_user # 学段列表 async def get_periods(current_user: SysUser = Depends(get_current_user)): data = [{"name": item} for item in PERIODS] return {"data": data} # 学科列表 async def get_subjects(current_user: SysUser = Depends(get_current_user)): data = [{"name": item} for item in SUBJECTS] return {"data": data} async def get_work_types(current_user: SysUser = Depends(get_current_user)): data = [{"name": item} for item in WORK_RESOURCE_TYPES] return {"data": data} # 分类 async def get_category_tree(crud: CrudManager, db: AsyncSession = Depends(get_async_db), root: List[Any] = None): for item in root: children = await crud.fetch_all(db, filters=[text(f"pid={item.id}")]) item.children = children await get_category_tree(crud, db, item.children) async def get_categories(ctype: str = Query(..., description="资源类型,exam / work"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型错误!"} crud = crud_work_category if ctype == "work" else crud_exam_category root = await crud.fetch_all(db, filters=[text("pid = 0")]) await get_category_tree(crud, db, root) return {"data": root} # 创建资源分类 async def create_category(info: NewCategory, db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): # 判断提交参数是否为空 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 分类 crud = crud_work_category if info.ctype == "work" else crud_exam_category filters = { "name": info.name, "period": info.period, "subject": info.subject } if info.pid: # 判断上级分类是否存在 category = await crud.find_one(db, filters={"id": info.pid}) if not category: return {"errcode": 404, "mess": "上级分类不存在!"} else: info.pname = category.name # 判断同级分类是否存在重复 existed = await crud.count(db, filters=filters) if existed: return {"errcode": 400, "mess": "存在同名分类!"} # 创建 delattr(info, "ctype") db_obj = await crud.insert_one(db, info) return {"data": db_obj} # 更新资源分类 async def update_category(info: UpdateCategory, cid: int = Path(..., description="分类ID"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): # 判断提交参数是否为空 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 判断分类是否存在 crud = crud_work_category if info.ctype == "work" else crud_exam_category db_obj = await crud.find_one(db, filters={"id": cid}) if not db_obj: return {"errcode": 404, "mess": "分类不存在!"} # 判断上级分类是否存在 if ("pid" in info_dict) or ("pname" in info_dict): if bool(info.pid) ^ bool(info.pname): return {"errcode": 400, "mess": "缺少上级分类ID或名称"} existed = await crud.count(db, filters={ "id": info.pid, "name": info.pname }) if not existed: return {"errcode": 404, "mess": "上级分类不存在!"} # 判断同级分类是否存在重复 if "name" in info_dict: existed = await crud.count( db, filters=[ text(f"id != {cid} AND name = '{info.name}'"), text(f"pid = {info.pid} AND pname = '{info.pname}'") ]) if existed: return {"errcode": 400, "mess": "存在同名分类!"} # 更新 db_obj = await crud.update(db, db_obj, info) return {"data": db_obj} # 删除资源分类 async def delete_category(cid: int = Path(..., description="资源分类ID"), ctype: str = Query(..., description="分类类型,exam / work"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "分类类型错误!"} crud = crud_work_category if ctype == "work" else crud_exam_category existed = await crud.count(db, {"id": cid}) if not existed: return {"errcode": 404, "mess": "分类不存在!"} else: await crud.delete(db, obj_id=cid) # TODO: 删除关联数据 # bg_task.add_task(delete_related_object, db, cid=id) return {"data": None}