Skip to content
Snippets Groups Projects
Commit eae0b3c4 authored by Jiří Setnička's avatar Jiří Setnička
Browse files

Backend modul pro výrobu výsledkových listin

Pro zadaný round (a případně contest) vygeneruje obsah výsledkové
listiny. Vrací výsledky jako ScoreResult objekty obsahující db objekty
db.User, db.Participation, db.Participant a db.Solution pro každou
úlohu + spočítané celkové body, určení vlastností vítěze/úspěšného
řešitele a pořadí.

Pořadí počítá podle nastavení v round. V součastnosti pdporuje dva módy:
* jednoduchá výsledkovka (basic): Nen podle celkových bodů, sdílená místa.
* podle pravidel MO (mo): Zjednoznačnění pořadí podle bodů za jednotlivé
  úlohy (od maxima a podle obtížnosti) a případně podle předchozích kol
  ve stejné kategorii. Generuje warningy, když se vyskytne nestandartní
  situace.

Navenek má objekt Score metody:
* get_tasks() - úlohy utřízené podle kódu
* get_warnings()
* get_sorted_results() - vrací ScoreResult, každý pak má metodu get_sols()

Issue #171
parent 3a9560fd
No related branches found
No related tags found
1 merge request!33Výsledkové listiny - backend modul & pravidla MO
from fractions import Fraction
from sqlalchemy import and_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.query import Query
from sqlalchemy.sql.expression import select
from typing import Any, List, Tuple, Optional, Dict
import mo.db as db
from mo.util import normalize_grade
class ScoreOrder:
place: int
span: int # u nedělených míst 1, u dělených počet spojených míst (2.-5. -> span 4)
continuation: bool # jestli je pokračováním místa o jedno předcházející
def __init__(self, place: int, span: int = 1, continuation: bool = False):
self.place = place
self.span = span
self.continuation = continuation
class ScoreResult:
user: db.User
pant: db.Participant
pion: db.Participation
order: ScoreOrder
successful: bool
winner: bool
# Řešení jednotlivých kol (pro některá řazení je potřeba znát i výsledky
# z předcházejících kol). První index je krok (0 = toto kolo, 1 = předcházející, ...)
# a druhý index je task_id z db.Solution.
_sols: Dict[int, Dict[int, db.Solution]]
# Třídící klíč je n-tice klíčů podle kterých třídit, všechny klíče tříděny
# vzestupně (pro sestupné třídění podle čísla je potřeba klíč vynásobit -1)
_order_key: List[Any]
def __init__(self, user: db.User, pant: db.Participant, pion: db.Participation):
self.user = user
self.pant = pant
self.pion = pion
self._sols = {}
self.order = 1
self.place_span = 1
self.place_continuation = False
self.winner = False
self.successful = False
self._order_key = []
def get_sols(self) -> List[db.Solution]:
return list(self._sols[0].values())
def get_sols_map(self) -> Dict[int, db.Solution]:
return self._sols[0]
def get_total_points(self) -> int:
sum = 0
for sol in self.get_sols():
if sol.points:
sum += sol.points
return sum
class ScoreTask:
task: db.Task
num_solutions: int
sum_points: int
def __init__(self, task: db.Task):
self.task = task
self.num_solutions = 0
self.sum_points = 0
def get_difficulty(self) -> Fraction:
if self.num_solutions == 0:
return 0
return Fraction(self.sum_points, self.num_solutions)
class Score:
round: db.Round
contest: Optional[db.Contest]
part_states: List[db.PartState]
# Řádky výsledkovky
_results: Dict[int, ScoreResult]
# Úlohy jednotlivých kol (pro některá řazení je potřeba znát i úlohy
# z předcházejících kol. První index je krok (0 = toto kolo, 1 = předcházející, ...)
# a druhý index je task_id z db.Task.
_tasks: Dict[int, Dict[int, ScoreTask]]
# Seznam předcházejících kol indexovaných krokem výpočtu (0 = toto kolo, 1 = předcházející, ...)
_prev_rounds: Dict[int, db.Round]
# Zprávy o tvorbě výsledkovky, dvojice (typ, zprávy) kde typ může být info, warning nebo error
_messages: List[Tuple[str, str]]
def __init__(
self, round: db.Round, contest: Optional[db.Contest] = None,
# Ze kterých stavů chceme výsledkovku počítat
part_states: List[db.PartState] = [db.PartState.registered, db.PartState.invited, db.PartState.present],
):
self.round = round
self.contest = contest
self.part_states = part_states
# Příprava subquery na účastníky
sess = db.get_session()
if contest:
contest_subq = [contest.contest_id]
else:
contest_subq = sess.query(db.Contest.contest_id).filter_by(round=round)
# Načtení účastníků
data: List[Tuple[db.User, db.Participation, db.Participant]] = (
sess.query(db.User, db.Participation, db.Participant)
.select_from(db.Participation)
.join(db.User)
.join(db.Participant, and_(
db.Participant.user_id == db.Participation.user_id,
db.Participant.year == round.year
)
).filter(
db.User.is_test == False,
db.Participation.state.in_(part_states),
db.Participation.contest_id.in_(contest_subq)
).options(
joinedload(db.Participant.school_place),
joinedload(db.Participation.contest).joinedload(db.Contest.place),
).all()
)
self._results = {}
for user, pion, pant in data:
self._results[user.user_id] = ScoreResult(user, pant, pion)
# Načtení úloh a řešení
self._prev_rounds = {0: round}
self._tasks = {}
self._messages = []
self._load_tasks_and_sols(0, round, contest_subq)
self._mark_winners()
def _load_tasks_and_sols(self, step: int, round: db.Round, contest_subq: Query):
"""Obecná funkce na načtení úloh a řešení tohoto nebo předchozího kola"""
if step in self._tasks:
return
sess = db.get_session()
user_id_subq = sess.query(db.Participation.user_id).join(
db.User, db.Participation.user_id == db.User.user_id
).filter(
db.User.is_test == False,
db.Participation.state.in_(self.part_states),
db.Participation.contest_id.in_(contest_subq)
)
# Vyrobení polí
self._tasks[step] = {}
for result in self._results.values():
result._sols[step] = {}
# Spočítání počtu řešitelů
num_participants = db.get_count(user_id_subq)
# Načtení úloh
tasks: List[db.Task] = sess.query(db.Task).filter_by(round=round).all()
for task in tasks:
self._tasks[step][task.task_id] = ScoreTask(task)
self._tasks[step][task.task_id].num_solutions = num_participants
# Načtení řešení
task_ids = list(self._tasks[step].keys())
sols: List[db.Solution] = sess.query(db.Solution).filter(
db.Solution.user_id.in_(user_id_subq),
db.Solution.task_id.in_(task_ids),
).all()
for sol in sols:
if sol.user_id in self._results:
self._results[sol.user_id]._sols[step][sol.task_id] = sol
if sol.points:
self._tasks[step][sol.task_id].sum_points += sol.points
def _mark_winners(self):
for result in self._results.values():
total_points = result.get_total_points()
result.winner = (
self.round.score_winner_limit is not None
and total_points >= self.round.score_winner_limit
)
result.successful = (
self.round.score_successful_limit is not None
and total_points >= self.round.score_successful_limit
)
def _load_prev_round(self, step: int) -> bool:
"""Načtení úloh a řešení předchozího kola, pokud takové existuje."""
if step in self._tasks:
return True
sess = db.get_session()
# Zkusíme nalézt kolo o `step` kroků zpět
prev_round = sess.query(db.Round).filter_by(
year=self.round.year, category=self.round.category, seq=self.round.seq - step
).one_or_none()
if prev_round is None:
return False
self._prev_rounds[step] = prev_round
if self.contest:
# Pokud tvoříme výsledkovku pro contest, tak nás zajímají jen řešení
# z podoblastí contestu spadajícího pod hlavní
desc_cte = db.place_descendant_cte(self.contest.place, max_level=prev_round.level)
contest_subq = sess.query(db.Contest.contest_id).filter(
db.Contest.round == prev_round,
db.Contest.place_id.in_(select([desc_cte]))
)
else:
# Pokud vytváříme výsledkovku pro celé kolo, bereme vše
contest_subq = sess.query(db.Contest.contest_id).filter_by(round=prev_round)
self._load_tasks_and_sols(step, prev_round, contest_subq)
return True
def get_tasks(self) -> List[db.Task]:
tasks = []
for task in self._tasks[0].values():
tasks.append(task.task)
return list(sorted(tasks, key=lambda task: task.code))
def _add_message(self, type: str, message: str):
self._messages.append((type, message))
def get_messages(self) -> List[Tuple[str, str]]:
return self._messages
def get_sorted_results(self) -> List[ScoreResult]:
# Vygenerování třídícího klíče podle módu výsledkovky
if self.round.score_mode == db.RoundScoreMode.basic:
# Základní mód - jen podle celkových bodů, se sdílenými místy
for result in self._results.values():
result._order_key = [-result.get_total_points()]
elif self.round.score_mode == db.RoundScoreMode.mo:
self._add_mo_order_key()
else:
assert False
# Kvůli pevnému pořadí při sdílených místech přidáme k finálnímu třídění
# ještě i příjmení, jméno a user_id
results: List[ScoreResult] = sorted(self._results.values(), key=lambda result: (
result._order_key, result.user.last_name, result.user.first_name, result.user.user_id
))
last: ScoreResult = None
# Spočítáme pořadí - v prvním kroku prolinkujeme opakující se ScoreOrder na první,
# ve druhém kroku je pak správně rozkopírujeme s nastaveným continuation na True
for result in results:
if last is None:
result.order = ScoreOrder(1)
last = result
elif last._order_key == result._order_key:
result.order = last.order
last.order.span += 1
else:
result.order = ScoreOrder(last.order.place + last.order.span)
last = result
lastOrder: ScoreOrder = None
for result in results:
if result.order == lastOrder:
result.order = ScoreOrder(lastOrder.place, lastOrder.span, True)
else:
lastOrder = result.order
return results
def _exists_same_order_key(self) -> bool:
last = None
for result in sorted(self._results.values(), key=lambda result: result._order_key):
if result._order_key == last:
return True
last = result._order_key
return False
def _add_mo_order_key(self):
"""Jednoznačný mód MO, iterujeme po krocích dokud buď nevyrobíme
výsledkovku bez sdílených míst, nebo už neexistuje předchozí kolo,
na které bychom se ještě mohli podívat."""
step = 0
while True:
if self._load_prev_round(step) is False:
self._add_message("info", "I po započítání všech předchozích kol stále existují sdílená místa, řadím podle ročníku")
break
if step != 0:
self._add_message(
"info",
f"Výpočet na úrovni kola {self._prev_rounds[step-1].round_code()} nestačil,"
+ f" započítávám body z kola {self._prev_rounds[step].round_code()}"
)
tasks_by_difficulty = sorted(
self._tasks[step].keys(),
key=lambda task_id: (self._tasks[step][task_id].get_difficulty(), self._tasks[step][task_id].task.code),
)
last_task: ScoreTask = None
last_difficulty: Fraction = None
difficulty_report = []
for task_id in tasks_by_difficulty:
task = self._tasks[step][task_id]
difficulty = task.get_difficulty()
if last_task is not None and difficulty == last_difficulty and difficulty != 0:
self._add_message(
"warning",
f"Úlohy {last_task.task.code} a {task.task.code} mají stejnou vypočtenou obtížnost "
+ f" {difficulty}, pro výpočet obtížnosti je řadím podle kódu úlohy"
)
difficulty_report.append(f"{task.task.code} ({round(float(difficulty), 2)} b)")
last_task, last_difficulty = task, difficulty
self._add_message(
"info",
f"Průměrné body úloh kola {self._prev_rounds[step].round_code()} od nejobtížnější: "
+ ", ".join(difficulty_report)
)
for result in self._results.values():
sol_points = {}
for task_id in self._tasks[step].keys():
sol_points[task_id] = 0
for sol in result._sols[0].values():
if sol.points:
sol_points[sol.task_id] = -sol.points # sestupné třídění
total_points = sum(sol_points.values())
points_from_max = list(sorted(sol_points.values()))
points_from_difficult = [sol_points[task_id] for task_id in tasks_by_difficulty]
# Primárně podle počtu získaných bodů, sekundárně podle bodů od maxima, terciárně podle bodů od nejobtížnější
result._order_key.extend((total_points, points_from_max, points_from_difficult))
# Otestujeme, jestli teď existují sdílená místa
if not self._exists_same_order_key():
break
# Pokud jsme našli stejný klíč, opakujeme cyklus s minulým kolem
step += 1
# Na konec třídícího klíče přidáme ročník (menší ročník první)
for result in self._results.values():
grade = normalize_grade(result.pant.grade)
if grade == -1:
self._add_message(
"warning",
f"Účastník {result.user.first_name} {result.user.last_name} má neplatný ročník {result.pant.grade}"
)
result._order_key.append(grade)
if self._exists_same_order_key():
self._add_message(
"error",
"I po započítání všech úloh (včetně minulých kol) a ročníků účastníků existují sdílená místa. Je potřeba určit pořadí losem"
)
# Další kontroly
# Pokud kontrolujeme výsledkovku pro celé kolo
if self.contest is None:
winners = 0
successfulls = 0
participants = len(self._results)
for result in self._results.values():
if result.winner:
winners += 1
if result.successful:
successfulls += 1
if successfulls > participants/2:
self._add_message(
"error",
f"Počet úspěšných řešitelů ({successfulls}) převyšuje polovinu celkového počtu účastníků ({participants})"
)
if winners > successfulls/2:
self._add_message(
"error",
f"Počet vítězů ({winners}) převyšuje polovinu počtu úspěšných řešitelů ({successfulls})"
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment