Skip to content

Commit

Permalink
Tests: Add test for excluding dropped chunks from being processed
Browse files Browse the repository at this point in the history
  • Loading branch information
András committed Feb 8, 2024
1 parent c8df502 commit ca01ead
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions tests/publication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,129 @@ func (pts *PublicationTestSuite) Test_Fixing_Broken_Publications_Without_State_S
)
}

func (pts *PublicationTestSuite) Test_Dropped_Chunks_Are_Not_Processed_With_State_Storage() {
testSink := testsupport.NewEventCollectorSink()

publicationName := lo.RandomString(10, lo.LowerCaseLettersCharset)
replicationSlotName := lo.RandomString(20, lo.LowerCaseLettersCharset)
stateStorageFile := fmt.Sprintf("/tmp/%s", lo.RandomString(10, lo.LowerCaseLettersCharset))

var tableName string
pts.RunTest(
func(ctx testrunner.Context) error {
existingChunks, publishedChunks, err := readAllAndPublishedChunks(ctx, tableName, publicationName)
if err != nil {
return err
}

if len(existingChunks) != len(publishedChunks)-2 {
pts.T().Errorf(
"Number of existing and published chunks doesn't match: %d!=%d",
len(existingChunks),
len(publishedChunks)-2,
)
}

for key := range existingChunks {
if _, present := publishedChunks[key]; !present {
pts.T().Errorf("Chunk %s exists but isn't published", key)
}
}

if err := ctx.PauseReplicator(); err != nil {
return err
}

// Simulate some chunks being dropped while replicator is paused
if droppedChunks, err := ctx.Query(context.Background(),
fmt.Sprintf(
"SELECT drop_chunks('%s', '2023-03-25 12:00:00'::TIMESTAMPTZ)",
tableName,
),
); err != nil {
return err
} else {
for droppedChunks.Next() {
var chunkName string
if err := droppedChunks.Scan(&chunkName); err != nil {
return nil

Check failure on line 612 in tests/publication_test.go

View workflow job for this annotation

GitHub Actions / Building, Linting, Formatting

error is not nil (line 611) but it returns nil (nilerr)
}
pts.T().Logf("Chunk %s dropped from %s", chunkName, tableName)
}
}

// Empty sink
testSink.Clear()

if err := ctx.ResumeReplicator(); err != nil {
return err
}

existingChunks, publishedChunks, err = readAllAndPublishedChunks(ctx, tableName, publicationName)
if err != nil {
return err
}

if len(existingChunks) != len(publishedChunks)-2 {
pts.T().Errorf(
"Number of existing and published chunks doesn't match: %d!=%d",
len(existingChunks),
len(publishedChunks)-2,
)
}

for key := range existingChunks {
if _, present := publishedChunks[key]; !present {
pts.T().Errorf("Chunk %s exists but isn't published", key)
}
}

return nil
},

testrunner.WithSetup(func(ctx testrunner.SetupContext) error {
_, tn, err := ctx.CreateHypertable("ts", time.Hour,
testsupport.NewColumn("ts", "timestamptz", false, true, nil),
testsupport.NewColumn("val", "integer", false, false, nil),
)
if err != nil {
return err
}
tableName = tn

if _, err := ctx.Exec(context.Background(),
fmt.Sprintf(
"INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 23:59:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)",
tableName,
),
); err != nil {
return err
}

ctx.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator)
ctx.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) {
config.PostgreSQL.Publication.Name = publicationName
config.PostgreSQL.Publication.AutoDrop = lo.ToPtr(false)

config.PostgreSQL.ReplicationSlot.Name = replicationSlotName
config.PostgreSQL.ReplicationSlot.AutoDrop = lo.ToPtr(false)

config.StateStorage.Type = spiconfig.FileStorage
config.StateStorage.FileStorage = spiconfig.FileStorageConfig{
Path: stateStorageFile,
}
})
return nil
}),

testrunner.WithTearDown(func(ctx testrunner.Context) error {
// If error happens, we don't care
os.Remove(stateStorageFile)
return nil
}),
)
}

func readAllAndPublishedChunks(
ctx testrunner.Context, tableName, publicationName string,
) (
Expand Down

0 comments on commit ca01ead

Please sign in to comment.