diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs index bf5fefa5..dcdb0aec 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs @@ -82,7 +82,7 @@ public async Task CreateQueueAsync(RetryQueue queue) using var dbConnection = this.connectionProvider.CreateWithinTransaction(this.sqlServerDbSettings); - var queueId = await this.retryQueueRepository.AddAsync(dbConnection, queueDbo); + var queueId = await this.retryQueueRepository.AddAsync(dbConnection, queueDbo, this.sqlServerDbSettings.Schema); foreach (var item in queue.Items) { @@ -101,7 +101,7 @@ public async Task CreateQueueAsync(RetryQueue queue) Description = item.Description }; - var itemId = await this.retryQueueItemRepository.AddAsync(dbConnection, itemDbo); + var itemId = await this.retryQueueItemRepository.AddAsync(dbConnection, itemDbo, this.sqlServerDbSettings.Schema); // item message var messageDbo = new RetryQueueItemMessageDbo @@ -115,7 +115,7 @@ public async Task CreateQueueAsync(RetryQueue queue) Value = item.Message.Value }; - await this.retryQueueItemMessageRepository.AddAsync(dbConnection, messageDbo); + await this.retryQueueItemMessageRepository.AddAsync(dbConnection, messageDbo, this.sqlServerDbSettings.Schema); // message headers var messageHeadersDbos = item.Message.Headers @@ -126,7 +126,7 @@ public async Task CreateQueueAsync(RetryQueue queue) Value = h.Value }); - await this.retryQueueItemMessageHeaderRepository.AddAsync(dbConnection, messageHeadersDbos); + await this.retryQueueItemMessageHeaderRepository.AddAsync(dbConnection, messageHeadersDbos, this.sqlServerDbSettings.Schema); } dbConnection.Commit(); @@ -136,16 +136,16 @@ public async Task GetAllRetryQueueDataAsync(string queueGroupKey) { using (var dbConnection = this.connectionProvider.Create(this.sqlServerDbSettings)) { - var retryQueueDbo = await this.retryQueueRepository.GetQueueAsync(dbConnection, queueGroupKey); + var retryQueueDbo = await this.retryQueueRepository.GetQueueAsync(dbConnection, queueGroupKey, this.sqlServerDbSettings.Schema); if (retryQueueDbo is null) { return null; } - var retryQueueItemsDbo = await this.retryQueueItemRepository.GetItemsByQueueOrderedAsync(dbConnection, retryQueueDbo.IdDomain); + var retryQueueItemsDbo = await this.retryQueueItemRepository.GetItemsByQueueOrderedAsync(dbConnection, retryQueueDbo.IdDomain, this.sqlServerDbSettings.Schema); var itemMessagesDbo = await this.retryQueueItemMessageRepository.GetMessagesOrderedAsync(dbConnection, retryQueueItemsDbo); - var messageHeadersDbo = await this.retryQueueItemMessageHeaderRepository.GetOrderedAsync(dbConnection, itemMessagesDbo); + var messageHeadersDbo = await this.retryQueueItemMessageHeaderRepository.GetOrderedAsync(dbConnection, itemMessagesDbo, this.sqlServerDbSettings.Schema); var dboWrapper = new RetryQueuesDboWrapper { diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs index cf783db3..e31375a3 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Threading.Tasks; @@ -6,8 +6,8 @@ internal interface IRetryQueueItemMessageHeaderRepository { - Task AddAsync(IDbConnection dbConnection, IEnumerable retryQueueHeadersDbo); + Task AddAsync(IDbConnection dbConnection, IEnumerable retryQueueHeadersDbo, string schema); - Task> GetOrderedAsync(IDbConnection dbConnection, IEnumerable retryQueueItemMessagesDbo); + Task> GetOrderedAsync(IDbConnection dbConnection, IEnumerable retryQueueItemMessagesDbo, string schema); } } diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs index 9d650e73..ae750981 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Threading.Tasks; @@ -6,7 +6,7 @@ internal interface IRetryQueueItemMessageRepository { - Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo); + Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo, string schema); Task> GetMessagesOrderedAsync(IDbConnection dbConnection, IEnumerable retryQueueItemsDbo); } diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs index 351eb2d0..d0e391d3 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; @@ -10,30 +10,31 @@ internal interface IRetryQueueItemRepository { - Task AddAsync(IDbConnection dbConnection, RetryQueueItemDbo retryQueueItemDbo); + Task AddAsync(IDbConnection dbConnection, RetryQueueItemDbo retryQueueItemDbo, string schema); - Task AnyItemStillActiveAsync(IDbConnection dbConnection, Guid retryQueueId); + Task AnyItemStillActiveAsync(IDbConnection dbConnection, Guid retryQueueId, string schema); - Task GetItemAsync(IDbConnection dbConnection, Guid domainId); + Task GetItemAsync(IDbConnection dbConnection, Guid domainId, string schema); - Task> GetItemsByQueueOrderedAsync(IDbConnection dbConnection, Guid retryQueueId); + Task> GetItemsByQueueOrderedAsync(IDbConnection dbConnection, Guid retryQueueId, string schema); Task> GetItemsOrderedAsync( IDbConnection dbConnection, IEnumerable retryQueueIds, IEnumerable statuses, + string schema, IEnumerable severities = null, int? top = null, StuckStatusFilter stuckStatusFilter = null); - Task> GetNewestItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort); + Task> GetNewestItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort, string schema); - Task> GetPendingItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort); + Task> GetPendingItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort, string schema); - Task IsFirstWaitingInQueueAsync(IDbConnection dbConnection, RetryQueueItemDbo item); + Task IsFirstWaitingInQueueAsync(IDbConnection dbConnection, RetryQueueItemDbo item, string schema); - Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, int attemptsCount, DateTime lastExecution, string description); + Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, int attemptsCount, DateTime lastExecution, string description, string schema); - Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status); + Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, string schema); } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs index 7f63f5cc..d0e042b8 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; @@ -9,20 +9,20 @@ internal interface IRetryQueueRepository { - Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo); + Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo, string schema); - Task DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete); + Task DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete, string schema); - Task ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey); + Task ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey, string schema); - Task GetQueueAsync(IDbConnection dbConnection, string queueGroupKey); + Task GetQueueAsync(IDbConnection dbConnection, string queueGroupKey, string schema); - Task> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top); + Task> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top, string schema); - Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution); + Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution, string schema); - Task UpdateLastExecutionAsync(IDbConnection dbConnection, Guid idDomain, DateTime lastExecution); + Task UpdateLastExecutionAsync(IDbConnection dbConnection, Guid idDomain, DateTime lastExecution, string schema); - Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus); + Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, string schema); } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs index 6161c843..c97607d9 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Data.SqlClient; @@ -9,18 +9,18 @@ internal sealed class RetryQueueItemMessageHeaderRepository : IRetryQueueItemMessageHeaderRepository { - public async Task AddAsync(IDbConnection dbConnection, IEnumerable retryQueueHeadersDbo) + public async Task AddAsync(IDbConnection dbConnection, IEnumerable retryQueueHeadersDbo, string schema) { Guard.Argument(dbConnection, nameof(dbConnection)).NotNull(); Guard.Argument(retryQueueHeadersDbo, nameof(retryQueueHeadersDbo)).NotNull(); foreach (var header in retryQueueHeadersDbo) { - await this.AddAsync(dbConnection, header).ConfigureAwait(false); + await this.AddAsync(dbConnection, header, schema).ConfigureAwait(false); } } - public async Task> GetOrderedAsync(IDbConnection dbConnection, IEnumerable retryQueueItemMessagesDbo) + public async Task> GetOrderedAsync(IDbConnection dbConnection, IEnumerable retryQueueItemMessagesDbo, string schema) { Guard.Argument(dbConnection, nameof(dbConnection)).NotNull(); Guard.Argument(retryQueueItemMessagesDbo, nameof(retryQueueItemMessagesDbo)).NotNull(); @@ -29,8 +29,8 @@ public async Task> GetOrderedAsync(IDbConn { command.CommandType = System.Data.CommandType.Text; command.CommandText = $@"SELECT * - FROM [RetryItemMessageHeaders] h - INNER JOIN [RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage + FROM [{schema}].[RetryItemMessageHeaders] h + INNER JOIN [{schema}].[RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage WHERE h.IdItemMessage IN ({string.Join(",", retryQueueItemMessagesDbo.Select(x => $"'{x.IdRetryQueueItem}'"))}) ORDER BY rqi.IdRetryQueue, h.IdItemMessage"; @@ -38,7 +38,7 @@ WHERE h.IdItemMessage IN ({string.Join(",", retryQueueItemMessagesDbo.Select(x = } } - private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHeaderDbo retryQueueHeaderDbo) + private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHeaderDbo retryQueueHeaderDbo, string schema) { Guard.Argument(dbConnection, nameof(dbConnection)).NotNull(); Guard.Argument(retryQueueHeaderDbo, nameof(retryQueueHeaderDbo)).NotNull(); @@ -46,7 +46,7 @@ private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHea using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [RetryItemMessageHeaders] + command.CommandText = $@"INSERT INTO [{schema}].[RetryItemMessageHeaders] (IdItemMessage, [Key], Value) VALUES (@IdItemMessage, @Key, @Value)"; @@ -90,4 +90,4 @@ private RetryQueueItemMessageHeaderDbo FillDbo(SqlDataReader reader, int idColum }; } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs index 521c09a5..05f062b5 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Data.SqlClient; @@ -8,7 +8,7 @@ internal sealed class RetryQueueItemMessageRepository : IRetryQueueItemMessageRepository { - public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo) + public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo, string schema) { Guard.Argument(dbConnection, nameof(dbConnection)).NotNull(); Guard.Argument(retryQueueItemMessageDbo, nameof(retryQueueItemMessageDbo)).NotNull(); @@ -16,7 +16,7 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [ItemMessages] + command.CommandText = $@"INSERT INTO [{schema}].[ItemMessages] (IdRetryQueueItem, [Key], Value, TopicName, Partition, Offset, UtcTimeStamp) VALUES (@idRetryQueueItem, @key, @value, @topicName, @partition, @offSet, @utcTimeStamp)"; @@ -89,4 +89,4 @@ private RetryQueueItemMessageDbo FillDbo(SqlDataReader reader) }; } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs index 02f4f8c4..416bf45e 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; @@ -13,7 +13,7 @@ internal sealed class RetryQueueItemRepository : IRetryQueueItemRepository { - public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemDbo retryQueueItemDbo) + public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemDbo retryQueueItemDbo, string schema) { Guard.Argument(dbConnection).NotNull(); Guard.Argument(retryQueueItemDbo).NotNull(); @@ -21,11 +21,11 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemDbo r using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [RetryQueueItems] + command.CommandText = $@"INSERT INTO [{schema}].[RetryQueueItems] (IdDomain, IdRetryQueue, IdDomainRetryQueue, IdItemStatus, IdSeverityLevel, AttemptsCount, Sort, CreationDate, LastExecution, ModifiedStatusDate, Description) VALUES (@idDomain, @idRetryQueue, @idDomainRetryQueue, @idItemStatus, @idSeverityLevel, @attemptsCount, - (SELECT COUNT(1) FROM [RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue), + (SELECT COUNT(1) FROM [{schema}].[RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue), @creationDate, @lastExecution, @modifiedStatusDate, @description); SELECT SCOPE_IDENTITY()"; @@ -45,7 +45,7 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemDbo r } } - public async Task AnyItemStillActiveAsync(IDbConnection dbConnection, Guid domainRetryQueueId) + public async Task AnyItemStillActiveAsync(IDbConnection dbConnection, Guid domainRetryQueueId, string schema) { Guard.Argument(dbConnection).NotNull(); Guard.Argument(domainRetryQueueId).NotDefault(); @@ -53,8 +53,8 @@ public async Task AnyItemStillActiveAsync(IDbConnection dbConnection, Guid using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT 1 WHERE EXISTS( - SELECT TOP 1 * FROM [RetryQueueItems] + command.CommandText = $@"SELECT 1 WHERE EXISTS( + SELECT TOP 1 * FROM [{schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry))"; @@ -69,7 +69,7 @@ SELECT TOP 1 * FROM [RetryQueueItems] } } - public async Task GetItemAsync(IDbConnection dbConnection, Guid domainId) + public async Task GetItemAsync(IDbConnection dbConnection, Guid domainId, string schema) { Guard.Argument(dbConnection, nameof(dbConnection)).NotNull(); Guard.Argument(domainId, nameof(domainId)).NotDefault(); @@ -77,8 +77,8 @@ public async Task GetItemAsync(IDbConnection dbConnection, Gu using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT * - FROM [RetryQueueItems] + command.CommandText = $@"SELECT * + FROM [{schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomain = @IdDomain"; @@ -88,7 +88,7 @@ FROM [RetryQueueItems] } } - public async Task> GetItemsByQueueOrderedAsync(IDbConnection dbConnection, Guid domainRetryQueueId) + public async Task> GetItemsByQueueOrderedAsync(IDbConnection dbConnection, Guid domainRetryQueueId, string schema) { Guard.Argument(dbConnection, nameof(dbConnection)).NotNull(); Guard.Argument(domainRetryQueueId, nameof(domainRetryQueueId)).NotDefault(); @@ -96,8 +96,8 @@ public async Task> GetItemsByQueueOrderedAsync(IDbConne using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT * - FROM [RetryQueueItems] + command.CommandText = $@"SELECT * + FROM [{schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue ORDER BY Sort ASC"; @@ -112,6 +112,7 @@ public async Task> GetItemsOrderedAsync( IDbConnection dbConnection, IEnumerable retryQueueIds, IEnumerable statuses, + string schema, IEnumerable severities, int? top = null, StuckStatusFilter stuckStatusFilter = null) @@ -133,20 +134,20 @@ public async Task> GetItemsOrderedAsync( } query = string.Concat(query, $@" * - FROM [RetryQueueItems] + FROM [{schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue IN ({string.Join(",", retryQueueIds.Select(x => $"'{x}'"))})"); if (stuckStatusFilter is null) { - query = string.Concat(query, $" AND IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))})"); + query = string.Concat(query, $" AND IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))})"); } else { query = string.Concat(query, $@" AND( - IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))}) + IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))}) OR( - IdItemStatus = { (byte)stuckStatusFilter.ItemStatus} + IdItemStatus = {(byte)stuckStatusFilter.ItemStatus} AND DATEADD(SECOND, {Math.Floor(stuckStatusFilter.ExpirationInterval.TotalSeconds)}, ModifiedStatusDate) < @DateTimeUtcNow ) )"); @@ -164,7 +165,7 @@ AND DATEADD(SECOND, {Math.Floor(stuckStatusFilter.ExpirationInterval.TotalSecond } } - public async Task> GetNewestItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort) + public async Task> GetNewestItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort, string schema) { Guard.Argument(queueIdDomain, nameof(queueIdDomain)).NotDefault(); Guard.Argument(sort, nameof(sort)).NotNegative(); @@ -173,7 +174,7 @@ public async Task> GetNewestItemsAsync(IDbConnection db { command.CommandType = System.Data.CommandType.Text; command.CommandText = $@"SELECT * - FROM [RetryQueueItems] + FROM [{schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry) @@ -189,7 +190,7 @@ AND Sort > @Sort } } - public async Task> GetPendingItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort) + public async Task> GetPendingItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort, string schema) { Guard.Argument(queueIdDomain, nameof(queueIdDomain)).NotDefault(); Guard.Argument(sort, nameof(sort)).NotNegative(); @@ -198,7 +199,7 @@ public async Task> GetPendingItemsAsync(IDbConnection d { command.CommandType = System.Data.CommandType.Text; command.CommandText = $@"SELECT * - FROM [RetryQueueItems] + FROM [{schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry) @@ -214,12 +215,13 @@ AND Sort < @Sort } } - public async Task IsFirstWaitingInQueueAsync(IDbConnection dbConnection, RetryQueueItemDbo item) + public async Task IsFirstWaitingInQueueAsync(IDbConnection dbConnection, RetryQueueItemDbo item, string schema) { var sortedItems = await this.GetItemsOrderedAsync( dbConnection, new Guid[] { item.DomainRetryQueueId }, new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting }, + schema, null, 1) .ConfigureAwait(false); @@ -232,12 +234,12 @@ public async Task IsFirstWaitingInQueueAsync(IDbConnection dbConnection, R return false; } - public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, int attemptsCount, DateTime lastExecution, string description) + public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, int attemptsCount, DateTime lastExecution, string description, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueueItems] + command.CommandText = $@"UPDATE [{schema}].[RetryQueueItems] SET IdItemStatus = @IdItemStatus, AttemptsCount = @AttemptsCount, LastExecution = @LastExecution, @@ -256,7 +258,7 @@ public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, Re } } - public async Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status) + public async Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, string schema) { Guard.Argument(dbConnection, nameof(dbConnection)).NotNull(); Guard.Argument(idDomain).NotDefault(); @@ -265,7 +267,7 @@ public async Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDoma using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueueItems] + command.CommandText = $@"UPDATE [{schema}].[RetryQueueItems] SET IdItemStatus = @IdItemStatus, ModifiedStatusDate = @DateTimeUtcNow WHERE IdDomain = @IdDomain"; diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs index 54e5d3ea..3bc304ce 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; @@ -10,12 +10,12 @@ internal sealed class RetryQueueRepository : IRetryQueueRepository { - public async Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo) + public async Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [RetryQueues] + command.CommandText = $@"INSERT INTO {schema}.[RetryQueues] (IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution) VALUES (@idDomain, @idStatus, @searchGroupKey, @queueGroupKey, @creationDate, @lastExecution); @@ -33,14 +33,14 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retry } } - public async Task DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete) + public async Task DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"DELETE FROM [RetryQueues] WHERE Id IN + command.CommandText = $@"DELETE FROM [{schema}].[RetryQueues] WHERE Id IN ( - SELECT Id FROM [RetryQueues] rq + SELECT Id FROM [{schema}].[RetryQueues] rq WHERE rq.SearchGroupKey = @SearchGroupKey AND rq.LastExecution < @MaxLastExecutionDateToBeKept AND rq.IdStatus = @IdStatus @@ -58,13 +58,13 @@ FETCH NEXT @MaxRowsToDelete ROWS ONLY } } - public async Task ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey) + public async Task ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT COUNT(1) - FROM [RetryQueues] + command.CommandText = $@"SELECT COUNT(1) + FROM [{schema}].[RetryQueues] WHERE QueueGroupKey = @QueueGroupKey AND IdStatus <> @IdStatus"; command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey); @@ -74,13 +74,13 @@ FROM [RetryQueues] } } - public async Task GetQueueAsync(IDbConnection dbConnection, string queueGroupKey) + public async Task GetQueueAsync(IDbConnection dbConnection, string queueGroupKey, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution - FROM [RetryQueues] + command.CommandText = $@"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution + FROM [{schema}].[RetryQueues] WHERE QueueGroupKey = @QueueGroupKey ORDER BY Id"; @@ -90,14 +90,14 @@ FROM [RetryQueues] } } - public async Task> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top) + public async Task> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; var innerQuery = $@" SELECT TOP({top}) Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution - FROM [RetryQueues] + FROM [{schema}].[RetryQueues] WHERE IdStatus = @IdStatus"; if (searchGroupKey is object) @@ -117,12 +117,12 @@ FROM [RetryQueues] } } - public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution) + public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueues] + command.CommandText = $@"UPDATE [{schema}].[RetryQueues] SET IdStatus = @IdStatus, LastExecution = @LastExecution WHERE IdDomain = @IdDomain"; @@ -135,12 +135,12 @@ public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, Re } } - public async Task UpdateLastExecutionAsync(IDbConnection dbConnection, Guid idDomain, DateTime lastExecution) + public async Task UpdateLastExecutionAsync(IDbConnection dbConnection, Guid idDomain, DateTime lastExecution, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueues] + command.CommandText = $@"UPDATE [{schema}].[RetryQueues] SET LastExecution = @LastExecution WHERE IdDomain = @IdDomain"; @@ -151,12 +151,12 @@ public async Task UpdateLastExecutionAsync(IDbConnection dbConnection, Guid } } - public async Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus) + public async Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, string schema) { using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueues] + command.CommandText = $@"UPDATE [{schema}][RetryQueues] SET IdStatus = @IdStatus WHERE IdDomain = @IdDomain"; @@ -222,4 +222,4 @@ private string GetOrderByCommandString(GetQueuesSortOption sortOption) } } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs b/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs index e3ee1897..955a894b 100644 --- a/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs +++ b/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs @@ -5,18 +5,28 @@ public static class RetryDurableDefinitionBuilderExtension public static RetryDurableDefinitionBuilder WithSqlServerDataProvider( this RetryDurableDefinitionBuilder retryDurableDefinitionBuilder, string connectionString, - string databaseName) + string databaseName, + string schema) { retryDurableDefinitionBuilder.WithRepositoryProvider( new SqlServerDbDataProviderFactory() .Create( new SqlServerDbSettings( connectionString, - databaseName) + databaseName, + schema) ) ); return retryDurableDefinitionBuilder; } + + public static RetryDurableDefinitionBuilder WithSqlServerDataProvider( + this RetryDurableDefinitionBuilder retryDurableDefinitionBuilder, + string connectionString, + string databaseName) + { + return WithSqlServerDataProvider(retryDurableDefinitionBuilder, connectionString, databaseName, null); + } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs b/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs index e209adea..410fd63b 100644 --- a/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs +++ b/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs @@ -63,7 +63,7 @@ public async Task CheckQueueAsync(CheckQueueInput input) // Tries to find an active queue for the GroupKey using (var dbConnection = this.connectionProvider.Create(this.sqlServerDbSettings)) { - var exists = await this.retryQueueRepository.ExistsActiveAsync(dbConnection, input.QueueGroupKey).ConfigureAwait(false); + var exists = await this.retryQueueRepository.ExistsActiveAsync(dbConnection, input.QueueGroupKey, this.sqlServerDbSettings.Schema).ConfigureAwait(false); return new CheckQueueResult( exists ? @@ -78,7 +78,7 @@ public async Task CheckQueueNewestItemsAsync(QueueNewest using (var dbConnection = this.connectionProvider.Create(this.sqlServerDbSettings)) { - var itemsDbo = await this.retryQueueItemRepository.GetNewestItemsAsync(dbConnection, input.QueueId, input.Sort).ConfigureAwait(false); + var itemsDbo = await this.retryQueueItemRepository.GetNewestItemsAsync(dbConnection, input.QueueId, input.Sort, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (itemsDbo.Any()) { @@ -95,7 +95,7 @@ public async Task CheckQueuePendingItemsAsync(QueuePend using (var dbConnection = this.connectionProvider.Create(this.sqlServerDbSettings)) { - var itemsDbo = await this.retryQueueItemRepository.GetPendingItemsAsync(dbConnection, input.QueueId, input.Sort).ConfigureAwait(false); + var itemsDbo = await this.retryQueueItemRepository.GetPendingItemsAsync(dbConnection, input.QueueId, input.Sort, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (itemsDbo.Any()) { @@ -117,7 +117,8 @@ public async Task DeleteQueuesAsync(DeleteQueuesInput input) input.SearchGroupKey, input.RetryQueueStatus, input.MaxLastExecutionDateToBeKept, - input.MaxRowsToDelete) + input.MaxRowsToDelete, + this.sqlServerDbSettings.Schema) .ConfigureAwait(false); return new DeleteQueuesResult(totalQueuesDeleted); @@ -137,7 +138,8 @@ public async Task GetQueuesAsync(GetQueuesInput input) input.Status, input.SortOption, input.SearchGroupKey, - input.TopQueues) + input.TopQueues, + this.sqlServerDbSettings.Schema) .ConfigureAwait(false); if (!dboWrapper.QueuesDbos.Any()) @@ -155,6 +157,7 @@ public async Task GetQueuesAsync(GetQueuesInput input) dbConnection, new Guid[] { queueId }, input.ItemsStatuses, + this.sqlServerDbSettings.Schema, input.SeverityLevels, input.TopItemsByQueue, input.StuckStatusFilter) @@ -171,7 +174,7 @@ public async Task GetQueuesAsync(GetQueuesInput input) } dboWrapper.MessagesDbos = await this.retryQueueItemMessageRepository.GetMessagesOrderedAsync(dbConnection, dboWrapper.ItemsDbos).ConfigureAwait(false); - dboWrapper.HeadersDbos = await this.retryQueueItemMessageHeaderRepository.GetOrderedAsync(dbConnection, dboWrapper.MessagesDbos).ConfigureAwait(false); + dboWrapper.HeadersDbos = await this.retryQueueItemMessageHeaderRepository.GetOrderedAsync(dbConnection, dboWrapper.MessagesDbos, this.sqlServerDbSettings.Schema).ConfigureAwait(false); } var queues = this.retryQueueReader.Read(dboWrapper); @@ -185,7 +188,7 @@ public async Task SaveToQueueAsync(SaveToQueueInput input) using (var dbConnection = this.connectionProvider.CreateWithinTransaction(this.sqlServerDbSettings)) { - var retryQueueDbo = await this.retryQueueRepository.GetQueueAsync(dbConnection, input.QueueGroupKey).ConfigureAwait(false); + var retryQueueDbo = await this.retryQueueRepository.GetQueueAsync(dbConnection, input.QueueGroupKey, this.sqlServerDbSettings.Schema).ConfigureAwait(false); SaveToQueueResultStatus resultStatus; @@ -239,7 +242,7 @@ public async Task UpdateItemStatusAsync(UpdateItemStatusInput using (var dbConnection = this.connectionProvider.Create(this.sqlServerDbSettings)) { var totalItemsUpdated = await this.retryQueueItemRepository - .UpdateStatusAsync(dbConnection, input.ItemId, input.Status).ConfigureAwait(false); + .UpdateStatusAsync(dbConnection, input.ItemId, input.Status, this.sqlServerDbSettings.Schema).ConfigureAwait(false); return new UpdateItemResult( input.ItemId, @@ -269,15 +272,15 @@ private async Task AddItemAsync(IDbConnection dbConnection, SaveToQueueInput inp { var retryQueueItemDbo = this.retryQueueItemDboFactory.Create(input, retryQueueId, retryQueueDomainId); - var retryQueueItemId = await this.retryQueueItemRepository.AddAsync(dbConnection, retryQueueItemDbo).ConfigureAwait(false); + var retryQueueItemId = await this.retryQueueItemRepository.AddAsync(dbConnection, retryQueueItemDbo, this.sqlServerDbSettings.Schema).ConfigureAwait(false); // queue item message var retryQueueItemMessageDbo = this.retryQueueItemMessageDboFactory.Create(input.Message, retryQueueItemId); - await this.retryQueueItemMessageRepository.AddAsync(dbConnection, retryQueueItemMessageDbo).ConfigureAwait(false); + await this.retryQueueItemMessageRepository.AddAsync(dbConnection, retryQueueItemMessageDbo, this.sqlServerDbSettings.Schema).ConfigureAwait(false); // queue item message header var retryQueueHeadersDbo = this.retryQueueItemMessageHeaderDboFactory.Create(input.Message.Headers, retryQueueItemId); - await this.retryQueueItemMessageHeaderRepository.AddAsync(dbConnection, retryQueueHeadersDbo).ConfigureAwait(false); + await this.retryQueueItemMessageHeaderRepository.AddAsync(dbConnection, retryQueueHeadersDbo, this.sqlServerDbSettings.Schema).ConfigureAwait(false); } private async Task AddItemIntoAnExistingQueueAsync(IDbConnection dbConnection, SaveToQueueInput input, RetryQueueDbo retryQueueDbo) @@ -291,7 +294,7 @@ private async Task AddItemIntoAnExistingQueueAsync(IDbConnection dbConnection, S if (retryQueueDbo.Status == RetryQueueStatus.Done) { // The queue was marked as DONE. With this new item, the status should return to ACTIVE. - await this.retryQueueRepository.UpdateStatusAsync(dbConnection, retryQueueDbo.IdDomain, RetryQueueStatus.Active).ConfigureAwait(false); + await this.retryQueueRepository.UpdateStatusAsync(dbConnection, retryQueueDbo.IdDomain, RetryQueueStatus.Active, this.sqlServerDbSettings.Schema).ConfigureAwait(false); } } @@ -300,7 +303,7 @@ private async Task CreateItemIntoANewQueueAsync(IDbConnection dbConnection, Save var retryQueueDbo = this.retryQueueDboFactory.Create(input); // queue - var retryQueueId = await this.retryQueueRepository.AddAsync(dbConnection, retryQueueDbo).ConfigureAwait(false); + var retryQueueId = await this.retryQueueRepository.AddAsync(dbConnection, retryQueueDbo, this.sqlServerDbSettings.Schema).ConfigureAwait(false); // queue item await this.AddItemAsync(dbConnection, input, retryQueueId, retryQueueDbo.IdDomain).ConfigureAwait(false); @@ -313,11 +316,11 @@ private bool IsItemInWaitingState(RetryQueueItemDbo item) private async Task TryUpdateQueueToDoneAsync(IDbConnectionWithinTransaction dbConnection, Guid queueId) { - var anyItemStillActive = await this.retryQueueItemRepository.AnyItemStillActiveAsync(dbConnection, queueId).ConfigureAwait(false); + var anyItemStillActive = await this.retryQueueItemRepository.AnyItemStillActiveAsync(dbConnection, queueId, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (!anyItemStillActive) { - var queueRowsAffected = await this.retryQueueRepository.UpdateStatusAsync(dbConnection, queueId, RetryQueueStatus.Done).ConfigureAwait(false); + var queueRowsAffected = await this.retryQueueRepository.UpdateStatusAsync(dbConnection, queueId, RetryQueueStatus.Done, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (queueRowsAffected == 0) { @@ -339,7 +342,7 @@ private async Task UpdateItemAndQueueStatusAsync(UpdateItemSta using (var dbConnection = this.connectionProvider.CreateWithinTransaction(this.sqlServerDbSettings)) { - var item = await this.retryQueueItemRepository.GetItemAsync(dbConnection, input.ItemId).ConfigureAwait(false); + var item = await this.retryQueueItemRepository.GetItemAsync(dbConnection, input.ItemId, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (item is null) { @@ -351,7 +354,7 @@ private async Task UpdateItemAndQueueStatusAsync(UpdateItemSta return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemIsNotInWaitingState); } - if (!await this.retryQueueItemRepository.IsFirstWaitingInQueueAsync(dbConnection, item).ConfigureAwait(false)) + if (!await this.retryQueueItemRepository.IsFirstWaitingInQueueAsync(dbConnection, item, this.sqlServerDbSettings.Schema).ConfigureAwait(false)) { return new UpdateItemResult(input.ItemId, UpdateItemResultStatus.ItemIsNotTheFirstWaitingInQueue); } @@ -381,7 +384,7 @@ private async Task UpdateItemAndTryUpdateQueueToDoneAsync(Upda using (var dbConnection = this.connectionProvider.CreateWithinTransaction(this.sqlServerDbSettings)) { //update item - var itemRowsAffected = await this.retryQueueItemRepository.UpdateAsync(dbConnection, input.ItemId, input.Status, input.AttemptCount, input.LastExecution, input.Description).ConfigureAwait(false); + var itemRowsAffected = await this.retryQueueItemRepository.UpdateAsync(dbConnection, input.ItemId, input.Status, input.AttemptCount, input.LastExecution, input.Description, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (itemRowsAffected == 0) { @@ -408,7 +411,7 @@ private async Task UpdateQueueAndAllItemsAsync(UpdateItemsInQ { using (var dbConnection = this.connectionProvider.CreateWithinTransaction(this.sqlServerDbSettings)) { - var queue = await this.retryQueueRepository.GetQueueAsync(dbConnection, input.QueueGroupKey).ConfigureAwait(false); + var queue = await this.retryQueueRepository.GetQueueAsync(dbConnection, input.QueueGroupKey, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (queue is null) { @@ -426,7 +429,7 @@ private async Task UpdateQueueAndAllItemsAsync(UpdateItemsInQ } var items = await this.retryQueueItemRepository - .GetItemsOrderedAsync(dbConnection, new Guid[] { queue.IdDomain }, new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting }) + .GetItemsOrderedAsync(dbConnection, new Guid[] { queue.IdDomain }, new RetryQueueItemStatus[] { RetryQueueItemStatus.Waiting }, this.sqlServerDbSettings.Schema) .ConfigureAwait(false); if (!items.Any()) @@ -453,7 +456,7 @@ private async Task UpdateQueueAndAllItemsAsync(UpdateItemsInQ dbConnection.Commit(); - queue = await this.retryQueueRepository.GetQueueAsync(dbConnection, input.QueueGroupKey).ConfigureAwait(false); + queue = await this.retryQueueRepository.GetQueueAsync(dbConnection, input.QueueGroupKey, this.sqlServerDbSettings.Schema).ConfigureAwait(false); return new UpdateQueueResult(input.QueueGroupKey, updateQueueResultStatus, queue.Status); } @@ -462,12 +465,12 @@ private async Task UpdateQueueAndAllItemsAsync(UpdateItemsInQ private async Task UpdateQueueLastExecutionAndTryUpdateQueueToDoneAsync(IDbConnectionWithinTransaction dbConnection, Guid queueId, DateTime lastExecution) { // check if the queue can be updated to done as well - var anyItemStillActive = await this.retryQueueItemRepository.AnyItemStillActiveAsync(dbConnection, queueId).ConfigureAwait(false); + var anyItemStillActive = await this.retryQueueItemRepository.AnyItemStillActiveAsync(dbConnection, queueId, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (anyItemStillActive) { // update queue last execution only - var queueLastExecutionRowsAffected = await this.retryQueueRepository.UpdateLastExecutionAsync(dbConnection, queueId, lastExecution).ConfigureAwait(false); + var queueLastExecutionRowsAffected = await this.retryQueueRepository.UpdateLastExecutionAsync(dbConnection, queueId, lastExecution, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (queueLastExecutionRowsAffected == 0) { @@ -477,7 +480,7 @@ private async Task UpdateQueueLastExecutionAndTryUpdate else { // update queue last execution and the status to done - var queueRowsAffected = await this.retryQueueRepository.UpdateAsync(dbConnection, queueId, RetryQueueStatus.Done, lastExecution).ConfigureAwait(false); + var queueRowsAffected = await this.retryQueueRepository.UpdateAsync(dbConnection, queueId, RetryQueueStatus.Done, lastExecution, this.sqlServerDbSettings.Schema).ConfigureAwait(false); if (queueRowsAffected == 0) { diff --git a/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs b/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs index 00b13b4d..be7b8bdc 100644 --- a/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs +++ b/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs @@ -1,22 +1,25 @@ namespace KafkaFlow.Retry.SqlServer { - using Dawn; using System.Diagnostics.CodeAnalysis; + using Dawn; [ExcludeFromCodeCoverage] public class SqlServerDbSettings { - public SqlServerDbSettings(string connectionString, string databaseName) + public SqlServerDbSettings(string connectionString, string databaseName, string schema = "dbo") { Guard.Argument(connectionString).NotNull().NotEmpty(); Guard.Argument(databaseName).NotNull().NotEmpty(); ConnectionString = connectionString; DatabaseName = databaseName; + Schema = schema; } public string ConnectionString { get; } public string DatabaseName { get; } + + public string Schema { get; } } -} +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs index b7599af1..945ec784 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs @@ -18,5 +18,18 @@ public void RetryDurableDefinitionBuilderExtension_WithSqlServerDataProvider_Suc // Arrange result.Should().NotBeNull(); } + + [Fact] + public void RetryDurableDefinitionBuilderExtension_WithSqlServerDataProviderAndSchema_Success() + { + // Arrange + var builder = new RetryDurableDefinitionBuilder(); + + // Act + var result = builder.WithSqlServerDataProvider("connectionString", "databaseName", "schema"); + + // Arrange + result.Should().NotBeNull(); + } } } \ No newline at end of file