#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Admin API endpoints for managing teachers: list, create, read, update,
# delete and bulk import from an .xlsx file.
#
# NOTE: endpoint documentation is kept in "#" comments rather than docstrings
# because FastAPI publishes a handler's docstring as the OpenAPI description.
import copy
import logging
import os
import uuid
from collections import defaultdict
from typing import List, Optional, Union

from fastapi import APIRouter, Query, Depends, Path, File, UploadFile
from openpyxl import load_workbook
from sqlalchemy import desc, asc
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.background import BackgroundTasks

from admin.api.endpoints.school.utils import check_row, check_filetype
from bgtask.tasks import bgtask_update_class_teacher_student
from common.const import SUBJECTS
from core.config import settings
from core.security import hashed_password
from crud.school import crud_class, crud_school, crud_grade
from crud.sysdata.role import crud_role
from crud.user import crud_teacher
from models.school import SchoolClass
from models.user import Admin, Teacher
from schemas.base import ReturnField
from schemas.school.teacher import TeacherList, TeacherDetail, NewTeacher, TeacherInDB, UpdateTeacher
from utils.depends import get_async_db, get_current_user

logger = logging.getLogger(__name__)

router = APIRouter()

# Number of teachers accumulated before a bulk import flushes them to the database.
_IMPORT_BATCH_SIZE = 50


def _split_ids(value: Optional[str], sep: str = ",") -> List[str]:
    """Split a separator-delimited string into unique, non-empty, stripped tokens.

    The original order is preserved. ``None`` and empty strings yield ``[]``.
    """
    tokens = (part.strip() for part in (value or "").split(sep))
    return list(dict.fromkeys(token for token in tokens if token))


