Project 'mj/mo-submit' was moved to 'mo-p/osmo'. Please update any links and bookmarks that may still have the old path.
Select Git revision
Jiří Kalvoda authored
imports.py 26.08 KiB
from dataclasses import dataclass
import decimal
from enum import auto
import io
import re
from sqlalchemy import and_
from sqlalchemy.orm import joinedload, Query
from typing import List, Optional, Any, Dict, Type, Union, Set
import mo.config as config
import mo.csv
from mo.csv import FileFormat, MissingHeaderError
import mo.db as db
import mo.email
import mo.rights
import mo.users
import mo.util
from mo.util import logger
from mo.util_format import format_decimal
class ImportType(db.MOEnum):
    """Kinds of import supported by this module."""

    participants = auto()
    proctors = auto()
    judges = auto()
    points = auto()

    def friendly_name(self) -> str:
        """Return the display label registered for this import type."""
        # NOTE(review): the label table is keyed by member *names*, so this
        # lookup presumably relies on MOEnum members comparing equal to
        # their own name -- confirm against db.MOEnum.
        label = import_type_names[self]
        return label
# Czech display label of every import type, keyed by the member's name.
import_type_names = {
    member.name: label
    for member, label in (
        (ImportType.participants, 'účastníci'),
        (ImportType.proctors, 'dozor'),
        (ImportType.judges, 'opravovatelé'),
        (ImportType.points, 'body'),
    )
}
class Import:
    """Common state and helpers shared by the import classes of this module.

    Subclasses are expected to provide `setup()` and `import_row()`.
    """

    # Result of the import
    errors: List[str]
    warnings: List[str]
    cnt_rows: int = 0
    cnt_new_users: int = 0
    cnt_new_participants: int = 0
    cnt_new_participations: int = 0
    cnt_new_roles: int = 0
    cnt_set_points: int = 0
    cnt_add_sols: int = 0
    cnt_del_sols: int = 0

    # Public properties of the import
    template_basename: str = "sablona"

    # Internal: what is being imported and how
    user: db.User
    round: Optional[db.Round]
    contest: Optional[db.Contest]
    only_region: Optional[db.Place]
    task: Optional[db.Task]         # for the points import
    allow_add_del: bool             # for the points import: creating/deleting solutions is allowed
    fmt: FileFormat
    row_class: Type[mo.csv.Row]
    row_example: mo.csv.Row
    log_msg_prefix: str
    log_details: Any

    # Internal: state of the import
    place_cache: Dict[str, db.Place]
    school_place_cache: Dict[str, db.Place]
    gatekeeper: mo.rights.Gatekeeper
    new_user_ids: List[int]
    line_number: int = 0            # 0 means "not inside the table"; error() then adds no line prefix
    row_name: Optional[str] = None
def __init__(self):
    """Start with empty result lists, empty caches and no new users."""
    self.errors, self.warnings = [], []
    self.rr = None
    self.place_cache, self.school_place_cache = {}, {}
    self.new_user_ids = []
def setup(self):
# Definováno odvozenými třídami
assert NotImplementedError()
def error(self, msg: str) -> Any:
    """Record an error message and return None.

    While rows are being processed (`line_number` > 0) the message is
    prefixed with the line number and, when known, the row name.
    """
    if self.line_number > 0:
        msg = (f"Řádek {self.line_number} ({self.row_name}): {msg}" if self.row_name
               else f"Řádek {self.line_number}: {msg}")

    self.errors.append(msg)
    logger.info('Import: >> %s', msg)
    # If this were annotated correctly as -> None, mypy would complain at the
    # call sites that use `return self.error(...)`.
    return None
def parse_user_id(self, user_id_str: str) -> Optional[int]:
    """Convert the user-ID column to an int; report an error and return None on failure."""
    if user_id_str == "":
        return self.error('Chybí ID uživatele')

    try:
        parsed = int(user_id_str)
    except ValueError:
        return self.error('ID uživatele není číslo')
    return parsed
