Skip to content

Commit

Permalink
Merge pull request #97 from planetscale/revert-74-remove-read-timeout
Browse files Browse the repository at this point in the history
Revert "Don't use a client side read timeout"
  • Loading branch information
nickvanw authored Mar 22, 2024
2 parents 4aafbc0 + 3ee6254 commit 426bcdb
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane

currentPosition := lastKnownPosition
table := s.Stream
readDuration := 1 * time.Minute
preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard)
for {
p.Logger.Log(LOGLEVEL_INFO, preamble+"peeking to see if there's any new rows")
Expand All @@ -192,10 +193,10 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
p.Logger.Log(LOGLEVEL_INFO, preamble+"no new rows found, exiting")
return TableCursorToSerializedCursor(currentPosition)
}
p.Logger.Log(LOGLEVEL_INFO, "new rows found, syncing")
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("new rows found, syncing rows for %v", readDuration))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition))

currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType)
currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration)
if currentPosition.Position != "" {
currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition)
if sErr != nil {
Expand Down Expand Up @@ -223,8 +224,10 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
}
}

func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (*psdbconnect.TableCursor, error) {
func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, error) {
defer p.Logger.Flush()
ctx, cancel := context.WithTimeout(ctx, readDuration)
defer cancel()

var (
err error
Expand Down

0 comments on commit 426bcdb

Please sign in to comment.