Skip to content

Commit

Permalink
TTBR calculations are now based on broker (database) time
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcin Hoppe committed Feb 26, 2016
1 parent b511a66 commit 8de5071
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions src/NServiceBus.SqlServer/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ static object[] ExtractTransportMessageData(TransportMessage message, SendOption
}
else
{
data[TimeToBeReceivedColumn] = DateTime.UtcNow.Add(message.TimeToBeReceived);
data[TimeToBeReceivedColumn] = message.TimeToBeReceived.TotalMilliseconds;
}
data[HeadersColumn] = HeaderSerializer.SerializeObject(message.Headers);
if (message.Body == null)
Expand Down Expand Up @@ -129,14 +129,14 @@ MessageReadResult ExecuteReader(SqlCommand command)
{
var id = rowData[0].ToString();

DateTime? expireDateTime = null;
int? millisecondsToExpiry = null;
if (rowData[TimeToBeReceivedColumn] != DBNull.Value)
{
expireDateTime = (DateTime)rowData[TimeToBeReceivedColumn];
millisecondsToExpiry = (int)rowData[TimeToBeReceivedColumn];
}

//Has message expired?
if (expireDateTime.HasValue && expireDateTime.Value < DateTime.UtcNow)
if (millisecondsToExpiry.HasValue && millisecondsToExpiry.Value < 0L)
{
Logger.InfoFormat("Message with ID={0} has expired. Removing it from queue.", id);
return MessageReadResult.NoMessage;
Expand All @@ -161,9 +161,9 @@ MessageReadResult ExecuteReader(SqlCommand command)
message.Headers[Headers.ReplyToAddress] = replyToAddress;
}

if (expireDateTime.HasValue)
if (millisecondsToExpiry.HasValue)
{
message.TimeToBeReceived = TimeSpan.FromTicks(expireDateTime.Value.Ticks - DateTime.UtcNow.Ticks);
message.TimeToBeReceived = TimeSpan.FromMilliseconds(millisecondsToExpiry.Value);
}

return MessageReadResult.Success(message);
Expand Down Expand Up @@ -195,8 +195,6 @@ public int PurgeBatchOfExpiredMessages(SqlConnection connection, int purgeBatchS

using (var command = new SqlCommand(commandText, connection))
{
command.Parameters.Add("UTCNow", SqlDbType.DateTime).Value = DateTime.UtcNow;

return command.ExecuteNonQuery();
}
}
Expand Down Expand Up @@ -228,31 +226,31 @@ public override string ToString()
readonly string schema;
static readonly JsonMessageSerializer HeaderSerializer = new JsonMessageSerializer(null);

static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "Expires", "Headers", "Body" };
static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "TimeToBeReceivedMs", "Headers", "Body" };

static readonly SqlDbType[] ParameterTypes =
{
SqlDbType.UniqueIdentifier,
SqlDbType.VarChar,
SqlDbType.VarChar,
SqlDbType.Bit,
SqlDbType.DateTime,
SqlDbType.Int,
SqlDbType.VarChar,
SqlDbType.VarBinary
};

const string SqlSend =
@"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body])
VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,@Expires,@Headers,@Body)";
VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,IIF(@TimeToBeReceivedMs IS NOT NULL, DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()), NULL),@Headers,@Body)";

const string SqlReceive =
@"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC)
DELETE FROM message
OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress,
deleted.Recoverable, deleted.Expires, deleted.Headers, deleted.Body;";
deleted.Recoverable, IIF(deleted.Expires IS NOT NULL, DATEDIFF(ms, GETUTCDATE(), deleted.Expires), NULL), deleted.Headers, deleted.Body;";

const string SqlPurgeBatchOfExpiredMessages =
@"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < @UTCNow";
@"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE()";

const string SqlCheckIfExpiresIndexIsPresent =
@"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')";
Expand Down

0 comments on commit 8de5071

Please sign in to comment.