Skip to content

Commit

Permalink
Skip overwritten tables in partial snapshot (#241)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Nov 19, 2024
1 parent b629295 commit 0a18eb1
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,16 @@ func (j *Job) partialSync() error {
j.progress.TableCommitSeqMap, inMemoryData.TableCommitSeqMap)
j.progress.TableNameMapping = utils.MergeMap(
j.progress.TableNameMapping, inMemoryData.TableNameMapping)
if _, ok := inMemoryData.TableCommitSeqMap[tableId]; !ok {
// The table might be overwritten during backup & restore, so we also need to update
// its commit seq to skip the binlogs.
//
// See test_cds_tbl_alter_drop_create.groovy for details.
commitSeq, _ := j.progress.GetTableCommitSeq(table)
log.Infof("partial sync update the overwritten table %s commit seq to %d, table id: %d",
table, commitSeq, tableId)
j.progress.TableCommitSeqMap[tableId] = commitSeq
}
j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName)

case PersistRestoreInfo:
Expand Down Expand Up @@ -1653,6 +1663,10 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
return err
}

if j.isBinlogCommitted(createTable.TableId, binlog.GetCommitSeq()) {
return nil
}

if featureCreateViewDropExists {
viewRegex := regexp.MustCompile(`(?i)^CREATE(\s+)VIEW`)
isCreateView := viewRegex.MatchString(createTable.Sql)
Expand Down Expand Up @@ -1719,6 +1733,10 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error {
return err
}

if j.isBinlogCommitted(dropTable.TableId, binlog.GetCommitSeq()) {
return nil
}

tableName := dropTable.TableName
// deprecated, `TableName` has been added after doris 2.0.0
if tableName == "" {
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccr/job_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,17 @@ func (j *JobProgress) GetTableId(tableName string) (int64, bool) {
return 0, false
}

// GetTableCommitSeq returns the committed binlog seq recorded for the table
// with the given name. It first resolves the name to a table id and then
// looks the id up in TableCommitSeqMap. The second result is false when the
// name is unknown or no commit seq has been recorded for that table id.
func (j *JobProgress) GetTableCommitSeq(tableName string) (int64, bool) {
	id, found := j.GetTableId(tableName)
	if !found {
		return 0, false
	}

	seq, found := j.TableCommitSeqMap[id]
	return seq, found
}

func (j *JobProgress) StartHandle(commitSeq int64) {
j.CommitSeq = commitSeq

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: a table dropped and re-created while the CCR job is paused
// is "overwritten" from the syncer's point of view. The syncer must advance
// the overwritten table's commit seq so stale binlogs referencing the dropped
// incarnation are skipped, without falling back to a full sync.
suite("test_cds_tbl_alter_drop_create") {
    def helper = new GroovyShell(new Binding(['suite': delegate]))
            .evaluate(new File("${context.config.suitePath}/../common", "helper.groovy"))

    if (!helper.is_version_supported([30003, 20108, 20016])) {
        // at least doris 3.0.3, 2.1.8 and doris 2.0.16
        def version = helper.upstream_version()
        logger.info("skip this suite because version is not supported, upstream version ${version}")
        return
    }

    def oldTableName = "tbl_old_" + helper.randomSuffix()
    // NOTE(review): newTableName is declared but never used in this suite.
    def newTableName = "tbl_new_" + helper.randomSuffix()

    // Predicates passed to helper.checkShowTimesOf: result set non-empty / empty.
    def exist = { res -> Boolean
        return res.size() != 0
    }
    def notExist = { res -> Boolean
        return res.size() == 0
    }

    logger.info("=== Create a fake table ===")
    // The "fake" table keeps the database non-empty so the job has unrelated
    // binlogs to sync through while the target table is dropped/re-created.
    sql """
        CREATE TABLE if NOT EXISTS ${oldTableName}_fake
        (
            `test` INT,
            `id` INT
        )
        ENGINE=OLAP
        UNIQUE KEY(`test`, `id`)
        PARTITION BY RANGE(`id`)
        (
            PARTITION `p1` VALUES LESS THAN ("0"),
            PARTITION `p2` VALUES LESS THAN ("100"),
            PARTITION `p3` VALUES LESS THAN ("200"),
            PARTITION `p4` VALUES LESS THAN ("300"),
            PARTITION `p5` VALUES LESS THAN ("1000")
        )
        DISTRIBUTED BY HASH(id) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "binlog.enable" = "true"
        )
    """

    helper.enableDbBinlog()
    helper.ccrJobDelete()
    helper.ccrJobCreate()

    assertTrue(helper.checkRestoreFinishTimesOf("${oldTableName}_fake", 60))

    logger.info(" ==== create table and drop ==== ")

    // Snapshot the job progress before pausing so we can later assert that
    // no additional full sync was triggered by the drop/re-create sequence.
    def first_job_progress = helper.get_job_progress()

    // Pause the job so every statement below accumulates as unsynced binlogs.
    helper.ccrJobPause()

    sql """
        CREATE TABLE if NOT EXISTS ${oldTableName}
        (
            `test` INT,
            `id` INT
        )
        ENGINE=OLAP
        UNIQUE KEY(`test`, `id`)
        PARTITION BY RANGE(`id`)
        (
            PARTITION `p1` VALUES LESS THAN ("0"),
            PARTITION `p2` VALUES LESS THAN ("100"),
            PARTITION `p3` VALUES LESS THAN ("200"),
            PARTITION `p4` VALUES LESS THAN ("300"),
            PARTITION `p5` VALUES LESS THAN ("1000")
        )
        DISTRIBUTED BY HASH(id) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "binlog.enable" = "true"
        )
    """

    sql "INSERT INTO ${oldTableName} VALUES (1, 100), (100, 1), (2, 200), (200, 2)"
    // Schema change on the first incarnation; its binlogs will later be stale.
    sql "ALTER TABLE ${oldTableName} ADD COLUMN `new_col` INT KEY DEFAULT \"0\""

    assertTrue(helper.checkShowTimesOf("""
        SHOW ALTER TABLE COLUMN
        FROM ${context.dbName}
        WHERE TableName = "${oldTableName}" AND State = "FINISHED"
        """,
        exist, 30))

    sql "INSERT INTO ${oldTableName} VALUES (5, 500, 1)"
    // Drop the first incarnation; the binlogs above now reference a dead table.
    sql "DROP TABLE ${oldTableName} FORCE"
    sql "INSERT INTO ${oldTableName}_fake VALUES (5, 500)"

    logger.info("create table ${oldTableName} again ")

    // Re-create the table under the same name — this is the "overwritten"
    // incarnation the partial snapshot must restore.
    sql """
        CREATE TABLE if NOT EXISTS ${oldTableName}
        (
            `test` INT,
            `id` INT
        )
        ENGINE=OLAP
        UNIQUE KEY(`test`, `id`)
        PARTITION BY RANGE(`id`)
        (
            PARTITION `p1` VALUES LESS THAN ("0"),
            PARTITION `p2` VALUES LESS THAN ("100"),
            PARTITION `p3` VALUES LESS THAN ("200"),
            PARTITION `p4` VALUES LESS THAN ("300"),
            PARTITION `p5` VALUES LESS THAN ("1000")
        )
        DISTRIBUTED BY HASH(id) BUCKETS AUTO
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "binlog.enable" = "true"
        )
    """

    sql "INSERT INTO ${oldTableName} VALUES (1, 100), (100, 1), (2, 200), (200, 2)"
    sql "INSERT INTO ${oldTableName}_fake VALUES (1, 100), (100, 1), (2, 200), (200, 2)"

    // Resume: the job must replay the accumulated binlogs, skipping the ones
    // that target the dropped incarnation of ${oldTableName}.
    helper.ccrJobResume()

    // fake table: 1 row before pause + 4 after re-create = 5 rows;
    // re-created table: the 4 rows inserted after re-creation.
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}_fake", 5, 60))
    assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${oldTableName}", 4, 60))
    assertTrue(helper.checkShowTimesOf("SHOW TABLES LIKE \"${oldTableName}\"", exist, 60, "target"))

    // no fullsync are triggered
    def last_job_progress = helper.get_job_progress()
    assertTrue(last_job_progress.full_sync_start_at == first_job_progress.full_sync_start_at)
}

0 comments on commit 0a18eb1

Please sign in to comment.