#!/usr/bin/env pypy3
"""Periodic downloader of Golemio vehicle positions with a unix-socket API.

Every ``DOWNLOAD_PERIOD_S`` seconds a snapshot is fetched with ``curl``,
compressed with ``zstd``, kept in a short in-memory history, announced to
clients waiting for fresh data, and stored under ``data/realtime``.
"""
import asyncio
import collections
import datetime
import sys
import os
import pathlib
import json
import time
import pprint
import tempfile
from pathlib import Path

import communication
from utils import *

# Seconds between two consecutive downloads; timestamps are aligned to it.
DOWNLOAD_PERIOD_S = 10
# A download finishing more than this many seconds after its slot is dropped.
MAX_DOWNLOAD_DELAY_S = 8
# Number of most recent snapshots kept in memory.
HISTORY_SIZE = 10

# caio context used for asynchronous file writes; created in main().
caio_ctx = None

golemio_key = Path("golemio_key").read_text().strip()

# Futures of clients waiting for the next raw / compressed snapshot.
futures_next_data = []
futures_next_data_compressed = []
# (dt, raw_data) tuples ordered from oldest to newest; the deque drops the
# oldest entry automatically once HISTORY_SIZE is exceeded.
last_data = collections.deque(maxlen=HISTORY_SIZE)

# Strong references to running download tasks. The event loop only keeps
# weak references, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks = set()


def announce_data(futures, dt, data):
    """Resolve every waiting future with ``(dt, data)`` and empty the list.

    Futures that are already done (for example cancelled because the waiter
    went away) are skipped: calling ``set_result`` on them would raise
    ``InvalidStateError``, leave the remaining waiters unresolved and abort
    the caller.
    """
    pending = list(futures)
    futures.clear()
    for future in pending:
        if not future.done():
            future.set_result((dt, data))


def _is_newer_than_history(dt):
    """Return True when ``dt`` is newer than the newest snapshot in memory."""
    return not last_data or last_data[-1][0] < dt


async def download_now(dt):
    """Download, publish and store the snapshot belonging to time slot ``dt``.

    Raises:
        RuntimeError: if ``curl`` or ``zstd`` exits with a non-zero status,
            or if the download finished more than ``MAX_DOWNLOAD_DELAY_S``
            seconds after ``dt``.
        FileExistsError: if the temporary file for ``dt`` already exists.
    """
    eprint("begin", dt)
    # NOTE(review): the access token is passed on the command line and is
    # logged verbatim by the eprint below - confirm this is acceptable.
    cmd = [
        'curl', '-X', 'GET',
        "https://api.golemio.cz/v2/vehiclepositions?limit=10000&offset=0&includeNotTracking=true&preferredTimezone=Europe/Prague",
        '-H', 'accept: application/json',
        #'-H', "Accept-Encoding: identity",
        '-H', f"X-Access-Token: {golemio_key}",
        '--silent', '--fail', '--compressed'
    ]
    eprint(" ".join(cmd))
    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
    data, _ = await proc.communicate()
    # Explicit raise instead of assert: asserts are stripped under -O.
    if proc.returncode != 0:
        raise RuntimeError(f"curl exited with status {proc.returncode}")
    eprint("downloaded", dt)

    if time.time() - dt.timestamp() > MAX_DOWNLOAD_DELAY_S:
        raise RuntimeError(f"TOO LATE {dt}")

    # Hand the raw data to waiting clients right away, before compressing.
    if _is_newer_than_history(dt):
        announce_data(futures_next_data, dt, data)

    proc = await asyncio.create_subprocess_exec(
        "zstd", "-8",
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    compressed_data, _ = await proc.communicate(data)
    if proc.returncode != 0:
        raise RuntimeError(f"zstd exited with status {proc.returncode}")

    # Re-check: another download may have been recorded while zstd ran.
    if _is_newer_than_history(dt):
        last_data.append((dt, data))
        # Serves raw-data waiters that registered during the compression.
        announce_data(futures_next_data, dt, data)
        announce_data(futures_next_data_compressed, dt, compressed_data)

    # Write to a temporary file first and rename it into place afterwards.
    tmp_file = Path("tmp") / dt.strftime("realtime-%Y-%m-%d--%H-%M-%S.json.zst")
    with open(tmp_file, "xb") as fp:
        await caio_ctx.write(compressed_data, fp.fileno(), offset=0)

    # Only the day and hour directories are created; "data/realtime" itself
    # has to exist already.
    day_dir = Path("data/realtime") / dt.strftime('%Y-%m-%d')
    hour_dir = day_dir / dt.strftime('%H')
    for directory in (day_dir, hour_dir):
        directory.mkdir(exist_ok=True)
    os.rename(tmp_file, hour_dir / dt.strftime("%M-%S.json.zst"))
    eprint("end", dt)


async def wait_data(compressed):
    """Wait for the next announced snapshot and return it as ``(dt, data)``.

    ``compressed`` selects whether the zstd-compressed or the raw payload
    is awaited.
    """
    future = asyncio.get_running_loop().create_future()
    waiters = futures_next_data_compressed if compressed else futures_next_data
    waiters.append(future)
    return await future


class DownloadServer(communication.FuncCaller):
    """Calls exposed to clients of the download server."""

    @communication.server_exec()
    async def get_last_data(self):
        """Return the newest raw ``(dt, data)``, waiting if none exists yet."""
        if last_data:
            return last_data[-1]
        return await wait_data(False)

    @communication.server_exec()
    async def wait_next_data(self, last_dt: datetime.datetime, preferably_compressed: bool = True):
        """Return the first snapshot newer than ``last_dt``.

        Returns ``1`` when the answer cannot be given from memory (nothing
        is stored yet, or the oldest stored snapshot is already newer than
        ``last_dt``); the caller should then read from the file system.
        Snapshots served from memory are uncompressed. When nothing newer is
        stored, the call waits for the next snapshot, which is compressed
        if ``preferably_compressed`` is true.
        """
        if not last_data:
            return 1
        if last_data[0][0] > last_dt:
            return 1
        for dt, data in last_data:
            if dt > last_dt:
                return dt, data
        return await wait_data(preferably_compressed)


async def client_connected(reader, writer):
    """Serve one client connection accepted on the unix socket."""
    socket = await communication.AsyncioStreamSocket().connect(reader, writer)
    await catch_all(DownloadServer(socket, is_server=True)._server_.run())


async def main():
    """Start the unix-socket server and schedule a download every period."""
    global caio_ctx
    import caio
    caio_ctx = caio.AsyncioContext(max_requests=128)

    # Keep a reference to the server for the lifetime of the loop below.
    server = await asyncio.start_unix_server(
        client_connected, path="sockets/download_server", start_serving=True)

    while True:
        # Sleep until the next multiple of DOWNLOAD_PERIOD_S.
        await asyncio.sleep(DOWNLOAD_PERIOD_S - time.time() % DOWNLOAD_PERIOD_S)
        slot = time.time() // DOWNLOAD_PERIOD_S * DOWNLOAD_PERIOD_S
        dt = datetime.datetime.fromtimestamp(slot, local_timezone)
        task = asyncio.create_task(catch_all(download_now(dt)))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        # Move well past the boundary before computing the next sleep.
        await asyncio.sleep(5)


if __name__ == "__main__":
    asyncio.run(main())