Skip to content
Snippets Groups Projects
Commit 7d95a8cd authored by Martin Mareš's avatar Martin Mareš
Browse files

Import: Bodování

parent 8e536825
No related branches found
No related tags found
1 merge request!32Dávkové bodování
......@@ -2,6 +2,8 @@ from dataclasses import dataclass
from enum import auto
import io
import re
from sqlalchemy import and_
from sqlalchemy.orm import joinedload, Query
from typing import List, Optional, Any, Dict, Type
import mo.csv
......@@ -17,6 +19,7 @@ class ImportType(db.MOEnum):
participants = auto()
proctors = auto()
judges = auto()
points = auto()
def friendly_name(self) -> str:
return import_type_names[self]
......@@ -26,6 +29,7 @@ import_type_names = {
ImportType.participants.name: 'účastníci',
ImportType.proctors.name: 'dozor',
ImportType.judges.name: 'opravovatelé',
ImportType.points.name: 'body',
}
......@@ -37,6 +41,7 @@ class Import:
cnt_new_participants: int = 0
cnt_new_participations: int = 0
cnt_new_roles: int = 0
cnt_set_points: int = 0
# Veřejné vlastnosti importu
template_basename: str = "sablona"
......@@ -45,6 +50,7 @@ class Import:
user: db.User
round: Optional[db.Round]
contest: Optional[db.Contest]
task: Optional[db.Task]
fmt: FileFormat
row_class: Type[mo.csv.Row]
row_example: mo.csv.Row
......@@ -76,6 +82,15 @@ class Import:
logger.info('Import: >> %s', msg)
return None # Kdyby bylo otypováno správně jako -> None, při volání by si mypy stěžoval
def parse_user_id(self, user_id_str: str) -> Optional[int]:
if user_id_str == "":
return self.error('Chybí ID uživatele')
try:
return int(user_id_str)
except ValueError:
return self.error('ID uživatele není číslo')
def parse_email(self, email: str) -> Optional[str]:
if email == "":
return self.error('Chybí e-mailová adresa')
......@@ -187,6 +202,20 @@ class Import:
self.new_user_ids.append(user.user_id)
return user
def parse_points(self, points_str: str) -> Optional[int]:
if points_str == "":
return None
try:
pts = int(points_str)
except ValueError:
return self.error('Body nejsou celé číslo')
if pts < 0:
return self.error('Body nesmí být záporné')
return pts
def find_or_create_participant(self, user: db.User, year: int, school_id: int, birth_year: int, grade: str) -> Optional[db.Participant]:
sess = db.get_session()
part = sess.query(db.Participant).get((user.user_id, year))
......@@ -287,11 +316,24 @@ class Import:
args.append(f'round=#{self.round.round_id}')
if self.contest is not None:
args.append(f'contest=#{self.contest.contest_id}')
if self.task is not None:
args.append(f'task=#{self.task.task_id}')
logger.info('Import: %s ze souboru %s: %s', self.log_msg_prefix, path, " ".join(args))
def log_end(self):
logger.info(f'Import: Hotovo (rows={self.cnt_rows} users={self.cnt_new_users} p-ants={self.cnt_new_participants} p-ions={self.cnt_new_participations} roles={self.cnt_new_roles})')
args = [f'rows=#{self.cnt_rows}']
for key, val in [
('users', self.cnt_new_users),
('p-ants', self.cnt_new_participants),
('p-ions', self.cnt_new_participations),
('roles', self.cnt_new_roles),
('points', self.cnt_set_points),
]:
if val > 0:
args.append(f'{key}={val}')
logger.info('Import: Hotovo (%s)', " ".join(args))
if self.contest is not None:
mo.util.log(
type=db.LogType.contest,
......@@ -362,6 +404,7 @@ class Import:
sess.rollback()
def get_template(self) -> str:
# Odvozené třídy mohou přetížit
out = io.StringIO()
mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=[self.row_example])
return out.getvalue()
......@@ -541,7 +584,114 @@ class JudgeImport(Import):
self.add_role(user, place, db.RoleType.opravovatel)
def create_import(user: db.User, type: ImportType, fmt: FileFormat, round: Optional[db.Round] = None, contest: Optional[db.Contest] = None):
@dataclass
class PointsImportRow(mo.csv.Row):
user_id: str = ""
krestni: str = ""
prijmeni: str = ""
body: str = ""
class PointsImport(Import):
row_class = PointsImportRow
log_msg_prefix = 'Body'
def setup(self):
assert self.round is not None
assert self.task is not None
self.log_details = {'action': 'import-points', 'task': self.task.code}
self.template_basename = 'body-' + self.task.code
def _pion_sol_query(self) -> Query:
sess = db.get_session()
query = (sess.query(db.Participation, db.Solution)
.select_from(db.Participation)
.outerjoin(db.Solution, and_(db.Solution.user_id == db.Participation.user_id, db.Solution.task == self.task))
.options(joinedload(db.Participation.user)))
if self.contest is not None:
query = query.filter(db.Participation.contest == self.contest)
else:
contest_query = sess.query(db.Contest.contest_id).filter_by(round=self.round)
query = query.filter(db.Participation.contest_id.in_(contest_query.subquery()))
return query
def import_row(self, r: mo.csv.Row):
assert isinstance(r, PointsImportRow)
num_prev_errs = len(self.errors)
user_id = self.parse_user_id(r.user_id)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
body = self.parse_points(r.body)
if (len(self.errors) > num_prev_errs
or user_id is None
or krestni is None
or prijmeni is None):
return
assert self.round is not None
assert self.task is not None
sess = db.get_session()
query = self._pion_sol_query().filter(db.Solution.user_id == user_id)
pion_sols = query.all()
if not pion_sols:
return self.error('Soutěžící nenalezen v tomto kole')
elif len(pion_sols) > 1:
return self.error('Soutěžící v tomto kole soutěží vícekrát, neumím zpracovat')
pion, sol = pion_sols[0]
if self.contest is not None:
if pion.contest != self.contest:
return self.error('Soutěžící nesoutěží v této oblasti')
rights = self.gatekeeper.rights_for_contest(pion.contest)
if not rights.can_edit_points(self.round):
return self.error('Nemáte právo na úpravu bodů')
user = pion.user
if user.first_name != krestni or user.last_name != prijmeni:
return self.error('Neodpovídá ID a jméno soutěžícího')
if sol is None:
return self.error('Tento soutěžící úlohu neodevzdal')
if sol.points != body:
sol.points = body
sess.add(db.PointsHistory(
task=self.task,
participant_id=user_id,
user=self.user,
points_at=mo.now,
points=body,
))
self.cnt_set_points += 1
def get_template(self) -> str:
rows = []
for pion, sol in sorted(self._pion_sol_query().all(), key=lambda pair: pair[0].user.sort_key()):
if sol is not None:
user = pion.user
rows.append(PointsImportRow(
user_id=user.user_id,
krestni=user.first_name,
prijmeni=user.last_name,
body=sol.points,
))
out = io.StringIO()
mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=rows)
return out.getvalue()
def create_import(user: db.User,
type: ImportType,
fmt: FileFormat,
round: Optional[db.Round] = None,
contest: Optional[db.Contest] = None,
task: Optional[db.Task] = None):
imp: Import
if type == ImportType.participants:
imp = ContestImport()
......@@ -549,12 +699,15 @@ def create_import(user: db.User, type: ImportType, fmt: FileFormat, round: Optio
imp = ProctorImport()
elif type == ImportType.judges:
imp = JudgeImport()
elif type == ImportType.points:
imp = PointsImport()
else:
assert False, "Neznámý typ importu"
imp.user = user
imp.round = round
imp.contest = contest
imp.task = task
imp.fmt = fmt
imp.gatekeeper = mo.rights.Gatekeeper(user)
imp.setup()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment