Skip to content
Snippets Groups Projects
Commit 0a337fe4 authored by Jiří Kalvoda's avatar Jiří Kalvoda
Browse files

Fetching of preprocessed data

parent 84d70c13
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,22 @@ import datetime
import gtfs
async def unzip_parse(data):
print(type(data))
if type(data) == dict:
assert len(data) == 1
data = list(data.values())[0]
print(type(data))
if type(data) == bytes and data.startswith(b'\x1f\x8b'):
proc = await asyncio.create_subprocess_exec("gunzip", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
data, stderr = await proc.communicate(data)
if type(data) == bytes and data.startswith(b'(\xb5/\xfd'):
proc = await asyncio.create_subprocess_exec("unzstd", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
data, stderr = await proc.communicate(data)
print(type(data))
return json.loads(data)
def dist(a, b):
assert 40 < a[0] and a[0] < 60, a[0]
assert 5 < a[1] and a[1] < 25, a[1]
......@@ -32,14 +48,10 @@ async def get_data_of_trip(trip_id, date_from, date_to):
for dt in dts:
tc = None
print("GET", dt)
for fname, data in (await c.get_data(dt)).items():
proc = await asyncio.create_subprocess_exec("gunzip", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(data)
data = json.loads(stdout)
data = await unzip_parse(await c.get_data(dt))
for dato in data["features"]:
if dato["properties"]["trip"]["gtfs"]["trip_id"] == trip_id:
tc = dato
break
out.append((dt, tc))
return out
......@@ -48,7 +60,10 @@ class Trip:
self.trip_id = trip_id
self.date = date
class HistoryPoint:
class AbstractHistoryPoint:
pass
class HistoryPoint(AbstractHistoryPoint):
def __init__(self, json, capture_time):
self.json = json
self.state_position = json['properties']['last_position']['state_position']
......@@ -62,6 +77,22 @@ class HistoryPoint:
self.openapi_next_stop = json['properties']['last_position']['next_stop']
self.openapi_delay = json['properties']['last_position']['delay']
class HistoryPointFromPreprocessed(AbstractHistoryPoint):
def __init__(self, json):
self.lat = json["lat"]
self.lon = json["lon"]
self.state_position = json["state_position"]
self.first_captured = datetime.datetime.fromisoformat(json["first_captured"])
self.last_captured = datetime.datetime.fromisoformat(json["last_captured"])
self.origin_timestamp = datetime.datetime.fromisoformat(json["origin_timestamp"])
self.openapi_shape_dist_traveled = json["openapi_shape_dist_traveled"]
self.openapi_next_stop = json["openapi_next_stop"]
self.openapi_last_stop = json["openapi_last_stop"]
self.openapi_delay = json["openapi_delay"]
self.shape_point = json["shape_point"]
self.shape_point_dist_traveled = json["shape_point_dist_traveled"]
class TripHistory:
def __init__(self, trip):
self.trip = trip
......@@ -169,6 +200,10 @@ class TripHistory:
...
# tps_new[-1][2]["without_data"] += 1
def add_preprocessed_data(self, data):
for x in data["history"]:
self.history.append(HistoryPointFromPreprocessed(x))
......
......@@ -15,7 +15,7 @@ import asyncio
import communication
from communication import dt_to_path
import data_utils
from data_utils import Trip, TripPoint, HistoryPoint, TripHistory, shape_indexer, dist
from data_utils import Trip, TripPoint, HistoryPoint, TripHistory, shape_indexer, dist, unzip_parse
import datetime
import json
......@@ -24,7 +24,7 @@ import pprint
import gtfs
window_title_prefix = "jr"
window_title_prefix = "PID realtime"
QgsApplication.setPrefixPath("/usr/", True)
qgs = QgsApplication([], True)
......@@ -117,35 +117,52 @@ class MainWind(QMainWindow):
@asyncSlot()
async def tmp2(self):
dt = datetime.datetime(2024, 7, 4, 8, 4, 20)
dt = datetime.datetime(2024, 7, 4, 8, 0, 0)
g = gtfs.for_date(dt)
await g.load_trips()
await g.load_stops_for_all_trips()
trips = [Trip(t.id, dt.date()) for t in g.trips_by_routes["L22"] if t.direction == 1]
ths = {t.trip_id: TripHistory(t) for t in trips}
c = await get_communication()
dts = await c.list_realtime_data(datetime.datetime.combine(dt.date(), datetime.time(8,0,0)), datetime.datetime.combine(dt.date(), datetime.time(9,0,0)))
for dt in dts:
print("GET", dt)
for fname, data in (await c.get_data(dt)).items():
proc = await asyncio.create_subprocess_exec("gunzip", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(data)
data = json.loads(stdout)
for dato in data["features"]:
trip_id = dato["properties"]["trip"]["gtfs"]["trip_id"]
if trip_id in ths:
await ths[trip_id].load_gtfs_shape()
ths[trip_id].add_history_point(dt, dato)
break
#dts = await c.list_realtime_data(datetime.datetime.combine(dt.date(), datetime.time(8,0,0)), datetime.datetime.combine(dt.date(), datetime.time(9,0,0)))
#for dt in dts:
# print("GET", dt)
# for fname, data in (await c.get_data(dt)).items():
# proc = await asyncio.create_subprocess_exec("gunzip", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
# stdout, stderr = await proc.communicate(data)
# data = json.loads(stdout)
# for dato in data["features"]:
# trip_id = dato["properties"]["trip"]["gtfs"]["trip_id"]
# if trip_id in ths:
# await ths[trip_id].load_gtfs_shape()
# ths[trip_id].add_history_point(dt, dato)
# break
data = await c.get_preprocessed_data(dt, "22")
print("P1")
data = await unzip_parse(data)
print("P3")
for trip_id, th in ths.items():
if trip_id in data:
th.add_preprocessed_data(data[trip_id])
print("P4")
for th in ths.values():
await th.load_stops()
print("P5")
main_th = list(ths.values())[51]
await main_th.load_gtfs_shape()
history_graph = HistoryGraph(main_th.stops)
print("P6")
for th in ths.values():
history_graph.add_trip(th)
print("P7")
history_graph.show()
tmp_windows[history_graph]=history_graph
......@@ -179,19 +196,13 @@ class MainWind(QMainWindow):
async def load_data(self, dt):
c = await get_communication()
for fname, data in (await c.get_data(dt)).items():
proc = await asyncio.create_subprocess_exec("gunzip", stdout=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate(data)
self.show_data(stdout, dt)
return
data = await unzip_parse(await c.get_data(dt))
self.show_data(data, dt)
def show_data(self, source_json, capture_time):
def show_data(self, data, capture_time):
current_data = []
feats = []
data = json.loads(source_json)
layer_fields = self.data_layer.fields()
for i, dato in enumerate(data["features"]):
......@@ -752,7 +763,7 @@ class HistoryGraph(Graph):
for hp in th.history:
if hp.shape_point_dist_traveled is not None:
x.append(hp.shape_point_dist_traveled + offset)
y.append(hp.capture_time)
y.append(hp.first_captured)
self._line, = self.ax.plot(x, y, 'o-', color="red")
x = []
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment