Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: bump test timeout #135194

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package cdctest

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -75,6 +76,8 @@ type EnterpriseTestFeed interface {
Resume() error
// WaitForStatus waits for the provided func to return true, or returns an error.
WaitForStatus(func(s jobs.Status) bool) error
// WaitDurationForStatus waits for a specified time for the provided func to return true, or returns an error.
WaitDurationForStatus(dur time.Duration, statusPred func(status jobs.Status) bool) error
// FetchTerminalJobErr retrieves the error message from changefeed job.
FetchTerminalJobErr() error
// FetchRunningStatus retrieves running status from changefeed job.
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6472,6 +6472,9 @@ func TestChangefeedTimelyResolvedTimestampUpdatePostRollingRestart(t *testing.T)
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Add verbose logging to help debug future failures.
require.NoError(t, log.SetVModule("changefeed_processors=1"))

// This test requires many range splits, which can be slow under certain test
// conditions. Skip potentially slow tests.
skip.UnderDeadlock(t)
Expand Down Expand Up @@ -6569,7 +6572,7 @@ func TestChangefeedTimelyResolvedTimestampUpdatePostRollingRestart(t *testing.T)
defer DiscardMessages(testFeed)()

// Ensure the changefeed is able to complete in a reasonable amount of time.
require.NoError(t, testFeed.(cdctest.EnterpriseTestFeed).WaitForStatus(func(s jobs.Status) bool {
require.NoError(t, testFeed.(cdctest.EnterpriseTestFeed).WaitDurationForStatus(5*time.Minute, func(s jobs.Status) bool {
return s == jobs.StatusSucceeded
}))
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,15 @@ func (f *jobFeed) status() (status string, err error) {
return
}

func (f *jobFeed) WaitForStatus(statusPred func(status jobs.Status) bool) error {
func (f *jobFeed) WaitDurationForStatus(
dur time.Duration, statusPred func(status jobs.Status) bool,
) error {
if f.jobID == jobspb.InvalidJobID {
// Job may not have been started.
return nil
}
// Wait for the job status predicate to become true.
return testutils.SucceedsSoonError(func() error {
return testutils.SucceedsWithinError(func() error {
var status string
var err error
if status, err = f.status(); err != nil {
Expand All @@ -456,7 +458,11 @@ func (f *jobFeed) WaitForStatus(statusPred func(status jobs.Status) bool) error
return nil
}
return errors.Newf("still waiting for job status; current %s", status)
})
}, dur)
}

func (f *jobFeed) WaitForStatus(statusPred func(status jobs.Status) bool) error {
return f.WaitDurationForStatus(testutils.SucceedsSoonDuration(), statusPred)
}

// Pause implements the TestFeed interface.
Expand Down