# List teachers.
#
# All filters are optional; a falsy value (empty string / 0) disables the filter.
# Pagination is applied only when both `page` and `size` are >= 1, otherwise the
# full result set is requested. `order` is a comma-separated list of Teacher
# attribute names, each optionally prefixed with "-" for descending order.
# Returns {"total": <count>, "data": <items>}.
@router.get("/teachers", response_model=TeacherList, response_model_exclude_none=True, summary="教师列表")
async def get_teachers(page: Optional[int] = None,
                       size: Optional[int] = None,
                       name: str = Query("", description="教师姓名"),
                       tid: int = Query(0, description="教师ID"),
                       sid: int = Query(0, description="学校ID"),
                       gid: int = Query(0, description="年级ID"),
                       cid: str = Query("", description="班级ID"),
                       rid: int = Query(0, description="职务(角色ID)"),
                       subject: str = Query("", description="任教科目"),
                       order: str = Query("-id", description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"),
                       res: ReturnField = Query("", description="返回字段,默认列表展示字段, 自定义id,name,phone"),
                       db: AsyncSession = Depends(get_async_db),
                       current_user: Admin = Depends(get_current_user)):
    # Filter conditions.
    _q = []
    if name:
        _q.append(Teacher.name.like(f"{name}%"))
    if tid:
        _q.append(Teacher.id == tid)
    if sid:
        _q.append(Teacher.school_id == sid)
    if gid:
        _q.append(Teacher.grade_id == gid)
    if cid:
        _q.append(Teacher.class_id == cid)
    if rid:
        _q.append(Teacher.role_id == rid)
    if subject:
        _q.append(Teacher.subject == subject)

    # Pagination: both values must be present and positive.
    if page is not None and page >= 1 and size is not None and size >= 1:
        offset = (page - 1) * size
    else:
        offset = size = None

    # Sort fields. The names come straight from the query string, so an unknown
    # (or private) attribute name is ignored instead of raising AttributeError
    # and turning a bad parameter into an HTTP 500.
    order_fields = []
    for raw_field in order.split(","):
        field = raw_field.strip()
        if not field:
            continue
        descending = field.startswith("-")
        attr_name = field[1:] if descending else field
        column = None if attr_name.startswith("_") else getattr(Teacher, attr_name, None)
        if column is None:
            continue
        order_fields.append(desc(column) if descending else asc(column))

    total, items = await crud_teacher.find_all(db, filters=_q, offset=offset, limit=size,
                                               order_by=order_fields, return_fields=res)
    # A missing/zero age is reported as an empty string.
    for x in items:
        if not x.age:
            x.age = ""
    return {"total": total, "data": items}


# Create a teacher.
#
# Validates the subject, school, grade, role, phone uniqueness and every class
# in the comma-separated `class_id`, then inserts the teacher. The username is
# the phone number and the initial password is the last six characters of the
# phone. On success one background task per class is queued to bump its
# teacher count. Validation failures return {"errcode": ..., "mess": ...}.
@router.post("/teachers", response_model=TeacherDetail, response_model_exclude_none=True, summary="创建教师")
async def create_teacher(info: NewTeacher,
                         bgtask: BackgroundTasks,
                         db: AsyncSession = Depends(get_async_db),
                         current_user: Admin = Depends(get_current_user)):
    # The subject must be one of the known subjects.
    if info.subject not in SUBJECTS:
        return {"errcode": 400, "mess": "教学科目错误!"}

    # The school must exist.
    db_school = await crud_school.find_one(db, filters={"id": info.school_id}, return_fields=["name", "category"])
    if not db_school:
        return {"errcode": 404, "mess": "学校不存在!"}

    # The grade must exist.
    db_grade = await crud_grade.find_one(db, filters={"id": info.grade_id}, return_fields=["name"])
    if not db_grade:
        return {"errcode": 404, "mess": "年级不存在!"}

    # The role must exist.
    db_role = await crud_role.find_one(db, filters={"id": info.role_id}, return_fields=["name"])
    if not db_role:
        return {"errcode": 404, "mess": "角色不存在!"}

    # The phone number must not already be used by another teacher.
    db_teacher = await crud_teacher.count(db, filters={"phone": info.phone})
    if db_teacher:
        return {"errcode": 400, "mess": "手机号重复!"}

    # Every referenced class must exist. IDs are de-duplicated first so that a
    # repeated ID does not make the found-count comparison fail spuriously.
    class_ids = _split_ids(info.class_id)
    total, db_class = await crud_class.find_all(db, filters=[SchoolClass.id.in_(class_ids)], return_fields=["name"])
    if total != len(class_ids):
        return {"errcode": 400, "mess": "班级不存在!"}

    # Create the record.
    obj_in = TeacherInDB(**info.dict(by_alias=True),
                         period=db_school.category,
                         school_name=db_school.name,
                         grade_name=db_grade.name,
                         class_name=",".join([x.name for x in db_class]),
                         role_name=db_role.name,
                         username=info.phone,
                         password=hashed_password(info.phone[-6:]),
                         creator_id=current_user.id,
                         creator_name=current_user.username,
                         editor_id=current_user.id,
                         editor_name=current_user.username)
    db_obj = await crud_teacher.insert_one(db, obj_in)

    # Asynchronously update the teacher count of each class.
    for cid in class_ids:
        bgtask.add_task(bgtask_update_class_teacher_student, cid, "add", 1)
    return {"data": db_obj}


# Teacher detail.
#
# Returns {"data": <teacher>} or, consistently with the update endpoint, a 404
# error payload when no teacher has the given ID.
@router.get("/teachers/{tid}", response_model=TeacherDetail, response_model_exclude_none=True, summary="教师详情")
async def get_teacher(tid: int = Path(..., description="教师ID"),
                      db: AsyncSession = Depends(get_async_db),
                      current_user: Admin = Depends(get_current_user)):
    db_obj = await crud_teacher.find_one(db, filters={"id": tid})
    if not db_obj:
        return {"errcode": 404, "mess": "教师不存在!"}
    return {"data": db_obj}


# Update a teacher.
#
# Only fields that are present (non-None) and differ from the stored value are
# re-validated. Derived fields (username, school/grade/class/role names,
# period) are refreshed on `info` before it is passed to the update.
# Class teacher-count adjustments are queued only after the update succeeded.
@router.put("/teachers/{tid}", response_model=TeacherDetail, response_model_exclude_none=True, summary="更新教师")
async def update_teacher(info: UpdateTeacher,
                         bgtask: BackgroundTasks,
                         tid: int = Path(..., description="教师ID"),
                         db: AsyncSession = Depends(get_async_db),
                         current_user: Admin = Depends(get_current_user)):
    # Reject an empty payload.
    info_dict = info.dict(exclude_none=True)
    if not info_dict:
        return {"errcode": 400, "mess": "提交参数为空!"}

    # The teacher must exist.
    db_obj = await crud_teacher.find_one(db, filters={"id": tid})
    if not db_obj:
        return {"errcode": 404, "mess": "教师不存在!"}

    # The new phone number must not belong to another teacher.
    if ("phone" in info_dict) and (db_obj.phone != info_dict["phone"]):
        db_teacher = await crud_teacher.count(
            db, filters=[Teacher.id != tid, Teacher.phone == info.phone])
        if db_teacher:
            return {"errcode": 400, "mess": "手机号重复!"}
        # The username mirrors the phone number, exactly as on creation
        # (only the initial password is derived from the last six digits).
        info.username = info.phone

    # The school must exist.
    if ("school_id" in info_dict) and (db_obj.school_id != info_dict["school_id"]):
        db_school = await crud_school.find_one(db, filters={"id": info.school_id},
                                               return_fields=["name", "category"])
        if not db_school:
            return {"errcode": 404, "mess": "学校不存在!"}
        info.school_name = db_school.name
        info.period = db_school.category

    # The grade must exist.
    if ("grade_id" in info_dict) and (db_obj.grade_id != info_dict["grade_id"]):
        db_grade = await crud_grade.find_one(db, filters={"id": info.grade_id}, return_fields=["name"])
        if not db_grade:
            return {"errcode": 404, "mess": "年级不存在!"}
        info.grade_name = db_grade.name

    # Every referenced class must exist. The count adjustments are only
    # collected here: background tasks run after *any* normal return, so
    # queueing them before the remaining validations would change class counts
    # even when the request is ultimately rejected.
    class_count_changes = []
    if ("class_id" in info_dict) and (db_obj.class_id != info_dict["class_id"]):
        original_class_set = set(_split_ids(db_obj.class_id))
        new_class_ids = _split_ids(info.class_id)
        new_class_set = set(new_class_ids)
        total, db_class = await crud_class.find_all(db, filters=[SchoolClass.id.in_(new_class_ids)],
                                                    return_fields=["name"])
        if total != len(new_class_set):
            return {"errcode": 400, "mess": "班级不存在!"}
        info.class_name = ",".join([x.name for x in db_class])
        # Classes gaining this teacher.
        class_count_changes.extend((cid, "add") for cid in new_class_set - original_class_set)
        # Classes losing this teacher.
        class_count_changes.extend((cid, "del") for cid in original_class_set - new_class_set)

    # The role must exist.
    if ("role_id" in info_dict) and (db_obj.role_id != info_dict["role_id"]):
        db_role = await crud_role.find_one(db, filters={"id": info.role_id}, return_fields=["name"])
        if not db_role:
            return {"errcode": 404, "mess": "角色不存在!"}
        info.role_name = db_role.name

    # The subject must be one of the known subjects.
    if ("subject" in info_dict) and (db_obj.subject != info_dict["subject"]):
        if info_dict["subject"] not in SUBJECTS:
            return {"errcode": 400, "mess": "教学科目错误!"}

    # Persist the update.
    info.editor_id = current_user.id
    info.editor_name = current_user.username
    db_obj = await crud_teacher.update(db, db_obj, info)

    # Asynchronously update the teacher count of each affected class.
    for cid, action in class_count_changes:
        bgtask.add_task(bgtask_update_class_teacher_student, cid, action, 1, 0)
    return {"data": db_obj}


# Delete one teacher (integer ID) or several (comma-separated ID string).
#
# Returns the number of teachers found and deleted in "data"; "mess" is
# "success" when every requested ID was found, otherwise a success/failure
# summary. A malformed ID list yields a 400 error payload.
@router.delete("/teachers/{tid}", response_model=TeacherDetail, response_model_exclude_none=True, summary="删除教师")
async def delete_teacher(bgtask: BackgroundTasks,
                         tid: Union[int, str] = Path(...,
                                                     description="教师ID,批量删除传用逗号分隔ID(str),单个删除传ID(int)"),
                         db: AsyncSession = Depends(get_async_db),
                         current_user: Admin = Depends(get_current_user)):
    # Look up all requested teachers.
    if isinstance(tid, int):
        deleted_count = 1
        _q = [Teacher.id == tid]
    else:
        try:
            # De-duplicated so a repeated ID is not reported as a failed deletion.
            tids = [int(x) for x in _split_ids(tid)]
        except ValueError:
            return {"errcode": 400, "mess": "教师ID格式错误!"}
        deleted_count = len(tids)
        _q = [Teacher.id.in_(tids)]
    total, db_objs = await crud_teacher.find_all(db, filters=_q)

    # Delete each teacher and asynchronously decrement the teacher count of
    # its classes (empty tokens from a blank class_id are skipped).
    for item in db_objs:
        await crud_teacher.delete(db, obj_id=item.id)
        for cid in _split_ids(item.class_id):
            bgtask.add_task(bgtask_update_class_teacher_student, cid, "del", 1)

    return {
        "data": total,
        "mess": "success" if total == deleted_count else f"成功:{total},失败: {deleted_count - total}"
    }


async def _insert_teacher_batch(db: AsyncSession, batch: list) -> bool:
    """Insert a batch of teachers; log and return ``False`` if the insert fails."""
    try:
        await crud_teacher.insert_many(db, batch)
    except Exception:
        # Top-level boundary for the import: log the traceback, but not the
        # teacher objects themselves (they carry password hashes and phones).
        logger.exception("[teacher] INSERT-ERROR: failed to insert a batch of %d teachers", len(batch))
        return False
    return True


# Bulk-import teachers from the first worksheet of an uploaded .xlsx file.
#
# The first sheet row is treated as a header. Columns used (0-based): 1 name,
# 2 sex, 3 age, 4 phone, 5 school name, 6 grade name, 7 class names separated
# by ";", 8 role name, 9 subject. Rows failing validation are reported in
# "errors" with their data-row number (1 = first row below the header) and
# skipped. Accepted rows are inserted in batches of _IMPORT_BATCH_SIZE.
# Returns {"data": {"success": n, "fail": m, "errors": [...]}} or an error
# payload when the file type is wrong or a batch insert fails.
# NOTE(review): if a later batch fails, earlier batches may already be stored
# (depends on crud_teacher.insert_many) and no class counts are scheduled.
@router.post("/teachers/bulk", response_model=TeacherDetail, response_model_exclude_none=True, summary="批量导入教师")
async def import_teacher(bgtask: BackgroundTasks,
                         datafile: UploadFile = File(..., description="数据文件"),
                         db: AsyncSession = Depends(get_async_db),
                         current_user: Admin = Depends(get_current_user)):
    # Check the file type.
    if not check_filetype(datafile.filename, ".xlsx"):
        return {"errcode": 400, "mess": "文件格式错误!"}

    # Write the upload to disk and load it back. The on-disk name is generated
    # rather than taken from the client-supplied filename, which could contain
    # path separators or collide with a concurrent upload.
    disk_file = os.path.join(settings.UPLOADER_PATH, f"{uuid.uuid4().hex}.xlsx")
    content = await datafile.read()
    with open(disk_file, "wb") as f:
        f.write(content)
    try:
        wb = load_workbook(disk_file)
    finally:
        # The workbook is fully loaded into memory; always remove the temp file,
        # including when loading fails.
        os.remove(disk_file)
    ws = wb.worksheets[0]

    # Result accumulators and per-import lookup caches.
    seen_phones = set()
    errors = []
    success = 0
    teachers = []
    class_teachers = defaultdict(int)
    schools = {}
    grades = {}
    classes = {}
    roles = {}

    rows = ws.iter_rows(min_row=2, max_col=ws.max_column, max_row=ws.max_row, values_only=True)
    for row_no, raw_row in enumerate(rows, start=1):
        row = await check_row(raw_row, 10)
        if row is None:  # blank row
            continue
        if not row:  # incomplete row
            errors.append(f"第{row_no}行: 某些单元格为空!")
            continue

        # The school must exist.
        if row[5] not in schools:
            db_school = await crud_school.find_one(db, filters={"name": row[5]}, return_fields=["id", "category"])
            if not db_school:
                errors.append(f"第{row_no}行: 学校不存在!")
                continue
            schools[row[5]] = {"id": db_school.id, "category": db_school.category}

        # The grade must exist within that school.
        grade_key = f"{row[5]}-{row[6]}"
        if grade_key not in grades:
            db_grade = await crud_grade.find_one(db, filters={
                "school_id": schools[row[5]]["id"],
                "name": row[6]
            }, return_fields=["id"])
            if not db_grade:
                errors.append(f"第{row_no}行: 年级不存在!")
                continue
            grades[grade_key] = db_grade.id

        # The role must exist.
        if row[8] not in roles:
            db_role = await crud_role.find_one(db, filters={"name": row[8]}, return_fields=["id"])
            if not db_role:
                errors.append(f"第{row_no}row_placeholder")
                continue
            roles[row[8]] = db_role.id

        # The phone must be unique within this file and within the database.
        phone = str(row[4])
        if phone in seen_phones:
            errors.append(f"第{row_no}行: 手机号重复!")
            continue
        existed = await crud_teacher.count(db, filters={"phone": phone})
        if existed:
            errors.append(f"第{row_no}行: 手机号重复!")
            continue

        # The subject must be one of the known subjects.
        if row[9] not in SUBJECTS:
            errors.append(f"第{row_no}行: 教学科目错误!")
            continue

        # Every class must exist. Cached classes are resolved directly; the
        # remaining names are looked up in a single query.
        row_class_ids = []
        unknown_names = []
        for class_name in _split_ids(str(row[7]), sep=";"):
            class_key = f"{row[5]}-{row[6]}-{class_name}"
            if class_key in classes:
                row_class_ids.append(classes[class_key])
            else:
                unknown_names.append(class_name)
        if unknown_names:
            total, db_classes = await crud_class.find_all(
                db,
                filters=[
                    SchoolClass.school_id == schools[row[5]]["id"],
                    SchoolClass.grade_id == grades[grade_key],
                    SchoolClass.name.in_(unknown_names)
                ],
                return_fields=["id", "name"])
            if total != len(unknown_names):
                errors.append(f"第{row_no}行: 班级不存在!")
                continue
            for db_class in db_classes:
                classes[f"{row[5]}-{row[6]}-{db_class.name}"] = db_class.id
                row_class_ids.append(db_class.id)

        # Build the teacher object.
        obj_in = TeacherInDB(username=phone,
                             password=hashed_password(phone[-6:]),
                             name=row[1] if row[1] else "",
                             sex=1 if row[2] == "男" else 0,
                             age=row[3] or 0,
                             phone=phone,
                             sid=schools[row[5]]["id"],
                             school_name=row[5],
                             period=schools[row[5]]["category"],
                             gid=grades[grade_key],
                             grade_name=row[6],
                             # IDs are stringified uniformly; cached IDs are not strings.
                             cid=",".join(str(class_id) for class_id in row_class_ids),
                             class_name=row[7],
                             rid=roles[row[8]],
                             role_name=row[8],
                             subject=row[9],
                             creator_id=current_user.id,
                             creator_name=current_user.username,
                             editor_id=current_user.id,
                             editor_name=current_user.username)

        # The row is accepted: only now record its phone and class counts, so
        # rejected rows never affect later duplicate checks or class totals.
        seen_phones.add(phone)
        for class_id in row_class_ids:
            class_teachers[class_id] += 1
        teachers.append(obj_in)
        success += 1

        # Flush a full batch.
        if len(teachers) >= _IMPORT_BATCH_SIZE:
            if not await _insert_teacher_batch(db, teachers):
                return {"errcode": 1, "mess": "入库失败,请联系技术人员解决!"}
            teachers = []

    # Flush the remaining teachers, if any.
    if teachers and not await _insert_teacher_batch(db, teachers):
        return {"errcode": 1, "mess": "入库失败,请联系技术人员解决!"}

    # Asynchronously update the teacher count of each class.
    for class_id, added in class_teachers.items():
        bgtask.add_task(bgtask_update_class_teacher_student, int(class_id), "add", added)

    return {"data": {"success": success, "fail": len(errors), "errors": errors}}