Skip to content
Snippets Groups Projects
Select Git revision
  • 4447138c3ee166735149e6af5268b168070595ec
  • master default
2 results

Program.cs

Blame
  • imports.py 29.69 KiB
    from dataclasses import dataclass
    import decimal
    from enum import auto
    import io
    import re
    from sqlalchemy import and_
    from sqlalchemy.orm import joinedload, Query
    from typing import List, Optional, Any, Dict, Type, Union
    
    import mo.csv
    from mo.csv import FileFormat, MissingHeaderError
    import mo.db as db
    import mo.email
    import mo.rights
    import mo.users
    import mo.util
    from mo.util import logger
    from mo.util_format import format_decimal
    
    
    class ImportType(db.MOEnum):
        participants = auto()
        orgs = auto()
        points = auto()
    
        def friendly_name(self) -> str:
            return import_type_names[self]
    
    
    import_type_names = {
        ImportType.participants.name: 'účastníci',
        ImportType.orgs.name: 'organizátoři',
        ImportType.points.name: 'body',
    }
    
    
    class Import:
        # Výsledek importu
        errors: List[str]
        warnings: List[str]
        cnt_rows: int = 0
        cnt_new_users: int = 0
        cnt_new_participants: int = 0
        cnt_new_participations: int = 0
        cnt_new_roles: int = 0
        cnt_set_points: int = 0
        cnt_add_sols: int = 0
        cnt_del_sols: int = 0
        cnt_change_user_to_org: int = 0  # pro Import orgů: Počet provedených/požadovaných změn účastnka na orga
    
        # Veřejné vlastnosti importu
        template_basename: str = "sablona"
        fmt: FileFormat
    
        # Interní: Co a jak zrovna importujeme
        user: db.User
        round: Optional[db.Round] = None
        contest: Optional[db.Contest] = None
        only_region: Optional[db.Place] = None
        task: Optional[db.Task] = None        # pro Import bodů
        default_place: Optional[db.Place] = None
        category: Optional[str] = None
        row_class: Type[mo.csv.Row]
        row_example: mo.csv.Row
        log_msg_prefix: str
        log_details: Any
        allow_change_user_to_org: bool = False  # pro Import orgů: je povoleno vyrobit orga z účastníka
    
        # Interní: Stav importu
        place_cache: Dict[str, db.Place]
        school_place_cache: Dict[str, db.Place]
        gatekeeper: mo.rights.Gatekeeper
        new_user_ids: List[int]
        line_number: int = 0
        row_name: Optional[str] = None
    
        def __init__(self, user: db.User):
            self.errors = []
            self.warnings = []
            self.place_cache = {}
            self.school_place_cache = {}
            self.new_user_ids = []
            self.gatekeeper = mo.rights.Gatekeeper(user)
            self.user = user
    
        def setup(self):
            # Definováno odvozenými třídami
            assert NotImplementedError()
    
        def error(self, msg: str) -> Any:
            if self.line_number > 0:
                if self.row_name:
                    msg = f"Řádek {self.line_number} ({self.row_name}): {msg}"
                else:
                    msg = f"Řádek {self.line_number}: {msg}"
            self.errors.append(msg)
            logger.info('Import: >> %s', msg)
            return None     # Kdyby bylo otypováno správně jako -> None, při volání by si mypy stěžoval
    
        def parse_user_id(self, user_id_str: str) -> Optional[int]:
            if user_id_str == "":
                return self.error('Chybí ID uživatele')
    
            try:
                return int(user_id_str)
            except ValueError:
                return self.error('ID uživatele není číslo')
    
        def parse_email(self, email: str) -> Optional[str]:
            if email == "":
                return self.error('Chybí e-mailová adresa')
    
            try:
                return mo.users.normalize_email(email)
            except mo.CheckError as e:
                return self.error(str(e))
    
        def parse_name(self, name: str) -> Optional[str]:
            if name == "":
                return self.error('Jméno nesmí být prázdné')
    
            # XXX: Tato kontrola úmyslně není striktní, aby prošla i jména jako 'de Beer'
            if name == name.lower():
                return self.error('Ve jméně nejsou velká písmena')
    
            if name == name.upper():
                return self.error('Ve jméně nejsou malá písmena')
    
            return name
    
        def check_rights(self, round: db.Round, place: db.Place) -> bool:
            assert round is not None
            rights = self.gatekeeper.rights_for(place, round.year, round.category, round.seq)
            return rights.have_right(mo.rights.Right.manage_contest)
    
        def parse_opt_place(self, kod: str, what: str) -> Optional[db.Place]:
            if kod == "":
                return None
    
            if kod in self.place_cache:
                return self.place_cache[kod]
    
            place = db.get_place_by_code(kod)
            if not place:
                return self.error(f'{what.title()} s kódem "{kod}" neexistuje'+
                        ('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
    
            self.place_cache[kod] = place
            return place
    
        def parse_role(self, name):
            if name not in db.RoleType.__members__:
                return self.error(f"Role {name} neexistuje. Podívejte se do manuálu na existující role.")
    
            return db.RoleType[name]
    
        def parse_school(self, kod: str) -> Optional[db.Place]:
            if kod in self.school_place_cache:
                return self.school_place_cache[kod]
    
            try:
                place = mo.users.validate_and_find_school(kod)
            except mo.CheckError as e:
                return self.error(str(e))
    
            self.school_place_cache[kod] = place
    
            return place
    
        def parse_grade(self, rocnik: str, school: Optional[db.School]) -> Optional[str]:
            if not school:
                return None
    
            # Ve snaze zabránit Excelu v interpretování ročníku jako kalendářního data
            # lidé připisují všechny možné i nemožné znaky, které vypadají jako apostrof :)
            rocnik = re.sub('[\'"\u00b4\u2019]', "", rocnik)
    
            try:
                return mo.users.normalize_grade(rocnik, school)
            except mo.CheckError as e:
                return self.error(str(e))
    
        def parse_born(self, rok: str) -> Optional[int]:
            if not re.fullmatch(r'\d{4}', rok):
                return self.error('Rok narození musí být čtyřciferné číslo')
    
            r = int(rok)
    
            try:
                mo.users.validate_born_year(r)
            except mo.CheckError as e:
                return self.error(str(e))
    
            return r
    
        def parse_category(self, kategorie: Optional[str]) -> Optional[str]:
            return kategorie
    
        def parse_round(self, year: Optional[int], category: Optional[str], seq: str) -> Optional[db.Round]:
            if not year:
                return self.error('Neuveden ročník pro nalezení kola')
            if not category:
                return self.error('Neuvedena kategorie pro nalezení kola')
            r = (
                db.get_session().query(db.Round)
                .filter_by(year=year, category=category, seq=seq)
                .one_or_none()
            )
            if r is None:
                return self.error(f'Kolo {year}-{category}-{seq} nenalezeno')
            return r
    
        def find_or_create_user(self, email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool) -> Optional[db.User]:
            try:
                try:
                    user, is_new, is_user_to_org = mo.users.find_or_create_user(email, krestni, prijmeni, is_org, allow_change_user_to_org=self.allow_change_user_to_org, reason='import')
                    self.cnt_change_user_to_org += is_user_to_org
                except mo.users.CheckErrorOrgIsUser as e:
                    self.cnt_change_user_to_org += 1
                    raise mo.CheckError(str(e) + " Změnu můžete povolit ve formuláři.")
            except mo.CheckError as e:
                return self.error(str(e))
            if is_new:
                self.cnt_new_users += 1
                self.new_user_ids.append(user.user_id)
            return user
    
        def parse_points(self, points_str: str) -> Union[decimal.Decimal, str, None]:
            if points_str == "":
                return self.error('Body musí být vyplněny')
    
            points_str = points_str.upper()
            if points_str in ['X', '?']:
                return points_str
    
            pts, error = mo.util.parse_points(points_str, self.task, self.round)
            if error:
                return self.error(error)
    
            return pts
    
        def find_or_create_participant(self, user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str]) -> Optional[db.Participant]:
            try:
                part, is_new = mo.users.find_or_create_participant(user, year, school_id, birth_year, grade, reason='import')
            except mo.CheckError as e:
                return self.error(str(e))
            if is_new:
                self.cnt_new_participants += 1
            return part
    
        def find_or_create_participation(self, user: db.User, contest: db.Contest, place: Optional[db.Place]) -> Optional[db.Participation]:
            try:
                pion, is_new = mo.users.find_or_create_participation(user, contest, place, reason='import')
            except mo.CheckError as e:
                return self.error(str(e))
            if is_new:
                self.cnt_new_participations += 1
            return pion
    
        def place_is_allowed(self, place: db.Place) -> bool:
            if self.contest is not None and self.contest.place_id != place.place_id:
                return False
            if self.only_region is not None and not self.gatekeeper.is_ancestor_of(self.only_region, place):
                return False
            return True
    
        def obtain_contest(self, round: db.Round, oblast: Optional[db.Place], allow_none: bool = False) -> Optional[db.Contest]:
            contest: Optional[db.Contest]
            if oblast is not None and not self.place_is_allowed(oblast):
                return self.error('Oblast neodpovídá té, do které se importuje')
            if self.contest:
                contest = self.contest
            else:
                # Zde mluvíme o oblastech, místo abychom používali place_levels,
                # protože sloupec má ve jménu oblast a také je potřeba rozlišovat školu
                # účastníka a školu jako oblast.
                if oblast is None:
                    if not allow_none:
                        self.error('Je nutné uvést kód oblasti')
                    return None
                contest = db.get_session().query(db.Contest).filter_by(round=round, place=oblast).one_or_none()
                if contest is None:
                    return self.error('V uvedené oblasti toto kolo neprobíhá')
    
            return contest
    
        def add_role(self, user: db.User, role: db.RoleType, place: Optional[db.Place], year: Optional[int], category: Optional[str], round: Optional[db.Round]):
            sess = db.get_session()
    
            if year and round and round.year != year:
                return self.error('Ročník neodpovídá zadanému kolu.')
            if category and round and round.category != category:
                return self.error('Kategorie neodpovídá zadanému kolu.')
            seq = None
            if round:
                category = round.category
                year = round.year
                seq = round.seq
    
            if (sess.query(db.UserRole)
                    .filter_by(user=user, place=place, role=role,
                               category=category, year=year, seq=seq)
                    .with_for_update()
                    .first()):
                pass
            else:
                ur = db.UserRole(user=user, place=place, role=role,
                                 category=category, year=year, seq=seq,
                                 assigned_by_user=self.user)
    
                if not self.gatekeeper.can_set_role(ur):
                    return self.error('Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
    
                sess.add(ur)
                sess.flush()
                logger.info(f'Import: {role.name.title()} user=#{user.user_id} place=#{ place.place_id if place else "null" } user_role=#{ur.user_role_id}')
                mo.util.log(
                    type=db.LogType.user_role,
                    what=ur.user_role_id,
                    details={'action': 'import', 'new': db.row2dict(ur)},
                )
                self.cnt_new_roles += 1
    
        def import_row(self, r: mo.csv.Row):
            # Definováno odvozenými třídami
            assert NotImplementedError()
    
        def log_start(self, path):
            args = [f'user=#{self.user.user_id}', f'fmt={self.fmt.name}']
            if self.round is not None:
                args.append(f'round=#{self.round.round_id}')
            if self.contest is not None:
                args.append(f'contest=#{self.contest.contest_id}')
            if self.only_region is not None:
                args.append(f'region=#{self.only_region.place_id}')
            if self.task is not None:
                args.append(f'task=#{self.task.task_id}')
    
            logger.info('Import: %s ze souboru %s: %s', self.log_msg_prefix, path, " ".join(args))
    
        def log_end(self):
            args = [f'rows=#{self.cnt_rows}']
            for key, val in [
                ('users', self.cnt_new_users),
                ('p-ants', self.cnt_new_participants),
                ('p-ions', self.cnt_new_participations),
                ('roles', self.cnt_new_roles),
                ('points', self.cnt_set_points),
                ('add-sols', self.cnt_add_sols),
                ('del-sols', self.cnt_del_sols),
            ]:
                if val > 0:
                    args.append(f'{key}={val}')
            logger.info('Import: Hotovo (%s)', " ".join(args))
    
            details = self.log_details.copy()
            if self.only_region:
                details['region'] = self.only_region.place_id
    
            if self.contest is not None:
                mo.util.log(
                    type=db.LogType.contest,
                    what=self.contest.contest_id,
                    details=details,
                )
            elif self.round is not None:
                mo.util.log(
                    type=db.LogType.round,
                    what=self.round.round_id,
                    details=details,
                )
            else:
                pass
                # TODO
    
        def check_utf8(self, path: str) -> bool:
            # Není pěkné, že soubory čteme dvakrát, ale ve srovnání s pasekou, kterou by
            # napáchal import ve špatném kódování, je to maličkost.
            try:
                with open(path, encoding='utf-8') as f:
                    for _ in f:
                        pass
                return True
            except UnicodeDecodeError:
                return False
    
        def get_row_name(self, row: mo.csv.Row) -> Optional[str]:
            if hasattr(row, 'email'):
                return row.email # type: ignore
                    # čtení prvku potomka
            return None
    
        def generic_import(self, path: str) -> bool:
            charset = self.fmt.get_charset()
            if charset != 'utf-8' and self.check_utf8(path):
                logger.info('Import: Uhodnuto kódování utf-8')
                charset = 'utf-8'
            try:
                with open(path, encoding=charset) as file:
                    try:
                        rows: List[mo.csv.Row]
                        rows, warnings = mo.csv.read(file=file, fmt=self.fmt, row_class=self.row_class)
                        self.warnings += warnings
                    except MissingHeaderError:
                        return self.error('Souboru chybí první řádek s názvy sloupců')
            except UnicodeDecodeError:
                return self.error(f'Soubor není v kódování {self.fmt.get_charset()}')
            except Exception as e:
                return self.error(f'Chybná struktura tabulky: {e}')
    
            self.line_number = 2
            for row in rows:
                self.row_name = self.get_row_name(row)
                self.cnt_rows += 1
                self.import_row(row)
                if len(self.errors) >= 100:
                    self.errors.append('Import přerušen pro příliš mnoho chyb')
                    break
                self.line_number += 1
            self.row_name = None
    
            return len(self.errors) == 0
    
        def notify_users(self):
            # Projde všechny uživatele a těm, kteří ještě nemají nastavené heslo,
            # ani nepožádali o jeho reset, pošle mail s odkazem na reset. Každého
            # uživatele zpracováváme ve zvlášť transakci, aby se po chybě neztratila
            # informace o tom, že už jsme nějaké maily rozeslali.
    
            sess = db.get_session()
            for uid in self.new_user_ids:
                u = sess.query(db.User).get(uid)
                if u and not u.password_hash and not u.reset_at:
                    token = mo.users.make_activation_token(u)
                    sess.commit()
                    mo.email.send_new_account_email(u, token)
                else:
                    sess.rollback()
    
        def get_template(self) -> str:
            # Odvozené třídy mohou přetížit
            out = io.StringIO()
            mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=[self.row_example])
            return out.getvalue()
    
        def run(self, path: str) -> bool:
            self.log_start(path)
    
            if not self.generic_import(path):
                logger.info('Import: Rollback')
                db.get_session().rollback()
                return False
    
            self.log_end()
            db.get_session().commit()
            self.notify_users()
            return True
    
        def get_after_import_message(self):
            return "Import proveden."
    
    
    @dataclass
    class ContestImportRow(mo.csv.Row):
        email: str = ""
        krestni: str = ""
        prijmeni: str = ""
        kod_skoly: str = ""
        rocnik: str = ""
        rok_naroz: str = ""
        kod_mista: str = ""
        kod_oblasti: str = ""
    
    
    class ContestImport(Import):
        row_class = ContestImportRow
        row_example = ContestImportRow(
            email="nekdo@example.org",
            krestni="Pokusný",
            prijmeni="Králík",
            kod_skoly="#3333",
            rocnik="1/8",
            rok_naroz="2000",
        )
        log_msg_prefix = 'Účastníci'
        log_details = {'action': 'import'}
        template_basename = 'sablona-ucast'
    
        def __init__(
             self,
             user: db.User,
             round: db.Round,
             contest: Optional[db.Contest] = None,
             only_region: Optional[db.Place] = None,
             default_place: Optional[db.Place] = None
        ):
            super().__init__(user)
            self.user = user
            self.round = round
            self.contest = contest
            self.only_region = only_region
            self.default_place = default_place
            self.setup()
            assert self.round is not None
            assert not self.round.is_subround()
    
        def import_row(self, r: mo.csv.Row):
            assert self.round
            assert isinstance(r, ContestImportRow)
            num_prev_errs = len(self.errors)
            email = self.parse_email(r.email)
            krestni = self.parse_name(r.krestni) if r.krestni else None
            prijmeni = self.parse_name(r.prijmeni) if r.prijmeni else None
            school_place = self.parse_school(r.kod_skoly) if r.kod_skoly else None
            rocnik = self.parse_grade(r.rocnik, (school_place.school if school_place else None)) if r.rocnik else None
            rok_naroz = self.parse_born(r.rok_naroz) if r.rok_naroz else None
            misto = self.parse_opt_place(r.kod_mista, 'místo')
            if misto and not self.check_rights(self.round, misto):
                return self.error(f'Nemáte práva na správu soutěže {misto.name_locative()}')
    
            oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
            if oblast and not self.check_rights(self.round, oblast):
                return self.error(f'Nemáte práva na správu soutěže {oblast.name_locative()}')
    
            if (len(self.errors) > num_prev_errs
                    or email is None):
                return
    
            user = self.find_or_create_user(email, krestni, prijmeni, is_org=False)
            if user is None:
                return
    
            part = self.find_or_create_participant(user, self.round.year, school_place.place_id if school_place else None, rok_naroz, rocnik)
            if part is None:
                return
    
            contest = self.obtain_contest(self.round, oblast)
            if contest is None:
                return
    
            self.find_or_create_participation(user, contest, misto)
    
        def get_after_import_message(self):
            return f'Importováno ({self.cnt_rows} řádků, založeno {self.cnt_new_users} uživatelů, {self.cnt_new_participations} účastí, {self.cnt_new_roles} rolí)'
    
    
    @dataclass
    class OrgsImportRow(mo.csv.Row):
        email: str = ""
        krestni: str = ""
        prijmeni: str = ""
        kod_oblasti: str = ""
        role: str = ""
    
    
    class OrgsImport(Import):
        row_class = OrgsImportRow
        row_example = OrgsImportRow(
            email='nekdo@example.org',
            krestni='Pokusný',
            prijmeni='Králík',
            kod_oblasti='#3333',
            role='dozor',
        )
        log_msg_prefix = 'Organizátoři'
        log_details = {'action': 'import-orgs'}
        template_basename = 'sablona-organizatori'
    
        default_cat: Optional[str]
        default_seq: Optional[str]
    
        def __init__(
            self,
            user: db.User,
            round: Optional[db.Round] = None,  # Prázdné může být pouze když se jedná o GlobalOrgsImport
            contest: Optional[db.Contest] = None,
            only_region: Optional[db.Place] = None,
            allow_change_user_to_org: bool = False,
            default_place: Optional[db.Place] = None,
            default_cat: Optional[str] = None,
            default_seq: Optional[str] = None,
            year: Optional[int] = None
        ):
            super().__init__(user)
            self.round = round
            self.contest = contest
            self.only_region = only_region
            self.default_place = default_place
            self.default_cat = self.parse_category(default_cat)
            self.default_seq = default_seq
            self.default_place = default_place
            self.setup()
            self.allow_change_user_to_org = allow_change_user_to_org
            self.root_place = db.get_root_place()
            self.year = round.year if round else year
            assert hasattr(self.row_class, "kolo") or self.round
    
        def import_row(self, r: mo.csv.Row):
            assert isinstance(r, OrgsImportRow) or isinstance(r, GlobalOrgsImportRow)
            num_prev_errs = len(self.errors)
            email = self.parse_email(r.email)
            krestni = self.parse_name(r.krestni)
            prijmeni = self.parse_name(r.prijmeni)
            role = self.parse_role(r.role)
            if getattr(r, "kategorie", ""):
                kategorie = self.parse_category(getattr(r, "kategorie"))
            else:
                kategorie = self.default_cat
            seq = getattr(r, "kolo", "") or self.default_seq
            if seq:
                kolo = self.parse_round(self.year, kategorie, seq)
                if not kolo:
                    return
            else:
                kolo=self.round
    
            oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
            if oblast is None:
                oblast = self.default_place
            if oblast is None:
                return self.error('Chybí oblast.')
    
            if (len(self.errors) > num_prev_errs
                    or email is None
                    or krestni is None
                    or prijmeni is None):
                return
    
            user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
            if user is None:
                return
    
            self.add_role(user, role, oblast, self.year, kategorie, kolo)
    
        def get_after_import_message(self):
            return f'Importováno ({self.cnt_rows} řádků, založeno {self.cnt_new_users} uživatelů, {self.cnt_new_participations} účastí, {self.cnt_new_roles} rolí)'
    
    
    @dataclass
    class GlobalOrgsImportRow(OrgsImportRow):
        kategorie: str = ""
        kolo: str = ""
    
    
    class GlobalOrgsImport(OrgsImport):
        row_class = GlobalOrgsImportRow
        row_example = GlobalOrgsImportRow(
            email='nekdo@example.org',
            krestni='Pokusný',
            prijmeni='Králík',
            kod_oblasti='#3333',
            role='dozor',
            kategorie='Z',
            kolo='',
        )
        log_msg_prefix = 'Organizátoři'
        log_details = {'action': 'import-orgs'}
        template_basename = 'sablona-organizatori'
    
        def __init__(
            self,
            user: db.User,
            **kvargs
        ):
            super().__init__(user, **kvargs)
    
        def setup(self):
            self.root_place = db.get_root_place()
    
    
    @dataclass
    class PointsImportRow(mo.csv.Row):
        user_id: str = ""
        krestni: str = ""
        prijmeni: str = ""
        body: str = ""
    
    
    class PointsImport(Import):
        row_class = PointsImportRow
        log_msg_prefix = 'Body'
    
        allow_add_del: bool             # je povoleno zakládat/mazat řešení
    
        def __init__(
            self,
            user: db.User,
            round: db.Round,
            task: db.Task,
            contest: Optional[db.Contest] = None,
            only_region: Optional[db.Place] = None,
            allow_add_del: bool = False,
        ):
            super().__init__(user)
            self.round = round
            self.contest = contest
            self.task = task
            self.only_region = only_region
            self.allow_add_del = allow_add_del
            assert self.round is not None
            assert self.task is not None
            self.log_details = {'action': 'import-points', 'task': self.task.code}
            self.template_basename = 'body-' + self.task.code
    
        def _pion_sol_query(self) -> Query:
            sess = db.get_session()
            query = (sess.query(db.Participation, db.Solution)
                     .select_from(db.Participation)
                     .outerjoin(db.Solution, and_(db.Solution.user_id == db.Participation.user_id, db.Solution.task == self.task))
                     .options(joinedload(db.Participation.user)))
    
            if self.contest is not None:
                query = query.filter(db.Participation.contest_id == self.contest.master_contest_id)
            else:
                contest_query = sess.query(db.Contest.master_contest_id).filter_by(round=self.round)
                if self.only_region:
                    assert self.round
                    contest_query = db.filter_place_nth_parent(contest_query, db.Contest.place_id, self.round.level - self.only_region.level, self.only_region.place_id)
                query = query.filter(db.Participation.contest_id.in_(contest_query.subquery()))
    
            return query
    
        def import_row(self, r: mo.csv.Row):
            assert isinstance(r, PointsImportRow)
            num_prev_errs = len(self.errors)
            user_id = self.parse_user_id(r.user_id)
            krestni = self.parse_name(r.krestni)
            prijmeni = self.parse_name(r.prijmeni)
            body = self.parse_points(r.body)
    
            if (len(self.errors) > num_prev_errs
                    or user_id is None
                    or krestni is None
                    or prijmeni is None
                    or body is None):
                return
    
            assert self.round is not None
            assert self.task is not None
            task_id = self.task.task_id
    
            sess = db.get_session()
            query = self._pion_sol_query().filter(db.Participation.user_id == user_id)
            pion_sols = query.all()
            if not pion_sols:
                if self.contest is not None:
                    msg = self.round.get_level().name_locative('tomto', 'této', 'tomto')
                elif self.only_region is not None:
                    msg = self.only_region.get_level().name_locative('tomto', 'této', 'tomto')
                else:
                    msg = 'tomto kole'
                return self.error(f'Soutěžící nenalezen v {msg}')
            elif len(pion_sols) > 1:
                return self.error('Soutěžící v tomto kole soutěží vícekrát, neumím zpracovat')
            pion, sol = pion_sols[0]
    
            if not self.round.is_subround():
                contest = pion.contest
            else:
                contest = sess.query(db.Contest).filter_by(round=self.round, master_contest_id=pion.contest_id).one()
    
            rights = self.gatekeeper.rights_for_contest(contest)
            if not rights.can_edit_points():
                return self.error('Nemáte právo na úpravu bodů')
    
            user = pion.user
            if user.first_name != krestni or user.last_name != prijmeni:
                return self.error('Neodpovídá ID a jméno soutěžícího')
    
            if sol is None:
                if body == 'X':
                    return
                if not self.allow_add_del:
                    return self.error('Tento soutěžící úlohu neodevzdal')
                if not rights.can_upload_solutions():
                    return self.error('Nemáte právo na zakládání nových řešení')
                sol = db.Solution(user_id=user_id, task_id=task_id)
                sess.add(sol)
                logger.info(f'Import: Založeno řešení user=#{user_id} task=#{task_id}')
                mo.util.log(
                    type=db.LogType.participant,
                    what=user_id,
                    details={'action': 'solution-created', 'task': task_id},
                )
                self.cnt_add_sols += 1
            elif body == 'X':
                if not self.allow_add_del:
                    return self.error('Tento soutěžící úlohu odevzdal')
                if sol.final_submit is not None or sol.final_feedback is not None:
                    return self.error('Nelze smazat řešení, ke kterému existují odevzdané soubory')
                if not rights.can_upload_solutions():
                    return self.error('Nemáte právo na mazání řešení')
                logger.info(f'Import: Smazáno řešení user=#{user_id} task=#{task_id}')
                mo.util.log(
                    type=db.LogType.participant,
                    what=user_id,
                    details={'action': 'solution-removed', 'task': task_id},
                )
                self.cnt_del_sols += 1
                sess.delete(sol)
                return
    
            points = body if isinstance(body, decimal.Decimal) else None
            if sol.points != points:
                sol.points = points
                sess.add(db.PointsHistory(
                    task=self.task,
                    participant_id=user_id,
                    user=self.user,
                    points_at=mo.now,
                    points=points,
                ))
                self.cnt_set_points += 1
    
        def get_template(self) -> str:
            rows = []
            for pion, sol in sorted(self._pion_sol_query().all(), key=lambda pair: pair[0].user.sort_key()):
                if sol is None:
                    pts = 'X'
                elif sol.points is None:
                    pts = '?'
                else:
                    pts = format_decimal(sol.points)
                user = pion.user
                rows.append(PointsImportRow(
                    user_id=user.user_id,
                    krestni=user.first_name,
                    prijmeni=user.last_name,
                    body=pts,
                ))
    
            out = io.StringIO()
            mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=rows)
            return out.getvalue()
    
        def get_after_import_message(self):
            return f'Importováno ({self.cnt_rows} řádků, {self.cnt_set_points} řešení přebodováno, {self.cnt_add_sols} založeno a {self.cnt_del_sols} smazáno)'