Skip to content

Commit

Permalink
chore: added ability to schema sql server operations
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosgoias committed Aug 2, 2023
1 parent f8fe911 commit 70a9e65
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async Task CreateQueueAsync(RetryQueue queue)

using var dbConnection = this.connectionProvider.CreateWithinTransaction(this.sqlServerDbSettings);

var queueId = await this.retryQueueRepository.AddAsync(dbConnection, queueDbo);
var queueId = await this.retryQueueRepository.AddAsync(dbConnection, queueDbo, this.sqlServerDbSettings.Schema);

foreach (var item in queue.Items)
{
Expand All @@ -101,7 +101,7 @@ public async Task CreateQueueAsync(RetryQueue queue)
Description = item.Description
};

var itemId = await this.retryQueueItemRepository.AddAsync(dbConnection, itemDbo);
var itemId = await this.retryQueueItemRepository.AddAsync(dbConnection, itemDbo, this.sqlServerDbSettings.Schema);

// item message
var messageDbo = new RetryQueueItemMessageDbo
Expand All @@ -115,7 +115,7 @@ public async Task CreateQueueAsync(RetryQueue queue)
Value = item.Message.Value
};

await this.retryQueueItemMessageRepository.AddAsync(dbConnection, messageDbo);
await this.retryQueueItemMessageRepository.AddAsync(dbConnection, messageDbo, this.sqlServerDbSettings.Schema);

// message headers
var messageHeadersDbos = item.Message.Headers
Expand All @@ -126,7 +126,7 @@ public async Task CreateQueueAsync(RetryQueue queue)
Value = h.Value
});

await this.retryQueueItemMessageHeaderRepository.AddAsync(dbConnection, messageHeadersDbos);
await this.retryQueueItemMessageHeaderRepository.AddAsync(dbConnection, messageHeadersDbos, this.sqlServerDbSettings.Schema);
}

dbConnection.Commit();
Expand All @@ -136,16 +136,16 @@ public async Task<RetryQueue> GetAllRetryQueueDataAsync(string queueGroupKey)
{
using (var dbConnection = this.connectionProvider.Create(this.sqlServerDbSettings))
{
var retryQueueDbo = await this.retryQueueRepository.GetQueueAsync(dbConnection, queueGroupKey);
var retryQueueDbo = await this.retryQueueRepository.GetQueueAsync(dbConnection, queueGroupKey, this.sqlServerDbSettings.Schema);

if (retryQueueDbo is null)
{
return null;
}

var retryQueueItemsDbo = await this.retryQueueItemRepository.GetItemsByQueueOrderedAsync(dbConnection, retryQueueDbo.IdDomain);
var retryQueueItemsDbo = await this.retryQueueItemRepository.GetItemsByQueueOrderedAsync(dbConnection, retryQueueDbo.IdDomain, this.sqlServerDbSettings.Schema);
var itemMessagesDbo = await this.retryQueueItemMessageRepository.GetMessagesOrderedAsync(dbConnection, retryQueueItemsDbo);
var messageHeadersDbo = await this.retryQueueItemMessageHeaderRepository.GetOrderedAsync(dbConnection, itemMessagesDbo);
var messageHeadersDbo = await this.retryQueueItemMessageHeaderRepository.GetOrderedAsync(dbConnection, itemMessagesDbo, this.sqlServerDbSettings.Schema);

var dboWrapper = new RetryQueuesDboWrapper
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Threading.Tasks;
using KafkaFlow.Retry.SqlServer.Model;

internal interface IRetryQueueItemMessageHeaderRepository
{
Task AddAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageHeaderDbo> retryQueueHeadersDbo);
Task AddAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageHeaderDbo> retryQueueHeadersDbo, string schema);

Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageDbo> retryQueueItemMessagesDbo);
Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageDbo> retryQueueItemMessagesDbo, string schema);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Threading.Tasks;
using KafkaFlow.Retry.SqlServer.Model;

