#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Async handlers for marking tasks, student marking tasks and student answers.

Every handler receives an ``AsyncSession`` and the current ``Teacher`` through
``Depends`` and returns either a ``{"data": ...}`` payload or an
``{"errcode": ..., "mess": ...}`` payload.
"""
from collections import OrderedDict

import openpyxl
from fastapi import Query, Depends, Path
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.background import BackgroundTasks
from starlette.responses import FileResponse

from bgtask.tasks import update_mark_task
from core.config import settings
from crud.mark import crud_mark, crud_answer, crud_studentmark
from crud.paper import crud_question
from models.mark import StudentAnswer
from models.paper import WorkPaperQuestion
from models.user import Teacher
from schemas.app.task import UpdateMarkTaskQuestion
from schemas.base import OrderByField
from utils.depends import get_async_db, get_current_user


def _page_offset(page: int, size: int) -> int:
    """Return the row offset of a 1-based ``page``; never negative."""
    return max(page - 1, 0) * size


def _wrong_answer_filter(student_task_id: int):
    """Build the filter selecting answers whose marked score differs from the full score."""
    return text(
        "student_task_id = :f_student_task_id AND marked_score != score"
    ).bindparams(f_student_task_id=student_task_id)


async def _answer_distribution(db: AsyncSession, task_id: int, qno, sqno) -> list:
    """Count how many answers of one (qno, sqno) question fall on each answer value.

    Returns a list of ``{"key": <answer>, "val": <count>}`` dicts.
    """
    stmt = text(f"""SELECT answer, count(id) AS amount
        FROM {StudentAnswer.__tablename__}
        WHERE task_id = :task_id AND qno = :qno AND sqno = :sqno
        GROUP BY qno, sqno, answer;""")
    rows = await db.execute(stmt, {"task_id": task_id, "qno": qno, "sqno": sqno})
    return [{"key": row[0], "val": int(row[1])} for row in rows]


# Marking task list
async def get_mark_tasks(
        page: int = 1,
        size: int = 10,
        cid: int = Query(0, description="班级ID"),
        mtype: str = Query(..., description="阅卷任务类型,work/exam"),
        ctgid: int = Query(0, description="分类ID"),
        year: int = Query(0, description="年份"),
        status: int = Query(None, description="状态"),
        name: str = Query(None, description="任务名称"),
        order: OrderByField = Query("-created_at",
                                    description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"),
        db: AsyncSession = Depends(get_async_db),
        current_user: Teacher = Depends(get_current_user)):
    """Return one page of marking tasks matching the optional filters.

    Falsy filter values (``0``, ``None``, ``""``) mean "do not filter on this
    field". Each returned task with a non-zero ``marked_amount`` gets a
    ``pass_rate`` attribute (``pass_amount / marked_amount``, 2 decimals).

    Returns:
        ``{"data": <tasks>, "total": <total>}``.
    """
    # User-supplied values are passed as bound parameters, never interpolated
    # into the SQL text, so that ``mtype`` / ``name`` cannot inject SQL.
    filters = [text("mtype = :f_mtype").bindparams(f_mtype=mtype)]
    if ctgid:
        filters.append(text("category_id = :f_ctgid").bindparams(f_ctgid=ctgid))
    if year:
        filters.append(text("year = :f_year").bindparams(f_year=year))
    if cid:
        filters.append(text("class_id = :f_cid").bindparams(f_cid=cid))
    if name:
        filters.append(text("name LIKE :f_name").bindparams(f_name=f"%{name}%"))
    # NOTE(review): ``status=0`` is treated as "no filter" here although the
    # default is None — confirm whether 0 is a real status value.
    if status:
        filters.append(text("status = :f_status").bindparams(f_status=status))

    offset = _page_offset(page, size)
    # NOTE(review): ``order`` is used as raw SQL text; this relies on
    # OrderByField validating its content — confirm.
    if isinstance(order, str):
        order = [text(order)]
    total, marks = await crud_mark.find_all(db,
                                            filters=filters,
                                            limit=size,
                                            offset=offset,
                                            order_by=order)
    for item in marks:
        if item.marked_amount:
            item.pass_rate = round(item.pass_amount / item.marked_amount, 2)
    return {"data": marks, "total": total}


# Marking task detail
async def get_mark_task(tid: int = Path(..., description="批阅任务ID"),
                        db: AsyncSession = Depends(get_async_db),
                        current_user: Teacher = Depends(get_current_user)):
    """Return one marking task together with its score-band statistics.

    Returns:
        ``{"data", "score_table", "score_chart"}`` on success, where
        ``score_chart`` has six bands and ``score_table`` merges them into
        three; an errcode-400 payload when the task does not exist.
    """
    # Make sure the task exists.
    db_obj = await crud_mark.find_one(db, filters={"id": tid})
    if not db_obj:
        return {"errcode": 400, "mess": "阅卷任务不存在!"}

    # Count students per score band.
    stmt = text("""SELECT
        sum( CASE WHEN score < 50 THEN 1 ELSE 0 END ) AS '0-50',
        sum( CASE WHEN score >= 50 AND score < 60 THEN 1 ELSE 0 END ) AS '50-60',
        sum( CASE WHEN score >= 60 AND score < 70 THEN 1 ELSE 0 END ) AS '60-70',
        sum( CASE WHEN score >= 70 AND score < 80 THEN 1 ELSE 0 END ) AS '70-80',
        sum( CASE WHEN score >= 80 AND score < 90 THEN 1 ELSE 0 END ) AS '80-90',
        sum( CASE WHEN score >= 90 THEN 1 ELSE 0 END ) AS '90-100'
        FROM student_marktask WHERE task_id = :tid;""")
    cursor = await db.execute(stmt, {"tid": tid})
    # The sums are NULL when no rows match; report those bands as 0.
    scores = [x if x else 0 for x in cursor.fetchall()[0]]

    score_table = [
        {"key": "0-50", "val": scores[0]},
        {"key": "50-70", "val": scores[1] + scores[2]},
        {"key": "70-100", "val": sum(scores[3:])},
    ]
    score_gaps = ["0-50", "50-60", "60-70", "70-80", "80-90", "90-100"]
    score_chart = [{"key": gap, "val": val} for gap, val in zip(score_gaps, scores)]
    return {
        "data": db_obj,
        "score_table": score_table,
        "score_chart": score_chart
    }


# Marking task question list
async def get_task_questions(page: int = 1,
                             size: int = 10,
                             tid: int = Path(..., description="阅卷任务ID"),
                             stid: int = Query(None, description="学生ID"),
                             qno: int = Query(None, description="题号"),
                             db: AsyncSession = Depends(get_async_db),
                             current_user: Teacher = Depends(get_current_user)):
    """Return one page of student answers of a marking task.

    ``stid`` selects the answers of one student, otherwise ``qno`` selects one
    question number, otherwise all answers of the task are listed. For answers
    that have a ``marked_img``, ``qimg`` is replaced by it.

    Returns:
        ``{"data": <answers>, "total": <total>}``, or an errcode-400 payload
        when ``stid`` is given but that student has no task under ``tid``.
    """
    if stid:
        # Mark by student: look up the student's task id, then list that
        # student's answers.
        student_task = await crud_studentmark.find_one(db,
                                                       filters={
                                                           "task_id": tid,
                                                           "student_id": stid
                                                       },
                                                       return_fields=["id"])
        if not student_task:
            # Previously this crashed with AttributeError on ``None.id``.
            return {"errcode": 400, "mess": "学生阅卷任务不存在!"}
        filters = {"student_task_id": student_task.id}
    elif qno:
        # Mark by question: select by task id and question number.
        filters = {"task_id": tid, "qno": qno}
    else:
        filters = {"task_id": tid}

    # Fetch the answers.
    offset = _page_offset(page, size)
    total, items = await crud_answer.find_all(db,
                                              filters=filters,
                                              offset=offset,
                                              limit=size)
    for item in items:
        if item.marked_img:
            item.qimg = item.marked_img
    return {"data": items, "total": total}


# Mark one answer
async def mark_question(info: UpdateMarkTaskQuestion,
                        bg_task: BackgroundTasks,
                        qid: int = Path(..., description="试题ID"),
                        db: AsyncSession = Depends(get_async_db),
                        current_user: Teacher = Depends(get_current_user)):
    """Apply ``info`` to the answer ``qid`` and schedule the aggregate update.

    Returns:
        ``{"data": None}`` on success; an errcode-400 payload when the answer
        does not exist.
    """
    # Make sure the answer exists.
    filters = {"id": qid}
    db_obj = await crud_answer.find_one(db, filters=filters)
    if not db_obj:
        return {"errcode": 400, "mess": "试题不存在!"}

    # Update flow: StudentAnswer here, then update_mark_task runs as a
    # background task (per the original author's note it propagates to
    # StudentMarkTask and then MarkTask).
    db_obj = await crud_answer.update(db, db_obj, info)
    # NOTE(review): the request-scoped session ``db`` is handed to the
    # background task — confirm it is still usable when the task runs.
    bg_task.add_task(update_mark_task,
                     db,
                     bg_task,
                     task_id=db_obj.student_task_id,
                     qtype=db_obj.qtype,
                     score=db_obj.marked_score,
                     editor_id=current_user.id,
                     editor_name=current_user.username)
    return {"data": None}


# Homework center - per-student answer statistics
async def get_student_tasks(page: int = 1,
                            size: int = 10,
                            tid: int = Query(..., description="阅卷任务ID"),
                            db: AsyncSession = Depends(get_async_db),
                            current_user: Teacher = Depends(get_current_user)):
    """Return one page of student tasks of marking task ``tid``.

    Each task gets a ``wrong_questions`` attribute: a list of
    ``"<id>,<qno>,<sqno>"`` strings for answers whose ``marked_score`` differs
    from ``score``.
    """
    # Fetch the student tasks.
    offset = _page_offset(page, size)
    total, db_tasks = await crud_studentmark.find_all(db,
                                                      filters={"task_id": tid},
                                                      offset=offset,
                                                      limit=size)
    # Collect each student's wrong answers.
    for item in db_tasks:
        _, db_questions = await crud_answer.find_all(
            db,
            filters=[_wrong_answer_filter(item.id)],
            return_fields=["id", "qno", "sqno"])
        item.wrong_questions = [f"{x.id},{x.qno},{x.sqno}" for x in db_questions]
    return {"data": db_tasks, "total": total}


# Homework center - student marking task detail
async def get_student_task(tid: int = Path(..., description="学生阅卷任务ID"),
                           db: AsyncSession = Depends(get_async_db),
                           current_user: Teacher = Depends(get_current_user)):
    """Return the summary fields of student task ``tid`` as ``{"data": ...}``.

    ``data`` is whatever ``crud_studentmark.find_one`` yields for a missing id.
    """
    db_task = await crud_studentmark.find_one(db,
                                              filters={"id": tid},
                                              return_fields=[
                                                  "id", "student_id",
                                                  "student_name", "student_sno",
                                                  "score", "objective_score",
                                                  "subjective_score",
                                                  "question_amount"
                                              ])
    return {"data": db_task}


# Homework center - student answer list
async def get_student_answers(
        page: int = 1,
        size: int = 10,
        tid: int = Path(..., description="学生阅卷任务ID"),
        db: AsyncSession = Depends(get_async_db),
        current_user: Teacher = Depends(get_current_user)):
    """Return one page of a student's answers with the reference answer,
    analysis and the task-wide answer distribution of each question.

    Returns:
        ``{"data": <answers>, "total": <question_amount>}``, or an errcode-400
        payload when the student task does not exist.
    """
    # Fetch the student task.
    student_task = await crud_studentmark.find_one(
        db, filters={"id": tid}, return_fields=["task_id", "question_amount"])
    if not student_task:
        # Previously this crashed with AttributeError on ``None.task_id``.
        return {"errcode": 400, "mess": "学生阅卷任务不存在!"}

    offset = _page_offset(page, size)
    # Fetch the student's answers joined with the paper questions.
    stmt = text(f"""SELECT sa.qno, sa.sqno, wsq.answer, wsq.analysis, sa.marked_img
        FROM {StudentAnswer.__tablename__} AS sa
        INNER JOIN {WorkPaperQuestion.__tablename__} AS wsq
            ON sa.pid = wsq.pid AND sa.qno = wsq.qno
        WHERE sa.student_task_id = :tid
        ORDER BY sa.qno, sa.sqno
        LIMIT :offset, :size;""")
    rows = await db.execute(stmt, {"tid": tid, "offset": offset, "size": size})
    answers = [{
        "qno": row[0],
        "sqno": row[1],
        "answer": row[2],
        "analysis": row[3],
        "marked_img": row[4]
    } for row in rows]

    # Answer distribution of each question across the whole marking task.
    for item in answers:
        item["dist"] = await _answer_distribution(db, student_task.task_id,
                                                  item["qno"], item["sqno"])
    return {"data": answers, "total": student_task.question_amount}


# Homework center - download the student task list
async def download_student_task(
        tid: int = Query(..., description="阅卷任务ID"),
        db: AsyncSession = Depends(get_async_db),
        current_user: Teacher = Depends(get_current_user)):
    """Export the student tasks of marking task ``tid`` as an xlsx file.

    The workbook has one header row followed by one row per student and is
    saved to ``{settings.UPLOADER_PATH}/{tid}.xlsx`` before being returned.
    """
    # Fetch the student tasks.
    _, db_tasks = await crud_studentmark.find_all(db, filters={"task_id": tid})
    # Collect each student's wrong answers.
    for item in db_tasks:
        _, db_questions = await crud_answer.find_all(
            db,
            filters=[_wrong_answer_filter(item.id)],
            return_fields=["qno", "sqno"])
        item.questions = ",".join(
            [f"第{x.qno}-{x.sqno}题" for x in db_questions])

    # Write the sheet: header first, then one row per student.
    wb = openpyxl.Workbook()
    sh = wb.active
    sh.append(["姓名", "学号", "得分", "客观题", "主观题", "名次", "失分题"])
    for task in db_tasks:
        sh.append([
            task.student_name, task.student_sno, task.score,
            task.objective_score, task.subjective_score, task.rank,
            task.questions
        ])
    outfile = f"{settings.UPLOADER_PATH}/{tid}.xlsx"
    wb.save(outfile)
    return FileResponse(outfile, filename=f"{tid}.xlsx")


# Marking progress per question
async def task_mark_process(tid: int = Path(..., description="任务ID"),
                            db: AsyncSession = Depends(get_async_db),
                            current_user: Teacher = Depends(get_current_user)):
    """Return, per question number, how many answers exist and how many are marked.

    ``question_amount`` is the task's ``uploaded_amount``; ``marked_amount``
    counts answers of the task with a non-empty ``marked_img``.

    Returns:
        ``{"data": [{"qno", "question_amount", "marked_amount"}, ...]}``, or an
        errcode-400 payload when the task does not exist.
    """
    # Fetch the marking task.
    db_task = await crud_mark.find_one(db,
                                       filters={"id": tid},
                                       return_fields=["uploaded_amount", "pid"])
    if not db_task:
        return {"errcode": 400, "mess": "阅卷任务不存在!"}

    # List the question numbers of the task's paper. ``pid`` is read by
    # attribute, consistent with ``uploaded_amount`` below.
    _, db_question = await crud_question.find_all(
        db, filters={"pid": db_task.pid}, return_fields=["qno"])
    questions = OrderedDict()
    for q in db_question:
        questions[q[0]] = {
            "qno": q[0],
            "question_amount": db_task.uploaded_amount,
            "marked_amount": 0
        }

    # Count marked answers per question number.
    stmt = text("""SELECT qno, count( id ) AS marked_amount
        FROM student_answer
        WHERE task_id = :tid AND marked_img != ''
        GROUP BY qno;""")
    for qno, marked_amount in await db.execute(stmt, {"tid": tid}):
        entry = questions.get(qno)
        # Skip answers whose qno is not on the paper instead of raising KeyError.
        if entry is not None:
            entry["marked_amount"] += marked_amount
    return {"data": list(questions.values())}


# Homework center - marking task detail - answer analysis
async def get_task_answers(tid: int = Path(..., description="阅卷任务ID"),
                           db: AsyncSession = Depends(get_async_db),
                           current_user: Teacher = Depends(get_current_user)):
    """Return every question of the task's paper with its answer distribution.

    Returns:
        ``{"data": [...], "total": <question count>}``, or an errcode-404
        payload when the task does not exist.
    """
    # Fetch the marking task.
    task = await crud_mark.find_one(db, filters={"id": tid})
    if not task:
        # NOTE(review): other handlers report this same condition with
        # errcode 400; 404 is kept here so existing clients are unaffected.
        return {"errcode": 404, "mess": "阅卷任务不存在!"}

    # Fetch the paper's questions.
    total, questions = await crud_question.find_all(db,
                                                    filters={"pid": task.pid})
    # Answer distribution of each question.
    data = []
    for item in questions:
        data.append({
            "qno": item.qno,
            "sqno": item.sqno,
            "img": item.imgs,
            "answer": item.answer,
            "analysis": item.analysis,
            "dist": await _answer_distribution(db, tid, item.qno, item.sqno)
        })
    return {"data": data, "total": total}