123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import re
- from bs4 import BeautifulSoup
- from fastapi import Depends, Query
- from sqlalchemy.ext.asyncio import AsyncSession
- from crud.paper import crud_question, crud_paper
- from models.paper import PaperQuestion
- from models.user import Admin
- from schemas.base import ReturnField
- from schemas.paper.questions import QuestionInfo, SaveQuePieceInfo
- from utils.depends import get_async_db, get_current_user
- from utils.imgtool import download_remote_img, crop_img,crop_img_local,crop_img_remote
- from utils.cv2img import CV2img
- from utils.ansDetect import rec_std_ans
- async def create_question(info: QuestionInfo,
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- info = info.dict(exclude_none=True)
- if not info:
- return {"errcode": 400, "mess": "请求参数为空!"}
- db_exist = await crud_question.find_one(db,
- filters={
- "pid": info["pid"],
- "pno": info["pno"],
- "qno": info["qno"],
- "sqno": info["sqno"]
- })
- if db_exist:
- return {"errcode": 400, "mess": "试题重复!"}
- info["creator_id"] = current_user.id
- info["creator_name"] = current_user.username
- db_obj = await crud_question.insert_one(db, info)
- return {"data": db_obj}
- async def update_question(info: SaveQuePieceInfo,
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- info_dict = info.dict(exclude_none=True)
- if not info_dict:
- return {"errcode": 400, "mess": "请求参数为空!"}
- pid = info_dict["id"]
- pno = info_dict["pno"]
- ques = info_dict["questions"]
- obj_questions = info_dict["obj_questions"]
-
- ques.extend(obj_questions)
- total_score = 0
- for que in ques:
- qno = que["qno"]
- sqno = que["sqno"]
- qtype = que.get("qtype","")
- answer = que.get("answer",[])
- answer = ",".join(answer) if answer else ""
- score = float(que.get("score",0))
- total_score += score
- db_question = await crud_question.find_one(db,
- filters={
- "pid": pid,
- "pno": pno,
- "qno": qno,
- "sqno": sqno
- })
- if db_question:
- update_info = {"qtype": qtype, "answer": answer, "score": score}
- await crud_question.update(db, db_question, update_info)
- # 更新分数
- db_paper = await crud_paper.find_one(db, filters={"id": pid})
- await crud_paper.update(db, db_paper, {"score": total_score})
- # 更新试卷试题数量
- existed = await crud_question.count(db, filters={"pid": pid})
- if existed:
- await crud_paper.update(db, db_paper, {"question_amount": existed})
- return {"data": None}
- async def delete_question(qid: int,
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- existed = await crud_question.count(db, filters={"id": qid})
- if not existed:
- return {"errcode": 400, "mess": "试题不存在!"}
- await crud_question.delete(db, qid)
- return {"data": None}
- async def get_question(qid: int,
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- db_exist = await crud_question.find_one(db, filters={"id": qid})
- if not db_exist:
- return {"errcode": 400, "mess": "角色不存在!"}
- return {"data": db_exist}
- async def get_question_list(page: int = 1,
- size: int = 10,
- name: str = "",
- res: ReturnField = Query("", description="控制返回字段,字段逗号分隔"),
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- _q = []
- if name:
- _q.append(PaperQuestion.name.like(f"{name}%"))
- offset = (page - 1) * size
- total, items = await crud_question.find_all(db,
- filters=_q,
- offset=offset,
- limit=size,
- return_fields=res)
- return {"total": total, "data": items}
- async def cut_imgs(std_img, points,std_points):
- """根据points完成图片切割
- """
- cut_urls = []
- for idx, point in enumerate(points):
- x = point["x"] + std_points["x"]
- y = point["y"] + std_points["y"]
- w = point["w"]
- h = point["h"]
- point = (x, y, x + w, y + h)
- original_image = await download_remote_img(std_img)
- url = await crop_img(original_image, point, idx)
- cut_urls.append(url)
- return cut_urls
- async def get_cut_imgs(db, current_user, info: SaveQuePieceInfo):
- """
- """
- db_paper = await crud_paper.find_one(db, filters={"id": info.pid})
- if not db_paper:
- return {"errcode": 404, "mess": "试卷不存在!"}
- if info.studentno_points:
- db_paper = await crud_paper.update(db, db_paper, {"studentno_points": info.studentno_points})
- if info.ans_points:
- #这里获取标准点
- std_points = db_paper.points[0][0]
- ans_url = await cut_imgs(db_paper.imgs[0], [info.ans_points],std_points)
- tmp_ans_img = await download_remote_img(ans_url[0])
- std_ans_data = rec_std_ans(tmp_ans_img)
- db_paper = await crud_paper.update(db, db_paper, {"ans_points": info.ans_points,"ans_url":ans_url})
- #写入questions
- for qno,std_points in std_ans_data.items():
- std_que = await crud_question.find_one(db,filters={
- "pid":info.pid,
- "pno":info.pno,
- "qno":qno,
- "sqno":1
- })
- if std_que:
- update_info = {"std_points":std_points}
- await crud_question.update(db,std_que,update_info)
- else:
- question_info = QuestionInfo(
- **{
- "pid": info.pid,
- "pno": info.pno,
- "qno": qno,
- "sqno": 1,
- "page": 0,
- "points": [info.ans_points],
- "qtype":"单选题",
- "imgtype": 0,
- "creator_id": current_user.id,
- "creator_name": current_user.username,
- "editor_id": current_user.id,
- "editor_name": current_user.username,
- "usage": 0,
- "std_points": std_points
- })
- db_question = await crud_question.insert_one(db,question_info)
- cut_data = []
- added_questions = info.ques
- # 计算要删除的
- old_questions = await crud_question.fetch_all(db, filters={"pid": info.pid,"usage__in":[0,1],"qtype__in":["填空题","解答题"]})
- old_qids = [x.id for x in old_questions]
- now_questions = filter(lambda x: x["imgtype"] == 0 and x["id"], added_questions)
- now_qids = [x["id"] for x in now_questions]
- del_ids = list(set(old_qids) - set(now_qids))
- del_filter = [PaperQuestion.id.in_(del_ids)]
- await crud_question.delete(db, where_clauses=del_filter)
- # 试题切割
- questions = filter(lambda x: x["imgtype"] == 0 and not x["id"], added_questions)
- # 按qno和sqno聚合points
- cnt_dct = {}
- new_questions = []
- for que in questions:
- key = (que["qno"],que["sqno"])
- if cnt_dct.get(key):
- points = cnt_dct[key]["points"]
- points.extend(que["points"])
- que["points"] = points
- cnt_dct[key] = que
- else:
- cnt_dct[key] = que
- for k,v in cnt_dct.items():
- new_questions.append(v)
- for item in new_questions:
- if not ("-" in item["qno"]):
- question_info = QuestionInfo(
- **{
- "pid": info.pid,
- "pno": info.pno,
- "qno": item["qno"],
- "sqno": item["sqno"],
- "page": item.get("page",0),
- "points": item["points"],
- "qtype": "解答题",
- "imgtype": item["imgtype"],
- "creator_id": current_user.id,
- "creator_name": current_user.username,
- "editor_id": current_user.id,
- "editor_name": current_user.username,
- "usage": 1
- })
- cut_data.append(question_info)
- else:
- question_info = QuestionInfo(
- **{
- "pid": info.pid,
- "pno": info.pno,
- "qno": item["qno"],
- "sqno": item["sqno"],
- "page": item.get("page",0),
- "points": item["points"],
- "qtype": "解答题",
- "imgtype": item["imgtype"],
- "creator_id": current_user.id,
- "creator_name": current_user.username,
- "editor_id": current_user.id,
- "editor_name": current_user.username,
- "usage": 0
- })
- cut_data.append(question_info)
- for i in range(int(item["qno"].split("-")[0]),int(item["qno"].split("-")[1])+1):
- question_info = QuestionInfo(
- **{
- "pid": info.pid,
- "pno": info.pno,
- "qno": i,
- "sqno": item["sqno"],
- "page": item.get("page",0),
- "points": item["points"],
- "qtype": "解答题",
- "imgtype": item["imgtype"],
- "creator_id": current_user.id,
- "creator_name": current_user.username,
- "editor_id": current_user.id,
- "editor_name": current_user.username,
- "usage": 2
- })
- cut_data.append(question_info)
- await crud_question.insert_many(db, cut_data)
- # 材料切割
- stuffs = filter(lambda x: x["imgtype"] == 1, added_questions)
- # 按qno和sqno聚合points
- cnt_dct = {}
- new_stuffs = []
- for que in stuffs:
- key = (que["qno"],que["sqno"])
- if cnt_dct.get(key):
- points = cnt_dct[key]["points"]
- points.extend(que["points"])
- que["points"] = points
- cnt_dct[key] = que
- else:
- cnt_dct[key] = que
- for k,v in cnt_dct.items():
- new_stuffs.append(v)
- for item in new_stuffs:
- db_question = await crud_question.find_one(db,
- filters={
- "pid": info.pid,
- "qno": item["qno"],
- "sqno": item["sqno"]
- })
- if db_question:
- update_info = {"stuff_points": item["points"]}
- await crud_question.update(db, db_question, update_info)
- # 更新切割状态
- filters = {"pid": info.pid}
- questions = await crud_question.fetch_all(db, filters=filters)
- if questions:
- await crud_paper.update(db, db_paper, {"cut": True})
- return len(cut_data)
- async def save_pieces(info: SaveQuePieceInfo,
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- info_dict = info.dict(exclude_none=True)
- if not info_dict:
- return {"errcode": 400, "mess": "请求参数为空!"}
- cut_data = await get_cut_imgs(db, current_user, info)
- return {"data": cut_data}
- async def save_docs(info: SaveQuePieceInfo,
- db: AsyncSession = Depends(get_async_db),
- current_user: Admin = Depends(get_current_user)):
- info_dict = info.dict(exclude_none=True)
- if not info_dict:
- return {"errcode": 400, "mess": "请求参数为空!"}
- new_ques = []
- pid = info_dict["pid"]
- pno = info_dict["pno"]
- ques = info_dict["ques"]
- for que in ques:
- qno = que["qno"]
- tx = que["tx"].replace("【题型】", "")
- tg = que["tg"].replace("【题干】", "")
- nd = re.findall("[\u4e00-\u9fa5]+", que["nd"])[-1]
- zsd = re.findall("[\u4e00-\u9fa5]+", que["zsd"])[-1]
- soup = BeautifulSoup(que["da"], "html.parser")
- da = soup.text.replace("【答案】", "")
- jx = que["jx"].replace("【解析】", "")
- db_question = await crud_question.find_one(db,
- filters={
- "pid": pid,
- "pno": pno,
- "qno": qno,
- "sqno": 1
- })
- if db_question:
- update_info = {
- "qtype": tx,
- "stem": tg,
- "level": nd,
- "answer": da,
- "lpoints": zsd,
- "analysis": jx
- }
- await crud_question.update(db, db_question, update_info)
- else:
- question_info = QuestionInfo(
- **{
- "pid": pid,
- "pno": pno,
- "qno": qno,
- "sqno": 0,
- "qtype": tx,
- "stem": tg,
- "level": nd,
- "lpoints": zsd,
- "answer": da,
- "analysis": jx,
- "creator_id": current_user.id,
- "creator_name": current_user.username,
- })
- new_ques.append(question_info)
- # 更新电子卷状态
- db_paper = await crud_paper.find_one(db, filters={"id": pid})
- await crud_paper.update(db, db_paper, {"attached": True})
- return {"data": None}
|