internal interface IRetryQueueItemMessageRepository
{
Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo);
Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo, string schema);

Task<IList<RetryQueueItemMessageDbo>> GetMessagesOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemDbo> retryQueueItemsDbo);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand All @@ -10,30 +10,31 @@

internal interface IRetryQueueItemRepository
{
Task<long> AddAsync(IDbConnection dbConnection, RetryQueueItemDbo retryQueueItemDbo);
Task<long> AddAsync(IDbConnection dbConnection, RetryQueueItemDbo retryQueueItemDbo, string schema);

Task<bool> AnyItemStillActiveAsync(IDbConnection dbConnection, Guid retryQueueId);
Task<bool> AnyItemStillActiveAsync(IDbConnection dbConnection, Guid retryQueueId, string schema);

Task<RetryQueueItemDbo> GetItemAsync(IDbConnection dbConnection, Guid domainId);
Task<RetryQueueItemDbo> GetItemAsync(IDbConnection dbConnection, Guid domainId, string schema);

Task<IList<RetryQueueItemDbo>> GetItemsByQueueOrderedAsync(IDbConnection dbConnection, Guid retryQueueId);
Task<IList<RetryQueueItemDbo>> GetItemsByQueueOrderedAsync(IDbConnection dbConnection, Guid retryQueueId, string schema);

Task<IList<RetryQueueItemDbo>> GetItemsOrderedAsync(
IDbConnection dbConnection,
IEnumerable<Guid> retryQueueIds,
IEnumerable<RetryQueueItemStatus> statuses,
string schema,
IEnumerable<SeverityLevel> severities = null,
int? top = null,
StuckStatusFilter stuckStatusFilter = null);

Task<IList<RetryQueueItemDbo>> GetNewestItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort);
Task<IList<RetryQueueItemDbo>> GetNewestItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort, string schema);

Task<IList<RetryQueueItemDbo>> GetPendingItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort);
Task<IList<RetryQueueItemDbo>> GetPendingItemsAsync(IDbConnection dbConnection, Guid queueIdDomain, int sort, string schema);

Task<bool> IsFirstWaitingInQueueAsync(IDbConnection dbConnection, RetryQueueItemDbo item);
Task<bool> IsFirstWaitingInQueueAsync(IDbConnection dbConnection, RetryQueueItemDbo item, string schema);

Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, int attemptsCount, DateTime lastExecution, string description);
Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, int attemptsCount, DateTime lastExecution, string description, string schema);

Task<int> UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status);
Task<int> UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueItemStatus status, string schema);
}
}
}
20 changes: 10 additions & 10 deletions src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System;
using System.Collections.Generic;
Expand All @@ -9,20 +9,20 @@

internal interface IRetryQueueRepository
{
Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo);
Task<long> AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo, string schema);

Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete);
Task<int> DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete, string schema);

Task<bool> ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey);
Task<bool> ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey, string schema);

Task<RetryQueueDbo> GetQueueAsync(IDbConnection dbConnection, string queueGroupKey);
Task<RetryQueueDbo> GetQueueAsync(IDbConnection dbConnection, string queueGroupKey, string schema);

Task<IList<RetryQueueDbo>> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top);
Task<IList<RetryQueueDbo>> GetTopSortedQueuesOrderedAsync(IDbConnection dbConnection, RetryQueueStatus retryQueueStatus, GetQueuesSortOption sortOption, string searchGroupKey, int top, string schema);

Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution);
Task<int> UpdateAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, DateTime lastExecution, string schema);

Task<int> UpdateLastExecutionAsync(IDbConnection dbConnection, Guid idDomain, DateTime lastExecution);
Task<int> UpdateLastExecutionAsync(IDbConnection dbConnection, Guid idDomain, DateTime lastExecution, string schema);

Task<int> UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus);
Task<int> UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus, string schema);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Data.SqlClient;
Expand All @@ -9,18 +9,18 @@

