123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import json
- import os
- from typing import List, Dict, Any
- from pydantic import Field
- from sqlalchemy import update, text, func, select
- from sqlalchemy.ext.asyncio import AsyncSession
- from common.const import GRADE_ORDER, OBJECTIVE_QUESTION_TYPES
- from crud.base import ModelType
- from crud.marktask import crud_task, crud_student_task, crud_student_answer
- from crud.problem import crud_class_error_statistic, crud_student_push_record,\
- crud_student_error_statistic,crud_class_push_record
- from crud.school import crud_grade, crud_system, crud_class
- from crud.user import crud_teacher, crud_student
- from db.asyncsession import LocalAsyncSession
- from models.marktask import StudentMarkTask, StudentAnswer
- from models.problem import ClassErrorQuestion, StudentErrorPushRecord, StudentErrorQuestion, ClassErrorPushRecord
- from models.school import SchoolGrade, SchoolClass
- from models.user import Teacher, Student
- from schemas.app.task import BgUpdateTaskReviewInfo, BgUpdateStudentTaskInfo
- from schemas.problem import StudentErrorPushRecordInDB, UpdateClassErrorQuestionItem, UpdateStudentErrorQuestionItem
- from schemas.school.school import NewGrade
- async def bgtask_delete_related_object(sid: int = 0, gid: int = 0, cid: int = 0):
- db = LocalAsyncSession()
- # 根据学校删除年级、班级、教师、学生等
- if sid and (not gid) and (not cid):
- # 1、删除年级
- await crud_grade.delete(db, where_clauses=[SchoolGrade.school_id == sid])
- # 2、删除班级
- await crud_class.delete(db, where_clauses=[SchoolClass.school_id == sid])
- # TODO: 删除教师、学生、作业、考试、阅卷任务等
- await crud_teacher.delete(db, where_clauses=[Teacher.school_id == sid])
- await crud_student.delete(db, where_clauses=[Student.school_id == sid])
- elif gid and (not cid): # 如果是年级被删除,则删除该年级下的所有班级
- await crud_class.delete(db, where_clauses=[SchoolClass.grade_id == gid])
- # TODO: 删除学生、作业、考试、阅卷任务等
- await crud_student.delete(db, where_clauses=[Student.grade_id == gid])
- elif cid: # 如果班级被删除了,则删除该班级下的所有学生
- # TODO: 删除学生、作业、考试、阅卷任务等
- await crud_student.delete(db, where_clauses=[Student.class_id == cid])
- else:
- pass
- async def bgtask_delete_remote_file(filepath: str):
- urls = [x.strip() for x in filepath.split(";")]
- try:
- for url in urls:
- os.remove(url)
- except Exception as ex:
- print(f"[Error] Delete file: {str(ex)}")
- async def bgtask_rank_score(task_id: int = Field(..., description="阅卷任务ID")):
- """
- 更新学生排名
- """
- db = LocalAsyncSession()
- # 根据 task_id 查询 StudentMarkTask
- stmt = f"""SELECT id,
- CASE
- WHEN @prevRank = score THEN @curRank
- WHEN @prevRank := score THEN @curRank := @curRank + 1
- END AS rank
- FROM { StudentMarkTask.__tablename__ },
- (SELECT @curRank :=0, @prevRank := NULL) rb
- WHERE task_id={ task_id } AND is_completed=true AND score>0
- ORDER BY score desc;
- """
- current_rank = 1
- ranks = await db.execute(stmt)
- for item in ranks:
- current_rank += 1
- stmt = (update(StudentMarkTask).where(StudentMarkTask.id == item[0]).values(rank=item[1]))
- await crud_student_task.execute_v2(db, stmt)
- # 为0分记录设置排名
- stmt = (update(StudentMarkTask).where(StudentMarkTask.task_id == task_id,
- StudentMarkTask.score == 0).values(rank=current_rank))
- await crud_student_task.execute_v2(db, stmt)
- print(f"Calculate Rank OK!!")
- async def bgtask_update_marktask(task_id: int, smt_has_marked: bool, smt_passed: bool,
- smt_score: float, diff_score: float):
- """
- 更新阅卷任务
- smt_has_marked:是否已被批阅过?
- smt_passed:在更新之前是否及格?
- smt_score:StudentMarkTask的最终分数
- diff_score: 重复批阅时的分数差
- """
- print("Async Update MarkTask Starting!")
- print(
- f"the params: task_id={task_id}, smt_has_marked={smt_has_marked},smt_passed={smt_passed}, smt_score={smt_score},diff_score={diff_score} "
- )
- db = LocalAsyncSession()
- # 查询MarkTask是否存在
- db_obj = await crud_task.find_one(db, filters={"id": task_id})
- # 更新已批阅人数 / 及格人数
- # 及格人数更新逻辑
- # 1、如果StudentMarkTask本身是及格的,当新分数 <60 时,及格人数 -1;
- # 2、如果StudentMarkTask本身是不及格的,当新分数 >=60 时,及格人数 +1;
- mt_marked_amount = db_obj.marked_amount
- pass_amount = db_obj.pass_amount
- if not smt_has_marked:
- mt_marked_amount += 1 # 批阅人数
- if smt_score >= 60:
- pass_amount = db_obj.pass_amount + 1
- else:
- if smt_passed:
- if smt_score < 60:
- pass_amount = db_obj.pass_amount - 1
- else:
- if smt_score >= 60:
- pass_amount = db_obj.pass_amount + 1
- print("------------------")
- print(f"the mt_marked_amount={mt_marked_amount}")
- print(f"the pass_amount={pass_amount}")
- pass_rate = round(pass_amount / mt_marked_amount, 2)
- print(f"the pass_rate={pass_rate}")
- # 最高分 / 最低分
- stmt = select(func.max(StudentMarkTask.score), func.min(StudentMarkTask.score)).select_from(StudentMarkTask) \
- .where(StudentMarkTask.task_id == task_id, StudentMarkTask.is_completed == 1)
- high_score, low_score = list(await crud_student_task.execute_v2(db, stmt))[0]
- print("the high/low score: ", high_score, low_score)
- # 平均分
- avg_score = round((db_obj.avg_score * db_obj.marked_amount + diff_score) / mt_marked_amount, 2)
- # 批阅状态,0=未开始,1=进行中,2=已完成
- status = 2 if mt_marked_amount == db_obj.student_amount else 1
- # 更新
- mt = BgUpdateTaskReviewInfo(marked_amount=mt_marked_amount,
- high_score=high_score,
- low_score=low_score,
- avg_score=avg_score,
- pass_amount=pass_amount,
- pass_rate=pass_rate,
- status=status)
- await crud_task.update(db, db_obj, mt)
- # 更新StudentMarkTask排名
- # 1、如果StudentMarkTask未被批阅完成:
- # 1.1 增加MarkTask的批阅数量;
- # 1.2 当新增后的批阅数量等于学生数量时,表示MarkTask被批阅完成,则开始更新排名;
- # 2、如果StudentMarkTask批阅已完成,则属于重复批阅,如果两次批阅间存在分数差,则更新排名;
- if mt_marked_amount == db_obj.student_amount: # MarkTask批阅完成时,更新排名
- await bgtask_rank_score(task_id)
- else:
- if diff_score and db_obj.status == 2: # 如果重复批阅有分数差,且MarkTask已批阅完成时,更新排名
- await bgtask_rank_score(task_id)
- print("Async Update MarkTask Successfully!")
- async def bgtask_update_student_marktask(task_id: int = Field(..., description="学生阅卷任务ID"),
- qtype: int = Field(..., description="试题类型,0:客观题,1: 主观题"),
- score: int = Field(..., description="多次批阅之间的得分差"),
- has_marked: bool = Field(..., description="是否已被批阅过"),
- editor_id: int = Field(..., description="最后编辑人ID"),
- editor_name: str = Field(..., description="最后编辑人姓名")):
- """
- 数据更新流程:
- 1、根据task_id更新StudentMarkTask,
- 2、如果该学生的所有试题已经被批阅完,则根据StudentMarkTask.task_id 更新 MarkTask 的最高分/最低分/平均分...等信息。
- 3、如果MarkTask批阅完成,则对StudentMarkTask进行排名。
- """
- print("Async Update StudentMarkTask Starting!")
- # new db session
- db = LocalAsyncSession()
- # ------------------ 更新学生阅卷任务-----------------------------------
- student_task = await crud_student_task.find_one(db, filters={"id": task_id})
- has_completed = student_task.is_completed # 原始批阅状态
- smt_is_passed = True if student_task.score >= 60 else False # 是否及格
- # 如果试题属于重复批阅,则不增加批阅数量
- if not has_marked:
- smt_marked_amount = student_task.marked_amount + 1
- else:
- smt_marked_amount = student_task.marked_amount
- # 学生任务更新信息
- smt_is_completed = smt_marked_amount == student_task.question_amount
- smt = BgUpdateStudentTaskInfo(marked_amount=smt_marked_amount,
- is_completed=smt_is_completed,
- score=student_task.score + score,
- editor_id=editor_id,
- editor_name=editor_name)
- # 按题型统计得分
- if qtype not in OBJECTIVE_QUESTION_TYPES:
- smt.subjective_score = student_task.subjective_score + score
- else:
- smt.objective_score = student_task.objective_score + score
- # 开始更新
- student_task = await crud_student_task.update(db, student_task, smt)
- # ------------------ 更新学生阅卷任务结束 ---------------------------
- # 如果单个学生被批阅完成,更新MarkTask信息
- if smt_is_completed:
- diff_score = student_task.score if not has_completed else score
- await bgtask_update_marktask(task_id=student_task.task_id,
- smt_has_marked=has_completed,
- smt_passed=smt_is_passed,
- smt_score=student_task.score,
- diff_score=diff_score)
- print("Async Update StudentMarkTask Successfully!")
- async def bgtask_create_grade(sid: int, category: int):
- db = LocalAsyncSession()
- # 年级列表
- school_system = await crud_system.find_one(db,
- filters={"id": category},
- return_fields={"grades"})
- grade_set = set(school_system.grades.split(","))
- # 查询学校是否存在
- _, db_grades = await crud_grade.find_all(db, filters={"school_id": sid})
- db_grade_set = set([x.name for x in db_grades])
- diff_grades = grade_set - db_grade_set
- new_grades = [NewGrade(sid=sid, name=x, order=GRADE_ORDER[x]) for x in diff_grades]
- try:
- await crud_grade.insert_many(db, new_grades)
- except Exception as ex:
- print(f"AutoCreateGradeError: {str(ex)}")
- return
- async def bgtask_delete_student_task_question(task_id: int):
- db = LocalAsyncSession()
- # 查询学生任务ID列表
- _, student_tasks = await crud_student_task.find_all(db,
- filters={"task_id": task_id},
- return_fields=["id"])
- student_task_ids = ",".join([str(x.id) for x in student_tasks])
- # 删除学生任务和答题
- await crud_student_task.delete(db, where_clauses=[StudentMarkTask.task_id == task_id])
- await crud_student_answer.delete(db, where_clauses=[StudentAnswer.task_id == task_id])
- # 删除学生错题统计
- print(8888888888888888)
- await crud_student_error_statistic.delete(
- db, where_clauses=[StudentErrorQuestion.task_id == task_id])
- # 删除班级错题统计
- print(9999999999999999)
- await crud_class_error_statistic.delete(db,
- where_clauses=[ClassErrorQuestion.task_id == task_id])
- async def bgtask_update_class_teacher_student(cid: int,
- op: str = "add",
- teacher_amount: int = 0,
- student_amount: int = 0):
- """
- 更新班级的教师数量和学生数量, op: 操作类型,add=增加, del=减少
- """
- db = LocalAsyncSession()
- if op == "del":
- teacher_amount = -teacher_amount
- student_amount = -student_amount
- stmt = (update(SchoolClass).where(SchoolClass.id == cid).values(
- teacher_amount=SchoolClass.teacher_amount + teacher_amount,
- student_amount=SchoolClass.student_amount + student_amount))
- await crud_class.execute_v2(db, stmt)
- print(
- f"Async Update Class OK! student_amount={student_amount}, teacher_amount={teacher_amount}")
- async def update_student_error_statistic(student_id: int, task_type: str, error_count: int):
- """
- 更新学生错题统计数据
- """
- # new db session
- db = LocalAsyncSession()
- db_error = await crud_student_error_statistic.find_one(db, filters={"student_id": student_id})
- update_dict = {"total_errors": db_error.total_errors + error_count}
- if task_type == "work":
- key = "work_error_count"
- else:
- key = "exam_error_count"
- update_dict[key] = getattr(db_error, key) + error_count
- update_dict["error_ratio"] = round(update_dict["total_errors"] / db_error.total_questions * 100,
- 2)
- # 更新
- item = UpdateStudentErrorQuestionItem(**update_dict)
- await crud_student_error_statistic.update(db, db_error, item)
- # 更新错题统计数据
- async def bgtask_update_class_error_statistic(task_id: int, class_id: int, qid: List[int],
- count: int, old_score: float, new_score: float,
- has_marked: bool):
- # new db session
- db = LocalAsyncSession()
- # update
- db_error = await crud_class_error_statistic.find_one(db,
- filters={
- "task_id": task_id,
- "class_id": class_id,
- "qid": qid
- })
- update_dict = {}
- print("Async Update Class-Error-Statistic Starting!")
- # 如果是错题,则更新错题数量和错题率
- if count:
- error_count = db_error.error_count + count
- error_ratio = round(error_count / db_error.student_count * 100, 2)
- update_dict["error_count"] = error_count
- update_dict["error_ratio"] = error_ratio
- # 更新答案分布
- if not db_error.answer_dist:
- answer_dist = {}
- else:
- answer_dist = json.loads(db_error.answer_dist)
- # 更新分数的人数
- if old_score != 0:
- answer_dist[old_score] -= 1
- if new_score not in answer_dist:
- answer_dist[new_score] = 1
- else:
- answer_dist[new_score] += 1
- if answer_dist:
- update_dict["answer_dist"] = json.dumps(answer_dist)
- # 更新
- if update_dict:
- try:
- item = UpdateClassErrorQuestionItem(**update_dict)
- await crud_class_error_statistic.update(db, db_error, item)
- except Exception as ex:
- print(f"UpdateClassError: {ex}")
- print("Async Update Class-Error-Statistic Successfully!")
- # 批量更新错题统计数据
- async def bgtask_batch_update_class_error_statistic(task_id: int, class_id: int,
- questions: List[Dict[str, Any]]):
- """
- 批量更新班级错题统计
- :param task_id: 阅卷任务ID, int
- :param class_id: 班级ID, int
- :param questions: 错题统计数据,list,形式:[{"qid": 1, "answer": "A"},...]
- """
- # new db session
- db = LocalAsyncSession()
- for q in questions:
- # update
- db_error = await crud_class_error_statistic.find_one(db,
- filters={
- "task_id": task_id,
- "class_id": class_id,
- "qid": q["qid"]
- })
- update_dict = {}
- print("Async Update[batch] Class-Error-Statistic Starting!")
- # 如果是错题,则更新错题数量和错题率
- error_count = db_error.error_count + 1
- error_ratio = round(error_count / db_error.student_count * 100, 2)
- update_dict["error_count"] = error_count
- update_dict["error_ratio"] = error_ratio
- # 更新答案分布
- answer_dist = {}
- if not db_error.answer_dist:
- answer_dist[q["answer"]] = 1
- else:
- answer_dist = json.loads(db_error.answer_dist)
- answer_dist[q["answer"]] = answer_dist.get(q["answer"], 0) + 1
- if answer_dist:
- update_dict["answer_dist"] = json.dumps(answer_dist)
- # 更新
- if update_dict:
- try:
- item = UpdateClassErrorQuestionItem(**update_dict)
- await crud_class_error_statistic.update(db, db_error, item)
- except Exception as ex:
- print(f"UpdateClassError: {ex}")
- print("Async Update[batch] Class-Error-Statistic Successfully!")
- # 创建学生错题推送记录
- async def bgtask_create_student_push_record(task: ModelType, db_class: ModelType,
- total_question: int, push_record_id: int,
- students: List[ModelType], current_user: Teacher):
- db = LocalAsyncSession()
- # 学生推送列表
- push_records = []
- # 查询错题ID
- total, db_errors = await crud_class_error_statistic.find_all(
- db,
- filters=[
- ClassErrorQuestion.task_id == task.id,
- ClassErrorQuestion.class_id == db_class.id,
- ClassErrorQuestion.error_count > 0,
- ],
- return_fields=["qid"])
- push_error_ids = [x.qid for x in db_errors]
- # 为每个学生创建推送记录
- for stu in students:
- obj_in = StudentErrorPushRecordInDB(student_id=stu.id,
- student_sno=stu.sno,
- student_name=stu.name,
- task_id=task.id,
- task_name=task.name,
- task_type=task.mtype,
- question_count=total_question,
- push_record_id=push_record_id,
- push_error_count=total,
- push_error_ids=push_error_ids,
- creator_id=current_user.id,
- creator_name=current_user.name,
- editor_id=current_user.id,
- editor_name=current_user.name)
- push_records.append(obj_in)
- await crud_student_push_record.insert_many(db, push_records)
- # 克隆学生错题推送记录
- async def bgtask_clone_student_push_record(db: AsyncSession, cls_prd: ModelType,
- students: List[ModelType]):
- # 从班级推送记录中查出一条记录
- db_prd = await crud_student_push_record.find_one(db, filters={"push_record_id": cls_prd.id})
- push_records = []
- for stu in students:
- obj_in = StudentErrorPushRecordInDB(student_id=stu.id,
- student_sno=stu.sno,
- student_name=stu.name,
- task_id=db_prd.task_id,
- task_name=db_prd.task_name,
- task_type=db_prd.task_type,
- question_count=db_prd.total_question,
- push_record_id=db_prd.push_record_id,
- push_error_count=db_prd.total,
- push_error_ids=db_prd.push_error_ids,
- creator_id=db_prd.creator_id,
- creator_name=db_prd.creator_name,
- editor_id=db_prd.editor_id,
- editor_name=db_prd.editor_name)
- push_records.append(obj_in)
- await crud_student_push_record.insert_many(db, push_records)
- # 删除学生错题推送记录
- async def bgtask_delete_student_push_record(db: AsyncSession,
- cls_prid: int,
- students: List[int] = None):
- _q = [StudentErrorPushRecord.push_record_id == cls_prid]
- if students:
- _q = [StudentErrorPushRecord.student_id.in_(students)]
- try:
- await crud_student_push_record.delete(db, where_clauses=_q)
- except Exception as ex:
- print(f"[ERROR] 删除学生推送列表失败! 错误信息: {str(ex)}")
|