Select Git revision
-
Martin Mareš authoredMartin Mareš authored
imports.py 27.22 KiB
from dataclasses import dataclass
import decimal
from enum import auto
import io
import re
from sqlalchemy import and_
from sqlalchemy.orm import joinedload, Query
from typing import List, Optional, Any, Dict, Type, Union
import mo.csv
from mo.csv import FileFormat, MissingHeaderError
import mo.db as db
import mo.rights
import mo.users
import mo.util
from mo.util import logger
from mo.util_format import format_decimal
class ImportType(db.MOEnum):
participants = auto()
proctors = auto()
judges = auto()
points = auto()
def friendly_name(self) -> str:
return import_type_names[self]
import_type_names = {
ImportType.participants.name: 'účastníci',
ImportType.proctors.name: 'dozor',
ImportType.judges.name: 'opravovatelé',
ImportType.points.name: 'body',
}
class Import:
# Výsledek importu
errors: List[str]
warnings: List[str]
cnt_rows: int = 0
cnt_new_users: int = 0
cnt_new_participants: int = 0
cnt_new_participations: int = 0
cnt_new_roles: int = 0
cnt_set_points: int = 0
cnt_add_sols: int = 0
cnt_del_sols: int = 0
# Veřejné vlastnosti importu
template_basename: str = "sablona"
# Interní: Co a jak zrovna importujeme
user: db.User
round: Optional[db.Round]
contest: Optional[db.Contest]
task: Optional[db.Task] # pro Import bodů
allow_add_del: bool # pro Import bodů: je povoleno zakládat/mazat řešení
fmt: FileFormat
row_class: Type[mo.csv.Row]
row_example: mo.csv.Row
log_msg_prefix: str
log_details: Any
# Interní: Stav importu
place_cache: Dict[str, db.Place]
school_place_cache: Dict[str, db.Place]
gatekeeper: mo.rights.Gatekeeper
new_user_ids: List[int]
line_number: int = 0
def __init__(self):
self.errors = []
self.warnings = []
self.rr = None
self.place_cache = {}
self.school_place_cache = {}
self.new_user_ids = []
def setup(self):
# Definováno odvozenými třídami
assert NotImplementedError()
def error(self, msg: str) -> Any:
if self.line_number > 0:
msg = f"Řádek {self.line_number}: {msg}"
self.errors.append(msg)
logger.info('Import: >> %s', msg)
return None # Kdyby bylo otypováno správně jako -> None, při volání by si mypy stěžoval
def parse_user_id(self, user_id_str: str) -> Optional[int]:
if user_id_str == "":
return self.error('Chybí ID uživatele')
try:
return int(user_id_str)
except ValueError:
return self.error('ID uživatele není číslo')
def parse_email(self, email: str) -> Optional[str]:
if email == "":
return self.error('Chybí e-mailová adresa')
try:
return mo.users.normalize_email(email)
except mo.CheckError as e:
return self.error(str(e))
def parse_name(self, name: str) -> Optional[str]:
if name == "":
return self.error('Jméno nesmí být prázdné')
# XXX: Tato kontrola úmyslně není striktní, aby prošla i jména jako 'de Beer'
if name == name.lower():
return self.error('Ve jméně nejsou velká písmena')
if name == name.upper():
return self.error('Ve jméně nejsou malá písmena')
return name
def check_rights(self, place: db.Place) -> bool:
round = self.round
assert round is not None
rights = self.gatekeeper.rights_for(place, round.year, round.category, round.seq)
return rights.have_right(mo.rights.Right.manage_contest)
def parse_opt_place(self, kod: str, what: str) -> Optional[db.Place]:
if kod == "":
return None
if kod in self.place_cache:
return self.place_cache[kod]
place = db.get_place_by_code(kod)
if not place:
return self.error(f'{what.title()} s kódem "{kod}" neexistuje'+
('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
if not self.check_rights(place):
return self.error(f'Nemáte práva na správu soutěže {place.name_locative()}')
self.place_cache[kod] = place
return place
def parse_school(self, kod: str) -> Optional[db.Place]:
if kod == "":
return self.error('Škola je povinná')
if kod in self.school_place_cache:
return self.school_place_cache[kod]
place = db.get_place_by_code(kod, fetch_school=True)
if not place:
return self.error(f'Škola s kódem "{kod}" nenalezena'+
('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
if place.type != db.PlaceType.school:
return self.error(f'Kód školy "{kod}" neodpovídá škole')
self.school_place_cache[kod] = place
return place
def parse_grade(self, rocnik: str, school: Optional[db.School]) -> Optional[str]:
if not school:
return None
# Ve snaze zabránit Excelu v interpretování ročníku jako kalendářního data
# lidé připisují všechny možné i nemožné znaky, které vypadají jako apostrof :)
rocnik = re.sub('[\'"\u00b4\u2019]', "", rocnik)
if (not re.fullmatch(r'\d(/\d)?', rocnik)):
return self.error(f'Ročník má neplatný formát, musí to být buď číslice, nebo číslice/číslice')
if (not school.is_zs and re.fullmatch(r'\d', rocnik)):
return self.error(f'Ročník pro střední školu ({school.place.name}) zapisujte ve formátu číslice/číslice')
if (not school.is_ss and re.fullmatch(r'\d/\d', rocnik)):
return self.error(f'Ročník pro základní školu ({school.place.name}) zapisujte jako číslici 1–9')
return rocnik
def parse_born(self, rok: str) -> Optional[int]:
if not re.fullmatch(r'\d{4}', rok):
return self.error('Rok narození musí být čtyřciferné číslo')
r = int(rok)
if r < 2000 or r > 2099:
return self.error('Rok narození musí být v intervalu [2000,2099]')
return r
def find_or_create_user(self, email: str, krestni: str, prijmeni: str, is_org: bool) -> Optional[db.User]:
sess = db.get_session()
user = sess.query(db.User).filter_by(email=email).one_or_none()
if user:
if user.first_name != krestni or user.last_name != prijmeni:
return self.error(f'Osoba již registrována s odlišným jménem {user.full_name()}')
if (user.is_admin or user.is_org) != is_org:
if is_org:
return self.error('Nelze předefinovat účastníka na organizátora')
else:
return self.error('Nelze předefinovat organizátora na účastníka')
else:
user = db.User(email=email, first_name=krestni, last_name=prijmeni, is_org=is_org)
sess.add(user)
sess.flush() # Aby uživatel dostal user_id
logger.info(f'Import: Založen uživatel user=#{user.user_id} email=<{user.email}>')
mo.util.log(
type=db.LogType.user,
what=user.user_id,
details={'action': 'import', 'new': db.row2dict(user)},
)
self.cnt_new_users += 1
self.new_user_ids.append(user.user_id)
return user
def parse_points(self, points_str: str) -> Union[decimal.Decimal, str, None]:
if points_str == "":
return self.error('Body musí být vyplněny')
points_str = points_str.upper()
if points_str in ['X', '?']:
return points_str
pts, error = mo.util.parse_points(points_str, self.task, self.round)
if error:
return self.error(error)
return pts
def find_or_create_participant(self, user: db.User, year: int, school_id: int, birth_year: int, grade: str) -> Optional[db.Participant]:
sess = db.get_session()
part = sess.query(db.Participant).get((user.user_id, year))
if part:
if (part.school != school_id
or part.grade != grade
or part.birth_year != birth_year):
return self.error('Účastník již zaregistrován s odlišnou školou/ročníkem/rokem narození')
else:
part = db.Participant(user=user, year=year, school=school_id, birth_year=birth_year, grade=grade)
sess.add(part)
logger.info(f'Import: Založen účastník #{user.user_id}')
mo.util.log(
type=db.LogType.participant,
what=user.user_id,
details={'action': 'import', 'new': db.row2dict(part)},
)
self.cnt_new_participants += 1
return part
def find_or_create_participation(self, user: db.User, contest: db.Contest, place: Optional[db.Place]) -> Optional[db.Participation]:
if place is None:
place = contest.place
sess = db.get_session()
pions = (sess.query(db.Participation)
.filter_by(user=user)
.filter(db.Participation.contest.has(db.Contest.round == contest.round))
.all())
if not pions:
pion = db.Participation(user=user, contest=contest, place_id=place.place_id, state=db.PartState.invited)
sess.add(pion)
logger.info(f'Import: Založena účast user=#{user.user_id} contest=#{contest.contest_id} place=#{place.place_id}')
mo.util.log(
type=db.LogType.participant,
what=user.user_id,
details={'action': 'add-to-contest', 'new': db.row2dict(pion)},
)
self.cnt_new_participations += 1
elif len(pions) == 1:
pion = pions[0]
if pion.place != place:
return self.error(f'Již se tohoto kola účastní v {contest.round.get_level().name_locative("jiném", "jiné", "jiném")} ({pion.place.get_code()})')
else:
return self.error('Již se tohoto kola účastní ve vice oblastech, což by nemělo být možné')
return pion
def obtain_contest(self, oblast: Optional[db.Place], allow_none: bool = False):
if self.contest:
contest = self.contest
if oblast is not None and oblast.place_id != contest.place.place_id:
return self.error('Oblast neodpovídá té, do které se importuje')
else:
if oblast is None:
if not allow_none:
self.error('Je nutné uvést ' + self.round.get_level().name)
return None
contest = db.get_session().query(db.Contest).filter_by(round=self.round, place=oblast).one_or_none()
if contest is None:
return self.error('V ' + self.round.get_level().name_locative("uvedeném", "uvedené", "uvedeném") + ' toto kolo neprobíhá')
return contest
def add_role(self, user: db.User, place: db.Place, role: db.RoleType):
sess = db.get_session()
round = self.round
assert round is not None
if (sess.query(db.UserRole)
.filter_by(user=user, place=place, role=role,
category=round.category, year=round.year, seq=round.seq)
.with_for_update()
.first()):
pass
else:
ur = db.UserRole(user=user, place=place, role=role,
category=round.category, year=round.year, seq=round.seq,
assigned_by_user=self.user)
sess.add(ur)
sess.flush()
logger.info(f'Import: {role.name.title()} user=#{user.user_id} place=#{place.place_id} user_role=#{ur.user_role_id}')
mo.util.log(
type=db.LogType.user_role,
what=ur.user_role_id,
details={'action': 'import', 'new': db.row2dict(ur)},
)
self.cnt_new_roles += 1
def import_row(self, r: mo.csv.Row):
# Definováno odvozenými třídami
assert NotImplementedError()
def log_start(self, path):
args = [f'user=#{self.user.user_id}', f'fmt={self.fmt.name}']
if self.round is not None:
args.append(f'round=#{self.round.round_id}')
if self.contest is not None:
args.append(f'contest=#{self.contest.contest_id}')
if self.task is not None:
args.append(f'task=#{self.task.task_id}')
logger.info('Import: %s ze souboru %s: %s', self.log_msg_prefix, path, " ".join(args))
def log_end(self):
args = [f'rows=#{self.cnt_rows}']
for key, val in [
('users', self.cnt_new_users),
('p-ants', self.cnt_new_participants),
('p-ions', self.cnt_new_participations),
('roles', self.cnt_new_roles),
('points', self.cnt_set_points),
('add-sols', self.cnt_add_sols),
('del-sols', self.cnt_del_sols),
]:
if val > 0:
args.append(f'{key}={val}')
logger.info('Import: Hotovo (%s)', " ".join(args))
if self.contest is not None:
mo.util.log(
type=db.LogType.contest,
what=self.contest.contest_id,
details=self.log_details,
)
elif self.round is not None:
mo.util.log(
type=db.LogType.round,
what=self.round.round_id,
details=self.log_details,
)
else:
assert False
def check_utf8(self, path: str) -> bool:
# Není pěkné, že soubory čteme dvakrát, ale ve srovnání s pasekou, kterou by
# napáchal import ve špatném kódování, je to maličkost.
try:
with open(path, encoding='utf-8') as f:
for _ in f:
pass
return True
except UnicodeDecodeError:
return False
def generic_import(self, path: str) -> bool:
charset = self.fmt.get_charset()
if charset != 'utf-8' and self.check_utf8(path):
logger.info('Import: Uhodnuto kódování utf-8')
charset = 'utf-8'
try:
with open(path, encoding=charset) as file:
try:
rows: List[mo.csv.Row]
rows, warnings = mo.csv.read(file=file, fmt=self.fmt, row_class=self.row_class)
self.warnings += warnings
except MissingHeaderError:
return self.error('Souboru chybí první řádek s názvy sloupců')
except UnicodeDecodeError:
return self.error(f'Soubor není v kódování {self.fmt.get_charset()}')
except Exception as e:
return self.error(f'Chybná struktura tabulky: {e}')
self.line_number = 2
for row in rows:
self.cnt_rows += 1
self.import_row(row)
if len(self.errors) >= 100:
self.errors.append('Import přerušen pro příliš mnoho chyb')
break
self.line_number += 1
return len(self.errors) == 0
def notify_users(self):
# Projde všechny uživatele a těm, kteří ještě nemají nastavené heslo,
# ani nepožádali o jeho reset, pošle mail s odkazem na reset. Každého
# uživatele zpracováváme ve zvlášť transakci, aby se po chybě neztratila
# informace o tom, že už jsme nějaké maily rozeslali.
sess = db.get_session()
for uid in self.new_user_ids:
u = sess.query(db.User).get(uid)
if u and not u.password_hash and not u.reset_at:
token = mo.users.ask_reset_password(u)
sess.commit()
mo.util.send_new_account_email(u, token)
else:
sess.rollback()
def get_template(self) -> str:
# Odvozené třídy mohou přetížit
out = io.StringIO()
mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=[self.row_example])
return out.getvalue()
def run(self, path: str) -> bool:
self.log_start(path)
if not self.generic_import(path):
logger.info('Import: Rollback')
db.get_session().rollback()
return False
self.log_end()
db.get_session().commit()
self.notify_users()
return True
@dataclass
class ContestImportRow(mo.csv.Row):
email: str = ""
krestni: str = ""
prijmeni: str = ""
kod_skoly: str = ""
rocnik: str = ""
rok_naroz: str = ""
kod_mista: str = ""
kod_oblasti: str = ""
class ContestImport(Import):
row_class = ContestImportRow
row_example = ContestImportRow(
email="nekdo@example.org",
krestni="Pokusný",
prijmeni="Králík",
kod_skoly="#3333",
rocnik="1/8",
rok_naroz="2000",
)
log_msg_prefix = 'Účastníci'
log_details = {'action': 'import'}
template_basename = 'sablona-ucast'
def setup(self):
assert self.round is not None
def import_row(self, r: mo.csv.Row):
assert isinstance(r, ContestImportRow)
num_prev_errs = len(self.errors)
email = self.parse_email(r.email)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
school_place = self.parse_school(r.kod_skoly)
rocnik = self.parse_grade(r.rocnik, (school_place.school if school_place else None))
rok_naroz = self.parse_born(r.rok_naroz)
misto = self.parse_opt_place(r.kod_mista, 'místo')
oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
if (len(self.errors) > num_prev_errs
or email is None
or krestni is None
or prijmeni is None
or school_place is None
or rocnik is None
or rok_naroz is None):
return
user = self.find_or_create_user(email, krestni, prijmeni, is_org=False)
if user is None:
return
part = self.find_or_create_participant(user, mo.current_year, school_place.place_id, rok_naroz, rocnik)
if part is None:
return
contest = self.obtain_contest(oblast)
if contest is None:
return
self.find_or_create_participation(user, contest, misto)
@dataclass
class ProctorImportRow(mo.csv.Row):
email: str = ""
krestni: str = ""
prijmeni: str = ""
kod_mista: str = ""
class ProctorImport(Import):
row_class = ProctorImportRow
row_example = ProctorImportRow(
email='nekdo@example.org',
krestni='Pokusný',
prijmeni='Králík',
kod_mista='#3333',
)
log_msg_prefix = 'Dozor'
log_details = {'action': 'import-proctors'}
template_basename = 'sablona-dozor'
def setup(self):
assert self.round is not None
def import_row(self, r: mo.csv.Row):
assert isinstance(r, ProctorImportRow)
num_prev_errs = len(self.errors)
email = self.parse_email(r.email)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
misto = self.parse_opt_place(r.kod_mista, 'místo')
if misto is None:
return self.error('Kód místa je povinné uvést')
if (len(self.errors) > num_prev_errs
or email is None
or krestni is None
or prijmeni is None):
return
user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
if user is None:
return
self.add_role(user, misto, db.RoleType.dozor)
@dataclass
class JudgeImportRow(mo.csv.Row):
email: str = ""
krestni: str = ""
prijmeni: str = ""
kod_oblasti: str = ""
class JudgeImport(Import):
row_class = JudgeImportRow
row_example = JudgeImportRow(
email='nekdo@example.org',
krestni='Pokusný',
prijmeni='Králík',
kod_oblasti='B',
)
log_msg_prefix = 'Opravovatelé'
log_details = {'action': 'import-judges'}
template_basename = 'sablona-oprav'
root_place: db.Place
def setup(self):
assert self.round is not None
self.root_place = db.get_root_place()
def import_row(self, r: mo.csv.Row):
assert isinstance(r, JudgeImportRow)
num_prev_errs = len(self.errors)
email = self.parse_email(r.email)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
if (len(self.errors) > num_prev_errs
or email is None
or krestni is None
or prijmeni is None):
return
user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
if user is None:
return
contest = self.obtain_contest(oblast, allow_none=True)
place = contest.place if contest else self.root_place
if not self.check_rights(place):
return self.error(f'Nemáte práva na správu soutěže {place.name_locative()}')
self.add_role(user, place, db.RoleType.opravovatel)
@dataclass
class PointsImportRow(mo.csv.Row):
user_id: str = ""
krestni: str = ""
prijmeni: str = ""
body: str = ""
class PointsImport(Import):
row_class = PointsImportRow
log_msg_prefix = 'Body'
def setup(self):
assert self.round is not None
assert self.task is not None
self.log_details = {'action': 'import-points', 'task': self.task.code}
self.template_basename = 'body-' + self.task.code
def _pion_sol_query(self) -> Query:
sess = db.get_session()
query = (sess.query(db.Participation, db.Solution)
.select_from(db.Participation)
.outerjoin(db.Solution, and_(db.Solution.user_id == db.Participation.user_id, db.Solution.task == self.task))
.options(joinedload(db.Participation.user)))
if self.contest is not None:
query = query.filter(db.Participation.contest_id == self.contest.master_contest_id)
else:
contest_query = sess.query(db.Contest.master_contest_id).filter_by(round=self.round)
query = query.filter(db.Participation.contest_id.in_(contest_query.subquery()))
return query
def import_row(self, r: mo.csv.Row):
assert isinstance(r, PointsImportRow)
num_prev_errs = len(self.errors)
user_id = self.parse_user_id(r.user_id)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
body = self.parse_points(r.body)
if (len(self.errors) > num_prev_errs
or user_id is None
or krestni is None
or prijmeni is None
or body is None):
return
assert self.round is not None
assert self.task is not None
task_id = self.task.task_id
sess = db.get_session()
query = self._pion_sol_query().filter(db.Participation.user_id == user_id)
pion_sols = query.all()
if not pion_sols:
return self.error('Soutěžící nenalezen v tomto kole')
elif len(pion_sols) > 1:
return self.error('Soutěžící v tomto kole soutěží vícekrát, neumím zpracovat')
pion, sol = pion_sols[0]
if self.contest is not None:
if pion.contest != self.contest:
return self.error('Soutěžící nesoutěží v ' + self.round.get_level().name_locative('tomto', 'této', 'tomto'))
rights = self.gatekeeper.rights_for_contest(pion.contest)
if not rights.can_edit_points():
return self.error('Nemáte právo na úpravu bodů')
user = pion.user
if user.first_name != krestni or user.last_name != prijmeni:
return self.error('Neodpovídá ID a jméno soutěžícího')
if sol is None:
if body == 'X':
return
if not self.allow_add_del:
return self.error('Tento soutěžící úlohu neodevzdal')
if not rights.can_upload_solutions():
return self.error('Nemáte právo na zakládání nových řešení')
sol = db.Solution(user_id=user_id, task_id=task_id)
sess.add(sol)
logger.info(f'Import: Založeno řešení user=#{user_id} task=#{task_id}')
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={'action': 'solution-created', 'task': task_id},
)
self.cnt_add_sols += 1
elif body == 'X':
if not self.allow_add_del:
return self.error('Tento soutěžící úlohu odevzdal')
if sol.final_submit is not None or sol.final_feedback is not None:
return self.error('Nelze smazat řešení, ke kterému existují odevzdané soubory')
if not rights.can_upload_solutions():
return self.error('Nemáte právo na mazání řešení')
logger.info(f'Import: Smazáno řešení user=#{user_id} task=#{task_id}')
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={'action': 'solution-removed', 'task': task_id},
)
self.cnt_del_sols += 1
sess.delete(sol)
return
points = body if isinstance(body, decimal.Decimal) else None
if sol.points != points:
sol.points = points
sess.add(db.PointsHistory(
task=self.task,
participant_id=user_id,
user=self.user,
points_at=mo.now,
points=points,
))
self.cnt_set_points += 1
def get_template(self) -> str:
rows = []
for pion, sol in sorted(self._pion_sol_query().all(), key=lambda pair: pair[0].user.sort_key()):
if sol is None:
pts = 'X'
elif sol.points is None:
pts = '?'
else:
pts = format_decimal(sol.points)
user = pion.user
rows.append(PointsImportRow(
user_id=user.user_id,
krestni=user.first_name,
prijmeni=user.last_name,
body=pts,
))
out = io.StringIO()
mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=rows)
return out.getvalue()
def create_import(user: db.User,
type: ImportType,
fmt: FileFormat,
round: Optional[db.Round] = None,
contest: Optional[db.Contest] = None,
task: Optional[db.Task] = None,
allow_add_del: bool = False):
imp: Import
if type == ImportType.participants:
imp = ContestImport()
elif type == ImportType.proctors:
imp = ProctorImport()
elif type == ImportType.judges:
imp = JudgeImport()
elif type == ImportType.points:
imp = PointsImport()
else:
assert False, "Neznámý typ importu"
imp.user = user
imp.round = round
imp.contest = contest
imp.task = task
imp.allow_add_del = allow_add_del
imp.fmt = fmt
imp.gatekeeper = mo.rights.Gatekeeper(user)
imp.setup()
return imp