Skip to content

Commit

Permalink
realtime: handle stops pushed into time window by delay (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
matslina authored Dec 16, 2023
1 parent d40a7c3 commit 30c6bd1
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 64 deletions.
114 changes: 71 additions & 43 deletions realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@ type Realtime struct {
static *Static
reader storage.FeedReader

updatesByTrip map[string][]RealtimeUpdate
updatesByTrip map[string][]*RealtimeUpdate
skippedTrips map[string]bool

// TODO: These are used to expand the time window when
// querying static departures, to make sure delayed (and
// early) stops are retrieved (and then updated). Not pretty,
// and will result in larger time windows than
// necessary. Doing the same per stop is tricky, as stop
// delays propagate along the trip. Come up with a better
// approach.
minDelay time.Duration
maxDelay time.Duration
}

// Similar to parse.StopTimeUpdate, but trimmed down to what's
// necessary to serve realtime predictions. Should be suitable for
// caching and sharing with other instances.
type RealtimeUpdate struct {
StopSequence uint32
ArrivalDelay time.Duration
DepartureDelay time.Duration
StopSequence uint32
Type parse.StopTimeUpdateScheduleRelationship
}

Expand All @@ -42,6 +52,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error {
}

rt.skippedTrips = realtime.SkippedTrips
rt.updatesByTrip = map[string][]*RealtimeUpdate{}

// Retrieve Static stop time events for all trips in the realtime feed
trips := map[string]bool{}
Expand Down Expand Up @@ -72,7 +83,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error {

// Construct RealtimeUpdate objects from the parsed
// StopTimeUpdates.
rt.updatesByTrip = buildRealtimeUpdates(timezone, realtime.Updates, events)
rt.buildRealtimeUpdates(timezone, realtime.Updates, events)

return nil
}
Expand All @@ -86,11 +97,12 @@ func (rt *Realtime) Departures(
directionID int8,
routeTypes []storage.RouteType) ([]Departure, error) {

// Get the static schedule for the requested time window.
// Get the scheduled departures. Extend the window so that
// delayed (or early) departures are included.
scheduled, err := rt.static.Departures(
stopID,
windowStart,
windowLength,
windowStart.Add(-rt.maxDelay),
windowLength-rt.minDelay+rt.maxDelay,
numDepartures,
routeID,
directionID,
Expand Down Expand Up @@ -167,30 +179,29 @@ func (rt *Realtime) Departures(
departures = append(departures, dep)
case parse.StopTimeUpdateScheduled:
// SCHEDULED => update to static schedule
if updates[idx].DepartureDelay > 0 {
dep.Time = dep.Time.Add(updates[idx].DepartureDelay)
}

// If the delay pushed the departure outside
// of the requested time window, it must be
// ignored
if dep.Time.Before(windowStart) || dep.Time.After(windowStart.Add(windowLength)) {
continue
}

dep.Time = dep.Time.Add(updates[idx].DepartureDelay)
departures = append(departures, dep)
}
}

sort.Slice(departures, func(i, j int) bool {
return departures[i].Time.Before(departures[j].Time)
// Filter out departures outside of the requested time
// window. Sort by time. Done.
result := []Departure{}
for _, dep := range departures {
if dep.Time.Before(windowStart) || dep.Time.After(windowStart.Add(windowLength)) {
continue
}
result = append(result, dep)
}

sort.Slice(result, func(i, j int) bool {
return result[i].Time.Before(result[j].Time)
})

return departures, nil
return result, nil

// Missing:
//
// - Trips pushed into the time window by a delay
// - Added trips
// - Added stops (is that a thing?)
//
Expand Down Expand Up @@ -260,17 +271,13 @@ func resolveStopReferences(updates []*parse.StopTimeUpdate, events []*storage.St
}
}

// The full GTFS-rt StopTimeUpdates are great and all, but we only
// need some of the information they hold.
//
// This function takes StopTimeUpdates from a realtime feed, along
// with all associated static StopTimeEvents, and returns
// RealtimeUpdates, grouped by trip, and sorted by stop_sequence.
func buildRealtimeUpdates(
// Construct RealtimeUpdates from StopTimeUpdates and
// StopTimeEvents. Groups them by trip and stop.
func (rt *Realtime) buildRealtimeUpdates(
timezone *time.Location,
stups []*parse.StopTimeUpdate,
events []*storage.StopTimeEvent,
) map[string][]RealtimeUpdate {
) {

// Group static events by trip, and sort by stop_sequence
eventsByTrip := map[string][]*storage.StopTimeEvent{}
Expand Down Expand Up @@ -309,9 +316,12 @@ func buildRealtimeUpdates(
eventTime := upNoon.Add(-12 * time.Hour).Add(eventOffset)

return upTime.Sub(eventTime)
}

realtimeUpdates := map[string][]RealtimeUpdate{}
// NTS: should redo this to just compute diff in both
// directions, maybe guess date to cover DST switches,
// and then take the smaller one. If diff is 23h, then
// it's more likely to b 1h early than 23h late.
}

// Combine static schedule and realtime updates
for tripID, tripUpdates := range updatesByTrip {
Expand Down Expand Up @@ -339,34 +349,32 @@ func buildRealtimeUpdates(
// the stop should be skipped. No need to
// attach delays information.
if u.Type == parse.StopTimeUpdateNoData || u.Type == parse.StopTimeUpdateSkipped {
realtimeUpdates[tripID] = append(realtimeUpdates[tripID], RealtimeUpdate{
rtUp := &RealtimeUpdate{
StopSequence: u.StopSequence,
Type: u.Type,
})
}
rt.updatesByTrip[tripID] = append(rt.updatesByTrip[tripID], rtUp)
continue
}

// Type is SCHEDULED. Compute delays.
rtUp := RealtimeUpdate{
rtUp := &RealtimeUpdate{
StopSequence: u.StopSequence,
Type: u.Type,
}

if u.ArrivalIsSet {
// If exact time is provided, it takes
// precedence over delay.
// Feeds can use the timestamp to communicate delays
rtUp.ArrivalDelay = u.ArrivalDelay
if !u.ArrivalTime.IsZero() {
if !u.ArrivalTime.IsZero() && u.ArrivalDelay == 0 {
rtUp.ArrivalDelay = updateDelay(
events[ei].StopTime.ArrivalTime(),
u.ArrivalTime,
)
}
}
if u.DepartureIsSet {
// Same here: if exact time is
// provided, it takes precedene over
// delay.
// Same thing here
rtUp.DepartureDelay = u.DepartureDelay
if !u.DepartureTime.IsZero() {
rtUp.DepartureDelay = updateDelay(
Expand All @@ -379,10 +387,30 @@ func buildRealtimeUpdates(
// arrival delay applies to departure
rtUp.DepartureDelay = u.ArrivalDelay
}
if !u.ArrivalIsSet {
// Lacking Arrival data, assume
// departure delay applies to arrival
rtUp.ArrivalDelay = u.DepartureDelay
}

// Track the min and max delays observed. This
// is used to expand time window when
// searching static schedule.
if rtUp.ArrivalDelay < rt.minDelay {
rt.minDelay = rtUp.ArrivalDelay
}
if rtUp.ArrivalDelay > rt.maxDelay {
rt.maxDelay = rtUp.ArrivalDelay
}
if rtUp.DepartureDelay < rt.minDelay {
rt.minDelay = rtUp.DepartureDelay
}
if rtUp.DepartureDelay > rt.maxDelay {
rt.maxDelay = rtUp.DepartureDelay
}

rt.updatesByTrip[tripID] = append(rt.updatesByTrip[tripID], rtUp)

realtimeUpdates[tripID] = append(realtimeUpdates[tripID], rtUp)
}
}

return realtimeUpdates
}
73 changes: 72 additions & 1 deletion realtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,6 @@ func TestRealtimeTimeWindowing(t *testing.T) {
Time: time.Date(2020, 1, 15, 23, 13, 0, 0, time.UTC),
},
}, departures)

}

// Make sure we can deal with trips that have loops. In these cases,
Expand Down Expand Up @@ -1294,3 +1293,75 @@ func TestRealtimeLoadError(t *testing.T) {
require.NoError(t, err)
assert.Error(t, gtfs.NewRealtime(static, reader).LoadData(context.Background(), [][]byte{data}))
}

// Static departures can get pushed into a realtime request window by
// an update with delay.
func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) {
static, reader := SimpleStaticFixture(t)
rt := gtfs.NewRealtime(static, reader)

// Along trip t1, we normally have stops s1,...,s4 at 23:00,
// ..., 23:03. This update adds a big delay to s3, and has s1
// departing way early.

feed := buildFeed(t, []TripUpdate{
{
TripID: "t1",
StopUpdates: []StopUpdate{
{
StopID: "s1",
DepartureSet: true,
DepartureDelay: -3600,
},
{
StopID: "s3",
DepartureSet: true,
DepartureTime: time.Date(2020, 1, 15, 23, 59, 30, 1, time.UTC),
},
},
},
})
require.NoError(t, rt.LoadData(context.Background(), [][]byte{feed}))

// stop s1 is early, and can be found around 22:00
departures, err := rt.Departures(
"s1",
time.Date(2020, 1, 15, 21, 55, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(departures))
assert.Equal(t, "t1", departures[0].TripID)
assert.Equal(t, "s1", departures[0].StopID)
assert.Equal(t, time.Date(2020, 1, 15, 22, 0, 0, 0, time.UTC), departures[0].Time)

// there's no departure from s1 around the original time
departures, err = rt.Departures(
"s1",
time.Date(2020, 1, 15, 22, 55, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 0, len(departures))

// stop s3 is delayed, so it's not returned around 23:03
departures, err = rt.Departures(
"s3",
time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 0, len(departures))

// but it is returned around midnight
departures, err = rt.Departures(
"s3",
time.Date(2020, 1, 15, 23, 55, 0, 0, time.UTC),
10*time.Minute,
-1, "", -1, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(departures))
assert.Equal(t, "t1", departures[0].TripID)
assert.Equal(t, "s3", departures[0].StopID)
assert.Equal(t, time.Date(2020, 1, 15, 23, 59, 30, 0, time.UTC), departures[0].Time)
}
Loading

0 comments on commit 30c6bd1

Please sign in to comment.