#!/usr/bin/env python # -*- coding: utf-8 -*- import datetime from fastapi import Depends, Query, Path from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from starlette.background import BackgroundTasks from bgtask.tasks import bgtask_delete_remote_file from common.const import RESOURCE_TYPES from crud.resource import CATEGORY_CRUDS, RESOURCE_CRUDS from crud.school import crud_school from models.resource import RESOURCE_MODES from models.user import Admin from schemas.base import OrderByField, ReturnField from schemas.resource.resource import NewResource, UpdateResource from utils.depends import get_async_db, get_current_user async def get_resources(page: int = 1, size: int = 10, name: str = "", year: int = 0, sid: int = 0, ctype: str = Query(..., description="资源分类名称,exam/work"), ctgid: int = Query(0, description="资源分类ID"), rtype: str = Query("", description="作业资源分类"), created_at: str = Query("", description="YYYY-MM-DD"), order: OrderByField = Query("-created_at", description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"), res: ReturnField = Query("", description="返回字段,默认列表展示字段,自定义: id,name"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断资源类型 if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型不存在!"} else: crud = RESOURCE_CRUDS[ctype] model = RESOURCE_MODES[ctype] # 筛选条件 _q = [] if name: _q.append(model.name.like(f"%{name}%")) if year: _q.append(model.year == year) if sid: _q.append(model.school_id == sid) if ctgid: _q.append(model.category_id == ctgid) if ctype and rtype: _q.append(model.rtype == rtype) if created_at: _q.append(model.created_at >= datetime.datetime.strptime(created_at, "%Y-%m-%d")) offset = (page - 1) * size # 查询 if isinstance(order, str): order = [text(order)] total, items = await crud.find_all(db, filters=_q, offset=offset, limit=size, order_by=order, return_fields=res) if (not res) or (res and ("attach_url" in res)): for item in items: item.attach_url = item.attach_url.split(";") return {"data": items, "total": total} async def create_resource(info: NewResource, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断提交参数 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 是否有附件 if not info_dict.get("attach_url", ""): return {"errcode": 400, "mess": "缺少附件URL!"} else: info.attach_url = ";".join(info_dict.get("attach_url")) # 判断资源类型 if info_dict["ctype"] not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型不存在!"} db_category = await CATEGORY_CRUDS[info_dict["ctype"] ].find_one(db, filters={"id": info_dict["category_id"]}) if not db_category: return {"errcode": 404, "mess": "资源分类不存在!"} info.subject = db_category.subject info.category_name = db_category.name # 如果是考试资源,先判断学校是否存在 if info_dict["ctype"] == "exam": if not info_dict.get("school_id", None): return {"errcode": 400, "mess": "缺少学校ID或名称"} db_school = await crud_school.find_one(db, filters={"id": info_dict["school_id"]}) if not db_school: return {"errcode": 404, "mess": "学校不存在!"} else: info.school_name = db_school.name # 创建 info.creator_id = info.editor_id = current_user.id info.creator_name = info.editor_name = current_user.username delattr(info, "ctype") db_obj = await RESOURCE_CRUDS[info_dict["ctype"]].insert_one(db, info) db_obj.attach_url = db_obj.attach_url.split(";") return {"data": db_obj} async def get_resource(rid: int = Path(..., description="资源ID"), ctype: str = Query(..., description="资源分类,exam/work"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断资源类型 if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型不存在!"} db_obj = await RESOURCE_CRUDS[ctype].find_one(db, filters={"id": rid}) db_obj.attach_url = db_obj.attach_url.split(";") return {"data": db_obj} async def update_resource(info: UpdateResource, bgtask: BackgroundTasks, rid: int = Path(..., description="资源ID"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断提交参数 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 判断资源类型 if info_dict["ctype"] not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型不存在!"} # 判断资源是否存在 db_obj = await RESOURCE_CRUDS[info_dict["ctype"]].find_one(db, filters={"id": rid}) if not db_obj: return {"errcode": 404, "mess": "资源不存在!"} # 判断资源分类是否存在 if ("category_id" in info_dict) and (db_obj.category_id != info_dict["category_id"]): db_category = await CATEGORY_CRUDS[info_dict["ctype"] ].find_one(db, filters={"id": info_dict["category_id"]}) if not db_category: return {"errcode": 404, "mess": "资源分类不存在!"} info.subject = db_category.subject info.category_name = db_category.name # 如果是考试资源,判断学校是否存在 if info_dict["ctype"] == "exam": if ("school_id" in info_dict) and (db_obj.school_id != info_dict["school_id"]): db_school = await crud_school.find_one(db, filters={"id": info_dict["school_id"]}) if not db_school: return {"errcode": 404, "mess": "学校不存在!"} else: info.school_name = db_school.name # 上传文件 if info_dict.get("attach_url", ""): info.attach_url = ";".join(info_dict.get("attach_url")) # 删除旧的附件 bgtask.add_task(bgtask_delete_remote_file, db_obj.attach_url) # 开始更新 info_dict["editor_id"] = current_user.id info_dict["editor_name"] = current_user.username info = UpdateResource(**info_dict) db_obj = await RESOURCE_CRUDS[info_dict["ctype"]].update(db, db_obj, info) db_obj.attach_url = db_obj.attach_url.split(";") return {"data": db_obj} async def delete_resource(bgtask: BackgroundTasks, rid: int = Path(..., description="资源ID"), ctype: str = Query(..., description="资源分类,exam/work"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): # 判断资源类型 if ctype not in RESOURCE_TYPES: return {"errcode": 400, "mess": "资源类型不存在!"} # 删除 db_obj = await RESOURCE_CRUDS[ctype].find_one(db, filters={"id": rid}) if not db_obj: return {"errcode": 404, "mess": "资源不存在!"} else: await RESOURCE_CRUDS[ctype].delete(db, obj_id=rid) bgtask.add_task(bgtask_delete_remote_file, db_obj.attach_url) return {"data": None}