From bd81c8887964abb99f26f7f02a8b3d02432f026b Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Wed, 18 Dec 2024 18:38:24 +0100 Subject: [PATCH] feat: abort early once we reach count = 0 Signed-off-by: Andres Taylor --- go/vt/vtgate/engine/limit.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/go/vt/vtgate/engine/limit.go b/go/vt/vtgate/engine/limit.go index 29a763238bd..824689d2859 100644 --- a/go/vt/vtgate/engine/limit.go +++ b/go/vt/vtgate/engine/limit.go @@ -23,8 +23,6 @@ import ( "strconv" "sync" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -100,15 +98,16 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars bindVars = copyBindVars(bindVars) - // When offset is present, we hijack the limit value so we can calculate - // the offset in memory from the result of the scatter query with count + offset. + // Adjust the upper limit so that the initial fetch includes both the offset and count. + // We do this because we want to skip the first `offset` rows locally rather than on the server side. bindVars[UpperLimitStr] = sqltypes.Int64BindVariable(int64(count + offset)) var mu sync.Mutex err = vcursor.StreamExecutePrimitive(ctx, l.Input, bindVars, wantfields, func(qr *sqltypes.Result) error { mu.Lock() defer mu.Unlock() - log.Errorf("LastInsertID: %d InsertIDChanged %t\n", qr.InsertID, qr.InsertIDChanged) + + // If this is the first callback and fields are requested, send the fields immediately. if wantfields && len(qr.Fields) != 0 { if err := callback(&sqltypes.Result{Fields: qr.Fields}); err != nil { return err @@ -119,17 +118,32 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars return callback(qr) } - // we've still not seen all rows we need to see before we can return anything to the client + // If we still need to skip `offset` rows before returning any to the client: if offset > 0 { if inputSize <= offset { - // not enough to return anything yet + // not enough to return anything yet, but we still want to pass on metadata such as last_insert_id offset -= inputSize - return nil + qr.Rows = nil + return callback(qr) } + // Skip `offset` rows from this batch and reset offset to 0. qr.Rows = qr.Rows[offset:] offset = 0 } + // At this point, we've dealt with the offset. Now handle the count (limit). + if count == 0 { + // If count is zero, we've fetched everything we need. + if !l.RequireCompleteInput && !vcursor.Session().InTransaction() { + return io.EOF + } + + // If we require the complete input, or we are in a transaction, we cannot return io.EOF early. + // Instead, we return empty results as needed until input ends. + qr.Rows = nil + return callback(qr) + } + // reduce count till 0. resultSize := len(qr.Rows) if count > resultSize { @@ -143,6 +157,8 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars return err } + // If we required complete input or are in a transaction, we must not exit early. + // We'll return empty batches until the input is done. if l.RequireCompleteInput || vcursor.Session().InTransaction() { return nil }