#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Error-question centre endpoints (teacher and student side).

Exposes class-level and student-level wrong-answer statistics, the student
"error book", and CRUD operations on class error push records.
"""
import json
from typing import Union

from fastapi import APIRouter, Depends, Query, Path
from sqlalchemy import select, func, case, or_, and_, distinct, asc
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.background import BackgroundTasks

from bgtask.tasks import (bgtask_create_student_push_record, bgtask_clone_student_push_record,
                          bgtask_delete_student_push_record)
from crud.marktask import crud_student_answer, crud_task
from crud.paper import crud_question
from crud.problem import (crud_class_push_record, crud_class_error_statistic,
                          crud_student_push_record, crud_student_error_statistic)
from crud.school import crud_class
from crud.user import crud_student
from models.marktask import MarkTask, StudentAnswer
from models.paper import PaperQuestion
from models.problem import ClassErrorQuestion, StudentErrorQuestion
from models.user import Student, Teacher
from schemas.base import DetailMixin
from schemas.paper import UpdateMarkTaskInfo
from schemas.problem import (ClassErrorQuestionList, StudentErrorQuestionList,
                             StudentErrorQuestionDetailList, ClassErrorPushStudentList,
                             StudentTaskErrorQuestionList, StudentTaskErrorDetail,
                             ClassErrorRatioInfo)
from schemas.problem import (CreateClassErrorPushRecordInfo, ClassErrorPushRecordInDB,
                             StudentErrorBookQuestionList, UpdateClassErrorPushRecordInfo,
                             ClassErrorPushRecordList, ClassErrorPushRecordDetail)
from utils.depends import get_current_user, get_async_db

router = APIRouter(tags=["错题中心-教师学生端"])


def _page_offset(page: int, size: int) -> int:
    """Return the row offset for a 1-based ``page``; never negative."""
    return max((page - 1) * size, 0)


def _parse_class_ids(raw) -> list:
    """Split a comma separated id string into ints, skipping empty segments."""
    return [int(part) for part in str(raw or "").split(",") if part.strip()]


def _parse_answer_dist(raw) -> list:
    """Turn a stored answer distribution into a key-sorted list of key/val dicts.

    Returns an empty list when ``raw`` is missing, is not valid JSON, or does
    not decode to a mapping.
    """
    try:
        parsed = raw if isinstance(raw, dict) else json.loads(raw)
        dist = [{"key": k, "val": v} for k, v in parsed.items()]
        dist.sort(key=lambda entry: entry["key"])
    except (TypeError, ValueError, AttributeError):
        # TypeError: raw is None / not a string; ValueError: malformed JSON
        # (JSONDecodeError is a ValueError); AttributeError: JSON is not an object.
        return []
    return dist


def _id_filter(column, ids):
    """Build ``column == id`` for a single id, ``column IN (...)`` otherwise."""
    if len(ids) == 1:
        return column == ids[0]
    return column.in_(ids)


@router.get("/cls-errs",
            response_model=ClassErrorQuestionList,
            response_model_exclude_none=True,
            summary="班级错题列表")
async def get_class_errors(page: int = 1,
                           size: int = 10,
                           school_id: int = Query(0, alias="sid", description="学校ID"),
                           grade_id: int = Query(0, alias="gid", description="年级ID"),
                           class_id: str = Query("", alias="cid", description="班级ID"),
                           task_id: int = Query(0, alias="tid", description="阅卷任务ID"),
                           kw: str = Query("", description="题目关键字"),
                           ratio: int = Query(0, description="错题率"),
                           db: AsyncSession = Depends(get_async_db),
                           current_user: Teacher = Depends(get_current_user)):
    """List class-level error questions joined with their question content.

    When no class id is given (or it is "0"), the query is restricted to the
    classes listed in ``current_user.class_id``. Only rows with an error ratio
    above zero are returned; a positive ``ratio`` raises that threshold
    (capped at 100).
    """
    # Filter conditions
    _q = []
    if school_id:
        _q.append(ClassErrorQuestion.school_id == school_id)
    if grade_id:
        _q.append(ClassErrorQuestion.grade_id == grade_id)
    if class_id and (class_id != "0"):
        _q.append(ClassErrorQuestion.class_id == int(class_id))
    else:
        # Fall back to the classes attached to the current user.
        _q.append(ClassErrorQuestion.class_id.in_(_parse_class_ids(current_user.class_id)))
    if task_id:
        _q.append(ClassErrorQuestion.task_id == task_id)
    if ratio > 0:
        _q.append(ClassErrorQuestion.error_ratio >= min(ratio, 100))
    else:
        _q.append(ClassErrorQuestion.error_ratio > 0)
    if kw:
        _q.append(ClassErrorQuestion.task_name.like(f"%{kw}%"))

    # Column order here fixes the tuple indexes used when building each row below.
    error_fields = [
        "student_count", "error_count", "error_ratio", "task_type", "task_name", "qid",
        "answer_dist"
    ]
    question_fields = ["stem", "answer", "analysis"]

    count_stmt = (select(func.count())
                  .select_from(ClassErrorQuestion)
                  .join(PaperQuestion, ClassErrorQuestion.qid == PaperQuestion.id)
                  .where(*_q))
    data_stmt = (select(*[getattr(ClassErrorQuestion, x) for x in error_fields],
                        *[getattr(PaperQuestion, x) for x in question_fields])
                 .select_from(ClassErrorQuestion)
                 .join(PaperQuestion, ClassErrorQuestion.qid == PaperQuestion.id)
                 .where(*_q)
                 .limit(size)
                 .offset(_page_offset(page, size))
                 .order_by(asc(ClassErrorQuestion.qid)))

    # Total
    count = await crud_student_answer.execute_v2(db, count_stmt)
    total = count[0][0]

    # Error question page
    data = []
    db_questions = await crud_student_answer.execute_v2(db, data_stmt)
    for q in db_questions:
        data.append({
            "student_count": q[0],
            "error_count": q[1],
            "right_count": q[0] - q[1],
            "error_ratio": q[2],
            "task_type": q[3],
            "task_name": q[4],
            "qid": q[5],
            "stem": q[7],
            "answer": q[8],
            "analysis": q[9],
            "dist": _parse_answer_dist(q[6]),
        })
    return {"data": data, "total": total}


@router.get("/stu-errs",
            response_model=StudentErrorQuestionList,
            response_model_exclude_none=True,
            summary="学生错题列表-综合")
async def get_student_errors(page: int = 1,
                             size: int = 10,
                             school_id: int = Query(0, alias="sid", description="学校ID"),
                             grade_id: int = Query(0, alias="gid", description="年级ID"),
                             class_id: int = Query(0, alias="cid", description="班级ID"),
                             kw: str = Query("", description="关键词"),
                             db: AsyncSession = Depends(get_async_db),
                             current_user: Teacher = Depends(get_current_user)):
    """List per-student error statistics aggregated over all of their tasks.

    ``kw`` is matched exactly against the student number or the student name.
    ``total`` is the number of distinct students matching the filters.
    """
    _q = [StudentErrorQuestion.error_ratio > 0]
    if class_id:
        _q.append(StudentErrorQuestion.class_id == class_id)
        if school_id:
            _q.append(StudentErrorQuestion.school_id == school_id)
        if grade_id:
            _q.append(StudentErrorQuestion.grade_id == grade_id)
    else:
        # No class given: resolve the class ids from school_id / grade_id.
        _sq = {}
        if school_id:
            _sq["school_id"] = school_id
        if grade_id:
            _sq["grade_id"] = grade_id
        if _sq:
            total, db_classes = await crud_class.find_all(db, filters=_sq, return_fields=["id"])
            if not total:
                return {"errcode": 400, "mess": "班级不存在!"}
            _q.append(StudentErrorQuestion.class_id.in_([x.id for x in db_classes]))
    if kw:
        _q.append(
            or_(StudentErrorQuestion.student_sno == kw, StudentErrorQuestion.student_name == kw))

    count_stmt = (select(func.count(distinct(StudentErrorQuestion.student_id)))
                  .select_from(StudentErrorQuestion)
                  .where(*_q))
    total = (await crud_student_error_statistic.execute_v2(db, count_stmt))[0][0]

    data_stmt = (
        select(StudentErrorQuestion.student_id,
               StudentErrorQuestion.student_sno,
               StudentErrorQuestion.student_name,
               func.sum(StudentErrorQuestion.total_questions).label("total_questions"),
               func.sum(StudentErrorQuestion.total_errors).label("total_errors"),
               func.sum(StudentErrorQuestion.work_error_count).label("work_error_count"),
               func.sum(StudentErrorQuestion.exam_error_count).label("exam_error_count"),
               func.avg(StudentErrorQuestion.error_ratio).label("error_ratio"))
        .select_from(StudentErrorQuestion)
        .where(*_q)
        .group_by(StudentErrorQuestion.student_id,
                  StudentErrorQuestion.student_sno,
                  StudentErrorQuestion.student_name)
        # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so pages
        # could repeat or skip students; order by the grouping key.
        .order_by(asc(StudentErrorQuestion.student_id))
        .offset(_page_offset(page, size))
        .limit(size))
    db_errors = await crud_student_error_statistic.execute_v2(db, data_stmt)
    return {"data": db_errors, "total": total}


@router.get("/stu-errs/{sid}/tasks",
            response_model=StudentTaskErrorQuestionList,
            response_model_exclude_none=True,
            summary="学生错题-任务错题统计")
async def get_student_task_errors(page: int = 1,
                                  size: int = 10,
                                  student_id: int = Path(..., alias="sid", description="学生ID"),
                                  task_id: int = Query(0, alias="tid", description="任务ID"),
                                  db: AsyncSession = Depends(get_async_db),
                                  current_user: Teacher = Depends(get_current_user)):
    """List one student's per-task error statistics, joined with the mark task."""
    # Filter conditions
    _q = [StudentErrorQuestion.student_id == student_id, StudentErrorQuestion.error_ratio > 0]
    if task_id:
        _q.append(StudentErrorQuestion.task_id == task_id)

    count_stmt = (select(func.count(distinct(StudentErrorQuestion.task_id)))
                  .select_from(StudentErrorQuestion)
                  .where(*_q))
    total = (await crud_student_error_statistic.execute_v2(db, count_stmt))[0][0]

    data_stmt = (
        select(StudentErrorQuestion.student_task_id,
               MarkTask.name,
               MarkTask.mtype,
               StudentErrorQuestion.total_questions,
               StudentErrorQuestion.work_error_count,
               StudentErrorQuestion.exam_error_count,
               StudentErrorQuestion.error_ratio,
               MarkTask.id)
        .select_from(StudentErrorQuestion)
        .join(MarkTask, StudentErrorQuestion.task_id == MarkTask.id)
        .where(*_q)
        # Deterministic ordering so that pagination is stable.
        .order_by(asc(StudentErrorQuestion.task_id))
        .offset(_page_offset(page, size))
        .limit(size))

    data = []
    db_objs = await crud_student_error_statistic.execute_v2(db, data_stmt)
    for item in db_objs:
        data.append({
            "student_task_id": item[0],
            "task_name": item[1],
            "task_type": item[2],
            "total_count": item[3],
            # Homework ("work") tasks report the work counter; any other type the exam one.
            # NOTE(review): assumes MarkTask.mtype compares equal to the plain
            # string "work" - confirm against the model definition.
            "error_count": item[4] if item[2] == "work" else item[5],
            "error_ratio": item[6],
            "task_id": item[7],
        })
    return {"data": data, "total": total}


