Select Git revision
org_contest.py
-
Martin Mareš authoredMartin Mareš authored
score.py 15.78 KiB
from fractions import Fraction
from sqlalchemy import and_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.query import Query
from sqlalchemy.sql.expression import select
from typing import Any, List, Tuple, Optional, Dict, Union
import mo.db as db
from mo.util import normalize_grade
class ScoreOrder:
place: int
span: int # u nedělených míst 1, u dělených počet spojených míst (2.-5. -> span 4)
continuation: bool # jestli je pokračováním místa o jedno předcházející
def __init__(self, place: int, span: int = 1, continuation: bool = False):
self.place = place
self.span = span
self.continuation = continuation
class ScoreResult:
user: db.User
pant: db.Participant
pion: db.Participation
order: ScoreOrder
successful: bool
winner: bool
# Řešení jednotlivých kol (pro některá řazení je potřeba znát i výsledky
# z předcházejících kol). První index je krok (0 = toto kolo, 1 = předcházející, ...)
# a druhý index je task_id z db.Solution.
_sols: Dict[int, Dict[int, db.Solution]]
# Třídící klíč je n-tice klíčů podle kterých třídit, všechny klíče tříděny
# vzestupně (pro sestupné třídění podle čísla je potřeba klíč vynásobit -1)
_order_key: List[Any]
_null_score_order = ScoreOrder(0)
def __init__(self, user: db.User, pant: db.Participant, pion: db.Participation):
self.user = user
self.pant = pant
self.pion = pion
self._sols = {}
self.order = ScoreResult._null_score_order
self.winner = False
self.successful = False
self._order_key = []
def get_sols(self) -> List[db.Solution]:
return list(self._sols[0].values())
def get_sols_map(self) -> Dict[int, db.Solution]:
return self._sols[0]
def get_total_points(self) -> int:
sum = 0
for sol in self.get_sols():
if sol.points:
sum += sol.points
return sum
class ScoreTask:
task: db.Task
num_solutions: int
sum_points: int
def __init__(self, task: db.Task):
self.task = task
self.num_solutions = 0
self.sum_points = 0
def get_difficulty(self) -> Fraction:
if self.num_solutions == 0:
return Fraction(0)
return Fraction(self.sum_points, self.num_solutions)
class Score:
round: db.Round
contest: Optional[db.Contest]
part_states: List[db.PartState]
want_successful: bool
want_winners: bool
# Řádky výsledkovky
_results: Dict[int, ScoreResult]
# Úlohy jednotlivých kol (pro některá řazení je potřeba znát i úlohy
# z předcházejících kol. První index je krok (0 = toto kolo, 1 = předcházející, ...)
# a druhý index je task_id z db.Task.
_tasks: Dict[int, Dict[int, ScoreTask]]
# Seznam předcházejících kol indexovaných krokem výpočtu (0 = toto kolo, 1 = předcházející, ...)
_prev_rounds: Dict[int, db.Round]
# Zprávy o tvorbě výsledkovky, dvojice (typ, zprávy) kde typ může být info, warning nebo error
_messages: List[Tuple[str, str]]
def __init__(
self, round: db.Round, contest: Optional[db.Contest] = None,
# Ze kterých stavů chceme výsledkovku počítat
part_states: List[db.PartState] = [db.PartState.registered, db.PartState.invited, db.PartState.present],
):
self.round = round
self.contest = contest
self.part_states = part_states
self.want_successful = round.score_successful_limit is not None
self.want_winners = round.score_winner_limit is not None
# Příprava subquery na účastníky (contest_subq obsahuje master_contest_id)
sess = db.get_session()
if contest:
contest_subq = [contest.master_contest_id]
else:
contest_subq = sess.query(db.Contest.master_contest_id).filter_by(round=round)
# Načtení účastníků
data: List[Tuple[db.User, db.Participation, db.Participant]] = (
sess.query(db.User, db.Participation, db.Participant)
.select_from(db.Participation)
.join(db.User)
.join(db.Participant, and_(
db.Participant.user_id == db.Participation.user_id,
db.Participant.year == round.year
)
).filter(
db.User.is_test == False,
db.Participation.state.in_(part_states),
db.Participation.contest_id.in_(contest_subq)
).options(
joinedload(db.Participant.school_place),
joinedload(db.Participation.contest).joinedload(db.Contest.place),
).all()
)
self._results = {}
for user, pion, pant in data:
self._results[user.user_id] = ScoreResult(user, pant, pion)
# Načtení úloh a řešení
self._prev_rounds = {0: round}
self._tasks = {}
self._messages = []
self._load_tasks_and_sols(0, round, contest_subq)
self._mark_winners()
def _load_tasks_and_sols(self, step: int, round: db.Round, contest_subq: Union[Query, List[int]]):
"""Obecná funkce na načtení úloh a řešení tohoto nebo předchozího kola"""
if step in self._tasks:
return
sess = db.get_session()
user_id_subq = sess.query(db.Participation.user_id).join(
db.User, db.Participation.user_id == db.User.user_id
).filter(
db.User.is_test == False,
db.Participation.state.in_(self.part_states),
db.Participation.contest_id.in_(contest_subq)
)
# Vyrobení polí
self._tasks[step] = {}
for result in self._results.values():
result._sols[step] = {}
# Spočítání počtu řešitelů
num_participants = db.get_count(user_id_subq)
# Načtení úloh
tasks: List[db.Task] = sess.query(db.Task).filter(db.Task.round_id.in_(
sess.query(db.Round.round_id).filter_by(master_round_id=round.master_round_id)
)).all()
for task in tasks:
self._tasks[step][task.task_id] = ScoreTask(task)
self._tasks[step][task.task_id].num_solutions = num_participants
# Načtení řešení
task_ids = list(self._tasks[step].keys())
sols: List[db.Solution] = sess.query(db.Solution).filter(
db.Solution.user_id.in_(user_id_subq),
db.Solution.task_id.in_(task_ids),
).all()
for sol in sols:
if sol.user_id in self._results:
self._results[sol.user_id]._sols[step][sol.task_id] = sol
if sol.points:
self._tasks[step][sol.task_id].sum_points += sol.points
def _mark_winners(self):
for result in self._results.values():
total_points = result.get_total_points()
result.winner = self.want_winners and total_points >= self.round.score_winner_limit
result.successful = self.want_successful and total_points >= self.round.score_successful_limit
def _load_prev_round(self, step: int) -> bool:
"""Načtení úloh a řešení předchozího kola, pokud takové existuje."""
if step in self._tasks:
return True
sess = db.get_session()
# Zkusíme nalézt kolo o `step` kroků zpět
prev_round = sess.query(db.Round).filter_by(
year=self.round.year, category=self.round.category, seq=self.round.seq - step
).filter(db.Round.master_round_id == db.Round.round_id).one_or_none()
if prev_round is None:
return False
self._prev_rounds[step] = prev_round
if self.contest:
# Pokud tvoříme výsledkovku pro contest, tak nás zajímají jen řešení
# z podoblastí contestu spadajícího pod hlavní
desc_cte = db.place_descendant_cte(self.contest.place, max_level=prev_round.level)
contest_subq = sess.query(db.Contest.master_contest_id).filter(
db.Contest.round == prev_round,
db.Contest.place_id.in_(select([desc_cte]))
)
else:
# Pokud vytváříme výsledkovku pro celé kolo, bereme vše
contest_subq = sess.query(db.Contest.master_contest_id).filter_by(round=prev_round)
self._load_tasks_and_sols(step, prev_round, contest_subq)
return True
def get_tasks(self) -> List[db.Task]:
tasks = []
for task in self._tasks[0].values():
tasks.append(task.task)
return list(sorted(tasks, key=lambda task: task.code))
def _add_message(self, type: str, message: str):
self._messages.append((type, message))
def get_messages(self) -> List[Tuple[str, str]]:
return self._messages
def get_sorted_results(self) -> List[ScoreResult]:
# Vygenerování třídícího klíče podle módu výsledkovky
if self.round.score_mode == db.RoundScoreMode.basic:
# Základní mód - jen podle celkových bodů, se sdílenými místy
for result in self._results.values():
result._order_key = [-result.get_total_points()]
elif self.round.score_mode == db.RoundScoreMode.mo:
self._add_mo_order_key()
else:
assert False
# Kvůli pevnému pořadí při sdílených místech přidáme k finálnímu třídění
# ještě i příjmení, jméno a user_id
results: List[ScoreResult] = sorted(self._results.values(), key=lambda result: (
result._order_key, result.user.last_name, result.user.first_name, result.user.user_id
))
last: Optional[ScoreResult] = None
# Spočítáme pořadí - v prvním kroku prolinkujeme opakující se ScoreOrder na první,
# ve druhém kroku je pak správně rozkopírujeme s nastaveným continuation na True
for result in results:
if last is None:
result.order = ScoreOrder(1)
last = result
elif last._order_key == result._order_key:
result.order = last.order
last.order.span += 1
else:
result.order = ScoreOrder(last.order.place + last.order.span)
last = result
lastOrder: Optional[ScoreOrder] = None
for result in results:
if result.order == lastOrder:
result.order = ScoreOrder(lastOrder.place, lastOrder.span, True)
else:
lastOrder = result.order
return results
def _exists_same_order_key(self) -> bool:
last = None
for result in sorted(self._results.values(), key=lambda result: result._order_key):
if result._order_key == last:
return True
last = result._order_key
return False
def _add_mo_order_key(self):
"""Jednoznačný mód MO, iterujeme po krocích dokud buď nevyrobíme
výsledkovku bez sdílených míst, nebo už neexistuje předchozí kolo,
na které bychom se ještě mohli podívat."""
step = 0
while True:
if self._load_prev_round(step) is False:
self._add_message("info", "I po započítání všech předchozích kol stále existují sdílená místa, řadím podle ročníku")
break
if step != 0:
self._add_message(
"info",
f"Výpočet na úrovni kola {self._prev_rounds[step-1].round_code()} nestačil,"
+ f" započítávám body z kola {self._prev_rounds[step].round_code()}"
)
tasks_by_difficulty = sorted(
self._tasks[step].keys(),
key=lambda task_id: (self._tasks[step][task_id].get_difficulty(), self._tasks[step][task_id].task.code),
)
last_task: ScoreTask = None
last_difficulty: Fraction = None
difficulty_report = []
for task_id in tasks_by_difficulty:
task = self._tasks[step][task_id]
difficulty = task.get_difficulty()
if last_task is not None and difficulty == last_difficulty and difficulty != 0:
self._add_message(
"warning",
f"Úlohy {last_task.task.code} a {task.task.code} mají stejnou vypočtenou obtížnost "
+ f" {difficulty}, pro výpočet obtížnosti je řadím podle kódu úlohy"
)
difficulty_report.append(f"{task.task.code} ({round(float(difficulty), 2)} b)")
last_task, last_difficulty = task, difficulty
self._add_message(
"info",
f"Průměrné body úloh kola {self._prev_rounds[step].round_code()} od nejobtížnější: "
+ ", ".join(difficulty_report)
)
for result in self._results.values():
sol_points = {}
for task_id in self._tasks[step].keys():
sol_points[task_id] = 0
for sol in result._sols[0].values():
if sol.points:
sol_points[sol.task_id] = -sol.points # sestupné třídění
total_points = sum(sol_points.values())
points_from_max = list(sorted(sol_points.values()))
points_from_difficult = [sol_points[task_id] for task_id in tasks_by_difficulty]
if result.successful or not self.want_successful:
# Primárně podle počtu získaných bodů, sekundárně podle bodů od maxima, terciárně podle bodů od nejobtížnější
result._order_key.extend((total_points, points_from_max, points_from_difficult))
else:
# Neúspěšné řešitele třídíme podle počtu získaných bodů, sekundárně podle jména
result._order_key.extend((total_points, result.user.name_sort_key()))
# Otestujeme, jestli teď existují sdílená místa
if not self._exists_same_order_key():
break
# Pokud jsme našli stejný klíč, opakujeme cyklus s minulým kolem
step += 1
# Na konec třídícího klíče přidáme ročník (menší ročník první)
for result in self._results.values():
grade = normalize_grade(result.pant.grade)
if grade == -1:
self._add_message(
"warning",
f"Účastník {result.user.first_name} {result.user.last_name} má neplatný ročník {result.pant.grade}"
)
result._order_key.append(grade)
if self._exists_same_order_key():
self._add_message(
"error",
"I po započítání všech úloh (včetně minulých kol) a ročníků účastníků existují sdílená místa. Je potřeba určit pořadí losem"
)
# Další kontroly
# Pokud kontrolujeme výsledkovku pro celé kolo
if self.contest is None:
winners = 0
successfulls = 0
participants = len(self._results)
for result in self._results.values():
if result.winner:
winners += 1
if result.successful:
successfulls += 1
if successfulls > participants // 2:
self._add_message(
"error",
f"Počet úspěšných řešitelů ({successfulls}) převyšuje polovinu celkového počtu účastníků ({participants})"
)
if winners > successfulls // 2:
self._add_message(
"error",
f"Počet vítězů ({winners}) převyšuje polovinu počtu úspěšných řešitelů ({successfulls})"
)