def parse_email(self, email: str) -> Optional[str]:
    """Return the normalized e-mail address, or report an error and return None."""
    if email == "":
        return self.error('Chybí e-mailová adresa')

    try:
        normalized = mo.users.normalize_email(email)
    except mo.CheckError as e:
        return self.error(str(e))
    return normalized
def parse_name(self, name: str) -> Optional[str]:
    """Return `name` unchanged if it looks plausible, else report an error and return None."""
    if name == "":
        return self.error('Jméno nesmí být prázdné')

    # XXX: This check is deliberately not strict, so that names such as
    # 'de Beer' still pass.
    for folded, complaint in (
        (name.lower(), 'Ve jméně nejsou velká písmena'),
        (name.upper(), 'Ve jméně nejsou malá písmena'),
    ):
        if name == folded:
            return self.error(complaint)

    return name
def check_rights(self, place: db.Place) -> bool:
    """Tell whether the importing user may manage the contest of the current round at `place`."""
    rnd = self.round
    assert rnd is not None
    granted = self.gatekeeper.rights_for(place, rnd.year, rnd.category, rnd.seq)
    return granted.have_right(mo.rights.Right.manage_contest)
def parse_opt_place(self, kod: str, what: str) -> Optional[db.Place]:
    """Look up an optional place code.

    Returns None without an error when `kod` is empty. An unknown code or
    a place the user may not manage is reported as an error (also
    returning None). Successful lookups are cached per code.
    `what` names the column in error messages.
    """
    if kod == "":
        return None

    try:
        return self.place_cache[kod]
    except KeyError:
        pass

    place = db.get_place_by_code(kod)
    if not place:
        # A purely numeric code most likely just lacks the leading '#'.
        hint = '. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''
        return self.error(f'{what.title()} s kódem "{kod}" neexistuje' + hint)

    if not self.check_rights(place):
        return self.error(f'Nemáte práva na správu soutěže {place.name_locative()}')

    self.place_cache[kod] = place
    return place
def parse_school(self, kod: str) -> Optional[db.Place]:
    """Resolve a school code to its place, caching successful lookups.

    A failed validation is reported as an error and yields None.
    """
    try:
        return self.school_place_cache[kod]
    except KeyError:
        pass

    try:
        found = mo.users.validate_and_find_school(kod)
    except mo.CheckError as e:
        return self.error(str(e))

    self.school_place_cache[kod] = found
    return found
def parse_grade(self, rocnik: str, school: Optional[db.School]) -> Optional[str]:
    """Normalize the grade for the given school.

    Returns None when no school is known; a grade rejected by the
    normalizer is reported as an error (also returning None).
    """
    if not school:
        return None

    # Trying to stop Excel from interpreting the grade as a calendar date,
    # people prepend all sorts of possible and impossible characters that
    # look like an apostrophe :)
    cleaned = re.sub('[\'"\u00b4\u2019]', "", rocnik)

    try:
        grade = mo.users.normalize_grade(cleaned, school)
    except mo.CheckError as e:
        return self.error(str(e))
    return grade
def parse_born(self, rok: str) -> Optional[int]:
    """Parse and validate a four-digit year of birth; report an error and return None on failure."""
    if re.fullmatch(r'\d{4}', rok) is None:
        return self.error('Rok narození musí být čtyřciferné číslo')

    year = int(rok)
    try:
        mo.users.validate_born_year(year)
    except mo.CheckError as e:
        return self.error(str(e))
    else:
        return year
def find_or_create_user(self, email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool) -> Optional[db.User]:
    """Find or create the user with the given e-mail.

    Newly created users are counted and remembered in `new_user_ids`.
    A failed check is reported as an error and yields None.
    """
    try:
        user, created = mo.users.find_or_create_user(email, krestni, prijmeni, is_org, reason='import')
    except mo.CheckError as e:
        return self.error(str(e))

    if created:
        self.cnt_new_users += 1
        self.new_user_ids.append(user.user_id)
    return user
