Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.5.0 patches #152

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class SqlServerConnection extends JdbcConnection {

public static final String INSTANCE_NAME = "instance";

private static final String GET_DATABASE_NAME = "SELECT name FROM sys.databases WHERE name = ?";
private static final String GET_DATABASE_METADATA = "WITH names AS (SELECT * FROM (VALUES #) AS names(name)) SELECT n.name, d.name, d.state, d.state_desc FROM names n LEFT JOIN sys.databases d ON d.name = n.name";

private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class);

Expand Down Expand Up @@ -535,21 +535,55 @@ public String getNameOfChangeTable(String captureName) {
}

/**
* Retrieve the name of the database in the original case as it's defined on the server.
* Retrieve the names of the existing online databases in the original case as they are defined on the server.
*
* Although SQL Server supports case-insensitive collations, the connector uses the database name to build the
* produced records' source info and, subsequently, the keys of its committed offset messages. This value
* must remain the same during the lifetime of the connector regardless of the case used in the connector
* configuration.
*/
public String retrieveRealDatabaseName(String databaseName) {
public List<String> retrieveRealOnlineDatabaseNames(List<String> databaseNames) {
if (databaseNames.isEmpty()) {
return databaseNames;
}

String placeholders = databaseNames.stream()
.map(x -> "(?)")
.collect(Collectors.joining(", "));

String query = GET_DATABASE_METADATA.replace(STATEMENTS_PLACEHOLDER, placeholders);

try {
return prepareQueryAndMap(GET_DATABASE_NAME,
ps -> ps.setString(1, databaseName),
singleResultMapper(rs -> rs.getString(1), "Could not retrieve exactly one database name"));
return prepareQueryAndMap(query,
ps -> {
int index = 1;
for (String databaseName : databaseNames) {
ps.setString(index++, databaseName);
}
},
rs -> {
List<String> result = new ArrayList<>();
while (rs.next()) {
final String name = rs.getString(1);
final String realName = rs.getString(2);
if (realName == null) {
LOGGER.warn("Database {} does not exist", name);
continue;
}

final int state = rs.getInt(3);
if (state != 0) {
LOGGER.warn("Database {} is not online (state_desc = {})", realName, rs.getString(4));
continue;
}

result.add(realName);
}
return result;
});
}
catch (SQLException e) {
throw new RuntimeException("Couldn't obtain database name", e);
throw new RuntimeException("Couldn't retrieve real database names", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
private List<Map<String, String>> buildTaskConfigs(SqlServerConnection connection, SqlServerConnectorConfig config,
int maxTasks) {
List<String> databaseNames = config.getDatabaseNames();
final List<String> realDatabaseNames = connection.retrieveRealOnlineDatabaseNames(databaseNames);
if (realDatabaseNames.isEmpty()) {
throw new IllegalArgumentException();
}

// Initialize the database list for each task
List<List<String>> databasesByTask = new ArrayList<>();
Expand All @@ -83,10 +87,9 @@ private List<Map<String, String>> buildTaskConfigs(SqlServerConnection connectio
}

// Add each database to a task list via round-robin.
for (int databaseNameIndex = 0; databaseNameIndex < databaseNames.size(); databaseNameIndex++) {
for (int databaseNameIndex = 0; databaseNameIndex < realDatabaseNames.size(); databaseNameIndex++) {
int taskIndex = databaseNameIndex % numTasks;
String realDatabaseName = connection.retrieveRealDatabaseName(databaseNames.get(databaseNameIndex));
databasesByTask.get(taskIndex).add(realDatabaseName);
databasesByTask.get(taskIndex).add(realDatabaseNames.get(databaseNameIndex));
}

// Create a task config for each task, assigning each a list of database names.
Expand Down Expand Up @@ -125,22 +128,6 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
connection.execute("SELECT @@VERSION");
LOGGER.debug("Successfully tested connection for {} with user '{}'", connection.connectionString(),
connection.username());
LOGGER.info("Checking if user has access to CDC table");
if (sqlServerConfig.getSnapshotMode() != SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY) {
final List<String> noAccessDatabaseNames = new ArrayList<>();
for (String databaseName : sqlServerConfig.getDatabaseNames()) {
if (!connection.checkIfConnectedUserHasAccessToCDCTable(databaseName)) {
noAccessDatabaseNames.add(databaseName);
}
}
if (!noAccessDatabaseNames.isEmpty()) {
String errorMessage = String.format(
"User %s does not have access to CDC schema in the following databases: %s. This user can only be used in initial_only snapshot mode",
config.getString(RelationalDatabaseConnectorConfig.USER), String.join(", ", noAccessDatabaseNames));
LOGGER.error(errorMessage);
userValue.addErrorMessage(errorMessage);
}
}
}
catch (Exception e) {
LOGGER.error("Failed testing connection for {} with user '{}'", config.withMaskedPasswords(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@ public boolean executeIteration(ChangeEventSourceContext context, SqlServerParti
}
}
catch (SQLException e) {
LOGGER.warn("No maximum LSN recorded in the database; this may happen if there are no changes recorded in the change table yet or " +
LOGGER.debug("No maximum LSN recorded in the database; this may happen if there are no changes recorded in the change table yet or " +
"low activity database where the cdc clean up job periodically clears entries from the cdc tables. " +
"Otherwise, this may be an indication that the SQL Server Agent is not running. " +
"You should follow the documentation on how to configure SQL Server Agent running status query.");
LOGGER.warn("Cannot query the status of the SQL Server Agent", e);
LOGGER.debug("Cannot query the status of the SQL Server Agent", e);
}
checkAgent = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.io.IOException;
Expand Down Expand Up @@ -911,30 +910,6 @@ public void testIncludeTable() throws Exception {
stopConnector();
}

@Test
@FixFor("DBZ-4346")
public void shouldReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialMode() throws Exception {
// First create a new user with only db_datareader role
String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n"
+ "DROP LOGIN test_user\n"
+ "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n"
+ "CREATE USER test_user FOR LOGIN test_user\n"
+ "ALTER ROLE db_denydatareader ADD MEMBER test_user";

connection.execute(testUserCreateSql);

final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$")
.with(SqlServerConnectorConfig.USER, "test_user")
.build();

SqlServerConnector connector = new SqlServerConnector();
Config validatedConfig = connector.validate(config.asMap());

assertConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER, 1);
}

@Test
@FixFor("DBZ-4346")
public void shouldNotReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialOnlyMode() throws Exception {
Expand Down Expand Up @@ -2621,24 +2596,6 @@ public void shouldApplySchemaFilters() throws Exception {
stopConnector();
}

@Test
public void shouldFailWhenUserDoesNotHaveAccessToDatabase() {
TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_2);
final Configuration config2 = TestHelper.defaultConfig(
TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();
Map<String, Object> result = new HashMap<>();
start(SqlServerConnector.class, config2, (success, message, error) -> {
result.put("success", success);
result.put("message", message);
});
assertEquals(false, result.get("success"));
assertEquals(
"Connector configuration is not valid. User sa does not have access to CDC schema in the following databases: testDB2. This user can only be used in initial_only snapshot mode",
result.get("message"));
}

@Test
@FixFor("DBZ-5033")
public void shouldIgnoreNullOffsetsWhenRecoveringHistory() {
Expand Down Expand Up @@ -2839,6 +2796,42 @@ public void shouldStreamToNewTableAfterRestart() throws Exception {
stopConnector();
}

@Test
public void shouldIgnoreOfflineAndNonExistingDatabases() throws Exception {
TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
final Map<String, String> props = TestHelper.defaultConnectorConfig()
.with(SqlServerConnectorConfig.DATABASE_NAMES.name(), TestHelper.TEST_DATABASE_1 + ","
+ TestHelper.TEST_DATABASE_2 + ",non-existing-database")
.build()
.asMap();
final LogInterceptor logInterceptor = new LogInterceptor(SqlServerConnection.class);

connection.execute("ALTER DATABASE " + TestHelper.TEST_DATABASE_2 + " SET OFFLINE");

try {
SqlServerConnector connector = new SqlServerConnector();

Config validatedConfig = connector.validate(props);
assertNoConfigurationErrors(validatedConfig, SqlServerConnectorConfig.HOSTNAME);

connector.start(props);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
assertThat(taskConfigs).hasSize(1);
assertThat(taskConfigs.get(0).get(SqlServerConnectorConfig.DATABASE_NAMES.name())).isEqualTo(TestHelper.TEST_DATABASE_1);

final String message1 = "Database non-existing-database does not exist";
assertThat(logInterceptor.containsMessage(message1)).isTrue();

final String message2 = "Database " + TestHelper.TEST_DATABASE_2 + " is not online (state_desc = OFFLINE)";
assertThat(logInterceptor.containsMessage(message2)).isTrue();
}
finally {
// Set the database back online, since otherwise, it will be impossible to create it again
// https://docs.microsoft.com/en-us/sql/t-sql/statements/drop-database-transact-sql?view=sql-server-ver15#general-remarks
connection.execute("ALTER DATABASE " + TestHelper.TEST_DATABASE_2 + " SET ONLINE");
}
}

@Test
public void shouldStopRetriableRestartsAtConfiguredMaximumDuringSnapshot() throws Exception {
shouldStopRetriableRestartsAtConfiguredMaximum(() -> {
Expand Down
Loading