From eae0b3c425b5022e4760fc2bf6d47bba533c6e57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ji=C5=99=C3=AD=20Setni=C4=8Dka?= <setnicka@seznam.cz> Date: Sun, 21 Feb 2021 23:31:59 +0100 Subject: [PATCH] =?UTF-8?q?Backend=20modul=20pro=20v=C3=BDrobu=20v=C3=BDsl?= =?UTF-8?q?edkov=C3=BDch=20listin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pro zadaný round (a případně contest) vygeneruje obsah výsledkové listiny. Vrací výsledky jako ScoreResult objekty obsahující db objekty db.User, db.Participation, db.Participant a db.Solution pro každou úlohu + spočítané celkové body, určení vlastností vítěze/úspěšného řešitele a pořadí. Pořadí počítá podle nastavení v round. V součastnosti pdporuje dva módy: * jednoduchá výsledkovka (basic): Nen podle celkových bodů, sdílená místa. * podle pravidel MO (mo): Zjednoznačnění pořadí podle bodů za jednotlivé úlohy (od maxima a podle obtížnosti) a případně podle předchozích kol ve stejné kategorii. Generuje warningy, když se vyskytne nestandartní situace. Navenek má objekt Score metody: * get_tasks() - úlohy utřízené podle kódu * get_warnings() * get_sorted_results() - vrací ScoreResult, každý pak má metodu get_sols() Issue #171 --- mo/score.py | 383 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 383 insertions(+) create mode 100644 mo/score.py diff --git a/mo/score.py b/mo/score.py new file mode 100644 index 00000000..b7f4ed5d --- /dev/null +++ b/mo/score.py @@ -0,0 +1,383 @@ +from fractions import Fraction +from sqlalchemy import and_ +from sqlalchemy.orm import joinedload +from sqlalchemy.orm.query import Query +from sqlalchemy.sql.expression import select +from typing import Any, List, Tuple, Optional, Dict + +import mo.db as db +from mo.util import normalize_grade + + +class ScoreOrder: + place: int + span: int # u nedělených míst 1, u dělených počet spojených míst (2.-5. -> span 4) + continuation: bool # jestli je pokračováním místa o jedno předcházející + + def __init__(self, place: int, span: int = 1, continuation: bool = False): + self.place = place + self.span = span + self.continuation = continuation + + +class ScoreResult: + user: db.User + pant: db.Participant + pion: db.Participation + order: ScoreOrder + successful: bool + winner: bool + + # Řešení jednotlivých kol (pro některá řazení je potřeba znát i výsledky + # z předcházejících kol). První index je krok (0 = toto kolo, 1 = předcházející, ...) + # a druhý index je task_id z db.Solution. + _sols: Dict[int, Dict[int, db.Solution]] + # Třídící klíč je n-tice klíčů podle kterých třídit, všechny klíče tříděny + # vzestupně (pro sestupné třídění podle čísla je potřeba klíč vynásobit -1) + _order_key: List[Any] + + def __init__(self, user: db.User, pant: db.Participant, pion: db.Participation): + self.user = user + self.pant = pant + self.pion = pion + + self._sols = {} + self.order = 1 + self.place_span = 1 + self.place_continuation = False + self.winner = False + self.successful = False + self._order_key = [] + + def get_sols(self) -> List[db.Solution]: + return list(self._sols[0].values()) + + def get_sols_map(self) -> Dict[int, db.Solution]: + return self._sols[0] + + def get_total_points(self) -> int: + sum = 0 + for sol in self.get_sols(): + if sol.points: + sum += sol.points + return sum + + +class ScoreTask: + task: db.Task + num_solutions: int + sum_points: int + + def __init__(self, task: db.Task): + self.task = task + self.num_solutions = 0 + self.sum_points = 0 + + def get_difficulty(self) -> Fraction: + if self.num_solutions == 0: + return 0 + return Fraction(self.sum_points, self.num_solutions) + + +class Score: + round: db.Round + contest: Optional[db.Contest] + part_states: List[db.PartState] + + # Řádky výsledkovky + _results: Dict[int, ScoreResult] + # Úlohy jednotlivých kol (pro některá řazení je potřeba znát i úlohy + # z předcházejících kol. První index je krok (0 = toto kolo, 1 = předcházející, ...) + # a druhý index je task_id z db.Task. + _tasks: Dict[int, Dict[int, ScoreTask]] + # Seznam předcházejících kol indexovaných krokem výpočtu (0 = toto kolo, 1 = předcházející, ...) + _prev_rounds: Dict[int, db.Round] + + # Zprávy o tvorbě výsledkovky, dvojice (typ, zprávy) kde typ může být info, warning nebo error + _messages: List[Tuple[str, str]] + + def __init__( + self, round: db.Round, contest: Optional[db.Contest] = None, + # Ze kterých stavů chceme výsledkovku počítat + part_states: List[db.PartState] = [db.PartState.registered, db.PartState.invited, db.PartState.present], + ): + self.round = round + self.contest = contest + self.part_states = part_states + + # Příprava subquery na účastníky + sess = db.get_session() + if contest: + contest_subq = [contest.contest_id] + else: + contest_subq = sess.query(db.Contest.contest_id).filter_by(round=round) + + # Načtení účastníků + data: List[Tuple[db.User, db.Participation, db.Participant]] = ( + sess.query(db.User, db.Participation, db.Participant) + .select_from(db.Participation) + .join(db.User) + .join(db.Participant, and_( + db.Participant.user_id == db.Participation.user_id, + db.Participant.year == round.year + ) + ).filter( + db.User.is_test == False, + db.Participation.state.in_(part_states), + db.Participation.contest_id.in_(contest_subq) + ).options( + joinedload(db.Participant.school_place), + joinedload(db.Participation.contest).joinedload(db.Contest.place), + ).all() + ) + self._results = {} + for user, pion, pant in data: + self._results[user.user_id] = ScoreResult(user, pant, pion) + + # Načtení úloh a řešení + self._prev_rounds = {0: round} + self._tasks = {} + self._messages = [] + self._load_tasks_and_sols(0, round, contest_subq) + self._mark_winners() + + def _load_tasks_and_sols(self, step: int, round: db.Round, contest_subq: Query): + """Obecná funkce na načtení úloh a řešení tohoto nebo předchozího kola""" + if step in self._tasks: + return + sess = db.get_session() + + user_id_subq = sess.query(db.Participation.user_id).join( + db.User, db.Participation.user_id == db.User.user_id + ).filter( + db.User.is_test == False, + db.Participation.state.in_(self.part_states), + db.Participation.contest_id.in_(contest_subq) + ) + + # Vyrobení polí + self._tasks[step] = {} + for result in self._results.values(): + result._sols[step] = {} + + # Spočítání počtu řešitelů + num_participants = db.get_count(user_id_subq) + + # Načtení úloh + tasks: List[db.Task] = sess.query(db.Task).filter_by(round=round).all() + for task in tasks: + self._tasks[step][task.task_id] = ScoreTask(task) + self._tasks[step][task.task_id].num_solutions = num_participants + + # Načtení řešení + task_ids = list(self._tasks[step].keys()) + sols: List[db.Solution] = sess.query(db.Solution).filter( + db.Solution.user_id.in_(user_id_subq), + db.Solution.task_id.in_(task_ids), + ).all() + for sol in sols: + if sol.user_id in self._results: + self._results[sol.user_id]._sols[step][sol.task_id] = sol + if sol.points: + self._tasks[step][sol.task_id].sum_points += sol.points + + def _mark_winners(self): + for result in self._results.values(): + total_points = result.get_total_points() + result.winner = ( + self.round.score_winner_limit is not None + and total_points >= self.round.score_winner_limit + ) + result.successful = ( + self.round.score_successful_limit is not None + and total_points >= self.round.score_successful_limit + ) + + def _load_prev_round(self, step: int) -> bool: + """Načtení úloh a řešení předchozího kola, pokud takové existuje.""" + if step in self._tasks: + return True + sess = db.get_session() + + # Zkusíme nalézt kolo o `step` kroků zpět + prev_round = sess.query(db.Round).filter_by( + year=self.round.year, category=self.round.category, seq=self.round.seq - step + ).one_or_none() + if prev_round is None: + return False + self._prev_rounds[step] = prev_round + + if self.contest: + # Pokud tvoříme výsledkovku pro contest, tak nás zajímají jen řešení + # z podoblastí contestu spadajícího pod hlavní + desc_cte = db.place_descendant_cte(self.contest.place, max_level=prev_round.level) + contest_subq = sess.query(db.Contest.contest_id).filter( + db.Contest.round == prev_round, + db.Contest.place_id.in_(select([desc_cte])) + ) + else: + # Pokud vytváříme výsledkovku pro celé kolo, bereme vše + contest_subq = sess.query(db.Contest.contest_id).filter_by(round=prev_round) + + self._load_tasks_and_sols(step, prev_round, contest_subq) + return True + + def get_tasks(self) -> List[db.Task]: + tasks = [] + for task in self._tasks[0].values(): + tasks.append(task.task) + return list(sorted(tasks, key=lambda task: task.code)) + + def _add_message(self, type: str, message: str): + self._messages.append((type, message)) + + def get_messages(self) -> List[Tuple[str, str]]: + return self._messages + + def get_sorted_results(self) -> List[ScoreResult]: + # Vygenerování třídícího klíče podle módu výsledkovky + if self.round.score_mode == db.RoundScoreMode.basic: + # Základní mód - jen podle celkových bodů, se sdílenými místy + for result in self._results.values(): + result._order_key = [-result.get_total_points()] + + elif self.round.score_mode == db.RoundScoreMode.mo: + self._add_mo_order_key() + else: + assert False + + # Kvůli pevnému pořadí při sdílených místech přidáme k finálnímu třídění + # ještě i příjmení, jméno a user_id + results: List[ScoreResult] = sorted(self._results.values(), key=lambda result: ( + result._order_key, result.user.last_name, result.user.first_name, result.user.user_id + )) + last: ScoreResult = None + # Spočítáme pořadí - v prvním kroku prolinkujeme opakující se ScoreOrder na první, + # ve druhém kroku je pak správně rozkopírujeme s nastaveným continuation na True + for result in results: + if last is None: + result.order = ScoreOrder(1) + last = result + elif last._order_key == result._order_key: + result.order = last.order + last.order.span += 1 + else: + result.order = ScoreOrder(last.order.place + last.order.span) + last = result + lastOrder: ScoreOrder = None + for result in results: + if result.order == lastOrder: + result.order = ScoreOrder(lastOrder.place, lastOrder.span, True) + else: + lastOrder = result.order + + return results + + def _exists_same_order_key(self) -> bool: + last = None + for result in sorted(self._results.values(), key=lambda result: result._order_key): + if result._order_key == last: + return True + last = result._order_key + return False + + def _add_mo_order_key(self): + """Jednoznačný mód MO, iterujeme po krocích dokud buď nevyrobíme + výsledkovku bez sdílených míst, nebo už neexistuje předchozí kolo, + na které bychom se ještě mohli podívat.""" + step = 0 + while True: + if self._load_prev_round(step) is False: + self._add_message("info", "I po započítání všech předchozích kol stále existují sdílená místa, řadím podle ročníku") + break + + if step != 0: + self._add_message( + "info", + f"Výpočet na úrovni kola {self._prev_rounds[step-1].round_code()} nestačil," + + f" započítávám body z kola {self._prev_rounds[step].round_code()}" + ) + + tasks_by_difficulty = sorted( + self._tasks[step].keys(), + key=lambda task_id: (self._tasks[step][task_id].get_difficulty(), self._tasks[step][task_id].task.code), + ) + last_task: ScoreTask = None + last_difficulty: Fraction = None + difficulty_report = [] + for task_id in tasks_by_difficulty: + task = self._tasks[step][task_id] + difficulty = task.get_difficulty() + if last_task is not None and difficulty == last_difficulty and difficulty != 0: + self._add_message( + "warning", + f"Úlohy {last_task.task.code} a {task.task.code} mají stejnou vypočtenou obtížnost " + + f" {difficulty}, pro výpočet obtížnosti je řadím podle kódu úlohy" + ) + difficulty_report.append(f"{task.task.code} ({round(float(difficulty), 2)} b)") + last_task, last_difficulty = task, difficulty + + self._add_message( + "info", + f"Průměrné body úloh kola {self._prev_rounds[step].round_code()} od nejobtížnější: " + + ", ".join(difficulty_report) + ) + + for result in self._results.values(): + sol_points = {} + for task_id in self._tasks[step].keys(): + sol_points[task_id] = 0 + for sol in result._sols[0].values(): + if sol.points: + sol_points[sol.task_id] = -sol.points # sestupné třídění + + total_points = sum(sol_points.values()) + points_from_max = list(sorted(sol_points.values())) + points_from_difficult = [sol_points[task_id] for task_id in tasks_by_difficulty] + + # Primárně podle počtu získaných bodů, sekundárně podle bodů od maxima, terciárně podle bodů od nejobtížnější + result._order_key.extend((total_points, points_from_max, points_from_difficult)) + + # Otestujeme, jestli teď existují sdílená místa + if not self._exists_same_order_key(): + break + # Pokud jsme našli stejný klíč, opakujeme cyklus s minulým kolem + step += 1 + + # Na konec třídícího klíče přidáme ročník (menší ročník první) + for result in self._results.values(): + grade = normalize_grade(result.pant.grade) + if grade == -1: + self._add_message( + "warning", + f"Účastník {result.user.first_name} {result.user.last_name} má neplatný ročník {result.pant.grade}" + ) + result._order_key.append(grade) + + if self._exists_same_order_key(): + self._add_message( + "error", + "I po započítání všech úloh (včetně minulých kol) a ročníků účastníků existují sdílená místa. Je potřeba určit pořadí losem" + ) + + # Další kontroly + # Pokud kontrolujeme výsledkovku pro celé kolo + if self.contest is None: + winners = 0 + successfulls = 0 + participants = len(self._results) + for result in self._results.values(): + if result.winner: + winners += 1 + if result.successful: + successfulls += 1 + if successfulls > participants/2: + self._add_message( + "error", + f"Počet úspěšných řešitelů ({successfulls}) převyšuje polovinu celkového počtu účastníků ({participants})" + ) + if winners > successfulls/2: + self._add_message( + "error", + f"Počet vítězů ({winners}) převyšuje polovinu počtu úspěšných řešitelů ({successfulls})" + ) -- GitLab