Select Git revision
-
Martin Mareš authoredMartin Mareš authored
imports.py 29.69 KiB
from dataclasses import dataclass
import decimal
from enum import auto
import io
import re
from sqlalchemy import and_
from sqlalchemy.orm import joinedload, Query
from typing import List, Optional, Any, Dict, Type, Union
import mo.csv
from mo.csv import FileFormat, MissingHeaderError
import mo.db as db
import mo.email
import mo.rights
import mo.users
import mo.util
from mo.util import logger
from mo.util_format import format_decimal
class ImportType(db.MOEnum):
participants = auto()
orgs = auto()
points = auto()
def friendly_name(self) -> str:
return import_type_names[self]
import_type_names = {
ImportType.participants.name: 'účastníci',
ImportType.orgs.name: 'organizátoři',
ImportType.points.name: 'body',
}
class Import:
# Výsledek importu
errors: List[str]
warnings: List[str]
cnt_rows: int = 0
cnt_new_users: int = 0
cnt_new_participants: int = 0
cnt_new_participations: int = 0
cnt_new_roles: int = 0
cnt_set_points: int = 0
cnt_add_sols: int = 0
cnt_del_sols: int = 0
cnt_change_user_to_org: int = 0 # pro Import orgů: Počet provedených/požadovaných změn účastnka na orga
# Veřejné vlastnosti importu
template_basename: str = "sablona"
fmt: FileFormat
# Interní: Co a jak zrovna importujeme
user: db.User
round: Optional[db.Round] = None
contest: Optional[db.Contest] = None
only_region: Optional[db.Place] = None
task: Optional[db.Task] = None # pro Import bodů
default_place: Optional[db.Place] = None
category: Optional[str] = None
row_class: Type[mo.csv.Row]
row_example: mo.csv.Row
log_msg_prefix: str
log_details: Any
allow_change_user_to_org: bool = False # pro Import orgů: je povoleno vyrobit orga z účastníka
# Interní: Stav importu
place_cache: Dict[str, db.Place]
school_place_cache: Dict[str, db.Place]
gatekeeper: mo.rights.Gatekeeper
new_user_ids: List[int]
line_number: int = 0
row_name: Optional[str] = None
def __init__(self, user: db.User):
self.errors = []
self.warnings = []
self.place_cache = {}
self.school_place_cache = {}
self.new_user_ids = []
self.gatekeeper = mo.rights.Gatekeeper(user)
self.user = user
def setup(self):
# Definováno odvozenými třídami
assert NotImplementedError()
def error(self, msg: str) -> Any:
if self.line_number > 0:
if self.row_name:
msg = f"Řádek {self.line_number} ({self.row_name}): {msg}"
else:
msg = f"Řádek {self.line_number}: {msg}"
self.errors.append(msg)
logger.info('Import: >> %s', msg)
return None # Kdyby bylo otypováno správně jako -> None, při volání by si mypy stěžoval
def parse_user_id(self, user_id_str: str) -> Optional[int]:
if user_id_str == "":
return self.error('Chybí ID uživatele')
try:
return int(user_id_str)
except ValueError:
return self.error('ID uživatele není číslo')
def parse_email(self, email: str) -> Optional[str]:
if email == "":
return self.error('Chybí e-mailová adresa')
try:
return mo.users.normalize_email(email)
except mo.CheckError as e:
return self.error(str(e))
def parse_name(self, name: str) -> Optional[str]:
if name == "":
return self.error('Jméno nesmí být prázdné')
# XXX: Tato kontrola úmyslně není striktní, aby prošla i jména jako 'de Beer'
if name == name.lower():
return self.error('Ve jméně nejsou velká písmena')
if name == name.upper():
return self.error('Ve jméně nejsou malá písmena')
return name
def check_rights(self, round: db.Round, place: db.Place) -> bool:
assert round is not None
rights = self.gatekeeper.rights_for(place, round.year, round.category, round.seq)
return rights.have_right(mo.rights.Right.manage_contest)
def parse_opt_place(self, kod: str, what: str) -> Optional[db.Place]:
if kod == "":
return None
if kod in self.place_cache:
return self.place_cache[kod]
place = db.get_place_by_code(kod)
if not place:
return self.error(f'{what.title()} s kódem "{kod}" neexistuje'+
('. Nechybí vám # na začátku?' if re.fullmatch(r'\d+', kod) else ''))
self.place_cache[kod] = place
return place
def parse_role(self, name):
if name not in db.RoleType.__members__:
return self.error(f"Role {name} neexistuje. Podívejte se do manuálu na existující role.")
return db.RoleType[name]
def parse_school(self, kod: str) -> Optional[db.Place]:
if kod in self.school_place_cache:
return self.school_place_cache[kod]
try:
place = mo.users.validate_and_find_school(kod)
except mo.CheckError as e:
return self.error(str(e))
self.school_place_cache[kod] = place
return place
def parse_grade(self, rocnik: str, school: Optional[db.School]) -> Optional[str]:
if not school:
return None
# Ve snaze zabránit Excelu v interpretování ročníku jako kalendářního data
# lidé připisují všechny možné i nemožné znaky, které vypadají jako apostrof :)
rocnik = re.sub('[\'"\u00b4\u2019]', "", rocnik)
try:
return mo.users.normalize_grade(rocnik, school)
except mo.CheckError as e:
return self.error(str(e))
def parse_born(self, rok: str) -> Optional[int]:
if not re.fullmatch(r'\d{4}', rok):
return self.error('Rok narození musí být čtyřciferné číslo')
r = int(rok)
try:
mo.users.validate_born_year(r)
except mo.CheckError as e:
return self.error(str(e))
return r
def parse_category(self, kategorie: Optional[str]) -> Optional[str]:
return kategorie
def parse_round(self, year: Optional[int], category: Optional[str], seq: str) -> Optional[db.Round]:
if not year:
return self.error('Neuveden ročník pro nalezení kola')
if not category:
return self.error('Neuvedena kategorie pro nalezení kola')
r = (
db.get_session().query(db.Round)
.filter_by(year=year, category=category, seq=seq)
.one_or_none()
)
if r is None:
return self.error(f'Kolo {year}-{category}-{seq} nenalezeno')
return r
def find_or_create_user(self, email: str, krestni: Optional[str], prijmeni: Optional[str], is_org: bool) -> Optional[db.User]:
try:
try:
user, is_new, is_user_to_org = mo.users.find_or_create_user(email, krestni, prijmeni, is_org, allow_change_user_to_org=self.allow_change_user_to_org, reason='import')
self.cnt_change_user_to_org += is_user_to_org
except mo.users.CheckErrorOrgIsUser as e:
self.cnt_change_user_to_org += 1
raise mo.CheckError(str(e) + " Změnu můžete povolit ve formuláři.")
except mo.CheckError as e:
return self.error(str(e))
if is_new:
self.cnt_new_users += 1
self.new_user_ids.append(user.user_id)
return user
def parse_points(self, points_str: str) -> Union[decimal.Decimal, str, None]:
if points_str == "":
return self.error('Body musí být vyplněny')
points_str = points_str.upper()
if points_str in ['X', '?']:
return points_str
pts, error = mo.util.parse_points(points_str, self.task, self.round)
if error:
return self.error(error)
return pts
def find_or_create_participant(self, user: db.User, year: int, school_id: Optional[int], birth_year: Optional[int], grade: Optional[str]) -> Optional[db.Participant]:
try:
part, is_new = mo.users.find_or_create_participant(user, year, school_id, birth_year, grade, reason='import')
except mo.CheckError as e:
return self.error(str(e))
if is_new:
self.cnt_new_participants += 1
return part
def find_or_create_participation(self, user: db.User, contest: db.Contest, place: Optional[db.Place]) -> Optional[db.Participation]:
try:
pion, is_new = mo.users.find_or_create_participation(user, contest, place, reason='import')
except mo.CheckError as e:
return self.error(str(e))
if is_new:
self.cnt_new_participations += 1
return pion
def place_is_allowed(self, place: db.Place) -> bool:
if self.contest is not None and self.contest.place_id != place.place_id:
return False
if self.only_region is not None and not self.gatekeeper.is_ancestor_of(self.only_region, place):
return False
return True
def obtain_contest(self, round: db.Round, oblast: Optional[db.Place], allow_none: bool = False) -> Optional[db.Contest]:
contest: Optional[db.Contest]
if oblast is not None and not self.place_is_allowed(oblast):
return self.error('Oblast neodpovídá té, do které se importuje')
if self.contest:
contest = self.contest
else:
# Zde mluvíme o oblastech, místo abychom používali place_levels,
# protože sloupec má ve jménu oblast a také je potřeba rozlišovat školu
# účastníka a školu jako oblast.
if oblast is None:
if not allow_none:
self.error('Je nutné uvést kód oblasti')
return None
contest = db.get_session().query(db.Contest).filter_by(round=round, place=oblast).one_or_none()
if contest is None:
return self.error('V uvedené oblasti toto kolo neprobíhá')
return contest
def add_role(self, user: db.User, role: db.RoleType, place: Optional[db.Place], year: Optional[int], category: Optional[str], round: Optional[db.Round]):
sess = db.get_session()
if year and round and round.year != year:
return self.error('Ročník neodpovídá zadanému kolu.')
if category and round and round.category != category:
return self.error('Kategorie neodpovídá zadanému kolu.')
seq = None
if round:
category = round.category
year = round.year
seq = round.seq
if (sess.query(db.UserRole)
.filter_by(user=user, place=place, role=role,
category=category, year=year, seq=seq)
.with_for_update()
.first()):
pass
else:
ur = db.UserRole(user=user, place=place, role=role,
category=category, year=year, seq=seq,
assigned_by_user=self.user)
if not self.gatekeeper.can_set_role(ur):
return self.error('Roli "{new_role}" nelze přidělit, není podmnožinou žádné vaší role')
sess.add(ur)
sess.flush()
logger.info(f'Import: {role.name.title()} user=#{user.user_id} place=#{ place.place_id if place else "null" } user_role=#{ur.user_role_id}')
mo.util.log(
type=db.LogType.user_role,
what=ur.user_role_id,
details={'action': 'import', 'new': db.row2dict(ur)},
)
self.cnt_new_roles += 1
def import_row(self, r: mo.csv.Row):
# Definováno odvozenými třídami
assert NotImplementedError()
def log_start(self, path):
args = [f'user=#{self.user.user_id}', f'fmt={self.fmt.name}']
if self.round is not None:
args.append(f'round=#{self.round.round_id}')
if self.contest is not None:
args.append(f'contest=#{self.contest.contest_id}')
if self.only_region is not None:
args.append(f'region=#{self.only_region.place_id}')
if self.task is not None:
args.append(f'task=#{self.task.task_id}')
logger.info('Import: %s ze souboru %s: %s', self.log_msg_prefix, path, " ".join(args))
def log_end(self):
args = [f'rows=#{self.cnt_rows}']
for key, val in [
('users', self.cnt_new_users),
('p-ants', self.cnt_new_participants),
('p-ions', self.cnt_new_participations),
('roles', self.cnt_new_roles),
('points', self.cnt_set_points),
('add-sols', self.cnt_add_sols),
('del-sols', self.cnt_del_sols),
]:
if val > 0:
args.append(f'{key}={val}')
logger.info('Import: Hotovo (%s)', " ".join(args))
details = self.log_details.copy()
if self.only_region:
details['region'] = self.only_region.place_id
if self.contest is not None:
mo.util.log(
type=db.LogType.contest,
what=self.contest.contest_id,
details=details,
)
elif self.round is not None:
mo.util.log(
type=db.LogType.round,
what=self.round.round_id,
details=details,
)
else:
pass
# TODO
def check_utf8(self, path: str) -> bool:
# Není pěkné, že soubory čteme dvakrát, ale ve srovnání s pasekou, kterou by
# napáchal import ve špatném kódování, je to maličkost.
try:
with open(path, encoding='utf-8') as f:
for _ in f:
pass
return True
except UnicodeDecodeError:
return False
def get_row_name(self, row: mo.csv.Row) -> Optional[str]:
if hasattr(row, 'email'):
return row.email # type: ignore
# čtení prvku potomka
return None
def generic_import(self, path: str) -> bool:
charset = self.fmt.get_charset()
if charset != 'utf-8' and self.check_utf8(path):
logger.info('Import: Uhodnuto kódování utf-8')
charset = 'utf-8'
try:
with open(path, encoding=charset) as file:
try:
rows: List[mo.csv.Row]
rows, warnings = mo.csv.read(file=file, fmt=self.fmt, row_class=self.row_class)
self.warnings += warnings
except MissingHeaderError:
return self.error('Souboru chybí první řádek s názvy sloupců')
except UnicodeDecodeError:
return self.error(f'Soubor není v kódování {self.fmt.get_charset()}')
except Exception as e:
return self.error(f'Chybná struktura tabulky: {e}')
self.line_number = 2
for row in rows:
self.row_name = self.get_row_name(row)
self.cnt_rows += 1
self.import_row(row)
if len(self.errors) >= 100:
self.errors.append('Import přerušen pro příliš mnoho chyb')
break
self.line_number += 1
self.row_name = None
return len(self.errors) == 0
def notify_users(self):
# Projde všechny uživatele a těm, kteří ještě nemají nastavené heslo,
# ani nepožádali o jeho reset, pošle mail s odkazem na reset. Každého
# uživatele zpracováváme ve zvlášť transakci, aby se po chybě neztratila
# informace o tom, že už jsme nějaké maily rozeslali.
sess = db.get_session()
for uid in self.new_user_ids:
u = sess.query(db.User).get(uid)
if u and not u.password_hash and not u.reset_at:
token = mo.users.make_activation_token(u)
sess.commit()
mo.email.send_new_account_email(u, token)
else:
sess.rollback()
def get_template(self) -> str:
# Odvozené třídy mohou přetížit
out = io.StringIO()
mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=[self.row_example])
return out.getvalue()
def run(self, path: str) -> bool:
self.log_start(path)
if not self.generic_import(path):
logger.info('Import: Rollback')
db.get_session().rollback()
return False
self.log_end()
db.get_session().commit()
self.notify_users()
return True
def get_after_import_message(self):
return "Import proveden."
@dataclass
class ContestImportRow(mo.csv.Row):
email: str = ""
krestni: str = ""
prijmeni: str = ""
kod_skoly: str = ""
rocnik: str = ""
rok_naroz: str = ""
kod_mista: str = ""
kod_oblasti: str = ""
class ContestImport(Import):
row_class = ContestImportRow
row_example = ContestImportRow(
email="nekdo@example.org",
krestni="Pokusný",
prijmeni="Králík",
kod_skoly="#3333",
rocnik="1/8",
rok_naroz="2000",
)
log_msg_prefix = 'Účastníci'
log_details = {'action': 'import'}
template_basename = 'sablona-ucast'
def __init__(
self,
user: db.User,
round: db.Round,
contest: Optional[db.Contest] = None,
only_region: Optional[db.Place] = None,
default_place: Optional[db.Place] = None
):
super().__init__(user)
self.user = user
self.round = round
self.contest = contest
self.only_region = only_region
self.default_place = default_place
self.setup()
assert self.round is not None
assert not self.round.is_subround()
def import_row(self, r: mo.csv.Row):
assert self.round
assert isinstance(r, ContestImportRow)
num_prev_errs = len(self.errors)
email = self.parse_email(r.email)
krestni = self.parse_name(r.krestni) if r.krestni else None
prijmeni = self.parse_name(r.prijmeni) if r.prijmeni else None
school_place = self.parse_school(r.kod_skoly) if r.kod_skoly else None
rocnik = self.parse_grade(r.rocnik, (school_place.school if school_place else None)) if r.rocnik else None
rok_naroz = self.parse_born(r.rok_naroz) if r.rok_naroz else None
misto = self.parse_opt_place(r.kod_mista, 'místo')
if misto and not self.check_rights(self.round, misto):
return self.error(f'Nemáte práva na správu soutěže {misto.name_locative()}')
oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
if oblast and not self.check_rights(self.round, oblast):
return self.error(f'Nemáte práva na správu soutěže {oblast.name_locative()}')
if (len(self.errors) > num_prev_errs
or email is None):
return
user = self.find_or_create_user(email, krestni, prijmeni, is_org=False)
if user is None:
return
part = self.find_or_create_participant(user, self.round.year, school_place.place_id if school_place else None, rok_naroz, rocnik)
if part is None:
return
contest = self.obtain_contest(self.round, oblast)
if contest is None:
return
self.find_or_create_participation(user, contest, misto)
def get_after_import_message(self):
return f'Importováno ({self.cnt_rows} řádků, založeno {self.cnt_new_users} uživatelů, {self.cnt_new_participations} účastí, {self.cnt_new_roles} rolí)'
@dataclass
class OrgsImportRow(mo.csv.Row):
email: str = ""
krestni: str = ""
prijmeni: str = ""
kod_oblasti: str = ""
role: str = ""
class OrgsImport(Import):
row_class = OrgsImportRow
row_example = OrgsImportRow(
email='nekdo@example.org',
krestni='Pokusný',
prijmeni='Králík',
kod_oblasti='#3333',
role='dozor',
)
log_msg_prefix = 'Organizátoři'
log_details = {'action': 'import-orgs'}
template_basename = 'sablona-organizatori'
default_cat: Optional[str]
default_seq: Optional[str]
def __init__(
self,
user: db.User,
round: Optional[db.Round] = None, # Prázdné může být pouze když se jedná o GlobalOrgsImport
contest: Optional[db.Contest] = None,
only_region: Optional[db.Place] = None,
allow_change_user_to_org: bool = False,
default_place: Optional[db.Place] = None,
default_cat: Optional[str] = None,
default_seq: Optional[str] = None,
year: Optional[int] = None
):
super().__init__(user)
self.round = round
self.contest = contest
self.only_region = only_region
self.default_place = default_place
self.default_cat = self.parse_category(default_cat)
self.default_seq = default_seq
self.default_place = default_place
self.setup()
self.allow_change_user_to_org = allow_change_user_to_org
self.root_place = db.get_root_place()
self.year = round.year if round else year
assert hasattr(self.row_class, "kolo") or self.round
def import_row(self, r: mo.csv.Row):
assert isinstance(r, OrgsImportRow) or isinstance(r, GlobalOrgsImportRow)
num_prev_errs = len(self.errors)
email = self.parse_email(r.email)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
role = self.parse_role(r.role)
if getattr(r, "kategorie", ""):
kategorie = self.parse_category(getattr(r, "kategorie"))
else:
kategorie = self.default_cat
seq = getattr(r, "kolo", "") or self.default_seq
if seq:
kolo = self.parse_round(self.year, kategorie, seq)
if not kolo:
return
else:
kolo=self.round
oblast = self.parse_opt_place(r.kod_oblasti, 'oblast')
if oblast is None:
oblast = self.default_place
if oblast is None:
return self.error('Chybí oblast.')
if (len(self.errors) > num_prev_errs
or email is None
or krestni is None
or prijmeni is None):
return
user = self.find_or_create_user(email, krestni, prijmeni, is_org=True)
if user is None:
return
self.add_role(user, role, oblast, self.year, kategorie, kolo)
def get_after_import_message(self):
return f'Importováno ({self.cnt_rows} řádků, založeno {self.cnt_new_users} uživatelů, {self.cnt_new_participations} účastí, {self.cnt_new_roles} rolí)'
@dataclass
class GlobalOrgsImportRow(OrgsImportRow):
kategorie: str = ""
kolo: str = ""
class GlobalOrgsImport(OrgsImport):
row_class = GlobalOrgsImportRow
row_example = GlobalOrgsImportRow(
email='nekdo@example.org',
krestni='Pokusný',
prijmeni='Králík',
kod_oblasti='#3333',
role='dozor',
kategorie='Z',
kolo='',
)
log_msg_prefix = 'Organizátoři'
log_details = {'action': 'import-orgs'}
template_basename = 'sablona-organizatori'
def __init__(
self,
user: db.User,
**kvargs
):
super().__init__(user, **kvargs)
def setup(self):
self.root_place = db.get_root_place()
@dataclass
class PointsImportRow(mo.csv.Row):
user_id: str = ""
krestni: str = ""
prijmeni: str = ""
body: str = ""
class PointsImport(Import):
row_class = PointsImportRow
log_msg_prefix = 'Body'
allow_add_del: bool # je povoleno zakládat/mazat řešení
def __init__(
self,
user: db.User,
round: db.Round,
task: db.Task,
contest: Optional[db.Contest] = None,
only_region: Optional[db.Place] = None,
allow_add_del: bool = False,
):
super().__init__(user)
self.round = round
self.contest = contest
self.task = task
self.only_region = only_region
self.allow_add_del = allow_add_del
assert self.round is not None
assert self.task is not None
self.log_details = {'action': 'import-points', 'task': self.task.code}
self.template_basename = 'body-' + self.task.code
def _pion_sol_query(self) -> Query:
sess = db.get_session()
query = (sess.query(db.Participation, db.Solution)
.select_from(db.Participation)
.outerjoin(db.Solution, and_(db.Solution.user_id == db.Participation.user_id, db.Solution.task == self.task))
.options(joinedload(db.Participation.user)))
if self.contest is not None:
query = query.filter(db.Participation.contest_id == self.contest.master_contest_id)
else:
contest_query = sess.query(db.Contest.master_contest_id).filter_by(round=self.round)
if self.only_region:
assert self.round
contest_query = db.filter_place_nth_parent(contest_query, db.Contest.place_id, self.round.level - self.only_region.level, self.only_region.place_id)
query = query.filter(db.Participation.contest_id.in_(contest_query.subquery()))
return query
def import_row(self, r: mo.csv.Row):
assert isinstance(r, PointsImportRow)
num_prev_errs = len(self.errors)
user_id = self.parse_user_id(r.user_id)
krestni = self.parse_name(r.krestni)
prijmeni = self.parse_name(r.prijmeni)
body = self.parse_points(r.body)
if (len(self.errors) > num_prev_errs
or user_id is None
or krestni is None
or prijmeni is None
or body is None):
return
assert self.round is not None
assert self.task is not None
task_id = self.task.task_id
sess = db.get_session()
query = self._pion_sol_query().filter(db.Participation.user_id == user_id)
pion_sols = query.all()
if not pion_sols:
if self.contest is not None:
msg = self.round.get_level().name_locative('tomto', 'této', 'tomto')
elif self.only_region is not None:
msg = self.only_region.get_level().name_locative('tomto', 'této', 'tomto')
else:
msg = 'tomto kole'
return self.error(f'Soutěžící nenalezen v {msg}')
elif len(pion_sols) > 1:
return self.error('Soutěžící v tomto kole soutěží vícekrát, neumím zpracovat')
pion, sol = pion_sols[0]
if not self.round.is_subround():
contest = pion.contest
else:
contest = sess.query(db.Contest).filter_by(round=self.round, master_contest_id=pion.contest_id).one()
rights = self.gatekeeper.rights_for_contest(contest)
if not rights.can_edit_points():
return self.error('Nemáte právo na úpravu bodů')
user = pion.user
if user.first_name != krestni or user.last_name != prijmeni:
return self.error('Neodpovídá ID a jméno soutěžícího')
if sol is None:
if body == 'X':
return
if not self.allow_add_del:
return self.error('Tento soutěžící úlohu neodevzdal')
if not rights.can_upload_solutions():
return self.error('Nemáte právo na zakládání nových řešení')
sol = db.Solution(user_id=user_id, task_id=task_id)
sess.add(sol)
logger.info(f'Import: Založeno řešení user=#{user_id} task=#{task_id}')
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={'action': 'solution-created', 'task': task_id},
)
self.cnt_add_sols += 1
elif body == 'X':
if not self.allow_add_del:
return self.error('Tento soutěžící úlohu odevzdal')
if sol.final_submit is not None or sol.final_feedback is not None:
return self.error('Nelze smazat řešení, ke kterému existují odevzdané soubory')
if not rights.can_upload_solutions():
return self.error('Nemáte právo na mazání řešení')
logger.info(f'Import: Smazáno řešení user=#{user_id} task=#{task_id}')
mo.util.log(
type=db.LogType.participant,
what=user_id,
details={'action': 'solution-removed', 'task': task_id},
)
self.cnt_del_sols += 1
sess.delete(sol)
return
points = body if isinstance(body, decimal.Decimal) else None
if sol.points != points:
sol.points = points
sess.add(db.PointsHistory(
task=self.task,
participant_id=user_id,
user=self.user,
points_at=mo.now,
points=points,
))
self.cnt_set_points += 1
def get_template(self) -> str:
rows = []
for pion, sol in sorted(self._pion_sol_query().all(), key=lambda pair: pair[0].user.sort_key()):
if sol is None:
pts = 'X'
elif sol.points is None:
pts = '?'
else:
pts = format_decimal(sol.points)
user = pion.user
rows.append(PointsImportRow(
user_id=user.user_id,
krestni=user.first_name,
prijmeni=user.last_name,
body=pts,
))
out = io.StringIO()
mo.csv.write(file=out, fmt=self.fmt, row_class=self.row_class, rows=rows)
return out.getvalue()
def get_after_import_message(self):
return f'Importováno ({self.cnt_rows} řádků, {self.cnt_set_points} řešení přebodováno, {self.cnt_add_sols} založeno a {self.cnt_del_sols} smazáno)'