#!/usr/bin/env python # -*- coding: utf-8 -*- import copy import os from typing import Optional from fastapi import APIRouter, Query, Depends, Path, UploadFile, File from openpyxl import load_workbook from sqlalchemy import or_, text from sqlalchemy.ext.asyncio import AsyncSession from starlette.background import BackgroundTasks from admin.api.endpoints.school.utils import check_row from core.config import settings from core.security import hashed_password from crud.school import crud_school, crud_grade, crud_class from crud.user import crud_student from models.user import SysUser from schemas.base import OrderByField, ReturnField from schemas.school.student import (StudentList, StudentDetail, NewStudent, StudentInDB, UpdateStudent) from utils.depends import get_async_db, get_current_user router = APIRouter() # 学生列表 @router.get("/students", response_model=StudentList, response_model_exclude_none=True, summary="学生列表") async def get_students( page: Optional[int] = None, size: Optional[int] = None, sno: str = Query("", description="学号"), name: str = Query("", description="学生姓名"), tid: int = Query(0, description="学生ID"), sid: int = Query(0, description="学校ID"), gid: int = Query(0, description="年级ID"), cid: int = Query(0, description="班级ID"), order: OrderByField = Query("-created_at", description="排序字段,用逗号分隔,升降序以-判断,默认-created_at"), res: ReturnField = Query("", description="返回字段,取值: id,name,sno,phone"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): # 过滤条件 filters = {} if sno: filters["sno"] = sno if name: filters["name"] = name if tid: filters["id"] = tid if sid: filters["school_id"] = sid if gid: filters["grade_id"] = gid if cid: filters["class_id"] = cid if ((page is not None) and page >= 1) and ((size is not None) and size >= 1): offset = (page - 1) * size else: offset = size = None if isinstance(order, str): order = [text(order)] total, items = await crud_student.find_all(db, filters=filters, offset=offset, limit=size, order_by=order, return_fields=res) return {"total": total, "data": items} # 创建学生 @router.post("/students", response_model=StudentDetail, response_model_exclude_none=True, summary="创建学生") async def create_student(info: NewStudent, db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): # 判断学校是否存在 db_school = await crud_school.find_one(db, filters={"id": info.school_id}) if not db_school: return {"errcode": 404, "mess": "学校不存在!"} # 判断年级是否存在 db_grade = await crud_grade.find_one(db, filters={"id": info.grade_id}) if not db_grade: return {"errcode": 404, "mess": "年级不存在!"} # 判断班级是否存在 db_class = await crud_class.find_one(db, filters={"id": info.class_id}) if not db_class: return {"errcode": 404, "mess": "班级不存在!"} # 判断是否存在同年级同学号 existed = await crud_student.count( db, filters=[ or_(text(f"school_id = {info.school_id} AND sno = '{info.sno}'"), text(f"phone = '{info.phone}'")) ]) if existed: return {"errcode": 400, "mess": "学号或手机号重复!"} # 开始创建 obj_in = StudentInDB(**info.dict(by_alias=True), school_name=db_school.name, grade_name=db_grade.name, class_name=db_class.name, username=info.phone, password=hashed_password(info.phone[-6:]), creator_id=current_user.id, creator_name=current_user.username, editor_id=current_user.id, editor_name=current_user.username) db_obj = await crud_student.insert_one(db, obj_in) return {"data": db_obj} @router.get("/students/{sid}", response_model=StudentDetail, response_model_exclude_none=True, summary="学生详情") async def get_student(sid: int = Path(..., description="学生ID"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): db_obj = await crud_student.find_one(db, filters={"id": sid}) return {"data": db_obj} # 更新学生 @router.put("/students/{sid}", response_model=StudentDetail, response_model_exclude_none=True, summary="更新学生") async def update_student(info: UpdateStudent, sid: int = Path(..., description="学生ID"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): # 判断提交参数是否为空 info_dict = info.dict(exclude_none=True) if not info_dict: return {"errcode": 400, "mess": "提交参数为空!"} # 判断学生是否存在 db_obj = await crud_student.find_one(db, filters={"id": sid}) if not db_obj: return {"errcode": 404, "mess": "学生不存在!"} # 判断学校是否存在 if ("school_id" in info_dict) and (db_obj.school_id != info_dict["school_id"]): db_school = await crud_school.find_one(db, filters={"id": info.school_id}) if not db_school: return {"errcode": 404, "mess": "学校不存在!"} else: info.school_name = db_school.name # 判断年级是否存在 if ("grade_id" in info_dict) and (db_obj.grade_id != info_dict["grade_id"]): db_grade = await crud_grade.find_one(db, filters={"id": info.grade_id}) if not db_grade: return {"errcode": 404, "mess": "年级不存在!"} else: info.grade_name = db_grade.name # 判断班级是否存在 if ("class_id" in info_dict) and (db_obj.class_id != info_dict["class_id"]): db_class = await crud_class.find_one(db, filters={"id": info.class_id}) if not db_class: return {"errcode": 404, "mess": "班级不存在!"} else: info.class_name = db_class.name # 判断手机号是否重复 if ("phone" in info_dict) and (db_obj.phone != info_dict["phone"]): db_student = await crud_student.find_one( db, filters=[text(f"id != {sid} AND phone = '{info.phone}'")]) if db_student: return {"errcode": 400, "mess": "手机号重复!"} info.username = info.phone[-6:] # 更新 info.editor_id = current_user.id info.editor_name = current_user.username db_obj = await crud_student.update(db, db_obj, info) return {"data": db_obj} # 删除学生 @router.delete("/students/{sid}", response_model=StudentDetail, response_model_exclude_none=True, summary="删除学生") async def delete_student(bg_task: BackgroundTasks, sid: int = Path(..., description="学生ID"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): existed = await crud_student.count(db, {"id": sid}) if not existed: return {"errcode": 404, "mess": "学生不存在!"} else: await crud_student.delete(db, obj_id=sid) # TODO: 删除关联数据 # bg_task.add_task(delete_related_object, db, cid=id) return {"data": None} # 批量导入学生 @router.post("/students/bulk", response_model=StudentDetail, response_model_exclude_none=True, summary="批量导入学生") async def import_student(datafile: UploadFile = File(..., description="数据文件"), db: AsyncSession = Depends(get_async_db), current_user: SysUser = Depends(get_current_user)): # 判断文件格式 if not datafile.filename.endswith(".xlsx"): return {"errcode": 400, "mess": "文件格式错误!"} # 把文件写入磁盘,再加载回来 disk_file = os.path.join(settings.UPLOADER_PATH, datafile.filename) content = await datafile.read() with open(disk_file, "wb") as f: f.write(content) # 返回结果 errors = [] success = 0 students = [] counter = 0 # 使用openpyxl读取文件 wb = load_workbook(disk_file) ws = wb.worksheets[0] for row in ws.iter_rows(min_row=2, max_col=ws.max_column, max_row=ws.max_row, values_only=True): row = await check_row(row, 9) if row is None: # 空行 continue elif not row: # 字段不完整 errors.append(f"第{row[0]}行: 某些单元格为空!") continue # 判断学校是否存在 db_school = await crud_school.find_one(db, filters={"name": row[6]}) if not db_school: errors.append(f"第{row[0]}行: 学校不存在!") continue # 判断年级是否存在 db_grade = await crud_grade.find_one(db, filters={ "school_id": db_school.id, "name": row[7] }) if not db_grade: errors.append(f"第{row[0]}行: 年级不存在!") continue # 判断班级是否存在 db_class = await crud_class.find_one(db, filters={ "school_id": db_school.id, "grade_id": db_grade.id, "name": row[8] }) if not db_class: errors.append(f"第{row[0]}行: 班级不存在!") continue # 判断是否存在同名学生 db_student = await crud_student.find_one( db, filters=[ or_(text(f"school_id = {db_school.id} AND sno = '{row[1]}'"), text(f"phone = '{row[5]}'")) ]) if db_student: errors.append(f"第{row[0]}行: 学号或手机号重复!") continue # 创建教师对象 obj_in = StudentInDB(sno=row[1], username=row[5], password=hashed_password(row[5][-6:]), name=row[2], sex=1 if row[3] == "男" else 0, age=row[4], phone=row[5], sid=db_school.id, school_name=row[6], gid=db_grade.id, grade_name=row[7], cid=db_class.id, class_name=row[8], creator_id=current_user.id, creator_name=current_user.username, editor_id=current_user.id, editor_name=current_user.username) students.append(obj_in) success += 1 counter += 1 if counter == 50: await crud_student.insert_many(db, copy.deepcopy(students)) students.clear() counter = 0 if counter: await crud_student.insert_many(db, students) # 删除上传文件 os.remove(disk_file) return {"data": {"success": success, "fail": len(errors), "errors": errors}}