#!/usr/bin/env pypy3

import asyncio
import datetime
import sys, os
import pathlib
import json
import time
import pprint
import tempfile
from pathlib import Path

import communication
from utils import *

# Shared caio asynchronous file-I/O context; stays None until main() creates it.
caio_ctx = None

# Access token sent as the X-Access-Token header of every API request.
# Read once at import time; read_text() closes the file handle right away
# instead of leaving it open until garbage collection.
golemio_key = Path("golemio_key").read_text().strip()

# Futures of callers waiting for the next snapshot (raw / zstd-compressed).
# wait_data() appends to these lists, announce_data() resolves and empties them.
futures_next_data = []
futures_next_data_compressed = []
# Most recent (dt, raw_data) snapshots, oldest first; download_now() keeps
# at most 10 entries.
last_data = []

def announce_data(futures, dt, data):
    """Resolve every pending future in ``futures`` with ``(dt, data)``.

    Args:
        futures: Mutable list of futures waiting for data. It is emptied in
            place before any future is resolved.
        dt: Timestamp of the snapshot being announced.
        data: Payload delivered to the waiters.

    Futures that are already done (for instance cancelled because the task
    awaiting them went away) are skipped, so one dead waiter cannot prevent
    the remaining ones from being notified.
    """
    pending = list(futures)
    futures.clear()
    payload = (dt, data)
    for future in pending:
        # set_result() raises InvalidStateError on a finished/cancelled future.
        if not future.done():
            future.set_result(payload)