def parse_points(self, points_str: str) -> Union[decimal.Decimal, str, None]:
    """Parse the points column.

    Returns 'X' or '?' for these markers (case-insensitively), otherwise
    the value produced by `mo.util.parse_points`. An empty or invalid
    value is reported as an error and yields None.
    """
    if points_str == "":
        return self.error('Body musí být vyplněny')

    marker = points_str.upper()
    if marker in ('X', '?'):
        return marker

    pts, problem = mo.util.parse_points(marker, self.task, self.round)
    return self.error(problem) if problem else pts
def find_or_create_participant(self, user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str]) -> Optional[db.Participant]:
    """Find or create the participant record of `user` for `year`.

    A newly created record is counted; a failed check is reported as an
    error and yields None.
    """
    try:
        participant, created = mo.users.find_or_create_participant(user, year, school_id, birth_year, grade, reason='import')
    except mo.CheckError as e:
        return self.error(str(e))

    if created:
        self.cnt_new_participants += 1
    return participant
def find_or_create_participation(self, user: db.User, contest: db.Contest, place: Optional[db.Place]) -> Optional[db.Participation]:
    """Find or create the participation of `user` in `contest`.

    A newly created participation is counted; a failed check is reported
    as an error and yields None.
    """
    try:
        participation, created = mo.users.find_or_create_participation(user, contest, place, reason='import')
    except mo.CheckError as e:
        return self.error(str(e))

    if created:
        self.cnt_new_participations += 1
    return participation
def place_is_allowed(self, place: db.Place) -> bool:
    """Tell whether `place` fits the contest and region this import is limited to."""
    wrong_contest = self.contest is not None and self.contest.place_id != place.place_id
    if wrong_contest:
        return False

    outside_region = (self.only_region is not None
                      and not self.gatekeeper.is_ancestor_of(self.only_region, place))
    return not outside_region
def obtain_contest(self, oblast: Optional[db.Place], allow_none: bool = False):
    """Determine the contest a row belongs to.

    Returns the import's fixed contest when there is one, otherwise the
    contest of the current round held in `oblast`. Returns None when no
    region was given (an error unless `allow_none`) and after any
    reported error.
    """
    assert self.round

    if oblast is not None and not self.place_is_allowed(oblast):
        return self.error('Oblast neodpovídá té, do které se importuje')

    if self.contest:
        return self.contest

    # We speak of regions ("oblast") here instead of using place_levels,
    # because the column has "oblast" in its name and because the
    # participant's school must be told apart from a school acting as the
    # region.
    if oblast is None:
        if not allow_none:
            self.error('Je nutné uvést kód oblasti')
        return None

    found = db.get_session().query(db.Contest).filter_by(round=self.round, place=oblast).one_or_none()
    if found is None:
        return self.error('V uvedené oblasti toto kolo neprobíhá')
    return found
def add_role(self, user: db.User, place: db.Place, role: db.RoleType):
    """Give `user` the role `role` at `place` for the current round.

    Nothing happens when an identical role already exists; otherwise the
    new role is stored, logged and counted.
    """
    sess = db.get_session()
    rnd = self.round
    assert rnd is not None

    # The same fields identify an existing role and describe a new one.
    scope = dict(user=user, place=place, role=role,
                 category=rnd.category, year=rnd.year, seq=rnd.seq)

    already_assigned = sess.query(db.UserRole).filter_by(**scope).with_for_update().first()
    if already_assigned:
        return

    ur = db.UserRole(**scope, assigned_by_user=self.user)
    sess.add(ur)
    # Flush before logging, which reads ur.user_role_id.
    sess.flush()
    logger.info(f'Import: {role.name.title()} user=#{user.user_id} place=#{place.place_id} user_role=#{ur.user_role_id}')
    mo.util.log(
        type=db.LogType.user_role,
        what=ur.user_role_id,
        details={'action': 'import', 'new': db.row2dict(ur)},
    )
    self.cnt_new_roles += 1