@router.get("/stu-errs/{sid}/tasks/{tid}/error-info",
            response_model=StudentTaskErrorDetail,
            response_model_exclude_none=True,
            summary="学生阅卷任务错题信息(学生错题顶部使用)")
async def get_task_error_info(student_id: int = Path(..., alias="sid", description="学生ID"),
                              student_task_id: int = Path(...,
                                                          alias="tid",
                                                          description="学生阅卷任务ID"),
                              db: AsyncSession = Depends(get_async_db),
                              current_user: Union[Teacher,
                                                  Student] = Depends(get_current_user)):
    """Return the error summary of one student task.

    Responds with errcode 404 when either the statistic row or the mark task
    it references cannot be found.
    """
    db_obj = await crud_student_error_statistic.find_one(db,
                                                         filters={
                                                             "student_id": student_id,
                                                             "student_task_id": student_task_id
                                                         })
    # Must be checked before any attribute of db_obj is read.
    if not db_obj:
        return {"errcode": 404, "mess": "阅卷任务不存在!"}

    db_task = await crud_task.find_one(db, filters={"id": db_obj.task_id})
    if not db_task:
        return {"errcode": 404, "mess": "阅卷任务不存在!"}

    data = {
        "student_sno": db_obj.student_sno,
        "student_name": db_obj.student_name,
        "total_questions": db_obj.total_questions,
        "total_errors": db_obj.total_errors,
        "error_ratio": db_obj.error_ratio,
        "task_id": db_obj.task_id,
        "task_name": db_task.name
    }
    return {"data": data}