async def download_now(dt):
    """Download one vehicle-positions snapshot, publish it and archive it.

    Steps:
      1. Fetch the JSON from the Golemio API with ``curl``.
      2. Abort if the download finished more than 8 seconds after ``dt``.
      3. Hand the raw data to waiters, compress it with ``zstd -8``, store the
         raw snapshot in ``last_data`` (at most 10 entries kept) and hand the
         raw/compressed data to waiters.
      4. Write the compressed data to ``tmp/`` and move it to
         ``data/realtime/<YYYY-MM-DD>/<HH>/<MM-SS>.json.zst``.

    Args:
        dt: Timezone-aware datetime identifying the snapshot slot; it is
            used for the freshness check, ordering and the file name.

    Raises:
        RuntimeError: If ``curl`` or ``zstd`` exits with a non-zero status,
            or if the download completed too late.
        OSError: If the temporary file already exists or the file system
            operations fail.
    """
    eprint("begin", dt)
    cmd = [
        'curl', '-X', 'GET',
        "https://api.golemio.cz/v2/vehiclepositions?limit=10000&offset=0&includeNotTracking=true&preferredTimezone=Europe/Prague",
        '-H', 'accept: application/json',
        #'-H', "Accept-Encoding: identity",
        '-H', f"X-Access-Token: {golemio_key}",
        '--silent', '--fail', '--compressed',
    ]
    # Log the command line with the access token masked so the secret does
    # not end up in the logs.
    loggable = [
        "X-Access-Token: <redacted>" if arg.startswith("X-Access-Token:") else arg
        for arg in cmd
    ]
    eprint(" ".join(loggable))

    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
    data, _ = await proc.communicate()
    # Explicit check instead of `assert`, which disappears under `-O`.
    if proc.returncode != 0:
        raise RuntimeError(f"curl failed with exit code {proc.returncode}")

    eprint("downloaded", dt)

    if time.time() - dt.timestamp() > 8:
        raise RuntimeError(f"TOO LATE {dt}")

    # Serve waiters for uncompressed data right away, before spending time
    # on compression - but only if this snapshot is newer than the last one.
    if not last_data or last_data[-1][0] < dt:
        announce_data(futures_next_data, dt, data)

    proc = await asyncio.create_subprocess_exec(
        "zstd", "-8",
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    compressed_data, _ = await proc.communicate(data)
    if proc.returncode != 0:
        raise RuntimeError(f"zstd failed with exit code {proc.returncode}")

    # Re-check after the await: last_data may have changed while compressing.
    if not last_data or last_data[-1][0] < dt:
        last_data.append((dt, data))
        # Keep only the 10 most recent snapshots (trimmed in place).
        del last_data[:-10]

        # The raw list is announced a second time to reach waiters that
        # registered while zstd was running.
        announce_data(futures_next_data, dt, data)
        announce_data(futures_next_data_compressed, dt, compressed_data)

    # Write to a temporary file first and rename afterwards, so the final
    # path only ever appears with complete content.
    tmp_file = Path("tmp") / dt.strftime("realtime-%Y-%m-%d--%H-%M-%S.json.zst")
    with open(tmp_file, "xb") as fp:
        await caio_ctx.write(compressed_data, fp.fileno(), offset=0)

    # "data/realtime" itself must already exist; only the day and hour
    # directories are created on demand.
    day_dir = Path("data/realtime") / dt.strftime('%Y-%m-%d')
    hour_dir = day_dir / dt.strftime('%H')
    for directory in (day_dir, hour_dir):
        directory.mkdir(exist_ok=True)

    os.rename(tmp_file, hour_dir / dt.strftime("%M-%S.json.zst"))

    eprint("end", dt)

async def wait_data(compressed):
    """Wait for the next announced snapshot and return it.

    Args:
        compressed: If true, wait on the list of compressed-data waiters,
            otherwise on the list of raw-data waiters.

    Returns:
        The ``(dt, data)`` tuple passed to ``announce_data`` for that list.

    If the awaiting task is cancelled, the future is removed from the
    pending list so no cancelled future is left behind for the next
    announcement.
    """
    waiters = futures_next_data_compressed if compressed else futures_next_data
    future = asyncio.get_running_loop().create_future()
    waiters.append(future)
    try:
        return await future
    finally:
        # After a normal announcement the list has already been cleared;
        # the future is still present only when we were interrupted.
        if future in waiters:
            waiters.remove(future)

class DownloadServer(communication.FuncCaller):
    """Server-side methods registered through ``communication.server_exec()``."""

    @communication.server_exec()
    async def get_last_data(self):
        """Return the newest ``(dt, data)`` snapshot held in memory.

        When no snapshot has been stored yet, wait for the next uncompressed
        one to be announced and return that instead.
        """
        if not last_data:
            return await wait_data(False)
        return last_data[-1]

    @communication.server_exec()
    async def wait_next_data(self, last_dt: datetime.datetime, preferably_compressed: bool = True):
        """
        Return the first snapshot newer than ``last_dt``.

        1 -> out of range of data in memory, ask file system

        Snapshots found in memory are returned as stored (uncompressed).
        If nothing newer is in memory yet, wait for the next announcement;
        ``preferably_compressed`` selects whether to wait for the compressed
        or the raw payload.
        """
        # Nothing cached, or the oldest cached snapshot is already past last_dt.
        if not last_data or last_data[0][0] > last_dt:
            return 1

        newer = next((entry for entry in last_data if entry[0] > last_dt), None)
        if newer is not None:
            stamp, payload = newer
            return stamp, payload

        return await wait_data(preferably_compressed)


async def client_connected(reader, writer):
    """Serve one client connection accepted on the unix socket.

    Wraps the stream pair in a communication socket, builds a
    ``DownloadServer`` on top of it and runs it under ``catch_all``.
    """
    stream_socket = await communication.AsyncioStreamSocket().connect(reader, writer)
    server = DownloadServer(stream_socket, is_server=True)
    await catch_all(server._server_.run())

async def main():
    """Start the unix-socket server and schedule a download every 10 seconds.

    Creates the shared caio context, starts serving on
    ``sockets/download_server`` and then loops forever: it sleeps until the
    next wall-clock multiple of 10 seconds, spawns ``download_now`` for that
    slot as a background task, and sleeps 5 seconds before computing the
    next wait.
    """
    global caio_ctx
    import caio
    caio_ctx = caio.AsyncioContext(max_requests=128)

    # Keep the server object referenced for the lifetime of the loop.
    server = await asyncio.start_unix_server(client_connected, path="sockets/download_server", start_serving=True)

    # The event loop only holds weak references to tasks; keep strong ones
    # here so a running download cannot be garbage-collected mid-flight.
    background_tasks = set()

    while True:
        # Sleep until the next multiple of 10 seconds on the wall clock.
        await asyncio.sleep(10 - time.time() % 10)
        dt = datetime.datetime.fromtimestamp(time.time() // 10 * 10, local_timezone)
        task = asyncio.create_task(catch_all(download_now(dt)))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        # Move well inside the current slot before computing the next wait.
        await asyncio.sleep(5)

# Script entry point: run main() on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())