From ca01ead7559667d1a5224dd39fa38516cfe9d635 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A1s?= Date: Thu, 8 Feb 2024 12:19:18 +0100 Subject: [PATCH] Tests: Add test for excluding dropped chunks from being processed --- tests/publication_test.go | 123 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/tests/publication_test.go b/tests/publication_test.go index c05bc92c..87b2979c 100644 --- a/tests/publication_test.go +++ b/tests/publication_test.go @@ -564,6 +564,129 @@ func (pts *PublicationTestSuite) Test_Fixing_Broken_Publications_Without_State_S ) } +func (pts *PublicationTestSuite) Test_Dropped_Chunks_Are_Not_Processed_With_State_Storage() { + testSink := testsupport.NewEventCollectorSink() + + publicationName := lo.RandomString(10, lo.LowerCaseLettersCharset) + replicationSlotName := lo.RandomString(20, lo.LowerCaseLettersCharset) + stateStorageFile := fmt.Sprintf("/tmp/%s", lo.RandomString(10, lo.LowerCaseLettersCharset)) + + var tableName string + pts.RunTest( + func(ctx testrunner.Context) error { + existingChunks, publishedChunks, err := readAllAndPublishedChunks(ctx, tableName, publicationName) + if err != nil { + return err + } + + if len(existingChunks) != len(publishedChunks)-2 { + pts.T().Errorf( + "Number of existing and published chunks doesn't match: %d!=%d", + len(existingChunks), + len(publishedChunks)-2, + ) + } + + for key := range existingChunks { + if _, present := publishedChunks[key]; !present { + pts.T().Errorf("Chunk %s exists but isn't published", key) + } + } + + if err := ctx.PauseReplicator(); err != nil { + return err + } + + // Simulate some chunks being dropped while replicator is paused + if droppedChunks, err := ctx.Query(context.Background(), + fmt.Sprintf( + "SELECT drop_chunks('%s', '2023-03-25 12:00:00'::TIMESTAMPTZ)", + tableName, + ), + ); err != nil { + return err + } else { + for droppedChunks.Next() { + var chunkName string + if err := droppedChunks.Scan(&chunkName); err != nil { + return nil + } + pts.T().Logf("Chunk %s dropped from %s", chunkName, tableName) + } + } + + // Empty sink + testSink.Clear() + + if err := ctx.ResumeReplicator(); err != nil { + return err + } + + existingChunks, publishedChunks, err = readAllAndPublishedChunks(ctx, tableName, publicationName) + if err != nil { + return err + } + + if len(existingChunks) != len(publishedChunks)-2 { + pts.T().Errorf( + "Number of existing and published chunks doesn't match: %d!=%d", + len(existingChunks), + len(publishedChunks)-2, + ) + } + + for key := range existingChunks { + if _, present := publishedChunks[key]; !present { + pts.T().Errorf("Chunk %s exists but isn't published", key) + } + } + + return nil + }, + + testrunner.WithSetup(func(ctx testrunner.SetupContext) error { + _, tn, err := ctx.CreateHypertable("ts", time.Hour, + testsupport.NewColumn("ts", "timestamptz", false, true, nil), + testsupport.NewColumn("val", "integer", false, false, nil), + ) + if err != nil { + return err + } + tableName = tn + + if _, err := ctx.Exec(context.Background(), + fmt.Sprintf( + "INSERT INTO \"%s\" SELECT ts, ROW_NUMBER() OVER (ORDER BY ts) AS val FROM GENERATE_SERIES('2023-03-25 00:00:00'::TIMESTAMPTZ, '2023-03-25 23:59:59'::TIMESTAMPTZ, INTERVAL '1 minute') t(ts)", + tableName, + ), + ); err != nil { + return err + } + + ctx.AddSystemConfigConfigurator(testSink.SystemConfigConfigurator) + ctx.AddSystemConfigConfigurator(func(config *sysconfig.SystemConfig) { + config.PostgreSQL.Publication.Name = publicationName + config.PostgreSQL.Publication.AutoDrop = lo.ToPtr(false) + + config.PostgreSQL.ReplicationSlot.Name = replicationSlotName + config.PostgreSQL.ReplicationSlot.AutoDrop = lo.ToPtr(false) + + config.StateStorage.Type = spiconfig.FileStorage + config.StateStorage.FileStorage = spiconfig.FileStorageConfig{ + Path: stateStorageFile, + } + }) + return nil + }), + + testrunner.WithTearDown(func(ctx testrunner.Context) error { + // If error happens, we don't care + os.Remove(stateStorageFile) + return nil + }), + ) +} + func readAllAndPublishedChunks( ctx testrunner.Context, tableName, publicationName string, ) (