#!/usr/bin/env python # -*- coding: utf-8 -*- import json import os from typing import List, Dict, Any from pydantic import Field from sqlalchemy import update, text, func, select from sqlalchemy.ext.asyncio import AsyncSession from common.const import GRADE_ORDER, OBJECTIVE_QUESTION_TYPES from crud.base import ModelType from crud.marktask import crud_task, crud_student_task, crud_student_answer from crud.problem import crud_class_error_statistic, crud_student_push_record,\ crud_student_error_statistic,crud_class_push_record from crud.school import crud_grade, crud_system, crud_class from crud.user import crud_teacher, crud_student from db.asyncsession import LocalAsyncSession from models.marktask import StudentMarkTask, StudentAnswer from models.problem import ClassErrorQuestion, StudentErrorPushRecord, StudentErrorQuestion, ClassErrorPushRecord from models.school import SchoolGrade, SchoolClass from models.user import Teacher, Student from schemas.app.task import BgUpdateTaskReviewInfo, BgUpdateStudentTaskInfo from schemas.problem import StudentErrorPushRecordInDB, UpdateClassErrorQuestionItem, UpdateStudentErrorQuestionItem from schemas.school.school import NewGrade async def bgtask_delete_related_object(sid: int = 0, gid: int = 0, cid: int = 0): db = LocalAsyncSession() # 根据学校删除年级、班级、教师、学生等 if sid and (not gid) and (not cid): # 1、删除年级 await crud_grade.delete(db, where_clauses=[SchoolGrade.school_id == sid]) # 2、删除班级 await crud_class.delete(db, where_clauses=[SchoolClass.school_id == sid]) # TODO: 删除教师、学生、作业、考试、阅卷任务等 await crud_teacher.delete(db, where_clauses=[Teacher.school_id == sid]) await crud_student.delete(db, where_clauses=[Student.school_id == sid]) elif gid and (not cid): # 如果是年级被删除,则删除该年级下的所有班级 await crud_class.delete(db, where_clauses=[SchoolClass.grade_id == gid]) # TODO: 删除学生、作业、考试、阅卷任务等 await crud_student.delete(db, where_clauses=[Student.grade_id == gid]) elif cid: # 如果班级被删除了,则删除该班级下的所有学生 # TODO: 删除学生、作业、考试、阅卷任务等 await crud_student.delete(db, where_clauses=[Student.class_id == cid]) else: pass async def bgtask_delete_remote_file(filepath: str): urls = [x.strip() for x in filepath.split(";")] try: for url in urls: os.remove(url) except Exception as ex: print(f"[Error] Delete file: {str(ex)}") async def bgtask_rank_score(task_id: int = Field(..., description="阅卷任务ID")): """ 更新学生排名 """ db = LocalAsyncSession() # 根据 task_id 查询 StudentMarkTask stmt = f"""SELECT id, CASE WHEN @prevRank = score THEN @curRank WHEN @prevRank := score THEN @curRank := @curRank + 1 END AS rank FROM { StudentMarkTask.__tablename__ }, (SELECT @curRank :=0, @prevRank := NULL) rb WHERE task_id={ task_id } AND is_completed=true AND score>0 ORDER BY score desc; """ current_rank = 1 ranks = await db.execute(stmt) for item in ranks: current_rank += 1 stmt = (update(StudentMarkTask).where(StudentMarkTask.id == item[0]).values(rank=item[1])) await crud_student_task.execute_v2(db, stmt) # 为0分记录设置排名 stmt = (update(StudentMarkTask).where(StudentMarkTask.task_id == task_id, StudentMarkTask.score == 0).values(rank=current_rank)) await crud_student_task.execute_v2(db, stmt) print(f"Calculate Rank OK!!") async def bgtask_update_marktask(task_id: int, smt_has_marked: bool, smt_passed: bool, smt_score: float, diff_score: float): """ 更新阅卷任务 smt_has_marked:是否已被批阅过? smt_passed:在更新之前是否及格? smt_score:StudentMarkTask的最终分数 diff_score: 重复批阅时的分数差 """ print("Async Update MarkTask Starting!") print( f"the params: task_id={task_id}, smt_has_marked={smt_has_marked},smt_passed={smt_passed}, smt_score={smt_score},diff_score={diff_score} " ) db = LocalAsyncSession() # 查询MarkTask是否存在 db_obj = await crud_task.find_one(db, filters={"id": task_id}) # 更新已批阅人数 / 及格人数 # 及格人数更新逻辑 # 1、如果StudentMarkTask本身是及格的,当新分数 <60 时,及格人数 -1; # 2、如果StudentMarkTask本身是不及格的,当新分数 >=60 时,及格人数 +1; mt_marked_amount = db_obj.marked_amount pass_amount = db_obj.pass_amount if not smt_has_marked: mt_marked_amount += 1 # 批阅人数 if smt_score >= 60: pass_amount = db_obj.pass_amount + 1 else: if smt_passed: if smt_score < 60: pass_amount = db_obj.pass_amount - 1 else: if smt_score >= 60: pass_amount = db_obj.pass_amount + 1 print("------------------") print(f"the mt_marked_amount={mt_marked_amount}") print(f"the pass_amount={pass_amount}") pass_rate = round(pass_amount / mt_marked_amount, 2) print(f"the pass_rate={pass_rate}") # 最高分 / 最低分 stmt = select(func.max(StudentMarkTask.score), func.min(StudentMarkTask.score)).select_from(StudentMarkTask) \ .where(StudentMarkTask.task_id == task_id, StudentMarkTask.is_completed == 1) high_score, low_score = list(await crud_student_task.execute_v2(db, stmt))[0] print("the high/low score: ", high_score, low_score) # 平均分 avg_score = round((db_obj.avg_score * db_obj.marked_amount + diff_score) / mt_marked_amount, 2) # 批阅状态,0=未开始,1=进行中,2=已完成 status = 2 if mt_marked_amount == db_obj.student_amount else 1 # 更新 mt = BgUpdateTaskReviewInfo(marked_amount=mt_marked_amount, high_score=high_score, low_score=low_score, avg_score=avg_score, pass_amount=pass_amount, pass_rate=pass_rate, status=status) await crud_task.update(db, db_obj, mt) # 更新StudentMarkTask排名 # 1、如果StudentMarkTask未被批阅完成: # 1.1 增加MarkTask的批阅数量; # 1.2 当新增后的批阅数量等于学生数量时,表示MarkTask被批阅完成,则开始更新排名; # 2、如果StudentMarkTask批阅已完成,则属于重复批阅,如果两次批阅间存在分数差,则更新排名; if mt_marked_amount == db_obj.student_amount: # MarkTask批阅完成时,更新排名 await bgtask_rank_score(task_id) else: if diff_score and db_obj.status == 2: # 如果重复批阅有分数差,且MarkTask已批阅完成时,更新排名 await bgtask_rank_score(task_id) print("Async Update MarkTask Successfully!") async def bgtask_update_student_marktask(task_id: int = Field(..., description="学生阅卷任务ID"), qtype: int = Field(..., description="试题类型,0:客观题,1: 主观题"), score: int = Field(..., description="多次批阅之间的得分差"), has_marked: bool = Field(..., description="是否已被批阅过"), editor_id: int = Field(..., description="最后编辑人ID"), editor_name: str = Field(..., description="最后编辑人姓名")): """ 数据更新流程: 1、根据task_id更新StudentMarkTask, 2、如果该学生的所有试题已经被批阅完,则根据StudentMarkTask.task_id 更新 MarkTask 的最高分/最低分/平均分...等信息。 3、如果MarkTask批阅完成,则对StudentMarkTask进行排名。 """ print("Async Update StudentMarkTask Starting!") # new db session db = LocalAsyncSession() # ------------------ 更新学生阅卷任务----------------------------------- student_task = await crud_student_task.find_one(db, filters={"id": task_id}) has_completed = student_task.is_completed # 原始批阅状态 smt_is_passed = True if student_task.score >= 60 else False # 是否及格 # 如果试题属于重复批阅,则不增加批阅数量 if not has_marked: smt_marked_amount = student_task.marked_amount + 1 else: smt_marked_amount = student_task.marked_amount # 学生任务更新信息 smt_is_completed = smt_marked_amount == student_task.question_amount smt = BgUpdateStudentTaskInfo(marked_amount=smt_marked_amount, is_completed=smt_is_completed, score=student_task.score + score, editor_id=editor_id, editor_name=editor_name) # 按题型统计得分 if qtype not in OBJECTIVE_QUESTION_TYPES: smt.subjective_score = student_task.subjective_score + score else: smt.objective_score = student_task.objective_score + score # 开始更新 student_task = await crud_student_task.update(db, student_task, smt) # ------------------ 更新学生阅卷任务结束 --------------------------- # 如果单个学生被批阅完成,更新MarkTask信息 if smt_is_completed: diff_score = student_task.score if not has_completed else score await bgtask_update_marktask(task_id=student_task.task_id, smt_has_marked=has_completed, smt_passed=smt_is_passed, smt_score=student_task.score, diff_score=diff_score) print("Async Update StudentMarkTask Successfully!") async def bgtask_create_grade(sid: int, category: int): db = LocalAsyncSession() # 年级列表 school_system = await crud_system.find_one(db, filters={"id": category}, return_fields={"grades"}) grade_set = set(school_system.grades.split(",")) # 查询学校是否存在 _, db_grades = await crud_grade.find_all(db, filters={"school_id": sid}) db_grade_set = set([x.name for x in db_grades]) diff_grades = grade_set - db_grade_set new_grades = [NewGrade(sid=sid, name=x, order=GRADE_ORDER[x]) for x in diff_grades] try: await crud_grade.insert_many(db, new_grades) except Exception as ex: print(f"AutoCreateGradeError: {str(ex)}") return async def bgtask_delete_student_task_question(task_id: int): db = LocalAsyncSession() # 查询学生任务ID列表 _, student_tasks = await crud_student_task.find_all(db, filters={"task_id": task_id}, return_fields=["id"]) student_task_ids = ",".join([str(x.id) for x in student_tasks]) # 删除学生任务和答题 await crud_student_task.delete(db, where_clauses=[StudentMarkTask.task_id == task_id]) await crud_student_answer.delete(db, where_clauses=[StudentAnswer.task_id == task_id]) # 删除学生错题统计 print(8888888888888888) await crud_student_error_statistic.delete( db, where_clauses=[StudentErrorQuestion.task_id == task_id]) # 删除班级错题统计 print(9999999999999999) await crud_class_error_statistic.delete(db, where_clauses=[ClassErrorQuestion.task_id == task_id]) async def bgtask_update_class_teacher_student(cid: int, op: str = "add", teacher_amount: int = 0, student_amount: int = 0): """ 更新班级的教师数量和学生数量, op: 操作类型,add=增加, del=减少 """ db = LocalAsyncSession() if op == "del": teacher_amount = -teacher_amount student_amount = -student_amount stmt = (update(SchoolClass).where(SchoolClass.id == cid).values( teacher_amount=SchoolClass.teacher_amount + teacher_amount, student_amount=SchoolClass.student_amount + student_amount)) await crud_class.execute_v2(db, stmt) print( f"Async Update Class OK! student_amount={student_amount}, teacher_amount={teacher_amount}") async def update_student_error_statistic(student_id: int, task_type: str, error_count: int): """ 更新学生错题统计数据 """ # new db session db = LocalAsyncSession() db_error = await crud_student_error_statistic.find_one(db, filters={"student_id": student_id}) update_dict = {"total_errors": db_error.total_errors + error_count} if task_type == "work": key = "work_error_count" else: key = "exam_error_count" update_dict[key] = getattr(db_error, key) + error_count update_dict["error_ratio"] = round(update_dict["total_errors"] / db_error.total_questions * 100, 2) # 更新 item = UpdateStudentErrorQuestionItem(**update_dict) await crud_student_error_statistic.update(db, db_error, item) # 更新错题统计数据 async def bgtask_update_class_error_statistic(task_id: int, class_id: int, qid: List[int], count: int, old_score: float, new_score: float, has_marked: bool): # new db session db = LocalAsyncSession() # update db_error = await crud_class_error_statistic.find_one(db, filters={ "task_id": task_id, "class_id": class_id, "qid": qid }) update_dict = {} print("Async Update Class-Error-Statistic Starting!") # 如果是错题,则更新错题数量和错题率 if count: error_count = db_error.error_count + count error_ratio = round(error_count / db_error.student_count * 100, 2) update_dict["error_count"] = error_count update_dict["error_ratio"] = error_ratio # 更新答案分布 if not db_error.answer_dist: answer_dist = {} else: answer_dist = json.loads(db_error.answer_dist) # 更新分数的人数 if old_score != 0: answer_dist[old_score] -= 1 if new_score not in answer_dist: answer_dist[new_score] = 1 else: answer_dist[new_score] += 1 if answer_dist: update_dict["answer_dist"] = json.dumps(answer_dist) # 更新 if update_dict: try: item = UpdateClassErrorQuestionItem(**update_dict) await crud_class_error_statistic.update(db, db_error, item) except Exception as ex: print(f"UpdateClassError: {ex}") print("Async Update Class-Error-Statistic Successfully!") # 批量更新错题统计数据 async def bgtask_batch_update_class_error_statistic(task_id: int, class_id: int, questions: List[Dict[str, Any]]): """ 批量更新班级错题统计 :param task_id: 阅卷任务ID, int :param class_id: 班级ID, int :param questions: 错题统计数据,list,形式:[{"qid": 1, "answer": "A"},...] """ # new db session db = LocalAsyncSession() for q in questions: # update db_error = await crud_class_error_statistic.find_one(db, filters={ "task_id": task_id, "class_id": class_id, "qid": q["qid"] }) update_dict = {} print("Async Update[batch] Class-Error-Statistic Starting!") # 如果是错题,则更新错题数量和错题率 error_count = db_error.error_count + 1 error_ratio = round(error_count / db_error.student_count * 100, 2) update_dict["error_count"] = error_count update_dict["error_ratio"] = error_ratio # 更新答案分布 answer_dist = {} if not db_error.answer_dist: answer_dist[q["answer"]] = 1 else: answer_dist = json.loads(db_error.answer_dist) answer_dist[q["answer"]] = answer_dist.get(q["answer"], 0) + 1 if answer_dist: update_dict["answer_dist"] = json.dumps(answer_dist) # 更新 if update_dict: try: item = UpdateClassErrorQuestionItem(**update_dict) await crud_class_error_statistic.update(db, db_error, item) except Exception as ex: print(f"UpdateClassError: {ex}") print("Async Update[batch] Class-Error-Statistic Successfully!") # 创建学生错题推送记录 async def bgtask_create_student_push_record(task: ModelType, db_class: ModelType, total_question: int, push_record_id: int, students: List[ModelType], current_user: Teacher): db = LocalAsyncSession() # 学生推送列表 push_records = [] # 查询错题ID total, db_errors = await crud_class_error_statistic.find_all( db, filters=[ ClassErrorQuestion.task_id == task.id, ClassErrorQuestion.class_id == db_class.id, ClassErrorQuestion.error_count > 0, ], return_fields=["qid"]) push_error_ids = [x.qid for x in db_errors] # 为每个学生创建推送记录 for stu in students: obj_in = StudentErrorPushRecordInDB(student_id=stu.id, student_sno=stu.sno, student_name=stu.name, task_id=task.id, task_name=task.name, task_type=task.mtype, question_count=total_question, push_record_id=push_record_id, push_error_count=total, push_error_ids=push_error_ids, creator_id=current_user.id, creator_name=current_user.name, editor_id=current_user.id, editor_name=current_user.name) push_records.append(obj_in) await crud_student_push_record.insert_many(db, push_records) # 克隆学生错题推送记录 async def bgtask_clone_student_push_record(db: AsyncSession, cls_prd: ModelType, students: List[ModelType]): # 从班级推送记录中查出一条记录 db_prd = await crud_student_push_record.find_one(db, filters={"push_record_id": cls_prd.id}) push_records = [] for stu in students: obj_in = StudentErrorPushRecordInDB(student_id=stu.id, student_sno=stu.sno, student_name=stu.name, task_id=db_prd.task_id, task_name=db_prd.task_name, task_type=db_prd.task_type, question_count=db_prd.total_question, push_record_id=db_prd.push_record_id, push_error_count=db_prd.total, push_error_ids=db_prd.push_error_ids, creator_id=db_prd.creator_id, creator_name=db_prd.creator_name, editor_id=db_prd.editor_id, editor_name=db_prd.editor_name) push_records.append(obj_in) await crud_student_push_record.insert_many(db, push_records) # 删除学生错题推送记录 async def bgtask_delete_student_push_record(db: AsyncSession, cls_prid: int, students: List[int] = None): _q = [StudentErrorPushRecord.push_record_id == cls_prid] if students: _q = [StudentErrorPushRecord.student_id.in_(students)] try: await crud_student_push_record.delete(db, where_clauses=_q) except Exception as ex: print(f"[ERROR] 删除学生推送列表失败! 错误信息: {str(ex)}")