def import_row(self, r: mo.csv.Row):
    """Process one parsed row of the table.

    Defined by derived classes; the base implementation only signals
    that the override is missing.

    Raises:
        NotImplementedError: always.
    """
    # `assert NotImplementedError()` would always pass, because an exception
    # instance is truthy -- the exception has to be raised to have any effect.
    raise NotImplementedError()
def log_start(self, path) -> None:
    """Log which import is starting, from which file and in what context."""
    described = [f'user=#{self.user.user_id}', f'fmt={self.fmt.name}']

    # Optional context is mentioned only when it is set.
    if self.round is not None:
        described += [f'round=#{self.round.round_id}']
    if self.contest is not None:
        described += [f'contest=#{self.contest.contest_id}']
    if self.only_region is not None:
        described += [f'region=#{self.only_region.place_id}']
    if self.task is not None:
        described += [f'task=#{self.task.task_id}']

    summary = " ".join(described)
    logger.info('Import: %s ze souboru %s: %s', self.log_msg_prefix, path, summary)
def log_end(self) -> None:
    """Log the final counters and write an audit record for the contest or round."""
    counters = [
        ('users', self.cnt_new_users),
        ('p-ants', self.cnt_new_participants),
        ('p-ions', self.cnt_new_participations),
        ('roles', self.cnt_new_roles),
        ('points', self.cnt_set_points),
        ('add-sols', self.cnt_add_sols),
        ('del-sols', self.cnt_del_sols),
    ]
    # The row count is always shown, the other counters only when non-zero.
    args = [f'rows=#{self.cnt_rows}']
    args.extend(f'{key}={val}' for key, val in counters if val > 0)
    logger.info('Import: Hotovo (%s)', " ".join(args))

    # Work on a copy so the shared log_details stays untouched.
    details = self.log_details.copy()
    if self.only_region:
        details['region'] = self.only_region.place_id

    if self.contest is not None:
        mo.util.log(type=db.LogType.contest, what=self.contest.contest_id, details=details)
    elif self.round is not None:
        mo.util.log(type=db.LogType.round, what=self.round.round_id, details=details)
    else:
        assert False
def check_utf8(self, path: str) -> bool:
    """Return True when the whole file at `path` decodes as UTF-8."""
    # Reading the file twice is not pretty, but it is a trifle compared with
    # the mess an import in the wrong encoding would cause.
    try:
        with open(path, encoding='utf-8') as f:
            for _ in f:
                pass
    except UnicodeDecodeError:
        return False
    else:
        return True
def get_row_name(self, row: mo.csv.Row) -> Optional[str]:
    """Return the row's `email` attribute, or None when the row has none."""
    # `email` is a member of the derived row classes, not of mo.csv.Row.
    return getattr(row, 'email', None)
def generic_import(self, path: str) -> bool:
    """Read the table stored at `path` and import it row by row.

    Args:
        path: Path of the file to import.

    Returns:
        True when no error was recorded, False otherwise. Problems are
        collected in `self.errors`, reader warnings in `self.warnings`.
    """
    charset = self.fmt.get_charset()
    if charset != 'utf-8' and self.check_utf8(path):
        logger.info('Import: Uhodnuto kódování utf-8')
        charset = 'utf-8'

    # self.error() returns None, so every failure path returns False
    # explicitly to honour the declared bool result.
    try:
        with open(path, encoding=charset) as file:
            try:
                rows: List[mo.csv.Row]
                rows, warnings = mo.csv.read(file=file, fmt=self.fmt, row_class=self.row_class)
                self.warnings += warnings
            except MissingHeaderError:
                self.error('Souboru chybí první řádek s názvy sloupců')
                return False
    except UnicodeDecodeError:
        self.error(f'Soubor není v kódování {self.fmt.get_charset()}')
        return False
    except Exception as e:
        self.error(f'Chybná struktura tabulky: {e}')
        return False

    # Numbering of data rows starts at 2 (line 1 presumably holds the header).
    self.line_number = 2
    for row in rows:
        self.row_name = self.get_row_name(row)
        self.cnt_rows += 1
        self.import_row(row)
        if len(self.errors) >= 100:
            # Appended directly so the notice carries no row prefix.
            self.errors.append('Import přerušen pro příliš mnoho chyb')
            break
        self.line_number += 1

    self.row_name = None
    return len(self.errors) == 0