@router.get("/stu-errs/{sid}/tasks/{tid}/questions",
            response_model=StudentErrorQuestionDetailList,
            response_model_exclude_none=True,
            summary="学生错题-错误试题列表")
async def get_personal_errors(page: int = 1,
                              size: int = 10,
                              student_id: int = Path(..., alias="sid", description="学生ID"),
                              student_task_id: int = Path(...,
                                                          alias="tid",
                                                          description="学生阅卷任务ID"),
                              db: AsyncSession = Depends(get_async_db),
                              current_user: Union[Teacher,
                                                  Student] = Depends(get_current_user)):
    """List the incorrectly answered questions of one student task.

    Each answer row is enriched with the answer, analysis, level and
    knowledge points of the matching paper question.
    """
    answer_filters = {
        "incorrect": 1,
        "student_id": student_id,
        "student_task_id": student_task_id
    }
    total, db_errors = await crud_student_answer.find_all(
        db,
        filters=answer_filters,
        return_fields=["pid", "qid", "task_id", "task_name", "mtype", "qimg", "marked_img"],
        limit=size,
        offset=_page_offset(page, size),
        order_by=[asc(StudentAnswer.qid)])

    data = []
    question_ids = []
    question_dict = {}  # question id -> index of its entry in ``data``
    for index, answer in enumerate(db_errors):
        question_dict[answer.qid] = index
        question_ids.append(answer.qid)
        data.append({
            "pid": answer.pid,
            "qid": answer.qid,
            # Prefer the marked image; fall back to the raw question image.
            "marked_img": answer.marked_img or answer.qimg,
            "task_id": answer.task_id,
            "task_name": answer.task_name,
            "task_type": answer.mtype.value
        })

    # Fetch the question details for the current page.
    if data:
        question_filters = [
            PaperQuestion.pid == data[0]['pid'],
            _id_filter(PaperQuestion.id, question_ids),
        ]
        _, db_questions = await crud_question.find_all(
            db,
            filters=question_filters,
            return_fields=["id", "answer", "analysis", "level", "lpoints"])
        for question in db_questions:
            data[question_dict[question.id]].update({
                "answer": question.answer,
                "analysis": question.analysis,
                "level": question.level,
                "lpoints": question.lpoints
            })
    return {"data": data, "total": total}


