#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Async handlers for papers: list, detail, create, update, delete, import.

Every handler returns a plain dict. Successful calls carry a ``"data"`` key
(plus ``"total"`` for the list); failures carry ``"errcode"`` and ``"mess"``.
Route registration is not part of this file.
"""
import io
import json
import os
import zipfile

from fastapi import Query, Depends, File, UploadFile, Path
from openpyxl import load_workbook
from sqlalchemy import asc, text
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.background import BackgroundTasks

from common.common import delete_oss_file
from core.config import settings
from crud.paper import crud_paper, crud_question
from crud.resource import crud_work_category
from models.user import SysUser
from schemas.base import OrderByField, ReturnField
from schemas.paper import NewPaperInfo, UpdatePaperInfo
from schemas.paper import PaperInDB, UpdatePaper
from utils.depends import get_async_db, get_current_user

# Number of columns an import row must have, and how many parsed rows are
# buffered before they are flushed with ``insert_many``.
_IMPORT_COLUMNS = 12
_IMPORT_BATCH_SIZE = 20


def _decode_json_attrs(obj, *names):
    """Decode JSON-text attributes of *obj* in place.

    Attributes that are missing, ``None``, empty, or not text are left
    untouched, so rows stored with an empty value (``import_paper`` writes
    ``points=""``) no longer make ``json.loads`` raise.
    """
    for name in names:
        raw = getattr(obj, name, None)
        if raw and isinstance(raw, (str, bytes, bytearray)):
            setattr(obj, name, json.loads(raw))


def _cell_text(value):
    """Return a spreadsheet cell as stripped text, or ``""`` when it is empty."""
    return str(value).strip() if value else ""


async def get_papers(page: int = 1, size: int = 10, name: str = "", pno: str = "",
                     ctgid: int = Query(0, description="资源分类ID"),
                     order: OrderByField = Query(
                         "-created_at",
                         description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"),
                     res: ReturnField = Query("", description="返回字段,默认列表展示字段"),
                     db: AsyncSession = Depends(get_async_db),
                     current_user: SysUser = Depends(get_current_user)):
    """Return one page of papers, optionally filtered by name, number, category.

    Returns:
        ``{"data": items, "total": total}`` where ``items`` and ``total`` come
        from ``crud_paper.find_all``; each item's ``points`` is JSON-decoded
        when it holds non-empty text.
    """
    filters = {}
    if name:
        filters["name"] = name
    if pno:
        filters["pno"] = pno
    if ctgid:
        filters["category_id"] = ctgid

    # Clamp so that page <= 0 reads the first page instead of producing a
    # negative OFFSET.
    offset = max(page - 1, 0) * size
    if isinstance(order, str):
        order = [text(order)]

    total, items = await crud_paper.find_all(db, filters=filters, offset=offset,
                                             limit=size, order_by=order,
                                             return_fields=res)
    for item in items:
        _decode_json_attrs(item, "points")
    return {"data": items, "total": total}


async def get_papers_info(pid: int = Path(..., description="试卷ID"),
                          db: AsyncSession = Depends(get_async_db),
                          current_user: SysUser = Depends(get_current_user)):
    """Return a single paper together with its questions ordered by ``qno``.

    Returns:
        ``{"data": paper}`` with ``paper.questions`` attached, or an error
        dict when no paper has the given id.
    """
    db_obj = await crud_paper.find_one(db, {"id": pid})
    if not db_obj:
        # NOTE(review): the other handlers answer 404 for a missing paper;
        # 400 is kept here because clients may already depend on it.
        return {"errcode": 400, "mess": "试卷不存在!"}

    _decode_json_attrs(db_obj, "points")

    # Load the questions that belong to this paper.
    questions = await crud_question.fetch_all(db, filters={"pid": pid},
                                              order_by=[asc("qno")])
    for que in questions:
        _decode_json_attrs(que, "imgs", "stuff_imgs")
    db_obj.questions = questions
    return {"data": db_obj}


async def create_paper(info: NewPaperInfo,
                       db: AsyncSession = Depends(get_async_db),
                       current_user: SysUser = Depends(get_current_user)):
    """Create a paper after checking its category exists and its number is free.

    Returns:
        ``{"data": created_paper}``, or an error dict (404 unknown category,
        400 duplicate paper number).
    """
    # The category must exist.
    if not await crud_work_category.count(db, filters={"id": info.ctgid}):
        return {"errcode": 404, "mess": "分类不存在!"}

    # The paper number must be unique.
    if await crud_paper.count(db, filters={"pno": info.pno}):
        return {"errcode": 400, "mess": "试卷编号重复!"}

    db_obj = PaperInDB(ctgid=info.ctgid, name=info.name, pno=info.pno,
                       pages=info.pages,
                       creator_id=current_user.id,
                       creator_name=current_user.username,
                       editor_id=current_user.id,
                       editor_name=current_user.username)
    if info.imgs:
        db_obj.imgs = ";".join(info.imgs)
        db_obj.uploaded = True
    if info.attachments:
        db_obj.attach_url = ";".join(info.attachments)
        db_obj.attached = True

    db_obj = await crud_paper.insert_one(db, db_obj)
    return {"data": db_obj}


async def update_paper(info: UpdatePaperInfo, bg_task: BackgroundTasks,
                       pid: int = Path(..., description="试卷ID"),
                       db: AsyncSession = Depends(get_async_db),
                       current_user: SysUser = Depends(get_current_user)):
    """Apply the non-null fields of *info* to an existing paper.

    When new images or attachments are submitted, the previously stored URLs
    are handed to ``delete_oss_file`` as a background task.

    Returns:
        ``{"data": updated_paper}``, or an error dict (404 unknown paper or
        category, 400 empty payload).
    """
    db_obj = await crud_paper.find_one(db, filters={"id": pid})
    if not db_obj:
        return {"errcode": 404, "mess": "试卷不存在!"}

    # Only fields that were actually submitted are considered.
    info_dict = info.dict(exclude_none=True)
    if not info_dict:
        return {"errcode": 400, "mess": "提交参数为空!"}

    if "ctgid" in info_dict:
        # A changed category must exist.
        existed = await crud_work_category.count(
            db, filters={"id": info_dict["ctgid"]})
        if not existed:
            return {"errcode": 404, "mess": "分类不存在!"}

    if "imgs" in info_dict:
        # Paper images: store as a ';'-joined string and drop the old files.
        info_dict["imgs"] = ";".join(info_dict["imgs"])
        info_dict["uploaded"] = True
        bg_task.add_task(delete_oss_file, db_obj.imgs)

    if "attachments" in info_dict:
        # Electronic paper attachments: same treatment as the images.
        info_dict["attach_url"] = ";".join(info_dict["attachments"])
        info_dict["attached"] = True
        bg_task.add_task(delete_oss_file, db_obj.attach_url)

    info_dict["editor_id"] = current_user.id
    info_dict["editor_name"] = current_user.username

    info = UpdatePaper(**info_dict)
    db_obj = await crud_paper.update(db, db_obj, info)
    return {"data": db_obj}


async def delete_paper(bg_task: BackgroundTasks,
                       pid: int = Path(..., description="试卷ID"),
                       db: AsyncSession = Depends(get_async_db),
                       current_user: SysUser = Depends(get_current_user)):
    """Delete a paper and schedule removal of its stored files.

    Returns:
        ``{"data": None}``, or a 404 error dict when the paper does not exist.
    """
    db_obj = await crud_paper.find_one(db, {"id": pid})
    if not db_obj:
        return {"errcode": 404, "mess": "试卷不存在!"}

    # Read the URLs before deleting so they never have to be loaded from an
    # object whose row is already gone.
    old_imgs = db_obj.imgs
    old_attach = db_obj.attach_url

    await crud_paper.delete(db, obj_id=pid)
    bg_task.add_task(delete_oss_file, old_imgs)
    if old_attach:
        # Attachments are cleaned up as well, matching update_paper; before
        # this they were left behind in storage.
        bg_task.add_task(delete_oss_file, old_attach)
    return {"data": None}


async def import_paper(datafile: UploadFile = File(..., description="数据文件"),
                       db: AsyncSession = Depends(get_async_db),
                       current_user: SysUser = Depends(get_current_user)):
    """Bulk-create papers from the first worksheet of an uploaded ``.xlsx`` file.

    Row 1 is skipped as a header. Each data row must have 12 columns:
    0 row label, 1 category id, 2 name, 3 paper number, 4 pages, 5 images,
    6 uploaded flag, 7 points, 8 attachment URL, 9 attached flag, 10 cut flag,
    11 score (100 when empty). Columns 0-4 are required. Fully blank rows are
    ignored.

    Returns:
        ``{"data": {"success": n, "fail": m, "errors": [...]}}``, or a 400
        error dict when the upload is not a readable ``.xlsx`` workbook.
    """
    # Check the file extension; the filename may be missing altogether.
    filename = datafile.filename or ""
    if not filename.lower().endswith(".xlsx"):
        return {"errcode": 400, "mess": "文件格式错误!"}

    # Parse straight from memory. Writing to a path built from the
    # client-supplied filename allowed path traversal, let concurrent uploads
    # overwrite each other, and leaked the file whenever parsing failed.
    content = await datafile.read()
    try:
        wb = load_workbook(io.BytesIO(content))
    except (zipfile.BadZipFile, KeyError, OSError):
        # Not a zip archive, or a zip without the expected workbook parts.
        return {"errcode": 400, "mess": "文件格式错误!"}
    ws = wb.worksheets[0]

    errors = []
    success = 0
    papers = []
    # Paper numbers already accepted from this file; the database check
    # cannot see rows that are still waiting in the current batch.
    seen_pnos = set()

    rows = ws.iter_rows(min_row=2, max_col=ws.max_column,
                        max_row=ws.max_row, values_only=True)
    for row_no, row in enumerate(rows, start=2):
        if all(cell is None for cell in row):
            continue  # blank (often trailing) row, not a user error

        # Prefer the label from the first column; fall back to the sheet row
        # number so the message is never "第None行".
        idx = row[0] if row[0] is not None else row_no

        if len(row) != _IMPORT_COLUMNS:
            errors.append(f"第{idx}行: 提交参数错误!")
            continue

        # Required columns.
        if not (row[0] and row[1] and row[2] and row[3] and row[4]):
            errors.append(f"第{idx}行: 缺少必填字段!")
            continue

        # The category must exist.
        existed = await crud_work_category.count(db, filters={"id": row[1]})
        if not existed:
            errors.append(f"第{idx}行: 分类不存在!")
            continue

        # The paper number must be unique, both in this file and in the
        # database.
        pno_key = str(row[3])
        if pno_key in seen_pnos:
            errors.append(f"第{idx}行: 试卷编号重复!")
            continue
        existed = await crud_paper.count(db, filters={"pno": row[3]})
        if existed:
            errors.append(f"第{idx}row: 试卷编号重复!".replace("row", "行"))
            continue
        seen_pnos.add(pno_key)

        # Cell values are scalars, so the image/attachment text is stored
        # as-is; joining it with ";" would split it into single characters.
        obj_in = PaperInDB(ctgid=row[1], name=row[2], pno=row[3], pages=row[4],
                           imgs=_cell_text(row[5]),
                           uploaded=bool(row[6]),
                           points=row[7] if row[7] else "",
                           attach_url=_cell_text(row[8]),
                           attached=bool(row[9]),
                           cut=bool(row[10]),
                           score=row[11] if row[11] else 100,
                           creator_id=current_user.id,
                           creator_name=current_user.username,
                           editor_id=current_user.id,
                           editor_name=current_user.username)
        papers.append(obj_in)
        success += 1

        if len(papers) == _IMPORT_BATCH_SIZE:
            await crud_paper.insert_many(db, papers)
            papers.clear()

    if papers:
        await crud_paper.insert_many(db, papers)
        papers.clear()

    return {"data": {"success": success, "fail": len(errors), "errors": errors}}