def notify_users(self) -> None:
    """Send an activation e-mail to newly created users who still need one."""
    # Walks through all new users and mails a reset link to those who have
    # neither a password nor a pending reset request. Every user is handled
    # in a separate transaction, so that a failure does not lose the
    # information about the mails already sent.
    sess = db.get_session()
    for uid in self.new_user_ids:
        u = sess.query(db.User).get(uid)
        needs_mail = u and not u.password_hash and not u.reset_at
        if not needs_mail:
            sess.rollback()
            continue

        token = mo.users.make_activation_token(u)
        sess.commit()
        mo.email.send_new_account_email(u, token)
def get_template(self) -> str:
    """Return a template table that contains just the example row."""
    # Derived classes may override this.
    buf = io.StringIO()
    example_rows = [self.row_example]
    mo.csv.write(file=buf, fmt=self.fmt, row_class=self.row_class, rows=example_rows)
    return buf.getvalue()
def run(self, path: str) -> bool:
    """Run the whole import of `path`.

    On success the session is committed and new users are notified; on
    failure the session is rolled back. Returns whether the import
    succeeded.
    """
    self.log_start(path)

    succeeded = self.generic_import(path)
    if succeeded:
        self.log_end()
        db.get_session().commit()
        self.notify_users()
        return True

    logger.info('Import: Rollback')
    db.get_session().rollback()
    return False
@dataclass
class ContestImportRow(mo.csv.Row):
    """One row of the participant import table; all columns are kept as strings."""
    email: str = ""
    krestni: str = ""       # first name
    prijmeni: str = ""      # last name
    kod_skoly: str = ""     # school code
    rocnik: str = ""        # grade
    rok_naroz: str = ""     # year of birth
    kod_mista: str = ""     # place code
    kod_oblasti: str = ""   # region code
class ContestImport(Import):
    """Import of contest participants."""

    row_class = ContestImportRow
    row_example = ContestImportRow(
        email="nekdo@example.org",
        krestni="Pokusný",
        prijmeni="Králík",
        kod_skoly="#3333",
        rocnik="1/8",
        rok_naroz="2000",
    )
    log_msg_prefix = 'Účastníci'
    log_details = {'action': 'import'}
    template_basename = 'sablona-ucast'

    def setup(self):
        """Check that the import targets a round which is not a subround."""
        assert self.round is not None
        assert not self.round.is_subround()

    def import_row(self, r: mo.csv.Row):
        """Create the user, participant and participation described by one row.

        All columns are parsed first, so that every problem of the row is
        reported; nothing is created when any of them failed.
        """
        assert isinstance(r, ContestImportRow)
        errors_before = len(self.errors)

        email = self.parse_email(r.email)

        # Every column except the e-mail is optional; empty ones stay None.
        krestni = prijmeni = school_place = rocnik = rok_naroz = None
        if r.krestni:
            krestni = self.parse_name(r.krestni)
        if r.prijmeni:
            prijmeni = self.parse_name(r.prijmeni)
        if r.kod_skoly:
            school_place = self.parse_school(r.kod_skoly)
        if r.rocnik:
            school = school_place.school if school_place else None
            rocnik = self.parse_grade(r.rocnik, school)
        if r.rok_naroz:
            rok_naroz = self.parse_born(r.rok_naroz)

        misto = self.parse_opt_place(r.kod_mista, 'místo')
        oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')

        if len(self.errors) > errors_before or email is None:
            return

        user = self.find_or_create_user(email, krestni, prijmeni, is_org=False)
        if user is None:
            return

        school_id = school_place.place_id if school_place else None
        if self.find_or_create_participant(user, config.CURRENT_YEAR, school_id, rok_naroz, rocnik) is None:
            return

        contest = self.obtain_contest(oblast)
        if contest is not None:
            self.find_or_create_participation(user, contest, misto)
