diff --git a/prog/communication.py b/prog/communication.py index 126ed6e10cdcaa110968b23986f1971b0422f908..ea79b6f95fb1d3bd39c226305dc234d9d16a91cc 100644 --- a/prog/communication.py +++ b/prog/communication.py @@ -327,10 +327,15 @@ class MainServer(FuncCaller): @server_exec() async def get_preprocessed_data(self, dt: datetime.datetime, route_id: str): assert all(i.isalnum() for i in route_id) - path = "data/realtime_by_route/"+dt.strftime("%Y-%m-%d/%H")+"/"+route_id + path = "data/realtime_by_route/"+dt.strftime("%Y-%m-%d/%H") out = {} - with open(path, "rb") as f: - return f.read() + if not os.path.exists(path): + return None, [] + if os.path.exists(path+"/"+route_id+".json.zst"): + with open(path+"/"+route_id+".json.zst", "rb") as f: + return f.read(), [] + else: + return '{}', [] @server_exec() async def remove_data(self, dt: datetime.datetime): @@ -360,6 +365,35 @@ class MainServer(FuncCaller): eprint(f"GTFS: {path} no such file") return None + @server_exec() + async def gtfs_get_stop_times(self, dt: datetime.datetime, trip_filter=None, route_filter=None): + path = "data/gtfs/"+date_to_path(dt)+"/stop_times.txt" + try: + with open(path, "rb") as f: + s = f.read() + except FileNotFoundError: + path2 = "data/gtfs/"+date_to_path(dt-datetime.timedelta(days=1))+"/stop_times.txt" + try: + with open(path2, "rb") as f: + s = f.read() + except FileNotFoundError: + eprint(f"GTFS: {path} no such file") + return None + + head, *data, _ = s.split(b'\n') + out_data = [] + trip_filter_encoded = trip_filter.encode("utf-8") if trip_filter else None + route_filter_encoded = route_filter.encode("utf-8") if route_filter else None + for x in data: + y = x.split(b',') + if trip_filter and y[0]!=trip_filter_encoded: + continue + if route_filter and y[0].split(b"_")[0]!=route_filter_encoded: + continue + out_data.append(x) + return head+b'\n'+b'\n'.join(out_data)+b'\n' + + @server_exec() async def get_last_data(self): return await (await self._get_download_server()).get_last_data() diff --git a/prog/data_mover.py b/prog/data_mover.py index ddc3e9df97ebc841a81da6fe23a41666d7964c0c..52d6a06a01289b3bd7e6b84d13b654a7e68639bf 100755 --- a/prog/data_mover.py +++ b/prog/data_mover.py @@ -11,12 +11,12 @@ def only_printeble_acii(s): async def main(): #s = await SSHRunSocket().connect('ssh localhost "/dev/stderr"') #s = await SSHRunSocket().connect('ssh localhost "tee /dev/stderr"') - s = await communication.SSHRunSocket().connect('ssh hluk.fnuk.eu /mnt/jr/prog/run_py server.py') - c = communication.DownloadServer(s) + s = await communication.SSHRunSocket().connect('ssh jiri@hluk.fnuk.eu /mnt/jr/prog/run_py server.py') + c = communication.MainServer(s) #await s.write(b"Ahoj!") for dt in await c.list_realtime_data(datetime.datetime(2023, 1, 1, 0, 0), datetime.datetime(2100, 1, 1, 0, 0)): path = "data/realtime/"+dt_to_path(dt) - if not os.path.isdir(path): + if not (os.path.isdir(path) or os.path.isfile(path+".json.zst") or os.path.isfile(path+".json.gzip")): print("WILL DOWNLOAD", path) pathlib.Path(path).mkdir(parents=True) for fname, data in (await c.get_data(dt)).items(): diff --git a/prog/data_utils.py b/prog/data_utils.py index 858c274d133b51d5e0f6c1240157196701f3e001..c600d49c892bd6bbbf0cdc05e9d3d17c9d1ca4a2 100644 --- a/prog/data_utils.py +++ b/prog/data_utils.py @@ -8,6 +8,8 @@ import datetime import gtfs from utils import * +_shape_matching_time = 0 + async def unzip_parse(data): if type(data) == dict: @@ -62,8 +64,9 @@ class AbstractHistoryPoint: pass class HistoryPoint(AbstractHistoryPoint): - def __init__(self, json, capture_time): - self.json = json + def __init__(self, json, capture_time, save_json=True): + if save_json: + self.json = json self.state_position = json['properties']['last_position']['state_position'] self.openapi_shape_dist_traveled = json['properties']['last_position']['shape_dist_traveled'] self.lon, self.lat = json["geometry"]["coordinates"] @@ -110,7 +113,8 @@ class TripHistory: for dt, tp in tps: self.add_history_point(dt, tp) - def add_history_point(self, dt, json): + def add_history_point(self, dt, json, save_json=True): + global _shape_matching_time if json is not None: if self.trip_json is None: self.trip_json = json["properties"]["trip"] @@ -121,22 +125,18 @@ class TripHistory: # print("---------------------") # pprint.pp(json["properties"]["trip"]) # print("=====================") + lon, lat = json["geometry"]["coordinates"] if ( len(self.history) - and json["geometry"]["coordinates"] == self.history[-1].json["geometry"]["coordinates"] - and json["properties"]["last_position"]["origin_timestamp"] == self.history[-1].json["properties"]["last_position"]["origin_timestamp"] - and json["properties"]["last_position"]["state_position"] == self.history[-1].json["properties"]["last_position"]["state_position"] + and lon == self.history[-1].lon + and lat == self.history[-1].lat + and json["properties"]["last_position"]["origin_timestamp"] == self.history[-1].origin_timestamp + and json["properties"]["last_position"]["state_position"] == self.history[-1].state_position ): - if self.history[-1].json != json and False: - print("Same coordinates but different data:") - pprint.pp(self.history[-1].json) - print("---------------------") - pprint.pp(json) - print("=====================") self.history[-1].last_captured = dt else: - hp = HistoryPoint(json, dt) + hp = HistoryPoint(json, dt, save_json=save_json) if hp.state_position in ['on_track', 'at_stop'] and self.gtfs_shape is not None: if len(self.history): last_shape_point_id = self.history[-1].shape_point @@ -163,6 +163,7 @@ class TripHistory: near_pt = shape_indexer(self.gtfs_shape, i+a) return dist((hp.lat, hp.lon), (near_pt[0], near_pt[1])) + (dist_traveled_mutiplicator*abs(near_pt[2] - last_shape_point[2]) if last_shape_point is not None else 0), i+a + _shape_matching_time -= time.time() if last_shape_point is None: hp.shape_point = min( calc_key(i) for i in range(len(self.gtfs_shape)-1) @@ -186,6 +187,8 @@ class TripHistory: hp.shape_point = opt i = hp.shape_point + _shape_matching_time += time.time() + hp.shape_point_dist_traveled = shape_indexer(self.gtfs_shape, i)[2] else: hp.shape_point = hp.shape_point_dist_traveled = None diff --git a/prog/download_server.py b/prog/download_server.py index 7170eb523c9a1f517dbe7b04f564d5305ef2d373..6e94e4c11c587d677c7e268a85eb6d148d2249d8 100755 --- a/prog/download_server.py +++ b/prog/download_server.py @@ -35,8 +35,9 @@ async def download_now(dt): eprint("begin", dt) cmd = [ 'curl', '-X', 'GET', - "https://api.golemio.cz/v2/vehiclepositions?limit=10000&offset=0&includeNotTracking=true&includePositions=false&preferredTimezone=Europe/Prague", + "https://api.golemio.cz/v2/vehiclepositions?limit=10000&offset=0&includeNotTracking=true&preferredTimezone=Europe/Prague", '-H', 'accept: application/json', + '-H', "Accept-Encoding: identity", '-H', f"X-Access-Token: {golemio_key}", '--silent', '--fail' ] diff --git a/prog/downloader-gtfs.sh b/prog/downloader-gtfs.sh index 0ecf8583573bed3c1cb43ac85350e9a143e6e2e1..033f713a6cc880e88fff45d50921bd5d3978e91b 100755 --- a/prog/downloader-gtfs.sh +++ b/prog/downloader-gtfs.sh @@ -20,6 +20,9 @@ do unzip -d tmp/gtfs tmp/gtfs.zip unzip_code=$? [[ $unzip_code != 0 ]] && echo error unzip fail with code $unzip_code && continue + prog/run_py gtfs_make_shape_by_id.py tmp/gtfs + r=$? + [[ $r != 0 ]] && echo error gtfs_make_shape_by_id fail with code $r && continue cp tmp/gtfs data/gtfs/$ymd -r fi else diff --git a/prog/gtfs.py b/prog/gtfs.py index f5bd69b9f83394c9f233769bbff461cdb2bde00f..fe1619c969bab4fd058319b22282aea830371195 100644 --- a/prog/gtfs.py +++ b/prog/gtfs.py @@ -4,6 +4,7 @@ from dataclasses import dataclass import numpy as np import os, sys from utils import * +import weakref local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo @@ -19,6 +20,7 @@ class Trip: id: int direction: int shape_id: str + services_today: bool @dataclass @@ -53,7 +55,7 @@ class GtfsDay(): self.routes = None self.services_today = None self.stops_for_trip = None - self._shape_cache = {} + self._shape_cache = weakref.WeakValueDictionary() self._file_cahce = {} async def get_file(self, name, cache=False): @@ -97,8 +99,8 @@ class GtfsDay(): id=x["trip_id"], direction=int(x["direction_id"]), shape_id=x["shape_id"], + services_today=self.services_today[x["service_id"]] ) for x in d - if self.services_today[x["service_id"]] } self.trips_by_routes = {} for t in self.trips.values(): @@ -122,13 +124,17 @@ class GtfsDay(): eprint(f"get_shape_for_trip_id failed: no such trip_id {trip_id}") return None shape_id = self.trips[trip_id].shape_id - if shape_id not in self._shape_cache: + shape = self._shape_cache.get(shape_id, None) + if shape is None: + eprint(f"Loading shape {shape_id} (without cache), size of cache {len(self._shape_cache)}") d = await self.get_file("shape_by_id/"+shape_id) - self._shape_cache[shape_id] = np.array([ + self._shape_cache[shape_id] = shape = np.array([ [float(x["shape_pt_lat"]), float(x["shape_pt_lon"]), float(x["shape_dist_traveled"])] for x in d ]) - return self._shape_cache[shape_id] + else: + eprint(f"Loading shape {shape_id} from cache, size of cache {len(self._shape_cache)}") + return shape def parse_time(self, val): h, m, s = map(int, val.split(":")) @@ -138,12 +144,15 @@ class GtfsDay(): await self.load_trips() await self.load_stops() if self.stops_for_trip: + if not trip_id in self.stops_for_trip: + eprint(f"get_stops_for_trip_id failed: no such trip {trip_id} at {self.date}") + return [] return self.stops_for_trip[trip_id] if data is not None: d = list(csv.DictReader(data.decode("utf-8").split("\n"))) else: - eprint("LOADING STOPS FOR", trip_id, "(without cache)") - d = await self.get_file("stop_times.txt", cache=True) + eprint("LOADING STOPS FOR", trip_id, self.date, "(without cache)") + d = await self.get_file("stop_times.txt", cache=False) return [ TripStop( self.stops[x["stop_id"]], self.parse_time(x["arrival_time"]), @@ -152,6 +161,8 @@ class GtfsDay(): ) for x in d if x["trip_id"] == trip_id] async def load_stops_for_all_trips(self): + if self.stops_for_trip: + return await self.load_trips() await self.load_stops() d = await self.get_file("stop_times.txt") diff --git a/prog/preprocess.py b/prog/preprocess.py index 48257e255a3f1766a9fa2c93461579e1b7326468..824e050823edca51b5fefe4c65678a722f56a3b0 100755 --- a/prog/preprocess.py +++ b/prog/preprocess.py @@ -21,12 +21,15 @@ from data_utils import Trip, TripPoint, HistoryPoint, TripHistory, shape_indexer from utils import * +import gc + async def clean_data_until(ths, dt, loaded_timestamps): print("START CLEANUP UNTIL", dt) loaded_timestamps = [x for x in loaded_timestamps if x >= dt] - for trip_id in [x for x in ths]: - await asyncio.sleep(0.001) + for i, trip_id in enumerate([x for x in ths]): + if i%100 == 0: + await asyncio.sleep(1) # Coppy preventing RuntimeError: dictionary changed size during iteration if trip_id in ths: th = ths[trip_id] @@ -34,7 +37,12 @@ async def clean_data_until(ths, dt, loaded_timestamps): if not th.history: print(f"REMOVING TRIP {trip_id}") del ths[trip_id] + await asyncio.sleep(1) print("DONE CLEANUP UNTIL", dt) + print(gc.get_stats()) + gc.collect() + print(gc.get_stats()) + print("DONE GC", dt) async def write_data(ths, dt, loaded_timestamps): assert dt.timestamp() % 3600 == 0 @@ -110,6 +118,16 @@ async def write_data(ths, dt, loaded_timestamps): await clean_data_until(ths, dt, loaded_timestamps) +async def reload_gtfs(dt): + print("RELOAD GTFS") + d1 = (dt-datetime.timedelta(hours=4)).date() + d2 = (dt+datetime.timedelta(hours=4)).date() + for x in gtfs.for_date_cache: + if x != d1 and x != d2: + del gtfs.for_date_cache[x] + await gtfs.for_date(d1).load_stops_for_all_trips() + await gtfs.for_date(d2).load_stops_for_all_trips() + print("RELOAD GTFS DONE") def last_done(): @@ -129,8 +147,8 @@ async def main(): c = await get_communication() - dt_from = communication.date_to_path(args.dt_from) if args.dt_from else last_done() - dt_to = communication.date_to_path(args.dt_from) if args.dt_from else None + dt_from = communication.path_to_dt(args.dt_from) if args.dt_from else last_done() + dt_to = communication.path_to_dt(args.dt_to) if args.dt_to else None # fetch last hour again for better shape matching ths = {} @@ -139,9 +157,14 @@ async def main(): last_flush_dt = dt_from dt = dt_from - await gtfs.for_date(dt-datetime.timedelta(days=1)).load_stops_for_all_trips() - await gtfs.for_date(dt).load_stops_for_all_trips() + + await reload_gtfs(dt) + next_reload_gtfs_dt = dt + datetime.timedelta(hours=1) + while True: + if next_reload_gtfs_dt <= dt: + await reload_gtfs(dt) + next_reload_gtfs_dt = dt + datetime.timedelta(hours=1) r = await c.wait_next_data(dt) if r is None: @@ -154,6 +177,7 @@ async def main(): print("PROCESSING", dt) + data_utils._shape_matching_time = 0 loaded_timestamps.append(dt) _start_time = time.time() used_trip_ids = set() @@ -165,8 +189,8 @@ async def main(): if trip_id not in ths: ths[trip_id] = TripHistory(Trip(trip_id, datetime.datetime.fromisoformat(dato["properties"]["trip"]["start_timestamp"]).date())) await ths[trip_id].load_gtfs_shape() - ths[trip_id].add_history_point(dt, dato) - print("TIME", "TIME", time.time() - _start_time) + ths[trip_id].add_history_point(dt, dato, save_json=False) + print("TIME", time.time() - _start_time, "SHAPE MATCHING", data_utils._shape_matching_time) if last_flush_dt+datetime.timedelta(hours=2) <= dt: last_flush_dt = last_flush_dt + datetime.timedelta(hours=1) diff --git a/prog/run_py b/prog/run_py index 485d4a8fa84e1fef0b8044e01130b06009e72007..e556033529206060c3caf4f96def6fe5a3b62efd 100755 --- a/prog/run_py +++ b/prog/run_py @@ -7,4 +7,4 @@ cd .. . venv/bin/activate -pypy3 prog/$1 ${@:2:} +exec pypy3 prog/$1 ${@:2} diff --git a/systemd/init.sh b/systemd/init.sh index fe146b9ffb1dc71dd57f788123246f0ec25575f0..e8fd315719d62f163d5ded2aaf51e2a8c0abcffd 100755 --- a/systemd/init.sh +++ b/systemd/init.sh @@ -2,5 +2,6 @@ rootdir=$(pwd) -systemd/init-service.sh "jr-download" --user "$rootdir/prog/download.sh" "" "" +systemd/init-service.sh "jr-download" --user "$rootdir/venv/bin/pypy3 $rootdir/prog/download_server.py" "" "" +systemd/init-service.sh "jr-preprocess" --user "$rootdir/venv/bin/pypy3 $rootdir/prog/preprocess.py" "" "" systemd/init-service.sh "jr-download-gtfs" --user "$rootdir/prog/downloader-gtfs.sh" "" ""