Merge branch 'main' of github.com:Altinity/clickhouse-regression
alsugiliazova committed Feb 14, 2024
2 parents 98ac6f5 + bf3189b · commit 294a092
Showing 3 changed files with 214 additions and 89 deletions.
98 changes: 93 additions & 5 deletions vfs/tests/replica.py
@@ -230,6 +230,78 @@ def offline_replica(self):
assert_row_count(node=nodes[1], table_name=table_name, rows=1000000)


@TestScenario
@Requirements(RQ_SRS_038_DiskObjectStorageVFS_Replica_Remove("1.0"))
def add_remove_one_node(self):
"""
Test that no data is lost when a node is removed and added as a replica
during inserts on other replicas.
"""

table_name = "add_remove_one_replica"
storage_policy = "external_vfs"
parallel = False
nodes = self.context.ch_nodes
rows_per_insert = 100_000_000
retry_settings = {
"timeout": 120,
"initial_delay": 5,
"delay": 2,
}

if self.context.stress:
rows_per_insert = 500_000_000
retry_settings["timeout"] = 300
retry_settings["delay"] = 5

with Given("I have a replicated table"):
replicated_table_cluster(
table_name=table_name, storage_policy=storage_policy, columns="d UInt64"
)

When(
"I start inserts on the second node",
test=insert_random,
parallel=parallel,
)(
node=nodes[1],
table_name=table_name,
columns="d UInt64",
rows=rows_per_insert,
)

And(
"I delete the replica on the third node",
test=delete_one_replica,
parallel=parallel,
)(node=nodes[2], table_name=table_name)

And(
"I replicate the table on the third node",
test=create_one_replica,
parallel=parallel,
)(node=nodes[2], table_name=table_name)

When(
"I start inserts on the first node",
test=insert_random,
parallel=parallel,
)(
node=nodes[0],
table_name=table_name,
columns="d UInt64",
rows=rows_per_insert,
)

join()

for node in nodes:
with Then(f"I wait for {node.name} to sync by watching the row count"):
retry(assert_row_count, **retry_settings)(
node=node, table_name=table_name, rows=rows_per_insert * 2
)
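
# For reference, a minimal sketch of what the assert_row_count helper retried
# above presumably does (hypothetical, not part of this commit; it assumes
# node.query returns a result object with an .output string):
#
#     def assert_row_count(node, table_name, rows):
#         actual = int(node.query(f"SELECT count() FROM {table_name}").output)
#         assert actual == rows, f"expected {rows} rows, got {actual}"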


@TestScenario
@Requirements(RQ_SRS_038_DiskObjectStorageVFS_Replica_Remove("1.0"))
def parallel_add_remove(self):
@@ -269,6 +341,7 @@ def parallel_add_remove(self):
columns="d UInt64",
rows=rows_per_insert,
)
insert_sets = 1
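# insert_sets tracks how many rows_per_insert batches have been started;
# the row-count assertions below expect rows_per_insert * insert_sets rows.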

And(
"I replicate the table on the second node in parallel",
@@ -293,6 +366,7 @@ def parallel_add_remove(self):
columns="d UInt64",
rows=rows_per_insert,
)
insert_sets += 1

And(
"I delete the replica on the first node",
@@ -306,16 +380,28 @@ def parallel_add_remove(self):
parallel=True,
)(node=nodes[2], table_name=table_name)

And(
"I continue with parallel inserts on the second node",
test=insert_random,
parallel=True,
)(
node=nodes[1],
table_name=table_name,
columns="d UInt64",
rows=rows_per_insert,
)
insert_sets += 1

join()

with And("I wait for the third node to sync by watching the row count"):
retry(assert_row_count, **retry_settings)(
- node=nodes[2], table_name=table_name, rows=rows_per_insert * 2
+ node=nodes[2], table_name=table_name, rows=rows_per_insert * insert_sets
)

with Then("I also check the row count on the second node"):
assert_row_count(
- node=nodes[1], table_name=table_name, rows=rows_per_insert * 2
+ node=nodes[1], table_name=table_name, rows=rows_per_insert * insert_sets
)

Given(
@@ -328,6 +414,7 @@ def parallel_add_remove(self):
columns="d UInt64",
rows=rows_per_insert,
)
insert_sets += 1
And(
"I start parallel inserts on the third node in parallel",
test=insert_random,
@@ -338,6 +425,7 @@ def parallel_add_remove(self):
columns="d UInt64",
rows=rows_per_insert,
)
insert_sets += 1

And(
"I replicate the table on the first node again in parallel",
@@ -349,15 +437,15 @@ def parallel_add_remove(self):

with Then("I wait for the first node to sync by watching the row count"):
retry(assert_row_count, **retry_settings)(
- node=nodes[0], table_name=table_name, rows=rows_per_insert * 4
+ node=nodes[0], table_name=table_name, rows=rows_per_insert * insert_sets
)

with And("I check the row count on the other nodes"):
assert_row_count(
- node=nodes[1], table_name=table_name, rows=rows_per_insert * 4
+ node=nodes[1], table_name=table_name, rows=rows_per_insert * insert_sets
)
assert_row_count(
- node=nodes[2], table_name=table_name, rows=rows_per_insert * 4
+ node=nodes[2], table_name=table_name, rows=rows_per_insert * insert_sets
)

finally:
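
Both scenarios rely on the TestFlows pattern used throughout this suite: a
step invoked with parallel=True returns immediately and runs in the
background, and join() blocks until every parallel step has finished, after
which row counts can be asserted. A condensed sketch of that pattern follows
(the step body, table name, and row counts are illustrative, not from this
commit):

from testflows.core import *

@TestStep(When)
def insert_random(self, node, table_name, rows):
    """Illustrative stand-in for the real insert_random step in vfs/tests/steps.py."""
    node.query(f"INSERT INTO {table_name} SELECT rand64() FROM numbers({rows})")

@TestScenario
def example(self):
    nodes = self.context.ch_nodes
    # Each call starts the step in the background and returns immediately.
    When("insert on the first node", test=insert_random, parallel=True)(
        node=nodes[0], table_name="example_table", rows=100
    )
    And("insert on the second node", test=insert_random, parallel=True)(
        node=nodes[1], table_name="example_table", rows=100
    )
    join()  # block until all parallel steps above have completed
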
8 changes: 7 additions & 1 deletion vfs/tests/steps.py
@@ -216,6 +216,7 @@ def create_one_replica(
table_name,
columns="d UInt64",
order_by="d",
partition_by=None,
replica_path_suffix=None,
replica_name="{replica}",
no_checks=False,
@@ -229,11 +230,16 @@ def create_one_replica(
if replica_path_suffix is None:
replica_path_suffix = table_name

if partition_by is not None:
partition_by = f"PARTITION BY ({partition_by})"
else:
partition_by = ""

r = node.query(
f"""
CREATE TABLE IF NOT EXISTS {table_name} ({columns})
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{replica_path_suffix}', '{replica_name}')
- ORDER BY ({order_by})
+ ORDER BY ({order_by}) {partition_by}
SETTINGS storage_policy='{storage_policy}'
""",
no_checks=no_checks,
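
With the new partition_by argument, callers of create_one_replica can now
create a partitioned replica. A hypothetical call (the node, table, and
column names below are illustrative, not from this commit):

with Given("a partitioned replica on the third node"):
    create_one_replica(
        node=nodes[2],
        table_name="partitioned_table",
        columns="d DateTime, v UInt64",
        order_by="v",
        partition_by="toYYYYMM(d)",  # rendered as "PARTITION BY (toYYYYMM(d))" in the DDL
    )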
