#!/usr/bin/env python # -*- coding: utf-8 -*- import re from bs4 import BeautifulSoup from fastapi import Depends, Query from sqlalchemy.ext.asyncio import AsyncSession from crud.paper import crud_question, crud_paper from models.paper import PaperQuestion from models.user import Admin from schemas.base import ReturnField from schemas.paper.questions import QuestionInfo, SaveQuePieceInfo from utils.depends import get_async_db, get_current_user from utils.imgtool import download_remote_img, crop_img,crop_img_local,crop_img_remote from utils.cv2img import CV2img from utils.ansDetect import rec_std_ans async def create_question(info: QuestionInfo, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): info = info.dict(exclude_none=True) if not info: return {"errcode": 400, "mess": "请求参数为空!"} db_exist = await crud_question.find_one(db, filters={ "pid": info["pid"], "pno": info["pno"], "qno": info["qno"], "sqno": info["sqno"] }) if db_exist: return {"errcode": 400, "mess": "试题重复!"} info["creator_id"] = current_user.id info["creator_name"] = current_user.username db_obj = await crud_question.insert_one(db, info) return {"data": db_obj} async def update_question(info: SaveQuePieceInfo, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "请求参数为空!"} pid = info_dict["id"] pno = info_dict["pno"] ques = info_dict["questions"] obj_questions = info_dict["obj_questions"] ques.extend(obj_questions) total_score = 0 for que in ques: qno = que["qno"] sqno = que["sqno"] qtype = que.get("qtype","") answer = que.get("answer",[]) answer = ",".join(answer) if answer else "" score = float(que.get("score",0)) total_score += score db_question = await crud_question.find_one(db, filters={ "pid": pid, "pno": pno, "qno": qno, "sqno": sqno }) if db_question: update_info = {"qtype": qtype, "answer": answer, "score": score} await crud_question.update(db, db_question, update_info) # 更新分数 db_paper = await crud_paper.find_one(db, filters={"id": pid}) await crud_paper.update(db, db_paper, {"score": total_score}) # 更新试卷试题数量 existed = await crud_question.count(db, filters={"pid": pid}) if existed: await crud_paper.update(db, db_paper, {"question_amount": existed}) return {"data": None} async def delete_question(qid: int, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): existed = await crud_question.count(db, filters={"id": qid}) if not existed: return {"errcode": 400, "mess": "试题不存在!"} await crud_question.delete(db, qid) return {"data": None} async def get_question(qid: int, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): db_exist = await crud_question.find_one(db, filters={"id": qid}) if not db_exist: return {"errcode": 400, "mess": "角色不存在!"} return {"data": db_exist} async def get_question_list(page: int = 1, size: int = 10, name: str = "", res: ReturnField = Query("", description="控制返回字段,字段逗号分隔"), db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): _q = [] if name: _q.append(PaperQuestion.name.like(f"{name}%")) offset = (page - 1) * size total, items = await crud_question.find_all(db, filters=_q, offset=offset, limit=size, return_fields=res) return {"total": total, "data": items} async def cut_imgs(std_img, points,std_points): """根据points完成图片切割 """ cut_urls = [] for idx, point in enumerate(points): x = point["x"] + std_points["x"] y = point["y"] + std_points["y"] w = point["w"] h = point["h"] point = (x, y, x + w, y + h) original_image = await download_remote_img(std_img) url = await crop_img(original_image, point, idx) cut_urls.append(url) return cut_urls async def get_cut_imgs(db, current_user, info: SaveQuePieceInfo): """ """ db_paper = await crud_paper.find_one(db, filters={"id": info.pid}) if not db_paper: return {"errcode": 404, "mess": "试卷不存在!"} if info.studentno_points: db_paper = await crud_paper.update(db, db_paper, {"studentno_points": info.studentno_points}) if info.ans_points: #这里获取标准点 std_points = db_paper.points[0][0] ans_url = await cut_imgs(db_paper.imgs[0], [info.ans_points],std_points) tmp_ans_img = await download_remote_img(ans_url[0]) std_ans_data = rec_std_ans(tmp_ans_img) db_paper = await crud_paper.update(db, db_paper, {"ans_points": info.ans_points,"ans_url":ans_url}) #写入questions for qno,std_points in std_ans_data.items(): std_que = await crud_question.find_one(db,filters={ "pid":info.pid, "pno":info.pno, "qno":qno, "sqno":1 }) if std_que: update_info = {"std_points":std_points} await crud_question.update(db,std_que,update_info) else: question_info = QuestionInfo( **{ "pid": info.pid, "pno": info.pno, "qno": qno, "sqno": 1, "page": 0, "points": [info.ans_points], "qtype":"单选题", "imgtype": 0, "creator_id": current_user.id, "creator_name": current_user.username, "editor_id": current_user.id, "editor_name": current_user.username, "usage": 0, "std_points": std_points }) db_question = await crud_question.insert_one(db,question_info) cut_data = [] added_questions = info.ques # 计算要删除的 old_questions = await crud_question.fetch_all(db, filters={"pid": info.pid,"usage__in":[0,1],"qtype__in":["填空题","解答题"]}) old_qids = [x.id for x in old_questions] now_questions = filter(lambda x: x["imgtype"] == 0 and x["id"], added_questions) now_qids = [x["id"] for x in now_questions] del_ids = list(set(old_qids) - set(now_qids)) del_filter = [PaperQuestion.id.in_(del_ids)] await crud_question.delete(db, where_clauses=del_filter) # 试题切割 questions = filter(lambda x: x["imgtype"] == 0 and not x["id"], added_questions) # 按qno和sqno聚合points cnt_dct = {} new_questions = [] for que in questions: key = (que["qno"],que["sqno"]) if cnt_dct.get(key): points = cnt_dct[key]["points"] points.extend(que["points"]) que["points"] = points cnt_dct[key] = que else: cnt_dct[key] = que for k,v in cnt_dct.items(): new_questions.append(v) for item in new_questions: if not ("-" in item["qno"]): question_info = QuestionInfo( **{ "pid": info.pid, "pno": info.pno, "qno": item["qno"], "sqno": item["sqno"], "page": item.get("page",0), "points": item["points"], "qtype": "解答题", "imgtype": item["imgtype"], "creator_id": current_user.id, "creator_name": current_user.username, "editor_id": current_user.id, "editor_name": current_user.username, "usage": 1 }) cut_data.append(question_info) else: question_info = QuestionInfo( **{ "pid": info.pid, "pno": info.pno, "qno": item["qno"], "sqno": item["sqno"], "page": item.get("page",0), "points": item["points"], "qtype": "解答题", "imgtype": item["imgtype"], "creator_id": current_user.id, "creator_name": current_user.username, "editor_id": current_user.id, "editor_name": current_user.username, "usage": 0 }) cut_data.append(question_info) for i in range(int(item["qno"].split("-")[0]),int(item["qno"].split("-")[1])+1): question_info = QuestionInfo( **{ "pid": info.pid, "pno": info.pno, "qno": i, "sqno": item["sqno"], "page": item.get("page",0), "points": item["points"], "qtype": "解答题", "imgtype": item["imgtype"], "creator_id": current_user.id, "creator_name": current_user.username, "editor_id": current_user.id, "editor_name": current_user.username, "usage": 2 }) cut_data.append(question_info) await crud_question.insert_many(db, cut_data) # 材料切割 stuffs = filter(lambda x: x["imgtype"] == 1, added_questions) # 按qno和sqno聚合points cnt_dct = {} new_stuffs = [] for que in stuffs: key = (que["qno"],que["sqno"]) if cnt_dct.get(key): points = cnt_dct[key]["points"] points.extend(que["points"]) que["points"] = points cnt_dct[key] = que else: cnt_dct[key] = que for k,v in cnt_dct.items(): new_stuffs.append(v) for item in new_stuffs: db_question = await crud_question.find_one(db, filters={ "pid": info.pid, "qno": item["qno"], "sqno": item["sqno"] }) if db_question: update_info = {"stuff_points": item["points"]} await crud_question.update(db, db_question, update_info) # 更新切割状态 filters = {"pid": info.pid} questions = await crud_question.fetch_all(db, filters=filters) if questions: await crud_paper.update(db, db_paper, {"cut": True}) return len(cut_data) async def save_pieces(info: SaveQuePieceInfo, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "请求参数为空!"} cut_data = await get_cut_imgs(db, current_user, info) return {"data": cut_data} async def save_docs(info: SaveQuePieceInfo, db: AsyncSession = Depends(get_async_db), current_user: Admin = Depends(get_current_user)): info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "请求参数为空!"} new_ques = [] pid = info_dict["pid"] pno = info_dict["pno"] ques = info_dict["ques"] for que in ques: qno = que["qno"] tx = que["tx"].replace("【题型】", "") tg = que["tg"].replace("【题干】", "") nd = re.findall("[\u4e00-\u9fa5]+", que["nd"])[-1] zsd = re.findall("[\u4e00-\u9fa5]+", que["zsd"])[-1] soup = BeautifulSoup(que["da"], "html.parser") da = soup.text.replace("【答案】", "") jx = que["jx"].replace("【解析】", "") db_question = await crud_question.find_one(db, filters={ "pid": pid, "pno": pno, "qno": qno, "sqno": 1 }) if db_question: update_info = { "qtype": tx, "stem": tg, "level": nd, "answer": da, "lpoints": zsd, "analysis": jx } await crud_question.update(db, db_question, update_info) else: question_info = QuestionInfo( **{ "pid": pid, "pno": pno, "qno": qno, "sqno": 0, "qtype": tx, "stem": tg, "level": nd, "lpoints": zsd, "answer": da, "analysis": jx, "creator_id": current_user.id, "creator_name": current_user.username, }) new_ques.append(question_info) # 更新电子卷状态 db_paper = await crud_paper.find_one(db, filters={"id": pid}) await crud_paper.update(db, db_paper, {"attached": True}) return {"data": None}