@dataclass
class ProctorImportRow(mo.csv.Row):
    """One row of the proctor import table; all columns are kept as strings."""
    email: str = ""
    krestni: str = ""       # first name
    prijmeni: str = ""      # last name
    kod_mista: str = ""     # place code
class ProctorImport(Import):
    """Import of proctors: each row assigns the `dozor` role at one place."""

    row_class = ProctorImportRow
    row_example = ProctorImportRow(
        email='nekdo@example.org',
        krestni='Pokusný',
        prijmeni='Králík',
        kod_mista='#3333',
    )
    log_msg_prefix = 'Dozor'
    log_details = {'action': 'import-proctors'}
    template_basename = 'sablona-dozor'

    def setup(self):
        """Check that the import targets a round."""
        assert self.round is not None

    def import_row(self, r: mo.csv.Row):
        """Find or create the organizer from one row and make them a proctor.

        E-mail, both names and the place code are mandatory. Problems are
        recorded through `self.error()`; nothing is created for a row
        with errors.
        """
        assert isinstance(r, ProctorImportRow)
        num_prev_errs = len(self.errors)

        email = self.parse_email(r.email)
        krestni = self.parse_name(r.krestni)
        prijmeni = self.parse_name(r.prijmeni)

        # Report a missing code only when the column really is empty.
        if r.kod_mista == "":
            return self.error('Kód místa je povinné uvést')

        misto = self.parse_opt_place(r.kod_mista, 'místo')
        if misto is None:
            # For a non-empty code parse_opt_place() returns None only after
            # reporting the reason itself (unknown code or missing rights),
            # so adding "code is mandatory" here would be misleading.
            return

        if (len(self.errors) > num_prev_errs
                or email is None
                or krestni is None
                or prijmeni is None):
            return

        user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
        if user is None:
            return

        self.add_role(user, misto, db.RoleType.dozor)
@dataclass
class JudgeImportRow(mo.csv.Row):
    """One row of the judge import table; all columns are kept as strings."""
    email: str = ""
    krestni: str = ""       # first name
    prijmeni: str = ""      # last name
    kod_oblasti: str = ""   # region code (optional)
class JudgeImport(Import):
    """Import of judges: each row assigns the `opravovatel` role.

    The role is placed at the contest's place, or at the root place when
    the row names no region and the import is not tied to a contest.
    """

    row_class = JudgeImportRow
    row_example = JudgeImportRow(
        email='nekdo@example.org',
        krestni='Pokusný',
        prijmeni='Králík',
        kod_oblasti='B',
    )
    log_msg_prefix = 'Opravovatelé'
    log_details = {'action': 'import-judges'}
    template_basename = 'sablona-oprav'

    root_place: db.Place

    def setup(self):
        """Check that the import targets a round and remember the root place."""
        assert self.round is not None
        self.root_place = db.get_root_place()

    def import_row(self, r: mo.csv.Row):
        """Find or create the organizer from one row and make them a judge.

        E-mail and both names are mandatory, the region code is optional.
        Problems are recorded through `self.error()`; no role is added
        for a row with errors.
        """
        assert isinstance(r, JudgeImportRow)
        num_prev_errs = len(self.errors)

        email = self.parse_email(r.email)
        krestni = self.parse_name(r.krestni)
        prijmeni = self.parse_name(r.prijmeni)
        oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')

        if (len(self.errors) > num_prev_errs
                or email is None
                or krestni is None
                or prijmeni is None):
            return

        user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
        if user is None:
            return

        errs_before_contest = len(self.errors)
        contest = self.obtain_contest(oblast, allow_none=True)
        if len(self.errors) > errs_before_contest:
            # obtain_contest() returns None both for "no region given" and
            # after reporting an error; only the former may fall back to the
            # root place.
            return

        place = contest.place if contest else self.root_place
        if not self.check_rights(place):
            return self.error(f'Nemáte práva na správu soutěže {place.name_locative()}')

        self.add_role(user, place, db.RoleType.opravovatel)