@router.get("/stu-errs/{sprid}/errbook",
            response_model=StudentErrorBookQuestionList,
            response_model_exclude_none=True,
            summary="学生错题本")
async def get_student_error_book(page: int = 1,
                                 size: int = 10,
                                 sprid: int = Path(..., description="学生错题推送记录ID"),
                                 db: AsyncSession = Depends(get_async_db),
                                 current_user: Union[Teacher,
                                                     Student] = Depends(get_current_user)):
    """Return the questions referenced by a student error push record.

    Responds with errcode 400 when the push record does not exist and with an
    empty page when the record lists no question ids.
    """
    # Student error push record
    db_error = await crud_student_push_record.find_one(db, filters={"id": sprid})
    if not db_error:
        return {"errcode": 400, "mess": "推送记录不存在!"}

    pushed_ids = db_error.push_error_ids
    if not pushed_ids:
        # Nothing was pushed; skip the empty-IN query.
        return {"data": [], "total": 0}

    # Load the questions whose ids are stored on the push record.
    total, db_questions = await crud_question.find_all(
        db,
        filters=[_id_filter(PaperQuestion.id, pushed_ids)],
        limit=size,
        offset=_page_offset(page, size))
    return {"data": db_questions, "total": total}


@router.get("/stu-errs/{qid}/rel-questions",
            response_model=StudentErrorBookQuestionList,
            response_model_exclude_none=True,
            summary="举一反三试题列表")
