Commit 1a97bb8c authored by Jiří Kalvoda

PreprocessServer

parent c78bb0f0
@@ -229,6 +229,7 @@ def dt_intersect(dt_from: datetime.datetime, dt_to: datetime.datetime, dt: datet
class MainServer(FuncCaller):
_download_server = None
_preprocess_server = None
async def _get_download_server(self):
if self._download_server is None:
@@ -237,6 +238,13 @@ class MainServer(FuncCaller):
self._download_server = download_server.DownloadServer(s)
return self._download_server
async def _get_preprocess_server(self):
if self._preprocess_server is None:
import preprocess
s = await UnixSocket().connect("sockets/preprocess_server")
self._preprocess_server = preprocess.PreprocessServer(s)
return self._preprocess_server
async def _tree_walker(self, condition, worker, reverse=False):
d = "data/realtime"
for d_Y_m_d in sorted(os.listdir(d), reverse=reverse):
@@ -327,12 +335,14 @@ class MainServer(FuncCaller):
path = "data/realtime_by_route/"+dt.strftime("%Y-%m-%d/%H")
out = {}
if not os.path.exists(path):
return None, []
return await (await self._get_preprocess_server()).get_preprocessed_data(dt, dt+datetime.timedelta(hours=1), route_id)
with open(path+"/source_timestamps") as f:
source_timestamps = [ datetime.datetime.fromisoformat(x.strip()) for x in f ]
if os.path.exists(path+"/"+route_id+".json.zst"):
with open(path+"/"+route_id+".json.zst", "rb") as f:
return f.read(), []
return f.read(), source_timestamps
else:
return '{}', []
return '{}', source_timestamps
@server_exec()
async def remove_data(self, dt: datetime.datetime):
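The changed get_data path above now always returns a payload plus the list of source timestamps: the payload is read from data/realtime_by_route when the hour is already on disk, produced live by the PreprocessServer when it is not, and is a literal '{}' when the route has no data for that hour. A minimal caller-side decoding sketch under those assumptions (the load_hour helper and the get_data signature are illustrative, not part of this commit; the zstd CLI is used the same way the preprocess code itself uses it):

import asyncio
import datetime
import json

async def load_hour(server, dt: datetime.datetime, route_id: str):
    # signature of get_data assumed from the hunk above
    blob, source_timestamps = await server.get_data(dt, route_id)
    if blob in (None, '{}'):
        # no data recorded for this route in the requested hour
        return {}, source_timestamps
    # the payload is zstd-compressed JSON; decompress via the zstd CLI
    proc = await asyncio.create_subprocess_exec(
        "zstd", "-d",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    raw, _ = await proc.communicate(blob)
    return json.loads(raw), source_timestamps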
@@ -37,11 +37,11 @@ async def download_now(dt):
'curl', '-X', 'GET',
"https://api.golemio.cz/v2/vehiclepositions?limit=10000&offset=0&includeNotTracking=true&preferredTimezone=Europe/Prague",
'-H', 'accept: application/json',
'-H', "Accept-Encoding: identity",
#'-H', "Accept-Encoding: identity",
'-H', f"X-Access-Token: {golemio_key}",
'--silent', '--fail'
'--silent', '--fail', '--compressed'
]
print(" ".join(cmd))
eprint(" ".join(cmd))
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
data, _ = await proc.communicate()
assert proc.returncode == 0, proc.returncode
@@ -112,7 +112,7 @@ class DownloadServer(communication.FuncCaller):
async def client_connected(reader, writer):
await DownloadServer(await communication.AsincioStreamSocket().connect(reader, writer), is_server=True)._server_.run()
await catch_all(DownloadServer(await communication.AsincioStreamSocket().connect(reader, writer), is_server=True)._server_.run())
async def main():
global caio_ctx
@@ -8,6 +8,8 @@ import weakref
local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
_shape_loading_from_disk_count = 0
_shape_loading_from_cache_count = 0
@dataclass
class Route:
@@ -119,6 +121,7 @@ class GtfsDay():
}
async def get_shape_for_trip_id(self, trip_id):
global _shape_loading_from_disk_count, _shape_loading_from_cache_count
await self.load_trips()
if trip_id not in self.trips:
eprint(f"get_shape_for_trip_id failed: no such trip_id {trip_id}")
@@ -126,14 +129,14 @@ class GtfsDay():
shape_id = self.trips[trip_id].shape_id
shape = self._shape_cache.get(shape_id, None)
if shape is None:
eprint(f"Loading shape {shape_id} (without cache), size of cache {len(self._shape_cache)}")
_shape_loading_from_disk_count += 1
d = await self.get_file("shape_by_id/"+shape_id)
self._shape_cache[shape_id] = shape = np.array([
[float(x["shape_pt_lat"]), float(x["shape_pt_lon"]), float(x["shape_dist_traveled"])]
for x in d ])
else:
eprint(f"Loading shape {shape_id} from cache, size of cache {len(self._shape_cache)}")
_shape_loading_from_cache_count += 1
return shape
def parse_time(self, val):
@@ -25,8 +25,10 @@ import gc
async def clean_data_until(ths, dt, loaded_timestamps):
print("START CLEANUP UNTIL", dt)
loaded_timestamps = [x for x in loaded_timestamps if x >= dt]
eprint("START CLEANUP UNTIL", dt)
y = [x for x in loaded_timestamps if x >= dt]
loaded_timestamps.clear()
loaded_timestamps += y
for i, trip_id in enumerate([x for x in ths]):
if i%100 == 0:
await asyncio.sleep(1)
@@ -35,40 +37,16 @@ async def clean_data_until(ths, dt, loaded_timestamps):
th = ths[trip_id]
th.history = [hp for hp in th.history if hp.last_captured >= dt]
if not th.history:
print(f"REMOVING TRIP {trip_id}")
eprint(f"REMOVING TRIP {trip_id}")
del ths[trip_id]
await asyncio.sleep(1)
print("DONE CLEANUP UNTIL", dt)
print(gc.get_stats())
eprint("DONE CLEANUP UNTIL", dt)
eprint(gc.get_stats())
gc.collect()
print(gc.get_stats())
print("DONE GC", dt)
eprint(gc.get_stats())
eprint("DONE GC", dt)
async def write_data(ths, dt, loaded_timestamps):
assert dt.timestamp() % 3600 == 0
print("PREPARE OUTPUT", dt, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H'))
source_timestamps = [x for x in loaded_timestamps if dt <= x and x < dt+datetime.timedelta(hours=1)]
if len(source_timestamps) < 300:
print(f"!!! OUTPUT FAILED there is only {len(source_timestamps)} source timestamps!!!")
await clean_data_until(ths, dt, loaded_timestamps)
return
data_by_route = {}
for th in ths.values():
key = th.trip.trip_id.split("_")[0]
if key not in data_by_route:
data_by_route[key] = []
data_by_route[key].append(th)
tmp_dir = tempfile.mkdtemp(dir="tmp")
print("START OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H'))
for route_id, data in data_by_route.items():
print(f"OUTPUT {dt} {route_id}")
def serialize_data(data, dt_from, dt_to):
out = {}
for th in data:
history = [
@@ -87,7 +65,7 @@ async def write_data(ths, dt, loaded_timestamps):
"shape_point_dist_traveled": hp.shape_point_dist_traveled,
}
for hp in th.history
if dt <= hp.last_captured and hp.first_captured < dt+datetime.timedelta(hours=1)
if dt_from <= hp.last_captured and hp.first_captured < dt_to
]
if history:
out[th.trip.trip_id] = {
@@ -96,6 +74,34 @@ async def write_data(ths, dt, loaded_timestamps):
'trip': th.trip_json
}
return out
async def write_data(ths, dt, loaded_timestamps):
assert dt.timestamp() % 3600 == 0
eprint("PREPARE OUTPUT", dt, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H'))
source_timestamps = [x for x in loaded_timestamps if dt <= x and x < dt+datetime.timedelta(hours=1)]
if len(source_timestamps) < 300:
eprint(f"!!! OUTPUT FAILED there is only {len(source_timestamps)} source timestamps!!!")
await clean_data_until(ths, dt, loaded_timestamps)
return
data_by_route = {}
for th in ths.values():
key = th.trip.trip_id.split("_")[0]
if key not in data_by_route:
data_by_route[key] = []
data_by_route[key].append(th)
tmp_dir = tempfile.mkdtemp(dir="tmp")
eprint("START OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H'))
for route_id, data in data_by_route.items():
eprint(f"OUTPUT {dt} {route_id}")
out = serialize_data(data, dt, dt+datetime.timedelta(hours=1))
if out:
with open(Path(tmp_dir)/(route_id+".json.zst"), "xb") as f:
proc = await asyncio.create_subprocess_exec("zstd", "-8", stdout=f, stdin=asyncio.subprocess.PIPE)
@@ -114,20 +120,21 @@ async def write_data(ths, dt, loaded_timestamps):
shutil.rmtree(out_path)
os.rename(tmp_dir, out_path)
print("DONE OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H'))
eprint("DONE OUTPUT", dt, tmp_dir, Path('data/realtime_by_route')/dt.strftime('%Y-%m-%d')/dt.strftime('%H'))
await asyncio.sleep(100)
await clean_data_until(ths, dt, loaded_timestamps)
async def reload_gtfs(dt):
print("RELOAD GTFS")
eprint("RELOAD GTFS")
d1 = (dt-datetime.timedelta(hours=4)).date()
d2 = (dt+datetime.timedelta(hours=4)).date()
for x in gtfs.for_date_cache:
for x in [x for x in gtfs.for_date_cache]:
if x != d1 and x != d2:
del gtfs.for_date_cache[x]
await gtfs.for_date(d1).load_stops_for_all_trips()
await gtfs.for_date(d2).load_stops_for_all_trips()
print("RELOAD GTFS DONE")
eprint("RELOAD GTFS DONE")
def last_done():
@@ -137,22 +144,44 @@ def last_done():
for x2 in sorted(os.listdir(p2), reverse=True):
return path_to_dt(x1+"-"+x2)
class PreprocessServer(communication.FuncCaller):
@communication.server_exec()
async def get_preprocessed_data(self, dt_from: datetime.datetime, dt_to: datetime.datetime, route: str):
eprint("get_preprocessed_data", dt_from, dt_to, route)
source_timestamps = [x for x in self.loaded_timestamps if dt_from <= x and x < dt_to]
out = serialize_data([th for th in self.ths.values() if th.trip.trip_id.split("_")[0] == route], dt_from, dt_to)
proc = await asyncio.create_subprocess_exec("zstd", "-5", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(json.dumps(out).encode("utf-8"))
eprint("done get_preprocessed_data", dt_from, dt_to, route)
return stdout, source_timestamps
async def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--dt-from")
parser.add_argument("--dt-to")
parser.add_argument("--socket", default="sockets/preprocess_server")
args = parser.parse_args()
ths = {}
loaded_timestamps = []
async def client_connected(reader, writer):
s = PreprocessServer(await communication.AsincioStreamSocket().connect(reader, writer), is_server=True)
s.ths = ths
s.loaded_timestamps = loaded_timestamps
await catch_all(s._server_.run())
if args.socket != "-":
await asyncio.start_unix_server(client_connected, path=args.socket, start_serving=True)
c = await get_communication()
dt_from = communication.path_to_dt(args.dt_from) if args.dt_from else last_done()
dt_to = communication.path_to_dt(args.dt_to) if args.dt_to else None
# fetch last hour again for better shape matching
ths = {}
loaded_timestamps = []
last_flush_dt = dt_from
@@ -168,7 +197,7 @@ async def main():
r = await c.wait_next_data(dt)
if r is None:
print("Get next return", r)
eprint("Get next return", r)
await asyncio.sleep(10)
continue
@@ -176,7 +205,7 @@ async def main():
data = await unzip_parse(r[1])
print("PROCESSING", dt)
eprint("PROCESSING", dt)
data_utils._shape_matching_time = 0
loaded_timestamps.append(dt)
_start_time = time.time()
@@ -184,13 +213,15 @@
for dato in data["features"]:
trip_id = dato["properties"]["trip"]["gtfs"]["trip_id"]
if trip_id in used_trip_ids:
print("duplicate trip ID", trip_id)
eprint("duplicate trip ID", trip_id, flush=False)
used_trip_ids.add(trip_id)
if trip_id not in ths:
ths[trip_id] = TripHistory(Trip(trip_id, datetime.datetime.fromisoformat(dato["properties"]["trip"]["start_timestamp"]).date()))
await ths[trip_id].load_gtfs_shape()
ths[trip_id].add_history_point(dt, dato, save_json=False)
print("TIME", time.time() - _start_time, "SHAPE MATCHING", data_utils._shape_matching_time)
eprint("TIME", time.time() - _start_time, "SHAPE MATCHING", data_utils._shape_matching_time, f"\nSHAPE LOADING: {gtfs._shape_loading_from_cache_count} from disk {gtfs._shape_loading_from_disk_count} from cache\n shape chace sizes { {date.strftime('%Y-%m-%d') :len(g._shape_cache) for date, g in gtfs.for_date_cache.items()} }")
gtfs._shape_loading_from_disk_count = 0
gtfs._shape_loading_from_cache_count = 0
if last_flush_dt+datetime.timedelta(hours=2) <= dt:
last_flush_dt = last_flush_dt + datetime.timedelta(hours=1)
@@ -202,6 +233,9 @@
if __name__ == "__main__":
local_main_server = communication.MainServer(is_server=True)
async def get_communication():
return local_main_server
@@ -212,5 +246,4 @@ async def gtfs_default_data_getter(date, filename):
gtfs.default_data_getter = gtfs_default_data_getter
data_utils.get_communication = get_communication
asyncio.run(main())
@@ -3,8 +3,8 @@ import traceback
import datetime
from typing import Optional
def eprint(*args):
print(*args, file=sys.stderr, flush=True)
def eprint(*args, flush=True):
print(*args, file=sys.stderr, flush=flush)
async def catch_all(corutine, exitcode=None):
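Finally, a hedged usage sketch of the new standalone server: the preprocess module's main() now accepts --socket (default sockets/preprocess_server, "-" to disable) and can be queried from another process. The import location of UnixSocket is an assumption; the call mirrors MainServer._get_preprocess_server above, and the returned blob is zstd-compressed JSON as produced by get_preprocessed_data:

import asyncio
import datetime
import communication   # assumed to provide UnixSocket, as used in the MainServer hunk
import preprocess

async def fetch_live(route_id: str, hour: datetime.datetime):
    s = await communication.UnixSocket().connect("sockets/preprocess_server")
    server = preprocess.PreprocessServer(s)
    # zstd-compressed JSON plus the timestamps of the snapshots that went into it
    blob, source_timestamps = await server.get_preprocessed_data(
        hour, hour + datetime.timedelta(hours=1), route_id)
    return blob, source_timestamps

if __name__ == "__main__":
    # "100" is an example route id; the hour is arbitrary
    blob, ts = asyncio.run(fetch_live("100", datetime.datetime(2024, 1, 1, 12)))
    print(len(blob), "bytes,", len(ts), "source snapshots")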