diff --git a/prog/communication.py b/prog/communication.py
index d67e30f66e6fc26672a5c0c196fb09fb5d5ea1a4..da2cab994d6e606a795d637253fa7f0aa21e864d 100644
--- a/prog/communication.py
+++ b/prog/communication.py
@@ -229,6 +229,7 @@ def dt_intersect(dt_from: datetime.datetime, dt_to: datetime.datetime, dt: datet
 
 class MainServer(FuncCaller):
     _download_server = None
+    _preprocess_server = None
 
     async def _get_download_server(self):
         if self._download_server is None:
@@ -237,6 +238,13 @@ class MainServer(FuncCaller):
             self._download_server = download_server.DownloadServer(s)
         return self._download_server
 
+    async def _get_preprocess_server(self):
+        if self._preprocess_server is None:
+            import preprocess
+            s = await UnixSocket().connect("sockets/preprocess_server")
+            self._preprocess_server = preprocess.PreprocessServer(s)
+        return self._preprocess_server
+
     async def _tree_walker(self, condition, worker, reverse=False):
         d = "data/realtime"
         for d_Y_m_d in sorted(os.listdir(d), reverse=reverse):
@@ -327,12 +335,14 @@ class MainServer(FuncCaller):
         path = "data/realtime_by_route/"+dt.strftime("%Y-%m-%d/%H")
         out = {}
         if not os.path.exists(path):
-            return None, []
+            return await (await self._get_preprocess_server()).get_preprocessed_data(dt, dt+datetime.timedelta(hours=1), route_id)
+        with open(path+"/source_timestamps") as f:
+            source_timestamps = [ datetime.datetime.fromisoformat(x.strip()) for x in f ]
         if os.path.exists(path+"/"+route_id+".json.zst"):
             with open(path+"/"+route_id+".json.zst", "rb") as f:
-                return f.read(), []
+                return f.read(), source_timestamps
         else:
-            return '{}', []
+            return '{}', source_timestamps
 
     @server_exec()
     async def remove_data(self, dt: datetime.datetime):
diff --git a/prog/download_server.py b/prog/download_server.py
index 6e94e4c11c587d677c7e268a85eb6d148d2249d8..62d7dec5ded10cde638e2b8ae16ea2018f29ce1a 100755
--- a/prog/download_server.py
+++ b/prog/download_server.py
@@ -37,11 +37,11 @@ async def download_now(dt):
         'curl', '-X', 'GET',
         "https://api.golemio.cz/v2/vehiclepositions?limit=10000&offset=0&includeNotTracking=true&preferredTimezone=Europe/Prague",
         '-H', 'accept: application/json',
-        '-H', "Accept-Encoding: identity",
+        #'-H', "Accept-Encoding: identity",
         '-H', f"X-Access-Token: {golemio_key}",
-        '--silent', '--fail'
+        '--silent', '--fail', '--compressed'
     ]
-    print(" ".join(cmd))
+    eprint(" ".join(cmd))
    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
     data, _ = await proc.communicate()
     assert proc.returncode == 0, proc.returncode
@@ -112,7 +112,7 @@ class DownloadServer(communication.FuncCaller):
 
 
 async def client_connected(reader, writer):
-    await DownloadServer(await communication.AsincioStreamSocket().connect(reader, writer), is_server=True)._server_.run()
+    await catch_all(DownloadServer(await communication.AsincioStreamSocket().connect(reader, writer), is_server=True)._server_.run())
 
 async def main():
     global caio_ctx
diff --git a/prog/gtfs.py b/prog/gtfs.py
index fe1619c969bab4fd058319b22282aea830371195..595ae3059f2d46aee20316c532121667c8ec9bd0 100644
--- a/prog/gtfs.py
+++ b/prog/gtfs.py
@@ -8,6 +8,8 @@ import weakref
 
 local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
 
+_shape_loading_from_disk_count = 0
+_shape_loading_from_cache_count = 0
 
 @dataclass
 class Route:
@@ -119,6 +121,7 @@ class GtfsDay():
         }
 
     async def get_shape_for_trip_id(self, trip_id):
+        global _shape_loading_from_disk_count, _shape_loading_from_cache_count
         await self.load_trips()
         if trip_id not in self.trips:
eprint(f"get_shape_for_trip_id failed: no such trip_id {trip_id}") @@ -126,14 +129,14 @@ class GtfsDay(): shape_id = self.trips[trip_id].shape_id shape = self._shape_cache.get(shape_id, None) if shape is None: - eprint(f"Loading shape {shape_id} (without cache), size of cache {len(self._shape_cache)}") + _shape_loading_from_disk_count += 1 d = await self.get_file("shape_by_id/"+shape_id) self._shape_cache[shape_id] = shape = np.array([ [float(x["shape_pt_lat"]), float(x["shape_pt_lon"]), float(x["shape_dist_traveled"])] for x in d ]) else: - eprint(f"Loading shape {shape_id} from cache, size of cache {len(self._shape_cache)}") + _shape_loading_from_cache_count += 1 return shape def parse_time(self, val): diff --git a/prog/preprocess.py b/prog/preprocess.py index 824e050823edca51b5fefe4c65678a722f56a3b0..fb0419064a8eaa659b2652fb3d92f38281e7405a 100755 --- a/prog/preprocess.py +++ b/prog/preprocess.py @@ -25,8 +25,10 @@ import gc async def clean_data_until(ths, dt, loaded_timestamps): - print("START CLEANUP UNTIL", dt) - loaded_timestamps = [x for x in loaded_timestamps if x >= dt] + eprint("START CLEANUP UNTIL", dt) + y = [x for x in loaded_timestamps if x >= dt] + loaded_timestamps.clear() + loaded_timestamps += y for i, trip_id in enumerate([x for x in ths]): if i%100 == 0: await asyncio.sleep(1) @@ -35,24 +37,54 @@ async def clean_data_until(ths, dt, loaded_timestamps): th = ths[trip_id] th.history = [hp for hp in th.history if hp.last_captured >= dt] if not th.history: - print(f"REMOVING TRIP {trip_id}") + eprint(f"REMOVING TRIP {trip_id}") del ths[trip_id] await asyncio.sleep(1) - print("DONE CLEANUP UNTIL", dt) - print(gc.get_stats()) + eprint("DONE CLEANUP UNTIL", dt) + eprint(gc.get_stats()) gc.collect() - print(gc.get_stats()) - print("DONE GC", dt) + eprint(gc.get_stats()) + eprint("DONE GC", dt) + +def serialize_data(data, dt_from, dt_to): + out = {} + for th in data: + history = [ + { + "lat": hp.lat, + "lon": hp.lon, + "state_position": hp.state_position, + "first_captured": hp.first_captured.isoformat(), + "last_captured": hp.last_captured.isoformat(), + "origin_timestamp": hp.origin_timestamp.isoformat(), + "openapi_shape_dist_traveled": hp.openapi_shape_dist_traveled, + "openapi_next_stop": hp.openapi_next_stop, + "openapi_last_stop": hp.openapi_last_stop, + "openapi_delay": hp.openapi_delay, + "shape_point": hp.shape_point, + "shape_point_dist_traveled": hp.shape_point_dist_traveled, + } + for hp in th.history + if dt_from <= hp.last_captured and hp.first_captured < dt_to + ] + if history: + out[th.trip.trip_id] = { + 'history': history, + 'trip_id': th.trip.trip_id, + 'trip': th.trip_json + + } + return out async def write_data(ths, dt, loaded_timestamps): assert dt.timestamp() % 3600 == 0 - print("PREPARE OUTPUT", dt, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H')) + eprint("PREPARE OUTPUT", dt, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H')) source_timestamps = [x for x in loaded_timestamps if dt <= x and x < dt+datetime.timedelta(hours=1)] if len(source_timestamps) < 300: - print(f"!!! OUTPUT FAILED there is only {len(source_timestamps)} source timestamps!!!") + eprint(f"!!! 
OUTPUT FAILED there is only {len(source_timestamps)} source timestamps!!!") await clean_data_until(ths, dt, loaded_timestamps) return @@ -64,38 +96,12 @@ async def write_data(ths, dt, loaded_timestamps): data_by_route[key].append(th) tmp_dir = tempfile.mkdtemp(dir="tmp") - print("START OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H')) + eprint("START OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H')) for route_id, data in data_by_route.items(): - print(f"OUTPUT {dt} {route_id}") - out = {} - for th in data: - history = [ - { - "lat": hp.lat, - "lon": hp.lon, - "state_position": hp.state_position, - "first_captured": hp.first_captured.isoformat(), - "last_captured": hp.last_captured.isoformat(), - "origin_timestamp": hp.origin_timestamp.isoformat(), - "openapi_shape_dist_traveled": hp.openapi_shape_dist_traveled, - "openapi_next_stop": hp.openapi_next_stop, - "openapi_last_stop": hp.openapi_last_stop, - "openapi_delay": hp.openapi_delay, - "shape_point": hp.shape_point, - "shape_point_dist_traveled": hp.shape_point_dist_traveled, - } - for hp in th.history - if dt <= hp.last_captured and hp.first_captured < dt+datetime.timedelta(hours=1) - ] - if history: - out[th.trip.trip_id] = { - 'history': history, - 'trip_id': th.trip.trip_id, - 'trip': th.trip_json - - } + eprint(f"OUTPUT {dt} {route_id}") + out = serialize_data(data, dt, dt+datetime.timedelta(hours=1)) if out: with open(Path(tmp_dir)/(route_id+".json.zst"), "xb") as f: proc = await asyncio.create_subprocess_exec("zstd", "-8", stdout=f, stdin=asyncio.subprocess.PIPE) @@ -114,20 +120,21 @@ async def write_data(ths, dt, loaded_timestamps): shutil.rmtree(out_path) os.rename(tmp_dir, out_path) - print("DONE OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H')) + eprint("DONE OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H')) + await asyncio.sleep(100) await clean_data_until(ths, dt, loaded_timestamps) async def reload_gtfs(dt): - print("RELOAD GTFS") + eprint("RELOAD GTFS") d1 = (dt-datetime.timedelta(hours=4)).date() d2 = (dt+datetime.timedelta(hours=4)).date() - for x in gtfs.for_date_cache: + for x in [x for x in gtfs.for_date_cache]: if x != d1 and x != d2: del gtfs.for_date_cache[x] await gtfs.for_date(d1).load_stops_for_all_trips() await gtfs.for_date(d2).load_stops_for_all_trips() - print("RELOAD GTFS DONE") + eprint("RELOAD GTFS DONE") def last_done(): @@ -137,22 +144,44 @@ def last_done(): for x2 in sorted(os.listdir(p2), reverse=True): return path_to_dt(x1+"-"+x2) +class PreprocessServer(communication.FuncCaller): + @communication.server_exec() + async def get_preprocessed_data(self, dt_from: datetime.datetime, dt_to: datetime.datetime, route: str): + eprint("get_preprocessed_data", dt_from, dt_to, route) + source_timestamps = [x for x in self.loaded_timestamps if dt_from <= x and x < dt_to] + out = serialize_data([th for th in self.ths.values() if th.trip.trip_id.split("_")[0] == route], dt_from, dt_to) + proc = await asyncio.create_subprocess_exec("zstd", "-5", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE) + stdout, stderr = await proc.communicate(json.dumps(out).encode("utf-8")) + eprint("done get_preprocessed_data", dt_from, dt_to, route) + return stdout, source_timestamps async def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument("--dt-from") parser.add_argument("--dt-to") + 
parser.add_argument("--socket", default="sockets/preprocess_server") args = parser.parse_args() + ths = {} + loaded_timestamps = [] + + async def client_connected(reader, writer): + s = PreprocessServer(await communication.AsincioStreamSocket().connect(reader, writer), is_server=True) + s.ths = ths + s.loaded_timestamps = loaded_timestamps + await catch_all(s._server_.run()) + + + if args.socket != "-": + await asyncio.start_unix_server(client_connected, path=args.socket, start_serving=True) + c = await get_communication() dt_from = communication.path_to_dt(args.dt_from) if args.dt_from else last_done() dt_to = communication.path_to_dt(args.dt_to) if args.dt_to else None # fetch last hour again for better shape matching - ths = {} - loaded_timestamps = [] last_flush_dt = dt_from @@ -168,7 +197,7 @@ async def main(): r = await c.wait_next_data(dt) if r is None: - print("Get next return", r) + eprint("Get next return", r) await asyncio.sleep(10) continue @@ -176,7 +205,7 @@ async def main(): data = await unzip_parse(r[1]) - print("PROCESSING", dt) + eprint("PROCESSING", dt) data_utils._shape_matching_time = 0 loaded_timestamps.append(dt) _start_time = time.time() @@ -184,13 +213,15 @@ async def main(): for dato in data["features"]: trip_id = dato["properties"]["trip"]["gtfs"]["trip_id"] if trip_id in used_trip_ids: - print("duplicate trip ID", trip_id) + eprint("duplicate trip ID", trip_id, flush=False) used_trip_ids.add(trip_id) if trip_id not in ths: ths[trip_id] = TripHistory(Trip(trip_id, datetime.datetime.fromisoformat(dato["properties"]["trip"]["start_timestamp"]).date())) await ths[trip_id].load_gtfs_shape() ths[trip_id].add_history_point(dt, dato, save_json=False) - print("TIME", time.time() - _start_time, "SHAPE MATCHING", data_utils._shape_matching_time) + eprint("TIME", time.time() - _start_time, "SHAPE MATCHING", data_utils._shape_matching_time, f"\nSHAPE LOADING: {gtfs._shape_loading_from_cache_count} from disk {gtfs._shape_loading_from_disk_count} from cache\n shape chace sizes { {date.strftime('%Y-%m-%d') :len(g._shape_cache) for date, g in gtfs.for_date_cache.items()} }") + gtfs._shape_loading_from_disk_count = 0 + gtfs._shape_loading_from_cache_count = 0 if last_flush_dt+datetime.timedelta(hours=2) <= dt: last_flush_dt = last_flush_dt + datetime.timedelta(hours=1) @@ -202,15 +233,17 @@ async def main(): -local_main_server = communication.MainServer(is_server=True) -async def get_communication(): - return local_main_server -async def gtfs_default_data_getter(date, filename): - c = await get_communication() - return await c.gtfs_get_file(date, filename) -gtfs.default_data_getter = gtfs_default_data_getter -data_utils.get_communication = get_communication +if __name__ == "__main__": + local_main_server = communication.MainServer(is_server=True) + async def get_communication(): + return local_main_server + + async def gtfs_default_data_getter(date, filename): + c = await get_communication() + return await c.gtfs_get_file(date, filename) -asyncio.run(main()) + gtfs.default_data_getter = gtfs_default_data_getter + data_utils.get_communication = get_communication + asyncio.run(main()) diff --git a/prog/utils.py b/prog/utils.py index 5c520d435f22f9025cd7e9aa6e1e7b36e2b3effb..98a1393722285113762837b15101a3626a016065 100644 --- a/prog/utils.py +++ b/prog/utils.py @@ -3,8 +3,8 @@ import traceback import datetime from typing import Optional -def eprint(*args): - print(*args, file=sys.stderr, flush=True) +def eprint(*args, flush=True): + print(*args, file=sys.stderr, flush=flush) async 
def catch_all(corutine, exitcode=None):