diff --git a/realtime.go b/realtime.go index 22ebf53..96d0bf2 100644 --- a/realtime.go +++ b/realtime.go @@ -14,17 +14,27 @@ type Realtime struct { static *Static reader storage.FeedReader - updatesByTrip map[string][]RealtimeUpdate + updatesByTrip map[string][]*RealtimeUpdate skippedTrips map[string]bool + + // TODO: These are used to expand the time window when + // querying static departures, to make sure delayed (and + // early) stops are retrieved (and then updated). Not pretty, + // and will result in larger time windows than + // necessary. Doing the same per stop is tricky, as stop + // delays propagate along the trip. Come up with a better + // approach. + minDelay time.Duration + maxDelay time.Duration } // Similar to parse.StopTimeUpdate, but trimmed down to what's // necessary to serve realtime predictions. Should be suitable for // caching and sharing with other instances. type RealtimeUpdate struct { + StopSequence uint32 ArrivalDelay time.Duration DepartureDelay time.Duration - StopSequence uint32 Type parse.StopTimeUpdateScheduleRelationship } @@ -42,6 +52,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error { } rt.skippedTrips = realtime.SkippedTrips + rt.updatesByTrip = map[string][]*RealtimeUpdate{} // Retrieve Static stop time events for all trips in the realtime feed trips := map[string]bool{} @@ -72,7 +83,7 @@ func (rt *Realtime) LoadData(ctx context.Context, feedData [][]byte) error { // Construct RealtimeUpdate objects from the parsed // StopTimeUpdates. - rt.updatesByTrip = buildRealtimeUpdates(timezone, realtime.Updates, events) + rt.buildRealtimeUpdates(timezone, realtime.Updates, events) return nil } @@ -86,11 +97,12 @@ func (rt *Realtime) Departures( directionID int8, routeTypes []storage.RouteType) ([]Departure, error) { - // Get the static schedule for the requested time window. + // Get the scheduled departures. Extend the window so that + // delayed (or early) departures are included. scheduled, err := rt.static.Departures( stopID, - windowStart, - windowLength, + windowStart.Add(-rt.maxDelay), + windowLength-rt.minDelay+rt.maxDelay, numDepartures, routeID, directionID, @@ -167,30 +179,29 @@ func (rt *Realtime) Departures( departures = append(departures, dep) case parse.StopTimeUpdateScheduled: // SCHEDULED => update to static schedule - if updates[idx].DepartureDelay > 0 { - dep.Time = dep.Time.Add(updates[idx].DepartureDelay) - } - - // If the delay pushed the departure outside - // of the requested time window, it must be - // ignored - if dep.Time.Before(windowStart) || dep.Time.After(windowStart.Add(windowLength)) { - continue - } - + dep.Time = dep.Time.Add(updates[idx].DepartureDelay) departures = append(departures, dep) } } - sort.Slice(departures, func(i, j int) bool { - return departures[i].Time.Before(departures[j].Time) + // Filter out departures outside of the requested time + // window. Sort by time. Done. + result := []Departure{} + for _, dep := range departures { + if dep.Time.Before(windowStart) || dep.Time.After(windowStart.Add(windowLength)) { + continue + } + result = append(result, dep) + } + + sort.Slice(result, func(i, j int) bool { + return result[i].Time.Before(result[j].Time) }) - return departures, nil + return result, nil // Missing: // - // - Trips pushed into the time window by a delay // - Added trips // - Added stops (is that a thing?) // @@ -260,17 +271,13 @@ func resolveStopReferences(updates []*parse.StopTimeUpdate, events []*storage.St } } -// The full GTFS-rt StopTimeUpdates are great and all, but we only -// need some of the information they hold. -// -// This function takes StopTimeUpdates from a realtime feed, along -// with all associated static StopTimeEvents, and returns -// RealtimeUpdates, grouped by trip, and sorted by stop_sequence. -func buildRealtimeUpdates( +// Construct RealtimeUpdates from StopTimeUpdates and +// StopTimeEvents. Groups them by trip and stop. +func (rt *Realtime) buildRealtimeUpdates( timezone *time.Location, stups []*parse.StopTimeUpdate, events []*storage.StopTimeEvent, -) map[string][]RealtimeUpdate { +) { // Group static events by trip, and sort by stop_sequence eventsByTrip := map[string][]*storage.StopTimeEvent{} @@ -309,9 +316,12 @@ func buildRealtimeUpdates( eventTime := upNoon.Add(-12 * time.Hour).Add(eventOffset) return upTime.Sub(eventTime) - } - realtimeUpdates := map[string][]RealtimeUpdate{} + // NTS: should redo this to just compute diff in both + // directions, maybe guess date to cover DST switches, + // and then take the smaller one. If diff is 23h, then + // it's more likely to b 1h early than 23h late. + } // Combine static schedule and realtime updates for tripID, tripUpdates := range updatesByTrip { @@ -339,24 +349,24 @@ func buildRealtimeUpdates( // the stop should be skipped. No need to // attach delays information. if u.Type == parse.StopTimeUpdateNoData || u.Type == parse.StopTimeUpdateSkipped { - realtimeUpdates[tripID] = append(realtimeUpdates[tripID], RealtimeUpdate{ + rtUp := &RealtimeUpdate{ StopSequence: u.StopSequence, Type: u.Type, - }) + } + rt.updatesByTrip[tripID] = append(rt.updatesByTrip[tripID], rtUp) continue } // Type is SCHEDULED. Compute delays. - rtUp := RealtimeUpdate{ + rtUp := &RealtimeUpdate{ StopSequence: u.StopSequence, Type: u.Type, } if u.ArrivalIsSet { - // If exact time is provided, it takes - // precedence over delay. + // Feeds can use the timestamp to communicate delays rtUp.ArrivalDelay = u.ArrivalDelay - if !u.ArrivalTime.IsZero() { + if !u.ArrivalTime.IsZero() && u.ArrivalDelay == 0 { rtUp.ArrivalDelay = updateDelay( events[ei].StopTime.ArrivalTime(), u.ArrivalTime, @@ -364,9 +374,7 @@ func buildRealtimeUpdates( } } if u.DepartureIsSet { - // Same here: if exact time is - // provided, it takes precedene over - // delay. + // Same thing here rtUp.DepartureDelay = u.DepartureDelay if !u.DepartureTime.IsZero() { rtUp.DepartureDelay = updateDelay( @@ -379,10 +387,30 @@ func buildRealtimeUpdates( // arrival delay applies to departure rtUp.DepartureDelay = u.ArrivalDelay } + if !u.ArrivalIsSet { + // Lacking Arrival data, assume + // departure delay applies to arrival + rtUp.ArrivalDelay = u.DepartureDelay + } + + // Track the min and max delays observed. This + // is used to expand time window when + // searching static schedule. + if rtUp.ArrivalDelay < rt.minDelay { + rt.minDelay = rtUp.ArrivalDelay + } + if rtUp.ArrivalDelay > rt.maxDelay { + rt.maxDelay = rtUp.ArrivalDelay + } + if rtUp.DepartureDelay < rt.minDelay { + rt.minDelay = rtUp.DepartureDelay + } + if rtUp.DepartureDelay > rt.maxDelay { + rt.maxDelay = rtUp.DepartureDelay + } + + rt.updatesByTrip[tripID] = append(rt.updatesByTrip[tripID], rtUp) - realtimeUpdates[tripID] = append(realtimeUpdates[tripID], rtUp) } } - - return realtimeUpdates } diff --git a/realtime_test.go b/realtime_test.go index da5ca1e..a6bf76e 100644 --- a/realtime_test.go +++ b/realtime_test.go @@ -877,7 +877,6 @@ func TestRealtimeTimeWindowing(t *testing.T) { Time: time.Date(2020, 1, 15, 23, 13, 0, 0, time.UTC), }, }, departures) - } // Make sure we can deal with trips that have loops. In these cases, @@ -1294,3 +1293,75 @@ func TestRealtimeLoadError(t *testing.T) { require.NoError(t, err) assert.Error(t, gtfs.NewRealtime(static, reader).LoadData(context.Background(), [][]byte{data})) } + +// Static departures can get pushed into a realtime request window by +// an update with delay. +func TestRealtimeUpdatePushingDepartureIntoWindow(t *testing.T) { + static, reader := SimpleStaticFixture(t) + rt := gtfs.NewRealtime(static, reader) + + // Along trip t1, we normally have stops s1,...,s4 at 23:00, + // ..., 23:03. This update adds a big delay to s3, and has s1 + // departing way early. + + feed := buildFeed(t, []TripUpdate{ + { + TripID: "t1", + StopUpdates: []StopUpdate{ + { + StopID: "s1", + DepartureSet: true, + DepartureDelay: -3600, + }, + { + StopID: "s3", + DepartureSet: true, + DepartureTime: time.Date(2020, 1, 15, 23, 59, 30, 1, time.UTC), + }, + }, + }, + }) + require.NoError(t, rt.LoadData(context.Background(), [][]byte{feed})) + + // stop s1 is early, and can be found around 22:00 + departures, err := rt.Departures( + "s1", + time.Date(2020, 1, 15, 21, 55, 0, 0, time.UTC), + 10*time.Minute, + -1, "", -1, nil) + assert.NoError(t, err) + assert.Equal(t, 1, len(departures)) + assert.Equal(t, "t1", departures[0].TripID) + assert.Equal(t, "s1", departures[0].StopID) + assert.Equal(t, time.Date(2020, 1, 15, 22, 0, 0, 0, time.UTC), departures[0].Time) + + // there's no departure from s1 around the original time + departures, err = rt.Departures( + "s1", + time.Date(2020, 1, 15, 22, 55, 0, 0, time.UTC), + 10*time.Minute, + -1, "", -1, nil) + assert.NoError(t, err) + assert.Equal(t, 0, len(departures)) + + // stop s3 is delayed, so it's not returned around 23:03 + departures, err = rt.Departures( + "s3", + time.Date(2020, 1, 15, 23, 0, 0, 0, time.UTC), + 10*time.Minute, + -1, "", -1, nil) + assert.NoError(t, err) + assert.Equal(t, 0, len(departures)) + + // but it is returned around midnight + departures, err = rt.Departures( + "s3", + time.Date(2020, 1, 15, 23, 55, 0, 0, time.UTC), + 10*time.Minute, + -1, "", -1, nil) + assert.NoError(t, err) + assert.Equal(t, 1, len(departures)) + assert.Equal(t, "t1", departures[0].TripID) + assert.Equal(t, "s3", departures[0].StopID) + assert.Equal(t, time.Date(2020, 1, 15, 23, 59, 30, 0, time.UTC), departures[0].Time) +} diff --git a/static_integration_test.go b/static_integration_test.go index 5091bea..f5ce299 100644 --- a/static_integration_test.go +++ b/static_integration_test.go @@ -1,4 +1,4 @@ -package gtfs +package gtfs_test import ( "io/ioutil" @@ -8,11 +8,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "tidbyt.dev/gtfs" "tidbyt.dev/gtfs/parse" "tidbyt.dev/gtfs/storage" ) -func loadFeed2(t *testing.T, backend string, filename string) *Static { +func loadFeed2(t *testing.T, backend string, filename string) (*gtfs.Static, storage.FeedReader) { var s storage.Storage var err error if backend == "memory" { @@ -47,10 +48,10 @@ func loadFeed2(t *testing.T, backend string, filename string) *Static { reader, err := s.GetReader("benchmarking") require.NoError(t, err) - static, err := NewStatic(reader, metadata) + static, err := gtfs.NewStatic(reader, metadata) require.NoError(t, err) - return static + return static, reader } func testGTFSStaticIntegrationNearbyStops(t *testing.T, backend string) { @@ -59,7 +60,7 @@ func testGTFSStaticIntegrationNearbyStops(t *testing.T, backend string) { } // This is a giant GTFS file from the MTA - g := loadFeed2(t, backend, "testdata/mta_static.zip") + g, _ := loadFeed2(t, backend, "testdata/mta_static.zip") // The 4 nearest stops for 544 Park Ave, BK. There are other // stops with the same coordinates, but they all have @@ -113,7 +114,7 @@ func testGTFSStaticIntegrationDepartures(t *testing.T, backend string) { } // This is a giant GTFS file from the MTA - g := loadFeed2(t, backend, "testdata/mta_static.zip") + g, _ := loadFeed2(t, backend, "testdata/mta_static.zip") // Let's look at the G33S stop, also known as "Bedford - // Nostrand Avs". Between 22:50 and 23:10 there are are 6 @@ -152,16 +153,19 @@ func testGTFSStaticIntegrationDepartures(t *testing.T, backend string) { // Armed with this knowledge, we can now run some test // queries. + tz, err := time.LoadLocation("America/New_York") + require.NoError(t, err) + // Feb 3rd is a Monday - departures, _ := g.Departures("G33S", time.Date(2020, 2, 3, 22, 50, 0, 0, g.location), 10*time.Minute, -1, "", -1, nil) - assert.Equal(t, []Departure{ + departures, _ := g.Departures("G33S", time.Date(2020, 2, 3, 22, 50, 0, 0, tz), 10*time.Minute, -1, "", -1, nil) + assert.Equal(t, []gtfs.Departure{ { StopID: "G33S", RouteID: "G", TripID: "BFA19GEN-G051-Weekday-00_135800_G..S14R", StopSequence: 9, DirectionID: 1, - Time: time.Date(2020, 2, 3, 22, 50, 30, 0, g.location), + Time: time.Date(2020, 2, 3, 22, 50, 30, 0, tz), Headsign: "Church Av", }, { @@ -170,21 +174,21 @@ func testGTFSStaticIntegrationDepartures(t *testing.T, backend string) { TripID: "BFA19GEN-G051-Weekday-00_136700_G..S14R", StopSequence: 9, DirectionID: 1, - Time: time.Date(2020, 2, 3, 22, 59, 30, 0, g.location), + Time: time.Date(2020, 2, 3, 22, 59, 30, 0, tz), Headsign: "Church Av", }, }, departures) // Feb 17 is also a Monday, but President's Day - departures, _ = g.Departures("G33S", time.Date(2020, 2, 17, 22, 50, 0, 0, g.location), 10*time.Minute, -1, "", -1, nil) - assert.Equal(t, []Departure{ + departures, _ = g.Departures("G33S", time.Date(2020, 2, 17, 22, 50, 0, 0, tz), 10*time.Minute, -1, "", -1, nil) + assert.Equal(t, []gtfs.Departure{ { StopID: "G33S", RouteID: "G", TripID: "BFA19GEN-G035-Saturday-00_135750_G..S16R", StopSequence: 9, DirectionID: 1, - Time: time.Date(2020, 2, 17, 22, 52, 0, 0, g.location), + Time: time.Date(2020, 2, 17, 22, 52, 0, 0, tz), Headsign: "Church Av", }, }, departures) @@ -192,15 +196,15 @@ func testGTFSStaticIntegrationDepartures(t *testing.T, backend string) { // So to get 2 stops w need a larger window. These appear in // reverse order in stop_times.txt, but will be still be // returned ordered by departure time. - departures, _ = g.Departures("G33S", time.Date(2020, 2, 17, 22, 50, 0, 0, g.location), 13*time.Minute, -1, "", -1, nil) - assert.Equal(t, []Departure{ + departures, _ = g.Departures("G33S", time.Date(2020, 2, 17, 22, 50, 0, 0, tz), 13*time.Minute, -1, "", -1, nil) + assert.Equal(t, []gtfs.Departure{ { StopID: "G33S", RouteID: "G", TripID: "BFA19GEN-G035-Saturday-00_135750_G..S16R", StopSequence: 9, DirectionID: 1, - Time: time.Date(2020, 2, 17, 22, 52, 0, 0, g.location), + Time: time.Date(2020, 2, 17, 22, 52, 0, 0, tz), Headsign: "Church Av", }, { @@ -209,21 +213,21 @@ func testGTFSStaticIntegrationDepartures(t *testing.T, backend string) { TripID: "BFA19GEN-G035-Saturday-00_136750_G..S16R", StopSequence: 9, DirectionID: 1, - Time: time.Date(2020, 2, 17, 23, 2, 30, 0, g.location), + Time: time.Date(2020, 2, 17, 23, 2, 30, 0, tz), Headsign: "Church Av", }, }, departures) // Feb 16 is a Sunday - departures, _ = g.Departures("G33S", time.Date(2020, 2, 16, 22, 50, 0, 0, g.location), 10*time.Minute, -1, "", -1, nil) - assert.Equal(t, []Departure{ + departures, _ = g.Departures("G33S", time.Date(2020, 2, 16, 22, 50, 0, 0, tz), 10*time.Minute, -1, "", -1, nil) + assert.Equal(t, []gtfs.Departure{ { StopID: "G33S", RouteID: "G", TripID: "BFA19GEN-G036-Sunday-00_136550_G..S16R", StopSequence: 9, DirectionID: 1, - Time: time.Date(2020, 2, 16, 23, 0, 0, 0, g.location), + Time: time.Date(2020, 2, 16, 23, 0, 0, 0, tz), Headsign: "Church Av", }, }, departures)