Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix create dropped view #188

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,20 @@ func (j *Job) handleCreateTable(binlog *festruct.TBinlog) error {
return err
}

log.Infof("walter get table name by id %d name '%s'", createTable.TableId, srcTableName)

if len(srcTableName) == 0 {
// The table is not found in upstream, try read it from the binlog record,
// but it might failed because the `tableName` field is added after doris 2.0.3.
srcTableName = strings.TrimSpace(createTable.TableName)
if len(srcTableName) == 0 {
return xerror.Errorf(xerror.Normal, "the table with id %d is not found in the upstream cluster, create table: %s",
createTable.TableId, createTable.String())
}
log.Infof("the table id %d is not found in the upstream, use the name %s from the binlog record",
createTable.TableId, srcTableName)
}

var destTableId int64
destTableId, err = j.destMeta.GetTableId(srcTableName)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ suite("test_sync_view_drop_create") {

def suffix = UUID.randomUUID().toString().replace("-", "")
def tableDuplicate0 = "tbl_duplicate_0_${suffix}"
sql """ DROP VIEW IF EXISTS view_test_${suffix} """
sql """ DROP VIEW IF EXISTS view_test_1_${suffix} """
createDuplicateTable(tableDuplicate0)
sql """
INSERT INTO ${tableDuplicate0} VALUES
Expand Down Expand Up @@ -97,5 +99,31 @@ suite("test_sync_view_drop_create") {
assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableDuplicate0}", 8, 50))
def view_size = target_sql "SHOW VIEW FROM ${tableDuplicate0}"
assertTrue(view_size.size() == 1);

// pause and create again, so create view will query the upstream to found table name.
helper.ccrJobPause()

sql """
CREATE VIEW view_test_1_${suffix} (k1, name, v1)
AS
SELECT user_id as k1, name, SUM(age) FROM ${tableDuplicate0}
GROUP BY k1,name;
"""
sql """ DROP VIEW view_test_1_${suffix} """

helper.ccrJobResume()

// insert will be sync.
sql """
INSERT INTO ${tableDuplicate0} VALUES
(6, "Zhangsan", 31),
(5, "Ava", 20);
"""
sql "sync"

assertTrue(helper.checkSelectTimesOf("SELECT * FROM ${tableDuplicate0}", 10, 50))
view_size = target_sql "SHOW VIEW FROM ${tableDuplicate0}"
assertTrue(view_size.size() == 1);

}

Loading