async def get_related_questions(qid: int = Path(..., description="试题ID"),
                                db: AsyncSession = Depends(get_async_db),
                                current_user: Union[Teacher,
                                                    Student] = Depends(get_current_user)):
    """Return up to three other questions with the same level and knowledge points.

    Responds with errcode 404 when the reference question does not exist.
    """
    db_question = await crud_question.find_one(db, filters={"id": qid})
    if not db_question:
        return {"errcode": 404, "mess": "试题不存在!"}

    # "Similar" means: same difficulty level and same knowledge points.
    _q = [
        PaperQuestion.id != qid,
        PaperQuestion.level == db_question.level,
        PaperQuestion.lpoints == db_question.lpoints
    ]
    total, db_questions = await crud_question.find_all(db, filters=_q, limit=3)
    return {"data": db_questions, "total": total}


@router.get("/cls-push-errs",
            response_model=ClassErrorPushRecordList,
            response_model_exclude_none=True,
            summary="错题推送记录列表")
async def get_cls_push_records(page: int = 1,
                               size: int = 10,
                               school_id: int = Query(0, alias="sid", description="学校ID"),
                               grade_id: int = Query(0, alias="gid", description="年级ID"),
                               class_id: int = Query(0, alias="cid", description="班级ID"),
                               task_id: int = Query(0, alias="tid", description="阅卷任务ID"),
                               db: AsyncSession = Depends(get_async_db),
                               current_user: Teacher = Depends(get_current_user)):
    """List class error push records, filtered by whichever ids are non-zero."""
    candidates = {
        "school_id": school_id,
        "grade_id": grade_id,
        "class_id": class_id,
        "task_id": task_id,
    }
    # A value of 0 means "no filter on this field".
    _q = {field: value for field, value in candidates.items() if value}

    total, db_records = await crud_class_push_record.find_all(db,
                                                              filters=_q,
                                                              limit=size,
                                                              offset=_page_offset(page, size))
    return {"data": db_records, "total": total}


@router.get("/cls-push-errs/{prid}",
            response_model=ClassErrorPushStudentList,
            response_model_exclude_none=True,
            summary="班级错题推送详情")