@dataclass
class PointsImportRow(mo.csv.Row):
    """One row of the points import table; all columns are kept as strings."""
    user_id: str = ""
    krestni: str = ""       # first name
    prijmeni: str = ""      # last name
    body: str = ""          # points, or the marker 'X' / '?'
class PointsImport(Import):
    """Import of points awarded for solutions of a single task."""
    row_class = PointsImportRow
    log_msg_prefix = 'Body'
def setup(self):
assert self.round is not None
assert self.task is not None
self.log_details = {'action': 'import-points', 'task': self.task.code}
self.template_basename = 'body-' + self.task.code
def _pion_sol_query(self) -> Query:
    """Build the query for (participation, solution) pairs covered by this import.

    The solution is outer-joined, so it is None for participants without
    a solution of the task. The participations are limited to the
    import's contest, or to the contests of the round (optionally only
    those within `only_region`).
    """
    sess = db.get_session()

    same_user_and_task = and_(db.Solution.user_id == db.Participation.user_id,
                              db.Solution.task == self.task)
    query = sess.query(db.Participation, db.Solution).select_from(db.Participation)
    query = query.outerjoin(db.Solution, same_user_and_task)
    query = query.options(joinedload(db.Participation.user))

    if self.contest is not None:
        return query.filter(db.Participation.contest_id == self.contest.master_contest_id)

    contest_query = sess.query(db.Contest.master_contest_id).filter_by(round=self.round)
    if self.only_region:
        assert self.round
        levels_up = self.round.level - self.only_region.level
        contest_query = db.filter_place_nth_parent(contest_query, db.Contest.place_id, levels_up, self.only_region.place_id)
    return query.filter(db.Participation.contest_id.in_(contest_query.subquery()))
def import_row(self, r: mo.csv.Row):
    """Apply one row of the points table to the participant's solution.

    The `body` column holds points, '?' or 'X'. As the code below shows,
    'X' stands for "no solution" (an existing one is deleted when
    `allow_add_del` permits) and '?' results in a solution with points
    set to None. Problems are recorded through `self.error()`.
    """
    assert isinstance(r, PointsImportRow)
    num_prev_errs = len(self.errors)

    # Parse all columns first, so that every problem of the row is reported.
    user_id = self.parse_user_id(r.user_id)
    krestni = self.parse_name(r.krestni)
    prijmeni = self.parse_name(r.prijmeni)
    body = self.parse_points(r.body)

    if (len(self.errors) > num_prev_errs
            or user_id is None
            or krestni is None
            or prijmeni is None
            or body is None):
        return

    assert self.round is not None
    assert self.task is not None
    task_id = self.task.task_id

    # Look up the user's participation together with their solution (if any).
    sess = db.get_session()
    query = self._pion_sol_query().filter(db.Participation.user_id == user_id)
    pion_sols = query.all()
    if not pion_sols:
        # Name the scope that was searched in the error message.
        if self.contest is not None:
            msg = self.round.get_level().name_locative('tomto', 'této', 'tomto')
        elif self.only_region is not None:
            msg = self.only_region.get_level().name_locative('tomto', 'této', 'tomto')
        else:
            msg = 'tomto kole'
        return self.error(f'Soutěžící nenalezen v {msg}')
    elif len(pion_sols) > 1:
        return self.error('Soutěžící v tomto kole soutěží vícekrát, neumím zpracovat')
    pion, sol = pion_sols[0]

    # In a subround the participation points at the master contest, so the
    # contest of this round has to be looked up for the rights check.
    if not self.round.is_subround():
        contest = pion.contest
    else:
        contest = sess.query(db.Contest).filter_by(round=self.round, master_contest_id=pion.contest_id).one()

    rights = self.gatekeeper.rights_for_contest(contest)
    if not rights.can_edit_points():
        return self.error('Nemáte právo na úpravu bodů')

    # The name columns serve as a check that the ID refers to the intended person.
    user = pion.user
    if user.first_name != krestni or user.last_name != prijmeni:
        return self.error('Neodpovídá ID a jméno soutěžícího')

    if sol is None:
        # No solution exists yet: 'X' changes nothing, anything else creates one.
        if body == 'X':
            return
        if not self.allow_add_del:
            return self.error('Tento soutěžící úlohu neodevzdal')
        if not rights.can_upload_solutions():
            return self.error('Nemáte právo na zakládání nových řešení')
        sol = db.Solution(user_id=user_id, task_id=task_id)
        sess.add(sol)
        logger.info(f'Import: Založeno řešení user=#{user_id} task=#{task_id}')
        mo.util.log(
            type=db.LogType.participant,
            what=user_id,
            details={'action': 'solution-created', 'task': task_id},
        )
        self.cnt_add_sols += 1
    elif body == 'X':
        # A solution exists and should be removed.
        if not self.allow_add_del:
            return self.error('Tento soutěžící úlohu odevzdal')
        if sol.final_submit is not None or sol.final_feedback is not None:
            return self.error('Nelze smazat řešení, ke kterému existují odevzdané soubory')
        if not rights.can_upload_solutions():
            return self.error('Nemáte právo na mazání řešení')
        logger.info(f'Import: Smazáno řešení user=#{user_id} task=#{task_id}')
        mo.util.log(
            type=db.LogType.participant,
            what=user_id,
            details={'action': 'solution-removed', 'task': task_id},
        )
        self.cnt_del_sols += 1
        sess.delete(sol)
        return

    # Anything that is not a Decimal at this point (i.e. '?') is stored as None.
    points = body if isinstance(body, decimal.Decimal) else None
    if sol.points != points:
        # Record the change in the points history only when the value really changes.
        sol.points = points
        sess.add(db.PointsHistory(
            task=self.task,
            participant_id=user_id,
            user=self.user,
            points_at=mo.now,
            points=points,
        ))
        self.cnt_set_points += 1
