|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import os
- from fastapi import Query, Depends, File, UploadFile, Path
- from openpyxl import load_workbook
- from sqlalchemy import asc, desc
- from sqlalchemy.ext.asyncio import AsyncSession
- from starlette.background import BackgroundTasks
- from common.common import delete_oss_file
- from core.config import settings
- from crud.paper import crud_paper, crud_question
- from crud.resource import crud_work_category
- from models.paper import Paper
- from models.user import Admin
- from schemas.base import OrderByField, ReturnField
- from schemas.paper import NewPaperInfo, UpdatePaperInfo
- from schemas.paper import PaperInDB, UpdatePaper
- from utils.cv2img import CV2img,get_std_points
- from utils.depends import get_async_db, get_current_user
- from utils.imgtool import download_remote_img
- async def get_papers(page: int = 1,
- size: int = 10,
- name: str = "",
- pno: str = "",
- ptype: str = "",
- ctgid: int = Query(0, description="资源分类ID"),
- order: OrderByField = Query("-created_at",
- description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"),
- res: ReturnField = Query("", description="返回字段,默认列表展示字段"),
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- _q = {}
- if name:
- _q["name"] = name
- if pno:
- _q["pno"] = pno
- if ctgid:
- _q["category_id"] = ctgid
- if ptype:
- _q["ptype"] = ptype
- offset = (page - 1) * size
- order_fields = []
- if order:
- for x in order.split(","):
- field = x.strip()
- if field:
- if field.startswith("-"):
- order_fields.append(desc(getattr(Paper, field[1:])))
- else:
- order_fields.append(asc(getattr(Paper, field)))
- total, items = await crud_paper.find_all(db,
- filters=_q,
- offset=offset,
- limit=size,
- order_by=order_fields,
- return_fields=res)
- return {"data": items, "total": total}
- async def get_papers_info(pid: int = Path(..., description="试卷ID"),
- usage: int = Query(None,description="用途"),
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- db_obj = await crud_paper.find_one(db, filters={"id": pid})
- if not db_obj:
- return {"errcode": 400, "mess": "试卷不存在!"}
- # 查询该试卷的试题列表
- if usage == 12:
- filters = {"pid": pid,"usage__in":[1,2],"qtype__in":["填空题","解答题",""]}
- else:
- filters = {"pid": pid,"usage__in":[0,1],"qtype__in":["填空题","解答题",""]}
- questions = await crud_question.fetch_all(db, filters=filters, order_by=[asc("id")])
- new_questions = []
- for q in questions:
- q.std_answer = ["A", "B", "C", "D"]
- que = q.__dict__
- if q.stuff_points:
- for sp in q.stuff_points:
- stuff = dict(que)
- stuff["points"] = [sp]
- stuff["imgtype"] = 1
- new_questions.append(stuff)
- for point in q.points:
- que = dict(que)
- que["points"] = [point]
- new_questions.append(que)
- db_obj.questions = new_questions
- #客观题
- obj_questions = await crud_question.fetch_all(db, filters={"pid":pid,"qtype__in":["单选题","多选题"]}, order_by=[asc("id")])
- for item in obj_questions:
- item.answer = item.answer.split()
- db_obj.obj_questions = obj_questions
- return {"data": db_obj}
- async def create_paper(info: NewPaperInfo,
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- # 判断分类是否存在
- if info.ptype == "work":
- existed = await crud_work_category.count(db, filters={"id": info.ctgid})
- if not existed:
- return {"errcode": 404, "mess": "分类不存在!"}
- # 判断试卷编号是否重复
- existed = await crud_paper.count(db, filters={"pno": info.pno})
- if existed:
- return {"errcode": 400, "mess": "试卷编号重复!"}
- # 标准点处理
- std_points = []
- if info.imgs:
- remote_imgs = info.imgs
- for url in remote_imgs:
- tmp_ri = await download_remote_img(url)
- points = get_std_points(tmp_ri)
- std_points.append(points)
- if not std_points:
- return {"errcode": 400, "mess": "缺少标准点!"}
- # 初始化对象
- db_obj = PaperInDB(imgs=info.imgs,
- ctgid=info.ctgid,
- name=info.name,
- pno=info.pno,
- size=info.size,
- pages=info.pages,
- points=std_points,
- ptype=info.ptype,
- subject=info.subject,
- creator_id=current_user.id,
- creator_name=current_user.username,
- editor_id=current_user.id,
- editor_name=current_user.username)
- if std_points:
- db_obj.uploaded = True
- if info.attachments:
- db_obj.attach_url = ";".join(info.attachments)
- db_obj.attached = True
- # 创建
- db_obj = await crud_paper.insert_one(db, db_obj)
- return {"data": db_obj}
- async def update_paper(info: UpdatePaperInfo,
- bg_task: BackgroundTasks,
- pid: int = Path(..., description="试卷ID"),
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- # 判断试卷是否存在
- db_obj = await crud_paper.find_one(db, filters={"id": pid})
- if not db_obj:
- return {"errcode": 404, "mess": "试卷不存在!"}
- # 提交参数
- info_dict = info.dict(exclude_none=True)
- if not info_dict:
- return {"errcode": 400, "mess": "提交参数为空!"}
- if "ctgid" in info_dict: # 判断分类是否存在
- existed = await crud_work_category.count(db, filters={"id": info_dict["ctgid"]})
- if not existed:
- return {"errcode": 404, "mess": "分类不存在!"}
- if "imgs" in info_dict: # 试卷图片
- info_dict["uploaded"] = True
- bg_task.add_task(delete_oss_file, db_obj.imgs) # 删除原始URL
- if "attachments" in info_dict: # 电子卷附件
- info_dict["attach_url"] = ";".join(info_dict["attachments"])
- info_dict["attached"] = True
- bg_task.add_task(delete_oss_file, db_obj.attach_url)
- info_dict["editor_id"] = current_user.id
- info_dict["editor_name"] = current_user.username
- info = UpdatePaper(**info_dict)
- db_obj = await crud_paper.update(db, db_obj, info)
- return {"data": db_obj}
- async def delete_paper(bg_task: BackgroundTasks,
- pid: int = Path(..., description="试卷ID"),
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- # 删除
- db_obj = await crud_paper.find_one(db, filters={"id": pid})
- if not db_obj:
- return {"errcode": 404, "mess": "试卷不存在!"}
- else:
- await crud_paper.delete(db, obj_id=pid)
- bg_task.add_task(delete_oss_file, db_obj.imgs)
- return {"data": None}
- async def import_paper(datafile: UploadFile = File(..., description="数据文件"),
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- # 判断文件格式
- if not datafile.filename.endswith(".xlsx"):
- return {"errcode": 400, "mess": "文件格式错误!"}
- # 把文件写入磁盘,再加载回来
- disk_file = os.path.join(settings.UPLOADER_PATH, datafile.filename)
- content = await datafile.read()
- with open(disk_file, "wb") as f:
- f.write(content)
- # 解析文件
- errors = []
- success = 0
- papers = []
- # 使用openpyxl读取文件
- wb = load_workbook(disk_file)
- ws = wb.worksheets[0]
- for row in ws.iter_rows(min_row=2, max_col=ws.max_column, max_row=ws.max_row, values_only=True):
- idx = row[0]
- if len(row) != 12:
- errors.append(f"第{idx}行: 提交参数错误!")
- continue
- # 判断必填参数
- if not (row[0] and row[1] and row[2] and row[3] and row[4]):
- errors.append(f"第{idx}行: 缺少必填字段!")
- continue
- # 判断分类是否存在
- existed = await crud_work_category.count(db, filters={"id": row[1]})
- if not existed:
- errors.append(f"第{idx}行: 分类不存在!")
- continue
- # 判断试卷编号是否重复
- existed = await crud_paper.count(db, filters={"pno": row[3]})
- if existed:
- errors.append(f"第{idx}行: 试卷编号重复!")
- continue
- # 创建
- obj_in = PaperInDB(ctgid=row[1],
- name=row[2],
- pno=row[3],
- pages=row[4],
- imgs=row[5] if row[5] else [],
- uploaded=True if row[6] else False,
- points=row[7] if row[7] else [],
- attach_url=";".join(row[8]) if row[8] else "",
- attached=True if row[9] else False,
- cut=True if row[10] else False,
- score=row[11] if row[11] else 100,
- creator_id=current_user.id,
- creator_name=current_user.username,
- editor_id=current_user.id,
- editor_name=current_user.username)
- papers.append(obj_in)
- success += 1
- if len(papers) == 20:
- await crud_paper.insert_many(db, papers)
- papers.clear()
- if papers:
- await crud_paper.insert_many(db, papers)
- papers.clear()
- os.remove(disk_file)
- return {"data": {"success": success, "fail": len(errors), "errors": errors}}
|