internal sealed class RetryQueueItemMessageHeaderRepository : IRetryQueueItemMessageHeaderRepository
{
public async Task AddAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageHeaderDbo> retryQueueHeadersDbo)
public async Task AddAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageHeaderDbo> retryQueueHeadersDbo, string schema)
{
Guard.Argument(dbConnection, nameof(dbConnection)).NotNull();
Guard.Argument(retryQueueHeadersDbo, nameof(retryQueueHeadersDbo)).NotNull();

foreach (var header in retryQueueHeadersDbo)
{
await this.AddAsync(dbConnection, header).ConfigureAwait(false);
await this.AddAsync(dbConnection, header, schema).ConfigureAwait(false);
}
}

public async Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageDbo> retryQueueItemMessagesDbo)
public async Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConnection dbConnection, IEnumerable<RetryQueueItemMessageDbo> retryQueueItemMessagesDbo, string schema)
{
Guard.Argument(dbConnection, nameof(dbConnection)).NotNull();
Guard.Argument(retryQueueItemMessagesDbo, nameof(retryQueueItemMessagesDbo)).NotNull();
Expand All @@ -29,24 +29,24 @@ public async Task<IList<RetryQueueItemMessageHeaderDbo>> GetOrderedAsync(IDbConn
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = $@"SELECT *
FROM [RetryItemMessageHeaders] h
INNER JOIN [RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage
FROM [{schema}].[RetryItemMessageHeaders] h
INNER JOIN [{schema}].[RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage
WHERE h.IdItemMessage IN ({string.Join(",", retryQueueItemMessagesDbo.Select(x => $"'{x.IdRetryQueueItem}'"))})
ORDER BY rqi.IdRetryQueue, h.IdItemMessage";

return await this.ExecuteReaderAsync(command).ConfigureAwait(false);
}
}

private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHeaderDbo retryQueueHeaderDbo)
private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHeaderDbo retryQueueHeaderDbo, string schema)
{
Guard.Argument(dbConnection, nameof(dbConnection)).NotNull();
Guard.Argument(retryQueueHeaderDbo, nameof(retryQueueHeaderDbo)).NotNull();

using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [RetryItemMessageHeaders]
command.CommandText = $@"INSERT INTO [{schema}].[RetryItemMessageHeaders]
(IdItemMessage, [Key], Value)
VALUES
(@IdItemMessage, @Key, @Value)";
Expand Down Expand Up @@ -90,4 +90,4 @@ private RetryQueueItemMessageHeaderDbo FillDbo(SqlDataReader reader, int idColum
};
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace KafkaFlow.Retry.SqlServer.Repositories
namespace KafkaFlow.Retry.SqlServer.Repositories
{
using System.Collections.Generic;
using System.Data.SqlClient;
Expand All @@ -8,15 +8,15 @@

internal sealed class RetryQueueItemMessageRepository : IRetryQueueItemMessageRepository
{
public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo)
public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo retryQueueItemMessageDbo, string schema)
{
Guard.Argument(dbConnection, nameof(dbConnection)).NotNull();
Guard.Argument(retryQueueItemMessageDbo, nameof(retryQueueItemMessageDbo)).NotNull();

using (var command = dbConnection.CreateCommand())
{
command.CommandType = System.Data.CommandType.Text;
command.CommandText = @"INSERT INTO [ItemMessages]
command.CommandText = $@"INSERT INTO [{schema}].[ItemMessages]
(IdRetryQueueItem, [Key], Value, TopicName, Partition, Offset, UtcTimeStamp)
VALUES
(@idRetryQueueItem, @key, @value, @topicName, @partition, @offSet, @utcTimeStamp)";
Expand Down Expand Up @@ -89,4 +89,4 @@ private RetryQueueItemMessageDbo FillDbo(SqlDataReader reader)
};
}
}
}
}
Loading

0 comments on commit 70a9e65

Please sign in to comment.