def get_template(self) -> str:
    """Return a table pre-filled with the current points of all covered participants.

    Rows are ordered by the users' sort key; the points column holds 'X'
    when there is no solution and '?' when the solution has no points.
    """
    pairs = sorted(self._pion_sol_query().all(), key=lambda pair: pair[0].user.sort_key())

    rows = []
    for pion, sol in pairs:
        pts = 'X' if sol is None else '?' if sol.points is None else format_decimal(sol.points)
        user = pion.user
        rows.append(PointsImportRow(
            user_id=user.user_id,
            krestni=user.first_name,
            prijmeni=user.last_name,
            body=pts,
        ))

    buf = io.StringIO()
    mo.csv.write(file=buf, fmt=self.fmt, row_class=self.row_class, rows=rows)
    return buf.getvalue()
def create_import(user: db.User,
                  type: ImportType,
                  fmt: FileFormat,
                  round: Optional[db.Round] = None,
                  contest: Optional[db.Contest] = None,
                  only_region: Optional[db.Place] = None,
                  task: Optional[db.Task] = None,
                  allow_add_del: bool = False):
    """Create and configure the import object for the requested import type.

    The object gets all arguments assigned as attributes, receives a
    gatekeeper for `user`, and has its `setup()` called before being
    returned.
    """
    imp: Import
    for known_type, import_class in (
        (ImportType.participants, ContestImport),
        (ImportType.proctors, ProctorImport),
        (ImportType.judges, JudgeImport),
        (ImportType.points, PointsImport),
    ):
        if type == known_type:
            imp = import_class()
            break
    else:
        assert False, "Neznámý typ importu"

    # What is imported and on whose behalf.
    imp.user = user
    imp.round = round
    imp.contest = contest
    imp.only_region = only_region
    imp.task = task
    imp.allow_add_del = allow_add_del
    imp.fmt = fmt
    imp.gatekeeper = mo.rights.Gatekeeper(user)

    # setup() runs last, because it checks the attributes assigned above.
    imp.setup()
    return imp