#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Endpoint handlers for listing, reading, creating, updating, deleting and
bulk-importing papers."""
import os
import tempfile
from zipfile import BadZipFile

from fastapi import Query, Depends, File, UploadFile, Path
from openpyxl import load_workbook
from sqlalchemy import asc, desc
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.background import BackgroundTasks

from common.common import delete_oss_file
from core.config import settings
from crud.paper import crud_paper, crud_question
from crud.resource import crud_work_category
from models.paper import Paper
from models.user import Admin
from schemas.base import OrderByField, ReturnField
from schemas.paper import NewPaperInfo, UpdatePaperInfo
from schemas.paper import PaperInDB, UpdatePaper
from utils.cv2img import CV2img, get_std_points
from utils.depends import get_async_db, get_current_user
from utils.imgtool import download_remote_img

# Number of parsed rows buffered before they are flushed with insert_many.
_IMPORT_BATCH_SIZE = 20

# Number of columns every data row of the import sheet must have.
_IMPORT_COLUMN_COUNT = 12


def _parse_order_by(order):
    """Turn a comma-separated sort spec into SQLAlchemy ordering clauses.

    A leading ``-`` on a field means descending, otherwise ascending.

    Raises:
        ValueError: if a field name is not an attribute of ``Paper``.
    """
    clauses = []
    for raw in (order or "").split(","):
        field = raw.strip()
        if not field:
            continue
        descending = field.startswith("-")
        name = field[1:] if descending else field
        column = getattr(Paper, name, None) if name else None
        if column is None:
            raise ValueError(field)
        clauses.append(desc(column) if descending else asc(column))
    return clauses


def _normalize_attach_url(value):
    """Return the attachment cell as the ``;``-separated string stored in attach_url.

    A spreadsheet cell normally yields a plain string, which is stored as-is;
    joining it with ``;`` would put a separator between every character.
    Only real sequences are joined.
    """
    if not value:
        return ""
    if isinstance(value, (list, tuple)):
        return ";".join(str(part) for part in value)
    return str(value)


async def get_papers(page: int = 1,
                     size: int = 10,
                     name: str = "",
                     pno: str = "",
                     ptype: str = "",
                     ctgid: int = Query(0, description="资源分类ID"),
                     order: OrderByField = Query("-created_at", description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"),
                     res: ReturnField = Query("", description="返回字段,默认列表展示字段"),
                     db: AsyncSession = Depends(get_async_db),
                     current_user: Admin = Depends(get_current_user)):
    """Return one page of papers matching the optional filters.

    Empty/zero filter values are ignored. Returns ``{"data": items, "total": total}``,
    or an ``errcode`` payload when a sort field is not an attribute of ``Paper``.
    """
    candidates = (("name", name), ("pno", pno), ("category_id", ctgid), ("ptype", ptype))
    filters = {key: value for key, value in candidates if value}

    # Clamp so that page <= 0 cannot produce a negative OFFSET.
    offset = max(page - 1, 0) * size

    try:
        order_fields = _parse_order_by(order)
    except ValueError:
        return {"errcode": 400, "mess": "排序字段错误!"}

    total, items = await crud_paper.find_all(db, filters=filters, offset=offset, limit=size,
                                             order_by=order_fields, return_fields=res)
    return {"data": items, "total": total}


async def get_papers_info(pid: int = Path(..., description="试卷ID"),
                          usage: int = Query(None,description="用途"),
                          db: AsyncSession = Depends(get_async_db),
                          current_user: Admin = Depends(get_current_user)):
    """Return a paper with its subjective questions (one entry per point) and
    its objective questions attached.

    ``usage == 12`` selects questions whose usage is 1 or 2; any other value
    selects usage 0 or 1.
    """
    db_obj = await crud_paper.find_one(db, filters={"id": pid})
    if not db_obj:
        # NOTE(review): the other endpoints answer 404 for a missing paper;
        # 400 is kept here so existing clients are not affected - confirm.
        return {"errcode": 400, "mess": "试卷不存在!"}

    # Fetch the question list of this paper.
    usage_values = [1, 2] if usage == 12 else [0, 1]
    filters = {"pid": pid, "usage__in": usage_values, "qtype__in": ["填空题", "解答题", ""]}
    questions = await crud_question.fetch_all(db, filters=filters, order_by=[asc("id")])

    new_questions = []
    for q in questions:
        q.std_answer = ["A", "B", "C", "D"]
        attrs = q.__dict__
        # Stuff points produce extra entries flagged with imgtype = 1.
        for sp in (q.stuff_points or []):
            stuff = dict(attrs)
            stuff["points"] = [sp]
            stuff["imgtype"] = 1
            new_questions.append(stuff)
        # One entry per point; a question without points contributes none.
        for point in (q.points or []):
            entry = dict(attrs)
            entry["points"] = [point]
            new_questions.append(entry)
    db_obj.questions = new_questions

    # Objective questions: the stored answer string is split on whitespace.
    obj_questions = await crud_question.fetch_all(db,
                                                  filters={"pid": pid, "qtype__in": ["单选题", "多选题"]},
                                                  order_by=[asc("id")])
    for item in obj_questions:
        # A missing answer becomes an empty list instead of raising.
        item.answer = (item.answer or "").split()
    db_obj.obj_questions = obj_questions
    return {"data": db_obj}


async def create_paper(info: NewPaperInfo,
                       db: AsyncSession = Depends(get_async_db),
                       current_user: Admin = Depends(get_current_user)):
    """Create a paper after validating its category, number and images.

    Returns ``{"data": paper}`` on success, otherwise an ``errcode`` payload.
    """
    # The category is only checked for papers of type "work".
    if info.ptype == "work":
        existed = await crud_work_category.count(db, filters={"id": info.ctgid})
        if not existed:
            return {"errcode": 404, "mess": "分类不存在!"}

    # The paper number must be unique.
    existed = await crud_paper.count(db, filters={"pno": info.pno})
    if existed:
        return {"errcode": 400, "mess": "试卷编号重复!"}

    # Standard points: one result per image, in image order.
    std_points = []
    for url in (info.imgs or []):
        local_img = await download_remote_img(url)
        std_points.append(get_std_points(local_img))
    if not std_points:
        return {"errcode": 400, "mess": "缺少标准点!"}

    # Build the object to insert.
    db_obj = PaperInDB(imgs=info.imgs, ctgid=info.ctgid, name=info.name, pno=info.pno,
                       size=info.size, pages=info.pages, points=std_points,
                       ptype=info.ptype, subject=info.subject,
                       creator_id=current_user.id, creator_name=current_user.username,
                       editor_id=current_user.id, editor_name=current_user.username)
    # std_points is guaranteed non-empty here, so the paper counts as uploaded.
    db_obj.uploaded = True
    if info.attachments:
        db_obj.attach_url = ";".join(info.attachments)
        db_obj.attached = True

    db_obj = await crud_paper.insert_one(db, db_obj)
    return {"data": db_obj}


async def update_paper(info: UpdatePaperInfo,
                       bg_task: BackgroundTasks,
                       pid: int = Path(..., description="试卷ID"),
                       db: AsyncSession = Depends(get_async_db),
                       current_user: Admin = Depends(get_current_user)):
    """Apply the non-null fields of ``info`` to an existing paper.

    Replaced images/attachments are handed to ``delete_oss_file`` as
    background tasks.
    """
    # The paper must exist.
    db_obj = await crud_paper.find_one(db, filters={"id": pid})
    if not db_obj:
        return {"errcode": 404, "mess": "试卷不存在!"}

    # Only the fields that were actually submitted are considered.
    info_dict = info.dict(exclude_none=True)
    if not info_dict:
        return {"errcode": 400, "mess": "提交参数为空!"}

    if "ctgid" in info_dict:
        # The new category must exist.
        existed = await crud_work_category.count(db, filters={"id": info_dict["ctgid"]})
        if not existed:
            return {"errcode": 404, "mess": "分类不存在!"}

    if "imgs" in info_dict:
        # New paper images: mark as uploaded and drop the previous URLs.
        info_dict["uploaded"] = True
        bg_task.add_task(delete_oss_file, db_obj.imgs)

    if "attachments" in info_dict:
        # New attachments: store the joined URLs and drop the previous ones.
        info_dict["attach_url"] = ";".join(info_dict["attachments"])
        info_dict["attached"] = True
        bg_task.add_task(delete_oss_file, db_obj.attach_url)

    info_dict["editor_id"] = current_user.id
    info_dict["editor_name"] = current_user.username

    update_in = UpdatePaper(**info_dict)
    db_obj = await crud_paper.update(db, db_obj, update_in)
    return {"data": db_obj}


async def delete_paper(bg_task: BackgroundTasks,
                       pid: int = Path(..., description="试卷ID"),
                       db: AsyncSession = Depends(get_async_db),
                       current_user: Admin = Depends(get_current_user)):
    """Delete a paper and schedule removal of its images."""
    db_obj = await crud_paper.find_one(db, filters={"id": pid})
    if not db_obj:
        return {"errcode": 404, "mess": "试卷不存在!"}

    await crud_paper.delete(db, obj_id=pid)
    # NOTE(review): only the images are removed; attach_url is left in place,
    # unlike update_paper which deletes replaced attachments - confirm intent.
    bg_task.add_task(delete_oss_file, db_obj.imgs)
    return {"data": None}


async def import_paper(datafile: UploadFile = File(..., description="数据文件"),
                       db: AsyncSession = Depends(get_async_db),
                       current_user: Admin = Depends(get_current_user)):
    """Bulk-create papers from the first worksheet of an uploaded ``.xlsx`` file.

    The first row is treated as a header. Each following row must have 12
    columns; columns 0-4 are required. Rows are validated one by one and
    inserted in batches.

    Returns ``{"data": {"success": int, "fail": int, "errors": [str, ...]}}``,
    or an ``errcode`` payload when the file is not a readable ``.xlsx``.
    """
    # Check the file format.
    filename = datafile.filename or ""
    if not filename.endswith(".xlsx"):
        return {"errcode": 400, "mess": "文件格式错误!"}

    # Write the upload to disk and load it back. A generated name is used
    # instead of the client-supplied one so that a crafted filename cannot
    # escape the upload directory and concurrent imports cannot collide.
    content = await datafile.read()
    with tempfile.NamedTemporaryFile(dir=settings.UPLOADER_PATH, suffix=".xlsx", delete=False) as tmp:
        tmp.write(content)
        disk_file = tmp.name

    errors = []
    success = 0
    papers = []
    # Numbers accepted earlier in this file; pending rows are not in the
    # database yet, so the database check alone would miss them.
    seen_pnos = set()
    wb = None
    try:
        try:
            wb = load_workbook(disk_file)
        except BadZipFile:
            # The extension matched but the content is not an xlsx archive.
            return {"errcode": 400, "mess": "文件格式错误!"}
        ws = wb.worksheets[0]

        for row in ws.iter_rows(min_row=2, max_col=ws.max_column, max_row=ws.max_row, values_only=True):
            # Skip completely blank rows instead of reporting them as failures.
            if all(cell is None for cell in row):
                continue

            idx = row[0]
            if len(row) != _IMPORT_COLUMN_COUNT:
                errors.append(f"第{idx}行: 提交参数错误!")
                continue

            # Required fields.
            if not all(row[:5]):
                errors.append(f"第{idx}行: 缺少必填字段!")
                continue

            # The category must exist.
            existed = await crud_work_category.count(db, filters={"id": row[1]})
            if not existed:
                errors.append(f"第{idx}行: 分类不存在!")
                continue

            # The paper number must be unique, both in the database and in this file.
            pno = row[3]
            if pno in seen_pnos or await crud_paper.count(db, filters={"pno": pno}):
                errors.append(f"第{idx}行: 试卷编号重复!")
                continue

            # NOTE(review): imgs and points are passed through exactly as read
            # from the cell; confirm the cell format PaperInDB expects.
            obj_in = PaperInDB(ctgid=row[1], name=row[2], pno=pno, pages=row[4],
                               imgs=row[5] if row[5] else [],
                               uploaded=bool(row[6]),
                               points=row[7] if row[7] else [],
                               attach_url=_normalize_attach_url(row[8]),
                               attached=bool(row[9]),
                               cut=bool(row[10]),
                               score=row[11] if row[11] else 100,
                               creator_id=current_user.id, creator_name=current_user.username,
                               editor_id=current_user.id, editor_name=current_user.username)
            papers.append(obj_in)
            seen_pnos.add(pno)
            success += 1

            if len(papers) == _IMPORT_BATCH_SIZE:
                await crud_paper.insert_many(db, papers)
                papers.clear()

        # Flush the final partial batch.
        if papers:
            await crud_paper.insert_many(db, papers)
            papers.clear()
    finally:
        # Always release the workbook and remove the temporary file, even
        # when parsing or insertion fails.
        if wb is not None:
            wb.close()
        os.remove(disk_file)

    return {"data": {"success": success, "fail": len(errors), "errors": errors}}