Skip to content
Snippets Groups Projects
Select Git revision
  • 7c2227583916cb2d2fc42974993727c9667b19c4
  • master default protected
2 results

compctl.c

Blame
  • score.py 15.78 KiB
    from fractions import Fraction
    from sqlalchemy import and_
    from sqlalchemy.orm import joinedload
    from sqlalchemy.orm.query import Query
    from sqlalchemy.sql.expression import select
    from typing import Any, List, Tuple, Optional, Dict, Union
    
    import mo.db as db
    from mo.util import normalize_grade
    
    
    class ScoreOrder:
        place: int
        span: int  # u nedělených míst 1, u dělených počet spojených míst (2.-5. -> span 4)
        continuation: bool  # jestli je pokračováním místa o jedno předcházející
    
        def __init__(self, place: int, span: int = 1, continuation: bool = False):
            self.place = place
            self.span = span
            self.continuation = continuation
    
    
    class ScoreResult:
        user: db.User
        pant: db.Participant
        pion: db.Participation
        order: ScoreOrder
        successful: bool
        winner: bool
    
        # Řešení jednotlivých kol (pro některá řazení je potřeba znát i výsledky
        # z předcházejících kol). První index je krok (0 = toto kolo, 1 = předcházející, ...)
        # a druhý index je task_id z db.Solution.
        _sols: Dict[int, Dict[int, db.Solution]]
        # Třídící klíč je n-tice klíčů podle kterých třídit, všechny klíče tříděny
        # vzestupně (pro sestupné třídění podle čísla je potřeba klíč vynásobit -1)
        _order_key: List[Any]
    
        _null_score_order = ScoreOrder(0)
    
        def __init__(self, user: db.User, pant: db.Participant, pion: db.Participation):
            self.user = user
            self.pant = pant
            self.pion = pion
    
            self._sols = {}
            self.order = ScoreResult._null_score_order
            self.winner = False
            self.successful = False
            self._order_key = []
    
        def get_sols(self) -> List[db.Solution]:
            return list(self._sols[0].values())
    
        def get_sols_map(self) -> Dict[int, db.Solution]:
            return self._sols[0]
    
        def get_total_points(self) -> int:
            sum = 0
            for sol in self.get_sols():
                if sol.points:
                    sum += sol.points
            return sum
    
    
    class ScoreTask:
        task: db.Task
        num_solutions: int
        sum_points: int
    
        def __init__(self, task: db.Task):
            self.task = task
            self.num_solutions = 0
            self.sum_points = 0
    
        def get_difficulty(self) -> Fraction:
            if self.num_solutions == 0:
                return Fraction(0)
            return Fraction(self.sum_points, self.num_solutions)
    
    
    class Score:
        round: db.Round
        contest: Optional[db.Contest]
        part_states: List[db.PartState]
        want_successful: bool
        want_winners: bool
    
        # Řádky výsledkovky
        _results: Dict[int, ScoreResult]
        # Úlohy jednotlivých kol (pro některá řazení je potřeba znát i úlohy
        # z předcházejících kol. První index je krok (0 = toto kolo, 1 = předcházející, ...)
        # a druhý index je task_id z db.Task.
        _tasks: Dict[int, Dict[int, ScoreTask]]
        # Seznam předcházejících kol indexovaných krokem výpočtu (0 = toto kolo, 1 = předcházející, ...)
        _prev_rounds: Dict[int, db.Round]
    
        # Zprávy o tvorbě výsledkovky, dvojice (typ, zprávy) kde typ může být info, warning nebo error
        _messages: List[Tuple[str, str]]
    
        def __init__(
            self, round: db.Round, contest: Optional[db.Contest] = None,
            # Ze kterých stavů chceme výsledkovku počítat
            part_states: List[db.PartState] = [db.PartState.registered, db.PartState.invited, db.PartState.present],
        ):
            self.round = round
            self.contest = contest
            self.part_states = part_states
            self.want_successful = round.score_successful_limit is not None
            self.want_winners = round.score_winner_limit is not None
    
            # Příprava subquery na účastníky (contest_subq obsahuje master_contest_id)
            sess = db.get_session()
            if contest:
                contest_subq = [contest.master_contest_id]
            else:
                contest_subq = sess.query(db.Contest.master_contest_id).filter_by(round=round)
    
            # Načtení účastníků
            data: List[Tuple[db.User, db.Participation, db.Participant]] = (
                sess.query(db.User, db.Participation, db.Participant)
                .select_from(db.Participation)
                .join(db.User)
                .join(db.Participant, and_(
                        db.Participant.user_id == db.Participation.user_id,
                        db.Participant.year == round.year
                    )
                ).filter(
                    db.User.is_test == False,
                    db.Participation.state.in_(part_states),
                    db.Participation.contest_id.in_(contest_subq)
                ).options(
                    joinedload(db.Participant.school_place),
                    joinedload(db.Participation.contest).joinedload(db.Contest.place),
                ).all()
            )
            self._results = {}
            for user, pion, pant in data:
                self._results[user.user_id] = ScoreResult(user, pant, pion)
    
            # Načtení úloh a řešení
            self._prev_rounds = {0: round}
            self._tasks = {}
            self._messages = []
            self._load_tasks_and_sols(0, round, contest_subq)
            self._mark_winners()
    
        def _load_tasks_and_sols(self, step: int, round: db.Round, contest_subq: Union[Query, List[int]]):
            """Obecná funkce na načtení úloh a řešení tohoto nebo předchozího kola"""
            if step in self._tasks:
                return
            sess = db.get_session()
    
            user_id_subq = sess.query(db.Participation.user_id).join(
                db.User, db.Participation.user_id == db.User.user_id
            ).filter(
                db.User.is_test == False,
                db.Participation.state.in_(self.part_states),
                db.Participation.contest_id.in_(contest_subq)
            )
    
            # Vyrobení polí
            self._tasks[step] = {}
            for result in self._results.values():
                result._sols[step] = {}
    
            # Spočítání počtu řešitelů
            num_participants = db.get_count(user_id_subq)
    
            # Načtení úloh
            tasks: List[db.Task] = sess.query(db.Task).filter(db.Task.round_id.in_(
                sess.query(db.Round.round_id).filter_by(master_round_id=round.master_round_id)
            )).all()
            for task in tasks:
                self._tasks[step][task.task_id] = ScoreTask(task)
                self._tasks[step][task.task_id].num_solutions = num_participants
    
            # Načtení řešení
            task_ids = list(self._tasks[step].keys())
            sols: List[db.Solution] = sess.query(db.Solution).filter(
                db.Solution.user_id.in_(user_id_subq),
                db.Solution.task_id.in_(task_ids),
            ).all()
            for sol in sols:
                if sol.user_id in self._results:
                    self._results[sol.user_id]._sols[step][sol.task_id] = sol
                if sol.points:
                    self._tasks[step][sol.task_id].sum_points += sol.points
    
        def _mark_winners(self):
            for result in self._results.values():
                total_points = result.get_total_points()
                result.winner = self.want_winners and total_points >= self.round.score_winner_limit
                result.successful = self.want_successful and total_points >= self.round.score_successful_limit
    
        def _load_prev_round(self, step: int) -> bool:
            """Načtení úloh a řešení předchozího kola, pokud takové existuje."""
            if step in self._tasks:
                return True
            sess = db.get_session()
    
            # Zkusíme nalézt kolo o `step` kroků zpět
            prev_round = sess.query(db.Round).filter_by(
                year=self.round.year, category=self.round.category, seq=self.round.seq - step
            ).filter(db.Round.master_round_id == db.Round.round_id).one_or_none()
            if prev_round is None:
                return False
            self._prev_rounds[step] = prev_round
    
            if self.contest:
                # Pokud tvoříme výsledkovku pro contest, tak nás zajímají jen řešení
                # z podoblastí contestu spadajícího pod hlavní
                desc_cte = db.place_descendant_cte(self.contest.place, max_level=prev_round.level)
                contest_subq = sess.query(db.Contest.master_contest_id).filter(
                    db.Contest.round == prev_round,
                    db.Contest.place_id.in_(select([desc_cte]))
                )
            else:
                # Pokud vytváříme výsledkovku pro celé kolo, bereme vše
                contest_subq = sess.query(db.Contest.master_contest_id).filter_by(round=prev_round)
    
            self._load_tasks_and_sols(step, prev_round, contest_subq)
            return True
    
        def get_tasks(self) -> List[db.Task]:
            tasks = []
            for task in self._tasks[0].values():
                tasks.append(task.task)
            return list(sorted(tasks, key=lambda task: task.code))
    
        def _add_message(self, type: str, message: str):
            self._messages.append((type, message))
    
        def get_messages(self) -> List[Tuple[str, str]]:
            return self._messages
    
        def get_sorted_results(self) -> List[ScoreResult]:
            # Vygenerování třídícího klíče podle módu výsledkovky
            if self.round.score_mode == db.RoundScoreMode.basic:
                # Základní mód - jen podle celkových bodů, se sdílenými místy
                for result in self._results.values():
                    result._order_key = [-result.get_total_points()]
    
            elif self.round.score_mode == db.RoundScoreMode.mo:
                self._add_mo_order_key()
            else:
                assert False
    
            # Kvůli pevnému pořadí při sdílených místech přidáme k finálnímu třídění
            # ještě i příjmení, jméno a user_id
            results: List[ScoreResult] = sorted(self._results.values(), key=lambda result: (
                result._order_key, result.user.last_name, result.user.first_name, result.user.user_id
            ))
            last: Optional[ScoreResult] = None
            # Spočítáme pořadí - v prvním kroku prolinkujeme opakující se ScoreOrder na první,
            # ve druhém kroku je pak správně rozkopírujeme s nastaveným continuation na True
            for result in results:
                if last is None:
                    result.order = ScoreOrder(1)
                    last = result
                elif last._order_key == result._order_key:
                    result.order = last.order
                    last.order.span += 1
                else:
                    result.order = ScoreOrder(last.order.place + last.order.span)
                    last = result
            lastOrder: Optional[ScoreOrder] = None
            for result in results:
                if result.order == lastOrder:
                    result.order = ScoreOrder(lastOrder.place, lastOrder.span, True)
                else:
                    lastOrder = result.order
    
            return results
    
        def _exists_same_order_key(self) -> bool:
            last = None
            for result in sorted(self._results.values(), key=lambda result: result._order_key):
                if result._order_key == last:
                    return True
                last = result._order_key
            return False
    
        def _add_mo_order_key(self):
            """Jednoznačný mód MO, iterujeme po krocích dokud buď nevyrobíme
            výsledkovku bez sdílených míst, nebo už neexistuje předchozí kolo,
            na které bychom se ještě mohli podívat."""
            step = 0
            while True:
                if self._load_prev_round(step) is False:
                    self._add_message("info", "I po započítání všech předchozích kol stále existují sdílená místa, řadím podle ročníku")
                    break
    
                if step != 0:
                    self._add_message(
                        "info",
                        f"Výpočet na úrovni kola {self._prev_rounds[step-1].round_code()} nestačil,"
                        + f" započítávám body z kola {self._prev_rounds[step].round_code()}"
                    )
    
                tasks_by_difficulty = sorted(
                    self._tasks[step].keys(),
                    key=lambda task_id: (self._tasks[step][task_id].get_difficulty(), self._tasks[step][task_id].task.code),
                )
                last_task: ScoreTask = None
                last_difficulty: Fraction = None
                difficulty_report = []
                for task_id in tasks_by_difficulty:
                    task = self._tasks[step][task_id]
                    difficulty = task.get_difficulty()
                    if last_task is not None and difficulty == last_difficulty and difficulty != 0:
                        self._add_message(
                            "warning",
                            f"Úlohy {last_task.task.code} a {task.task.code} mají stejnou vypočtenou obtížnost "
                            + f" {difficulty}, pro výpočet obtížnosti je řadím podle kódu úlohy"
                        )
                    difficulty_report.append(f"{task.task.code} ({round(float(difficulty), 2)} b)")
                    last_task, last_difficulty = task, difficulty
    
                self._add_message(
                    "info",
                    f"Průměrné body úloh kola {self._prev_rounds[step].round_code()} od nejobtížnější: "
                    + ", ".join(difficulty_report)
                )
    
                for result in self._results.values():
                    sol_points = {}
                    for task_id in self._tasks[step].keys():
                        sol_points[task_id] = 0
                    for sol in result._sols[0].values():
                        if sol.points:
                            sol_points[sol.task_id] = -sol.points  # sestupné třídění
    
                    total_points = sum(sol_points.values())
                    points_from_max = list(sorted(sol_points.values()))
                    points_from_difficult = [sol_points[task_id] for task_id in tasks_by_difficulty]
    
                    if result.successful or not self.want_successful:
                        # Primárně podle počtu získaných bodů, sekundárně podle bodů od maxima, terciárně podle bodů od nejobtížnější
                        result._order_key.extend((total_points, points_from_max, points_from_difficult))
                    else:
                        # Neúspěšné řešitele třídíme podle počtu získaných bodů, sekundárně podle jména
                        result._order_key.extend((total_points, result.user.name_sort_key()))
    
                # Otestujeme, jestli teď existují sdílená místa
                if not self._exists_same_order_key():
                    break
                # Pokud jsme našli stejný klíč, opakujeme cyklus s minulým kolem
                step += 1
    
            # Na konec třídícího klíče přidáme ročník (menší ročník první)
            for result in self._results.values():
                grade = normalize_grade(result.pant.grade)
                if grade == -1:
                    self._add_message(
                        "warning",
                        f"Účastník {result.user.first_name} {result.user.last_name} má neplatný ročník {result.pant.grade}"
                    )
                result._order_key.append(grade)
    
            if self._exists_same_order_key():
                self._add_message(
                    "error",
                    "I po započítání všech úloh (včetně minulých kol) a ročníků účastníků existují sdílená místa. Je potřeba určit pořadí losem"
                )
    
            # Další kontroly
            # Pokud kontrolujeme výsledkovku pro celé kolo
            if self.contest is None:
                winners = 0
                successfulls = 0
                participants = len(self._results)
                for result in self._results.values():
                    if result.winner:
                        winners += 1
                    if result.successful:
                        successfulls += 1
                if successfulls > participants // 2:
                    self._add_message(
                        "error",
                        f"Počet úspěšných řešitelů ({successfulls}) převyšuje polovinu celkového počtu účastníků ({participants})"
                    )
                if winners > successfulls // 2:
                    self._add_message(
                        "error",
                        f"Počet vítězů ({winners}) převyšuje polovinu počtu úspěšných řešitelů ({successfulls})"
                    )