From 7d95a8cdfe350a6a9aabc93d9a1c6b3240eed538 Mon Sep 17 00:00:00 2001
From: Martin Mares <mj@ucw.cz>
Date: Thu, 18 Feb 2021 23:43:55 +0100
Subject: [PATCH] =?UTF-8?q?Import:=20Bodov=C3=A1n=C3=AD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
mo/imports.py | 157 +++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 155 insertions(+), 2 deletions(-)
diff --git a/mo/imports.py b/mo/imports.py
index 74a05cca..0648efb3 100644
--- a/mo/imports.py
+++ b/mo/imports.py
@@ -2,6 +2,8 @@ from dataclasses import dataclass
from enum import auto
import io
import re
+from sqlalchemy import and_
+from sqlalchemy.orm import joinedload, Query
from typing import List, Optional, Any, Dict, Type
import mo.csv
@@ -17,6 +19,7 @@ class ImportType(db.MOEnum):
participants = auto()
proctors = auto()
judges = auto()
+ points = auto()
def friendly_name(self) -> str:
return import_type_names[self]
@@ -26,6 +29,7 @@ import_type_names = {
ImportType.participants.name: 'účastníci',
ImportType.proctors.name: 'dozor',
ImportType.judges.name: 'opravovatelé',
+ ImportType.points.name: 'body',
}
@@ -37,6 +41,7 @@ class Import:
cnt_new_participants: int = 0
cnt_new_participations: int = 0
cnt_new_roles: int = 0
+ cnt_set_points: int = 0
# Veřejné vlastnosti importu
template_basename: str = "sablona"
@@ -45,6 +50,7 @@ class Import:
user: db.User
round: Optional[db.Round]
contest: Optional[db.Contest]
+ task: Optional[db.Task]
fmt: FileFormat
row_class: Type[mo.csv.Row]
row_example: mo.csv.Row
@@ -76,6 +82,15 @@ class Import:
logger.info('Import: >> %s', msg)
return None # Kdyby bylo otypováno správně jako -> None, při volání by si mypy stěžoval
+ def parse_user_id(self, user_id_str: str) -> Optional[int]:
+ if user_id_str == "":
+ return self.error('Chybí ID uživatele')
+
+ try:
+ return int(user_id_str)
+ except ValueError:
+ return self.error('ID uživatele není číslo')
+
def parse_email(self, email: str) -> Optional[str]:
if email == "":
return self.error('Chybí e-mailová adresa')
@@ -187,6 +202,20 @@ class Import:
self.new_user_ids.append(user.user_id)
return user
+ def parse_points(self, points_str: str) -> Optional[int]:
+ if points_str == "":
+ return None
+
+ try:
+ pts = int(points_str)
+ except ValueError:
+ return self.error('Body nejsou celé číslo')
+
+ if pts < 0:
+ return self.error('Body nesmí být záporné')
+
+ return pts
+
def find_or_create_participant(self, user: db.User, year: int, school_id: int, birth_year: int, grade: str) -> Optional[db.Participant]:
sess = db.get_session()
part = sess.query(db.Participant).get((user.user_id, year))
@@ -287,11 +316,24 @@ class Import:
args.append(f'round=#{self.round.round_id}')
if self.contest is not None:
args.append(f'contest=#{self.contest.contest_id}')
+ if self.task is not None:
+ args.append(f'task=#{self.task.task_id}')
logger.info('Import: %s ze souboru %s: %s', self.log_msg_prefix, path, " ".join(args))
def log_end(self):
- logger.info(f'Import: Hotovo (rows={self.cnt_rows} users={self.cnt_new_users} p-ants={self.cnt_new_participants} p-ions={self.cnt_new_participations} roles={self.cnt_new_roles})')
+ args = [f'rows=#{self.cnt_rows}']
+ for key, val in [
+ ('users', self.cnt_new_users),
+ ('p-ants', self.cnt_new_participants),
+ ('p-ions', self.cnt_new_participations),
+ ('roles', self.cnt_new_roles),
+ ('points', self.cnt_set_points),
+ ]:
+ if val > 0:
+ args.append(f'{key}={val}')
+ logger.info('Import: Hotovo (%s)', " ".join(args))
+
if self.contest is not None:
mo.util.log(
type=db.LogType.contest,
@@ -362,6 +404,7 @@ class Import:
sess.rollback()
def get_template(self) -> str:
+ # Odvozené třídy mohou přetížit
out = io.StringIO()
mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=[self.row_example])
return out.getvalue()
@@ -541,7 +584,114 @@ class JudgeImport(Import):
self.add_role(user, place, db.RoleType.opravovatel)
-def create_import(user: db.User, type: ImportType, fmt: FileFormat, round: Optional[db.Round] = None, contest: Optional[db.Contest] = None):
+@dataclass
+class PointsImportRow(mo.csv.Row):
+ user_id: str = ""
+ krestni: str = ""
+ prijmeni: str = ""
+ body: str = ""
+
+
+class PointsImport(Import):
+ row_class = PointsImportRow
+ log_msg_prefix = 'Body'
+
+ def setup(self):
+ assert self.round is not None
+ assert self.task is not None
+ self.log_details = {'action': 'import-points', 'task': self.task.code}
+ self.template_basename = 'body-' + self.task.code
+
+ def _pion_sol_query(self) -> Query:
+ sess = db.get_session()
+ query = (sess.query(db.Participation, db.Solution)
+ .select_from(db.Participation)
+ .outerjoin(db.Solution, and_(db.Solution.user_id == db.Participation.user_id, db.Solution.task == self.task))
+ .options(joinedload(db.Participation.user)))
+
+ if self.contest is not None:
+ query = query.filter(db.Participation.contest == self.contest)
+ else:
+ contest_query = sess.query(db.Contest.contest_id).filter_by(round=self.round)
+ query = query.filter(db.Participation.contest_id.in_(contest_query.subquery()))
+
+ return query
+
+ def import_row(self, r: mo.csv.Row):
+ assert isinstance(r, PointsImportRow)
+ num_prev_errs = len(self.errors)
+ user_id = self.parse_user_id(r.user_id)
+ krestni = self.parse_name(r.krestni)
+ prijmeni = self.parse_name(r.prijmeni)
+ body = self.parse_points(r.body)
+
+ if (len(self.errors) > num_prev_errs
+ or user_id is None
+ or krestni is None
+ or prijmeni is None):
+ return
+
+ assert self.round is not None
+ assert self.task is not None
+
+ sess = db.get_session()
+ query = self._pion_sol_query().filter(db.Solution.user_id == user_id)
+ pion_sols = query.all()
+ if not pion_sols:
+ return self.error('Soutěžící nenalezen v tomto kole')
+ elif len(pion_sols) > 1:
+ return self.error('Soutěžící v tomto kole soutěží vícekrát, neumím zpracovat')
+ pion, sol = pion_sols[0]
+
+ if self.contest is not None:
+ if pion.contest != self.contest:
+ return self.error('Soutěžící nesoutěží v této oblasti')
+
+ rights = self.gatekeeper.rights_for_contest(pion.contest)
+ if not rights.can_edit_points(self.round):
+ return self.error('Nemáte právo na úpravu bodů')
+
+ user = pion.user
+ if user.first_name != krestni or user.last_name != prijmeni:
+ return self.error('Neodpovídá ID a jméno soutěžícího')
+
+ if sol is None:
+ return self.error('Tento soutěžící úlohu neodevzdal')
+
+ if sol.points != body:
+ sol.points = body
+ sess.add(db.PointsHistory(
+ task=self.task,
+ participant_id=user_id,
+ user=self.user,
+ points_at=mo.now,
+ points=body,
+ ))
+ self.cnt_set_points += 1
+
+ def get_template(self) -> str:
+ rows = []
+ for pion, sol in sorted(self._pion_sol_query().all(), key=lambda pair: pair[0].user.sort_key()):
+ if sol is not None:
+ user = pion.user
+ rows.append(PointsImportRow(
+ user_id=user.user_id,
+ krestni=user.first_name,
+ prijmeni=user.last_name,
+ body=sol.points,
+ ))
+
+ out = io.StringIO()
+ mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=rows)
+ return out.getvalue()
+
+
+def create_import(user: db.User,
+ type: ImportType,
+ fmt: FileFormat,
+ round: Optional[db.Round] = None,
+ contest: Optional[db.Contest] = None,
+ task: Optional[db.Task] = None):
imp: Import
if type == ImportType.participants:
imp = ContestImport()
@@ -549,12 +699,15 @@ def create_import(user: db.User, type: ImportType, fmt: FileFormat, round: Optio
imp = ProctorImport()
elif type == ImportType.judges:
imp = JudgeImport()
+ elif type == ImportType.points:
+ imp = PointsImport()
else:
assert False, "Neznámý typ importu"
imp.user = user
imp.round = round
imp.contest = contest
+ imp.task = task
imp.fmt = fmt
imp.gatekeeper = mo.rights.Gatekeeper(user)
imp.setup()
--
GitLab