async def get_cls_error_push_record(page: int = 1,
                                    size: int = 10,
                                    prid: int = Path(..., description="班级错题推送记录ID"),
                                    db: AsyncSession = Depends(get_async_db),
                                    current_user: Teacher = Depends(get_current_user)):
    """List the student push records that belong to one class push record."""
    # Make sure the class push record exists.
    db_prd = await crud_class_push_record.find_one(db, filters={"id": prid})
    if not db_prd:
        return {"errcode": 400, "mess": "推送记录不存在!"}

    return_fields = [
        "id", "student_id", "student_sno", "student_name", "push_error_count", "printed",
        "created_at"
    ]
    # Filter on the scalar id (the previous code passed a one-element set literal).
    total, db_student_push_records = await crud_student_push_record.find_all(
        db,
        filters={"push_record_id": db_prd.id},
        return_fields=return_fields,
        limit=size,
        offset=_page_offset(page, size))
    return {"data": db_student_push_records, "total": total}


@router.post("/cls-push-errs",
             response_model=ClassErrorPushRecordDetail,
             response_model_exclude_none=True,
             summary="创建错题推送")
async def create_error_push_record(info: CreateClassErrorPushRecordInfo,
                                   bgtask: BackgroundTasks,
                                   db: AsyncSession = Depends(get_async_db),
                                   current_user: Teacher = Depends(get_current_user)):
    """Create a class error push record and schedule the per-student records.

    Validates that the class exists, that the mark task exists and has
    status 2, that every requested student belongs to the class, and that the
    task has at least one error question for the class.
    """
    # Make sure the class exists.
    db_class = await crud_class.find_one(db, filters={"id": info.class_id})
    if not db_class:
        return {"errcode": 400, "mess": "班级不存在!"}
    info.school_id = db_class.school_id
    info.grade_id = db_class.grade_id

    # Make sure the mark task exists and may be pushed.
    db_task = await crud_task.find_one(db, filters={"id": info.task_id})
    if not db_task:
        return {"errcode": 400, "mess": "阅卷任务不存在!"}
    if db_task.status != 2:
        return {"errcode": 400, "mess": "任务批阅中,不能推送错题!"}

    # Make sure every requested student exists in the class. Repeated ids are
    # collapsed (order kept) so a duplicate does not look like a missing student.
    student_ids = list(dict.fromkeys(info.student_ids))
    _q = [and_(Student.class_id == info.class_id, Student.id.in_(student_ids))]
    student_count, db_students = await crud_student.find_all(db, filters=_q)
    if student_count != len(student_ids):
        return {"errcode": 400, "mess": "有学生不存在!"}

    # Count the error questions (ratio > 0) and all questions for this class/task.
    stmt = select(func.sum(case((ClassErrorQuestion.error_ratio > 0, 1), else_=0)),
                  func.count()).select_from(ClassErrorQuestion).where(
                      ClassErrorQuestion.class_id == db_class.id,
                      ClassErrorQuestion.task_id == db_task.id)
    result = list(await crud_class_error_statistic.execute_v2(db, stmt))
    error_count, total_count = result[0] if result else (0, 0)
    if not error_count:
        return {
            "errcode": 400,
            "mess": f"{'考试' if db_task.mtype == 'exam' else '作业'}【{db_task.name}】无错题可推送!"
        }

    # Create the class error push record.
    cls_error = ClassErrorPushRecordInDB(school_id=db_class.school_id,
                                         grade_id=db_class.grade_id,
                                         class_id=db_class.id,
                                         class_name=db_class.name,
                                         task_id=db_task.id,
                                         task_name=db_task.name,
                                         task_type=db_task.mtype,
                                         error_count=error_count,
                                         student_count=db_class.student_amount,
                                         push_student_count=student_count,
                                         push_student_ids=student_ids,
                                         creator_id=current_user.id,
                                         creator_name=current_user.name,
                                         editor_id=current_user.id,
                                         editor_name=current_user.name)
    db_cls_error = await crud_class_push_record.insert_one(db, cls_error)

    # Create the per-student push records in the background.
    bgtask.add_task(bgtask_create_student_push_record, db_task, db_class, total_count,
                    db_cls_error.id, db_students, current_user)
    return {"data": db_cls_error}


