#!/usr/bin/env python # -*- coding: utf-8 -*- from collections import OrderedDict import openpyxl from fastapi import Query, Depends, Path from sqlalchemy import select, func, case, between, and_, asc, desc from sqlalchemy.ext.asyncio import AsyncSession from starlette.background import BackgroundTasks from starlette.responses import FileResponse from app.api.endpoints.review._utils import complete_answer_statistic from bgtask.tasks import (bgtask_update_student_marktask, bgtask_update_class_error_statistic, update_student_error_statistic) from common.const import OBJECTIVE_QUESTION_TYPES from core.config import settings from crud.marktask import crud_task, crud_student_answer, crud_student_task from crud.paper import crud_question from models.marktask import StudentAnswer, StudentMarkTask, MarkTask from models.paper import PaperQuestion from models.user import Teacher from schemas.app.task import UpdateMarkTaskQuestion from schemas.base import OrderByField from utils.depends import get_async_db, get_current_user # 阅卷任务列表 async def get_mark_tasks(page: int = 1, size: int = 10, cid: int = Query(0, description="班级ID"), mtype: str = Query(..., description="阅卷任务类型,work/exam"), ctgid: int = Query(0, description="分类ID"), year: int = Query(0, description="年份"), status: int = Query(None, description="状态"), name: str = Query(None, description="任务名称"), order: OrderByField = Query( "-created_at", description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): current_teacher_classes = [int(x) for x in current_user.class_id.split(',')] _q = [MarkTask.mtype == mtype, MarkTask.subject == current_user.subject] if ctgid: _q.append(MarkTask.category_id == ctgid) if year: _q.append(MarkTask.year == year) # 班级条件,只查询教师所属班级 if cid: _q.append(MarkTask.class_id == cid) else: _q.append(MarkTask.class_id.in_(current_teacher_classes)) if name: _q.append(MarkTask.name.like(f"%{name}%")) if status: _q.append(MarkTask.status == status) offset = (page - 1) * size # 排序字段 order_fields = [] if order: for x in order.split(","): field = x.strip() if field: if field.startswith("-"): order_fields.append(desc(getattr(MarkTask, field[1:]))) else: order_fields.append(asc(getattr(MarkTask, field))) total, marks = await crud_task.find_all(db, filters=_q, limit=size, offset=offset, order_by=order_fields) for item in marks: item.pass_rate = f"{int(item.pass_rate * 100)}%" return {"data": marks, "total": total} # 阅卷任务详情 async def get_mark_task(tid: int = Path(..., description="批阅任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): # 判断task是否存在 db_obj = await crud_task.find_one(db, filters={"id": tid}) if not db_obj: return {"errcode": 400, "mess": "阅卷任务不存在!"} db_obj.pass_rate = f"{int(db_obj.pass_rate * 100)}%" # 按成绩分段统计 score_gaps = ["0-50", "50-60", "60-70", "70-80", "80-90", "90-100"] if db_obj.status != 0: stmt = select( func.sum(case(whens=(between(StudentMarkTask.score, 0, 49), 1), else_=0)).label("0-50"), func.sum(case(whens=(between(StudentMarkTask.score, 50, 59), 1), else_=0)).label("50-60"), func.sum(case(whens=(between(StudentMarkTask.score, 60, 69), 1), else_=0)).label("60-70"), func.sum(case(whens=(between(StudentMarkTask.score, 70, 79), 1), else_=0)).label("70-80"), func.sum(case(whens=(between(StudentMarkTask.score, 80, 89), 1), else_=0)).label("80-90"), func.sum(case(whens=(between(StudentMarkTask.score, 90, 100), 1), else_=0)).label("90-100"), ).select_from(StudentMarkTask).where(StudentMarkTask.task_id == tid, StudentMarkTask.is_completed == True) scores = [x if x else 0 for x in (await crud_student_task.execute_v2(db, stmt))[0]] score_table = [{ "key": "0-50", "val": scores[0] }, { "key": "50-70", "val": scores[1] + scores[2] }, { "key": "70-100", "val": sum(scores[3:]) }] score_chart = [{"key": x[0], "val": x[1]} for x in zip(score_gaps, scores)] else: score_table = [{ "key": "0-50", "val": 0 }, { "key": "50-70", "val": 0 }, { "key": "70-100", "val": 0 }] score_chart = [{"key": x, "val": 0} for x in score_gaps] return {"data": db_obj, "score_table": score_table, "score_chart": score_chart} # 阅卷任务试题列表 async def get_task_questions(page: int = 1, size: int = 10, tid: int = Path(..., description="阅卷任务ID"), stid: int = Query(None, description="学生ID"), qno: int = Query(None, description="题号"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): _q = [StudentAnswer.qtype.notin_(OBJECTIVE_QUESTION_TYPES)] # 按学生批阅,数据流:查询StudentMarkTask获取ID,使用ID去查询改学生的所有试题 if stid: student_task = await crud_student_task.find_one( db, filters=[StudentMarkTask.task_id == tid, StudentMarkTask.student_id == stid], return_fields=["id"]) _q.append(StudentAnswer.student_task_id == student_task.id) elif qno: # 按试题批阅,数据流:通过 task_id 和 qno 查询试题 _q.extend([StudentAnswer.task_id == tid, StudentAnswer.qno == qno]) else: _q.append(StudentAnswer.task_id == tid) # 查询试题列表 offset = (page - 1) * size total, items = await crud_student_answer.find_all(db, filters=_q, offset=offset, limit=size, order_by=[asc("qid")]) for item in items: if item.marked_img: item.qimg = item.marked_img return {"data": items, "total": total} # 批阅 async def mark_question(info: UpdateMarkTaskQuestion, bgtask: BackgroundTasks, tid: int = Path(..., description="阅卷任务ID"), qid: int = Path(..., description="试题ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): # 判断试题是否存在 _q = {"id": qid, "task_id": tid} db_obj = await crud_student_answer.find_one(db, filters=_q) if not db_obj: return {"errcode": 400, "mess": "试题不存在!"} if info.marked_score > db_obj.score: return {"errcode": 400, "mess": "批阅得分大于试题满分,请修改!"} old_score = db_obj.marked_score has_marked = db_obj.status # 是否已被批阅过 # 错题统计业务逻辑: # 1、必须是客观题 # 2、如果未批阅,当提交得分不等于试题满分时,错题数量加1; # 3、如果已批阅,当提交得分等于试题满分时,错题数量减1; error_count = 0 # 是否错误 info.incorrect = False if db_obj.score == info.marked_score else True if db_obj.status: # 已批阅 if info.incorrect != db_obj.incorrect: if db_obj.incorrect: error_count += -1 else: error_count += 1 else: # 未批阅 if info.incorrect: error_count += 1 # 更新班级错题统计数据 bgtask.add_task(bgtask_update_class_error_statistic, db_obj.task_id, db_obj.class_id, db_obj.qid, error_count, old_score, info.marked_score, has_marked) # 更新学生错题统计数据 if error_count: bgtask.add_task(update_student_error_statistic, db_obj.student_id, db_obj.mtype.value, error_count) else: info.incorrect = None info.status = True db_obj = await crud_student_answer.update(db, db_obj, info) # 更新流程:更新StudentAnswer -> 根据StudentAnswer.task_id更新StudentMarkTask -> 根据StudentMarkTask.task_id更新MarkTask bgtask.add_task( bgtask_update_student_marktask, task_id=db_obj.student_task_id, qtype=db_obj.qtype, score=db_obj.marked_score - old_score, # 多次批阅时的分数差距 has_marked=has_marked, editor_id=current_user.id, editor_name=current_user.username) return {"data": None} # 作业中心 - 学生答题统计 async def get_student_tasks(page: int = 1, size: int = 10, tid: int = Query(..., description="阅卷任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): # 查询task列表 offset = (page - 1) * size total, db_tasks = await crud_student_task.find_all(db, filters={"task_id": tid}, offset=offset, limit=size) # 查询每个学生的错题 for item in db_tasks: _, db_questions = await crud_student_answer.find_all(db, filters={ "student_task_id": item.id, "incorrect": True }, return_fields=["id", "qno", "sqno"]) questions = [f"{x.id},{x.qno},{x.sqno}" for x in db_questions] item.wrong_questions = questions return {"data": db_tasks, "total": total} # 作业中心 - 学生阅卷任务详情 async def get_student_task(tid: int = Path(..., description="学生阅卷任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): return_fields = [ "id", "student_id", "student_name", "student_sno", "score", "objective_score", "subjective_score", "question_amount" ] db_task = await crud_student_task.find_one(db, filters={"id": tid}, return_fields=return_fields) return {"data": db_task} # 作业中心 - 学生答题列表 async def get_student_answers(page: int = 1, size: int = 10, tid: int = Path(..., description="学生阅卷任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): # 查询学生阅卷任务 student_task = await crud_student_task.find_one(db, filters={"id": tid}, return_fields=["task_id", "question_amount"]) offset = (page - 1) * size # 查询学生答题列表 stmt = select(StudentAnswer.qno, StudentAnswer.sqno, StudentAnswer.qimg, StudentAnswer.marked_img, PaperQuestion.answer, PaperQuestion.analysis)\ .join(PaperQuestion, and_(StudentAnswer.pid == PaperQuestion.pid, StudentAnswer.qno == PaperQuestion.qno))\ .where(StudentAnswer.student_task_id == tid).order_by(StudentAnswer.qid)\ .limit(size).offset(offset) answers = [{ "qno": x[0], "sqno": x[1], "answer": x[4], "analysis": x[5], "marked_img": x[2] if x[2] else x[3] } for x in await crud_student_answer.execute_v2(db, stmt)] # 统计每道题的答案分布 for item in answers: stmt = select(StudentAnswer.answer, func.count()).select_from(StudentAnswer).where( StudentAnswer.task_id == student_task.task_id, StudentAnswer.qno == item["qno"], StudentAnswer.sqno == item["sqno"]).group_by(StudentAnswer.answer) item["dist"] = [{ "key": x[0], "val": int(x[1]) } for x in await crud_student_answer.execute_v2(db, stmt)] item["dist"] = complete_answer_statistic(item["dist"]) return {"data": answers, "total": student_task.question_amount} # 作业中心 - 学生阅卷任务列表下载 async def download_student_task(tid: int = Query(..., description="阅卷任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): # 查询task列表 _, db_tasks = await crud_student_task.find_all(db, filters={"task_id": tid}) # 查询每个学生的错题 for item in db_tasks: _, db_questions = await crud_student_answer.find_all( db, filters=[StudentAnswer.student_task_id == item.id, StudentAnswer.incorrect == True], return_fields=["qno", "sqno"]) item.questions = ",".join([f"第{x.qno}-{x.sqno}题" for x in db_questions]) # 写入excel表 wb = openpyxl.Workbook() sh = wb.active title = ["姓名", "学号", "得分", "客观题", "主观题", "名次", "失分题"] for ridx, ritem in enumerate([title, db_tasks]): for cidx, citem in enumerate(ritem): if ridx == 0: sh.cell(row=ridx + 1, column=cidx + 1, value=citem) else: values = [ citem.student_name, citem.student_sno, citem.score, citem.objective_score, citem.subjective_score, citem.rank, citem.questions ] sh.append(values) outfile = f"{settings.UPLOADER_PATH}/{tid}.xlsx" wb.save(outfile) return FileResponse(outfile, filename=f"{tid}.xlsx") async def task_mark_process(tid: int = Path(..., description="任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): # 查询MarkTask db_task = await crud_task.find_one(db, filters={"id": tid}, return_fields=["uploaded_amount", "pid"]) if not db_task: return {"errcode": 400, "mess": "阅卷任务不存在!"} # 根据试卷ID查询试卷题目列表 _, db_question = await crud_question.find_all( db, filters=[PaperQuestion.qtype.notin_(OBJECTIVE_QUESTION_TYPES),PaperQuestion.pid==db_task.pid,PaperQuestion.usage.in_([1,2])], return_fields=["qno"], order_by=[asc("id")]) questions = OrderedDict() for q in db_question: questions[q[0]] = { "qno": q[0], "question_amount": db_task.uploaded_amount, "marked_amount": 0 } # 按题统计已批阅试题数量 stmt = select(StudentAnswer.qno, func.count()).select_from(StudentAnswer)\ .where(StudentAnswer.task_id == tid, StudentAnswer.status == True, StudentAnswer.qtype.notin_(OBJECTIVE_QUESTION_TYPES)).group_by(StudentAnswer.qno) for x in await crud_student_answer.execute_v2(db, stmt): questions[x[0]]["marked_amount"] += x[1] return {"data": list(questions.values())} async def mark_process_by_student(tid: int = Path(..., description="任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): process = OrderedDict() # 从试题列表获取总的试题数量 stmt = select(StudentAnswer.student_id, StudentAnswer.status, func.count())\ .select_from(StudentAnswer)\ .where(StudentAnswer.task_id == tid, StudentAnswer.qtype.notin_(OBJECTIVE_QUESTION_TYPES))\ .group_by(StudentAnswer.student_id, StudentAnswer.status) db_students = await crud_student_answer.execute_v2(db, stmt) for item in db_students: # 主观题总数 if item[0] not in process: process[item[0]] = {"question_amount": item[2]} else: process[item[0]]["question_amount"] += item[2] # 已批阅数量 if item[1]: process[item[0]]["marked_amount"] += item[2] else: process[item[0]]["marked_amount"] = 0 # 按题统计已批阅试题数量 return {"data": [{"student_id": k, **process[k]} for k in process]} # 作业中心 - 阅卷任务详情 - 答题分析 async def get_task_answers(page: int = 0, size: int = 10, tid: int = Path(..., description="阅卷任务ID"), db: AsyncSession = Depends(get_async_db), current_user: Teacher = Depends(get_current_user)): # 查询阅卷任务 task = await crud_task.find_one(db, filters={"id": tid}) if not task: return {"errcode": 404, "mess": "阅卷任务不存在!"} # 查询试题列表 total, questions = await crud_question.find_all(db, filters={"pid": task.pid}, order_by=[asc("id")], offset=(page - 1) * size, limit=size) # 统计每道题的答案分布 data = [] for item in questions: temp = { "qno": item.qno, "sqno": item.sqno, "stem": item.stem, "img": item.full_img, "answer": item.answer, "analysis": item.analysis } stmt = select(StudentAnswer.answer, func.count()).select_from(StudentAnswer)\ .where(StudentAnswer.task_id == tid, StudentAnswer.qno == item.qno, StudentAnswer.sqno == item.sqno)\ .group_by(StudentAnswer.answer) temp["dist"] = [{ "key": x[0], "val": int(x[1]) } for x in await crud_student_answer.execute_v2(db, stmt)] temp["dist"] = complete_answer_statistic(temp["dist"]) data.append(temp) return {"data": data, "total": total}