Skip to content
Snippets Groups Projects
Select Git revision
  • dc065fd65e8b235c995ab573b55d0bff47af3205
  • master default protected
2 results

Makefile

Blame
  • imports.py 27.22 KiB
    from dataclasses import dataclass
    import decimal
    from enum import auto
    import io
    import re
    from sqlalchemy import and_
    from sqlalchemy.orm import joinedload, Query
    from typing import List, Optional, Any, Dict, Type, Union
    
    import mo.csv
    from mo.csv import FileFormat, MissingHeaderError
    import mo.db as db
    import mo.rights
    import mo.users
    import mo.util
    from mo.util import logger
    from mo.util_format import format_decimal
    
    
    class ImportType(db.MOEnum):
        participants = auto()
        proctors = auto()
        judges = auto()
        points = auto()
    
        def friendly_name(self) -> str:
            return import_type_names[self]
    
    
    import_type_names = {
        ImportType.participants.name: 'účastníci',
        ImportType.proctors.name: 'dozor',
        ImportType.judges.name: 'opravovatelé',
        ImportType.points.name: 'body',
    }
    
    
    class Import:
        # Výsledek importu
        errors: List[str]
        warnings: List[str]
        cnt_rows: int = 0
        cnt_new_users: int = 0
        cnt_new_participants: int = 0
        cnt_new_participations: int = 0
        cnt_new_roles: int = 0
        cnt_set_points: int = 0
        cnt_add_sols: int = 0
        cnt_del_sols: int = 0
    
        # Veřejné vlastnosti importu
        template_basename: str = "sablona"
    
        # Interní: Co a jak zrovna importujeme
        user: db.User
        round: Optional[db.Round]
        contest: Optional[db.Contest]
        task: Optional[db.Task]         # pro Import bodů
        allow_add_del: bool             # pro Import bodů: je povoleno zakládat/mazat řešení
        fmt: FileFormat
        row_class: Type[mo.csv.Row]
        row_example: mo.csv.Row
        log_msg_prefix: str
        log_details: Any
    
        # Interní: Stav importu
        place_cache: Dict[str, db.Place]
        school_place_cache: Dict[str, db.Place]
        gatekeeper: mo.rights.Gatekeeper
        new_user_ids: List[int]
        line_number: int = 0
    
        def __init__(self):
            self.errors = []
            self.warnings = []
            self.rr = None
            self.place_cache = {}
            self.school_place_cache = {}
            self.new_user_ids = []
    
        def setup(self):
            # Definováno odvozenými třídami
            assert NotImplementedError()
    
        def error(self, msg: str) -> Any:
            if self.line_number > 0:
                msg = f"Řádek {self.line_number}: {msg}"
            self.errors.append(msg)
            logger.info('Import: >> %s', msg)
            return None     # Kdyby bylo otypováno správně jako -> None, při volání by si mypy stěžoval
    
        def parse_user_id(self, user_id_str: str) -> Optional[int]:
            if user_id_str == "":
                return self.error('Chybí ID uživatele')
    
            try:
                return int(user_id_str)
            except ValueError:
                return self.error('ID uživatele není číslo')
    
        def parse_email(self, email: str) -> Optional[str]:
            if email == "":
                return self.error('Chybí e-mailová adresa')
    
            try:
                return mo.users.normalize_email(email)
            except mo.CheckError as e:
                return self.error(str(e))
    
        def parse_name(self, name: str) -> Optional[str]:
            if name == "":
                return self.error('Jméno nesmí být prázdné')
    
            # XXX: Tato kontrola úmyslně není striktní, aby prošla i jména jako 'de Beer'
            if name == name.lower():
                return self.error('Ve jméně nejsou velká písmena')
    
            if name == name.upper():
                return self.error('Ve jméně nejsou malá písmena')
    
            return name
    
        def check_rights(self, place: db.Place) -> bool:
            round = self.round
            assert round is not None
            rights = self.gatekeeper.rights_for(place, round.year, round.category, round.seq)
            return rights.have_right(mo.rights.Right.manage_contest)
    
        def parse_opt_place(self, kod: str, what: str) -> Optional[db.Place]:
            if kod == "":
                return None
    
            if kod in self.place_cache:
                return self.place_cache[kod]
    
            place = db.get_place_by_code(kod)
            if not place:
                return self.error(f'{what.title()} s kódem "{kod}" neexistuje'+
                        ('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
    
            if not self.check_rights(place):
                return self.error(f'Nemáte práva na správu soutěže {place.name_locative()}')
    
            self.place_cache[kod] = place
            return place
    
        def parse_school(self, kod: str) -> Optional[db.Place]:
            if kod == "":
                return self.error('Škola je povinná')
    
            if kod in self.school_place_cache:
                return self.school_place_cache[kod]
    
            place = db.get_place_by_code(kod, fetch_school=True)
            if not place:
                return self.error(f'Škola s kódem "{kod}" nenalezena'+
                        ('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
    
            if place.type != db.PlaceType.school:
                return self.error(f'Kód školy "{kod}" neodpovídá škole')
    
            self.school_place_cache[kod] = place
            return place
    
        def parse_grade(self, rocnik: str, school: Optional[db.School]) -> Optional[str]:
            if not school:
                return None
    
            # Ve snaze zabránit Excelu v interpretování ročníku jako kalendářního data
            # lidé připisují všechny možné i nemožné znaky, které vypadají jako apostrof :)
            rocnik = re.sub('[\'"\u00b4\u2019]', "", rocnik)
    
            if (not re.fullmatch(r'\d(/\d)?', rocnik)):
                return self.error(f'Ročník má neplatný formát, musí to být buď číslice, nebo číslice/číslice')
    
            if (not school.is_zs and re.fullmatch(r'\d', rocnik)):
                return self.error(f'Ročník pro střední školu ({school.place.name}) zapisujte ve formátu číslice/číslice')
    
            if (not school.is_ss and re.fullmatch(r'\d/\d', rocnik)):
                return self.error(f'Ročník pro základní školu ({school.place.name}) zapisujte jako číslici 1–9')
    
            return rocnik
    
        def parse_born(self, rok: str) -> Optional[int]:
            if not re.fullmatch(r'\d{4}', rok):
                return self.error('Rok narození musí být čtyřciferné číslo')
    
            r = int(rok)
            if r < 2000 or r > 2099:
                return self.error('Rok narození musí být v intervalu [2000,2099]')
    
            return r
    
        def find_or_create_user(self, email: str, krestni: str, prijmeni: str, is_org: bool) -> Optional[db.User]:
            sess = db.get_session()
            user = sess.query(db.User).filter_by(email=email).one_or_none()
            if user:
                if user.first_name != krestni or user.last_name != prijmeni:
                    return self.error(f'Osoba již registrována s odlišným jménem {user.full_name()}')
                if (user.is_admin or user.is_org) != is_org:
                    if is_org:
                        return self.error('Nelze předefinovat účastníka na organizátora')
                    else:
                        return self.error('Nelze předefinovat organizátora na účastníka')
            else:
                user = db.User(email=email, first_name=krestni, last_name=prijmeni, is_org=is_org)
                sess.add(user)
                sess.flush()    # Aby uživatel dostal user_id
                logger.info(f'Import: Založen uživatel user=#{user.user_id} email=<{user.email}>')
                mo.util.log(
                    type=db.LogType.user,
                    what=user.user_id,
                    details={'action': 'import', 'new': db.row2dict(user)},
                )
                self.cnt_new_users += 1
                self.new_user_ids.append(user.user_id)
            return user
    
        def parse_points(self, points_str: str) -> Union[decimal.Decimal, str, None]:
            if points_str == "":
                return self.error('Body musí být vyplněny')
    
            points_str = points_str.upper()
            if points_str in ['X', '?']:
                return points_str
    
            pts, error = mo.util.parse_points(points_str, self.task, self.round)
            if error:
                return self.error(error)
    
            return pts
    
        def find_or_create_participant(self, user: db.User, year: int, school_id: int, birth_year: int, grade: str) -> Optional[db.Participant]:
            sess = db.get_session()
            part = sess.query(db.Participant).get((user.user_id, year))
            if part:
                if (part.school != school_id
                        or part.grade != grade
                        or part.birth_year != birth_year):
                    return self.error('Účastník již zaregistrován s odlišnou školou/ročníkem/rokem narození')
            else:
                part = db.Participant(user=user, year=year, school=school_id, birth_year=birth_year, grade=grade)
                sess.add(part)
                logger.info(f'Import: Založen účastník #{user.user_id}')
                mo.util.log(
                    type=db.LogType.participant,
                    what=user.user_id,
                    details={'action': 'import', 'new': db.row2dict(part)},
                )
                self.cnt_new_participants += 1
    
            return part
    
        def find_or_create_participation(self, user: db.User, contest: db.Contest, place: Optional[db.Place]) -> Optional[db.Participation]:
            if place is None:
                place = contest.place
    
            sess = db.get_session()
            pions = (sess.query(db.Participation)
                     .filter_by(user=user)
                     .filter(db.Participation.contest.has(db.Contest.round == contest.round))
                     .all())
    
            if not pions:
                pion = db.Participation(user=user, contest=contest, place_id=place.place_id, state=db.PartState.invited)
                sess.add(pion)
                logger.info(f'Import: Založena účast user=#{user.user_id} contest=#{contest.contest_id} place=#{place.place_id}')
                mo.util.log(
                    type=db.LogType.participant,
                    what=user.user_id,
                    details={'action': 'add-to-contest', 'new': db.row2dict(pion)},
                )
                self.cnt_new_participations += 1
            elif len(pions) == 1:
                pion = pions[0]
                if pion.place != place:
                    return self.error(f'Již se tohoto kola účastní v {contest.round.get_level().name_locative("jiném", "jiné", "jiném")} ({pion.place.get_code()})')
            else:
                return self.error('Již se tohoto kola účastní ve vice oblastech, což by nemělo být možné')
    
            return pion
    
        def obtain_contest(self, oblast: Optional[db.Place], allow_none: bool = False):
            if self.contest:
                contest = self.contest
                if oblast is not None and oblast.place_id != contest.place.place_id:
                    return self.error('Oblast neodpovídá té, do které se importuje')
            else:
                if oblast is None:
                    if not allow_none:
                        self.error('Je nutné uvést ' + self.round.get_level().name)
                    return None
                contest = db.get_session().query(db.Contest).filter_by(round=self.round, place=oblast).one_or_none()
                if contest is None:
                    return self.error('V ' + self.round.get_level().name_locative("uvedeném", "uvedené", "uvedeném") + ' toto kolo neprobíhá')
    
            return contest
    
        def add_role(self, user: db.User, place: db.Place, role: db.RoleType):
            sess = db.get_session()
            round = self.round
            assert round is not None
            if (sess.query(db.UserRole)
                    .filter_by(user=user, place=place, role=role,
                               category=round.category, year=round.year, seq=round.seq)
                    .with_for_update()
                    .first()):
                pass
            else:
                ur = db.UserRole(user=user, place=place, role=role,
                                 category=round.category, year=round.year, seq=round.seq,
                                 assigned_by_user=self.user)
                sess.add(ur)
                sess.flush()
                logger.info(f'Import: {role.name.title()} user=#{user.user_id} place=#{place.place_id} user_role=#{ur.user_role_id}')
                mo.util.log(
                    type=db.LogType.user_role,
                    what=ur.user_role_id,
                    details={'action': 'import', 'new': db.row2dict(ur)},
                )
                self.cnt_new_roles += 1
    
        def import_row(self, r: mo.csv.Row):
            # Definováno odvozenými třídami
            assert NotImplementedError()
    
        def log_start(self, path):
            args = [f'user=#{self.user.user_id}', f'fmt={self.fmt.name}']
            if self.round is not None:
                args.append(f'round=#{self.round.round_id}')
            if self.contest is not None:
                args.append(f'contest=#{self.contest.contest_id}')
            if self.task is not None:
                args.append(f'task=#{self.task.task_id}')
    
            logger.info('Import: %s ze souboru %s: %s', self.log_msg_prefix, path, " ".join(args))
    
        def log_end(self):
            args = [f'rows=#{self.cnt_rows}']
            for key, val in [
                ('users', self.cnt_new_users),
                ('p-ants', self.cnt_new_participants),
                ('p-ions', self.cnt_new_participations),
                ('roles', self.cnt_new_roles),
                ('points', self.cnt_set_points),
                ('add-sols', self.cnt_add_sols),
                ('del-sols', self.cnt_del_sols),
            ]:
                if val > 0:
                    args.append(f'{key}={val}')
            logger.info('Import: Hotovo (%s)', " ".join(args))
    
            if self.contest is not None:
                mo.util.log(
                    type=db.LogType.contest,
                    what=self.contest.contest_id,
                    details=self.log_details,
                )
            elif self.round is not None:
                mo.util.log(
                    type=db.LogType.round,
                    what=self.round.round_id,
                    details=self.log_details,
                )
            else:
                assert False
    
        def check_utf8(self, path: str) -> bool:
            # Není pěkné, že soubory čteme dvakrát, ale ve srovnání s pasekou, kterou by
            # napáchal import ve špatném kódování, je to maličkost.
            try:
                with open(path, encoding='utf-8') as f:
                    for _ in f:
                        pass
                return True
            except UnicodeDecodeError:
                return False
    
        def generic_import(self, path: str) -> bool:
            charset = self.fmt.get_charset()
            if charset != 'utf-8' and self.check_utf8(path):
                logger.info('Import: Uhodnuto kódování utf-8')
                charset = 'utf-8'
            try:
                with open(path, encoding=charset) as file:
                    try:
                        rows: List[mo.csv.Row]
                        rows, warnings = mo.csv.read(file=file, fmt=self.fmt, row_class=self.row_class)
                        self.warnings += warnings
                    except MissingHeaderError:
                        return self.error('Souboru chybí první řádek s názvy sloupců')
            except UnicodeDecodeError:
                return self.error(f'Soubor není v kódování {self.fmt.get_charset()}')
            except Exception as e:
                return self.error(f'Chybná struktura tabulky: {e}')
    
            self.line_number = 2
            for row in rows:
                self.cnt_rows += 1
                self.import_row(row)
                if len(self.errors) >= 100:
                    self.errors.append('Import přerušen pro příliš mnoho chyb')
                    break
                self.line_number += 1
    
            return len(self.errors) == 0
    
        def notify_users(self):
            # Projde všechny uživatele a těm, kteří ještě nemají nastavené heslo,
            # ani nepožádali o jeho reset, pošle mail s odkazem na reset. Každého
            # uživatele zpracováváme ve zvlášť transakci, aby se po chybě neztratila
            # informace o tom, že už jsme nějaké maily rozeslali.
    
            sess = db.get_session()
            for uid in self.new_user_ids:
                u = sess.query(db.User).get(uid)
                if u and not u.password_hash and not u.reset_at:
                    token = mo.users.ask_reset_password(u)
                    sess.commit()
                    mo.util.send_new_account_email(u, token)
                else:
                    sess.rollback()
    
        def get_template(self) -> str:
            # Odvozené třídy mohou přetížit
            out = io.StringIO()
            mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=[self.row_example])
            return out.getvalue()
    
        def run(self, path: str) -> bool:
            self.log_start(path)
    
            if not self.generic_import(path):
                logger.info('Import: Rollback')
                db.get_session().rollback()
                return False
    
            self.log_end()
            db.get_session().commit()
            self.notify_users()
            return True
    
    
    @dataclass
    class ContestImportRow(mo.csv.Row):
        email: str = ""
        krestni: str = ""
        prijmeni: str = ""
        kod_skoly: str = ""
        rocnik: str = ""
        rok_naroz: str = ""
        kod_mista: str = ""
        kod_oblasti: str = ""
    
    
    class ContestImport(Import):
        row_class = ContestImportRow
        row_example = ContestImportRow(
            email="nekdo@example.org",
            krestni="Pokusný",
            prijmeni="Králík",
            kod_skoly="#3333",
            rocnik="1/8",
            rok_naroz="2000",
        )
        log_msg_prefix = 'Účastníci'
        log_details = {'action': 'import'}
        template_basename = 'sablona-ucast'
    
        def setup(self):
            assert self.round is not None
    
        def import_row(self, r: mo.csv.Row):
            assert isinstance(r, ContestImportRow)
            num_prev_errs = len(self.errors)
            email = self.parse_email(r.email)
            krestni = self.parse_name(r.krestni)
            prijmeni = self.parse_name(r.prijmeni)
            school_place = self.parse_school(r.kod_skoly)
            rocnik = self.parse_grade(r.rocnik, (school_place.school if school_place else None))
            rok_naroz = self.parse_born(r.rok_naroz)
            misto = self.parse_opt_place(r.kod_mista, 'místo')
            oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
    
            if (len(self.errors) > num_prev_errs
                    or email is None
                    or krestni is None
                    or prijmeni is None
                    or school_place is None
                    or rocnik is None
                    or rok_naroz is None):
                return
    
            user = self.find_or_create_user(email, krestni, prijmeni, is_org=False)
            if user is None:
                return
    
            part = self.find_or_create_participant(user, mo.current_year, school_place.place_id, rok_naroz, rocnik)
            if part is None:
                return
    
            contest = self.obtain_contest(oblast)
            if contest is None:
                return
    
            self.find_or_create_participation(user, contest, misto)
    
    
    @dataclass
    class ProctorImportRow(mo.csv.Row):
        email: str = ""
        krestni: str = ""
        prijmeni: str = ""
        kod_mista: str = ""
    
    
    class ProctorImport(Import):
        row_class = ProctorImportRow
        row_example = ProctorImportRow(
            email='nekdo@example.org',
            krestni='Pokusný',
            prijmeni='Králík',
            kod_mista='#3333',
        )
        log_msg_prefix = 'Dozor'
        log_details = {'action': 'import-proctors'}
        template_basename = 'sablona-dozor'
    
        def setup(self):
            assert self.round is not None
    
        def import_row(self, r: mo.csv.Row):
            assert isinstance(r, ProctorImportRow)
            num_prev_errs = len(self.errors)
            email = self.parse_email(r.email)
            krestni = self.parse_name(r.krestni)
            prijmeni = self.parse_name(r.prijmeni)
            misto = self.parse_opt_place(r.kod_mista, 'místo')
    
            if misto is None:
                return self.error('Kód místa je povinné uvést')
    
            if (len(self.errors) > num_prev_errs
                    or email is None
                    or krestni is None
                    or prijmeni is None):
                return
    
            user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
            if user is None:
                return
    
            self.add_role(user, misto, db.RoleType.dozor)
    
    
    @dataclass
    class JudgeImportRow(mo.csv.Row):
        email: str = ""
        krestni: str = ""
        prijmeni: str = ""
        kod_oblasti: str = ""
    
    
    class JudgeImport(Import):
        row_class = JudgeImportRow
        row_example = JudgeImportRow(
            email='nekdo@example.org',
            krestni='Pokusný',
            prijmeni='Králík',
            kod_oblasti='B',
        )
        log_msg_prefix = 'Opravovatelé'
        log_details = {'action': 'import-judges'}
        template_basename = 'sablona-oprav'
        root_place: db.Place
    
        def setup(self):
            assert self.round is not None
            self.root_place = db.get_root_place()
    
        def import_row(self, r: mo.csv.Row):
            assert isinstance(r, JudgeImportRow)
            num_prev_errs = len(self.errors)
            email = self.parse_email(r.email)
            krestni = self.parse_name(r.krestni)
            prijmeni = self.parse_name(r.prijmeni)
            oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
    
            if (len(self.errors) > num_prev_errs
                    or email is None
                    or krestni is None
                    or prijmeni is None):
                return
    
            user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
            if user is None:
                return
    
            contest = self.obtain_contest(oblast, allow_none=True)
            place = contest.place if contest else self.root_place
            if not self.check_rights(place):
                return self.error(f'Nemáte práva na správu soutěže {place.name_locative()}')
    
            self.add_role(user, place, db.RoleType.opravovatel)
    
    
    @dataclass
    class PointsImportRow(mo.csv.Row):
        user_id: str = ""
        krestni: str = ""
        prijmeni: str = ""
        body: str = ""
    
    
    class PointsImport(Import):
        row_class = PointsImportRow
        log_msg_prefix = 'Body'
    
        def setup(self):
            assert self.round is not None
            assert self.task is not None
            self.log_details = {'action': 'import-points', 'task': self.task.code}
            self.template_basename = 'body-' + self.task.code
    
        def _pion_sol_query(self) -> Query:
            sess = db.get_session()
            query = (sess.query(db.Participation, db.Solution)
                     .select_from(db.Participation)
                     .outerjoin(db.Solution, and_(db.Solution.user_id == db.Participation.user_id, db.Solution.task == self.task))
                     .options(joinedload(db.Participation.user)))
    
            if self.contest is not None:
                query = query.filter(db.Participation.contest_id == self.contest.master_contest_id)
            else:
                contest_query = sess.query(db.Contest.master_contest_id).filter_by(round=self.round)
                query = query.filter(db.Participation.contest_id.in_(contest_query.subquery()))
    
            return query
    
        def import_row(self, r: mo.csv.Row):
            assert isinstance(r, PointsImportRow)
            num_prev_errs = len(self.errors)
            user_id = self.parse_user_id(r.user_id)
            krestni = self.parse_name(r.krestni)
            prijmeni = self.parse_name(r.prijmeni)
            body = self.parse_points(r.body)
    
            if (len(self.errors) > num_prev_errs
                    or user_id is None
                    or krestni is None
                    or prijmeni is None
                    or body is None):
                return
    
            assert self.round is not None
            assert self.task is not None
            task_id = self.task.task_id
    
            sess = db.get_session()
            query = self._pion_sol_query().filter(db.Participation.user_id == user_id)
            pion_sols = query.all()
            if not pion_sols:
                return self.error('Soutěžící nenalezen v tomto kole')
            elif len(pion_sols) > 1:
                return self.error('Soutěžící v tomto kole soutěží vícekrát, neumím zpracovat')
            pion, sol = pion_sols[0]
    
            if self.contest is not None:
                if pion.contest != self.contest:
                    return self.error('Soutěžící nesoutěží v ' + self.round.get_level().name_locative('tomto', 'této', 'tomto'))
    
            rights = self.gatekeeper.rights_for_contest(pion.contest)
            if not rights.can_edit_points():
                return self.error('Nemáte právo na úpravu bodů')
    
            user = pion.user
            if user.first_name != krestni or user.last_name != prijmeni:
                return self.error('Neodpovídá ID a jméno soutěžícího')
    
            if sol is None:
                if body == 'X':
                    return
                if not self.allow_add_del:
                    return self.error('Tento soutěžící úlohu neodevzdal')
                if not rights.can_upload_solutions():
                    return self.error('Nemáte právo na zakládání nových řešení')
                sol = db.Solution(user_id=user_id, task_id=task_id)
                sess.add(sol)
                logger.info(f'Import: Založeno řešení user=#{user_id} task=#{task_id}')
                mo.util.log(
                    type=db.LogType.participant,
                    what=user_id,
                    details={'action': 'solution-created', 'task': task_id},
                )
                self.cnt_add_sols += 1
            elif body == 'X':
                if not self.allow_add_del:
                    return self.error('Tento soutěžící úlohu odevzdal')
                if sol.final_submit is not None or sol.final_feedback is not None:
                    return self.error('Nelze smazat řešení, ke kterému existují odevzdané soubory')
                if not rights.can_upload_solutions():
                    return self.error('Nemáte právo na mazání řešení')
                logger.info(f'Import: Smazáno řešení user=#{user_id} task=#{task_id}')
                mo.util.log(
                    type=db.LogType.participant,
                    what=user_id,
                    details={'action': 'solution-removed', 'task': task_id},
                )
                self.cnt_del_sols += 1
                sess.delete(sol)
                return
    
            points = body if isinstance(body, decimal.Decimal) else None
            if sol.points != points:
                sol.points = points
                sess.add(db.PointsHistory(
                    task=self.task,
                    participant_id=user_id,
                    user=self.user,
                    points_at=mo.now,
                    points=points,
                ))
                self.cnt_set_points += 1
    
        def get_template(self) -> str:
            rows = []
            for pion, sol in sorted(self._pion_sol_query().all(), key=lambda pair: pair[0].user.sort_key()):
                if sol is None:
                    pts = 'X'
                elif sol.points is None:
                    pts = '?'
                else:
                    pts = format_decimal(sol.points)
                user = pion.user
                rows.append(PointsImportRow(
                    user_id=user.user_id,
                    krestni=user.first_name,
                    prijmeni=user.last_name,
                    body=pts,
                ))
    
            out = io.StringIO()
            mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=rows)
            return out.getvalue()
    
    
    def create_import(user: db.User,
                      type: ImportType,
                      fmt: FileFormat,
                      round: Optional[db.Round] = None,
                      contest: Optional[db.Contest] = None,
                      task: Optional[db.Task] = None,
                      allow_add_del: bool = False):
        imp: Import
        if type == ImportType.participants:
            imp = ContestImport()
        elif type == ImportType.proctors:
            imp = ProctorImport()
        elif type == ImportType.judges:
            imp = JudgeImport()
        elif type == ImportType.points:
            imp = PointsImport()
        else:
            assert False, "Neznámý typ importu"
    
        imp.user = user
        imp.round = round
        imp.contest = contest
        imp.task = task
        imp.allow_add_del = allow_add_del
        imp.fmt = fmt
        imp.gatekeeper = mo.rights.Gatekeeper(user)
        imp.setup()
    
        return imp