@router.put("/cls-push-errs/{prid}",
            response_model=ClassErrorPushRecordDetail,
            response_model_exclude_none=True,
            summary="更新班级错题推送记录")
async def update_error_push_record(info: UpdateClassErrorPushRecordInfo,
                                   bg_task: BackgroundTasks,
                                   prid: int = Path(..., description="班级错题推送记录ID"),
                                   db: AsyncSession = Depends(get_async_db),
                                   current_user: Teacher = Depends(get_current_user)):
    """Reconcile the pushed student list of a class push record.

    Students newly present in ``student_ids`` get a push record cloned for
    them and students no longer present have theirs deleted; both happen in
    background tasks. The record returned is the one loaded before those
    tasks run.
    """
    # Make sure the push record exists.
    db_prd = await crud_class_push_record.find_one(db, filters={"id": prid})
    if not db_prd:
        return {"errcode": 400, "mess": "推送记录不存在!"}

    info_dict = info.dict(exclude_none=True)
    # Only act when the pushed student list actually changed.
    if ("student_ids" in info_dict) and (info_dict["student_ids"] != db_prd.push_student_ids):
        post_student_set = set(info_dict["student_ids"])
        pushed_student_set = set(db_prd.push_student_ids)
        add_students = post_student_set - pushed_student_set  # students to add
        del_students = pushed_student_set - post_student_set  # students to remove

        # NOTE(review): the request-scoped session ``db`` is handed to the
        # background tasks below - confirm it is still usable when they run.
        if add_students:
            # Every student to add must exist in the record's class. in_() is
            # given a list rather than the set itself.
            student_count, db_students = await crud_student.find_all(
                db,
                filters=[
                    and_(Student.class_id == db_prd.class_id,
                         Student.id.in_(list(add_students)))
                ],
            )
            if student_count != len(add_students):
                return {"errcode": 400, "mess": "有学生不存在!"}
            bg_task.add_task(bgtask_clone_student_push_record, db, db_prd, db_students,
                             current_user)
        if del_students:
            bg_task.add_task(bgtask_delete_student_push_record, db, db_prd, del_students)
    return {"data": db_prd}


@router.delete("/cls-push-errs/{prid}",
               response_model=DetailMixin,
               response_model_exclude_none=True,
               summary="删除班级错题推送记录")
async def delete_cls_push_record(bg_task: BackgroundTasks,
                                 prid: int = Path(..., description="班级错题推送记录ID"),
                                 db: AsyncSession = Depends(get_async_db),
                                 current_user: Teacher = Depends(get_current_user)):
    """Delete a class push record and schedule removal of its student records.

    A failed delete is printed and otherwise ignored: the response is the
    same as on success and no background cleanup is scheduled.
    """
    try:
        await crud_class_push_record.delete(db, obj_id=prid)
    except Exception as ex:
        print(f"[ERROR] 删除班级推送列表失败! 错误信息: {str(ex)}")
    else:
        # Remove the related student push records in the background.
        # NOTE(review): update_error_push_record calls this task with
        # (db, record, student_ids); here it receives (db, prid) - confirm
        # the task supports both call shapes.
        bg_task.add_task(bgtask_delete_student_push_record, db, prid)
    return {"data": None}


@router.post("/cls-err/ratio",
             response_model=DetailMixin,
             response_model_exclude_none=True,
             summary="设置阅卷任务错题率")
async def set_class_error_ratio(info: ClassErrorRatioInfo,
                                db: AsyncSession = Depends(get_async_db),
                                current_user: Teacher = Depends(get_current_user)):
    """Store ``info.ratio`` as the error ratio of the given mark task."""
    # Make sure the task exists.
    db_task = await crud_task.find_one(db, filters={"id": info.task_id})
    if not db_task:
        return {"errcode": 404, "mess": "阅卷任务不存在!"}

    obj4upd = UpdateMarkTaskInfo(error_ratio=info.ratio)
    await crud_task.update(db, db_task, obj4upd)
    return {"data": info.ratio}